Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/devices.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ protocol="TCP" # only support for TCP now
littlewordendian=false
#pollinterval=1 # Overrides global setting; device publishes at this interval
#combinemeasurements=true # Overrides global setting; Combines all measurements of a device to reduce the number of created measurements in the cloud
#retries=3 # how often each modbus command is tried at timeout, error, ...
#timeout=3 # timeout for modbus responses


[[device.registers]]
Expand Down
1 change: 1 addition & 0 deletions config/modbus.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
pollinterval=2
loglevel="INFO"
#combinemeasurements=true # if not set equals false; combines all measurements of a device to reduce the number of created measurements in the cloud
#skipontimeout=true # if not set equals false; skip the poll-model of a slave if it has a timeout on one read command

[serial]
port="/dev/ttyRS485"
Expand Down
142 changes: 96 additions & 46 deletions tedge_modbus/reader/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
import tomli
from paho.mqtt import client as mqtt_client
from pymodbus.client import ModbusTcpClient, ModbusSerialClient
from pymodbus.exceptions import ConnectionException
from pymodbus.exceptions import ConnectionException, ModbusIOException
from pymodbus.constants import Defaults
from watchdog.events import FileSystemEventHandler, DirModifiedEvent, FileModifiedEvent
from watchdog.observers import Observer

Expand Down Expand Up @@ -301,11 +302,15 @@ def get_modbus_client(self, device):
stopbits=device["stopbits"],
parity=device["parity"],
bytesize=device["databits"],
timeout=device.get("timeout", Defaults.Timeout),
retries=device.get("retries", Defaults.Retries),
)
if device["protocol"] == "TCP":
return ModbusTcpClient(
host=device["ip"],
port=device["port"],
timeout=device.get("timeout", Defaults.Timeout),
retries=device.get("retries", Defaults.Retries),
# TODO: Check if these parameters really supported by ModbusTcpClient?
auto_open=True,
auto_close=True,
Expand All @@ -315,64 +320,109 @@ def get_modbus_client(self, device):
"Expected protocol to be RTU or TCP. Got " + device["protocol"] + "."
)

def _read_block(self, device, ranges, read_fn, register_type):
"""
Read a group of Modbus ranges with unified error handling.

Returns:
dict: mapping of register/bit index to value
"""
error_messages = {
"holding": "Failed to read holding registers: %s",
"input": "Failed to read input registers: %s",
"coils": "Failed to read coils: %s",
"discrete_input": "Failed to read discrete inputs: %s",
}
value_attributes = {
"holding": "registers",
"input": "registers",
"coils": "bits",
"discrete_input": "bits",
}
results = {}

for value_range in ranges:
result = read_fn(
address=value_range[0],
count=value_range[-1] - value_range[0] + 1,
slave=device["address"],
)

if result.isError():
if isinstance(result, ModbusIOException) and self.base_config[
"modbus"
].get("skipontimeout", False):
self.logger.debug(error_messages[register_type], result)
self.logger.debug("Skip device polling.")
raise result

self.logger.error(error_messages[register_type], result)
continue

results.update(
dict(zip(value_range, getattr(result, value_attributes[register_type])))
)

return results

def get_data_from_device(self, device, poll_model):
"""Get Modbus information from the device"""
# pylint: disable=too-many-locals
client = self.get_modbus_client(device)
holding_register, input_registers, coils, discrete_input = poll_model
hr_results = {}
ir_result = {}

error = None
coil_results = {}
di_result = {}
error = None
hr_results = {}
ir_result = {}

try:
for hr_range in holding_register:
result = client.read_holding_registers(
address=hr_range[0],
count=hr_range[-1] - hr_range[0] + 1,
slave=device["address"],
)
if result.isError():
self.logger.error("Failed to read holding register: %s", result)
continue
hr_results.update(dict(zip(hr_range, result.registers)))
for ir_range in input_registers:
result = client.read_input_registers(
address=ir_range[0],
count=ir_range[-1] - ir_range[0] + 1,
slave=device["address"],
)
if result.isError():
self.logger.error("Failed to read input registers: %s", result)
continue
ir_result.update(dict(zip(ir_range, result.registers)))
for coil_range in coils:
result = client.read_coils(
address=coil_range[0],
count=coil_range[-1] - coil_range[0] + 1,
slave=device["address"],
)
if result.isError():
self.logger.error("Failed to read coils: %s", result)
continue
coil_results.update(dict(zip(coil_range, result.bits)))
for di_range in discrete_input:
result = client.read_discrete_inputs(
address=di_range[0],
count=di_range[-1] - di_range[0] + 1,
slave=device["address"],
)
if result.isError():
self.logger.error("Failed to read discrete input: %s", result)
continue
di_result.update(dict(zip(di_range, result.bits)))
hr_results = self._read_block(
device,
holding_register,
client.read_holding_registers,
"holding",
)

ir_result = self._read_block(
device,
input_registers,
client.read_input_registers,
"input",
)

coil_results = self._read_block(
device,
coils,
client.read_coils,
"coils",
)

di_result = self._read_block(
device,
discrete_input,
client.read_discrete_inputs,
"discrete_input",
)

except ConnectionException as e:
error = e
self.logger.error("Failed to connect to device: %s: %s", device["name"], e)

except ModbusIOException as e:
error = e
if self.base_config["modbus"].get("skipontimeout", False):
self.logger.info("Skip polling %s: %s", device["name"], e)
else:
self.logger.error("Failed to read: %s", e)

except Exception as e:
error = e
self.logger.error("Failed to read: %s", e)
client.close()

finally:
client.close()

return coil_results, di_result, hr_results, ir_result, error

def read_base_definition(self, base_path):
Expand Down
129 changes: 129 additions & 0 deletions tests/unit/test_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

parent_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../.."))
sys.path.insert(0, parent_dir)
from pymodbus.exceptions import ModbusIOException
from pymodbus.bit_read_message import ReadCoilsResponse
from pymodbus.register_read_message import ReadInputRegistersResponse
import unittest
from unittest.mock import patch, MagicMock
from tedge_modbus.reader.reader import ModbusPoll
Expand Down Expand Up @@ -82,6 +85,132 @@ def test_uses_global_poll_interval_as_fallback(self):
call_args, _ = self.poll.poll_scheduler.enter.call_args
self.assertEqual(call_args[0], 5)

def test_skip_on_timeout(self):
"""
GIVEN skipontimeout is True
WHEN get_data_from_device is called
AND the device times out on the first request
THEN polling is skipped immediately
"""
# GIVEN
self.poll.base_config = {
"modbus": {
"pollinterval": 5,
"skipontimeout": True,
}
}

device_config = {
"name": "normal_poller",
"address": 1,
}

poll_model = (
[[1, 2]], # holding registers
[[1, 2]],
[[1, 2]],
[[1, 2]],
)

mock_client = MagicMock()

# Simulate Modbus timeout
mock_client.read_holding_registers.return_value = ModbusIOException(
"Modbus Error: [Input/Output] No Response received from the remote unit/Unable to decode response"
)

with patch.object(
self.poll,
"get_modbus_client",
return_value=mock_client,
):
# WHEN
coils, di, hr, ir, error = self.poll.get_data_from_device(
device_config, poll_model
)

# THEN
assert isinstance(error, ModbusIOException)
assert coils == {}
assert di == {}
assert hr == {}
assert ir == {}

# Ensure we exited early
mock_client.read_holding_registers.assert_called_once()
mock_client.read_input_registers.assert_not_called()
mock_client.read_coils.assert_not_called()
mock_client.read_discrete_inputs.assert_not_called()

mock_client.close.assert_called_once()

def test_wo_skip_on_timeout(self):
"""
GIVEN skipontimeout is False
WHEN get_data_from_device is called
AND the device times out on the first request
THEN polling continues without errors
"""
# GIVEN
self.poll.base_config = {
"modbus": {
"pollinterval": 5,
}
}

device_config = {
"name": "normal_poller",
"address": 1,
}

poll_model = (
[[1, 2]], # holding registers
[[1, 2]],
[[1, 2]],
[[1, 2]],
)

mock_client = MagicMock()

# Simulate Modbus timeout
mock_client.read_holding_registers.return_value = ModbusIOException(
"Modbus Error: [Input/Output] No Response received from the remote unit/Unable to decode response"
)
mock_client.read_input_registers.return_value = ReadInputRegistersResponse(
[100, 523]
) # Simulate a valid response for input registers
mock_client.read_coils.return_value = ReadCoilsResponse(
[True, False]
) # Simulate a valid response for coils
mock_client.read_discrete_inputs.return_value = ModbusIOException(
"Modbus Error: [Input/Output] No Response received from the remote unit/Unable to decode response"
)

with patch.object(
self.poll,
"get_modbus_client",
return_value=mock_client,
):
# WHEN
coils, di, hr, ir, error = self.poll.get_data_from_device(
device_config, poll_model
)

# THEN
assert error is None
assert coils == {1: True, 2: False}
assert di == {}
assert hr == {}
assert ir == {1: 100, 2: 523}

# Ensure we exited early
mock_client.read_holding_registers.assert_called_once()
mock_client.read_input_registers.assert_called_once()
mock_client.read_coils.assert_called_once()
mock_client.read_discrete_inputs.assert_called_once()

mock_client.close.assert_called_once()

def test_defaults_to_no_measurement_combination(self):
"""
GIVEN no global measurement combination
Expand Down
Loading