diff --git a/spot_wrapper/wrapper.py b/spot_wrapper/wrapper.py index 1d99bc86..52126523 100644 --- a/spot_wrapper/wrapper.py +++ b/spot_wrapper/wrapper.py @@ -373,10 +373,10 @@ def __init__( self._spot_wrapper: SpotWrapper = spot_wrapper def _start_query(self) -> None: - if self._spot_wrapper._last_stand_command != None: + if self._spot_wrapper.last_stand_command is not None: try: response = self._client.robot_command_feedback( - self._spot_wrapper._last_stand_command + self._spot_wrapper.last_stand_command ) status = ( response.feedback.synchronized_feedback.mobility_command_feedback.stand_feedback.status @@ -384,7 +384,7 @@ def _start_query(self) -> None: self._spot_wrapper.is_sitting = False if status == basic_command_pb2.StandCommand.Feedback.STATUS_IS_STANDING: self._spot_wrapper.is_standing = True - self._spot_wrapper._last_stand_command = None + self._spot_wrapper.last_stand_command = None elif ( status == basic_command_pb2.StandCommand.Feedback.STATUS_IN_PROGRESS ): @@ -394,38 +394,38 @@ def _start_query(self) -> None: self._spot_wrapper.is_standing = False except (ResponseError, RpcError) as e: self._logger.error("Error when getting robot command feedback: %s", e) - self._spot_wrapper._last_stand_command = None + self._spot_wrapper.last_stand_command = None - if self._spot_wrapper._last_sit_command != None: + if self._spot_wrapper.last_sit_command is not None: try: self._spot_wrapper.is_standing = False response = self._client.robot_command_feedback( - self._spot_wrapper._last_sit_command + self._spot_wrapper.last_sit_command ) if ( response.feedback.synchronized_feedback.mobility_command_feedback.sit_feedback.status == basic_command_pb2.SitCommand.Feedback.STATUS_IS_SITTING ): self._spot_wrapper.is_sitting = True - self._spot_wrapper._last_sit_command = None + self._spot_wrapper.last_sit_command = None else: self._spot_wrapper.is_sitting = False except (ResponseError, RpcError) as e: self._logger.error("Error when getting robot command feedback: %s", e) - self._spot_wrapper._last_sit_command = None + self._spot_wrapper.last_sit_command = None is_moving = False - if self._spot_wrapper._last_velocity_command_time != None: - if time.time() < self._spot_wrapper._last_velocity_command_time: + if self._spot_wrapper.last_velocity_command_time is not None: + if time.time() < self._spot_wrapper.last_velocity_command_time: is_moving = True else: - self._spot_wrapper._last_velocity_command_time = None + self._spot_wrapper.last_velocity_command_time = None - if self._spot_wrapper._last_trajectory_command != None: + if self._spot_wrapper.last_trajectory_command is not None: try: response = self._client.robot_command_feedback( - self._spot_wrapper._last_trajectory_command + self._spot_wrapper.last_trajectory_command ) status = ( response.feedback.synchronized_feedback.mobility_command_feedback.se2_trajectory_feedback.status @@ -438,12 +438,12 @@ def _start_query(self) -> None: or ( status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_NEAR_GOAL - and not self._spot_wrapper._last_trajectory_command_precise + and not self._spot_wrapper.last_trajectory_command_precise ) ): self._spot_wrapper.at_goal = True # Clear the command once at the goal - self._spot_wrapper._last_trajectory_command = None + self._spot_wrapper.last_trajectory_command = None self._spot_wrapper._trajectory_status_unknown = False elif ( status @@ -460,18 +460,21 @@ def _start_query(self) -> None: status == basic_command_pb2.SE2TrajectoryCommand.Feedback.STATUS_UNKNOWN ): - self._spot_wrapper._trajectory_status_unknown = True - self._spot_wrapper._last_trajectory_command = None + self._spot_wrapper.trajectory_status_unknown = True + self._spot_wrapper.last_trajectory_command = None else: self._logger.error( "Received trajectory command status outside of expected range, value is {}".format( status ) ) - self._spot_wrapper._last_trajectory_command = None + self._spot_wrapper.last_trajectory_command = None except (ResponseError, RpcError) as e: self._logger.error("Error when getting robot command feedback: %s", e) - self._spot_wrapper._last_trajectory_command = None + self._spot_wrapper.last_trajectory_command = None + + if self._spot_wrapper.last_navigate_to_command is not None: + is_moving = True self._spot_wrapper.is_moving = is_moving @@ -481,10 +484,10 @@ def _start_query(self) -> None: self._spot_wrapper.is_standing and self._spot_wrapper._continually_try_stand and not self._spot_wrapper.is_moving - and self._spot_wrapper._last_trajectory_command is not None - and self._spot_wrapper._last_stand_command is not None - and self._spot_wrapper._last_velocity_command_time is not None - and self._spot_wrapper._last_docking_command is not None + and self._spot_wrapper.last_trajectory_command is not None + and self._spot_wrapper.last_stand_command is not None + and self._spot_wrapper.last_velocity_command_time is not None + and self._spot_wrapper.last_docking_command is not None ): self._spot_wrapper.stand(False) @@ -575,6 +578,24 @@ class RobotState: near_goal: bool = False +@dataclass() +class RobotCommandData: + """ + Store data about the commands the wrapper sends to the SDK. Running a command returns an integer value + representing that command's ID. These values are used to monitor the progress of the command and modify attributes + of RobotState accordingly. The values should be reset to none when the command completes. + """ + + last_stand_command: typing.Optional[int] = None + last_sit_command: typing.Optional[int] = None + last_docking_command: typing.Optional[int] = None + last_trajectory_command: typing.Optional[int] = None + # Was the last trajectory command requested to be precise + last_trajectory_command_precise: typing.Optional[bool] = None + last_velocity_command_time: typing.Optional[float] = None + last_navigate_to_command: typing.Optional[int] = None + + class SpotWrapper: """Generic wrapper class to encompass release 1.1.4 API features as well as maintaining leases automatically""" @@ -637,13 +658,8 @@ def __init__( self._mobility_params = RobotCommandBuilder.mobility_params() self._state = RobotState() self._trajectory_status_unknown = False - self._last_robot_command_feedback = False - self._last_stand_command = None - self._last_sit_command = None - self._last_trajectory_command = None - self._last_trajectory_command_precise = None - self._last_velocity_command_time = None - self._last_docking_command = None + self._command_data = RobotCommandData() + self._state_navigate_to_valid = None self._front_image_requests = [] for source in front_image_sources: @@ -1122,6 +1138,62 @@ def at_goal(self) -> bool: def at_goal(self, state: bool) -> None: self._state.at_goal = state + @property + def last_stand_command(self) -> typing.Optional[int]: + return self._command_data.last_stand_command + + @last_stand_command.setter + def last_stand_command(self, command_id: int) -> None: + self._command_data.last_stand_command = command_id + + @property + def last_sit_command(self) -> typing.Optional[int]: + return self._command_data.last_sit_command + + @last_sit_command.setter + def last_sit_command(self, command_id: int) -> None: + self._command_data.last_sit_command = command_id + + @property + def last_docking_command(self) -> typing.Optional[int]: + return self._command_data.last_docking_command + + @last_docking_command.setter + def last_docking_command(self, command_id: int) -> None: + self._command_data.last_docking_command = command_id + + @property + def last_trajectory_command(self) -> typing.Optional[int]: + return self._command_data.last_trajectory_command + + @last_trajectory_command.setter + def last_trajectory_command(self, command_id: int) -> None: + self._command_data.last_trajectory_command = command_id + + @property + def last_trajectory_command_precise(self) -> typing.Optional[bool]: + return self._command_data.last_trajectory_command_precise + + @last_trajectory_command_precise.setter + def last_trajectory_command_precise(self, was_precise: bool) -> None: + self._command_data.last_trajectory_command_precise = was_precise + + @property + def last_velocity_command_time(self) -> typing.Optional[float]: + return self._command_data.last_velocity_command_time + + @last_velocity_command_time.setter + def last_velocity_command_time(self, command_time: float) -> None: + self._command_data.last_velocity_command_time = command_time + + @property + def last_navigate_to_command(self) -> typing.Optional[int]: + return self._command_data.last_navigate_to_command + + @last_navigate_to_command.setter + def last_navigate_to_command(self, command_id: int) -> None: + self._command_data.last_navigate_to_command = command_id + def is_estopped(self, timeout: typing.Optional[float] = None) -> bool: return self._robot.is_estopped(timeout=timeout) @@ -1333,7 +1405,7 @@ def sit(self) -> typing.Tuple[bool, str]: """ response = self._robot_command(RobotCommandBuilder.synchro_sit_command()) - self._last_sit_command = response[2] + self.last_sit_command = response[2] return response[0], response[1] @try_claim(power_on=True) @@ -1348,7 +1420,7 @@ def simple_stand(self, monitor_command: bool = True) -> typing.Tuple[bool, str]: RobotCommandBuilder.synchro_stand_command(params=self._mobility_params) ) if monitor_command: - self._last_stand_command = response[2] + self.last_stand_command = response[2] return response[0], response[1] @try_claim(power_on=True) @@ -1392,7 +1464,7 @@ def stand( ) if monitor_command: - self._last_stand_command = response[2] + self.last_stand_command = response[2] return response[0], response[1] @try_claim(power_on=True) @@ -1503,7 +1575,7 @@ def velocity_cmd( end_time_secs=end_time, timesync_endpoint=self._robot.time_sync.endpoint, ) - self._last_velocity_command_time = end_time + self.last_velocity_command_time = end_time return response[0], response[1] @try_claim @@ -1538,7 +1610,7 @@ def trajectory_cmd( self._trajectory_status_unknown = False self.at_goal = False self.near_goal = False - self._last_trajectory_command_precise = precise_position + self.last_trajectory_command_precise = precise_position self._logger.info("got command duration of {}".format(cmd_duration)) end_time = time.time() + cmd_duration if frame_name == "vision": @@ -1580,7 +1652,7 @@ def trajectory_cmd( else: raise ValueError("frame_name must be 'vision' or 'odom'") if response[0]: - self._last_trajectory_command = response[2] + self.last_trajectory_command = response[2] return response[0], response[1] def robot_command( @@ -1694,6 +1766,58 @@ def download_graph(self, download_path: str) -> typing.Tuple[bool, str]: f"Got an error during downloading graph and snapshots from the robot: {e}", ) + def set_localization_fiducial(self) -> typing.Tuple[bool, str]: + """Localize a robot according to near fiducials based on current graph and snapshots in the robot. + + Returns: + (bool, str) tuple indicating whether the command was successfully sent, and a message. + """ + try: + self._set_initial_localization_fiducial() + return True, "Success" + except Exception as e: + return False, f"Got an error while localizing the robot with fiducials: {e}" + + def set_localization_waypoint(self, waypoint_id: str) -> typing.Tuple[bool, str]: + """Localize a robot to specified waypoint based on current graph and snapshots in the robot.. + + Args: + waypoint_id (int): ID of waypoint to be localized. + + Returns: + (bool, str) tuple indicating whether the command was successfully sent, and a message. + """ + try: + resp = self._set_initial_localization_waypoint(waypoint_id) + return resp[0], resp[1] + except Exception as e: + return ( + False, + f"Got an error while localizing the robot to the waypoint {waypoint_id}: {e}", + ) + + def cancel_navigate_to(self) -> None: + """Cancel navigation of a robot from start_navigate_to()""" + self._cancel_navigate_to() + + @try_claim + def start_navigate_to( + self, target_waypoint_id: str + ) -> typing.Tuple[bool, str, str]: + """Navigate a robot to specified waypoint id with GraphNav + + Args: + target_waypoint_id (str) : Waypont id string for where to goal + + Returns: + result (bool) : success flag + message (str) : message about result + state (str) : state of result. options are 'aborted', 'failed', 'preempted', 'succeeded'. + """ + self._get_localization_state() + resp = self._start_navigate_to(target_waypoint_id) + return resp + @try_claim def navigate_to( self, @@ -1731,7 +1855,7 @@ def navigate_to( if initial_localization_fiducial: self._set_initial_localization_fiducial() if initial_localization_waypoint: - self._set_initial_localization_waypoint([initial_localization_waypoint]) + self._set_initial_localization_waypoint(initial_localization_waypoint) self._list_graph_waypoint_and_edge_ids() self._get_localization_state() resp = self._navigate_to([navigate_to]) @@ -2222,7 +2346,7 @@ def _get_localization_state(self, *args): "Got robot state in kinematic odometry frame: \n%s" % str(odom_tform_body) ) - def _set_initial_localization_fiducial(self, *args): + def _set_initial_localization_fiducial(self) -> None: """Trigger localization when near a fiducial.""" robot_state = self._robot_state_client.get_robot_state() current_odom_tform_body = get_odom_tform_body( @@ -2236,22 +2360,28 @@ def _set_initial_localization_fiducial(self, *args): ko_tform_body=current_odom_tform_body, ) - def _set_initial_localization_waypoint(self, *args): - """Trigger localization to a waypoint.""" + def _set_initial_localization_waypoint( + self, waypoint_id: str + ) -> typing.Tuple[bool, str]: + """Trigger localization to a waypoint. + + Args: + waypoint_id (str): ID of waypoint to be localized. + + Returns: + result (bool): success flag of localization + message (str): message about result. + """ # Take the first argument as the localization waypoint. - if len(args) < 1: - # If no waypoint id is given as input, then return without initializing. - self._logger.error("No waypoint specified to initialize to.") - return destination_waypoint = graph_nav_util.find_unique_waypoint_id( - args[0][0], + waypoint_id, self._current_graph, self._current_annotation_name_to_wp_id, self._logger, ) if not destination_waypoint: # Failed to find the unique waypoint id. - return + return False, "Failed to find the unique waypoint id." robot_state = self._robot_state_client.get_robot_state() current_odom_tform_body = get_odom_tform_body( @@ -2270,6 +2400,8 @@ def _set_initial_localization_waypoint(self, *args): ko_tform_body=current_odom_tform_body, ) + return True, "Success" + def _list_graph_waypoint_and_edge_ids(self, *args): """List the waypoint ids and edge ids of the graph currently on the robot.""" @@ -2441,6 +2573,81 @@ def _download_graph_and_snapshots( ) return True, "Success" + def _cancel_navigate_to(self) -> None: + self._state_navigate_to_valid = False + + @try_claim + def _start_navigate_to( + self, target_waypoint_id: str + ) -> typing.Tuple[bool, str, str]: + self._lease = self._lease_wallet.get_lease() + destination_waypoint = graph_nav_util.find_unique_waypoint_id( + target_waypoint_id, + self._current_graph, + self._current_annotation_name_to_wp_id, + self._logger, + ) + + if not destination_waypoint: + self.logger.error( + "Failed to find waypoint %s in current graph.", target_waypoint_id + ) + return ( + False, + f"Failed to find waypoint {target_waypoint_id} in current graph.", + "aborted", + ) + + self.logger.info("Starting navigation to %s", destination_waypoint) + + self._lease = self._lease_wallet.advance() + sublease = self._lease.create_sublease() + self._lease_keepalive.shutdown() + + self._state_navigate_to_valid = True + nav_to_cmd_id = None + while self._state_navigate_to_valid: + time.sleep(0.5) + try: + nav_to_cmd_id = self._graph_nav_client.navigate_to( + destination_waypoint, + 1.0, + leases=[sublease.lease_proto], + command_id=nav_to_cmd_id, + ) + self.last_navigate_to_command = nav_to_cmd_id + except ResponseError as e: + self._logger.error("Error while navitation: %s", e) + break + + if self._check_success(nav_to_cmd_id): + break + + self.last_navigate_to_command = None + self._lease = self._lease_wallet.advance() + self._lease_keepalive = LeaseKeepAlive(self._lease_client) + + if not self._state_navigate_to_valid: + return False, "Navigation is canceled", "preempted" + + status = self._graph_nav_client.navigation_feedback(nav_to_cmd_id) + if ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_REACHED_GOAL + ): + return True, "Successfully completed the navigation commands!", "succeeded" + elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_LOST: + return False, "Robot got lost when navigating the route,", "failed" + elif status.status == graph_nav_pb2.NavigationFeedbackResponse.STATUS_STUCK: + return False, "Robot got stuck when navigating the route,", "failed" + elif ( + status.status + == graph_nav_pb2.NavigationFeedbackResponse.STATUS_ROBOT_IMPAIRED + ): + return False, "Robot is impaired.", "failed" + else: + return False, "Navigation command is not complete yet.", "failed" + @try_claim def _navigate_to(self, *args): """Navigate to a specific waypoint.""" @@ -2697,11 +2904,11 @@ def dock(self, dock_id): self._robot.power_on() self.stand() # Dock the robot - self._last_docking_command = dock_id + self.last_docking_command = dock_id blocking_dock_robot(self._robot, dock_id) - self._last_docking_command = None + self.last_docking_command = None # Necessary to reset this as docking often causes the last stand command to go into an unknown state - self._last_stand_command = None + self.last_stand_command = None return True, "Success" except Exception as e: return False, f"Exception while trying to dock: {e}"