From a5f805fca6c499f3bdf6277261f10b212321b7cd Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Wed, 25 Mar 2026 09:48:02 -0700 Subject: [PATCH 1/6] Add bulk_dispensers module with Multidrop Combi backend New device category for bulk reagent dispensers, following the existing pylabrobot patterns (mixin composition, abstract backend, frontend with @need_setup_finished guards). - Abstract BulkDispenserBackend with dispense, prime, empty, shake, etc. - MultidropCombiBackend with full serial protocol support (RS232/USB) - Plate helpers to map PLR Plate resources to Multidrop plate types - Chatterbox backend for testing without hardware - 84 tests covering communication, commands, validation, and helpers - Demo script for real-device testing Co-Authored-By: Claude Opus 4.6 (1M context) --- demo_multidrop.py | 155 +++++++++++ pylabrobot/bulk_dispensers/__init__.py | 3 + pylabrobot/bulk_dispensers/backend.py | 84 ++++++ pylabrobot/bulk_dispensers/bulk_dispenser.py | 60 +++++ pylabrobot/bulk_dispensers/chatterbox.py | 47 ++++ pylabrobot/bulk_dispensers/tests/__init__.py | 0 .../tests/bulk_dispenser_tests.py | 91 +++++++ .../thermo_scientific/__init__.py | 13 + .../multidrop_combi/__init__.py | 19 ++ .../multidrop_combi/actions.py | 29 ++ .../multidrop_combi/backend.py | 115 ++++++++ .../multidrop_combi/commands.py | 249 ++++++++++++++++++ .../multidrop_combi/communication.py | 223 ++++++++++++++++ .../multidrop_combi/enums.py | 25 ++ .../multidrop_combi/errors.py | 21 ++ .../multidrop_combi/helpers.py | 144 ++++++++++ .../multidrop_combi/queries.py | 46 ++++ .../multidrop_combi/tests/__init__.py | 0 .../multidrop_combi/tests/backend_tests.py | 64 +++++ .../multidrop_combi/tests/commands_tests.py | 169 ++++++++++++ .../tests/communication_tests.py | 161 +++++++++++ .../multidrop_combi/tests/helpers_tests.py | 210 +++++++++++++++ 22 files changed, 1928 insertions(+) create mode 100644 demo_multidrop.py create mode 100644 pylabrobot/bulk_dispensers/__init__.py create mode 100644 pylabrobot/bulk_dispensers/backend.py create mode 100644 pylabrobot/bulk_dispensers/bulk_dispenser.py create mode 100644 pylabrobot/bulk_dispensers/chatterbox.py create mode 100644 pylabrobot/bulk_dispensers/tests/__init__.py create mode 100644 pylabrobot/bulk_dispensers/tests/bulk_dispenser_tests.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/__init__.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/__init__.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/enums.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/__init__.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py create mode 100644 pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py diff --git a/demo_multidrop.py b/demo_multidrop.py new file mode 100644 index 00000000000..28a3abc72a3 --- /dev/null +++ b/demo_multidrop.py @@ -0,0 +1,155 @@ +"""Demo script for the Multidrop Combi bulk dispenser. + +Usage: + python demo_multidrop.py COM3 # specify port explicitly (recommended) + python demo_multidrop.py # auto-detect by USB VID/PID (native USB only) +""" + +import asyncio +import sys + +from pylabrobot.bulk_dispensers import BulkDispenser +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi import ( + CassetteType, + DispensingOrder, + MultidropCombiBackend, + MultidropCombiInstrumentError, + plate_to_pla_params, + plate_to_type_index, +) +from pylabrobot.resources.eppendorf.plates import Eppendorf_96_wellplate_250ul_Vb + + +def list_serial_ports(): + """List available serial ports to help the user find the right one.""" + try: + import serial.tools.list_ports + ports = list(serial.tools.list_ports.comports()) + if not ports: + print(" No serial ports found.") + else: + print(" Available ports:") + for p in ports: + print(f" {p.device} - {p.description} (hwid: {p.hwid})") + except ImportError: + print(" (pyserial not installed, cannot list ports)") + + +async def run_step(name: str, coro): + """Run an async operation with error handling. Returns True on success.""" + try: + await coro + print(f" {name}: OK") + return True + except MultidropCombiInstrumentError as e: + print(f" {name}: INSTRUMENT ERROR (status {e.status_code}): {e.description}") + return False + except Exception as e: + print(f" {name}: ERROR: {type(e).__name__}: {e}") + return False + + +async def main(): + port = sys.argv[1] if len(sys.argv) > 1 else None + + if port is None: + print("No COM port specified. Attempting VID/PID auto-discovery...") + print("(This only works with native USB, not RS232-to-USB adapters)\n") + + # --- Create and connect --- + backend = MultidropCombiBackend(port=port, timeout=30.0) + dispenser = BulkDispenser(backend=backend) + + try: + await dispenser.setup() + except Exception as e: + print(f"Connection failed: {e}\n") + list_serial_ports() + print(f"\nUsage: python {sys.argv[0]} ") + return + + try: + # Connection info + info = backend.get_version() + print(f"Connected: {info['instrument_name']} " + f"FW {info['firmware_version']} SN {info['serial_number']}") + + # --- Query instrument parameters --- + print("\n--- Instrument Parameters ---") + try: + params = await backend.report_parameters() + for line in params[:10]: + print(f" {line}") + if len(params) > 10: + print(f" ... ({len(params)} lines total)") + except Exception as e: + print(f" REP query failed: {type(e).__name__}: {e}") + + # --- Configure using Eppendorf twin.tec 96-well plate --- + print("\n--- Plate Configuration ---") + plate = Eppendorf_96_wellplate_250ul_Vb("demo_plate") + print(f" Plate: {plate.model}") + print(f" Wells: {plate.num_items} ({plate.num_items_y}x{plate.num_items_x})") + print(f" Height: {plate.get_size_z()} mm") + + # Map to factory type, fall back to PLA remote definition + try: + type_idx = plate_to_type_index(plate) + print(f" Matched factory plate type: {type_idx}") + await run_step("Set plate type (SPL)", dispenser.set_plate_type(plate_type=type_idx)) + except ValueError: + pla_params = plate_to_pla_params(plate) + print(f" No factory match, using remote plate definition: {pla_params}") + await run_step("Define plate (PLA)", backend.define_plate(**pla_params)) + + await run_step("Set cassette type (SCT)", dispenser.set_cassette_type( + cassette_type=CassetteType.STANDARD)) + await run_step("Set column volume 10 uL (SCV)", dispenser.set_column_volume( + column=0, volume=10.0)) + + # Dispensing height must be above the plate. Add 3mm clearance. + dispense_height = round(plate.get_size_z() * 100) + 300 + dispense_height = max(500, min(5500, dispense_height)) # clamp to valid range + print(f" Dispensing height: {dispense_height} (plate {plate.get_size_z()}mm + 3mm clearance)") + await run_step(f"Set dispensing height {dispense_height} (SDH)", + dispenser.set_dispensing_height(height=dispense_height)) + await run_step("Set pump speed 50% (SPS)", dispenser.set_pump_speed(speed=50)) + await run_step("Set dispensing order row-wise (SDO)", dispenser.set_dispensing_order( + order=DispensingOrder.ROW_WISE)) + + # --- Prime --- + print("\n--- Prime ---") + input(" Press Enter to prime (500 uL)...") + await run_step("Prime 500 uL (PRI)", dispenser.prime(volume=500.0)) + + # --- Dispense --- + print("\n--- Dispense ---") + input(" Press Enter to dispense...") + await run_step("Dispense (DIS)", dispenser.dispense()) + + # --- Shake --- + print("\n--- Shake ---") + input(" Press Enter to shake (3s, 2mm, 10Hz)...") + await run_step("Shake (SHA)", dispenser.shake(time=3.0, distance=2, speed=10)) + + # --- Move plate out --- + print("\n--- Move Plate Out ---") + input(" Press Enter to move plate out...") + await run_step("Move plate out (POU)", dispenser.move_plate_out()) + + # --- Empty --- + print("\n--- Empty ---") + input(" Press Enter to empty hoses (500 uL)...") + await run_step("Empty 500 uL (EMP)", dispenser.empty(volume=500.0)) + + print("\n--- Done! Disconnecting. ---") + + finally: + try: + await dispenser.stop() + except Exception as e: + print(f" Disconnect error: {e}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pylabrobot/bulk_dispensers/__init__.py b/pylabrobot/bulk_dispensers/__init__.py new file mode 100644 index 00000000000..e224ad00513 --- /dev/null +++ b/pylabrobot/bulk_dispensers/__init__.py @@ -0,0 +1,3 @@ +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend +from pylabrobot.bulk_dispensers.bulk_dispenser import BulkDispenser +from pylabrobot.bulk_dispensers.chatterbox import BulkDispenserChatterboxBackend diff --git a/pylabrobot/bulk_dispensers/backend.py b/pylabrobot/bulk_dispensers/backend.py new file mode 100644 index 00000000000..a7c7abfeaf2 --- /dev/null +++ b/pylabrobot/bulk_dispensers/backend.py @@ -0,0 +1,84 @@ +from __future__ import annotations + +from abc import ABCMeta, abstractmethod + +from pylabrobot.machines.backend import MachineBackend + + +class BulkDispenserBackend(MachineBackend, metaclass=ABCMeta): + """Abstract class for bulk dispenser backends. + + Volumes are specified in microliters (float). Concrete backends are responsible + for converting to instrument-specific units. + """ + + @abstractmethod + async def dispense(self) -> None: + pass + + @abstractmethod + async def prime(self, volume: float) -> None: + pass + + @abstractmethod + async def empty(self, volume: float) -> None: + pass + + @abstractmethod + async def shake(self, time: float, distance: int, speed: int) -> None: + """Shake the plate. + + Args: + time: Shake duration in seconds. + distance: Shake distance in mm (1-5). + speed: Shake frequency in Hz (1-20). + """ + + @abstractmethod + async def move_plate_out(self) -> None: + pass + + @abstractmethod + async def set_plate_type(self, plate_type: int) -> None: + pass + + @abstractmethod + async def set_cassette_type(self, cassette_type: int) -> None: + pass + + @abstractmethod + async def set_column_volume(self, column: int, volume: float) -> None: + """Set dispense volume for a column. + + Args: + column: Column number (0 = all columns). + volume: Volume in microliters. + """ + + @abstractmethod + async def set_dispensing_height(self, height: int) -> None: + """Set dispensing height. + + Args: + height: Height in 1/100 mm (500-5500). + """ + + @abstractmethod + async def set_pump_speed(self, speed: int) -> None: + """Set pump speed as percentage of cassette range. + + Args: + speed: Speed percentage (1-100). + """ + + @abstractmethod + async def set_dispensing_order(self, order: int) -> None: + """Set dispensing order. + + Args: + order: 0 = row-wise, 1 = column-wise. + """ + + @abstractmethod + async def abort(self) -> None: + pass diff --git a/pylabrobot/bulk_dispensers/bulk_dispenser.py b/pylabrobot/bulk_dispensers/bulk_dispenser.py new file mode 100644 index 00000000000..e4783df925b --- /dev/null +++ b/pylabrobot/bulk_dispensers/bulk_dispenser.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend +from pylabrobot.machines.machine import Machine, need_setup_finished + + +class BulkDispenser(Machine): + """Frontend for bulk reagent dispensers.""" + + def __init__(self, backend: BulkDispenserBackend) -> None: + super().__init__(backend=backend) + self.backend: BulkDispenserBackend = backend + + @need_setup_finished + async def dispense(self, **backend_kwargs) -> None: + await self.backend.dispense(**backend_kwargs) + + @need_setup_finished + async def prime(self, volume: float, **backend_kwargs) -> None: + await self.backend.prime(volume=volume, **backend_kwargs) + + @need_setup_finished + async def empty(self, volume: float, **backend_kwargs) -> None: + await self.backend.empty(volume=volume, **backend_kwargs) + + @need_setup_finished + async def shake(self, time: float, distance: int, speed: int, **backend_kwargs) -> None: + await self.backend.shake(time=time, distance=distance, speed=speed, **backend_kwargs) + + @need_setup_finished + async def move_plate_out(self, **backend_kwargs) -> None: + await self.backend.move_plate_out(**backend_kwargs) + + @need_setup_finished + async def set_plate_type(self, plate_type: int, **backend_kwargs) -> None: + await self.backend.set_plate_type(plate_type=plate_type, **backend_kwargs) + + @need_setup_finished + async def set_cassette_type(self, cassette_type: int, **backend_kwargs) -> None: + await self.backend.set_cassette_type(cassette_type=cassette_type, **backend_kwargs) + + @need_setup_finished + async def set_column_volume(self, column: int, volume: float, **backend_kwargs) -> None: + await self.backend.set_column_volume(column=column, volume=volume, **backend_kwargs) + + @need_setup_finished + async def set_dispensing_height(self, height: int, **backend_kwargs) -> None: + await self.backend.set_dispensing_height(height=height, **backend_kwargs) + + @need_setup_finished + async def set_pump_speed(self, speed: int, **backend_kwargs) -> None: + await self.backend.set_pump_speed(speed=speed, **backend_kwargs) + + @need_setup_finished + async def set_dispensing_order(self, order: int, **backend_kwargs) -> None: + await self.backend.set_dispensing_order(order=order, **backend_kwargs) + + @need_setup_finished + async def abort(self, **backend_kwargs) -> None: + await self.backend.abort(**backend_kwargs) diff --git a/pylabrobot/bulk_dispensers/chatterbox.py b/pylabrobot/bulk_dispensers/chatterbox.py new file mode 100644 index 00000000000..eddf02b68c8 --- /dev/null +++ b/pylabrobot/bulk_dispensers/chatterbox.py @@ -0,0 +1,47 @@ +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend + + +class BulkDispenserChatterboxBackend(BulkDispenserBackend): + """A backend that prints operations for testing without hardware.""" + + async def setup(self) -> None: + print("Setting up bulk dispenser.") + + async def stop(self) -> None: + print("Stopping bulk dispenser.") + + async def dispense(self) -> None: + print("Dispensing.") + + async def prime(self, volume: float) -> None: + print(f"Priming with {volume} uL.") + + async def empty(self, volume: float) -> None: + print(f"Emptying with {volume} uL.") + + async def shake(self, time: float, distance: int, speed: int) -> None: + print(f"Shaking for {time}s, distance={distance}mm, speed={speed}Hz.") + + async def move_plate_out(self) -> None: + print("Moving plate out.") + + async def set_plate_type(self, plate_type: int) -> None: + print(f"Setting plate type to {plate_type}.") + + async def set_cassette_type(self, cassette_type: int) -> None: + print(f"Setting cassette type to {cassette_type}.") + + async def set_column_volume(self, column: int, volume: float) -> None: + print(f"Setting column {column} volume to {volume} uL.") + + async def set_dispensing_height(self, height: int) -> None: + print(f"Setting dispensing height to {height}.") + + async def set_pump_speed(self, speed: int) -> None: + print(f"Setting pump speed to {speed}%.") + + async def set_dispensing_order(self, order: int) -> None: + print(f"Setting dispensing order to {order}.") + + async def abort(self) -> None: + print("Aborting.") diff --git a/pylabrobot/bulk_dispensers/tests/__init__.py b/pylabrobot/bulk_dispensers/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/bulk_dispensers/tests/bulk_dispenser_tests.py b/pylabrobot/bulk_dispensers/tests/bulk_dispenser_tests.py new file mode 100644 index 00000000000..26314145247 --- /dev/null +++ b/pylabrobot/bulk_dispensers/tests/bulk_dispenser_tests.py @@ -0,0 +1,91 @@ +import unittest + +from pylabrobot.bulk_dispensers import ( + BulkDispenser, + BulkDispenserBackend, + BulkDispenserChatterboxBackend, +) + + +class BulkDispenserSetupTests(unittest.IsolatedAsyncioTestCase): + """Test setup/stop lifecycle and need_setup_finished guard.""" + + def setUp(self): + self.backend = BulkDispenserChatterboxBackend() + self.dispenser = BulkDispenser(backend=self.backend) + + async def test_methods_fail_before_setup(self): + with self.assertRaises(RuntimeError): + await self.dispenser.dispense() + with self.assertRaises(RuntimeError): + await self.dispenser.prime(volume=100.0) + with self.assertRaises(RuntimeError): + await self.dispenser.abort() + + async def test_setup_and_stop(self): + await self.dispenser.setup() + self.assertTrue(self.dispenser.setup_finished) + await self.dispenser.stop() + self.assertFalse(self.dispenser.setup_finished) + + async def test_context_manager(self): + async with BulkDispenser(backend=BulkDispenserChatterboxBackend()) as d: + self.assertTrue(d.setup_finished) + self.assertFalse(d.setup_finished) + + +class BulkDispenserDelegationTests(unittest.IsolatedAsyncioTestCase): + """Test that frontend methods delegate to the backend.""" + + async def asyncSetUp(self): + self.backend = unittest.mock.MagicMock(spec=BulkDispenserBackend) + self.dispenser = BulkDispenser(backend=self.backend) + self.dispenser._setup_finished = True + + async def test_dispense(self): + await self.dispenser.dispense() + self.backend.dispense.assert_awaited_once() + + async def test_prime(self): + await self.dispenser.prime(volume=50.0) + self.backend.prime.assert_awaited_once_with(volume=50.0) + + async def test_empty(self): + await self.dispenser.empty(volume=100.0) + self.backend.empty.assert_awaited_once_with(volume=100.0) + + async def test_shake(self): + await self.dispenser.shake(time=5.0, distance=3, speed=10) + self.backend.shake.assert_awaited_once_with(time=5.0, distance=3, speed=10) + + async def test_move_plate_out(self): + await self.dispenser.move_plate_out() + self.backend.move_plate_out.assert_awaited_once() + + async def test_set_plate_type(self): + await self.dispenser.set_plate_type(plate_type=3) + self.backend.set_plate_type.assert_awaited_once_with(plate_type=3) + + async def test_set_cassette_type(self): + await self.dispenser.set_cassette_type(cassette_type=1) + self.backend.set_cassette_type.assert_awaited_once_with(cassette_type=1) + + async def test_set_column_volume(self): + await self.dispenser.set_column_volume(column=0, volume=25.0) + self.backend.set_column_volume.assert_awaited_once_with(column=0, volume=25.0) + + async def test_set_dispensing_height(self): + await self.dispenser.set_dispensing_height(height=2500) + self.backend.set_dispensing_height.assert_awaited_once_with(height=2500) + + async def test_set_pump_speed(self): + await self.dispenser.set_pump_speed(speed=50) + self.backend.set_pump_speed.assert_awaited_once_with(speed=50) + + async def test_set_dispensing_order(self): + await self.dispenser.set_dispensing_order(order=1) + self.backend.set_dispensing_order.assert_awaited_once_with(order=1) + + async def test_abort(self): + await self.dispenser.abort() + self.backend.abort.assert_awaited_once() diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py b/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py new file mode 100644 index 00000000000..f7aff58e4b3 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py @@ -0,0 +1,13 @@ +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi import ( + MultidropCombiBackend, + MultidropCombiCommunicationError, + MultidropCombiError, + MultidropCombiInstrumentError, + CassetteType, + DispensingOrder, + EmptyMode, + PrimeMode, + plate_to_pla_params, + plate_to_type_index, + plate_well_count, +) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/__init__.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/__init__.py new file mode 100644 index 00000000000..0d4ac7fe73b --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/__init__.py @@ -0,0 +1,19 @@ +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.backend import ( + MultidropCombiBackend, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.enums import ( + CassetteType, + DispensingOrder, + EmptyMode, + PrimeMode, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, + MultidropCombiError, + MultidropCombiInstrumentError, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.helpers import ( + plate_to_pla_params, + plate_to_type_index, + plate_well_count, +) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py new file mode 100644 index 00000000000..74c56eec022 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py @@ -0,0 +1,29 @@ +"""Control operations mixin for the Multidrop Combi.""" +from __future__ import annotations + +from typing import Optional + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, +) +from pylabrobot.io.serial import Serial + + +class MultidropCombiActionsMixin: + """Mixin providing control operations for the Multidrop Combi.""" + + io: Optional[Serial] + + async def abort(self) -> None: + """Send ESC character to abort the current operation.""" + if self.io is None: + raise MultidropCombiCommunicationError("Not connected to instrument", operation="abort") + await self.io.write(b"\x1b") + + async def restart(self) -> None: + """Restart the instrument (equivalent to power cycle).""" + await self._send_command("RST", timeout=10.0) # type: ignore[attr-defined] + + async def acknowledge_error(self) -> None: + """Clear instrument error state.""" + await self._send_command("EAK", timeout=5.0) # type: ignore[attr-defined] diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py new file mode 100644 index 00000000000..64cf735fdd2 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py @@ -0,0 +1,115 @@ +"""Composed backend for the Thermo Scientific Multidrop Combi.""" +from __future__ import annotations + +import asyncio +import logging +from typing import Optional + +from pylabrobot.bulk_dispensers.backend import BulkDispenserBackend +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.actions import ( + MultidropCombiActionsMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.commands import ( + MultidropCombiCommandsMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.communication import ( + MultidropCombiCommunicationMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.queries import ( + MultidropCombiQueriesMixin, +) +from pylabrobot.io.serial import Serial + +logger = logging.getLogger(__name__) + + +class MultidropCombiBackend( + MultidropCombiCommunicationMixin, + MultidropCombiQueriesMixin, + MultidropCombiActionsMixin, + MultidropCombiCommandsMixin, + BulkDispenserBackend, +): + """Backend for the Thermo Scientific Multidrop Combi reagent dispenser. + + Communication is via RS232/USB serial at 9600 baud, 8N1. + + Args: + port: Serial port (e.g. "COM3", "/dev/ttyUSB0"). If None, auto-detected by VID/PID. + timeout: Default serial read timeout in seconds. + """ + + def __init__( + self, + port: str | None = None, + timeout: float = 30.0, + ) -> None: + super().__init__() + self._port = port + self.timeout = timeout + self.io: Optional[Serial] = None + self._command_lock: Optional[asyncio.Lock] = None + self._instrument_name: str = "" + self._firmware_version: str = "" + self._serial_number: str = "" + + async def setup(self) -> None: + self._command_lock = asyncio.Lock() + + # When port is specified, skip VID/PID discovery (the Multidrop is often + # connected via an RS232-to-USB adapter with a different VID/PID). + serial_kwargs = dict( + human_readable_device_name="Multidrop Combi", + baudrate=9600, + bytesize=8, + parity="N", + stopbits=1, + timeout=self.timeout, + write_timeout=5, + ) + if self._port: + serial_kwargs["port"] = self._port + else: + serial_kwargs["vid"] = 0x0AB6 + serial_kwargs["pid"] = 0x0344 + + self.io = Serial(**serial_kwargs) + await self.io.setup() + + # Enable XON/XOFF flow control on the underlying serial port + if self.io._ser is not None: + self.io._ser.xonxoff = True + + await self._drain_stale_data() + + info = await self._enter_remote_mode() + self._instrument_name = info["instrument_name"] + self._firmware_version = info["firmware_version"] + self._serial_number = info["serial_number"] + + logger.info( + "Connected to %s (FW: %s, SN: %s)", + self._instrument_name, + self._firmware_version, + self._serial_number, + ) + + # Clear any pending errors + try: + await self.acknowledge_error() + except Exception: + pass + + async def stop(self) -> None: + await self._exit_remote_mode() + if self.io is not None: + await self.io.stop() + self.io = None + self._command_lock = None + + def serialize(self) -> dict: + return { + **super().serialize(), + "port": self._port, + "timeout": self.timeout, + } diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py new file mode 100644 index 00000000000..4c1b3765d12 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py @@ -0,0 +1,249 @@ +"""Operational commands mixin for the Multidrop Combi. + +All volume parameters at the public interface are in microliters (float). +Internally, volumes are converted to the instrument's native 1/10 uL units. +""" +from __future__ import annotations + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.enums import ( + EmptyMode, + PrimeMode, +) + +# Per-command timeout constants (seconds) +COMMAND_TIMEOUTS = { + "SPL": 5.0, + "SCT": 5.0, + "SCV": 5.0, + "SDH": 5.0, + "SPS": 5.0, + "SDO": 5.0, + "SOF": 5.0, + "SPV": 5.0, + "PLA": 5.0, + "EAK": 5.0, + "POU": 10.0, + "RST": 10.0, + "DIS": 120.0, + "PRI": 60.0, + "EMP": 60.0, + "SHA": 120.0, + "BGN": 120.0, +} + + +def _ul_to_tenths(volume_ul: float) -> int: + """Convert microliters to 1/10 uL integer.""" + return round(volume_ul * 10) + + +class MultidropCombiCommandsMixin: + """Mixin providing operational commands for the Multidrop Combi.""" + + async def dispense(self) -> None: + """Dispense liquid to the plate (DIS command).""" + await self._send_command("DIS", timeout=COMMAND_TIMEOUTS["DIS"]) # type: ignore[attr-defined] + + async def prime(self, volume: float, mode: PrimeMode = PrimeMode.STANDARD) -> None: + """Prime dispenser hoses. + + Args: + volume: Prime volume in microliters. + mode: Prime mode (standard, continuous, stop continuous, calibration). + """ + vol_tenths = _ul_to_tenths(volume) + if vol_tenths < 10 or vol_tenths > 100000: + raise ValueError(f"Prime volume must be 1-10000 uL, got {volume} uL") + cmd = f"PRI {vol_tenths}" + if mode != PrimeMode.STANDARD: + cmd += f" {mode.value}" + timeout = COMMAND_TIMEOUTS["PRI"] + volume / 100.0 + await self._send_command(cmd, timeout=timeout) # type: ignore[attr-defined] + + async def empty(self, volume: float, mode: EmptyMode = EmptyMode.STANDARD) -> None: + """Empty dispenser hoses. + + Args: + volume: Empty volume in microliters. + mode: Empty mode (standard or continuous). + """ + vol_tenths = _ul_to_tenths(volume) + if vol_tenths < 10 or vol_tenths > 100000: + raise ValueError(f"Empty volume must be 1-10000 uL, got {volume} uL") + cmd = f"EMP {vol_tenths}" + if mode != EmptyMode.STANDARD: + cmd += f" {mode.value}" + timeout = COMMAND_TIMEOUTS["EMP"] + volume / 100.0 + await self._send_command(cmd, timeout=timeout) # type: ignore[attr-defined] + + async def shake(self, time: float, distance: int, speed: int) -> None: + """Shake the plate. + + Args: + time: Duration in seconds. + distance: Shake distance in mm (1-5). + speed: Shake frequency in Hz (1-20). + """ + if not 1 <= distance <= 5: + raise ValueError(f"Shake distance must be 1-5 mm, got {distance}") + if not 1 <= speed <= 20: + raise ValueError(f"Shake speed must be 1-20 Hz, got {speed}") + time_hundredths = round(time * 100) + if time_hundredths < 1: + raise ValueError(f"Shake time must be > 0, got {time}s") + timeout = COMMAND_TIMEOUTS["SHA"] + time + await self._send_command( # type: ignore[attr-defined] + f"SHA {time_hundredths} {distance} {speed}", timeout=timeout + ) + + async def move_plate_out(self) -> None: + """Move plate carrier to loading position (POU command).""" + await self._send_command( # type: ignore[attr-defined] + "POU", timeout=COMMAND_TIMEOUTS["POU"] + ) + + async def set_plate_type(self, plate_type: int) -> None: + """Set plate type. + + Args: + plate_type: Plate type index (0-29; 0-9 factory-defined, 10-29 user-defined). + """ + if not 0 <= plate_type <= 29: + raise ValueError(f"Plate type must be 0-29, got {plate_type}") + await self._send_command( # type: ignore[attr-defined] + f"SPL {plate_type}", timeout=COMMAND_TIMEOUTS["SPL"] + ) + + async def set_cassette_type(self, cassette_type: int) -> None: + """Set cassette type. + + Args: + cassette_type: Cassette type (0=Standard, 1=Small, 2-3=User-defined). + """ + if not 0 <= cassette_type <= 3: + raise ValueError(f"Cassette type must be 0-3, got {cassette_type}") + await self._send_command( # type: ignore[attr-defined] + f"SCT {cassette_type}", timeout=COMMAND_TIMEOUTS["SCT"] + ) + + async def set_column_volume(self, column: int, volume: float) -> None: + """Set dispense volume for a column. + + Args: + column: Column number (0 = all columns, 1-48 = specific column). + volume: Volume in microliters. + """ + if not 0 <= column <= 48: + raise ValueError(f"Column must be 0-48, got {column}") + vol_tenths = _ul_to_tenths(volume) + await self._send_command( # type: ignore[attr-defined] + f"SCV {column} {vol_tenths}", timeout=COMMAND_TIMEOUTS["SCV"] + ) + + async def set_dispensing_height(self, height: int) -> None: + """Set dispensing height. + + Args: + height: Height in 1/100 mm (500-5500). + """ + if not 500 <= height <= 5500: + raise ValueError(f"Dispensing height must be 500-5500, got {height}") + await self._send_command( # type: ignore[attr-defined] + f"SDH {height}", timeout=COMMAND_TIMEOUTS["SDH"] + ) + + async def set_pump_speed(self, speed: int) -> None: + """Set pump speed as percentage of cassette range. + + Args: + speed: Speed percentage (1-100). + """ + if not 1 <= speed <= 100: + raise ValueError(f"Pump speed must be 1-100, got {speed}") + await self._send_command( # type: ignore[attr-defined] + f"SPS {speed}", timeout=COMMAND_TIMEOUTS["SPS"] + ) + + async def set_dispensing_order(self, order: int) -> None: + """Set dispensing order. + + Args: + order: 0 = row-wise, 1 = column-wise. + """ + if order not in (0, 1): + raise ValueError(f"Dispensing order must be 0 or 1, got {order}") + await self._send_command( # type: ignore[attr-defined] + f"SDO {order}", timeout=COMMAND_TIMEOUTS["SDO"] + ) + + async def set_dispense_offset(self, x_offset: int, y_offset: int) -> None: + """Set X/Y dispense offset. + + Args: + x_offset: X offset in 1/100 mm (±300). + y_offset: Y offset in 1/100 mm (±300). + """ + if not -300 <= x_offset <= 300: + raise ValueError(f"X offset must be ±300, got {x_offset}") + if not -300 <= y_offset <= 300: + raise ValueError(f"Y offset must be ±300, got {y_offset}") + await self._send_command( # type: ignore[attr-defined] + f"SOF {x_offset} {y_offset}", timeout=COMMAND_TIMEOUTS["SOF"] + ) + + async def set_predispense_volume(self, volume: float) -> None: + """Set predispense volume. + + Args: + volume: Predispense volume in microliters. + """ + vol_tenths = _ul_to_tenths(volume) + if vol_tenths < 10 or vol_tenths > 100000: + raise ValueError(f"Predispense volume must be 1-10000 uL, got {volume} uL") + await self._send_command( # type: ignore[attr-defined] + f"SPV {vol_tenths}", timeout=COMMAND_TIMEOUTS["SPV"] + ) + + async def define_plate( + self, + column_positions: int, + row_positions: int, + rows: int, + columns: int, + height: int, + max_volume: int, + x_offset: int = 0, + y_offset: int = 0, + ) -> None: + """Define a remote plate (PLA command). + + Args: + column_positions: Number of column positions. + row_positions: Number of row positions. + rows: Number of rows. + columns: Number of columns. + height: Plate height in 1/100 mm. + max_volume: Maximum well volume in 1/10 uL. + x_offset: X offset in 1/100 mm. + y_offset: Y offset in 1/100 mm. + """ + await self._send_command( # type: ignore[attr-defined] + f"PLA {column_positions} {row_positions} {rows} {columns} " + f"{height} {max_volume} {x_offset} {y_offset}", + timeout=COMMAND_TIMEOUTS["PLA"], + ) + + async def start_protocol(self, plate_type: int | None = None, + protocol_name: str | None = None) -> None: + """Start a protocol from instrument memory (BGN command). + + Args: + plate_type: Optional plate type override. + protocol_name: Optional protocol name. + """ + cmd = "BGN" + if plate_type is not None: + cmd += f" {plate_type}" + if protocol_name is not None: + cmd += f" {protocol_name}" + await self._send_command(cmd, timeout=COMMAND_TIMEOUTS["BGN"]) # type: ignore[attr-defined] diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py new file mode 100644 index 00000000000..85ef87f1448 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py @@ -0,0 +1,223 @@ +"""Low-level serial communication mixin for the Multidrop Combi. + +Ported from the SiLA implementation's serial_transport.py to use +pylabrobot's async Serial wrapper. +""" +from __future__ import annotations + +import asyncio +import logging +from typing import Optional + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, + MultidropCombiInstrumentError, +) +from pylabrobot.io.serial import Serial + +logger = logging.getLogger(__name__) + +STATUS_OK = 0 + +ERROR_DESCRIPTIONS = { + 1: "Internal firmware error", + 2: "Unrecognized command", + 3: "Invalid command arguments", + 4: "Pump position error", + 5: "Plate X position error", + 6: "Plate Y position error", + 7: "Z position error", + 9: "Attempt to reset serial number", + 10: "Nonvolatile parameters lost", + 11: "No more memory for user data", + 12: "Pump or X motor was running", + 13: "X and Z positions conflict", + 14: "Cannot dispense: pump not primed", + 15: "Missing prime vessel", + 16: "Rotor shield not in place", + 17: "Dispense volume for all columns is 0", + 18: "Invalid plate type (bad plate index)", + 19: "Plate has not been defined", + 20: "Invalid rows in plate definition", + 21: "Invalid columns in plate definition", + 22: "Plate height is invalid", + 23: "Plate well volume invalid (too small or too big)", + 24: "Invalid cassette type (bad cassette index)", + 25: "Cassette not defined", + 26: "Invalid volume increment for cassette", + 27: "Invalid maximum volume for cassette", + 28: "Invalid minimum volume for cassette", + 29: "Invalid min/max pump speed for cassette", + 30: "Invalid pump rotor offset in cassette definition", + 32: "Dispensing volume not within cassette limits", + 33: "Invalid selector channel", + 34: "Invalid dispensing speed", + 35: "Dispensing height too low for plate", + 36: "Predispense volume not within cassette limits", + 37: "Invalid dispensing order", + 38: "Invalid X or Y dispensing offset", + 39: "RFID option not present", + 40: "RFID tag not present", + 41: "RFID tag data checksum incorrect", + 43: "Wrong cassette type", + 44: "Protocol/plate in use, cannot modify or delete", + 45: "Protocol/plate/cassette is read-only", +} + + +class MultidropCombiCommunicationMixin: + """Mixin providing low-level serial communication for the Multidrop Combi.""" + + io: Optional[Serial] + _command_lock: Optional[asyncio.Lock] + + async def _send_command(self, cmd: str, timeout: float | None = None) -> list[str]: + """Send a command and return the data lines from the response. + + Args: + cmd: Command string (e.g. "DIS", "SPL 1", "SCV 0 500"). + timeout: Per-command read timeout in seconds. If None, uses default. + + Returns: + List of data lines (between echo and END terminator). + + Raises: + MultidropCombiCommunicationError: If not connected or communication fails. + MultidropCombiInstrumentError: If instrument returns non-zero status code. + """ + if self.io is None or self._command_lock is None: + raise MultidropCombiCommunicationError("Not connected to instrument", operation=cmd) + + assert self.io._ser is not None, "Serial port not open. Did you call setup()?" + + cmd_code = cmd.split()[0] + + async with self._command_lock: + original_timeout = self.io._ser.timeout + if timeout is not None: + self.io._ser.timeout = timeout + try: + logger.debug("TX: %r", cmd) + await self.io.write(f"{cmd}\r".encode("ascii")) + + lines: list[str] = [] + while True: + raw = await self.io.readline() + if not raw: + raise MultidropCombiCommunicationError( + f"Timeout reading response for {cmd_code}", operation=cmd + ) + line = raw.decode("ascii", errors="replace").strip() + logger.debug("RX: %r", line) + if not line: + continue + lines.append(line) + + if line.startswith(cmd_code) and " END " in line: + break + + # Parse status from END terminator + end_line = lines[-1] + parts = end_line.split() + status_code = int(parts[-1]) if parts[-1].isdigit() else -1 + + if status_code != STATUS_OK: + desc = ERROR_DESCRIPTIONS.get(status_code, "Unknown error") + logger.error("Command %s failed (status %d). RX lines: %s", + cmd_code, status_code, lines) + raise MultidropCombiInstrumentError(status_code, desc) + + # Return data lines: skip echo (first) and END line (last) + # The instrument may echo just the command code or the full command + data_lines = [] + for line in lines[:-1]: + line_upper = line.strip().upper() + if line_upper == cmd.strip().upper() or line_upper == cmd_code.upper(): + continue + data_lines.append(line) + + return data_lines + + except (MultidropCombiCommunicationError, MultidropCombiInstrumentError): + raise + except Exception as e: + raise MultidropCombiCommunicationError( + f"Communication error during {cmd_code}: {e}", + operation=cmd, + original_error=e, + ) from e + finally: + if timeout is not None: + self.io._ser.timeout = original_timeout + + async def _drain_stale_data(self) -> None: + """Drain any stale data from the serial buffer.""" + if self.io is None: + return + + assert self.io._ser is not None + + await self.io.reset_input_buffer() + await self.io.reset_output_buffer() + + original_timeout = self.io._ser.timeout + self.io._ser.timeout = 0.3 + drained = 0 + try: + while True: + stale = await self.io.readline() + if not stale: + break + drained += 1 + logger.debug("Drained stale data: %r", stale) + finally: + self.io._ser.timeout = original_timeout + if drained: + logger.info("Drained %d stale lines from serial buffer", drained) + + async def _enter_remote_mode(self) -> dict: + """Send VER to enter remote control mode and get instrument info. + + Returns: + Dict with keys: instrument_name, firmware_version, serial_number. + """ + try: + lines = await self._send_command("VER", timeout=5.0) + except Exception as first_err: + logger.warning("VER failed (%s), sending EAK and retrying...", first_err) + try: + await self._send_command("EAK", timeout=5.0) + except Exception: + pass + try: + lines = await self._send_command("VER", timeout=5.0) + except Exception as e: + raise MultidropCombiCommunicationError( + f"VER command failed: {e}", operation="VER", original_error=e + ) from e + + info = { + "instrument_name": "Unknown", + "firmware_version": "Unknown", + "serial_number": "Unknown", + } + if lines: + raw = lines[0] + if raw.upper().startswith("VER "): + raw = raw[4:] + parts = raw.split() + if len(parts) > 0: + info["instrument_name"] = parts[0] + if len(parts) > 1: + info["firmware_version"] = parts[1] + if len(parts) > 2: + info["serial_number"] = parts[2] + + return info + + async def _exit_remote_mode(self) -> None: + """Send QIT to exit remote control mode.""" + try: + await self._send_command("QIT", timeout=5.0) + except Exception: + pass diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/enums.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/enums.py new file mode 100644 index 00000000000..af964de8123 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/enums.py @@ -0,0 +1,25 @@ +import enum + + +class CassetteType(enum.IntEnum): + STANDARD = 0 + SMALL = 1 + USER_DEFINED_1 = 2 + USER_DEFINED_2 = 3 + + +class DispensingOrder(enum.IntEnum): + ROW_WISE = 0 + COLUMN_WISE = 1 + + +class PrimeMode(enum.IntEnum): + STANDARD = 0 + CONTINUOUS = 1 + STOP_CONTINUOUS = 2 + CALIBRATION = 3 + + +class EmptyMode(enum.IntEnum): + STANDARD = 0 + CONTINUOUS = 1 diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py new file mode 100644 index 00000000000..83e751bd1f4 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py @@ -0,0 +1,21 @@ +class MultidropCombiError(Exception): + """Base exception for Multidrop Combi errors.""" + + +class MultidropCombiCommunicationError(MultidropCombiError): + """Serial communication failure (port not found, timeout, connection lost).""" + + def __init__(self, message: str, operation: str = "", + original_error: Exception | None = None) -> None: + self.operation = operation + self.original_error = original_error + super().__init__(message) + + +class MultidropCombiInstrumentError(MultidropCombiError): + """Instrument returned a non-zero status code.""" + + def __init__(self, status_code: int, description: str) -> None: + self.status_code = status_code + self.description = description + super().__init__(f"Instrument error (status {status_code}): {description}") diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py new file mode 100644 index 00000000000..cf7852c10a4 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py @@ -0,0 +1,144 @@ +"""Plate type helpers for the Multidrop Combi. + +Maps PyLabRobot Plate resources to Multidrop Combi plate type indices and +PLA (remote plate definition) command parameters. +""" +from __future__ import annotations + +from pylabrobot.resources import Plate + +# Multidrop Combi factory plate type definitions (from manual Table 3-3). +# Type index → (well_count, max_plate_height_mm) +# Heights are upper bounds for selecting the best-fit factory type. +_FACTORY_96_WELL_TYPES = [ + # (type_index, max_height_mm) + (0, 18.0), # Type 0: 96-well, 15mm + (1, 30.0), # Type 1: 96-well, 22mm + (2, 55.0), # Type 2: 96-well, 44mm +] + +_FACTORY_384_WELL_TYPES = [ + (3, 8.5), # Type 3: 384-well, 7.5mm + (4, 12.0), # Type 4: 384-well, 10mm + (5, 18.0), # Type 5: 384-well, 15mm + (6, 30.0), # Type 6: 384-well, 22mm + (7, 55.0), # Type 7: 384-well, 44mm +] + +_FACTORY_1536_WELL_TYPES = [ + (8, 7.0), # Type 8: 1536-well, 5mm + (9, 55.0), # Type 9: 1536-well, 10.5mm +] + +# Hardware limits +MAX_COLUMNS = 48 +MAX_ROWS = 32 +MIN_HEIGHT_HUNDREDTHS_MM = 500 # 5mm +MAX_HEIGHT_HUNDREDTHS_MM = 5500 # 55mm +MAX_VOLUME_TENTHS_UL = 25000 # 2500 uL + + +def plate_to_type_index(plate: Plate) -> int: + """Map a PLR Plate to the best-fit Multidrop Combi factory plate type index. + + Selects the factory type based on well count and plate height (size_z). + The smallest factory type whose height threshold accommodates the plate is chosen. + + Args: + plate: A PyLabRobot Plate resource. + + Returns: + Factory plate type index (0-9). + + Raises: + ValueError: If the plate well count is not 96, 384, or 1536, or if the + plate height exceeds all factory type thresholds. + """ + wells = plate.num_items + height_mm = plate.get_size_z() + + if wells == 96: + type_list = _FACTORY_96_WELL_TYPES + elif wells == 384: + type_list = _FACTORY_384_WELL_TYPES + elif wells == 1536: + type_list = _FACTORY_1536_WELL_TYPES + else: + raise ValueError( + f"Unsupported well count: {wells}. " + "Multidrop factory types support 96, 384, or 1536 wells. " + "Use plate_to_pla_params() for custom plate definitions." + ) + + for type_index, max_height in type_list: + if height_mm <= max_height: + return type_index + + raise ValueError( + f"Plate height {height_mm}mm exceeds all factory type thresholds for {wells}-well plates." + ) + + +def plate_to_pla_params(plate: Plate) -> dict: + """Convert a PLR Plate to Multidrop Combi PLA command parameters. + + Use this for plates that don't match factory types (types 0-9), or when you + want precise control over the plate definition sent to the instrument. + The returned dict can be passed directly to ``backend.define_plate(**params)``. + + Args: + plate: A PyLabRobot Plate resource. + + Returns: + Dict with keys matching ``define_plate()`` parameters: + column_positions, row_positions, rows, columns, height, max_volume. + + Raises: + ValueError: If any parameter exceeds Multidrop hardware limits. + """ + columns = plate.num_items_x + rows = plate.num_items_y + height_hundredths = round(plate.get_size_z() * 100) + + # Get max_volume from first well + first_well = next(iter(plate.get_all_children())) + well_max_volume_tenths = round(first_well.max_volume * 10) + + # Validate against hardware limits + if columns > MAX_COLUMNS: + raise ValueError( + f"Plate has {columns} columns, but Multidrop supports at most {MAX_COLUMNS}." + ) + if rows > MAX_ROWS: + raise ValueError( + f"Plate has {rows} rows, but Multidrop supports at most {MAX_ROWS}." + ) + if height_hundredths < MIN_HEIGHT_HUNDREDTHS_MM: + raise ValueError( + f"Plate height {plate.get_size_z()}mm is below minimum " + f"{MIN_HEIGHT_HUNDREDTHS_MM / 100}mm." + ) + if height_hundredths > MAX_HEIGHT_HUNDREDTHS_MM: + raise ValueError( + f"Plate height {plate.get_size_z()}mm exceeds maximum " + f"{MAX_HEIGHT_HUNDREDTHS_MM / 100}mm." + ) + if well_max_volume_tenths > MAX_VOLUME_TENTHS_UL: + raise ValueError( + f"Well max volume {first_well.max_volume} uL exceeds Multidrop limit of " + f"{MAX_VOLUME_TENTHS_UL / 10} uL." + ) + + return { + "column_positions": columns, + "row_positions": rows, + "rows": rows, + "columns": columns, + "height": height_hundredths, + "max_volume": well_max_volume_tenths, + } + + +def plate_well_count(plate: Plate) -> int: + """Return the total well count for a plate.""" + return plate.num_items diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py new file mode 100644 index 00000000000..23ba375f124 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py @@ -0,0 +1,46 @@ +"""Query operations mixin for the Multidrop Combi.""" +from __future__ import annotations + + +class MultidropCombiQueriesMixin: + """Mixin providing query operations for the Multidrop Combi.""" + + _instrument_name: str + _firmware_version: str + _serial_number: str + + def get_version(self) -> dict: + """Return cached instrument identification info. + + Returns: + Dict with keys: instrument_name, firmware_version, serial_number. + """ + return { + "instrument_name": self._instrument_name, + "firmware_version": self._firmware_version, + "serial_number": self._serial_number, + } + + async def report_parameters(self) -> list[str]: + """Report instrument parameters (REP command). + + Returns: + List of parameter lines from the instrument. + """ + return await self._send_command("REP", timeout=10.0) # type: ignore[attr-defined] + + async def read_error_log(self) -> list[str]: + """Read the instrument error log (LOG command). + + Returns: + List of error log lines. + """ + return await self._send_command("LOG", timeout=10.0) # type: ignore[attr-defined] + + async def read_cassette_info(self) -> list[str]: + """Read RFID cassette info (RIR command). + + Returns: + List of cassette info lines. + """ + return await self._send_command("RIR", timeout=5.0) # type: ignore[attr-defined] diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/__init__.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py new file mode 100644 index 00000000000..ed326265d1c --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py @@ -0,0 +1,64 @@ +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.backend import ( + MultidropCombiBackend, +) + + +class BackendSerializationTests(unittest.TestCase): + def test_serialize(self): + backend = MultidropCombiBackend(port="COM3", timeout=15.0) + data = backend.serialize() + self.assertEqual(data["type"], "MultidropCombiBackend") + self.assertEqual(data["port"], "COM3") + self.assertEqual(data["timeout"], 15.0) + + def test_serialize_defaults(self): + backend = MultidropCombiBackend() + data = backend.serialize() + self.assertIsNone(data["port"]) + self.assertEqual(data["timeout"], 30.0) + + +class BackendLifecycleTests(unittest.IsolatedAsyncioTestCase): + @patch("pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.backend.Serial") + async def test_setup_and_stop(self, MockSerial): + mock_serial = MagicMock() + mock_serial.setup = AsyncMock() + mock_serial.stop = AsyncMock() + mock_serial.write = AsyncMock() + mock_serial.readline = AsyncMock() + mock_serial.reset_input_buffer = AsyncMock() + mock_serial.reset_output_buffer = AsyncMock() + mock_serial._ser = MagicMock() + mock_serial._ser.timeout = 30.0 + MockSerial.return_value = mock_serial + + # Setup readline responses: drain (empty), VER, EAK + mock_serial.readline.side_effect = [ + b"", # drain - empty + b"VER\r\n", # VER echo + b"MultidropCombi 2.00.29 836-4191\r\n", # VER data + b"VER END 0\r\n", # VER end + b"EAK\r\n", # EAK echo + b"EAK END 0\r\n", # EAK end + ] + + backend = MultidropCombiBackend(port="COM3") + await backend.setup() + + self.assertEqual(backend._instrument_name, "MultidropCombi") + self.assertEqual(backend._firmware_version, "2.00.29") + self.assertEqual(backend._serial_number, "836-4191") + self.assertIsNotNone(backend.io) + + # Reset readline for QIT during stop + mock_serial.readline.side_effect = [ + b"QIT\r\n", + b"QIT END 0\r\n", + ] + await backend.stop() + self.assertIsNone(backend.io) + mock_serial.stop.assert_awaited_once() diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py new file mode 100644 index 00000000000..40195331809 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py @@ -0,0 +1,169 @@ +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.commands import ( + MultidropCombiCommandsMixin, + _ul_to_tenths, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.enums import PrimeMode + + +class MockCommandsBackend(MultidropCombiCommandsMixin): + """Testable class with _send_command mocked.""" + + def __init__(self): + self._send_command = AsyncMock(return_value=[]) + + +class VolumeConversionTests(unittest.TestCase): + def test_ul_to_tenths(self): + self.assertEqual(_ul_to_tenths(1.0), 10) + self.assertEqual(_ul_to_tenths(50.0), 500) + self.assertEqual(_ul_to_tenths(0.1), 1) + self.assertEqual(_ul_to_tenths(10000.0), 100000) + + def test_ul_to_tenths_rounding(self): + self.assertEqual(_ul_to_tenths(1.06), 11) + self.assertEqual(_ul_to_tenths(1.04), 10) + + +class CommandFormattingTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.backend = MockCommandsBackend() + + async def test_dispense(self): + await self.backend.dispense() + self.backend._send_command.assert_awaited_once() + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "DIS") + + async def test_prime_standard(self): + await self.backend.prime(volume=50.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "PRI 500") + + async def test_prime_continuous(self): + await self.backend.prime(volume=50.0, mode=PrimeMode.CONTINUOUS) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "PRI 500 1") + + async def test_empty(self): + await self.backend.empty(volume=100.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "EMP 1000") + + async def test_shake(self): + await self.backend.shake(time=5.0, distance=3, speed=10) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SHA 500 3 10") + + async def test_move_plate_out(self): + await self.backend.move_plate_out() + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "POU") + + async def test_set_plate_type(self): + await self.backend.set_plate_type(plate_type=3) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SPL 3") + + async def test_set_cassette_type(self): + await self.backend.set_cassette_type(cassette_type=1) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SCT 1") + + async def test_set_column_volume(self): + await self.backend.set_column_volume(column=0, volume=25.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SCV 0 250") + + async def test_set_dispensing_height(self): + await self.backend.set_dispensing_height(height=2500) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SDH 2500") + + async def test_set_pump_speed(self): + await self.backend.set_pump_speed(speed=50) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SPS 50") + + async def test_set_dispensing_order(self): + await self.backend.set_dispensing_order(order=1) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SDO 1") + + async def test_set_dispense_offset(self): + await self.backend.set_dispense_offset(x_offset=100, y_offset=-50) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SOF 100 -50") + + async def test_set_predispense_volume(self): + await self.backend.set_predispense_volume(volume=10.0) + args = self.backend._send_command.call_args + self.assertEqual(args[0][0], "SPV 100") + + +class ParameterValidationTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.backend = MockCommandsBackend() + + async def test_prime_volume_too_low(self): + with self.assertRaises(ValueError): + await self.backend.prime(volume=0.0) + + async def test_prime_volume_too_high(self): + with self.assertRaises(ValueError): + await self.backend.prime(volume=20000.0) + + async def test_empty_volume_too_low(self): + with self.assertRaises(ValueError): + await self.backend.empty(volume=0.0) + + async def test_shake_distance_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=0, speed=10) + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=6, speed=10) + + async def test_shake_speed_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=3, speed=0) + with self.assertRaises(ValueError): + await self.backend.shake(time=5.0, distance=3, speed=21) + + async def test_plate_type_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_plate_type(plate_type=-1) + with self.assertRaises(ValueError): + await self.backend.set_plate_type(plate_type=30) + + async def test_cassette_type_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_cassette_type(cassette_type=4) + + async def test_column_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_column_volume(column=49, volume=10.0) + + async def test_dispensing_height_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_dispensing_height(height=499) + with self.assertRaises(ValueError): + await self.backend.set_dispensing_height(height=5501) + + async def test_pump_speed_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_pump_speed(speed=0) + with self.assertRaises(ValueError): + await self.backend.set_pump_speed(speed=101) + + async def test_dispensing_order_invalid(self): + with self.assertRaises(ValueError): + await self.backend.set_dispensing_order(order=2) + + async def test_dispense_offset_out_of_range(self): + with self.assertRaises(ValueError): + await self.backend.set_dispense_offset(x_offset=301, y_offset=0) + with self.assertRaises(ValueError): + await self.backend.set_dispense_offset(x_offset=0, y_offset=-301) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py new file mode 100644 index 00000000000..ebe398eb280 --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py @@ -0,0 +1,161 @@ +import asyncio +import unittest +from unittest.mock import AsyncMock, MagicMock, patch + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.communication import ( + MultidropCombiCommunicationMixin, +) +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.errors import ( + MultidropCombiCommunicationError, + MultidropCombiInstrumentError, +) + + +class MockCommunicationBackend(MultidropCombiCommunicationMixin): + """Testable class that uses the communication mixin with a mock Serial.""" + + def __init__(self): + self.io = MagicMock() + self.io._ser = MagicMock() + self.io._ser.timeout = 30.0 + self._command_lock = asyncio.Lock() + + # Make io.write and io.readline async + self.io.write = AsyncMock() + self.io.readline = AsyncMock() + self.io.reset_input_buffer = AsyncMock() + self.io.reset_output_buffer = AsyncMock() + + +class SendCommandTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.backend = MockCommunicationBackend() + + async def test_simple_command(self): + """Test a simple command with echo + END response.""" + self.backend.io.readline.side_effect = [ + b"SPL\r\n", # echo + b"SPL END 0\r\n", # end with status 0 + ] + result = await self.backend._send_command("SPL 1") + self.assertEqual(result, []) + self.backend.io.write.assert_awaited_once_with(b"SPL 1\r") + + async def test_command_with_data_lines(self): + """Test a command that returns data lines between echo and END.""" + self.backend.io.readline.side_effect = [ + b"VER\r\n", + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + result = await self.backend._send_command("VER") + self.assertEqual(result, ["MultidropCombi 2.00.29 836-4191"]) + + async def test_command_with_error_status(self): + """Test that non-zero status raises MultidropCombiInstrumentError.""" + self.backend.io.readline.side_effect = [ + b"SPL\r\n", + b"SPL END 18\r\n", # status 18 = Invalid plate type + ] + with self.assertRaises(MultidropCombiInstrumentError) as ctx: + await self.backend._send_command("SPL 99") + self.assertEqual(ctx.exception.status_code, 18) + self.assertIn("Invalid plate type", ctx.exception.description) + + async def test_timeout_raises_communication_error(self): + """Test that timeout (empty readline) raises MultidropCombiCommunicationError.""" + self.backend.io.readline.side_effect = [b""] + with self.assertRaises(MultidropCombiCommunicationError): + await self.backend._send_command("SPL 1") + + async def test_not_connected(self): + """Test that sending a command when io is None raises error.""" + self.backend.io = None + with self.assertRaises(MultidropCombiCommunicationError): + await self.backend._send_command("VER") + + async def test_custom_timeout(self): + """Test that custom timeout is set and restored.""" + self.backend.io.readline.side_effect = [ + b"POU\r\n", + b"POU END 0\r\n", + ] + original = self.backend.io._ser.timeout + await self.backend._send_command("POU", timeout=10.0) + # Timeout should be restored after command + self.assertEqual(self.backend.io._ser.timeout, original) + + async def test_echo_skipping_case_insensitive(self): + """Test that echo is skipped regardless of case.""" + self.backend.io.readline.side_effect = [ + b"ver\r\n", # lowercase echo + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + result = await self.backend._send_command("VER") + self.assertEqual(result, ["MultidropCombi 2.00.29 836-4191"]) + + +class EnterRemoteModeTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.backend = MockCommunicationBackend() + + async def test_enter_remote_mode_success(self): + """Test successful VER command parses instrument info.""" + self.backend.io.readline.side_effect = [ + b"VER\r\n", + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + info = await self.backend._enter_remote_mode() + self.assertEqual(info["instrument_name"], "MultidropCombi") + self.assertEqual(info["firmware_version"], "2.00.29") + self.assertEqual(info["serial_number"], "836-4191") + + async def test_enter_remote_mode_retry_after_eak(self): + """Test VER retry after EAK when first VER fails.""" + call_count = 0 + + async def readline_side_effect(): + nonlocal call_count + call_count += 1 + responses = [ + # First VER attempt - fails with error + b"VER\r\n", + b"VER END 1\r\n", + # EAK attempt + b"EAK\r\n", + b"EAK END 0\r\n", + # Second VER attempt - succeeds + b"VER\r\n", + b"MultidropCombi 2.00.29 836-4191\r\n", + b"VER END 0\r\n", + ] + if call_count <= len(responses): + return responses[call_count - 1] + return b"" + + self.backend.io.readline.side_effect = readline_side_effect + info = await self.backend._enter_remote_mode() + self.assertEqual(info["instrument_name"], "MultidropCombi") + + +class DrainStaleDataTests(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.backend = MockCommunicationBackend() + + async def test_drain_with_stale_data(self): + """Test draining stale data from buffer.""" + self.backend.io.readline.side_effect = [ + b"stale line 1\r\n", + b"stale line 2\r\n", + b"", # No more data + ] + await self.backend._drain_stale_data() + self.backend.io.reset_input_buffer.assert_awaited_once() + self.backend.io.reset_output_buffer.assert_awaited_once() + + async def test_drain_empty_buffer(self): + """Test draining when buffer is already empty.""" + self.backend.io.readline.side_effect = [b""] + await self.backend._drain_stale_data() diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py new file mode 100644 index 00000000000..daea19ddfaa --- /dev/null +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py @@ -0,0 +1,210 @@ +import unittest + +from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.helpers import ( + plate_to_pla_params, + plate_to_type_index, + plate_well_count, +) +from pylabrobot.resources import Plate, Well, create_ordered_items_2d +from pylabrobot.resources.well import CrossSectionType, WellBottomType + + +def _make_plate( + num_items_x: int = 12, + num_items_y: int = 8, + size_z: float = 14.2, + well_max_volume: float = 360.0, + well_size_z: float = 10.67, +) -> Plate: + """Create a test plate with the given parameters.""" + return Plate( + name="test_plate", + size_x=127.76, + size_y=85.48, + size_z=size_z, + model="test", + ordered_items=create_ordered_items_2d( + Well, + num_items_x=num_items_x, + num_items_y=num_items_y, + dx=10.0, + dy=7.0, + dz=1.0, + item_dx=9.0, + item_dy=9.0, + size_x=6.0, + size_y=6.0, + size_z=well_size_z, + bottom_type=WellBottomType.FLAT, + cross_section_type=CrossSectionType.CIRCLE, + max_volume=well_max_volume, + ), + ) + + +class PlateToTypeIndexTests(unittest.TestCase): + """Test factory plate type mapping.""" + + def test_96_well_short(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=14.0) + self.assertEqual(plate_to_type_index(plate), 0) # 15mm type + + def test_96_well_medium(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=20.0) + self.assertEqual(plate_to_type_index(plate), 1) # 22mm type + + def test_96_well_tall(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=40.0) + self.assertEqual(plate_to_type_index(plate), 2) # 44mm type + + def test_384_well_very_short(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=7.0) + self.assertEqual(plate_to_type_index(plate), 3) # 7.5mm type + + def test_384_well_short(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=10.0) + self.assertEqual(plate_to_type_index(plate), 4) # 10mm type + + def test_384_well_medium(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=14.0) + self.assertEqual(plate_to_type_index(plate), 5) # 15mm type + + def test_384_well_tall(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=25.0) + self.assertEqual(plate_to_type_index(plate), 6) # 22mm type + + def test_384_well_very_tall(self): + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=44.0) + self.assertEqual(plate_to_type_index(plate), 7) # 44mm type + + def test_1536_well_short(self): + plate = _make_plate(num_items_x=48, num_items_y=32, size_z=5.0) + self.assertEqual(plate_to_type_index(plate), 8) # 5mm type + + def test_1536_well_tall(self): + plate = _make_plate(num_items_x=48, num_items_y=32, size_z=10.0) + self.assertEqual(plate_to_type_index(plate), 9) # 10.5mm type + + def test_unsupported_well_count(self): + plate = _make_plate(num_items_x=6, num_items_y=4) # 24-well + with self.assertRaises(ValueError) as ctx: + plate_to_type_index(plate) + self.assertIn("24", str(ctx.exception)) + + def test_96_well_too_tall(self): + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=60.0) + with self.assertRaises(ValueError): + plate_to_type_index(plate) + + +class PlateToTypeIndexRealPlatesTests(unittest.TestCase): + """Test with real PLR plate definitions.""" + + def test_corning_96_well(self): + from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + plate = Cor_96_wellplate_360ul_Fb("test") + self.assertEqual(plate_to_type_index(plate), 0) # 14.2mm → type 0 + + def test_biorad_384_well(self): + from pylabrobot.resources.biorad.plates import BioRad_384_wellplate_50uL_Vb + plate = BioRad_384_wellplate_50uL_Vb("test") + self.assertEqual(plate_to_type_index(plate), 4) # 10.4mm → type 4 + + +class PlateToPlaParamsTests(unittest.TestCase): + """Test PLA command parameter generation.""" + + def test_96_well_params(self): + plate = _make_plate( + num_items_x=12, num_items_y=8, size_z=14.2, well_max_volume=360.0 + ) + params = plate_to_pla_params(plate) + self.assertEqual(params["columns"], 12) + self.assertEqual(params["rows"], 8) + self.assertEqual(params["column_positions"], 12) + self.assertEqual(params["row_positions"], 8) + self.assertEqual(params["height"], 1420) # 14.2mm * 100 + self.assertEqual(params["max_volume"], 3600) # 360uL * 10 + + def test_384_well_params(self): + plate = _make_plate( + num_items_x=24, num_items_y=16, size_z=10.4, well_max_volume=50.0 + ) + params = plate_to_pla_params(plate) + self.assertEqual(params["columns"], 24) + self.assertEqual(params["rows"], 16) + self.assertEqual(params["height"], 1040) + self.assertEqual(params["max_volume"], 500) + + def test_real_corning_96_well(self): + from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + plate = Cor_96_wellplate_360ul_Fb("test") + params = plate_to_pla_params(plate) + self.assertEqual(params["columns"], 12) + self.assertEqual(params["rows"], 8) + self.assertEqual(params["height"], 1420) + self.assertEqual(params["max_volume"], 3600) + + +class PlaParamsValidationTests(unittest.TestCase): + """Test parameter validation in plate_to_pla_params.""" + + def test_too_many_columns(self): + plate = _make_plate(num_items_x=49, num_items_y=8, size_z=14.0) + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("49 columns", str(ctx.exception)) + self.assertIn("48", str(ctx.exception)) + + def test_too_many_rows(self): + plate = _make_plate(num_items_x=12, num_items_y=33, size_z=14.0) + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("33 rows", str(ctx.exception)) + self.assertIn("32", str(ctx.exception)) + + def test_height_too_low(self): + plate = _make_plate(size_z=4.0) # 4mm < 5mm minimum + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("4.0mm", str(ctx.exception)) + self.assertIn("minimum", str(ctx.exception)) + + def test_height_too_high(self): + plate = _make_plate(size_z=60.0) # 60mm > 55mm maximum + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("60.0mm", str(ctx.exception)) + self.assertIn("maximum", str(ctx.exception)) + + def test_well_volume_too_high(self): + plate = _make_plate(well_max_volume=3000.0) # 3000uL > 2500uL max + with self.assertRaises(ValueError) as ctx: + plate_to_pla_params(plate) + self.assertIn("3000", str(ctx.exception)) + self.assertIn("2500", str(ctx.exception)) + + def test_height_at_minimum_boundary(self): + plate = _make_plate(size_z=5.0) # exactly 5mm = 500 hundredths + params = plate_to_pla_params(plate) + self.assertEqual(params["height"], 500) + + def test_height_at_maximum_boundary(self): + plate = _make_plate(size_z=55.0) # exactly 55mm = 5500 hundredths + params = plate_to_pla_params(plate) + self.assertEqual(params["height"], 5500) + + def test_volume_at_maximum_boundary(self): + plate = _make_plate(well_max_volume=2500.0) # exactly 2500uL + params = plate_to_pla_params(plate) + self.assertEqual(params["max_volume"], 25000) + + +class PlateWellCountTests(unittest.TestCase): + def test_96_well(self): + plate = _make_plate(num_items_x=12, num_items_y=8) + self.assertEqual(plate_well_count(plate), 96) + + def test_384_well(self): + plate = _make_plate(num_items_x=24, num_items_y=16) + self.assertEqual(plate_well_count(plate), 384) From 4f28b964d9000ba4da7410889c497204977a2972 Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Wed, 25 Mar 2026 14:02:54 -0700 Subject: [PATCH 2/6] Add demo script for Multidrop Combi bulk dispenser --- .../thermo_scientific/multidrop_combi/demo_multidrop.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename demo_multidrop.py => pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py (100%) diff --git a/demo_multidrop.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py similarity index 100% rename from demo_multidrop.py rename to pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py From ec8b8732baf01dd57869968bb8e0d77b75a5960e Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Wed, 25 Mar 2026 14:53:35 -0700 Subject: [PATCH 3/6] Remove unused imports in bulk_dispensers tests Co-Authored-By: Claude Opus 4.6 (1M context) --- .../thermo_scientific/multidrop_combi/tests/backend_tests.py | 1 - .../thermo_scientific/multidrop_combi/tests/commands_tests.py | 3 +-- .../multidrop_combi/tests/communication_tests.py | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py index ed326265d1c..0a5b32c748d 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py @@ -1,4 +1,3 @@ -import asyncio import unittest from unittest.mock import AsyncMock, MagicMock, patch diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py index 40195331809..f029ba5e2a1 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/commands_tests.py @@ -1,6 +1,5 @@ -import asyncio import unittest -from unittest.mock import AsyncMock, MagicMock +from unittest.mock import AsyncMock from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.commands import ( MultidropCombiCommandsMixin, diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py index ebe398eb280..a44572c37de 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py @@ -1,6 +1,6 @@ import asyncio import unittest -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.communication import ( MultidropCombiCommunicationMixin, From 2ad7cebc33e289bd3e26db08ab8248850227f7a6 Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Wed, 25 Mar 2026 14:56:01 -0700 Subject: [PATCH 4/6] Fix mypy type errors in bulk_dispensers - Add `from __future__ import annotations` to errors.py for X | Y syntax - Use explicit Serial() constructor calls instead of **dict unpacking - Type intermediate variables to avoid no-any-return errors in queries - Use plate.get_well("A1") instead of get_all_children() for proper typing - Type mock io as Any in communication tests Co-Authored-By: Claude Opus 4.6 (1M context) --- .../multidrop_combi/backend.py | 35 +++++++++++-------- .../multidrop_combi/errors.py | 3 ++ .../multidrop_combi/helpers.py | 2 +- .../multidrop_combi/queries.py | 9 +++-- .../tests/communication_tests.py | 35 ++++++++++--------- 5 files changed, 49 insertions(+), 35 deletions(-) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py index 64cf735fdd2..d75e123a71e 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py @@ -58,22 +58,29 @@ async def setup(self) -> None: # When port is specified, skip VID/PID discovery (the Multidrop is often # connected via an RS232-to-USB adapter with a different VID/PID). - serial_kwargs = dict( - human_readable_device_name="Multidrop Combi", - baudrate=9600, - bytesize=8, - parity="N", - stopbits=1, - timeout=self.timeout, - write_timeout=5, - ) if self._port: - serial_kwargs["port"] = self._port + self.io = Serial( + human_readable_device_name="Multidrop Combi", + port=self._port, + baudrate=9600, + bytesize=8, + parity="N", + stopbits=1, + timeout=self.timeout, + write_timeout=5, + ) else: - serial_kwargs["vid"] = 0x0AB6 - serial_kwargs["pid"] = 0x0344 - - self.io = Serial(**serial_kwargs) + self.io = Serial( + human_readable_device_name="Multidrop Combi", + vid=0x0AB6, + pid=0x0344, + baudrate=9600, + bytesize=8, + parity="N", + stopbits=1, + timeout=self.timeout, + write_timeout=5, + ) await self.io.setup() # Enable XON/XOFF flow control on the underlying serial port diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py index 83e751bd1f4..17971744e91 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py @@ -1,3 +1,6 @@ +from __future__ import annotations + + class MultidropCombiError(Exception): """Base exception for Multidrop Combi errors.""" diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py index cf7852c10a4..f42a4cd4744 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py @@ -101,7 +101,7 @@ def plate_to_pla_params(plate: Plate) -> dict: height_hundredths = round(plate.get_size_z() * 100) # Get max_volume from first well - first_well = next(iter(plate.get_all_children())) + first_well = plate.get_well("A1") well_max_volume_tenths = round(first_well.max_volume * 10) # Validate against hardware limits diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py index 23ba375f124..f628cf1f67c 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py @@ -27,7 +27,8 @@ async def report_parameters(self) -> list[str]: Returns: List of parameter lines from the instrument. """ - return await self._send_command("REP", timeout=10.0) # type: ignore[attr-defined] + result: list[str] = await self._send_command("REP", timeout=10.0) # type: ignore[attr-defined] + return result async def read_error_log(self) -> list[str]: """Read the instrument error log (LOG command). @@ -35,7 +36,8 @@ async def read_error_log(self) -> list[str]: Returns: List of error log lines. """ - return await self._send_command("LOG", timeout=10.0) # type: ignore[attr-defined] + result: list[str] = await self._send_command("LOG", timeout=10.0) # type: ignore[attr-defined] + return result async def read_cassette_info(self) -> list[str]: """Read RFID cassette info (RIR command). @@ -43,4 +45,5 @@ async def read_cassette_info(self) -> list[str]: Returns: List of cassette info lines. """ - return await self._send_command("RIR", timeout=5.0) # type: ignore[attr-defined] + result: list[str] = await self._send_command("RIR", timeout=5.0) # type: ignore[attr-defined] + return result diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py index a44572c37de..6deefd54534 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py @@ -1,5 +1,6 @@ import asyncio import unittest +from typing import Any from unittest.mock import AsyncMock, MagicMock from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.communication import ( @@ -14,8 +15,8 @@ class MockCommunicationBackend(MultidropCombiCommunicationMixin): """Testable class that uses the communication mixin with a mock Serial.""" - def __init__(self): - self.io = MagicMock() + def __init__(self) -> None: + self.io: Any = MagicMock() self.io._ser = MagicMock() self.io._ser.timeout = 30.0 self._command_lock = asyncio.Lock() @@ -28,10 +29,10 @@ def __init__(self): class SendCommandTests(unittest.IsolatedAsyncioTestCase): - async def asyncSetUp(self): + async def asyncSetUp(self) -> None: self.backend = MockCommunicationBackend() - async def test_simple_command(self): + async def test_simple_command(self) -> None: """Test a simple command with echo + END response.""" self.backend.io.readline.side_effect = [ b"SPL\r\n", # echo @@ -41,7 +42,7 @@ async def test_simple_command(self): self.assertEqual(result, []) self.backend.io.write.assert_awaited_once_with(b"SPL 1\r") - async def test_command_with_data_lines(self): + async def test_command_with_data_lines(self) -> None: """Test a command that returns data lines between echo and END.""" self.backend.io.readline.side_effect = [ b"VER\r\n", @@ -51,7 +52,7 @@ async def test_command_with_data_lines(self): result = await self.backend._send_command("VER") self.assertEqual(result, ["MultidropCombi 2.00.29 836-4191"]) - async def test_command_with_error_status(self): + async def test_command_with_error_status(self) -> None: """Test that non-zero status raises MultidropCombiInstrumentError.""" self.backend.io.readline.side_effect = [ b"SPL\r\n", @@ -62,19 +63,19 @@ async def test_command_with_error_status(self): self.assertEqual(ctx.exception.status_code, 18) self.assertIn("Invalid plate type", ctx.exception.description) - async def test_timeout_raises_communication_error(self): + async def test_timeout_raises_communication_error(self) -> None: """Test that timeout (empty readline) raises MultidropCombiCommunicationError.""" self.backend.io.readline.side_effect = [b""] with self.assertRaises(MultidropCombiCommunicationError): await self.backend._send_command("SPL 1") - async def test_not_connected(self): + async def test_not_connected(self) -> None: """Test that sending a command when io is None raises error.""" self.backend.io = None with self.assertRaises(MultidropCombiCommunicationError): await self.backend._send_command("VER") - async def test_custom_timeout(self): + async def test_custom_timeout(self) -> None: """Test that custom timeout is set and restored.""" self.backend.io.readline.side_effect = [ b"POU\r\n", @@ -85,7 +86,7 @@ async def test_custom_timeout(self): # Timeout should be restored after command self.assertEqual(self.backend.io._ser.timeout, original) - async def test_echo_skipping_case_insensitive(self): + async def test_echo_skipping_case_insensitive(self) -> None: """Test that echo is skipped regardless of case.""" self.backend.io.readline.side_effect = [ b"ver\r\n", # lowercase echo @@ -97,10 +98,10 @@ async def test_echo_skipping_case_insensitive(self): class EnterRemoteModeTests(unittest.IsolatedAsyncioTestCase): - async def asyncSetUp(self): + async def asyncSetUp(self) -> None: self.backend = MockCommunicationBackend() - async def test_enter_remote_mode_success(self): + async def test_enter_remote_mode_success(self) -> None: """Test successful VER command parses instrument info.""" self.backend.io.readline.side_effect = [ b"VER\r\n", @@ -112,11 +113,11 @@ async def test_enter_remote_mode_success(self): self.assertEqual(info["firmware_version"], "2.00.29") self.assertEqual(info["serial_number"], "836-4191") - async def test_enter_remote_mode_retry_after_eak(self): + async def test_enter_remote_mode_retry_after_eak(self) -> None: """Test VER retry after EAK when first VER fails.""" call_count = 0 - async def readline_side_effect(): + async def readline_side_effect() -> bytes: nonlocal call_count call_count += 1 responses = [ @@ -141,10 +142,10 @@ async def readline_side_effect(): class DrainStaleDataTests(unittest.IsolatedAsyncioTestCase): - async def asyncSetUp(self): + async def asyncSetUp(self) -> None: self.backend = MockCommunicationBackend() - async def test_drain_with_stale_data(self): + async def test_drain_with_stale_data(self) -> None: """Test draining stale data from buffer.""" self.backend.io.readline.side_effect = [ b"stale line 1\r\n", @@ -155,7 +156,7 @@ async def test_drain_with_stale_data(self): self.backend.io.reset_input_buffer.assert_awaited_once() self.backend.io.reset_output_buffer.assert_awaited_once() - async def test_drain_empty_buffer(self): + async def test_drain_empty_buffer(self) -> None: """Test draining when buffer is already empty.""" self.backend.io.readline.side_effect = [b""] await self.backend._drain_stale_data() From 53a444c52e0df9bed9ab34e2e79f1a3e9c779e26 Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Wed, 25 Mar 2026 15:06:54 -0700 Subject: [PATCH 5/6] Apply ruff formatting to bulk_dispensers Co-Authored-By: Claude Opus 4.6 (1M context) --- .../multidrop_combi/actions.py | 1 + .../multidrop_combi/backend.py | 1 + .../multidrop_combi/commands.py | 6 ++- .../multidrop_combi/communication.py | 4 +- .../multidrop_combi/demo_multidrop.py | 29 ++++++++----- .../multidrop_combi/errors.py | 5 ++- .../multidrop_combi/helpers.py | 41 ++++++++----------- .../multidrop_combi/queries.py | 1 + .../multidrop_combi/tests/backend_tests.py | 12 +++--- .../tests/communication_tests.py | 4 +- .../multidrop_combi/tests/helpers_tests.py | 15 ++++--- 11 files changed, 64 insertions(+), 55 deletions(-) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py index 74c56eec022..d0bc885d3bb 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/actions.py @@ -1,4 +1,5 @@ """Control operations mixin for the Multidrop Combi.""" + from __future__ import annotations from typing import Optional diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py index d75e123a71e..fe373c6f96d 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/backend.py @@ -1,4 +1,5 @@ """Composed backend for the Thermo Scientific Multidrop Combi.""" + from __future__ import annotations import asyncio diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py index 4c1b3765d12..7e4fd3de14a 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/commands.py @@ -3,6 +3,7 @@ All volume parameters at the public interface are in microliters (float). Internally, volumes are converted to the instrument's native 1/10 uL units. """ + from __future__ import annotations from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi.enums import ( @@ -233,8 +234,9 @@ async def define_plate( timeout=COMMAND_TIMEOUTS["PLA"], ) - async def start_protocol(self, plate_type: int | None = None, - protocol_name: str | None = None) -> None: + async def start_protocol( + self, plate_type: int | None = None, protocol_name: str | None = None + ) -> None: """Start a protocol from instrument memory (BGN command). Args: diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py index 85ef87f1448..c08de8f02ef 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/communication.py @@ -3,6 +3,7 @@ Ported from the SiLA implementation's serial_transport.py to use pylabrobot's async Serial wrapper. """ + from __future__ import annotations import asyncio @@ -123,8 +124,7 @@ async def _send_command(self, cmd: str, timeout: float | None = None) -> list[st if status_code != STATUS_OK: desc = ERROR_DESCRIPTIONS.get(status_code, "Unknown error") - logger.error("Command %s failed (status %d). RX lines: %s", - cmd_code, status_code, lines) + logger.error("Command %s failed (status %d). RX lines: %s", cmd_code, status_code, lines) raise MultidropCombiInstrumentError(status_code, desc) # Return data lines: skip echo (first) and END line (last) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py index 28a3abc72a3..3b7b59d62c6 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/demo_multidrop.py @@ -24,6 +24,7 @@ def list_serial_ports(): """List available serial ports to help the user find the right one.""" try: import serial.tools.list_ports + ports = list(serial.tools.list_ports.comports()) if not ports: print(" No serial ports found.") @@ -71,8 +72,10 @@ async def main(): try: # Connection info info = backend.get_version() - print(f"Connected: {info['instrument_name']} " - f"FW {info['firmware_version']} SN {info['serial_number']}") + print( + f"Connected: {info['instrument_name']} " + f"FW {info['firmware_version']} SN {info['serial_number']}" + ) # --- Query instrument parameters --- print("\n--- Instrument Parameters ---") @@ -102,20 +105,26 @@ async def main(): print(f" No factory match, using remote plate definition: {pla_params}") await run_step("Define plate (PLA)", backend.define_plate(**pla_params)) - await run_step("Set cassette type (SCT)", dispenser.set_cassette_type( - cassette_type=CassetteType.STANDARD)) - await run_step("Set column volume 10 uL (SCV)", dispenser.set_column_volume( - column=0, volume=10.0)) + await run_step( + "Set cassette type (SCT)", dispenser.set_cassette_type(cassette_type=CassetteType.STANDARD) + ) + await run_step( + "Set column volume 10 uL (SCV)", dispenser.set_column_volume(column=0, volume=10.0) + ) # Dispensing height must be above the plate. Add 3mm clearance. dispense_height = round(plate.get_size_z() * 100) + 300 dispense_height = max(500, min(5500, dispense_height)) # clamp to valid range print(f" Dispensing height: {dispense_height} (plate {plate.get_size_z()}mm + 3mm clearance)") - await run_step(f"Set dispensing height {dispense_height} (SDH)", - dispenser.set_dispensing_height(height=dispense_height)) + await run_step( + f"Set dispensing height {dispense_height} (SDH)", + dispenser.set_dispensing_height(height=dispense_height), + ) await run_step("Set pump speed 50% (SPS)", dispenser.set_pump_speed(speed=50)) - await run_step("Set dispensing order row-wise (SDO)", dispenser.set_dispensing_order( - order=DispensingOrder.ROW_WISE)) + await run_step( + "Set dispensing order row-wise (SDO)", + dispenser.set_dispensing_order(order=DispensingOrder.ROW_WISE), + ) # --- Prime --- print("\n--- Prime ---") diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py index 17971744e91..0284adaf58a 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/errors.py @@ -8,8 +8,9 @@ class MultidropCombiError(Exception): class MultidropCombiCommunicationError(MultidropCombiError): """Serial communication failure (port not found, timeout, connection lost).""" - def __init__(self, message: str, operation: str = "", - original_error: Exception | None = None) -> None: + def __init__( + self, message: str, operation: str = "", original_error: Exception | None = None + ) -> None: self.operation = operation self.original_error = original_error super().__init__(message) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py index f42a4cd4744..f310107bbd0 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/helpers.py @@ -3,6 +3,7 @@ Maps PyLabRobot Plate resources to Multidrop Combi plate type indices and PLA (remote plate definition) command parameters. """ + from __future__ import annotations from pylabrobot.resources import Plate @@ -12,30 +13,30 @@ # Heights are upper bounds for selecting the best-fit factory type. _FACTORY_96_WELL_TYPES = [ # (type_index, max_height_mm) - (0, 18.0), # Type 0: 96-well, 15mm - (1, 30.0), # Type 1: 96-well, 22mm - (2, 55.0), # Type 2: 96-well, 44mm + (0, 18.0), # Type 0: 96-well, 15mm + (1, 30.0), # Type 1: 96-well, 22mm + (2, 55.0), # Type 2: 96-well, 44mm ] _FACTORY_384_WELL_TYPES = [ - (3, 8.5), # Type 3: 384-well, 7.5mm - (4, 12.0), # Type 4: 384-well, 10mm - (5, 18.0), # Type 5: 384-well, 15mm - (6, 30.0), # Type 6: 384-well, 22mm - (7, 55.0), # Type 7: 384-well, 44mm + (3, 8.5), # Type 3: 384-well, 7.5mm + (4, 12.0), # Type 4: 384-well, 10mm + (5, 18.0), # Type 5: 384-well, 15mm + (6, 30.0), # Type 6: 384-well, 22mm + (7, 55.0), # Type 7: 384-well, 44mm ] _FACTORY_1536_WELL_TYPES = [ - (8, 7.0), # Type 8: 1536-well, 5mm - (9, 55.0), # Type 9: 1536-well, 10.5mm + (8, 7.0), # Type 8: 1536-well, 5mm + (9, 55.0), # Type 9: 1536-well, 10.5mm ] # Hardware limits MAX_COLUMNS = 48 MAX_ROWS = 32 -MIN_HEIGHT_HUNDREDTHS_MM = 500 # 5mm -MAX_HEIGHT_HUNDREDTHS_MM = 5500 # 55mm -MAX_VOLUME_TENTHS_UL = 25000 # 2500 uL +MIN_HEIGHT_HUNDREDTHS_MM = 500 # 5mm +MAX_HEIGHT_HUNDREDTHS_MM = 5500 # 55mm +MAX_VOLUME_TENTHS_UL = 25000 # 2500 uL def plate_to_type_index(plate: Plate) -> int: @@ -106,22 +107,16 @@ def plate_to_pla_params(plate: Plate) -> dict: # Validate against hardware limits if columns > MAX_COLUMNS: - raise ValueError( - f"Plate has {columns} columns, but Multidrop supports at most {MAX_COLUMNS}." - ) + raise ValueError(f"Plate has {columns} columns, but Multidrop supports at most {MAX_COLUMNS}.") if rows > MAX_ROWS: - raise ValueError( - f"Plate has {rows} rows, but Multidrop supports at most {MAX_ROWS}." - ) + raise ValueError(f"Plate has {rows} rows, but Multidrop supports at most {MAX_ROWS}.") if height_hundredths < MIN_HEIGHT_HUNDREDTHS_MM: raise ValueError( - f"Plate height {plate.get_size_z()}mm is below minimum " - f"{MIN_HEIGHT_HUNDREDTHS_MM / 100}mm." + f"Plate height {plate.get_size_z()}mm is below minimum {MIN_HEIGHT_HUNDREDTHS_MM / 100}mm." ) if height_hundredths > MAX_HEIGHT_HUNDREDTHS_MM: raise ValueError( - f"Plate height {plate.get_size_z()}mm exceeds maximum " - f"{MAX_HEIGHT_HUNDREDTHS_MM / 100}mm." + f"Plate height {plate.get_size_z()}mm exceeds maximum {MAX_HEIGHT_HUNDREDTHS_MM / 100}mm." ) if well_max_volume_tenths > MAX_VOLUME_TENTHS_UL: raise ValueError( diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py index f628cf1f67c..c40ce5d8e6b 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/queries.py @@ -1,4 +1,5 @@ """Query operations mixin for the Multidrop Combi.""" + from __future__ import annotations diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py index 0a5b32c748d..b9063ae1293 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/backend_tests.py @@ -37,12 +37,12 @@ async def test_setup_and_stop(self, MockSerial): # Setup readline responses: drain (empty), VER, EAK mock_serial.readline.side_effect = [ - b"", # drain - empty - b"VER\r\n", # VER echo - b"MultidropCombi 2.00.29 836-4191\r\n", # VER data - b"VER END 0\r\n", # VER end - b"EAK\r\n", # EAK echo - b"EAK END 0\r\n", # EAK end + b"", # drain - empty + b"VER\r\n", # VER echo + b"MultidropCombi 2.00.29 836-4191\r\n", # VER data + b"VER END 0\r\n", # VER end + b"EAK\r\n", # EAK echo + b"EAK END 0\r\n", # EAK end ] backend = MultidropCombiBackend(port="COM3") diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py index 6deefd54534..19e83cdae2f 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/communication_tests.py @@ -35,8 +35,8 @@ async def asyncSetUp(self) -> None: async def test_simple_command(self) -> None: """Test a simple command with echo + END response.""" self.backend.io.readline.side_effect = [ - b"SPL\r\n", # echo - b"SPL END 0\r\n", # end with status 0 + b"SPL\r\n", # echo + b"SPL END 0\r\n", # end with status 0 ] result = await self.backend._send_command("SPL 1") self.assertEqual(result, []) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py index daea19ddfaa..0aeb2cd37dd 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/multidrop_combi/tests/helpers_tests.py @@ -102,11 +102,13 @@ class PlateToTypeIndexRealPlatesTests(unittest.TestCase): def test_corning_96_well(self): from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + plate = Cor_96_wellplate_360ul_Fb("test") self.assertEqual(plate_to_type_index(plate), 0) # 14.2mm → type 0 def test_biorad_384_well(self): from pylabrobot.resources.biorad.plates import BioRad_384_wellplate_50uL_Vb + plate = BioRad_384_wellplate_50uL_Vb("test") self.assertEqual(plate_to_type_index(plate), 4) # 10.4mm → type 4 @@ -115,21 +117,17 @@ class PlateToPlaParamsTests(unittest.TestCase): """Test PLA command parameter generation.""" def test_96_well_params(self): - plate = _make_plate( - num_items_x=12, num_items_y=8, size_z=14.2, well_max_volume=360.0 - ) + plate = _make_plate(num_items_x=12, num_items_y=8, size_z=14.2, well_max_volume=360.0) params = plate_to_pla_params(plate) self.assertEqual(params["columns"], 12) self.assertEqual(params["rows"], 8) self.assertEqual(params["column_positions"], 12) self.assertEqual(params["row_positions"], 8) - self.assertEqual(params["height"], 1420) # 14.2mm * 100 - self.assertEqual(params["max_volume"], 3600) # 360uL * 10 + self.assertEqual(params["height"], 1420) # 14.2mm * 100 + self.assertEqual(params["max_volume"], 3600) # 360uL * 10 def test_384_well_params(self): - plate = _make_plate( - num_items_x=24, num_items_y=16, size_z=10.4, well_max_volume=50.0 - ) + plate = _make_plate(num_items_x=24, num_items_y=16, size_z=10.4, well_max_volume=50.0) params = plate_to_pla_params(plate) self.assertEqual(params["columns"], 24) self.assertEqual(params["rows"], 16) @@ -138,6 +136,7 @@ def test_384_well_params(self): def test_real_corning_96_well(self): from pylabrobot.resources.corning.plates import Cor_96_wellplate_360ul_Fb + plate = Cor_96_wellplate_360ul_Fb("test") params = plate_to_pla_params(plate) self.assertEqual(params["columns"], 12) From 101208c82e11253bf86cda159b6e01f46a2a6c41 Mon Sep 17 00:00:00 2001 From: Robert-Keyser-Calico Date: Wed, 25 Mar 2026 15:20:49 -0700 Subject: [PATCH 6/6] Sort imports in thermo_scientific __init__.py Co-Authored-By: Claude Opus 4.6 (1M context) --- pylabrobot/bulk_dispensers/thermo_scientific/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py b/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py index f7aff58e4b3..96f12e46471 100644 --- a/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py +++ b/pylabrobot/bulk_dispensers/thermo_scientific/__init__.py @@ -1,11 +1,11 @@ from pylabrobot.bulk_dispensers.thermo_scientific.multidrop_combi import ( + CassetteType, + DispensingOrder, + EmptyMode, MultidropCombiBackend, MultidropCombiCommunicationError, MultidropCombiError, MultidropCombiInstrumentError, - CassetteType, - DispensingOrder, - EmptyMode, PrimeMode, plate_to_pla_params, plate_to_type_index,