Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
pipiche38 committed Oct 29, 2023
1 parent 4063930 commit da0c9f4
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions Classes/ZigpyTransport/AppGeneric.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,45 +174,43 @@ def get_zigpy_version(self):

def packet_received(self, packet: t.ZigbeePacket) -> None:
"""Notify zigpy of a received Zigbee packet."""

try:
device = self.get_device_with_address(packet.src)
self.log.logging("TransportZigpy", "Log", "identified device - %s (%s)" %(str(device), type(device)) )
sender = self.get_device_with_address(packet.src)
self.log.logging("TransportZigpy", "Debug", "identified device - %s (%s)" %(str(sender), type(sender)) )

except KeyError:
self.log.logging("TransportZigpy", "Log", "Unknown device %r", packet.src)
self.log.logging("TransportZigpy", "Debug", "Unknown device %r", packet.src)
return

sender=device
profile=packet.profile_id
cluster=packet.cluster_id
src_ep=packet.src_ep
dst_ep=packet.dst_ep
message=packet.data.serialize()
dst_addressing=(packet.dst).addr_mode


self.log.logging("TransportZigpy", "Debug", "identified device - %s (%s)" % (str(sender), type(sender)))

profile, cluster, src_ep, dst_ep = packet.profile_id, packet.cluster_id, packet.src_ep, packet.dst_ep
message = packet.data.serialize()
hex_message = binascii.hexlify(message).decode("utf-8")
dst_addressing = packet.dst.addr_mode if packet.dst else None

write_capture_rx_frames( self, sender, profile, cluster, src_ep, dst_ep, message, binascii.hexlify(message).decode("utf-8"), dst_addressing)

self.log.logging("TransportZigpy", "Log", "packet_received - %s %s %s %s %s %s %s %s" %(
self.log.logging("TransportZigpy", "Debug", "packet_received - %s %s %s %s %s %s %s %s" %(
sender, profile, cluster, src_ep, dst_ep, message, binascii.hexlify(message).decode("utf-8"), dst_addressing))

if sender.nwk == 0x0000:
# When coming from coordinator we have to send it back to zigpy
self.log.logging("TransportZigpy", "Log", "packet_received from Controller Sender: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
self.log.logging("TransportZigpy", "Debug", "packet_received from Controller Sender: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
str(sender.nwk), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8")))
super(type(self),self).packet_received(packet)

if cluster == 0x8036:
# This has been handle via on_zdo_mgmt_permitjoin_rsp()
self.log.logging("TransportZigpy", "Log", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
self.log.logging("TransportZigpy", "Debug", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
str(sender.nwk), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8")))
self.callBackFunction( build_plugin_8014_frame_content(self, str(sender), binascii.hexlify(message).decode("utf-8") ) )
super(type(self),self).packet_received(packet)
return

if cluster == 0x8034:
# This has been handle via on_zdo_mgmt_leave_rsp()
self.log.logging("TransportZigpy", "Log", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
self.log.logging("TransportZigpy", "Debug", "packet_received 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
str(sender.nwk), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8")))
self.callBackFunction( build_plugin_8047_frame_content(self, str(sender), binascii.hexlify(message).decode("utf-8")) )
return
Expand All @@ -222,29 +220,30 @@ def packet_received(self, packet: t.ZigbeePacket) -> None:
addr_mode = 0x02
addr = sender.nwk.serialize()[::-1].hex()
if profile and cluster:
self.log.logging( "TransportZigpy", "Log", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
self.log.logging( "TransportZigpy", "Debug", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
str(sender), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"), sender.lqi)),

elif sender.ieee is not None:
addr = "%016x" % t.uint64_t.deserialize(sender.ieee.serialize())[0]
addr_mode = 0x03
if profile and cluster:
self.log.logging( "TransportZigpy", "Log", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
self.log.logging( "TransportZigpy", "Debug", "packet_received device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
str(sender), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"), sender.lqi)),

if sender.lqi is None:
sender.lqi = 0x00

# Let's force profile to ZDP if eps == 0x00
if src_ep == dst_ep == 0x00:
profile = 0x0000

if profile and cluster:
self.log.logging( "TransportZigpy", "Log", "packet_received device 2: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
if profile is not None and cluster is not None:
self.log.logging( "TransportZigpy", "Debug", "packet_received device 2: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
str(addr), profile, cluster, src_ep, dst_ep, binascii.hexlify(message).decode("utf-8"), sender.lqi), )

if addr:
if addr is not None:
plugin_frame = build_plugin_8002_frame_content(self, addr, profile, cluster, src_ep, dst_ep, message, sender.lqi, src_addrmode=addr_mode)
self.log.logging("TransportZigpy", "Log", "packet_received Sender: %s frame for plugin: %s" % (addr, plugin_frame))
self.log.logging("TransportZigpy", "Debug", "packet_received Sender: %s frame for plugin: %s" % (addr, plugin_frame))
self.callBackFunction(plugin_frame)
else:
self.log.logging( "TransportZigpy", "Error", "packet_received - Issue with sender is %s %s" % (
Expand Down

0 comments on commit da0c9f4

Please sign in to comment.