-
Notifications
You must be signed in to change notification settings - Fork 6
add Queue handling in solver_trajectory_subscriber #20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
Sergim96
wants to merge
5
commits into
RobotMotorIntelligence:devel
Choose a base branch
from
Sergim96:topic/queue
base: devel
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
96fd322
add Queue handling in solver_trajectory_subscriber
Sergim96 6f8f3cf
[copyright] Updated copyright of updated files
cmastalli 5b9b73c
[core] Used const expressions
cmastalli 0ab10f8
[pre-commit] Run pre-commit
cmastalli 04fdb6e
[changelog] Updated
cmastalli File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,7 @@ | ||
| /////////////////////////////////////////////////////////////////////////////// | ||
| // BSD 3-Clause License | ||
| // | ||
| // Copyright (C) 2023-2023, Heriot-Watt University | ||
| // Copyright (C) 2023-2025, Heriot-Watt University | ||
| // Copyright note valid unless otherwise stated in individual files. | ||
| // All rights reserved. | ||
| /////////////////////////////////////////////////////////////////////////////// | ||
|
|
@@ -47,7 +47,8 @@ class SolverTrajectoryRosSubscriber { | |
| topic, 1, | ||
| std::bind(&SolverTrajectoryRosSubscriber::callback, this, | ||
| std::placeholders::_1))), | ||
| has_new_msg_(false), is_processing_msg_(false), last_msg_time_(0.) { | ||
| has_new_msg_(false), is_processing_msg_(false), last_msg_time_(0.), | ||
| communication_delay_(0.) { | ||
| spinner_.add_node(node_); | ||
| thread_ = std::thread([this]() { this->spin(); }); | ||
| thread_.detach(); | ||
|
|
@@ -58,7 +59,8 @@ class SolverTrajectoryRosSubscriber { | |
| sub_(node_.subscribe<SolverTrajectory>( | ||
| topic, 1, &SolverTrajectoryRosSubscriber::callback, this, | ||
| ros::TransportHints().tcpNoDelay())), | ||
| has_new_msg_(false), is_processing_msg_(false), last_msg_time_(0.) { | ||
| has_new_msg_(false), is_processing_msg_(false), last_msg_time_(0.), | ||
| communication_delay_(0.) { | ||
| spinner_.start(); | ||
| ROS_INFO_STREAM("Subscribing SolverTrajectory messages on " << topic); | ||
| #endif | ||
|
|
@@ -120,11 +122,288 @@ class SolverTrajectoryRosSubscriber { | |
| return {ts_, dts_, xs_, dxs_, us_, Ks_, types_, params_}; | ||
| } | ||
|
|
||
| /** | ||
| * @brief Retrieve the next solver trajectory reference point ready for | ||
| * execution. | ||
| * | ||
| * This function returns the first trajectory point in the internal queue that | ||
| * is ready to be executed, based on the current system time corrected by the | ||
| * communication delay. | ||
| * | ||
| * The logic proceeds as follows: | ||
| * | ||
| * - If the queue is empty, an exception is thrown. | ||
| * - If the queue has only one element and it is already expired (i.e., its | ||
| * time interval has passed), the queue is cleared and an exception is thrown. | ||
| * - If the timestamp of the front element is earlier than or equal to the | ||
| * current time (minus communication delay), it is considered ready. Any | ||
| * earlier entries that have also expired are dropped to ensure the returned | ||
| * reference is the most recent one still valid. | ||
| * - If no point is ready for execution, an exception is thrown with | ||
| * diagnostic information. | ||
| * | ||
| * This function should only be called after confirming readiness via | ||
| * `process_queue()`. | ||
| * | ||
| * @return A tuple containing: | ||
| * - `time` (start of the interval), | ||
| * - `duration`, | ||
| * - `state` (`x`), | ||
| * - `state derivative` (`dx`), | ||
| * - `feed-forward control` (`u`), | ||
| * - `feedback gain` (`K`), | ||
| * - `control type`, | ||
| * - `control parametrization`. | ||
| * | ||
| * @throws std::runtime_error if the queue is empty, or if no point is ready | ||
| * for execution. | ||
| */ | ||
| std::tuple<double, double, Eigen::VectorXd, Eigen::VectorXd, Eigen::VectorXd, | ||
| Eigen::MatrixXd, crocoddyl_msgs::ControlType, | ||
| crocoddyl_msgs::ControlParametrization> | ||
| get_current_reference() { | ||
| #ifdef ROS2 | ||
| const double now = node_->get_clock()->now().seconds(); | ||
| #else | ||
| const double now = ros::Time::now().toSec(); | ||
| #endif | ||
|
|
||
| if (ts_queue_.empty()) { | ||
| throw std::runtime_error("Reference queue is empty."); | ||
| } | ||
| const double t0 = ts_queue_.front(); | ||
| const double dt0 = dts_queue_.front(); | ||
| const double t_now = now - communication_delay_; | ||
| // Handle the case of only one element and it's too old | ||
| if (ts_queue_.size() == 1 && t_now > t0 + dt0) { | ||
| ts_queue_.clear(); | ||
| dts_queue_.clear(); | ||
| xs_queue_.clear(); | ||
| dxs_queue_.clear(); | ||
| us_queue_.clear(); | ||
| Ks_queue_.clear(); | ||
| types_queue_.clear(); | ||
| params_queue_.clear(); | ||
| throw std::runtime_error( | ||
| "[SolverTrajectoryRosSubscriber::get_current_reference] " | ||
| "Single remaining point is too old. Queue cleared."); | ||
| } | ||
| // Check if the first reference is ready | ||
| if (t0 <= t_now) { | ||
| // Check if the next one is also ready | ||
| while (ts_queue_.size() >= 2 && ts_queue_[1] <= t_now) { | ||
| ts_queue_.pop_front(); | ||
| dts_queue_.pop_front(); | ||
| xs_queue_.pop_front(); | ||
| dxs_queue_.pop_front(); | ||
| us_queue_.pop_front(); | ||
| Ks_queue_.pop_front(); | ||
| types_queue_.pop_front(); | ||
| params_queue_.pop_front(); | ||
| } | ||
| return {ts_queue_.front(), dts_queue_.front(), xs_queue_.front(), | ||
| dxs_queue_.front(), us_queue_.front(), Ks_queue_.front(), | ||
| types_queue_.front(), params_queue_.front()}; | ||
| } | ||
| // Nothing ready | ||
| std::ostringstream oss; | ||
| oss << std::fixed << std::setprecision(5); | ||
| oss << "[SolverTrajectoryRosSubscriber::get_current_reference] \n" | ||
| << "No point ready. Waiting window not reached. \n" | ||
| << "Next timestamp: " << t0 << "\n" | ||
| << "Current time: " << now << "\n" | ||
| << "Communication delay: " << communication_delay_ << "\n" | ||
| << "Current time + delay: " << t_now << "\n" | ||
| << "Time to wait: " << (t0 - t_now) << "s"; | ||
| throw std::runtime_error(oss.str()); | ||
| } | ||
|
|
||
| /** | ||
| * @brief Process any new incoming trajectory message and update the internal | ||
| * execution queue. | ||
| * | ||
| * This method integrates new `SolverTrajectory` messages into the | ||
| * subscriber's internal trajectory queue based on timing logic. It handles | ||
| * message merging and replacement depending on the temporal relationship | ||
| * between the new and current trajectories. | ||
| * | ||
| * Behavior falls into four categories: | ||
| * | ||
| * 1. **Reject Old Message:** | ||
| * If the new message starts significantly earlier than the current queue | ||
| * (i.e., `t0_new + delay < t0_current`), it is rejected. This guards | ||
| * against receiving stale messages that are out of date. | ||
| * | ||
| * 2. **Replace (Same Start Time):** | ||
| * If the new message starts at approximately the same time as the current | ||
| * queue (within `communication_delay_`), the entire queue is cleared and | ||
| * replaced with the new one. This models re-planning from the same initial | ||
| * state. | ||
| * | ||
| * 3. **Append (Starts After Current Ends):** | ||
| * If the new message starts after the current trajectory has finished | ||
| * executing (i.e., `tN_current + delay < t0_new`), the new message is | ||
| * appended to the end of the queue. This allows smooth extension of the | ||
| * current plan. | ||
| * | ||
| * 4. **Merge (Partial Overlap):** | ||
| * If the new message overlaps partially with the current queue, the prefix | ||
| * of the current queue up to `t0_new + delay` is preserved and the new | ||
| * message is appended after that. This allows partial updates to a | ||
| * trajectory in progress. | ||
| * | ||
| * After any processing, the method checks whether the front of the queue is | ||
| * ready to be executed by comparing its timestamp (adjusted for communication | ||
| * delay) with the current time. A `true` return indicates that a valid | ||
| * reference is available. | ||
| * | ||
| * @return True if a reference in the queue is ready to be executed; False | ||
| * otherwise. | ||
| */ | ||
| bool process_queue() { | ||
| #ifdef ROS2 | ||
| const double now = node_->get_clock()->now().seconds(); | ||
| #else | ||
| const double now = ros::Time::now().toSec(); | ||
| #endif | ||
| const double t_now = now - communication_delay_; | ||
| // STEP 1: Handle new message if available | ||
| if (has_new_msg_) { | ||
| const auto &[ts_new, dts_new, xs_new, dxs_new, us_new, Ks_new, types_new, | ||
| params_new] = get_solver_trajectory(); | ||
|
|
||
| if (ts_new.empty()) { | ||
| throw std::runtime_error("[SolverTrajectoryRosSubscriber::process_" | ||
| "queue] Received empty trajectory."); | ||
| } | ||
| const double t0_new = ts_new.front(); | ||
| const double t0_cur = ts_queue_.empty() ? t0_new : ts_queue_.front(); | ||
| const double tN_cur = | ||
| ts_queue_.empty() ? t0_new : ts_queue_.back() + dts_queue_.back(); | ||
| // Reject if new message is older than current queue (t0_new << t0_cur) | ||
| if (!ts_queue_.empty() && | ||
| t0_new + communication_delay_ < ts_queue_.front()) { | ||
| std::cerr << std::fixed << std::setprecision(5); | ||
| std::cerr << "[SolverTrajectoryRosSubscriber] Rejecting old message: " | ||
| "t0_new = " | ||
| << t0_new << ", current queue t0 = " << ts_queue_.front() | ||
| << std::endl; | ||
| if (!ts_queue_.empty() && ts_queue_.front() <= t_now) { | ||
| return true; | ||
| } else { | ||
| return false; | ||
| } | ||
| } | ||
| // Replace current queue if new message starts at the same time | ||
| // (within communication delay) | ||
| if (std::abs(t0_new - t0_cur) < communication_delay_) { | ||
| ts_queue_.clear(); | ||
| dts_queue_.clear(); | ||
| xs_queue_.clear(); | ||
| dxs_queue_.clear(); | ||
| us_queue_.clear(); | ||
| Ks_queue_.clear(); | ||
| types_queue_.clear(); | ||
| params_queue_.clear(); | ||
| for (std::size_t i = 0; i < ts_new.size(); ++i) { | ||
| ts_queue_.push_back(ts_new[i]); | ||
| dts_queue_.push_back(dts_new[i]); | ||
| xs_queue_.push_back(xs_new[i]); | ||
| dxs_queue_.push_back(dxs_new[i]); | ||
| us_queue_.push_back(us_new[i]); | ||
| Ks_queue_.push_back(Ks_new[i]); | ||
| types_queue_.push_back(types_new[i]); | ||
| params_queue_.push_back(params_new[i]); | ||
| } | ||
| // Append new message if it starts after current queue ends | ||
| } else if (tN_cur + communication_delay_ < t0_new) { | ||
| for (std::size_t i = 0; i < ts_new.size(); ++i) { | ||
| ts_queue_.push_back(ts_new[i]); | ||
| dts_queue_.push_back(dts_new[i]); | ||
| xs_queue_.push_back(xs_new[i]); | ||
| dxs_queue_.push_back(dxs_new[i]); | ||
| us_queue_.push_back(us_new[i]); | ||
| Ks_queue_.push_back(Ks_new[i]); | ||
| types_queue_.push_back(types_new[i]); | ||
| params_queue_.push_back(params_new[i]); | ||
| } | ||
| // Merge new message with current queue if it partially overlaps | ||
| } else { | ||
| std::deque<double> ts_merged; | ||
| std::deque<double> dts_merged; | ||
| std::deque<Eigen::VectorXd> xs_merged; | ||
| std::deque<Eigen::VectorXd> dxs_merged; | ||
| std::deque<Eigen::VectorXd> us_merged; | ||
| std::deque<Eigen::MatrixXd> Ks_merged; | ||
| std::deque<crocoddyl_msgs::ControlType> types_merged; | ||
| std::deque<crocoddyl_msgs::ControlParametrization> params_merged; | ||
| // Preserve valid portion of current queue (before t0_new + delay) | ||
| for (std::size_t i = 0; i < ts_queue_.size(); ++i) { | ||
| if (ts_queue_[i] < t0_new + communication_delay_) { | ||
| ts_merged.push_back(ts_queue_[i]); | ||
| dts_merged.push_back(dts_queue_[i]); | ||
| xs_merged.push_back(xs_queue_[i]); | ||
| dxs_merged.push_back(dxs_queue_[i]); | ||
| us_merged.push_back(us_queue_[i]); | ||
| Ks_merged.push_back(Ks_queue_[i]); | ||
| types_merged.push_back(types_queue_[i]); | ||
| params_merged.push_back(params_queue_[i]); | ||
| } else { | ||
| break; | ||
| } | ||
| } | ||
| // Append new message after preserved section | ||
| for (std::size_t i = 0; i < ts_new.size(); ++i) { | ||
| ts_merged.push_back(ts_new[i]); | ||
| dts_merged.push_back(dts_new[i]); | ||
| xs_merged.push_back(xs_new[i]); | ||
| dxs_merged.push_back(dxs_new[i]); | ||
| us_merged.push_back(us_new[i]); | ||
| Ks_merged.push_back(Ks_new[i]); | ||
| types_merged.push_back(types_new[i]); | ||
| params_merged.push_back(params_new[i]); | ||
| } | ||
| std::swap(ts_queue_, ts_merged); | ||
| std::swap(dts_queue_, dts_merged); | ||
| std::swap(xs_queue_, xs_merged); | ||
| std::swap(dxs_queue_, dxs_merged); | ||
| std::swap(us_queue_, us_merged); | ||
| std::swap(Ks_queue_, Ks_merged); | ||
| std::swap(types_queue_, types_merged); | ||
| std::swap(params_queue_, params_merged); | ||
| } | ||
| } | ||
| // STEP 2: Determine if queue has a reference ready for execution | ||
| while (ts_queue_.size() >= 2 && | ||
| ts_queue_.front() + dts_queue_.front() <= t_now) { | ||
| ts_queue_.pop_front(); | ||
| dts_queue_.pop_front(); | ||
| xs_queue_.pop_front(); | ||
| dxs_queue_.pop_front(); | ||
| us_queue_.pop_front(); | ||
| Ks_queue_.pop_front(); | ||
| types_queue_.pop_front(); | ||
| params_queue_.pop_front(); | ||
| } | ||
| // Now check the head entry | ||
| if (!ts_queue_.empty() && ts_queue_.front() <= t_now && | ||
| t_now <= ts_queue_.front() + dts_queue_.front()) { | ||
| return true; | ||
| } else { | ||
| return false; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * @brief Indicate whether we have received a new message | ||
| */ | ||
| bool has_new_msg() const { return has_new_msg_; } | ||
|
|
||
| double get_communication_delay() { | ||
| std::lock_guard<std::mutex> guard(mutex_); | ||
| return communication_delay_; | ||
| } | ||
|
|
||
| private: | ||
| #ifdef ROS2 | ||
| std::shared_ptr<rclcpp::Node> node_; | ||
|
|
@@ -143,6 +422,7 @@ class SolverTrajectoryRosSubscriber { | |
| bool is_processing_msg_; //!< Indicate when we are processing the message | ||
| double last_msg_time_; //!< Last message time needed to ensure each message is | ||
| //!< newer | ||
| double communication_delay_; | ||
| std::vector<double> ts_; | ||
| std::vector<double> dts_; | ||
| std::vector<Eigen::VectorXd> xs_; | ||
|
|
@@ -151,31 +431,42 @@ class SolverTrajectoryRosSubscriber { | |
| std::vector<Eigen::MatrixXd> Ks_; | ||
| std::vector<crocoddyl_msgs::ControlType> types_; | ||
| std::vector<crocoddyl_msgs::ControlParametrization> params_; | ||
| std::deque<double> ts_queue_; | ||
| std::deque<double> dts_queue_; | ||
| std::deque<Eigen::VectorXd> xs_queue_; | ||
| std::deque<Eigen::VectorXd> dxs_queue_; | ||
| std::deque<Eigen::VectorXd> us_queue_; | ||
| std::deque<Eigen::MatrixXd> Ks_queue_; | ||
| std::deque<crocoddyl_msgs::ControlType> types_queue_; | ||
| std::deque<crocoddyl_msgs::ControlParametrization> params_queue_; | ||
|
|
||
| void callback(SolverTrajectorySharedPtr msg) { | ||
| if (!is_processing_msg_) { | ||
| #ifdef ROS2 | ||
| double t = rclcpp::Time(msg->header.stamp).seconds(); | ||
| const double msg_time = rclcpp::Time(msg->header.stamp).seconds(); | ||
| const double now = node_->get_clock()->now().seconds(); | ||
| #else | ||
| double t = msg->header.stamp.toNSec(); | ||
| const double msg_time = msg->header.stamp.toSec(); | ||
| const double now = ros::Time::now().toSec(); | ||
| #endif | ||
| // Avoid out of order arrival and ensure each message is newer (or equal | ||
| // to) than the preceeding: | ||
| if (last_msg_time_ <= t) { | ||
| communication_delay_ = now - msg_time; | ||
| if (last_msg_time_ <= msg_time) { | ||
| // std::cout << "Adding new message with initial time: " << | ||
| // msg->intervals[0].time << std::endl; | ||
|
Comment on lines
+454
to
+455
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should we remove this?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes sorry! I forgot to remove that |
||
| std::lock_guard<std::mutex> guard(mutex_); | ||
| msg_ = *msg; | ||
| has_new_msg_ = true; | ||
| last_msg_time_ = t; | ||
| last_msg_time_ = msg_time; | ||
| } else { | ||
| #ifdef ROS2 | ||
| RCLCPP_WARN_STREAM(node_->get_logger(), | ||
| "Out of order message. Last timestamp: " | ||
| << std::fixed << last_msg_time_ | ||
| << ", current timestamp: " << t); | ||
| << ", current timestamp: " << msg_time); | ||
| #else | ||
| ROS_WARN_STREAM("Out of order message. Last timestamp: " | ||
| << std::fixed << last_msg_time_ | ||
| << ", current timestamp: " << t); | ||
| << ", current timestamp: " << msg_time); | ||
| #endif | ||
| } | ||
| } | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's allocate data for merged objects as well.