diff --git a/src/emonhub_interfacer.py b/src/emonhub_interfacer.py index 42f5080a..9ec1ad7c 100644 --- a/src/emonhub_interfacer.py +++ b/src/emonhub_interfacer.py @@ -90,6 +90,21 @@ def __init__(self, name): self.missed = {} self.rx_msg = {} + def processRxc(self, rxc): + if rxc: + rxc = self._process_rx(rxc) + if rxc: + for channel in self._settings["pubchannels"]: + + # Initialise channel if needed + if channel not in self._pub_channels: + self._pub_channels[channel] = [] + + # Add cargo item to channel + self._pub_channels[channel].append(rxc) + + + @log_exceptions_from_class_method def run(self): """ @@ -102,21 +117,16 @@ def run(self): # Only read if there is a pub channel defined for the interfacer if len(self._settings["pubchannels"]): # Read the input and process data if available - rxc = self.read() - if rxc: - rxc = self._process_rx(rxc) - if rxc: - for channel in self._settings["pubchannels"]: - self._log.debug("%d Sent to channel(start)' : %s", rxc.uri, channel) - - # Initialise channel if needed - if channel not in self._pub_channels: - self._pub_channels[channel] = [] - - # Add cargo item to channel - self._pub_channels[channel].append(rxc) - - self._log.debug("%d Sent to channel(end)' : %s", rxc.uri, channel) + result = self.read() + + if isinstance(result, list): + for rxc in result: + self.processRxc(rxc) + elif isinstance(result, dict): + for rxc in result: + self.processRxc(result[rxc]) + else: + self.processRxc(result) # Subscriber channels for channel in self._settings["subchannels"]: @@ -291,12 +301,11 @@ def _process_rx(self, cargo): if 'datalength' in ehc.nodelist[node]: del ehc.nodelist[node]['datalength'] - # If not in nodelist and pass through disabled return false if node not in ehc.nodelist and self._settings['nodelistonly']: self._log.warning("%d Discarded RX frame not in nodelist, node:%s, length:%s bytes", cargo.uri, node, len(rxc.realdata)) return False - + # Data whitening uses for ensuring rfm sync if node in ehc.nodelist and 'rx' in ehc.nodelist[node] and 'whitening' in ehc.nodelist[node]['rx']: whitening = ehc.nodelist[node]['rx']['whitening'] diff --git a/src/interfacers/EmonHubMBUSInterfacer.py b/src/interfacers/EmonHubMBUSInterfacer.py index e7bd151c..50128cfd 100644 --- a/src/interfacers/EmonHubMBUSInterfacer.py +++ b/src/interfacers/EmonHubMBUSInterfacer.py @@ -35,7 +35,7 @@ class EmonHubMBUSInterfacer(EmonHubInterfacer): - def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=False, baud=2400, use_meterbus_lib=False): + def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=False, baud=2400, use_meterbus_lib=True): """Initialize Interfacer """ @@ -63,21 +63,37 @@ def __init__(self, name, device="/dev/ttyUSB0", device_vid=False, device_pid=Fal self.invalid_count = 0 # Only load module if it is installed - self.connect() - - # If use_meterbus_lib is true, try to load module - # pip3 install pyMeterBus - self.use_meterbus_lib = False - if use_meterbus_lib: - try: - from pyMeterBus import meterbus - self.meterbus = meterbus + + try: + # If we need a socket connection, use meterbus_lib + # pip3 install pyMeterBus + if (device.index("socket://")>=0): + self._log.info("Connecting using meterbus_lib:" + device) + self.ser=serial.serial_for_url(device, str(baud), 8, 'E', 1, timeout=1) self.use_meterbus_lib = True - except ModuleNotFoundError as err: - self._log.error(err) - self.use_meterbus_lib = False + if use_meterbus_lib: + try: + self._log.info("importing mertbus_lib") + import meterbus + self.meterbus = meterbus + self.use_meterbus_lib = True + except ModuleNotFoundError as err: + self._log.error(err) + self.use_meterbus_lib = False + else: + self.connect() + + + if self.ping_address(self.ser, 1, 3): + self._log.info("ok ping") + else: + self._log.info("no reply ping") + except ModuleNotFoundError as err: + self._log.error(err) + self.ser = False + def connect(self): """Connect to MBUS @@ -127,6 +143,21 @@ def connect(self): except Exception: self._log.error("Could not connect to MBUS serial") self.ser = False + + + def ping_address(self, ser, address, retries=5, read_echo=False): + for i in range(0, retries + 1): + self.meterbus.send_ping_frame(ser, address, read_echo) + try: + frame = self.meterbus.load(self.meterbus.recv_frame(ser, 1)) + if isinstance(frame, self.meterbus.TelegramACK): + return True + except self.meterbus.MBusFrameDecodeError as e: + pass + + time.sleep(0.5) + + return False def mbus_serial_write(self,data): try: @@ -228,7 +259,7 @@ def decodeInt(self,bytes): return struct.unpack("i", bytearray(bytes))[0] return False - def parse_frame(self,data,records): + def parse_frame(self,meter, data,records): data_types = ['null','int','int','int','int','float','int','int','null','bcd','bcd','bcd','bcd','var','bcd','null'] data_lengths = [0,1,2,3,4,4,6,8,0,1,2,3,4,6,6,0] vif = { @@ -442,42 +473,50 @@ def parse_frame(self,data,records): return result - def parse_frame_meterbus_lib(self,data,records): + def parse_frame_meterbus_lib(self,meter, data,records): + self._log.debug("parse_frame_meterbus_lib"); telegram = self.meterbus.load(data) meterbus_obj = json.loads(telegram.to_JSON()) result = {} + idx = 0; for record in meterbus_obj['body']['records']: if type(record['value'])==int or type(record['value'])==float: name = record['type'].replace('VIFUnit.','').replace('VIFUnitExt.','').lower() + if name in result: + name = name + str(idx) + value = record['value'] unit = record['unit'].replace('MeasureUnit.','') result[name] = [value, unit] - + idx = idx+1 return result - def request_data(self, address, records): + def request_data(self, meter, address, records): for i in range(0,2): - self.mbus_short_frame(address, 0x5b) + if self.use_meterbus_lib: + self.meterbus.send_request_frame(self.ser, address) + else: + self.mbus_short_frame(address, 0x5b) # time.sleep(1.0) - result = self.read_data_frame(records) + result = self.read_data_frame(meter, records) if result!=None: return result else: time.sleep(0.2) - def request_data_sdm120(self, address, records): + def request_data_sdm120(self, meter, address, records): for i in range(0,2): self.mbus_request_sdm120(address) # time.sleep(1.0) - result = self.read_data_frame(records) + result = self.read_data_frame(meter, records) if result!=None: return result else: time.sleep(0.2) - def read_data_frame(self,records): + def read_data_frame(self,meter, records): data = [] bid = 0 bid_end = 255 @@ -529,9 +568,9 @@ def read_data_frame(self,records): if valid: # Parse frame if still valid if self.use_meterbus_lib: - return self.parse_frame_meterbus_lib(data,records) + return self.parse_frame_meterbus_lib(meter, data,records) else: - return self.parse_frame(data,records) + return self.parse_frame(meter, data,records) bid += 1 time.sleep(0.1) @@ -550,12 +589,24 @@ def read_data_frame(self,records): self._log.debug("Invalid count = 10. Restarting MBUS serial connection on next read") self.ser = False - def add_result_to_cargo(self,meter,c,result): + def add_result_to_cargo(self,meter,nodesName,c,result): if result != None: self._log.debug("Decoded MBUS data: " + json.dumps(result)) - + nodesNameHash = {} + for nameTranslator in nodesName: + self._log.debug("nameTranslator:" + nameTranslator); + nameTranslatorPart = nameTranslator.split(':') + nodesNameHash[nameTranslatorPart[0]]=nameTranslatorPart[1] + self._log.debug(nameTranslatorPart[0] + " <> " + nameTranslatorPart[1]); + + + for key in result: - c.names.append(meter+"_"+key) + key1=key + if key in nodesNameHash: + key1 = nodesNameHash[key] + + c.names.append(key1+"_"+meter) c.realdata.append(result[key][0]) c.units.append(result[key][1]) else: @@ -576,59 +627,72 @@ def read(self): self.next_interval = False if not self.ser: - self.connect() - - c = Cargo.new_cargo() - c.names = [] - c.realdata = [] - c.units = [] - c.nodeid = self._settings['nodename'] + try: + if use_meterbus_lib: + self._log.info("Connecting using meterbus_lib:" + device) + self.ser=serial.serial_for_url(device, str(baud), 8, 'E', 1, timeout=1) + else: + self.connect() + except Exception: + self._log.error("Could not connect to MBUS serial") + self.ser = False + res = [] + # Support for multiple MBUS meters on a single bus for meter in self._settings['meters']: + c = Cargo.new_cargo() + c.names = [] + c.realdata = [] + c.units = [] address = self._settings['meters'][meter]['address'] meter_type = self._settings['meters'][meter]['type'] - + if not self._settings['nodename']: + c.nodeid = meter + else: + c.nodeid = self._settings['nodename'] + + meterPrefix = self._settings['meters'][meter]['name']; + nodesName = self._settings['meters'][meter]['nodesName']; + res.append(c) + # Most mbus meters use standard request, page 0 or default, all records if meter_type=="standard": - result = self.request_data(address,[]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[]) + self.add_result_to_cargo(meterPrefix, nodesName, c,result) # Qalcosonic E3 if meter_type=="qalcosonic_e3": - result = self.request_data(address,[4,5,6,7,8,9,10,11,12,13,14,15]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[4,5,6,7,8,9,10,11,12,13,14,15]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # ------------------------------------------------------ # Sontex Multical 531 if meter_type=="sontex531": # p1 self.set_page(address, 1) - result = self.request_data(address,[4,5]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[4,5]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # p3 self.set_page(address, 3) - result = self.request_data(address,[1,2,3,4]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[1,2,3,4]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # SDM120 special request command elif meter_type=="sdm120": # 1. Get energy data - result = self.request_data(address,[1]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[1]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # 2. Get instantaneous data - result = self.request_data_sdm120(address,[1,7,11,23]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data_sdm120(meter, address,[1,7,11,23]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) elif meter_type=="kamstrup403": - result = self.request_data(address,[1,4,7,8,9,10,11,12,14]) - self.add_result_to_cargo(meter,c,result) + result = self.request_data(meter, address,[1,4,7,8,9,10,11,12,14]) + self.add_result_to_cargo(meterPrefix,nodesName,c,result) # ------------------------------------------------------ - - - if len(c.realdata) > 0: - return c + return res else: self.next_interval = True @@ -666,12 +730,17 @@ def set(self, **kwargs): for meter in setting: # default address = 1 + name="" meter_type = "standard" records = [] # address if 'address' in setting[meter]: address = int(setting[meter]['address']) + if 'name' in setting[meter]: + name = setting[meter]['name'] + if 'nodesName' in setting[meter]: + nodesName = setting[meter]['nodesName'] # type e.g sdm if 'type' in setting[meter]: meter_type = str(setting[meter]['type']) @@ -679,6 +748,8 @@ def set(self, **kwargs): self._settings['meters'][meter] = { 'address':address, 'type':meter_type, + 'name':name, + 'nodesName':nodesName } continue else: @@ -686,5 +757,3 @@ def set(self, **kwargs): # include kwargs from parent super().set(**kwargs) - -