diff --git a/.github/workflows/graph_nav_util_test.yml b/.github/workflows/graph_nav_util_test.yml new file mode 100644 index 00000000..08636ac9 --- /dev/null +++ b/.github/workflows/graph_nav_util_test.yml @@ -0,0 +1,53 @@ +name: graph_nav_util unit tests + +# Run on every push +on: push + +jobs: + test: + runs-on: ubuntu-latest + defaults: + run: + shell: bash + + steps: + - name: Make directory + run: mkdir -p ~/spot_wrapper + + - name: Checkout repository + uses: actions/checkout@v3 + with: + path: spot_wrapper + + - name: Check files + run: ls spot_wrapper + + - name: Set up Python + uses: actions/setup-python@v2 + with: + python-version: "3.8" + + - name: Install dependencies + run: | + pip install -r spot_wrapper/requirements.txt + pip install -e spot_wrapper + + - name: Run tests + run: pytest --junit-xml=test-results.xml + + - name: Surface failing tests + if: always() + uses: pmeier/pytest-summary-gha@main + with: + # A list of JUnit XML files, directories containing the former, and wildcard patterns to process. + path: test-results.xml + + # Add a summary of the results at the top of the report + summary: true + + # Select which results should be included in the report. + # Follows the same syntax as `pytest -r` + display-options: fEX + + # Fail the workflow if no JUnit XML was found. + fail-on-empty: true diff --git a/requirements.txt b/requirements.txt index 0e57211a..2c4b4fde 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ bosdyn_core==3.2.3 protobuf==4.22.1 setuptools==45.2.0 +pytest==7.3.1 \ No newline at end of file diff --git a/spot_wrapper/graph_nav_util.py b/spot_wrapper/graph_nav_util.py deleted file mode 100644 index e35357e4..00000000 --- a/spot_wrapper/graph_nav_util.py +++ /dev/null @@ -1,130 +0,0 @@ -# Copyright (c) 2020 Boston Dynamics, Inc. All rights reserved. -# -# Downloading, reproducing, distributing or otherwise using the SDK Software -# is subject to the terms and conditions of the Boston Dynamics Software -# Development Kit License (20191101-BDSDK-SL). - -"""Graph nav utility functions""" - - -def id_to_short_code(id): - """Convert a unique id to a 2 letter short code.""" - tokens = id.split("-") - if len(tokens) > 2: - return "%c%c" % (tokens[0][0], tokens[1][0]) - return None - - -def pretty_print_waypoints( - waypoint_id, waypoint_name, short_code_to_count, localization_id, logger -): - short_code = id_to_short_code(waypoint_id) - if short_code is None or short_code_to_count[short_code] != 1: - short_code = " " # If the short code is not valid/unique, don't show it. - - logger.info( - "%s Waypoint name: %s id: %s short code: %s" - % ( - "->" if localization_id == waypoint_id else " ", - waypoint_name, - waypoint_id, - short_code, - ) - ) - - -def find_unique_waypoint_id(short_code, graph, name_to_id, logger): - """Convert either a 2 letter short code or an annotation name into the associated unique id.""" - if len(short_code) != 2: - # Not a short code, check if it is an annotation name (instead of the waypoint id). - if short_code in name_to_id: - # Short code is an waypoint's annotation name. Check if it is paired with a unique waypoint id. - if name_to_id[short_code] is not None: - # Has an associated waypoint id! - return name_to_id[short_code] - else: - logger.error( - "The waypoint name %s is used for multiple different unique waypoints. Please use" - + "the waypoint id." % (short_code) - ) - return None - # Also not an waypoint annotation name, so we will operate under the assumption that it is a - # unique waypoint id. - return short_code - - ret = short_code - for waypoint in graph.waypoints: - if short_code == id_to_short_code(waypoint.id): - if ret != short_code: - return short_code # Multiple waypoints with same short code. - ret = waypoint.id - return ret - - -def update_waypoints_and_edges(graph, localization_id, logger): - """Update and print waypoint ids and edge ids.""" - name_to_id = dict() - edges = dict() - - short_code_to_count = {} - waypoint_to_timestamp = [] - for waypoint in graph.waypoints: - # Determine the timestamp that this waypoint was created at. - timestamp = -1.0 - try: - timestamp = ( - waypoint.annotations.creation_time.seconds - + waypoint.annotations.creation_time.nanos / 1e9 - ) - except: - # Must be operating on an older graph nav map, since the creation_time is not - # available within the waypoint annotations message. - pass - waypoint_to_timestamp.append( - (waypoint.id, timestamp, waypoint.annotations.name) - ) - - # Determine how many waypoints have the same short code. - short_code = id_to_short_code(waypoint.id) - if short_code not in short_code_to_count: - short_code_to_count[short_code] = 0 - short_code_to_count[short_code] += 1 - - # Add the annotation name/id into the current dictionary. - waypoint_name = waypoint.annotations.name - if waypoint_name: - if waypoint_name in name_to_id: - # Waypoint name is used for multiple different waypoints, so set the waypoint id - # in this dictionary to None to avoid confusion between two different waypoints. - name_to_id[waypoint_name] = None - else: - # First time we have seen this waypoint annotation name. Add it into the dictionary - # with the respective waypoint unique id. - name_to_id[waypoint_name] = waypoint.id - - # Sort the set of waypoints by their creation timestamp. If the creation timestamp is unavailable, - # fallback to sorting by annotation name. - waypoint_to_timestamp = sorted(waypoint_to_timestamp, key=lambda x: (x[1], x[2])) - - # Print out the waypoints name, id, and short code in a ordered sorted by the timestamp from - # when the waypoint was created. - logger.info("%d waypoints:" % len(graph.waypoints)) - for waypoint in waypoint_to_timestamp: - pretty_print_waypoints( - waypoint[0], waypoint[2], short_code_to_count, localization_id, logger - ) - - for edge in graph.edges: - if edge.id.to_waypoint in edges: - if edge.id.from_waypoint not in edges[edge.id.to_waypoint]: - edges[edge.id.to_waypoint].append(edge.id.from_waypoint) - else: - edges[edge.id.to_waypoint] = [edge.id.from_waypoint] - logger.info( - "(Edge) from waypoint id: ", - edge.id.from_waypoint, - " and to waypoint id: ", - edge.id.to_waypoint, - ) - - return name_to_id, edges diff --git a/spot_wrapper/spot_arm.py b/spot_wrapper/spot_arm.py index c671b881..b1ba2263 100644 --- a/spot_wrapper/spot_arm.py +++ b/spot_wrapper/spot_arm.py @@ -21,7 +21,7 @@ from bosdyn.client.time_sync import TimeSyncEndpoint from bosdyn.util import seconds_to_duration -from spot_wrapper.wrapper_helpers import RobotState +from spot_wrapper.wrapper_helpers import RobotState, ClaimAndPowerDecorator class SpotArm: @@ -34,13 +34,19 @@ def __init__( manipulation_api_client: ManipulationApiClient, robot_state_client: RobotStateClient, max_command_duration: float, + claim_and_power_decorator: ClaimAndPowerDecorator, ) -> None: """ Constructor for SpotArm class. Args: robot: Robot object logger: Logger object + robot_state: Object containing the robot's state as controlled by the wrapper + robot_command_client: Command client to use to send commands to the robot + manipulation_api_client: Command client to send manipulation commands to the robot + robot_state_client: Client to retrieve state of the robot max_command_duration: Maximum duration for commands when using the manipulation command method + claim_and_power_decorator: Object to use to decorate the functions on this object """ self._robot = robot self._logger = logger @@ -49,6 +55,23 @@ def __init__( self._robot_command_client = robot_command_client self._manipulation_api_client = manipulation_api_client self._robot_state_client = robot_state_client + self._claim_and_power_decorator = claim_and_power_decorator + self._claim_and_power_decorator.decorate_functions( + self, + decorated_funcs=[ + self.ensure_arm_power_and_stand, + self.arm_stow, + self.arm_unstow, + self.arm_carry, + self.arm_joint_move, + self.force_trajectory, + self.gripper_open, + self.gripper_close, + self.gripper_angle_open, + self.hand_pose, + self.grasp_3d, + ], + ) def _manipulation_request( self, diff --git a/spot_wrapper/spot_docking.py b/spot_wrapper/spot_docking.py index 63a88f83..085152cf 100644 --- a/spot_wrapper/spot_docking.py +++ b/spot_wrapper/spot_docking.py @@ -6,7 +6,11 @@ from bosdyn.client.docking import DockingClient, blocking_dock_robot, blocking_undock from bosdyn.client.robot import Robot -from spot_wrapper.wrapper_helpers import RobotState, RobotCommandData +from spot_wrapper.wrapper_helpers import ( + RobotState, + RobotCommandData, + ClaimAndPowerDecorator, +) class SpotDocking: @@ -22,6 +26,7 @@ def __init__( command_data: RobotCommandData, docking_client: DockingClient, robot_command_client: robot_command.RobotCommandClient, + claim_and_power_decorator: ClaimAndPowerDecorator, ) -> None: self._robot = robot self._logger = logger @@ -29,6 +34,12 @@ def __init__( self._docking_client: DockingClient = docking_client self._robot_command_client = robot_command_client self._robot_state = robot_state + self._claim_and_power_decorator = claim_and_power_decorator + # Decorate the functions so that they take the lease. Dock function needs to power on because it might have + # to move the robot, the undock + self._claim_and_power_decorator.decorate_functions( + self, decorated_funcs=[self.dock, self.undock] + ) def dock(self, dock_id: int) -> typing.Tuple[bool, str]: """Dock the robot to the docking station with fiducial ID [dock_id].""" diff --git a/spot_wrapper/spot_graph_nav.py b/spot_wrapper/spot_graph_nav.py new file mode 100644 index 00000000..515234b1 --- /dev/null +++ b/spot_wrapper/spot_graph_nav.py @@ -0,0 +1,926 @@ +import logging +import math +import os +import time +import typing + +from bosdyn.api.graph_nav import graph_nav_pb2 +from bosdyn.api.graph_nav import map_pb2 +from bosdyn.api.graph_nav import map_processing_pb2 +from bosdyn.api.graph_nav import nav_pb2 +from bosdyn.client.frame_helpers import get_odom_tform_body +from bosdyn.client.graph_nav import GraphNavClient +from bosdyn.client.lease import LeaseClient, LeaseWallet, LeaseKeepAlive, Lease +from bosdyn.client.map_processing import MapProcessingServiceClient +from bosdyn.client.robot import Robot +from bosdyn.client.robot_state import RobotStateClient +from google.protobuf import wrappers_pb2 + + +class SpotGraphNav: + def __init__( + self, + robot: Robot, + logger: logging.Logger, + graph_nav_client: GraphNavClient, + map_processing_client: MapProcessingServiceClient, + robot_state_client: RobotStateClient, + lease_client: LeaseClient, + ): + self._robot = robot + self._logger = logger + self._graph_nav_client = graph_nav_client + self._map_processing_client = map_processing_client + self._robot_state_client = robot_state_client + self._lease_client = lease_client + self._lease_wallet: LeaseWallet = self._lease_client.lease_wallet + + self._init_current_graph_nav_state() + + def _get_lease(self) -> Lease: + self._lease = self._lease_wallet.get_lease() + return self._lease + + def _init_current_graph_nav_state(self): + # Store the most recent knowledge of the state of the robot based on rpc calls. + self._current_graph = None + self._current_edges = {} # maps to_waypoint to list(from_waypoint) + self._current_waypoint_snapshots = {} # maps id to waypoint snapshot + self._current_edge_snapshots = {} # maps id to edge snapshot + self._current_annotation_name_to_wp_id = {} + self._current_anchored_world_objects = ( + {} + ) # maps object id to a (wo, waypoint, fiducial) + self._current_anchors = {} # maps anchor id to anchor + + def list_graph(self) -> typing.List[str]: + """List waypoint ids of graph_nav + Args: + upload_path : Path to the root directory of the map. + """ + ids, eds = self._list_graph_waypoint_and_edge_ids() + + return [ + v + for k, v in sorted( + ids.items(), key=lambda id: int(id[0].replace("waypoint_", "")) + ) + ] + + def navigate_initial_localization( + self, + upload_path: str, + initial_localization_fiducial: bool = True, + initial_localization_waypoint: typing.Optional[str] = None, + ): + """Navigate with graphnav. + + Args: + upload_path : Path to the root directory of the map. + navigate_to : Waypont id string for where to goal + initial_localization_fiducial : Tells the initializer whether to use fiducials + initial_localization_waypoint : Waypoint id string of current robot position (optional) + """ + # Filepath for uploading a saved graph's and snapshots too. + if upload_path and upload_path[-1] == "/": + upload_filepath = upload_path[:-1] + else: + upload_filepath = upload_path + + # Boolean indicating the robot's power state. + power_state = self._robot_state_client.get_robot_state().power_state + self._started_powered_on = power_state.motor_power_state == power_state.STATE_ON + self._powered_on = self._started_powered_on + + # Claim lease, power on robot, start graphnav. + self._lease = self._get_lease() + self._lease_keepalive = LeaseKeepAlive(self._lease_client) + if upload_filepath: + self.clear_graph() + self.upload_graph_and_snapshots(upload_filepath) + else: + self._download_current_graph() + self._logger.info( + "Re-using existing graph on robot. Check that the correct graph is loaded!" + ) + if initial_localization_fiducial: + self.set_initial_localization_fiducial() + if initial_localization_waypoint: + self.set_initial_localization_waypoint([initial_localization_waypoint]) + self._list_graph_waypoint_and_edge_ids() + self._get_localization_state() + + return True, "Localization done!" + + def navigate_to_existing_waypoint(self, waypoint_id: str): + """Navigate to an existing waypoint. + Args: + waypoint_id : Waypoint id string for where to go + """ + self._get_localization_state() + resp = self._navigate_to(waypoint_id) + return resp + + def navigate_through_route(self, waypoint_ids: typing.List[str]): + """ + Args: + waypoint_ids: List[str] of waypoints to follow + """ + self._get_localization_state() + self._logger.info(f"Waypoint ids: {','.join(waypoint_ids)}") + resp = self._navigate_route(waypoint_ids) + return resp + + def download_navigation_graph(self, download_path: str) -> typing.List[str]: + """Download the navigation graph. + Args: + download_path : Path to the root directory of the map. + """ + self._download_filepath = download_path + self._download_full_graph() + return self.list_graph() + + def navigation_close_loops( + self, close_fiducial_loops: bool, close_odometry_loops: bool + ) -> typing.Tuple[bool, str]: + return self._auto_close_loops(close_fiducial_loops, close_odometry_loops) + + def optmize_anchoring(self) -> typing.Tuple[bool, str]: + return self._optimize_anchoring() + + def _write_bytes_while_download(self, filepath: str, data: bytes): + """Write data to a file. + Args: + filepath (str) : Path of file where data will be written. + data (bytes) : Bytes of data""" + directory = os.path.dirname(filepath) + os.makedirs(directory, exist_ok=True) + with open(filepath, "wb+") as f: + f.write(data) + f.close() + + def _download_graph_and_snapshots( + self, download_path: str + ) -> typing.Tuple[bool, str]: + """Download the graph and snapshots from the robot. + Args: + download_path (str): Directory where graph and snapshots are downloaded from robot. + Returns: + success (bool): success flag + message (str): message""" + graph = self._graph_nav_client.download_graph() + if graph is None: + return False, "Failed to download the graph." + graph_bytes = graph.SerializeToString() + self._write_bytes_while_download( + os.path.join(download_path, "graph"), graph_bytes + ) + # Download the waypoint and edge snapshots. + for waypoint in graph.waypoints: + try: + waypoint_snapshot = self._graph_nav_client.download_waypoint_snapshot( + waypoint.snapshot_id + ) + except Exception: + self._logger.warning( + "Failed to download waypoint snapshot: %s", waypoint.snapshot_id + ) + continue + self._write_bytes_while_download( + os.path.join(download_path, "waypoint_snapshots", waypoint.snapshot_id), + waypoint_snapshot.SerializeToString(), + ) + for edge in graph.edges: + try: + edge_snapshot = self._graph_nav_client.download_edge_snapshot( + edge.snapshot_id + ) + except Exception: + self._logger.warning( + "Failed to download edge snapshot: %s", edge.snapshot_id + ) + continue + self._write_bytes_while_download( + os.path.join(download_path, "edge_snapshots", edge.snapshot_id), + edge_snapshot.SerializeToString(), + ) + return True, "Success" + + def clear_graph(self) -> typing.Tuple[bool, str]: + """Clear the state of the map on the robot, removing all waypoints and edges in the RAM of the robot. + Returns: (bool, str) tuple indicating whether the command was successfully sent, and a message + """ + try: + self._clear_graph() + return True, "Success" + except Exception as e: + return ( + False, + f"Got an error while clearing a graph and snanshots in a robot: {e}", + ) + + def upload_graph(self, upload_path: str) -> typing.Tuple[bool, str]: + """Upload the specified graph and snapshots from local to a robot. + While this method, if there are snapshots already in the robot, they will be loaded from the robot's disk without uploading. + Graph and snapshots to be uploaded should be placed like + Directory specified with upload_path arg + | + +-- graph + | + +-- waypoint_snapshots/ + | | + | +-- waypont snapshot files + | + +-- edge_snapshots/ + | + +-- edge snapshot files + Args: + upload_path (str): Path to the directory of the map. + Returns: (bool, str) tuple indicating whether the command was successfully sent, and a message + """ + try: + self._upload_graph_and_snapshots(upload_path) + return True, "Success" + except Exception as e: + return ( + False, + f"Got an error while uploading a graph and snapshots to a robot: {e}", + ) + + def download_graph(self, download_path: str) -> typing.Tuple[bool, str]: + """Download current graph and snapshots in the robot to the specified directory. + Args: + download_path (str): Directory where graph and snapshots are downloaded from robot. + Returns: (bool, str) tuple indicating whether the command was successfully sent, and a message + """ + try: + success, message = self._download_graph_and_snapshots( + download_path=download_path + ) + return success, message + except Exception as e: + return ( + False, + f"Got an error during downloading graph and snapshots from the robot: {e}", + ) + + ## Copied from https://github.com/boston-dynamics/spot-sdk/blob/master/python/examples/graph_nav_command_line/recording_command_line.py and https://github.com/boston-dynamics/spot-sdk/blob/master/python/examples/graph_nav_command_line/graph_nav_command_line.py with minor modifications + # Copyright (c) 2020 Boston Dynamics, Inc. All rights reserved. + # + # Downloading, reproducing, distributing or otherwise using the SDK Software + # is subject to the terms and conditions of the Boston Dynamics Software + # Development Kit License (20191101-BDSDK-SL). + def _get_localization_state(self, *args): + """Get the current localization and state of the robot.""" + state = self._graph_nav_client.get_localization_state() + self._logger.info(f"Got localization: \n{str(state.localization)}") + odom_tform_body = get_odom_tform_body( + state.robot_kinematics.transforms_snapshot + ) + self._logger.info( + f"Got robot state in kinematic odometry frame: \n{str(odom_tform_body)}" + ) + + def set_initial_localization_fiducial(self, *args): + """Trigger localization when near a fiducial.""" + robot_state = self._robot_state_client.get_robot_state() + current_odom_tform_body = get_odom_tform_body( + robot_state.kinematic_state.transforms_snapshot + ).to_proto() + # Create an empty instance for initial localization since we are asking it to localize + # based on the nearest fiducial. + localization = nav_pb2.Localization() + self._graph_nav_client.set_localization( + initial_guess_localization=localization, + ko_tform_body=current_odom_tform_body, + ) + + def set_initial_localization_waypoint(self, *args): + """Trigger localization to a waypoint.""" + # Take the first argument as the localization waypoint. + if len(args) < 1: + # If no waypoint id is given as input, then return without initializing. + self._logger.error("No waypoint specified to initialize to.") + return + destination_waypoint = self._find_unique_waypoint_id( + args[0][0], + self._current_graph, + self._current_annotation_name_to_wp_id, + self._logger, + ) + if not destination_waypoint: + self._logger.error("Failed to find waypoint id.") + return + + robot_state = self._robot_state_client.get_robot_state() + current_odom_tform_body = get_odom_tform_body( + robot_state.kinematic_state.transforms_snapshot + ).to_proto() + # Create an initial localization to the specified waypoint as the identity. + localization = nav_pb2.Localization() + localization.waypoint_id = destination_waypoint + localization.waypoint_tform_body.rotation.w = 1.0 + self._graph_nav_client.set_localization( + initial_guess_localization=localization, + # It's hard to get the pose perfect, search +/-20 deg and +/-20cm (0.2m). + max_distance=0.2, + max_yaw=20.0 * math.pi / 180.0, + fiducial_init=graph_nav_pb2.SetLocalizationRequest.FIDUCIAL_INIT_NO_FIDUCIAL, + ko_tform_body=current_odom_tform_body, + ) + + def _download_current_graph(self): + graph = self._graph_nav_client.download_graph() + if graph is None: + self._logger.error("Empty graph.") + return + self._current_graph = graph + return graph + + def _download_full_graph(self, *args): + """Download the graph and snapshots from the robot.""" + graph = self._graph_nav_client.download_graph() + if graph is None: + self._logger.info("Failed to download the graph.") + return + self._write_full_graph(graph) + self._logger.info( + "Graph downloaded with {} waypoints and {} edges".format( + len(graph.waypoints), len(graph.edges) + ) + ) + # Download the waypoint and edge snapshots. + self._download_and_write_waypoint_snapshots(graph.waypoints) + self._download_and_write_edge_snapshots(graph.edges) + + def _write_full_graph(self, graph): + """Download the graph from robot to the specified, local filepath location.""" + graph_bytes = graph.SerializeToString() + self._write_bytes(self._download_filepath, "/graph", graph_bytes) + + def _download_and_write_waypoint_snapshots(self, waypoints): + """Download the waypoint snapshots from robot to the specified, local filepath location.""" + num_waypoint_snapshots_downloaded = 0 + for waypoint in waypoints: + if len(waypoint.snapshot_id) == 0: + continue + try: + waypoint_snapshot = self._graph_nav_client.download_waypoint_snapshot( + waypoint.snapshot_id + ) + except Exception: + # Failure in downloading waypoint snapshot. Continue to next snapshot. + self._logger.error( + "Failed to download waypoint snapshot: " + waypoint.snapshot_id + ) + continue + self._write_bytes( + self._download_filepath + "/waypoint_snapshots", + "/" + waypoint.snapshot_id, + waypoint_snapshot.SerializeToString(), + ) + num_waypoint_snapshots_downloaded += 1 + self._logger.info( + "Downloaded {} of the total {} waypoint snapshots.".format( + num_waypoint_snapshots_downloaded, len(waypoints) + ) + ) + + def _download_and_write_edge_snapshots(self, edges): + """Download the edge snapshots from robot to the specified, local filepath location.""" + num_edge_snapshots_downloaded = 0 + num_to_download = 0 + for edge in edges: + if len(edge.snapshot_id) == 0: + continue + num_to_download += 1 + try: + edge_snapshot = self._graph_nav_client.download_edge_snapshot( + edge.snapshot_id + ) + except Exception: + # Failure in downloading edge snapshot. Continue to next snapshot. + self._logger.error( + "Failed to download edge snapshot: " + edge.snapshot_id + ) + continue + self._write_bytes( + self._download_filepath + "/edge_snapshots", + "/" + edge.snapshot_id, + edge_snapshot.SerializeToString(), + ) + num_edge_snapshots_downloaded += 1 + self._logger.info( + "Downloaded {} of the total {} edge snapshots.".format( + num_edge_snapshots_downloaded, num_to_download + ) + ) + + def _write_bytes(self, filepath: str, filename: str, data): + """Write data to a file.""" + os.makedirs(filepath, exist_ok=True) + with open(filepath + filename, "wb+") as f: + f.write(data) + f.close() + + def _list_graph_waypoint_and_edge_ids(self, *args): + """List the waypoint ids and edge ids of the graph currently on the robot.""" + + # Download current graph + graph = self._download_current_graph() + + localization_id = ( + self._graph_nav_client.get_localization_state().localization.waypoint_id + ) + + # Update and print waypoints and edges + ( + self._current_annotation_name_to_wp_id, + self._current_edges, + ) = self._update_waypoints_and_edges(graph, localization_id, self._logger) + return self._current_annotation_name_to_wp_id, self._current_edges + + def _upload_graph_and_snapshots(self, upload_filepath: str): + """Upload the graph and snapshots to the robot.""" + self._logger.info("Loading the graph from disk into local storage...") + with open(os.path.join(upload_filepath, "graph"), "rb") as graph_file: + # Load the graph from disk. + data = graph_file.read() + self._current_graph = map_pb2.Graph() + self._current_graph.ParseFromString(data) + self._logger.info( + "Loaded graph has {} waypoints and {} edges".format( + len(self._current_graph.waypoints), len(self._current_graph.edges) + ) + ) + for waypoint in self._current_graph.waypoints: + # Load the waypoint snapshots from disk. + if len(waypoint.snapshot_id) == 0: + continue + waypoint_filepath = os.path.join( + upload_filepath, "waypoint_snapshots", waypoint.snapshot_id + ) + if not os.path.exists(waypoint_filepath): + continue + with open(waypoint_filepath, "rb") as snapshot_file: + waypoint_snapshot = map_pb2.WaypointSnapshot() + waypoint_snapshot.ParseFromString(snapshot_file.read()) + self._current_waypoint_snapshots[ + waypoint_snapshot.id + ] = waypoint_snapshot + + for fiducial in waypoint_snapshot.objects: + if not fiducial.HasField("apriltag_properties"): + continue + + str_id = str(fiducial.apriltag_properties.tag_id) + if ( + str_id in self._current_anchored_world_objects + and len(self._current_anchored_world_objects[str_id]) == 1 + ): + # Replace the placeholder tuple with a tuple of (wo, waypoint, fiducial). + anchored_wo = self._current_anchored_world_objects[str_id][0] + self._current_anchored_world_objects[str_id] = ( + anchored_wo, + waypoint, + fiducial, + ) + for edge in self._current_graph.edges: + # Load the edge snapshots from disk. + if len(edge.snapshot_id) == 0: + continue + edge_filepath = os.path.join( + upload_filepath, "edge_snapshots", edge.snapshot_id + ) + if not os.path.exists(edge_filepath): + continue + with open(edge_filepath, "rb") as snapshot_file: + edge_snapshot = map_pb2.EdgeSnapshot() + edge_snapshot.ParseFromString(snapshot_file.read()) + self._current_edge_snapshots[edge_snapshot.id] = edge_snapshot + for anchor in self._current_graph.anchoring.anchors: + self._current_anchors[anchor.id] = anchor + # Upload the graph to the robot. + self._logger.info("Uploading the graph and snapshots to the robot...") + if self._lease is None: + self._logger.error( + "Graph nav module did not have a lease to the robot. Claim it before attempting to upload the graph " + "and snapshots." + ) + return + + self._graph_nav_client.upload_graph( + lease=self._lease.lease_proto, graph=self._current_graph + ) + # Upload the snapshots to the robot. + for waypoint_snapshot in self._current_waypoint_snapshots.values(): + self._graph_nav_client.upload_waypoint_snapshot(waypoint_snapshot) + self._logger.info("Uploaded {}".format(waypoint_snapshot.id)) + for edge_snapshot in self._current_edge_snapshots.values(): + self._graph_nav_client.upload_edge_snapshot(edge_snapshot) + self._logger.info("Uploaded {}".format(edge_snapshot.id)) + + # The upload is complete! Check that the robot is localized to the graph, + # and it if is not, prompt the user to localize the robot before attempting + # any navigation commands. + localization_state = self._graph_nav_client.get_localization_state() + if not localization_state.localization.waypoint_id: + # The robot is not localized to the newly uploaded graph. + self._logger.info( + "Upload complete! The robot is currently not localized to the map; please localize the robot using a " + "fiducial before attempting a navigation command." + ) + + def _navigate_to(self, waypoint_id: str) -> typing.Tuple[bool, str]: + """Navigate to a specific waypoint.""" + self._lease = self._get_lease() + destination_waypoint = self._find_unique_waypoint_id( + waypoint_id, + self._current_graph, + self._current_annotation_name_to_wp_id, + self._logger, + ) + if not destination_waypoint: + self._logger.error( + "Failed to find the appropriate unique waypoint id for the navigation command." + ) + return ( + False, + "Failed to find the appropriate unique waypoint id for the navigation command.", + ) + + # Stop the lease keepalive and create a new sublease for graphnav. + self._lease = self._lease_wallet.advance() + sublease = self._lease.create_sublease() + self._lease_keepalive.shutdown() + + # Navigate to the destination waypoint. + is_finished = False + nav_to_cmd_id = -1 + while not is_finished: + # Issue the navigation command about twice a second such that it is easy to terminate the + # navigation command (with estop or killing the program). + nav_to_cmd_id = self._graph_nav_client.navigate_to( + destination_waypoint, 1.0, leases=[sublease.lease_proto] + ) + time.sleep(0.5) # Sleep for half a second to allow for command execution. + # Poll the robot for feedback to determine if the navigation command is complete. + is_finished = self._check_success(nav_to_cmd_id) + + self._lease = self._lease_wallet.advance() + self._lease_keepalive = LeaseKeepAlive(self._lease_client) + + status = self._graph_nav_client.navigation_feedback(nav_to_cmd_id) + if ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL + ): + return True, "Successfully completed the navigation commands!" + elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST: + return ( + False, + "Robot got lost when navigating the route, the robot will now sit down.", + ) + elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK: + return ( + False, + "Robot got stuck when navigating the route, the robot will now sit down.", + ) + elif ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED + ): + return False, "Robot is impaired." + else: + return False, "Navigation command is not complete yet." + + def _navigate_route( + self, waypoint_ids: typing.List[str] + ) -> typing.Tuple[bool, str]: + """Navigate through a specific route of waypoints. + Note that each waypoint must have an edge between them, aka be adjacent. + """ + for i in range(len(waypoint_ids)): + waypoint_ids[i] = self._find_unique_waypoint_id( + waypoint_ids[i], + self._current_graph, + self._current_annotation_name_to_wp_id, + self._logger, + ) + if not waypoint_ids[i]: + self._logger.error( + "navigate_route: Failed to find the unique waypoint id." + ) + return False, "Failed to find the unique waypoint id." + + edge_ids_list = [] + # Attempt to find edges in the current graph that match the ordered waypoint pairs. + # These are necessary to create a valid route. + for i in range(len(waypoint_ids) - 1): + start_wp = waypoint_ids[i] + end_wp = waypoint_ids[i + 1] + edge_id = self._match_edge(self._current_edges, start_wp, end_wp) + if edge_id is not None: + edge_ids_list.append(edge_id) + else: + self._logger.error( + f"Failed to find an edge between waypoints: {start_wp} and {end_wp}" + ) + return ( + False, + f"Failed to find an edge between waypoints: {start_wp} and {end_wp}", + ) + + # Stop the lease keepalive and create a new sublease for graphnav. + self._lease = self._lease_wallet.advance() + sublease = self._lease.create_sublease() + self._lease_keepalive.shutdown() + + # Navigate a specific route. + route = self._graph_nav_client.build_route(waypoint_ids, edge_ids_list) + is_finished = False + while not is_finished: + # Issue the route command about twice a second such that it is easy to terminate the + # navigation command (with estop or killing the program). + nav_route_command_id = self._graph_nav_client.navigate_route( + route, cmd_duration=1.0, leases=[sublease.lease_proto] + ) + time.sleep(0.5) # Sleep for half a second to allow for command execution. + # Poll the robot for feedback to determine if the route is complete. + is_finished = self._check_success(nav_route_command_id) + + self._lease = self._lease_wallet.advance() + self._lease_keepalive = LeaseKeepAlive(self._lease_client) + + return True, "Finished navigating route!" + + def _clear_graph(self, *args) -> bool: + """Clear the state of the map on the robot, removing all waypoints and edges in the RAM of the robot""" + self._lease = self._get_lease() + result = self._graph_nav_client.clear_graph(lease=self._lease.lease_proto) + self._init_current_graph_nav_state() + return result + + def _check_success(self, command_id: int = -1) -> bool: + """Use a navigation command id to get feedback from the robot and sit when command succeeds.""" + if command_id == -1: + # No command, so we have not status to check. + return False + status = self._graph_nav_client.navigation_feedback(command_id) + if ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL + ): + # Successfully completed the navigation commands! + return True + elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST: + self._logger.error( + "Robot got lost when navigating the route, the robot will now sit down." + ) + return True + elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK: + self._logger.error( + "Robot got stuck when navigating the route, the robot will now sit down." + ) + return True + elif ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED + ): + self._logger.error("Robot is impaired.") + return True + else: + # Navigation command is not complete yet. + return False + + def _match_edge( + self, + current_edges: typing.Dict[str, typing.List[str]], + waypoint1: str, + waypoint2: str, + ) -> typing.Optional[map_pb2.Edge.Id]: + """Find an edge in the graph that is between two waypoint ids.""" + # Return the correct edge id as soon as it's found. + for edge_to_id in current_edges: + for edge_from_id in current_edges[edge_to_id]: + if (waypoint1 == edge_to_id) and (waypoint2 == edge_from_id): + # This edge matches the pair of waypoints! Add it the edge list and continue. + return map_pb2.Edge.Id( + from_waypoint=waypoint2, to_waypoint=waypoint1 + ) + elif (waypoint2 == edge_to_id) and (waypoint1 == edge_from_id): + # This edge matches the pair of waypoints! Add it the edge list and continue. + return map_pb2.Edge.Id( + from_waypoint=waypoint1, to_waypoint=waypoint2 + ) + return None + + def _auto_close_loops( + self, close_fiducial_loops: bool, close_odometry_loops: bool, *args + ): + """Automatically find and close all loops in the graph.""" + response: map_processing_pb2.ProcessTopologyResponse = ( + self._map_processing_client.process_topology( + params=map_processing_pb2.ProcessTopologyRequest.Params( + do_fiducial_loop_closure=wrappers_pb2.BoolValue( + value=close_fiducial_loops + ), + do_odometry_loop_closure=wrappers_pb2.BoolValue( + value=close_odometry_loops + ), + ), + modify_map_on_server=True, + ) + ) + self._logger.info( + "Created {} new edge(s).".format(len(response.new_subgraph.edges)) + ) + if response.status == map_processing_pb2.ProcessTopologyResponse.STATUS_OK: + return True, "Successfully closed loops." + elif ( + response.status + == map_processing_pb2.ProcessTopologyResponse.STATUS_MISSING_WAYPOINT_SNAPSHOTS + ): + return False, "Missing waypoint snapshots." + elif ( + response.status + == map_processing_pb2.ProcessTopologyResponse.STATUS_INVALID_GRAPH + ): + return False, "Invalid graph." + elif ( + response.status + == map_processing_pb2.ProcessTopologyResponse.STATUS_MAP_MODIFIED_DURING_PROCESSING + ): + return False, "Map modified during processing." + else: + return False, "Unknown error during map processing." + + def _optimize_anchoring(self, *args): + """Call anchoring optimization on the server, producing a globally optimal reference frame for waypoints to be expressed in.""" + response: map_processing_pb2.ProcessAnchoringResponse = ( + self._map_processing_client.process_anchoring( + params=map_processing_pb2.ProcessAnchoringRequest.Params(), + modify_anchoring_on_server=True, + stream_intermediate_results=False, + ) + ) + if response.status == map_processing_pb2.ProcessAnchoringResponse.STATUS_OK: + self._logger.info( + "Optimized anchoring after {} iteration(s).".format(response.iteration) + ) + return True, "Successfully optimized anchoring." + else: + self._logger.error("Error optimizing {}".format(response)) + status_mapping = { + map_processing_pb2.ProcessAnchoringResponse.STATUS_MISSING_WAYPOINT_SNAPSHOTS: ( + "Missing waypoint snapshots." + ), + map_processing_pb2.ProcessAnchoringResponse.STATUS_INVALID_GRAPH: "Invalid graph.", + map_processing_pb2.ProcessAnchoringResponse.STATUS_OPTIMIZATION_FAILURE: "Optimization failure.", + map_processing_pb2.ProcessAnchoringResponse.STATUS_INVALID_PARAMS: "Invalid parameters.", + map_processing_pb2.ProcessAnchoringResponse.STATUS_CONSTRAINT_VIOLATION: "Constraint violation.", + map_processing_pb2.ProcessAnchoringResponse.STATUS_MAX_ITERATIONS: "Max iterations, timeout.", + map_processing_pb2.ProcessAnchoringResponse.STATUS_MAX_TIME: "Max time reached, timeout.", + map_processing_pb2.ProcessAnchoringResponse.STATUS_INVALID_HINTS: "Invalid hints.", + map_processing_pb2.ProcessAnchoringResponse.STATUS_MAP_MODIFIED_DURING_PROCESSING: ( + "Map modified during processing." + ), + map_processing_pb2.ProcessAnchoringResponse.STATUS_INVALID_GRAVITY_ALIGNMENT: ( + "Invalid gravity alignment." + ), + } + if response.status in status_mapping: + return False, status_mapping[response.status] + return False, "Unknown error during anchoring optimization." + + def _id_to_short_code(self, id: str): + """Convert a unique id to a 2 letter short code.""" + tokens = id.split("-") + if len(tokens) > 2: + return f"{tokens[0][0]}{tokens[1][0]}" + return None + + def _pretty_print_waypoints( + self, + waypoint_id: str, + waypoint_name: str, + short_code_to_count: typing.Dict[str, int], + localization_id: str, + logger: logging.Logger, + ): + short_code = self._id_to_short_code(waypoint_id) + if short_code is None or short_code_to_count[short_code] != 1: + # If the short code is not valid/unique, don't show it. + short_code = " " + + logger.info( + "%s Waypoint name: %s id: %s short code: %s" + % ( + "->" if localization_id == waypoint_id else " ", + waypoint_name, + waypoint_id, + short_code, + ) + ) + + def _find_unique_waypoint_id( + self, + short_code: str, + graph: map_pb2.Graph, + name_to_id: typing.Dict[str, str], + logger: logging.Logger, + ): + """Convert either a 2 letter short code or an annotation name into the associated unique id.""" + if len(short_code) != 2: + # Not a short code, check if it is an annotation name (instead of the waypoint id). + if short_code in name_to_id: + # Short code is an waypoint's annotation name. Check if it is paired with a unique waypoint id. + if name_to_id[short_code] is not None: + # Has an associated waypoint id! + return name_to_id[short_code] + else: + logger.error( + "The waypoint name %s is used for multiple different unique waypoints. Please use" + + "the waypoint id." % (short_code) + ) + return None + # Also not an waypoint annotation name, so we will operate under the assumption that it is a + # unique waypoint id. + return short_code + + ret = short_code + for waypoint in graph.waypoints: + if short_code == self._id_to_short_code(waypoint.id): + if ret != short_code: + return short_code # Multiple waypoints with same short code. + ret = waypoint.id + return ret + + def _update_waypoints_and_edges( + self, graph: map_pb2.Graph, localization_id: str, logger: logging.Logger + ) -> typing.Tuple[typing.Dict[str, str], typing.Dict[str, str]]: + """Update and print waypoint ids and edge ids.""" + name_to_id = dict() + edges = dict() + + short_code_to_count = {} + waypoint_to_timestamp = [] + for waypoint in graph.waypoints: + # Determine the timestamp that this waypoint was created at. + timestamp = -1.0 + try: + timestamp = ( + waypoint.annotations.creation_time.seconds + + waypoint.annotations.creation_time.nanos / 1e9 + ) + except: + # Must be operating on an older graph nav map, since the creation_time is not + # available within the waypoint annotations message. + pass + waypoint_to_timestamp.append( + (waypoint.id, timestamp, waypoint.annotations.name) + ) + + # Determine how many waypoints have the same short code. + short_code = self._id_to_short_code(waypoint.id) + if short_code not in short_code_to_count: + short_code_to_count[short_code] = 0 + short_code_to_count[short_code] += 1 + + # Add the annotation name/id into the current dictionary. + waypoint_name = waypoint.annotations.name + if waypoint_name: + if waypoint_name in name_to_id: + # Waypoint name is used for multiple different waypoints, so set the waypoint id + # in this dictionary to None to avoid confusion between two different waypoints. + name_to_id[waypoint_name] = None + else: + # First time we have seen this waypoint annotation name. Add it into the dictionary + # with the respective waypoint unique id. + name_to_id[waypoint_name] = waypoint.id + + # Sort the set of waypoints by their creation timestamp. If the creation timestamp is unavailable, + # fallback to sorting by annotation name. + waypoint_to_timestamp = sorted( + waypoint_to_timestamp, key=lambda x: (x[1], x[2]) + ) + + # Print out the waypoints name, id, and short code in a ordered sorted by the timestamp from + # when the waypoint was created. + logger.info("%d waypoints:" % len(graph.waypoints)) + for waypoint in waypoint_to_timestamp: + self._pretty_print_waypoints( + waypoint[0], waypoint[2], short_code_to_count, localization_id, logger + ) + + for edge in graph.edges: + if edge.id.to_waypoint in edges: + if edge.id.from_waypoint not in edges[edge.id.to_waypoint]: + edges[edge.id.to_waypoint].append(edge.id.from_waypoint) + else: + edges[edge.id.to_waypoint] = [edge.id.from_waypoint] + logger.info( + f"(Edge) from waypoint id: {edge.id.from_waypoint} and to waypoint id: {edge.id.to_waypoint}" + ) + + return name_to_id, edges diff --git a/spot_wrapper/tests/test_graph_nav_util.py b/spot_wrapper/tests/test_graph_nav_util.py new file mode 100755 index 00000000..18ba04d2 --- /dev/null +++ b/spot_wrapper/tests/test_graph_nav_util.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +import pytest +import logging + +from bosdyn.api.graph_nav import map_pb2 + +from spot_wrapper.spot_graph_nav import SpotGraphNav + + +class MockSpotGraphNav(SpotGraphNav): + def __init__(self) -> None: + pass + + +# Create a mock SpotGraphNav object to access utility methods +graph_nav_util = MockSpotGraphNav() + + +class TestGraphNavUtilShortCode: + def test_id_to_short_code(self): + assert ( + graph_nav_util._id_to_short_code("ebony-pug-mUzxLNq.TkGlVIxga+UKAQ==") + == "ep" + ) + assert ( + graph_nav_util._id_to_short_code("erose-simian-sug9xpxhCxgft7Mtbhr98A==") + == "es" + ) + + +class TestGraphNavUtilFindUniqueWaypointId: + def test_short_code(self): + # Set up + self.logger = logging.Logger("test_graph_nav_util", level=logging.INFO) + self.graph = map_pb2.Graph() + self.name_to_id = {"ABCDE": "Node1"} + # Test normal short code + assert ( + graph_nav_util._find_unique_waypoint_id( + "AC", self.graph, self.name_to_id, self.logger + ) + == "AC" + ) + # Test annotation name that is known + assert ( + graph_nav_util._find_unique_waypoint_id( + "ABCDE", self.graph, self.name_to_id, self.logger + ) + == "Node1" + ) + # Test annotation name that is unknown + assert ( + graph_nav_util._find_unique_waypoint_id( + "ABCDEF", self.graph, self.name_to_id, self.logger + ) + == "ABCDEF" + ) + + def test_short_code_with_graph(self): + # Set up + self.logger = logging.Logger("test_graph_nav_util", level=logging.INFO) + self.graph = map_pb2.Graph() + self.name_to_id = {"ABCDE": "Node1"} + + # Test short code that is in graph + self.graph.waypoints.add(id="AB-CD-EF") + assert ( + graph_nav_util._find_unique_waypoint_id( + "AC", self.graph, self.name_to_id, self.logger + ) + == "AB-CD-EF" + ) + # Test short code that is not in graph + assert ( + graph_nav_util._find_unique_waypoint_id( + "AD", self.graph, self.name_to_id, self.logger + ) + == "AD" + ) + # Test multiple waypoints with the same short code + self.graph.waypoints.add(id="AB-CD-EF-1") + assert ( + graph_nav_util._find_unique_waypoint_id( + "AC", self.graph, self.name_to_id, self.logger + ) + == "AC" + ) + + +class TestGraphNavUtilUpdateWaypointsEdges: + def test_empty_graph(self): + self.logger = logging.Logger("test_graph_nav_util", level=logging.INFO) + + # Test empty graph + self.graph = map_pb2.Graph() + self.localization_id = "" + graph_nav_util._update_waypoints_and_edges( + self.graph, self.localization_id, self.logger + ) + assert len(self.graph.waypoints) == 0 + assert len(self.graph.edges) == 0 + + def test_one_waypoint(self): + self.logger = logging.Logger("test_graph_nav_util", level=logging.INFO) + + # Test graph with 1 waypoint + self.localization_id = "" + self.graph = map_pb2.Graph() + new_waypoint = map_pb2.Waypoint() + new_waypoint.id = "ABCDE" + new_waypoint.annotations.name = "Node1" + self.graph.waypoints.add( + id=new_waypoint.id, annotations=new_waypoint.annotations + ) + self.name_to_id, self.edges = graph_nav_util._update_waypoints_and_edges( + self.graph, self.localization_id, self.logger + ) + assert len(self.graph.waypoints) == 1 + assert len(self.graph.edges) == 0 + assert len(self.edges) == 0 + assert len(self.name_to_id) == 1 + assert self.name_to_id["Node1"] == "ABCDE" + + def test_two_waypoints_with_edge(self): + self.logger = logging.Logger("test_graph_nav_util", level=logging.INFO) + + # Test graph with 2 waypoints and an edge between them + self.localization_id = "" + self.graph = map_pb2.Graph() + new_waypoint = map_pb2.Waypoint() + new_waypoint.id = "ABCDE" + new_waypoint.annotations.name = "Node1" + self.graph.waypoints.add( + id=new_waypoint.id, annotations=new_waypoint.annotations + ) + new_waypoint.id = "DE" + new_waypoint.annotations.name = "Node2" + self.graph.waypoints.add( + id=new_waypoint.id, annotations=new_waypoint.annotations + ) + new_edge = map_pb2.Edge.Id(from_waypoint="ABCDE", to_waypoint="DE") + + self.graph.edges.add(id=new_edge) + self.name_to_id, self.edges = graph_nav_util._update_waypoints_and_edges( + self.graph, self.localization_id, self.logger + ) + assert len(self.graph.waypoints) == 2 + assert len(self.graph.edges) == 1 + assert len(self.edges) == 1 + assert self.edges["DE"][0] == "ABCDE" + assert len(self.name_to_id) == 2 + assert self.name_to_id["Node1"] == "ABCDE" + assert self.name_to_id["Node2"] == "DE" + + def test_two_waypoints_with_edge_and_localization(self): + self.logger = logging.Logger("test_graph_nav_util", level=logging.INFO) + + # Test graph with 2 waypoints and an edge between them. Mainly affects the pretty print. + self.localization_id = "ABCDE" + self.graph = map_pb2.Graph() + new_waypoint = map_pb2.Waypoint() + new_waypoint.id = "ABCDE" + new_waypoint.annotations.name = "Node1" + self.graph.waypoints.add( + id=new_waypoint.id, annotations=new_waypoint.annotations + ) + new_waypoint.id = "DE" + new_waypoint.annotations.name = "Node2" + self.graph.waypoints.add( + id=new_waypoint.id, annotations=new_waypoint.annotations + ) + new_edge = map_pb2.Edge.Id(from_waypoint="ABCDE", to_waypoint="DE") + + self.graph.edges.add(id=new_edge) + self.name_to_id, self.edges = graph_nav_util._update_waypoints_and_edges( + self.graph, self.localization_id, self.logger + ) + assert len(self.graph.waypoints) == 2 + assert len(self.graph.edges) == 1 + assert len(self.edges) == 1 + assert self.edges["DE"][0] == "ABCDE" + assert len(self.name_to_id) == 2 + assert self.name_to_id["Node1"] == "ABCDE" + assert self.name_to_id["Node2"] == "DE" diff --git a/spot_wrapper/wrapper.py b/spot_wrapper/wrapper.py index 42777b03..3dd116c8 100644 --- a/spot_wrapper/wrapper.py +++ b/spot_wrapper/wrapper.py @@ -38,6 +38,8 @@ from bosdyn.client.image import ImageClient from bosdyn.client.lease import LeaseClient, LeaseKeepAlive from bosdyn.client.manipulation_api_client import ManipulationApiClient +from bosdyn.client.map_processing import MapProcessingServiceClient +from bosdyn.client.point_cloud import build_pc_request from bosdyn.client.payload_registration import PayloadNotAuthorizedError from bosdyn.client.point_cloud import PointCloudClient, build_pc_request from bosdyn.client.power import safe_power_off, PowerClient, power_on @@ -68,12 +70,6 @@ MAX_COMMAND_DURATION = 1e5 VELODYNE_SERVICE_NAME = "velodyne-point-cloud" -### Release -from . import graph_nav_util - -### Debug -# import graph_nav_util - from bosdyn.api import basic_command_pb2 from google.protobuf.timestamp_pb2 import Timestamp @@ -81,10 +77,11 @@ from .spot_check import SpotCheck from .spot_docking import SpotDocking from .spot_eap import SpotEAP +from .spot_graph_nav import SpotGraphNav from .spot_images import SpotImages from .spot_world_objects import SpotWorldObjects -from .wrapper_helpers import RobotCommandData, RobotState +from .wrapper_helpers import RobotCommandData, RobotState, ClaimAndPowerDecorator """Service name for getting pointcloud of VLP16 connected to Spot Core""" point_cloud_sources = ["velodyne-point-cloud"] @@ -379,38 +376,6 @@ def _start_query(self): pass -def try_claim(func=None, *, power_on=False): - """ - Decorator which tries to acquire the lease before executing the wrapped function - - the _func=None and * args are required to allow this decorator to be used with or without arguments - - Args: - func: Function that is being wrapped - power_on: If true, power on after claiming the lease - - Returns: - Decorator which will wrap the decorated function - """ - # If this decorator is being used without the power_on arg, return it as if it was called with that arg specified - if func is None: - return functools.partial(try_claim, power_on=power_on) - - @functools.wraps(func) - def wrapper_try_claim(self, *args, **kwargs): - if self._get_lease_on_action: - if power_on: - # Power on is also wrapped by this decorator so if we request power on the lease will also be claimed - response = self.power_on() - else: - response = self.claim() - if not response[0]: - return response - return func(self, *args, **kwargs) - - return wrapper_try_claim - - class SpotWrapper: """Generic wrapper class to encompass release 1.1.4 API features as well as maintaining leases automatically""" @@ -459,7 +424,10 @@ def __init__( self._rates = rates or {} self._callbacks = callbacks or {} self._use_take_lease = use_take_lease - self._get_lease_on_action = get_lease_on_action + self._claim_decorator = ClaimAndPowerDecorator( + self.power_on, self.claim, get_lease_on_action + ) + self.decorate_functions() self._continually_try_stand = continually_try_stand self._rgb_cameras = rgb_cameras self._frame_prefix = "" @@ -527,6 +495,9 @@ def __init__( self._graph_nav_client = self._robot.ensure_client( GraphNavClient.default_service_name ) + self._map_processing_client = self._robot.ensure_client( + MapProcessingServiceClient.default_service_name + ) self._power_client = self._robot.ensure_client( PowerClient.default_service_name ) @@ -594,9 +565,6 @@ def __init__( ) time.sleep(sleep_secs) - # Store the most recent knowledge of the state of the robot based on rpc calls. - self._init_current_graph_nav_state() - # Async Tasks self._async_task_list = [] self._robot_state_task = AsyncRobotState( @@ -653,6 +621,7 @@ def __init__( self._manipulation_api_client, self._robot_state_client, MAX_COMMAND_DURATION, + self._claim_decorator, ) else: self._spot_arm = None @@ -666,6 +635,16 @@ def __init__( self._command_data, self._docking_client, self._robot_command_client, + self._claim_decorator, + ) + + self._spot_graph_nav = SpotGraphNav( + self._robot, + self._logger, + self._graph_nav_client, + self._map_processing_client, + self._robot_state_client, + self._lease_client, ) if self._point_cloud_client: @@ -701,6 +680,40 @@ def __init__( self._robot_id = None self._lease = None + def decorate_functions(self): + """ + Many of the functions in the wrapper need to have the lease claimed and the robot powered on before they will + function. The TryClaimDecorator object includes a decorator which is the mechanism we use to make sure that + is the case, assuming the get_lease_on_action variable is true. Otherwise, it is up to the user to ensure + that the lease is claimed and the power is on before running commands, otherwise the commands will fail. + """ + decorated_funcs = [ + self.stop, + self.self_right, + self.sit, + self.simple_stand, + self.stand, + self.battery_change_pose, + self.velocity_cmd, + self.trajectory_cmd, + self.navigate_to, + self._navigate_to, + self._navigate_route, + self.execute_dance, + self._robot_command, + self._manipulation_request, + ] + decorated_funcs_no_power = [ + self.stop, + self.power_on, + self.safe_power_off, + self.toggle_power, + ] + + self._claim_decorator.decorate_functions( + self, decorated_funcs, decorated_funcs_no_power + ) + @staticmethod def authenticate( robot: Robot, username: str, password: str, logger: logging.Logger @@ -794,6 +807,11 @@ def robot_name(self) -> str: def frame_prefix(self) -> str: return self._frame_prefix + @property + def spot_graph_nav(self) -> SpotGraphNav: + """Return SpotGraphNav instance""" + return self._spot_graph_nav + @property def spot_arm(self) -> SpotArm: """Return SpotArm instance""" @@ -1136,7 +1154,6 @@ def _manipulation_request( self._logger.error(f"Unable to execute manipulation command: {e}") return False, str(e), None - @try_claim def stop(self) -> typing.Tuple[bool, str]: """ Stop any action the robot is currently doing. @@ -1148,7 +1165,6 @@ def stop(self) -> typing.Tuple[bool, str]: response = self._robot_command(RobotCommandBuilder.stop_command()) return response[0], response[1] - @try_claim(power_on=True) def self_right(self) -> typing.Tuple[bool, str]: """ Have the robot self-right. @@ -1159,7 +1175,6 @@ def self_right(self) -> typing.Tuple[bool, str]: response = self._robot_command(RobotCommandBuilder.selfright_command()) return response[0], response[1] - @try_claim(power_on=True) def sit(self) -> typing.Tuple[bool, str]: """ Stop the robot's motion and sit down if able. @@ -1172,7 +1187,6 @@ def sit(self) -> typing.Tuple[bool, str]: self.last_sit_command = response[2] return response[0], response[1] - @try_claim(power_on=True) def simple_stand(self, monitor_command: bool = True) -> typing.Tuple[bool, str]: """ If the e-stop is enabled, and the motor power is enabled, stand the robot up. @@ -1187,7 +1201,6 @@ def simple_stand(self, monitor_command: bool = True) -> typing.Tuple[bool, str]: self.last_stand_command = response[2] return response[0], response[1] - @try_claim(power_on=True) def stand( self, monitor_command: bool = True, @@ -1231,7 +1244,6 @@ def stand( self.last_stand_command = response[2] return response[0], response[1] - @try_claim(power_on=True) def battery_change_pose(self, dir_hint: int = 1) -> typing.Tuple[bool, str]: """ Put the robot into the battery change pose @@ -1249,7 +1261,6 @@ def battery_change_pose(self, dir_hint: int = 1) -> typing.Tuple[bool, str]: return response[0], response[1] return False, "Call sit before trying to roll over" - @try_claim def safe_power_off(self) -> typing.Tuple[bool, str]: """ Stop the robot's motion and sit if possible. Once sitting, disable motor power. @@ -1277,7 +1288,6 @@ def clear_behavior_fault( except Exception as e: return False, f"Exception while clearing behavior fault: {e}", None - @try_claim def power_on(self) -> typing.Tuple[bool, str]: """ Enable the motor power if e-stop is enabled. @@ -1314,7 +1324,6 @@ def get_mobility_params(self) -> spot_command_pb2.MobilityParams: """Get mobility params""" return self._mobility_params - @try_claim def velocity_cmd( self, v_x: float, v_y: float, v_rot: float, cmd_duration: float = 0.125 ) -> typing.Tuple[bool, str]: @@ -1342,7 +1351,6 @@ def velocity_cmd( self.last_velocity_command_time = end_time return response[0], response[1] - @try_claim def trajectory_cmd( self, goal_x: float, @@ -1530,7 +1538,6 @@ def download_graph(self, download_path: str) -> typing.Tuple[bool, str]: f"Got an error during downloading graph and snapshots from the robot: {e}", ) - @try_claim def navigate_to( self, upload_path, @@ -1817,7 +1824,6 @@ def _download_graph_and_snapshots( ) return True, "Success" - @try_claim def _navigate_to(self, *args): """Navigate to a specific waypoint.""" # Take the first argument as the destination waypoint. @@ -1893,7 +1899,6 @@ def _navigate_to(self, *args): else: return False, "Navigation command is not complete yet." - @try_claim def _navigate_route(self, *args): """Navigate through a specific route of waypoints.""" if len(args) < 1: @@ -1978,7 +1983,6 @@ def _clear_graph(self, *args): self._init_current_graph_nav_state() return result - @try_claim def toggle_power(self, should_power_on): """Power the robot on/off dependent on the current power state.""" is_powered_on = self.check_is_powered_on() @@ -2065,7 +2069,6 @@ def _match_edge(self, current_edges, waypoint1, waypoint2): ) return None - @try_claim def execute_dance(self, data): if self._is_licensed_for_choreography: return self._spot_dance.execute_dance(data) diff --git a/spot_wrapper/wrapper_helpers.py b/spot_wrapper/wrapper_helpers.py index c824cba4..bf2f816b 100644 --- a/spot_wrapper/wrapper_helpers.py +++ b/spot_wrapper/wrapper_helpers.py @@ -1,9 +1,119 @@ """Helper classes for the wrapper. This file is necessary to prevent circular imports caused by the modules also using these classes""" import typing +import functools from dataclasses import dataclass +class ClaimAndPowerDecorator: + """ + Some functions in the wrapper require the lease to be claimed and the robot to be powered on before they can + function. + + This class provides a portable way of wrapping the functions of the wrapper or modules to enable them to do + that. It can be passed around to modules which can then decorate their functions with it, allowing them to claim + and power on as needed. + + Note that this decorator is not intended to be applied using the @ syntax. It should be applied during or after + object instantiation. + """ + + def __init__( + self, + power_on_function: typing.Callable[[], typing.Tuple[bool, str]], + claim_function: typing.Callable[[], typing.Tuple[bool, str]], + get_lease_on_action: bool = False, + ) -> None: + self.power_on = power_on_function + self.claim = claim_function + self._get_lease_on_action = get_lease_on_action + + def _make_function_take_lease_and_power_on( + self, func: typing.Callable, power_on: bool = True + ) -> typing.Callable: + """ + Decorator which tries to acquire the lease before executing the wrapped function + + Args: + func: Function that is being wrapped + power_on: If true, power on after claiming the lease. For the vast majority of cases this is needed + + Returns: + Decorator which will wrap the decorated function + """ + + @functools.wraps(func) + def wrapper_try_claim(*args, **kwargs) -> typing.Callable: + # Note that because we are assuming that this decorator is used only on instantiated classes, + # this function does not take a self arg. The self arg is necessary when using the @ syntax because at + # that point the class has not yet been instantiated. In this case, the func we receive is already a bound + # method, as opposed to an unbound one. A bound function has the "self" instance built in. + if self._get_lease_on_action: + # Ignore success or failure of these functions. If they fail, then the function that is being wrapped + # will fail and the caller will be able to handle from there. + self.claim() + if power_on: + self.power_on() + return func(*args, **kwargs) + + return wrapper_try_claim + + def make_function_take_lease_and_power_on( + self, decorated_object, function: typing.Callable, power_on: bool = True + ) -> None: + """ + Decorate a function of an object with this class's decorator. After being decorated, when the function is + called, it will forcefully take the lease, then power on the robot if that option is specified. + + Args: + decorated_object: The object whose function is to be decorated + function: The function to be decorated + power_on: If true, power on the robot after claiming + + Raises: + AttributeError: if the object passed does not have a function with the same name as the given function + """ + function_name = function.__name__ + if not hasattr(decorated_object, function_name): + raise AttributeError( + f"Requested decoration of function {function_name} of object {decorated_object}, but the object does " + f"not have a function by that name." + ) + + setattr( + decorated_object, + function_name, + self._make_function_take_lease_and_power_on(function, power_on=power_on), + ) + + def decorate_functions( + self, + decorated_object, + decorated_funcs: typing.List[typing.Callable], + decorated_funcs_no_power: typing.Optional[typing.List[typing.Callable]] = None, + ) -> None: + """ + Decorate the specified functions of the given object with this class's decorator. + + Args: + decorated_object: Object which contains the functions to be decorated + decorated_funcs: List of the functions of the object which should be decorated. When called, + these functions will forcefully take the lease and power on the robot + decorated_funcs_no_power: Same as decorated_funcs, but will calling these will not power on the robot. + """ + + for func in decorated_funcs: + self.make_function_take_lease_and_power_on(decorated_object, func) + + if decorated_funcs_no_power is None: + decorated_funcs_no_power = [] + + for func in decorated_funcs_no_power: + self.make_function_take_lease_and_power_on( + decorated_object, func, power_on=False + ) + + @dataclass() class RobotState: """