diff --git a/pylabrobot/agrowpumps/__init__.py b/pylabrobot/agrowpumps/__init__.py new file mode 100644 index 00000000000..122f9c721d3 --- /dev/null +++ b/pylabrobot/agrowpumps/__init__.py @@ -0,0 +1 @@ +from .agrowdosepump_backend import AgrowChannelBackend, AgrowDosePumpArray, AgrowDriver diff --git a/pylabrobot/agrowpumps/agrowdosepump_backend.py b/pylabrobot/agrowpumps/agrowdosepump_backend.py new file mode 100644 index 00000000000..230c9b969e1 --- /dev/null +++ b/pylabrobot/agrowpumps/agrowdosepump_backend.py @@ -0,0 +1,206 @@ +import asyncio +import logging +import threading +import time +from typing import Dict, List, Optional, Union + +try: + from pymodbus.client import AsyncModbusSerialClient # type: ignore + + _MODBUS_IMPORT_ERROR = None +except ImportError as e: + AsyncModbusSerialClient = None # type: ignore + _MODBUS_IMPORT_ERROR = e + +from pylabrobot.capabilities.capability import Capability +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability +from pylabrobot.device import Device, Driver + +logger = logging.getLogger("pylabrobot") + + +class AgrowDriver(Driver): + """Modbus driver for Agrow dose pump arrays.""" + + def __init__(self, port: str, address: Union[int, str]): + super().__init__() + if _MODBUS_IMPORT_ERROR is not None: + raise RuntimeError( + "pymodbus is not installed. Install with: pip install pylabrobot[modbus]. " + f"Import error: {_MODBUS_IMPORT_ERROR}" + ) + if not isinstance(port, str): + raise ValueError("Port must be a string") + self.port = port + if address not in range(0, 256): + raise ValueError("Pump address out of range") + self.address = int(address) + self._keep_alive_thread: Optional[threading.Thread] = None + self._pump_index_to_address: Optional[Dict[int, int]] = None + self._modbus: Optional["AsyncModbusSerialClient"] = None + self._num_channels: Optional[int] = None + self._keep_alive_thread_active = False + + @property + def modbus(self) -> "AsyncModbusSerialClient": + if self._modbus is None: + raise RuntimeError("Modbus connection not established") + return self._modbus + + @property + def pump_index_to_address(self) -> Dict[int, int]: + if self._pump_index_to_address is None: + raise RuntimeError("Pump mappings not established") + return self._pump_index_to_address + + @property + def num_channels(self) -> int: + if self._num_channels is None: + raise RuntimeError("Number of channels not established") + return self._num_channels + + def _start_keep_alive_thread(self): + async def keep_alive(): + i = 0 + while self._keep_alive_thread_active: + time.sleep(0.1) + i += 1 + if i == 250: + await self.modbus.read_holding_registers(0, 1, unit=self.address) + i = 0 + + def manage_async_keep_alive(): + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(keep_alive()) + loop.close() + except Exception as e: + logger.error("Error in keep alive thread: %s", e) + + self._keep_alive_thread_active = True + self._keep_alive_thread = threading.Thread(target=manage_async_keep_alive, daemon=True) + self._keep_alive_thread.start() + + async def setup(self): + await self._setup_modbus() + register_return = await self.modbus.read_holding_registers(19, 2, unit=self.address) + self._num_channels = int( + "".join(chr(r // 256) + chr(r % 256) for r in register_return.registers)[2] + ) + self._start_keep_alive_thread() + self._pump_index_to_address = {pump: pump + 100 for pump in range(0, self.num_channels)} + + async def _setup_modbus(self): + if AsyncModbusSerialClient is None: + raise RuntimeError( + "pymodbus is not installed. Install with: pip install pylabrobot[modbus]." + f" Import error: {_MODBUS_IMPORT_ERROR}" + ) + self._modbus = AsyncModbusSerialClient( + port=self.port, + baudrate=115200, + timeout=1, + stopbits=1, + bytesize=8, + parity="E", + retry_on_empty=True, + ) + await self.modbus.connect() + if not self.modbus.connected: + raise ConnectionError("Modbus connection failed during pump setup") + + async def stop(self): + for pump in self.pump_index_to_address: + await self.write_speed(pump, 0) + if self._keep_alive_thread is not None: + self._keep_alive_thread_active = False + self._keep_alive_thread.join() + self.modbus.close() + assert not self.modbus.connected, "Modbus failing to disconnect" + + async def write_speed(self, channel: int, speed: int): + if speed not in range(101): + raise ValueError("Pump speed out of range. Value should be between 0 and 100.") + await self.modbus.write_register( + self.pump_index_to_address[channel], + speed, + unit=self.address, + ) + + +class AgrowChannelBackend(PumpBackend): + """Per-channel PumpBackend adapter that delegates to a shared AgrowDriver.""" + + def __init__(self, connection: AgrowDriver, channel: int): + self._driver = connection + self._channel = channel + + async def run_revolutions(self, num_revolutions: float): + raise NotImplementedError( + "Revolution based pumping commands are not available for Agrow pumps." + ) + + async def run_continuously(self, speed: float): + await self._driver.write_speed(self._channel, int(speed)) + + async def halt(self): + await self._driver.write_speed(self._channel, 0) + + def serialize(self): + return { + "port": self._driver.port, + "address": self._driver.address, + "channel": self._channel, + } + + +class AgrowDosePumpArray(Device): + """Agrow dose pump array device. + + Exposes each channel as an individual PumpingCapability via `self.pumps`. + """ + + def __init__( + self, + port: str, + address: Union[int, str], + calibrations: Optional[List[Optional[PumpCalibration]]] = None, + ): + self._channel_backends: List[AgrowChannelBackend] = [] + self.pumps: List[PumpingCapability] = [] + self._calibrations = calibrations + super().__init__(driver=AgrowDriver(port=port, address=address)) + self._driver: AgrowDriver + + async def setup(self): + await self._driver.setup() + num_channels = self._driver.num_channels + + self._channel_backends = [AgrowChannelBackend(self._driver, ch) for ch in range(num_channels)] + self.pumps = [] + for i, backend in enumerate(self._channel_backends): + cal = None + if self._calibrations is not None and i < len(self._calibrations): + cal = self._calibrations[i] + cap = PumpingCapability(backend=backend, calibration=cal) + self.pumps.append(cap) + + self._capabilities: List[Capability] = list(self.pumps) + for c in self._capabilities: + await c._on_setup() + self._setup_finished = True + + async def stop(self): + for cap in reversed(self._capabilities): + await cap._on_stop() + await self._driver.stop() + self._setup_finished = False + + def serialize(self): + return { + "port": self._driver.port, + "address": self._driver.address, + } diff --git a/pylabrobot/agrowpumps/agrowdosepump_tests.py b/pylabrobot/agrowpumps/agrowdosepump_tests.py new file mode 100644 index 00000000000..3503185d47d --- /dev/null +++ b/pylabrobot/agrowpumps/agrowdosepump_tests.py @@ -0,0 +1,76 @@ +# mypy: disable-error-code="attr-defined,assignment" +import unittest +from unittest.mock import AsyncMock, patch + +import pytest + +pytest.importorskip("pymodbus") + +from pylabrobot.agrowpumps import AgrowDosePumpArray + + +class SimulatedModbusClient: + """Duck-typed modbus client for testing.""" + + def __init__(self): + self._connected = False + self.write_register = AsyncMock() + + async def connect(self): + self._connected = True + + @property + def connected(self): + return self._connected + + async def read_holding_registers(self, address: int, count: int, **kwargs): + if "unit" not in kwargs: + raise ValueError("unit must be specified") + if address == 19: + result = AsyncMock() + result.registers = [16708, 13824, 0, 0, 0, 0, 0][:count] + return result + + def close(self, reconnect=False): + self._connected = False + + +class TestAgrowPumps(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.device = AgrowDosePumpArray(port="simulated", address=1) + + async def _mock_setup_modbus(): + self.device._driver._modbus = SimulatedModbusClient() + + with patch.object(self.device._driver, "_setup_modbus", _mock_setup_modbus): + await self.device.setup() + + async def asyncTearDown(self): + await self.device.stop() + + async def test_setup(self): + self.assertEqual(self.device._driver.port, "simulated") + self.assertEqual(self.device._driver.address, 1) + self.assertEqual(len(self.device.pumps), 6) + self.assertEqual( + self.device._driver._pump_index_to_address, + {pump: pump + 100 for pump in range(0, 6)}, + ) + + async def test_run_continuously(self): + self.device._driver.modbus.write_register.reset_mock() + await self.device.pumps[0].run_continuously(speed=1) + self.device._driver.modbus.write_register.assert_called_once_with(100, 1, unit=1) + + # invalid speed: cannot be bigger than 100 + with self.assertRaises(ValueError): + await self.device.pumps[0].run_continuously(speed=101) + + async def test_run_revolutions(self): + with self.assertRaises(NotImplementedError): + await self.device.pumps[0].run_revolutions(num_revolutions=1.0) + + async def test_halt_single_channel(self): + self.device._driver.modbus.write_register.reset_mock() + await self.device.pumps[2].halt() + self.device._driver.modbus.write_register.assert_called_once_with(102, 0, unit=1) diff --git a/pylabrobot/capabilities/pumping/__init__.py b/pylabrobot/capabilities/pumping/__init__.py new file mode 100644 index 00000000000..4e3c3a32ec0 --- /dev/null +++ b/pylabrobot/capabilities/pumping/__init__.py @@ -0,0 +1,5 @@ +from .backend import PumpBackend +from .calibration import PumpCalibration +from .chatterbox import PumpChatterboxBackend +from .errors import NotCalibratedError +from .pumping import PumpingCapability diff --git a/pylabrobot/capabilities/pumping/backend.py b/pylabrobot/capabilities/pumping/backend.py new file mode 100644 index 00000000000..467d22977ce --- /dev/null +++ b/pylabrobot/capabilities/pumping/backend.py @@ -0,0 +1,19 @@ +from abc import ABCMeta, abstractmethod + +from pylabrobot.capabilities.capability import CapabilityBackend + + +class PumpBackend(CapabilityBackend, metaclass=ABCMeta): + """Abstract backend for a single pump.""" + + @abstractmethod + async def run_revolutions(self, num_revolutions: float): + """Run for a given number of revolutions.""" + + @abstractmethod + async def run_continuously(self, speed: float): + """Run continuously at a given speed. If speed is 0, halt.""" + + @abstractmethod + async def halt(self): + """Halt the pump.""" diff --git a/pylabrobot/capabilities/pumping/calibration.py b/pylabrobot/capabilities/pumping/calibration.py new file mode 100644 index 00000000000..84c0d888c46 --- /dev/null +++ b/pylabrobot/capabilities/pumping/calibration.py @@ -0,0 +1,176 @@ +from __future__ import annotations + +import csv +import json +from typing import Dict, List, Literal, Optional, Union + +from pylabrobot.serializer import SerializableMixin + + +class PumpCalibration(SerializableMixin): + """Calibration for a single pump or pump array + + Attributes: + calibration: The calibration of the pump or pump array. + """ + + def __init__( + self, + calibration: List[Union[float, int]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ): + """Initialize a PumpCalibration object. + + Args: + calibration: calibration of the pump in pump-specific volume per time/revolution units. + calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for + volume per revolution. Defaults to "duration". + + Raises: + ValueError: if a value in the calibration is outside expected parameters. + """ + + if any(value <= 0 for value in calibration): + raise ValueError("A value in the calibration is outside expected parameters.") + if calibration_mode not in ["duration", "revolutions"]: + raise ValueError("calibration_mode must be 'duration' or 'revolutions'") + self.calibration = calibration + self.calibration_mode = calibration_mode + + def __getitem__(self, item: int) -> Union[float, int]: + return self.calibration[item] # type: ignore + + def __len__(self) -> int: + """Return the length of the calibration.""" + return len(self.calibration) + + @classmethod + def load_calibration( + cls, + calibration: Optional[Union[dict, list, float, int, str]] = None, + num_items: Optional[int] = None, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a file, dictionary, list, or value. + + Args: + calibration: pump calibration file, dictionary, list, or value. + calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for + volume per revolution. Defaults to "duration". + num_items: number of items in the calibration. Required if calibration is a value. + + Raises: + NotImplementedError: if the calibration filetype or format is not supported. + ValueError: if num_items is not specified when calibration is a value. + """ + + if isinstance(calibration, dict): + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, list): + return PumpCalibration.load_from_list( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, (float, int)): + if num_items is None: + raise ValueError("num_items must be specified if calibration is a value.") + return PumpCalibration.load_from_value( + value=calibration, + num_items=num_items, + calibration_mode=calibration_mode, + ) + if isinstance(calibration, str): + if calibration.endswith(".json"): + return PumpCalibration.load_from_json( + file_path=calibration, calibration_mode=calibration_mode + ) + if calibration.endswith(".csv"): + return PumpCalibration.load_from_csv( + file_path=calibration, calibration_mode=calibration_mode + ) + raise NotImplementedError("Calibration filetype not supported.") + raise NotImplementedError("Calibration format not supported.") + + def serialize(self) -> dict: + return { + "calibration": self.calibration, + "calibration_mode": self.calibration_mode, + } + + @classmethod + def deserialize(cls, data: dict) -> PumpCalibration: + return cls( + calibration=data["calibration"], + calibration_mode=data["calibration_mode"], + ) + + @classmethod + def load_from_json( + cls, + file_path: str, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a json file.""" + + with open(file_path, "rb") as f: + calibration = json.load(f) + if isinstance(calibration, dict): + calibration = {int(key): float(value) for key, value in calibration.items()} + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + if isinstance(calibration, list): + return PumpCalibration(calibration=calibration, calibration_mode=calibration_mode) + raise TypeError(f"Calibration pulled from {file_path} is not a dictionary or list.") + + @classmethod + def load_from_csv( + cls, + file_path: str, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a csv file.""" + + with open(file_path, encoding="utf-8", newline="") as f: + csv_file = list(csv.reader(f)) + num_columns = len(csv_file[0]) + if num_columns != 2: + raise ValueError("CSV file must have two columns.") + calibration = {int(row[0]): float(row[1]) for row in csv_file} + return PumpCalibration.load_from_dict( + calibration=calibration, calibration_mode=calibration_mode + ) + + @classmethod + def load_from_dict( + cls, + calibration: Dict[int, Union[int, float]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a dictionary (0-indexed).""" + + if sorted(calibration.keys()) != list(range(len(calibration))): + raise ValueError("Keys must be a contiguous range of integers starting at 0.") + calibration_list = [calibration[key] for key in sorted(calibration.keys())] + return cls(calibration=calibration_list, calibration_mode=calibration_mode) + + @classmethod + def load_from_list( + cls, + calibration: List[Union[int, float]], + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a list.""" + return cls(calibration=calibration, calibration_mode=calibration_mode) + + @classmethod + def load_from_value( + cls, + value: Union[float, int], + num_items: int, + calibration_mode: Literal["duration", "revolutions"] = "duration", + ) -> PumpCalibration: + """Load a calibration from a single value applied to all channels.""" + calibration = [value] * num_items + return cls(calibration, calibration_mode) diff --git a/pylabrobot/capabilities/pumping/chatterbox.py b/pylabrobot/capabilities/pumping/chatterbox.py new file mode 100644 index 00000000000..efdf806ff70 --- /dev/null +++ b/pylabrobot/capabilities/pumping/chatterbox.py @@ -0,0 +1,18 @@ +import logging + +from .backend import PumpBackend + +logger = logging.getLogger(__name__) + + +class PumpChatterboxBackend(PumpBackend): + """Chatterbox backend for device-free testing.""" + + async def run_revolutions(self, num_revolutions: float): + logger.info("Running %s revolutions.", num_revolutions) + + async def run_continuously(self, speed: float): + logger.info("Running continuously at speed %s.", speed) + + async def halt(self): + logger.info("Halting the pump.") diff --git a/pylabrobot/capabilities/pumping/errors.py b/pylabrobot/capabilities/pumping/errors.py new file mode 100644 index 00000000000..64f19783fca --- /dev/null +++ b/pylabrobot/capabilities/pumping/errors.py @@ -0,0 +1,2 @@ +class NotCalibratedError(Exception): + """Error raised when calling a method that requires the pump to be calibrated.""" diff --git a/pylabrobot/capabilities/pumping/pumping.py b/pylabrobot/capabilities/pumping/pumping.py new file mode 100644 index 00000000000..fabbcc3be33 --- /dev/null +++ b/pylabrobot/capabilities/pumping/pumping.py @@ -0,0 +1,86 @@ +import asyncio +from typing import Optional, Union + +from pylabrobot.capabilities.capability import Capability, need_capability_ready +from pylabrobot.capabilities.pumping.errors import NotCalibratedError + +from .backend import PumpBackend +from .calibration import PumpCalibration + + +class PumpingCapability(Capability): + """Single-pump capability.""" + + def __init__( + self, + backend: PumpBackend, + calibration: Optional[PumpCalibration] = None, + ): + super().__init__(backend=backend) + self.backend: PumpBackend = backend + if calibration is not None and len(calibration) != 1: + raise ValueError("Calibration may only have a single item for this pump") + self.calibration = calibration + + @need_capability_ready + async def run_revolutions(self, num_revolutions: float): + """Run for a given number of revolutions. + + Args: + num_revolutions: number of revolutions to run. + """ + await self.backend.run_revolutions(num_revolutions=num_revolutions) + + @need_capability_ready + async def run_continuously(self, speed: float): + """Run continuously at a given speed. If speed is 0, the pump will be halted. + + Args: + speed: speed in rpm/pump-specific units. + """ + await self.backend.run_continuously(speed=speed) + + @need_capability_ready + async def run_for_duration(self, speed: Union[float, int], duration: Union[float, int]): + """Run the pump at specified speed for the specified duration. + + Args: + speed: speed in rpm/pump-specific units. + duration: duration in seconds. + """ + if duration < 0: + raise ValueError("Duration must be positive.") + await self.run_continuously(speed=speed) + await asyncio.sleep(duration) + await self.run_continuously(speed=0) + + @need_capability_ready + async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]): + """Run the pump at specified speed for the specified volume. Requires calibration. + + Args: + speed: speed in rpm/pump-specific units. + volume: volume to pump. + """ + if self.calibration is None: + raise NotCalibratedError( + "Pump is not calibrated. Volume based pumping and related functions unavailable." + ) + if self.calibration.calibration_mode == "duration": + duration = volume / self.calibration[0] + await self.run_for_duration(speed=speed, duration=duration) + elif self.calibration.calibration_mode == "revolutions": + num_revolutions = volume / self.calibration[0] + await self.run_revolutions(num_revolutions=num_revolutions) + else: + raise ValueError("Calibration mode not recognized.") + + @need_capability_ready + async def halt(self): + """Halt the pump.""" + await self.backend.halt() + + async def _on_stop(self): + if self._setup_finished: + await self.backend.halt() + await super()._on_stop() diff --git a/pylabrobot/capabilities/pumping/pumping_tests.py b/pylabrobot/capabilities/pumping/pumping_tests.py new file mode 100644 index 00000000000..144e506bca5 --- /dev/null +++ b/pylabrobot/capabilities/pumping/pumping_tests.py @@ -0,0 +1,79 @@ +import unittest +from unittest.mock import AsyncMock, Mock + +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.errors import NotCalibratedError +from pylabrobot.capabilities.pumping.pumping import PumpingCapability + + +class TestPumpingCapability(unittest.IsolatedAsyncioTestCase): + def setUp(self): + self.mock_backend = Mock(spec=PumpBackend) + self.mock_backend.run_revolutions = AsyncMock() + self.mock_backend.run_continuously = AsyncMock() + self.mock_backend.halt = AsyncMock() + self.test_calibration = PumpCalibration.load_calibration(1, num_items=1) + + async def _make_cap(self, calibration=None): + cap = PumpingCapability(backend=self.mock_backend, calibration=calibration) + await cap._on_setup() + return cap + + async def test_setup(self): + cap = await self._make_cap() + self.assertIsNone(cap.calibration) + self.assertTrue(cap.setup_finished) + + async def test_run_revolutions(self): + cap = await self._make_cap() + await cap.run_revolutions(num_revolutions=1) + self.mock_backend.run_revolutions.assert_called_once_with(num_revolutions=1) + + async def test_run_continuously(self): + cap = await self._make_cap() + await cap.run_continuously(speed=100) + self.mock_backend.run_continuously.assert_called_once_with(speed=100) + + async def test_halt(self): + cap = await self._make_cap() + await cap.halt() + self.mock_backend.halt.assert_called_once() + + async def test_run_for_duration(self): + cap = await self._make_cap() + await cap.run_for_duration(speed=1, duration=0) + self.mock_backend.run_continuously.assert_called_with(speed=0) + + async def test_run_invalid_duration(self): + cap = await self._make_cap() + with self.assertRaises(ValueError): + await cap.run_for_duration(speed=1, duration=-1) + + async def test_pump_volume_duration_mode(self): + cap = await self._make_cap(calibration=self.test_calibration) + cap.calibration.calibration_mode = "duration" + cap.run_for_duration = AsyncMock() + await cap.pump_volume(speed=1, volume=1) + cap.run_for_duration.assert_called_once_with(speed=1, duration=1.0) + + async def test_pump_volume_revolutions_mode(self): + cap = await self._make_cap(calibration=self.test_calibration) + cap.calibration.calibration_mode = "revolutions" + cap.run_revolutions = AsyncMock() + await cap.pump_volume(speed=1, volume=1) + cap.run_revolutions.assert_called_once_with(num_revolutions=1.0) + + async def test_pump_volume_no_calibration(self): + cap = await self._make_cap() + with self.assertRaises(NotCalibratedError): + await cap.pump_volume(speed=1, volume=1) + + async def test_not_setup_raises(self): + cap = PumpingCapability(backend=self.mock_backend) + with self.assertRaises(RuntimeError): + await cap.run_continuously(speed=1) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/cole_parmer/__init__.py b/pylabrobot/cole_parmer/__init__.py new file mode 100644 index 00000000000..308c97ae35a --- /dev/null +++ b/pylabrobot/cole_parmer/__init__.py @@ -0,0 +1 @@ +from .masterflex_backend import MasterflexBackend, MasterflexDriver, MasterflexPump diff --git a/pylabrobot/cole_parmer/masterflex_backend.py b/pylabrobot/cole_parmer/masterflex_backend.py new file mode 100644 index 00000000000..6c0ab10f407 --- /dev/null +++ b/pylabrobot/cole_parmer/masterflex_backend.py @@ -0,0 +1,114 @@ +try: + import serial # type: ignore + + HAS_SERIAL = True +except ImportError as e: + HAS_SERIAL = False + _SERIAL_IMPORT_ERROR = e + +from typing import Optional + +from pylabrobot.capabilities.pumping.backend import PumpBackend +from pylabrobot.capabilities.pumping.calibration import PumpCalibration +from pylabrobot.capabilities.pumping.pumping import PumpingCapability +from pylabrobot.device import Device, Driver +from pylabrobot.io.serial import Serial + + +class MasterflexDriver(Driver): + """Serial driver for Cole Parmer Masterflex L/S pumps. + + tested on: + 07551-20 + + should be same as: + 07522-20 + 07522-30 + 07551-30 + 07575-30 + 07575-40 + + Documentation available at: + - https://pim-resources.coleparmer.com/instruction-manual/a-1299-1127b-en.pdf + - https://web.archive.org/web/20210924061132/https://pim-resources.coleparmer.com/ + instruction-manual/a-1299-1127b-en.pdf + """ + + def __init__(self, com_port: str): + super().__init__() + if not HAS_SERIAL: + raise RuntimeError( + "pyserial is not installed. Install with: pip install pylabrobot[serial]. " + f"Import error: {_SERIAL_IMPORT_ERROR}" + ) + self.com_port = com_port + self.io = Serial( + port=self.com_port, + baudrate=4800, + timeout=1, + parity=serial.PARITY_ODD, + stopbits=serial.STOPBITS_ONE, + bytesize=serial.SEVENBITS, + human_readable_device_name="Masterflex Pump", + ) + + async def setup(self): + await self.io.setup() + await self.io.write(b"\x05") # Enquiry; ready to send. + await self.io.write(b"\x05P02\r") + + async def stop(self): + await self.io.stop() + + async def send_command(self, command: str): + command = "\x02P02" + command + "\x0d" + await self.io.write(command.encode()) + return await self.io.read() + + def serialize(self): + return {"type": self.__class__.__name__, "com_port": self.com_port} + + +class MasterflexBackend(PumpBackend): + """Pump capability backend for Masterflex L/S pumps.""" + + def __init__(self, driver: MasterflexDriver): + self._driver = driver + + async def run_revolutions(self, num_revolutions: float): + num_revolutions = round(num_revolutions, 2) + cmd = f"V{num_revolutions}G" + await self._driver.send_command(cmd) + + async def run_continuously(self, speed: float): + if speed == 0: + await self.halt() + return + + direction = "+" if speed > 0 else "-" + speed_int = int(abs(speed)) + cmd = f"S{direction}{speed_int}G0" + await self._driver.send_command(cmd) + + async def halt(self): + await self._driver.send_command("H") + + def serialize(self): + return { + "com_port": self._driver.com_port, + } + + +class MasterflexPump(Device): + """Cole Parmer Masterflex L/S pump.""" + + def __init__( + self, + com_port: str, + calibration: Optional[PumpCalibration] = None, + ): + driver = MasterflexDriver(com_port=com_port) + super().__init__(driver=driver) + self._driver: MasterflexDriver + self.pumping = PumpingCapability(backend=MasterflexBackend(driver), calibration=calibration) + self._capabilities = [self.pumping] diff --git a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py index 605d835a542..01d75aebbc5 100644 --- a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py +++ b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py @@ -1,146 +1,47 @@ -import asyncio -import logging -import threading -import time -from typing import Dict, List, Optional, Union +"""Legacy. Use pylabrobot.agrowpumps instead.""" -try: - from pymodbus.client import AsyncModbusSerialClient # type: ignore - - _MODBUS_IMPORT_ERROR = None -except ImportError as e: - AsyncModbusSerialClient = None # type: ignore - _MODBUS_IMPORT_ERROR = e +from typing import Dict, List, Union +from pylabrobot.agrowpumps.agrowdosepump_backend import AgrowChannelBackend, AgrowDriver from pylabrobot.legacy.pumps.backend import PumpArrayBackend -logger = logging.getLogger("pylabrobot") - class AgrowPumpArrayBackend(PumpArrayBackend): - """ - AgrowPumpArray allows users to control AgrowPumps via Modbus communication. - - https://www.agrowtek.com/doc/im/IM_MODBUS.pdf - https://agrowtek.com/doc/im/IM_LX1.pdf - - Attributes: - port: The port that the AgrowPumpArray is connected to. - address: The address of the AgrowPumpArray client registers. - - Properties: - num_channels: The number of channels that the AgrowPumpArray has. - pump_index_to_address: A dictionary that maps pump indices to their Modbus addresses. - """ + """Legacy. Use pylabrobot.agrowpumps.AgrowDosePumpArray instead.""" def __init__(self, port: str, address: Union[int, str]): - if _MODBUS_IMPORT_ERROR is not None: - raise RuntimeError( - "pymodbus is not installed. Install with: pip install pylabrobot[modbus]. " - f"Import error: {_MODBUS_IMPORT_ERROR}" - ) - if not isinstance(port, str): - raise ValueError("Port must be a string") - self.port = port - if address not in range(0, 256): - raise ValueError("Pump address out of range") - self.address = int(address) - self._keep_alive_thread: Optional[threading.Thread] = None - self._pump_index_to_address: Optional[Dict[int, int]] = None - self._modbus: Optional["AsyncModbusSerialClient"] = None - self._num_channels: Optional[int] = None - self._keep_alive_thread_active = False + self._driver = AgrowDriver(port=port, address=address) + self._backends: List[AgrowChannelBackend] = [] @property - def modbus(self) -> "AsyncModbusSerialClient": - """Returns the Modbus connection to the AgrowPumpArray.""" - if self._modbus is None: - raise RuntimeError("Modbus connection not established") - return self._modbus + def port(self): + return self._driver.port @property - def pump_index_to_address(self) -> Dict[int, int]: - """Returns a dictionary that maps pump indices to their Modbus addresses. + def address(self): + return self._driver.address - Returns: - Dict[int, int]: A dictionary that maps pump indices to their Modbus addresses. - """ - - if self._pump_index_to_address is None: - raise RuntimeError("Pump mappings not established") - return self._pump_index_to_address + @property + def modbus(self): + return self._driver.modbus @property def num_channels(self) -> int: - """The number of channels that the AgrowPumpArray has. - - Returns: - The number of channels that the AgrowPumpArray has. - """ - if self._num_channels is None: - raise RuntimeError("Number of channels not established") - return self._num_channels - - def start_keep_alive_thread(self): - """Creates a daemon thread that sends a Modbus request every 25 seconds to keep the connection - alive.""" - - async def keep_alive(): - """Sends a Modbus request every 25 seconds to keep the connection alive. - Sleep for 0.1 seconds so we can respond to `stop` events fast. - """ - i = 0 - while self._keep_alive_thread_active: - time.sleep(0.1) - i += 1 - if i == 250: - await self.modbus.read_holding_registers(0, 1, unit=self.address) - i = 0 - - def manage_async_keep_alive(): - """Manages the keep alive thread.""" - try: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(keep_alive()) - loop.close() - except Exception as e: - logger.error("Error in keep alive thread: %s", e) - - self._keep_alive_thread_active = True - self._keep_alive_thread = threading.Thread(target=manage_async_keep_alive, daemon=True) - self._keep_alive_thread.start() + return self._driver.num_channels + + @property + def pump_index_to_address(self) -> Dict[int, int]: + return self._driver.pump_index_to_address async def setup(self): - """Sets up the Modbus connection to the AgrowPumpArray and creates the - pump mappings needed to issue commands. - """ - await self._setup_modbus() - register_return = await self.modbus.read_holding_registers(19, 2, unit=self.address) - self._num_channels = int( - "".join(chr(r // 256) + chr(r % 256) for r in register_return.registers)[2] - ) - self.start_keep_alive_thread() - self._pump_index_to_address = {pump: pump + 100 for pump in range(0, self.num_channels)} - - async def _setup_modbus(self): - if AsyncModbusSerialClient is None: - raise RuntimeError( - "pymodbus is not installed. Install with: pip install pylabrobot[modbus]." - f" Import error: {_MODBUS_IMPORT_ERROR}" - ) - self._modbus = AsyncModbusSerialClient( - port=self.port, - baudrate=115200, - timeout=1, - stopbits=1, - bytesize=8, - parity="E", - retry_on_empty=True, - ) - await self.modbus.connect() - if not self.modbus.connected: - raise ConnectionError("Modbus connection failed during pump setup") + await self._driver.setup() + self._backends = [ + AgrowChannelBackend(self._driver, ch) for ch in range(self._driver.num_channels) + ] + + async def stop(self): + await self.halt() + await self._driver.stop() def serialize(self): return { @@ -150,66 +51,20 @@ def serialize(self): } async def run_revolutions(self, num_revolutions: List[float], use_channels: List[int]): - """Run the specified channels at the speed selected. If speed is 0, the pump will be halted. - - Args: - num_revolutions: number of revolutions to run pumps. - use_channels: pump array channels to run - - Raises: - NotImplementedError: Revolution based pumping commands are not available for this array. - """ - raise NotImplementedError( "Revolution based pumping commands are not available for this pump array." ) async def run_continuously(self, speed: List[float], use_channels: List[int]): - """Run pumps at the specified speeds. - - Args: - speed: rate at which to run pump. - use_channels: pump array channels to run - - Raises: - ValueError: Pump address out of range - ValueError: Pump speed out of range - """ - - for pump_index, pump_speed in zip(use_channels, speed): - pump_speed = int(pump_speed) - if pump_speed not in range(101): - raise ValueError("Pump speed out of range. Value should be between 0 and 100.") - await self.modbus.write_register( - self.pump_index_to_address[pump_index], - pump_speed, - unit=self.address, - ) + for channel, pump_speed in zip(use_channels, speed): + await self._backends[channel].run_continuously(pump_speed) async def halt(self): - """Halt the entire pump array.""" - assert self.modbus is not None, "Modbus connection not established" - assert self.pump_index_to_address is not None, "Pump address mapping not established" - logger.info("Halting pump array") - for pump in self.pump_index_to_address: - address = self.pump_index_to_address[pump] - await self.modbus.write_register(address, 0, unit=self.address) - - async def stop(self): - """Close the connection to the pump array.""" - await self.halt() - assert self.modbus is not None, "Modbus connection not established" - if self._keep_alive_thread is not None: - self._keep_alive_thread_active = False - self._keep_alive_thread.join() - self.modbus.close() - assert not self.modbus.connected, "Modbus failing to disconnect" - - -# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) -# https://github.com/PyLabRobot/pylabrobot/issues/466 + for backend in self._backends: + await backend.halt() +# Deprecated alias class AgrowPumpArray: def __init__(self, *args, **kwargs): raise RuntimeError( diff --git a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py index 4a46ba87987..546506a2b09 100644 --- a/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py +++ b/pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py @@ -1,26 +1,21 @@ +# mypy: disable-error-code="attr-defined,assignment" import unittest -from unittest.mock import AsyncMock, call +from unittest.mock import AsyncMock, call, patch import pytest pytest.importorskip("pymodbus") -from pymodbus.client import AsyncModbusSerialClient # type: ignore - from pylabrobot.legacy.pumps import PumpArray from pylabrobot.legacy.pumps.agrowpumps import AgrowPumpArrayBackend -class SimulatedModbusClient(AsyncModbusSerialClient): - """ - SimulatedModbusClient allows users to simulate Modbus communication. - - Attributes: - connected: A boolean that indicates whether the simulated client is connected. - """ +class SimulatedModbusClient: + """Duck-typed modbus client for testing.""" - def __init__(self, connected: bool = False): - self._connected = connected + def __init__(self): + self._connected = False + self.write_register = AsyncMock() async def connect(self): self._connected = True @@ -29,35 +24,28 @@ async def connect(self): def connected(self): return self._connected - async def read_holding_registers(self, address: int, count: int, **kwargs): # type: ignore - """Simulates reading holding registers from the AgrowPumpArray.""" + async def read_holding_registers(self, address: int, count: int, **kwargs): if "unit" not in kwargs: raise ValueError("unit must be specified") if address == 19: - return_register = AsyncMock() - return_register.registers = [16708, 13824, 0, 0, 0, 0, 0][:count] - return return_register - - write_register = AsyncMock() + result = AsyncMock() + result.registers = [16708, 13824, 0, 0, 0, 0, 0][:count] + return result def close(self, reconnect=False): - assert not self.connected, "Modbus connection not established" self._connected = False class TestAgrowPumps(unittest.IsolatedAsyncioTestCase): - """TestAgrowPumps allows users to test AgrowPumps.""" - async def asyncSetUp(self): self.agrow_backend = AgrowPumpArrayBackend(port="simulated", address=1) async def _mock_setup_modbus(): - self.agrow_backend._modbus = SimulatedModbusClient() - - self.agrow_backend._setup_modbus = _mock_setup_modbus # type: ignore[method-assign] + self.agrow_backend._driver._modbus = SimulatedModbusClient() - self.pump_array = PumpArray(backend=self.agrow_backend, calibration=None) - await self.pump_array.setup() + with patch.object(self.agrow_backend._driver, "_setup_modbus", _mock_setup_modbus): + self.pump_array = PumpArray(backend=self.agrow_backend, calibration=None) + await self.pump_array.setup() async def asyncTearDown(self): await self.pump_array.stop() @@ -66,14 +54,14 @@ async def test_setup(self): self.assertEqual(self.agrow_backend.port, "simulated") self.assertEqual(self.agrow_backend.address, 1) self.assertEqual( - self.agrow_backend._pump_index_to_address, + self.agrow_backend.pump_index_to_address, {pump: pump + 100 for pump in range(0, 6)}, ) async def test_run_continuously(self): - self.agrow_backend.modbus.write_register.reset_mock() # type: ignore[attr-defined] + self.agrow_backend.modbus.write_register.reset_mock() await self.pump_array.run_continuously(speed=1, use_channels=[0]) - self.agrow_backend.modbus.write_register.assert_called_once_with(100, 1, unit=1) # type: ignore[attr-defined] + self.agrow_backend.modbus.write_register.assert_called_once_with(100, 1, unit=1) # invalid speed: cannot be bigger than 100 with self.assertRaises(ValueError): @@ -86,6 +74,6 @@ async def test_run_revolutions(self): async def test_halt(self): await self.pump_array.halt() - self.agrow_backend.modbus.write_register.assert_has_calls( # type: ignore[attr-defined] + self.agrow_backend.modbus.write_register.assert_has_calls( [call(100 + i, 0, unit=1) for i in range(6)] ) diff --git a/pylabrobot/legacy/pumps/calibration.py b/pylabrobot/legacy/pumps/calibration.py index 48a25afb6e8..82f3b8969b2 100644 --- a/pylabrobot/legacy/pumps/calibration.py +++ b/pylabrobot/legacy/pumps/calibration.py @@ -1,219 +1,5 @@ -from __future__ import annotations +"""Legacy. Use pylabrobot.capabilities.pumping.calibration instead.""" -import csv -import json -from typing import Dict, List, Literal, Optional, Union +from pylabrobot.capabilities.pumping.calibration import PumpCalibration -from pylabrobot.serializer import SerializableMixin - - -class PumpCalibration(SerializableMixin): - """Calibration for a single pump or pump array - - Attributes: - calibration: The calibration of the pump or pump array. - """ - - def __init__( - self, - calibration: List[Union[float, int]], - calibration_mode: Literal["duration", "revolutions"] = "duration", - ): - """Initialize a PumpCalibration object. - - Args: - calibration: calibration of the pump in pump-specific volume per time/revolution units. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - - Raises: - ValueError: if a value in the calibration is outside expected parameters. - """ - - if any(value <= 0 for value in calibration): - raise ValueError("A value in the calibration is is outside expected parameters.") - if calibration_mode not in ["duration", "revolutions"]: - raise ValueError("calibration_mode must be 'duration' or 'revolutions'") - self.calibration = calibration - self.calibration_mode = calibration_mode - - def __getitem__(self, item: int) -> Union[float, int]: - return self.calibration[item] # type: ignore - - def __len__(self) -> int: - """Return the length of the calibration.""" - - return len(self.calibration) - - @classmethod - def load_calibration( - cls, - calibration: Optional[Union[dict, list, float, int, str]] = None, - num_items: Optional[int] = None, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a file, dictionary, list, or value. :param calibration: pump - calibration file, dictionary, list, or value. If None, returns an empty PumpCalibration - object. - - Args: - calibration: pump calibration file, dictionary, list, or value. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - num_items: number of items in the calibration. Required if calibration is a value. - - Raises: - NotImplementedError: if the calibration filetype or format is not supported. - ValueError: if num_items is not specified when calibration is a value. - """ - - if isinstance(calibration, dict): - return PumpCalibration.load_from_dict( - calibration=calibration, calibration_mode=calibration_mode - ) - if isinstance(calibration, list): - return PumpCalibration.load_from_list( - calibration=calibration, calibration_mode=calibration_mode - ) - if isinstance(calibration, (float, int)): - if num_items is None: - raise ValueError("num_items must be specified if calibration is a value.") - return PumpCalibration.load_from_value( - value=calibration, - num_items=num_items, - calibration_mode=calibration_mode, - ) - if isinstance(calibration, str): - if calibration.endswith(".json"): - return PumpCalibration.load_from_json( - file_path=calibration, calibration_mode=calibration_mode - ) - if calibration.endswith(".csv"): - return PumpCalibration.load_from_csv( - file_path=calibration, calibration_mode=calibration_mode - ) - raise NotImplementedError("Calibration filetype not supported.") - raise NotImplementedError("Calibration format not supported.") - - def serialize(self) -> dict: - return { - "calibration": self.calibration, - "calibration_mode": self.calibration_mode, - } - - @classmethod - def deserialize(cls, data: dict) -> PumpCalibration: - return cls( - calibration=data["calibration"], - calibration_mode=data["calibration_mode"], - ) - - @classmethod - def load_from_json( - cls, - file_path: str, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a json file. - - Args: - file_path: json file to load calibration from. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - - Raises: - TypeError: if the calibration pulled from the json is not a dictionary or list. - """ - - with open(file_path, "rb") as f: - calibration = json.load(f) - if isinstance(calibration, dict): - calibration = {int(key): float(value) for key, value in calibration.items()} - return PumpCalibration.load_from_dict( - calibration=calibration, calibration_mode=calibration_mode - ) - if isinstance(calibration, list): - return PumpCalibration(calibration=calibration, calibration_mode=calibration_mode) - raise TypeError(f"Calibration pulled from {file_path} is not a dictionary or list.") - - @classmethod - def load_from_csv( - cls, - file_path: str, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a csv file. - - Args: - file_path: csv file to load calibration from. 0-indexed. The first column is treated as the - index, the second column as the value. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - """ - - with open(file_path, encoding="utf-8", newline="") as f: - csv_file = list(csv.reader(f)) - num_columns = len(csv_file[0]) - if num_columns != 2: - raise ValueError("CSV file must have two columns.") - calibration = {int(row[0]): float(row[1]) for row in csv_file} - return PumpCalibration.load_from_dict( - calibration=calibration, calibration_mode=calibration_mode - ) - - @classmethod - def load_from_dict( - cls, - calibration: Dict[int, Union[int, float]], - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a dictionary. - - Args: - calibration: dictionary to load calibration from. 0-indexed. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - - Raises: - ValueError: if the calibration dictionary is not formatted correctly. - """ - - if sorted(calibration.keys()) != list(range(len(calibration))): - raise ValueError("Keys must be a contiguous range of integers starting at 0.") - calibration_list = [calibration[key] for key in sorted(calibration.keys())] - return cls(calibration=calibration_list, calibration_mode=calibration_mode) - - @classmethod - def load_from_list( - cls, - calibration: List[Union[int, float]], - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a list. Equivalent to PumpCalibration(calibration). - - Args: - calibration: list to load calibration from. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - """ - - return cls(calibration=calibration, calibration_mode=calibration_mode) - - @classmethod - def load_from_value( - cls, - value: Union[float, int], - num_items: int, - calibration_mode: Literal["duration", "revolutions"] = "duration", - ) -> PumpCalibration: - """Load a calibration from a value. Equivalent to PumpCalibration([value] * num_items). - - Args: - value: value to load calibration from. - num_items: number of items in the calibration. - calibration_mode: units of the calibration. "duration" for volume per time, "revolutions" for - volume per revolution. Defaults to "duration". - """ - - calibration = [value] * num_items - return cls(calibration, calibration_mode) +__all__ = ["PumpCalibration"] diff --git a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py index 9612f02a024..c7ca690b254 100644 --- a/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py +++ b/pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py @@ -1,91 +1,48 @@ -try: - import serial # type: ignore +"""Legacy. Use pylabrobot.cole_parmer instead.""" - HAS_SERIAL = True -except ImportError as e: - HAS_SERIAL = False - _SERIAL_IMPORT_ERROR = e - -from pylabrobot.io.serial import Serial +from pylabrobot.cole_parmer.masterflex_backend import MasterflexBackend as _NewBackend +from pylabrobot.cole_parmer.masterflex_backend import MasterflexDriver from pylabrobot.legacy.pumps.backend import PumpBackend class MasterflexBackend(PumpBackend): - """Backend for the Cole Parmer Masterflex L/S pump + """Legacy. Use pylabrobot.cole_parmer.MasterflexBackend instead.""" - tested on: - 07551-20 + def __init__(self, com_port: str): + self._driver = MasterflexDriver(com_port=com_port) + self._backend = _NewBackend(self._driver) - should be same as: - 07522-20 - 07522-30 - 07551-30 - 07575-30 - 07575-40 + @property + def io(self): + return self._driver.io - Documentation available at: - - https://pim-resources.coleparmer.com/instruction-manual/a-1299-1127b-en.pdf - - https://web.archive.org/web/20210924061132/https://pim-resources.coleparmer.com/ - instruction-manual/a-1299-1127b-en.pdf - """ - - def __init__(self, com_port: str): - if not HAS_SERIAL: - raise RuntimeError( - "pyserial is not installed. Install with: pip install pylabrobot[serial]. " - f"Import error: {_SERIAL_IMPORT_ERROR}" - ) - self.com_port = com_port - self.io = Serial( - port=self.com_port, - baudrate=4800, - timeout=1, - parity=serial.PARITY_ODD, - stopbits=serial.STOPBITS_ONE, - bytesize=serial.SEVENBITS, - human_readable_device_name="Masterflex Pump", - ) + @io.setter + def io(self, value): + self._driver.io = value async def setup(self): - await self.io.setup() + await self._driver.setup() - await self.io.write(b"\x05") # Enquiry; ready to send. - await self.io.write(b"\x05P02\r") + async def stop(self): + await self._driver.stop() def serialize(self): - return {**super().serialize(), "com_port": self.com_port} - - async def stop(self): - await self.io.stop() + return {"type": self.__class__.__name__, "com_port": self._driver.com_port} async def send_command(self, command: str): - command = "\x02P02" + command + "\x0d" - await self.io.write(command.encode()) - return self.io.read() + return await self._driver.send_command(command) async def run_revolutions(self, num_revolutions: float): - num_revolutions = round(num_revolutions, 2) - cmd = f"V{num_revolutions}G" - await self.send_command(cmd) + await self._backend.run_revolutions(num_revolutions) async def run_continuously(self, speed: float): - if speed == 0: - self.halt() - return - - direction = "+" if speed > 0 else "-" - speed = int(abs(speed)) - cmd = f"S{direction}{speed}G0" - await self.send_command(cmd) + await self._backend.run_continuously(speed) async def halt(self): - await self.send_command("H") - - -# Deprecated alias with warning # TODO: remove mid May 2025 (giving people 1 month to update) -# https://github.com/PyLabRobot/pylabrobot/issues/466 + await self._backend.halt() +# Deprecated alias class Masterflex: def __init__(self, *args, **kwargs): raise RuntimeError("`Masterflex` is deprecated. Please use `MasterflexBackend` instead.") diff --git a/pylabrobot/legacy/pumps/pump.py b/pylabrobot/legacy/pumps/pump.py index 5b9d3b31670..1f28c81765e 100644 --- a/pylabrobot/legacy/pumps/pump.py +++ b/pylabrobot/legacy/pumps/pump.py @@ -1,12 +1,29 @@ -import asyncio from typing import Optional, Union +from pylabrobot.capabilities.pumping.backend import PumpBackend as _NewPumpBackend +from pylabrobot.capabilities.pumping.pumping import PumpingCapability from pylabrobot.legacy.machines.machine import Machine from .backend import PumpBackend from .calibration import PumpCalibration +class _PumpAdapter(_NewPumpBackend): + """Adapts a legacy PumpBackend to the new PumpBackend (CapabilityBackend).""" + + def __init__(self, legacy: PumpBackend): + self._legacy = legacy + + async def run_revolutions(self, num_revolutions: float): + self._legacy.run_revolutions(num_revolutions=num_revolutions) + + async def run_continuously(self, speed: float): + self._legacy.run_continuously(speed=speed) + + async def halt(self): + self._legacy.halt() + + class Pump(Machine): """Frontend for a (peristaltic) pump.""" @@ -16,10 +33,19 @@ def __init__( calibration: Optional[PumpCalibration] = None, ): super().__init__(backend=backend) - self.backend: PumpBackend = backend # fix type + self.backend: PumpBackend = backend if calibration is not None and len(calibration) != 1: raise ValueError("Calibration may only have a single item for this pump") self.calibration = calibration + self._pumping = PumpingCapability(backend=_PumpAdapter(backend), calibration=calibration) + + async def setup(self, **backend_kwargs): + await super().setup(**backend_kwargs) + await self._pumping._on_setup() + + async def stop(self): + await self._pumping._on_stop() + await super().stop() def serialize(self) -> dict: if self.calibration is None: @@ -39,63 +65,16 @@ def deserialize(cls, data: dict): return super().deserialize(data_copy) async def run_revolutions(self, num_revolutions: float): - """Run a given number of revolutions. This method will return after the command has been sent, - and the pump will run until `halt` is called. - - Args: - num_revolutions: number of revolutions to run - """ - - self.backend.run_revolutions(num_revolutions=num_revolutions) + await self._pumping.run_revolutions(num_revolutions=num_revolutions) async def run_continuously(self, speed: float): - """Run continuously at a given speed. This method will return after the command has been sent, - and the pump will run until `halt` is called. - - If speed is 0, the pump will be halted. - - Args: - speed: speed in rpm/pump-specific units. - """ - - self.backend.run_continuously(speed=speed) + await self._pumping.run_continuously(speed=speed) async def run_for_duration(self, speed: Union[float, int], duration: Union[float, int]): - """Run the pump at specified speed for the specified duration. - - Args: - speed: speed in rpm/pump-specific units. - duration: duration to run pump. - """ - - if duration < 0: - raise ValueError("Duration must be positive.") - await self.run_continuously(speed=speed) - await asyncio.sleep(duration) - await self.run_continuously(speed=0) + await self._pumping.run_for_duration(speed=speed, duration=duration) async def pump_volume(self, speed: Union[float, int], volume: Union[float, int]): - """Run the pump at specified speed for the specified volume. Note that this function requires - the pump to be calibrated at the input speed. - - Args: - speed: speed in rpm/pump-specific units. - volume: volume to pump. - """ - - if self.calibration is None: - raise TypeError( - "Pump is not calibrated. Volume based pumping and related functions unavailable." - ) - if self.calibration.calibration_mode == "duration": - duration = volume / self.calibration[0] - await self.run_for_duration(speed=speed, duration=duration) - elif self.calibration.calibration_mode == "revolutions": - num_revolutions = volume / self.calibration[0] - await self.run_revolutions(num_revolutions=num_revolutions) - else: - raise ValueError("Calibration mode not recognized.") + await self._pumping.pump_volume(speed=speed, volume=volume) async def halt(self): - """Halt the pump.""" - self.backend.halt() + await self._pumping.halt() diff --git a/pylabrobot/legacy/pumps/pumparray.py b/pylabrobot/legacy/pumps/pumparray.py index 9d7ca1a5cec..af5d7ff1034 100644 --- a/pylabrobot/legacy/pumps/pumparray.py +++ b/pylabrobot/legacy/pumps/pumparray.py @@ -1,22 +1,35 @@ import asyncio from typing import List, Optional, Union +from pylabrobot.capabilities.pumping.backend import PumpBackend as _NewPumpBackend +from pylabrobot.capabilities.pumping.pumping import PumpingCapability from pylabrobot.legacy.machines.machine import Machine from pylabrobot.legacy.pumps.backend import PumpArrayBackend from pylabrobot.legacy.pumps.calibration import PumpCalibration from pylabrobot.legacy.pumps.errors import NotCalibratedError -class PumpArray(Machine): - """Front-end for a pump array. +class _ChannelAdapter(_NewPumpBackend): + """Adapts one channel of a legacy PumpArrayBackend to the new PumpBackend.""" + + def __init__(self, legacy: PumpArrayBackend, channel: int): + self._legacy = legacy + self._channel = channel - Attributes: - backend: The backend that the pump array is controlled through. - calibration: The calibration of the pump. + async def run_revolutions(self, num_revolutions: float): + await self._legacy.run_revolutions( + num_revolutions=[num_revolutions], use_channels=[self._channel] + ) - Properties: - num_channels: The number of channels that the pump array has. - """ + async def run_continuously(self, speed: float): + await self._legacy.run_continuously(speed=[speed], use_channels=[self._channel]) + + async def halt(self): + await self._legacy.run_continuously(speed=[0.0], use_channels=[self._channel]) + + +class PumpArray(Machine): + """Front-end for a pump array.""" def __init__( self, @@ -24,18 +37,27 @@ def __init__( calibration: Optional[PumpCalibration] = None, ): super().__init__(backend=backend) - self.backend: PumpArrayBackend = backend # fix type + self.backend: PumpArrayBackend = backend self.calibration = calibration + self._pumps: List[PumpingCapability] = [] @property def num_channels(self) -> int: - """Returns the number of channels that the pump array has. + return self.backend.num_channels - Returns: - int: The number of channels that the pump array has. - """ + async def setup(self, **backend_kwargs): + await super().setup(**backend_kwargs) + self._pumps = [ + PumpingCapability(backend=_ChannelAdapter(self.backend, ch)) + for ch in range(self.num_channels) + ] + for p in self._pumps: + await p._on_setup() - return self.backend.num_channels + async def stop(self): + for p in reversed(self._pumps): + await p._on_stop() + await super().stop() def serialize(self) -> dict: if self.calibration is None: @@ -54,61 +76,56 @@ def deserialize(cls, data: dict): data_copy["calibration"] = calibration return super().deserialize(data_copy) + # -- helpers ---------------------------------------------------------------- + + def _normalize_channels(self, use_channels: Union[int, List[int]]) -> List[int]: + if isinstance(use_channels, int): + use_channels = [use_channels] + if len(set(use_channels)) != len(use_channels): + raise ValueError("Channels in use channels must be unique.") + if any(ch not in range(0, self.num_channels) for ch in use_channels): + raise ValueError( + f"Pump address out of range for this pump array. " + f"Value should be between 0 and {self.num_channels - 1}" + ) + if any(ch < 0 for ch in use_channels): + raise ValueError("Channels in use channels must be positive.") + return use_channels + + @staticmethod + def _normalize_speeds(speed: Union[float, int, List[float], List[int]], n: int) -> List[float]: + if isinstance(speed, (float, int)): + speed = [float(speed)] * n + if any(s < 0 for s in speed): + raise ValueError("Speed must be positive.") + if len(speed) != n: + raise ValueError("Speed and use_channels must be the same length.") + return [float(s) for s in speed] + + # -- public API ------------------------------------------------------------- + async def run_revolutions( self, num_revolutions: Union[float, List[float]], use_channels: Union[int, List[int]], ): - """Run the specified channels for the specified number of revolutions. - - Args: - num_revolutions: number of revolutions to run pumps. - use_channels: pump array channels to run. - """ - - if isinstance(use_channels, int): - use_channels = [use_channels] - if isinstance(num_revolutions, float): - num_revolutions = [num_revolutions] * len(use_channels) - await self.backend.run_revolutions(num_revolutions=num_revolutions, use_channels=use_channels) + channels = self._normalize_channels(use_channels) + if isinstance(num_revolutions, (float, int)): + num_revolutions = [float(num_revolutions)] * len(channels) + if len(num_revolutions) != len(channels): + raise ValueError("num_revolutions and use_channels must be the same length.") + for ch, rev in zip(channels, num_revolutions): + await self._pumps[ch].run_revolutions(num_revolutions=rev) async def run_continuously( self, speed: Union[float, int, List[float], List[int]], use_channels: Union[int, List[int]], ): - """Run the specified channels at the specified speeds. - - Args: - speed: speed in rpm/pump-specific units. - use_channels: pump array channels to run. - """ - - if isinstance(use_channels, list) and len(set(use_channels)) != len(use_channels): - raise ValueError("Channels in use channels must be unique.") - if isinstance(use_channels, int): - use_channels = [use_channels] - if isinstance(speed, (float, int)): - speed = [speed] * len(use_channels) - - if any(channel not in range(0, self.num_channels) for channel in use_channels): - raise ValueError( - f"Pump address out of range for this pump array. \ - Value should be between 0 and {self.num_channels}" - ) - if any(speed < 0 for speed in speed): - raise ValueError("Speed must be positive.") - if isinstance(speed[0], int): - speed = [float(x) for x in speed] - if len(speed) != len(use_channels): - raise ValueError("Speed and use_channels must be the same length.") - if any(channel < 0 for channel in use_channels): - raise ValueError("Channels in use channels must be positive.") - - await self.backend.run_continuously( - speed=speed, # type: ignore[arg-type] - use_channels=use_channels, - ) + channels = self._normalize_channels(use_channels) + speeds = self._normalize_speeds(speed, len(channels)) + for ch, s in zip(channels, speeds): + await self._pumps[ch].run_continuously(speed=s) async def run_for_duration( self, @@ -116,14 +133,6 @@ async def run_for_duration( use_channels: Union[int, List[int]], duration: Union[float, int], ): - """Run the specified channels at the specified speeds for the specified duration. - - Args: - speed: speed in rpm/pump-specific units. - use_channels: pump array channels to run. - duration: duration to run pumps (seconds). - """ - if duration < 0: raise ValueError("Duration must be positive.") await self.run_continuously(speed=speed, use_channels=use_channels) @@ -136,58 +145,35 @@ async def pump_volume( use_channels: Union[int, List[int]], volume: Union[float, int, List[float], List[int]], ): - """Run the specified channels at the specified speeds for the specified volume. Note that this - function requires the pump to be calibrated at the input speed. - - Args: - speed: speed in rpm/pump-specific units. use_channels: pump array channels to run using - 0-index. volume: volume to pump. - calibration_mode: units of calibration. Volume per seconds ("duration") or volume per - revolution ("revolutions"). - - Raises: - NotCalibratedError: if the pump is not calibrated. - """ - if self.calibration is None: raise NotCalibratedError( "Pump is not calibrated. Volume based pumping and related functions unavailable." ) - if isinstance(use_channels, int): - use_channels = [use_channels] - if isinstance(speed, (float, int)): - speed = [speed] * len(use_channels) + channels = self._normalize_channels(use_channels) + speeds = self._normalize_speeds(speed, len(channels)) if isinstance(volume, (float, int)): - volume = [volume] * len(use_channels) + volume = [float(volume)] * len(channels) if not all(vol >= 0 for vol in volume): raise ValueError("Volume must be positive.") - if not len(speed) == len(use_channels) == len(volume): + if len(volume) != len(channels): raise ValueError("Speed, use_channels, and volume must be the same length.") if self.calibration.calibration_mode == "duration": durations = [ channel_volume / self.calibration[channel] - for channel, channel_volume in zip(use_channels, volume) + for channel, channel_volume in zip(channels, volume) ] tasks = [ - asyncio.create_task( - self.run_for_duration( - speed=channel_speed, - use_channels=channel, - duration=duration, - ) - ) - for channel_speed, channel, duration in zip(speed, use_channels, durations) + asyncio.create_task(self.run_for_duration(speed=s, use_channels=ch, duration=d)) + for s, ch, d in zip(speeds, channels, durations) ] elif self.calibration.calibration_mode == "revolutions": num_rotations = [ channel_volume / self.calibration[channel] - for channel, channel_volume in zip(use_channels, volume) + for channel, channel_volume in zip(channels, volume) ] tasks = [ - asyncio.create_task( - self.run_revolutions(num_revolutions=num_rotation, use_channels=channel) - ) - for num_rotation, channel in zip(num_rotations, use_channels) + asyncio.create_task(self.run_revolutions(num_revolutions=r, use_channels=ch)) + for r, ch in zip(num_rotations, channels) ] else: raise ValueError("Calibration mode must be 'duration' or 'revolutions'.")