diff --git a/config/devices.toml b/config/devices.toml index 9e9a226..555b88d 100644 --- a/config/devices.toml +++ b/config/devices.toml @@ -8,6 +8,8 @@ protocol="TCP" # only support for TCP now littlewordendian=false #pollinterval=1 # Overrides global setting; device publishes at this interval #combinemeasurements=true # Overrides global setting; Combines all measurements of a device to reduce the number of created measurements in the cloud +#retries=3 # how often each modbus command is tried at timeout, error, ... +#timeout=3 # timeout for modbus responses [[device.registers]] diff --git a/config/modbus.toml b/config/modbus.toml index 98e64b0..d9493d8 100644 --- a/config/modbus.toml +++ b/config/modbus.toml @@ -2,6 +2,7 @@ pollinterval=2 loglevel="INFO" #combinemeasurements=true # if not set equals false; combines all measurements of a device to reduce the number of created measurements in the cloud +#skipontimeout=true # if not set equals false; skip the poll-model of a slave if it has a timeout on one read command [serial] port="/dev/ttyRS485" diff --git a/tedge_modbus/reader/reader.py b/tedge_modbus/reader/reader.py index d4b0253..0eda048 100644 --- a/tedge_modbus/reader/reader.py +++ b/tedge_modbus/reader/reader.py @@ -13,7 +13,8 @@ import tomli from paho.mqtt import client as mqtt_client from pymodbus.client import ModbusTcpClient, ModbusSerialClient -from pymodbus.exceptions import ConnectionException +from pymodbus.exceptions import ConnectionException, ModbusIOException +from pymodbus.constants import Defaults from watchdog.events import FileSystemEventHandler, DirModifiedEvent, FileModifiedEvent from watchdog.observers import Observer @@ -301,11 +302,15 @@ def get_modbus_client(self, device): stopbits=device["stopbits"], parity=device["parity"], bytesize=device["databits"], + timeout=device.get("timeout", Defaults.Timeout), + retries=device.get("retries", Defaults.Retries), ) if device["protocol"] == "TCP": return ModbusTcpClient( host=device["ip"], port=device["port"], + timeout=device.get("timeout", Defaults.Timeout), + retries=device.get("retries", Defaults.Retries), # TODO: Check if these parameters really supported by ModbusTcpClient? auto_open=True, auto_close=True, @@ -315,64 +320,109 @@ def get_modbus_client(self, device): "Expected protocol to be RTU or TCP. Got " + device["protocol"] + "." ) + def _read_block(self, device, ranges, read_fn, register_type): + """ + Read a group of Modbus ranges with unified error handling. + + Returns: + dict: mapping of register/bit index to value + """ + error_messages = { + "holding": "Failed to read holding registers: %s", + "input": "Failed to read input registers: %s", + "coils": "Failed to read coils: %s", + "discrete_input": "Failed to read discrete inputs: %s", + } + value_attributes = { + "holding": "registers", + "input": "registers", + "coils": "bits", + "discrete_input": "bits", + } + results = {} + + for value_range in ranges: + result = read_fn( + address=value_range[0], + count=value_range[-1] - value_range[0] + 1, + slave=device["address"], + ) + + if result.isError(): + if isinstance(result, ModbusIOException) and self.base_config[ + "modbus" + ].get("skipontimeout", False): + self.logger.debug(error_messages[register_type], result) + self.logger.debug("Skip device polling.") + raise result + + self.logger.error(error_messages[register_type], result) + continue + + results.update( + dict(zip(value_range, getattr(result, value_attributes[register_type]))) + ) + + return results + def get_data_from_device(self, device, poll_model): """Get Modbus information from the device""" - # pylint: disable=too-many-locals client = self.get_modbus_client(device) holding_register, input_registers, coils, discrete_input = poll_model - hr_results = {} - ir_result = {} + + error = None coil_results = {} di_result = {} - error = None + hr_results = {} + ir_result = {} + try: - for hr_range in holding_register: - result = client.read_holding_registers( - address=hr_range[0], - count=hr_range[-1] - hr_range[0] + 1, - slave=device["address"], - ) - if result.isError(): - self.logger.error("Failed to read holding register: %s", result) - continue - hr_results.update(dict(zip(hr_range, result.registers))) - for ir_range in input_registers: - result = client.read_input_registers( - address=ir_range[0], - count=ir_range[-1] - ir_range[0] + 1, - slave=device["address"], - ) - if result.isError(): - self.logger.error("Failed to read input registers: %s", result) - continue - ir_result.update(dict(zip(ir_range, result.registers))) - for coil_range in coils: - result = client.read_coils( - address=coil_range[0], - count=coil_range[-1] - coil_range[0] + 1, - slave=device["address"], - ) - if result.isError(): - self.logger.error("Failed to read coils: %s", result) - continue - coil_results.update(dict(zip(coil_range, result.bits))) - for di_range in discrete_input: - result = client.read_discrete_inputs( - address=di_range[0], - count=di_range[-1] - di_range[0] + 1, - slave=device["address"], - ) - if result.isError(): - self.logger.error("Failed to read discrete input: %s", result) - continue - di_result.update(dict(zip(di_range, result.bits))) + hr_results = self._read_block( + device, + holding_register, + client.read_holding_registers, + "holding", + ) + + ir_result = self._read_block( + device, + input_registers, + client.read_input_registers, + "input", + ) + + coil_results = self._read_block( + device, + coils, + client.read_coils, + "coils", + ) + + di_result = self._read_block( + device, + discrete_input, + client.read_discrete_inputs, + "discrete_input", + ) + except ConnectionException as e: error = e self.logger.error("Failed to connect to device: %s: %s", device["name"], e) + + except ModbusIOException as e: + error = e + if self.base_config["modbus"].get("skipontimeout", False): + self.logger.info("Skip polling %s: %s", device["name"], e) + else: + self.logger.error("Failed to read: %s", e) + except Exception as e: error = e self.logger.error("Failed to read: %s", e) - client.close() + + finally: + client.close() + return coil_results, di_result, hr_results, ir_result, error def read_base_definition(self, base_path): diff --git a/tests/unit/test_reader.py b/tests/unit/test_reader.py index c00f496..1b435be 100644 --- a/tests/unit/test_reader.py +++ b/tests/unit/test_reader.py @@ -3,6 +3,9 @@ parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")) sys.path.insert(0, parent_dir) +from pymodbus.exceptions import ModbusIOException +from pymodbus.bit_read_message import ReadCoilsResponse +from pymodbus.register_read_message import ReadInputRegistersResponse import unittest from unittest.mock import patch, MagicMock from tedge_modbus.reader.reader import ModbusPoll @@ -82,6 +85,132 @@ def test_uses_global_poll_interval_as_fallback(self): call_args, _ = self.poll.poll_scheduler.enter.call_args self.assertEqual(call_args[0], 5) + def test_skip_on_timeout(self): + """ + GIVEN skipontimeout is True + WHEN get_data_from_device is called + AND the device times out on the first request + THEN polling is skipped immediately + """ + # GIVEN + self.poll.base_config = { + "modbus": { + "pollinterval": 5, + "skipontimeout": True, + } + } + + device_config = { + "name": "normal_poller", + "address": 1, + } + + poll_model = ( + [[1, 2]], # holding registers + [[1, 2]], + [[1, 2]], + [[1, 2]], + ) + + mock_client = MagicMock() + + # Simulate Modbus timeout + mock_client.read_holding_registers.return_value = ModbusIOException( + "Modbus Error: [Input/Output] No Response received from the remote unit/Unable to decode response" + ) + + with patch.object( + self.poll, + "get_modbus_client", + return_value=mock_client, + ): + # WHEN + coils, di, hr, ir, error = self.poll.get_data_from_device( + device_config, poll_model + ) + + # THEN + assert isinstance(error, ModbusIOException) + assert coils == {} + assert di == {} + assert hr == {} + assert ir == {} + + # Ensure we exited early + mock_client.read_holding_registers.assert_called_once() + mock_client.read_input_registers.assert_not_called() + mock_client.read_coils.assert_not_called() + mock_client.read_discrete_inputs.assert_not_called() + + mock_client.close.assert_called_once() + + def test_wo_skip_on_timeout(self): + """ + GIVEN skipontimeout is False + WHEN get_data_from_device is called + AND the device times out on the first request + THEN polling continues without errors + """ + # GIVEN + self.poll.base_config = { + "modbus": { + "pollinterval": 5, + } + } + + device_config = { + "name": "normal_poller", + "address": 1, + } + + poll_model = ( + [[1, 2]], # holding registers + [[1, 2]], + [[1, 2]], + [[1, 2]], + ) + + mock_client = MagicMock() + + # Simulate Modbus timeout + mock_client.read_holding_registers.return_value = ModbusIOException( + "Modbus Error: [Input/Output] No Response received from the remote unit/Unable to decode response" + ) + mock_client.read_input_registers.return_value = ReadInputRegistersResponse( + [100, 523] + ) # Simulate a valid response for input registers + mock_client.read_coils.return_value = ReadCoilsResponse( + [True, False] + ) # Simulate a valid response for coils + mock_client.read_discrete_inputs.return_value = ModbusIOException( + "Modbus Error: [Input/Output] No Response received from the remote unit/Unable to decode response" + ) + + with patch.object( + self.poll, + "get_modbus_client", + return_value=mock_client, + ): + # WHEN + coils, di, hr, ir, error = self.poll.get_data_from_device( + device_config, poll_model + ) + + # THEN + assert error is None + assert coils == {1: True, 2: False} + assert di == {} + assert hr == {} + assert ir == {1: 100, 2: 523} + + # Ensure we exited early + mock_client.read_holding_registers.assert_called_once() + mock_client.read_input_registers.assert_called_once() + mock_client.read_coils.assert_called_once() + mock_client.read_discrete_inputs.assert_called_once() + + mock_client.close.assert_called_once() + def test_defaults_to_no_measurement_combination(self): """ GIVEN no global measurement combination