Skip to content

Commit

Permalink
refactoring the all functions in regards to check update of plugin an…
Browse files Browse the repository at this point in the history
…d firware via a DNS request
  • Loading branch information
pipiche38 committed Dec 1, 2024
1 parent 86187cc commit 5e640d5
Showing 1 changed file with 135 additions and 49 deletions.
184 changes: 135 additions & 49 deletions Modules/checkingUpdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@
# SPDX-License-Identifier: GPL-3.0 license

# Use DNS TXT to check latest version available on gitHub
# - stable
# - beta


import dns.resolver
import requests
Expand All @@ -31,66 +28,122 @@


def check_plugin_version_against_dns(self, zigbee_communication, branch, zigate_model):
self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {zigbee_communication} {branch} {zigate_model}")
"""
Checks the plugin version against DNS TXT records and fetches firmware details if applicable.
plugin_version = None
plugin_version = _get_dns_txt_record(self, PLUGIN_TXT_RECORD)
self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {plugin_version}")
Args:
zigbee_communication (str): Communication type ('native' or 'zigpy').
branch (str): Current plugin branch (e.g., 'master', 'beta').
zigate_model (str): Zigate model identifier.
Returns:
tuple: Plugin version, firmware major version, firmware minor version.
"""
self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {zigbee_communication} {branch} {zigate_model}")

# Fetch and parse plugin version DNS record
plugin_version = _fetch_and_parse_dns_record(self, PLUGIN_TXT_RECORD, "Plugin")
if plugin_version is None:
# Something weird happened
self.log.logging("Plugin", "Error", "Unable to get access to plugin expected version. Is Internet access available ?")
self.log.logging("Plugin", "Error", "Unable to access plugin version. Is Internet access available?")
return (0, 0, 0)

plugin_version_dict = _parse_dns_txt_record( plugin_version)
self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {plugin_version} {plugin_version_dict}")
self.log.logging("Plugin", "Debug", f" plugin version: >{plugin_version}")

# If native communication (zigate) let's find the zigate firmware
firmware_version = None
firmware_version_dict = {}
if zigbee_communication == "native":
zigate_plugin_record = ZIGATE_DNS_RECORDS.get(zigate_model)
firmware_version = _get_dns_txt_record(self, zigate_plugin_record)
firmware_version_dict = _parse_dns_txt_record(firmware_version)
self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {firmware_version} {firmware_version_dict}")

self.log.logging("Plugin", "Debug", f"check_plugin_version_against_dns {plugin_version} {plugin_version_dict}")
firmware_version = _fetch_and_parse_dns_record(self, ZIGATE_DNS_RECORDS.get(zigate_model), "Firmware")
if firmware_version:
firmware_version_dict = firmware_version

if zigbee_communication == "native" and branch in plugin_version_dict and "firmMajor" in firmware_version_dict and "firmMinor" in firmware_version_dict:
return (plugin_version_dict[branch], firmware_version_dict["firmMajor"], firmware_version_dict["firmMinor"])
self.log.logging("Plugin", "Debug", f" firmware version: >{firmware_version_dict}")

# Determine the response based on communication type and branch support
if zigbee_communication == "native" and branch in plugin_version and "firmMajor" in firmware_version_dict and "firmMinor" in firmware_version_dict:
return (plugin_version[branch], firmware_version_dict["firmMajor"], firmware_version_dict["firmMinor"])

if zigbee_communication == "zigpy" and branch in plugin_version_dict:
return (plugin_version_dict[branch], 0, 0)
if zigbee_communication == "zigpy" and branch in plugin_version:
return (plugin_version[branch], 0, 0)

self.log.logging("Plugin", "Error", f"You are running {branch}-{plugin_version}, a NOT SUPPORTED version. ")
self.log.logging("Plugin", "Error", f"You are running on branch: {branch}, which is NOT SUPPORTED.")
return (0, 0, 0)


def _fetch_and_parse_dns_record(self, record_name, record_type):
"""
Fetches and parses a DNS TXT record.
Args:
record_name (str): The name of the DNS TXT record.
record_type (str): Type of record (e.g., 'Plugin', 'Firmware') for logging.
Returns:
dict or None: Parsed DNS record as a dictionary, or None if unavailable.
"""
if not record_name:
self.log.logging(record_type, "Error", f"{record_type} DNS record not found.")
return None

self.log.logging(record_type, "Debug", f"Fetching {record_type} DNS record: {record_name}")
record = _get_dns_txt_record(self, record_name)
if record is None:
self.log.logging(record_type, "Error", f"Failed to fetch {record_type} DNS record: {record_name}")
return None

self.log.logging(record_type, "Debug", f"Fetching {record_type} DNS record: {record_name} = {record}")

parsed_record = _parse_dns_txt_record(record)
self.log.logging(record_type, "Debug", f"Fetched and parsed {record_type} DNS record: {parsed_record}")
return parsed_record


def _get_dns_txt_record(self, record, timeout=1):
"""
Fetch a DNS TXT record.
Args:
record (str): The DNS record to fetch.
timeout (int): Timeout for the DNS query in seconds.
Returns:
str or None: The DNS TXT record as a string, or None if unavailable.
"""
if not self.internet_available:
self.log.logging("Plugin", "Error", f"Internet unavailable, skipping DNS resolution for {record}")
return None

try:
result = dns.resolver.resolve(record, "TXT", tcp=True, lifetime=1).response.answer[0]
# Attempt to resolve the DNS TXT record
result = dns.resolver.resolve(record, "TXT", tcp=True, lifetime=timeout).response.answer[0]
return str(result[0]).strip('"')

except dns.resolver.Timeout:
error_message = f"DNS resolution timed out for {record} after {timeout} second"
self.internet_available = False
_handle_dns_error(self, f"DNS resolution timed out for {record} after {timeout} second(s)", fatal=True)

except dns.resolver.NoAnswer:
error_message = f"DNS TXT record not found for {record}"
_handle_dns_error(self, f"DNS TXT record not found for {record}")

except dns.resolver.NoNameservers:
error_message = f"No nameservers found for {record}"
self.internet_available = False
_handle_dns_error(self, f"No nameservers found for {record}", fatal=True)

except Exception as e:
error_message = f"An unexpected error occurred while resolving DNS TXT record for {record}: {e}"
_handle_dns_error(self, f"Unexpected error while resolving DNS TXT record for {record}: {e}")

self.log.logging("Plugin", "Error", error_message)
return None


def _handle_dns_error(self, message, fatal=False):
"""
Handle DNS errors with consistent logging.
Args:
message (str): The error message to log.
fatal (bool): If True, set internet_available to False.
"""
self.log.logging("Plugin", "Error", message)
if fatal:
self.internet_available = False


def _parse_dns_txt_record(txt_record):
version_dict = {}
if txt_record and txt_record != "":
Expand All @@ -100,39 +153,72 @@ def _parse_dns_txt_record(txt_record):


def is_plugin_update_available(self, currentVersion, availVersion):
if availVersion == 0:
return False
"""
Check if a plugin update is available.
currentMaj, currentMin, currentUpd = currentVersion.split(".")
availMaj, availMin, availUpd = availVersion.split(".")
Args:
currentVersion (str): Current plugin version (e.g., "1.0.0").
availVersion (str): Available plugin version (e.g., "1.1.0").
if availMaj > currentMaj:
self.log.logging("Plugin", "Status", "Zigbee4Domoticz plugin: upgrade available: %s" %availVersion)
return True
Returns:
bool: True if an update is available, False otherwise.
"""
if availVersion == "0":
return False

if availMaj == currentMaj and (
availMin == currentMin
and availUpd > currentUpd
or availMin > currentMin
):
self.log.logging("Plugin", "Status", "Zigbee4Domoticz plugin: upgrade available: %s" %availVersion)
if _is_newer_version(self, currentVersion, availVersion):
self.log.logging("Plugin", "Status", f"Zigbee4Domoticz plugin: upgrade available: {availVersion}")
return True

return False


def _is_newer_version(self, currentVersion, availVersion):
"""
Compare two version strings to determine if the second is newer.
Args:
currentVersion (str): Current version string (e.g., "1.0.0").
availVersion (str): Available version string (e.g., "1.1.0").
Returns:
bool: True if availVersion is newer than currentVersion, False otherwise.
"""
current = tuple(map(int, currentVersion.split(".")))
available = tuple(map(int, availVersion.split(".")))
return available > current


def is_zigate_firmware_available(self, currentMajorVersion, currentFirmwareVersion, availfirmMajor, availfirmMinor):
if not (availfirmMinor and currentFirmwareVersion):
return False
if int(availfirmMinor, 16) > int(currentFirmwareVersion, 16):
self.log.logging("Plugin", "Debug", "Zigate Firmware update available")
self.log.logging("Plugin", "Status", "Zigate Firmware update available")
return True
return False


def is_internet_available():
"""
Check if the internet is available by sending a GET request to a reliable website.
Returns:
bool: True if the internet is available, False otherwise.
"""
url = "http://www.google.com"
timeout = 3

try:
response = requests.get("http://www.google.com", timeout=3)
# Check if the status code is a success code (2xx)
return response.status_code == 200
response = requests.get(url, timeout=timeout)
# Return True if the status code indicates success (2xx)
return 200 <= response.status_code < 300
except requests.ConnectionError:
# Handle cases where the connection fails
return False
except requests.Timeout:
# Handle timeout errors
return False
except requests.RequestException as e:
# Handle other request exceptions
print(f"Unexpected error while checking internet availability: {e}")
return False

0 comments on commit 5e640d5

Please sign in to comment.