Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions relaxed_concurrent_fifo/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ if (CMAKE_VERSION VERSION_GREATER 3.12)
set_property(TARGET relaxed_concurrent_fifo PROPERTY CXX_STANDARD 20)
endif()

find_package(Threads REQUIRED)
find_package(TBB REQUIRED)

target_link_libraries(relaxed_concurrent_fifo PRIVATE Threads::Threads TBB::tbb)

if(MSVC)
target_compile_options(relaxed_concurrent_fifo PRIVATE /W3 /WX /bigobj)
else()
Expand Down
112 changes: 64 additions & 48 deletions relaxed_concurrent_fifo/benchmarks/benchmark_graph.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
#define BENCHMARK_GRAPH_HPP_INCLUDED

#include "benchmark_base.hpp"
#include "benchmark_graph.hpp"

#include <algorithm>
#include <optional>

#include "../contenders/multififo/ring_buffer.hpp"
Expand Down Expand Up @@ -80,20 +82,26 @@ struct benchmark_bfs : benchmark_timed<> {

template <typename FIFO>
void process_node(std::uint64_t node, typename FIFO::handle& handle, Counter& counter) {
std::uint64_t node_id = node & 0xffff'ffff;
std::uint32_t node_dist = node >> 32;
auto current_distance = distances[node_id].value.load(std::memory_order_relaxed);
if (node_dist > current_distance) {
++counter.ignored_nodes;
return;
}
for (auto i = graph.nodes[node_id]; i < graph.nodes[node_id + 1]; ++i) {
auto current_distance = distances[node].value.load(std::memory_order_relaxed);
do {
if ((current_distance & 0x1) == 0) {
// LSB is not set, node has already been processed with this
// distance
++counter.ignored_nodes;
return;
}
} while (!distances[node].value.compare_exchange_weak(
current_distance, current_distance & ~0x1,
std::memory_order_relaxed));
for (auto i = graph.nodes[node]; i < graph.nodes[node + 1]; ++i) {
auto target = graph.edges[i].target;
auto d = node_dist + 1;
auto old_d = distances[target].value.load(std::memory_order_relaxed);
auto d = current_distance + 2;
auto old_d =
distances[target].value.load(std::memory_order_relaxed);
while (d < old_d) {
if (distances[target].value.compare_exchange_weak(old_d, d, std::memory_order_relaxed)) {
if (!handle.push((static_cast<std::uint64_t>(d) << 32) | target)) {
if (distances[target].value.compare_exchange_weak(
old_d, d, std::memory_order_relaxed)) {
if (!handle.push(target + 1)) {
counter.err = true;
}
++counter.pushed_nodes;
Expand All @@ -108,36 +116,45 @@ struct benchmark_bfs : benchmark_timed<> {
void per_thread(int thread_index, typename FIFO::handle& handle, std::barrier<>& a) {
Counter counter;
if (thread_index == 0) {
// We can't push 0 to the queues!
distances[0].value = 1;
handle.push(1ull << 32);
// We can't push 0 to the queues!
handle.push(1);
++counter.pushed_nodes;
}
a.arrive_and_wait();
std::optional<std::uint64_t> node;
while (termination_detection.repeat([&]() {
node = handle.pop();
return node.has_value();
})) {
process_node<FIFO>(*node, handle, counter);
node = handle.pop();
return node.has_value();
})) {
process_node<FIFO>(*node - 1, handle, counter);
}
counters[thread_index] = counter;
}

template <typename T>
void output(T& stream) {
auto total_counts =
std::accumulate(counters.begin(), counters.end(), Counter{}, [](auto sum, auto const& counter) {
sum.pushed_nodes += counter.pushed_nodes;
sum.processed_nodes += counter.processed_nodes;
sum.ignored_nodes += counter.ignored_nodes;
sum.err |= counter.err;
return sum;
});
std::accumulate(counters.begin(), counters.end(), Counter{},
[](auto sum, auto const& counter) {
sum.pushed_nodes += counter.pushed_nodes;
sum.processed_nodes += counter.processed_nodes;
sum.ignored_nodes += counter.ignored_nodes;
sum.err |= counter.err;
return sum;
});

if (total_counts.err) {
std::cout << "Push failed!" << std::endl;
stream << "ERR_PUSH_FAIL";
stream << "ERR: Some nodes were not pushed\n";
return;
}

if (std::any_of(distances.begin(), distances.end(), [](auto const& d) {
auto dist = d.value.load(std::memory_order_relaxed);
return dist != std::numeric_limits<std::uint32_t>::max() &&
(dist & 0x1) == 1;
})) {
stream << "ERR: Some nodes were not processed\n";
return;
}

Expand All @@ -148,28 +165,27 @@ struct benchmark_bfs : benchmark_timed<> {
return;
}

for (std::size_t i = 0; i < info.distances.size(); i++) {
if (distances[i].value != info.distances[i]) {
std::cout << "Node " << i << " has distance " << distances[i].value << ", should be " << info.distances[i] << std::endl;
stream << "ERR_DIST_WRONG";
return;
}
}

auto longest_distance =
std::max_element(distances.begin(), distances.end(), [](auto const& a, auto const& b) {
auto a_val = a.value.load(std::memory_order_relaxed);
auto b_val = b.value.load(std::memory_order_relaxed);
if (b_val == std::numeric_limits<long long>::max()) {
return false;
}
if (a_val == std::numeric_limits<long long>::max()) {
return true;
}
return a_val < b_val;
})->value.load();

stream << time_nanos << ',' << longest_distance << ',' << total_counts.pushed_nodes << ',' << total_counts.processed_nodes << ',' << total_counts.ignored_nodes;
std::max_element(
distances.begin(), distances.end(),
[](auto const& a, auto const& b) {
auto a_val = a.value.load(std::memory_order_relaxed);
auto b_val = b.value.load(std::memory_order_relaxed);
if (b_val == std::numeric_limits<std::uint32_t>::max()) {
return false;
}
if (a_val == std::numeric_limits<std::uint32_t>::max()) {
return true;
}
return a_val < b_val;
})
->value.load() >>
1;

stream << time_nanos << ',' << longest_distance << ','
<< total_counts.pushed_nodes << ','
<< total_counts.processed_nodes << ','
<< total_counts.ignored_nodes;
}
};

Expand Down
Loading