From 5e640d5accff9ee038ab448252d38eee2c661edc Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Sun, 1 Dec 2024 18:30:45 +0100 Subject: [PATCH] refactoring the all functions in regards to check update of plugin and firware via a DNS request --- Modules/checkingUpdate.py | 184 ++++++++++++++++++++++++++++---------- 1 file changed, 135 insertions(+), 49 deletions(-) diff --git a/Modules/checkingUpdate.py b/Modules/checkingUpdate.py index d67c305f8..bd1eec363 100644 --- a/Modules/checkingUpdate.py +++ b/Modules/checkingUpdate.py @@ -11,9 +11,6 @@ # SPDX-License-Identifier: GPL-3.0 license # Use DNS TXT to check latest version available on gitHub -# - stable -# - beta - import dns.resolver import requests @@ -31,66 +28,122 @@ def check_plugin_version_against_dns(self, zigbee_communication, branch, zigate_model): - self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {zigbee_communication} {branch} {zigate_model}") + """ + Checks the plugin version against DNS TXT records and fetches firmware details if applicable. - plugin_version = None - plugin_version = _get_dns_txt_record(self, PLUGIN_TXT_RECORD) - self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {plugin_version}") + Args: + zigbee_communication (str): Communication type ('native' or 'zigpy'). + branch (str): Current plugin branch (e.g., 'master', 'beta'). + zigate_model (str): Zigate model identifier. + Returns: + tuple: Plugin version, firmware major version, firmware minor version. + """ + self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {zigbee_communication} {branch} {zigate_model}") + + # Fetch and parse plugin version DNS record + plugin_version = _fetch_and_parse_dns_record(self, PLUGIN_TXT_RECORD, "Plugin") if plugin_version is None: - # Something weird happened - self.log.logging("Plugin", "Error", "Unable to get access to plugin expected version. Is Internet access available ?") + self.log.logging("Plugin", "Error", "Unable to access plugin version. Is Internet access available?") return (0, 0, 0) - plugin_version_dict = _parse_dns_txt_record( plugin_version) - self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {plugin_version} {plugin_version_dict}") + self.log.logging("Plugin", "Debug", f" plugin version: >{plugin_version}") - # If native communication (zigate) let's find the zigate firmware - firmware_version = None + firmware_version_dict = {} if zigbee_communication == "native": - zigate_plugin_record = ZIGATE_DNS_RECORDS.get(zigate_model) - firmware_version = _get_dns_txt_record(self, zigate_plugin_record) - firmware_version_dict = _parse_dns_txt_record(firmware_version) - self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {firmware_version} {firmware_version_dict}") - - self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {plugin_version} {plugin_version_dict}") + firmware_version = _fetch_and_parse_dns_record(self, ZIGATE_DNS_RECORDS.get(zigate_model), "Firmware") + if firmware_version: + firmware_version_dict = firmware_version - if zigbee_communication == "native" and branch in plugin_version_dict and "firmMajor" in firmware_version_dict and "firmMinor" in firmware_version_dict: - return (plugin_version_dict[branch], firmware_version_dict["firmMajor"], firmware_version_dict["firmMinor"]) + self.log.logging("Plugin", "Debug", f" firmware version: >{firmware_version_dict}") + + # Determine the response based on communication type and branch support + if zigbee_communication == "native" and branch in plugin_version and "firmMajor" in firmware_version_dict and "firmMinor" in firmware_version_dict: + return (plugin_version[branch], firmware_version_dict["firmMajor"], firmware_version_dict["firmMinor"]) - if zigbee_communication == "zigpy" and branch in plugin_version_dict: - return (plugin_version_dict[branch], 0, 0) + if zigbee_communication == "zigpy" and branch in plugin_version: + return (plugin_version[branch], 0, 0) - self.log.logging("Plugin", "Error", f"You are running {branch}-{plugin_version}, a NOT SUPPORTED version. ") + self.log.logging("Plugin", "Error", f"You are running on branch: {branch}, which is NOT SUPPORTED.") return (0, 0, 0) +def _fetch_and_parse_dns_record(self, record_name, record_type): + """ + Fetches and parses a DNS TXT record. + + Args: + record_name (str): The name of the DNS TXT record. + record_type (str): Type of record (e.g., 'Plugin', 'Firmware') for logging. + + Returns: + dict or None: Parsed DNS record as a dictionary, or None if unavailable. + """ + if not record_name: + self.log.logging(record_type, "Error", f"{record_type} DNS record not found.") + return None + + self.log.logging(record_type, "Debug", f"Fetching {record_type} DNS record: {record_name}") + record = _get_dns_txt_record(self, record_name) + if record is None: + self.log.logging(record_type, "Error", f"Failed to fetch {record_type} DNS record: {record_name}") + return None + + self.log.logging(record_type, "Debug", f"Fetching {record_type} DNS record: {record_name} = {record}") + + parsed_record = _parse_dns_txt_record(record) + self.log.logging(record_type, "Debug", f"Fetched and parsed {record_type} DNS record: {parsed_record}") + return parsed_record + + def _get_dns_txt_record(self, record, timeout=1): + """ + Fetch a DNS TXT record. + + Args: + record (str): The DNS record to fetch. + timeout (int): Timeout for the DNS query in seconds. + + Returns: + str or None: The DNS TXT record as a string, or None if unavailable. + """ if not self.internet_available: + self.log.logging("Plugin", "Error", f"Internet unavailable, skipping DNS resolution for {record}") return None try: - result = dns.resolver.resolve(record, "TXT", tcp=True, lifetime=1).response.answer[0] + # Attempt to resolve the DNS TXT record + result = dns.resolver.resolve(record, "TXT", tcp=True, lifetime=timeout).response.answer[0] return str(result[0]).strip('"') except dns.resolver.Timeout: - error_message = f"DNS resolution timed out for {record} after {timeout} second" - self.internet_available = False + _handle_dns_error(self, f"DNS resolution timed out for {record} after {timeout} second(s)", fatal=True) except dns.resolver.NoAnswer: - error_message = f"DNS TXT record not found for {record}" + _handle_dns_error(self, f"DNS TXT record not found for {record}") except dns.resolver.NoNameservers: - error_message = f"No nameservers found for {record}" - self.internet_available = False + _handle_dns_error(self, f"No nameservers found for {record}", fatal=True) except Exception as e: - error_message = f"An unexpected error occurred while resolving DNS TXT record for {record}: {e}" + _handle_dns_error(self, f"Unexpected error while resolving DNS TXT record for {record}: {e}") - self.log.logging("Plugin", "Error", error_message) return None +def _handle_dns_error(self, message, fatal=False): + """ + Handle DNS errors with consistent logging. + + Args: + message (str): The error message to log. + fatal (bool): If True, set internet_available to False. + """ + self.log.logging("Plugin", "Error", message) + if fatal: + self.internet_available = False + + def _parse_dns_txt_record(txt_record): version_dict = {} if txt_record and txt_record != "": @@ -100,39 +153,72 @@ def _parse_dns_txt_record(txt_record): def is_plugin_update_available(self, currentVersion, availVersion): - if availVersion == 0: - return False + """ + Check if a plugin update is available. - currentMaj, currentMin, currentUpd = currentVersion.split(".") - availMaj, availMin, availUpd = availVersion.split(".") + Args: + currentVersion (str): Current plugin version (e.g., "1.0.0"). + availVersion (str): Available plugin version (e.g., "1.1.0"). - if availMaj > currentMaj: - self.log.logging("Plugin", "Status", "Zigbee4Domoticz plugin: upgrade available: %s" %availVersion) - return True + Returns: + bool: True if an update is available, False otherwise. + """ + if availVersion == "0": + return False - if availMaj == currentMaj and ( - availMin == currentMin - and availUpd > currentUpd - or availMin > currentMin - ): - self.log.logging("Plugin", "Status", "Zigbee4Domoticz plugin: upgrade available: %s" %availVersion) + if _is_newer_version(self, currentVersion, availVersion): + self.log.logging("Plugin", "Status", f"Zigbee4Domoticz plugin: upgrade available: {availVersion}") return True + return False +def _is_newer_version(self, currentVersion, availVersion): + """ + Compare two version strings to determine if the second is newer. + + Args: + currentVersion (str): Current version string (e.g., "1.0.0"). + availVersion (str): Available version string (e.g., "1.1.0"). + + Returns: + bool: True if availVersion is newer than currentVersion, False otherwise. + """ + current = tuple(map(int, currentVersion.split("."))) + available = tuple(map(int, availVersion.split("."))) + return available > current + + def is_zigate_firmware_available(self, currentMajorVersion, currentFirmwareVersion, availfirmMajor, availfirmMinor): if not (availfirmMinor and currentFirmwareVersion): return False if int(availfirmMinor, 16) > int(currentFirmwareVersion, 16): - self.log.logging("Plugin", "Debug", "Zigate Firmware update available") + self.log.logging("Plugin", "Status", "Zigate Firmware update available") return True return False def is_internet_available(): + """ + Check if the internet is available by sending a GET request to a reliable website. + + Returns: + bool: True if the internet is available, False otherwise. + """ + url = "http://www.google.com" + timeout = 3 + try: - response = requests.get("http://www.google.com", timeout=3) - # Check if the status code is a success code (2xx) - return response.status_code == 200 + response = requests.get(url, timeout=timeout) + # Return True if the status code indicates success (2xx) + return 200 <= response.status_code < 300 except requests.ConnectionError: + # Handle cases where the connection fails + return False + except requests.Timeout: + # Handle timeout errors + return False + except requests.RequestException as e: + # Handle other request exceptions + print(f"Unexpected error while checking internet availability: {e}") return False