From cd229bbcb819335909ec4f086adb7ff56abffd5d Mon Sep 17 00:00:00 2001 From: Marvin Williams Date: Fri, 4 Apr 2025 19:08:32 +0200 Subject: [PATCH 1/4] added bfs variant without dist in fifo --- relaxed_concurrent_fifo/CMakeLists.txt | 5 + relaxed_concurrent_fifo/benchmark.h | 1 + .../benchmarks/benchmark_bfs_no_dist.hpp | 149 ++++ relaxed_concurrent_fifo/main.cpp | 756 ++++++++++-------- 4 files changed, 573 insertions(+), 338 deletions(-) create mode 100644 relaxed_concurrent_fifo/benchmarks/benchmark_bfs_no_dist.hpp diff --git a/relaxed_concurrent_fifo/CMakeLists.txt b/relaxed_concurrent_fifo/CMakeLists.txt index 80cd336..92aabfa 100644 --- a/relaxed_concurrent_fifo/CMakeLists.txt +++ b/relaxed_concurrent_fifo/CMakeLists.txt @@ -17,6 +17,11 @@ if (CMAKE_VERSION VERSION_GREATER 3.12) set_property(TARGET relaxed_concurrent_fifo PROPERTY CXX_STANDARD 20) endif() +find_package(Threads REQUIRED) +find_package(TBB REQUIRED) + +target_link_libraries(relaxed_concurrent_fifo PRIVATE Threads::Threads TBB::tbb) + if(MSVC) target_compile_options(relaxed_concurrent_fifo PRIVATE /W3 /WX /bigobj) else() diff --git a/relaxed_concurrent_fifo/benchmark.h b/relaxed_concurrent_fifo/benchmark.h index 90fabf2..a1440f4 100644 --- a/relaxed_concurrent_fifo/benchmark.h +++ b/relaxed_concurrent_fifo/benchmark.h @@ -7,6 +7,7 @@ #include "benchmarks/benchmark_fill.hpp" #include "benchmarks/benchmark_prodcon.hpp" #include "benchmarks/benchmark_graph.hpp" +#include "benchmarks/benchmark_bfs_no_dist.hpp" #include "benchmarks/providers/benchmark_provider_generic.hpp" #include "benchmarks/providers/benchmark_provider_bbq.hpp" diff --git a/relaxed_concurrent_fifo/benchmarks/benchmark_bfs_no_dist.hpp b/relaxed_concurrent_fifo/benchmarks/benchmark_bfs_no_dist.hpp new file mode 100644 index 0000000..d60e97c --- /dev/null +++ b/relaxed_concurrent_fifo/benchmarks/benchmark_bfs_no_dist.hpp @@ -0,0 +1,149 @@ +#ifndef BENCHMARK_GRAPH_NO_DIST_HPP_INCLUDED +#define BENCHMARK_GRAPH_NO_DIST_HPP_INCLUDED + 
+#include "benchmark_base.hpp" +#include "benchmark_graph.hpp" + +#include +#include + +#include "../contenders/multififo/util/graph.hpp" +#include "../contenders/multififo/util/termination_detection.hpp" + +struct benchmark_bfs_no_dist : benchmark_timed<> { + struct Counter { + long long pushed_nodes{0}; + long long ignored_nodes{0}; + long long processed_nodes{0}; + bool err{false}; + }; + +#ifdef __GNUC__ +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Winterference-size" +#endif // __GNUC__ + struct alignas(std::hardware_destructive_interference_size) AtomicDistance { + std::atomic value{ + std::numeric_limits::max()}; + }; +#ifdef __GNUC__ +#pragma GCC diagnostic pop +#endif // __GNUC__ + + Graph* graph; + std::vector distances; + termination_detection::TerminationDetection termination_detection; + std::vector counters; + + benchmark_bfs_no_dist(const benchmark_info& info) + : graph(reinterpret_cast(info).graph), + distances(graph->num_nodes()), + termination_detection(info.num_threads), + counters(info.num_threads) {} + + template + void process_node(std::uint64_t node, typename FIFO::handle& handle, + Counter& counter) { + auto current_distance = + distances[node].value.load(std::memory_order_relaxed); + do { + if ((current_distance & 0x1) == 0) { + // LSB is not set, node has already been processed with this + // distance + ++counter.ignored_nodes; + return; + } + } while (!distances[node].value.compare_exchange_weak( + current_distance, current_distance & ~0x1, + std::memory_order_relaxed)); + for (auto i = graph->nodes[node]; i < graph->nodes[node + 1]; ++i) { + auto target = graph->edges[i].target; + auto d = current_distance + 2; + auto old_d = + distances[target].value.load(std::memory_order_relaxed); + while (d < old_d) { + if (distances[target].value.compare_exchange_weak( + old_d, d, std::memory_order_relaxed)) { + if (!handle.push(target + 1)) { + counter.err = true; + } + ++counter.pushed_nodes; + break; + } + } + } + 
++counter.processed_nodes; + } + + template + void per_thread(int thread_index, typename FIFO::handle& handle, + std::barrier<>& a) { + Counter counter; + if (thread_index == 0) { + distances[0].value = 1; + // We can't push 0 to the queues! + handle.push(1); + ++counter.pushed_nodes; + } + a.arrive_and_wait(); + std::optional node; + while (termination_detection.repeat([&]() { + node = handle.pop(); + return node.has_value(); + })) { + process_node(*node - 1, handle, counter); + } + counters[thread_index] = counter; + } + + template + void output(T& stream) { + auto total_counts = + std::accumulate(counters.begin(), counters.end(), Counter{}, + [](auto sum, auto const& counter) { + sum.pushed_nodes += counter.pushed_nodes; + sum.processed_nodes += counter.processed_nodes; + sum.ignored_nodes += counter.ignored_nodes; + sum.err |= counter.err; + return sum; + }); + + if (total_counts.err) { + stream << "ERR: Some nodes were not pushed\n"; + return; + } + + if (std::any_of(distances.begin(), distances.end(), [](auto const& d) { + auto dist = d.value.load(std::memory_order_relaxed); + return dist != std::numeric_limits::max() && + (dist & 0x1) == 1; + })) { + stream << "ERR: Some nodes were not processed\n"; + return; + } + + auto longest_distance = + std::max_element( + distances.begin(), distances.end(), + [](auto const& a, auto const& b) { + auto a_val = a.value.load(std::memory_order_relaxed); + auto b_val = b.value.load(std::memory_order_relaxed); + if (b_val == std::numeric_limits::max()) { + return false; + } + if (a_val == std::numeric_limits::max()) { + return true; + } + return a_val < b_val; + }) + ->value.load() >> + 1; + + stream << time_nanos << ',' << longest_distance << ',' + << total_counts.pushed_nodes << ',' + << total_counts.processed_nodes << ',' + << total_counts.ignored_nodes; + } +}; + +#endif // BENCHMARK_GRAPH_NO_DIST_HPP_INCLUDED diff --git a/relaxed_concurrent_fifo/main.cpp b/relaxed_concurrent_fifo/main.cpp index 62874b5..66ce7b2 100644 --- 
a/relaxed_concurrent_fifo/main.cpp +++ b/relaxed_concurrent_fifo/main.cpp @@ -1,389 +1,469 @@ #include "benchmark.h" #include "config.hpp" -#include "lock_fifo.h" #include "block_based_queue.h" #include "concurrent_fifo.h" +#include "lock_fifo.h" -#include "contenders/LCRQ/wrapper.h" #include "contenders/LCRQ/MichaelScottQueue.hpp" +#include "contenders/LCRQ/wrapper.h" -#include -#include -#include #include #include -#include +#include #include +#include +#include +#include static constexpr std::size_t make_po2(std::size_t size) { - std::size_t ret = 1; - while (size > ret) { - ret *= 2; - } - return ret; + std::size_t ret = 1; + while (size > ret) { + ret *= 2; + } + return ret; } -static std::pair sequential_bfs(const Graph& graph) { - multififo::RingBuffer nodes(make_po2(graph.num_nodes())); - std::vector distances(graph.num_nodes(), std::numeric_limits::max()); - distances[0] = 0; - - nodes.push(static_cast(graph.nodes[0])); - - auto now = std::chrono::steady_clock::now().time_since_epoch().count(); - while (!nodes.empty()) { - auto node_id = nodes.top(); - nodes.pop(); - auto d = distances[node_id] + 1; - for (auto i = graph.nodes[node_id]; i < graph.nodes[node_id + 1]; ++i) { - auto node_id = graph.edges[i].target; - if (distances[node_id] == std::numeric_limits::max()) { - distances[node_id] = d; - nodes.push(static_cast(node_id)); - } - } - } - auto end = std::chrono::steady_clock::now().time_since_epoch().count(); - return std::pair(end - now, *std::max_element(distances.begin(), distances.end())); +static std::pair sequential_bfs( + const Graph& graph) { + multififo::RingBuffer nodes(make_po2(graph.num_nodes())); + std::vector distances( + graph.num_nodes(), std::numeric_limits::max()); + distances[0] = 0; + + nodes.push(static_cast(graph.nodes[0])); + + auto now = std::chrono::steady_clock::now().time_since_epoch().count(); + while (!nodes.empty()) { + auto node_id = nodes.top(); + nodes.pop(); + auto d = distances[node_id] + 1; + for (auto i = 
graph.nodes[node_id]; i < graph.nodes[node_id + 1]; ++i) { + auto node_id = graph.edges[i].target; + if (distances[node_id] == + std::numeric_limits::max()) { + distances[node_id] = d; + nodes.push(static_cast(node_id)); + } + } + } + auto end = std::chrono::steady_clock::now().time_since_epoch().count(); + return std::pair(end - now, + *std::max_element(distances.begin(), distances.end())); } /*static constexpr int COUNT = 512; template