diff --git a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp index 8a07229..07914c9 100644 --- a/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp +++ b/mvec/mvec_lib/include/mvec_lib/mvec_relay_socketcan.hpp @@ -15,10 +15,11 @@ #ifndef MVEC_LIB__MVEC_RELAY_SOCKETCAN_HPP_ #define MVEC_LIB__MVEC_RELAY_SOCKETCAN_HPP_ +#include #include #include #include -#include +#include #include "mvec_lib/mvec_relay.hpp" #include "socketcan_adapter/socketcan_adapter.hpp" @@ -31,10 +32,16 @@ namespace polymath::sygnal class MvecRelaySocketcan { public: - /// @brief Constructor + /// @brief Constructor with default 500ms response timeout /// @param socketcan_adapter Shared pointer to socketcan adapter for CAN communication explicit MvecRelaySocketcan(std::shared_ptr socketcan_adapter); + /// @brief Constructor with custom response timeout + /// @param socketcan_adapter Shared pointer to socketcan adapter for CAN communication + /// @param response_timeout Timeout for all MVEC response types + MvecRelaySocketcan( + std::shared_ptr socketcan_adapter, std::chrono::milliseconds response_timeout); + /// @brief Parse incoming CAN frame and fulfill waiting promises /// @param frame CAN frame to parse /// @return Message type that was parsed @@ -49,16 +56,16 @@ class MvecRelaySocketcan void clear_relay(); /// @brief Query current relay states asynchronously - /// @return Future that will contain relay query reply - std::future get_relay_state(); + /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. + std::future> get_relay_state(); /// @brief Send relay command and wait for confirmation - /// @return Future that will contain command reply - std::future send_relay_command(); + /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. + std::future> send_relay_command(); /// @brief Query device population (which relays/fuses are installed) - /// @return Future that will contain population reply - std::future get_relay_population(); + /// @return Future containing reply, or nullopt on timeout or rejection due to pending requests. + std::future> get_relay_population(); /// @brief Get last received fuse status message /// @return Optional containing fuse status if valid data available @@ -72,21 +79,32 @@ class MvecRelaySocketcan /// @return Optional containing error status if valid data available const std::optional get_last_error_status(); + /// @brief Default timeout for MVEC responses (500ms) + static constexpr std::chrono::milliseconds MVEC_DEFAULT_RESPONSE_TIMEOUT{500}; + private: /// @brief SocketCAN adapter for CAN communication std::shared_ptr socketcan_adapter_; /// @brief Core MVEC relay implementation MvecRelay relay_impl_; - /// @brief Queue of promises waiting for relay query responses - std::queue> query_reply_promises_; - /// @brief Queue of promises waiting for relay command responses - std::queue> command_reply_promises_; - /// @brief Queue of promises waiting for population query responses - std::queue> population_reply_promises_; + /// @brief Timeout for all MVEC response types + std::chrono::milliseconds response_timeout_; + + /// @brief Single-slot promise for relay query response + std::optional>> query_reply_promise_; + std::chrono::steady_clock::time_point query_send_time_; + std::mutex query_mutex_; + + /// @brief Single-slot promise for relay command response + std::optional>> command_reply_promise_; + std::chrono::steady_clock::time_point command_send_time_; + std::mutex command_mutex_; - /// @brief Mutex protecting promise queues for thread safety - std::mutex promises_mutex_; + /// @brief Single-slot promise for population query response + std::optional>> population_reply_promise_; + std::chrono::steady_clock::time_point population_send_time_; + std::mutex population_mutex_; }; } // namespace polymath::sygnal diff --git a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp index 50bf22b..f694033 100644 --- a/mvec/mvec_lib/src/mvec_relay_socketcan.cpp +++ b/mvec/mvec_lib/src/mvec_relay_socketcan.cpp @@ -14,6 +14,7 @@ #include "mvec_lib/mvec_relay_socketcan.hpp" +#include #include #include @@ -21,45 +22,45 @@ namespace polymath::sygnal { MvecRelaySocketcan::MvecRelaySocketcan(std::shared_ptr socketcan_adapter) +: MvecRelaySocketcan(socketcan_adapter, MVEC_DEFAULT_RESPONSE_TIMEOUT) +{} + +MvecRelaySocketcan::MvecRelaySocketcan( + std::shared_ptr socketcan_adapter, std::chrono::milliseconds response_timeout) : socketcan_adapter_(socketcan_adapter) , relay_impl_() +, response_timeout_(response_timeout) {} MvecMessageType MvecRelaySocketcan::parse(const socketcan::CanFrame & frame) { MvecMessageType message_type = relay_impl_.parseMessage(frame); - // Check if we received expected response types and fulfill promises - std::lock_guard lock(promises_mutex_); - switch (message_type) { case MvecMessageType::RELAY_QUERY_RESPONSE: { const auto & reply = relay_impl_.get_last_relay_query_reply(); - if (reply.is_valid() && !query_reply_promises_.empty()) { - // Get the oldest waiting promise - auto promise = std::move(query_reply_promises_.front()); - query_reply_promises_.pop(); - - // Fulfill the promise - promise.set_value(reply); + std::lock_guard lock(query_mutex_); + if (reply.is_valid() && query_reply_promise_.has_value()) { + query_reply_promise_->set_value(std::make_optional(reply)); + query_reply_promise_.reset(); } break; } case MvecMessageType::RELAY_COMMAND_RESPONSE: { const auto & reply = relay_impl_.get_last_relay_command_reply(); - if (reply.is_valid() && !command_reply_promises_.empty()) { - auto promise = std::move(command_reply_promises_.front()); - command_reply_promises_.pop(); - promise.set_value(reply); + std::lock_guard lock(command_mutex_); + if (reply.is_valid() && command_reply_promise_.has_value()) { + command_reply_promise_->set_value(std::make_optional(reply)); + command_reply_promise_.reset(); } break; } case MvecMessageType::POPULATION_RESPONSE: { const auto & reply = relay_impl_.get_last_population_reply(); - if (reply.is_valid() && !population_reply_promises_.empty()) { - auto promise = std::move(population_reply_promises_.front()); - population_reply_promises_.pop(); - promise.set_value(reply); + std::lock_guard lock(population_mutex_); + if (reply.is_valid() && population_reply_promise_.has_value()) { + population_reply_promise_->set_value(std::make_optional(reply)); + population_reply_promise_.reset(); } break; } @@ -82,67 +83,85 @@ void MvecRelaySocketcan::clear_relay() relay_impl_.clearRelayCommands(); } -std::future MvecRelaySocketcan::get_relay_state() +std::future> MvecRelaySocketcan::get_relay_state() { - // Get the query message from the relay implementation - /// TODO: (zeerek) Set invalid for received message - auto query_frame = relay_impl_.getRelayQueryMessage(); + std::lock_guard lock(query_mutex_); + + // If a request is already in-flight, check if it has timed out + if (query_reply_promise_.has_value()) { + auto elapsed = std::chrono::steady_clock::now() - query_send_time_; + if (elapsed < response_timeout_) { + // Still in-flight, reject this request + std::promise> rejected; + auto future = rejected.get_future(); + rejected.set_value(std::nullopt); + return future; + } + // Timed out — fulfill the old promise with nullopt so any waiter gets a clean result + query_reply_promise_->set_value(std::nullopt); + query_reply_promise_.reset(); + } - // Create a new promise and get its future - std::promise promise; + std::promise> promise; auto future = promise.get_future(); + query_reply_promise_.emplace(std::move(promise)); + query_send_time_ = std::chrono::steady_clock::now(); - // Add promise to the queue with thread safety - { - std::lock_guard lock(promises_mutex_); - query_reply_promises_.push(std::move(promise)); - } - - // Transmit the query message via socketcan adapter + auto query_frame = relay_impl_.getRelayQueryMessage(); socketcan_adapter_->send(query_frame); return future; } -std::future MvecRelaySocketcan::send_relay_command() +std::future> MvecRelaySocketcan::send_relay_command() { - // Get the command message from the relay implementation - /// TODO: (zeerek) Set invalid for received message - auto command_frame = relay_impl_.getRelayCommandMessage(); + std::lock_guard lock(command_mutex_); + + if (command_reply_promise_.has_value()) { + auto elapsed = std::chrono::steady_clock::now() - command_send_time_; + if (elapsed < response_timeout_) { + std::promise> rejected; + auto future = rejected.get_future(); + rejected.set_value(std::nullopt); + return future; + } + command_reply_promise_->set_value(std::nullopt); + command_reply_promise_.reset(); + } - // Create a new promise and get its future - std::promise promise; + std::promise> promise; auto future = promise.get_future(); + command_reply_promise_.emplace(std::move(promise)); + command_send_time_ = std::chrono::steady_clock::now(); - // Add promise to the queue with thread safety - { - std::lock_guard lock(promises_mutex_); - command_reply_promises_.push(std::move(promise)); - } - - // Transmit the command message via socketcan adapter + auto command_frame = relay_impl_.getRelayCommandMessage(); socketcan_adapter_->send(command_frame); return future; } -std::future MvecRelaySocketcan::get_relay_population() +std::future> MvecRelaySocketcan::get_relay_population() { - // Get the population query message from the relay implementation - /// TODO: (zeerek) Set invalid for received message - auto population_frame = relay_impl_.getPopulationQueryMessage(); + std::lock_guard lock(population_mutex_); + + if (population_reply_promise_.has_value()) { + auto elapsed = std::chrono::steady_clock::now() - population_send_time_; + if (elapsed < response_timeout_) { + std::promise> rejected; + auto future = rejected.get_future(); + rejected.set_value(std::nullopt); + return future; + } + population_reply_promise_->set_value(std::nullopt); + population_reply_promise_.reset(); + } - // Create a new promise and get its future - std::promise promise; + std::promise> promise; auto future = promise.get_future(); + population_reply_promise_.emplace(std::move(promise)); + population_send_time_ = std::chrono::steady_clock::now(); - // Add promise to the queue with thread safety - { - std::lock_guard lock(promises_mutex_); - population_reply_promises_.push(std::move(promise)); - } - - // Transmit the population query message via socketcan adapter + auto population_frame = relay_impl_.getPopulationQueryMessage(); socketcan_adapter_->send(population_frame); return future; diff --git a/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp b/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp index 6468a3c..610ba38 100644 --- a/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp +++ b/mvec/mvec_lib/test/mvec_socketcan_hardware.cpp @@ -58,7 +58,10 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") auto status = population_future.wait_for(std::chrono::seconds(5)); if (status == std::future_status::ready) { - auto population_reply = population_future.get(); + auto population_result = population_future.get(); + REQUIRE(population_result.has_value()); + + const auto & population_reply = population_result.value(); std::cout << "Population query successful! Valid: " << population_reply.is_valid() << std::endl; // Check that we got a valid response @@ -95,7 +98,10 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") auto status = relay_state_future.wait_for(std::chrono::seconds(5)); if (status == std::future_status::ready) { - auto relay_query_reply = relay_state_future.get(); + auto relay_query_result = relay_state_future.get(); + REQUIRE(relay_query_result.has_value()); + + const auto & relay_query_reply = relay_query_result.value(); std::cout << "Relay state query successful! Valid: " << relay_query_reply.is_valid() << std::endl; // Check that we got a valid response @@ -213,18 +219,20 @@ TEST_CASE("MvecRelaySocketcan hardware integration test", "[hardware]") REQUIRE(status == std::future_status::ready); - auto response = relay_command_response_future.get(); + auto response_result = relay_command_response_future.get(); + REQUIRE(response_result.has_value()); // 1 is no error - REQUIRE(response.get_success() == 1); + REQUIRE(response_result->get_success() == 1); std::cout << "Relay response message confirms success" << std::endl; auto mvec_query_future = mvec_socketcan->get_relay_state(); status = mvec_query_future.wait_for(std::chrono::seconds(5)); - auto relay_state = mvec_query_future.get(); + auto relay_state_result = mvec_query_future.get(); + REQUIRE(relay_state_result.has_value()); - REQUIRE(relay_state.get_relay_state(8) == 1); - REQUIRE(relay_state.get_relay_state(9) == 1); + REQUIRE(relay_state_result->get_relay_state(8) == 1); + REQUIRE(relay_state_result->get_relay_state(9) == 1); std::cout << "Relay states queried and confirm command" << std::endl; } diff --git a/mvec/mvec_ros2/src/mvec_node.cpp b/mvec/mvec_ros2/src/mvec_node.cpp index 22c4d97..e59f96e 100644 --- a/mvec/mvec_ros2/src/mvec_node.cpp +++ b/mvec/mvec_ros2/src/mvec_node.cpp @@ -165,8 +165,13 @@ void MvecNode::timerCallback() auto status = relay_state_future.wait_for(std::chrono::milliseconds(100)); if (status == std::future_status::ready) { - auto relay_query_reply = relay_state_future.get(); - if (relay_query_reply.is_valid()) { + auto relay_query_result = relay_state_future.get(); + if (!relay_query_result.has_value()) { + RCLCPP_DEBUG(get_logger(), "Relay state query returned nullopt (timed out or rejected due to pending request)"); + } else if (!relay_query_result->is_valid()) { + RCLCPP_DEBUG(get_logger(), "Invalid relay query response received"); + } else { + const auto & relay_query_reply = relay_query_result.value(); // Store current relay states mvec_msgs::msg::MvecFeedback feedback_msg; feedback_msg.header.stamp = get_clock()->now(); @@ -187,11 +192,9 @@ void MvecNode::timerCallback() addDefaultPresetIfNotPresent(defaults); current_relay_states_ = feedback_msg; RCLCPP_DEBUG(get_logger(), "Updated relay states from hardware"); - } else { - RCLCPP_DEBUG(get_logger(), "Invalid relay query response received"); } } else { - RCLCPP_DEBUG(get_logger(), "Relay state query timed out"); + RCLCPP_DEBUG(get_logger(), "Relay state query timed out waiting for future"); } // Publish diagnostics array @@ -244,21 +247,27 @@ std::optional MvecNode::set_single_relay(mvec_msgs::msg::Relay rela auto future = mvec_socketcan_->send_relay_command(); auto status = future.wait_for(timeout_ms_); - if (status == std::future_status::ready) { - auto command_reply = future.get(); - if (command_reply.get_success() == 1) { - RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); - return std::nullopt; // Success - } else { - std::string error_msg = "MVEC device rejected relay command"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; - } - } else { + if (status != std::future_status::ready) { std::string error_msg = "Timeout waiting for relay command response"; RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); return error_msg; } + + auto command_result = future.get(); + if (!command_result.has_value()) { + std::string error_msg = "Relay command returned no response (timeout or rejected)"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } + + if (command_result->get_success() == 1) { + RCLCPP_INFO(get_logger(), "Successfully set relay %d to state %s", relay.relay_id, relay.state ? "ON" : "OFF"); + return std::nullopt; // Success + } + + std::string error_msg = "MVEC device rejected relay command"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; } std::optional MvecNode::set_multi_relay(const std::vector & relays) @@ -286,21 +295,27 @@ std::optional MvecNode::set_multi_relay(const std::vectorsend_relay_command(); auto status = future.wait_for(timeout_ms_); - if (status == std::future_status::ready) { - auto command_reply = future.get(); - if (command_reply.get_success() == 1) { - RCLCPP_INFO(get_logger(), "Successfully set %zu relays", relays.size()); - return std::nullopt; // Success - } else { - std::string error_msg = "MVEC device rejected multi-relay command"; - RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); - return error_msg; - } - } else { + if (status != std::future_status::ready) { std::string error_msg = "Timeout waiting for multi-relay command response"; RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); return error_msg; } + + auto command_result = future.get(); + if (!command_result.has_value()) { + std::string error_msg = "Multi-relay command returned no response (timeout or rejected)"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; + } + + if (command_result->get_success() == 1) { + RCLCPP_INFO(get_logger(), "Successfully set %zu relays", relays.size()); + return std::nullopt; // Success + } + + std::string error_msg = "MVEC device rejected multi-relay command"; + RCLCPP_WARN(get_logger(), "%s", error_msg.c_str()); + return error_msg; } catch (const std::exception & e) { std::string error_msg = "Exception during multi-relay command: " + std::string(e.what()); RCLCPP_ERROR(get_logger(), "%s", error_msg.c_str());