From 47c09c6e017ab5efa867254bc9074a9b1cc09e43 Mon Sep 17 00:00:00 2001 From: s-morita <32567894+smorita-esol@users.noreply.github.com> Date: Fri, 17 Feb 2023 13:16:23 +0900 Subject: [PATCH 1/2] feat: Thread configuration prototype This is a prototype implementation of RCLCPP for discussion about the thread configuration feature to receive and apply a set of scheduling parameters for the threads controlled by the ROS 2 executor. Our basic idea is as below. 1. Implement a new class rclcpp::thread and modify rclcpp to use it. This class has the same function set as the std::thread but also additional features to control its thread attributions. 2. Modify the rcl layer to receive a set of scheduling parameters. The parameters are described in YAML format and passed via command line parameters, environment variables, or files. 3. the rclcpp reads the parameters from rcl and applies them to each thread in the thread pool. *1 *1. the current implementation uses them only for the Multithread Executor. --- rclcpp/CMakeLists.txt | 14 ++ .../executors/multi_threaded_executor.hpp | 13 ++ rclcpp/include/rclcpp/threads.hpp | 26 +++ .../rclcpp/threads/posix/linux/cpu_set.hpp | 119 +++++++++++ .../include/rclcpp/threads/posix/thread.hpp | 141 ++++++++++++ .../rclcpp/threads/posix/thread_attribute.hpp | 200 ++++++++++++++++++ .../rclcpp/threads/posix/thread_func.hpp | 102 +++++++++ .../rclcpp/threads/posix/thread_id.hpp | 141 ++++++++++++ rclcpp/include/rclcpp/threads/std/thread.hpp | 22 ++ .../include/rclcpp/threads/windows/thread.hpp | 22 ++ .../executors/multi_threaded_executor.cpp | 148 ++++++++++++- rclcpp/src/rclcpp/threads/posix_thread.cpp | 101 +++++++++ rclcpp/src/rclcpp/threads/std_thread.cpp | 19 ++ rclcpp/src/rclcpp/threads/windows_thread.cpp | 19 ++ 14 files changed, 1079 insertions(+), 8 deletions(-) create mode 100644 rclcpp/include/rclcpp/threads.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread_func.hpp create mode 100644 rclcpp/include/rclcpp/threads/posix/thread_id.hpp create mode 100644 rclcpp/include/rclcpp/threads/std/thread.hpp create mode 100644 rclcpp/include/rclcpp/threads/windows/thread.hpp create mode 100644 rclcpp/src/rclcpp/threads/posix_thread.cpp create mode 100644 rclcpp/src/rclcpp/threads/std_thread.cpp create mode 100644 rclcpp/src/rclcpp/threads/windows_thread.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 3abd999f3d..1b5a88b9ae 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -110,6 +110,20 @@ set(${PROJECT_NAME}_SRCS src/rclcpp/waitable.cpp ) +if(${CMAKE_SYSTEM_NAME} MATCHES "Linux") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/posix_thread.cpp + ) +elseif(${CMAKE_SYSTEM_NAME} MATCHES "Windows") + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/windows_thread.cpp + ) +else() + list(APPEND ${PROJECT_NAME}_SRCS + src/rclcpp/threads/std_thread.cpp + ) +endif() + find_package(Python3 REQUIRED COMPONENTS Interpreter) # "watch" template for changes diff --git a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp index 119013ebfb..f1b92b5fa7 100644 --- a/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/multi_threaded_executor.hpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include #include @@ -26,6 +27,7 @@ #include "rclcpp/macros.hpp" #include "rclcpp/memory_strategies.hpp" #include "rclcpp/visibility_control.hpp" +#include "rclcpp/threads.hpp" namespace rclcpp { @@ -79,12 +81,23 @@ class MultiThreadedExecutor : public rclcpp::Executor run(size_t this_thread_number); private: + void run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id); + + void run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id); + RCLCPP_DISABLE_COPY(MultiThreadedExecutor) std::mutex wait_mutex_; size_t number_of_threads_; bool yield_before_execute_; std::chrono::nanoseconds next_exec_timeout_; + rcl_thread_attrs_t * thread_attributes_; + bool use_thread_attrs_; }; } // namespace executors diff --git a/rclcpp/include/rclcpp/threads.hpp b/rclcpp/include/rclcpp/threads.hpp new file mode 100644 index 0000000000..ac628de3e1 --- /dev/null +++ b/rclcpp/include/rclcpp/threads.hpp @@ -0,0 +1,26 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS_HPP_ +#define RCLCPP__THREADS_HPP_ + +#if defined(__linux__) +#include "rclcpp/threads/posix/thread.hpp" +#elif defined(_WIN32) +#include "rclcpp/threads/win32/thread.hpp" +#else +#include "rclcpp/threads/std/thread.hpp" +#endif + +#endif // RCLCPP__THREADS_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp new file mode 100644 index 0000000000..45bc2b090e --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/linux/cpu_set.hpp @@ -0,0 +1,119 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ +#define RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ + +#include +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +namespace detail +{ + +struct CpuSet +{ + using NativeCpuSetType = cpu_set_t; + CpuSet() + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = CPU_ALLOC_SIZE(processor_num); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + } + explicit CpuSet(int num_cpu) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = CPU_ALLOC_SIZE(processor_num); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + CPU_SET_S(num_cpu, cpu_size_, cpu_set_); + } + CpuSet(const CpuSet & cpuset) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = cpuset.get_cpu_size(); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + for (int i = 0; i < processor_num; i++) { + if (CPU_ISSET_S(i, cpu_size_, cpuset.native_cpu_set())) { + CPU_SET_S(i, cpu_size_, cpu_set_); + } + } + } + CpuSet & operator=(CpuSet const & cpuset) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + cpu_size_ = cpuset.get_cpu_size(); + cpu_set_ = CPU_ALLOC(cpu_size_); + CPU_ZERO_S(cpu_size_, cpu_set_); + for (int i = 0; i < processor_num; i++) { + if (CPU_ISSET_S(i, cpu_size_, cpuset.native_cpu_set())) { + CPU_SET_S(i, cpu_size_, cpu_set_); + } + } + return *this; + } + CpuSet(CpuSet &&) = delete; + CpuSet & operator=(CpuSet &&) = delete; + ~CpuSet() + { + CPU_FREE(cpu_set_); + cpu_size_ = 0; + } + void set(int cpu) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + if (0 > cpu || processor_num <= cpu) { + auto ec = std::make_error_code(std::errc::invalid_argument); + throw std::system_error{ec, "cpu number is invaild"}; + } + CPU_SET_S(cpu, cpu_size_, cpu_set_); + } + void unset(int cpu) + { + int processor_num = static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + if (0 > cpu || processor_num <= cpu) { + auto ec = std::make_error_code(std::errc::invalid_argument); + throw std::system_error{ec, "cpu number is invaild"}; + } + CPU_CLR_S(cpu, cpu_size_, cpu_set_); + } + void clear() + { + CPU_ZERO_S(cpu_size_, cpu_set_); + } + bool is_set(int cpu) + { + return CPU_ISSET_S(cpu, cpu_size_, cpu_set_); + } + size_t get_cpu_size() const + { + return cpu_size_; + } + NativeCpuSetType * native_cpu_set() const {return cpu_set_;} + +private: + NativeCpuSetType * cpu_set_; + size_t cpu_size_; +}; +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__LINUX__CPU_SET_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread.hpp b/rclcpp/include/rclcpp/threads/posix/thread.hpp new file mode 100644 index 0000000000..fc786859d0 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread.hpp @@ -0,0 +1,141 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_HPP_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/threads/posix/thread_attribute.hpp" +#include "rclcpp/threads/posix/thread_func.hpp" +#include "rclcpp/threads/posix/thread_id.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = pthread_t; + using Attribute = detail::ThreadAttribute; + using Id = detail::ThreadId; + + // Assume pthread_t is an invalid handle if it's 0 + Thread() noexcept + : handle_{} {} + Thread(Thread && other) + : handle_(other.handle_) {other.handle_ = NativeHandleType{};} + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : Thread(static_cast(nullptr), + make_thread_func(nullptr, std::forward(f), std::forward(args)...)) + {} + template + Thread(Attribute & attr, F && f, Args && ... args) + : Thread(&attr, make_thread_func(&attr, std::forward(f), std::forward(args)...)) + {} + Thread(Thread const &) = delete; + ~Thread() + { + if (handle_) {std::terminate();} + } + + Thread & operator=(Thread && other) noexcept + { + if (handle_) {std::terminate();} + swap(other); + return *this; + } + + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(handle_, other.handle_); + } + + void join() + { + void * p; + int r = pthread_join(handle_, &p); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_join ");} + handle_ = NativeHandleType{}; + } + + bool joinable() const noexcept + { + return 0 == pthread_equal(handle_, NativeHandleType{}); + } + + void detach() + { + int r = pthread_detach(handle_); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in detach ");} + handle_ = NativeHandleType{}; + } + + NativeHandleType native_handle() const + { + return handle_; + } + + Id get_id() const noexcept + { + return Id{handle_}; + } + + static int hardware_concurrency() noexcept + { + return static_cast(sysconf(_SC_NPROCESSORS_ONLN)); + } + +private: + using ThreadFuncUniquePtr = detail::ThreadFuncUniquePtr; + + Thread(Attribute * attr, ThreadFuncUniquePtr func); + + template + static ThreadFuncUniquePtr make_thread_func(Attribute * attr, F && f, Args && ... args) + { + using detail::ThreadFunc; + using detail::ThreadFuncImplementation; + ThreadFunc * func = new ThreadFuncImplementation, std::decay_t...>( + attr, std::forward(f), std::forward(args)... + ); + + return ThreadFuncUniquePtr{func}; + } + + NativeHandleType handle_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp new file mode 100644 index 0000000000..bf812577e2 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_attribute.hpp @@ -0,0 +1,200 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ + +#include + +#include "rcl_yaml_param_parser/types.h" +#include "rclcpp/visibility_control.hpp" + +#ifdef __linux__ +#include "rclcpp/threads/posix/linux/cpu_set.hpp" +#endif + +namespace rclcpp +{ + +namespace detail +{ + +struct ThreadAttribute +{ + ThreadAttribute() + { + NativeAttributeType attr_; + + int r = pthread_attr_init(&attr_); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_attr_init ");} + + size_t cpu_size_ = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); + r = pthread_attr_getaffinity_np(&attr_, cpu_size_, cpu_set_.native_cpu_set()); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getaffinity_np "); + } + + r = pthread_attr_getschedpolicy(&attr_, &sched_policy_); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getschedpolicy "); + } + + r = pthread_attr_getstacksize(&attr_, &stack_size_); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getstacksize "); + } + + sched_param param; + r = pthread_attr_getschedparam(&attr_, ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getschedparam "); + } + priority_ = param.sched_priority; + + int flag; + r = pthread_attr_getdetachstate(&attr_, &flag); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_getdetachstate "); + } + detached_flag_ = (flag == PTHREAD_CREATE_DETACHED); + pthread_attr_destroy(&attr_); + } + ~ThreadAttribute() + { + } + + ThreadAttribute(ThreadAttribute const &) = delete; + ThreadAttribute(ThreadAttribute &&) = delete; + ThreadAttribute & operator=(ThreadAttribute const &) = delete; + ThreadAttribute & operator=(ThreadAttribute &&) = delete; + + using NativeAttributeType = pthread_attr_t; + + ThreadAttribute & set_affinity(CpuSet & rclcpp_cs) + { + cpu_set_ = rclcpp_cs; + return *this; + } + CpuSet get_affinity() + { + return cpu_set_; + } + + ThreadAttribute & set_sched_policy(rcl_thread_scheduling_policy_type_t sp) + { + sched_policy_ = rcl_scheduling_policy_to_sched_policy(sp); + return *this; + } + int get_sched_policy() const + { + return sched_policy_; + } + + ThreadAttribute & set_stack_size(std::size_t sz) + { + stack_size_ = sz; + return *this; + } + std::size_t get_stack_size() const + { + return stack_size_; + } + + ThreadAttribute & set_priority(int prio) + { + priority_ = prio; + return *this; + } + int get_priority() const + { + return priority_; + } + + ThreadAttribute & set_run_as_detached(bool detach) + { + detached_flag_ = detach; + return *this; + } + + bool get_run_as_detached() const + { + return detached_flag_; + } + + void + set_thread_attributes( + rcl_thread_attr_t attributes) + { + rclcpp::detail::CpuSet cpu_set; + cpu_set.clear(); + cpu_set.set(attributes.core_affinity); + set_affinity(cpu_set); + set_sched_policy(attributes.scheduling_policy); + set_priority(attributes.priority); + } + +private: + CpuSet cpu_set_; + int sched_policy_; + std::size_t stack_size_; + int priority_; + bool detached_flag_; + + int rcl_scheduling_policy_to_sched_policy( + rcl_thread_scheduling_policy_type_t sched_policy) + { + switch (sched_policy) { +#ifdef SCHED_FIFO + case RCL_THREAD_SCHEDULING_POLICY_FIFO: + return SCHED_FIFO; +#endif +#ifdef SCHED_RR + case RCL_THREAD_SCHEDULING_POLICY_RR: + return SCHED_RR; +#endif +#ifdef SCHED_OTHER + case RCL_THREAD_SCHEDULING_POLICY_OTHER: + return SCHED_OTHER; +#endif +#ifdef SCHED_IDLE + case RCL_THREAD_SCHEDULING_POLICY_IDLE: + return SCHED_IDLE; +#endif +#ifdef SCHED_BATCH + case RCL_THREAD_SCHEDULING_POLICY_BATCH: + return SCHED_BATCH; +#endif +#ifdef SCHED_SPORADIC + case RCL_THREAD_SCHEDULING_POLICY_SPORADIC: + return SCHED_SPORADIC; +#endif + /* Todo: Necessity and setting method need to be considered + #ifdef SCHED_DEADLINE + case RCL_THREAD_SCHEDULING_POLICY_DEADLINE: + return SCHED_DEADLINE; + break; + #endif + */ + default: + throw std::runtime_error("Invalid scheduling policy"); + } + return -1; + } +}; + +} // namespace detail + +} // namespace rclcpp + +#endif // RCLCPP__THREADS__POSIX__THREAD_ATTRIBUTE_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_func.hpp b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp new file mode 100644 index 0000000000..5620c48532 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_func.hpp @@ -0,0 +1,102 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ + +#include +#include +#include +#include +#include + +#include "rclcpp/threads/posix/thread_attribute.hpp" + +namespace rclcpp::detail +{ + +struct ThreadFunc +{ + virtual ~ThreadFunc() = default; + virtual void run() = 0; +}; + +template +struct ThreadFuncImplementation : ThreadFunc +{ + template + ThreadFuncImplementation(rclcpp::detail::ThreadAttribute * attr, G && g, As && ... as) + : func_(std::forward(g)), args_(std::forward(as)...) + { + if (nullptr != attr) { + sched_policy_ = attr->get_sched_policy(); + priority_ = attr->get_priority(); + } else { + sched_policy_ = -1; + priority_ = -1; + } + } + +private: + static constexpr bool is_memfun_ = std::is_member_function_pointer_v; + + void run() override + { + if (sched_policy_ != -1 && sched_policy_ != SCHED_FIFO && sched_policy_ != SCHED_RR) { + sched_param param; + param.sched_priority = priority_; + int r = pthread_setschedparam( + pthread_self(), + sched_policy_, + ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_setschedparam "); + } + } + run_impl(std::index_sequence_for{}); + } + + template> + void run_impl(std::index_sequence<0, Is...>) + { + (arg<0>().*std::move(func_))(arg()...); + } + template, + typename = void> + void run_impl(std::index_sequence) + { + std::move(func_)(arg()...); + } + + template + auto && arg() + { + return std::move(std::get(args_)); + } + + F func_; + std::tuple args_; + int sched_policy_; + int priority_; +}; + +using ThreadFuncUniquePtr = std::unique_ptr; + +} // namespace rclcpp::detail + +#endif // RCLCPP__THREADS__POSIX__THREAD_FUNC_HPP_ diff --git a/rclcpp/include/rclcpp/threads/posix/thread_id.hpp b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp new file mode 100644 index 0000000000..4333930903 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/posix/thread_id.hpp @@ -0,0 +1,141 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ +#define RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ + +#include + +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread; + +namespace detail +{ + +namespace thread_id_ns +{ + +struct ThreadId; + +inline ThreadId get_id() noexcept; +inline bool operator==(ThreadId id1, ThreadId id2); +inline bool operator!=(ThreadId id1, ThreadId id2); +inline bool operator<(ThreadId id1, ThreadId id2); +inline bool operator>(ThreadId id1, ThreadId id2); +inline bool operator<=(ThreadId id1, ThreadId id2); +inline bool operator>=(ThreadId id1, ThreadId id2); +template +inline std::basic_ostream & operator<<( + std::basic_ostream &, + ThreadId); + +struct ThreadId +{ + ThreadId() = default; + ThreadId(ThreadId const &) = default; + ThreadId(ThreadId &&) = default; + ThreadId & operator=(ThreadId const &) = default; + ThreadId & operator=(ThreadId &&) = default; + + friend bool operator==(ThreadId id1, ThreadId id2) + { + return pthread_equal(id1.h, id2.h); + } + friend bool operator<(ThreadId id1, ThreadId id2) + { + return id1.h < id2.h; + } + template + friend std::basic_ostream & operator<<( + std::basic_ostream & ost, + ThreadId id) + { + return ost << id.h; + } + +private: + friend class rclcpp::Thread; + friend ThreadId get_id() noexcept; + friend struct std::hash; + explicit ThreadId(pthread_t h) + : h(h) {} + pthread_t h; +}; + +ThreadId get_id() noexcept +{ + return ThreadId{pthread_self()}; +} + +bool operator!=(ThreadId id1, ThreadId id2) +{ + return !(id1 == id2); +} + +bool operator>(ThreadId id1, ThreadId id2) +{ + return id2 < id1; +} + +bool operator<=(ThreadId id1, ThreadId id2) +{ + return !(id1 > id2); +} + +bool operator>=(ThreadId id1, ThreadId id2) +{ + return !(id1 < id2); +} + +} // namespace thread_id_ns + +using thread_id_ns::ThreadId; +using thread_id_ns::operator==; +using thread_id_ns::operator!=; +using thread_id_ns::operator<; // NOLINT +using thread_id_ns::operator>; // NOLINT +using thread_id_ns::operator<=; +using thread_id_ns::operator>=; +using thread_id_ns::operator<<; + +} // namespace detail + +namespace this_thread +{ + +using detail::thread_id_ns::get_id; + +} // namespace this_thread + +} // namespace rclcpp + +namespace std +{ + +template<> +struct hash +{ + std::size_t operator()(rclcpp::detail::thread_id_ns::ThreadId id) + { + return id.h; + } +}; + +} // namespace std + +#endif // RCLCPP__THREADS__POSIX__THREAD_ID_HPP_ diff --git a/rclcpp/include/rclcpp/threads/std/thread.hpp b/rclcpp/include/rclcpp/threads/std/thread.hpp new file mode 100644 index 0000000000..fed2dee903 --- /dev/null +++ b/rclcpp/include/rclcpp/threads/std/thread.hpp @@ -0,0 +1,22 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__STD__THREAD_HPP_ +#define RCLCPP__THREADS__STD__THREAD_HPP_ + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. + +#endif // RCLCPP__THREADS__STD__THREAD_HPP_ diff --git a/rclcpp/include/rclcpp/threads/windows/thread.hpp b/rclcpp/include/rclcpp/threads/windows/thread.hpp new file mode 100644 index 0000000000..49e6134fcf --- /dev/null +++ b/rclcpp/include/rclcpp/threads/windows/thread.hpp @@ -0,0 +1,22 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__THREADS__WINDOWS__THREAD_HPP_ +#define RCLCPP__THREADS__WINDOWS__THREAD_HPP_ + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. + +#endif // RCLCPP__THREADS__WINDOWS__THREAD_HPP_ diff --git a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp index 507d47f913..82c32702a7 100644 --- a/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/multi_threaded_executor.cpp @@ -33,7 +33,8 @@ MultiThreadedExecutor::MultiThreadedExecutor( std::chrono::nanoseconds next_exec_timeout) : rclcpp::Executor(options), yield_before_execute_(yield_before_execute), - next_exec_timeout_(next_exec_timeout) + next_exec_timeout_(next_exec_timeout), + use_thread_attrs_(false) { number_of_threads_ = number_of_threads > 0 ? number_of_threads : @@ -45,6 +46,41 @@ MultiThreadedExecutor::MultiThreadedExecutor( "MultiThreadedExecutor is used with a single thread.\n" "Use the SingleThreadedExecutor instead."); } + + thread_attributes_ = nullptr; + if (RCL_RET_OK == + rcl_arguments_get_thread_attrs( + &options.context->get_rcl_context()->global_arguments, + &thread_attributes_)) + { + if (nullptr != thread_attributes_) { + if (0 != thread_attributes_->num_attributes) { + use_thread_attrs_ = true; + } else { + thread_attributes_ = nullptr; + if (RCL_RET_OK == + rcl_context_get_thread_attrs( + options.context->get_rcl_context().get(), + &thread_attributes_)) + { + if (nullptr != thread_attributes_) { + if (0 != thread_attributes_->num_attributes) { + use_thread_attrs_ = true; + } + } + } + } + } + } + if (true == use_thread_attrs_ && + 0 != number_of_threads && + thread_attributes_->num_attributes != number_of_threads) + { + RCLCPP_WARN( + rclcpp::get_logger("rclcpp"), + "The number of thread attribute and number_of_threads is different.\n" + "MultiThreadedExecutor is used with the number of threads specified by thread attribute."); + } } MultiThreadedExecutor::~MultiThreadedExecutor() {} @@ -56,17 +92,35 @@ MultiThreadedExecutor::spin() throw std::runtime_error("spin() called while already spinning"); } RCPPUTILS_SCOPE_EXIT(this->spinning.store(false); ); - std::vector threads; + std::vector threads; size_t thread_id = 0; - { - std::lock_guard wait_lock{wait_mutex_}; - for (; thread_id < number_of_threads_ - 1; ++thread_id) { - auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); - threads.emplace_back(func); + + if (use_thread_attrs_) { + rclcpp::detail::ThreadAttribute thread_attr; + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < thread_attributes_->num_attributes - 1; ++thread_id) { + thread_attr.set_thread_attributes( + thread_attributes_->attributes[thread_id]); + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(rclcpp::Thread(thread_attr, func)); + } + thread_attr.set_thread_attributes( + thread_attributes_->attributes[thread_id]); + run_on_rclcpp_thread(thread_attr, threads, thread_id); + } + run_on_this_thread(thread_attr, thread_id); + } else { + { + std::lock_guard wait_lock{wait_mutex_}; + for (; thread_id < number_of_threads_ - 1; ++thread_id) { + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(func); + } } + run(thread_id); } - run(thread_id); for (auto & thread : threads) { thread.join(); } @@ -104,3 +158,81 @@ MultiThreadedExecutor::run(size_t this_thread_number) any_exec.callback_group.reset(); } } + +#if defined(__linux__) +#include +void +MultiThreadedExecutor::run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id) +{ + (void)thread_attr; + (void)threads; + (void)thread_id; +} + +void +MultiThreadedExecutor::run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id) +{ + int r; + r = sched_setaffinity(0, sizeof(cpu_set_t), thread_attr.get_affinity().native_cpu_set()); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in sched_setaffinity "); + } + sched_param param; + param.sched_priority = thread_attr.get_priority(); + int policy = thread_attr.get_sched_policy(); + r = sched_setscheduler( + 0, policy, + ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in sched_setscheduler "); + } + run(thread_id); +} + +#elif defined(_WIN32) +#include +void +MultiThreadedExecutor::run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id) +{ + (void)thread_attr; + (void)threads; + (void)thread_id; +} + +void +MultiThreadedExecutor::run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id) +{ + // TO DO: Setting parent thread attribute on Windows + run(thread_id); +} +#else +void +MultiThreadedExecutor::run_on_rclcpp_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + std::vector & threads, + size_t thread_id) +{ + auto func = std::bind(&MultiThreadedExecutor::run, this, thread_id); + threads.emplace_back(rclcpp::Thread(thread_attr, func)); +} + +void +MultiThreadedExecutor::run_on_this_thread( + rclcpp::detail::ThreadAttribute & thread_attr, + size_t thread_id) +{ + (void)thread_attr; + (void)thread_id; +} + +#endif diff --git a/rclcpp/src/rclcpp/threads/posix_thread.cpp b/rclcpp/src/rclcpp/threads/posix_thread.cpp new file mode 100644 index 0000000000..a52fda5f0a --- /dev/null +++ b/rclcpp/src/rclcpp/threads/posix_thread.cpp @@ -0,0 +1,101 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include "rclcpp/threads/posix/thread.hpp" + +static void * thread_main(void * p); + +namespace rclcpp +{ + +Thread::Thread(Attribute * attr, ThreadFuncUniquePtr func) +: handle_(NativeHandleType{}) +{ + Attribute::NativeAttributeType native_attr; + int r = pthread_attr_init(&native_attr); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_attr_init ");} + + if (attr != nullptr) { + r = pthread_attr_setinheritsched(&native_attr, PTHREAD_EXPLICIT_SCHED); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setinheritsched "); + } + + rclcpp::detail::CpuSet affinity = attr->get_affinity(); + size_t cpu_size_ = CPU_ALLOC_SIZE(static_cast(sysconf(_SC_NPROCESSORS_ONLN))); + r = pthread_attr_setaffinity_np(&native_attr, cpu_size_, affinity.native_cpu_set()); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setaffinity_np "); + } + + std::size_t stack_size = attr->get_stack_size(); + r = pthread_attr_setstacksize(&native_attr, stack_size); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setstacksize "); + } + + int flag = attr->get_run_as_detached() ? PTHREAD_CREATE_DETACHED : PTHREAD_CREATE_JOINABLE; + r = pthread_attr_setdetachstate(&native_attr, flag); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setdetachstate "); + } + + int sched_policy = attr->get_sched_policy(); + if (sched_policy == SCHED_FIFO || sched_policy == SCHED_RR) { + r = pthread_attr_setschedpolicy(&native_attr, sched_policy); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setschedpolicy "); + } + + sched_param param; + param.sched_priority = attr->get_priority(); + r = pthread_attr_setschedparam(&native_attr, ¶m); + if (r != 0) { + throw std::system_error(r, std::system_category(), "Error in pthread_attr_setschedparam "); + } + } + } + + NativeHandleType h; + + r = pthread_create(&h, &native_attr, thread_main, func.get()); + if (r != 0) {throw std::system_error(r, std::system_category(), "Error in pthread_create ");} + + if (attr == nullptr || !attr->get_run_as_detached()) { + this->handle_ = h; + } + + pthread_attr_destroy(&native_attr); + + func.release(); +} + +} // namespace rclcpp + +static void * thread_main(void * p) +{ + using rclcpp::detail::ThreadFuncUniquePtr; + using rclcpp::detail::ThreadFunc; + + ThreadFuncUniquePtr func{reinterpret_cast(p)}; + try { + func->run(); + } catch (...) { + std::cerr << "failed to run thread" << std::endl; + std::terminate(); + } + + return nullptr; +} diff --git a/rclcpp/src/rclcpp/threads/std_thread.cpp b/rclcpp/src/rclcpp/threads/std_thread.cpp new file mode 100644 index 0000000000..725fcea997 --- /dev/null +++ b/rclcpp/src/rclcpp/threads/std_thread.cpp @@ -0,0 +1,19 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/threads/std/thread.hpp" + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. diff --git a/rclcpp/src/rclcpp/threads/windows_thread.cpp b/rclcpp/src/rclcpp/threads/windows_thread.cpp new file mode 100644 index 0000000000..2f6ca50ec7 --- /dev/null +++ b/rclcpp/src/rclcpp/threads/windows_thread.cpp @@ -0,0 +1,19 @@ +// Copyright 2023 eSOL Co.,Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "rclcpp/threads/windows/thread.hpp" + +// Not implemented so far. +// The windows specific code will be implemented +// while discussing the scheduling parameter passing feature at Real-time WG. From 07551e6cf227f510de6201d90776bfb8a0d7151c Mon Sep 17 00:00:00 2001 From: s-morita <32567894+smorita-esol@users.noreply.github.com> Date: Thu, 9 Mar 2023 13:15:21 +0900 Subject: [PATCH 2/2] feat: added rclcpp::thread implementation based on std::tread as a fallback. For the environments where an environment-specific implementation is not prepared yet, we also provide a fallback implementation of rclcpp::thread. With this, the modified Multi-thread executor can be used as the existing one in such an environment. --- rclcpp/CMakeLists.txt | 4 - rclcpp/include/rclcpp/threads/std/thread.hpp | 101 ++++++++++++++++++- rclcpp/src/rclcpp/threads/std_thread.cpp | 19 ---- 3 files changed, 98 insertions(+), 26 deletions(-) delete mode 100644 rclcpp/src/rclcpp/threads/std_thread.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 1b5a88b9ae..c1e49dc3a1 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -118,10 +118,6 @@ elseif(${CMAKE_SYSTEM_NAME} MATCHES "Windows") list(APPEND ${PROJECT_NAME}_SRCS src/rclcpp/threads/windows_thread.cpp ) -else() - list(APPEND ${PROJECT_NAME}_SRCS - src/rclcpp/threads/std_thread.cpp - ) endif() find_package(Python3 REQUIRED COMPONENTS Interpreter) diff --git a/rclcpp/include/rclcpp/threads/std/thread.hpp b/rclcpp/include/rclcpp/threads/std/thread.hpp index fed2dee903..5b783be40a 100644 --- a/rclcpp/include/rclcpp/threads/std/thread.hpp +++ b/rclcpp/include/rclcpp/threads/std/thread.hpp @@ -15,8 +15,103 @@ #ifndef RCLCPP__THREADS__STD__THREAD_HPP_ #define RCLCPP__THREADS__STD__THREAD_HPP_ -// Not implemented so far. -// The windows specific code will be implemented -// while discussing the scheduling parameter passing feature at Real-time WG. +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "rclcpp/threads/std/thread_attribute.hpp" +#include "rclcpp/visibility_control.hpp" + +namespace rclcpp +{ + +struct Thread +{ + using NativeHandleType = std::thread::native_handle_type; + using Attribute = detail::ThreadAttribute; + using Id = std::thread::id; + + Thread() noexcept + : thread_{} {} + Thread(Thread &&) = default; + template, Attribute>::value>> + explicit Thread(F && f, Args && ... args) + : thread_(std::forward(f), std::forward(args)...) + {} + template + Thread(Attribute & attr, F && f, Args && ... args) + { + if (attr.set_value) { + throw std::runtime_error("std::thread can't set thread attribute"); + } + thread_ = std::thread(std::forward(f), std::forward(args)...); + } + Thread(Thread const &) = delete; + ~Thread() {} + + Thread & operator=(Thread && other) noexcept = default; + Thread & operator=(Thread const &) = delete; + + void swap(Thread & other) + { + using std::swap; + swap(thread_, other.thread_); + } + + void join() + { + thread_.join(); + } + + bool joinable() const noexcept + { + return thread_.joinable(); + } + + void detach() + { + thread_.detach(); + } + + NativeHandleType native_handle() + { + return thread_.native_handle(); + } + + Id get_id() const noexcept + { + return thread_.get_id(); + } + + static int hardware_concurrency() noexcept + { + return std::thread::hardware_concurrency(); + } + +private: + std::thread thread_; +}; + +inline void swap(Thread & t1, Thread & t2) +{ + t1.swap(t2); +} + +namespace this_thread +{ + +using std::this_thread::get_id; + +} // namespace this_thread + +} // namespace rclcpp #endif // RCLCPP__THREADS__STD__THREAD_HPP_ diff --git a/rclcpp/src/rclcpp/threads/std_thread.cpp b/rclcpp/src/rclcpp/threads/std_thread.cpp deleted file mode 100644 index 725fcea997..0000000000 --- a/rclcpp/src/rclcpp/threads/std_thread.cpp +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2023 eSOL Co.,Ltd. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "rclcpp/threads/std/thread.hpp" - -// Not implemented so far. -// The windows specific code will be implemented -// while discussing the scheduling parameter passing feature at Real-time WG.