diff --git a/pylabrobot/bulk_dispensers/__init__.py b/pylabrobot/bulk_dispensers/__init__.py new file mode 100644 index 00000000000..e224ad00513 --- /dev/null +++ b/pylabrobot/bulk_dispensers/__init__.py @@ -0,0 +1,3 @@ +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend +from pylabrobot.bulk_dispensers.bulk_dispenser import BulkDispenser +from pylabrobot.bulk_dispensers.chatterbox import BulkDispenserChatterboxBackend diff --git a/pylabrobot/bulk_dispensers/backend.py b/pylabrobot/bulk_dispensers/backend.py new file mode 100644 index 00000000000..a7c7abfeaf2 --- /dev/null +++ b/pylabrobot/bulk_dispensers/backend.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod + +from pylabrobot.machines.backend import MachineBackend + + +class BulkDispenserBackend(MachineBackend, metaclass=ABCMeta): + """Abstract class for bulk dispenser backends. + + Volumes are specified in microliters (float). Concrete backends are responsible + for converting to instrument-specific units. + """ + + @abstractmethod + async def dispense(self) -> None: + pass + + @abstractmethod + async def prime(self, volume: float) -> None: + pass + + @abstractmethod + async def empty(self, volume: float) -> None: + pass + + @abstractmethod + async def shake(self, time: float, distance: int, speed: int) -> None: + """Shake the plate. + + Args: + time: Shake duration in seconds. + distance: Shake distance in mm (1-5). + speed: Shake frequency in Hz (1-20). + """ + + @abstractmethod + async def move_plate_out(self) -> None: + pass + + @abstractmethod + async def set_plate_type(self, plate_type: int) -> None: + pass + + @abstractmethod + async def set_cassette_type(self, cassette_type: int) -> None: + pass + + @abstractmethod + async def set_column_volume(self, column: int, volume: float) -> None: + """Set dispense volume for a column. + + Args: + column: Column number (0 = all columns). + volume: Volume in microliters. + """ + + @abstractmethod + async def set_dispensing_height(self, height: int) -> None: + """Set dispensing height. + + Args: + height: Height in 1/100 mm (500-5500). + """ + + @abstractmethod + async def set_pump_speed(self, speed: int) -> None: + """Set pump speed as percentage of cassette range. + + Args: + speed: Speed percentage (1-100). + """ + + @abstractmethod + async def set_dispensing_order(self, order: int) -> None: + """Set dispensing order. + + Args: + order: 0 = row-wise, 1 = column-wise. + """ + + @abstractmethod + async def abort(self) -> None: + pass diff --git a/pylabrobot/bulk_dispensers/bulk_dispenser.py b/pylabrobot/bulk_dispensers/bulk_dispenser.py new file mode 100644 index 00000000000..e4783df925b --- /dev/null +++ b/pylabrobot/bulk_dispensers/bulk_dispenser.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend +from pylabrobot.machines.machine import Machine, need_setup_finished + + +class BulkDispenser(Machine): + """Frontend for bulk reagent dispensers.""" + + def __init__(self, backend: BulkDispenserBackend) -> None: + super().__init__(backend=backend) + self.backend: BulkDispenserBackend = backend + + @need_setup_finished + async def dispense(self, **backend_kwargs) -> None: + await self.backend.dispense(**backend_kwargs) + + @need_setup_finished + async def prime(self, volume: float, **backend_kwargs) -> None: + await self.backend.prime(volume=volume, **backend_kwargs) + + @need_setup_finished + async def empty(self, volume: float, **backend_kwargs) -> None: + await self.backend.empty(volume=volume, **backend_kwargs) + + @need_setup_finished + async def shake(self, time: float, distance: int, speed: int, **backend_kwargs) -> None: + await self.backend.shake(time=time, distance=distance, speed=speed, **backend_kwargs) + + @need_setup_finished + async def move_plate_out(self, **backend_kwargs) -> None: + await self.backend.move_plate_out(**backend_kwargs) + + @need_setup_finished + async def set_plate_type(self, plate_type: int, **backend_kwargs) -> None: + await self.backend.set_plate_type(plate_type=plate_type, **backend_kwargs) + + @need_setup_finished + async def set_cassette_type(self, cassette_type: int, **backend_kwargs) -> None: + await self.backend.set_cassette_type(cassette_type=cassette_type, **backend_kwargs) + + @need_setup_finished + async def set_column_volume(self, column: int, volume: float, **backend_kwargs) -> None: + await self.backend.set_column_volume(column=column, volume=volume, **backend_kwargs) + + @need_setup_finished + async def set_dispensing_height(self, height: int, **backend_kwargs) -> None: + await self.backend.set_dispensing_height(height=height, **backend_kwargs) + + @need_setup_finished + async def set_pump_speed(self, speed: int, **backend_kwargs) -> None: + await self.backend.set_pump_speed(speed=speed, **backend_kwargs) + + @need_setup_finished + async def set_dispensing_order(self, order: int, **backend_kwargs) -> None: + await self.backend.set_dispensing_order(order=order, **backend_kwargs) + + @need_setup_finished + async def abort(self, **backend_kwargs) -> None: + await self.backend.abort(**backend_kwargs) diff --git a/pylabrobot/bulk_dispensers/chatterbox.py b/pylabrobot/bulk_dispensers/chatterbox.py new file mode 100644 index 00000000000..eddf02b68c8 --- /dev/null +++ b/pylabrobot/bulk_dispensers/chatterbox.py @@ -0,0 +1,47 @@ +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend + + +class BulkDispenserChatterboxBackend(BulkDispenserBackend): + """A backend that prints operations for testing without hardware.""" + + async def setup(self) -> None: + print("Setting up bulk dispenser.") + + async def stop(self) -> None: + print("Stopping bulk dispenser.") + + async def dispense(self) -> None: + print("Dispensing.") + + async def prime(self, volume: float) -> None: + print(f"Priming with {volume} uL.") + + async def empty(self, volume: float) -> None: + print(f"Emptying with {volume} uL.") + + async def shake(self, time: float, distance: int, speed: int) -> None: + print(f"Shaking for {time}s, distance={distance}mm, speed={speed}Hz.") + + async def move_plate_out(self) -> None: + print("Moving plate out.") + + async def set_plate_type(self, plate_type: int) -> None: + print(f"Setting plate type to {plate_type}.") + + async def set_cassette_type(self, cassette_type: int) -> None: + print(f"Setting cassette type to {cassette_type}.") + + async def set_column_volume(self, column: int, volume: float) -> None: + print(f"Setting column {column} volume to {volume} uL.") + + async def set_dispensing_height(self, height: int) -> None: + print(f"Setting dispensing height to {height}.") + + async def set_pump_speed(self, speed: int) -> None: + print(f"Setting pump speed to {speed}%.") + + async def set_dispensing_order(self, order: int) -> None: + print(f"Setting dispensing order to {order}.") + + async def abort(self) -> None: + print("Aborting.") diff --git a/pylabrobot/bulk_dispensers/tests/__init__.py b/pylabrobot/bulk_dispensers/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/bulk_dispensers/tests/bulk_dispenser_tests.py b/pylabrobot/bulk_dispensers/tests/bulk_dispenser_tests.py new file mode 100644 index 00000000000..26314145247 --- /dev/null +++ b/pylabrobot/bulk_dispensers/tests/bulk_dispenser_tests.py @@ -0,0 +1,91 @@ +import unittest + +from pylabrobot.bulk_dispensers import ( + BulkDispenser, + BulkDispenserBackend, + BulkDispenserChatterboxBackend, +) + + +class BulkDispenserSetupTests(unittest.IsolatedAsyncioTestCase): + """Test setup/stop lifecycle and need_setup_finished guard.""" + + def setUp(self): + self.backend = BulkDispenserChatterboxBackend() + self.dispenser = BulkDispenser(backend=self.backend) + + async def test_methods_fail_before_setup(self): + with self.assertRaises(RuntimeError): + await self.dispenser.dispense() + with self.assertRaises(RuntimeError): + await self.dispenser.prime(volume=100.0) + with self.assertRaises(RuntimeError): + await self.dispenser.abort() + + async def test_setup_and_stop(self): + await self.dispenser.setup() + self.assertTrue(self.dispenser.setup_finished) + await self.dispenser.stop() + self.assertFalse(self.dispenser.setup_finished) + + async def test_context_manager(self): + async with BulkDispenser(backend=BulkDispenserChatterboxBackend()) as d: + self.assertTrue(d.setup_finished) + self.assertFalse(d.setup_finished) + + +class BulkDispenserDelegationTests(unittest.IsolatedAsyncioTestCase): + """Test that frontend methods delegate to the backend.""" + + async def asyncSetUp(self): + self.backend = unittest.mock.MagicMock(spec=BulkDispenserBackend) + self.dispenser = BulkDispenser(backend=self.backend) + self.dispenser._setup_finished = True + + async def test_dispense(self): + await self.dispenser.dispense() + self.backend.dispense.assert_awaited_once() + + async def test_prime(self): + await self.dispenser.prime(volume=50.0) + self.backend.prime.assert_awaited_once_with(volume=50.0) + + async def test_empty(self): + await self.dispenser.empty(volume=100.0) + self.backend.empty.assert_awaited_once_with(volume=100.0) + + async def test_shake(self): + await self.dispenser.shake(time=5.0, distance=3, speed=10) + self.backend.shake.assert_awaited_once_with(time=5.0, distance=3, speed=10) + + async def test_move_plate_out(self): + await self.dispenser.move_plate_out() + self.backend.move_plate_out.assert_awaited_once() + + async def test_set_plate_type(self): + await self.dispenser.set_plate_type(plate_type=3) + self.backend.set_plate_type.assert_awaited_once_with(plate_type=3) + + async def test_set_cassette_type(self): + await self.dispenser.set_cassette_type(cassette_type=1) + self.backend.set_cassette_type.assert_awaited_once_with(cassette_type=1) + + async def test_set_column_volume(self): + await self.dispenser.set_column_volume(column=0, volume=25.0) + self.backend.set_column_volume.assert_awaited_once_with(column=0, volume=25.0) + + async def test_set_dispensing_height(self): + await self.dispenser.set_dispensing_height(height=2500) + self.backend.set_dispensing_height.assert_awaited_once_with(height=2500) + + async def test_set_pump_speed(self): + await self.dispenser.set_pump_speed(speed=50) + self.backend.set_pump_speed.assert_awaited_once_with(speed=50) + + async def test_set_dispensing_order(self): + await self.dispenser.set_dispensing_order(order=1) + self.backend.set_dispensing_order.assert_awaited_once_with(order=1) + + async def test_abort(self): + await self.dispenser.abort() + self.backend.abort.assert_awaited_once() diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py b/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py new file mode 100644 index 00000000000..96f12e46471 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py @@ -0,0 +1,13 @@ +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi import ( + CassetteType, + DispensingOrder, + EmptyMode, + MultidropCombiBackend, + MultidropCombiCommunicationError, + MultidropCombiError, + MultidropCombiInstrumentError, + PrimeMode, + plate_to_pla_params, + plate_to_type_index, + plate_well_count, +) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/__init__.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/__init__.py new file mode 100644 index 00000000000..0d4ac7fe73b --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/__init__.py @@ -0,0 +1,19 @@ +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.backend import ( + MultidropCombiBackend, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.enums import ( + CassetteType, + DispensingOrder, + EmptyMode, + PrimeMode, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, + MultidropCombiError, + MultidropCombiInstrumentError, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.helpers import ( + plate_to_pla_params, + plate_to_type_index, + plate_well_count, +) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py new file mode 100644 index 00000000000..d0bc885d3bb --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py @@ -0,0 +1,30 @@ +"""Control operations mixin for the Multidrop Combi.""" + +from __future__ import annotations + +from typing import Optional + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, +) +from pylabrobot.io.serial import Serial + + +class MultidropCombiActionsMixin: + """Mixin providing control operations for the Multidrop Combi.""" + + io: Optional[Serial] + + async def abort(self) -> None: + """Send ESC character to abort the current operation.""" + if self.io is None: + raise MultidropCombiCommunicationError("Not connected to instrument", operation="abort") + await self.io.write(b"\x1b") + + async def restart(self) -> None: + """Restart the instrument (equivalent to power cycle).""" + await self._send_command("RST", timeout=10.0) # type: ignore[attr-defined] + + async def acknowledge_error(self) -> None: + """Clear instrument error state.""" + await self._send_command("EAK", timeout=5.0) # type: ignore[attr-defined] diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py new file mode 100644 index 00000000000..fe373c6f96d --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py @@ -0,0 +1,123 @@ +"""Composed backend for the Thermo Scientific Multidrop Combi.""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Optional + +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.actions import ( + MultidropCombiActionsMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.commands import ( + MultidropCombiCommandsMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.communication import ( + MultidropCombiCommunicationMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.queries import ( + MultidropCombiQueriesMixin, +) +from pylabrobot.io.serial import Serial + +logger = logging.getLogger(__name__) + + +class MultidropCombiBackend( + MultidropCombiCommunicationMixin, + MultidropCombiQueriesMixin, + MultidropCombiActionsMixin, + MultidropCombiCommandsMixin, + BulkDispenserBackend, +): + """Backend for the Thermo Scientific Multidrop Combi reagent dispenser. + + Communication is via RS232/USB serial at 9600 baud, 8N1. + + Args: + port: Serial port (e.g. "COM3", "/dev/ttyUSB0"). If None, auto-detected by VID/PID. + timeout: Default serial read timeout in seconds. + """ + + def __init__( + self, + port: str | None = None, + timeout: float = 30.0, + ) -> None: + super().__init__() + self._port = port + self.timeout = timeout + self.io: Optional[Serial] = None + self._command_lock: Optional[asyncio.Lock] = None + self._instrument_name: str = "" + self._firmware_version: str = "" + self._serial_number: str = "" + + async def setup(self) -> None: + self._command_lock = asyncio.Lock() + + # When port is specified, skip VID/PID discovery (the Multidrop is often + # connected via an RS232-to-USB adapter with a different VID/PID). + if self._port: + self.io = Serial( + human_readable_device_name="Multidrop Combi", + port=self._port, + baudrate=9600, + bytesize=8, + parity="N", + stopbits=1, + timeout=self.timeout, + write_timeout=5, + ) + else: + self.io = Serial( + human_readable_device_name="Multidrop Combi", + vid=0x0AB6, + pid=0x0344, + baudrate=9600, + bytesize=8, + parity="N", + stopbits=1, + timeout=self.timeout, + write_timeout=5, + ) + await self.io.setup() + + # Enable XON/XOFF flow control on the underlying serial port + if self.io._ser is not None: + self.io._ser.xonxoff = True + + await self._drain_stale_data() + + info = await self._enter_remote_mode() + self._instrument_name = info["instrument_name"] + self._firmware_version = info["firmware_version"] + self._serial_number = info["serial_number"] + + logger.info( + "Connected to %s (FW: %s, SN: %s)", + self._instrument_name, + self._firmware_version, + self._serial_number, + ) + + # Clear any pending errors + try: + await self.acknowledge_error() + except Exception: + pass + + async def stop(self) -> None: + await self._exit_remote_mode() + if self.io is not None: + await self.io.stop() + self.io = None + self._command_lock = None + + def serialize(self) -> dict: + return { + **super().serialize(), + "port": self._port, + "timeout": self.timeout, + } diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py new file mode 100644 index 00000000000..7e4fd3de14a --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py @@ -0,0 +1,251 @@ +"""Operational commands mixin for the Multidrop Combi. + +All volume parameters at the public interface are in microliters (float). +Internally, volumes are converted to the instrument's native 1/10 uL units. +""" + +from __future__ import annotations + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.enums import ( + EmptyMode, + PrimeMode, +) + +# Per-command timeout constants (seconds) +COMMAND_TIMEOUTS = { + "SPL": 5.0, + "SCT": 5.0, + "SCV": 5.0, + "SDH": 5.0, + "SPS": 5.0, + "SDO": 5.0, + "SOF": 5.0, + "SPV": 5.0, + "PLA": 5.0, + "EAK": 5.0, + "POU": 10.0, + "RST": 10.0, + "DIS": 120.0, + "PRI": 60.0, + "EMP": 60.0, + "SHA": 120.0, + "BGN": 120.0, +} + + +def _ul_to_tenths(volume_ul: float) -> int: + """Convert microliters to 1/10 uL integer.""" + return round(volume_ul * 10) + + +class MultidropCombiCommandsMixin: + """Mixin providing operational commands for the Multidrop Combi.""" + + async def dispense(self) -> None: + """Dispense liquid to the plate (DIS command).""" + await self._send_command("DIS", timeout=COMMAND_TIMEOUTS["DIS"]) # type: ignore[attr-defined] + + async def prime(self, volume: float, mode: PrimeMode = PrimeMode.STANDARD) -> None: + """Prime dispenser hoses. + + Args: + volume: Prime volume in microliters. + mode: Prime mode (standard, continuous, stop continuous, calibration). + """ + vol_tenths = _ul_to_tenths(volume) + if vol_tenths < 10 or vol_tenths > 100000: + raise ValueError(f"Prime volume must be 1-10000 uL, got {volume} uL") + cmd = f"PRI {vol_tenths}" + if mode != PrimeMode.STANDARD: + cmd += f" {mode.value}" + timeout = COMMAND_TIMEOUTS["PRI"] + volume / 100.0 + await self._send_command(cmd, timeout=timeout) # type: ignore[attr-defined] + + async def empty(self, volume: float, mode: EmptyMode = EmptyMode.STANDARD) -> None: + """Empty dispenser hoses. + + Args: + volume: Empty volume in microliters. + mode: Empty mode (standard or continuous). + """ + vol_tenths = _ul_to_tenths(volume) + if vol_tenths < 10 or vol_tenths > 100000: + raise ValueError(f"Empty volume must be 1-10000 uL, got {volume} uL") + cmd = f"EMP {vol_tenths}" + if mode != EmptyMode.STANDARD: + cmd += f" {mode.value}" + timeout = COMMAND_TIMEOUTS["EMP"] + volume / 100.0 + await self._send_command(cmd, timeout=timeout) # type: ignore[attr-defined] + + async def shake(self, time: float, distance: int, speed: int) -> None: + """Shake the plate. + + Args: + time: Duration in seconds. + distance: Shake distance in mm (1-5). + speed: Shake frequency in Hz (1-20). + """ + if not 1 <= distance <= 5: + raise ValueError(f"Shake distance must be 1-5 mm, got {distance}") + if not 1 <= speed <= 20: + raise ValueError(f"Shake speed must be 1-20 Hz, got {speed}") + time_hundredths = round(time * 100) + if time_hundredths < 1: + raise ValueError(f"Shake time must be > 0, got {time}s") + timeout = COMMAND_TIMEOUTS["SHA"] + time + await self._send_command( # type: ignore[attr-defined] + f"SHA {time_hundredths} {distance} {speed}", timeout=timeout + ) + + async def move_plate_out(self) -> None: + """Move plate carrier to loading position (POU command).""" + await self._send_command( # type: ignore[attr-defined] + "POU", timeout=COMMAND_TIMEOUTS["POU"] + ) + + async def set_plate_type(self, plate_type: int) -> None: + """Set plate type. + + Args: + plate_type: Plate type index (0-29; 0-9 factory-defined, 10-29 user-defined). + """ + if not 0 <= plate_type <= 29: + raise ValueError(f"Plate type must be 0-29, got {plate_type}") + await self._send_command( # type: ignore[attr-defined] + f"SPL {plate_type}", timeout=COMMAND_TIMEOUTS["SPL"] + ) + + async def set_cassette_type(self, cassette_type: int) -> None: + """Set cassette type. + + Args: + cassette_type: Cassette type (0=Standard, 1=Small, 2-3=User-defined). + """ + if not 0 <= cassette_type <= 3: + raise ValueError(f"Cassette type must be 0-3, got {cassette_type}") + await self._send_command( # type: ignore[attr-defined] + f"SCT {cassette_type}", timeout=COMMAND_TIMEOUTS["SCT"] + ) + + async def set_column_volume(self, column: int, volume: float) -> None: + """Set dispense volume for a column. + + Args: + column: Column number (0 = all columns, 1-48 = specific column). + volume: Volume in microliters. + """ + if not 0 <= column <= 48: + raise ValueError(f"Column must be 0-48, got {column}") + vol_tenths = _ul_to_tenths(volume) + await self._send_command( # type: ignore[attr-defined] + f"SCV {column} {vol_tenths}", timeout=COMMAND_TIMEOUTS["SCV"] + ) + + async def set_dispensing_height(self, height: int) -> None: + """Set dispensing height. + + Args: + height: Height in 1/100 mm (500-5500). + """ + if not 500 <= height <= 5500: + raise ValueError(f"Dispensing height must be 500-5500, got {height}") + await self._send_command( # type: ignore[attr-defined] + f"SDH {height}", timeout=COMMAND_TIMEOUTS["SDH"] + ) + + async def set_pump_speed(self, speed: int) -> None: + """Set pump speed as percentage of cassette range. + + Args: + speed: Speed percentage (1-100). + """ + if not 1 <= speed <= 100: + raise ValueError(f"Pump speed must be 1-100, got {speed}") + await self._send_command( # type: ignore[attr-defined] + f"SPS {speed}", timeout=COMMAND_TIMEOUTS["SPS"] + ) + + async def set_dispensing_order(self, order: int) -> None: + """Set dispensing order. + + Args: + order: 0 = row-wise, 1 = column-wise. + """ + if order not in (0, 1): + raise ValueError(f"Dispensing order must be 0 or 1, got {order}") + await self._send_command( # type: ignore[attr-defined] + f"SDO {order}", timeout=COMMAND_TIMEOUTS["SDO"] + ) + + async def set_dispense_offset(self, x_offset: int, y_offset: int) -> None: + """Set X/Y dispense offset. + + Args: + x_offset: X offset in 1/100 mm (±300). + y_offset: Y offset in 1/100 mm (±300). + """ + if not -300 <= x_offset <= 300: + raise ValueError(f"X offset must be ±300, got {x_offset}") + if not -300 <= y_offset <= 300: + raise ValueError(f"Y offset must be ±300, got {y_offset}") + await self._send_command( # type: ignore[attr-defined] + f"SOF {x_offset} {y_offset}", timeout=COMMAND_TIMEOUTS["SOF"] + ) + + async def set_predispense_volume(self, volume: float) -> None: + """Set predispense volume. + + Args: + volume: Predispense volume in microliters. + """ + vol_tenths = _ul_to_tenths(volume) + if vol_tenths < 10 or vol_tenths > 100000: + raise ValueError(f"Predispense volume must be 1-10000 uL, got {volume} uL") + await self._send_command( # type: ignore[attr-defined] + f"SPV {vol_tenths}", timeout=COMMAND_TIMEOUTS["SPV"] + ) + + async def define_plate( + self, + column_positions: int, + row_positions: int, + rows: int, + columns: int, + height: int, + max_volume: int, + x_offset: int = 0, + y_offset: int = 0, + ) -> None: + """Define a remote plate (PLA command). + + Args: + column_positions: Number of column positions. + row_positions: Number of row positions. + rows: Number of rows. + columns: Number of columns. + height: Plate height in 1/100 mm. + max_volume: Maximum well volume in 1/10 uL. + x_offset: X offset in 1/100 mm. + y_offset: Y offset in 1/100 mm. + """ + await self._send_command( # type: ignore[attr-defined] + f"PLA {column_positions} {row_positions} {rows} {columns} " + f"{height} {max_volume} {x_offset} {y_offset}", + timeout=COMMAND_TIMEOUTS["PLA"], + ) + + async def start_protocol( + self, plate_type: int | None = None, protocol_name: str | None = None + ) -> None: + """Start a protocol from instrument memory (BGN command). + + Args: + plate_type: Optional plate type override. + protocol_name: Optional protocol name. + """ + cmd = "BGN" + if plate_type is not None: + cmd += f" {plate_type}" + if protocol_name is not None: + cmd += f" {protocol_name}" + await self._send_command(cmd, timeout=COMMAND_TIMEOUTS["BGN"]) # type: ignore[attr-defined] diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py new file mode 100644 index 00000000000..c08de8f02ef --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py @@ -0,0 +1,223 @@ +"""Low-level serial communication mixin for the Multidrop Combi. + +Ported from the SiLA implementation's serial_transport.py to use +pylabrobot's async Serial wrapper. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Optional + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, + MultidropCombiInstrumentError, +) +from pylabrobot.io.serial import Serial + +logger = logging.getLogger(__name__) + +STATUS_OK = 0 + +ERROR_DESCRIPTIONS = { + 1: "Internal firmware error", + 2: "Unrecognized command", + 3: "Invalid command arguments", + 4: "Pump position error", + 5: "Plate X position error", + 6: "Plate Y position error", + 7: "Z position error", + 9: "Attempt to reset serial number", + 10: "Nonvolatile parameters lost", + 11: "No more memory for user data", + 12: "Pump or X motor was running", + 13: "X and Z positions conflict", + 14: "Cannot dispense: pump not primed", + 15: "Missing prime vessel", + 16: "Rotor shield not in place", + 17: "Dispense volume for all columns is 0", + 18: "Invalid plate type (bad plate index)", + 19: "Plate has not been defined", + 20: "Invalid rows in plate definition", + 21: "Invalid columns in plate definition", + 22: "Plate height is invalid", + 23: "Plate well volume invalid (too small or too big)", + 24: "Invalid cassette type (bad cassette index)", + 25: "Cassette not defined", + 26: "Invalid volume increment for cassette", + 27: "Invalid maximum volume for cassette", + 28: "Invalid minimum volume for cassette", + 29: "Invalid min/max pump speed for cassette", + 30: "Invalid pump rotor offset in cassette definition", + 32: "Dispensing volume not within cassette limits", + 33: "Invalid selector channel", + 34: "Invalid dispensing speed", + 35: "Dispensing height too low for plate", + 36: "Predispense volume not within cassette limits", + 37: "Invalid dispensing order", + 38: "Invalid X or Y dispensing offset", + 39: "RFID option not present", + 40: "RFID tag not present", + 41: "RFID tag data checksum incorrect", + 43: "Wrong cassette type", + 44: "Protocol/plate in use, cannot modify or delete", + 45: "Protocol/plate/cassette is read-only", +} + + +class MultidropCombiCommunicationMixin: + """Mixin providing low-level serial communication for the Multidrop Combi.""" + + io: Optional[Serial] + _command_lock: Optional[asyncio.Lock] + + async def _send_command(self, cmd: str, timeout: float | None = None) -> list[str]: + """Send a command and return the data lines from the response. + + Args: + cmd: Command string (e.g. "DIS", "SPL 1", "SCV 0 500"). + timeout: Per-command read timeout in seconds. If None, uses default. + + Returns: + List of data lines (between echo and END terminator). + + Raises: + MultidropCombiCommunicationError: If not connected or communication fails. + MultidropCombiInstrumentError: If instrument returns non-zero status code. + """ + if self.io is None or self._command_lock is None: + raise MultidropCombiCommunicationError("Not connected to instrument", operation=cmd) + + assert self.io._ser is not None, "Serial port not open. Did you call setup()?" + + cmd_code = cmd.split()[0] + + async with self._command_lock: + original_timeout = self.io._ser.timeout + if timeout is not None: + self.io._ser.timeout = timeout + try: + logger.debug("TX: %r", cmd) + await self.io.write(f"{cmd}\r".encode("ascii")) + + lines: list[str] = [] + while True: + raw = await self.io.readline() + if not raw: + raise MultidropCombiCommunicationError( + f"Timeout reading response for {cmd_code}", operation=cmd + ) + line = raw.decode("ascii", errors="replace").strip() + logger.debug("RX: %r", line) + if not line: + continue + lines.append(line) + + if line.startswith(cmd_code) and " END " in line: + break + + # Parse status from END terminator + end_line = lines[-1] + parts = end_line.split() + status_code = int(parts[-1]) if parts[-1].isdigit() else -1 + + if status_code != STATUS_OK: + desc = ERROR_DESCRIPTIONS.get(status_code, "Unknown error") + logger.error("Command %s failed (status %d). RX lines: %s", cmd_code, status_code, lines) + raise MultidropCombiInstrumentError(status_code, desc) + + # Return data lines: skip echo (first) and END line (last) + # The instrument may echo just the command code or the full command + data_lines = [] + for line in lines[:-1]: + line_upper = line.strip().upper() + if line_upper == cmd.strip().upper() or line_upper == cmd_code.upper(): + continue + data_lines.append(line) + + return data_lines + + except (MultidropCombiCommunicationError, MultidropCombiInstrumentError): + raise + except Exception as e: + raise MultidropCombiCommunicationError( + f"Communication error during {cmd_code}: {e}", + operation=cmd, + original_error=e, + ) from e + finally: + if timeout is not None: + self.io._ser.timeout = original_timeout + + async def _drain_stale_data(self) -> None: + """Drain any stale data from the serial buffer.""" + if self.io is None: + return + + assert self.io._ser is not None + + await self.io.reset_input_buffer() + await self.io.reset_output_buffer() + + original_timeout = self.io._ser.timeout + self.io._ser.timeout = 0.3 + drained = 0 + try: + while True: + stale = await self.io.readline() + if not stale: + break + drained += 1 + logger.debug("Drained stale data: %r", stale) + finally: + self.io._ser.timeout = original_timeout + if drained: + logger.info("Drained %d stale lines from serial buffer", drained) + + async def _enter_remote_mode(self) -> dict: + """Send VER to enter remote control mode and get instrument info. + + Returns: + Dict with keys: instrument_name, firmware_version, serial_number. + """ + try: + lines = await self._send_command("VER", timeout=5.0) + except Exception as first_err: + logger.warning("VER failed (%s), sending EAK and retrying...", first_err) + try: + await self._send_command("EAK", timeout=5.0) + except Exception: + pass + try: + lines = await self._send_command("VER", timeout=5.0) + except Exception as e: + raise MultidropCombiCommunicationError( + f"VER command failed: {e}", operation="VER", original_error=e + ) from e + + info = { + "instrument_name": "Unknown", + "firmware_version": "Unknown", + "serial_number": "Unknown", + } + if lines: + raw = lines[0] + if raw.upper().startswith("VER "): + raw = raw[4:] + parts = raw.split() + if len(parts) > 0: + info["instrument_name"] = parts[0] + if len(parts) > 1: + info["firmware_version"] = parts[1] + if len(parts) > 2: + info["serial_number"] = parts[2] + + return info + + async def _exit_remote_mode(self) -> None: + """Send QIT to exit remote control mode.""" + try: + await self._send_command("QIT", timeout=5.0) + except Exception: + pass diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py new file mode 100644 index 00000000000..3b7b59d62c6 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py @@ -0,0 +1,164 @@ +"""Demo script for the Multidrop Combi bulk dispenser. + +Usage: + python demo_multidrop.py COM3 # specify port explicitly (recommended) + python demo_multidrop.py # auto-detect by USB VID/PID (native USB only) +""" + +import asyncio +import sys + +from pylabrobot.bulk_dispensers import BulkDispenser +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi import ( + CassetteType, + DispensingOrder, + MultidropCombiBackend, + MultidropCombiInstrumentError, + plate_to_pla_params, + plate_to_type_index, +) +from pylabrobot.resources.eppendorf.plates import Eppendorf_96_wellplate_250ul_Vb + + +def list_serial_ports(): + """List available serial ports to help the user find the right one.""" + try: + import serial.tools.list_ports + + ports = list(serial.tools.list_ports.comports()) + if not ports: + print(" No serial ports found.") + else: + print(" Available ports:") + for p in ports: + print(f" {p.device} - {p.description} (hwid: {p.hwid})") + except ImportError: + print(" (pyserial not installed, cannot list ports)") + + +async def run_step(name: str, coro): + """Run an async operation with error handling. Returns True on success.""" + try: + await coro + print(f" {name}: OK") + return True + except MultidropCombiInstrumentError as e: + print(f" {name}: INSTRUMENT ERROR (status {e.status_code}): {e.description}") + return False + except Exception as e: + print(f" {name}: ERROR: {type(e).__name__}: {e}") + return False + + +async def main(): + port = sys.argv[1] if len(sys.argv) > 1 else None + + if port is None: + print("No COM port specified. Attempting VID/PID auto-discovery...") + print("(This only works with native USB, not RS232-to-USB adapters)\n") + + # --- Create and connect --- + backend = MultidropCombiBackend(port=port, timeout=30.0) + dispenser = BulkDispenser(backend=backend) + + try: + await dispenser.setup() + except Exception as e: + print(f"Connection failed: {e}\n") + list_serial_ports() + print(f"\nUsage: python {sys.argv[0]} ") + return + + try: + # Connection info + info = backend.get_version() + print( + f"Connected: {info['instrument_name']} " + f"FW {info['firmware_version']} SN {info['serial_number']}" + ) + + # --- Query instrument parameters --- + print("\n--- Instrument Parameters ---") + try: + params = await backend.report_parameters() + for line in params[:10]: + print(f" {line}") + if len(params) > 10: + print(f" ... ({len(params)} lines total)") + except Exception as e: + print(f" REP query failed: {type(e).__name__}: {e}") + + # --- Configure using Eppendorf twin.tec 96-well plate --- + print("\n--- Plate Configuration ---") + plate = Eppendorf_96_wellplate_250ul_Vb("demo_plate") + print(f" Plate: {plate.model}") + print(f" Wells: {plate.num_items} ({plate.num_items_y}x{plate.num_items_x})") + print(f" Height: {plate.get_size_z()} mm") + + # Map to factory type, fall back to PLA remote definition + try: + type_idx = plate_to_type_index(plate) + print(f" Matched factory plate type: {type_idx}") + await run_step("Set plate type (SPL)", dispenser.set_plate_type(plate_type=type_idx)) + except ValueError: + pla_params = plate_to_pla_params(plate) + print(f" No factory match, using remote plate definition: {pla_params}") + await run_step("Define plate (PLA)", backend.define_plate(**pla_params)) + + await run_step( + "Set cassette type (SCT)", dispenser.set_cassette_type(cassette_type=CassetteType.STANDARD) + ) + await run_step( + "Set column volume 10 uL (SCV)", dispenser.set_column_volume(column=0, volume=10.0) + ) + + # Dispensing height must be above the plate. Add 3mm clearance. + dispense_height = round(plate.get_size_z() * 100) + 300 + dispense_height = max(500, min(5500, dispense_height)) # clamp to valid range + print(f" Dispensing height: {dispense_height} (plate {plate.get_size_z()}mm + 3mm clearance)") + await run_step( + f"Set dispensing height {dispense_height} (SDH)", + dispenser.set_dispensing_height(height=dispense_height), + ) + await run_step("Set pump speed 50% (SPS)", dispenser.set_pump_speed(speed=50)) + await run_step( + "Set dispensing order row-wise (SDO)", + dispenser.set_dispensing_order(order=DispensingOrder.ROW_WISE), + ) + + # --- Prime --- + print("\n--- Prime ---") + input(" Press Enter to prime (500 uL)...") + await run_step("Prime 500 uL (PRI)", dispenser.prime(volume=500.0)) + + # --- Dispense --- + print("\n--- Dispense ---") + input(" Press Enter to dispense...") + await run_step("Dispense (DIS)", dispenser.dispense()) + + # --- Shake --- + print("\n--- Shake ---") + input(" Press Enter to shake (3s, 2mm, 10Hz)...") + await run_step("Shake (SHA)", dispenser.shake(time=3.0, distance=2, speed=10)) + + # --- Move plate out --- + print("\n--- Move Plate Out ---") + input(" Press Enter to move plate out...") + await run_step("Move plate out (POU)", dispenser.move_plate_out()) + + # --- Empty --- + print("\n--- Empty ---") + input(" Press Enter to empty hoses (500 uL)...") + await run_step("Empty 500 uL (EMP)", dispenser.empty(volume=500.0)) + + print("\n--- Done! Disconnecting. ---") + + finally: + try: + await dispenser.stop() + except Exception as e: + print(f" Disconnect error: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/enums.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/enums.py new file mode 100644 index 00000000000..af964de8123 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/enums.py @@ -0,0 +1,25 @@ +import enum + + +class CassetteType(enum.IntEnum): + STANDARD = 0 + SMALL = 1 + USER_DEFINED_1 = 2 + USER_DEFINED_2 = 3 + + +class DispensingOrder(enum.IntEnum): + ROW_WISE = 0 + COLUMN_WISE = 1 + + +class PrimeMode(enum.IntEnum): + STANDARD = 0 + CONTINUOUS = 1 + STOP_CONTINUOUS = 2 + CALIBRATION = 3 + + +class EmptyMode(enum.IntEnum): + STANDARD = 0 + CONTINUOUS = 1 diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py new file mode 100644 index 00000000000..0284adaf58a --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py @@ -0,0 +1,25 @@ +from __future__ import annotations + + +class MultidropCombiError(Exception): + """Base exception for Multidrop Combi errors.""" + + +class MultidropCombiCommunicationError(MultidropCombiError): + """Serial communication failure (port not found, timeout, connection lost).""" + + def __init__( + self, message: str, operation: str = "", original_error: Exception | None = None + ) -> None: + self.operation = operation + self.original_error = original_error + super().__init__(message) + + +class MultidropCombiInstrumentError(MultidropCombiError): + """Instrument returned a non-zero status code.""" + + def __init__(self, status_code: int, description: str) -> None: + self.status_code = status_code + self.description = description + super().__init__(f"Instrument error (status {status_code}): {description}") diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py new file mode 100644 index 00000000000..f310107bbd0 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py @@ -0,0 +1,139 @@ +"""Plate type helpers for the Multidrop Combi. + +Maps PyLabRobot Plate resources to Multidrop Combi plate type indices and +PLA (remote plate definition) command parameters. +""" + +from __future__ import annotations + +from pylabrobot.resources import Plate + +# Multidrop Combi factory plate type definitions (from manual Table 3-3). +# Type index → (well_count, max_plate_height_mm) +# Heights are upper bounds for selecting the best-fit factory type. +_FACTORY_96_WELL_TYPES = [ + # (type_index, max_height_mm) + (0, 18.0), # Type 0: 96-well, 15mm + (1, 30.0), # Type 1: 96-well, 22mm + (2, 55.0), # Type 2: 96-well, 44mm +] + +_FACTORY_384_WELL_TYPES = [ + (3, 8.5), # Type 3: 384-well, 7.5mm + (4, 12.0), # Type 4: 384-well, 10mm + (5, 18.0), # Type 5: 384-well, 15mm + (6, 30.0), # Type 6: 384-well, 22mm + (7, 55.0), # Type 7: 384-well, 44mm +] + +_FACTORY_1536_WELL_TYPES = [ + (8, 7.0), # Type 8: 1536-well, 5mm + (9, 55.0), # Type 9: 1536-well, 10.5mm +] + +# Hardware limits +MAX_COLUMNS = 48 +MAX_ROWS = 32 +MIN_HEIGHT_HUNDREDTHS_MM = 500 # 5mm +MAX_HEIGHT_HUNDREDTHS_MM = 5500 # 55mm +MAX_VOLUME_TENTHS_UL = 25000 # 2500 uL + + +def plate_to_type_index(plate: Plate) -> int: + """Map a PLR Plate to the best-fit Multidrop Combi factory plate type index. + + Selects the factory type based on well count and plate height (size_z). + The smallest factory type whose height threshold accommodates the plate is chosen. + + Args: + plate: A PyLabRobot Plate resource. + + Returns: + Factory plate type index (0-9). + + Raises: + ValueError: If the plate well count is not 96, 384, or 1536, or if the + plate height exceeds all factory type thresholds. + """ + wells = plate.num_items + height_mm = plate.get_size_z() + + if wells == 96: + type_list = _FACTORY_96_WELL_TYPES + elif wells == 384: + type_list = _FACTORY_384_WELL_TYPES + elif wells == 1536: + type_list = _FACTORY_1536_WELL_TYPES + else: + raise ValueError( + f"Unsupported well count: {wells}. " + "Multidrop factory types support 96, 384, or 1536 wells. " + "Use plate_to_pla_params() for custom plate definitions." + ) + + for type_index, max_height in type_list: + if height_mm <= max_height: + return type_index + + raise ValueError( + f"Plate height {height_mm}mm exceeds all factory type thresholds for {wells}-well plates." + ) + + +def plate_to_pla_params(plate: Plate) -> dict: + """Convert a PLR Plate to Multidrop Combi PLA command parameters. + + Use this for plates that don't match factory types (types 0-9), or when you + want precise control over the plate definition sent to the instrument. + The returned dict can be passed directly to ``backend.define_plate(**params)``. + + Args: + plate: A PyLabRobot Plate resource. + + Returns: + Dict with keys matching ``define_plate()`` parameters: + column_positions, row_positions, rows, columns, height, max_volume. + + Raises: + ValueError: If any parameter exceeds Multidrop hardware limits. + """ + columns = plate.num_items_x + rows = plate.num_items_y + height_hundredths = round(plate.get_size_z() * 100) + + # Get max_volume from first well + first_well = plate.get_well("A1") + well_max_volume_tenths = round(first_well.max_volume * 10) + + # Validate against hardware limits + if columns > MAX_COLUMNS: + raise ValueError(f"Plate has {columns} columns, but Multidrop supports at most {MAX_COLUMNS}.") + if rows > MAX_ROWS: + raise ValueError(f"Plate has {rows} rows, but Multidrop supports at most {MAX_ROWS}.") + if height_hundredths < MIN_HEIGHT_HUNDREDTHS_MM: + raise ValueError( + f"Plate height {plate.get_size_z()}mm is below minimum {MIN_HEIGHT_HUNDREDTHS_MM / 100}mm." + ) + if height_hundredths > MAX_HEIGHT_HUNDREDTHS_MM: + raise ValueError( + f"Plate height {plate.get_size_z()}mm exceeds maximum {MAX_HEIGHT_HUNDREDTHS_MM / 100}mm." + ) + if well_max_volume_tenths > MAX_VOLUME_TENTHS_UL: + raise ValueError( + f"Well max volume {first_well.max_volume} uL exceeds Multidrop limit of " + f"{MAX_VOLUME_TENTHS_UL / 10} uL." + ) + + return { + "column_positions": columns, + "row_positions": rows, + "rows": rows, + "columns": columns, + "height": height_hundredths, + "max_volume": well_max_volume_tenths, + } + + +def plate_well_count(plate: Plate) -> int: + """Return the total well count for a plate.""" + return plate.num_items diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py new file mode 100644 index 00000000000..c40ce5d8e6b --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py @@ -0,0 +1,50 @@ +"""Query operations mixin for the Multidrop Combi.""" + +from __future__ import annotations + + +class MultidropCombiQueriesMixin: + """Mixin providing query operations for the Multidrop Combi.""" + + _instrument_name: str + _firmware_version: str + _serial_number: str + + def get_version(self) -> dict: + """Return cached instrument identification info. + + Returns: + Dict with keys: instrument_name, firmware_version, serial_number. + """ + return { + "instrument_name": self._instrument_name, + "firmware_version": self._firmware_version, + "serial_number": self._serial_number, + } + + async def report_parameters(self) -> list[str]: + """Report instrument parameters (REP command). + + Returns: + List of parameter lines from the instrument. + """ + result: list[str] = await self._send_command("REP", timeout=10.0) # type: ignore[attr-defined] + return result + + async def read_error_log(self) -> list[str]: + """Read the instrument error log (LOG command). + + Returns: + List of error log lines. + """ + result: list[str] = await self._send_command("LOG", timeout=10.0) # type: ignore[attr-defined] + return result + + async def read_cassette_info(self) -> list[str]: + """Read RFID cassette info (RIR command). + + Returns: + List of cassette info lines. + """ + result: list[str] = await self._send_command("RIR", timeout=5.0) # type: ignore[attr-defined] + return result diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/__init__.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py new file mode 100644 index 00000000000..b9063ae1293 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py @@ -0,0 +1,63 @@ +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.backend import ( + MultidropCombiBackend, +) + + +class BackendSerializationTests(unittest.TestCase): + def test_serialize(self): + backend = MultidropCombiBackend(port="COM3", timeout=15.0) + data = backend.serialize() + self.assertEqual(data["type"], "MultidropCombiBackend") + self.assertEqual(data["port"], "COM3") + self.assertEqual(data["timeout"], 15.0) + + def test_serialize_defaults(self): + backend = MultidropCombiBackend() + data = backend.serialize() + self.assertIsNone(data["port"]) + self.assertEqual(data["timeout"], 30.0) + + +class BackendLifecycleTests(unittest.IsolatedAsyncioTestCase): + @patch("pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.backend.Serial") + async def test_setup_and_stop(self, MockSerial): + mock_serial = MagicMock() + mock_serial.setup = AsyncMock() + mock_serial.stop = AsyncMock() + mock_serial.write = AsyncMock() + mock_serial.readline = AsyncMock() + mock_serial.reset_input_buffer = AsyncMock() + mock_serial.reset_output_buffer = AsyncMock() + mock_serial._ser = MagicMock() + mock_serial._ser.timeout = 30.0 + MockSerial.return_value = mock_serial + + # Setup readline responses: drain (empty), VER, EAK + mock_serial.readline.side_effect = [ + b"", # drain - empty + b"VER\r\n", # VER echo + b"MultidropCombi 2.00.29 836-4191\r\n", # VER data + b"VER END 0\r\n", # VER end + b"EAK\r\n", # EAK echo + b"EAK END 0\r\n", # EAK end + ] + + backend = MultidropCombiBackend(port="COM3") + await backend.setup() + + self.assertEqual(backend._instrument_name, "MultidropCombi") + self.assertEqual(backend._firmware_version, "2.00.29") + self.assertEqual(backend._serial_number, "836-4191") + self.assertIsNotNone(backend.io) + + # Reset readline for QIT during stop + mock_serial.readline.side_effect = [ + b"QIT\r\n", + b"QIT END 0\r\n", + ] + await backend.stop() + self.assertIsNone(backend.io) + mock_serial.stop.assert_awaited_once() diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py new file mode 100644 index 00000000000..f029ba5e2a1 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py @@ -0,0 +1,168 @@ +import unittest +from unittest.mock import AsyncMock + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.commands import ( + MultidropCombiCommandsMixin, + _ul_to_tenths, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.enums import PrimeMode + + +class MockCommandsBackend(MultidropCombiCommandsMixin): + """Testable class with _send_command mocked.""" + + def __init__(self): + self._send_command = AsyncMock(return_value=[]) + + +class VolumeConversionTests(unittest.TestCase): + def test_ul_to_tenths(self): + self.assertEqual(_ul_to_tenths(1.0), 10) + self.assertEqual(_ul_to_tenths(50.0), 500) + self.assertEqual(_ul_to_tenths(0.1), 1) + self.assertEqual(_ul_to_tenths(10000.0), 100000) + + def test_ul_to_tenths_rounding(self): + self.assertEqual(_ul_to_tenths(1.06), 11) + self.assertEqual(_ul_to_tenths(1.04), 10) + + +class CommandFormattingTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.backend = MockCommandsBackend() + + async def test_dispense(self): + await self.backend.dispense() + self.backend._send_command.assert_awaited_once() + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "DIS") + + async def test_prime_standard(self): + await self.backend.prime(volume=50.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "PRI 500") + + async def test_prime_continuous(self): + await self.backend.prime(volume=50.0, mode=PrimeMode.CONTINUOUS) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "PRI 500 1") + + async def test_empty(self): + await self.backend.empty(volume=100.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "EMP 1000") + + async def test_shake(self): + await self.backend.shake(time=5.0, distance=3, speed=10) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SHA 500 3 10") + + async def test_move_plate_out(self): + await self.backend.move_plate_out() + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "POU") + + async def test_set_plate_type(self): + await self.backend.set_plate_type(plate_type=3) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SPL 3") + + async def test_set_cassette_type(self): + await self.backend.set_cassette_type(cassette_type=1) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SCT 1") + + async def test_set_column_volume(self): + await self.backend.set_column_volume(column=0, volume=25.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SCV 0 250") + + async def test_set_dispensing_height(self): + await self.backend.set_dispensing_height(height=2500) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SDH 2500") + + async def test_set_pump_speed(self): + await self.backend.set_pump_speed(speed=50) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SPS 50") + + async def test_set_dispensing_order(self): + await self.backend.set_dispensing_order(order=1) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SDO 1") + + async def test_set_dispense_offset(self): + await self.backend.set_dispense_offset(x_offset=100, y_offset=-50) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SOF 100 -50") + + async def test_set_predispense_volume(self): + await self.backend.set_predispense_volume(volume=10.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SPV 100") + + +class ParameterValidationTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.backend = MockCommandsBackend() + + async def test_prime_volume_too_low(self): + with self.assertRaises(ValueError): + await self.backend.prime(volume=0.0) + + async def test_prime_volume_too_high(self): + with self.assertRaises(ValueError): + await self.backend.prime(volume=20000.0) + + async def test_empty_volume_too_low(self): + with self.assertRaises(ValueError): + await self.backend.empty(volume=0.0) + + async def test_shake_distance_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=0, speed=10) + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=6, speed=10) + + async def test_shake_speed_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=3, speed=0) + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=3, speed=21) + + async def test_plate_type_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_plate_type(plate_type=-1) + with self.assertRaises(ValueError): + await self.backend.set_plate_type(plate_type=30) + + async def test_cassette_type_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_cassette_type(cassette_type=4) + + async def test_column_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_column_volume(column=49, volume=10.0) + + async def test_dispensing_height_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_dispensing_height(height=499) + with self.assertRaises(ValueError): + await self.backend.set_dispensing_height(height=5501) + + async def test_pump_speed_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_pump_speed(speed=0) + with self.assertRaises(ValueError): + await self.backend.set_pump_speed(speed=101) + + async def test_dispensing_order_invalid(self): + with self.assertRaises(ValueError): + await self.backend.set_dispensing_order(order=2) + + async def test_dispense_offset_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_dispense_offset(x_offset=301, y_offset=0) + with self.assertRaises(ValueError): + await self.backend.set_dispense_offset(x_offset=0, y_offset=-301) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py new file mode 100644 index 00000000000..19e83cdae2f --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py @@ -0,0 +1,162 @@ +import asyncio +import unittest +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.communication import ( + MultidropCombiCommunicationMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, + MultidropCombiInstrumentError, +) + + +class MockCommunicationBackend(MultidropCombiCommunicationMixin): + """Testable class that uses the communication mixin with a mock Serial.""" + + def __init__(self) -> None: + self.io: Any = MagicMock() + self.io._ser = MagicMock() + self.io._ser.timeout = 30.0 + self._command_lock = asyncio.Lock() + + # Make io.write and io.readline async + self.io.write = AsyncMock() + self.io.readline = AsyncMock() + self.io.reset_input_buffer = AsyncMock() + self.io.reset_output_buffer = AsyncMock() + + +class SendCommandTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + self.backend = MockCommunicationBackend() + + async def test_simple_command(self) -> None: + """Test a simple command with echo + END response.""" + self.backend.io.readline.side_effect = [ + b"SPL\r\n", # echo + b"SPL END 0\r\n", # end with status 0 + ] + result = await self.backend._send_command("SPL 1") + self.assertEqual(result, []) + self.backend.io.write.assert_awaited_once_with(b"SPL 1\r") + + async def test_command_with_data_lines(self) -> None: + """Test a command that returns data lines between echo and END.""" + self.backend.io.readline.side_effect = [ + b"VER\r\n", + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + result = await self.backend._send_command("VER") + self.assertEqual(result, ["MultidropCombi 2.00.29 836-4191"]) + + async def test_command_with_error_status(self) -> None: + """Test that non-zero status raises MultidropCombiInstrumentError.""" + self.backend.io.readline.side_effect = [ + b"SPL\r\n", + b"SPL END 18\r\n", # status 18 = Invalid plate type + ] + with self.assertRaises(MultidropCombiInstrumentError) as ctx: + await self.backend._send_command("SPL 99") + self.assertEqual(ctx.exception.status_code, 18) + self.assertIn("Invalid plate type", ctx.exception.description) + + async def test_timeout_raises_communication_error(self) -> None: + """Test that timeout (empty readline) raises MultidropCombiCommunicationError.""" + self.backend.io.readline.side_effect = [b""] + with self.assertRaises(MultidropCombiCommunicationError): + await self.backend._send_command("SPL 1") + + async def test_not_connected(self) -> None: + """Test that sending a command when io is None raises error.""" + self.backend.io = None + with self.assertRaises(MultidropCombiCommunicationError): + await self.backend._send_command("VER") + + async def test_custom_timeout(self) -> None: + """Test that custom timeout is set and restored.""" + self.backend.io.readline.side_effect = [ + b"POU\r\n", + b"POU END 0\r\n", + ] + original = self.backend.io._ser.timeout + await self.backend._send_command("POU", timeout=10.0) + # Timeout should be restored after command + self.assertEqual(self.backend.io._ser.timeout, original) + + async def test_echo_skipping_case_insensitive(self) -> None: + """Test that echo is skipped regardless of case.""" + self.backend.io.readline.side_effect = [ + b"ver\r\n", # lowercase echo + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + result = await self.backend._send_command("VER") + self.assertEqual(result, ["MultidropCombi 2.00.29 836-4191"]) + + +class EnterRemoteModeTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + self.backend = MockCommunicationBackend() + + async def test_enter_remote_mode_success(self) -> None: + """Test successful VER command parses instrument info.""" + self.backend.io.readline.side_effect = [ + b"VER\r\n", + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + info = await self.backend._enter_remote_mode() + self.assertEqual(info["instrument_name"], "MultidropCombi") + self.assertEqual(info["firmware_version"], "2.00.29") + self.assertEqual(info["serial_number"], "836-4191") + + async def test_enter_remote_mode_retry_after_eak(self) -> None: + """Test VER retry after EAK when first VER fails.""" + call_count = 0 + + async def readline_side_effect() -> bytes: + nonlocal call_count + call_count += 1 + responses = [ + # First VER attempt - fails with error + b"VER\r\n", + b"VER END 1\r\n", + # EAK attempt + b"EAK\r\n", + b"EAK END 0\r\n", + # Second VER attempt - succeeds + b"VER\r\n", + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + if call_count <= len(responses): + return responses[call_count - 1] + return b"" + + self.backend.io.readline.side_effect = readline_side_effect + info = await self.backend._enter_remote_mode() + self.assertEqual(info["instrument_name"], "MultidropCombi") + + +class DrainStaleDataTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self) -> None: + self.backend = MockCommunicationBackend() + + async def test_drain_with_stale_data(self) -> None: + """Test draining stale data from buffer.""" + self.backend.io.readline.side_effect = [ + b"stale line 1\r\n", + b"stale line 2\r\n", + b"", # No more data + ] + await self.backend._drain_stale_data() + self.backend.io.reset_input_buffer.assert_awaited_once() + self.backend.io.reset_output_buffer.assert_awaited_once() + + async def test_drain_empty_buffer(self) -> None: + """Test draining when buffer is already empty.""" + self.backend.io.readline.side_effect = [b""] + await self.backend._drain_stale_data() diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py new file mode 100644 index 00000000000..0aeb2cd37dd --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py @@ -0,0 +1,209 @@ +import unittest + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.helpers import ( + plate_to_pla_params, + plate_to_type_index, + plate_well_count, +) +from pylabrobot.resources import Plate, Well, create_ordered_items_2d +from pylabrobot.resources.well import CrossSectionType, WellBottomType + + +def _make_plate( + num_items_x: int = 12, + num_items_y: int = 8, + size_z: float = 14.2, + well_max_volume: float = 360.0, + well_size_z: float = 10.67, +) -> Plate: + """Create a test plate with the given parameters.""" + return Plate( + name="test_plate", + size_x=127.76, + size_y=85.48, + size_z=size_z, + model="test", + ordered_items=create_ordered_items_2d( + Well, + num_items_x=num_items_x, + num_items_y=num_items_y, + dx=10.0, + dy=7.0, + dz=1.0, + item_dx=9.0, + item_dy=9.0, + size_x=6.0, + size_y=6.0, + size_z=well_size_z, + bottom_type=WellBottomType.FLAT, + cross_section_type=CrossSectionType.CIRCLE, + max_volume=well_max_volume, + ), + ) + + +class PlateToTypeIndexTests(unittest.TestCase): + """Test factory plate type mapping.""" + + def test_96_well_short(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=14.0) + self.assertEqual(plate_to_type_index(plate), 0) # 15mm type + + def test_96_well_medium(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=20.0) + self.assertEqual(plate_to_type_index(plate), 1) # 22mm type + + def test_96_well_tall(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=40.0) + self.assertEqual(plate_to_type_index(plate), 2) # 44mm type + + def test_384_well_very_short(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=7.0) + self.assertEqual(plate_to_type_index(plate), 3) # 7.5mm type + + def test_384_well_short(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=10.0) + self.assertEqual(plate_to_type_index(plate), 4) # 10mm type + + def test_384_well_medium(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=14.0) + self.assertEqual(plate_to_type_index(plate), 5) # 15mm type + + def test_384_well_tall(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=25.0) + self.assertEqual(plate_to_type_index(plate), 6) # 22mm type + + def test_384_well_very_tall(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=44.0) + self.assertEqual(plate_to_type_index(plate), 7) # 44mm type + + def test_1536_well_short(self): + plate = _make_plate(num_items_x=48, num_items_y=32, size_z=5.0) + self.assertEqual(plate_to_type_index(plate), 8) # 5mm type + + def test_1536_well_tall(self): + plate = _make_plate(num_items_x=48, num_items_y=32, size_z=10.0) + self.assertEqual(plate_to_type_index(plate), 9) # 10.5mm type + + def test_unsupported_well_count(self): + plate = _make_plate(num_items_x=6, num_items_y=4) # 24-well + with self.assertRaises(ValueError) as ctx: + plate_to_type_index(plate) + self.assertIn("24", str(ctx.exception)) + + def test_96_well_too_tall(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=60.0) + with self.assertRaises(ValueError): + plate_to_type_index(plate) + + +class PlateToTypeIndexRealPlatesTests(unittest.TestCase): + """Test with real PLR plate definitions.""" + + def test_corning_96_well(self): + from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + + plate = Cor_96_wellplate_360ul_Fb("test") + self.assertEqual(plate_to_type_index(plate), 0) # 14.2mm → type 0 + + def test_biorad_384_well(self): + from pylabrobot.resources.biorad.plates import BioRad_384_wellplate_50uL_Vb + + plate = BioRad_384_wellplate_50uL_Vb("test") + self.assertEqual(plate_to_type_index(plate), 4) # 10.4mm → type 4 + + +class PlateToPlaParamsTests(unittest.TestCase): + """Test PLA command parameter generation.""" + + def test_96_well_params(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=14.2, well_max_volume=360.0) + params = plate_to_pla_params(plate) + self.assertEqual(params["columns"], 12) + self.assertEqual(params["rows"], 8) + self.assertEqual(params["column_positions"], 12) + self.assertEqual(params["row_positions"], 8) + self.assertEqual(params["height"], 1420) # 14.2mm * 100 + self.assertEqual(params["max_volume"], 3600) # 360uL * 10 + + def test_384_well_params(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=10.4, well_max_volume=50.0) + params = plate_to_pla_params(plate) + self.assertEqual(params["columns"], 24) + self.assertEqual(params["rows"], 16) + self.assertEqual(params["height"], 1040) + self.assertEqual(params["max_volume"], 500) + + def test_real_corning_96_well(self): + from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + + plate = Cor_96_wellplate_360ul_Fb("test") + params = plate_to_pla_params(plate) + self.assertEqual(params["columns"], 12) + self.assertEqual(params["rows"], 8) + self.assertEqual(params["height"], 1420) + self.assertEqual(params["max_volume"], 3600) + + +class PlaParamsValidationTests(unittest.TestCase): + """Test parameter validation in plate_to_pla_params.""" + + def test_too_many_columns(self): + plate = _make_plate(num_items_x=49, num_items_y=8, size_z=14.0) + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("49 columns", str(ctx.exception)) + self.assertIn("48", str(ctx.exception)) + + def test_too_many_rows(self): + plate = _make_plate(num_items_x=12, num_items_y=33, size_z=14.0) + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("33 rows", str(ctx.exception)) + self.assertIn("32", str(ctx.exception)) + + def test_height_too_low(self): + plate = _make_plate(size_z=4.0) # 4mm < 5mm minimum + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("4.0mm", str(ctx.exception)) + self.assertIn("minimum", str(ctx.exception)) + + def test_height_too_high(self): + plate = _make_plate(size_z=60.0) # 60mm > 55mm maximum + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("60.0mm", str(ctx.exception)) + self.assertIn("maximum", str(ctx.exception)) + + def test_well_volume_too_high(self): + plate = _make_plate(well_max_volume=3000.0) # 3000uL > 2500uL max + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("3000", str(ctx.exception)) + self.assertIn("2500", str(ctx.exception)) + + def test_height_at_minimum_boundary(self): + plate = _make_plate(size_z=5.0) # exactly 5mm = 500 hundredths + params = plate_to_pla_params(plate) + self.assertEqual(params["height"], 500) + + def test_height_at_maximum_boundary(self): + plate = _make_plate(size_z=55.0) # exactly 55mm = 5500 hundredths + params = plate_to_pla_params(plate) + self.assertEqual(params["height"], 5500) + + def test_volume_at_maximum_boundary(self): + plate = _make_plate(well_max_volume=2500.0) # exactly 2500uL + params = plate_to_pla_params(plate) + self.assertEqual(params["max_volume"], 25000) + + +class PlateWellCountTests(unittest.TestCase): + def test_96_well(self): + plate = _make_plate(num_items_x=12, num_items_y=8) + self.assertEqual(plate_well_count(plate), 96) + + def test_384_well(self): + plate = _make_plate(num_items_x=24, num_items_y=16) + self.assertEqual(plate_well_count(plate), 384)