Skip to content

Commit

Permalink
upgrade zigpy layers to the latest radio libs
Browse files Browse the repository at this point in the history
  • Loading branch information
pipiche38 committed Nov 11, 2023
1 parent 9cf49cd commit 19a7f64
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 95 deletions.
126 changes: 33 additions & 93 deletions Classes/ZigpyTransport/AppGeneric.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,126 +173,66 @@ def packet_received(
self,
packet: t.ZigbeePacket
) -> None:

"""Notify zigpy of a received Zigbee packet."""
self.log.logging("TransportZigpy", "Debug", "packet_received %s" %(packet))

try:
sender = self.get_device_with_address(packet.src)
self.log.logging("TransportZigpy", "Log", "identified device - %s (%s)" %(str(sender), type(sender)) )

except KeyError:
self.log.logging("TransportZigpy", "Log", "Unknown device %r" %packet.src)
super(type(self),self).packet_received(packet)
return

self.log.logging("TransportZigpy", "Log", "identified device - %s (%s)" % (str(sender), type(sender)))
sender , addr_mode, profile, cluster, src_ep, dst_ep = (
packet.src.address.serialize()[::-1].hex(),
int(packet.src.addr_mode),
int(packet.profile_id),
int(packet.cluster_id),
int(packet.src_ep),
int(packet.dst_ep)
)

profile, cluster, src_ep, dst_ep = packet.profile_id, packet.cluster_id, packet.src_ep, packet.dst_ep
#self.log.logging("TransportZigpy", "Debug", " Src : %s (%s)" %(sender,type(sender)))
#self.log.logging("TransportZigpy", "Debug", " AddrMod : %02X" %(addr_mode))
#self.log.logging("TransportZigpy", "Debug", " src Ep : %02X" %(dst_ep))
#self.log.logging("TransportZigpy", "Debug", " dst Ep : %02x" %(dst_ep))
#self.log.logging("TransportZigpy", "Debug", " Profile : %04X" %(profile))
#self.log.logging("TransportZigpy", "Debug", " Cluster : %04X" %(cluster))

message = packet.data.serialize()
hex_message = binascii.hexlify(message).decode("utf-8")
dst_addressing = packet.dst.addr_mode if packet.dst else None

self.log.logging("TransportZigpy", "Debug", "packet_received - %s %s %s %s %s %s %s %s" %(
sender, profile, cluster, src_ep, dst_ep, message, hex_message, dst_addressing))

_handle_message( self, sender, profile, cluster, src_ep, dst_ep, message, dst_addressing, packet)
return
packet.src, profile, cluster, src_ep, dst_ep, message, hex_message, dst_addressing))

def handle_message(
self,
sender: zigpy.device.Device,
profile: int,
cluster: int,
src_ep: int,
dst_ep: int,
message: bytes,
dst_addressing=None,
) -> None:

"""Notify zigpy of a received handle_message."""
_handle_message( self, sender, profile, cluster, src_ep, dst_ep, message, dst_addressing)


def _handle_message(
self,
sender: zigpy.device.Device,
profile: int,
cluster: int,
src_ep: int,
dst_ep: int,
message: bytes,
dst_addressing=None,
Packet=None
) -> None:

hex_message = binascii.hexlify(message).decode("utf-8")
write_capture_rx_frames( self, sender, profile, cluster, src_ep, dst_ep, message, hex_message, dst_addressing)
write_capture_rx_frames( self, packet.src, profile, cluster, src_ep, dst_ep, message, hex_message, dst_addressing)

if sender.nwk == 0x0000:
if packet.src == 0x0000 or ( zigpy.zdo.ZDO_ENDPOINT in (packet.src_ep, packet.dst_ep)):
self.log.logging("TransportZigpy", "Debug", "handle_message from Controller Sender: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
str(sender.nwk), profile, cluster, src_ep, dst_ep, hex_message))
if Packet:
super(type(self),self).packet_received(Packet)
else:
super(type(self),self).handle_message(sender, profile, cluster, src_ep, dst_ep, message)
sender, profile, cluster, src_ep, dst_ep, hex_message))
super(type(self),self).packet_received(packet)

if cluster == 0x8036:
# This has been handle via on_zdo_mgmt_permitjoin_rsp()
self.log.logging("TransportZigpy", "Debug", "handle_message 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
str(sender.nwk), profile, cluster, src_ep, dst_ep, hex_message))
self.callBackFunction( build_plugin_8014_frame_content(self, str(sender), hex_message ) )
if Packet:
super(type(self),self).packet_received(Packet)
else:
super(type(self),self).handle_message(sender, profile, cluster, src_ep, dst_ep, message)
sender, profile, cluster, src_ep, dst_ep, hex_message))
self.callBackFunction( build_plugin_8014_frame_content(self, sender, hex_message ) )
super(type(self),self).packet_received(packet)
return

if cluster == 0x8034:
# This has been handle via on_zdo_mgmt_leave_rsp()
self.log.logging("TransportZigpy", "Debug", "handle_message 0x8036: %s Profile: %04x Cluster: %04x srcEp: %02x dstEp: %02x message: %s" %(
str(sender.nwk), profile, cluster, src_ep, dst_ep, hex_message))
self.callBackFunction( build_plugin_8047_frame_content(self, str(sender), hex_message) )
sender, profile, cluster, src_ep, dst_ep, hex_message))
self.callBackFunction( build_plugin_8047_frame_content(self, sender, hex_message) )
return

addr = None
if sender.nwk is not None:
addr_mode = 0x02
addr = sender.nwk.serialize()[::-1].hex()
if profile and cluster:
self.log.logging(
"TransportZigpy",
"Debug",
"handle_message device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
str(sender), profile, cluster, src_ep, dst_ep, hex_message, sender.lqi)),

elif sender.ieee is not None:
addr = "%016x" % t.uint64_t.deserialize(sender.ieee.serialize())[0]
addr_mode = 0x03
if profile and cluster:
self.log.logging(
"TransportZigpy",
"Debug",
"handle_message device 1: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" % (
str(sender), profile, cluster, src_ep, dst_ep, hex_message, sender.lqi)),

if sender.lqi is None:
sender.lqi = 0x00

if src_ep == dst_ep == 0x00:
profile = 0x0000
packet.lqi = 0x00 if packet.lqi is None else packet.lqi
profile = 0x0000 if src_ep == dst_ep == 0x00 else profile

if profile and cluster:
self.log.logging( "TransportZigpy", "Debug", "handle_message device 2: %s Profile: %04x Cluster: %04x sEP: %s dEp: %s message: %s lqi: %s" %(
str(addr), profile, cluster, src_ep, dst_ep, hex_message, sender.lqi), )

if addr:
plugin_frame = build_plugin_8002_frame_content(self, addr, profile, cluster, src_ep, dst_ep, message, sender.lqi, src_addrmode=addr_mode)
self.log.logging("TransportZigpy", "Debug", "handle_message Sender: %s frame for plugin: %s" % (addr, plugin_frame))
self.callBackFunction(plugin_frame)
else:
self.log.logging( "TransportZigpy", "Error", "handle_message - Issue with sender is %s %s" % (
sender.nwk, sender.ieee), )
sender, profile, cluster, src_ep, dst_ep, hex_message, packet.lqi), )

plugin_frame = build_plugin_8002_frame_content(self, sender, profile, cluster, src_ep, dst_ep, message, packet.lqi, src_addrmode=addr_mode)
self.log.logging("TransportZigpy", "Debug", "handle_message Sender: %s frame for plugin: %s" % (sender, plugin_frame))
self.callBackFunction(plugin_frame)

return

Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
zigpy==0.59.0
zigpy_znp==0.11.6
zigpy_deconz==0.21.1
bellows==0.36.8
zigpy-cli==1.0.4
zigpy_znp==0.11.2
bellows==0.35.8
dnspython==2.3.0
pyserial>=3.5
z4d-certified-devices
Expand Down

0 comments on commit 19a7f64

Please sign in to comment.