From b3fb9f7a0d7b64173b44ba665aead06591a4d24b Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Mon, 23 Mar 2026 15:57:10 -0700 Subject: [PATCH 1/7] Add pumping capability and migrate vendor backends - PumpingCapability with calibration, chatterbox, tests - AgrowDriver (extends Driver), AgrowDosePumpArray device - Cole Parmer Masterflex backend - PumpBackend extends CapabilityBackend Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/agrowpumps/__init__.py | 1 + .../agrowpumps/agrowdosepump_backend.py | 212 ++++++++++++++++++ pylabrobot/agrowpumps/agrowdosepump_tests.py | 76 +++++++ pylabrobot/capabilities/pumping/__init__.py | 5 + pylabrobot/capabilities/pumping/backend.py | 19 ++ .../capabilities/pumping/calibration.py | 176 +++++++++++++++ pylabrobot/capabilities/pumping/chatterbox.py | 20 ++ pylabrobot/capabilities/pumping/errors.py | 2 + pylabrobot/capabilities/pumping/pumping.py | 85 +++++++ .../capabilities/pumping/pumping_tests.py | 78 +++++++ pylabrobot/cole_parmer/__init__.py | 1 + pylabrobot/cole_parmer/masterflex_backend.py | 100 +++++++++ .../pumps/agrowpumps/agrowdosepump_backend.py | 207 +++-------------- .../pumps/agrowpumps/agrowdosepump_tests.py | 50 ++--- .../pumps/cole_parmer/masterflex_backend.py | 85 ++----- 15 files changed, 845 insertions(+), 272 deletions(-) create mode 100644 pylabrobot/agrowpumps/__init__.py create mode 100644 pylabrobot/agrowpumps/agrowdosepump_backend.py create mode 100644 pylabrobot/agrowpumps/agrowdosepump_tests.py create mode 100644 pylabrobot/capabilities/pumping/__init__.py create mode 100644 pylabrobot/capabilities/pumping/backend.py create mode 100644 pylabrobot/capabilities/pumping/calibration.py create mode 100644 pylabrobot/capabilities/pumping/chatterbox.py create mode 100644 pylabrobot/capabilities/pumping/errors.py create mode 100644 pylabrobot/capabilities/pumping/pumping.py create mode 100644 pylabrobot/capabilities/pumping/pumping_tests.py create mode 100644 pylabrobot/cole_parmer/__init__.py create mode 100644 pylabrobot/cole_parmer/masterflex_backend.py diff --git a/pylabrobot/agrowpumps/__init__.py b/pylabrobot/agrowpumps/__init__.py new file mode 100644 index 00000000000..122f9c721d3 --- /dev/null +++ b/pylabrobot/agrowpumps/__init__.py @@ -0,0 +1 @@ +from .agrowdosepump_backend import AgrowChannelBackend, AgrowDosePumpArray, AgrowDriver diff --git a/pylabrobot/agrowpumps/agrowdosepump_backend.py b/pylabrobot/agrowpumps/agrowdosepump_backend.py new file mode 100644 index 00000000000..c247a17f553 --- /dev/null +++ b/pylabrobot/agrowpumps/agrowdosepump_backend.py @@ -0,0 +1,212 @@ +import asyncio +import logging +import threading +import time +from typing import Dict, List, Optional, Union + +try: + from pymodbus.client import AsyncModbusSerialClient # type: ignore + + _MODBUS_IMPORT_ERROR = None +except ImportError as e: + AsyncModbusSerialClient = None # type: ignore + _MODBUS_IMPORT_ERROR = e + +from pylabrobot.capabilities.capability import Capability +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability +from pylabrobot.device import Device, Driver + +logger = logging.getLogger("pylabrobot") + + +class AgrowDriver(Driver): + """Modbus driver for Agrow dose pump arrays.""" + + def __init__(self, port: str, address: Union[int, str]): + super().__init__() + if _MODBUS_IMPORT_ERROR is not None: + raise RuntimeError( + "pymodbus is not installed. Install with: pip install pylabrobot[modbus]. " + f"Import error: {_MODBUS_IMPORT_ERROR}" + ) + if not isinstance(port, str): + raise ValueError("Port must be a string") + self.port = port + if address not in range(0, 256): + raise ValueError("Pump address out of range") + self.address = int(address) + self._keep_alive_thread: Optional[threading.Thread] = None + self._pump_index_to_address: Optional[Dict[int, int]] = None + self._modbus: Optional["AsyncModbusSerialClient"] = None + self._num_channels: Optional[int] = None + self._keep_alive_thread_active = False + + @property + def modbus(self) -> "AsyncModbusSerialClient": + if self._modbus is None: + raise RuntimeError("Modbus connection not established") + return self._modbus + + @property + def pump_index_to_address(self) -> Dict[int, int]: + if self._pump_index_to_address is None: + raise RuntimeError("Pump mappings not established") + return self._pump_index_to_address + + @property + def num_channels(self) -> int: + if self._num_channels is None: + raise RuntimeError("Number of channels not established") + return self._num_channels + + def _start_keep_alive_thread(self): + async def keep_alive(): + i = 0 + while self._keep_alive_thread_active: + time.sleep(0.1) + i += 1 + if i == 250: + await self.modbus.read_holding_registers(0, 1, unit=self.address) + i = 0 + + def manage_async_keep_alive(): + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(keep_alive()) + loop.close() + except Exception as e: + logger.error("Error in keep alive thread: %s", e) + + self._keep_alive_thread_active = True + self._keep_alive_thread = threading.Thread(target=manage_async_keep_alive, daemon=True) + self._keep_alive_thread.start() + + async def setup(self): + await self._setup_modbus() + register_return = await self.modbus.read_holding_registers(19, 2, unit=self.address) + self._num_channels = int( + "".join(chr(r // 256) + chr(r % 256) for r in register_return.registers)[2] + ) + self._start_keep_alive_thread() + self._pump_index_to_address = {pump: pump + 100 for pump in range(0, self.num_channels)} + + async def _setup_modbus(self): + if AsyncModbusSerialClient is None: + raise RuntimeError( + "pymodbus is not installed. Install with: pip install pylabrobot[modbus]." + f" Import error: {_MODBUS_IMPORT_ERROR}" + ) + self._modbus = AsyncModbusSerialClient( + port=self.port, + baudrate=115200, + timeout=1, + stopbits=1, + bytesize=8, + parity="E", + retry_on_empty=True, + ) + await self.modbus.connect() + if not self.modbus.connected: + raise ConnectionError("Modbus connection failed during pump setup") + + async def stop(self): + for pump in self.pump_index_to_address: + await self.write_speed(pump, 0) + if self._keep_alive_thread is not None: + self._keep_alive_thread_active = False + self._keep_alive_thread.join() + self.modbus.close() + assert not self.modbus.connected, "Modbus failing to disconnect" + + async def write_speed(self, channel: int, speed: int): + if speed not in range(101): + raise ValueError("Pump speed out of range. Value should be between 0 and 100.") + await self.modbus.write_register( + self.pump_index_to_address[channel], + speed, + unit=self.address, + ) + + +class AgrowChannelBackend(PumpBackend): + """Per-channel PumpBackend adapter that delegates to a shared AgrowDriver.""" + + def __init__(self, connection: AgrowDriver, channel: int): + self._driver = connection + self._channel = channel + + async def setup(self): + pass # lifecycle managed by the device via AgrowDriver + + async def stop(self): + pass # lifecycle managed by the device via AgrowDriver + + async def run_revolutions(self, num_revolutions: float): + raise NotImplementedError( + "Revolution based pumping commands are not available for Agrow pumps." + ) + + async def run_continuously(self, speed: float): + await self._driver.write_speed(self._channel, int(speed)) + + async def halt(self): + await self._driver.write_speed(self._channel, 0) + + def serialize(self): + return { + "port": self._driver.port, + "address": self._driver.address, + "channel": self._channel, + } + + +class AgrowDosePumpArray(Device): + """Agrow dose pump array device. + + Exposes each channel as an individual PumpingCapability via `self.pumps`. + """ + + def __init__( + self, + port: str, + address: Union[int, str], + calibrations: Optional[List[Optional[PumpCalibration]]] = None, + ): + self._channel_backends: List[AgrowChannelBackend] = [] + self.pumps: List[PumpingCapability] = [] + self._calibrations = calibrations + super().__init__(driver=AgrowDriver(port=port, address=address)) + self._driver: AgrowDriver + + async def setup(self): + await self._driver.setup() + num_channels = self._driver.num_channels + + self._channel_backends = [AgrowChannelBackend(self._driver, ch) for ch in range(num_channels)] + self.pumps = [] + for i, backend in enumerate(self._channel_backends): + cal = None + if self._calibrations is not None and i < len(self._calibrations): + cal = self._calibrations[i] + cap = PumpingCapability(backend=backend, calibration=cal) + self.pumps.append(cap) + + self._capabilities: List[Capability] = list(self.pumps) + for c in self._capabilities: + await c._on_setup() + self._setup_finished = True + + async def stop(self): + for cap in reversed(self._capabilities): + await cap._on_stop() + await self._driver.stop() + self._setup_finished = False + + def serialize(self): + return { + "port": self._driver.port, + "address": self._driver.address, + } diff --git a/pylabrobot/agrowpumps/agrowdosepump_tests.py b/pylabrobot/agrowpumps/agrowdosepump_tests.py new file mode 100644 index 00000000000..3503185d47d --- /dev/null +++ b/pylabrobot/agrowpumps/agrowdosepump_tests.py @@ -0,0 +1,76 @@ +# mypy: disable-error-code="attr-defined,assignment" +import unittest +from unittest.mock import AsyncMock, patch + +import pytest + +pytest.importorskip("pymodbus") + +from pylabrobot.agrowpumps import AgrowDosePumpArray + + +class SimulatedModbusClient: + """Duck-typed modbus client for testing.""" + + def __init__(self): + self._connected = False + self.write_register = AsyncMock() + + async def connect(self): + self._connected = True + + @property + def connected(self): + return self._connected + + async def read_holding_registers(self, address: int, count: int, **kwargs): + if "unit" not in kwargs: + raise ValueError("unit must be specified") + if address == 19: + result = AsyncMock() + result.registers = [16708, 13824, 0, 0, 0, 0, 0][:count] + return result + + def close(self, reconnect=False): + self._connected = False + + +class TestAgrowPumps(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.device = AgrowDosePumpArray(port="simulated", address=1) + + async def _mock_setup_modbus(): + self.device._driver._modbus = SimulatedModbusClient() + + with patch.object(self.device._driver, "_setup_modbus", _mock_setup_modbus): + await self.device.setup() + + async def asyncTearDown(self): + await self.device.stop() + + async def test_setup(self): + self.assertEqual(self.device._driver.port, "simulated") + self.assertEqual(self.device._driver.address, 1) + self.assertEqual(len(self.device.pumps), 6) + self.assertEqual( + self.device._driver._pump_index_to_address, + {pump: pump + 100 for pump in range(0, 6)}, + ) + + async def test_run_continuously(self): + self.device._driver.modbus.write_register.reset_mock() + await self.device.pumps[0].run_continuously(speed=1) + self.device._driver.modbus.write_register.assert_called_once_with(100, 1, unit=1) + + # invalid speed: cannot be bigger than 100 + with self.assertRaises(ValueError): + await self.device.pumps[0].run_continuously(speed=101) + + async def test_run_revolutions(self): + with self.assertRaises(NotImplementedError): + await self.device.pumps[0].run_revolutions(num_revolutions=1.0) + + async def test_halt_single_channel(self): + self.device._driver.modbus.write_register.reset_mock() + await self.device.pumps[2].halt() + self.device._driver.modbus.write_register.assert_called_once_with(102, 0, unit=1) diff --git a/pylabrobot/capabilities/pumping/__init__.py b/pylabrobot/capabilities/pumping/__init__.py new file mode 100644 index 00000000000..4e3c3a32ec0 --- /dev/null +++ b/pylabrobot/capabilities/pumping/__init__.py @@ -0,0 +1,5 @@ +from .backend import PumpBackend +from .calibration import PumpCalibration +from .chatterbox import PumpChatterboxBackend +from .errors import NotCalibratedError +from .pumping import PumpingCapability diff --git a/pylabrobot/capabilities/pumping/backend.py b/pylabrobot/capabilities/pumping/backend.py new file mode 100644 index 00000000000..467d22977ce --- /dev/null +++ b/pylabrobot/capabilities/pumping/backend.py @@ -0,0 +1,19 @@ +from abc import ABCMeta, abstractmethod + +from pylabrobot.capabilities.capability import CapabilityBackend + + +class PumpBackend(CapabilityBackend, metaclass=ABCMeta): + """Abstract backend for a single pump.""" + + @abstractmethod + async def run_revolutions(self, num_revolutions: float): + """Run for a given number of revolutions.""" + + @abstractmethod + async def run_continuously(self, speed: float): + """Run continuously at a given speed. If speed is 0, halt.""" + + @abstractmethod + async def halt(self): + """Halt the pump.""" diff --git a/pylabrobot/capabilities/pumping/calibration.py b/pylabrobot/capabilities/pumping/calibration.py new file mode 100644 index 00000000000..5ae1ba0e9b5 --- /dev/null +++ b/pylabrobot/capabilities/pumping/calibration.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import csv +import json +from typing import Dict, List, Literal, Optional, Union + +from pylabrobot.serializer import SerializableMixin + + +class PumpCalibration(SerializableMixin): + """Calibration for a single pump or pump array + + Attributes: + calibration: The calibration of the pump or pump array. + """ + + def __init__( + self, + calibration: List[Union[float, int]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ): + """Initialize a PumpCalibration object. + + Args: + calibration: calibration of the pump in pump-specific volume per time/revolution units. + calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for + volume per revolution. Defaults to "duration". + + Raises: + ValueError: if a value in the calibration is outside expected parameters. + """ + + if any(value <= 0 for value in calibration): + raise ValueError("A value in the calibration is is outside expected parameters.") + if calibration_mode not in ["duration", "revolutions"]: + raise ValueError("calibration_mode must be 'duration' or 'revolutions'") + self.calibration = calibration + self.calibration_mode = calibration_mode + + def __getitem__(self, item: int) -> Union[float, int]: + return self.calibration[item] # type: ignore + + def __len__(self) -> int: + """Return the length of the calibration.""" + return len(self.calibration) + + @classmethod + def load_calibration( + cls, + calibration: Optional[Union[dict, list, float, int, str]] = None, + num_items: Optional[int] = None, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a file, dictionary, list, or value. + + Args: + calibration: pump calibration file, dictionary, list, or value. + calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for + volume per revolution. Defaults to "duration". + num_items: number of items in the calibration. Required if calibration is a value. + + Raises: + NotImplementedError: if the calibration filetype or format is not supported. + ValueError: if num_items is not specified when calibration is a value. + """ + + if isinstance(calibration, dict): + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, list): + return PumpCalibration.load_from_list( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, (float, int)): + if num_items is None: + raise ValueError("num_items must be specified if calibration is a value.") + return PumpCalibration.load_from_value( + value=calibration, + num_items=num_items, + calibration_mode=calibration_mode, + ) + if isinstance(calibration, str): + if calibration.endswith(".json"): + return PumpCalibration.load_from_json( + file_path=calibration, calibration_mode=calibration_mode + ) + if calibration.endswith(".csv"): + return PumpCalibration.load_from_csv( + file_path=calibration, calibration_mode=calibration_mode + ) + raise NotImplementedError("Calibration filetype not supported.") + raise NotImplementedError("Calibration format not supported.") + + def serialize(self) -> dict: + return { + "calibration": self.calibration, + "calibration_mode": self.calibration_mode, + } + + @classmethod + def deserialize(cls, data: dict) -> PumpCalibration: + return cls( + calibration=data["calibration"], + calibration_mode=data["calibration_mode"], + ) + + @classmethod + def load_from_json( + cls, + file_path: str, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a json file.""" + + with open(file_path, "rb") as f: + calibration = json.load(f) + if isinstance(calibration, dict): + calibration = {int(key): float(value) for key, value in calibration.items()} + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, list): + return PumpCalibration(calibration=calibration, calibration_mode=calibration_mode) + raise TypeError(f"Calibration pulled from {file_path} is not a dictionary or list.") + + @classmethod + def load_from_csv( + cls, + file_path: str, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a csv file.""" + + with open(file_path, encoding="utf-8", newline="") as f: + csv_file = list(csv.reader(f)) + num_columns = len(csv_file[0]) + if num_columns != 2: + raise ValueError("CSV file must have two columns.") + calibration = {int(row[0]): float(row[1]) for row in csv_file} + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + + @classmethod + def load_from_dict( + cls, + calibration: Dict[int, Union[int, float]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a dictionary (0-indexed).""" + + if sorted(calibration.keys()) != list(range(len(calibration))): + raise ValueError("Keys must be a contiguous range of integers starting at 0.") + calibration_list = [calibration[key] for key in sorted(calibration.keys())] + return cls(calibration=calibration_list, calibration_mode=calibration_mode) + + @classmethod + def load_from_list( + cls, + calibration: List[Union[int, float]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a list.""" + return cls(calibration=calibration, calibration_mode=calibration_mode) + + @classmethod + def load_from_value( + cls, + value: Union[float, int], + num_items: int, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a single value applied to all channels.""" + calibration = [value] * num_items + return cls(calibration, calibration_mode) diff --git a/pylabrobot/capabilities/pumping/chatterbox.py b/pylabrobot/capabilities/pumping/chatterbox.py new file mode 100644 index 00000000000..70a25ebd57e --- /dev/null +++ b/pylabrobot/capabilities/pumping/chatterbox.py @@ -0,0 +1,20 @@ +from .backend import PumpBackend + + +class PumpChatterboxBackend(PumpBackend): + """Chatterbox backend for device-free testing.""" + + async def setup(self): + print("Setting up the pump.") + + async def stop(self): + print("Stopping the pump.") + + async def run_revolutions(self, num_revolutions: float): + print(f"Running {num_revolutions} revolutions.") + + async def run_continuously(self, speed: float): + print(f"Running continuously at speed {speed}.") + + async def halt(self): + print("Halting the pump.") diff --git a/pylabrobot/capabilities/pumping/errors.py b/pylabrobot/capabilities/pumping/errors.py new file mode 100644 index 00000000000..64f19783fca --- /dev/null +++ b/pylabrobot/capabilities/pumping/errors.py @@ -0,0 +1,2 @@ +class NotCalibratedError(Exception): + """Error raised when calling a method that requires the pump to be calibrated.""" diff --git a/pylabrobot/capabilities/pumping/pumping.py b/pylabrobot/capabilities/pumping/pumping.py new file mode 100644 index 00000000000..c956d1b4a9a --- /dev/null +++ b/pylabrobot/capabilities/pumping/pumping.py @@ -0,0 +1,85 @@ +import asyncio +from typing import Optional, Union + +from pylabrobot.capabilities.capability import Capability, need_capability_ready + +from .backend import PumpBackend +from .calibration import PumpCalibration + + +class PumpingCapability(Capability): + """Single-pump capability.""" + + def __init__( + self, + backend: PumpBackend, + calibration: Optional[PumpCalibration] = None, + ): + super().__init__(backend=backend) + self.backend: PumpBackend = backend + if calibration is not None and len(calibration) != 1: + raise ValueError("Calibration may only have a single item for this pump") + self.calibration = calibration + + @need_capability_ready + async def run_revolutions(self, num_revolutions: float): + """Run for a given number of revolutions. + + Args: + num_revolutions: number of revolutions to run. + """ + await self.backend.run_revolutions(num_revolutions=num_revolutions) + + @need_capability_ready + async def run_continuously(self, speed: float): + """Run continuously at a given speed. If speed is 0, the pump will be halted. + + Args: + speed: speed in rpm/pump-specific units. + """ + await self.backend.run_continuously(speed=speed) + + @need_capability_ready + async def run_for_duration(self, speed: Union[float, int], duration: Union[float, int]): + """Run the pump at specified speed for the specified duration. + + Args: + speed: speed in rpm/pump-specific units. + duration: duration in seconds. + """ + if duration < 0: + raise ValueError("Duration must be positive.") + await self.run_continuously(speed=speed) + await asyncio.sleep(duration) + await self.run_continuously(speed=0) + + @need_capability_ready + async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]): + """Run the pump at specified speed for the specified volume. Requires calibration. + + Args: + speed: speed in rpm/pump-specific units. + volume: volume to pump. + """ + if self.calibration is None: + raise TypeError( + "Pump is not calibrated. Volume based pumping and related functions unavailable." + ) + if self.calibration.calibration_mode == "duration": + duration = volume / self.calibration[0] + await self.run_for_duration(speed=speed, duration=duration) + elif self.calibration.calibration_mode == "revolutions": + num_revolutions = volume / self.calibration[0] + await self.run_revolutions(num_revolutions=num_revolutions) + else: + raise ValueError("Calibration mode not recognized.") + + @need_capability_ready + async def halt(self): + """Halt the pump.""" + await self.backend.halt() + + async def _on_stop(self): + if self._setup_finished: + await self.backend.halt() + await super()._on_stop() diff --git a/pylabrobot/capabilities/pumping/pumping_tests.py b/pylabrobot/capabilities/pumping/pumping_tests.py new file mode 100644 index 00000000000..969c58c85f1 --- /dev/null +++ b/pylabrobot/capabilities/pumping/pumping_tests.py @@ -0,0 +1,78 @@ +import unittest +from unittest.mock import AsyncMock, Mock + +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability + + +class TestPumpingCapability(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.mock_backend = Mock(spec=PumpBackend) + self.mock_backend.run_revolutions = AsyncMock() + self.mock_backend.run_continuously = AsyncMock() + self.mock_backend.halt = AsyncMock() + self.test_calibration = PumpCalibration.load_calibration(1, num_items=1) + + async def _make_cap(self, calibration=None): + cap = PumpingCapability(backend=self.mock_backend, calibration=calibration) + await cap._on_setup() + return cap + + async def test_setup(self): + cap = await self._make_cap() + self.assertIsNone(cap.calibration) + self.assertTrue(cap.setup_finished) + + async def test_run_revolutions(self): + cap = await self._make_cap() + await cap.run_revolutions(num_revolutions=1) + self.mock_backend.run_revolutions.assert_called_once_with(num_revolutions=1) + + async def test_run_continuously(self): + cap = await self._make_cap() + await cap.run_continuously(speed=100) + self.mock_backend.run_continuously.assert_called_once_with(speed=100) + + async def test_halt(self): + cap = await self._make_cap() + await cap.halt() + self.mock_backend.halt.assert_called_once() + + async def test_run_for_duration(self): + cap = await self._make_cap() + await cap.run_for_duration(speed=1, duration=0) + self.mock_backend.run_continuously.assert_called_with(speed=0) + + async def test_run_invalid_duration(self): + cap = await self._make_cap() + with self.assertRaises(ValueError): + await cap.run_for_duration(speed=1, duration=-1) + + async def test_pump_volume_duration_mode(self): + cap = await self._make_cap(calibration=self.test_calibration) + cap.calibration.calibration_mode = "duration" + cap.run_for_duration = AsyncMock() + await cap.pump_volume(speed=1, volume=1) + cap.run_for_duration.assert_called_once_with(speed=1, duration=1.0) + + async def test_pump_volume_revolutions_mode(self): + cap = await self._make_cap(calibration=self.test_calibration) + cap.calibration.calibration_mode = "revolutions" + cap.run_revolutions = AsyncMock() + await cap.pump_volume(speed=1, volume=1) + cap.run_revolutions.assert_called_once_with(num_revolutions=1.0) + + async def test_pump_volume_no_calibration(self): + cap = await self._make_cap() + with self.assertRaises(TypeError): + await cap.pump_volume(speed=1, volume=1) + + async def test_not_setup_raises(self): + cap = PumpingCapability(backend=self.mock_backend) + with self.assertRaises(RuntimeError): + await cap.run_continuously(speed=1) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/cole_parmer/__init__.py b/pylabrobot/cole_parmer/__init__.py new file mode 100644 index 00000000000..76b87527436 --- /dev/null +++ b/pylabrobot/cole_parmer/__init__.py @@ -0,0 +1 @@ +from .masterflex_backend import MasterflexBackend, MasterflexPump diff --git a/pylabrobot/cole_parmer/masterflex_backend.py b/pylabrobot/cole_parmer/masterflex_backend.py new file mode 100644 index 00000000000..80524c35db3 --- /dev/null +++ b/pylabrobot/cole_parmer/masterflex_backend.py @@ -0,0 +1,100 @@ +try: + import serial # type: ignore + + HAS_SERIAL = True +except ImportError as e: + HAS_SERIAL = False + _SERIAL_IMPORT_ERROR = e + +from typing import Optional + +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability +from pylabrobot.device import Device, Driver +from pylabrobot.io.serial import Serial + + +class MasterflexBackend(PumpBackend, Driver): + """Backend for the Cole Parmer Masterflex L/S pump + + tested on: + 07551-20 + + should be same as: + 07522-20 + 07522-30 + 07551-30 + 07575-30 + 07575-40 + + Documentation available at: + - https://pim-resources.coleparmer.com/instruction-manual/a-1299-1127b-en.pdf + - https://web.archive.org/web/20210924061132/https://pim-resources.coleparmer.com/ + instruction-manual/a-1299-1127b-en.pdf + """ + + def __init__(self, com_port: str): + if not HAS_SERIAL: + raise RuntimeError( + "pyserial is not installed. Install with: pip install pylabrobot[serial]. " + f"Import error: {_SERIAL_IMPORT_ERROR}" + ) + self.com_port = com_port + self.io = Serial( + port=self.com_port, + baudrate=4800, + timeout=1, + parity=serial.PARITY_ODD, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.SEVENBITS, + human_readable_device_name="Masterflex Pump", + ) + + async def setup(self): + await self.io.setup() + await self.io.write(b"\x05") # Enquiry; ready to send. + await self.io.write(b"\x05P02\r") + + def serialize(self): + return {"type": self.__class__.__name__, "com_port": self.com_port} + + async def stop(self): + await self.io.stop() + + async def send_command(self, command: str): + command = "\x02P02" + command + "\x0d" + await self.io.write(command.encode()) + return self.io.read() + + async def run_revolutions(self, num_revolutions: float): + num_revolutions = round(num_revolutions, 2) + cmd = f"V{num_revolutions}G" + await self.send_command(cmd) + + async def run_continuously(self, speed: float): + if speed == 0: + await self.halt() + return + + direction = "+" if speed > 0 else "-" + speed_int = int(abs(speed)) + cmd = f"S{direction}{speed_int}G0" + await self.send_command(cmd) + + async def halt(self): + await self.send_command("H") + + +class MasterflexPump(Device): + """Cole Parmer Masterflex L/S pump.""" + + def __init__( + self, + com_port: str, + calibration: Optional[PumpCalibration] = None, + ): + backend = MasterflexBackend(com_port=com_port) + super().__init__(driver=backend) + self.pumping = PumpingCapability(backend=backend, calibration=calibration) + self._capabilities = [self.pumping] diff --git a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py index 605d835a542..01d75aebbc5 100644 --- a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py +++ b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py @@ -1,146 +1,47 @@ -import asyncio -import logging -import threading -import time -from typing import Dict, List, Optional, Union +"""Legacy. Use pylabrobot.agrowpumps instead.""" -try: - from pymodbus.client import AsyncModbusSerialClient # type: ignore - - _MODBUS_IMPORT_ERROR = None -except ImportError as e: - AsyncModbusSerialClient = None # type: ignore - _MODBUS_IMPORT_ERROR = e +from typing import Dict, List, Union +from pylabrobot.agrowpumps.agrowdosepump_backend import AgrowChannelBackend, AgrowDriver from pylabrobot.legacy.pumps.backend import PumpArrayBackend -logger = logging.getLogger("pylabrobot") - class AgrowPumpArrayBackend(PumpArrayBackend): - """ - AgrowPumpArray allows users to control AgrowPumps via Modbus communication. - - https://www.agrowtek.com/doc/im/IM_MODBUS.pdf - https://agrowtek.com/doc/im/IM_LX1.pdf - - Attributes: - port: The port that the AgrowPumpArray is connected to. - address: The address of the AgrowPumpArray client registers. - - Properties: - num_channels: The number of channels that the AgrowPumpArray has. - pump_index_to_address: A dictionary that maps pump indices to their Modbus addresses. - """ + """Legacy. Use pylabrobot.agrowpumps.AgrowDosePumpArray instead.""" def __init__(self, port: str, address: Union[int, str]): - if _MODBUS_IMPORT_ERROR is not None: - raise RuntimeError( - "pymodbus is not installed. Install with: pip install pylabrobot[modbus]. " - f"Import error: {_MODBUS_IMPORT_ERROR}" - ) - if not isinstance(port, str): - raise ValueError("Port must be a string") - self.port = port - if address not in range(0, 256): - raise ValueError("Pump address out of range") - self.address = int(address) - self._keep_alive_thread: Optional[threading.Thread] = None - self._pump_index_to_address: Optional[Dict[int, int]] = None - self._modbus: Optional["AsyncModbusSerialClient"] = None - self._num_channels: Optional[int] = None - self._keep_alive_thread_active = False + self._driver = AgrowDriver(port=port, address=address) + self._backends: List[AgrowChannelBackend] = [] @property - def modbus(self) -> "AsyncModbusSerialClient": - """Returns the Modbus connection to the AgrowPumpArray.""" - if self._modbus is None: - raise RuntimeError("Modbus connection not established") - return self._modbus + def port(self): + return self._driver.port @property - def pump_index_to_address(self) -> Dict[int, int]: - """Returns a dictionary that maps pump indices to their Modbus addresses. + def address(self): + return self._driver.address - Returns: - Dict[int, int]: A dictionary that maps pump indices to their Modbus addresses. - """ - - if self._pump_index_to_address is None: - raise RuntimeError("Pump mappings not established") - return self._pump_index_to_address + @property + def modbus(self): + return self._driver.modbus @property def num_channels(self) -> int: - """The number of channels that the AgrowPumpArray has. - - Returns: - The number of channels that the AgrowPumpArray has. - """ - if self._num_channels is None: - raise RuntimeError("Number of channels not established") - return self._num_channels - - def start_keep_alive_thread(self): - """Creates a daemon thread that sends a Modbus request every 25 seconds to keep the connection - alive.""" - - async def keep_alive(): - """Sends a Modbus request every 25 seconds to keep the connection alive. - Sleep for 0.1 seconds so we can respond to `stop` events fast. - """ - i = 0 - while self._keep_alive_thread_active: - time.sleep(0.1) - i += 1 - if i == 250: - await self.modbus.read_holding_registers(0, 1, unit=self.address) - i = 0 - - def manage_async_keep_alive(): - """Manages the keep alive thread.""" - try: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(keep_alive()) - loop.close() - except Exception as e: - logger.error("Error in keep alive thread: %s", e) - - self._keep_alive_thread_active = True - self._keep_alive_thread = threading.Thread(target=manage_async_keep_alive, daemon=True) - self._keep_alive_thread.start() + return self._driver.num_channels + + @property + def pump_index_to_address(self) -> Dict[int, int]: + return self._driver.pump_index_to_address async def setup(self): - """Sets up the Modbus connection to the AgrowPumpArray and creates the - pump mappings needed to issue commands. - """ - await self._setup_modbus() - register_return = await self.modbus.read_holding_registers(19, 2, unit=self.address) - self._num_channels = int( - "".join(chr(r // 256) + chr(r % 256) for r in register_return.registers)[2] - ) - self.start_keep_alive_thread() - self._pump_index_to_address = {pump: pump + 100 for pump in range(0, self.num_channels)} - - async def _setup_modbus(self): - if AsyncModbusSerialClient is None: - raise RuntimeError( - "pymodbus is not installed. Install with: pip install pylabrobot[modbus]." - f" Import error: {_MODBUS_IMPORT_ERROR}" - ) - self._modbus = AsyncModbusSerialClient( - port=self.port, - baudrate=115200, - timeout=1, - stopbits=1, - bytesize=8, - parity="E", - retry_on_empty=True, - ) - await self.modbus.connect() - if not self.modbus.connected: - raise ConnectionError("Modbus connection failed during pump setup") + await self._driver.setup() + self._backends = [ + AgrowChannelBackend(self._driver, ch) for ch in range(self._driver.num_channels) + ] + + async def stop(self): + await self.halt() + await self._driver.stop() def serialize(self): return { @@ -150,66 +51,20 @@ def serialize(self): } async def run_revolutions(self, num_revolutions: List[float], use_channels: List[int]): - """Run the specified channels at the speed selected. If speed is 0, the pump will be halted. - - Args: - num_revolutions: number of revolutions to run pumps. - use_channels: pump array channels to run - - Raises: - NotImplementedError: Revolution based pumping commands are not available for this array. - """ - raise NotImplementedError( "Revolution based pumping commands are not available for this pump array." ) async def run_continuously(self, speed: List[float], use_channels: List[int]): - """Run pumps at the specified speeds. - - Args: - speed: rate at which to run pump. - use_channels: pump array channels to run - - Raises: - ValueError: Pump address out of range - ValueError: Pump speed out of range - """ - - for pump_index, pump_speed in zip(use_channels, speed): - pump_speed = int(pump_speed) - if pump_speed not in range(101): - raise ValueError("Pump speed out of range. Value should be between 0 and 100.") - await self.modbus.write_register( - self.pump_index_to_address[pump_index], - pump_speed, - unit=self.address, - ) + for channel, pump_speed in zip(use_channels, speed): + await self._backends[channel].run_continuously(pump_speed) async def halt(self): - """Halt the entire pump array.""" - assert self.modbus is not None, "Modbus connection not established" - assert self.pump_index_to_address is not None, "Pump address mapping not established" - logger.info("Halting pump array") - for pump in self.pump_index_to_address: - address = self.pump_index_to_address[pump] - await self.modbus.write_register(address, 0, unit=self.address) - - async def stop(self): - """Close the connection to the pump array.""" - await self.halt() - assert self.modbus is not None, "Modbus connection not established" - if self._keep_alive_thread is not None: - self._keep_alive_thread_active = False - self._keep_alive_thread.join() - self.modbus.close() - assert not self.modbus.connected, "Modbus failing to disconnect" - - -# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) -# https://github.com/PyLabRobot/pylabrobot/issues/466 + for backend in self._backends: + await backend.halt() +# Deprecated alias class AgrowPumpArray: def __init__(self, *args, **kwargs): raise RuntimeError( diff --git a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py index 4a46ba87987..546506a2b09 100644 --- a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py +++ b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py @@ -1,26 +1,21 @@ +# mypy: disable-error-code="attr-defined,assignment" import unittest -from unittest.mock import AsyncMock, call +from unittest.mock import AsyncMock, call, patch import pytest pytest.importorskip("pymodbus") -from pymodbus.client import AsyncModbusSerialClient # type: ignore - from pylabrobot.legacy.pumps import PumpArray from pylabrobot.legacy.pumps.agrowpumps import AgrowPumpArrayBackend -class SimulatedModbusClient(AsyncModbusSerialClient): - """ - SimulatedModbusClient allows users to simulate Modbus communication. - - Attributes: - connected: A boolean that indicates whether the simulated client is connected. - """ +class SimulatedModbusClient: + """Duck-typed modbus client for testing.""" - def __init__(self, connected: bool = False): - self._connected = connected + def __init__(self): + self._connected = False + self.write_register = AsyncMock() async def connect(self): self._connected = True @@ -29,35 +24,28 @@ async def connect(self): def connected(self): return self._connected - async def read_holding_registers(self, address: int, count: int, **kwargs): # type: ignore - """Simulates reading holding registers from the AgrowPumpArray.""" + async def read_holding_registers(self, address: int, count: int, **kwargs): if "unit" not in kwargs: raise ValueError("unit must be specified") if address == 19: - return_register = AsyncMock() - return_register.registers = [16708, 13824, 0, 0, 0, 0, 0][:count] - return return_register - - write_register = AsyncMock() + result = AsyncMock() + result.registers = [16708, 13824, 0, 0, 0, 0, 0][:count] + return result def close(self, reconnect=False): - assert not self.connected, "Modbus connection not established" self._connected = False class TestAgrowPumps(unittest.IsolatedAsyncioTestCase): - """TestAgrowPumps allows users to test AgrowPumps.""" - async def asyncSetUp(self): self.agrow_backend = AgrowPumpArrayBackend(port="simulated", address=1) async def _mock_setup_modbus(): - self.agrow_backend._modbus = SimulatedModbusClient() - - self.agrow_backend._setup_modbus = _mock_setup_modbus # type: ignore[method-assign] + self.agrow_backend._driver._modbus = SimulatedModbusClient() - self.pump_array = PumpArray(backend=self.agrow_backend, calibration=None) - await self.pump_array.setup() + with patch.object(self.agrow_backend._driver, "_setup_modbus", _mock_setup_modbus): + self.pump_array = PumpArray(backend=self.agrow_backend, calibration=None) + await self.pump_array.setup() async def asyncTearDown(self): await self.pump_array.stop() @@ -66,14 +54,14 @@ async def test_setup(self): self.assertEqual(self.agrow_backend.port, "simulated") self.assertEqual(self.agrow_backend.address, 1) self.assertEqual( - self.agrow_backend._pump_index_to_address, + self.agrow_backend.pump_index_to_address, {pump: pump + 100 for pump in range(0, 6)}, ) async def test_run_continuously(self): - self.agrow_backend.modbus.write_register.reset_mock() # type: ignore[attr-defined] + self.agrow_backend.modbus.write_register.reset_mock() await self.pump_array.run_continuously(speed=1, use_channels=[0]) - self.agrow_backend.modbus.write_register.assert_called_once_with(100, 1, unit=1) # type: ignore[attr-defined] + self.agrow_backend.modbus.write_register.assert_called_once_with(100, 1, unit=1) # invalid speed: cannot be bigger than 100 with self.assertRaises(ValueError): @@ -86,6 +74,6 @@ async def test_run_revolutions(self): async def test_halt(self): await self.pump_array.halt() - self.agrow_backend.modbus.write_register.assert_has_calls( # type: ignore[attr-defined] + self.agrow_backend.modbus.write_register.assert_has_calls( [call(100 + i, 0, unit=1) for i in range(6)] ) diff --git a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py index 9612f02a024..63ed99931f7 100644 --- a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py +++ b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py @@ -1,91 +1,46 @@ -try: - import serial # type: ignore +"""Legacy. Use pylabrobot.cole_parmer instead.""" - HAS_SERIAL = True -except ImportError as e: - HAS_SERIAL = False - _SERIAL_IMPORT_ERROR = e - -from pylabrobot.io.serial import Serial +from pylabrobot.cole_parmer import masterflex_backend as _new from pylabrobot.legacy.pumps.backend import PumpBackend class MasterflexBackend(PumpBackend): - """Backend for the Cole Parmer Masterflex L/S pump + """Legacy. Use pylabrobot.cole_parmer.MasterflexBackend instead.""" - tested on: - 07551-20 + def __init__(self, com_port: str): + self._new = _new.MasterflexBackend(com_port=com_port) - should be same as: - 07522-20 - 07522-30 - 07551-30 - 07575-30 - 07575-40 + @property + def io(self): + return self._new.io - Documentation available at: - - https://pim-resources.coleparmer.com/instruction-manual/a-1299-1127b-en.pdf - - https://web.archive.org/web/20210924061132/https://pim-resources.coleparmer.com/ - instruction-manual/a-1299-1127b-en.pdf - """ - - def __init__(self, com_port: str): - if not HAS_SERIAL: - raise RuntimeError( - "pyserial is not installed. Install with: pip install pylabrobot[serial]. " - f"Import error: {_SERIAL_IMPORT_ERROR}" - ) - self.com_port = com_port - self.io = Serial( - port=self.com_port, - baudrate=4800, - timeout=1, - parity=serial.PARITY_ODD, - stopbits=serial.STOPBITS_ONE, - bytesize=serial.SEVENBITS, - human_readable_device_name="Masterflex Pump", - ) + @io.setter + def io(self, value): + self._new.io = value async def setup(self): - await self.io.setup() + await self._new.setup() - await self.io.write(b"\x05") # Enquiry; ready to send. - await self.io.write(b"\x05P02\r") + async def stop(self): + await self._new.stop() def serialize(self): - return {**super().serialize(), "com_port": self.com_port} - - async def stop(self): - await self.io.stop() + return self._new.serialize() async def send_command(self, command: str): - command = "\x02P02" + command + "\x0d" - await self.io.write(command.encode()) - return self.io.read() + return await self._new.send_command(command) async def run_revolutions(self, num_revolutions: float): - num_revolutions = round(num_revolutions, 2) - cmd = f"V{num_revolutions}G" - await self.send_command(cmd) + await self._new.run_revolutions(num_revolutions) async def run_continuously(self, speed: float): - if speed == 0: - self.halt() - return - - direction = "+" if speed > 0 else "-" - speed = int(abs(speed)) - cmd = f"S{direction}{speed}G0" - await self.send_command(cmd) + await self._new.run_continuously(speed) async def halt(self): - await self.send_command("H") - - -# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) -# https://github.com/PyLabRobot/pylabrobot/issues/466 + await self._new.halt() +# Deprecated alias class Masterflex: def __init__(self, *args, **kwargs): raise RuntimeError("`Masterflex` is deprecated. Please use `MasterflexBackend` instead.") From 51d60b5c000ce7244cf85a146fc87dde2f9d20d1 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 26 Mar 2026 10:05:42 -0700 Subject: [PATCH 2/7] Split MasterflexDriver from MasterflexBackend, merge v1b1 Follow CLARIOstar pattern: Driver owns connection, Backend owns capability operations. Remove stale setup/stop from capability backends. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../agrowpumps/agrowdosepump_backend.py | 6 ---- pylabrobot/cole_parmer/__init__.py | 2 +- pylabrobot/cole_parmer/masterflex_backend.py | 36 +++++++++++++------ .../pumps/cole_parmer/masterflex_backend.py | 24 +++++++------ 4 files changed, 39 insertions(+), 29 deletions(-) diff --git a/pylabrobot/agrowpumps/agrowdosepump_backend.py b/pylabrobot/agrowpumps/agrowdosepump_backend.py index c247a17f553..230c9b969e1 100644 --- a/pylabrobot/agrowpumps/agrowdosepump_backend.py +++ b/pylabrobot/agrowpumps/agrowdosepump_backend.py @@ -138,12 +138,6 @@ def __init__(self, connection: AgrowDriver, channel: int): self._driver = connection self._channel = channel - async def setup(self): - pass # lifecycle managed by the device via AgrowDriver - - async def stop(self): - pass # lifecycle managed by the device via AgrowDriver - async def run_revolutions(self, num_revolutions: float): raise NotImplementedError( "Revolution based pumping commands are not available for Agrow pumps." diff --git a/pylabrobot/cole_parmer/__init__.py b/pylabrobot/cole_parmer/__init__.py index 76b87527436..308c97ae35a 100644 --- a/pylabrobot/cole_parmer/__init__.py +++ b/pylabrobot/cole_parmer/__init__.py @@ -1 +1 @@ -from .masterflex_backend import MasterflexBackend, MasterflexPump +from .masterflex_backend import MasterflexBackend, MasterflexDriver, MasterflexPump diff --git a/pylabrobot/cole_parmer/masterflex_backend.py b/pylabrobot/cole_parmer/masterflex_backend.py index 80524c35db3..f209beeff85 100644 --- a/pylabrobot/cole_parmer/masterflex_backend.py +++ b/pylabrobot/cole_parmer/masterflex_backend.py @@ -15,8 +15,8 @@ from pylabrobot.io.serial import Serial -class MasterflexBackend(PumpBackend, Driver): - """Backend for the Cole Parmer Masterflex L/S pump +class MasterflexDriver(Driver): + """Serial driver for Cole Parmer Masterflex L/S pumps. tested on: 07551-20 @@ -35,6 +35,7 @@ class MasterflexBackend(PumpBackend, Driver): """ def __init__(self, com_port: str): + super().__init__() if not HAS_SERIAL: raise RuntimeError( "pyserial is not installed. Install with: pip install pylabrobot[serial]. " @@ -56,9 +57,6 @@ async def setup(self): await self.io.write(b"\x05") # Enquiry; ready to send. await self.io.write(b"\x05P02\r") - def serialize(self): - return {"type": self.__class__.__name__, "com_port": self.com_port} - async def stop(self): await self.io.stop() @@ -67,10 +65,20 @@ async def send_command(self, command: str): await self.io.write(command.encode()) return self.io.read() + def serialize(self): + return {"type": self.__class__.__name__, "com_port": self.com_port} + + +class MasterflexBackend(PumpBackend): + """Pump capability backend for Masterflex L/S pumps.""" + + def __init__(self, driver: MasterflexDriver): + self._driver = driver + async def run_revolutions(self, num_revolutions: float): num_revolutions = round(num_revolutions, 2) cmd = f"V{num_revolutions}G" - await self.send_command(cmd) + await self._driver.send_command(cmd) async def run_continuously(self, speed: float): if speed == 0: @@ -80,10 +88,15 @@ async def run_continuously(self, speed: float): direction = "+" if speed > 0 else "-" speed_int = int(abs(speed)) cmd = f"S{direction}{speed_int}G0" - await self.send_command(cmd) + await self._driver.send_command(cmd) async def halt(self): - await self.send_command("H") + await self._driver.send_command("H") + + def serialize(self): + return { + "com_port": self._driver.com_port, + } class MasterflexPump(Device): @@ -94,7 +107,8 @@ def __init__( com_port: str, calibration: Optional[PumpCalibration] = None, ): - backend = MasterflexBackend(com_port=com_port) - super().__init__(driver=backend) - self.pumping = PumpingCapability(backend=backend, calibration=calibration) + driver = MasterflexDriver(com_port=com_port) + super().__init__(driver=driver) + self._driver: MasterflexDriver + self.pumping = PumpingCapability(backend=MasterflexBackend(driver), calibration=calibration) self._capabilities = [self.pumping] diff --git a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py index 63ed99931f7..3423b1021e8 100644 --- a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py +++ b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py @@ -1,6 +1,7 @@ """Legacy. Use pylabrobot.cole_parmer instead.""" -from pylabrobot.cole_parmer import masterflex_backend as _new +from pylabrobot.cole_parmer.masterflex_backend import MasterflexBackend as _NewBackend +from pylabrobot.cole_parmer.masterflex_backend import MasterflexDriver from pylabrobot.legacy.pumps.backend import PumpBackend @@ -8,36 +9,37 @@ class MasterflexBackend(PumpBackend): """Legacy. Use pylabrobot.cole_parmer.MasterflexBackend instead.""" def __init__(self, com_port: str): - self._new = _new.MasterflexBackend(com_port=com_port) + self._driver = MasterflexDriver(com_port=com_port) + self._backend = _NewBackend(self._driver) @property def io(self): - return self._new.io + return self._driver.io @io.setter def io(self, value): - self._new.io = value + self._driver.io = value async def setup(self): - await self._new.setup() + await self._driver.setup() async def stop(self): - await self._new.stop() + await self._driver.stop() def serialize(self): - return self._new.serialize() + return self._driver.serialize() async def send_command(self, command: str): - return await self._new.send_command(command) + return await self._driver.send_command(command) async def run_revolutions(self, num_revolutions: float): - await self._new.run_revolutions(num_revolutions) + await self._backend.run_revolutions(num_revolutions) async def run_continuously(self, speed: float): - await self._new.run_continuously(speed) + await self._backend.run_continuously(speed) async def halt(self): - await self._new.halt() + await self._backend.halt() # Deprecated alias From 0f7e6a03049b2706fbda0742cbc8cd5fcab59d2e Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 26 Mar 2026 13:26:03 -0700 Subject: [PATCH 3/7] Fix missing awaits in legacy Pump frontend Legacy Pump.run_revolutions, run_continuously, halt were not awaiting the backend coroutines. Fixed + updated test mock accordingly. Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/legacy/pumps/pump.py | 38 ++--------------- pylabrobot/legacy/pumps/pump_tests.py | 2 +- pylabrobot/legacy/pumps/pumparray.py | 60 ++++----------------------- 3 files changed, 11 insertions(+), 89 deletions(-) diff --git a/pylabrobot/legacy/pumps/pump.py b/pylabrobot/legacy/pumps/pump.py index 5b9d3b31670..b35dc742139 100644 --- a/pylabrobot/legacy/pumps/pump.py +++ b/pylabrobot/legacy/pumps/pump.py @@ -39,35 +39,12 @@ def deserialize(cls, data: dict): return super().deserialize(data_copy) async def run_revolutions(self, num_revolutions: float): - """Run a given number of revolutions. This method will return after the command has been sent, - and the pump will run until `halt` is called. - - Args: - num_revolutions: number of revolutions to run - """ - - self.backend.run_revolutions(num_revolutions=num_revolutions) + await self.backend.run_revolutions(num_revolutions=num_revolutions) async def run_continuously(self, speed: float): - """Run continuously at a given speed. This method will return after the command has been sent, - and the pump will run until `halt` is called. - - If speed is 0, the pump will be halted. - - Args: - speed: speed in rpm/pump-specific units. - """ - - self.backend.run_continuously(speed=speed) + await self.backend.run_continuously(speed=speed) async def run_for_duration(self, speed: Union[float, int], duration: Union[float, int]): - """Run the pump at specified speed for the specified duration. - - Args: - speed: speed in rpm/pump-specific units. - duration: duration to run pump. - """ - if duration < 0: raise ValueError("Duration must be positive.") await self.run_continuously(speed=speed) @@ -75,14 +52,6 @@ async def run_for_duration(self, speed: Union[float, int], duration: Union[float await self.run_continuously(speed=0) async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]): - """Run the pump at specified speed for the specified volume. Note that this function requires - the pump to be calibrated at the input speed. - - Args: - speed: speed in rpm/pump-specific units. - volume: volume to pump. - """ - if self.calibration is None: raise TypeError( "Pump is not calibrated. Volume based pumping and related functions unavailable." @@ -97,5 +66,4 @@ async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]) raise ValueError("Calibration mode not recognized.") async def halt(self): - """Halt the pump.""" - self.backend.halt() + await self.backend.halt() diff --git a/pylabrobot/legacy/pumps/pump_tests.py b/pylabrobot/legacy/pumps/pump_tests.py index 71200e0c58d..6305ee1060b 100644 --- a/pylabrobot/legacy/pumps/pump_tests.py +++ b/pylabrobot/legacy/pumps/pump_tests.py @@ -15,7 +15,7 @@ class TestPump(unittest.IsolatedAsyncioTestCase): """ def setUp(self): - self.mock_backend = Mock(spec=PumpBackend) + self.mock_backend = AsyncMock() self.test_calibration = PumpCalibration.load_calibration(1, num_items=1) async def test_setup(self): diff --git a/pylabrobot/legacy/pumps/pumparray.py b/pylabrobot/legacy/pumps/pumparray.py index 9d7ca1a5cec..3c08400dce7 100644 --- a/pylabrobot/legacy/pumps/pumparray.py +++ b/pylabrobot/legacy/pumps/pumparray.py @@ -1,6 +1,9 @@ +"""Legacy. Use pylabrobot.agrowpumps or similar Device with PumpingCapability instead.""" + import asyncio from typing import List, Optional, Union +from pylabrobot.capabilities.pumping.pumping import PumpingCapability from pylabrobot.legacy.machines.machine import Machine from pylabrobot.legacy.pumps.backend import PumpArrayBackend from pylabrobot.legacy.pumps.calibration import PumpCalibration @@ -8,15 +11,7 @@ class PumpArray(Machine): - """Front-end for a pump array. - - Attributes: - backend: The backend that the pump array is controlled through. - calibration: The calibration of the pump. - - Properties: - num_channels: The number of channels that the pump array has. - """ + """Legacy. Use AgrowDosePumpArray or similar Device with per-channel PumpingCapability instead.""" def __init__( self, @@ -29,12 +24,6 @@ def __init__( @property def num_channels(self) -> int: - """Returns the number of channels that the pump array has. - - Returns: - int: The number of channels that the pump array has. - """ - return self.backend.num_channels def serialize(self) -> dict: @@ -59,13 +48,6 @@ async def run_revolutions( num_revolutions: Union[float, List[float]], use_channels: Union[int, List[int]], ): - """Run the specified channels for the specified number of revolutions. - - Args: - num_revolutions: number of revolutions to run pumps. - use_channels: pump array channels to run. - """ - if isinstance(use_channels, int): use_channels = [use_channels] if isinstance(num_revolutions, float): @@ -77,13 +59,6 @@ async def run_continuously( speed: Union[float, int, List[float], List[int]], use_channels: Union[int, List[int]], ): - """Run the specified channels at the specified speeds. - - Args: - speed: speed in rpm/pump-specific units. - use_channels: pump array channels to run. - """ - if isinstance(use_channels, list) and len(set(use_channels)) != len(use_channels): raise ValueError("Channels in use channels must be unique.") if isinstance(use_channels, int): @@ -93,10 +68,10 @@ async def run_continuously( if any(channel not in range(0, self.num_channels) for channel in use_channels): raise ValueError( - f"Pump address out of range for this pump array. \ - Value should be between 0 and {self.num_channels}" + f"Pump address out of range for this pump array. " + f"Value should be between 0 and {self.num_channels}" ) - if any(speed < 0 for speed in speed): + if any(s < 0 for s in speed): raise ValueError("Speed must be positive.") if isinstance(speed[0], int): speed = [float(x) for x in speed] @@ -116,14 +91,6 @@ async def run_for_duration( use_channels: Union[int, List[int]], duration: Union[float, int], ): - """Run the specified channels at the specified speeds for the specified duration. - - Args: - speed: speed in rpm/pump-specific units. - use_channels: pump array channels to run. - duration: duration to run pumps (seconds). - """ - if duration < 0: raise ValueError("Duration must be positive.") await self.run_continuously(speed=speed, use_channels=use_channels) @@ -136,19 +103,6 @@ async def pump_volume( use_channels: Union[int, List[int]], volume: Union[float, int, List[float], List[int]], ): - """Run the specified channels at the specified speeds for the specified volume. Note that this - function requires the pump to be calibrated at the input speed. - - Args: - speed: speed in rpm/pump-specific units. use_channels: pump array channels to run using - 0-index. volume: volume to pump. - calibration_mode: units of calibration. Volume per seconds ("duration") or volume per - revolution ("revolutions"). - - Raises: - NotCalibratedError: if the pump is not calibrated. - """ - if self.calibration is None: raise NotCalibratedError( "Pump is not calibrated. Volume based pumping and related functions unavailable." From a4bf0d28a166a39ec43d2c0d12817ba6ac38aa36 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 26 Mar 2026 13:43:00 -0700 Subject: [PATCH 4/7] Port legacy Pump and PumpArray frontends to use PumpingCapability MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adapter pattern: _PumpAdapter and _ChannelAdapter wrap legacy backends into new CapabilityBackend interface. Frontends delegate to PumpingCapability — no duplicated orchestration logic. Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/legacy/pumps/pump.py | 55 ++++++----- pylabrobot/legacy/pumps/pumparray.py | 136 +++++++++++++++++---------- 2 files changed, 119 insertions(+), 72 deletions(-) diff --git a/pylabrobot/legacy/pumps/pump.py b/pylabrobot/legacy/pumps/pump.py index b35dc742139..6707dd06605 100644 --- a/pylabrobot/legacy/pumps/pump.py +++ b/pylabrobot/legacy/pumps/pump.py @@ -1,12 +1,29 @@ -import asyncio from typing import Optional, Union +from pylabrobot.capabilities.pumping.backend import PumpBackend as _NewPumpBackend +from pylabrobot.capabilities.pumping.pumping import PumpingCapability from pylabrobot.legacy.machines.machine import Machine from .backend import PumpBackend from .calibration import PumpCalibration +class _PumpAdapter(_NewPumpBackend): + """Adapts a legacy PumpBackend to the new PumpBackend (CapabilityBackend).""" + + def __init__(self, legacy: PumpBackend): + self._legacy = legacy + + async def run_revolutions(self, num_revolutions: float): + await self._legacy.run_revolutions(num_revolutions=num_revolutions) + + async def run_continuously(self, speed: float): + await self._legacy.run_continuously(speed=speed) + + async def halt(self): + await self._legacy.halt() + + class Pump(Machine): """Frontend for a (peristaltic) pump.""" @@ -16,10 +33,19 @@ def __init__( calibration: Optional[PumpCalibration] = None, ): super().__init__(backend=backend) - self.backend: PumpBackend = backend # fix type + self.backend: PumpBackend = backend if calibration is not None and len(calibration) != 1: raise ValueError("Calibration may only have a single item for this pump") self.calibration = calibration + self._pumping = PumpingCapability(backend=_PumpAdapter(backend), calibration=calibration) + + async def setup(self, **backend_kwargs): + await super().setup(**backend_kwargs) + await self._pumping._on_setup() + + async def stop(self): + await self._pumping._on_stop() + await super().stop() def serialize(self) -> dict: if self.calibration is None: @@ -39,31 +65,16 @@ def deserialize(cls, data: dict): return super().deserialize(data_copy) async def run_revolutions(self, num_revolutions: float): - await self.backend.run_revolutions(num_revolutions=num_revolutions) + await self._pumping.run_revolutions(num_revolutions=num_revolutions) async def run_continuously(self, speed: float): - await self.backend.run_continuously(speed=speed) + await self._pumping.run_continuously(speed=speed) async def run_for_duration(self, speed: Union[float, int], duration: Union[float, int]): - if duration < 0: - raise ValueError("Duration must be positive.") - await self.run_continuously(speed=speed) - await asyncio.sleep(duration) - await self.run_continuously(speed=0) + await self._pumping.run_for_duration(speed=speed, duration=duration) async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]): - if self.calibration is None: - raise TypeError( - "Pump is not calibrated. Volume based pumping and related functions unavailable." - ) - if self.calibration.calibration_mode == "duration": - duration = volume / self.calibration[0] - await self.run_for_duration(speed=speed, duration=duration) - elif self.calibration.calibration_mode == "revolutions": - num_revolutions = volume / self.calibration[0] - await self.run_revolutions(num_revolutions=num_revolutions) - else: - raise ValueError("Calibration mode not recognized.") + await self._pumping.pump_volume(speed=speed, volume=volume) async def halt(self): - await self.backend.halt() + await self._pumping.halt() diff --git a/pylabrobot/legacy/pumps/pumparray.py b/pylabrobot/legacy/pumps/pumparray.py index 3c08400dce7..da3543de68c 100644 --- a/pylabrobot/legacy/pumps/pumparray.py +++ b/pylabrobot/legacy/pumps/pumparray.py @@ -1,8 +1,7 @@ -"""Legacy. Use pylabrobot.agrowpumps or similar Device with PumpingCapability instead.""" - import asyncio from typing import List, Optional, Union +from pylabrobot.capabilities.pumping.backend import PumpBackend as _NewPumpBackend from pylabrobot.capabilities.pumping.pumping import PumpingCapability from pylabrobot.legacy.machines.machine import Machine from pylabrobot.legacy.pumps.backend import PumpArrayBackend @@ -10,8 +9,27 @@ from pylabrobot.legacy.pumps.errors import NotCalibratedError +class _ChannelAdapter(_NewPumpBackend): + """Adapts one channel of a legacy PumpArrayBackend to the new PumpBackend.""" + + def __init__(self, legacy: PumpArrayBackend, channel: int): + self._legacy = legacy + self._channel = channel + + async def run_revolutions(self, num_revolutions: float): + await self._legacy.run_revolutions( + num_revolutions=[num_revolutions], use_channels=[self._channel] + ) + + async def run_continuously(self, speed: float): + await self._legacy.run_continuously(speed=[speed], use_channels=[self._channel]) + + async def halt(self): + await self._legacy.run_continuously(speed=[0.0], use_channels=[self._channel]) + + class PumpArray(Machine): - """Legacy. Use AgrowDosePumpArray or similar Device with per-channel PumpingCapability instead.""" + """Front-end for a pump array.""" def __init__( self, @@ -19,13 +37,28 @@ def __init__( calibration: Optional[PumpCalibration] = None, ): super().__init__(backend=backend) - self.backend: PumpArrayBackend = backend # fix type + self.backend: PumpArrayBackend = backend self.calibration = calibration + self._pumps: List[PumpingCapability] = [] @property def num_channels(self) -> int: return self.backend.num_channels + async def setup(self, **backend_kwargs): + await super().setup(**backend_kwargs) + self._pumps = [ + PumpingCapability(backend=_ChannelAdapter(self.backend, ch)) + for ch in range(self.num_channels) + ] + for p in self._pumps: + await p._on_setup() + + async def stop(self): + for p in reversed(self._pumps): + await p._on_stop() + await super().stop() + def serialize(self) -> dict: if self.calibration is None: return super().serialize() @@ -43,47 +76,56 @@ def deserialize(cls, data: dict): data_copy["calibration"] = calibration return super().deserialize(data_copy) - async def run_revolutions( - self, - num_revolutions: Union[float, List[float]], - use_channels: Union[int, List[int]], - ): - if isinstance(use_channels, int): - use_channels = [use_channels] - if isinstance(num_revolutions, float): - num_revolutions = [num_revolutions] * len(use_channels) - await self.backend.run_revolutions(num_revolutions=num_revolutions, use_channels=use_channels) + # -- helpers ---------------------------------------------------------------- - async def run_continuously( - self, - speed: Union[float, int, List[float], List[int]], - use_channels: Union[int, List[int]], - ): - if isinstance(use_channels, list) and len(set(use_channels)) != len(use_channels): - raise ValueError("Channels in use channels must be unique.") + def _normalize_channels(self, use_channels: Union[int, List[int]]) -> List[int]: if isinstance(use_channels, int): use_channels = [use_channels] - if isinstance(speed, (float, int)): - speed = [speed] * len(use_channels) - - if any(channel not in range(0, self.num_channels) for channel in use_channels): + if len(set(use_channels)) != len(use_channels): + raise ValueError("Channels in use channels must be unique.") + if any(ch not in range(0, self.num_channels) for ch in use_channels): raise ValueError( f"Pump address out of range for this pump array. " f"Value should be between 0 and {self.num_channels}" ) + if any(ch < 0 for ch in use_channels): + raise ValueError("Channels in use channels must be positive.") + return use_channels + + @staticmethod + def _normalize_speeds( + speed: Union[float, int, List[float], List[int]], n: int + ) -> List[float]: + if isinstance(speed, (float, int)): + speed = [float(speed)] * n if any(s < 0 for s in speed): raise ValueError("Speed must be positive.") - if isinstance(speed[0], int): - speed = [float(x) for x in speed] - if len(speed) != len(use_channels): + if len(speed) != n: raise ValueError("Speed and use_channels must be the same length.") - if any(channel < 0 for channel in use_channels): - raise ValueError("Channels in use channels must be positive.") + return [float(s) for s in speed] - await self.backend.run_continuously( - speed=speed, # type: ignore[arg-type] - use_channels=use_channels, - ) + # -- public API ------------------------------------------------------------- + + async def run_revolutions( + self, + num_revolutions: Union[float, List[float]], + use_channels: Union[int, List[int]], + ): + channels = self._normalize_channels(use_channels) + if isinstance(num_revolutions, (float, int)): + num_revolutions = [float(num_revolutions)] * len(channels) + for ch, rev in zip(channels, num_revolutions): + await self._pumps[ch].run_revolutions(num_revolutions=rev) + + async def run_continuously( + self, + speed: Union[float, int, List[float], List[int]], + use_channels: Union[int, List[int]], + ): + channels = self._normalize_channels(use_channels) + speeds = self._normalize_speeds(speed, len(channels)) + for ch, s in zip(channels, speeds): + await self._pumps[ch].run_continuously(speed=s) async def run_for_duration( self, @@ -107,41 +149,35 @@ async def pump_volume( raise NotCalibratedError( "Pump is not calibrated. Volume based pumping and related functions unavailable." ) - if isinstance(use_channels, int): - use_channels = [use_channels] - if isinstance(speed, (float, int)): - speed = [speed] * len(use_channels) + channels = self._normalize_channels(use_channels) + speeds = self._normalize_speeds(speed, len(channels)) if isinstance(volume, (float, int)): - volume = [volume] * len(use_channels) + volume = [float(volume)] * len(channels) if not all(vol >= 0 for vol in volume): raise ValueError("Volume must be positive.") - if not len(speed) == len(use_channels) == len(volume): + if len(volume) != len(channels): raise ValueError("Speed, use_channels, and volume must be the same length.") if self.calibration.calibration_mode == "duration": durations = [ channel_volume / self.calibration[channel] - for channel, channel_volume in zip(use_channels, volume) + for channel, channel_volume in zip(channels, volume) ] tasks = [ asyncio.create_task( - self.run_for_duration( - speed=channel_speed, - use_channels=channel, - duration=duration, - ) + self.run_for_duration(speed=s, use_channels=ch, duration=d) ) - for channel_speed, channel, duration in zip(speed, use_channels, durations) + for s, ch, d in zip(speeds, channels, durations) ] elif self.calibration.calibration_mode == "revolutions": num_rotations = [ channel_volume / self.calibration[channel] - for channel, channel_volume in zip(use_channels, volume) + for channel, channel_volume in zip(channels, volume) ] tasks = [ asyncio.create_task( - self.run_revolutions(num_revolutions=num_rotation, use_channels=channel) + self.run_revolutions(num_revolutions=r, use_channels=ch) ) - for num_rotation, channel in zip(num_rotations, use_channels) + for r, ch in zip(num_rotations, channels) ] else: raise ValueError("Calibration mode must be 'duration' or 'revolutions'.") From 82d1a35208d2eda942aef2f28d034b3ff222da56 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 26 Mar 2026 13:49:36 -0700 Subject: [PATCH 5/7] Fix lint and type errors: remove unused import, unify PumpCalibration Legacy PumpCalibration now re-exports from new module to avoid type mismatch when passing calibration to PumpingCapability. Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/legacy/pumps/calibration.py | 220 +------------------------ pylabrobot/legacy/pumps/pump_tests.py | 2 +- 2 files changed, 4 insertions(+), 218 deletions(-) diff --git a/pylabrobot/legacy/pumps/calibration.py b/pylabrobot/legacy/pumps/calibration.py index 48a25afb6e8..82f3b8969b2 100644 --- a/pylabrobot/legacy/pumps/calibration.py +++ b/pylabrobot/legacy/pumps/calibration.py @@ -1,219 +1,5 @@ -from __future__ import annotations +"""Legacy. Use pylabrobot.capabilities.pumping.calibration instead.""" -import csv -import json -from typing import Dict, List, Literal, Optional, Union +from pylabrobot.capabilities.pumping.calibration import PumpCalibration -from pylabrobot.serializer import SerializableMixin - - -class PumpCalibration(SerializableMixin): - """Calibration for a single pump or pump array - - Attributes: - calibration: The calibration of the pump or pump array. - """ - - def __init__( - self, - calibration: List[Union[float, int]], - calibration_mode: Literal["duration", "revolutions"] = "duration", - ): - """Initialize a PumpCalibration object. - - Args: - calibration: calibration of the pump in pump-specific volume per time/revolution units. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - - Raises: - ValueError: if a value in the calibration is outside expected parameters. - """ - - if any(value <= 0 for value in calibration): - raise ValueError("A value in the calibration is is outside expected parameters.") - if calibration_mode not in ["duration", "revolutions"]: - raise ValueError("calibration_mode must be 'duration' or 'revolutions'") - self.calibration = calibration - self.calibration_mode = calibration_mode - - def __getitem__(self, item: int) -> Union[float, int]: - return self.calibration[item] # type: ignore - - def __len__(self) -> int: - """Return the length of the calibration.""" - - return len(self.calibration) - - @classmethod - def load_calibration( - cls, - calibration: Optional[Union[dict, list, float, int, str]] = None, - num_items: Optional[int] = None, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a file, dictionary, list, or value. :param calibration: pump - calibration file, dictionary, list, or value. If None, returns an empty PumpCalibration - object. - - Args: - calibration: pump calibration file, dictionary, list, or value. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - num_items: number of items in the calibration. Required if calibration is a value. - - Raises: - NotImplementedError: if the calibration filetype or format is not supported. - ValueError: if num_items is not specified when calibration is a value. - """ - - if isinstance(calibration, dict): - return PumpCalibration.load_from_dict( - calibration=calibration, calibration_mode=calibration_mode - ) - if isinstance(calibration, list): - return PumpCalibration.load_from_list( - calibration=calibration, calibration_mode=calibration_mode - ) - if isinstance(calibration, (float, int)): - if num_items is None: - raise ValueError("num_items must be specified if calibration is a value.") - return PumpCalibration.load_from_value( - value=calibration, - num_items=num_items, - calibration_mode=calibration_mode, - ) - if isinstance(calibration, str): - if calibration.endswith(".json"): - return PumpCalibration.load_from_json( - file_path=calibration, calibration_mode=calibration_mode - ) - if calibration.endswith(".csv"): - return PumpCalibration.load_from_csv( - file_path=calibration, calibration_mode=calibration_mode - ) - raise NotImplementedError("Calibration filetype not supported.") - raise NotImplementedError("Calibration format not supported.") - - def serialize(self) -> dict: - return { - "calibration": self.calibration, - "calibration_mode": self.calibration_mode, - } - - @classmethod - def deserialize(cls, data: dict) -> PumpCalibration: - return cls( - calibration=data["calibration"], - calibration_mode=data["calibration_mode"], - ) - - @classmethod - def load_from_json( - cls, - file_path: str, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a json file. - - Args: - file_path: json file to load calibration from. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - - Raises: - TypeError: if the calibration pulled from the json is not a dictionary or list. - """ - - with open(file_path, "rb") as f: - calibration = json.load(f) - if isinstance(calibration, dict): - calibration = {int(key): float(value) for key, value in calibration.items()} - return PumpCalibration.load_from_dict( - calibration=calibration, calibration_mode=calibration_mode - ) - if isinstance(calibration, list): - return PumpCalibration(calibration=calibration, calibration_mode=calibration_mode) - raise TypeError(f"Calibration pulled from {file_path} is not a dictionary or list.") - - @classmethod - def load_from_csv( - cls, - file_path: str, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a csv file. - - Args: - file_path: csv file to load calibration from. 0-indexed. The first column is treated as the - index, the second column as the value. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - """ - - with open(file_path, encoding="utf-8", newline="") as f: - csv_file = list(csv.reader(f)) - num_columns = len(csv_file[0]) - if num_columns != 2: - raise ValueError("CSV file must have two columns.") - calibration = {int(row[0]): float(row[1]) for row in csv_file} - return PumpCalibration.load_from_dict( - calibration=calibration, calibration_mode=calibration_mode - ) - - @classmethod - def load_from_dict( - cls, - calibration: Dict[int, Union[int, float]], - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a dictionary. - - Args: - calibration: dictionary to load calibration from. 0-indexed. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - - Raises: - ValueError: if the calibration dictionary is not formatted correctly. - """ - - if sorted(calibration.keys()) != list(range(len(calibration))): - raise ValueError("Keys must be a contiguous range of integers starting at 0.") - calibration_list = [calibration[key] for key in sorted(calibration.keys())] - return cls(calibration=calibration_list, calibration_mode=calibration_mode) - - @classmethod - def load_from_list( - cls, - calibration: List[Union[int, float]], - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a list. Equivalent to PumpCalibration(calibration). - - Args: - calibration: list to load calibration from. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - """ - - return cls(calibration=calibration, calibration_mode=calibration_mode) - - @classmethod - def load_from_value( - cls, - value: Union[float, int], - num_items: int, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a value. Equivalent to PumpCalibration([value] * num_items). - - Args: - value: value to load calibration from. - num_items: number of items in the calibration. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - """ - - calibration = [value] * num_items - return cls(calibration, calibration_mode) +__all__ = ["PumpCalibration"] diff --git a/pylabrobot/legacy/pumps/pump_tests.py b/pylabrobot/legacy/pumps/pump_tests.py index 6305ee1060b..344d089d679 100644 --- a/pylabrobot/legacy/pumps/pump_tests.py +++ b/pylabrobot/legacy/pumps/pump_tests.py @@ -2,7 +2,7 @@ from unittest.mock import AsyncMock, Mock from pylabrobot.legacy.pumps import PumpArray -from pylabrobot.legacy.pumps.backend import PumpArrayBackend, PumpBackend +from pylabrobot.legacy.pumps.backend import PumpArrayBackend from pylabrobot.legacy.pumps.calibration import PumpCalibration from pylabrobot.legacy.pumps.errors import NotCalibratedError from pylabrobot.legacy.pumps.pump import Pump From 2c8bd98984a6ed19b5df09216e0af7c65671e62c Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 26 Mar 2026 17:20:20 -0700 Subject: [PATCH 6/7] Fix pumparray.py formatting Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/legacy/pumps/pumparray.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/pylabrobot/legacy/pumps/pumparray.py b/pylabrobot/legacy/pumps/pumparray.py index da3543de68c..3085064c526 100644 --- a/pylabrobot/legacy/pumps/pumparray.py +++ b/pylabrobot/legacy/pumps/pumparray.py @@ -93,9 +93,7 @@ def _normalize_channels(self, use_channels: Union[int, List[int]]) -> List[int]: return use_channels @staticmethod - def _normalize_speeds( - speed: Union[float, int, List[float], List[int]], n: int - ) -> List[float]: + def _normalize_speeds(speed: Union[float, int, List[float], List[int]], n: int) -> List[float]: if isinstance(speed, (float, int)): speed = [float(speed)] * n if any(s < 0 for s in speed): @@ -163,9 +161,7 @@ async def pump_volume( for channel, channel_volume in zip(channels, volume) ] tasks = [ - asyncio.create_task( - self.run_for_duration(speed=s, use_channels=ch, duration=d) - ) + asyncio.create_task(self.run_for_duration(speed=s, use_channels=ch, duration=d)) for s, ch, d in zip(speeds, channels, durations) ] elif self.calibration.calibration_mode == "revolutions": @@ -174,9 +170,7 @@ async def pump_volume( for channel, channel_volume in zip(channels, volume) ] tasks = [ - asyncio.create_task( - self.run_revolutions(num_revolutions=r, use_channels=ch) - ) + asyncio.create_task(self.run_revolutions(num_revolutions=r, use_channels=ch)) for r, ch in zip(num_rotations, channels) ] else: From b7c320734ccbecc53203ab0449ab280ccfbdc009 Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 26 Mar 2026 17:30:29 -0700 Subject: [PATCH 7/7] Address PR review comments - Fix typo "is is" in calibration error message - Use NotCalibratedError instead of TypeError in pump_volume - Use logging instead of print in chatterbox backend - Fix missing await on io.read() in MasterflexDriver.send_command - Fix legacy serialize() to return correct type name - Fix _PumpAdapter to not await sync legacy backend methods - Add num_revolutions list length validation in PumpArray - Fix off-by-one in channel range error message - Restore Mock(spec=PumpBackend) in pump tests Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/capabilities/pumping/calibration.py | 2 +- pylabrobot/capabilities/pumping/chatterbox.py | 16 +++++++--------- pylabrobot/capabilities/pumping/pumping.py | 3 ++- pylabrobot/capabilities/pumping/pumping_tests.py | 3 ++- pylabrobot/cole_parmer/masterflex_backend.py | 2 +- .../pumps/cole_parmer/masterflex_backend.py | 2 +- pylabrobot/legacy/pumps/pump.py | 6 +++--- pylabrobot/legacy/pumps/pump_tests.py | 4 ++-- pylabrobot/legacy/pumps/pumparray.py | 4 +++- 9 files changed, 22 insertions(+), 20 deletions(-) diff --git a/pylabrobot/capabilities/pumping/calibration.py b/pylabrobot/capabilities/pumping/calibration.py index 5ae1ba0e9b5..84c0d888c46 100644 --- a/pylabrobot/capabilities/pumping/calibration.py +++ b/pylabrobot/capabilities/pumping/calibration.py @@ -31,7 +31,7 @@ def __init__( """ if any(value <= 0 for value in calibration): - raise ValueError("A value in the calibration is is outside expected parameters.") + raise ValueError("A value in the calibration is outside expected parameters.") if calibration_mode not in ["duration", "revolutions"]: raise ValueError("calibration_mode must be 'duration' or 'revolutions'") self.calibration = calibration diff --git a/pylabrobot/capabilities/pumping/chatterbox.py b/pylabrobot/capabilities/pumping/chatterbox.py index 70a25ebd57e..efdf806ff70 100644 --- a/pylabrobot/capabilities/pumping/chatterbox.py +++ b/pylabrobot/capabilities/pumping/chatterbox.py @@ -1,20 +1,18 @@ +import logging + from .backend import PumpBackend +logger = logging.getLogger(__name__) + class PumpChatterboxBackend(PumpBackend): """Chatterbox backend for device-free testing.""" - async def setup(self): - print("Setting up the pump.") - - async def stop(self): - print("Stopping the pump.") - async def run_revolutions(self, num_revolutions: float): - print(f"Running {num_revolutions} revolutions.") + logger.info("Running %s revolutions.", num_revolutions) async def run_continuously(self, speed: float): - print(f"Running continuously at speed {speed}.") + logger.info("Running continuously at speed %s.", speed) async def halt(self): - print("Halting the pump.") + logger.info("Halting the pump.") diff --git a/pylabrobot/capabilities/pumping/pumping.py b/pylabrobot/capabilities/pumping/pumping.py index c956d1b4a9a..fabbcc3be33 100644 --- a/pylabrobot/capabilities/pumping/pumping.py +++ b/pylabrobot/capabilities/pumping/pumping.py @@ -2,6 +2,7 @@ from typing import Optional, Union from pylabrobot.capabilities.capability import Capability, need_capability_ready +from pylabrobot.capabilities.pumping.errors import NotCalibratedError from .backend import PumpBackend from .calibration import PumpCalibration @@ -62,7 +63,7 @@ async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]) volume: volume to pump. """ if self.calibration is None: - raise TypeError( + raise NotCalibratedError( "Pump is not calibrated. Volume based pumping and related functions unavailable." ) if self.calibration.calibration_mode == "duration": diff --git a/pylabrobot/capabilities/pumping/pumping_tests.py b/pylabrobot/capabilities/pumping/pumping_tests.py index 969c58c85f1..144e506bca5 100644 --- a/pylabrobot/capabilities/pumping/pumping_tests.py +++ b/pylabrobot/capabilities/pumping/pumping_tests.py @@ -3,6 +3,7 @@ from pylabrobot.capabilities.pumping.backend import PumpBackend from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.errors import NotCalibratedError from pylabrobot.capabilities.pumping.pumping import PumpingCapability @@ -65,7 +66,7 @@ async def test_pump_volume_revolutions_mode(self): async def test_pump_volume_no_calibration(self): cap = await self._make_cap() - with self.assertRaises(TypeError): + with self.assertRaises(NotCalibratedError): await cap.pump_volume(speed=1, volume=1) async def test_not_setup_raises(self): diff --git a/pylabrobot/cole_parmer/masterflex_backend.py b/pylabrobot/cole_parmer/masterflex_backend.py index f209beeff85..6c0ab10f407 100644 --- a/pylabrobot/cole_parmer/masterflex_backend.py +++ b/pylabrobot/cole_parmer/masterflex_backend.py @@ -63,7 +63,7 @@ async def stop(self): async def send_command(self, command: str): command = "\x02P02" + command + "\x0d" await self.io.write(command.encode()) - return self.io.read() + return await self.io.read() def serialize(self): return {"type": self.__class__.__name__, "com_port": self.com_port} diff --git a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py index 3423b1021e8..c7ca690b254 100644 --- a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py +++ b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py @@ -27,7 +27,7 @@ async def stop(self): await self._driver.stop() def serialize(self): - return self._driver.serialize() + return {"type": self.__class__.__name__, "com_port": self._driver.com_port} async def send_command(self, command: str): return await self._driver.send_command(command) diff --git a/pylabrobot/legacy/pumps/pump.py b/pylabrobot/legacy/pumps/pump.py index 6707dd06605..1f28c81765e 100644 --- a/pylabrobot/legacy/pumps/pump.py +++ b/pylabrobot/legacy/pumps/pump.py @@ -15,13 +15,13 @@ def __init__(self, legacy: PumpBackend): self._legacy = legacy async def run_revolutions(self, num_revolutions: float): - await self._legacy.run_revolutions(num_revolutions=num_revolutions) + self._legacy.run_revolutions(num_revolutions=num_revolutions) async def run_continuously(self, speed: float): - await self._legacy.run_continuously(speed=speed) + self._legacy.run_continuously(speed=speed) async def halt(self): - await self._legacy.halt() + self._legacy.halt() class Pump(Machine): diff --git a/pylabrobot/legacy/pumps/pump_tests.py b/pylabrobot/legacy/pumps/pump_tests.py index 344d089d679..71200e0c58d 100644 --- a/pylabrobot/legacy/pumps/pump_tests.py +++ b/pylabrobot/legacy/pumps/pump_tests.py @@ -2,7 +2,7 @@ from unittest.mock import AsyncMock, Mock from pylabrobot.legacy.pumps import PumpArray -from pylabrobot.legacy.pumps.backend import PumpArrayBackend +from pylabrobot.legacy.pumps.backend import PumpArrayBackend, PumpBackend from pylabrobot.legacy.pumps.calibration import PumpCalibration from pylabrobot.legacy.pumps.errors import NotCalibratedError from pylabrobot.legacy.pumps.pump import Pump @@ -15,7 +15,7 @@ class TestPump(unittest.IsolatedAsyncioTestCase): """ def setUp(self): - self.mock_backend = AsyncMock() + self.mock_backend = Mock(spec=PumpBackend) self.test_calibration = PumpCalibration.load_calibration(1, num_items=1) async def test_setup(self): diff --git a/pylabrobot/legacy/pumps/pumparray.py b/pylabrobot/legacy/pumps/pumparray.py index 3085064c526..af5d7ff1034 100644 --- a/pylabrobot/legacy/pumps/pumparray.py +++ b/pylabrobot/legacy/pumps/pumparray.py @@ -86,7 +86,7 @@ def _normalize_channels(self, use_channels: Union[int, List[int]]) -> List[int]: if any(ch not in range(0, self.num_channels) for ch in use_channels): raise ValueError( f"Pump address out of range for this pump array. " - f"Value should be between 0 and {self.num_channels}" + f"Value should be between 0 and {self.num_channels - 1}" ) if any(ch < 0 for ch in use_channels): raise ValueError("Channels in use channels must be positive.") @@ -112,6 +112,8 @@ async def run_revolutions( channels = self._normalize_channels(use_channels) if isinstance(num_revolutions, (float, int)): num_revolutions = [float(num_revolutions)] * len(channels) + if len(num_revolutions) != len(channels): + raise ValueError("num_revolutions and use_channels must be the same length.") for ch, rev in zip(channels, num_revolutions): await self._pumps[ch].run_revolutions(num_revolutions=rev)