diff --git a/pylabrobot/arms/__init__.py b/pylabrobot/arms/__init__.py index 60125af2e6e..87baab3c500 100644 --- a/pylabrobot/arms/__init__.py +++ b/pylabrobot/arms/__init__.py @@ -1,3 +1,6 @@ from .precise_flex import * from .scara import * +from .six_axis import * +from .six_axis_backend import * from .standard import * +from .xarm6 import * diff --git a/pylabrobot/arms/six_axis.py b/pylabrobot/arms/six_axis.py new file mode 100644 index 00000000000..4ac40431c21 --- /dev/null +++ b/pylabrobot/arms/six_axis.py @@ -0,0 +1,129 @@ +from typing import Dict, Optional, Union + +from pylabrobot.arms.backend import AccessPattern +from pylabrobot.arms.six_axis_backend import SixAxisBackend +from pylabrobot.arms.standard import CartesianCoords +from pylabrobot.machines.machine import Machine + + +class SixAxisArm(Machine): + """A 6-axis robotic arm frontend.""" + + def __init__(self, backend: SixAxisBackend): + super().__init__(backend=backend) + self.backend: SixAxisBackend = backend + self._in_freedrive = False + + async def _ensure_not_freedrive(self): + if self._in_freedrive: + await self.end_freedrive_mode() + + async def move_to( + self, + position: Union[CartesianCoords, Dict[int, float]], + **backend_kwargs, + ) -> None: + """Move the arm to a specified position in Cartesian or joint space.""" + await self._ensure_not_freedrive() + return await self.backend.move_to(position, **backend_kwargs) + + async def get_joint_position(self, **backend_kwargs) -> Dict[int, float]: + """Get the current position of the arm in joint space.""" + return await self.backend.get_joint_position(**backend_kwargs) + + async def get_cartesian_position(self, **backend_kwargs) -> CartesianCoords: + """Get the current position of the arm in Cartesian space.""" + return await self.backend.get_cartesian_position(**backend_kwargs) + + async def open_gripper(self, position: int, speed: int = 0, **backend_kwargs) -> None: + """Open the arm's gripper. + + Args: + position: Target open position (gripper-specific units). + speed: Gripper speed (0 = default/max). + """ + await self._ensure_not_freedrive() + return await self.backend.open_gripper(position=position, speed=speed, **backend_kwargs) + + async def close_gripper(self, position: int, speed: int = 0, **backend_kwargs) -> None: + """Close the arm's gripper. + + Args: + position: Target close position (gripper-specific units). + speed: Gripper speed (0 = default/max). + """ + await self._ensure_not_freedrive() + return await self.backend.close_gripper(position=position, speed=speed, **backend_kwargs) + + async def halt(self, **backend_kwargs) -> None: + """Emergency stop any ongoing movement of the arm.""" + await self._ensure_not_freedrive() + return await self.backend.halt(**backend_kwargs) + + async def home(self, **backend_kwargs) -> None: + """Home the arm to its default position.""" + await self._ensure_not_freedrive() + return await self.backend.home(**backend_kwargs) + + async def move_to_safe(self, **backend_kwargs) -> None: + """Move the arm to a predefined safe position.""" + await self._ensure_not_freedrive() + return await self.backend.move_to_safe(**backend_kwargs) + + async def approach( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + **backend_kwargs, + ) -> None: + """Move the arm to an approach position (offset from target). + + Args: + position: Target position (CartesianCoords or joint position dict) + access: Access pattern defining how to approach the target. + Defaults to VerticalAccess() if not specified. + """ + await self._ensure_not_freedrive() + return await self.backend.approach(position, access=access, **backend_kwargs) + + async def pick_up_resource( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + **backend_kwargs, + ) -> None: + """Pick a resource from the specified position. + + Args: + position: Target position for pickup + access: Access pattern defining how to approach and retract. + Defaults to VerticalAccess() if not specified. + """ + await self._ensure_not_freedrive() + return await self.backend.pick_up_resource(position=position, access=access, **backend_kwargs) + + async def drop_resource( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + **backend_kwargs, + ) -> None: + """Place a resource at the specified position. + + Args: + position: Target position for placement + access: Access pattern defining how to approach and retract. + Defaults to VerticalAccess() if not specified. + """ + await self._ensure_not_freedrive() + return await self.backend.drop_resource(position, access=access, **backend_kwargs) + + async def freedrive_mode(self, **backend_kwargs) -> None: + """Enter freedrive mode, allowing manual movement of all joints.""" + await self.backend.freedrive_mode(**backend_kwargs) + self._in_freedrive = True + + async def end_freedrive_mode(self, **backend_kwargs) -> None: + """Exit freedrive mode.""" + await self.backend.end_freedrive_mode(**backend_kwargs) + self._in_freedrive = False diff --git a/pylabrobot/arms/six_axis_backend.py b/pylabrobot/arms/six_axis_backend.py new file mode 100644 index 00000000000..22f13a2eb40 --- /dev/null +++ b/pylabrobot/arms/six_axis_backend.py @@ -0,0 +1,102 @@ +from abc import ABCMeta, abstractmethod +from typing import Dict, Optional, Union + +from pylabrobot.arms.backend import AccessPattern +from pylabrobot.arms.standard import CartesianCoords +from pylabrobot.machines.backend import MachineBackend + + +class SixAxisBackend(MachineBackend, metaclass=ABCMeta): + """Backend for a 6-axis robotic arm.""" + + @abstractmethod + async def open_gripper(self, position: int, speed: int = 0) -> None: + """Open the arm's gripper. + + Args: + position: Target open position (gripper-specific units). + speed: Gripper speed (0 = default/max). + """ + + @abstractmethod + async def close_gripper(self, position: int, speed: int = 0) -> None: + """Close the arm's gripper. + + Args: + position: Target close position (gripper-specific units). + speed: Gripper speed (0 = default/max). + """ + + @abstractmethod + async def halt(self) -> None: + """Emergency stop any ongoing movement of the arm.""" + + @abstractmethod + async def home(self) -> None: + """Home the arm to its default position.""" + + @abstractmethod + async def move_to_safe(self) -> None: + """Move the arm to a predefined safe position.""" + + @abstractmethod + async def approach( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + ) -> None: + """Move the arm to an approach position (offset from target). + + Args: + position: Target position (CartesianCoords or joint position dict) + access: Access pattern defining how to approach the target. + Defaults to VerticalAccess() if not specified. + """ + + @abstractmethod + async def pick_up_resource( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + ) -> None: + """Pick a resource from the specified position. + + Args: + position: Target position for pickup + access: Access pattern defining how to approach and retract. + Defaults to VerticalAccess() if not specified. + """ + + @abstractmethod + async def drop_resource( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + ) -> None: + """Place a resource at the specified position. + + Args: + position: Target position for placement + access: Access pattern defining how to approach and retract. + Defaults to VerticalAccess() if not specified. + """ + + @abstractmethod + async def move_to(self, position: Union[CartesianCoords, Dict[int, float]]) -> None: + """Move the arm to a specified position in Cartesian or joint space.""" + + @abstractmethod + async def get_joint_position(self) -> Dict[int, float]: + """Get the current position of the arm in joint space.""" + + @abstractmethod + async def get_cartesian_position(self) -> CartesianCoords: + """Get the current position of the arm in Cartesian space.""" + + @abstractmethod + async def freedrive_mode(self) -> None: + """Enter freedrive mode, allowing manual movement of all joints.""" + + @abstractmethod + async def end_freedrive_mode(self) -> None: + """Exit freedrive mode.""" diff --git a/pylabrobot/arms/six_axis_tests.py b/pylabrobot/arms/six_axis_tests.py new file mode 100644 index 00000000000..01481f566f2 --- /dev/null +++ b/pylabrobot/arms/six_axis_tests.py @@ -0,0 +1,106 @@ +import unittest +from unittest.mock import AsyncMock, MagicMock + +from pylabrobot.arms.six_axis import SixAxisArm +from pylabrobot.arms.six_axis_backend import SixAxisBackend +from pylabrobot.arms.standard import CartesianCoords +from pylabrobot.resources import Coordinate, Rotation + + +class TestSixAxisArm(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.mock_backend = MagicMock(spec=SixAxisBackend) + for method_name in [ + "move_to", + "get_joint_position", + "get_cartesian_position", + "open_gripper", + "close_gripper", + "halt", + "home", + "move_to_safe", + "approach", + "pick_up_resource", + "drop_resource", + "freedrive_mode", + "end_freedrive_mode", + ]: + setattr(self.mock_backend, method_name, AsyncMock()) + self.arm = SixAxisArm(backend=self.mock_backend) + + async def test_move_to(self): + position = CartesianCoords( + location=Coordinate(x=100, y=200, z=300), rotation=Rotation(x=0, y=0, z=0) + ) + await self.arm.move_to(position) + self.mock_backend.move_to.assert_called_once_with(position) + + async def test_get_joint_position(self): + await self.arm.get_joint_position() + self.mock_backend.get_joint_position.assert_called_once() + + async def test_get_cartesian_position(self): + await self.arm.get_cartesian_position() + self.mock_backend.get_cartesian_position.assert_called_once() + + async def test_open_gripper(self): + await self.arm.open_gripper(position=850, speed=5) + self.mock_backend.open_gripper.assert_called_once_with(position=850, speed=5) + + async def test_close_gripper(self): + await self.arm.close_gripper(position=100, speed=5) + self.mock_backend.close_gripper.assert_called_once_with(position=100, speed=5) + + async def test_halt(self): + await self.arm.halt() + self.mock_backend.halt.assert_called_once() + + async def test_home(self): + await self.arm.home() + self.mock_backend.home.assert_called_once() + + async def test_move_to_safe(self): + await self.arm.move_to_safe() + self.mock_backend.move_to_safe.assert_called_once() + + async def test_approach(self): + position = CartesianCoords( + location=Coordinate(x=100, y=200, z=300), rotation=Rotation(x=0, y=0, z=0) + ) + await self.arm.approach(position) + self.mock_backend.approach.assert_called_once_with(position, access=None) + + async def test_pick_up_resource(self): + position = CartesianCoords( + location=Coordinate(x=100, y=200, z=300), rotation=Rotation(x=0, y=0, z=0) + ) + await self.arm.pick_up_resource(position) + self.mock_backend.pick_up_resource.assert_called_once_with(position=position, access=None) + + async def test_drop_resource(self): + position = CartesianCoords( + location=Coordinate(x=100, y=200, z=300), rotation=Rotation(x=0, y=0, z=0) + ) + await self.arm.drop_resource(position) + self.mock_backend.drop_resource.assert_called_once_with(position, access=None) + + async def test_freedrive_mode(self): + await self.arm.freedrive_mode() + self.mock_backend.freedrive_mode.assert_called_once() + self.assertTrue(self.arm._in_freedrive) + + async def test_end_freedrive_mode(self): + self.arm._in_freedrive = True + await self.arm.end_freedrive_mode() + self.mock_backend.end_freedrive_mode.assert_called_once() + self.assertFalse(self.arm._in_freedrive) + + async def test_auto_exit_freedrive_on_move(self): + self.arm._in_freedrive = True + position = CartesianCoords( + location=Coordinate(x=100, y=200, z=300), rotation=Rotation(x=0, y=0, z=0) + ) + await self.arm.move_to(position) + self.mock_backend.end_freedrive_mode.assert_called_once() + self.assertFalse(self.arm._in_freedrive) + self.mock_backend.move_to.assert_called_once_with(position) diff --git a/pylabrobot/arms/xarm6/__init__.py b/pylabrobot/arms/xarm6/__init__.py new file mode 100644 index 00000000000..0a7d6b652ac --- /dev/null +++ b/pylabrobot/arms/xarm6/__init__.py @@ -0,0 +1,2 @@ +from .joints import * +from .xarm6_backend import * diff --git a/pylabrobot/arms/xarm6/example_pick_place.py b/pylabrobot/arms/xarm6/example_pick_place.py new file mode 100644 index 00000000000..c3421e7cb21 --- /dev/null +++ b/pylabrobot/arms/xarm6/example_pick_place.py @@ -0,0 +1,247 @@ +"""xArm 6 Pick & Place Walkthrough + +Interactive script that steps through: +1. Connecting and initializing the xArm 6 +2. Homing the robot +3. Teaching pick, place, and safe positions via freedrive mode +4. Saving/loading taught positions to a local JSON file +5. Running a pick-and-place cycle with vertical access + +On first run, you teach positions by hand. On subsequent runs, you can +reuse saved positions or re-teach them. + +Usage: + python -m pylabrobot.arms.xarm6.example_pick_place + +Before running: + - Install the xArm SDK: pip install xarm-python-sdk + - Update ROBOT_IP below to match your xArm's IP address + - Adjust TCP_OFFSET for your bio-gripper mount + - Ensure the workspace is clear and safe +""" + +from __future__ import annotations + +import asyncio +import json +import os + +from pylabrobot.arms.backend import VerticalAccess +from pylabrobot.arms.six_axis import SixAxisArm +from pylabrobot.arms.standard import CartesianCoords +from pylabrobot.arms.xarm6.xarm6_backend import XArm6Backend +from pylabrobot.resources import Coordinate, Rotation + +# ── Configuration ────────────────────────────────────────────── +ROBOT_IP = "192.168.1.220" # Change to your xArm's IP +TCP_OFFSET = (0, 0, 0, 0, 0, 0) # Adjust for bio-gripper mount (x, y, z, roll, pitch, yaw) +POSITIONS_FILE = os.path.join(os.path.dirname(__file__), "taught_positions.json") + + +def coords_to_dict(coords: CartesianCoords) -> dict: + """Serialize a CartesianCoords to a JSON-safe dict.""" + return { + "x": coords.location.x, + "y": coords.location.y, + "z": coords.location.z, + "roll": coords.rotation.x, + "pitch": coords.rotation.y, + "yaw": coords.rotation.z, + } + + +def dict_to_coords(d: dict) -> CartesianCoords: + """Deserialize a dict back to CartesianCoords.""" + return CartesianCoords( + location=Coordinate(x=d["x"], y=d["y"], z=d["z"]), + rotation=Rotation(x=d["roll"], y=d["pitch"], z=d["yaw"]), + ) + + +def save_positions(positions: dict[str, CartesianCoords]) -> None: + """Save taught positions to a JSON file.""" + data = {name: coords_to_dict(coords) for name, coords in positions.items()} + with open(POSITIONS_FILE, "w") as f: + json.dump(data, f, indent=2) + print(f" Positions saved to {POSITIONS_FILE}") + + +def load_positions() -> dict[str, CartesianCoords] | None: + """Load taught positions from JSON file. Returns None if file doesn't exist.""" + if not os.path.exists(POSITIONS_FILE): + return None + with open(POSITIONS_FILE, "r") as f: + data = json.load(f) + return {name: dict_to_coords(d) for name, d in data.items()} + + +def print_position(name: str, coords: CartesianCoords) -> None: + """Print a position nicely.""" + loc = coords.location + rot = coords.rotation + print(f" {name}:") + print(f" Location: x={loc.x:.1f}, y={loc.y:.1f}, z={loc.z:.1f}") + print(f" Rotation: roll={rot.x:.1f}, pitch={rot.y:.1f}, yaw={rot.z:.1f}") + + +async def teach_positions(arm: SixAxisArm) -> dict[str, CartesianCoords]: + """Enter freedrive mode and teach pick, place, and safe positions.""" + positions = {} + + print("\n=== Teach Positions (Freedrive) ===") + print("The robot will enter freedrive mode. Gently guide it by hand.") + input("Press Enter to enable freedrive mode...") + await arm.freedrive_mode() + print("Freedrive enabled.\n") + + # Teach pick position + input("Move the robot to the PICK position, then press Enter...") + pos = await arm.get_cartesian_position() + positions["pick"] = pos + print_position("Pick position saved", pos) + print() + + # Teach place position + input("Move the robot to the PLACE position, then press Enter...") + pos = await arm.get_cartesian_position() + positions["place"] = pos + print_position("Place position saved", pos) + print() + + # Teach safe position + input("Move the robot to the SAFE position, then press Enter...") + pos = await arm.get_cartesian_position() + positions["safe"] = pos + print_position("Safe position saved", pos) + print() + + await arm.end_freedrive_mode() + print("Freedrive disabled.") + + save_positions(positions) + return positions + + +async def main(): + print("=" * 60) + print(" xArm 6 Pick & Place Walkthrough") + print("=" * 60) + print() + print(f"Robot IP: {ROBOT_IP}") + print(f"TCP Offset: {TCP_OFFSET}") + print(f"Positions file: {POSITIONS_FILE}") + print() + + # ── Check for saved positions ───────────────────────────── + saved = load_positions() + need_teach = True + + if saved is not None: + print("Found saved positions:") + for name, coords in saved.items(): + print_position(name, coords) + print() + choice = input("Use saved positions? (y = use saved, n = re-teach): ").strip().lower() + if choice == "y": + need_teach = False + positions = saved + print() + + # ── Connect ─────────────────────────────────────────────── + backend = XArm6Backend( + ip=ROBOT_IP, + default_speed=100, + default_mvacc=2000, + tcp_offset=TCP_OFFSET, + ) + arm = SixAxisArm(backend=backend) + + print("=== Connect & Initialize ===") + input("Press Enter to connect to the xArm 6...") + await arm.setup() + print("Connected and initialized.\n") + + try: + # ── Home ────────────────────────────────────────────────── + print("=== Home Robot ===") + print("The robot will move to its zero/home position.") + input("Press Enter to home the robot (it WILL move)...") + await arm.home() + pos = await arm.get_cartesian_position() + print(f" Home position: {pos.location}\n") + + # ── Teach or load positions ─────────────────────────────── + if need_teach: + positions = await teach_positions(arm) + + # Set the safe position on the backend + backend._safe_position = positions["safe"] + + # ── Pick & Place cycle ──────────────────────────────────── + print("\n=== Pick & Place (Vertical Access) ===") + access = VerticalAccess(approach_height_mm=80, clearance_mm=80, gripper_offset_mm=10) + print( + f" Access: approach={access.approach_height_mm}mm, " + f"clearance={access.clearance_mm}mm, " + f"offset={access.gripper_offset_mm}mm" + ) + + pick_pos = positions["pick"] + place_pos = positions["place"] + print_position("Pick from", pick_pos) + print_position("Place at", place_pos) + print() + + # Move to safe + input("Press Enter to move to safe position...") + await arm.move_to_safe() + print(" At safe position.") + + # Pick + input("\nPress Enter to start PICK sequence...") + print(" Approaching pick position...") + await arm.approach(pick_pos, access=access) + print(" Picking up resource...") + await arm.pick_up_resource(pick_pos, access=access) + print(" Pick complete!") + + # Safe transit + input("\nPress Enter to move to safe position (carrying resource)...") + await arm.move_to_safe() + print(" At safe position.") + + # Place + input("\nPress Enter to start PLACE sequence...") + print(" Approaching place position...") + await arm.approach(place_pos, access=access) + print(" Placing resource...") + await arm.drop_resource(place_pos, access=access) + print(" Place complete!") + + # ── Current state readout ───────────────────────────────── + print("\n=== Current State ===") + joints = await arm.get_joint_position() + cartesian = await arm.get_cartesian_position() + print(" Joint angles (degrees):") + for j, angle in joints.items(): + print(f" J{j}: {angle:.2f}") + print(f" Cartesian: {cartesian.location}") + print( + f" Rotation: roll={cartesian.rotation.x:.1f}, " + f"pitch={cartesian.rotation.y:.1f}, " + f"yaw={cartesian.rotation.z:.1f}" + ) + + # ── Cleanup ─────────────────────────────────────────────── + print("\n=== Cleanup ===") + input("Press Enter to return to safe position and disconnect...") + await arm.move_to_safe() + print(" At safe position.") + + finally: + await arm.stop() + print(" Disconnected. Done!") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/pylabrobot/arms/xarm6/joints.py b/pylabrobot/arms/xarm6/joints.py new file mode 100644 index 00000000000..e93ae39ebbd --- /dev/null +++ b/pylabrobot/arms/xarm6/joints.py @@ -0,0 +1,12 @@ +from enum import IntEnum + + +class XArm6Axis(IntEnum): + """Joint indices for the xArm 6 robot (1-indexed, matching SDK servo_id).""" + + J1 = 1 # Base rotation + J2 = 2 # Shoulder + J3 = 3 # Elbow + J4 = 4 # Wrist roll + J5 = 5 # Wrist pitch + J6 = 6 # Wrist yaw (end effector rotation) diff --git a/pylabrobot/arms/xarm6/taught_positions.json b/pylabrobot/arms/xarm6/taught_positions.json new file mode 100644 index 00000000000..975d9a31cc7 --- /dev/null +++ b/pylabrobot/arms/xarm6/taught_positions.json @@ -0,0 +1,26 @@ +{ + "pick": { + "x": 447.1538, + "y": 113.3208, + "z": 374.5813, + "roll": 179.058249, + "pitch": 1.758751, + "yaw": -0.941198 + }, + "place": { + "x": 235.1209, + "y": 40.6372, + "z": 63.2244, + "roll": 179.696639, + "pitch": 1.776226, + "yaw": -4.400144 + }, + "safe": { + "x": 281.4726, + "y": 66.6849, + "z": 334.2007, + "roll": 179.674637, + "pitch": -0.960048, + "yaw": -1.516963 + } +} \ No newline at end of file diff --git a/pylabrobot/arms/xarm6/xarm6_backend.py b/pylabrobot/arms/xarm6/xarm6_backend.py new file mode 100644 index 00000000000..9ab33af3b6b --- /dev/null +++ b/pylabrobot/arms/xarm6/xarm6_backend.py @@ -0,0 +1,377 @@ +import asyncio +from typing import Any, Dict, Optional, Tuple, Union + +from pylabrobot.arms.backend import AccessPattern, HorizontalAccess, VerticalAccess +from pylabrobot.arms.six_axis_backend import SixAxisBackend +from pylabrobot.arms.standard import CartesianCoords +from pylabrobot.resources import Coordinate, Rotation + + +class XArm6Error(Exception): + """Error raised when the xArm SDK returns a non-zero code.""" + + def __init__(self, code: int, message: str): + self.code = code + super().__init__(f"XArm6Error {code}: {message}") + + +class XArm6Backend(SixAxisBackend): + """Backend for the UFACTORY xArm 6 robotic arm with bio-gripper. + + Uses the xArm Python SDK (xarm-python-sdk) to communicate with the robot + over a network connection. + + Args: + ip: IP address of the xArm controller. + default_speed: Default Cartesian move speed in mm/s. + default_mvacc: Default Cartesian move acceleration in mm/s^2. + default_joint_speed: Default joint move speed in deg/s. + default_joint_mvacc: Default joint move acceleration in deg/s^2. + safe_position: Optional predefined safe position for move_to_safe(). + tcp_offset: Optional TCP offset (x, y, z, roll, pitch, yaw) for the end effector. + tcp_load: Optional payload config as (mass_kg, [cx, cy, cz]). + gripper_open_pos: Default gripper open position for pick/place sequences. + gripper_close_pos: Default gripper close position for pick/place sequences. + """ + + def __init__( + self, + ip: str, + default_speed: float = 100.0, + default_mvacc: float = 2000.0, + default_joint_speed: float = 50.0, + default_joint_mvacc: float = 500.0, + safe_position: Optional[CartesianCoords] = None, + tcp_offset: Optional[Tuple[float, float, float, float, float, float]] = None, + tcp_load: Optional[Tuple[float, list]] = None, + gripper_open_pos: int = 850, + gripper_close_pos: int = 0, + ): + super().__init__() + self._ip = ip + self._arm: Any = None + self._default_speed = default_speed + self._default_mvacc = default_mvacc + self._default_joint_speed = default_joint_speed + self._default_joint_mvacc = default_joint_mvacc + self._safe_position = safe_position + self._tcp_offset = tcp_offset + self._tcp_load = tcp_load + self._gripper_open_pos = gripper_open_pos + self._gripper_close_pos = gripper_close_pos + + # -- Speed/acceleration get/set -- + + @property + def speed(self) -> float: + """Current default Cartesian move speed in mm/s.""" + return self._default_speed + + @speed.setter + def speed(self, value: float) -> None: + self._default_speed = value + + @property + def mvacc(self) -> float: + """Current default Cartesian move acceleration in mm/s^2.""" + return self._default_mvacc + + @mvacc.setter + def mvacc(self, value: float) -> None: + self._default_mvacc = value + + @property + def joint_speed(self) -> float: + """Current default joint move speed in deg/s.""" + return self._default_joint_speed + + @joint_speed.setter + def joint_speed(self, value: float) -> None: + self._default_joint_speed = value + + @property + def joint_mvacc(self) -> float: + """Current default joint move acceleration in deg/s^2.""" + return self._default_joint_mvacc + + @joint_mvacc.setter + def joint_mvacc(self, value: float) -> None: + self._default_joint_mvacc = value + + async def _call_sdk(self, func, *args, **kwargs): + """Run a synchronous xArm SDK call in a thread executor to avoid blocking.""" + return await asyncio.to_thread(func, *args, **kwargs) + + def _check_result(self, code, operation: str = ""): + """Raise XArm6Error if the SDK return code indicates failure.""" + if code != 0: + raise XArm6Error(code, f"Failed during {operation}" if operation else "SDK call failed") + + async def clear_errors(self) -> None: + """Clear errors/warnings and re-enable the robot for motion. + + This runs the full recovery sequence: clean errors, clean warnings, + re-enable motion, set position control mode, and set ready state. + Call this when the robot enters an error/protection state (e.g. code 9). + """ + await self._call_sdk(self._arm.clean_error) + await self._call_sdk(self._arm.clean_warn) + await self._call_sdk(self._arm.motion_enable, True) + await self._call_sdk(self._arm.set_mode, 0) + await self._call_sdk(self._arm.set_state, 0) + + async def setup(self): + """Connect to the xArm and initialize for position control.""" + from xarm.wrapper import XArmAPI # type: ignore[import-not-found] + + self._arm = XArmAPI(self._ip) + await self.clear_errors() + + if self._tcp_offset is not None: + await self._call_sdk(self._arm.set_tcp_offset, list(self._tcp_offset)) + if self._tcp_load is not None: + await self._call_sdk(self._arm.set_tcp_load, self._tcp_load[0], self._tcp_load[1]) + + await self._call_sdk(self._arm.set_gripper_mode, 0) + await self._call_sdk(self._arm.set_gripper_enable, True) + + async def stop(self): + """Disconnect from the xArm.""" + if self._arm is not None: + await self._call_sdk(self._arm.disconnect) + self._arm = None + + async def move_to(self, position: Union[CartesianCoords, Dict[int, float]]) -> None: + """Move the arm to a Cartesian position or joint angles. + + Args: + position: Either a CartesianCoords for Cartesian moves, or a Dict mapping + joint index (1-6) to angle in degrees for joint moves. + """ + if isinstance(position, CartesianCoords): + code = await self._call_sdk( + self._arm.set_position, + x=position.location.x, + y=position.location.y, + z=position.location.z, + roll=position.rotation.x, + pitch=position.rotation.y, + yaw=position.rotation.z, + speed=self._default_speed, + mvacc=self._default_mvacc, + wait=True, + ) + self._check_result(code, "set_position") + elif isinstance(position, dict): + current_code, current_angles = await self._call_sdk(self._arm.get_servo_angle) + self._check_result(current_code, "get_servo_angle") + angles = list(current_angles) + for axis, value in position.items(): + angles[int(axis) - 1] = value + code = await self._call_sdk( + self._arm.set_servo_angle, + angle=angles, + speed=self._default_joint_speed, + mvacc=self._default_joint_mvacc, + wait=True, + ) + self._check_result(code, "set_servo_angle") + else: + raise TypeError(f"Position must be CartesianCoords or Dict[int, float], got {type(position)}") + + async def home(self) -> None: + """Move the arm to its home (zero) position. + + If the robot is in an error state, this will attempt to clear errors + and retry once before raising. + """ + code = await self._call_sdk(self._arm.move_gohome, speed=50, mvacc=5000, wait=True) + if code != 0: + await self.clear_errors() + code = await self._call_sdk(self._arm.move_gohome, speed=50, mvacc=5000, wait=True) + self._check_result(code, "move_gohome") + + async def halt(self) -> None: + """Emergency stop all motion.""" + await self._call_sdk(self._arm.emergency_stop) + + async def move_to_safe(self) -> None: + """Move to the predefined safe position, or home if none is set.""" + if self._safe_position is not None: + await self.move_to(self._safe_position) + else: + await self.home() + + async def get_joint_position(self) -> Dict[int, float]: + """Get current joint angles as {1: j1_deg, 2: j2_deg, ...}.""" + code, angles = await self._call_sdk(self._arm.get_servo_angle) + self._check_result(code, "get_servo_angle") + return {i + 1: angles[i] for i in range(6)} + + async def get_cartesian_position(self) -> CartesianCoords: + """Get the current Cartesian position and orientation.""" + code, pose = await self._call_sdk(self._arm.get_position) + self._check_result(code, "get_position") + return CartesianCoords( + location=Coordinate(x=pose[0], y=pose[1], z=pose[2]), + rotation=Rotation(x=pose[3], y=pose[4], z=pose[5]), + ) + + async def open_gripper(self, position: int, speed: int = 0) -> None: + """Open the gripper to a target position. + + Args: + position: Target open position (gripper-specific units, e.g. 0-850 for xArm gripper). + speed: Gripper speed (0 = default/max). + """ + code = await self._call_sdk(self._arm.set_gripper_position, position, wait=True, speed=speed) + self._check_result(code, "set_gripper_position (open)") + + async def close_gripper(self, position: int, speed: int = 0) -> None: + """Close the gripper to a target position. + + Args: + position: Target close position (gripper-specific units, e.g. 0-850 for xArm gripper). + speed: Gripper speed (0 = default/max). + """ + code = await self._call_sdk(self._arm.set_gripper_position, position, wait=True, speed=speed) + self._check_result(code, "set_gripper_position (close)") + + async def freedrive_mode(self) -> None: + """Enter freedrive (manual teaching) mode.""" + await self._call_sdk(self._arm.set_mode, 2) + await self._call_sdk(self._arm.set_state, 0) + + async def end_freedrive_mode(self) -> None: + """Exit freedrive mode and return to position control.""" + await self._call_sdk(self._arm.set_mode, 0) + await self._call_sdk(self._arm.set_state, 0) + + # -- Access pattern helpers -- + + def _offset_position( + self, + position: CartesianCoords, + dx: float = 0, + dy: float = 0, + dz: float = 0, + ) -> CartesianCoords: + """Create a new CartesianCoords offset from the given position.""" + return CartesianCoords( + location=Coordinate( + x=position.location.x + dx, + y=position.location.y + dy, + z=position.location.z + dz, + ), + rotation=position.rotation, + ) + + def _require_cartesian(self, position, method_name: str) -> CartesianCoords: + if not isinstance(position, CartesianCoords): + raise TypeError(f"{method_name} only supports CartesianCoords for xArm6.") + return position + + # -- Approach -- + + async def approach( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + ) -> None: + """Move to an approach position offset from the target.""" + pos = self._require_cartesian(position, "approach") + if access is None: + access = VerticalAccess() + + if isinstance(access, VerticalAccess): + await self.move_to(self._offset_position(pos, dz=access.approach_height_mm)) + elif isinstance(access, HorizontalAccess): + await self.move_to(self._offset_position(pos, dy=-access.approach_distance_mm)) + else: + raise TypeError(f"Unsupported access pattern: {type(access)}") + + # -- Pick (vertical) -- + + async def _pick_vertical(self, position: CartesianCoords, access: VerticalAccess) -> None: + await self.open_gripper(self._gripper_open_pos) + await self.move_to(self._offset_position(position, dz=access.approach_height_mm)) + await self.move_to(position) + await self.close_gripper(self._gripper_close_pos) + await self.move_to(self._offset_position(position, dz=access.clearance_mm)) + + # -- Place (vertical) -- + + async def _place_vertical(self, position: CartesianCoords, access: VerticalAccess) -> None: + place_z_offset = access.gripper_offset_mm + await self.move_to( + self._offset_position(position, dz=place_z_offset + access.approach_height_mm) + ) + await self.move_to(self._offset_position(position, dz=place_z_offset)) + await self.open_gripper(self._gripper_open_pos) + await self.move_to(self._offset_position(position, dz=place_z_offset + access.clearance_mm)) + + # -- Pick (horizontal) -- + + async def _pick_horizontal(self, position: CartesianCoords, access: HorizontalAccess) -> None: + await self.open_gripper(self._gripper_open_pos) + await self.move_to(self._offset_position(position, dy=-access.approach_distance_mm)) + await self.move_to(position) + await self.close_gripper(self._gripper_close_pos) + retract = self._offset_position(position, dy=-access.clearance_mm) + await self.move_to(retract) + await self.move_to(self._offset_position(retract, dz=access.lift_height_mm)) + + # -- Place (horizontal) -- + + async def _place_horizontal(self, position: CartesianCoords, access: HorizontalAccess) -> None: + place_z_offset = access.gripper_offset_mm + above = self._offset_position( + position, dy=-access.clearance_mm, dz=access.lift_height_mm + place_z_offset + ) + await self.move_to(above) + approach = self._offset_position(position, dy=-access.clearance_mm, dz=place_z_offset) + await self.move_to(approach) + await self.move_to(self._offset_position(position, dz=place_z_offset)) + await self.open_gripper(self._gripper_open_pos) + await self.move_to(self._offset_position(position, dy=-access.clearance_mm, dz=place_z_offset)) + await self.move_to( + self._offset_position( + position, dy=-access.clearance_mm, dz=access.lift_height_mm + place_z_offset + ) + ) + + # -- Public pick/place -- + + async def pick_up_resource( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + ) -> None: + """Pick a resource using the appropriate access pattern.""" + pos = self._require_cartesian(position, "pick_up_resource") + if access is None: + access = VerticalAccess() + + if isinstance(access, VerticalAccess): + await self._pick_vertical(pos, access) + elif isinstance(access, HorizontalAccess): + await self._pick_horizontal(pos, access) + else: + raise TypeError(f"Unsupported access pattern: {type(access)}") + + async def drop_resource( + self, + position: Union[CartesianCoords, Dict[int, float]], + access: Optional[AccessPattern] = None, + ) -> None: + """Place a resource using the appropriate access pattern.""" + pos = self._require_cartesian(position, "drop_resource") + if access is None: + access = VerticalAccess() + + if isinstance(access, VerticalAccess): + await self._place_vertical(pos, access) + elif isinstance(access, HorizontalAccess): + await self._place_horizontal(pos, access) + else: + raise TypeError(f"Unsupported access pattern: {type(access)}") diff --git a/pylabrobot/arms/xarm6/xarm6_backend_tests.py b/pylabrobot/arms/xarm6/xarm6_backend_tests.py new file mode 100644 index 00000000000..8f34e8f9b94 --- /dev/null +++ b/pylabrobot/arms/xarm6/xarm6_backend_tests.py @@ -0,0 +1,257 @@ +import sys +import types +import unittest +from unittest.mock import MagicMock, call + +from pylabrobot.arms.backend import HorizontalAccess, VerticalAccess +from pylabrobot.arms.standard import CartesianCoords +from pylabrobot.arms.xarm6.xarm6_backend import XArm6Backend, XArm6Error +from pylabrobot.resources import Coordinate, Rotation + + +class TestXArm6Backend(unittest.IsolatedAsyncioTestCase): + async def asyncSetUp(self): + self.mock_arm = MagicMock() + self.mock_arm.clean_error.return_value = 0 + self.mock_arm.clean_warn.return_value = 0 + self.mock_arm.motion_enable.return_value = 0 + self.mock_arm.set_mode.return_value = 0 + self.mock_arm.set_state.return_value = 0 + self.mock_arm.set_position.return_value = 0 + self.mock_arm.set_servo_angle.return_value = 0 + self.mock_arm.move_gohome.return_value = 0 + self.mock_arm.get_position.return_value = [0, [100, 200, 300, 180, 0, 90]] + self.mock_arm.get_servo_angle.return_value = [0, [10, 20, 30, 40, 50, 60]] + self.mock_arm.set_gripper_position.return_value = 0 + self.mock_arm.set_gripper_mode.return_value = 0 + self.mock_arm.set_gripper_enable.return_value = 0 + self.mock_arm.emergency_stop.return_value = None + self.mock_arm.disconnect.return_value = None + + # Create fake xarm.wrapper module with a mock XArmAPI class + mock_xarm = types.ModuleType("xarm") + mock_wrapper = types.ModuleType("xarm.wrapper") + mock_wrapper.XArmAPI = MagicMock(return_value=self.mock_arm) # type: ignore[attr-defined] + mock_xarm.wrapper = mock_wrapper # type: ignore[attr-defined] + sys.modules["xarm"] = mock_xarm + sys.modules["xarm.wrapper"] = mock_wrapper + self.MockXArmAPI = mock_wrapper.XArmAPI + + self.backend = XArm6Backend(ip="192.168.1.113") + await self.backend.setup() + + async def asyncTearDown(self): + sys.modules.pop("xarm", None) + sys.modules.pop("xarm.wrapper", None) + + async def test_setup(self): + self.MockXArmAPI.assert_called_once_with("192.168.1.113") + self.mock_arm.clean_error.assert_called_once() + self.mock_arm.clean_warn.assert_called_once() + self.mock_arm.motion_enable.assert_called_once_with(True) + self.mock_arm.set_mode.assert_called_with(0) + self.mock_arm.set_state.assert_called_with(0) + + async def test_stop(self): + await self.backend.stop() + self.mock_arm.disconnect.assert_called_once() + self.assertIsNone(self.backend._arm) + + async def test_move_to_cartesian(self): + pos = CartesianCoords( + location=Coordinate(x=300, y=100, z=200), + rotation=Rotation(x=180, y=0, z=0), + ) + await self.backend.move_to(pos) + self.mock_arm.set_position.assert_called_once_with( + x=300, + y=100, + z=200, + roll=180, + pitch=0, + yaw=0, + speed=100.0, + mvacc=2000.0, + wait=True, + ) + + async def test_move_to_joints(self): + await self.backend.move_to({1: 45, 3: -90}) + self.mock_arm.get_servo_angle.assert_called() + self.mock_arm.set_servo_angle.assert_called_once_with( + angle=[45, 20, -90, 40, 50, 60], + speed=50.0, + mvacc=500.0, + wait=True, + ) + + async def test_move_to_invalid_type(self): + with self.assertRaises(TypeError): + await self.backend.move_to("invalid") # type: ignore[arg-type] + + async def test_home(self): + await self.backend.home() + self.mock_arm.move_gohome.assert_called_once_with(speed=50, mvacc=5000, wait=True) + + async def test_halt(self): + await self.backend.halt() + self.mock_arm.emergency_stop.assert_called_once() + + async def test_move_to_safe_with_position(self): + safe_pos = CartesianCoords( + location=Coordinate(x=250, y=0, z=300), + rotation=Rotation(x=180, y=0, z=0), + ) + self.backend._safe_position = safe_pos + await self.backend.move_to_safe() + self.mock_arm.set_position.assert_called_once() + + async def test_move_to_safe_no_position(self): + await self.backend.move_to_safe() + self.mock_arm.move_gohome.assert_called_once() + + async def test_get_joint_position(self): + result = await self.backend.get_joint_position() + self.assertEqual(result, {1: 10, 2: 20, 3: 30, 4: 40, 5: 50, 6: 60}) + + async def test_get_cartesian_position(self): + result = await self.backend.get_cartesian_position() + self.assertEqual(result.location.x, 100) + self.assertEqual(result.location.y, 200) + self.assertEqual(result.location.z, 300) + self.assertEqual(result.rotation.x, 180) + self.assertEqual(result.rotation.y, 0) + self.assertEqual(result.rotation.z, 90) + + async def test_open_gripper(self): + await self.backend.open_gripper(position=850) + self.mock_arm.set_gripper_position.assert_called_once_with(850, wait=True, speed=0) + + async def test_open_gripper_with_speed(self): + await self.backend.open_gripper(position=850, speed=5) + self.mock_arm.set_gripper_position.assert_called_once_with(850, wait=True, speed=5) + + async def test_close_gripper(self): + await self.backend.close_gripper(position=100) + self.mock_arm.set_gripper_position.assert_called_once_with(100, wait=True, speed=0) + + async def test_freedrive_mode(self): + await self.backend.freedrive_mode() + self.mock_arm.set_mode.assert_called_with(2) + self.mock_arm.set_state.assert_called_with(0) + + async def test_end_freedrive_mode(self): + await self.backend.end_freedrive_mode() + self.mock_arm.set_mode.assert_called_with(0) + self.mock_arm.set_state.assert_called_with(0) + + async def test_error_handling(self): + self.mock_arm.set_position.return_value = -2 + pos = CartesianCoords( + location=Coordinate(x=300, y=100, z=200), + rotation=Rotation(x=180, y=0, z=0), + ) + with self.assertRaises(XArm6Error) as ctx: + await self.backend.move_to(pos) + self.assertEqual(ctx.exception.code, -2) + + # -- Pick/Place sequence tests -- + + async def test_pick_up_resource_vertical(self): + pos = CartesianCoords( + location=Coordinate(x=300, y=100, z=50), + rotation=Rotation(x=180, y=0, z=0), + ) + access = VerticalAccess(approach_height_mm=80, clearance_mm=80, gripper_offset_mm=10) + await self.backend.pick_up_resource(pos, access=access) + + calls = self.mock_arm.set_position.call_args_list + # open_gripper, move above (z=130), descend (z=50), close_gripper, retract (z=130) + gripper_calls = self.mock_arm.set_gripper_position.call_args_list + self.assertEqual(gripper_calls[0], call(850, wait=True, speed=0)) + self.assertEqual(gripper_calls[1], call(0, wait=True, speed=0)) + self.assertEqual(len(calls), 3) + # approach: z = 50 + 80 = 130 + self.assertEqual(calls[0].kwargs["z"], 130) + # target: z = 50 + self.assertEqual(calls[1].kwargs["z"], 50) + # retract: z = 50 + 80 = 130 + self.assertEqual(calls[2].kwargs["z"], 130) + + async def test_drop_resource_vertical(self): + pos = CartesianCoords( + location=Coordinate(x=300, y=100, z=50), + rotation=Rotation(x=180, y=0, z=0), + ) + access = VerticalAccess(approach_height_mm=80, clearance_mm=80, gripper_offset_mm=10) + await self.backend.drop_resource(pos, access=access) + + calls = self.mock_arm.set_position.call_args_list + # approach (z=50+10+80=140), place (z=50+10=60), open_gripper, retract (z=140) + self.assertEqual(len(calls), 3) + self.assertEqual(calls[0].kwargs["z"], 140) + self.assertEqual(calls[1].kwargs["z"], 60) + self.assertEqual(calls[2].kwargs["z"], 140) + self.mock_arm.set_gripper_position.assert_called_once_with(850, wait=True, speed=0) + + async def test_pick_up_resource_horizontal(self): + pos = CartesianCoords( + location=Coordinate(x=300, y=100, z=50), + rotation=Rotation(x=180, y=0, z=0), + ) + access = HorizontalAccess( + approach_distance_mm=50, clearance_mm=50, lift_height_mm=100, gripper_offset_mm=10 + ) + await self.backend.pick_up_resource(pos, access=access) + + calls = self.mock_arm.set_position.call_args_list + # open, approach (y=50), target (y=100), close, retract (y=50), lift (y=50, z=150) + gripper_calls = self.mock_arm.set_gripper_position.call_args_list + self.assertEqual(gripper_calls[0], call(850, wait=True, speed=0)) + self.assertEqual(gripper_calls[1], call(0, wait=True, speed=0)) + self.assertEqual(len(calls), 4) + # approach: y = 100 - 50 = 50 + self.assertEqual(calls[0].kwargs["y"], 50) + # target: y = 100 + self.assertEqual(calls[1].kwargs["y"], 100) + # retract: y = 100 - 50 = 50 + self.assertEqual(calls[2].kwargs["y"], 50) + # lift: y = 50, z = 50 + 100 = 150 + self.assertEqual(calls[3].kwargs["y"], 50) + self.assertEqual(calls[3].kwargs["z"], 150) + + async def test_approach_vertical(self): + pos = CartesianCoords( + location=Coordinate(x=300, y=100, z=50), + rotation=Rotation(x=180, y=0, z=0), + ) + access = VerticalAccess(approach_height_mm=80) + await self.backend.approach(pos, access=access) + + self.mock_arm.set_position.assert_called_once() + call_kwargs = self.mock_arm.set_position.call_args.kwargs + self.assertEqual(call_kwargs["z"], 130) + + async def test_approach_horizontal(self): + pos = CartesianCoords( + location=Coordinate(x=300, y=100, z=50), + rotation=Rotation(x=180, y=0, z=0), + ) + access = HorizontalAccess(approach_distance_mm=50) + await self.backend.approach(pos, access=access) + + self.mock_arm.set_position.assert_called_once() + call_kwargs = self.mock_arm.set_position.call_args.kwargs + self.assertEqual(call_kwargs["y"], 50) + + async def test_pick_requires_cartesian(self): + with self.assertRaises(TypeError): + await self.backend.pick_up_resource({1: 45}) + + async def test_setup_with_tcp_offset(self): + backend = XArm6Backend( + ip="192.168.1.113", + tcp_offset=(0, 0, 50, 0, 0, 0), + ) + await backend.setup() + self.mock_arm.set_tcp_offset.assert_called_once_with([0, 0, 50, 0, 0, 0])