diff --git a/README.md b/README.md index 6c990711..ba86b5d9 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # emonHub -emonHub is used in the OpenEnergyMonitor system to read data received over serial from either the EmonPi board or the RFM12/69Pi adapter board then forward the data to emonCMS in a decoded ready-to-use form - based on the configuration in [emonhub.conf](https://github.com/openenergymonitor/emonhub/configuration.md) +emonHub is used in the OpenEnergyMonitor system to read data received over serial from either the EmonPi board or the RFM12/69Pi adapter board then forward the data to emonCMS in a decoded ready-to-use form - based on the configuration in [emonhub.conf](conf/emonhub.conf) More generally: Emonhub consists of a series of interfacers that can read/subscribe or send/publish data to and from a multitude of services. EmonHub supports decoding data from: @@ -84,7 +84,7 @@ journalctl -f -u emonhub The emonhub configuration guide can be found here: -[emonhub.conf configuration](https://github.com/openenergymonitor/emonhub/configuration.md) +[emonhub.conf configuration](configuration.md) ## EmonHub Emoncms config module diff --git a/configuration.md b/configuration.md index 6d7e37ac..85da2b85 100644 --- a/configuration.md +++ b/configuration.md @@ -10,7 +10,7 @@ Hub is a section for emonhub global settings such as the loglevel. ## 2. `interfacers` -Interfacers holds the configuration for the different interfacers that emonhub supports such as the EmonHubJeeInterfacer for reading and writing to the RFM69Pi adapter board or emonPi board via serial, or the EmonHubMqttInterfacer which can be used to publish the data received from EmonHubJeeInterfacer to MQTT topics. For more interfacer examples see [conf/interfacer_examples](https://github.com/openenergymonitor/emonhub/conf/interfacer_examples) +Interfacers holds the configuration for the different interfacers that emonhub supports such as the EmonHubJeeInterfacer for reading and writing to the RFM69Pi adapter board or emonPi board via serial, or the EmonHubMqttInterfacer which can be used to publish the data received from EmonHubJeeInterfacer to MQTT topics. For more interfacer examples see [conf/interfacer_examples](conf/interfacer_examples) ### Channels @@ -54,7 +54,7 @@ Nodes holds the decoder configuration for rfm12/69 node data which are sent as b [nodes] ``` -**View full latest [default emonHub.conf](https://github.com/openenergymonitor/emonhub/conf/emonpi.default.emonhub.conf)** +**View full latest [default emonHub.conf](conf/emonpi.default.emonhub.conf)** *** @@ -126,7 +126,7 @@ There are two formats that can be used for publishing node data to MQTT: payload: 100,200,300 ``` -The 'node only format' is used with the emoncms [Nodes Module](https://github.com/emoncms/nodes) (now deprecated on Emoncms V9+) and the emonPiLCD python service. +The 'node only format' is used with the emoncms Nodes Module (now deprecated on Emoncms V9+) and the emonPiLCD python service. #### **2. Node variable format** diff --git a/install.sh b/install.sh index 845f30b0..a4e71391 100755 --- a/install.sh +++ b/install.sh @@ -19,7 +19,7 @@ if [ "$emonSD_pi_env" = "" ]; then fi sudo apt-get install -y python3-serial python3-configobj python3-pip python3-pymodbus bluetooth libbluetooth-dev -sudo pip3 install paho-mqtt requests pybluez +sudo pip3 install paho-mqtt requests pybluez py-sds011 sdm_modbus if [ "$emonSD_pi_env" = "1" ]; then # Only install the GPIO library if on a Pi. Used by Pulse interfacer diff --git a/src/emonhub.py b/src/emonhub.py index e4f203d7..e2635856 100755 --- a/src/emonhub.py +++ b/src/emonhub.py @@ -169,15 +169,16 @@ def _update_settings(self, settings): if 'log_backup_count' in settings['hub']: for handler in self._log.handlers: if isinstance(handler, logging.handlers.RotatingFileHandler): - handler.backupCount = settings['hub']['log_backup_count'] + handler.backupCount = int(settings['hub']['log_backup_count']) self._log.info("Logging backup count set to %d", handler.backupCount) if 'log_max_bytes' in settings['hub']: for handler in self._log.handlers: if isinstance(handler, logging.handlers.RotatingFileHandler): - handler.maxBytes = settings['hub']['log_max_bytes'] + handler.maxBytes = int(settings['hub']['log_max_bytes']) self._log.info("Logging max file size set to %d bytes", handler.maxBytes) # Interfacers + interfacers_to_delete = [] for name in self._interfacers: # Delete interfacers if not listed or have no 'Type' in the settings without further checks # (This also provides an ability to delete & rebuild by commenting 'Type' in conf) @@ -197,6 +198,9 @@ def _update_settings(self, settings): # Delete interfacers if setting changed or name is unlisted or Type is missing self._log.info("Deleting interfacer '%s'", name) self._interfacers[name].stop = True + interfacers_to_delete.append(name) + + for name in interfacers_to_delete: del self._interfacers[name] for name, I in settings['interfacers'].items(): diff --git a/src/interfacers/EmonHubDS18B20Interfacer.py b/src/interfacers/EmonHubDS18B20Interfacer.py new file mode 100644 index 00000000..8fd9e485 --- /dev/null +++ b/src/interfacers/EmonHubDS18B20Interfacer.py @@ -0,0 +1,157 @@ +import time +import json +import Cargo +import os +import glob +from emonhub_interfacer import EmonHubInterfacer + +""" +[[DS18B20]] + Type = EmonHubDS18B20Interfacer + [[[init_settings]]] + [[[runtimesettings]]] + pubchannels = ToEmonCMS, + read_interval = 10 + ids = 28-000008e2db06, 28-000009770529, 28-0000096a49b4 + names = ambient, cylb, cylt +""" + +class DS18B20: + def __init__(self): + os.system('modprobe w1-gpio') + os.system('modprobe w1-therm') + self._base_dir = '/sys/bus/w1/devices/' + + def scan(self): + devices = glob.glob(self._base_dir + '28*') + sensors = [] + for device in devices: + sensor = device.replace(self._base_dir,"") + sensors.append(sensor) + return sensors + + def _read_raw(self,sensor): + f = open(self._base_dir + sensor + '/w1_slave', 'r') + lines = f.readlines() + f.close() + return lines + + def tempC(self,sensor): + lines = self._read_raw(sensor) + # retry = 0 + while lines[0].strip()[-3:] != 'YES': + # time.sleep(0.2) + # lines = self._read_raw(sensor) + # retry += 1 + # if retry==3: return False + return False + + equals_pos = lines[1].find('t=') + if equals_pos != -1: + temp_string = lines[1][equals_pos+2:] + temp_c = float(temp_string) / 1000.0 + return temp_c + +"""class EmonHubDS18B20Interfacer + +DS18B20 interfacer for use in development + +""" + +class EmonHubDS18B20Interfacer(EmonHubInterfacer): + + def __init__(self, name): + """Initialize Interfacer + + """ + # Initialization + super(EmonHubDS18B20Interfacer, self).__init__(name) + + # This line will stop the default values printing to logfile at start-up + # self._settings.update(self._defaults) + + # Interfacer specific settings + self._DS18B20_settings = {'read_interval': 10.0,'nodename':'sensors','ids':[],'names':[]} + + self.ds = DS18B20() + + self.next_interval = True + + + def read(self): + """Read data and process + + Return data as a list: [NodeID, val1, val2] + + """ + + if int(time.time())%self._settings['read_interval']==0: + if self.next_interval: + self.next_interval = False + + c = Cargo.new_cargo() + c.names = [] + c.realdata = [] + c.nodeid = self._settings['nodename'] + + if self.ds: + for sensor in self.ds.scan(): + # Check if user has set a name for given sensor id + name = sensor + try: + index = self._settings['ids'].index(sensor) + if index0: + return c + + else: + self.next_interval = True + + return False + + + def set(self, **kwargs): + for key, setting in self._DS18B20_settings.items(): + # Decide which setting value to use + if key in kwargs: + setting = kwargs[key] + else: + setting = self._DS18B20_settings[key] + + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'read_interval': + self._log.info("Setting %s read_interval: %s", self.name, setting) + self._settings[key] = float(setting) + continue + elif key == 'nodename': + self._log.info("Setting %s nodename: %s", self.name, setting) + self._settings[key] = str(setting) + continue + elif key == 'ids': + self._log.info("Setting %s ids: %s", self.name, ", ".join(setting)) + self._settings[key] = setting + continue + elif key == 'names': + self._log.info("Setting %s names: %s", self.name, ", ".join(setting)) + self._settings[key] = setting + continue + else: + self._log.warning("'%s' is not valid for %s: %s", setting, self.name, key) + + # include kwargs from parent + super().set(**kwargs) diff --git a/src/interfacers/EmonHubGraphiteInterfacer.py b/src/interfacers/EmonHubGraphiteInterfacer.py index 6077ef2b..4123bdb6 100644 --- a/src/interfacers/EmonHubGraphiteInterfacer.py +++ b/src/interfacers/EmonHubGraphiteInterfacer.py @@ -98,7 +98,7 @@ def _send_metrics(self, metrics=[]): try: sock = socket.socket() sock.connect((host, port)) - sock.sendall(message) + sock.sendall(message.encode()) sock.close() except socket.error as e: self._log.error(e) diff --git a/src/interfacers/EmonHubMBUSInterfacer.py b/src/interfacers/EmonHubMBUSInterfacer.py new file mode 100644 index 00000000..a785adc6 --- /dev/null +++ b/src/interfacers/EmonHubMBUSInterfacer.py @@ -0,0 +1,336 @@ +import time +import json +import Cargo +import serial +import struct +from emonhub_interfacer import EmonHubInterfacer + +""" +[[MBUS]] + Type = EmonHubMBUSInterfacer + [[[init_settings]]] + device = /dev/ttyUSB0 + baud = 2400 + [[[runtimesettings]]] + pubchannels = ToEmonCMS, + address = 100 + pages = 0, + read_interval = 10 + nodename = MBUS +""" + +"""class EmonHubMBUSInterfacer + +MBUS interfacer for use in development + +""" + +class EmonHubMBUSInterfacer(EmonHubInterfacer): + + def __init__(self, name, device="/dev/ttyUSB0", baud=2400): + """Initialize Interfacer + + """ + # Initialization + super(EmonHubMBUSInterfacer, self).__init__(name) + + # This line will stop the default values printing to logfile at start-up + # self._settings.update(self._defaults) + + # Interfacer specific settings + self._MBUS_settings = {'address': 100, 'pages': [0], 'read_interval': 10.0,'nodename':'MBUS'} + + self.next_interval = True + + # Only load module if it is installed + try: + self._log.debug("Connecting to MBUS serial: "+device+" "+str(baud)) + self.ser = serial.Serial(device, baud, 8, 'E', 1, 0.5) + except ModuleNotFoundError as err: + self._log.error(err) + self.ser = False + + + def mbus_short_frame(self, address, C_field): + data = [0]*5 + data[0] = 0x10 + data[1] = C_field + data[2] = address + data[3] = data[1]+data[2] + data[4] = 0x16 + self.ser.write(data) + + def mbus_request(self, address, telegram): + data = [0]*13 + data[0] = 0x68 + data[1] = 0x07 + data[2] = 0x07 + data[3] = 0x68 + + data[4] = 0x53 + data[5] = address + data[6] = 0x51 + + data[7] = 0x01 + data[8] = 0xFF + data[9] = 0x08 + data[10] = telegram + + checksum = 0 + for c in range(4,11): checksum += data[c] + data[11] = checksum % 256 + + data[12] = 0x16 + + self.ser.write(data) + + def set_page(self, address, page): + for retry in range(10): + self.mbus_request(address,page) + time.sleep(0.3) + if self.ser.in_waiting and ord(self.ser.read(1))==0xE5: + self._log.debug("ACK") + time.sleep(0.5) + return True + else: + time.sleep(0.2) + return False + + def parse_frame(self,data): + data_types = ['null','int','int','int','int','float','int','int','null','bcd','bcd','bcd','bcd','var','bcd','null'] + data_lengths = [0,1,2,3,4,4,6,8,0,1,2,3,4,6,6,0] + vif = { + 0x03: (0.001,"Energy","kWh"), + 0x06: (1,"Energy","kWh"), + 0x13: (0.001,"Volume","m3"), + 0x14: (0.01,"Volume","m3"), + 0x15: (0.1,"Volume","m3"), + 0x16: (1,"Volume","m3"), + #0x20: (1,"Ontime Seconds","s"), + #0x22: (1,"Ontime Hours","h"), + 0x2b: (1,"Power","W"), + 0x2e: (1000,"Power","W"), + #0x3b: (1,"FlowRate",""), # mm3/h + 0x3e: (1,"FlowRate","m3/h"), # m3/h + #0x40: (1,"FlowRate",""), # 1.0e-7 m3/min + 0x5b: (1,"FlowT","C"), + 0x5f: (1,"ReturnT","C"), + 0x63: (1,"DeltaT","C"), + 0x67: (1,"ExternalT","C"), + #0x6D: (1,"TIME & DATE",""), + #0x70: (1,"Average duration",""), + #0x74: (1,"Duration seconds actual",""), + #0x75: (1,"Duration minutes actual",""), + #0x78: (1,"Fab No",""), + #0x79: (1,"Enhanced","") + } + function_types = ["","Max","Min","error","special","special","more_to_follow"] + + result = {} + bid = 19 + record = 0 + while bid=0x80: + DIFE = data[bid] + bid += 1 + + VIF = data[bid] + bid += 1 + VIFE = 0 + if VIF>=0x80: + VIFE = data[bid] + bid += 1 + + data_field = DIF & 0x0F # AND logic + + function = (DIF & 0x30) >> 4 + if function0: + bytes = data[bid:bid+data_len] + bid += data_len + + vif_name = "" + if VIF in vif: + scale = vif[VIF][0] + name = vif[VIF][1] + unit = vif[VIF][2] + + if function!='': name += "_"+function + + value = False + + if data_type=="int": + if data_len==1: + value = bytes[0] + if data_len==2: + value = bytes[0] + (bytes[1]<<8) + if data_len==3: + value = bytes[0] + (bytes[1]<<8) + (bytes[2]<<16) + if data_len==4: + value = bytes[0] + (bytes[1]<<8) + (bytes[2]<<16) + (bytes[3]<<24) + + if data_type=="float": + if data_len==4: + value = struct.unpack("f",bytearray(bytes))[0] + + value *= scale + + #self._log.debug(hex(DIF)+"\t"+hex(DIFE)+"\t"+hex(VIF)+"\t"+hex(VIFE)+"\t"+data_type+str(data_len)+"\t"+" ["+",".join(map(str, bytes))+"] "+name+" = "+str(value)+" "+str(unit)) + #self._log.debug(vif_name+" "+function+" = "+str(value)+" "+str(unit)) + + if name in result: + name += str(record) + + result[name] = [value,unit] + + if 'FlowT' in result and 'ReturnT' in result and 'FlowRate' in result: + value = 4150 * (result['FlowT'][0] - result['ReturnT'][0]) * (result['FlowRate'][0] * (1000 / 3600)) + result['heat_calc'] = [value,"W"] + + return result + + def request_data(self,address): + self.mbus_short_frame(address,0x5b) + time.sleep(0.5) + + data = [] + bid = 0 + bid_end = 255 + bid_checksum = 255 + checksum = 0 + valid = False + + while self.ser.in_waiting: + # Read in byte + val = ord(self.ser.read(1)) + data.append(val) + + # Long frame start, reset checksum + if bid==0 and val==0x68: + valid = True + checksum = 0 + + # 2nd byte is the frame length + if valid and bid==1: + length = val + bid_end = length+4+2-1 + bid_checksum = bid_end-1 + + if valid and bid==2 and val!=length: valid = False # 3rd byte is also length, check that its the same as 2nd byte + if valid and bid==3 and val!=0x68: valid = False # 4th byte is the start byte again + if valid and bid>3 and bid1: + page = pages[pindex] + self._log.debug("Set page: "+str(page)) + self.set_page(self._settings['address'],page) + + for p in range(len(pages)): + result = self.request_data(self._settings['address']) + if result==None: + time.sleep(0.2) + result = self.request_data(self._settings['address']) + + if result!=None: + self._log.debug("Decoded MBUS data: "+json.dumps(result)) + + for key in result: + c.names.append(key) + c.realdata.append(result[key][0]) + c.units.append(result[key][1]) + else: + self._log.debug("Decoded MBUS data: None") + + if len(pages)>1: + pindex += 1 + if pindex>=len(pages): + pindex -= len(pages) + page = pages[pindex] + self._log.debug("Set page: "+str(page)) + self.set_page(self._settings['address'],page) + + if len(c.realdata)>0: + return c + + else: + self.next_interval = True + + return False + + + def set(self, **kwargs): + for key, setting in self._MBUS_settings.items(): + # Decide which setting value to use + if key in kwargs: + setting = kwargs[key] + else: + setting = self._MBUS_settings[key] + + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'address': + self._log.info("Setting %s address: %s", self.name, setting) + self._settings[key] = int(setting) + continue + elif key == 'pages': + if type(setting)==list: + setting = list(map(int, setting)) + else: + setting = [int(setting)] + self._log.info("Setting %s pages: %s", self.name, json.dumps(setting)) + self._settings[key] = setting + continue + elif key == 'read_interval': + self._log.info("Setting %s read_interval: %s", self.name, setting) + self._settings[key] = float(setting) + continue + elif key == 'nodename': + self._log.info("Setting %s nodename: %s", self.name, setting) + self._settings[key] = str(setting) + continue + else: + self._log.warning("'%s' is not valid for %s: %s", setting, self.name, key) + + # include kwargs from parent + super().set(**kwargs) diff --git a/src/interfacers/EmonHubMqttInterfacer.py b/src/interfacers/EmonHubMqttInterfacer.py index 27113ef9..1f5c8fe1 100644 --- a/src/interfacers/EmonHubMqttInterfacer.py +++ b/src/interfacers/EmonHubMqttInterfacer.py @@ -181,7 +181,8 @@ def _process_post(self, databuffer): if 'rssi' in frame: payload = payload + "," + str(frame['rssi']) - self._log.info("Publishing: %s %s", topic, payload) + self._log.info("Publishing 'node' formatted msg") + self._log.debug("Publishing: %s %s", topic, payload) result = self._mqttc.publish(topic, payload=payload, qos=2, retain=False) if result[0] == 4: diff --git a/src/interfacers/EmonHubOEMInterfacer.py b/src/interfacers/EmonHubOEMInterfacer.py new file mode 100644 index 00000000..791c98f6 --- /dev/null +++ b/src/interfacers/EmonHubOEMInterfacer.py @@ -0,0 +1,372 @@ +import time +import json +import datetime +import Cargo +import re +from . import EmonHubSerialInterfacer as ehi + +"""class EmonHubOEMInterfacer + +Monitors the serial port for data from 'Serial Format 2' type devices + +""" + +class EmonHubOEMInterfacer(ehi.EmonHubSerialInterfacer): + + def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=38400): + """Initialize Interfacer + + com_port (string): path to COM port + + """ + + # Initialization + super().__init__(name, com_port, com_baud) + + # Display device firmware version and current settings + self.info = ["", ""] + + self._rx_buf = "" + # self._ser.flushInput() + + # Initialize settings + self._defaults.update({'pause': 'off', 'interval': 0, 'datacode': 'h', 'nodename': 'test'}) + + self._config_map = {'g':'group','i':'baseid','b':'frequency','d':'period','k0':'vcal','k1':'ical1','k2':'ical2','k3':'ical3','k4':'ical4','f':'acfreq','m1':'m1','t0':'t0','a':'Vrms'} + self._config_map_inv = dict(map(reversed, self._config_map.items())) + + self._last_config = {} + self._config = {} + + self._config_format = "new" + + # This line will stop the default values printing to logfile at start-up + # unless they have been overwritten by emonhub.conf entries + # comment out if diagnosing a startup value issue + self._settings.update(self._defaults) + + self._first_data_packet_received = False + + + def add(self, cargo): + """Append data to buffer. + + data (list): node and values (eg: '[node,val1,val2,...]') + + """ + + #just send it + txc = self._process_tx(cargo) + self.send(txc) + + + def pre_process_data_format(self, f): + """Pre process data + + checks for valid data format, returns pre-processed data + + """ + + c = Cargo.new_cargo(rawdata=f) + c.names = [] + c.realdata = [] + + # Fetch default nodename from settings + if self._settings["nodename"] != "": + c.nodename = self._settings["nodename"] + c.nodeid = self._settings["nodename"] + + # ------------------------------------------------------------------- + # JSON FORMAT e.g {"power1":100,"power2":200} + # ------------------------------------------------------------------- + # Start with a quick check for the expected starting character + if f[0]=="{" or f[0]=="[": + try: # Attempt to decode json + json_data = json.loads(f) + for name in json_data: + if re.match(r'^[\w-]+$', name): # only alpha-numeric input names + c.realdata.append(float(json_data[name])) # check that value is numeric + c.names.append(name) + # else: + # self._log.debug("invalid input name: %s" % kv[0]) + except ValueError as e: + # self._log.debug("Invalid JSON: "+f) + return False + self._settings['datacode'] = False # Disable further attempt at data decode + # ------------------------------------------------------------------- + # KEY:VALUE FORMAT e.g power1:100,power2:200 + # ------------------------------------------------------------------- + elif ":" in f: + for kv_str in f.split(','): + kv = kv_str.split(':') + if len(kv) == 2: + if re.match(r'^[\w-]+$', kv[0]): + if len(kv[1])>0: + try: + c.realdata.append(float(kv[1])) + c.names.append(kv[0]) + except Exception: + # self._log.debug("input value is not numeric: %s" % kv[1]) + return False + else: + # self._log.debug("invalid input name: %s" % kv[0]) + return False + self._settings['datacode'] = False # Disable further attempt at data decode + # ------------------------------------------------------------------- + # BINARY FORMAT e.g OK 5 0 0 0 0 (-0)' + # ------------------------------------------------------------------- + elif " " in f: + # Split string by space + ssv = f.split(' ') + # Strip leading 'OK' from frame if needed + if ssv[0] == 'OK': + ssv = ssv[1:] + # Extract RSSI value if it's available + if ssv[-1].startswith('(') and ssv[-1].endswith(')'): + r = ssv[-1][1:-1] + try: + c.rssi = int(r) + except ValueError: + #self._log.warning("Packet discarded as the RSSI format is invalid: " + str(f)) + return False + ssv = ssv[:-1] + # Extract node id from frame + try: + c.nodeid = int(ssv[0]) + int(self._settings['nodeoffset']) + except ValueError: + return False + # Store data as a list of integer values + try: + c.realdata = [int(i) for i in ssv[1:]] + except ValueError: + return False + + if len(c.realdata) == 0: + return False + + # If we are here the data is valid and processed + return c + + def read(self): + """Read data from serial port and process if complete line received. + + Return data as a list: [NodeID, val1, val2] + + """ + + if not self._ser: + return + + # Read serial RX + try: + self._rx_buf = self._rx_buf + self._ser.readline().decode() + except UnicodeDecodeError: + return + + # If line incomplete, exit + if '\r\n' not in self._rx_buf: + return + + # Remove CR,LF. + f = self._rx_buf[:-2].strip() + + # Reset buffer + self._rx_buf = '' + + if not f: + return + + if f[0] == '\x01': + #self._log.debug("Ignoring frame consisting of SOH character" + str(f)) + return + + # Check for valid data format (json, keyval, binary) and pre-process into cargo if valid + c = self.pre_process_data_format(f) + + # If valid data + if c: + # Discard first data packet and send configuration + if not self._first_data_packet_received: + self._first_data_packet_received = True + self.update_all() + return + else: + return c + + self._log.debug(f) + + # Enable online calibration for EmonTx + if f=="'+++' then [Enter] for config mode": + # special silent version that does not print helptext + self._ser.write("++s\r\n".encode()) + time.sleep(0.1) + self._ser.write("k\r\n".encode()) + time.sleep(0.1) + self._ser.write("x\r\n".encode()) + + """ + # Handle config + fp = f.split(' ') + if len(fp)==1: + id = f[0] + elif len(fp)>1: + id = fp[0] + + # If the received key resides in the config map (e.g b for frequency) + # check if the config value matches the value sent from the hardware unit + # if they do not match then attempt to fix the calibration value + # Sending the value will trigger a further confirmation reply which is checked here again + if id in self._config_map: + key = self._config_map[id] + if key in self._config: + cmd = "%s%s" % (id,self._config[key]) + if f == cmd: + self._log.debug(key+" correct: "+cmd) + else: + self.send_cal(key,cmd) + self._log.debug(key+" updated: "+cmd) + return + """ + + return + + def send_cmd(self,cmd): + self._ser.write((cmd+"\n").encode()); + # Wait for reply + rx_buf = "" + start = time.time() + while (time.time()-start)<1.0: + rx_buf = rx_buf + self._ser.readline().decode() + if '\r\n' in rx_buf: + return rx_buf.strip() + return False + + def check_config_format(self): + self._config_format = "new" + if self.send_cmd("4v"): + self._config_format = "old" + self._log.debug("Config format: "+self._config_format) + if self._config_format=="new": time.sleep(2.1) + + def send_config(self,key,cmd): + reply = self.send_cmd(cmd) + if reply: + self._log.debug("CONFIG SET:"+key.ljust(12,' ')+" cmd:"+cmd.ljust(15,' ')+" reply:"+reply) + else: + self._log.error("CONFIG FAIL: "+key+" cmd: "+cmd+" (no reply)") + + + def update_all(self): + # Send all available configuration + self._log.debug("---------------------------------------------------------------------") + self.check_config_format() + for key in self._config: + if self._config_format=="new": + cmd = self._config_map_inv[key]+str(self._config[key]) + else: + cmd = str(self._config[key])+self._config_map_inv[key] + self.send_config(key,cmd) + self._log.debug("---------------------------------------------------------------------") + + def update_if_changed(self,key): + # has the setting updated + if key in self._last_config: + if self._last_config[key] != self._config[key]: + if self._config_format=="new": + cmd = self._config_map_inv[key]+str(self._config[key]) + else: + cmd = str(self._config[key])+self._config_map_inv[key] + self.send_config(key,cmd) + + def set(self, **kwargs): + + for key, setting in self._settings.items(): + if key in kwargs: + # replace default + self._settings[key] = kwargs[key] + + if "group" in kwargs: + self._config["group"] = int(kwargs["group"]) + self.update_if_changed("group") + + if "frequency" in kwargs: + self._config["frequency"] = int(kwargs["frequency"]) + self.update_if_changed("frequency") + + if "baseid" in kwargs: + self._config["baseid"] = int(kwargs["baseid"]) + self.update_if_changed("baseid") + + if "period" in kwargs: + self._config["period"] = float(kwargs["period"]) + self.update_if_changed("period") + + if "vcal" in kwargs: + self._config["vcal"] = " %.2f 0.00" % float(kwargs["vcal"]) + self.update_if_changed("vcal") + + # up to 4 ical channels + for ch in range(1,5): + key = "ical"+str(ch) + if key in kwargs: + if isinstance(kwargs[key],list): + if len(kwargs[key])==2: + self._config[key] = " %.2f %.2f" % (float(kwargs[key][0]),float(kwargs[key][1])) + else: + self._config[key] = " %.2f 0.00" % float(kwargs[key]) + self.update_if_changed(key) + + #if "cmd" in kwargs: + # self._log.debug(kwargs["cmd"]) + + + self._last_config = self._config.copy() + + def action(self): + """Actions that need to be done on a regular basis. + + This should be called in main loop by instantiater. + + """ + + t = time.time() + + # Broadcast time to synchronize emonGLCD + interval = int(self._settings['interval']) + if interval: # A value of 0 means don't do anything + if t - self._interval_timestamp > interval: + self._interval_timestamp = t + now = datetime.datetime.now() + self._log.debug(self.name + " broadcasting time: %02d:%02d" % (now.hour, now.minute)) + self._ser.write(b"00,%02d,%02d,00,s" % (now.hour, now.minute)) + + def _process_post(self, databuffer): + """Send data to server/broker or other output + + """ + + for frame in databuffer: + self._log.debug("node = " + str(frame[1]) + " node_data = " + json.dumps(frame)) + self.send(frame) + return True + + def send(self, cargo): + f = cargo + cmd = "s" + + if self.getName() in f.encoded: + data = f.encoded[self.getName()] + else: + data = f.realdata + + payload = "" + for value in data: + if not 0 < int(value) < 255: + self._log.warning(self.name + " discarding Tx packet: values out of scope") + return + payload += str(int(value)) + "," + + payload += cmd + + self._log.debug(str(f.uri) + " sent TX packet: " + payload) + self._ser.write(payload.encode()) diff --git a/src/interfacers/EmonHubRedisInterfacer.py b/src/interfacers/EmonHubRedisInterfacer.py new file mode 100644 index 00000000..cdb9de44 --- /dev/null +++ b/src/interfacers/EmonHubRedisInterfacer.py @@ -0,0 +1,90 @@ +import time +import json +import Cargo +from emonhub_interfacer import EmonHubInterfacer + +""" +[[Redis]] + Type = EmonHubRedisInterfacer + [[[init_settings]]] + redis_host = localhost + redis_port = 6379 + redis_db = 0 + [[[runtimesettings]]] + subchannels = ToEmonCMS, + prefix = "emonhub:" +""" + +"""class EmonHubRedisInterfacer + +Redis interfacer for use in development + +""" + +class EmonHubRedisInterfacer(EmonHubInterfacer): + + def __init__(self, name, redis_host='localhost', redis_port=6379, redis_db=0): + """Initialize Interfacer + + """ + # Initialization + super(EmonHubRedisInterfacer, self).__init__(name) + self._settings.update(self._defaults) + + # Interfacer specific settings + self._redis_settings = {'prefix': ''} + + # Only load module if it is installed + try: + import redis + self.r = redis.Redis(redis_host,redis_port,redis_db) + except ModuleNotFoundError as err: + self._log.error(err) + self.r = False + + def add(self, cargo): + """set data in redis + + """ + if not self.r: + return False + + nodeid = cargo.nodeid + + if len(cargo.names)<=len(cargo.realdata): + for i in range(0,len(cargo.names)): + name = cargo.names[i] + value = cargo.realdata[i] + + name_parts = [] + if self._settings['prefix']!='': name_parts.append(self._settings['prefix']) + name_parts.append(str(nodeid)) + name_parts.append(str(name)) + + name = ":".join(name_parts) + + self._log.info("redis set "+name+" "+str(value)) + try: + self.r.set(name,value) + except Exception as err: + self._log.error(err) + return False + + def set(self, **kwargs): + for key, setting in self._redis_settings.items(): + # Decide which setting value to use + if key in kwargs: + setting = kwargs[key] + else: + setting = self._redis_settings[key] + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'prefix': + self._log.info("Setting %s prefix: %s", self.name, setting) + self._settings[key] = setting + continue + else: + self._log.warning("'%s' is not valid for %s: %s", setting, self.name, key) + + # include kwargs from parent + super().set(**kwargs) diff --git a/src/interfacers/EmonHubSDM120Interfacer.py b/src/interfacers/EmonHubSDM120Interfacer.py new file mode 100644 index 00000000..6505b105 --- /dev/null +++ b/src/interfacers/EmonHubSDM120Interfacer.py @@ -0,0 +1,121 @@ +import time +import json +import Cargo +import os +import glob +from emonhub_interfacer import EmonHubInterfacer + +""" +[[SDM120]] + Type = EmonHubSDM120Interfacer + [[[init_settings]]] + device = /dev/ttyUSB0 + baud = 2400 + [[[runtimesettings]]] + pubchannels = ToEmonCMS, + read_interval = 10 + nodename = sdm120 +""" + +"""class EmonHubSDM120Interfacer + +SDM120 interfacer for use in development + +""" + +class EmonHubSDM120Interfacer(EmonHubInterfacer): + + def __init__(self, name, device="/dev/modbus", baud=2400): + """Initialize Interfacer + + """ + # Initialization + super(EmonHubSDM120Interfacer, self).__init__(name) + + # This line will stop the default values printing to logfile at start-up + # self._settings.update(self._defaults) + + # Interfacer specific settings + self._SDM120_settings = {'read_interval': 10.0,'nodename':'sdm120'} + + self.next_interval = True + + # Only load module if it is installed + try: + import sdm_modbus + self._log.info("Connecting to SDM120 device="+str(device)+" baud="+str(baud)) + self._sdm = sdm_modbus.SDM120(device="/dev/modbus", baud=2400) + self._sdm_registers = sdm_modbus.registerType.INPUT + except ModuleNotFoundError as err: + self._log.error(err) + self._sdm = False + + + def read(self): + """Read data and process + + Return data as a list: [NodeID, val1, val2] + + """ + + if int(time.time())%self._settings['read_interval']==0: + if self.next_interval: + self.next_interval = False + + c = Cargo.new_cargo() + c.names = [] + c.realdata = [] + c.nodeid = self._settings['nodename'] + + if self._sdm and self._sdm.connected(): + r = self._sdm.read_all(self._sdm_registers) + + # for i in r: + # self._log.debug(i+" "+str(r[i])) + + self._log.debug("%.2f %.2f %.4f %.4f %.3f %.3f" % (r['voltage'],r['power_active'],r['power_factor'],r['frequency'],r['import_energy_active'],r['current'])) + + c.names = ['sdm_V','sdm_P','sdm_PF','sdm_FR','sdm_E','sdm_I'] + c.realdata = [ + round(r['voltage'],2), + round(r['power_active'],2), + round(r['power_factor'],4), + round(r['frequency'],4), + round(r['import_energy_active'],3), + round(r['current'],3) + ] + + if len(c.realdata)>0: + return c + else: + self._log.error("Could not read from SDM120") + + else: + self.next_interval = True + + return False + + + def set(self, **kwargs): + for key, setting in self._SDM120_settings.items(): + # Decide which setting value to use + if key in kwargs: + setting = kwargs[key] + else: + setting = self._SDM120_settings[key] + + if key in self._settings and self._settings[key] == setting: + continue + elif key == 'read_interval': + self._log.info("Setting %s read_interval: %s", self.name, setting) + self._settings[key] = float(setting) + continue + elif key == 'nodename': + self._log.info("Setting %s nodename: %s", self.name, setting) + self._settings[key] = str(setting) + continue + else: + self._log.warning("'%s' is not valid for %s: %s", setting, self.name, key) + + # include kwargs from parent + super().set(**kwargs) diff --git a/src/interfacers/EmonHubSDS011Interfacer.py b/src/interfacers/EmonHubSDS011Interfacer.py index b13fd03a..59030a79 100644 --- a/src/interfacers/EmonHubSDS011Interfacer.py +++ b/src/interfacers/EmonHubSDS011Interfacer.py @@ -3,103 +3,148 @@ import time, Cargo, serial, struct from emonhub_interfacer import EmonHubInterfacer -"""class EmonHubTemplateInterfacer - -Template interfacer for use in development +""" +class EmonHubSDS011Interfacer + +$ sudo pip3 install sds011 +https://pypi.org/project/sds011/ + +[interfacers] +### This interfacer sets up and manages the SDS011 dust sensor. +[[SDS011]] + Type = EmonHubSDS011Interfacer + [[[init_settings]]] + # default com port if using USB to UART adapter + com_port = /dev/ttyUSB0 + [[[runtimesettings]]] + # one measurment every few minutes offers decent granularity and at least a few years of lifetime to the sensor + # a value of 0 for a reading every second. + readinterval = 5 + nodename = SDS011 + pubchannels = ToEmonCMS, """ class EmonHubSDS011Interfacer(EmonHubInterfacer): - def __init__(self, name, serial_port='/dev/ttyUSB0'): - """Initialize Interfacer - - """ + def __init__(self, name, com_port="", readinterval=5): + """Initialize Interfacer""" # Initialization super(EmonHubSDS011Interfacer, self).__init__(name) + + # Only load module if it is installed + try: + from sds011 import SDS011 + except ModuleNotFoundError as err: + self._log.error(err) self._settings.update(self._defaults) - self._template_settings = {'nodename':'SDS011','readinterval':10.0} - - # Open serial port - self._ser = self._open_serial_port(serial_port, 9600) - - self.byte, self.lastbyte = "\x00", "\x00" - self.pm_25_sum = 0 - self.pm_10_sum = 0 + self._template_settings = {'nodename':'SDS011','readinterval':5} + + ### GLOBALS ### + self.previous_time = time.time() + self.warmup_time = 15 # seconds to spin up the SDS011 before taking a reading + self.sensor_present = False + self.first_reading_done = False + self.sensor_waking = False + self.timenow = time.time() self.count = 0 - self.lasttime = time.time() + self.readinterval = readinterval * 60 # convert to seconds. + + ### INIT COM PORT ### + try: + self._log.info("INFO: Opening sensor serial port...") + self.sensor = SDS011(com_port, use_query_mode=True) + self.sensor.set_work_period(read=False, work_time=0) + # self.sensor.set_work_period + self.sensor.sleep(sleep=False) # wake the sensor just in case. + time.sleep(2) + first_reading = self.sensor.query() + # sensor.set_report_mode + self.previous_time = time.time() + if first_reading is not None: + self._log.info("COM port open and SDS011 active") + self._log.info("testing reading PM2.5/PM10: " + str(first_reading)) + self.sensor_present = True + else: + self._log.error("COM port opened but sensor readings unavailable.") + self._log.info("Check connections or the selected COM port in settings") + except: + self._log.error("Couldn't open COM port") + def close(self): - """Close serial port""" - - # Close serial port - if self._ser is not None: - self._log.debug("Closing serial port") - self._ser.close() + """close interfacer""" + return - def _open_serial_port(self, serial_port, baudrate): - """Open serial port""" - - try: - s = serial.Serial(serial_port, baudrate, stopbits=1, parity="N", timeout=2) - s.flushInput() - self._log.debug("Opening serial port: " + str(serial_port) + " @ "+ str(baudrate) + " bits/s") - except serial.SerialException as e: - self._log.error(e) - s = False - return s def read(self): - """Read data and process - - """ - if not self._ser: return False - - self.lastbyte = self.byte - self.byte = self._ser.read(size=1) + '''Read data and process''' + + if not self.sensor_present: return False - # Valid packet header - if self.lastbyte == b"\xaa" and self.byte == b"\xc0": + self.timenow = time.time() - sentence = self._ser.read(size=8) # Read 8 more bytes - readings = struct.unpack('=self._settings['readinterval']: - self.lasttime = time.time() - if self.count>0: - pm_25 = round(self.pm_25_sum/self.count,3) - pm_10 = round(self.pm_10_sum/self.count,3) - self.pm_25_sum = 0 - self.pm_10_sum = 0 - self.count = 0 - self._log.debug("PM 2.5:"+str(pm_25)+"μg/m^3 PM 10:"+str(pm_10)+"μg/m^3") - + try: + if self.first_reading_done is False: + if self.timenow >= (self.previous_time + self.warmup_time): # 15 seconds warmup for first reading, just in case. + self.first_reading_done = True + self.previous_time = self.timenow + readings = self.sensor.query() + self._log.debug("First readings:" + str(readings)) + if readings is not None: + readings = list(readings) + else: return False + self.count += 1 + if self.readinterval > 30: + self.sensor.sleep() + self._log.debug("Sensor put to sleep") + # create a new cargo object, set data values + c = Cargo.new_cargo() + c.nodeid = self._settings['nodename'] + c.names = ["pm_25","pm_10","msg"] + c.realdata = [readings[0],readings[1],self.count] + self._log.info("SDS011 First Cargo : " + str(c.realdata)) + return c + + if self.timenow >= (self.previous_time + self.readinterval): + if (self.previous_time + self.readinterval) >= self.timenow: + self.sensor.sleep(sleep=False) + time.sleep(1) + self.previous_time = self.timenow + readings = self.sensor.query() + if readings is not None: readings = list(readings) + else: return False + self._log.debug("READINGS:" + str(readings)) + if self.readinterval >= 30: + self.sensor.sleep() + self._log.debug("Sensor returned to sleep") + self.sensor_waking=False + self.count += 1 # create a new cargo object, set data values c = Cargo.new_cargo() c.nodeid = self._settings['nodename'] - c.names = ["pm_25","pm_10"] - c.realdata = [pm_25,pm_10] - return c - + c.names = ["pm_25","pm_10","msg"] + c.realdata = [readings[0],readings[1],self.count] + self._log.info("SDS011 Cargo : " + str(c.realdata)) + return c + elif self.timenow >= (self.previous_time + self.readinterval - self.warmup_time): + if (self.sensor_waking == True) or (self.readinterval <= 30): + return False + self.sensor.sleep(sleep=False) + self._log.debug("Sensor warming up... 15s until reading") + self.sensor_waking=True + return False + except: + self._log.debug("An exceptional SDS011 exception has occurred!") + # nothing to return return False + def set(self, **kwargs): - """ - - """ + """ Runtime Settings """ for key, setting in self._template_settings.items(): # Decide which setting value to use @@ -110,8 +155,18 @@ def set(self, **kwargs): if key in self._settings and self._settings[key] == setting: continue elif key == 'readinterval': - self._log.info("Setting " + self.name + " readinterval: " + str(setting)) - self._settings[key] = float(setting) + self._log.info("Setting " + self.name + " readinterval: " + str(setting) + " minutes") + self._settings[key] = int(setting) + self.readinterval = int(setting) * 60 + if int(setting) == 0: + self.readinterval = 5 # fastest interval is 5 seconds. + self._log.debug("SDS011 readinterval set to : " + str(self.readinterval) + "s") + self.previous_time = time.time() + self.first_reading_done = False + try: + self.sensor.sleep(sleep=False) + except: + self._log.debug("Failed waking sensor.") continue elif key == 'nodename': self._log.info("Setting " + self.name + " nodename: " + str(setting)) diff --git a/src/interfacers/__init__.py b/src/interfacers/__init__.py index 5c0b3e78..ceca7a2c 100644 --- a/src/interfacers/__init__.py +++ b/src/interfacers/__init__.py @@ -3,10 +3,10 @@ "EmonHubSocketInterfacer", "EmonHubSerialInterfacer", "EmonHubJeeInterfacer", + "EmonHubOEMInterfacer", "EmonHubPacketGenInterfacer", "EmonHubEmoncmsHTTPInterfacer", "EmonHubMqttInterfacer", - "EmonHubTx3eInterfacer", "EmonHubVEDirectInterfacer", # "EmonHubSmilicsInterfacer", @@ -17,6 +17,10 @@ "EmonHubTeslaPowerWallInterfacer", "EmonHubSDS011Interfacer", "EmonHubModbusRenogyInterfacer", - "EmonHubTemplateInterfacer" + "EmonHubTemplateInterfacer", + "EmonHubDS18B20Interfacer", + "EmonHubRedisInterfacer", + "EmonHubSDM120Interfacer", + "EmonHubMBUSInterfacer" #"EmonFroniusModbusTcpInterfacer" ] diff --git a/version.txt b/version.txt index ebf14b46..ccbccc3d 100644 --- a/version.txt +++ b/version.txt @@ -1 +1 @@ -2.1.8 +2.2.0