Skip to content
1 change: 1 addition & 0 deletions pylabrobot/agrowpumps/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .agrowdosepump_backend import AgrowChannelBackend, AgrowDosePumpArray, AgrowDriver
206 changes: 206 additions & 0 deletions pylabrobot/agrowpumps/agrowdosepump_backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
import asyncio
import logging
import threading
import time
from typing import Dict, List, Optional, Union

try:
from pymodbus.client import AsyncModbusSerialClient # type: ignore

_MODBUS_IMPORT_ERROR = None
except ImportError as e:
AsyncModbusSerialClient = None # type: ignore
_MODBUS_IMPORT_ERROR = e

from pylabrobot.capabilities.capability import Capability
from pylabrobot.capabilities.pumping.backend import PumpBackend
from pylabrobot.capabilities.pumping.calibration import PumpCalibration
from pylabrobot.capabilities.pumping.pumping import PumpingCapability
from pylabrobot.device import Device, Driver

logger = logging.getLogger("pylabrobot")


class AgrowDriver(Driver):
"""Modbus driver for Agrow dose pump arrays."""

def __init__(self, port: str, address: Union[int, str]):
super().__init__()
if _MODBUS_IMPORT_ERROR is not None:
raise RuntimeError(
"pymodbus is not installed. Install with: pip install pylabrobot[modbus]. "
f"Import error: {_MODBUS_IMPORT_ERROR}"
)
if not isinstance(port, str):
raise ValueError("Port must be a string")
self.port = port
if address not in range(0, 256):
raise ValueError("Pump address out of range")
self.address = int(address)
self._keep_alive_thread: Optional[threading.Thread] = None
self._pump_index_to_address: Optional[Dict[int, int]] = None
self._modbus: Optional["AsyncModbusSerialClient"] = None
self._num_channels: Optional[int] = None
self._keep_alive_thread_active = False

@property
def modbus(self) -> "AsyncModbusSerialClient":
if self._modbus is None:
raise RuntimeError("Modbus connection not established")
return self._modbus

@property
def pump_index_to_address(self) -> Dict[int, int]:
if self._pump_index_to_address is None:
raise RuntimeError("Pump mappings not established")
return self._pump_index_to_address

@property
def num_channels(self) -> int:
if self._num_channels is None:
raise RuntimeError("Number of channels not established")
return self._num_channels

def _start_keep_alive_thread(self):
async def keep_alive():
i = 0
while self._keep_alive_thread_active:
time.sleep(0.1)
i += 1
if i == 250:
await self.modbus.read_holding_registers(0, 1, unit=self.address)
i = 0

def manage_async_keep_alive():
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(keep_alive())
loop.close()
except Exception as e:
logger.error("Error in keep alive thread: %s", e)

self._keep_alive_thread_active = True
self._keep_alive_thread = threading.Thread(target=manage_async_keep_alive, daemon=True)
self._keep_alive_thread.start()

async def setup(self):
await self._setup_modbus()
register_return = await self.modbus.read_holding_registers(19, 2, unit=self.address)
self._num_channels = int(
"".join(chr(r // 256) + chr(r % 256) for r in register_return.registers)[2]
)
self._start_keep_alive_thread()
self._pump_index_to_address = {pump: pump + 100 for pump in range(0, self.num_channels)}

async def _setup_modbus(self):
if AsyncModbusSerialClient is None:
raise RuntimeError(
"pymodbus is not installed. Install with: pip install pylabrobot[modbus]."
f" Import error: {_MODBUS_IMPORT_ERROR}"
)
self._modbus = AsyncModbusSerialClient(
port=self.port,
baudrate=115200,
timeout=1,
stopbits=1,
bytesize=8,
parity="E",
retry_on_empty=True,
)
await self.modbus.connect()
if not self.modbus.connected:
raise ConnectionError("Modbus connection failed during pump setup")

async def stop(self):
for pump in self.pump_index_to_address:
await self.write_speed(pump, 0)
if self._keep_alive_thread is not None:
self._keep_alive_thread_active = False
self._keep_alive_thread.join()
self.modbus.close()
assert not self.modbus.connected, "Modbus failing to disconnect"

async def write_speed(self, channel: int, speed: int):
if speed not in range(101):
raise ValueError("Pump speed out of range. Value should be between 0 and 100.")
await self.modbus.write_register(
self.pump_index_to_address[channel],
speed,
unit=self.address,
)


class AgrowChannelBackend(PumpBackend):
"""Per-channel PumpBackend adapter that delegates to a shared AgrowDriver."""

def __init__(self, connection: AgrowDriver, channel: int):
self._driver = connection
self._channel = channel

async def run_revolutions(self, num_revolutions: float):
raise NotImplementedError(
"Revolution based pumping commands are not available for Agrow pumps."
)

async def run_continuously(self, speed: float):
await self._driver.write_speed(self._channel, int(speed))

async def halt(self):
await self._driver.write_speed(self._channel, 0)

def serialize(self):
return {
"port": self._driver.port,
"address": self._driver.address,
"channel": self._channel,
}


class AgrowDosePumpArray(Device):
"""Agrow dose pump array device.

Exposes each channel as an individual PumpingCapability via `self.pumps`.
"""

def __init__(
self,
port: str,
address: Union[int, str],
calibrations: Optional[List[Optional[PumpCalibration]]] = None,
):
self._channel_backends: List[AgrowChannelBackend] = []
self.pumps: List[PumpingCapability] = []
self._calibrations = calibrations
super().__init__(driver=AgrowDriver(port=port, address=address))
self._driver: AgrowDriver

async def setup(self):
await self._driver.setup()
num_channels = self._driver.num_channels

self._channel_backends = [AgrowChannelBackend(self._driver, ch) for ch in range(num_channels)]
self.pumps = []
for i, backend in enumerate(self._channel_backends):
cal = None
if self._calibrations is not None and i < len(self._calibrations):
cal = self._calibrations[i]
cap = PumpingCapability(backend=backend, calibration=cal)
self.pumps.append(cap)

self._capabilities: List[Capability] = list(self.pumps)
for c in self._capabilities:
await c._on_setup()
self._setup_finished = True

async def stop(self):
for cap in reversed(self._capabilities):
await cap._on_stop()
await self._driver.stop()
self._setup_finished = False

def serialize(self):
return {
"port": self._driver.port,
"address": self._driver.address,
}
76 changes: 76 additions & 0 deletions pylabrobot/agrowpumps/agrowdosepump_tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# mypy: disable-error-code="attr-defined,assignment"
import unittest
from unittest.mock import AsyncMock, patch

import pytest

pytest.importorskip("pymodbus")

from pylabrobot.agrowpumps import AgrowDosePumpArray


class SimulatedModbusClient:
"""Duck-typed modbus client for testing."""

def __init__(self):
self._connected = False
self.write_register = AsyncMock()

async def connect(self):
self._connected = True

@property
def connected(self):
return self._connected

async def read_holding_registers(self, address: int, count: int, **kwargs):
if "unit" not in kwargs:
raise ValueError("unit must be specified")
if address == 19:
result = AsyncMock()
result.registers = [16708, 13824, 0, 0, 0, 0, 0][:count]
return result

def close(self, reconnect=False):
self._connected = False


class TestAgrowPumps(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
self.device = AgrowDosePumpArray(port="simulated", address=1)

async def _mock_setup_modbus():
self.device._driver._modbus = SimulatedModbusClient()

with patch.object(self.device._driver, "_setup_modbus", _mock_setup_modbus):
await self.device.setup()

async def asyncTearDown(self):
await self.device.stop()

async def test_setup(self):
self.assertEqual(self.device._driver.port, "simulated")
self.assertEqual(self.device._driver.address, 1)
self.assertEqual(len(self.device.pumps), 6)
self.assertEqual(
self.device._driver._pump_index_to_address,
{pump: pump + 100 for pump in range(0, 6)},
)

async def test_run_continuously(self):
self.device._driver.modbus.write_register.reset_mock()
await self.device.pumps[0].run_continuously(speed=1)
self.device._driver.modbus.write_register.assert_called_once_with(100, 1, unit=1)

# invalid speed: cannot be bigger than 100
with self.assertRaises(ValueError):
await self.device.pumps[0].run_continuously(speed=101)

async def test_run_revolutions(self):
with self.assertRaises(NotImplementedError):
await self.device.pumps[0].run_revolutions(num_revolutions=1.0)

async def test_halt_single_channel(self):
self.device._driver.modbus.write_register.reset_mock()
await self.device.pumps[2].halt()
self.device._driver.modbus.write_register.assert_called_once_with(102, 0, unit=1)
5 changes: 5 additions & 0 deletions pylabrobot/capabilities/pumping/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .backend import PumpBackend
from .calibration import PumpCalibration
from .chatterbox import PumpChatterboxBackend
from .errors import NotCalibratedError
from .pumping import PumpingCapability
19 changes: 19 additions & 0 deletions pylabrobot/capabilities/pumping/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from abc import ABCMeta, abstractmethod

from pylabrobot.capabilities.capability import CapabilityBackend


class PumpBackend(CapabilityBackend, metaclass=ABCMeta):
"""Abstract backend for a single pump."""

@abstractmethod
async def run_revolutions(self, num_revolutions: float):
"""Run for a given number of revolutions."""

@abstractmethod
async def run_continuously(self, speed: float):
"""Run continuously at a given speed. If speed is 0, halt."""

@abstractmethod
async def halt(self):
"""Halt the pump."""
Loading
Loading