diff --git a/src/emonhub.py b/src/emonhub.py index aee21c02..46486900 100755 --- a/src/emonhub.py +++ b/src/emonhub.py @@ -16,6 +16,7 @@ import signal import argparse import pprint + try: import pymodbus pymodbus_found = True @@ -85,7 +86,7 @@ class EmonHub(object): - __version__ = "emonHub 'emon-pi' variant v1.2" + __version__ = "emonHub emon-pi variant v2.0.0" def __init__(self, setup): """Setup an OpenEnergyMonitor emonHub. @@ -124,6 +125,11 @@ def run(self): # Set signal handler to catch SIGINT and shutdown gracefully signal.signal(signal.SIGINT, self._sigint_handler) + # Initialise thread restart counters + restart_count={} + for I in self._interfacers.itervalues(): + restart_count[I.name]=0 + # Until asked to stop while not self._exit: @@ -133,12 +139,47 @@ def run(self): self._update_settings(self._setup.settings) # For all Interfacers + kill_list=[] for I in self._interfacers.itervalues(): - # Check thread is still running + # Check threads are still running if not I.isAlive(): - #I.start() - self._log.warning(I.name + " thread is dead") # had to be restarted") - + kill_list.append(I.name) # <-avoid modification of iterable within loop + + # Read each interfacers pub channels + for pub_channel in I._settings['pubchannels']: + + if pub_channel in I._pub_channels: + if len(I._pub_channels[pub_channel])>0: + + # POP cargo item (one at a time) + cargo = I._pub_channels[pub_channel].pop(0) + + # Post to each subscriber interface + for sub_interfacer in self._interfacers.itervalues(): + # For each subsciber channel + for sub_channel in sub_interfacer._settings['subchannels']: + # If channel names match + if sub_channel==pub_channel: + # init if empty + if not sub_channel in sub_interfacer._sub_channels: + sub_interfacer._sub_channels[sub_channel] = [] + + # APPEND cargo item + sub_interfacer._sub_channels[sub_channel].append(cargo) + + # ->avoid modification of iterable within loop + for name in kill_list: + self._log.warning(name + " thread is dead.") + + # The following should trigger a restart ... unless the + # interfacer is also removed from the settings table. + del(self._interfacers[name]) + + # Trigger restart by calling update settings + self._log.warning("Attempting to restart thread "+name+" (thread has been restarted "+str(restart_count[name])+" times...") + restart_count[name]+=1 + self._update_settings(self._setup.settings) + # Sleep until next iteration time.sleep(0.2) @@ -208,7 +249,7 @@ def _update_settings(self, settings): if I['Type'] in ('EmonModbusTcpInterfacer','EmonFroniusModbusTcpInterfacer') and not pymodbus_found : self._log.error("Python module pymodbus not installed. unable to load modbus interfacer") # This gets the class from the 'Type' string - interfacer = getattr(ehi, I['Type'])(name, **I['init_settings']) + interfacer = getattr(ehi, I['Type'])(name,**I['init_settings']) interfacer.set(**I['runtimesettings']) interfacer.init_settings = I['init_settings'] interfacer.start() diff --git a/src/interfacers/EmonHubBMWInterfacer.py b/src/interfacers/EmonHubBMWInterfacer.py index 79ebd453..eabeb62e 100644 --- a/src/interfacers/EmonHubBMWInterfacer.py +++ b/src/interfacers/EmonHubBMWInterfacer.py @@ -20,8 +20,6 @@ import requests import os.path -from pydispatch import dispatcher - from emonhub_interfacer import EmonHubInterfacer """class EmonHubBMWInterfacer diff --git a/src/interfacers/EmonHubEmoncmsHTTPInterfacer.py b/src/interfacers/EmonHubEmoncmsHTTPInterfacer.py index 60ba988b..c6491842 100644 --- a/src/interfacers/EmonHubEmoncmsHTTPInterfacer.py +++ b/src/interfacers/EmonHubEmoncmsHTTPInterfacer.py @@ -4,7 +4,6 @@ import json import urllib2 import httplib -from pydispatch import dispatcher from emonhub_interfacer import EmonHubInterfacer class EmonHubEmoncmsHTTPInterfacer(EmonHubInterfacer): @@ -26,27 +25,12 @@ def __init__(self, name): 'sendinterval': 30 } - self.buffer = [] + # Initialize message queue + self._pub_channels = {} + self._sub_channels = {} + self.lastsent = time.time() self.lastsentstatus = time.time() - - def receiver(self, cargo): - - # Create a frame of data in "emonCMS format" - f = [] - try: - f.append(float(cargo.timestamp)) - f.append(cargo.nodeid) - for i in cargo.realdata: - f.append(i) - if cargo.rssi: - f.append(cargo.rssi) - self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f)) - except: - self._log.warning("Failed to create emonCMS frame " + str(f)) - - # Append to bulk post buffer - self.buffer.append(f) def action(self): @@ -54,10 +38,51 @@ def action(self): if (now-self.lastsent) > (int(self._settings['sendinterval'])): self.lastsent = now - # print json.dumps(self.buffer) + if int(self._settings['senddata']): - self.bulkpost(self.buffer) - self.buffer = [] + # It might be better here to combine the output from all sub channels + # into a single bulk post, most of the time there is only one sub channel + for channel in self._settings["subchannels"]: + if channel in self._sub_channels: + + # only try to prepare and send data if there is any + if len(self._sub_channels[channel])>0: + + bulkdata = [] + + for cargo in self._sub_channels[channel]: + # Create a frame of data in "emonCMS format" + f = [] + try: + f.append(float(cargo.timestamp)) + f.append(cargo.nodeid) + for i in cargo.realdata: + f.append(i) + if cargo.rssi: + f.append(cargo.rssi) + self._log.debug(str(cargo.uri) + " adding frame to buffer => "+ str(f)) + except: + self._log.warning("Failed to create emonCMS frame " + str(f)) + + bulkdata.append(f) + + # Get the length of the data to be sent + bulkdata_length = len(bulkdata) + self._log.debug("Sending bulkdata, length: "+str(bulkdata_length)) + + # Attempt to send the data + success = self.bulkpost(bulkdata) + + self._log.debug("Sending bulkdata, success: "+str(success)) + + # if bulk post is successful delete the range posted + if success: + for i in range(0,bulkdata_length): + self._sub_channels[channel].pop(0) + self._log.debug("Deleted sent data from queue") + + self._log.debug("New queue length: "+str(len(self._sub_channels[channel]))) + if (now-self.lastsentstatus)> (int(self._settings['sendinterval'])): self.lastsentstatus = now @@ -68,7 +93,7 @@ def bulkpost(self,databuffer): if not 'apikey' in self._settings.keys() or str.__len__(str(self._settings['apikey'])) != 32 \ or str.lower(str(self._settings['apikey'])) == 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx': - return + return False data_string = json.dumps(databuffer, separators=(',', ':')) @@ -100,6 +125,7 @@ def bulkpost(self,databuffer): return True else: self._log.warning("send failure: wanted 'ok' but got '" +reply+ "'") + return False def _send_post(self, post_url, post_body=None): """ @@ -157,9 +183,3 @@ def set(self, **kwargs): if key in kwargs.keys(): # replace default self._settings[key] = kwargs[key] - - # Subscribe to internal channels - for channel in self._settings["subchannels"]: - dispatcher.connect(self.receiver, channel) - self._log.debug(self._name+" Subscribed to channel' : " + str(channel)) - diff --git a/src/interfacers/EmonHubJeeInterfacer.py b/src/interfacers/EmonHubJeeInterfacer.py index c1dea069..f98ef9a4 100644 --- a/src/interfacers/EmonHubJeeInterfacer.py +++ b/src/interfacers/EmonHubJeeInterfacer.py @@ -1,7 +1,5 @@ import time -from pydispatch import dispatcher - import datetime import Cargo import EmonHubSerialInterfacer as ehi @@ -49,6 +47,7 @@ def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0): else: self._log.warning("Device communication error - check settings") self._rx_buf="" + self._ser.flushInput() # Initialize settings @@ -66,7 +65,7 @@ def __init__(self, name, com_port='/dev/ttyAMA0', com_baud=0): # Pre-load Jee settings only if info string available for checks if all(i in self.info[1] for i in (" i", " g", " @ ", " MHz")): self._settings.update(self._jee_settings) - + def read(self): """Read data from serial port and process if complete line received. @@ -146,12 +145,6 @@ def read(self): return c - # # unix timestamp - # t = round(time.time(), 2) - # - # # Process data frame - # self._r xq.put(self._process_rx(f, t)) - def set(self, **kwargs): """Send configuration parameters to the "Jee" type device through COM port @@ -221,26 +214,10 @@ def action(self): self._ser.write("00,%02d,%02d,00,s" % (now.hour, now.minute)) def send (self, cargo): - """ - """ - #self._process_tx(self._txq.get()) - #self._rxq.put( self._process_rx(f, t)) - #dest = f[1] - #packet = f[2:-1] - #self.send_packet(packet, dest) - # TODO amalgamate into 1 send - - #def send_packet(self, packet, id=0, cmd="s"): - """ - """ f = cargo cmd = "s" - # # If the use of acks gets implemented - # ack = False - # if ack: - # cmd = "a" if self.getName() in f.encoded: data = f.encoded[self.getName()] else: @@ -257,4 +234,3 @@ def send (self, cargo): self._log.debug(str(f.uri) + " sent TX packet: " + payload) self._ser.write(payload) - diff --git a/src/interfacers/EmonHubMqttInterfacer.py b/src/interfacers/EmonHubMqttInterfacer.py index d25958c3..4e78a387 100644 --- a/src/interfacers/EmonHubMqttInterfacer.py +++ b/src/interfacers/EmonHubMqttInterfacer.py @@ -3,7 +3,6 @@ """ import time import paho.mqtt.client as mqtt -from pydispatch import dispatcher from emonhub_interfacer import EmonHubInterfacer import Cargo @@ -22,9 +21,8 @@ def __init__(self, name, mqtt_user=" ", mqtt_passwd=" ", mqtt_host="127.0.0.1", self._connected = False self._settings = { - 'subchannels':['ch1'], - 'pubchannels':['ch2'], - + 'pubchannels':[], + 'subchannels':[], # emonhub/rx/10/values format - default emoncms nodes module 'node_format_enable': 1, 'node_format_basetopic': 'emonhub/', @@ -32,15 +30,15 @@ def __init__(self, name, mqtt_user=" ", mqtt_passwd=" ", mqtt_host="127.0.0.1", # nodes/emontx/power1 format 'nodevar_format_enable': 0, 'nodevar_format_basetopic': "nodes/" - }; - + } + self._mqttc = mqtt.Client() self._mqttc.on_connect = self.on_connect self._mqttc.on_disconnect = self.on_disconnect self._mqttc.on_message = self.on_message self._mqttc.on_subscribe = self.on_subscribe - + # The action method is called from emonhub_interfacer.py method run def action(self): if not self._connected: self._log.info("Connecting to MQTT Server") @@ -50,6 +48,72 @@ def action(self): except: self._log.info("Could not connect...") time.sleep(1.0) + else: + # Itterate through sub channels + for channel in self._settings["subchannels"]: + # Check for data in sub channel buffer + + if channel in self._sub_channels: + while len(self._sub_channels[channel])>0: + cargo = self._sub_channels[channel].pop(0) + + # ---------------------------------------------------------- + # General MQTT format: emonhub/rx/emonpi/power1 ... 100 + # ---------------------------------------------------------- + if int(self._settings["nodevar_format_enable"])==1: + + # Node id or nodename if given + nodestr = str(cargo.nodeid) + if cargo.nodename!=False: nodestr = str(cargo.nodename) + + varid = 1 + for value in cargo.realdata: + # Variable id or variable name if given + varstr = str(varid) + if (varid-1) Return code: "+str(rc)) - + def on_disconnect(self, client, userdata, rc): if rc != 0: self._log.info("Unexpected disconnection") @@ -78,7 +142,7 @@ def on_disconnect(self, client, userdata, rc): def on_subscribe(self, mqttc, obj, mid, granted_qos): self._log.info("on_subscribe") - + def on_message(self, client, userdata, msg): topic_parts = msg.topic.split("/") @@ -98,76 +162,18 @@ def on_message(self, client, userdata, msg): # rxc = self._process_tx(rxc) if rxc: for channel in self._settings["pubchannels"]: - dispatcher.send(channel, cargo=rxc) + + # Initialize channel if needed + if not channel in self._pub_channels: + self._pub_channels[channel] = [] + + # Add cargo item to channel + self._pub_channels[channel].append(rxc) + self._log.debug(str(rxc.uri) + " Sent to channel' : " + str(channel)) - - def receiver(self, cargo): - if self._connected: - # ---------------------------------------------------------- - # General MQTT format: emonhub/rx/emonpi/power1 ... 100 - # ---------------------------------------------------------- - if int(self._settings["nodevar_format_enable"])==1: - - # Node id or nodename if given - nodestr = str(cargo.nodeid) - if cargo.nodename!=False: nodestr = str(cargo.nodename) - - varid = 1 - for value in cargo.realdata: - # Variable id or variable name if given - varstr = str(varid) - if (varid-1)