From 34d24f9c735de191fa48c93ac834b5e6982a5f3a Mon Sep 17 00:00:00 2001 From: Rick Wierenga Date: Thu, 26 Mar 2026 22:18:24 -0700 Subject: [PATCH] Add liquid handling capabilities Co-Authored-By: Claude Opus 4.6 (1M context) --- .../capabilities/liquid_handling/__init__.py | 18 + .../capabilities/liquid_handling/errors.py | 26 + .../capabilities/liquid_handling/head96.py | 568 ++++++++++ .../liquid_handling/head96_backend.py | 45 + .../capabilities/liquid_handling/pip.py | 817 +++++++++++++ .../liquid_handling/pip_backend.py | 76 ++ .../capabilities/liquid_handling/standard.py | 149 +++ .../capabilities/liquid_handling/utils.py | 77 ++ .../legacy/liquid_handling/liquid_handler.py | 1007 ++++++----------- 9 files changed, 2114 insertions(+), 669 deletions(-) create mode 100644 pylabrobot/capabilities/liquid_handling/__init__.py create mode 100644 pylabrobot/capabilities/liquid_handling/errors.py create mode 100644 pylabrobot/capabilities/liquid_handling/head96.py create mode 100644 pylabrobot/capabilities/liquid_handling/head96_backend.py create mode 100644 pylabrobot/capabilities/liquid_handling/pip.py create mode 100644 pylabrobot/capabilities/liquid_handling/pip_backend.py create mode 100644 pylabrobot/capabilities/liquid_handling/standard.py create mode 100644 pylabrobot/capabilities/liquid_handling/utils.py diff --git a/pylabrobot/capabilities/liquid_handling/__init__.py b/pylabrobot/capabilities/liquid_handling/__init__.py new file mode 100644 index 00000000000..fb6209587e7 --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/__init__.py @@ -0,0 +1,18 @@ +from .errors import ChannelizedError, NoChannelError +from .head96 import Head96Capability +from .head96_backend import Head96Backend +from .pip import PIP +from .pip_backend import PIPBackend +from .standard import ( + Aspiration, + Dispense, + DropTipRack, + Mix, + MultiHeadAspirationContainer, + MultiHeadAspirationPlate, + MultiHeadDispenseContainer, + MultiHeadDispensePlate, + Pickup, + PickupTipRack, + TipDrop, +) diff --git a/pylabrobot/capabilities/liquid_handling/errors.py b/pylabrobot/capabilities/liquid_handling/errors.py new file mode 100644 index 00000000000..4a8283fd0d6 --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/errors.py @@ -0,0 +1,26 @@ +"""Errors for liquid handling operations.""" + +from typing import Dict + + +class NoChannelError(Exception): + """Raised when no channel is available.""" + + +class BlowOutVolumeError(Exception): + """Raised when blow-out air volume is invalid.""" + + +class ChannelizedError(Exception): + """Raised by multi-channel operations. Contains per-channel errors.""" + + def __init__(self, errors: Dict[int, Exception], **kwargs): + self.errors = errors + self.kwargs = kwargs + + def __str__(self) -> str: + kwarg_string = ", ".join([f"{k}={v}" for k, v in self.kwargs.items()]) + return f"ChannelizedError(errors={self.errors}, {kwarg_string})" + + def __len__(self) -> int: + return len(self.errors) diff --git a/pylabrobot/capabilities/liquid_handling/head96.py b/pylabrobot/capabilities/liquid_handling/head96.py new file mode 100644 index 00000000000..f0c38de29e5 --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/head96.py @@ -0,0 +1,568 @@ +"""Capability for 96-head liquid handling.""" + +import logging +from typing import Dict, List, Optional, Sequence, Union, cast + +from pylabrobot.capabilities.capability import BackendParams, Capability, need_capability_ready +from pylabrobot.resources import ( + Container, + Coordinate, + Plate, + Tip, + TipRack, + TipTracker, + Trash, + Well, + does_tip_tracking, + does_volume_tracking, +) + +from .head96_backend import Head96Backend +from .standard import ( + DropTipRack, + Mix, + MultiHeadAspirationContainer, + MultiHeadAspirationPlate, + MultiHeadDispenseContainer, + MultiHeadDispensePlate, + PickupTipRack, +) + +logger = logging.getLogger("pylabrobot") + + +class Head96Capability(Capability): + """96-head liquid handling: pick up tips, aspirate, dispense, drop tips. + + Faithfully ports the 96-head logic from the legacy LiquidHandler, including + tip tracking with commit/rollback, volume tracking, partial tip pickup, + single-container (trough) support, and convenience methods. + """ + + def __init__(self, backend: Head96Backend, default_offset: Coordinate = Coordinate.zero()): + super().__init__(backend=backend) + self.backend: Head96Backend = backend + self.head: Dict[int, TipTracker] = {} + self.default_offset: Coordinate = default_offset + + async def _on_setup(self): + await super()._on_setup() + self.head = {c: TipTracker(thing=f"96Head Channel {c}") for c in range(96)} + + def get_mounted_tips(self) -> List[Optional[Tip]]: + """Get the tips currently mounted on the 96-head. + + Returns: + A list of 96 tips, or None for channels without a tip. + """ + return [tracker.get_tip() if tracker.has_tip else None for tracker in self.head.values()] + + def update_head_state(self, state: Dict[int, Optional[Tip]]): + """Update the state of the 96-head. + + All keys must be valid channels (0-95). Channels not in `state` keep their current state. + """ + if not set(state.keys()).issubset(set(self.head.keys())): + raise ValueError("Invalid channel.") + for channel, tip in state.items(): + if tip is None: + if self.head[channel].has_tip: + self.head[channel].remove_tip() + else: + if self.head[channel].has_tip: + self.head[channel].remove_tip() + self.head[channel].add_tip(tip) + + def clear_head_state(self): + """Clear all tips from the 96-head.""" + self.update_head_state({c: None for c in self.head.keys()}) + + def serialize_state(self) -> Dict: + """Serialize the 96-head state for saving/restoring.""" + return {channel: tracker.serialize() for channel, tracker in self.head.items()} + + def load_state(self, state: Dict): + """Load 96-head state from a serialized dict.""" + for channel, tracker_state in state.items(): + self.head[channel].load_state(tracker_state) + + def _get_origin_tip_rack(self) -> Optional[TipRack]: + """Get the tip rack where the 96-head tips were picked up from. + + Returns None if no tips are mounted. Raises if tips are from different racks. + """ + tip_spot = self.head[0].get_tip_origin() + if tip_spot is None: + return None + tip_rack = tip_spot.parent + if tip_rack is None: + raise RuntimeError("No tip rack found for tip") + for i in range(tip_rack.num_items): + other_tip_spot = self.head[i].get_tip_origin() + if other_tip_spot is None: + raise RuntimeError("Not all channels have a tip origin") + if other_tip_spot.parent != tip_rack: + raise RuntimeError("All tips must be from the same tip rack") + return tip_rack + + @staticmethod + def _check_96_head_fits_in_container(container: Container) -> bool: + """Check if the 96 head can fit in the given container.""" + tip_width = 2 # approximation + distance_between_tips = 9 + return ( + container.get_absolute_size_x() >= tip_width + distance_between_tips * 11 + and container.get_absolute_size_y() >= tip_width + distance_between_tips * 7 + ) + + @need_capability_ready + async def pick_up_tips( + self, + tip_rack: TipRack, + offset: Coordinate = Coordinate.zero(), + backend_params: Optional[BackendParams] = None, + ): + """Pick up tips from a 96-tip rack. + + Not all tip spots need to have tips — only those with tips will be picked up. + + Examples: + >>> await head96.pick_up_tips(my_tiprack) + + Args: + tip_rack: The tip rack to pick up from. Must have 96 positions. + offset: Additional offset (added to default_offset). + backend_params: Vendor-specific parameters. + """ + + offset = self.default_offset + offset + + if not isinstance(tip_rack, TipRack): + raise TypeError(f"Resource must be a TipRack, got {tip_rack}") + if tip_rack.num_items != 96: + raise ValueError("Tip rack must have 96 tips") + + # queue operation on all tip trackers + tips: List[Optional[Tip]] = [] + for i, tip_spot in enumerate(tip_rack.get_all_items()): + if not does_tip_tracking() and self.head[i].has_tip: + self.head[i].remove_tip() + # only add tips where one is present + if tip_spot.has_tip(): + self.head[i].add_tip(tip_spot.get_tip(), origin=tip_spot, commit=False) + tips.append(tip_spot.get_tip()) + else: + tips.append(None) + if does_tip_tracking() and not tip_spot.tracker.is_disabled and tip_spot.has_tip(): + tip_spot.tracker.remove_tip() + + pickup_operation = PickupTipRack(resource=tip_rack, offset=offset, tips=tips) + try: + await self.backend.pick_up_tips96(pickup=pickup_operation, backend_params=backend_params) + except Exception as error: + for i, tip_spot in enumerate(tip_rack.get_all_items()): + if does_tip_tracking() and not tip_spot.tracker.is_disabled: + tip_spot.tracker.rollback() + self.head[i].rollback() + raise error + else: + for i, tip_spot in enumerate(tip_rack.get_all_items()): + if does_tip_tracking() and not tip_spot.tracker.is_disabled: + tip_spot.tracker.commit() + self.head[i].commit() + + @need_capability_ready + async def drop_tips( + self, + resource: Union[TipRack, Trash], + offset: Coordinate = Coordinate.zero(), + allow_nonzero_volume: bool = False, + backend_params: Optional[BackendParams] = None, + ): + """Drop tips using the 96-head. + + Examples: + >>> await head96.drop_tips(my_tiprack) + >>> await head96.drop_tips(trash) + + Args: + resource: The tip rack or trash to drop tips to. + offset: Additional offset (added to default_offset). + allow_nonzero_volume: If True, drop even if tips have liquid. + backend_params: Vendor-specific parameters. + """ + + offset = self.default_offset + offset + + if not isinstance(resource, (TipRack, Trash)): + raise TypeError(f"Resource must be a TipRack or Trash, got {resource}") + if isinstance(resource, TipRack) and resource.num_items != 96: + raise ValueError("Tip rack must have 96 tips") + + # queue operation on all tip trackers + for i in range(96): + if not self.head[i].has_tip: + continue + tip = self.head[i].get_tip() + if tip.tracker.get_used_volume() > 0 and not allow_nonzero_volume and does_volume_tracking(): + raise RuntimeError( + f"Cannot drop tip with volume {tip.tracker.get_used_volume()} on channel {i}" + ) + if isinstance(resource, TipRack): + tip_spot = resource.get_item(i) + if does_tip_tracking() and not tip_spot.tracker.is_disabled: + tip_spot.tracker.add_tip(tip, commit=False) + self.head[i].remove_tip() + + drop_operation = DropTipRack(resource=resource, offset=offset) + try: + await self.backend.drop_tips96(drop=drop_operation, backend_params=backend_params) + except Exception as e: + for i in range(96): + if isinstance(resource, TipRack): + tip_spot = resource.get_item(i) + if does_tip_tracking() and not tip_spot.tracker.is_disabled: + tip_spot.tracker.rollback() + self.head[i].rollback() + raise e + else: + for i in range(96): + if isinstance(resource, TipRack): + tip_spot = resource.get_item(i) + if does_tip_tracking() and not tip_spot.tracker.is_disabled: + tip_spot.tracker.commit() + self.head[i].commit() + + @need_capability_ready + async def return_tips( + self, + allow_nonzero_volume: bool = False, + offset: Coordinate = Coordinate.zero(), + drop_backend_params: Optional[BackendParams] = None, + ): + """Return the tips on the 96-head to the tip rack they were picked up from. + + Args: + allow_nonzero_volume: If True, return even if tips have liquid. + offset: Additional offset. + drop_backend_params: Vendor-specific parameters for the drop. + + Raises: + RuntimeError: If no tips have been picked up. + """ + tip_rack = self._get_origin_tip_rack() + if tip_rack is None: + raise RuntimeError("No tips have been picked up with the 96 head") + await self.drop_tips( + tip_rack, + allow_nonzero_volume=allow_nonzero_volume, + offset=offset, + backend_params=drop_backend_params, + ) + + @need_capability_ready + async def discard_tips( + self, + trash: Trash, + allow_nonzero_volume: bool = True, + drop_backend_params: Optional[BackendParams] = None, + ): + """Permanently discard tips from the 96-head into the trash. + + Args: + trash: The trash resource. + allow_nonzero_volume: If True, discard even if tips have liquid. + drop_backend_params: Vendor-specific parameters for the drop. + """ + await self.drop_tips( + trash, allow_nonzero_volume=allow_nonzero_volume, backend_params=drop_backend_params + ) + + @need_capability_ready + async def aspirate( + self, + resource: Union[Plate, Container, List[Well]], + volume: float, + offset: Coordinate = Coordinate.zero(), + flow_rate: Optional[float] = None, + liquid_height: Optional[float] = None, + blow_out_air_volume: Optional[float] = None, + mix: Optional[Mix] = None, + backend_params: Optional[BackendParams] = None, + ): + """Aspirate from all wells in a plate or from a container. + + Examples: + >>> await head96.aspirate(plate, volume=50) + >>> await head96.aspirate(trough, volume=50) + + Args: + resource: A Plate, Container, or list of 96 Wells. + volume: Volume to aspirate per channel. + offset: Additional offset (added to default_offset). + flow_rate: Flow rate in ul/s. None = machine default. + liquid_height: Liquid height in mm from bottom. None = machine default. + blow_out_air_volume: Air volume to aspirate after liquid (ul). + mix: Mix parameters. + backend_params: Vendor-specific parameters. + """ + + offset = self.default_offset + offset + + if not ( + isinstance(resource, (Plate, Container)) + or (isinstance(resource, list) and all(isinstance(w, Well) for w in resource)) + ): + raise TypeError(f"Resource must be a Plate, Container, or list of Wells, got {resource}") + + tips = [ch.get_tip() if ch.has_tip else None for ch in self.head.values()] + + volume = float(volume) + flow_rate = float(flow_rate) if flow_rate is not None else None + blow_out_air_volume = float(blow_out_air_volume) if blow_out_air_volume is not None else None + + # resolve resource to containers + containers: Sequence[Container] + if isinstance(resource, Plate): + if resource.has_lid(): + raise ValueError("Aspirating from plate with lid") + containers = resource.get_all_items() if resource.num_items > 1 else [resource.get_item(0)] + elif isinstance(resource, Container): + containers = [resource] + elif isinstance(resource, list): + containers = resource + else: + raise TypeError(f"Unexpected resource type: {type(resource)}") + + aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer] + + if len(containers) == 1: # single container (trough) + container = containers[0] + if not self._check_96_head_fits_in_container(container): + raise ValueError("Container too small to accommodate 96 head") + + for tip in tips: + if tip is None: + continue + if not container.tracker.is_disabled and does_volume_tracking(): + container.tracker.remove_liquid(volume=volume) + tip.tracker.add_liquid(volume=volume) + + aspiration = MultiHeadAspirationContainer( + container=container, + volume=volume, + offset=offset, + flow_rate=flow_rate, + tips=tips, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + mix=mix, + ) + else: # plate / list of wells + plate = containers[0].parent + for well in containers: + if well.parent != plate: + raise ValueError("All wells must be in the same plate") + if len(containers) != 96: + raise ValueError(f"aspirate96 expects 96 containers when a list, got {len(containers)}") + + for well, tip in zip(containers, tips): + if tip is None: + continue + if not well.tracker.is_disabled and does_volume_tracking(): + well.tracker.remove_liquid(volume=volume) + tip.tracker.add_liquid(volume=volume) + + aspiration = MultiHeadAspirationPlate( + wells=cast(List[Well], containers), + volume=volume, + offset=offset, + flow_rate=flow_rate, + tips=tips, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + mix=mix, + ) + + try: + await self.backend.aspirate96(aspiration=aspiration, backend_params=backend_params) + except Exception: + for tip in tips: + if tip is not None: + tip.tracker.rollback() + for container in containers: + if does_volume_tracking() and not container.tracker.is_disabled: + container.tracker.rollback() + raise + else: + for tip in tips: + if tip is not None: + tip.tracker.commit() + for container in containers: + if does_volume_tracking() and not container.tracker.is_disabled: + container.tracker.commit() + + @need_capability_ready + async def dispense( + self, + resource: Union[Plate, Container, List[Well]], + volume: float, + offset: Coordinate = Coordinate.zero(), + flow_rate: Optional[float] = None, + liquid_height: Optional[float] = None, + blow_out_air_volume: Optional[float] = None, + mix: Optional[Mix] = None, + backend_params: Optional[BackendParams] = None, + ): + """Dispense to all wells in a plate or to a container. + + Examples: + >>> await head96.dispense(plate, volume=50) + + Args: + resource: A Plate, Container, or list of 96 Wells. + volume: Volume to dispense per channel. + offset: Additional offset (added to default_offset). + flow_rate: Flow rate in ul/s. None = machine default. + liquid_height: Liquid height in mm from bottom. None = machine default. + blow_out_air_volume: Air volume to dispense after liquid (ul). + mix: Mix parameters. + backend_params: Vendor-specific parameters. + """ + + offset = self.default_offset + offset + + if not ( + isinstance(resource, (Plate, Container)) + or (isinstance(resource, list) and all(isinstance(w, Well) for w in resource)) + ): + raise TypeError(f"Resource must be a Plate, Container, or list of Wells, got {resource}") + + tips = [ch.get_tip() if ch.has_tip else None for ch in self.head.values()] + + volume = float(volume) + flow_rate = float(flow_rate) if flow_rate is not None else None + blow_out_air_volume = float(blow_out_air_volume) if blow_out_air_volume is not None else None + + # resolve resource to containers + containers: Sequence[Container] + if isinstance(resource, Plate): + if resource.has_lid(): + raise ValueError("Dispensing to plate with lid is not possible. Remove the lid first.") + containers = resource.get_all_items() if resource.num_items > 1 else [resource.get_item(0)] + elif isinstance(resource, Container): + containers = [resource] + elif isinstance(resource, list): + containers = resource + else: + raise TypeError(f"Unexpected resource type: {type(resource)}") + + # remove liquid from tips + for tip in tips: + if tip is None: + continue + if does_volume_tracking(): + tip.tracker.remove_liquid(volume=volume) + elif tip.tracker.get_used_volume() < volume: + tip.tracker.remove_liquid(volume=min(tip.tracker.get_used_volume(), volume)) + + dispense_op: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer] + + if len(containers) == 1: # single container (trough) + container = containers[0] + if not self._check_96_head_fits_in_container(container): + raise ValueError("Container too small to accommodate 96 head") + + if not container.tracker.is_disabled and does_volume_tracking(): + container.tracker.add_liquid(volume=len([t for t in tips if t is not None]) * volume) + + dispense_op = MultiHeadDispenseContainer( + container=container, + volume=volume, + offset=offset, + flow_rate=flow_rate, + tips=tips, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + mix=mix, + ) + else: # plate / list of wells + plate = containers[0].parent + for well in containers: + if well.parent != plate: + raise ValueError("All wells must be in the same plate") + if len(containers) != 96: + raise ValueError(f"dispense96 expects 96 wells, got {len(containers)}") + + for well, tip in zip(containers, tips): + if tip is None: + continue + if not well.tracker.is_disabled and does_volume_tracking(): + well.tracker.add_liquid(volume=volume) + + dispense_op = MultiHeadDispensePlate( + wells=cast(List[Well], containers), + volume=volume, + offset=offset, + flow_rate=flow_rate, + tips=tips, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + mix=mix, + ) + + try: + await self.backend.dispense96(dispense=dispense_op, backend_params=backend_params) + except Exception: + for tip in tips: + if tip is not None: + tip.tracker.rollback() + for container in containers: + if does_volume_tracking() and not container.tracker.is_disabled: + container.tracker.rollback() + raise + else: + for tip in tips: + if tip is not None: + tip.tracker.commit() + for container in containers: + if does_volume_tracking() and not container.tracker.is_disabled: + container.tracker.commit() + + @need_capability_ready + async def stamp( + self, + source: Plate, + target: Plate, + volume: float, + aspiration_flow_rate: Optional[float] = None, + dispense_flow_rate: Optional[float] = None, + aspirate_backend_params: Optional[BackendParams] = None, + dispense_backend_params: Optional[BackendParams] = None, + ): + """Stamp (aspirate and dispense) one plate onto another. + + Args: + source: The source plate. + target: The target plate. + volume: The volume to transfer. + aspiration_flow_rate: Flow rate for aspiration (ul/s). + dispense_flow_rate: Flow rate for dispense (ul/s). + aspirate_backend_params: Vendor-specific parameters for aspiration. + dispense_backend_params: Vendor-specific parameters for dispense. + """ + if (source.num_items_x, source.num_items_y) != (target.num_items_x, target.num_items_y): + raise ValueError("Source and target plates must be the same shape") + + await self.aspirate( + resource=source, + volume=volume, + flow_rate=aspiration_flow_rate, + backend_params=aspirate_backend_params, + ) + await self.dispense( + resource=target, + volume=volume, + flow_rate=dispense_flow_rate, + backend_params=dispense_backend_params, + ) diff --git a/pylabrobot/capabilities/liquid_handling/head96_backend.py b/pylabrobot/capabilities/liquid_handling/head96_backend.py new file mode 100644 index 00000000000..19a6d3299b0 --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/head96_backend.py @@ -0,0 +1,45 @@ +"""Abstract backend for 96-head liquid handling.""" + +from abc import ABCMeta, abstractmethod +from typing import Optional, Union + +from pylabrobot.capabilities.capability import BackendParams, CapabilityBackend + +from .standard import ( + DropTipRack, + MultiHeadAspirationContainer, + MultiHeadAspirationPlate, + MultiHeadDispenseContainer, + MultiHeadDispensePlate, + PickupTipRack, +) + + +class Head96Backend(CapabilityBackend, metaclass=ABCMeta): + """Backend for 96-head liquid handling operations.""" + + @abstractmethod + async def pick_up_tips96( + self, pickup: PickupTipRack, backend_params: Optional[BackendParams] = None + ): + """Pick up tips from a tip rack using the 96-head.""" + + @abstractmethod + async def drop_tips96(self, drop: DropTipRack, backend_params: Optional[BackendParams] = None): + """Drop tips using the 96-head.""" + + @abstractmethod + async def aspirate96( + self, + aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer], + backend_params: Optional[BackendParams] = None, + ): + """Aspirate using the 96-head.""" + + @abstractmethod + async def dispense96( + self, + dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer], + backend_params: Optional[BackendParams] = None, + ): + """Dispense using the 96-head.""" diff --git a/pylabrobot/capabilities/liquid_handling/pip.py b/pylabrobot/capabilities/liquid_handling/pip.py new file mode 100644 index 00000000000..2d5b89cf943 --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/pip.py @@ -0,0 +1,817 @@ +"""Capability for independent-channel liquid handling.""" + +import contextlib +import logging +from typing import Dict, Generator, List, Literal, Optional, Sequence, Union + +from pylabrobot.capabilities.capability import BackendParams, Capability, need_capability_ready +from pylabrobot.resources import ( + Container, + Coordinate, + Plate, + Tip, + TipSpot, + TipTracker, + Trash, + Well, + does_tip_tracking, + does_volume_tracking, +) +from pylabrobot.resources.errors import HasTipError + +from .errors import BlowOutVolumeError, ChannelizedError +from .pip_backend import PIPBackend +from .standard import Aspiration, Dispense, Mix, Pickup, TipDrop +from .utils import ( + get_tight_single_resource_liquid_op_offsets, + get_wide_single_resource_liquid_op_offsets, +) + +logger = logging.getLogger("pylabrobot") + + +class PIP(Capability): + """Independent-channel liquid handling: pick up tips, aspirate, dispense, drop tips. + + Faithfully ports the tip tracking, volume tracking, validation, spread modes, and + error handling from the legacy LiquidHandler frontend. + """ + + def __init__(self, backend: PIPBackend): + super().__init__(backend=backend) + self.backend: PIPBackend = backend + self.head: Dict[int, TipTracker] = {} + self._default_use_channels: Optional[List[int]] = None + self._blow_out_air_volume: Optional[List[Optional[float]]] = None + + async def _on_setup(self): + await super()._on_setup() + self.head = {c: TipTracker(thing=f"Channel {c}") for c in range(self.backend.num_channels)} + + @property + def num_channels(self) -> int: + return self.backend.num_channels + + def get_mounted_tips(self) -> List[Optional[Tip]]: + """Get the tips currently mounted on the head. + + Returns: + A list of tips, or None for channels without a tip. + """ + return [tracker.get_tip() if tracker.has_tip else None for tracker in self.head.values()] + + def update_head_state(self, state: Dict[int, Optional[Tip]]): + """Update the state of the head. + + All keys in `state` must be valid channels. Channels not in `state` keep their current state. + + Args: + state: A dictionary mapping channels to tips. None means no tip. + """ + if not set(state.keys()).issubset(set(self.head.keys())): + raise ValueError("Invalid channel.") + for channel, tip in state.items(): + if tip is None: + if self.head[channel].has_tip: + self.head[channel].remove_tip() + else: + if self.head[channel].has_tip: + self.head[channel].remove_tip() + self.head[channel].add_tip(tip) + + def clear_head_state(self): + """Clear all tips from the head.""" + self.update_head_state({c: None for c in self.head.keys()}) + + def serialize_state(self) -> Dict: + """Serialize the head state for saving/restoring.""" + return {channel: tracker.serialize() for channel, tracker in self.head.items()} + + def load_state(self, state: Dict): + """Load head state from a serialized dict.""" + for channel, tracker_state in state.items(): + self.head[channel].load_state(tracker_state) + + def _make_sure_channels_exist(self, channels: List[int]): + invalid = [c for c in channels if c not in self.head] + if invalid: + raise ValueError(f"Invalid channels: {invalid}") + + @need_capability_ready + async def pick_up_tips( + self, + tip_spots: List[TipSpot], + use_channels: Optional[List[int]] = None, + offsets: Optional[List[Coordinate]] = None, + backend_params: Optional[BackendParams] = None, + ): + """Pick up tips from tip spots. + + Examples: + Pick up all tips in the first column: + + >>> await lh.pick_up_tips(tips_resource["A1":"H1"]) + + Pick up tips on odd rows, skipping the other channels: + + >>> await lh.pick_up_tips(tips_resource["A1", "C1", "E1", "G1"], use_channels=[0, 2, 4, 6]) + + Args: + tip_spots: List of tip spots to pick up tips from. + use_channels: List of channels to use. If None, the first len(tip_spots) channels are used. + offsets: List of offsets for each pickup. Defaults to zero. + backend_params: Vendor-specific parameters. + + Raises: + HasTipError: If a channel already has a tip. + NoTipError: If a spot does not have a tip. + """ + + not_tip_spots = [ts for ts in tip_spots if not isinstance(ts, TipSpot)] + if not_tip_spots: + raise TypeError(f"Resources must be TipSpots, got {not_tip_spots}") + + use_channels = use_channels or self._default_use_channels or list(range(len(tip_spots))) + if len(set(use_channels)) != len(use_channels): + raise ValueError("Channels must be unique.") + + tips = [tip_spot.get_tip() for tip_spot in tip_spots] + offsets = offsets or [Coordinate.zero()] * len(tip_spots) + + # check tip compatibility + if not all( + self.backend.can_pick_up_tip(channel, tip) for channel, tip in zip(use_channels, tips) + ): + cannot = [ + ch for ch, tip in zip(use_channels, tips) if not self.backend.can_pick_up_tip(ch, tip) + ] + raise RuntimeError(f"Cannot pick up tips on channels {cannot}.") + + self._make_sure_channels_exist(use_channels) + if not (len(tip_spots) == len(offsets) == len(use_channels)): + raise ValueError("Number of tips, offsets, and use_channels must be equal.") + + pickups = [Pickup(resource=ts, offset=o, tip=t) for ts, o, t in zip(tip_spots, offsets, tips)] + + # queue operations on trackers + for channel, op in zip(use_channels, pickups): + if self.head[channel].has_tip: + raise HasTipError("Channel has tip") + if does_tip_tracking() and not op.resource.tracker.is_disabled: + op.resource.tracker.remove_tip() + self.head[channel].add_tip(op.tip, origin=op.resource, commit=False) + + # execute + error: Optional[BaseException] = None + try: + await self.backend.pick_up_tips( + ops=pickups, use_channels=use_channels, backend_params=backend_params + ) + except BaseException as e: + error = e + + # determine per-channel success + successes = [error is None] * len(pickups) + if error is not None: + try: + tip_presence = await self.backend.request_tip_presence() + successes = [tip_presence[ch] is True for ch in use_channels] + except Exception as tip_presence_error: + if not isinstance(tip_presence_error, NotImplementedError): + logger.warning("Failed to query tip presence after error: %s", tip_presence_error) + if isinstance(error, ChannelizedError): + successes = [ch not in error.errors for ch in use_channels] + + # commit or rollback + for channel, op, success in zip(use_channels, pickups, successes): + if does_tip_tracking() and not op.resource.tracker.is_disabled: + (op.resource.tracker.commit if success else op.resource.tracker.rollback)() + (self.head[channel].commit if success else self.head[channel].rollback)() + + if error is not None: + raise error + + @need_capability_ready + async def drop_tips( + self, + tip_spots: Sequence[Union[TipSpot, Trash]], + use_channels: Optional[List[int]] = None, + offsets: Optional[List[Coordinate]] = None, + allow_nonzero_volume: bool = False, + backend_params: Optional[BackendParams] = None, + ): + """Drop tips to tip spots or trash. + + Args: + tip_spots: Tip spots or trash to drop to. + use_channels: List of channels to use. If None, the first len(tip_spots) channels are used. + offsets: List of offsets for each drop. Defaults to zero. + allow_nonzero_volume: If True, drop even if the tip has liquid. Otherwise raise. + backend_params: Vendor-specific parameters. + + Raises: + NoTipError: If a channel does not have a tip. + HasTipError: If a spot already has a tip. + """ + + not_valid = [ts for ts in tip_spots if not isinstance(ts, (TipSpot, Trash))] + if not_valid: + raise TypeError(f"Resources must be TipSpots or Trash, got {not_valid}") + + use_channels = use_channels or self._default_use_channels or list(range(len(tip_spots))) + if len(set(use_channels)) != len(use_channels): + raise ValueError("Channels must be unique.") + + tips = [] + for channel in use_channels: + tip = self.head[channel].get_tip() + if tip.tracker.get_used_volume() > 0 and not allow_nonzero_volume: + raise RuntimeError(f"Cannot drop tip with volume {tip.tracker.get_used_volume()}") + tips.append(tip) + + offsets = offsets or [Coordinate.zero()] * len(tip_spots) + + self._make_sure_channels_exist(use_channels) + if not (len(tip_spots) == len(offsets) == len(use_channels) == len(tips)): + raise ValueError("Number of tip_spots, offsets, use_channels, and tips must be equal.") + + drops = [TipDrop(resource=ts, offset=o, tip=t) for ts, t, o in zip(tip_spots, tips, offsets)] + + # queue operations on trackers + for channel, op in zip(use_channels, drops): + if ( + does_tip_tracking() + and isinstance(op.resource, TipSpot) + and not op.resource.tracker.is_disabled + ): + op.resource.tracker.add_tip(op.tip, commit=False) + self.head[channel].remove_tip() + + # execute + error: Optional[BaseException] = None + try: + await self.backend.drop_tips( + ops=drops, use_channels=use_channels, backend_params=backend_params + ) + except BaseException as e: + error = e + + # determine per-channel success + successes = [error is None] * len(drops) + if error is not None: + try: + tip_presence = await self.backend.request_tip_presence() + successes = [tip_presence[ch] is False for ch in use_channels] + except Exception as tip_presence_error: + if not isinstance(tip_presence_error, NotImplementedError): + logger.warning("Failed to query tip presence after error: %s", tip_presence_error) + if isinstance(error, ChannelizedError): + successes = [ch not in error.errors for ch in use_channels] + + # commit or rollback + for channel, op, success in zip(use_channels, drops, successes): + if ( + does_tip_tracking() + and isinstance(op.resource, TipSpot) + and not op.resource.tracker.is_disabled + ): + (op.resource.tracker.commit if success else op.resource.tracker.rollback)() + (self.head[channel].commit if success else self.head[channel].rollback)() + + if error is not None: + raise error + + @need_capability_ready + async def return_tips( + self, + use_channels: Optional[List[int]] = None, + allow_nonzero_volume: bool = False, + offsets: Optional[List[Coordinate]] = None, + drop_backend_params: Optional[BackendParams] = None, + ): + """Return all tips currently picked up to their original place. + + Args: + use_channels: Channels to return. If None, all channels with tips are used. + allow_nonzero_volume: If True, return even if the tip has liquid. + offsets: List of offsets for each drop. + drop_backend_params: Vendor-specific parameters for the drop. + + Raises: + RuntimeError: If no tips have been picked up. + """ + + tip_spots: List[TipSpot] = [] + channels: List[int] = [] + + for channel, tracker in self.head.items(): + if use_channels is not None and channel not in use_channels: + continue + if tracker.has_tip: + origin = tracker.get_tip_origin() + if origin is None: + raise RuntimeError("No tip origin found.") + tip_spots.append(origin) + channels.append(channel) + + if len(tip_spots) == 0: + raise RuntimeError("No tips have been picked up.") + + await self.drop_tips( + tip_spots=tip_spots, + use_channels=channels, + allow_nonzero_volume=allow_nonzero_volume, + offsets=offsets, + backend_params=drop_backend_params, + ) + + @need_capability_ready + async def discard_tips( + self, + trash: Trash, + use_channels: Optional[List[int]] = None, + allow_nonzero_volume: bool = True, + offsets: Optional[List[Coordinate]] = None, + drop_backend_params: Optional[BackendParams] = None, + ): + """Permanently discard tips in the trash. + + Args: + trash: The trash resource. + use_channels: Channels to discard. If None, all channels with tips are used. + allow_nonzero_volume: If True, discard even if the tip has liquid. + offsets: List of offsets for each drop. + drop_backend_params: Vendor-specific parameters for the drop. + """ + + if use_channels is None: + use_channels = [c for c, t in self.head.items() if t.has_tip] + + n = len(use_channels) + if n == 0: + raise RuntimeError("No tips have been picked up and no channels were specified.") + + trash_offsets = get_tight_single_resource_liquid_op_offsets(trash, num_channels=n) + offsets = [ + o + to if o is not None else to + for o, to in zip(offsets or [None] * n, trash_offsets) # type: ignore + ] + + await self.drop_tips( + tip_spots=[trash] * n, + use_channels=use_channels, + offsets=offsets, + allow_nonzero_volume=allow_nonzero_volume, + backend_params=drop_backend_params, + ) + + @need_capability_ready + async def move_tips( + self, + source_tip_spots: List[TipSpot], + dest_tip_spots: List[TipSpot], + pick_up_backend_params: Optional[BackendParams] = None, + drop_backend_params: Optional[BackendParams] = None, + ): + """Move tips from one tip rack to another. + + Examples: + >>> await cap.move_tips(source_rack["A1":"A8"], dest_rack["B1":"B8"]) + """ + if len(source_tip_spots) != len(dest_tip_spots): + raise ValueError("Number of source and destination tip spots must match.") + + use_channels = list(range(len(source_tip_spots))) + await self.pick_up_tips( + tip_spots=source_tip_spots, + use_channels=use_channels, + backend_params=pick_up_backend_params, + ) + await self.drop_tips( + tip_spots=dest_tip_spots, + use_channels=use_channels, + backend_params=drop_backend_params, + ) + + @need_capability_ready + async def aspirate( + self, + resources: Sequence[Container], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Optional[List[Coordinate]] = None, + liquid_height: Optional[List[Optional[float]]] = None, + blow_out_air_volume: Optional[List[Optional[float]]] = None, + spread: Literal["wide", "tight", "custom"] = "wide", + mix: Optional[List[Mix]] = None, + backend_params: Optional[BackendParams] = None, + ): + """Aspirate liquid from the specified containers. + + Examples: + Aspirate 50 uL from the first column: + + >>> await cap.aspirate(plate["A1:H1"], vols=[50]*8) + + Aspirate from a single container with multiple channels spread evenly: + + >>> await cap.aspirate([trough], vols=[50]*4, use_channels=[0,1,2,3]) + + Args: + resources: Containers to aspirate from. If a single resource is given with multiple channels, + channels are spread across it according to `spread`. + vols: Volume to aspirate per channel. + use_channels: Channels to use. Defaults to 0..len(resources)-1. + flow_rates: Flow rate per channel (ul/s). None = machine default. + offsets: Offset per channel. + liquid_height: Liquid height per channel (mm from bottom). None = machine default. + blow_out_air_volume: Air volume to aspirate after liquid (ul). None = machine default. + spread: How to space channels on a single resource: "wide", "tight", or "custom". + mix: Mix parameters per channel. + backend_params: Vendor-specific parameters. + """ + + not_containers = [r for r in resources if not isinstance(r, Container)] + if not_containers: + raise TypeError(f"Resources must be Containers, got {not_containers}") + + use_channels = use_channels or self._default_use_channels or list(range(len(resources))) + if len(set(use_channels)) != len(use_channels): + raise ValueError("Channels must be unique.") + + offsets = offsets or [Coordinate.zero()] * len(use_channels) + flow_rates = flow_rates or [None] * len(use_channels) + liquid_height = liquid_height or [None] * len(use_channels) + blow_out_air_volume = blow_out_air_volume or [None] * len(use_channels) + + vols = [float(v) for v in vols] + flow_rates = [float(fr) if fr is not None else None for fr in flow_rates] + liquid_height = [float(lh) if lh is not None else None for lh in liquid_height] + blow_out_air_volume = [float(bav) if bav is not None else None for bav in blow_out_air_volume] + + self._blow_out_air_volume = blow_out_air_volume + tips = [self.head[channel].get_tip() for channel in use_channels] + + for resource in resources: + if isinstance(resource.parent, Plate) and resource.parent.has_lid(): + raise ValueError("Aspirating from a well with a lid is not supported.") + + self._make_sure_channels_exist(use_channels) + for name, param in [ + ("resources", resources), + ("vols", vols), + ("offsets", offsets), + ("flow_rates", flow_rates), + ("liquid_height", liquid_height), + ("blow_out_air_volume", blow_out_air_volume), + ]: + if len(param) != len(use_channels): + raise ValueError( + f"Length of {name} must match use_channels: {len(param)} != {len(use_channels)}" + ) + + # spread channels across a single resource + if len(set(resources)) == 1: + resource = resources[0] + resources = [resource] * len(use_channels) + if spread == "tight": + center_offsets = get_tight_single_resource_liquid_op_offsets( + resource=resource, num_channels=len(use_channels) + ) + elif spread == "wide": + center_offsets = get_wide_single_resource_liquid_op_offsets( + resource=resource, num_channels=len(use_channels) + ) + elif spread == "custom": + center_offsets = [Coordinate.zero()] * len(use_channels) + else: + raise ValueError("Invalid spread. Must be 'tight', 'wide', or 'custom'.") + offsets = [c + o for c, o in zip(center_offsets, offsets)] + + aspirations = [ + Aspiration( + resource=r, + volume=v, + offset=o, + flow_rate=fr, + liquid_height=lh, + tip=t, + blow_out_air_volume=bav, + mix=m, + ) + for r, v, o, fr, lh, t, bav, m in zip( + resources, + vols, + offsets, + flow_rates, + liquid_height, + tips, + blow_out_air_volume, + mix or [None] * len(use_channels), # type: ignore + ) + ] + + # queue volume tracking + for op in aspirations: + if does_volume_tracking(): + if not op.resource.tracker.is_disabled: + op.resource.tracker.remove_liquid(op.volume) + op.tip.tracker.add_liquid(volume=op.volume) + + # execute + error: Optional[Exception] = None + try: + await self.backend.aspirate( + ops=aspirations, use_channels=use_channels, backend_params=backend_params + ) + except Exception as e: + error = e + + # determine per-channel success + successes = [error is None] * len(aspirations) + if error is not None and isinstance(error, ChannelizedError): + successes = [ch not in error.errors for ch in use_channels] + + # commit or rollback + for channel, op, success in zip(use_channels, aspirations, successes): + if does_volume_tracking(): + if not op.resource.tracker.is_disabled: + (op.resource.tracker.commit if success else op.resource.tracker.rollback)() + tip_volume_tracker = self.head[channel].get_tip().tracker + (tip_volume_tracker.commit if success else tip_volume_tracker.rollback)() + + if error is not None: + raise error + + @need_capability_ready + async def dispense( + self, + resources: Sequence[Container], + vols: List[float], + use_channels: Optional[List[int]] = None, + flow_rates: Optional[List[Optional[float]]] = None, + offsets: Optional[List[Coordinate]] = None, + liquid_height: Optional[List[Optional[float]]] = None, + blow_out_air_volume: Optional[List[Optional[float]]] = None, + spread: Literal["wide", "tight", "custom"] = "wide", + mix: Optional[List[Mix]] = None, + backend_params: Optional[BackendParams] = None, + ): + """Dispense liquid to the specified containers. + + Examples: + Dispense 50 uL to the first column: + + >>> await cap.dispense(plate["A1:H1"], vols=[50]*8) + + Args: + resources: Containers to dispense to. + vols: Volume to dispense per channel. + use_channels: Channels to use. Defaults to 0..len(resources)-1. + flow_rates: Flow rate per channel (ul/s). None = machine default. + offsets: Offset per channel. + liquid_height: Liquid height per channel (mm from bottom). None = machine default. + blow_out_air_volume: Air volume to dispense after liquid (ul). None = machine default. + spread: How to space channels on a single resource: "wide", "tight", or "custom". + mix: Mix parameters per channel. + backend_params: Vendor-specific parameters. + """ + + not_containers = [r for r in resources if not isinstance(r, Container)] + if not_containers: + raise TypeError(f"Resources must be Containers, got {not_containers}") + + use_channels = use_channels or self._default_use_channels or list(range(len(resources))) + if len(set(use_channels)) != len(use_channels): + raise ValueError("Channels must be unique.") + + offsets = offsets or [Coordinate.zero()] * len(use_channels) + flow_rates = flow_rates or [None] * len(use_channels) + liquid_height = liquid_height or [None] * len(use_channels) + blow_out_air_volume = blow_out_air_volume or [None] * len(use_channels) + + vols = [float(v) for v in vols] + flow_rates = [float(fr) if fr is not None else None for fr in flow_rates] + liquid_height = [float(lh) if lh is not None else None for lh in liquid_height] + blow_out_air_volume = [float(bav) if bav is not None else None for bav in blow_out_air_volume] + + # spread channels across a single resource + if len(set(resources)) == 1: + resource = resources[0] + resources = [resource] * len(use_channels) + if spread == "tight": + center_offsets = get_tight_single_resource_liquid_op_offsets( + resource=resource, num_channels=len(use_channels) + ) + elif spread == "wide": + center_offsets = get_wide_single_resource_liquid_op_offsets( + resource=resource, num_channels=len(use_channels) + ) + elif spread == "custom": + center_offsets = [Coordinate.zero()] * len(use_channels) + else: + raise ValueError("Invalid spread. Must be 'tight', 'wide', or 'custom'.") + offsets = [c + o for c, o in zip(center_offsets, offsets)] + + tips = [self.head[channel].get_tip() for channel in use_channels] + + # check blow-out air volume against what was aspirated + if does_volume_tracking(): + if any(bav is not None and bav != 0.0 for bav in blow_out_air_volume): + if self._blow_out_air_volume is None: + raise BlowOutVolumeError("No blowout volume was aspirated.") + for requested_bav, done_bav in zip(blow_out_air_volume, self._blow_out_air_volume): + if requested_bav is not None and done_bav is not None and requested_bav > done_bav: + raise BlowOutVolumeError("Blowout volume is larger than aspirated volume") + + for resource in resources: + if isinstance(resource.parent, Plate) and resource.parent.has_lid(): + raise ValueError("Dispensing to a well with a lid is not supported.") + + for name, param in [ + ("resources", resources), + ("vols", vols), + ("offsets", offsets), + ("flow_rates", flow_rates), + ("liquid_height", liquid_height), + ("blow_out_air_volume", blow_out_air_volume), + ]: + if len(param) != len(use_channels): + raise ValueError( + f"Length of {name} must match use_channels: {len(param)} != {len(use_channels)}" + ) + + dispenses = [ + Dispense( + resource=r, + volume=v, + offset=o, + flow_rate=fr, + liquid_height=lh, + tip=t, + blow_out_air_volume=bav, + mix=m, + ) + for r, v, o, fr, lh, t, bav, m in zip( + resources, + vols, + offsets, + flow_rates, + liquid_height, + tips, + blow_out_air_volume, + mix or [None] * len(use_channels), # type: ignore + ) + ] + + # queue volume tracking + for op in dispenses: + if does_volume_tracking(): + if not op.resource.tracker.is_disabled: + op.resource.tracker.add_liquid(volume=op.volume) + op.tip.tracker.remove_liquid(op.volume) + + # execute + error: Optional[Exception] = None + try: + await self.backend.dispense( + ops=dispenses, use_channels=use_channels, backend_params=backend_params + ) + except Exception as e: + error = e + + # determine per-channel success + successes = [error is None] * len(dispenses) + if error is not None and isinstance(error, ChannelizedError): + successes = [ch not in error.errors for ch in use_channels] + + # commit or rollback + for channel, op, success in zip(use_channels, dispenses, successes): + if does_volume_tracking(): + if not op.resource.tracker.is_disabled: + (op.resource.tracker.commit if success else op.resource.tracker.rollback)() + tip_volume_tracker = self.head[channel].get_tip().tracker + (tip_volume_tracker.commit if success else tip_volume_tracker.rollback)() + + if any(bav is not None for bav in blow_out_air_volume): + self._blow_out_air_volume = None + + if error is not None: + raise error + + @need_capability_ready + async def transfer( + self, + source: Well, + targets: List[Well], + source_vol: Optional[float] = None, + ratios: Optional[List[float]] = None, + target_vols: Optional[List[float]] = None, + aspiration_flow_rate: Optional[float] = None, + dispense_flow_rates: Optional[List[Optional[float]]] = None, + aspirate_backend_params: Optional[BackendParams] = None, + dispense_backend_params: Optional[BackendParams] = None, + ): + """Transfer liquid from one well to multiple targets. + + Examples: + Transfer 50 uL from A1 to B1: + + >>> await cap.transfer(plate["A1"], plate["B1"], source_vol=50) + + Transfer 80 uL equally to the first column: + + >>> await cap.transfer(plate["A1"], plate["A1:H1"], source_vol=80) + + Transfer 60 uL in a 2:1 ratio: + + >>> await cap.transfer(plate["A1"], plate["B1:C1"], source_vol=60, ratios=[2, 1]) + + Args: + source: The source well. + targets: The target wells. + source_vol: The total volume to aspirate from source. + ratios: Ratios for distributing liquid. If None, distribute equally. + target_vols: Explicit volumes per target. Mutually exclusive with source_vol/ratios. + aspiration_flow_rate: Flow rate for aspiration (ul/s). + dispense_flow_rates: Flow rates for dispense per target (ul/s). + aspirate_backend_params: Vendor-specific parameters for aspiration. + dispense_backend_params: Vendor-specific parameters for dispense. + """ + + if target_vols is not None: + if ratios is not None: + raise TypeError("Cannot specify ratios and target_vols at the same time") + if source_vol is not None: + raise TypeError("Cannot specify source_vol and target_vols at the same time") + else: + if source_vol is None: + raise TypeError("Must specify either source_vol or target_vols") + if ratios is None: + ratios = [1] * len(targets) + target_vols = [source_vol * r / sum(ratios) for r in ratios] + + await self.aspirate( + resources=[source], + vols=[sum(target_vols)], + flow_rates=[aspiration_flow_rate], + backend_params=aspirate_backend_params, + ) + dispense_flow_rates = dispense_flow_rates or [None] * len(targets) + for target, vol, dfr in zip(targets, target_vols, dispense_flow_rates): + await self.dispense( + resources=[target], + vols=[vol], + flow_rates=[dfr], + use_channels=[0], + backend_params=dispense_backend_params, + ) + + @contextlib.contextmanager + def use_channels(self, channels: List[int]) -> Generator[None, None, None]: + """Temporarily use the specified channels as default for all operations. + + Examples: + >>> with cap.use_channels([2]): + ... await cap.pick_up_tips(tip_rack["A1"]) + ... await cap.aspirate(plate["A1"], vols=[50]) + """ + self._default_use_channels = channels + try: + yield + finally: + self._default_use_channels = None + + @contextlib.asynccontextmanager + async def use_tips( + self, + tip_spots: List[TipSpot], + trash: Trash, + channels: Optional[List[int]] = None, + discard: bool = True, + pick_up_backend_params: Optional[BackendParams] = None, + drop_backend_params: Optional[BackendParams] = None, + ): + """Context manager that picks up tips on entry and discards/returns on exit. + + Examples: + >>> async with cap.use_tips(tip_rack["A1":"H1"], trash=trash): + ... await cap.aspirate(plate["A1":"H1"], vols=[50]*8) + ... await cap.dispense(plate["A1":"H1"], vols=[50]*8) + """ + if channels is None: + channels = list(range(len(tip_spots))) + if len(tip_spots) != len(channels): + raise ValueError("Number of tip spots and channels must match.") + + await self.pick_up_tips(tip_spots, use_channels=channels, backend_params=pick_up_backend_params) + try: + yield + finally: + if discard: + await self.discard_tips( + trash=trash, use_channels=channels, drop_backend_params=drop_backend_params + ) + else: + await self.return_tips(use_channels=channels, drop_backend_params=drop_backend_params) diff --git a/pylabrobot/capabilities/liquid_handling/pip_backend.py b/pylabrobot/capabilities/liquid_handling/pip_backend.py new file mode 100644 index 00000000000..373db5371ba --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/pip_backend.py @@ -0,0 +1,76 @@ +"""Abstract backend for independent-channel liquid handling.""" + +from abc import ABCMeta, abstractmethod +from typing import List, Optional + +from pylabrobot.capabilities.capability import BackendParams, CapabilityBackend +from pylabrobot.resources import Tip + +from .standard import Aspiration, Dispense, Pickup, TipDrop + + +class PIPBackend(CapabilityBackend, metaclass=ABCMeta): + """Backend for independent-channel liquid handling operations. + + Each operation takes a list of ops (one per channel being used) and a list + of channel indices specifying which physical channels to use. + """ + + @property + @abstractmethod + def num_channels(self) -> int: + """The number of independent channels available.""" + + @abstractmethod + async def pick_up_tips( + self, + ops: List[Pickup], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + """Pick up tips from the specified tip spots.""" + + @abstractmethod + async def drop_tips( + self, + ops: List[TipDrop], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + """Drop tips to the specified resources.""" + + @abstractmethod + async def aspirate( + self, + ops: List[Aspiration], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + """Aspirate liquid from the specified containers.""" + + @abstractmethod + async def dispense( + self, + ops: List[Dispense], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + """Dispense liquid to the specified containers.""" + + def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool: + """Check if the tip can be picked up by the specified channel. + + Does not consider if a tip is already mounted — just whether the tip is compatible. + Default returns True; override for hardware-specific constraints. + """ + return True + + async def request_tip_presence(self) -> List[Optional[bool]]: + """Request the tip presence status for each channel. + + Returns a list of length `num_channels` where each element is True if a tip is mounted, + False if not, or None if unknown. + + Default raises NotImplementedError; override if hardware supports tip presence detection. + """ + raise NotImplementedError() diff --git a/pylabrobot/capabilities/liquid_handling/standard.py b/pylabrobot/capabilities/liquid_handling/standard.py new file mode 100644 index 00000000000..af006699664 --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/standard.py @@ -0,0 +1,149 @@ +"""Standard types for liquid handling operations.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import TYPE_CHECKING, List, Optional, Sequence, Union + +from pylabrobot.resources import Coordinate + +if TYPE_CHECKING: + from pylabrobot.resources import Container, Tip, TipRack, TipSpot, Trash, Well + + +@dataclass(frozen=True) +class Mix: + """Mix parameters for aspiration/dispense operations.""" + + volume: float + repetitions: int + flow_rate: float + + +# --------------------------------------------------------------------------- +# Independent channel operations +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class Pickup: + """Pick up a tip from a tip spot.""" + + resource: TipSpot + offset: Coordinate + tip: Tip + + +@dataclass(frozen=True) +class TipDrop: + """Drop a tip to a tip spot or trash.""" + + resource: Union[TipSpot, Trash] + offset: Coordinate + tip: Tip + + +@dataclass(frozen=True) +class Aspiration: + """Aspirate liquid from a container using an independent channel.""" + + resource: Container + offset: Coordinate + tip: Tip + volume: float + flow_rate: Optional[float] + liquid_height: Optional[float] + blow_out_air_volume: Optional[float] + mix: Optional[Mix] + + +@dataclass(frozen=True) +class Dispense: + """Dispense liquid to a container using an independent channel.""" + + resource: Container + offset: Coordinate + tip: Tip + volume: float + flow_rate: Optional[float] + liquid_height: Optional[float] + blow_out_air_volume: Optional[float] + mix: Optional[Mix] + + +# --------------------------------------------------------------------------- +# 96-head operations +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class PickupTipRack: + """Pick up tips from a tip rack using the 96-head.""" + + resource: TipRack + offset: Coordinate + tips: Sequence[Optional[Tip]] + + +@dataclass(frozen=True) +class DropTipRack: + """Drop tips to a tip rack or trash using the 96-head.""" + + resource: Union[TipRack, Trash] + offset: Coordinate + + +@dataclass(frozen=True) +class MultiHeadAspirationPlate: + """Aspirate from wells in a plate using the 96-head.""" + + wells: List[Well] + offset: Coordinate + tips: Sequence[Optional[Tip]] + volume: float + flow_rate: Optional[float] + liquid_height: Optional[float] + blow_out_air_volume: Optional[float] + mix: Optional[Mix] + + +@dataclass(frozen=True) +class MultiHeadDispensePlate: + """Dispense to wells in a plate using the 96-head.""" + + wells: List[Well] + offset: Coordinate + tips: Sequence[Optional[Tip]] + volume: float + flow_rate: Optional[float] + liquid_height: Optional[float] + blow_out_air_volume: Optional[float] + mix: Optional[Mix] + + +@dataclass(frozen=True) +class MultiHeadAspirationContainer: + """Aspirate from a single container (trough) using the 96-head.""" + + container: Container + offset: Coordinate + tips: Sequence[Optional[Tip]] + volume: float + flow_rate: Optional[float] + liquid_height: Optional[float] + blow_out_air_volume: Optional[float] + mix: Optional[Mix] + + +@dataclass(frozen=True) +class MultiHeadDispenseContainer: + """Dispense to a single container (trough) using the 96-head.""" + + container: Container + offset: Coordinate + tips: Sequence[Optional[Tip]] + volume: float + flow_rate: Optional[float] + liquid_height: Optional[float] + blow_out_air_volume: Optional[float] + mix: Optional[Mix] diff --git a/pylabrobot/capabilities/liquid_handling/utils.py b/pylabrobot/capabilities/liquid_handling/utils.py new file mode 100644 index 00000000000..ace3173eaf0 --- /dev/null +++ b/pylabrobot/capabilities/liquid_handling/utils.py @@ -0,0 +1,77 @@ +"""Utility functions for liquid handling channel spacing.""" + +from typing import List + +from pylabrobot.resources.coordinate import Coordinate +from pylabrobot.resources.resource import Resource + +GENERIC_LH_MIN_SPACING_BETWEEN_CHANNELS = 9 +MIN_SPACING_BETWEEN_CHANNELS = GENERIC_LH_MIN_SPACING_BETWEEN_CHANNELS +# minimum spacing between the edge of the container and the center of channel +MIN_SPACING_EDGE = 1.0 + + +def _get_centers_with_margin(dim_size: float, n: int, margin: float, min_spacing: float): + """Get the centers of the channels with a minimum margin on the edges.""" + if dim_size < margin * 2 + (n - 1) * min_spacing: + raise ValueError("Resource is too small to space channels.") + if dim_size - (n - 1) * min_spacing <= min_spacing * 2: + remaining_space = dim_size - (n - 1) * min_spacing - margin * 2 + return [margin + remaining_space / 2 + i * min_spacing for i in range(n)] + return [(i + 1) * dim_size / (n + 1) for i in range(n)] + + +def get_wide_single_resource_liquid_op_offsets( + resource: Resource, + num_channels: int, + min_spacing: float = GENERIC_LH_MIN_SPACING_BETWEEN_CHANNELS, +) -> List[Coordinate]: + resource_size = resource.get_absolute_size_y() + centers = list( + reversed( + _get_centers_with_margin( + dim_size=resource_size, + n=num_channels, + margin=MIN_SPACING_EDGE, + min_spacing=min_spacing, + ) + ) + ) # reverse because channels are from back to front + + # offsets are relative to the center of the resource, but above we computed them wrt lfb + # so we need to subtract the center of the resource + # also, offsets are in absolute space, so we need to rotate the center + return [ + Coordinate( + x=0, + y=c - resource.center().rotated(resource.get_absolute_rotation()).y, + z=0, + ) + for c in centers + ] + + +def get_tight_single_resource_liquid_op_offsets( + resource: Resource, + num_channels: int, + min_spacing: float = GENERIC_LH_MIN_SPACING_BETWEEN_CHANNELS, +) -> List[Coordinate]: + channel_space = (num_channels - 1) * min_spacing + + min_y = (resource.get_absolute_size_y() - channel_space) / 2 + if min_y < MIN_SPACING_EDGE: + raise ValueError("Resource is too small to space channels.") + + centers = [min_y + i * min_spacing for i in range(num_channels)][::-1] + + # offsets are relative to the center of the resource, but above we computed them wrt lfb + # so we need to subtract the center of the resource + # also, offsets are in absolute space, so we need to rotate the center + return [ + Coordinate( + x=0, + y=c - resource.center().rotated(resource.get_absolute_rotation()).y, + z=0, + ) + for c in centers + ] diff --git a/pylabrobot/legacy/liquid_handling/liquid_handler.py b/pylabrobot/legacy/liquid_handling/liquid_handler.py index 3159a828312..baf756d5eab 100644 --- a/pylabrobot/legacy/liquid_handling/liquid_handler.py +++ b/pylabrobot/legacy/liquid_handling/liquid_handler.py @@ -8,6 +8,7 @@ import logging import unittest.mock import warnings +from dataclasses import dataclass, field from typing import ( Any, Awaitable, @@ -21,9 +22,50 @@ Set, Tuple, Union, - cast, ) +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.capabilities.liquid_handling.head96 import Head96Capability +from pylabrobot.capabilities.liquid_handling.head96_backend import ( + Head96Backend as _NewHead96Backend, +) +from pylabrobot.capabilities.liquid_handling.pip import PIP +from pylabrobot.capabilities.liquid_handling.pip_backend import ( + PIPBackend as _NewLHBackend, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + Aspiration as _NewAspiration, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + Dispense as _NewDispense, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + DropTipRack as _NewDropTipRack, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + Mix as _NewMix, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + MultiHeadAspirationContainer as _NewMHAC, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + MultiHeadAspirationPlate as _NewMHAP, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + MultiHeadDispenseContainer as _NewMHDC, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + MultiHeadDispensePlate as _NewMHDP, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + Pickup as _NewPickup, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + PickupTipRack as _NewPickupTipRack, +) +from pylabrobot.capabilities.liquid_handling.standard import ( + TipDrop as _NewTipDrop, +) from pylabrobot.legacy.liquid_handling.errors import ChannelizedError from pylabrobot.legacy.liquid_handling.strictness import ( Strictness, @@ -31,7 +73,6 @@ ) from pylabrobot.legacy.liquid_handling.utils import ( get_tight_single_resource_liquid_op_offsets, - get_wide_single_resource_liquid_op_offsets, ) from pylabrobot.legacy.machines.machine import Machine, need_setup_finished from pylabrobot.legacy.plate_reading import PlateReader @@ -53,10 +94,7 @@ TipTracker, Trash, Well, - does_tip_tracking, - does_volume_tracking, ) -from pylabrobot.resources.errors import HasTipError from pylabrobot.resources.rotation import Rotation from pylabrobot.serializer import deserialize, serialize @@ -88,10 +126,200 @@ ] +def _convert_mix(new_mix) -> Optional[Mix]: + """Convert a new-style Mix to a legacy Mix.""" + if new_mix is None: + return None + return Mix(volume=new_mix.volume, repetitions=new_mix.repetitions, flow_rate=new_mix.flow_rate) + + class BlowOutVolumeError(Exception): pass +# --------------------------------------------------------------------------- +# Legacy → new adapters +# --------------------------------------------------------------------------- + + +@dataclass +class _DictBackendParams(BackendParams): + """Wraps legacy **backend_kwargs into a BackendParams for the new capability interface.""" + + kwargs: Dict[str, Any] = field(default_factory=dict) + + +class _LHAdapter(_NewLHBackend): + """Adapts legacy LiquidHandlerBackend to new LiquidHandlerBackend.""" + + def __init__(self, legacy: LiquidHandlerBackend): + self._legacy = legacy + + @property + def num_channels(self) -> int: + return self._legacy.num_channels + + async def pick_up_tips( + self, + ops: List[_NewPickup], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + legacy_ops = [Pickup(resource=op.resource, offset=op.offset, tip=op.tip) for op in ops] + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.pick_up_tips(ops=legacy_ops, use_channels=use_channels, **kw) + + async def drop_tips( + self, + ops: List[_NewTipDrop], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + legacy_ops = [Drop(resource=op.resource, offset=op.offset, tip=op.tip) for op in ops] + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.drop_tips(ops=legacy_ops, use_channels=use_channels, **kw) + + async def aspirate( + self, + ops: List[_NewAspiration], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + legacy_ops = [ + SingleChannelAspiration( + resource=op.resource, + offset=op.offset, + tip=op.tip, + volume=op.volume, + flow_rate=op.flow_rate, + liquid_height=op.liquid_height, + blow_out_air_volume=op.blow_out_air_volume, + mix=_convert_mix(op.mix), + ) + for op in ops + ] + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.aspirate(ops=legacy_ops, use_channels=use_channels, **kw) + + async def dispense( + self, + ops: List[_NewDispense], + use_channels: List[int], + backend_params: Optional[BackendParams] = None, + ): + legacy_ops = [ + SingleChannelDispense( + resource=op.resource, + offset=op.offset, + tip=op.tip, + volume=op.volume, + flow_rate=op.flow_rate, + liquid_height=op.liquid_height, + blow_out_air_volume=op.blow_out_air_volume, + mix=_convert_mix(op.mix), + ) + for op in ops + ] + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.dispense(ops=legacy_ops, use_channels=use_channels, **kw) + + def can_pick_up_tip(self, channel_idx: int, tip: Tip) -> bool: + return self._legacy.can_pick_up_tip(channel_idx, tip) + + async def request_tip_presence(self) -> List[Optional[bool]]: + return await self._legacy.request_tip_presence() + + +class _Head96Adapter(_NewHead96Backend): + """Adapts legacy LiquidHandlerBackend to new Head96Backend.""" + + def __init__(self, legacy: LiquidHandlerBackend): + self._legacy = legacy + + async def pick_up_tips96( + self, pickup: _NewPickupTipRack, backend_params: Optional[BackendParams] = None + ): + legacy_pickup = PickupTipRack(resource=pickup.resource, offset=pickup.offset, tips=pickup.tips) + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.pick_up_tips96(pickup=legacy_pickup, **kw) + + async def drop_tips96( + self, drop: _NewDropTipRack, backend_params: Optional[BackendParams] = None + ): + legacy_drop = DropTipRack(resource=drop.resource, offset=drop.offset) + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.drop_tips96(drop=legacy_drop, **kw) + + async def aspirate96( + self, + aspiration: Union[_NewMHAP, _NewMHAC], + backend_params: Optional[BackendParams] = None, + ): + if isinstance(aspiration, _NewMHAP): + legacy_asp: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer] = ( + MultiHeadAspirationPlate( + wells=aspiration.wells, + offset=aspiration.offset, + tips=aspiration.tips, + volume=aspiration.volume, + flow_rate=aspiration.flow_rate, + liquid_height=aspiration.liquid_height, + blow_out_air_volume=aspiration.blow_out_air_volume, + mix=_convert_mix(aspiration.mix), + ) + ) + else: + legacy_asp = MultiHeadAspirationContainer( + container=aspiration.container, + offset=aspiration.offset, + tips=aspiration.tips, + volume=aspiration.volume, + flow_rate=aspiration.flow_rate, + liquid_height=aspiration.liquid_height, + blow_out_air_volume=aspiration.blow_out_air_volume, + mix=_convert_mix(aspiration.mix), + ) + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.aspirate96(aspiration=legacy_asp, **kw) + + async def dispense96( + self, + dispense: Union[_NewMHDP, _NewMHDC], + backend_params: Optional[BackendParams] = None, + ): + if isinstance(dispense, _NewMHDP): + legacy_disp: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer] = ( + MultiHeadDispensePlate( + wells=dispense.wells, + offset=dispense.offset, + tips=dispense.tips, + volume=dispense.volume, + flow_rate=dispense.flow_rate, + liquid_height=dispense.liquid_height, + blow_out_air_volume=dispense.blow_out_air_volume, + mix=_convert_mix(dispense.mix), + ) + ) + else: + legacy_disp = MultiHeadDispenseContainer( + container=dispense.container, + offset=dispense.offset, + tips=dispense.tips, + volume=dispense.volume, + flow_rate=dispense.flow_rate, + liquid_height=dispense.liquid_height, + blow_out_air_volume=dispense.blow_out_air_volume, + mix=_convert_mix(dispense.mix), + ) + kw = backend_params.kwargs if isinstance(backend_params, _DictBackendParams) else {} + await self._legacy.dispense96(dispense=legacy_disp, **kw) + + +# --------------------------------------------------------------------------- +# LiquidHandler +# --------------------------------------------------------------------------- + + class LiquidHandler(Resource, Machine): """ Front end for liquid handlers. @@ -135,7 +363,9 @@ def __init__( self.head96: Dict[int, TipTracker] = {} self._default_use_channels: Optional[List[int]] = None - self._blow_out_air_volume: Optional[List[Optional[float]]] = None + # New capability instances — created during setup() + self._lh_cap: Optional[PIP] = None + self._head96_cap: Optional[Head96Capability] = None # Default offset applied to all 96-head operations. Any offset passed to a 96-head method is # added to this value. @@ -166,13 +396,19 @@ async def setup(self, **backend_kwargs): self.backend.set_heads(head=self.head, head96=self.head96) await super().setup(**backend_kwargs) - self.head = {c: TipTracker(thing=f"Channel {c}") for c in range(self.backend.num_channels)} + # Create capabilities with adapter backends + self._lh_cap = PIP(backend=_LHAdapter(self.backend)) + await self._lh_cap._on_setup() - self.head96 = ( - {c: TipTracker(thing=f"Channel {c}") for c in range(96)} - if self.backend.head96_installed - else {} - ) + if self.backend.head96_installed: + self._head96_cap = Head96Capability( + backend=_Head96Adapter(self.backend), + ) + await self._head96_cap._on_setup() + + # Alias head trackers from capabilities for backward compat + self.head = self._lh_cap.head + self.head96 = self._head96_cap.head if self._head96_cap is not None else {} self.backend.set_heads(head=self.head, head96=self.head96 or None) @@ -439,49 +675,7 @@ async def pick_up_tips( offsets=offsets, ) - not_tip_spots = [ts for ts in tip_spots if not isinstance(ts, TipSpot)] - if len(not_tip_spots) > 0: - raise TypeError(f"Resources must be `TipSpot`s, got {not_tip_spots}") - - # fix arguments - use_channels = use_channels or self._default_use_channels or list(range(len(tip_spots))) - assert len(set(use_channels)) == len(use_channels), "Channels must be unique." - - tips = [tip_spot.get_tip() for tip_spot in tip_spots] - - if not all( - self.backend.can_pick_up_tip(channel, tip) for channel, tip in zip(use_channels, tips) - ): - cannot = [ - channel - for channel, tip in zip(use_channels, tips) - if not self.backend.can_pick_up_tip(channel, tip) - ] - raise RuntimeError(f"Cannot pick up tips on channels {cannot}.") - - # expand default arguments - offsets = offsets or [Coordinate.zero()] * len(tip_spots) - - # checks self._assert_resources_exist(tip_spots) - self._make_sure_channels_exist(use_channels) - assert len(tip_spots) == len(offsets) == len(use_channels), ( - "Number of tips and offsets and use_channels must be equal." - ) - - # create operations - pickups = [ - Pickup(resource=tip_spot, offset=offset, tip=tip) - for tip_spot, offset, tip in zip(tip_spots, offsets, tips) - ] - - # queue operations on the trackers - for channel, op in zip(use_channels, pickups): - if self.head[channel].has_tip: - raise HasTipError("Channel has tip") - if does_tip_tracking() and not op.resource.tracker.is_disabled: - op.resource.tracker.remove_tip() - self.head[channel].add_tip(op.tip, origin=op.resource, commit=False) # fix the backend kwargs extras = self._check_args( @@ -493,33 +687,13 @@ async def pick_up_tips( for extra in extras: del backend_kwargs[extra] - # actually pick up the tips - error: Optional[BaseException] = None - try: - await self.backend.pick_up_tips(ops=pickups, use_channels=use_channels, **backend_kwargs) - except BaseException as e: - error = e - - # determine which channels were successful - successes = [error is None] * len(pickups) - if error is not None: - try: - tip_presence = await self.backend.request_tip_presence() - successes = [tip_presence[ch] is True for ch in use_channels] - except Exception as tip_presence_error: - if not isinstance(tip_presence_error, NotImplementedError): - logger.warning("Failed to query tip presence after error: %s", tip_presence_error) - if isinstance(error, ChannelizedError): - successes = [channel_idx not in error.errors for channel_idx in use_channels] - - # commit or rollback the state trackers - for channel, op, success in zip(use_channels, pickups, successes): - if does_tip_tracking() and not op.resource.tracker.is_disabled: - (op.resource.tracker.commit if success else op.resource.tracker.rollback)() - (self.head[channel].commit if success else self.head[channel].rollback)() - - if error is not None: - raise error + assert self._lh_cap is not None + await self._lh_cap.pick_up_tips( + tip_spots=tip_spots, + use_channels=use_channels, + offsets=offsets, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) def get_mounted_tips(self) -> List[Optional[Tip]]: """Get the tips currently mounted on the head. @@ -588,46 +762,7 @@ async def drop_tips( allow_nonzero_volume=allow_nonzero_volume, ) - not_tip_spots = [ts for ts in tip_spots if not isinstance(ts, (TipSpot, Trash))] - if len(not_tip_spots) > 0: - raise TypeError(f"Resources must be `TipSpot`s or Trash, got {not_tip_spots}") - - # fix arguments - use_channels = use_channels or self._default_use_channels or list(range(len(tip_spots))) - assert len(set(use_channels)) == len(use_channels), "Channels must be unique." - - tips = [] - for channel in use_channels: - tip = self.head[channel].get_tip() - if tip.tracker.get_used_volume() > 0 and not allow_nonzero_volume: - raise RuntimeError(f"Cannot drop tip with volume {tip.tracker.get_used_volume()}") - tips.append(tip) - - # expand default arguments - offsets = offsets or [Coordinate.zero()] * len(tip_spots) - - # checks self._assert_resources_exist(tip_spots) - self._make_sure_channels_exist(use_channels) - assert len(tip_spots) == len(offsets) == len(use_channels) == len(tips), ( - "Number of channels and offsets and use_channels and tips must be equal." - ) - - # create operations - drops = [ - Drop(resource=tip_spot, offset=offset, tip=tip) - for tip_spot, tip, offset in zip(tip_spots, tips, offsets) - ] - - # queue operations on the trackers - for channel, op in zip(use_channels, drops): - if ( - does_tip_tracking() - and isinstance(op.resource, TipSpot) - and not op.resource.tracker.is_disabled - ): - op.resource.tracker.add_tip(op.tip, commit=False) - self.head[channel].remove_tip() # fix the backend kwargs extras = self._check_args( @@ -639,37 +774,14 @@ async def drop_tips( for extra in extras: del backend_kwargs[extra] - # actually drop the tips - error: Optional[BaseException] = None - try: - await self.backend.drop_tips(ops=drops, use_channels=use_channels, **backend_kwargs) - except BaseException as e: - error = e - - # determine which channels were successful - successes = [error is None] * len(drops) - if error is not None: - try: - tip_presence = await self.backend.request_tip_presence() - successes = [tip_presence[ch] is False for ch in use_channels] - except Exception as tip_presence_error: - if not isinstance(tip_presence_error, NotImplementedError): - logger.warning("Failed to query tip presence after error: %s", tip_presence_error) - if isinstance(error, ChannelizedError): - successes = [channel_idx not in error.errors for channel_idx in use_channels] - - # commit or rollback the state trackers - for channel, op, success in zip(use_channels, drops, successes): - if ( - does_tip_tracking() - and isinstance(op.resource, TipSpot) - and not op.resource.tracker.is_disabled - ): - (op.resource.tracker.commit if success else op.resource.tracker.rollback)() - (self.head[channel].commit if success else self.head[channel].rollback)() - - if error is not None: - raise error + assert self._lh_cap is not None + await self._lh_cap.drop_tips( + tip_spots=tip_spots, + use_channels=use_channels, + offsets=offsets, + allow_nonzero_volume=allow_nonzero_volume, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) async def return_tips( self, @@ -902,98 +1014,7 @@ async def aspirate( blow_out_air_volume=blow_out_air_volume, ) - self._check_containers(resources) - - use_channels = use_channels or self._default_use_channels or list(range(len(resources))) - assert len(set(use_channels)) == len(use_channels), "Channels must be unique." - - # expand default arguments - offsets = offsets or [Coordinate.zero()] * len(use_channels) - flow_rates = flow_rates or [None] * len(use_channels) - liquid_height = liquid_height or [None] * len(use_channels) - blow_out_air_volume = blow_out_air_volume or [None] * len(use_channels) - - # Convert everything to floats to handle exotic number types - vols = [float(v) for v in vols] - flow_rates = [float(fr) if fr is not None else None for fr in flow_rates] - liquid_height = [float(lh) if lh is not None else None for lh in liquid_height] - blow_out_air_volume = [float(bav) if bav is not None else None for bav in blow_out_air_volume] - - self._blow_out_air_volume = blow_out_air_volume - tips = [self.head[channel].get_tip() for channel in use_channels] - - # Checks - for resource in resources: - if isinstance(resource.parent, Plate) and resource.parent.has_lid(): - raise ValueError("Aspirating from a well with a lid is not supported.") - - self._make_sure_channels_exist(use_channels) - for n, p in [ - ("resources", resources), - ("vols", vols), - ("offsets", offsets), - ("flow_rates", flow_rates), - ("liquid_height", liquid_height), - ("blow_out_air_volume", blow_out_air_volume), - ]: - if len(p) != len(use_channels): - raise ValueError( - f"Length of {n} must match length of use_channels: {len(p)} != {len(use_channels)}" - ) - - # If the user specified a single resource, but multiple channels to use, we will assume they - # want to space the channels evenly across the resource. Note that offsets are relative to the - # center of the resource. - if len(set(resources)) == 1: - resource = resources[0] - resources = [resource] * len(use_channels) - if spread == "tight": - center_offsets = get_tight_single_resource_liquid_op_offsets( - resource=resource, num_channels=len(use_channels) - ) - elif spread == "wide": - center_offsets = get_wide_single_resource_liquid_op_offsets( - resource=resource, num_channels=len(use_channels) - ) - elif spread == "custom": - center_offsets = [Coordinate.zero()] * len(use_channels) - else: - raise ValueError("Invalid value for 'spread'. Must be 'tight', 'wide', or 'custom'.") - - # add user defined offsets to the computed centers - offsets = [c + o for c, o in zip(center_offsets, offsets)] - - # create operations - aspirations = [ - SingleChannelAspiration( - resource=r, - volume=v, - offset=o, - flow_rate=fr, - liquid_height=lh, - tip=t, - blow_out_air_volume=bav, - mix=m, - ) - for r, v, o, fr, lh, t, bav, m in zip( - resources, - vols, - offsets, - flow_rates, - liquid_height, - tips, - blow_out_air_volume, - mix or [None] * len(use_channels), # type: ignore - ) - ] - - # queue the operations on the resource (source) and mounted tips (destination) trackers - for op in aspirations: - if does_volume_tracking(): - if not op.resource.tracker.is_disabled: - op.resource.tracker.remove_liquid(op.volume) - op.tip.tracker.add_liquid(volume=op.volume) - + # fix the backend kwargs extras = self._check_args( self.backend.aspirate, backend_kwargs, @@ -1003,28 +1024,21 @@ async def aspirate( for extra in extras: del backend_kwargs[extra] - # actually aspirate the liquid - error: Optional[Exception] = None - try: - await self.backend.aspirate(ops=aspirations, use_channels=use_channels, **backend_kwargs) - except Exception as e: - error = e - - # determine which channels were successful - successes = [error is None] * len(aspirations) - if error is not None and isinstance(error, ChannelizedError): - successes = [channel_idx not in error.errors for channel_idx in use_channels] - - # commit or rollback the state trackers - for channel, op, success in zip(use_channels, aspirations, successes): - if does_volume_tracking(): - if not op.resource.tracker.is_disabled: - (op.resource.tracker.commit if success else op.resource.tracker.rollback)() - tip_volume_tracker = self.head[channel].get_tip().tracker - (tip_volume_tracker.commit if success else tip_volume_tracker.rollback)() - - if error is not None: - raise error + assert self._lh_cap is not None + await self._lh_cap.aspirate( + resources=resources, + vols=vols, + use_channels=use_channels, + flow_rates=flow_rates, + offsets=offsets, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + spread=spread, + mix=[_NewMix(volume=m.volume, repetitions=m.repetitions, flow_rate=m.flow_rate) for m in mix] + if mix is not None + else None, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) @need_setup_finished async def dispense( @@ -1103,108 +1117,6 @@ async def dispense( blow_out_air_volume=blow_out_air_volume, ) - # If the user specified a single resource, but multiple channels to use, we will assume they - # want to space the channels evenly across the resource. Note that offsets are relative to the - # center of the resource. - - self._check_containers(resources) - - use_channels = use_channels or self._default_use_channels or list(range(len(resources))) - assert len(set(use_channels)) == len(use_channels), "Channels must be unique." - - # expand default arguments - offsets = offsets or [Coordinate.zero()] * len(use_channels) - flow_rates = flow_rates or [None] * len(use_channels) - liquid_height = liquid_height or [None] * len(use_channels) - blow_out_air_volume = blow_out_air_volume or [None] * len(use_channels) - - # Convert everything to floats to handle exotic number types - vols = [float(v) for v in vols] - flow_rates = [float(fr) if fr is not None else None for fr in flow_rates] - liquid_height = [float(lh) if lh is not None else None for lh in liquid_height] - blow_out_air_volume = [float(bav) if bav is not None else None for bav in blow_out_air_volume] - - # If the user specified a single resource, but multiple channels to use, we will assume they - # want to space the channels evenly across the resource. Note that offsets are relative to the - # center of the resource. - if len(set(resources)) == 1: - resource = resources[0] - resources = [resource] * len(use_channels) - if spread == "tight": - center_offsets = get_tight_single_resource_liquid_op_offsets( - resource=resource, num_channels=len(use_channels) - ) - elif spread == "wide": - center_offsets = get_wide_single_resource_liquid_op_offsets( - resource=resource, num_channels=len(use_channels) - ) - elif spread == "custom": - center_offsets = [Coordinate.zero()] * len(use_channels) - else: - raise ValueError("Invalid value for 'spread'. Must be 'tight', 'wide', or 'custom'.") - - # add user defined offsets to the computed centers - offsets = [c + o for c, o in zip(center_offsets, offsets)] - - tips = [self.head[channel].get_tip() for channel in use_channels] - - # Check the blow out air volume with what was aspirated - if does_volume_tracking(): - if any(bav is not None and bav != 0.0 for bav in blow_out_air_volume): - if self._blow_out_air_volume is None: - raise BlowOutVolumeError("No blowout volume was aspirated.") - for requested_bav, done_bav in zip(blow_out_air_volume, self._blow_out_air_volume): - if requested_bav is not None and done_bav is not None and requested_bav > done_bav: - raise BlowOutVolumeError("Blowout volume is larger than aspirated volume") - - for resource in resources: - if isinstance(resource.parent, Plate) and resource.parent.has_lid(): - raise ValueError("Dispensing to plate with lid") - - for n, p in [ - ("resources", resources), - ("vols", vols), - ("offsets", offsets), - ("flow_rates", flow_rates), - ("liquid_height", liquid_height), - ("blow_out_air_volume", blow_out_air_volume), - ]: - if len(p) != len(use_channels): - raise ValueError( - f"Length of {n} must match length of use_channels: {len(p)} != {len(use_channels)}" - ) - - # create operations - dispenses = [ - SingleChannelDispense( - resource=r, - volume=v, - offset=o, - flow_rate=fr, - liquid_height=lh, - tip=t, - blow_out_air_volume=bav, - mix=m, - ) - for r, v, o, fr, lh, t, bav, m in zip( - resources, - vols, - offsets, - flow_rates, - liquid_height, - tips, - blow_out_air_volume, - mix or [None] * len(use_channels), # type: ignore - ) - ] - - # queue the operations on the resource (source) and mounted tips (destination) trackers - for op in dispenses: - if does_volume_tracking(): - if not op.resource.tracker.is_disabled: - op.resource.tracker.add_liquid(volume=op.volume) - op.tip.tracker.remove_liquid(op.volume) - # fix the backend kwargs extras = self._check_args( self.backend.dispense, @@ -1215,31 +1127,21 @@ async def dispense( for extra in extras: del backend_kwargs[extra] - # actually dispense the liquid - error: Optional[Exception] = None - try: - await self.backend.dispense(ops=dispenses, use_channels=use_channels, **backend_kwargs) - except Exception as e: - error = e - - # determine which channels were successful - successes = [error is None] * len(dispenses) - if error is not None and isinstance(error, ChannelizedError): - successes = [channel_idx not in error.errors for channel_idx in use_channels] - - # commit or rollback the state trackers - for channel, op, success in zip(use_channels, dispenses, successes): - if does_volume_tracking(): - if not op.resource.tracker.is_disabled: - (op.resource.tracker.commit if success else op.resource.tracker.rollback)() - tip_volume_tracker = self.head[channel].get_tip().tracker - (tip_volume_tracker.commit if success else tip_volume_tracker.rollback)() - - if any(bav is not None for bav in blow_out_air_volume): - self._blow_out_air_volume = None - - if error is not None: - raise error + assert self._lh_cap is not None + await self._lh_cap.dispense( + resources=resources, + vols=vols, + use_channels=use_channels, + flow_rates=flow_rates, + offsets=offsets, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + spread=spread, + mix=[_NewMix(volume=m.volume, repetitions=m.repetitions, flow_rate=m.flow_rate) for m in mix] + if mix is not None + else None, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) async def transfer( self, @@ -1354,11 +1256,15 @@ def use_channels(self, channels: List[int]): """ self._default_use_channels = channels + if self._lh_cap is not None: + self._lh_cap._default_use_channels = channels try: yield finally: self._default_use_channels = None + if self._lh_cap is not None: + self._lh_cap._default_use_channels = None @contextlib.asynccontextmanager async def use_tips( @@ -1444,46 +1350,18 @@ async def pick_up_tips96( offset=offset, ) - if not isinstance(tip_rack, TipRack): - raise TypeError(f"Resource must be a TipRack, got {tip_rack}") - if not tip_rack.num_items == 96: - raise ValueError("Tip rack must have 96 tips") - extras = self._check_args( self.backend.pick_up_tips96, backend_kwargs, default={"pickup"}, strictness=get_strictness() ) for extra in extras: del backend_kwargs[extra] - # queue operation on all tip trackers - tips: List[Optional[Tip]] = [] - for i, tip_spot in enumerate(tip_rack.get_all_items()): - if not does_tip_tracking() and self.head96[i].has_tip: - self.head96[i].remove_tip() - # only add tips where there is one present. - # it's possible only some tips are present in the tip rack. - if tip_spot.has_tip(): - self.head96[i].add_tip(tip_spot.get_tip(), origin=tip_spot, commit=False) - tips.append(tip_spot.get_tip()) - else: - tips.append(None) - if does_tip_tracking() and not tip_spot.tracker.is_disabled and tip_spot.has_tip(): - tip_spot.tracker.remove_tip() - - pickup_operation = PickupTipRack(resource=tip_rack, offset=offset, tips=tips) - try: - await self.backend.pick_up_tips96(pickup=pickup_operation, **backend_kwargs) - except Exception as error: - for i, tip_spot in enumerate(tip_rack.get_all_items()): - if does_tip_tracking() and not tip_spot.tracker.is_disabled: - tip_spot.tracker.rollback() - self.head96[i].rollback() - raise error - else: - for i, tip_spot in enumerate(tip_rack.get_all_items()): - if does_tip_tracking() and not tip_spot.tracker.is_disabled: - tip_spot.tracker.commit() - self.head96[i].commit() + assert self._head96_cap is not None + await self._head96_cap.pick_up_tips( + tip_rack=tip_rack, + offset=offset, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) async def drop_tips96( self, @@ -1522,50 +1400,19 @@ async def drop_tips96( allow_nonzero_volume=allow_nonzero_volume, ) - if not isinstance(resource, (TipRack, Trash)): - raise TypeError(f"Resource must be a TipRack or Trash, got {resource}") - if isinstance(resource, TipRack) and not resource.num_items == 96: - raise ValueError("Tip rack must have 96 tips") - extras = self._check_args( self.backend.drop_tips96, backend_kwargs, default={"drop"}, strictness=get_strictness() ) for extra in extras: del backend_kwargs[extra] - # queue operation on all tip trackers - for i in range(96): - # it's possible not every channel on this head has a tip. - if not self.head96[i].has_tip: - continue - tip = self.head96[i].get_tip() - if tip.tracker.get_used_volume() > 0 and not allow_nonzero_volume and does_volume_tracking(): - error = f"Cannot drop tip with volume {tip.tracker.get_used_volume()} on channel {i}" - raise RuntimeError(error) - if isinstance(resource, TipRack): - tip_spot = resource.get_item(i) - if does_tip_tracking() and not tip_spot.tracker.is_disabled: - tip_spot.tracker.add_tip(tip, commit=False) - self.head96[i].remove_tip() - - drop_operation = DropTipRack(resource=resource, offset=offset) - try: - await self.backend.drop_tips96(drop=drop_operation, **backend_kwargs) - except Exception as e: - for i in range(96): - if isinstance(resource, TipRack): - tip_spot = resource.get_item(i) - if does_tip_tracking() and not tip_spot.tracker.is_disabled: - tip_spot.tracker.rollback() - self.head96[i].rollback() - raise e - else: - for i in range(96): - if isinstance(resource, TipRack): - tip_spot = resource.get_item(i) - if does_tip_tracking() and not tip_spot.tracker.is_disabled: - tip_spot.tracker.commit() - self.head96[i].commit() + assert self._head96_cap is not None + await self._head96_cap.drop_tips( + resource=resource, + offset=offset, + allow_nonzero_volume=allow_nonzero_volume, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) def _get_96_head_origin_tip_rack(self) -> Optional[TipRack]: """Get the tip rack where the tips on the 96 head were picked up. If no tips were picked up, @@ -1707,111 +1554,25 @@ async def aspirate96( mix=mix, ) - if not ( - isinstance(resource, (Plate, Container)) - or (isinstance(resource, list) and all(isinstance(w, Well) for w in resource)) - ): - raise TypeError(f"Resource must be a Plate, Container, or list of Wells, got {resource}") - extras = self._check_args( self.backend.aspirate96, backend_kwargs, default={"aspiration"}, strictness=get_strictness() ) for extra in extras: del backend_kwargs[extra] - tips = [channel.get_tip() if channel.has_tip else None for channel in self.head96.values()] - aspiration: Union[MultiHeadAspirationPlate, MultiHeadAspirationContainer] - - # Convert everything to floats to handle exotic number types - volume = float(volume) - flow_rate = float(flow_rate) if flow_rate is not None else None - blow_out_air_volume = float(blow_out_air_volume) if blow_out_air_volume is not None else None - - # Convert Plate to either one Container (single well) or a list of Wells - containers: Sequence[Container] - if isinstance(resource, Plate): - if resource.has_lid(): - raise ValueError("Aspirating from plate with lid") - containers = resource.get_all_items() if resource.num_items > 1 else [resource.get_item(0)] - elif isinstance(resource, Container): - containers = [resource] - elif isinstance(resource, list) and all(isinstance(w, Well) for w in resource): - containers = resource - else: - raise TypeError( - f"Resource must be a Plate, Container, or list of Wells, got {type(resource)} " - f" for {resource}" - ) - - if len(containers) == 1: # single container - container = containers[0] - if not self._check_96_head_fits_in_container(container): - raise ValueError("Container too small to accommodate 96 head") - - for tip in tips: - if tip is None: - continue - - if not container.tracker.is_disabled and does_volume_tracking(): - container.tracker.remove_liquid(volume=volume) - tip.tracker.add_liquid(volume=volume) - - aspiration = MultiHeadAspirationContainer( - container=container, - volume=volume, - offset=offset, - flow_rate=flow_rate, - tips=tips, - liquid_height=liquid_height, - blow_out_air_volume=blow_out_air_volume, - mix=mix, - ) - else: # multiple containers - # ensure that wells are all in the same plate - plate = containers[0].parent - for well in containers: - if well.parent != plate: - raise ValueError("All wells must be in the same plate") - - if not len(containers) == 96: - raise ValueError(f"aspirate96 expects 96 containers when a list, got {len(containers)}") - - for well, tip in zip(containers, tips): - if tip is None: - continue - - if not well.tracker.is_disabled and does_volume_tracking(): - well.tracker.remove_liquid(volume=volume) - tip.tracker.add_liquid(volume=volume) - - aspiration = MultiHeadAspirationPlate( - wells=cast(List[Well], containers), - volume=volume, - offset=offset, - flow_rate=flow_rate, - tips=tips, - liquid_height=liquid_height, - blow_out_air_volume=blow_out_air_volume, - mix=mix, - ) - - try: - await self.backend.aspirate96(aspiration=aspiration, **backend_kwargs) - except Exception: - for tip in tips: - if tip is not None: - tip.tracker.rollback() - for container in containers: - if does_volume_tracking() and not container.tracker.is_disabled: - container.tracker.rollback() - raise - else: - for tip in tips: - if tip is not None: - tip.tracker.commit() - for container in containers: - if does_volume_tracking() and not container.tracker.is_disabled: - container.tracker.commit() + assert self._head96_cap is not None + await self._head96_cap.aspirate( + resource=resource, + volume=volume, + offset=offset, + flow_rate=flow_rate, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + mix=_NewMix(volume=mix.volume, repetitions=mix.repetitions, flow_rate=mix.flow_rate) + if mix is not None + else None, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) async def dispense96( self, @@ -1855,117 +1616,25 @@ async def dispense96( mix=mix, ) - if not ( - isinstance(resource, (Plate, Container)) - or (isinstance(resource, list) and all(isinstance(w, Well) for w in resource)) - ): - raise TypeError(f"Resource must be a Plate, Container, or list of Wells, got {resource}") - extras = self._check_args( self.backend.dispense96, backend_kwargs, default={"dispense"}, strictness=get_strictness() ) for extra in extras: del backend_kwargs[extra] - tips = [channel.get_tip() if channel.has_tip else None for channel in self.head96.values()] - dispense: Union[MultiHeadDispensePlate, MultiHeadDispenseContainer] - - # Convert everything to floats to handle exotic number types - volume = float(volume) - flow_rate = float(flow_rate) if flow_rate is not None else None - blow_out_air_volume = float(blow_out_air_volume) if blow_out_air_volume is not None else None - - # Convert Plate to either one Container (single well) or a list of Wells - containers: Sequence[Container] - if isinstance(resource, Plate): - if resource.has_lid(): - raise ValueError("Dispensing to plate with lid is not possible. Remove the lid first.") - containers = resource.get_all_items() if resource.num_items > 1 else [resource.get_item(0)] - elif isinstance(resource, Container): - containers = [resource] - elif isinstance(resource, list) and all(isinstance(w, Well) for w in resource): - containers = resource - else: - raise TypeError( - f"Resource must be a Plate, Container, or list of Wells, got {type(resource)} " - f"for {resource}" - ) - - # if we have enough liquid in the tip, remove it from the tip tracker for accounting. - # if we do not (for example because the plunger was up on tip pickup), and we - # do not have volume tracking enabled, we just ignore it. - for tip in tips: - if tip is None: - continue - - if does_volume_tracking(): - tip.tracker.remove_liquid(volume=volume) - elif tip.tracker.get_used_volume() < volume: - tip.tracker.remove_liquid(volume=min(tip.tracker.get_used_volume(), volume)) - - if len(containers) == 1: # single container - container = containers[0] - if not self._check_96_head_fits_in_container(container): - raise ValueError("Container too small to accommodate 96 head") - - if not container.tracker.is_disabled and does_volume_tracking(): - container.tracker.add_liquid(volume=len([t for t in tips if t is not None]) * volume) - - dispense = MultiHeadDispenseContainer( - container=container, - volume=volume, - offset=offset, - flow_rate=flow_rate, - tips=tips, - liquid_height=liquid_height, - blow_out_air_volume=blow_out_air_volume, - mix=mix, - ) - else: - # ensure that wells are all in the same plate - plate = containers[0].parent - for well in containers: - if well.parent != plate: - raise ValueError("All wells must be in the same plate") - - if not len(containers) == 96: - raise ValueError(f"dispense96 expects 96 wells, got {len(containers)}") - - for well, tip in zip(containers, tips): - if tip is None: - continue - - if not well.tracker.is_disabled and does_volume_tracking(): - well.tracker.add_liquid(volume=volume) - - dispense = MultiHeadDispensePlate( - wells=cast(List[Well], containers), - volume=volume, - offset=offset, - flow_rate=flow_rate, - tips=tips, - liquid_height=liquid_height, - blow_out_air_volume=blow_out_air_volume, - mix=mix, - ) - - try: - await self.backend.dispense96(dispense=dispense, **backend_kwargs) - except Exception: - for tip in tips: - if tip is not None: - tip.tracker.rollback() - for container in containers: - if does_volume_tracking() and not container.tracker.is_disabled: - container.tracker.rollback() - raise - else: - for tip in tips: - if tip is not None: - tip.tracker.commit() - for container in containers: - if does_volume_tracking() and not container.tracker.is_disabled: - container.tracker.commit() + assert self._head96_cap is not None + await self._head96_cap.dispense( + resource=resource, + volume=volume, + offset=offset, + flow_rate=flow_rate, + liquid_height=liquid_height, + blow_out_air_volume=blow_out_air_volume, + mix=_NewMix(volume=mix.volume, repetitions=mix.repetitions, flow_rate=mix.flow_rate) + if mix is not None + else None, + backend_params=_DictBackendParams(kwargs=backend_kwargs) if backend_kwargs else None, + ) async def stamp( self, @@ -2002,7 +1671,7 @@ async def stamp( ), "Source and target plates must be the same shape" await self.aspirate96(resource=source, volume=volume, flow_rate=aspiration_flow_rate) - await self.dispense96(resource=source, volume=volume, flow_rate=dispense_flow_rate) + await self.dispense96(resource=target, volume=volume, flow_rate=dispense_flow_rate) async def pick_up_resource( self,