From aa65eb06d1d19d0077a0fbd2d02208f86d9ed853 Mon Sep 17 00:00:00 2001 From: Snuffy2 Date: Sat, 28 Dec 2024 17:28:08 -0500 Subject: [PATCH] Update code from linters --- README.md | 2 +- custom_components/places/__init__.py | 16 +- custom_components/places/config_flow.py | 152 +-- custom_components/places/const.py | 36 +- custom_components/places/sensor.py | 1573 +++++++++++------------ icon/map-search-outline.svg | 2 +- 6 files changed, 855 insertions(+), 926 deletions(-) diff --git a/README.md b/README.md index 48a368fe..b558ed9e 100644 --- a/README.md +++ b/README.md @@ -75,7 +75,7 @@ Key | Required | Default | Description | `Language` | `No` |location's local language | Requested\* language(s) for state and attributes. Two-Letter language code(s), separated by commas.
\*Refer to [Notes](#notes) `Extended Attributes` | `No` | `False` | Show extended attributes: wikidata_id, osm_dict, osm_details_dict, wikidata_dict *(if they exist)*. Provides many additional attributes for advanced logic. **Warning, this will make the attributes very long!** `Show Last Updated` | `No` | `False` | Show last updated time at end of state `(since xx:yy)` -`Use GPS Accuracy` | `No` | `True` | Use GPS Accuracy when determining whether to update the places sensor (if 0, don't update the places sensor). By not updaing when GPS Accuracy is 0, should prevent inaccurate locations from being set in the places sensors.

**Set this to `False` if your Device Tracker has a GPS Accuracy (`gps_accuracy`) attribute, but it always shows 0 even if the latitude and longitude are correct.** +`Use GPS Accuracy` | `No` | `True` | Use GPS Accuracy when determining whether to update the places sensor (if 0, don't update the places sensor). By not updating when GPS Accuracy is 0, should prevent inaccurate locations from being set in the places sensors.

**Set this to `False` if your Device Tracker has a GPS Accuracy (`gps_accuracy`) attribute, but it always shows 0 even if the latitude and longitude are correct.**

Advanced Display Options

diff --git a/custom_components/places/__init__.py b/custom_components/places/__init__.py index 355a7801..11e82170 100644 --- a/custom_components/places/__init__.py +++ b/custom_components/places/__init__.py @@ -1,3 +1,5 @@ +"""Initialize Home Assistant places integration.""" + import logging from homeassistant import config_entries, core @@ -6,14 +8,12 @@ from .const import DOMAIN -_LOGGER = logging.getLogger(__name__) +_LOGGER: logging.Logger = logging.getLogger(__name__) PLATFORMS: list[str] = [Platform.SENSOR] CONFIG_SCHEMA = cv.empty_config_schema(DOMAIN) -async def async_setup_entry( - hass: core.HomeAssistant, entry: config_entries.ConfigEntry -) -> bool: +async def async_setup_entry(hass: core.HomeAssistant, entry: config_entries.ConfigEntry) -> bool: """Set up from a config entry.""" # _LOGGER.debug(f"[init async_setup_entry] entry: {entry.data}") @@ -26,15 +26,13 @@ async def async_setup_entry( return True -async def async_unload_entry( - hass: core.HomeAssistant, entry: config_entries.ConfigEntry -) -> bool: +async def async_unload_entry(hass: core.HomeAssistant, entry: config_entries.ConfigEntry) -> bool: """Unload a config entry.""" # This is called when an entry/configured device is to be removed. The class # needs to unload itself, and remove callbacks. See the classes for further # details - _LOGGER.info(f"Unloading: {entry.data}") - unload_ok = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) + _LOGGER.info("Unloading: %s", entry.data) + unload_ok: bool = await hass.config_entries.async_unload_platforms(entry, PLATFORMS) if unload_ok: hass.data[DOMAIN].pop(entry.entry_id) diff --git a/custom_components/places/config_flow.py b/custom_components/places/config_flow.py index 40239877..b1bb937f 100644 --- a/custom_components/places/config_flow.py +++ b/custom_components/places/config_flow.py @@ -1,9 +1,12 @@ +"""Config Flow for places integration.""" + from __future__ import annotations import logging from typing import Any import voluptuous as vol + from homeassistant import config_entries, core from homeassistant.const import ( ATTR_FRIENDLY_NAME, @@ -40,24 +43,20 @@ TRACKING_DOMAINS_NEED_LATLONG, ) -_LOGGER = logging.getLogger(__name__) +_LOGGER: logging.Logger = logging.getLogger(__name__) MAP_PROVIDER_OPTIONS = ["apple", "google", "osm"] STATE_OPTIONS = ["zone, place", "formatted_place", "zone_name, place"] DATE_FORMAT_OPTIONS = ["mm/dd", "dd/mm"] MAP_ZOOM_MIN = 1 MAP_ZOOM_MAX = 20 -COMPONENT_CONFIG_URL = ( - "https://github.com/custom-components/places#configuration-options" -) +COMPONENT_CONFIG_URL = "https://github.com/custom-components/places#configuration-options" # Note the input displayed to the user will be translated. See the # translations/.json file and strings.json. See here for further information: # https://developers.home-assistant.io/docs/config_entries_config_flow_handler/#translations -def get_devicetracker_id_entities( - hass: core.HomeAssistant, current_entity=None -) -> list[str]: +def get_devicetracker_id_entities(hass: core.HomeAssistant, current_entity=None) -> list[str]: """Get the list of valid entities. For sensors, only include ones with latitude and longitude attributes.""" dt_list = [] for dom in TRACKING_DOMAINS: @@ -68,28 +67,24 @@ def get_devicetracker_id_entities( and CONF_LONGITUDE in hass.states.get(ent.entity_id).attributes ): # _LOGGER.debug(f"Entity: {ent}") - dt_list.append( - selector.SelectOptionDict( - value=str(ent.entity_id), - label=f"{ent.attributes.get(ATTR_FRIENDLY_NAME)} ({ent.entity_id})", - ) + dt_list.extend( + [ + selector.SelectOptionDict( + value=str(ent.entity_id), + label=f"{ent.attributes.get(ATTR_FRIENDLY_NAME)} ({ent.entity_id})", + ) + ] ) # Optional: Include the current entity in the list as well. if current_entity is not None: # _LOGGER.debug(f"current_entity: {current_entity}") dt_list_entities = [d["value"] for d in dt_list] - if ( - current_entity not in dt_list_entities - and hass.states.get(current_entity) is not None - ): + if current_entity not in dt_list_entities and hass.states.get(current_entity) is not None: if ( ATTR_FRIENDLY_NAME in hass.states.get(current_entity).attributes - and hass.states.get(current_entity).attributes.get(ATTR_FRIENDLY_NAME) - is not None + and hass.states.get(current_entity).attributes.get(ATTR_FRIENDLY_NAME) is not None ): - current_name = hass.states.get(current_entity).attributes.get( - ATTR_FRIENDLY_NAME - ) + current_name = hass.states.get(current_entity).attributes.get(ATTR_FRIENDLY_NAME) # _LOGGER.debug(f"current_name: {current_name}") dt_list.append( selector.SelectOptionDict( @@ -120,11 +115,13 @@ def get_home_zone_entities(hass: core.HomeAssistant) -> list[str]: # _LOGGER.debug(f"Geting entities for domain: {dom}") for ent in hass.states.async_all(dom): # _LOGGER.debug(f"Entity: {ent}") - zone_list.append( - selector.SelectOptionDict( - value=str(ent.entity_id), - label=f"{ent.attributes.get(ATTR_FRIENDLY_NAME)} ({ent.entity_id})", - ) + zone_list.extend( + [ + selector.SelectOptionDict( + value=str(ent.entity_id), + label=f"{ent.attributes.get(ATTR_FRIENDLY_NAME)} ({ent.entity_id})", + ) + ] ) if zone_list: zone_list_sorted = sorted(zone_list, key=lambda d: d["label"].casefold()) @@ -146,6 +143,8 @@ async def validate_input(hass: core.HomeAssistant, data: dict) -> dict[str, Any] class PlacesConfigFlow(config_entries.ConfigFlow, domain=DOMAIN): + """Config Flow for places integration.""" + VERSION = 1 # Connection classes in homeassistant/config_entries.py are now deprecated @@ -162,12 +161,10 @@ async def async_step_user(self, user_input=None) -> FlowResult: try: info = await validate_input(self.hass, user_input) # _LOGGER.debug(f"[New Sensor] info: {info}") - _LOGGER.debug(f"[New Sensor] user_input: {user_input}") + _LOGGER.debug("[New Sensor] user_input: %s", user_input) return self.async_create_entry(title=info["title"], data=user_input) - except Exception as err: - _LOGGER.exception( - f"[config_flow async_step_user] Unexpected exception: {err}" - ) + except Exception as err: # noqa: BLE001 + _LOGGER.error("[config_flow async_step_user] Unexpected exception: %s", err) errors["base"] = "unknown" devicetracker_id_list = get_devicetracker_id_entities(self.hass) zone_list = get_home_zone_entities(self.hass) @@ -194,9 +191,7 @@ async def async_step_user(self, user_input=None) -> FlowResult: mode=selector.SelectSelectorMode.DROPDOWN, ) ), - vol.Optional( - CONF_HOME_ZONE, default=DEFAULT_HOME_ZONE - ): selector.SelectSelector( + vol.Optional(CONF_HOME_ZONE, default=DEFAULT_HOME_ZONE): selector.SelectSelector( selector.SelectSelectorConfig( options=zone_list, multiple=False, @@ -214,9 +209,7 @@ async def async_step_user(self, user_input=None) -> FlowResult: mode=selector.SelectSelectorMode.DROPDOWN, ) ), - vol.Optional( - CONF_MAP_ZOOM, default=int(DEFAULT_MAP_ZOOM) - ): selector.NumberSelector( + vol.Optional(CONF_MAP_ZOOM, default=int(DEFAULT_MAP_ZOOM)): selector.NumberSelector( selector.NumberSelectorConfig( min=MAP_ZOOM_MIN, max=MAP_ZOOM_MAX, @@ -227,9 +220,9 @@ async def async_step_user(self, user_input=None) -> FlowResult: vol.Optional( CONF_EXTENDED_ATTR, default=DEFAULT_EXTENDED_ATTR ): selector.BooleanSelector(selector.BooleanSelectorConfig()), - vol.Optional( - CONF_SHOW_TIME, default=DEFAULT_SHOW_TIME - ): selector.BooleanSelector(selector.BooleanSelectorConfig()), + vol.Optional(CONF_SHOW_TIME, default=DEFAULT_SHOW_TIME): selector.BooleanSelector( + selector.BooleanSelectorConfig() + ), vol.Optional( CONF_DATE_FORMAT, default=DEFAULT_DATE_FORMAT ): selector.SelectSelector( @@ -240,9 +233,9 @@ async def async_step_user(self, user_input=None) -> FlowResult: mode=selector.SelectSelectorMode.DROPDOWN, ) ), - vol.Optional( - CONF_USE_GPS, default=DEFAULT_USE_GPS - ): selector.BooleanSelector(selector.BooleanSelectorConfig()), + vol.Optional(CONF_USE_GPS, default=DEFAULT_USE_GPS): selector.BooleanSelector( + selector.BooleanSelectorConfig() + ), } ) # If there is no user input or there were errors, show the form again, including any errors that were found with the input. @@ -276,10 +269,10 @@ async def async_step_init(self, user_input=None): if user_input is not None: # _LOGGER.debug(f"[options_flow async_step_init] user_input initial: {user_input}") # Bring in other keys not in the Options Flow - for m in dict(self.config_entry.data).keys(): + for m in dict(self.config_entry.data): user_input.setdefault(m, self.config_entry.data[m]) # Remove any keys with blank values - for m in dict(user_input).keys(): + for m in dict(user_input): # _LOGGER.debug(f"[Options Update] {m} [{type(user_input.get(m))}]: {user_input.get(m)}") if isinstance(user_input.get(m), str) and not user_input.get(m): user_input.pop(m) @@ -293,22 +286,15 @@ async def async_step_init(self, user_input=None): # Include the current entity in the list as well. Although it may still fail in validation checking. devicetracker_id_list = get_devicetracker_id_entities( - self.hass, - self.config_entry.data[CONF_DEVICETRACKER_ID] - if CONF_DEVICETRACKER_ID in self.config_entry.data - else None, + self.hass, self.config_entry.data.get(CONF_DEVICETRACKER_ID, None) ) - zone_list = get_home_zone_entities(self.hass) + zone_list: list[str] = get_home_zone_entities(self.hass) # _LOGGER.debug(f"Trackable entities including sensors with lat/long: {devicetracker_id_list}") OPTIONS_SCHEMA = vol.Schema( { vol.Required( CONF_DEVICETRACKER_ID, - default=( - self.config_entry.data[CONF_DEVICETRACKER_ID] - if CONF_DEVICETRACKER_ID in self.config_entry.data - else None - ), + default=(self.config_entry.data.get(CONF_DEVICETRACKER_ID, None)), ): selector.SelectSelector( selector.SelectSelectorConfig( options=devicetracker_id_list, @@ -320,19 +306,15 @@ async def async_step_init(self, user_input=None): vol.Optional( CONF_API_KEY, default="", - description={ - "suggested_value": self.config_entry.data[CONF_API_KEY] - if CONF_API_KEY in self.config_entry.data - else None - }, + description={"suggested_value": self.config_entry.data.get(CONF_API_KEY, None)}, ): str, vol.Optional( CONF_DISPLAY_OPTIONS, default=DEFAULT_DISPLAY_OPTIONS, description={ - "suggested_value": self.config_entry.data[CONF_DISPLAY_OPTIONS] - if CONF_DISPLAY_OPTIONS in self.config_entry.data - else DEFAULT_DISPLAY_OPTIONS + "suggested_value": self.config_entry.data.get( + CONF_DISPLAY_OPTIONS, DEFAULT_DISPLAY_OPTIONS + ) }, ): selector.SelectSelector( selector.SelectSelectorConfig( @@ -346,9 +328,7 @@ async def async_step_init(self, user_input=None): CONF_HOME_ZONE, default="", description={ - "suggested_value": self.config_entry.data[CONF_HOME_ZONE] - if CONF_HOME_ZONE in self.config_entry.data - else None + "suggested_value": self.config_entry.data.get(CONF_HOME_ZONE, None) }, ): selector.SelectSelector( selector.SelectSelectorConfig( @@ -362,9 +342,9 @@ async def async_step_init(self, user_input=None): CONF_MAP_PROVIDER, default=DEFAULT_MAP_PROVIDER, description={ - "suggested_value": self.config_entry.data[CONF_MAP_PROVIDER] - if CONF_MAP_PROVIDER in self.config_entry.data - else DEFAULT_MAP_PROVIDER + "suggested_value": self.config_entry.data.get( + CONF_MAP_PROVIDER, DEFAULT_MAP_PROVIDER + ) }, ): selector.SelectSelector( selector.SelectSelectorConfig( @@ -378,9 +358,9 @@ async def async_step_init(self, user_input=None): CONF_MAP_ZOOM, default=DEFAULT_MAP_ZOOM, description={ - "suggested_value": self.config_entry.data[CONF_MAP_ZOOM] - if CONF_MAP_ZOOM in self.config_entry.data - else DEFAULT_MAP_ZOOM + "suggested_value": self.config_entry.data.get( + CONF_MAP_ZOOM, DEFAULT_MAP_ZOOM + ) }, ): selector.NumberSelector( selector.NumberSelectorConfig( @@ -393,34 +373,24 @@ async def async_step_init(self, user_input=None): CONF_LANGUAGE, default="", description={ - "suggested_value": self.config_entry.data[CONF_LANGUAGE] - if CONF_LANGUAGE in self.config_entry.data - else None + "suggested_value": self.config_entry.data.get(CONF_LANGUAGE, None) }, ): str, vol.Optional( CONF_EXTENDED_ATTR, - default=( - self.config_entry.data[CONF_EXTENDED_ATTR] - if CONF_EXTENDED_ATTR in self.config_entry.data - else DEFAULT_EXTENDED_ATTR - ), + default=(self.config_entry.data.get(CONF_EXTENDED_ATTR, DEFAULT_EXTENDED_ATTR)), ): selector.BooleanSelector(selector.BooleanSelectorConfig()), vol.Optional( CONF_SHOW_TIME, - default=( - self.config_entry.data[CONF_SHOW_TIME] - if CONF_SHOW_TIME in self.config_entry.data - else DEFAULT_SHOW_TIME - ), + default=(self.config_entry.data.get(CONF_SHOW_TIME, DEFAULT_SHOW_TIME)), ): selector.BooleanSelector(selector.BooleanSelectorConfig()), vol.Optional( CONF_DATE_FORMAT, default=DEFAULT_DATE_FORMAT, description={ - "suggested_value": self.config_entry.data[CONF_DATE_FORMAT] - if CONF_DATE_FORMAT in self.config_entry.data - else DEFAULT_DATE_FORMAT + "suggested_value": self.config_entry.data.get( + CONF_DATE_FORMAT, DEFAULT_DATE_FORMAT + ) }, ): selector.SelectSelector( selector.SelectSelectorConfig( @@ -432,11 +402,7 @@ async def async_step_init(self, user_input=None): ), vol.Optional( CONF_USE_GPS, - default=( - self.config_entry.data[CONF_USE_GPS] - if CONF_USE_GPS in self.config_entry.data - else DEFAULT_USE_GPS - ), + default=(self.config_entry.data.get(CONF_USE_GPS, DEFAULT_USE_GPS)), ): selector.BooleanSelector(selector.BooleanSelectorConfig()), } ) diff --git a/custom_components/places/const.py b/custom_components/places/const.py index 3db7914b..ad799db5 100644 --- a/custom_components/places/const.py +++ b/custom_components/places/const.py @@ -1,3 +1,7 @@ +"""Constants for places.""" + +from collections.abc import MutableMapping + from homeassistant.const import ( ATTR_GPS_ACCURACY, CONF_API_KEY, @@ -27,18 +31,18 @@ # Settings -TRACKING_DOMAINS = [ - str(Platform.DEVICE_TRACKER), - str("person"), - str(Platform.SENSOR), +TRACKING_DOMAINS: list[str] = [ + Platform.DEVICE_TRACKER, + "person", + Platform.SENSOR, CONF_ZONE, "variable", ] -TRACKING_DOMAINS_NEED_LATLONG = [ - str(Platform.SENSOR), +TRACKING_DOMAINS_NEED_LATLONG: list[str] = [ + Platform.SENSOR, "variable", ] -HOME_LOCATION_DOMAINS = [CONF_ZONE] +HOME_LOCATION_DOMAINS: list[str] = [CONF_ZONE] # Config CONF_DEVICETRACKER_ID = "devicetracker_id" @@ -115,7 +119,7 @@ # Attribute Lists -CONFIG_ATTRIBUTES_LIST = [ +CONFIG_ATTRIBUTES_LIST: list[str] = [ CONF_API_KEY, CONF_DEVICETRACKER_ID, CONF_EXTENDED_ATTR, @@ -131,7 +135,7 @@ CONF_USE_GPS, CONF_UNIQUE_ID, ] -RESET_ATTRIBUTE_LIST = [ +RESET_ATTRIBUTE_LIST: list[str] = [ ATTR_CITY, ATTR_CITY_CLEAN, ATTR_COUNTRY, @@ -160,7 +164,7 @@ ATTR_WIKIDATA_DICT, ATTR_WIKIDATA_ID, ] -EXTRA_STATE_ATTRIBUTE_LIST = [ +EXTRA_STATE_ATTRIBUTE_LIST: list[str] = [ ATTR_PLACE_NAME, ATTR_STREET_NUMBER, ATTR_STREET, @@ -203,7 +207,7 @@ ATTR_LAST_CHANGED, ATTR_LAST_UPDATED, ] -JSON_IGNORE_ATTRIBUTE_LIST = [ +JSON_IGNORE_ATTRIBUTE_LIST: list[str] = [ ATTR_ATTRIBUTES, ATTR_DEVICETRACKER_ID, ATTR_DISPLAY_OPTIONS, @@ -218,7 +222,7 @@ ATTR_LOCATION_PREVIOUS, ATTR_PREVIOUS_STATE, ] -JSON_ATTRIBUTE_LIST = [ +JSON_ATTRIBUTE_LIST: list[str] = [ ATTR_CITY, ATTR_CITY_CLEAN, ATTR_COUNTRY, @@ -264,7 +268,7 @@ ATTR_WIKIDATA_ID, ATTR_SHOW_DATE, ] -EVENT_ATTRIBUTE_LIST = [ +EVENT_ATTRIBUTE_LIST: list[str] = [ ATTR_PLACE_NAME, ATTR_LAST_CHANGED, ATTR_LAST_PLACE_NAME, @@ -284,13 +288,13 @@ ATTR_OSM_ID, ATTR_OSM_TYPE, ] -EXTENDED_ATTRIBUTE_LIST = [ +EXTENDED_ATTRIBUTE_LIST: list[str] = [ ATTR_WIKIDATA_ID, ATTR_OSM_DICT, ATTR_OSM_DETAILS_DICT, ATTR_WIKIDATA_DICT, ] -PLACE_NAME_DUPLICATE_LIST = [ +PLACE_NAME_DUPLICATE_LIST: list[str] = [ ATTR_STREET, ATTR_STREET_REF, ATTR_PLACE_NEIGHBOURHOOD, @@ -306,7 +310,7 @@ ATTR_DEVICETRACKER_ZONE_NAME, ] -DISPLAY_OPTIONS_MAP = { +DISPLAY_OPTIONS_MAP: MutableMapping[str, str] = { "driving": ATTR_DRIVING, "place_name": ATTR_PLACE_NAME, "name": ATTR_PLACE_NAME, diff --git a/custom_components/places/sensor.py b/custom_components/places/sensor.py index c6b17591..f7f2da8d 100644 --- a/custom_components/places/sensor.py +++ b/custom_components/places/sensor.py @@ -1,5 +1,4 @@ -""" -Place Support for OpenStreetMap Geocode sensors. +"""Place Support for OpenStreetMap Geocode sensors. Previous Authors: Jim Thompson, Ian Richardson Current Author: Snuffy2 @@ -12,21 +11,25 @@ GitHub: https://github.com/custom-components/places """ +from collections.abc import MutableMapping +import contextlib import copy +from datetime import datetime, timedelta import json import locale import logging -import os +from pathlib import Path import re -from datetime import datetime, timedelta +from typing import Any from zoneinfo import ZoneInfo -import homeassistant.helpers.entity_registry as er import requests -from homeassistant import config_entries, core +from urllib3.exceptions import NewConnectionError + from homeassistant.components.recorder import DATA_INSTANCE as RECORDER_INSTANCE from homeassistant.components.sensor import SensorEntity from homeassistant.components.zone import ATTR_PASSIVE +from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( ATTR_FRIENDLY_NAME, ATTR_GPS_ACCURACY, @@ -42,15 +45,13 @@ STATE_UNAVAILABLE, STATE_UNKNOWN, ) -from homeassistant.core import Event +from homeassistant.core import Event, HomeAssistant, callback from homeassistant.helpers.entity import generate_entity_id -from homeassistant.helpers.event import ( - EventStateChangedData, - async_track_state_change_event, -) +from homeassistant.helpers.entity_platform import AddEntitiesCallback +import homeassistant.helpers.entity_registry as er +from homeassistant.helpers.event import EventStateChangedData, async_track_state_change_event from homeassistant.util import Throttle, slugify from homeassistant.util.location import distance -from urllib3.exceptions import NewConnectionError from .const import ( ATTR_ATTRIBUTES, @@ -145,127 +146,190 @@ VERSION, ) -_LOGGER = logging.getLogger(__name__) +_LOGGER: logging.Logger = logging.getLogger(__name__) THROTTLE_INTERVAL = timedelta(seconds=600) MIN_THROTTLE_INTERVAL = timedelta(seconds=10) SCAN_INTERVAL = timedelta(seconds=30) -PLACES_JSON_FOLDER = "" async def async_setup_entry( - hass: core.HomeAssistant, - config_entry: config_entries.ConfigEntry, - async_add_entities, + hass: HomeAssistant, + config_entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, ) -> None: - """Setup the sensor platform with a config_entry (config_flow).""" - global PLACES_JSON_FOLDER - PLACES_JSON_FOLDER = hass.config.path("custom_components", DOMAIN, "json_sensors") - _LOGGER.debug(f"json_sensors Location: {PLACES_JSON_FOLDER}") - await hass.async_add_executor_job(_create_json_folder) - + """Create places sensor entities.""" # _LOGGER.debug(f"[aync_setup_entity] all entities: {hass.data.get(DOMAIN)}") - config = hass.data.get(DOMAIN).get(config_entry.entry_id) - unique_id = config_entry.entry_id - name = config.get(CONF_NAME) - filename = f"{DOMAIN}-{slugify(unique_id)}.json" - imported_attributes = await hass.async_add_executor_job( - _get_dict_from_json_file, name, filename + config: MutableMapping[str, Any] = dict(config_entry.data) + unique_id: str = config_entry.entry_id + name: str = config[CONF_NAME] + json_folder: str = hass.config.path("custom_components", DOMAIN, "json_sensors") + await hass.async_add_executor_job(_create_json_folder, json_folder) + filename: str = f"{DOMAIN}-{slugify(unique_id)}.json" + imported_attributes: MutableMapping[str, Any] = await hass.async_add_executor_job( + _get_dict_from_json_file, name, filename, json_folder ) # _LOGGER.debug(f"[async_setup_entry] name: {name}") # _LOGGER.debug(f"[async_setup_entry] unique_id: {unique_id}") # _LOGGER.debug(f"[async_setup_entry] config: {config}") if config.get(CONF_EXTENDED_ATTR, DEFAULT_EXTENDED_ATTR): - _LOGGER.debug( - f"({name}) Extended Attr is True. Excluding from Recorder" - ) + _LOGGER.debug("(%s) Extended Attr is True. Excluding from Recorder", name) async_add_entities( - [PlacesNoRecorder(hass, config, config_entry, name, unique_id, imported_attributes)], + [ + PlacesNoRecorder( + hass=hass, + config=config, + config_entry=config_entry, + name=name, + unique_id=unique_id, + imported_attributes=imported_attributes, + ) + ], update_before_add=True, ) else: async_add_entities( - [Places(hass, config, config_entry, name, unique_id, imported_attributes)], + [ + Places( + hass=hass, + config=config, + config_entry=config_entry, + name=name, + unique_id=unique_id, + imported_attributes=imported_attributes, + ) + ], update_before_add=True, ) -def _create_json_folder(): +def _create_json_folder(json_folder: str) -> None: try: - os.makedirs(PLACES_JSON_FOLDER, exist_ok=True) + Path(json_folder).mkdir(parents=True, exist_ok=True) except OSError as e: _LOGGER.warning( - f"OSError creating folder for JSON sensor files: { - e.__class__.__qualname__}: {e}" + "OSError creating folder for JSON sensor files: %s: %s", e.__class__.__qualname__, e ) - except Exception as e: - _LOGGER.warning( - f"Unknown Exception creating folder for JSON sensor files: { - e.__class__.__qualname__}: {e}" - ) - - -def _get_dict_from_json_file(name, filename): - sensor_attributes = {} + # except Exception as e: + # _LOGGER.warning( + # "Unknown Exception creating folder for JSON sensor files: %s: %s", + # e.__class__.__qualname__, + # e, + # ) + + +def _get_dict_from_json_file( + name: str, filename: str, json_folder: str +) -> MutableMapping[str, Any]: + sensor_attributes: MutableMapping[str, Any] = {} try: - with open( - os.path.join(PLACES_JSON_FOLDER, filename), - "r", - ) as jsonfile: + json_file_path: Path = Path(json_folder) / filename + with json_file_path.open() as jsonfile: sensor_attributes = json.load(jsonfile) except OSError as e: _LOGGER.debug( - f"({name}) [Init] No JSON file to import " - f"({filename}): {e.__class__.__qualname__}: {e}" + "(%s) [Init] No JSON file to import (%s): %s: %s", + name, + filename, + e.__class__.__qualname__, + e, ) return {} - except Exception as e: + # except Exception as e: + # _LOGGER.debug( + # "(%s) [Init] Unknown Exception importing JSON file (%s): %s: %s", + # name, + # filename, + # e.__class__.__qualname__, + # e, + # ) + # return {} + return sensor_attributes + + +def _remove_json_file(name: str, filename: str, json_folder: str) -> None: + try: + json_file_path: Path = Path(json_folder) / filename + json_file_path.unlink() + except OSError as e: _LOGGER.debug( - f"({name}) [Init] Unknown Exception importing JSON file " - f"({filename}): {e.__class__.__qualname__}: {e}" + "(%s) OSError removing JSON sensor file (%s): %s: %s", + name, + filename, + e.__class__.__qualname__, + e, ) - return {} - return sensor_attributes + # except Exception as e: + # _LOGGER.debug( + # "(%s) Unknown Exception removing JSON sensor file (%s): %s: %s", + # name, + # filename, + # e.__class__.__qualname__, + # e, + # ) + else: + _LOGGER.debug("(%s) JSON sensor file removed: %s", name, filename) + + +def _is_float(value: Any) -> bool: + if value is not None: + try: + float(value) + except ValueError: + return False + else: + return True + else: + return False class Places(SensorEntity): """Representation of a Places Sensor.""" def __init__( - self, hass, config, config_entry, name, unique_id, imported_attributes - ): + self, + hass: HomeAssistant, + config: MutableMapping[str, Any], + config_entry: ConfigEntry, + name: str, + unique_id: str, + imported_attributes: MutableMapping[str, Any], + ) -> None: """Initialize the sensor.""" self._attr_should_poll = True - _LOGGER.info(f"({name}) [Init] Places sensor: {name}") - _LOGGER.debug(f"({name}) [Init] System Locale: {locale.getlocale()}") + _LOGGER.info("(%s) [Init] Places sensor: %s", name, name) + _LOGGER.debug("(%s) [Init] System Locale: %s", name, locale.getlocale()) _LOGGER.debug( - f"({name}) [Init] System Locale Date Format: { - str(locale.nl_langinfo(locale.D_FMT))}" + "(%s) [Init] System Locale Date Format: %s", name, locale.nl_langinfo(locale.D_FMT) ) - _LOGGER.debug(f"({name}) [Init] HASS TimeZone: {hass.config.time_zone}") + _LOGGER.debug("(%s) [Init] HASS TimeZone: %s", name, hass.config.time_zone) self._warn_if_device_tracker_prob = False - self._internal_attr = {} + self._internal_attr: MutableMapping[str, Any] = {} self._set_attr(ATTR_INITIAL_UPDATE, True) self._config = config self._config_entry = config_entry - self._hass = hass + self._hass: HomeAssistant = hass self._set_attr(CONF_NAME, name) - self._attr_name = name + self._attr_name: str = name self._set_attr(CONF_UNIQUE_ID, unique_id) self._attr_unique_id = unique_id registry = er.async_get(self._hass) - current_entity_id = registry.async_get_entity_id( - PLATFORM, DOMAIN, self._attr_unique_id - ) + self._json_folder: str = hass.config.path("custom_components", DOMAIN, "json_sensors") + _LOGGER.debug("json_sensors Location: %s", self._json_folder) + current_entity_id = registry.async_get_entity_id(PLATFORM, DOMAIN, self._attr_unique_id) if current_entity_id is not None: - self._entity_id = current_entity_id + self._entity_id: str = current_entity_id else: self._entity_id = generate_entity_id( ENTITY_ID_FORMAT, slugify(name.lower()), hass=self._hass ) - _LOGGER.debug(f"({self._attr_name}) [Init] entity_id: {self._entity_id}") + _LOGGER.debug("(%s) [Init] entity_id: %s", self._attr_name, self._entity_id) + self._street_num_i: int = -1 + self._street_i: int = -1 + self._temp_i: int = 0 + self._adv_options_state_list: list = [] self._set_attr(CONF_ICON, DEFAULT_ICON) self._attr_icon = DEFAULT_ICON self._set_attr(CONF_API_KEY, config.get(CONF_API_KEY)) @@ -273,19 +337,15 @@ def __init__( CONF_DISPLAY_OPTIONS, config.setdefault(CONF_DISPLAY_OPTIONS, DEFAULT_DISPLAY_OPTIONS).lower(), ) - self._set_attr(CONF_DEVICETRACKER_ID, config.get(CONF_DEVICETRACKER_ID).lower()) + self._set_attr(CONF_DEVICETRACKER_ID, config[CONF_DEVICETRACKER_ID].lower()) # Consider reconciling this in the future - self._set_attr(ATTR_DEVICETRACKER_ID, config.get(CONF_DEVICETRACKER_ID).lower()) - self._set_attr( - CONF_HOME_ZONE, config.setdefault(CONF_HOME_ZONE, DEFAULT_HOME_ZONE).lower() - ) + self._set_attr(ATTR_DEVICETRACKER_ID, config[CONF_DEVICETRACKER_ID].lower()) + self._set_attr(CONF_HOME_ZONE, config.setdefault(CONF_HOME_ZONE, DEFAULT_HOME_ZONE).lower()) self._set_attr( CONF_MAP_PROVIDER, config.setdefault(CONF_MAP_PROVIDER, DEFAULT_MAP_PROVIDER).lower(), ) - self._set_attr( - CONF_MAP_ZOOM, int(config.setdefault(CONF_MAP_ZOOM, DEFAULT_MAP_ZOOM)) - ) + self._set_attr(CONF_MAP_ZOOM, int(config.setdefault(CONF_MAP_ZOOM, DEFAULT_MAP_ZOOM))) self._set_attr(CONF_LANGUAGE, config.get(CONF_LANGUAGE)) if not self._is_attr_blank(CONF_LANGUAGE): @@ -297,9 +357,7 @@ def __init__( CONF_EXTENDED_ATTR, config.setdefault(CONF_EXTENDED_ATTR, DEFAULT_EXTENDED_ATTR), ) - self._set_attr( - CONF_SHOW_TIME, config.setdefault(CONF_SHOW_TIME, DEFAULT_SHOW_TIME) - ) + self._set_attr(CONF_SHOW_TIME, config.setdefault(CONF_SHOW_TIME, DEFAULT_SHOW_TIME)) self._set_attr( CONF_DATE_FORMAT, config.setdefault(CONF_DATE_FORMAT, DEFAULT_DATE_FORMAT).lower(), @@ -311,8 +369,9 @@ def __init__( ) self._set_attr(ATTR_DISPLAY_OPTIONS, self._get_attr(CONF_DISPLAY_OPTIONS)) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) [Init] JSON Filename: " - f"{self._get_attr(ATTR_JSON_FILENAME)}" + "(%s) [Init] JSON Filename: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_JSON_FILENAME), ) self._attr_native_value = None # Represents the state in SensorEntity @@ -320,61 +379,39 @@ def __init__( if ( not self._is_attr_blank(CONF_HOME_ZONE) - and CONF_LATITUDE - in hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes - and hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get( - CONF_LATITUDE - ) + and CONF_LATITUDE in hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes + and hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get(CONF_LATITUDE) is not None - and self._is_float( - hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get( - CONF_LATITUDE - ) + and _is_float( + hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get(CONF_LATITUDE) ) ): self._set_attr( ATTR_HOME_LATITUDE, - str( - hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get( - CONF_LATITUDE - ) - ), + str(hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get(CONF_LATITUDE)), ) if ( not self._is_attr_blank(CONF_HOME_ZONE) - and CONF_LONGITUDE - in hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes - and hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get( - CONF_LONGITUDE - ) + and CONF_LONGITUDE in hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes + and hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get(CONF_LONGITUDE) is not None - and self._is_float( - hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get( - CONF_LONGITUDE - ) + and _is_float( + hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get(CONF_LONGITUDE) ) ): self._set_attr( ATTR_HOME_LONGITUDE, - str( - hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get( - CONF_LONGITUDE - ) - ), + str(hass.states.get(self._get_attr(CONF_HOME_ZONE)).attributes.get(CONF_LONGITUDE)), ) self._attr_entity_picture = ( - hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( - ATTR_PICTURE - ) + hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get(ATTR_PICTURE) if hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)) else None ) self._set_attr(ATTR_SHOW_DATE, False) # self._set_attr(ATTR_UPDATES_SKIPPED, 0) - # sensor_attributes = self._hass.async_add_executor_job(self._get_dict_from_json_file, self._get_attr(CONF_NAME), self._get_attr(ATTR_JSON_FILENAME),) - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [Init] Sensor Attributes to Import: {imported_attributes}") self._import_attributes_from_json(imported_attributes) ## # For debugging: @@ -386,28 +423,29 @@ def __init__( ## if not self._get_attr(ATTR_INITIAL_UPDATE): _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[Init] Sensor Attributes Imported from JSON file" + "(%s) [Init] Sensor Attributes Imported from JSON file", self._get_attr(CONF_NAME) ) self._cleanup_attributes() if self._get_attr(CONF_EXTENDED_ATTR): self._exclude_event_types() _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) [Init] Tracked Entity ID: " - f"{self._get_attr(CONF_DEVICETRACKER_ID)}" + "(%s) [Init] Tracked Entity ID: %s", + self._get_attr(CONF_NAME), + self._get_attr(CONF_DEVICETRACKER_ID), ) - def _exclude_event_types(self): + def _exclude_event_types(self) -> None: if RECORDER_INSTANCE in self._hass.data: ha_history_recorder = self._hass.data[RECORDER_INSTANCE] ha_history_recorder.exclude_event_types.add(EVENT_TYPE) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - f"exclude_event_types: {ha_history_recorder.exclude_event_types}" + "(%s) exclude_event_types: %s", + self._get_attr(CONF_NAME), + ha_history_recorder.exclude_event_types, ) async def async_added_to_hass(self) -> None: - """Added to hass.""" + """Run after sensor is added to HA.""" await super().async_added_to_hass() self.async_on_remove( async_track_state_change_event( @@ -417,39 +455,23 @@ async def async_added_to_hass(self) -> None: ) ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[Init] Subscribed to Tracked Entity state change events" + "(%s) [Init] Subscribed to Tracked Entity state change events", + self._get_attr(CONF_NAME), ) - def _remove_json_file(self, name, filename): - try: - os.remove(os.path.join(PLACES_JSON_FOLDER, filename)) - except OSError as e: - _LOGGER.debug( - f"({name}) OSError removing JSON sensor file " - f"({filename}): {e.__class__.__qualname__}: {e}" - ) - except Exception as e: - _LOGGER.debug( - f"({name}) Unknown Exception removing JSON sensor file " - f"({filename}): {e.__class__.__qualname__}: {e}" - ) - else: - _LOGGER.debug(f"({name}) JSON sensor file removed: {filename}") - async def async_will_remove_from_hass(self) -> None: """Run when entity will be removed from hass.""" await self._hass.async_add_executor_job( - self._remove_json_file, + _remove_json_file, self._get_attr(CONF_NAME), self._get_attr(ATTR_JSON_FILENAME), + self._json_folder, ) if RECORDER_INSTANCE in self._hass.data and self._get_attr(CONF_EXTENDED_ATTR): _LOGGER.debug( - f"({self._attr_name}) Removing entity exclusion from recorder: " - f"{self._entity_id}" + "(%s) Removing entity exclusion from recorder: %s", self._attr_name, self._entity_id ) # Only do this if no places entities with extended_attr exist ex_attr_count = 0 @@ -457,20 +479,19 @@ async def async_will_remove_from_hass(self) -> None: if ent.get(CONF_EXTENDED_ATTR): ex_attr_count += 1 - if ( - self._get_attr(CONF_EXTENDED_ATTR) and ex_attr_count == 1 - ) or ex_attr_count == 0: + if (self._get_attr(CONF_EXTENDED_ATTR) and ex_attr_count == 1) or ex_attr_count == 0: _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - f"Removing event exclusion from recorder: {EVENT_TYPE}" + "(%s) Removing event exclusion from recorder: %s", + self._get_attr(CONF_NAME), + EVENT_TYPE, ) ha_history_recorder = self._hass.data[RECORDER_INSTANCE] ha_history_recorder.exclude_event_types.discard(EVENT_TYPE) @property - def extra_state_attributes(self): + def extra_state_attributes(self) -> dict[str, Any]: """Return the state attributes.""" - return_attr = {} + return_attr: dict[str, Any] = {} self._cleanup_attributes() for attr in EXTRA_STATE_ATTRIBUTE_LIST: if self._get_attr(attr): @@ -483,9 +504,9 @@ def extra_state_attributes(self): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Extra State Attributes: {return_attr}") return return_attr - def _import_attributes_from_json(self, json_attr=None): + def _import_attributes_from_json(self, json_attr: MutableMapping[str, Any]) -> None: """Import the JSON state attributes. Takes a Dictionary as input.""" - if json_attr is None or not isinstance(json_attr, dict) or not json_attr: + if not isinstance(json_attr, MutableMapping): return self._set_attr(ATTR_INITIAL_UPDATE, False) @@ -501,45 +522,34 @@ def _import_attributes_from_json(self, json_attr=None): json_attr.pop(attr, None) if json_attr is not None and json_attr: _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - f"[import_attributes] Attributes not imported: {json_attr}" + "(%s) [import_attributes] Attributes not imported: %s", + self._get_attr(CONF_NAME), + json_attr, ) - def _cleanup_attributes(self): + def _cleanup_attributes(self) -> None: for attr in list(self._internal_attr): if self._is_attr_blank(attr): self._clear_attr(attr) - def _is_attr_blank(self, attr): + def _is_attr_blank(self, attr: str) -> bool: if self._internal_attr.get(attr) or self._internal_attr.get(attr) == 0: return False - else: - return True + return True - def _get_attr(self, attr, default=None): + def _get_attr(self, attr: Any, default=None): # None | Any: if attr is None or (default is None and self._is_attr_blank(attr)): return None - else: - return self._internal_attr.get(attr, default) + return self._internal_attr.get(attr, default) - def _set_attr(self, attr, value=None): + def _set_attr(self, attr: str, value=None) -> None: if attr is not None: self._internal_attr.update({attr: value}) - def _clear_attr(self, attr): + def _clear_attr(self, attr: str) -> None: self._internal_attr.pop(attr, None) - def _is_float(self, value): - if value is not None: - try: - float(value) - return True - except ValueError: - return False - else: - return False - - async def _async_is_devicetracker_set(self): + async def _async_is_devicetracker_set(self) -> int: proceed_with_update = 0 # 0: False. 1: True. 2: False, but set direction of travel to stationary @@ -552,21 +562,23 @@ async def _async_is_devicetracker_set(self): str, ) and self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).lower() - in ["none", STATE_UNKNOWN, STATE_UNAVAILABLE] + in {"none", STATE_UNKNOWN, STATE_UNAVAILABLE} ) ): if self._warn_if_device_tracker_prob or self._get_attr(ATTR_INITIAL_UPDATE): _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) Tracked Entity " - f"({self._get_attr(CONF_DEVICETRACKER_ID)}) " - "is not set or is not available. Not Proceeding with Update." + "(%s) Tracked Entity (%s) " + "is not set or is not available. Not Proceeding with Update", + self._get_attr(CONF_NAME), + self._get_attr(CONF_DEVICETRACKER_ID), ) self._warn_if_device_tracker_prob = False else: _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Tracked Entity " - f"({self._get_attr(CONF_DEVICETRACKER_ID)}) " - "is not set or is not available. Not Proceeding with Update." + "(%s) Tracked Entity (%s) " + "is not set or is not available. Not Proceeding with Update", + self._get_attr(CONF_NAME), + self._get_attr(CONF_DEVICETRACKER_ID), ) return 0 # 0: False. 1: True. 2: False, but set direction of travel to stationary @@ -579,23 +591,23 @@ async def _async_is_devicetracker_set(self): in self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes and CONF_LONGITUDE in self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes - and self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LATITUDE) + and self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LATITUDE + ) is not None - and self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LONGITUDE) + and self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LONGITUDE + ) is not None - and self._is_float( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LATITUDE) + and _is_float( + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LATITUDE + ) ) - and self._is_float( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LONGITUDE) + and _is_float( + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LONGITUDE + ) ) ): self._warn_if_device_tracker_prob = True @@ -604,37 +616,38 @@ async def _async_is_devicetracker_set(self): else: if self._warn_if_device_tracker_prob or self._get_attr(ATTR_INITIAL_UPDATE): _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) Tracked Entity " - f"({self._get_attr(CONF_DEVICETRACKER_ID)}) " - "Latitude/Longitude is not set or is not a number. " - "Not Proceeding with Update." + "(%s) Tracked Entity (%s) " + "Latitude/Longitude is not set or is not a number. Not Proceeding with Update.", + self._get_attr(CONF_NAME), + self._get_attr(CONF_DEVICETRACKER_ID), ) self._warn_if_device_tracker_prob = False else: _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Tracked Entity " - f"({self._get_attr(CONF_DEVICETRACKER_ID)}) " - "Latitude/Longitude is not set or is not a number. " - "Not Proceeding with Update." + "(%s) Tracked Entity (%s) " + "Latitude/Longitude is not set or is not a number. Not Proceeding with Update.", + self._get_attr(CONF_NAME), + self._get_attr(CONF_DEVICETRACKER_ID), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Tracked Entity " - f"({self._get_attr(CONF_DEVICETRACKER_ID)}) details: " - f"{self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID))}" + "(%s) Tracked Entity (%s) details: %s", + self._get_attr(CONF_NAME), + self._get_attr(CONF_DEVICETRACKER_ID), + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)), ) return 0 # 0: False. 1: True. 2: False, but set direction of travel to stationary return proceed_with_update @Throttle(MIN_THROTTLE_INTERVAL) - @core.callback - def _async_tsc_update(self, event: Event[EventStateChangedData]): - """Call the _async_do_update function based on the TSC (track state change) event""" + @callback + def _async_tsc_update(self, event: Event[EventStateChangedData]) -> None: + """Call the _async_do_update function based on the TSC (track state change) event.""" # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [TSC Update] event: {event}") new_state = event.data["new_state"] if new_state is None or ( isinstance(new_state.state, str) - and new_state.state.lower() in ["none", STATE_UNKNOWN, STATE_UNAVAILABLE] + and new_state.state.lower() in {"none", STATE_UNKNOWN, STATE_UNAVAILABLE} ): return # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [TSC Update] new_state: {new_state}") @@ -643,80 +656,70 @@ def _async_tsc_update(self, event: Event[EventStateChangedData]): self._hass.async_create_task(self._async_do_update(update_type)) @Throttle(THROTTLE_INTERVAL) - async def async_update(self): - """Call the _async_do_update function based on scan interval and throttle""" + async def async_update(self) -> None: + """Call the _async_do_update function based on scan interval and throttle.""" update_type = "Scan Interval" self._hass.async_create_task(self._async_do_update(update_type)) - async def _async_clear_since_from_state(self, orig_state): + @staticmethod + async def _async_clear_since_from_state(orig_state: str) -> str: return re.sub(r" \(since \d\d[:/]\d\d\)", "", orig_state) - async def _async_in_zone(self): + async def _async_in_zone(self) -> bool: if not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE): zone_state = self._hass.states.get( f"{CONF_ZONE}.{(self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower()}" ) - if (self._get_attr(CONF_DEVICETRACKER_ID)).split(".")[0] == CONF_ZONE: - return False - elif ( - "stationary" in (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)) - .lower() - .startswith("statzon") - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)) - .lower() - .startswith("ic3_statzone_") - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "away" - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "not_home" - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "notset" - or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "not_set" - ): - return False - elif ( - zone_state is not None - and zone_state.attributes.get(ATTR_PASSIVE, False) is True + if ( + (self._get_attr(CONF_DEVICETRACKER_ID)).split(".")[0] == CONF_ZONE + or ( + "stationary" in (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() + or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower().startswith("statzon") + or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower().startswith("ic3_statzone_") + or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "away" + or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "not_home" + or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "notset" + or (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower() == "not_set" + ) + or ( + zone_state is not None + and zone_state.attributes.get(ATTR_PASSIVE, False) is True + ) ): return False - else: - return True - else: - return False + return True + return False - async def _async_cleanup_attributes(self): + async def _async_cleanup_attributes(self) -> None: for attr in list(self._internal_attr): if self._is_attr_blank(attr): self._clear_attr(attr) - async def _async_check_for_updated_entity_name(self): + async def _async_check_for_updated_entity_name(self) -> None: if hasattr(self, "entity_id") and self._entity_id is not None: # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Entity ID: {self._entity_id}") if ( self._hass.states.get(str(self._entity_id)) is not None - and self._hass.states.get(str(self._entity_id)).attributes.get( - ATTR_FRIENDLY_NAME - ) + and self._hass.states.get(str(self._entity_id)).attributes.get(ATTR_FRIENDLY_NAME) is not None and self._get_attr(CONF_NAME) - != self._hass.states.get(str(self._entity_id)).attributes.get( - ATTR_FRIENDLY_NAME - ) + != self._hass.states.get(str(self._entity_id)).attributes.get(ATTR_FRIENDLY_NAME) ): _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Sensor Name Changed. Updating Name to: " - f"{self._hass.states.get( - str(self._entity_id)).attributes.get(ATTR_FRIENDLY_NAME)}" + "(%s) Sensor Name Changed. Updating Name to: %s", + self._get_attr(CONF_NAME), + self._hass.states.get(str(self._entity_id)).attributes.get(ATTR_FRIENDLY_NAME), ) self._set_attr( CONF_NAME, - self._hass.states.get(str(self._entity_id)).attributes.get( - ATTR_FRIENDLY_NAME - ), + self._hass.states.get(str(self._entity_id)).attributes.get(ATTR_FRIENDLY_NAME), ) self._config.update({CONF_NAME: self._get_attr(CONF_NAME)}) self._set_attr(CONF_NAME, self._get_attr(CONF_NAME)) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Updated Config Name: " - f"{self._config.get(CONF_NAME, None)}" + "(%s) Updated Config Name: %s", + self._get_attr(CONF_NAME), + self._config.get(CONF_NAME), ) self._hass.config_entries.async_update_entry( self._config_entry, @@ -724,11 +727,12 @@ async def _async_check_for_updated_entity_name(self): options=self._config_entry.options, ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Updated ConfigEntry Name: " - f"{self._config_entry.data.get(CONF_NAME)}" + "(%s) Updated ConfigEntry Name: %s", + self._get_attr(CONF_NAME), + self._config_entry.data.get(CONF_NAME), ) - async def _async_get_zone_details(self): + async def _async_get_zone_details(self) -> None: if (self._get_attr(CONF_DEVICETRACKER_ID)).split(".")[0] != CONF_ZONE: self._set_attr( ATTR_DEVICETRACKER_ZONE, @@ -741,26 +745,17 @@ async def _async_get_zone_details(self): ).attributes.get(CONF_ZONE) if devicetracker_zone_id is not None: devicetracker_zone_id = f"{CONF_ZONE}.{devicetracker_zone_id}" - devicetracker_zone_name_state = self._hass.states.get( - devicetracker_zone_id - ) + devicetracker_zone_name_state = self._hass.states.get(devicetracker_zone_id) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Tracked Entity Zone ID: {devicetracker_zone_id}") # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Tracked Entity Zone Name State: {devicetracker_zone_name_state}") if devicetracker_zone_name_state is not None: - if ( - devicetracker_zone_name_state.attributes.get(CONF_FRIENDLY_NAME) - is not None - ): + if devicetracker_zone_name_state.attributes.get(CONF_FRIENDLY_NAME) is not None: self._set_attr( ATTR_DEVICETRACKER_ZONE_NAME, - devicetracker_zone_name_state.attributes.get( - CONF_FRIENDLY_NAME - ), + devicetracker_zone_name_state.attributes.get(CONF_FRIENDLY_NAME), ) else: - self._set_attr( - ATTR_DEVICETRACKER_ZONE_NAME, devicetracker_zone_name_state.name - ) + self._set_attr(ATTR_DEVICETRACKER_ZONE_NAME, devicetracker_zone_name_state.name) else: self._set_attr( ATTR_DEVICETRACKER_ZONE_NAME, @@ -775,158 +770,182 @@ async def _async_get_zone_details(self): (self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)).title(), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Tracked Entity Zone Name: " - f"{self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)}" + "(%s) Tracked Entity Zone Name: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME), ) else: _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Tracked Entity Zone: " - f"{self._get_attr(ATTR_DEVICETRACKER_ZONE)}" + "(%s) Tracked Entity Zone: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_DEVICETRACKER_ZONE), ) self._set_attr( ATTR_DEVICETRACKER_ZONE_NAME, self._get_attr(ATTR_DEVICETRACKER_ZONE), ) - async def _async_determine_if_update_needed(self): + async def _async_determine_if_update_needed(self) -> int: proceed_with_update = 1 # 0: False. 1: True. 2: False, but set direction of travel to stationary if self._get_attr(ATTR_INITIAL_UPDATE): - _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Performing Initial Update for user..." - ) + _LOGGER.info("(%s) Performing Initial Update for user", self._get_attr(CONF_NAME)) # 0: False. 1: True. 2: False, but set direction of travel to stationary return 1 - elif self._is_attr_blank(ATTR_NATIVE_VALUE) or ( + if self._is_attr_blank(ATTR_NATIVE_VALUE) or ( isinstance(self._get_attr(ATTR_NATIVE_VALUE), str) and (self._get_attr(ATTR_NATIVE_VALUE)).lower() - in ["none", STATE_UNKNOWN, STATE_UNAVAILABLE] + in {"none", STATE_UNKNOWN, STATE_UNAVAILABLE} ): _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Previous State is Unknown, performing update." + "(%s) Previous State is Unknown, performing update", self._get_attr(CONF_NAME) ) # 0: False. 1: True. 2: False, but set direction of travel to stationary return 1 - elif self._get_attr(ATTR_LOCATION_CURRENT) == self._get_attr( - ATTR_LOCATION_PREVIOUS - ): + if self._get_attr(ATTR_LOCATION_CURRENT) == self._get_attr(ATTR_LOCATION_PREVIOUS): _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) " - "Not performing update because coordinates are identical" + "(%s) Not performing update because coordinates are identical", + self._get_attr(CONF_NAME), ) return 2 # 0: False. 1: True. 2: False, but set direction of travel to stationary - elif int(self._get_attr(ATTR_DISTANCE_TRAVELED_M)) < 10: + if int(self._get_attr(ATTR_DISTANCE_TRAVELED_M)) < 10: _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) " - "Not performing update, distance traveled from last update is less than 10 m (" - f"{round(self._get_attr(ATTR_DISTANCE_TRAVELED_M), 1)} m)" + "(%s) " + "Not performing update, distance traveled from last update is less than 10 m (%s m)", + self._get_attr(CONF_NAME), + round(self._get_attr(ATTR_DISTANCE_TRAVELED_M), 1), ) return 2 # 0: False. 1: True. 2: False, but set direction of travel to stationary return proceed_with_update # 0: False. 1: True. 2: False, but set direction of travel to stationary - def _get_dict_from_url(self, url, name, dict_name): - get_dict = {} - _LOGGER.info(f"({self._get_attr(CONF_NAME)}) Requesting data for {name}") - _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) {name} URL: {url}") + def _get_dict_from_url(self, url: str, name: str, dict_name: str) -> None: + get_dict: MutableMapping[str, Any] = {} + _LOGGER.info("(%s) Requesting data for %s", self._get_attr(CONF_NAME), name) + _LOGGER.debug("(%s) {name} URL: %s", self._get_attr(CONF_NAME), url) self._set_attr(dict_name, {}) - headers = {"user-agent": f"Mozilla/5.0 (Home Assistant) {DOMAIN}/{VERSION}"} + headers: dict[str, str] = {"user-agent": f"Mozilla/5.0 (Home Assistant) {DOMAIN}/{VERSION}"} try: get_response = requests.get(url, headers=headers) except requests.exceptions.RetryError as e: get_response = None _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) Retry Error connecting to " - f"{name} [{e.__class__.__qualname__}: {e}]: {url}" + "(%s) Retry Error connecting to %s [%s: %s]: %s", + self._get_attr(CONF_NAME), + name, + e.__class__.__qualname__, + e, + url, ) return except requests.exceptions.ConnectionError as e: get_response = None _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) Connection Error connecting to " - f"{name} [{e.__class__.__qualname__}: {e}]: {url}" + "(%s) Connection Error connecting to %s [%s: %s]: %s", + self._get_attr(CONF_NAME), + name, + e.__class__.__qualname__, + e, + url, ) return except requests.exceptions.HTTPError as e: get_response = None _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) HTTP Error connecting to " - f"{name} [{e.__class__.__qualname__}: {e}]: {url}" + "(%s) HTTP Error connecting to %s [%s: %s]: %s", + self._get_attr(CONF_NAME), + name, + e.__class__.__qualname__, + e, + url, ) return except requests.exceptions.Timeout as e: get_response = None _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) Timeout connecting to " - f"{name} [{e.__class__.__qualname__}: {e}]: {url}" + "(%s) Timeout connecting to %s [%s: %s]: %s", + self._get_attr(CONF_NAME), + name, + e.__class__.__qualname__, + e, + url, ) return except OSError as e: # Includes error code 101, network unreachable get_response = None _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) " - f"Network unreachable error when connecting to {name} " - f"[{e.__class__.__qualname__}: {e}]: {url}" + "(%s) Network unreachable error when connecting to %s [%s: %s]: %s", + self._get_attr(CONF_NAME), + name, + e.__class__.__qualname__, + e, + url, ) return except NewConnectionError as e: get_response = None _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) " - f"New Connection Error connecting to {name} " - f"[{e.__class__.__qualname__}: {e}]: {url}" - ) - return - except Exception as e: - get_response = None - _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) " - f"Unknown Exception connecting to {name} " - f"[{e.__class__.__qualname__}: {e}]: {url}" + "(%s) New Connection Error connecting to %s [%s: %s]: %s", + self._get_attr(CONF_NAME), + name, + e.__class__.__qualname__, + e, + url, ) return - - get_json_input = {} - if get_response is not None and get_response: + # except Exception as e: + # get_response = None + # _LOGGER.warning( + # "(%s) Unknown Exception connecting to %s [%s: %s]: %s", + # self._get_attr(CONF_NAME), + # name, + # e.__class__.__qualname__, + # e, + # url, + # ) + # return + + get_json_input: str | None = None + if get_response: get_json_input = get_response.text - _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) {name} Response: {get_json_input}" - ) + _LOGGER.debug("(%s) %s Response: %s", self._get_attr(CONF_NAME), name, get_json_input) - if get_json_input is not None and get_json_input: + if get_json_input: try: get_dict = json.loads(get_json_input) except json.decoder.JSONDecodeError as e: _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) JSON Decode Error with {name} info " - f"[{e.__class__.__qualname__}: {e}]: {get_json_input}" + "(%s) JSON Decode Error with %s info [%s: %s]: %s", + self._get_attr(CONF_NAME), + name, + e.__class__.__qualname__, + e, + get_json_input, ) return if "error_message" in get_dict: _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) An error occurred contacting the web service for " - f"{name}: {get_dict.get('error_message')}" + "(%s) An error occurred contacting the web service for %s: %s", + self._get_attr(CONF_NAME), + name, + get_dict.get("error_message"), ) return - if ( - isinstance(get_dict, list) - and len(get_dict) == 1 - and isinstance(get_dict[0], dict) - ): + if isinstance(get_dict, list) and len(get_dict) == 1 and isinstance(get_dict[0], dict): self._set_attr(dict_name, get_dict[0]) return self._set_attr(dict_name, get_dict) return - async def _async_get_map_link(self): + async def _async_get_map_link(self) -> None: if self._get_attr(CONF_MAP_PROVIDER) == "google": self._set_attr( ATTR_MAP_LINK, @@ -959,42 +978,41 @@ async def _async_get_map_link(self): ), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Map Link Type: " - f"{self._get_attr(CONF_MAP_PROVIDER)}" + "(%s) Map Link Type: %s", self._get_attr(CONF_NAME), self._get_attr(CONF_MAP_PROVIDER) ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Map Link URL: " - f"{self._get_attr(ATTR_MAP_LINK)}" + "(%s) Map Link URL: %s", self._get_attr(CONF_NAME), self._get_attr(ATTR_MAP_LINK) ) - async def _async_get_gps_accuracy(self): + async def _async_get_gps_accuracy(self) -> int: if ( self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)) and self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes and ATTR_GPS_ACCURACY in self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes - and self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(ATTR_GPS_ACCURACY) + and self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + ATTR_GPS_ACCURACY + ) is not None - and self._is_float( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(ATTR_GPS_ACCURACY) + and _is_float( + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + ATTR_GPS_ACCURACY + ) ) ): self._set_attr( ATTR_GPS_ACCURACY, float( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(ATTR_GPS_ACCURACY) + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + ATTR_GPS_ACCURACY + ) ), ) else: _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) GPS Accuracy attribute not found in: " - f"{self._get_attr(CONF_DEVICETRACKER_ID)}" + "(%s) GPS Accuracy attribute not found in: %s", + self._get_attr(CONF_NAME), + self._get_attr(CONF_DEVICETRACKER_ID), ) proceed_with_update = 1 # 0: False. 1: True. 2: False, but set direction of travel to stationary @@ -1004,16 +1022,17 @@ async def _async_get_gps_accuracy(self): proceed_with_update = 0 # 0: False. 1: True. 2: False, but set direction of travel to stationary _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) GPS Accuracy is 0.0, not performing update" + "(%s) GPS Accuracy is 0.0, not performing update", self._get_attr(CONF_NAME) ) else: _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) GPS Accuracy: " - f"{round(self._get_attr(ATTR_GPS_ACCURACY), 3)}" + "(%s) GPS Accuracy: %s", + self._get_attr(CONF_NAME), + round(self._get_attr(ATTR_GPS_ACCURACY), 3), ) return proceed_with_update - async def _async_get_driving_status(self): + async def _async_get_driving_status(self) -> None: self._clear_attr(ATTR_DRIVING) isDriving = False if not await self._async_in_zone(): @@ -1025,7 +1044,7 @@ async def _async_get_driving_status(self): if isDriving: self._set_attr(ATTR_DRIVING, "Driving") - async def _async_parse_osm_dict(self): + async def _async_parse_osm_dict(self) -> None: if "type" in (self._get_attr(ATTR_OSM_DICT)): self._set_attr(ATTR_PLACE_TYPE, self._get_attr(ATTR_OSM_DICT).get("type")) if self._get_attr(ATTR_PLACE_TYPE) == "yes": @@ -1036,9 +1055,9 @@ async def _async_parse_osm_dict(self): ) else: self._clear_attr(ATTR_PLACE_TYPE) - if "address" in (self._get_attr(ATTR_OSM_DICT)) and self._get_attr( - ATTR_PLACE_TYPE - ) in (self._get_attr(ATTR_OSM_DICT)).get("address"): + if "address" in (self._get_attr(ATTR_OSM_DICT)) and self._get_attr(ATTR_PLACE_TYPE) in ( + self._get_attr(ATTR_OSM_DICT) + ).get("address"): self._set_attr( ATTR_PLACE_NAME, self._get_attr(ATTR_OSM_DICT) @@ -1070,9 +1089,7 @@ async def _async_parse_osm_dict(self): ) if not self._is_attr_blank(CONF_LANGUAGE): for language in (self._get_attr(CONF_LANGUAGE)).split(","): - if "name:" + language in (self._get_attr(ATTR_OSM_DICT)).get( - "namedetails" - ): + if "name:" + language in (self._get_attr(ATTR_OSM_DICT)).get("namedetails"): self._set_attr( ATTR_PLACE_NAME, self._get_attr(ATTR_OSM_DICT) @@ -1088,11 +1105,7 @@ async def _async_parse_osm_dict(self): if "house_number" in (self._get_attr(ATTR_OSM_DICT)).get("address"): self._set_attr( ATTR_STREET_NUMBER, - ( - (self._get_attr(ATTR_OSM_DICT)) - .get("address") - .get("house_number") - ), + ((self._get_attr(ATTR_OSM_DICT)).get("address").get("house_number")), ) if "road" in (self._get_attr(ATTR_OSM_DICT)).get("address"): self._set_attr( @@ -1113,11 +1126,10 @@ async def _async_parse_osm_dict(self): self._get_attr(ATTR_OSM_DICT).get("address").get("retail"), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Place Name: " - f"{self._get_attr(ATTR_PLACE_NAME)}" + "(%s) Place Name: %s", self._get_attr(CONF_NAME), self._get_attr(ATTR_PLACE_NAME) ) - CITY_LIST = [ + CITY_LIST: list[str] = [ "city", "town", "village", @@ -1126,7 +1138,7 @@ async def _async_parse_osm_dict(self): "city_district", "municipality", ] - POSTAL_TOWN_LIST = [ + POSTAL_TOWN_LIST: list[str] = [ "city", "town", "village", @@ -1135,7 +1147,7 @@ async def _async_parse_osm_dict(self): "borough", "suburb", ] - NEIGHBOURHOOD_LIST = [ + NEIGHBOURHOOD_LIST: list[str] = [ "village", "township", "hamlet", @@ -1144,44 +1156,35 @@ async def _async_parse_osm_dict(self): "quarter", "neighbourhood", ] - _LOGGER.debug(f"CITY_LIST: {CITY_LIST}") + _LOGGER.debug("CITY_LIST: %s", CITY_LIST) for city_type in CITY_LIST: - try: + with contextlib.suppress(ValueError): POSTAL_TOWN_LIST.remove(city_type) - except ValueError: - pass - try: + + with contextlib.suppress(ValueError): NEIGHBOURHOOD_LIST.remove(city_type) - except ValueError: - pass if city_type in (self._get_attr(ATTR_OSM_DICT)).get("address"): self._set_attr( ATTR_CITY, (self._get_attr(ATTR_OSM_DICT)).get("address").get(city_type), ) break - _LOGGER.debug(f"POSTAL_TOWN_LIST: {POSTAL_TOWN_LIST}") + _LOGGER.debug("POSTAL_TOWN_LIST: %s", POSTAL_TOWN_LIST) for postal_town_type in POSTAL_TOWN_LIST: - try: + with contextlib.suppress(ValueError): NEIGHBOURHOOD_LIST.remove(postal_town_type) - except ValueError: - pass if postal_town_type in (self._get_attr(ATTR_OSM_DICT)).get("address"): self._set_attr( ATTR_POSTAL_TOWN, - (self._get_attr(ATTR_OSM_DICT)) - .get("address") - .get(postal_town_type), + (self._get_attr(ATTR_OSM_DICT)).get("address").get(postal_town_type), ) break - _LOGGER.debug(f"NEIGHBOURHOOD_LIST: {NEIGHBOURHOOD_LIST}") + _LOGGER.debug("NEIGHBOURHOOD_LIST: %s", NEIGHBOURHOOD_LIST) for neighbourhood_type in NEIGHBOURHOOD_LIST: if neighbourhood_type in (self._get_attr(ATTR_OSM_DICT)).get("address"): self._set_attr( ATTR_PLACE_NEIGHBOURHOOD, - (self._get_attr(ATTR_OSM_DICT)) - .get("address") - .get(neighbourhood_type), + (self._get_attr(ATTR_OSM_DICT)).get("address").get(neighbourhood_type), ) break @@ -1225,10 +1228,7 @@ async def _async_parse_osm_dict(self): if "country_code" in (self._get_attr(ATTR_OSM_DICT)).get("address"): self._set_attr( ATTR_COUNTRY_CODE, - (self._get_attr(ATTR_OSM_DICT)) - .get("address") - .get("country_code") - .upper(), + (self._get_attr(ATTR_OSM_DICT)).get("address").get("country_code").upper(), ) if "postcode" in (self._get_attr(ATTR_OSM_DICT)).get("address"): self._set_attr( @@ -1259,7 +1259,7 @@ async def _async_parse_osm_dict(self): and (self._get_attr(ATTR_OSM_DICT)).get("namedetails") is not None and "ref" in (self._get_attr(ATTR_OSM_DICT)).get("namedetails") ): - street_refs = re.split( + street_refs: list = re.split( r"[\;\\\/\,\.\:]", (self._get_attr(ATTR_OSM_DICT)).get("namedetails").get("ref"), ) @@ -1271,14 +1271,19 @@ async def _async_parse_osm_dict(self): break if not self._is_attr_blank(ATTR_STREET_REF): _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Street: " - f"{self._get_attr(ATTR_STREET)} / " - f"Street Ref: {self._get_attr(ATTR_STREET_REF)}" + "(%s) Street: %s / Street Ref: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_STREET), + self._get_attr(ATTR_STREET_REF), ) - dupe_attributes_check = [] - for attr in PLACE_NAME_DUPLICATE_LIST: - if not self._is_attr_blank(attr): - dupe_attributes_check.append(self._get_attr(attr)) + dupe_attributes_check: list = [] + dupe_attributes_check.extend( + [ + self._get_attr(attr) + for attr in PLACE_NAME_DUPLICATE_LIST + if not self._is_attr_blank(attr) + ] + ) if ( not self._is_attr_blank(ATTR_PLACE_NAME) and self._get_attr(ATTR_PLACE_NAME) not in dupe_attributes_check @@ -1286,24 +1291,28 @@ async def _async_parse_osm_dict(self): self._set_attr(ATTR_PLACE_NAME_NO_DUPE, self._get_attr(ATTR_PLACE_NAME)) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "Entity attributes after parsing OSM Dict: " - f"{self._internal_attr}" + "(%s) Entity attributes after parsing OSM Dict: %s", + self._get_attr(CONF_NAME), + self._internal_attr, ) - async def _async_build_formatted_place(self): - formatted_place_array = [] + async def _async_build_formatted_place(self) -> None: + formatted_place_array: list = [] if not await self._async_in_zone(): if not self._is_attr_blank(ATTR_DRIVING) and "driving" in ( self._get_attr(ATTR_DISPLAY_OPTIONS_LIST) ): formatted_place_array.append(self._get_attr(ATTR_DRIVING)) # Don't use place name if the same as another attributes - use_place_name = True - sensor_attributes_values = [] - for attr in PLACE_NAME_DUPLICATE_LIST: - if not self._is_attr_blank(attr): - sensor_attributes_values.append(self._get_attr(attr)) + use_place_name: bool = True + sensor_attributes_values: list = [] + sensor_attributes_values.extend( + [ + self._get_attr(attr) + for attr in PLACE_NAME_DUPLICATE_LIST + if not self._is_attr_blank(attr) + ] + ) # if not self._is_attr_blank(ATTR_PLACE_NAME): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Duplicated List [Place Name: {self._get_attr(ATTR_PLACE_NAME)}]: {sensor_attributes_values}") if self._is_attr_blank(ATTR_PLACE_NAME): @@ -1312,9 +1321,7 @@ async def _async_build_formatted_place(self): elif self._get_attr(ATTR_PLACE_NAME) in sensor_attributes_values: # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Not Using Place Name: {self._get_attr(ATTR_PLACE_NAME)}") use_place_name = False - _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) use_place_name: {use_place_name}" - ) + _LOGGER.debug("(%s) use_place_name: %s", self._get_attr(CONF_NAME), use_place_name) if not use_place_name: if ( not self._is_attr_blank(ATTR_PLACE_TYPE) @@ -1336,31 +1343,24 @@ async def _async_build_formatted_place(self): (self._get_attr(ATTR_PLACE_CATEGORY)).title().strip() ) street = None - if self._is_attr_blank(ATTR_STREET) and not self._is_attr_blank( - ATTR_STREET_REF - ): + if self._is_attr_blank(ATTR_STREET) and not self._is_attr_blank(ATTR_STREET_REF): street = (self._get_attr(ATTR_STREET_REF)).strip() - _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Using street_ref: {street}" - ) + _LOGGER.debug("(%s) Using street_ref: %s", self._get_attr(CONF_NAME), street) elif not self._is_attr_blank(ATTR_STREET): if ( not self._is_attr_blank(ATTR_PLACE_CATEGORY) and (self._get_attr(ATTR_PLACE_CATEGORY)).lower() == "highway" and not self._is_attr_blank(ATTR_PLACE_TYPE) - and (self._get_attr(ATTR_PLACE_TYPE)).lower() - in ["motorway", "trunk"] + and (self._get_attr(ATTR_PLACE_TYPE)).lower() in {"motorway", "trunk"} and not self._is_attr_blank(ATTR_STREET_REF) ): street = (self._get_attr(ATTR_STREET_REF)).strip() _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Using street_ref: {street}" + "(%s) Using street_ref: %s", self._get_attr(CONF_NAME), street ) else: street = (self._get_attr(ATTR_STREET)).strip() - _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Using street: {street}" - ) + _LOGGER.debug("(%s) Using street: %s", self._get_attr(CONF_NAME), street) if street and self._is_attr_blank(ATTR_STREET_NUMBER): formatted_place_array.append(street) elif street and not self._is_attr_blank(ATTR_STREET_NUMBER): @@ -1372,9 +1372,7 @@ async def _async_build_formatted_place(self): and (self._get_attr(ATTR_PLACE_TYPE)).lower() == "house" and not self._is_attr_blank(ATTR_PLACE_NEIGHBOURHOOD) ): - formatted_place_array.append( - (self._get_attr(ATTR_PLACE_NEIGHBOURHOOD)).strip() - ) + formatted_place_array.append((self._get_attr(ATTR_PLACE_NEIGHBOURHOOD)).strip()) else: formatted_place_array.append((self._get_attr(ATTR_PLACE_NAME)).strip()) @@ -1387,38 +1385,36 @@ async def _async_build_formatted_place(self): if not self._is_attr_blank(ATTR_STATE_ABBR): formatted_place_array.append(self._get_attr(ATTR_STATE_ABBR)) else: - formatted_place_array.append( - (self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)).strip() - ) - formatted_place = ", ".join(item for item in formatted_place_array) + formatted_place_array.append((self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)).strip()) + formatted_place: str = ", ".join(item for item in formatted_place_array) formatted_place = formatted_place.replace("\n", " ").replace(" ", " ").strip() self._set_attr(ATTR_FORMATTED_PLACE, formatted_place) - async def _async_build_from_advanced_options(self, curr_options): + async def _async_build_from_advanced_options(self, curr_options: str) -> None: # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Options: {curr_options}") if curr_options.count("[") != curr_options.count("]"): _LOGGER.error( - f"({self._get_attr(CONF_NAME)}) " - "[adv_options] Bracket Count Mismatch: " - f"{curr_options}" + "(%s) [adv_options] Bracket Count Mismatch: %s", + self._get_attr(CONF_NAME), + curr_options, ) return - elif curr_options.count("(") != curr_options.count(")"): + if curr_options.count("(") != curr_options.count(")"): _LOGGER.error( - f"({self._get_attr(CONF_NAME)}) " - "[adv_options] Parenthesis Count Mismatch: " - f"{curr_options}" + "(%s) [adv_options] Parenthesis Count Mismatch: %s", + self._get_attr(CONF_NAME), + curr_options, ) return - incl = [] - excl = [] - incl_attr = {} - excl_attr = {} + incl: list = [] + excl: list = [] + incl_attr: MutableMapping[str, Any] = {} + excl_attr: MutableMapping[str, Any] = {} none_opt = None next_opt = None if curr_options is None or not curr_options: return - elif "[" in curr_options or "(" in curr_options: + if "[" in curr_options or "(" in curr_options: # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Options has a [ or ( and optional ,") comma_num = curr_options.find(",") bracket_num = curr_options.find("[") @@ -1437,17 +1433,17 @@ async def _async_build_from_advanced_options(self, curr_options): if ret_state is not None and ret_state: self._adv_options_state_list.append(ret_state) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[adv_options] Updated state list: " - f"{self._adv_options_state_list}" + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, ) - next_opt = curr_options[(comma_num + 1):] + next_opt = curr_options[(comma_num + 1) :] # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Next Options: {next_opt}") if next_opt is not None and next_opt: await self._async_build_from_advanced_options(next_opt.strip()) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Back from recursion") return - elif ( + if ( bracket_num != -1 and (comma_num == -1 or bracket_num < comma_num) and (paren_num == -1 or bracket_num < paren_num) @@ -1456,18 +1452,11 @@ async def _async_build_from_advanced_options(self, curr_options): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Bracket is First") opt = curr_options[:bracket_num] # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Option: {opt}") - none_opt, next_opt = await self._async_parse_bracket( - curr_options[bracket_num:] - ) - if ( - next_opt is not None - and next_opt - and len(next_opt) > 1 - and next_opt[0] == "(" - ): + none_opt, next_opt = await self._async_parse_bracket(curr_options[bracket_num:]) + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == "(": # Parse Parenthesis - incl, excl, incl_attr, excl_attr, next_opt = ( - await self._async_parse_parens(next_opt) + incl, excl, incl_attr, excl_attr, next_opt = await self._async_parse_parens( + next_opt ) if opt is not None and opt: @@ -1477,27 +1466,22 @@ async def _async_build_from_advanced_options(self, curr_options): if ret_state is not None and ret_state: self._adv_options_state_list.append(ret_state) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[adv_options] Updated state list: " - f"{self._adv_options_state_list}" + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, ) elif none_opt is not None and none_opt: await self._async_build_from_advanced_options(none_opt.strip()) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Back from recursion") - if ( - next_opt is not None - and next_opt - and len(next_opt) > 1 - and next_opt[0] == "," - ): + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == ",": next_opt = next_opt[1:] # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Next Options: {next_opt}") if next_opt is not None and next_opt: await self._async_build_from_advanced_options(next_opt.strip()) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Back from recursion") return - elif ( + if ( paren_num != -1 and (comma_num == -1 or paren_num < comma_num) and (bracket_num == -1 or paren_num < bracket_num) @@ -1506,15 +1490,10 @@ async def _async_build_from_advanced_options(self, curr_options): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Parenthesis is First") opt = curr_options[:paren_num] # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Option: {opt}") - incl, excl, incl_attr, excl_attr, next_opt = ( - await self._async_parse_parens(curr_options[paren_num:]) + incl, excl, incl_attr, excl_attr, next_opt = await self._async_parse_parens( + curr_options[paren_num:] ) - if ( - next_opt is not None - and next_opt - and len(next_opt) > 1 - and next_opt[0] == "[" - ): + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == "[": # Parse Bracket none_opt, next_opt = await self._async_parse_bracket(next_opt) @@ -1525,20 +1504,15 @@ async def _async_build_from_advanced_options(self, curr_options): if ret_state is not None and ret_state: self._adv_options_state_list.append(ret_state) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[adv_options] Updated state list: " - f"{self._adv_options_state_list}" + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, ) elif none_opt is not None and none_opt: await self._async_build_from_advanced_options(none_opt.strip()) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Back from recursion") - if ( - next_opt is not None - and next_opt - and len(next_opt) > 1 - and next_opt[0] == "," - ): + if next_opt is not None and next_opt and len(next_opt) > 1 and next_opt[0] == ",": next_opt = next_opt[1:] # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Next Options: {next_opt}") if next_opt is not None and next_opt: @@ -1546,7 +1520,7 @@ async def _async_build_from_advanced_options(self, curr_options): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Back from recursion") return return - elif "," in curr_options: + if "," in curr_options: # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Options has , but no [ or (, splitting") for opt in curr_options.split(","): if opt is not None and opt: @@ -1554,30 +1528,28 @@ async def _async_build_from_advanced_options(self, curr_options): if ret_state is not None and ret_state: self._adv_options_state_list.append(ret_state) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[adv_options] Updated state list: " - f"{self._adv_options_state_list}" + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, ) return - else: - # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Options should just be a single term") - ret_state = await self._async_get_option_state(curr_options.strip()) - if ret_state is not None and ret_state: - self._adv_options_state_list.append(ret_state) - _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[adv_options] Updated state list: " - f"{self._adv_options_state_list}" - ) - return + # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [adv_options] Options should just be a single term") + ret_state = await self._async_get_option_state(curr_options.strip()) + if ret_state is not None and ret_state: + self._adv_options_state_list.append(ret_state) + _LOGGER.debug( + "(%s) [adv_options] Updated state list: %s", + self._get_attr(CONF_NAME), + self._adv_options_state_list, + ) return - async def _async_parse_parens(self, curr_options): - incl = [] - excl = [] - incl_attr = {} - excl_attr = {} - incl_excl_list = [] + async def _async_parse_parens(self, curr_options: str): + incl: list = [] + excl: list = [] + incl_attr: MutableMapping[str, Any] = {} + excl_attr: MutableMapping[str, Any] = {} + incl_excl_list: list = [] empty_paren = False next_opt = None paren_count = 1 @@ -1590,8 +1562,8 @@ async def _async_parse_parens(self, curr_options): close_paren_num = 0 else: for i, c in enumerate(curr_options): - if c in [",", ")"] and paren_count == 1: - incl_excl_list.append(curr_options[(last_comma + 1): i].strip()) + if c in {",", ")"} and paren_count == 1: + incl_excl_list.append(curr_options[(last_comma + 1) : i].strip()) last_comma = i if c == "(": paren_count += 1 @@ -1619,23 +1591,18 @@ async def _async_parse_parens(self, curr_options): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [parse_parens] item: {item}") if item is not None and item: if "(" in item: - if ( - ")" not in item - or item.count("(") > 1 - or item.count(")") > 1 - ): + if ")" not in item or item.count("(") > 1 or item.count(")") > 1: _LOGGER.error( - f"({self._get_attr(CONF_NAME)}) " - f"[parse_parens] Parenthesis Mismatch: {item}" + "(%s) [parse_parens] Parenthesis Mismatch: %s", + self._get_attr(CONF_NAME), + item, ) continue paren_attr = item[: item.find("(")] paren_attr_first = True paren_attr_incl = True paren_attr_list = [] - for attr_item in item[ - (item.find("(") + 1): item.find(")") - ].split(","): + for attr_item in item[(item.find("(") + 1) : item.find(")")].split(","): if paren_attr_first: paren_attr_first = False if attr_item == "-": @@ -1659,14 +1626,15 @@ async def _async_parse_parens(self, curr_options): elif not empty_paren: _LOGGER.error( - f"({self._get_attr(CONF_NAME)}) " - f"[parse_parens] Parenthesis Mismatch: {curr_options}" + "(%s) [parse_parens] Parenthesis Mismatch: %s", + self._get_attr(CONF_NAME), + curr_options, ) - next_opt = curr_options[(close_paren_num + 1):] + next_opt = curr_options[(close_paren_num + 1) :] # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [parse_parens] Raw Next Options: {next_opt}") return incl, excl, incl_attr, excl_attr, next_opt - async def _async_parse_bracket(self, curr_options): + async def _async_parse_bracket(self, curr_options: str): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [parse_bracket] Options: {curr_options}") empty_bracket = False none_opt = None @@ -1692,49 +1660,59 @@ async def _async_parse_bracket(self, curr_options): if empty_bracket or (close_bracket_num > 0 and bracket_count == 0): none_opt = curr_options[:close_bracket_num].strip() # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [parse_bracket] None Options: {none_opt}") - next_opt = curr_options[(close_bracket_num + 1):].strip() + next_opt = curr_options[(close_bracket_num + 1) :].strip() # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [parse_bracket] Raw Next Options: {next_opt}") else: _LOGGER.error( - f"({self._get_attr(CONF_NAME)}) " - f"[parse_bracket] Bracket Mismatch Error: {curr_options}" + "(%s) [parse_bracket] Bracket Mismatch Error: %s", + self._get_attr(CONF_NAME), + curr_options, ) return none_opt, next_opt async def _async_get_option_state( - self, opt, incl=None, excl=None, incl_attr=None, excl_attr=None - ): + self, + opt: str, + incl: list | None = None, + excl: list | None = None, + incl_attr: MutableMapping[str, Any] | None = None, + excl_attr: MutableMapping[str, Any] | None = None, + ) -> str | None: incl = [] if incl is None else incl excl = [] if excl is None else excl incl_attr = {} if incl_attr is None else incl_attr excl_attr = {} if excl_attr is None else excl_attr - if opt is not None and opt: + if opt: opt = str(opt).lower().strip() # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] Option: {opt}") - out = self._get_attr(DISPLAY_OPTIONS_MAP.get(opt)) + out: str | None = self._get_attr(DISPLAY_OPTIONS_MAP.get(opt)) if ( - DISPLAY_OPTIONS_MAP.get(opt) - in [ATTR_DEVICETRACKER_ZONE, ATTR_DEVICETRACKER_ZONE_NAME] + DISPLAY_OPTIONS_MAP.get(opt) in {ATTR_DEVICETRACKER_ZONE, ATTR_DEVICETRACKER_ZONE_NAME} and not await self._async_in_zone() ): out = None - _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] State: {out}") + _LOGGER.debug("(%s) [get_option_state] State: %s", self._get_attr(CONF_NAME), out) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl list: {incl}") # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] excl list: {excl}") # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl_attr dict: {incl_attr}") # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] excl_attr dict: {excl_attr}") - if out is not None and out: - if incl and str(out).strip().lower() not in incl: - out = None - elif excl and str(out).strip().lower() in excl: + if out: + if ( + incl + and str(out).strip().lower() not in incl + or excl + and str(out).strip().lower() in excl + ): out = None if incl_attr: for attr, states in incl_attr.items(): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl_attr: {attr} / State: {self._get_attr(DISPLAY_OPTIONS_MAP.get(attr))}") # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] incl_states: {states}") + map_attr: str | None = DISPLAY_OPTIONS_MAP.get(attr) if ( - self._is_attr_blank(DISPLAY_OPTIONS_MAP.get(attr)) - or self._get_attr(DISPLAY_OPTIONS_MAP.get(attr)) not in states + not map_attr + or self._is_attr_blank(map_attr) + or self._get_attr(map_attr) not in states ): out = None if excl_attr: @@ -1744,10 +1722,9 @@ async def _async_get_option_state( if self._get_attr(DISPLAY_OPTIONS_MAP.get(attr)) in states: out = None _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - f"[get_option_state] State after incl/excl: {out}" + "(%s) [get_option_state] State after incl/excl: %s", self._get_attr(CONF_NAME), out ) - if out is not None and out: + if out: if out == out.lower() and ( DISPLAY_OPTIONS_MAP.get(opt) == ATTR_DEVICETRACKER_ZONE_NAME or DISPLAY_OPTIONS_MAP.get(opt) == ATTR_PLACE_TYPE @@ -1766,10 +1743,9 @@ async def _async_get_option_state( # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) [get_option_state] street_num_i: {self._street_num_i}") self._temp_i += 1 return out - else: - return None + return None - async def _async_compile_state_from_advanced_options(self): + async def _async_compile_state_from_advanced_options(self) -> None: self._street_num_i += 1 first = True for i, out in enumerate(self._adv_options_state_list): @@ -1795,39 +1771,35 @@ async def _async_compile_state_from_advanced_options(self): ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) New State from Advanced Display Options: " - f"{self._get_attr(ATTR_NATIVE_VALUE)}" + "(%s) New State from Advanced Display Options: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_NATIVE_VALUE), ) - async def _async_build_state_from_display_options(self): + async def _async_build_state_from_display_options(self) -> None: # Options: "formatted_place, driving, zone, zone_name, place_name, place, street_number, street, city, county, state, postal_code, country, formatted_address, do_not_show_not_home" display_options = self._get_attr(ATTR_DISPLAY_OPTIONS_LIST) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Building State from Display Options: " - f"{self._get_attr(ATTR_DISPLAY_OPTIONS)}" + "(%s) Building State from Display Options: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_DISPLAY_OPTIONS), ) - user_display = [] + user_display: list = [] if "driving" in display_options and not self._is_attr_blank(ATTR_DRIVING): user_display.append(self._get_attr(ATTR_DRIVING)) if ( "zone_name" in display_options and not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE_NAME) - and ( - await self._async_in_zone() - or "do_not_show_not_home" not in display_options - ) + and (await self._async_in_zone() or "do_not_show_not_home" not in display_options) ): user_display.append(self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)) elif ( "zone" in display_options and not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE) - and ( - await self._async_in_zone() - or "do_not_show_not_home" not in display_options - ) + and (await self._async_in_zone() or "do_not_show_not_home" not in display_options) ): user_display.append(self._get_attr(ATTR_DEVICETRACKER_ZONE)) @@ -1855,9 +1827,7 @@ async def _async_build_state_from_display_options(self): if not self._is_attr_blank(ATTR_STREET): user_display.append(self._get_attr(ATTR_STREET)) else: - if "street_number" in display_options and not self._is_attr_blank( - ATTR_STREET_NUMBER - ): + if "street_number" in display_options and not self._is_attr_blank(ATTR_STREET_NUMBER): user_display.append(self._get_attr(ATTR_STREET_NUMBER)) if "street" in display_options and not self._is_attr_blank(ATTR_STREET): user_display.append(self._get_attr(ATTR_STREET)) @@ -1865,13 +1835,14 @@ async def _async_build_state_from_display_options(self): user_display.append(self._get_attr(ATTR_CITY)) if "county" in display_options and not self._is_attr_blank(ATTR_COUNTY): user_display.append(self._get_attr(ATTR_COUNTY)) - if "state" in display_options and not self._is_attr_blank(ATTR_REGION): - user_display.append(self._get_attr(ATTR_REGION)) - elif "region" in display_options and not self._is_attr_blank(ATTR_REGION): - user_display.append(self._get_attr(ATTR_REGION)) - if "postal_code" in display_options and not self._is_attr_blank( - ATTR_POSTAL_CODE + if ( + "state" in display_options + and not self._is_attr_blank(ATTR_REGION) + or "region" in display_options + and not self._is_attr_blank(ATTR_REGION) ): + user_display.append(self._get_attr(ATTR_REGION)) + if "postal_code" in display_options and not self._is_attr_blank(ATTR_POSTAL_CODE): user_display.append(self._get_attr(ATTR_POSTAL_CODE)) if "country" in display_options and not self._is_attr_blank(ATTR_COUNTRY): user_display.append(self._get_attr(ATTR_COUNTRY)) @@ -1894,14 +1865,13 @@ async def _async_build_state_from_display_options(self): if user_display: self._set_attr(ATTR_NATIVE_VALUE, ", ".join(item for item in user_display)) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) New State from Display Options: " - f"{self._get_attr(ATTR_NATIVE_VALUE)}" + "(%s) New State from Display Options: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_NATIVE_VALUE), ) - async def _async_get_extended_attr(self): - if not self._is_attr_blank(ATTR_OSM_ID) and not self._is_attr_blank( - ATTR_OSM_TYPE - ): + async def _async_get_extended_attr(self) -> None: + if not self._is_attr_blank(ATTR_OSM_ID) and not self._is_attr_blank(ATTR_OSM_TYPE): if (self._get_attr(ATTR_OSM_TYPE)).lower() == "node": osm_type_abbr = "N" elif (self._get_attr(ATTR_OSM_TYPE)).lower() == "way": @@ -1909,14 +1879,16 @@ async def _async_get_extended_attr(self): elif (self._get_attr(ATTR_OSM_TYPE)).lower() == "relation": osm_type_abbr = "R" - osm_details_url = ( + osm_details_url: str = ( "https://nominatim.openstreetmap.org/lookup?osm_ids=" f"{osm_type_abbr}{self._get_attr(ATTR_OSM_ID)}" "&format=json&addressdetails=1&extratags=1&namedetails=1" - f"&email={self._get_attr(CONF_API_KEY) if not self._is_attr_blank( - CONF_API_KEY) else ''}" - f"&accept-language={self._get_attr(CONF_LANGUAGE) - if not self._is_attr_blank(CONF_LANGUAGE) else ''}" + f"&email={ + self._get_attr(CONF_API_KEY) if not self._is_attr_blank(CONF_API_KEY) else '' + }" + f"&accept-language={ + self._get_attr(CONF_LANGUAGE) if not self._is_attr_blank(CONF_LANGUAGE) else '' + }" ) await self._hass.async_add_executor_job( self._get_dict_from_url, @@ -1931,26 +1903,21 @@ async def _async_get_extended_attr(self): if ( not self._is_attr_blank(ATTR_OSM_DETAILS_DICT) and "extratags" in (self._get_attr(ATTR_OSM_DETAILS_DICT)) - and (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags") - is not None - and "wikidata" - in (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags") - and (self._get_attr(ATTR_OSM_DETAILS_DICT)) - .get("extratags") - .get("wikidata") + and (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags") is not None + and "wikidata" in (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags") + and (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags").get("wikidata") is not None ): self._set_attr( ATTR_WIKIDATA_ID, - (self._get_attr(ATTR_OSM_DETAILS_DICT)) - .get("extratags") - .get("wikidata"), + (self._get_attr(ATTR_OSM_DETAILS_DICT)).get("extratags").get("wikidata"), ) self._set_attr(ATTR_WIKIDATA_DICT, {}) if not self._is_attr_blank(ATTR_WIKIDATA_ID): wikidata_url = f"https://www.wikidata.org/wiki/Special:EntityData/{ - self._get_attr(ATTR_WIKIDATA_ID)}.json" + self._get_attr(ATTR_WIKIDATA_ID) + }.json" await self._hass.async_add_executor_job( self._get_dict_from_url, wikidata_url, @@ -1958,9 +1925,9 @@ async def _async_get_extended_attr(self): ATTR_WIKIDATA_DICT, ) - async def _async_fire_event_data(self, prev_last_place_name): - _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Building Event Data") - event_data = {} + async def _async_fire_event_data(self, prev_last_place_name: str) -> None: + _LOGGER.debug("(%s) Building Event Data", self._get_attr(CONF_NAME)) + event_data: MutableMapping[str, Any] = {} if not self._is_attr_blank(CONF_NAME): event_data.update({"entity": self._get_attr(CONF_NAME)}) if not self._is_attr_blank(ATTR_PREVIOUS_STATE): @@ -1976,9 +1943,7 @@ async def _async_fire_event_data(self, prev_last_place_name): not self._is_attr_blank(ATTR_LAST_PLACE_NAME) and self._get_attr(ATTR_LAST_PLACE_NAME) != prev_last_place_name ): - event_data.update( - {ATTR_LAST_PLACE_NAME: self._get_attr(ATTR_LAST_PLACE_NAME)} - ) + event_data.update({ATTR_LAST_PLACE_NAME: self._get_attr(ATTR_LAST_PLACE_NAME)}) if self._get_attr(CONF_EXTENDED_ATTR): for attr in EXTENDED_ATTRIBUTE_LIST: @@ -1987,46 +1952,53 @@ async def _async_fire_event_data(self, prev_last_place_name): self._hass.bus.fire(EVENT_TYPE, event_data) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Event Details [event_type: " - f"{DOMAIN}_state_update]: {event_data}" + "(%s) Event Details [event_type: %s_state_update]: %s", + self._get_attr(CONF_NAME), + DOMAIN, + event_data, ) _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Event Fired [event_type: " - f"{DOMAIN}_state_update]" + "(%s) Event Fired [event_type: %s_state_update]", self._get_attr(CONF_NAME), DOMAIN ) - def _write_sensor_to_json(self, name, filename): - sensor_attributes = copy.deepcopy(self._internal_attr) - for k, v in list(sensor_attributes.items()): + def _write_sensor_to_json(self, name: str, filename: str) -> None: + sensor_attributes: MutableMapping[str, Any] = copy.deepcopy(self._internal_attr) + for k, v in sensor_attributes.items(): if isinstance(v, (datetime)): # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Removing Sensor Attribute: {k}") sensor_attributes.pop(k) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Sensor Attributes to Save: {sensor_attributes}") try: - with open( - os.path.join(PLACES_JSON_FOLDER, filename), - "w", - ) as jsonfile: + json_file_path: Path = Path(self._json_folder) / filename + with json_file_path.open("w") as jsonfile: json.dump(sensor_attributes, jsonfile) except OSError as e: _LOGGER.debug( - f"({name}) OSError writing sensor to JSON " - f"({filename}): {e.__class__.__qualname__}: {e}" - ) - except Exception as e: - _LOGGER.debug( - f"({name}) Unknown Exception writing sensor to JSON " - f"({filename}): {e.__class__.__qualname__}: {e}" - ) - - async def _async_get_initial_last_place_name(self): + "(%s) OSError writing sensor to JSON (%s): %s: %s", + name, + filename, + e.__class__.__qualname__, + e, + ) + # except Exception as e: + # _LOGGER.debug( + # "(%s) Unknown Exception writing sensor to JSON (%s): %s: %s", + # name, + # filename, + # e.__class__.__qualname__, + # e, + # ) + + async def _async_get_initial_last_place_name(self) -> None: _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Previous State: " - f"{self._get_attr(ATTR_PREVIOUS_STATE)}" + "(%s) Previous State: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_PREVIOUS_STATE), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Previous last_place_name: " - f"{self._get_attr(ATTR_LAST_PLACE_NAME)}" + "(%s) Previous last_place_name: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LAST_PLACE_NAME), ) if not await self._async_in_zone(): @@ -2035,13 +2007,14 @@ async def _async_get_initial_last_place_name(self): # If place name is set self._set_attr(ATTR_LAST_PLACE_NAME, self._get_attr(ATTR_PLACE_NAME)) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Previous place is Place Name, " - f"last_place_name is set: {self._get_attr(ATTR_LAST_PLACE_NAME)}" + "(%s) Previous place is Place Name, last_place_name is set: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LAST_PLACE_NAME), ) else: # If blank, keep previous last_place_name _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Previous Place Name is None, keeping prior" + "(%s) Previous Place Name is None, keeping prior", self._get_attr(CONF_NAME) ) else: # Previously In a Zone @@ -2050,22 +2023,22 @@ async def _async_get_initial_last_place_name(self): self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Previous Place is Zone: " - f"{self._get_attr(ATTR_LAST_PLACE_NAME)}" + "(%s) Previous Place is Zone: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LAST_PLACE_NAME), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) last_place_name (Initial): " - f"{self._get_attr(ATTR_LAST_PLACE_NAME)}" + "(%s) last_place_name (Initial): %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LAST_PLACE_NAME), ) - async def _async_update_coordinates_and_distance(self): + async def _async_update_coordinates_and_distance(self) -> int: last_distance_traveled_m = self._get_attr(ATTR_DISTANCE_FROM_HOME_M) proceed_with_update = 1 # 0: False. 1: True. 2: False, but set direction of travel to stationary - if not self._is_attr_blank(ATTR_LATITUDE) and not self._is_attr_blank( - ATTR_LONGITUDE - ): + if not self._is_attr_blank(ATTR_LATITUDE) and not self._is_attr_blank(ATTR_LONGITUDE): self._set_attr( ATTR_LOCATION_CURRENT, f"{self._get_attr(ATTR_LATITUDE)},{self._get_attr(ATTR_LONGITUDE)}", @@ -2075,16 +2048,14 @@ async def _async_update_coordinates_and_distance(self): ): self._set_attr( ATTR_LOCATION_PREVIOUS, - f"{self._get_attr(ATTR_LATITUDE_OLD)}," - f"{self._get_attr(ATTR_LONGITUDE_OLD)}", + f"{self._get_attr(ATTR_LATITUDE_OLD)},{self._get_attr(ATTR_LONGITUDE_OLD)}", ) if not self._is_attr_blank(ATTR_HOME_LATITUDE) and not self._is_attr_blank( ATTR_HOME_LONGITUDE ): self._set_attr( ATTR_HOME_LOCATION, - f"{self._get_attr(ATTR_HOME_LATITUDE)}," - f"{self._get_attr(ATTR_HOME_LONGITUDE)}", + f"{self._get_attr(ATTR_HOME_LATITUDE)},{self._get_attr(ATTR_HOME_LONGITUDE)}", ) if ( @@ -2138,9 +2109,7 @@ async def _async_update_coordinates_and_distance(self): # elif last_distance_traveled_m > self._get_attr(ATTR_DISTANCE_FROM_HOME_M): if last_distance_traveled_m > self._get_attr(ATTR_DISTANCE_FROM_HOME_M): self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "towards home") - elif last_distance_traveled_m < self._get_attr( - ATTR_DISTANCE_FROM_HOME_M - ): + elif last_distance_traveled_m < self._get_attr(ATTR_DISTANCE_FROM_HOME_M): self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "away from home") else: self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "stationary") @@ -2150,147 +2119,153 @@ async def _async_update_coordinates_and_distance(self): self._set_attr(ATTR_DISTANCE_TRAVELED_MI, 0) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Previous Location: " - f"{self._get_attr(ATTR_LOCATION_PREVIOUS)}" + "(%s) Previous Location: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LOCATION_PREVIOUS), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Current Location: " - f"{self._get_attr(ATTR_LOCATION_CURRENT)}" + "(%s) Current Location: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LOCATION_CURRENT), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Home Location: " - f"{self._get_attr(ATTR_HOME_LOCATION)}" + "(%s) Home Location: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_HOME_LOCATION), ) _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Distance from home " - f"[{(self._get_attr(CONF_HOME_ZONE)).split('.')[1]}]: " - f"{self._get_attr(ATTR_DISTANCE_FROM_HOME_KM)} km" + "(%s) Distance from home [%s]: %s km", + self._get_attr(CONF_NAME), + (self._get_attr(CONF_HOME_ZONE)).split(".")[1], + self._get_attr(ATTR_DISTANCE_FROM_HOME_KM), ) _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Travel Direction: " - f"{self._get_attr(ATTR_DIRECTION_OF_TRAVEL)}" + "(%s) Travel Direction: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_DIRECTION_OF_TRAVEL), ) _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Meters traveled since last update: " - f"{round(self._get_attr(ATTR_DISTANCE_TRAVELED_M), 1)}" + "(%s) Meters traveled since last update: %s", + self._get_attr(CONF_NAME), + round(self._get_attr(ATTR_DISTANCE_TRAVELED_M), 1), ) else: proceed_with_update = 0 # 0: False. 1: True. 2: False, but set direction of travel to stationary _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) " - "Problem with updated lat/long, not performing update: " - f"old_latitude={self._get_attr(ATTR_LATITUDE_OLD)}, " - f"old_longitude={self._get_attr(ATTR_LONGITUDE_OLD)}, " - f"new_latitude={self._get_attr(ATTR_LATITUDE)}, " - f"new_longitude={self._get_attr(ATTR_LONGITUDE)}, " - f"home_latitude={self._get_attr(ATTR_HOME_LATITUDE)}, " - f"home_longitude={self._get_attr(ATTR_HOME_LONGITUDE)}" + "(%s) Problem with updated lat/long, not performing update: " + "old_latitude=%s, old_longitude=%s, " + "new_latitude=%s, new_longitude=%s, " + "home_latitude=%s, home_longitude=%s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LATITUDE_OLD), + self._get_attr(ATTR_LONGITUDE_OLD), + self._get_attr(ATTR_LATITUDE), + self._get_attr(ATTR_LONGITUDE), + self._get_attr(ATTR_HOME_LATITUDE), + self._get_attr(ATTR_HOME_LONGITUDE), ) return proceed_with_update - async def _async_finalize_last_place_name(self, prev_last_place_name=None): + async def _async_finalize_last_place_name(self, prev_last_place_name: str) -> None: if self._get_attr(ATTR_INITIAL_UPDATE): self._set_attr(ATTR_LAST_PLACE_NAME, prev_last_place_name) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "Runnining initial update after load, using prior last_place_name" + "(%s) Runnining initial update after load, using prior last_place_name", + self._get_attr(CONF_NAME), ) elif self._get_attr(ATTR_LAST_PLACE_NAME) == self._get_attr( ATTR_PLACE_NAME - ) or self._get_attr(ATTR_LAST_PLACE_NAME) == self._get_attr( - ATTR_DEVICETRACKER_ZONE_NAME - ): + ) or self._get_attr(ATTR_LAST_PLACE_NAME) == self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME): # If current place name/zone are the same as previous, keep older last_place_name self._set_attr(ATTR_LAST_PLACE_NAME, prev_last_place_name) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "Initial last_place_name is same as new: place_name=" - f"{self._get_attr(ATTR_PLACE_NAME)} or devicetracker_zone_name=" - f"{self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME)}, " - "keeping previous last_place_name" + "(%s) Initial last_place_name is same as new: place_name=%s or devicetracker_zone_name=%s, " + "keeping previous last_place_name", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_PLACE_NAME), + self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME), ) else: - _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Keeping initial last_place_name" - ) + _LOGGER.debug("(%s) Keeping initial last_place_name", self._get_attr(CONF_NAME)) _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) last_place_name: " - f"{self._get_attr(ATTR_LAST_PLACE_NAME)}" + "(%s) last_place_name: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_LAST_PLACE_NAME), ) - async def _async_do_update(self, reason): + async def _async_do_update(self, reason: str) -> None: """Get the latest data and updates the states.""" _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Starting {reason} Update (Tracked Entity: " - f"{self._get_attr(CONF_DEVICETRACKER_ID)})" + "(%s) Starting %s Update (Tracked Entity: %s)", + self._get_attr(CONF_NAME), + reason, + self._get_attr(CONF_DEVICETRACKER_ID), ) if self._hass.config.time_zone is not None: - now = datetime.now(tz=ZoneInfo(str(self._hass.config.time_zone))) + now: datetime = datetime.now(tz=ZoneInfo(str(self._hass.config.time_zone))) else: now = datetime.now() - previous_attr = copy.deepcopy(self._internal_attr) + previous_attr: MutableMapping[str, Any] = copy.deepcopy(self._internal_attr) await self._async_check_for_updated_entity_name() await self._async_cleanup_attributes() # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Previous entity attributes: {self._internal_attr}") - if not self._is_attr_blank(ATTR_NATIVE_VALUE) and self._get_attr( - CONF_SHOW_TIME - ): + if not self._is_attr_blank(ATTR_NATIVE_VALUE) and self._get_attr(CONF_SHOW_TIME): self._set_attr( ATTR_PREVIOUS_STATE, - await self._async_clear_since_from_state( - str(self._get_attr(ATTR_NATIVE_VALUE)) - ), + await Places._async_clear_since_from_state(str(self._get_attr(ATTR_NATIVE_VALUE))), ) else: self._set_attr(ATTR_PREVIOUS_STATE, self._get_attr(ATTR_NATIVE_VALUE)) - if self._is_float(self._get_attr(ATTR_LATITUDE)): + if _is_float(self._get_attr(ATTR_LATITUDE)): self._set_attr(ATTR_LATITUDE_OLD, str(self._get_attr(ATTR_LATITUDE))) - if self._is_float(self._get_attr(ATTR_LONGITUDE)): + if _is_float(self._get_attr(ATTR_LONGITUDE)): self._set_attr(ATTR_LONGITUDE_OLD, str(self._get_attr(ATTR_LONGITUDE))) prev_last_place_name = self._get_attr(ATTR_LAST_PLACE_NAME) - proceed_with_update = await self._async_is_devicetracker_set() + proceed_with_update: int = await self._async_is_devicetracker_set() # 0: False. 1: True. 2: False, but set direction of travel to stationary _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - f"[is_devicetracker_set] proceed_with_update: {proceed_with_update}" + "(%s) [is_devicetracker_set] proceed_with_update: %s", + self._get_attr(CONF_NAME), + proceed_with_update, ) if proceed_with_update == 1: # 0: False. 1: True. 2: False, but set direction of travel to stationary - if self._is_float( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LATITUDE) + if _is_float( + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LATITUDE + ) ): self._set_attr( ATTR_LATITUDE, str( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LATITUDE) + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LATITUDE + ) ), ) - if self._is_float( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LONGITUDE) + if _is_float( + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LONGITUDE + ) ): self._set_attr( ATTR_LONGITUDE, str( - self._hass.states.get( - self._get_attr(CONF_DEVICETRACKER_ID) - ).attributes.get(CONF_LONGITUDE) + self._hass.states.get(self._get_attr(CONF_DEVICETRACKER_ID)).attributes.get( + CONF_LONGITUDE + ) ), ) proceed_with_update = await self._async_get_gps_accuracy() _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - f"[is_devicetracker_set] proceed_with_update: {proceed_with_update}" + "(%s) [is_devicetracker_set] proceed_with_update: %s", + self._get_attr(CONF_NAME), + proceed_with_update, ) if proceed_with_update == 1: @@ -2299,44 +2274,48 @@ async def _async_do_update(self, reason): await self._async_get_zone_details() proceed_with_update = await self._async_update_coordinates_and_distance() _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "[update_coordinates_and_distance] proceed_with_update: " - f"{proceed_with_update}" + "(%s) [update_coordinates_and_distance] proceed_with_update: %s", + self._get_attr(CONF_NAME), + proceed_with_update, ) if proceed_with_update == 1: # 0: False. 1: True. 2: False, but set direction of travel to stationary proceed_with_update = await self._async_determine_if_update_needed() _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - f"[determine_if_update_needed] proceed_with_update: { - proceed_with_update}" + "(%s) [determine_if_update_needed] proceed_with_update: %s", + self._get_attr(CONF_NAME), + proceed_with_update, ) if proceed_with_update == 1: # 0: False. 1: True. 2: False, but set direction of travel to stationary _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Meets criteria, proceeding with OpenStreetMap query" + "(%s) Meets criteria, proceeding with OpenStreetMap query", + self._get_attr(CONF_NAME), ) _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) Tracked Entity Zone: " - f"{self._get_attr(ATTR_DEVICETRACKER_ZONE)}" + "(%s) Tracked Entity Zone: %s", # f" / Skipped Updates: {self._get_attr(ATTR_UPDATES_SKIPPED)}" + self._get_attr(CONF_NAME), + self._get_attr(ATTR_DEVICETRACKER_ZONE), ) await self._async_reset_attributes() await self._async_get_map_link() - osm_url = ( + osm_url: str = ( "https://nominatim.openstreetmap.org/reverse?format=jsonv2&lat=" f"{self._get_attr(ATTR_LATITUDE)}" f"&lon={self._get_attr(ATTR_LONGITUDE)}" - f"&accept-language={self._get_attr(CONF_LANGUAGE) - if not self._is_attr_blank(CONF_LANGUAGE) else ''}" + f"&accept-language={ + self._get_attr(CONF_LANGUAGE) if not self._is_attr_blank(CONF_LANGUAGE) else '' + }" "&addressdetails=1&namedetails=1&zoom=18&limit=1" - f"&email={self._get_attr(CONF_API_KEY) if not self._is_attr_blank( - CONF_API_KEY) else ''}" + f"&email={ + self._get_attr(CONF_API_KEY) if not self._is_attr_blank(CONF_API_KEY) else '' + }" ) await self._hass.async_add_executor_job( @@ -2347,11 +2326,11 @@ async def _async_do_update(self, reason): await self._async_parse_osm_dict() await self._async_finalize_last_place_name(prev_last_place_name) - display_options = [] + display_options: list = [] if not self._is_attr_blank(ATTR_DISPLAY_OPTIONS): options_array = (self._get_attr(ATTR_DISPLAY_OPTIONS)).split(",") for option in options_array: - display_options.append(option.strip()) + display_options.extend([option.strip()]) self._set_attr(ATTR_DISPLAY_OPTIONS_LIST, display_options) await self._async_get_driving_status() @@ -2363,13 +2342,13 @@ async def _async_do_update(self, reason): self._get_attr(ATTR_FORMATTED_PLACE), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) New State using formatted_place: " - f"{self._get_attr(ATTR_NATIVE_VALUE)}" + "(%s) New State using formatted_place: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_NATIVE_VALUE), ) elif any( - ext in (self._get_attr(ATTR_DISPLAY_OPTIONS)) - for ext in ["(", ")", "[", "]"] + ext in (self._get_attr(ATTR_DISPLAY_OPTIONS)) for ext in ["(", ")", "[", "]"] ): # Replace place option with expanded definition # temp_opt = self._get_attr(ATTR_DISPLAY_OPTIONS) @@ -2380,14 +2359,15 @@ async def _async_do_update(self, reason): # ) # self._set_attr(ATTR_DISPLAY_OPTIONS, temp_opt) self._clear_attr(ATTR_DISPLAY_OPTIONS_LIST) - display_options = None + display_options = [] self._adv_options_state_list = [] self._street_num_i = -1 self._street_i = -1 self._temp_i = 0 _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) Initial Advanced Display Options: " - f"{self._get_attr(ATTR_DISPLAY_OPTIONS)}" + "(%s) Initial Advanced Display Options: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_DISPLAY_OPTIONS), ) await self._async_build_from_advanced_options( @@ -2401,16 +2381,16 @@ async def _async_do_update(self, reason): elif not await self._async_in_zone(): await self._async_build_state_from_display_options() elif ( - "zone" in display_options - and not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE) + "zone" in display_options and not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE) ) or self._is_attr_blank(ATTR_DEVICETRACKER_ZONE_NAME): self._set_attr( ATTR_NATIVE_VALUE, self._get_attr(ATTR_DEVICETRACKER_ZONE), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) New State from Tracked Entity Zone: " - f"{self._get_attr(ATTR_NATIVE_VALUE)}" + "(%s) New State from Tracked Entity Zone: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_NATIVE_VALUE), ) elif not self._is_attr_blank(ATTR_DEVICETRACKER_ZONE_NAME): self._set_attr( @@ -2418,13 +2398,12 @@ async def _async_do_update(self, reason): self._get_attr(ATTR_DEVICETRACKER_ZONE_NAME), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) New State from Tracked Entity Zone Name: " - f"{self._get_attr(ATTR_NATIVE_VALUE)}" + "(%s) New State from Tracked Entity Zone Name: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_NATIVE_VALUE), ) - current_time = f"{now.hour:02}:{now.minute:02}" - self._set_attr( - ATTR_LAST_CHANGED, str(now.isoformat(sep=" ", timespec="seconds")) - ) + current_time: str = f"{now.hour:02}:{now.minute:02}" + self._set_attr(ATTR_LAST_CHANGED, str(now.isoformat(sep=" ", timespec="seconds"))) # Final check to see if the New State is different from the Previous State and should update or not. # If not, attributes are reset to what they were before the update started. @@ -2435,10 +2414,7 @@ async def _async_do_update(self, reason): and not self._is_attr_blank(ATTR_NATIVE_VALUE) and (self._get_attr(ATTR_PREVIOUS_STATE)).lower().strip() != (self._get_attr(ATTR_NATIVE_VALUE)).lower().strip() - and (self._get_attr(ATTR_PREVIOUS_STATE)) - .replace(" ", "") - .lower() - .strip() + and (self._get_attr(ATTR_PREVIOUS_STATE)).replace(" ", "").lower().strip() != (self._get_attr(ATTR_NATIVE_VALUE)).lower().strip() and self._get_attr(ATTR_PREVIOUS_STATE).lower().strip() != (self._get_attr(ATTR_DEVICETRACKER_ZONE)).lower().strip() @@ -2456,7 +2432,7 @@ async def _async_do_update(self, reason): self._set_attr( ATTR_NATIVE_VALUE, str( - await self._async_clear_since_from_state( + await Places._async_clear_since_from_state( str(self._get_attr(ATTR_NATIVE_VALUE)) ) )[: 255 - 14] @@ -2470,14 +2446,13 @@ async def _async_do_update(self, reason): self._get_attr(ATTR_NATIVE_VALUE)[:255], ) _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) New State: " - f"{self._get_attr(ATTR_NATIVE_VALUE)}" + "(%s) New State: %s", + self._get_attr(CONF_NAME), + self._get_attr(ATTR_NATIVE_VALUE), ) else: self._clear_attr(ATTR_NATIVE_VALUE) - _LOGGER.warning( - f"({self._get_attr(CONF_NAME)}) New State is None" - ) + _LOGGER.warning("(%s) New State is None", self._get_attr(CONF_NAME)) if not self._is_attr_blank(ATTR_NATIVE_VALUE): self._attr_native_value = self._get_attr(ATTR_NATIVE_VALUE) else: @@ -2492,24 +2467,20 @@ async def _async_do_update(self, reason): else: self._internal_attr = previous_attr _LOGGER.info( - f"({self._get_attr(CONF_NAME)}) " - "No entity update needed, Previous State = New State" + "(%s) No entity update needed, Previous State = New State", + self._get_attr(CONF_NAME), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "Reverting attributes back to before the update started" + "(%s) Reverting attributes back to before the update started", + self._get_attr(CONF_NAME), ) - changed_diff_sec = await self._async_get_seconds_from_last_change( - now - ) + changed_diff_sec: int = await self._async_get_seconds_from_last_change(now) if ( self._get_attr(ATTR_DIRECTION_OF_TRAVEL) != "stationary" and changed_diff_sec >= 60 ): - await self._async_change_dot_to_stationary( - now, changed_diff_sec - ) + await self._async_change_dot_to_stationary(now, changed_diff_sec) if ( self._get_attr(CONF_SHOW_TIME) and changed_diff_sec >= 86399 @@ -2519,8 +2490,8 @@ async def _async_do_update(self, reason): else: self._internal_attr = previous_attr _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "Reverting attributes back to before the update started" + "(%s) Reverting attributes back to before the update started", + self._get_attr(CONF_NAME), ) changed_diff_sec = await self._async_get_seconds_from_last_change(now) @@ -2538,32 +2509,26 @@ async def _async_do_update(self, reason): ): await self._async_change_show_time_to_date() - self._set_attr( - ATTR_LAST_UPDATED, str(now.isoformat(sep=" ", timespec="seconds")) - ) + self._set_attr(ATTR_LAST_UPDATED, str(now.isoformat(sep=" ", timespec="seconds"))) # _LOGGER.debug(f"({self._get_attr(CONF_NAME)}) Final entity attributes: {await self._async__internal_attr}") - _LOGGER.info(f"({self._get_attr(CONF_NAME)}) End of Update") + _LOGGER.info("(%s) End of Update", self._get_attr(CONF_NAME)) - async def _async_change_dot_to_stationary(self, now, changed_diff_sec): + async def _async_change_dot_to_stationary(self, now: datetime, changed_diff_sec: int) -> None: self._set_attr(ATTR_DIRECTION_OF_TRAVEL, "stationary") - self._set_attr( - ATTR_LAST_CHANGED, str(now.isoformat(sep=" ", timespec="seconds")) - ) + self._set_attr(ATTR_LAST_CHANGED, str(now.isoformat(sep=" ", timespec="seconds"))) await self._hass.async_add_executor_job( self._write_sensor_to_json, self._get_attr(CONF_NAME), self._get_attr(ATTR_JSON_FILENAME), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "Updating direction of travel to stationary (Last changed " - f"{int(changed_diff_sec)} seconds ago)" + "(%s) Updating direction of travel to stationary (Last changed %s seconds ago)", + self._get_attr(CONF_NAME), + int(changed_diff_sec), ) - async def _async_change_show_time_to_date(self): - if not self._is_attr_blank(ATTR_NATIVE_VALUE) and self._get_attr( - CONF_SHOW_TIME - ): + async def _async_change_show_time_to_date(self) -> None: + if not self._is_attr_blank(ATTR_NATIVE_VALUE) and self._get_attr(CONF_SHOW_TIME): # localedate = str(locale.nl_langinfo(locale.D_FMT)).replace(" ", "") # if localedate.lower().endswith("%y"): # localemmdd = localedate[:-3] @@ -2581,8 +2546,7 @@ async def _async_change_show_time_to_date(self): ) self._set_attr( ATTR_NATIVE_VALUE, - f"{await self._async_clear_since_from_state(str(self._get_attr(ATTR_NATIVE_VALUE)))}" - + f" (since {mmddstring})", + f"{await Places._async_clear_since_from_state(str(self._get_attr(ATTR_NATIVE_VALUE)))} (since {mmddstring})", ) if not self._is_attr_blank(ATTR_NATIVE_VALUE): @@ -2596,24 +2560,23 @@ async def _async_change_show_time_to_date(self): self._get_attr(ATTR_JSON_FILENAME), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) " - "Updating state to show date instead of time since last change" + "(%s) Updating state to show date instead of time since last change", + self._get_attr(CONF_NAME), ) _LOGGER.debug( - f"({self._get_attr(CONF_NAME)}) New State: " - f"{self._get_attr(ATTR_NATIVE_VALUE)}" + "(%s) New State: %s", self._get_attr(CONF_NAME), self._get_attr(ATTR_NATIVE_VALUE) ) - async def _async_get_seconds_from_last_change(self, now): + async def _async_get_seconds_from_last_change(self, now: datetime) -> int: if self._is_attr_blank(ATTR_LAST_CHANGED): return 3600 try: - last_changed = datetime.fromisoformat(self._get_attr(ATTR_LAST_CHANGED)) + last_changed: datetime = datetime.fromisoformat(self._get_attr(ATTR_LAST_CHANGED)) except (TypeError, ValueError) as e: _LOGGER.warning( - f"Error converting Last Changed date/time " - f"({self._get_attr(ATTR_LAST_CHANGED)}) " - f"into datetime: {repr(e)}" + "Error converting Last Changed date/time (%s) into datetime: %r", + self._get_attr(ATTR_LAST_CHANGED), + e, ) return 3600 else: @@ -2624,20 +2587,16 @@ async def _async_get_seconds_from_last_change(self, now): changed_diff_sec = (datetime.now() - last_changed).total_seconds() except (TypeError, OverflowError) as e: _LOGGER.warning( - "Error calculating the seconds between last change to now: " - f"{repr(e)}" + "Error calculating the seconds between last change to now: %r", e ) return 3600 except OverflowError as e: - _LOGGER.warning( - "Error calculating the seconds between last change to now: " - f"{repr(e)}" - ) + _LOGGER.warning("Error calculating the seconds between last change to now: %r", e) return 3600 - return changed_diff_sec + return int(changed_diff_sec) - async def _async_reset_attributes(self): - """Resets attributes.""" + async def _async_reset_attributes(self) -> None: + """Reset sensor attributes.""" for attr in RESET_ATTRIBUTE_LIST: self._clear_attr(attr) # self._set_attr(ATTR_UPDATES_SKIPPED, 0) @@ -2645,4 +2604,6 @@ async def _async_reset_attributes(self): class PlacesNoRecorder(Places): + """Places Class without the HA Recorder.""" + _unrecorded_attributes = frozenset({MATCH_ALL}) diff --git a/icon/map-search-outline.svg b/icon/map-search-outline.svg index 436219ff..86d1df93 100644 --- a/icon/map-search-outline.svg +++ b/icon/map-search-outline.svg @@ -1 +1 @@ - \ No newline at end of file +