Add pumping capability and migrate vendor backends#958
Add pumping capability and migrate vendor backends#958rickwierenga wants to merge 6 commits intov1b1from
Conversation
8927a8f to
b3fb9f7
Compare
- PumpingCapability with calibration, chatterbox, tests - AgrowDriver (extends Driver), AgrowDosePumpArray device - Cole Parmer Masterflex backend - PumpBackend extends CapabilityBackend Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Follow CLARIOstar pattern: Driver owns connection, Backend owns capability operations. Remove stale setup/stop from capability backends. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Legacy Pump.run_revolutions, run_continuously, halt were not awaiting the backend coroutines. Fixed + updated test mock accordingly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Adapter pattern: _PumpAdapter and _ChannelAdapter wrap legacy backends into new CapabilityBackend interface. Frontends delegate to PumpingCapability — no duplicated orchestration logic. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
This PR introduces a new PumpingCapability under the capabilities architecture and migrates pump integrations (Cole Parmer Masterflex + Agrow dose pumps) from legacy backends into first-class Device implementations, while keeping legacy adapters in place.
Changes:
- Added a new pumping capability module (
PumpBackend,PumpingCapability, calibration helpers, chatterbox backend) with unit tests. - Added new device/driver-based implementations for Cole Parmer Masterflex and Agrow dose pump arrays.
- Updated legacy
Pump/PumpArrayand legacy pump backends to adapt to the new pumping capability architecture.
Reviewed changes
Copilot reviewed 18 out of 18 changed files in this pull request and generated 9 comments.
Show a summary per file
| File | Description |
|---|---|
| pylabrobot/legacy/pumps/pumparray.py | Adapts legacy pump arrays to expose per-channel PumpingCapability and routes operations through it. |
| pylabrobot/legacy/pumps/pump_tests.py | Updates legacy pump tests to align with the new capability-based wiring. |
| pylabrobot/legacy/pumps/pump.py | Wraps a legacy PumpBackend with a PumpingCapability adapter. |
| pylabrobot/legacy/pumps/cole_parmer/masterflex_backend.py | Replaces legacy Masterflex implementation with a delegating wrapper to the new backend/driver. |
| pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_tests.py | Adjusts legacy Agrow tests to the new driver-based implementation while keeping legacy entrypoints. |
| pylabrobot/legacy/pumps/agrowpumps/agrowdosepump_backend.py | Replaces legacy Agrow backend internals with a wrapper over the new AgrowDriver + channel backends. |
| pylabrobot/cole_parmer/masterflex_backend.py | Adds new MasterflexDriver, MasterflexBackend, and MasterflexPump device. |
| pylabrobot/cole_parmer/init.py | Exports the new Cole Parmer Masterflex classes. |
| pylabrobot/capabilities/pumping/pumping_tests.py | Adds unit tests for the new PumpingCapability. |
| pylabrobot/capabilities/pumping/pumping.py | Implements PumpingCapability behavior and lifecycle hooks. |
| pylabrobot/capabilities/pumping/errors.py | Introduces a pumping-specific calibration error type. |
| pylabrobot/capabilities/pumping/chatterbox.py | Adds a pumping chatterbox backend for device-free usage. |
| pylabrobot/capabilities/pumping/calibration.py | Adds a new calibration model + loaders for pumping. |
| pylabrobot/capabilities/pumping/backend.py | Defines the capability backend ABC for pumping. |
| pylabrobot/capabilities/pumping/init.py | Re-exports pumping capability module public API. |
| pylabrobot/agrowpumps/agrowdosepump_tests.py | Adds tests for the new Agrow device implementation. |
| pylabrobot/agrowpumps/agrowdosepump_backend.py | Adds new AgrowDriver, AgrowChannelBackend, and AgrowDosePumpArray device. |
| pylabrobot/agrowpumps/init.py | Exports the new Agrow pump classes. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
|
||
| def setUp(self): | ||
| self.mock_backend = Mock(spec=PumpBackend) | ||
| self.mock_backend = AsyncMock() |
There was a problem hiding this comment.
TestPump.setUp() switched to a bare AsyncMock() which drops interface checking and also doesn't match the legacy PumpBackend contract (where run_revolutions/run_continuously/halt are synchronous methods). Once _PumpAdapter is fixed to handle legacy sync methods, this test should use a Mock(spec=PumpBackend) (and configure setup/stop as AsyncMocks) so it accurately models real legacy backends and catches API mismatches.
| self.mock_backend = AsyncMock() | |
| # Use a spec'd Mock so we enforce the PumpBackend interface and keep | |
| # run_revolutions/run_continuously/halt as synchronous methods. | |
| self.mock_backend = Mock(spec=PumpBackend) | |
| # setup and stop are asynchronous in the adapter, so model them as AsyncMocks. | |
| self.mock_backend.setup = AsyncMock() | |
| self.mock_backend.stop = AsyncMock() |
| await self.backend.run_revolutions(num_revolutions=num_revolutions, use_channels=use_channels) | ||
| channels = self._normalize_channels(use_channels) | ||
| if isinstance(num_revolutions, (float, int)): | ||
| num_revolutions = [float(num_revolutions)] * len(channels) |
There was a problem hiding this comment.
run_revolutions() zips channels with num_revolutions without validating list length. If a list is provided with a different length than use_channels, extra values will be silently ignored or channels will be skipped. Add an explicit length check (similar to _normalize_speeds) and normalize values to floats before iterating.
| num_revolutions = [float(num_revolutions)] * len(channels) | |
| num_revolutions = [float(num_revolutions)] * len(channels) | |
| else: | |
| if len(num_revolutions) != len(channels): | |
| raise ValueError("num_revolutions and use_channels must be the same length.") | |
| num_revolutions = [float(rev) for rev in num_revolutions] |
| if self.calibration is None: | ||
| raise TypeError( | ||
| "Pump is not calibrated. Volume based pumping and related functions unavailable." | ||
| ) |
There was a problem hiding this comment.
pumping/errors.py defines NotCalibratedError, but PumpingCapability.pump_volume() raises TypeError when calibration is missing. Exception type is part of the public API and this currently diverges from the legacy pumping APIs (which raise NotCalibratedError). Consider raising NotCalibratedError here (and updating the tests accordingly) or removing the unused error type/export.
| from .backend import PumpBackend | ||
|
|
||
|
|
||
| class PumpChatterboxBackend(PumpBackend): | ||
| """Chatterbox backend for device-free testing.""" | ||
|
|
||
| async def setup(self): | ||
| print("Setting up the pump.") | ||
|
|
||
| async def stop(self): | ||
| print("Stopping the pump.") | ||
|
|
||
| async def run_revolutions(self, num_revolutions: float): | ||
| print(f"Running {num_revolutions} revolutions.") | ||
|
|
||
| async def run_continuously(self, speed: float): | ||
| print(f"Running continuously at speed {speed}.") | ||
|
|
||
| async def halt(self): | ||
| print("Halting the pump.") |
There was a problem hiding this comment.
This chatterbox backend prints directly to stdout on every call. In the rest of the capabilities package, chatterbox/mock backends typically avoid printing and instead return dummy data (or use logging) to keep tests and library usage quiet. Consider switching to logging or making these methods no-ops to avoid noisy output when used in unit tests or notebooks.
| from .backend import PumpBackend | |
| class PumpChatterboxBackend(PumpBackend): | |
| """Chatterbox backend for device-free testing.""" | |
| async def setup(self): | |
| print("Setting up the pump.") | |
| async def stop(self): | |
| print("Stopping the pump.") | |
| async def run_revolutions(self, num_revolutions: float): | |
| print(f"Running {num_revolutions} revolutions.") | |
| async def run_continuously(self, speed: float): | |
| print(f"Running continuously at speed {speed}.") | |
| async def halt(self): | |
| print("Halting the pump.") | |
| import logging | |
| from .backend import PumpBackend | |
| logger = logging.getLogger(__name__) | |
| class PumpChatterboxBackend(PumpBackend): | |
| """Chatterbox backend for device-free testing.""" | |
| async def setup(self): | |
| logger.info("Setting up the pump.") | |
| async def stop(self): | |
| logger.info("Stopping the pump.") | |
| async def run_revolutions(self, num_revolutions: float): | |
| logger.info(f"Running {num_revolutions} revolutions.") | |
| async def run_continuously(self, speed: float): | |
| logger.info(f"Running continuously at speed {speed}.") | |
| async def halt(self): | |
| logger.info("Halting the pump.") |
|
|
||
| async def stop(self): | ||
| await self.io.stop() | ||
| return self._driver.serialize() |
There was a problem hiding this comment.
MasterflexBackend.serialize() now returns self._driver.serialize(), which sets type to MasterflexDriver. Since this class is a legacy MachineBackend, MachineBackend.deserialize() expects the serialized type to be a MachineBackend subclass name (e.g., MasterflexBackend). Returning the driver serialization will break deserialization of legacy Pump/Machine objects. Keep the legacy backend serialization shape (include super().serialize() and com_port) or otherwise ensure the type remains MasterflexBackend.
| return self._driver.serialize() | |
| data = self._driver.serialize() | |
| # Ensure legacy deserialization sees this as a MasterflexBackend, | |
| # not as the underlying MasterflexDriver. | |
| data["type"] = type(self).__name__ | |
| return data |
| async def send_command(self, command: str): | ||
| command = "\x02P02" + command + "\x0d" | ||
| await self.io.write(command.encode()) | ||
| return self.io.read() |
There was a problem hiding this comment.
MasterflexDriver.send_command() returns self.io.read() without awaiting it. Since Serial.read() is async, this causes send_command() to return a coroutine object rather than the bytes response (and callers that await send_command() will receive that coroutine instead of data). Update this to await the read (or otherwise ensure the response is awaited) before returning.
| return self.io.read() | |
| return await self.io.read() |
| async def run_revolutions(self, num_revolutions: float): | ||
| await self._legacy.run_revolutions(num_revolutions=num_revolutions) | ||
|
|
||
| async def run_continuously(self, speed: float): | ||
| await self._legacy.run_continuously(speed=speed) | ||
|
|
||
| async def halt(self): | ||
| await self._legacy.halt() |
There was a problem hiding this comment.
_PumpAdapter is awaiting legacy.run_revolutions/run_continuously/halt, but the legacy PumpBackend interface defines these methods as synchronous (e.g., PumpChatterboxBackend implements them as normal def). Awaiting a non-coroutine will raise at runtime. Consider calling the legacy methods directly, or supporting both sync/async by checking inspect.isawaitable() and only awaiting when needed.
| if any(ch not in range(0, self.num_channels) for ch in use_channels): | ||
| raise ValueError( | ||
| f"Pump address out of range for this pump array. " | ||
| f"Value should be between 0 and {self.num_channels}" |
There was a problem hiding this comment.
The out-of-range message says the channel should be between 0 and {self.num_channels}, but valid indices are 0..self.num_channels-1. Adjust the message to avoid off-by-one confusion for users.
| f"Value should be between 0 and {self.num_channels}" | |
| f"Value should be between 0 and {self.num_channels - 1}" |
| """ | ||
|
|
||
| if any(value <= 0 for value in calibration): | ||
| raise ValueError("A value in the calibration is is outside expected parameters.") |
There was a problem hiding this comment.
Typo in this error message: duplicated "is".
| raise ValueError("A value in the calibration is is outside expected parameters.") | |
| raise ValueError("A value in the calibration is outside expected parameters.") |
Legacy PumpCalibration now re-exports from new module to avoid type mismatch when passing calibration to PumpingCapability. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
PumpingCapabilityunder the capability composition architecture withPumpBackend, calibration, and chatterboxpylabrobot/cole_parmer/as aDevicewith a singlePumpingCapabilitypylabrobot/agrowpumps/— models each channel as its ownPumpingCapabilitybacked by a per-channel adapter (AgrowChannelBackend) over a sharedAgrowDriverPumpArrayCapability— pump arrays are justList[PumpingCapability]Test plan
pytest pylabrobot/capabilities/pumping/pumping_tests.py— 10 tests passpytest pylabrobot/agrowpumps/agrowdosepump_tests.py— 4 tests pass🤖 Generated with Claude Code