-
-
Notifications
You must be signed in to change notification settings - Fork 43
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refacor Modules/input.py and breakdown into specialized unit (#1678)
* re-factoring of inputs , breakdown in small modules
- Loading branch information
Showing
49 changed files
with
2,836 additions
and
4,313 deletions.
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
#!/usr/bin/env python3 | ||
# coding: utf-8 -*- | ||
# | ||
# Author: pipiche38 | ||
# | ||
|
||
from Modules.zigbeeController import receiveZigateEpList | ||
from Modules.tools import DeviceExist, updSQN, updLQI | ||
from Modules.pairingProcess import interview_state_8045 | ||
from Modules.errorCodes import DisplayStatusCode | ||
|
||
def Decode8045(self, Devices, MsgData, MsgLQI): | ||
MsgDataSQN = MsgData[:2] | ||
MsgDataStatus = MsgData[2:4] | ||
MsgDataShAddr = MsgData[4:8] | ||
MsgDataEpCount = MsgData[8:10] | ||
MsgDataEPlist = MsgData[10:] | ||
|
||
self.log.logging('Pairing', 'Debug', 'Decode8045 - Reception Active endpoint response: SQN: %s Status: %s Short Addr: %s List: %s Ep List: %s' % ( | ||
MsgDataSQN, DisplayStatusCode(MsgDataStatus), MsgDataShAddr, MsgDataEpCount, MsgDataEPlist)) | ||
|
||
if MsgDataShAddr == '0000': | ||
receiveZigateEpList(self, MsgDataEpCount, MsgDataEPlist) | ||
return | ||
|
||
if not DeviceExist(self, Devices, MsgDataShAddr): | ||
self.log.logging('Input', 'Error', 'Decode8045 - KeyError: MsgDataShAddr = ' + MsgDataShAddr) | ||
return | ||
|
||
if self.ListOfDevices[MsgDataShAddr]['Status'] == 'inDB': | ||
return | ||
|
||
self.ListOfDevices[MsgDataShAddr]['Status'] = '8045' | ||
updSQN(self, MsgDataShAddr, MsgDataSQN) | ||
updLQI(self, MsgDataShAddr, MsgLQI) | ||
|
||
for i in range(0, 2 * int(MsgDataEpCount, 16), 2): | ||
tmpEp = MsgDataEPlist[i:i + 2] | ||
if not self.ListOfDevices[MsgDataShAddr]['Ep'].get(tmpEp): | ||
self.ListOfDevices[MsgDataShAddr]['Ep'][tmpEp] = {} | ||
|
||
if not self.ListOfDevices[MsgDataShAddr].get('Epv2'): | ||
self.ListOfDevices[MsgDataShAddr]['Epv2'] = {} | ||
|
||
self.log.logging('Input', 'Status', '[%s] NEW OBJECT: %s Active Endpoint Response Ep: %s LQI: %s' % ( | ||
'-', MsgDataShAddr, tmpEp, int(MsgLQI, 16))) | ||
|
||
if self.ListOfDevices[MsgDataShAddr]['Status'] != '8045': | ||
self.log.logging('Input', 'Log', '[%s] NEW OBJECT: %s/%s receiving 0x8043 while in status: %s' % ( | ||
'-', MsgDataShAddr, tmpEp, self.ListOfDevices[MsgDataShAddr]['Status'])) | ||
|
||
self.ListOfDevices[MsgDataShAddr]['NbEp'] = str(int(MsgDataEpCount, 16)) | ||
interview_state_8045(self, MsgDataShAddr, RIA=None, status=None) | ||
|
||
self.log.logging('Pairing', 'Debug', 'Decode8045 - Device: ' + str(MsgDataShAddr) + ' updated ListofDevices with ' + str(self.ListOfDevices[MsgDataShAddr]['Ep'])) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,48 @@ | ||
|
||
from Modules.basicOutputs import handle_unknow_device | ||
from Modules.tools import zigpy_plugin_sanity_check | ||
|
||
|
||
|
||
def Decode8141(self, Devices, MsgData, MsgLQI): | ||
MsgComplete = MsgData[:2] | ||
MsgAttType = MsgData[2:4] | ||
MsgAttID = MsgData[4:8] | ||
MsgAttFlag = MsgData[8:10] | ||
|
||
self.log.logging('Input', 'Log', f'Decode8141 - Attribute Discovery Extended Response - MsgComplete: {MsgComplete} AttType: {MsgAttType} Attribute: {MsgAttID} Flag: {MsgAttFlag}') | ||
|
||
if len(MsgData) > 10: | ||
MsgSrcAddr = MsgData[10:14] | ||
MsgSrcEp = MsgData[14:16] | ||
MsgClusterID = MsgData[16:20] | ||
|
||
self.log.logging('Input', 'Log', f'Decode8141 - Attribute Discovery Extended Response - MsgComplete: {MsgComplete} AttType: {MsgAttType} Attribute: {MsgAttID} Flag: {MsgAttFlag}') | ||
|
||
if MsgSrcAddr not in self.ListOfDevices: | ||
if not zigpy_plugin_sanity_check(self, MsgSrcAddr): | ||
handle_unknow_device(self, MsgSrcAddr) | ||
return | ||
|
||
if 'Attributes List' not in self.ListOfDevices[MsgSrcAddr]: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List Extended'] = {'Ep': {}} | ||
|
||
if 'Ep' not in self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep'] = {} | ||
|
||
if MsgSrcEp not in self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep']: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep'][MsgSrcEp] = {} | ||
|
||
if MsgClusterID not in self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep'][MsgSrcEp]: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep'][MsgSrcEp][MsgClusterID] = {} | ||
|
||
if MsgAttID not in self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep'][MsgSrcEp][MsgClusterID]: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep'][MsgSrcEp][MsgClusterID][MsgAttID] = {} | ||
|
||
att_info = self.ListOfDevices[MsgSrcAddr]['Attributes List Extended']['Ep'][MsgSrcEp][MsgClusterID][MsgAttID] | ||
att_info['Type'] = MsgAttType | ||
att_info['Read'] = int(MsgAttFlag, 16) & 1 | ||
att_info['Write'] = (int(MsgAttFlag, 16) & 2) >> 1 | ||
att_info['Reportable'] = (int(MsgAttFlag, 16) & 4) >> 2 | ||
att_info['Scene'] = (int(MsgAttFlag, 16) & 8) >> 3 | ||
att_info['Global'] = (int(MsgAttFlag, 16) & 16) >> 4 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
from Modules.basicOutputs import getListofAttribute, handle_unknow_device | ||
from Modules.tools import zigpy_plugin_sanity_check | ||
|
||
|
||
def Decode8140(self, Devices, MsgData, MsgLQI): | ||
self.log.logging('Input', 'Debug', 'Decode8140 - Attribute Discovery Response - Data: %s LQI: %s' % (MsgData, MsgLQI)) | ||
if MsgData[:2] == 'f7': | ||
return zigpy_Decode8140(self, Devices, MsgData[2:], MsgLQI) | ||
|
||
MsgComplete = MsgData[:2] | ||
MsgAttType = MsgData[2:4] | ||
MsgAttID = MsgData[4:8] | ||
|
||
if MsgComplete == '01' and MsgAttType == '00' and (MsgAttID == '0000'): | ||
return | ||
|
||
if len(MsgData) <= 8: | ||
return | ||
|
||
MsgSrcAddr = MsgData[8:12] | ||
MsgSrcEp = MsgData[12:14] | ||
MsgClusterID = MsgData[14:18] | ||
self.log.logging('Input', 'Debug', 'Decode8140 - Attribute Discovery Response - %s/%s - Cluster: %s - Attribute: %s - Attribute Type: %s Complete: %s' % (MsgSrcAddr, MsgSrcEp, MsgClusterID, MsgAttID, MsgAttType, MsgComplete), MsgSrcAddr) | ||
if MsgSrcAddr not in self.ListOfDevices: | ||
if not zigpy_plugin_sanity_check(self, MsgSrcAddr): | ||
handle_unknow_device(self, MsgSrcAddr) | ||
return | ||
|
||
if 'Attributes List' not in self.ListOfDevices[MsgSrcAddr]: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List'] = {'Ep': {}} | ||
if 'Ep' not in self.ListOfDevices[MsgSrcAddr]['Attributes List']: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep'] = {} | ||
if MsgSrcEp not in self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep']: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep'][MsgSrcEp] = {} | ||
if MsgClusterID not in self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep'][MsgSrcEp]: | ||
self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep'][MsgSrcEp][MsgClusterID] = {} | ||
if MsgAttID in self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep'][MsgSrcEp][MsgClusterID] and self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep'][MsgSrcEp][MsgClusterID][MsgAttID] == MsgAttType: | ||
return | ||
|
||
self.ListOfDevices[MsgSrcAddr]['Attributes List']['Ep'][MsgSrcEp][MsgClusterID][MsgAttID] = MsgAttType | ||
|
||
if MsgComplete != '01': | ||
next_start = '%04x' % (int(MsgAttID, 16) + 1) | ||
getListofAttribute(self, MsgSrcAddr, MsgSrcEp, MsgClusterID, start_attribute=next_start) | ||
|
||
|
||
def zigpy_Decode8140(self, Devices, MsgData, MsgLQI): | ||
|
||
MsgComplete = MsgData[:2] | ||
MsgSrcAddr = MsgData[2:6] | ||
MsgSrcEp = MsgData[6:8] | ||
MsgClusterID = MsgData[8:12] | ||
|
||
if "Attributes List" not in self.ListOfDevices[MsgSrcAddr]: | ||
self.ListOfDevices[MsgSrcAddr]["Attributes List"] = {"Ep": {}} | ||
|
||
if "Ep" not in self.ListOfDevices[MsgSrcAddr]["Attributes List"]: | ||
self.ListOfDevices[MsgSrcAddr]["Attributes List"]["Ep"] = {} | ||
|
||
if MsgSrcEp not in self.ListOfDevices[MsgSrcAddr]["Attributes List"]["Ep"]: | ||
self.ListOfDevices[MsgSrcAddr]["Attributes List"]["Ep"][MsgSrcEp] = {} | ||
|
||
if MsgClusterID not in self.ListOfDevices[MsgSrcAddr]["Attributes List"]["Ep"][MsgSrcEp]: | ||
self.ListOfDevices[MsgSrcAddr]["Attributes List"]["Ep"][MsgSrcEp][MsgClusterID] = {} | ||
|
||
idx = 12 | ||
while idx < len( MsgData ): | ||
Attribute = MsgData[idx : idx + 4] | ||
idx += 4 | ||
Attribute_type = MsgData[idx : idx + 2] | ||
idx += 2 | ||
self.ListOfDevices[MsgSrcAddr]["Attributes List"]["Ep"][MsgSrcEp][MsgClusterID][Attribute] = Attribute_type | ||
|
||
if MsgComplete != "01": | ||
next_start = "%04x" % (int(Attribute, 16) + 1) | ||
getListofAttribute( self, MsgSrcAddr, MsgSrcEp, MsgClusterID, start_attribute=next_start, ) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
def Decode8034(self, Devices, MsgData, MsgLQI): | ||
MsgDataStatus = MsgData[2:4] | ||
MsgNetworkAddressInterest = MsgData[4:8] | ||
MsgXMLTag = MsgData[10:12] | ||
MsgCountField = MsgData[12:14] | ||
MsgFieldValues = MsgData[14:] | ||
self.log.logging('Input', 'Log', 'Decode8034 - Complex Descriptor for: %s xmlTag: %s fieldCount: %s fieldValue: %s, Status: %s' % (MsgNetworkAddressInterest, MsgXMLTag, MsgCountField, MsgFieldValues, MsgDataStatus)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,130 @@ | ||
from Modules.callback import callbackDeviceAwake | ||
from Modules.domoTools import lastSeenUpdate | ||
from Modules.inRawAps import inRawAps | ||
from Modules.tools import (retreive_cmd_payload_from_8002, timeStamped, updLQI, | ||
updSQN) | ||
from Modules.zb_tables_management import mgmt_rtg_rsp | ||
from Modules.zigateConsts import ADDRESS_MODE, ZIGBEE_COMMAND_IDENTIFIER | ||
from Z4D_decoders.z4d_decoder_Remotes import Decode80A7 | ||
|
||
|
||
def Decode8002(self, Devices, MsgData, MsgLQI): | ||
if len(MsgData) < 22: | ||
self.log.logging('Input', 'Error', 'Invalid frame %s too short' % MsgData) | ||
return | ||
|
||
MsgProfilID = MsgData[2:6] | ||
MsgClusterID = MsgData[6:10] | ||
MsgSourcePoint = MsgData[10:12] | ||
MsgDestPoint = MsgData[12:14] | ||
MsgSourceAddressMode = MsgData[14:16] | ||
|
||
if int(MsgSourceAddressMode, 16) in [ADDRESS_MODE['short'], ADDRESS_MODE['group']]: | ||
MsgSourceAddress = MsgData[16:20] | ||
MsgDestinationAddressMode = MsgData[20:22] | ||
|
||
if int(MsgDestinationAddressMode, 16) in [ADDRESS_MODE['short'], ADDRESS_MODE['group']]: | ||
if len(MsgData) < 26: | ||
self.log.logging('Input', 'Error', 'Invalid frame %s too short' % MsgData) | ||
return | ||
|
||
MsgDestinationAddress = MsgData[22:26] | ||
MsgPayload = MsgData[26:] | ||
|
||
elif int(MsgDestinationAddressMode, 16) == ADDRESS_MODE['ieee']: | ||
if len(MsgData) < 38: | ||
self.log.logging('Input', 'Error', 'Invalid frame %s too short' % MsgData) | ||
return | ||
|
||
MsgDestinationAddress = MsgData[22:38] | ||
MsgPayload = MsgData[38:] | ||
|
||
else: | ||
self.log.logging('Input', 'Log', 'Decode8002 - Unexpected Destination ADDR_MOD: %s, drop packet %s' % (MsgDestinationAddressMode, MsgData)) | ||
return | ||
|
||
elif int(MsgSourceAddressMode, 16) == ADDRESS_MODE['ieee']: | ||
if len(MsgData) < 38: | ||
self.log.logging('Input', 'Error', 'Invalid frame %s too short' % MsgData) | ||
return | ||
|
||
MsgSourceAddress = MsgData[16:32] | ||
MsgDestinationAddressMode = MsgData[32:34] | ||
if int(MsgDestinationAddressMode, 16) in [ADDRESS_MODE['short'], ADDRESS_MODE['group']]: | ||
MsgDestinationAddress = MsgData[34:38] | ||
MsgPayload = MsgData[38:] | ||
|
||
elif int(MsgDestinationAddressMode, 16) == ADDRESS_MODE['ieee']: | ||
if len(MsgData) < 40: | ||
self.log.logging('Input', 'Error', 'Invalid frame %s too short' % MsgData) | ||
return | ||
|
||
MsgDestinationAddress = MsgData[34:40] | ||
MsgPayload = MsgData[40:] | ||
|
||
else: | ||
self.log.logging('Input', 'Log', 'Decode8002 - Unexpected Destination ADDR_MOD: %s, drop packet %s' % (MsgDestinationAddressMode, MsgData)) | ||
return | ||
|
||
else: | ||
self.log.logging('Input', 'Log', 'Decode8002 - Unexpected Source ADDR_MOD: %s, drop packet %s' % (MsgSourceAddressMode, MsgData)) | ||
return | ||
|
||
if len(MsgPayload) < 4: | ||
self.log.logging('Input', 'Error', 'Invalid frame %s, Payload %s too short' % (MsgData, MsgPayload)) | ||
return | ||
|
||
self.log.logging('Input', 'Debug', 'Reception Data indication, Source Address: ' + MsgSourceAddress + ' Destination Address: ' + MsgDestinationAddress + ' ProfilID: ' + MsgProfilID + ' ClusterID: ' + MsgClusterID + ' Message Payload: ' + MsgPayload, MsgSourceAddress) | ||
srcnwkid = dstnwkid = None | ||
|
||
if len(MsgDestinationAddress) != 4: | ||
self.log.logging('Input', 'Error', 'not handling IEEE address') | ||
return | ||
|
||
srcnwkid = MsgSourceAddress | ||
dstnwkid = MsgDestinationAddress | ||
if srcnwkid not in self.ListOfDevices: | ||
self.log.logging('Input', 'Debug', 'Decode8002 - Unknown NwkId: %s Ep: %s Cluster: %s Payload: %s' % (srcnwkid, MsgSourcePoint, MsgClusterID, MsgPayload)) | ||
return | ||
|
||
timeStamped(self, srcnwkid, 32770) | ||
lastSeenUpdate(self, Devices, NwkId=srcnwkid) | ||
updLQI(self, srcnwkid, MsgLQI) | ||
if MsgClusterID in ('8032', '8033'): | ||
mgmt_rtg_rsp(self, srcnwkid, MsgSourcePoint, MsgClusterID, dstnwkid, MsgDestPoint, MsgPayload) | ||
return | ||
|
||
if MsgProfilID != '0104': | ||
self.log.logging('inRawAPS', 'Debug', 'Decode8002 - NwkId: %s Ep: %s Cluster: %s Payload: %s' % (srcnwkid, MsgSourcePoint, MsgClusterID, MsgPayload), srcnwkid) | ||
return | ||
|
||
(default_response, GlobalCommand, Sqn, ManufacturerCode, Command, Data) = retreive_cmd_payload_from_8002(MsgPayload) | ||
if 'SQN' in self.ListOfDevices[srcnwkid] and Sqn == self.ListOfDevices[srcnwkid]['SQN']: | ||
self.log.logging('inRawAPS', 'Debug', 'Decode8002 - Duplicate message drop NwkId: %s Ep: %s Cluster: %s GlobalCommand: %5s Command: %s Data: %s' % (srcnwkid, MsgSourcePoint, MsgClusterID, GlobalCommand, Command, Data), srcnwkid) | ||
return | ||
|
||
updSQN(self, srcnwkid, Sqn) | ||
if GlobalCommand and int(Command, 16) in ZIGBEE_COMMAND_IDENTIFIER: | ||
self.log.logging('inRawAPS', 'Debug', 'Decode8002 - NwkId: %s Ep: %s Cluster: %s GlobalCommand: %5s Command: %s (%33s) Data: %s' % (srcnwkid, MsgSourcePoint, MsgClusterID, GlobalCommand, Command, ZIGBEE_COMMAND_IDENTIFIER[int(Command, 16)], Data), srcnwkid) | ||
|
||
else: | ||
self.log.logging('inRawAPS', 'Debug', 'Decode8002 - NwkId: %s Ep: %s Cluster: %s GlobalCommand: %5s Command: %s Data: %s' % (srcnwkid, MsgSourcePoint, MsgClusterID, GlobalCommand, Command, Data), srcnwkid) | ||
|
||
updLQI(self, srcnwkid, MsgLQI) | ||
|
||
if MsgClusterID == '0005' and MsgPayload[:2] == '05': | ||
cmd = MsgPayload[8:10] | ||
direction = MsgPayload[10:12] | ||
data = Sqn + MsgSourcePoint + MsgClusterID + cmd + direction + '000000' + srcnwkid | ||
self.log.logging('inRawAPS', 'Debug', 'Decode8002 - Sqn: %s NwkId %s Ep %s Cluster %s Cmd %s Direction %s' % (Sqn, srcnwkid, MsgClusterID, MsgClusterID, cmd, direction), srcnwkid) | ||
Decode80A7(self, Devices, data, MsgLQI) | ||
return | ||
|
||
if 'Manufacturer' not in self.ListOfDevices[srcnwkid] and 'Manufacturer Name' not in self.ListOfDevices[srcnwkid]: | ||
return | ||
|
||
if 'Manufacturer' in self.ListOfDevices[srcnwkid] and self.ListOfDevices[srcnwkid]['Manufacturer'] in ('', {}) and ('Manufacturer Name' in self.ListOfDevices[srcnwkid]) and (self.ListOfDevices[srcnwkid]['Manufacturer Name'] in ('', {})): | ||
return | ||
|
||
inRawAps(self, Devices, srcnwkid, MsgSourcePoint, MsgClusterID, dstnwkid, MsgDestPoint, Sqn, GlobalCommand, ManufacturerCode, Command, Data, MsgPayload) | ||
callbackDeviceAwake(self, Devices, srcnwkid, MsgSourcePoint, MsgClusterID) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
|
||
from Modules.basicOutputs import send_default_response | ||
|
||
|
||
def Decode7000(self, Devices, MsgData, MsgLQI): | ||
uSrcAddress = MsgData[:4] | ||
u8SrcEndpoint = MsgData[4:6] | ||
u16ClusterId = MsgData[6:10] | ||
bDirection = MsgData[10:12] | ||
bDisableDefaultResponse = MsgData[12:14] | ||
bManufacturerSpecific = MsgData[14:16] | ||
eFrameType = MsgData[16:18] | ||
u16ManufacturerCode = MsgData[18:22] | ||
u8CommandIdentifier = MsgData[22:24] | ||
u8TransactionSequenceNumber = MsgData[24:26] | ||
if uSrcAddress not in self.ListOfDevices: | ||
return | ||
self.log.logging('Input', 'Debug', 'Decode7000 - Default Response Notification [%s] %s/%s Cluster: %s DefaultReponse: %s ManufSpec: %s ManufCode: %s Command: %s Direction: %s FrameType: %s' % (u8TransactionSequenceNumber, uSrcAddress, u8SrcEndpoint, u16ClusterId, bDisableDefaultResponse, bManufacturerSpecific, u16ManufacturerCode, u8CommandIdentifier, bDirection, eFrameType)) | ||
if bDisableDefaultResponse == '00': | ||
send_default_response(self, uSrcAddress, u8SrcEndpoint, u16ClusterId, bDirection, bDisableDefaultResponse, bManufacturerSpecific, u16ManufacturerCode, eFrameType, u8CommandIdentifier, u8TransactionSequenceNumber) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,9 @@ | ||
from Modules.errorCodes import DisplayStatusCode | ||
|
||
def Decode8101(self, Devices, MsgData, MsgLQI): | ||
MsgDataSQN = MsgData[:2] | ||
MsgDataEp = MsgData[2:4] | ||
MsgClusterId = MsgData[4:8] | ||
MsgDataCommand = MsgData[8:10] | ||
MsgDataStatus = MsgData[10:12] | ||
self.log.logging('Input', 'Debug', 'Decode8101 - Default response - SQN: %s, EP: %s, ClusterID: %s , DataCommand: %s, - Status: [%s] %s' % (MsgDataSQN, MsgDataEp, MsgClusterId, MsgDataCommand, MsgDataStatus, DisplayStatusCode(MsgDataStatus))) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
from Modules.deviceAnnoucement import device_annoucementv2 | ||
|
||
def Decode004D(self, Devices, MsgData, MsgLQI): | ||
device_annoucementv2(self, Devices, MsgData, MsgLQI) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
from Modules.errorCodes import DisplayStatusCode | ||
|
||
def Decode804B(self, Devices, MsgData, MsgLQI): | ||
MsgSequenceNumber = MsgData[:2] | ||
MsgDataStatus = MsgData[2:4] | ||
MsgServerMask = MsgData[4:8] | ||
self.log.logging('Input', 'Log', 'ZigateRead - MsgType 804B - System Server Discovery response, Sequence number: ' + MsgSequenceNumber + ' Status: ' + DisplayStatusCode(MsgDataStatus) + ' Server Mask: ' + MsgServerMask) |
Oops, something went wrong.