diff --git a/nano/core_test/rate_limiting.cpp b/nano/core_test/rate_limiting.cpp index 0d547e41c0..b0debd70c7 100644 --- a/nano/core_test/rate_limiting.cpp +++ b/nano/core_test/rate_limiting.cpp @@ -24,7 +24,6 @@ TEST (rate, basic) // Allow time for the bucket to completely refill and do a full burst std::this_thread::sleep_for (1s); ASSERT_TRUE (bucket.try_consume (10)); - ASSERT_EQ (bucket.largest_burst (), 10); } TEST (rate, network) @@ -35,9 +34,7 @@ TEST (rate, network) // Initial burst of 10 mb/s over two calls ASSERT_TRUE (bucket.try_consume (5)); - ASSERT_EQ (bucket.largest_burst (), 5); ASSERT_TRUE (bucket.try_consume (5)); - ASSERT_EQ (bucket.largest_burst (), 10); ASSERT_FALSE (bucket.try_consume (5)); // After 200 ms, the 5 mb/s fillrate means we have 1 mb available @@ -84,13 +81,10 @@ TEST (rate, unlimited) { nano::rate::token_bucket bucket (0, 0); ASSERT_TRUE (bucket.try_consume (5)); - ASSERT_EQ (bucket.largest_burst (), 5); ASSERT_TRUE (bucket.try_consume (static_cast (1e9))); - ASSERT_EQ (bucket.largest_burst (), static_cast (1e9)); // With unlimited tokens, consuming always succeed ASSERT_TRUE (bucket.try_consume (static_cast (1e9))); - ASSERT_EQ (bucket.largest_burst (), static_cast (1e9)); } TEST (rate, busy_spin) diff --git a/nano/lib/rate_limiting.cpp b/nano/lib/rate_limiting.cpp index ced8828cd1..4f6fa00220 100644 --- a/nano/lib/rate_limiting.cpp +++ b/nano/lib/rate_limiting.cpp @@ -13,56 +13,53 @@ nano::rate::token_bucket::token_bucket (std::size_t max_token_count_a, std::size reset (max_token_count_a, refill_rate_a); } -bool nano::rate::token_bucket::try_consume (unsigned tokens_required_a) +bool nano::rate::token_bucket::try_consume (std::size_t tokens_required) { - debug_assert (tokens_required_a <= 1e9); + debug_assert (tokens_required <= unlimited_rate_sentinel); + refill (); - bool possible = current_size >= tokens_required_a; + + bool possible = current_size >= tokens_required; if (possible) { - current_size -= tokens_required_a; - } - else if (tokens_required_a == 1e9) - { - current_size = 0; + current_size -= tokens_required; } - // Keep track of smallest observed bucket size so burst size can be computed (for tests and stats) - smallest_size = std::min (smallest_size, current_size); - return possible || refill_rate == unlimited_rate_sentinel; } void nano::rate::token_bucket::refill () { - auto now (std::chrono::steady_clock::now ()); + auto now = std::chrono::steady_clock::now (); std::size_t tokens_to_add = static_cast (std::chrono::duration_cast (now - last_refill).count () / 1e9 * refill_rate); - // Only update if there are any tokens to add + // Only update if there are tokens to add if (tokens_to_add > 0) { current_size = std::min (current_size + tokens_to_add, max_token_count); - last_refill = std::chrono::steady_clock::now (); + last_refill = now; } } void nano::rate::token_bucket::reset (std::size_t max_token_count_a, std::size_t refill_rate_a) { - // A token count of 0 indicates unlimited capacity. We use 1e9 as - // a sentinel, allowing largest burst to still be computed. - if (max_token_count_a == 0 || refill_rate_a == 0) + // A token count of 0 indicates unlimited capacity. We use 1e9 as a sentinel, allowing largest burst to still be computed. + if (max_token_count_a == 0) { - refill_rate_a = max_token_count_a = unlimited_rate_sentinel; + // Unlimited capacity + max_token_count_a = unlimited_rate_sentinel; } - max_token_count = smallest_size = current_size = max_token_count_a; + if (refill_rate_a == 0) + { + // Unlimited rate + refill_rate_a = unlimited_rate_sentinel; + } + + max_token_count = max_token_count_a; refill_rate = refill_rate_a; + current_size = max_token_count < unlimited_rate_sentinel ? max_token_count : 0; last_refill = std::chrono::steady_clock::now (); } -std::size_t nano::rate::token_bucket::largest_burst () const -{ - return max_token_count - smallest_size; -} - std::size_t nano::rate::token_bucket::size () const { return current_size; diff --git a/nano/lib/rate_limiting.hpp b/nano/lib/rate_limiting.hpp index 3ba0f3b771..92039395c6 100644 --- a/nano/lib/rate_limiting.hpp +++ b/nano/lib/rate_limiting.hpp @@ -25,8 +25,8 @@ class token_bucket public: /** * Set up a token bucket. - * @param max_token_count Maximum number of tokens in this bucket, which limits bursts. - * @param refill_rate Token refill rate, which limits the long term rate (tokens per seconds) + * @param max_token_count Maximum number of tokens in this bucket, which limits bursts. 0 is unlimited. + * @param refill_rate Token refill rate, which limits the long term rate (tokens per seconds). 0 is unlimited (everything passes). */ token_bucket (std::size_t max_token_count, std::size_t refill_rate); @@ -36,25 +36,20 @@ class token_bucket * The default cost is 1 token, but resource intensive operations may request * more tokens to be available. */ - bool try_consume (unsigned tokens_required = 1); + bool try_consume (std::size_t tokens_required = 1); /** Update the max_token_count and/or refill_rate_a parameters */ void reset (std::size_t max_token_count, std::size_t refill_rate); - /** Returns the largest burst observed */ - std::size_t largest_burst () const; + /** Returns the current number of tokens in the bucket */ std::size_t size () const; -private: void refill (); private: - std::size_t max_token_count; - std::size_t refill_rate; - + std::size_t max_token_count{ 0 }; + std::size_t refill_rate{ 0 }; std::size_t current_size{ 0 }; - /** The minimum observed bucket size, from which the largest burst can be derived */ - std::size_t smallest_size{ 0 }; std::chrono::steady_clock::time_point last_refill; static std::size_t constexpr unlimited_rate_sentinel{ static_cast (1e9) }; @@ -70,8 +65,7 @@ class rate_limiter final rate_limiter (std::size_t limit, double burst_ratio = 1.0); bool should_pass (std::size_t buffer_size); - void reset (std::size_t limit, double burst_ratio = 1.0); - + void reset (std::size_t limit, double burst_ratio); std::size_t size () const; private: diff --git a/nano/lib/stats_enums.hpp b/nano/lib/stats_enums.hpp index b9ce858a17..441c4ba8a4 100644 --- a/nano/lib/stats_enums.hpp +++ b/nano/lib/stats_enums.hpp @@ -149,6 +149,7 @@ enum class detail sync, requeued, evicted, + rate_limited, // processing queue queue, diff --git a/nano/node/confirming_set.cpp b/nano/node/confirming_set.cpp index c9e79444ab..378414789f 100644 --- a/nano/node/confirming_set.cpp +++ b/nano/node/confirming_set.cpp @@ -16,6 +16,7 @@ nano::confirming_set::confirming_set (confirming_set_config const & config_a, na block_processor{ block_processor_a }, stats{ stats_a }, logger{ logger_a }, + limiter{ config.rate_limit, /* unlimited token bucket capacity */ 0 }, workers{ 1, nano::thread_role::name::confirmation_height_notifications } { batch_cemented.add ([this] (auto const & cemented) { @@ -134,7 +135,9 @@ void nano::confirming_set::run () } else { - condition.wait (lock, [&] () { return !set.empty () || stopped; }); + condition.wait (lock, [&] () { + return !set.empty () || stopped; + }); } } } @@ -243,11 +246,24 @@ void nano::confirming_set::run_batch (std::unique_lock & lock) { // Confirming this block may implicitly confirm more stats.add (nano::stat::type::confirming_set, nano::stat::detail::cemented, added.size ()); - for (auto & block : added) + for (auto const & block : added) { cemented.push_back ({ block, hash, election }); } cemented_count += added.size (); + + // Rate limit cementing + while (!limiter.should_pass (added.size ())) + { + stats.inc (nano::stat::type::confirming_set, nano::stat::detail::rate_limited); + transaction.commit (); + std::this_thread::sleep_for (100ms); + transaction.renew (); + if (stopped) + { + return; + } + } } else { @@ -336,6 +352,7 @@ nano::container_info nano::confirming_set::container_info () const nano::container_info info; info.put ("set", set); info.put ("deferred", deferred); + info.put ("limiter", limiter.size ()); info.add ("workers", workers.container_info ()); return info; } diff --git a/nano/node/confirming_set.hpp b/nano/node/confirming_set.hpp index 908b08c95e..90a5051eff 100644 --- a/nano/node/confirming_set.hpp +++ b/nano/node/confirming_set.hpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -41,6 +42,9 @@ class confirming_set_config final size_t max_deferred{ 16 * 1024 }; /** Max age of deferred blocks before they are dropped */ std::chrono::seconds deferred_age_cutoff{ 15min }; + + /** For bounded backlog testing */ + size_t rate_limit{ 0 }; }; /** @@ -120,6 +124,8 @@ class confirming_set final // Blocks that are being cemented in the current batch std::unordered_set current; + nano::rate_limiter limiter; + std::atomic stopped{ false }; mutable std::mutex mutex; std::condition_variable condition;