diff --git a/relaxed_concurrent_fifo/CMakeLists.txt b/relaxed_concurrent_fifo/CMakeLists.txt index 957e73e..34b659c 100644 --- a/relaxed_concurrent_fifo/CMakeLists.txt +++ b/relaxed_concurrent_fifo/CMakeLists.txt @@ -19,6 +19,11 @@ if (CMAKE_VERSION VERSION_GREATER 3.12) set_property(TARGET relaxed_concurrent_fifo PROPERTY CXX_STANDARD 20) endif() +find_package(Threads REQUIRED) +find_package(TBB REQUIRED) + +target_link_libraries(relaxed_concurrent_fifo PRIVATE Threads::Threads TBB::tbb) + if(MSVC) target_compile_options(relaxed_concurrent_fifo PRIVATE /W3 /WX /bigobj) else() diff --git a/relaxed_concurrent_fifo/benchmarks/benchmark_graph.hpp b/relaxed_concurrent_fifo/benchmarks/benchmark_graph.hpp index f6b5128..37c3847 100644 --- a/relaxed_concurrent_fifo/benchmarks/benchmark_graph.hpp +++ b/relaxed_concurrent_fifo/benchmarks/benchmark_graph.hpp @@ -2,7 +2,9 @@ #define BENCHMARK_GRAPH_HPP_INCLUDED #include "benchmark_base.hpp" +#include "benchmark_graph.hpp" +#include #include #include "../contenders/multififo/ring_buffer.hpp" @@ -80,20 +82,26 @@ struct benchmark_bfs : benchmark_timed<> { template void process_node(std::uint64_t node, typename FIFO::handle& handle, Counter& counter) { - std::uint64_t node_id = node & 0xffff'ffff; - std::uint32_t node_dist = node >> 32; - auto current_distance = distances[node_id].value.load(std::memory_order_relaxed); - if (node_dist > current_distance) { - ++counter.ignored_nodes; - return; - } - for (auto i = graph.nodes[node_id]; i < graph.nodes[node_id + 1]; ++i) { + auto current_distance = distances[node].value.load(std::memory_order_relaxed); + do { + if ((current_distance & 0x1) == 0) { + // LSB is not set, node has already been processed with this + // distance + ++counter.ignored_nodes; + return; + } + } while (!distances[node].value.compare_exchange_weak( + current_distance, current_distance & ~0x1, + std::memory_order_relaxed)); + for (auto i = graph.nodes[node]; i < graph.nodes[node + 1]; ++i) { auto target = graph.edges[i].target; - auto d = node_dist + 1; - auto old_d = distances[target].value.load(std::memory_order_relaxed); + auto d = current_distance + 2; + auto old_d = + distances[target].value.load(std::memory_order_relaxed); while (d < old_d) { - if (distances[target].value.compare_exchange_weak(old_d, d, std::memory_order_relaxed)) { - if (!handle.push((static_cast(d) << 32) | target)) { + if (distances[target].value.compare_exchange_weak( + old_d, d, std::memory_order_relaxed)) { + if (!handle.push(target + 1)) { counter.err = true; } ++counter.pushed_nodes; @@ -108,18 +116,18 @@ struct benchmark_bfs : benchmark_timed<> { void per_thread(int thread_index, typename FIFO::handle& handle, std::barrier<>& a) { Counter counter; if (thread_index == 0) { - // We can't push 0 to the queues! distances[0].value = 1; - handle.push(1ull << 32); + // We can't push 0 to the queues! + handle.push(1); ++counter.pushed_nodes; } a.arrive_and_wait(); std::optional node; while (termination_detection.repeat([&]() { - node = handle.pop(); - return node.has_value(); - })) { - process_node(*node, handle, counter); + node = handle.pop(); + return node.has_value(); + })) { + process_node(*node - 1, handle, counter); } counters[thread_index] = counter; } @@ -127,17 +135,26 @@ struct benchmark_bfs : benchmark_timed<> { template void output(T& stream) { auto total_counts = - std::accumulate(counters.begin(), counters.end(), Counter{}, [](auto sum, auto const& counter) { - sum.pushed_nodes += counter.pushed_nodes; - sum.processed_nodes += counter.processed_nodes; - sum.ignored_nodes += counter.ignored_nodes; - sum.err |= counter.err; - return sum; - }); + std::accumulate(counters.begin(), counters.end(), Counter{}, + [](auto sum, auto const& counter) { + sum.pushed_nodes += counter.pushed_nodes; + sum.processed_nodes += counter.processed_nodes; + sum.ignored_nodes += counter.ignored_nodes; + sum.err |= counter.err; + return sum; + }); if (total_counts.err) { - std::cout << "Push failed!" << std::endl; - stream << "ERR_PUSH_FAIL"; + stream << "ERR: Some nodes were not pushed\n"; + return; + } + + if (std::any_of(distances.begin(), distances.end(), [](auto const& d) { + auto dist = d.value.load(std::memory_order_relaxed); + return dist != std::numeric_limits::max() && + (dist & 0x1) == 1; + })) { + stream << "ERR: Some nodes were not processed\n"; return; } @@ -148,28 +165,27 @@ struct benchmark_bfs : benchmark_timed<> { return; } - for (std::size_t i = 0; i < info.distances.size(); i++) { - if (distances[i].value != info.distances[i]) { - std::cout << "Node " << i << " has distance " << distances[i].value << ", should be " << info.distances[i] << std::endl; - stream << "ERR_DIST_WRONG"; - return; - } - } - auto longest_distance = - std::max_element(distances.begin(), distances.end(), [](auto const& a, auto const& b) { - auto a_val = a.value.load(std::memory_order_relaxed); - auto b_val = b.value.load(std::memory_order_relaxed); - if (b_val == std::numeric_limits::max()) { - return false; - } - if (a_val == std::numeric_limits::max()) { - return true; - } - return a_val < b_val; - })->value.load(); - - stream << time_nanos << ',' << longest_distance << ',' << total_counts.pushed_nodes << ',' << total_counts.processed_nodes << ',' << total_counts.ignored_nodes; + std::max_element( + distances.begin(), distances.end(), + [](auto const& a, auto const& b) { + auto a_val = a.value.load(std::memory_order_relaxed); + auto b_val = b.value.load(std::memory_order_relaxed); + if (b_val == std::numeric_limits::max()) { + return false; + } + if (a_val == std::numeric_limits::max()) { + return true; + } + return a_val < b_val; + }) + ->value.load() >> + 1; + + stream << time_nanos << ',' << longest_distance << ',' + << total_counts.pushed_nodes << ',' + << total_counts.processed_nodes << ',' + << total_counts.ignored_nodes; } }; diff --git a/relaxed_concurrent_fifo/main.cpp b/relaxed_concurrent_fifo/main.cpp index ada5cc8..eed4d88 100644 --- a/relaxed_concurrent_fifo/main.cpp +++ b/relaxed_concurrent_fifo/main.cpp @@ -1,80 +1,81 @@ #include "benchmark.h" #include "config.hpp" -#include "lock_fifo.h" #include "block_based_queue.h" #include "concurrent_fifo.h" +#include "lock_fifo.h" -#include "contenders/LCRQ/wrapper.h" #include "contenders/LCRQ/MichaelScottQueue.hpp" +#include "contenders/LCRQ/wrapper.h" -#include -#include -#include #include #include -#include +#include #include +#include +#include +#include template void test_consistency(std::size_t fifo_size, std::size_t elements_per_thread, double prefill) { block_based_queue fifo{ THREAD_COUNT, fifo_size, BLOCK_MULTIPLIER, 7 }; auto handle = fifo.get_handle(); - std::size_t pre_push = static_cast(fifo_size * prefill); - std::unordered_multiset test_ints; - for (std::size_t index = 0; index < pre_push; index++) { - auto i = index | (1ull << 63); - handle.push(i); - test_ints.emplace(i); - } + std::size_t pre_push = static_cast(fifo_size * prefill); + std::unordered_multiset test_ints; + for (std::size_t index = 0; index < pre_push; index++) { + auto i = index | (1ull << 63); + handle.push(i); + test_ints.emplace(i); + } - std::barrier a{ (ptrdiff_t)(THREAD_COUNT + 1) }; - std::vector threads(THREAD_COUNT); - std::vector> test(THREAD_COUNT); - std::vector> popped(THREAD_COUNT); - for (std::size_t i = 0; i < THREAD_COUNT; i++) { - threads[i] = std::jthread([&, i]() { - auto handle = fifo.get_handle(); - a.arrive_and_wait(); - for (std::uint64_t j = 0; j < elements_per_thread; j++) { - auto val = (i << 32) | (j + 1); - test[i].push_back(val); - while (!handle.push(val)) {} - std::optional pop; - do { - pop = handle.pop(); - } while (!pop.has_value()); - popped[i].push_back(pop.value()); - } - }); - } - a.arrive_and_wait(); - for (auto& thread : threads) { - thread.join(); - } + std::barrier a{(ptrdiff_t)(THREAD_COUNT + 1)}; + std::vector threads(THREAD_COUNT); + std::vector> test(THREAD_COUNT); + std::vector> popped(THREAD_COUNT); + for (std::size_t i = 0; i < THREAD_COUNT; i++) { + threads[i] = std::jthread([&, i]() { + auto handle = fifo.get_handle(); + a.arrive_and_wait(); + for (std::uint64_t j = 0; j < elements_per_thread; j++) { + auto val = (i << 32) | (j + 1); + test[i].push_back(val); + while (!handle.push(val)) { + } + std::optional pop; + do { + pop = handle.pop(); + } while (!pop.has_value()); + popped[i].push_back(pop.value()); + } + }); + } + a.arrive_and_wait(); + for (auto& thread : threads) { + thread.join(); + } - std::unordered_multiset popped_ints; - for (std::size_t index = 0; index < pre_push; index++) { - popped_ints.emplace(handle.pop().value()); - } + std::unordered_multiset popped_ints; + for (std::size_t index = 0; index < pre_push; index++) { + popped_ints.emplace(handle.pop().value()); + } - for (std::size_t i = 0; i < THREAD_COUNT; i++) { - for (auto i : popped[i]) { - popped_ints.emplace(i); - } - for (auto i : test[i]) { - test_ints.emplace(i); - } - } + for (std::size_t i = 0; i < THREAD_COUNT; i++) { + for (auto i : popped[i]) { + popped_ints.emplace(i); + } + for (auto i : test[i]) { + test_ints.emplace(i); + } + } - if (handle.pop().has_value()) { - throw std::runtime_error("Invalid element left!"); - } + if (handle.pop().has_value()) { + throw std::runtime_error("Invalid element left!"); + } - if (popped_ints != test_ints) { - throw std::runtime_error("Sets did not match!"); - } + if (popped_ints != test_ints) { + throw std::runtime_error("Sets did not match!"); + } } template @@ -82,82 +83,86 @@ void run_benchmark(const std::string& test_name, const std::vector& processor_counts, int test_iterations, int test_time_seconds, const Args&... args) { constexpr const char* format = "fifo-{}-{}-{:%FT%H-%M-%S}.csv"; - if (BENCHMARK::HAS_TIMEOUT) { - std::cout << "Expected running time: "; - auto running_time_seconds = test_iterations * test_time_seconds * processor_counts.size() * instances.size(); - if (running_time_seconds >= 60) { - auto running_time_minutes = running_time_seconds / 60; - running_time_seconds %= 60; - if (running_time_minutes >= 60) { - auto running_time_hours = running_time_minutes / 60; - running_time_minutes %= 60; - if (running_time_hours >= 24) { - auto running_time_days = running_time_hours / 24; - running_time_hours %= 24; - std::cout << running_time_days << " days, "; - } - std::cout << running_time_hours << " hours, "; - } - std::cout << running_time_minutes << " minutes, "; - } - std::cout << running_time_seconds << " seconds" << std::endl; - } + if (BENCHMARK::HAS_TIMEOUT) { + std::cout << "Expected running time: "; + auto running_time_seconds = test_iterations * test_time_seconds * + processor_counts.size() * instances.size(); + if (running_time_seconds >= 60) { + auto running_time_minutes = running_time_seconds / 60; + running_time_seconds %= 60; + if (running_time_minutes >= 60) { + auto running_time_hours = running_time_minutes / 60; + running_time_minutes %= 60; + if (running_time_hours >= 24) { + auto running_time_days = running_time_hours / 24; + running_time_hours %= 24; + std::cout << running_time_days << " days, "; + } + std::cout << running_time_hours << " hours, "; + } + std::cout << running_time_minutes << " minutes, "; + } + std::cout << running_time_seconds << " seconds" << std::endl; + } - std::string filename = std::format(format, test_name, prefill, std::chrono::round(std::chrono::file_clock::now())); - std::ofstream file{ filename }; - for (auto i : std::views::iota(0, test_iterations)) { - std::cout << "Test run " << (i + 1) << " of " << test_iterations << std::endl; - for (const auto& imp : instances) { - std::cout << "Testing " << imp->get_name() << std::endl; - for (auto threads : processor_counts) { - std::cout << "With " << threads << " processors" << std::endl; - file << imp->get_name() << "," << threads << ','; - BENCHMARK_DATA_TYPE data{threads, test_time_seconds, args...}; - imp->test(data, prefill).output(file); - file << '\n'; - } - } - } - std::cout << "Results written to " << filename << std::endl; + std::string filename = std::format(format, test_name, prefill, + std::chrono::round( + std::chrono::file_clock::now())); + std::ofstream file{filename}; + for (auto i : std::views::iota(0, test_iterations)) { + std::cout << "Test run " << (i + 1) << " of " << test_iterations + << std::endl; + for (const auto& imp : instances) { + std::cout << "Testing " << imp->get_name() << std::endl; + for (auto threads : processor_counts) { + std::cout << "With " << threads << " processors" << std::endl; + file << imp->get_name() << "," << threads << ','; + BENCHMARK_DATA_TYPE data{threads, test_time_seconds, args...}; + imp->test(data, prefill).output(file); + file << '\n'; + } + } + } + std::cout << "Results written to " << filename << std::endl; } int main(int argc, const char** argv) { #ifndef NDEBUG - std::cout << "Running in debug mode!" << std::endl; -#endif // NDEBUG + std::cout << "Running in debug mode!" << std::endl; +#endif // NDEBUG - //test_consistency<8, 16>(20000, 200000, 0); + // test_consistency<8, 16>(20000, 200000, 0); constexpr int TEST_ITERATIONS_DEFAULT = 2; constexpr int TEST_TIME_SECONDS_DEFAULT = 5; - int input; - std::vector seglist; - std::vector argv2; - if (argc <= 1) { - std::cout << "Which experiment to run? \n" - "[1] Performance\n" - "[2] Quality\n" - "[3] Quality distribution\n" - "[4] Fill\n" - "[5] Empty\n" - "[6] Producer-Consumer\n" - "[7] BFS\n" - "Input: "; - std::string input_str; - getline(std::cin, input_str); - std::stringstream strstr{ input_str }; - std::string temp; - while (std::getline(strstr, temp, ' ')) { - seglist.push_back(temp); - } - argv2.resize(seglist.size() + 1); - for (std::size_t i = 0; i < seglist.size(); i++) { - argv2[i + 1] = seglist[i].c_str(); - } - argc = static_cast(argv2.size()); - argv = argv2.data(); - } + int input; + std::vector seglist; + std::vector argv2; + if (argc <= 1) { + std::cout << "Which experiment to run? \n" + "[1] Performance\n" + "[2] Quality\n" + "[3] Quality distribution\n" + "[4] Fill\n" + "[5] Empty\n" + "[6] Producer-Consumer\n" + "[7] BFS\n" + "Input: "; + std::string input_str; + getline(std::cin, input_str); + std::stringstream strstr{input_str}; + std::string temp; + while (std::getline(strstr, temp, ' ')) { + seglist.push_back(temp); + } + argv2.resize(seglist.size() + 1); + for (std::size_t i = 0; i < seglist.size(); i++) { + argv2[i + 1] = seglist[i].c_str(); + } + argc = static_cast(argv2.size()); + argv = argv2.data(); + } if (strcmp(argv[1], "-h") == 0 || strcmp(argv[1], "--help") == 0) { std::cout << "Usage: " << argv[0] << " ? [-h | --help] " @@ -170,7 +175,7 @@ int main(int argc, const char** argv) { return 0; } - input = std::strtol(argv[1], nullptr, 10); + input = std::strtol(argv[1], nullptr, 10); std::vector processor_counts; if (input == 6) { @@ -301,5 +306,5 @@ int main(int argc, const char** argv) { } break; } - return 0; + return 0; }