Skip to content

Commit b3fb9f7

Browse files
rickwierengaclaude
andcommitted
Add pumping capability and migrate vendor backends
- PumpingCapability with calibration, chatterbox, tests - AgrowDriver (extends Driver), AgrowDosePumpArray device - Cole Parmer Masterflex backend - PumpBackend extends CapabilityBackend Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent a051237 commit b3fb9f7

15 files changed

Lines changed: 845 additions & 272 deletions

File tree

pylabrobot/agrowpumps/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .agrowdosepump_backend import AgrowChannelBackend, AgrowDosePumpArray, AgrowDriver
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
import asyncio
2+
import logging
3+
import threading
4+
import time
5+
from typing import Dict, List, Optional, Union
6+
7+
try:
8+
from pymodbus.client import AsyncModbusSerialClient # type: ignore
9+
10+
_MODBUS_IMPORT_ERROR = None
11+
except ImportError as e:
12+
AsyncModbusSerialClient = None # type: ignore
13+
_MODBUS_IMPORT_ERROR = e
14+
15+
from pylabrobot.capabilities.capability import Capability
16+
from pylabrobot.capabilities.pumping.backend import PumpBackend
17+
from pylabrobot.capabilities.pumping.calibration import PumpCalibration
18+
from pylabrobot.capabilities.pumping.pumping import PumpingCapability
19+
from pylabrobot.device import Device, Driver
20+
21+
logger = logging.getLogger("pylabrobot")
22+
23+
24+
class AgrowDriver(Driver):
25+
"""Modbus driver for Agrow dose pump arrays."""
26+
27+
def __init__(self, port: str, address: Union[int, str]):
28+
super().__init__()
29+
if _MODBUS_IMPORT_ERROR is not None:
30+
raise RuntimeError(
31+
"pymodbus is not installed. Install with: pip install pylabrobot[modbus]. "
32+
f"Import error: {_MODBUS_IMPORT_ERROR}"
33+
)
34+
if not isinstance(port, str):
35+
raise ValueError("Port must be a string")
36+
self.port = port
37+
if address not in range(0, 256):
38+
raise ValueError("Pump address out of range")
39+
self.address = int(address)
40+
self._keep_alive_thread: Optional[threading.Thread] = None
41+
self._pump_index_to_address: Optional[Dict[int, int]] = None
42+
self._modbus: Optional["AsyncModbusSerialClient"] = None
43+
self._num_channels: Optional[int] = None
44+
self._keep_alive_thread_active = False
45+
46+
@property
47+
def modbus(self) -> "AsyncModbusSerialClient":
48+
if self._modbus is None:
49+
raise RuntimeError("Modbus connection not established")
50+
return self._modbus
51+
52+
@property
53+
def pump_index_to_address(self) -> Dict[int, int]:
54+
if self._pump_index_to_address is None:
55+
raise RuntimeError("Pump mappings not established")
56+
return self._pump_index_to_address
57+
58+
@property
59+
def num_channels(self) -> int:
60+
if self._num_channels is None:
61+
raise RuntimeError("Number of channels not established")
62+
return self._num_channels
63+
64+
def _start_keep_alive_thread(self):
65+
async def keep_alive():
66+
i = 0
67+
while self._keep_alive_thread_active:
68+
time.sleep(0.1)
69+
i += 1
70+
if i == 250:
71+
await self.modbus.read_holding_registers(0, 1, unit=self.address)
72+
i = 0
73+
74+
def manage_async_keep_alive():
75+
try:
76+
loop = asyncio.new_event_loop()
77+
asyncio.set_event_loop(loop)
78+
loop.run_until_complete(keep_alive())
79+
loop.close()
80+
except Exception as e:
81+
logger.error("Error in keep alive thread: %s", e)
82+
83+
self._keep_alive_thread_active = True
84+
self._keep_alive_thread = threading.Thread(target=manage_async_keep_alive, daemon=True)
85+
self._keep_alive_thread.start()
86+
87+
async def setup(self):
88+
await self._setup_modbus()
89+
register_return = await self.modbus.read_holding_registers(19, 2, unit=self.address)
90+
self._num_channels = int(
91+
"".join(chr(r // 256) + chr(r % 256) for r in register_return.registers)[2]
92+
)
93+
self._start_keep_alive_thread()
94+
self._pump_index_to_address = {pump: pump + 100 for pump in range(0, self.num_channels)}
95+
96+
async def _setup_modbus(self):
97+
if AsyncModbusSerialClient is None:
98+
raise RuntimeError(
99+
"pymodbus is not installed. Install with: pip install pylabrobot[modbus]."
100+
f" Import error: {_MODBUS_IMPORT_ERROR}"
101+
)
102+
self._modbus = AsyncModbusSerialClient(
103+
port=self.port,
104+
baudrate=115200,
105+
timeout=1,
106+
stopbits=1,
107+
bytesize=8,
108+
parity="E",
109+
retry_on_empty=True,
110+
)
111+
await self.modbus.connect()
112+
if not self.modbus.connected:
113+
raise ConnectionError("Modbus connection failed during pump setup")
114+
115+
async def stop(self):
116+
for pump in self.pump_index_to_address:
117+
await self.write_speed(pump, 0)
118+
if self._keep_alive_thread is not None:
119+
self._keep_alive_thread_active = False
120+
self._keep_alive_thread.join()
121+
self.modbus.close()
122+
assert not self.modbus.connected, "Modbus failing to disconnect"
123+
124+
async def write_speed(self, channel: int, speed: int):
125+
if speed not in range(101):
126+
raise ValueError("Pump speed out of range. Value should be between 0 and 100.")
127+
await self.modbus.write_register(
128+
self.pump_index_to_address[channel],
129+
speed,
130+
unit=self.address,
131+
)
132+
133+
134+
class AgrowChannelBackend(PumpBackend):
135+
"""Per-channel PumpBackend adapter that delegates to a shared AgrowDriver."""
136+
137+
def __init__(self, connection: AgrowDriver, channel: int):
138+
self._driver = connection
139+
self._channel = channel
140+
141+
async def setup(self):
142+
pass # lifecycle managed by the device via AgrowDriver
143+
144+
async def stop(self):
145+
pass # lifecycle managed by the device via AgrowDriver
146+
147+
async def run_revolutions(self, num_revolutions: float):
148+
raise NotImplementedError(
149+
"Revolution based pumping commands are not available for Agrow pumps."
150+
)
151+
152+
async def run_continuously(self, speed: float):
153+
await self._driver.write_speed(self._channel, int(speed))
154+
155+
async def halt(self):
156+
await self._driver.write_speed(self._channel, 0)
157+
158+
def serialize(self):
159+
return {
160+
"port": self._driver.port,
161+
"address": self._driver.address,
162+
"channel": self._channel,
163+
}
164+
165+
166+
class AgrowDosePumpArray(Device):
167+
"""Agrow dose pump array device.
168+
169+
Exposes each channel as an individual PumpingCapability via `self.pumps`.
170+
"""
171+
172+
def __init__(
173+
self,
174+
port: str,
175+
address: Union[int, str],
176+
calibrations: Optional[List[Optional[PumpCalibration]]] = None,
177+
):
178+
self._channel_backends: List[AgrowChannelBackend] = []
179+
self.pumps: List[PumpingCapability] = []
180+
self._calibrations = calibrations
181+
super().__init__(driver=AgrowDriver(port=port, address=address))
182+
self._driver: AgrowDriver
183+
184+
async def setup(self):
185+
await self._driver.setup()
186+
num_channels = self._driver.num_channels
187+
188+
self._channel_backends = [AgrowChannelBackend(self._driver, ch) for ch in range(num_channels)]
189+
self.pumps = []
190+
for i, backend in enumerate(self._channel_backends):
191+
cal = None
192+
if self._calibrations is not None and i < len(self._calibrations):
193+
cal = self._calibrations[i]
194+
cap = PumpingCapability(backend=backend, calibration=cal)
195+
self.pumps.append(cap)
196+
197+
self._capabilities: List[Capability] = list(self.pumps)
198+
for c in self._capabilities:
199+
await c._on_setup()
200+
self._setup_finished = True
201+
202+
async def stop(self):
203+
for cap in reversed(self._capabilities):
204+
await cap._on_stop()
205+
await self._driver.stop()
206+
self._setup_finished = False
207+
208+
def serialize(self):
209+
return {
210+
"port": self._driver.port,
211+
"address": self._driver.address,
212+
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# mypy: disable-error-code="attr-defined,assignment"
2+
import unittest
3+
from unittest.mock import AsyncMock, patch
4+
5+
import pytest
6+
7+
pytest.importorskip("pymodbus")
8+
9+
from pylabrobot.agrowpumps import AgrowDosePumpArray
10+
11+
12+
class SimulatedModbusClient:
13+
"""Duck-typed modbus client for testing."""
14+
15+
def __init__(self):
16+
self._connected = False
17+
self.write_register = AsyncMock()
18+
19+
async def connect(self):
20+
self._connected = True
21+
22+
@property
23+
def connected(self):
24+
return self._connected
25+
26+
async def read_holding_registers(self, address: int, count: int, **kwargs):
27+
if "unit" not in kwargs:
28+
raise ValueError("unit must be specified")
29+
if address == 19:
30+
result = AsyncMock()
31+
result.registers = [16708, 13824, 0, 0, 0, 0, 0][:count]
32+
return result
33+
34+
def close(self, reconnect=False):
35+
self._connected = False
36+
37+
38+
class TestAgrowPumps(unittest.IsolatedAsyncioTestCase):
39+
async def asyncSetUp(self):
40+
self.device = AgrowDosePumpArray(port="simulated", address=1)
41+
42+
async def _mock_setup_modbus():
43+
self.device._driver._modbus = SimulatedModbusClient()
44+
45+
with patch.object(self.device._driver, "_setup_modbus", _mock_setup_modbus):
46+
await self.device.setup()
47+
48+
async def asyncTearDown(self):
49+
await self.device.stop()
50+
51+
async def test_setup(self):
52+
self.assertEqual(self.device._driver.port, "simulated")
53+
self.assertEqual(self.device._driver.address, 1)
54+
self.assertEqual(len(self.device.pumps), 6)
55+
self.assertEqual(
56+
self.device._driver._pump_index_to_address,
57+
{pump: pump + 100 for pump in range(0, 6)},
58+
)
59+
60+
async def test_run_continuously(self):
61+
self.device._driver.modbus.write_register.reset_mock()
62+
await self.device.pumps[0].run_continuously(speed=1)
63+
self.device._driver.modbus.write_register.assert_called_once_with(100, 1, unit=1)
64+
65+
# invalid speed: cannot be bigger than 100
66+
with self.assertRaises(ValueError):
67+
await self.device.pumps[0].run_continuously(speed=101)
68+
69+
async def test_run_revolutions(self):
70+
with self.assertRaises(NotImplementedError):
71+
await self.device.pumps[0].run_revolutions(num_revolutions=1.0)
72+
73+
async def test_halt_single_channel(self):
74+
self.device._driver.modbus.write_register.reset_mock()
75+
await self.device.pumps[2].halt()
76+
self.device._driver.modbus.write_register.assert_called_once_with(102, 0, unit=1)
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from .backend import PumpBackend
2+
from .calibration import PumpCalibration
3+
from .chatterbox import PumpChatterboxBackend
4+
from .errors import NotCalibratedError
5+
from .pumping import PumpingCapability
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from abc import ABCMeta, abstractmethod
2+
3+
from pylabrobot.capabilities.capability import CapabilityBackend
4+
5+
6+
class PumpBackend(CapabilityBackend, metaclass=ABCMeta):
7+
"""Abstract backend for a single pump."""
8+
9+
@abstractmethod
10+
async def run_revolutions(self, num_revolutions: float):
11+
"""Run for a given number of revolutions."""
12+
13+
@abstractmethod
14+
async def run_continuously(self, speed: float):
15+
"""Run continuously at a given speed. If speed is 0, halt."""
16+
17+
@abstractmethod
18+
async def halt(self):
19+
"""Halt the pump."""

0 commit comments

Comments
 (0)