-
Notifications
You must be signed in to change notification settings - Fork 22
/
vreglink.py
86 lines (66 loc) · 3.03 KB
/
vreglink.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
import dbus
import struct
from vedbus import VeDbusItemExport
class VregLinkItem(VeDbusItemExport):
def __init__(self, *args, getvreg=None, setvreg=None, **kwargs):
super().__init__(*args, **kwargs)
self.getvreg = getvreg
self.setvreg = setvreg
@dbus.service.method('com.victronenergy.VregLink',
in_signature='q', out_signature='qay')
def GetVreg(self, regid):
return self.getvreg(int(regid))
@dbus.service.method('com.victronenergy.VregLink',
in_signature='qay', out_signature='qay')
def SetVreg(self, regid, data):
return self.setvreg(int(regid), bytes(data))
class VregLink:
def device_init_late(self):
super().device_init_late()
vregtype = lambda *args, **kwargs: VregLinkItem(*args, **kwargs,
getvreg=self.vreglink_get, setvreg=self.vreglink_set)
self.dbus.add_path('/Devices/0/VregLink', None, itemtype=vregtype)
self.dbus.add_path('/Devices/0/DeviceInstance', self.devinst)
self.dbus.add_path('/Devices/0/ProductId', self.productid)
self.dbus.add_path('/Devices/0/ProductName', self.productname)
self.dbus.add_path('/Devices/0/ServiceName', self.dbus.get_name())
self.dbus.add_path('/Devices/0/CustomName', self.dbus['/CustomName'])
self.dbus.add_path('/Devices/0/FirmwareVersion',
self.dbus['/FirmwareVersion'])
self.dbus.add_path('/Devices/0/IpAddress', self.spec.target)
def vreglink_get(self, regid):
return self.vreglink_exec(regid)
def vreglink_set(self, regid, data):
return self.vreglink_exec(regid, data)
def vreglink_exec(self, regid, data=None):
iswrite = data is not None
if iswrite:
dlen = len(data)
if dlen & 1:
data += b'\0'
data = struct.unpack('>%dH' % (len(data) / 2), data)
data = [regid, dlen, *data]
else:
data = [regid]
nread = 3 + self.vreglink_size
r = self.modbus.readwrite_registers(read_address=self.vreglink_base,
read_count=nread,
write_address=self.vreglink_base,
write_registers=data,
unit=self.unit)
if r.isError():
self.log.error('Modbus error accessing vreg %#04x: %s', regid, r)
return 0x8100 if iswrite else 0x8000, []
if r.registers[0] != regid:
self.log.error('Invalid vreg response: %s', r.registers)
return 0x8100 if iswrite else 0x8000, []
stat = r.registers[1]
size = r.registers[2]
data = r.registers[3:]
data = struct.pack('>%dH' % (len(data)), *data)
if size > len(data):
self.log.warning('Truncated data for vreg %04x: %s < %s',
regid, len(data), size)
size = len(data)
data = data[0:size]
return stat, data