-
Notifications
You must be signed in to change notification settings - Fork 1
Make MVEC Lib tolerant to dropped messages. #14
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,52 +14,53 @@ | |
|
|
||
| #include "mvec_lib/mvec_relay_socketcan.hpp" | ||
|
|
||
| #include <memory> | ||
| #include <mutex> | ||
| #include <utility> | ||
|
|
||
| namespace polymath::sygnal | ||
| { | ||
|
|
||
| MvecRelaySocketcan::MvecRelaySocketcan(std::shared_ptr<socketcan::SocketcanAdapter> socketcan_adapter) | ||
| : MvecRelaySocketcan(socketcan_adapter, MVEC_DEFAULT_RESPONSE_TIMEOUT) | ||
| {} | ||
|
|
||
| MvecRelaySocketcan::MvecRelaySocketcan( | ||
| std::shared_ptr<socketcan::SocketcanAdapter> socketcan_adapter, std::chrono::milliseconds response_timeout) | ||
| : socketcan_adapter_(socketcan_adapter) | ||
| , relay_impl_() | ||
| , response_timeout_(response_timeout) | ||
| {} | ||
|
|
||
| MvecMessageType MvecRelaySocketcan::parse(const socketcan::CanFrame & frame) | ||
| { | ||
| MvecMessageType message_type = relay_impl_.parseMessage(frame); | ||
|
|
||
| // Check if we received expected response types and fulfill promises | ||
| std::lock_guard<std::mutex> lock(promises_mutex_); | ||
|
|
||
| switch (message_type) { | ||
| case MvecMessageType::RELAY_QUERY_RESPONSE: { | ||
| const auto & reply = relay_impl_.get_last_relay_query_reply(); | ||
| if (reply.is_valid() && !query_reply_promises_.empty()) { | ||
| // Get the oldest waiting promise | ||
| auto promise = std::move(query_reply_promises_.front()); | ||
| query_reply_promises_.pop(); | ||
|
|
||
| // Fulfill the promise | ||
| promise.set_value(reply); | ||
| std::lock_guard<std::mutex> lock(query_mutex_); | ||
| if (reply.is_valid() && query_reply_promise_.has_value()) { | ||
| query_reply_promise_->set_value(std::make_optional(reply)); | ||
| query_reply_promise_.reset(); | ||
| } | ||
| break; | ||
| } | ||
| case MvecMessageType::RELAY_COMMAND_RESPONSE: { | ||
| const auto & reply = relay_impl_.get_last_relay_command_reply(); | ||
| if (reply.is_valid() && !command_reply_promises_.empty()) { | ||
| auto promise = std::move(command_reply_promises_.front()); | ||
| command_reply_promises_.pop(); | ||
| promise.set_value(reply); | ||
| std::lock_guard<std::mutex> lock(command_mutex_); | ||
| if (reply.is_valid() && command_reply_promise_.has_value()) { | ||
| command_reply_promise_->set_value(std::make_optional(reply)); | ||
| command_reply_promise_.reset(); | ||
| } | ||
| break; | ||
| } | ||
| case MvecMessageType::POPULATION_RESPONSE: { | ||
| const auto & reply = relay_impl_.get_last_population_reply(); | ||
| if (reply.is_valid() && !population_reply_promises_.empty()) { | ||
| auto promise = std::move(population_reply_promises_.front()); | ||
| population_reply_promises_.pop(); | ||
| promise.set_value(reply); | ||
| std::lock_guard<std::mutex> lock(population_mutex_); | ||
| if (reply.is_valid() && population_reply_promise_.has_value()) { | ||
| population_reply_promise_->set_value(std::make_optional(reply)); | ||
| population_reply_promise_.reset(); | ||
| } | ||
| break; | ||
| } | ||
|
|
@@ -82,67 +83,85 @@ void MvecRelaySocketcan::clear_relay() | |
| relay_impl_.clearRelayCommands(); | ||
| } | ||
|
|
||
| std::future<MvecRelayQueryReply> MvecRelaySocketcan::get_relay_state() | ||
| std::future<std::optional<MvecRelayQueryReply>> MvecRelaySocketcan::get_relay_state() | ||
| { | ||
| // Get the query message from the relay implementation | ||
| /// TODO: (zeerek) Set invalid for received message | ||
| auto query_frame = relay_impl_.getRelayQueryMessage(); | ||
| std::lock_guard<std::mutex> lock(query_mutex_); | ||
|
|
||
| // If a request is already in-flight, check if it has timed out | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this logic necessary? Do we actually care to reject or would we rather just resend? The other promise would just time out because we resent and the new one would be used. But since we only have 1 request in flight at a time, that should not matter? |
||
| if (query_reply_promise_.has_value()) { | ||
| auto elapsed = std::chrono::steady_clock::now() - query_send_time_; | ||
| if (elapsed < response_timeout_) { | ||
| // Still in-flight, reject this request | ||
| std::promise<std::optional<MvecRelayQueryReply>> rejected; | ||
| auto future = rejected.get_future(); | ||
| rejected.set_value(std::nullopt); | ||
| return future; | ||
| } | ||
| // Timed out — fulfill the old promise with nullopt so any waiter gets a clean result | ||
| query_reply_promise_->set_value(std::nullopt); | ||
| query_reply_promise_.reset(); | ||
| } | ||
|
|
||
| // Create a new promise and get its future | ||
| std::promise<MvecRelayQueryReply> promise; | ||
| std::promise<std::optional<MvecRelayQueryReply>> promise; | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why promise optional? That seems redundant? |
||
| auto future = promise.get_future(); | ||
| query_reply_promise_.emplace(std::move(promise)); | ||
| query_send_time_ = std::chrono::steady_clock::now(); | ||
|
|
||
| // Add promise to the queue with thread safety | ||
| { | ||
| std::lock_guard<std::mutex> lock(promises_mutex_); | ||
| query_reply_promises_.push(std::move(promise)); | ||
| } | ||
|
|
||
| // Transmit the query message via socketcan adapter | ||
| auto query_frame = relay_impl_.getRelayQueryMessage(); | ||
| socketcan_adapter_->send(query_frame); | ||
|
|
||
| return future; | ||
| } | ||
|
|
||
| std::future<MvecRelayCommandReply> MvecRelaySocketcan::send_relay_command() | ||
| std::future<std::optional<MvecRelayCommandReply>> MvecRelaySocketcan::send_relay_command() | ||
| { | ||
| // Get the command message from the relay implementation | ||
| /// TODO: (zeerek) Set invalid for received message | ||
| auto command_frame = relay_impl_.getRelayCommandMessage(); | ||
| std::lock_guard<std::mutex> lock(command_mutex_); | ||
|
|
||
| if (command_reply_promise_.has_value()) { | ||
| auto elapsed = std::chrono::steady_clock::now() - command_send_time_; | ||
| if (elapsed < response_timeout_) { | ||
| std::promise<std::optional<MvecRelayCommandReply>> rejected; | ||
| auto future = rejected.get_future(); | ||
| rejected.set_value(std::nullopt); | ||
| return future; | ||
| } | ||
| command_reply_promise_->set_value(std::nullopt); | ||
| command_reply_promise_.reset(); | ||
| } | ||
|
|
||
| // Create a new promise and get its future | ||
| std::promise<MvecRelayCommandReply> promise; | ||
| std::promise<std::optional<MvecRelayCommandReply>> promise; | ||
| auto future = promise.get_future(); | ||
| command_reply_promise_.emplace(std::move(promise)); | ||
| command_send_time_ = std::chrono::steady_clock::now(); | ||
|
|
||
| // Add promise to the queue with thread safety | ||
| { | ||
| std::lock_guard<std::mutex> lock(promises_mutex_); | ||
| command_reply_promises_.push(std::move(promise)); | ||
| } | ||
|
|
||
| // Transmit the command message via socketcan adapter | ||
| auto command_frame = relay_impl_.getRelayCommandMessage(); | ||
| socketcan_adapter_->send(command_frame); | ||
|
|
||
| return future; | ||
| } | ||
|
|
||
| std::future<MvecPopulationReply> MvecRelaySocketcan::get_relay_population() | ||
| std::future<std::optional<MvecPopulationReply>> MvecRelaySocketcan::get_relay_population() | ||
| { | ||
| // Get the population query message from the relay implementation | ||
| /// TODO: (zeerek) Set invalid for received message | ||
| auto population_frame = relay_impl_.getPopulationQueryMessage(); | ||
| std::lock_guard<std::mutex> lock(population_mutex_); | ||
|
|
||
| if (population_reply_promise_.has_value()) { | ||
| auto elapsed = std::chrono::steady_clock::now() - population_send_time_; | ||
| if (elapsed < response_timeout_) { | ||
| std::promise<std::optional<MvecPopulationReply>> rejected; | ||
| auto future = rejected.get_future(); | ||
| rejected.set_value(std::nullopt); | ||
| return future; | ||
| } | ||
| population_reply_promise_->set_value(std::nullopt); | ||
| population_reply_promise_.reset(); | ||
| } | ||
|
|
||
| // Create a new promise and get its future | ||
| std::promise<MvecPopulationReply> promise; | ||
| std::promise<std::optional<MvecPopulationReply>> promise; | ||
| auto future = promise.get_future(); | ||
| population_reply_promise_.emplace(std::move(promise)); | ||
| population_send_time_ = std::chrono::steady_clock::now(); | ||
|
|
||
| // Add promise to the queue with thread safety | ||
| { | ||
| std::lock_guard<std::mutex> lock(promises_mutex_); | ||
| population_reply_promises_.push(std::move(promise)); | ||
| } | ||
|
|
||
| // Transmit the population query message via socketcan adapter | ||
| auto population_frame = relay_impl_.getPopulationQueryMessage(); | ||
| socketcan_adapter_->send(population_frame); | ||
|
|
||
| return future; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know if you need optional here! Just don't fulfill the promise, the future will take care of not having a value!