From da0c9f4b3ff23eb265c718c6dadeecffe09ef202 Mon Sep 17 00:00:00 2001 From: Patrick Pichon Date: Sun, 29 Oct 2023 12:26:06 +0100 Subject: [PATCH] refactor --- Classes/ZigpyTransport/AppGeneric.py | 47 ++++++++++++++-------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/Classes/ZigpyTransport/AppGeneric.py b/Classes/ZigpyTransport/AppGeneric.py index f5c278525..07fce4794 100644 --- a/Classes/ZigpyTransport/AppGeneric.py +++ b/Classes/ZigpyTransport/AppGeneric.py @@ -174,37 +174,35 @@ def get_zigpy_version(self): def packet_received(self, packet: t.ZigbeePacket) -> None: """Notify zigpy of a received Zigbee packet.""" - try: - device = self.get_device_with_address(packet.src) - self.log.logging("TransportZigpy", "Log", "identified device - %s (%s)" %(str(device), type(device)) ) - + sender = self.get_device_with_address(packet.src) + self.log.logging("TransportZigpy", "Debug", "identified device - %s (%s)" %(str(sender), type(sender)) ) + except KeyError: - self.log.logging("TransportZigpy", "Log", "Unknown device %r", packet.src) + self.log.logging("TransportZigpy", "Debug", "Unknown device %r", packet.src) return - - sender=device - profile=packet.profile_id - cluster=packet.cluster_id - src_ep=packet.src_ep - dst_ep=packet.dst_ep - message=packet.data.serialize() - dst_addressing=(packet.dst).addr_mode - + + self.log.logging("TransportZigpy", "Debug", "identified device - %s (%s)" % (str(sender), type(sender))) + + profile, cluster, src_ep, dst_ep = packet.profile_id, packet.cluster_id, packet.src_ep, packet.dst_ep + message = packet.data.serialize() + hex_message = binascii.hexlify(message).decode("utf-8") + dst_addressing = packet.dst.addr_mode if packet.dst else None + write_capture_rx_frames( self, sender, profile, cluster, src_ep, dst_ep, message, binascii.hexlify(message).decode("utf-8"), dst_addressing) - self.log.logging("TransportZigpy", "Log", "packet_received - %s %s %s %s %s %s %s %s" %( + self.log.logging("TransportZigpy", "Debug", "packet_received - %s %s %s %s %s %s %s %s" %( sender, profile, cluster, src_ep, dst_ep, message, binascii.hexlify(message).decode("utf-8"), dst_addressing)) if sender.nwk == 0x0000: # When coming from coordinator we have to send it back to zigpy - self.log.logging("TransportZigpy", "Log", "packet_received from Controller Sender: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %( + self.log.logging("TransportZigpy", "Debug", "packet_received from Controller Sender: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %( str(sender.nwk), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"))) super(type(self),self).packet_received(packet) if cluster == 0x8036: # This has been handle via on_zdo_mgmt_permitjoin_rsp() - self.log.logging("TransportZigpy", "Log", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %( + self.log.logging("TransportZigpy", "Debug", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %( str(sender.nwk), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"))) self.callBackFunction( build_plugin_8014_frame_content(self, str(sender), binascii.hexlify(message).decode("utf-8") ) ) super(type(self),self).packet_received(packet) @@ -212,7 +210,7 @@ def packet_received(self, packet: t.ZigbeePacket) -> None: if cluster == 0x8034: # This has been handle via on_zdo_mgmt_leave_rsp() - self.log.logging("TransportZigpy", "Log", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %( + self.log.logging("TransportZigpy", "Debug", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %( str(sender.nwk), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"))) self.callBackFunction( build_plugin_8047_frame_content(self, str(sender), binascii.hexlify(message).decode("utf-8")) ) return @@ -222,29 +220,30 @@ def packet_received(self, packet: t.ZigbeePacket) -> None: addr_mode = 0x02 addr = sender.nwk.serialize()[::-1].hex() if profile and cluster: - self.log.logging( "TransportZigpy", "Log", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % ( + self.log.logging( "TransportZigpy", "Debug", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % ( str(sender), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"), sender.lqi)), elif sender.ieee is not None: addr = "%016x" % t.uint64_t.deserialize(sender.ieee.serialize())[0] addr_mode = 0x03 if profile and cluster: - self.log.logging( "TransportZigpy", "Log", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % ( + self.log.logging( "TransportZigpy", "Debug", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % ( str(sender), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"), sender.lqi)), if sender.lqi is None: sender.lqi = 0x00 + # Let's force profile to ZDP if eps == 0x00 if src_ep == dst_ep == 0x00: profile = 0x0000 - if profile and cluster: - self.log.logging( "TransportZigpy", "Log", "packet_received device 2: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % ( + if profile is not None and cluster is not None: + self.log.logging( "TransportZigpy", "Debug", "packet_received device 2: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % ( str(addr), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"), sender.lqi), ) - if addr: + if addr is not None: plugin_frame = build_plugin_8002_frame_content(self, addr, profile, cluster, src_ep, dst_ep, message, sender.lqi, src_addrmode=addr_mode) - self.log.logging("TransportZigpy", "Log", "packet_received Sender: %s frame for plugin: %s" % (addr, plugin_frame)) + self.log.logging("TransportZigpy", "Debug", "packet_received Sender: %s frame for plugin: %s" % (addr, plugin_frame)) self.callBackFunction(plugin_frame) else: self.log.logging( "TransportZigpy", "Error", "packet_received - Issue with sender is %s %s" % (