From c29778d96bbc52cdfe0b90a1b84bd1441a4f9cc5 Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Thu, 30 Oct 2025 15:54:35 -0400 Subject: [PATCH 01/10] metrics: add standalone Metric model and BedrockMetricPlugin base with non-blocking queue --- BedrockMetric.h | 25 +++++++++ BedrockMetricPlugin.cpp | 116 ++++++++++++++++++++++++++++++++++++++++ BedrockMetricPlugin.h | 52 ++++++++++++++++++ 3 files changed, 193 insertions(+) create mode 100644 BedrockMetric.h create mode 100644 BedrockMetricPlugin.cpp create mode 100644 BedrockMetricPlugin.h diff --git a/BedrockMetric.h b/BedrockMetric.h new file mode 100644 index 000000000..2cc62cd93 --- /dev/null +++ b/BedrockMetric.h @@ -0,0 +1,25 @@ +#pragma once +#include +#include +#include +#include + +enum class MetricType { + Counter, + Gauge, + Histogram, + Timing, + Set, + Distribution +}; + +struct Metric { + std::string name; + MetricType type{MetricType::Counter}; + double value{0.0}; + std::vector> tags; + uint64_t timestampUnixMs{0}; + double sampleRate{1.0}; +}; + + diff --git a/BedrockMetricPlugin.cpp b/BedrockMetricPlugin.cpp new file mode 100644 index 000000000..9cc64f7d1 --- /dev/null +++ b/BedrockMetricPlugin.cpp @@ -0,0 +1,116 @@ +#include "BedrockMetricPlugin.h" +#include "libstuff/libstuff.h" + +std::map> BedrockMetricPlugin::g_registeredMetricPluginList; + +BedrockMetricPlugin::BedrockMetricPlugin(const SData& args, size_t maxQueueSize) + : _args(args), _maxQueueSize(maxQueueSize) +{ +} + +BedrockMetricPlugin::~BedrockMetricPlugin() +{ + stop(); +} + +bool BedrockMetricPlugin::enqueue(Metric metric) +{ + if (_stopping.load()) { + return false; + } + + { + std::unique_lock lock(_mutex); + if (_queue.size() >= _maxQueueSize) { + _dropped.fetch_add(1, std::memory_order_relaxed); + return false; + } + _queue.emplace_back(std::move(metric)); + } + _cv.notify_one(); + return true; +} + +size_t BedrockMetricPlugin::queueSize() const +{ + std::unique_lock lock(_mutex); + return _queue.size(); +} + +uint64_t 
BedrockMetricPlugin::droppedCount() const +{ + return _dropped.load(std::memory_order_relaxed); +} + +void BedrockMetricPlugin::stop() +{ + bool expected = false; + if (_stopping.compare_exchange_strong(expected, true)) { + _cv.notify_all(); + } +} + +bool BedrockMetricPlugin::isStopping() const +{ + return _stopping.load(); +} + +bool BedrockMetricPlugin::tryDequeue(Metric& out) +{ + std::unique_lock lock(_mutex); + if (_queue.empty()) { + return false; + } + out = std::move(_queue.front()); + _queue.pop_front(); + return true; +} + +bool BedrockMetricPlugin::waitDequeue(Metric& out, uint64_t timeoutMs) +{ + std::unique_lock lock(_mutex); + if (_queue.empty()) { + if (_stopping.load()) { + return false; + } + _cv.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&]{ return _stopping.load() || !_queue.empty(); }); + } + if (_queue.empty()) { + return false; + } + out = std::move(_queue.front()); + _queue.pop_front(); + return true; +} + +std::vector BedrockMetricPlugin::drainUpTo(size_t maxItems) +{ + std::vector batch; + batch.reserve(maxItems); + std::unique_lock lock(_mutex); + while (!_queue.empty() && batch.size() < maxItems) { + batch.emplace_back(std::move(_queue.front())); + _queue.pop_front(); + } + return batch; +} + +std::vector BedrockMetricPlugin::waitAndDrain(size_t maxItems, uint64_t maxWaitMs) +{ + std::unique_lock lock(_mutex); + if (_queue.empty()) { + if (_stopping.load()) { + return {}; + } + _cv.wait_for(lock, std::chrono::milliseconds(maxWaitMs), [&]{ return _stopping.load() || !_queue.empty(); }); + } + std::vector batch; + batch.reserve(maxItems); + while (!_queue.empty() && batch.size() < maxItems) { + batch.emplace_back(std::move(_queue.front())); + _queue.pop_front(); + } + return batch; +} + + diff --git a/BedrockMetricPlugin.h b/BedrockMetricPlugin.h new file mode 100644 index 000000000..0944e01df --- /dev/null +++ b/BedrockMetricPlugin.h @@ -0,0 +1,52 @@ +#pragma once +#include +#include +#include +#include +#include +#include 
+#include +#include +#include +#include "libstuff/SData.h" +#include "BedrockMetric.h" + +class BedrockMetricPlugin { + public: + static std::map> g_registeredMetricPluginList; + + explicit BedrockMetricPlugin(const SData& args, size_t maxQueueSize = 100000); + virtual ~BedrockMetricPlugin(); + + virtual const std::string& getName() const = 0; + + // Non-blocking enqueue. Returns false if dropped due to full queue or stopping. + bool enqueue(Metric metric); + + // Observability + size_t queueSize() const; + uint64_t droppedCount() const; + + // Lifecycle + void stop(); + bool isStopping() const; + + protected: + // For implementers: dequeue helpers to build network loop(s) + bool tryDequeue(Metric& out); + bool waitDequeue(Metric& out, uint64_t timeoutMs); + std::vector drainUpTo(size_t maxItems); + std::vector waitAndDrain(size_t maxItems, uint64_t maxWaitMs); + + const SData& _args; + + private: + mutable std::mutex _mutex; + std::condition_variable _cv; + std::deque _queue; + std::atomic _stopping{false}; + const size_t _maxQueueSize; + std::atomic _dropped{0}; +}; + + From a53408c90cdd8f52f71bbfa31013867ee6baca9b Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Thu, 30 Oct 2025 15:54:40 -0400 Subject: [PATCH 02/10] metrics: add -metricPlugins flag, loader function, and help text --- main.cpp | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/main.cpp b/main.cpp index b791773d2..f6c84469c 100644 --- a/main.cpp +++ b/main.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -47,6 +48,51 @@ void RetrySystem(const string& command) { SERROR("Failed to run '" << command << "', aborting."); } +set loadMetricPlugins(SData& args) { + list plugins = SParseList(args["-metricPlugins"]); + + set postProcessedNames; + + // Register built-in metric plugins here if any (none by default) + // Example: BedrockMetricPlugin::g_registeredMetricPluginList.emplace(make_pair("STATSD", [](const SData& 
a){return new StatsDMetricPlugin(a);})); + + for (string pluginName : plugins) { + if (BedrockMetricPlugin::g_registeredMetricPluginList.find(SToUpper(pluginName)) != BedrockMetricPlugin::g_registeredMetricPluginList.end()) { + postProcessedNames.emplace(SToUpper(pluginName)); + continue; + } + + size_t slash = pluginName.rfind('/'); + size_t dot = pluginName.find('.', slash); + string name = pluginName.substr(slash + 1, dot - slash - 1); + string symbolName = "BEDROCK_METRIC_PLUGIN_REGISTER_" + SToUpper(name); + + if(postProcessedNames.find(SToUpper(name)) != postProcessedNames.end()) { + SWARN("Duplicate entry for metric plugin " << name << ", skipping."); + continue; + } + postProcessedNames.insert(SToUpper(name)); + + if (!SEndsWith(pluginName, ".so")) { + pluginName += ".so"; + } + + void* lib = dlopen(pluginName.c_str(), RTLD_NOW); + if(!lib) { + SWARN("Error loading bedrock metric plugin " << pluginName << ": " << dlerror()); + } else { + void* sym = dlsym(lib, symbolName.c_str()); + if (!sym) { + SWARN("Couldn't find symbol " << symbolName); + } else { + BedrockMetricPlugin::g_registeredMetricPluginList.emplace(make_pair(SToUpper(name), (BedrockMetricPlugin*(*)(const SData&))sym)); + } + } + } + + return postProcessedNames; +} + ///////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////// @@ -231,6 +277,7 @@ int main(int argc, char* argv[]) { cout << "-peerList See below" << endl; cout << "-priority See '-peerList Details' below (defaults to 100)" << endl; cout << "-plugins Enable these plugins (defaults to 'db,jobs,cache,mysql')" << endl; + cout << "-metricPlugins Enable metrics plugins (e.g., 'statsd,otel' or .so paths)" << endl; cout << "-cacheSize number of KB to allocate for a page cache (defaults to 1GB)" << endl; cout << "-workerThreads <#> Number of worker threads to start (min 1, defaults to # of cores)" << endl; cout << "-queryLog Set the query log 
filename (default 'queryLog.csv', SIGUSR2/SIGQUIT to " @@ -298,6 +345,7 @@ int main(int argc, char* argv[]) { SETDEFAULT("-nodeName", SGetHostName()); SETDEFAULT("-cacheSize", SToStr(0)); SETDEFAULT("-plugins", "db,jobs,cache,mysql"); + SETDEFAULT("-metricPlugins", ""); SETDEFAULT("-priority", "100"); SETDEFAULT("-maxJournalSize", "1000000"); SETDEFAULT("-queryLog", "queryLog.csv"); @@ -325,6 +373,7 @@ int main(int argc, char* argv[]) { } args["-plugins"] = SComposeList(loadPlugins(args)); + args["-metricPlugins"] = SComposeList(loadMetricPlugins(args)); // Set our soft limit to the same as our hard limit to allow for more file handles. struct rlimit limits; From a256aef05dae3f45d3a6adc427bc950584e592d4 Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Thu, 30 Oct 2025 15:54:45 -0400 Subject: [PATCH 03/10] metrics: integrate metricPlugins into BedrockServer, add recordMetric(), stop on shutdown --- BedrockServer.cpp | 35 +++++++++++++++++++++++++++++++++++ BedrockServer.h | 8 ++++++++ 2 files changed, 43 insertions(+) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index e866a2221..7c13f46ec 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -78,6 +78,14 @@ shared_ptr BedrockServer::getDBPool() { return _dbPool; } +void BedrockServer::recordMetric(const Metric& metric) +{ + for (auto& p : metricPlugins) { + Metric m = metric; + p.second->enqueue(std::move(m)); + } +} + void BedrockServer::sync() { // Parse out the number of worker threads we'll use. 
The DB needs to know this because it will expect a @@ -1266,6 +1274,28 @@ BedrockServer::BedrockServer(const SData& args_) } SINFO("Creating BedrockServer with plugins: " << SComposeList(pluginString)); + // Load metric plugins + list metricPluginNameList = SParseList(args["-metricPlugins"]); + if (!metricPluginNameList.empty()) { + SINFO("Loading metric plugins: " << args["-metricPlugins"]); + for (string& pluginName : metricPluginNameList) { + auto it = BedrockMetricPlugin::g_registeredMetricPluginList.find(SToUpper(pluginName)); + if (it == BedrockMetricPlugin::g_registeredMetricPluginList.end()) { + SERROR("Cannot find metric plugin '" << pluginName << "', aborting."); + } + unique_ptr metricPlugin(it->second(args)); + const string nameUpper = SToUpper(metricPlugin->getName()); + metricPlugins.emplace(nameUpper, std::move(metricPlugin)); + } + list metricNames; + for (auto& p : metricPlugins) { + metricNames.emplace_back(p.first); + } + SINFO("Metric plugins enabled: " << SComposeList(metricNames)); + } else { + SINFO("No metric plugins configured."); + } + // If `versionOverride` is set, we throw away what we just did and use the overridden value. // We'll destruct, sort, and then reconstruct the version string passed in so we aren't relying // on the operator to know that they must be sorted. @@ -1336,6 +1366,11 @@ BedrockServer::BedrockServer(const SData& args_) } BedrockServer::~BedrockServer() { + // Stop metric plugins first to allow their threads to exit promptly. + for (auto& p : metricPlugins) { + p.second->stop(); + } + // Shut down the sync thread, (which will shut down worker threads in turn). 
SINFO("Closing sync thread '" << _syncThreadName << "'"); if (_syncThread.joinable()) { diff --git a/BedrockServer.h b/BedrockServer.h index 53a4adab9..0585933b5 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -9,6 +9,8 @@ #include "BedrockConflictManager.h" #include "BedrockBlockingCommandQueue.h" #include "BedrockTimeoutCommandQueue.h" +#include "BedrockMetricPlugin.h" +#include "BedrockMetric.h" class SQLitePeer; class BedrockCore; @@ -242,6 +244,9 @@ class BedrockServer : public SQLiteServer { // Expose the DB pool to plugins. shared_ptr getDBPool(); + // Record a metric in all registered metric plugins. + void recordMetric(const Metric& metric); + private: // The name of the sync thread. static constexpr auto _syncThreadName = "sync"; @@ -334,6 +339,9 @@ class BedrockServer : public SQLiteServer { // synchronization. shared_ptr _clusterMessenger; + // Metrics plugins registry + map> metricPlugins; + // Functions for checking for and responding to status and control commands. bool _isStatusCommand(const unique_ptr& command); void _status(unique_ptr& command); From e4b436d8938b3f15185bcb2c3f8398c08d87ba94 Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Fri, 31 Oct 2025 12:44:45 -0400 Subject: [PATCH 04/10] metrics: move Metric into BedrockMetricPlugin.h, include libstuff, drop redundant std headers/qualifiers --- BedrockMetric.h | 25 ---------------------- BedrockMetricPlugin.cpp | 30 +++++++++++++------------- BedrockMetricPlugin.h | 47 +++++++++++++++++++++++++---------------- BedrockServer.h | 1 - 4 files changed, 44 insertions(+), 59 deletions(-) delete mode 100644 BedrockMetric.h diff --git a/BedrockMetric.h b/BedrockMetric.h deleted file mode 100644 index 2cc62cd93..000000000 --- a/BedrockMetric.h +++ /dev/null @@ -1,25 +0,0 @@ -#pragma once -#include -#include -#include -#include - -enum class MetricType { - Counter, - Gauge, - Histogram, - Timing, - Set, - Distribution -}; - -struct Metric { - std::string name; - MetricType 
type{MetricType::Counter}; - double value{0.0}; - std::vector> tags; - uint64_t timestampUnixMs{0}; - double sampleRate{1.0}; -}; - - diff --git a/BedrockMetricPlugin.cpp b/BedrockMetricPlugin.cpp index 9cc64f7d1..28cc82f59 100644 --- a/BedrockMetricPlugin.cpp +++ b/BedrockMetricPlugin.cpp @@ -1,7 +1,7 @@ #include "BedrockMetricPlugin.h" #include "libstuff/libstuff.h" -std::map> BedrockMetricPlugin::g_registeredMetricPluginList; +map> BedrockMetricPlugin::g_registeredMetricPluginList; BedrockMetricPlugin::BedrockMetricPlugin(const SData& args, size_t maxQueueSize) : _args(args), _maxQueueSize(maxQueueSize) @@ -20,9 +20,9 @@ bool BedrockMetricPlugin::enqueue(Metric metric) } { - std::unique_lock lock(_mutex); + unique_lock lock(_mutex); if (_queue.size() >= _maxQueueSize) { - _dropped.fetch_add(1, std::memory_order_relaxed); + _dropped.fetch_add(1, memory_order_relaxed); return false; } _queue.emplace_back(std::move(metric)); @@ -33,13 +33,13 @@ bool BedrockMetricPlugin::enqueue(Metric metric) size_t BedrockMetricPlugin::queueSize() const { - std::unique_lock lock(_mutex); + unique_lock lock(_mutex); return _queue.size(); } uint64_t BedrockMetricPlugin::droppedCount() const { - return _dropped.load(std::memory_order_relaxed); + return _dropped.load(memory_order_relaxed); } void BedrockMetricPlugin::stop() @@ -57,7 +57,7 @@ bool BedrockMetricPlugin::isStopping() const bool BedrockMetricPlugin::tryDequeue(Metric& out) { - std::unique_lock lock(_mutex); + unique_lock lock(_mutex); if (_queue.empty()) { return false; } @@ -68,12 +68,12 @@ bool BedrockMetricPlugin::tryDequeue(Metric& out) bool BedrockMetricPlugin::waitDequeue(Metric& out, uint64_t timeoutMs) { - std::unique_lock lock(_mutex); + unique_lock lock(_mutex); if (_queue.empty()) { if (_stopping.load()) { return false; } - _cv.wait_for(lock, std::chrono::milliseconds(timeoutMs), [&]{ return _stopping.load() || !_queue.empty(); }); + _cv.wait_for(lock, chrono::milliseconds(timeoutMs), [&]{ return 
_stopping.load() || !_queue.empty(); }); } if (_queue.empty()) { return false; @@ -83,11 +83,11 @@ bool BedrockMetricPlugin::waitDequeue(Metric& out, uint64_t timeoutMs) return true; } -std::vector BedrockMetricPlugin::drainUpTo(size_t maxItems) +vector BedrockMetricPlugin::drainUpTo(size_t maxItems) { - std::vector batch; + vector batch; batch.reserve(maxItems); - std::unique_lock lock(_mutex); + unique_lock lock(_mutex); while (!_queue.empty() && batch.size() < maxItems) { batch.emplace_back(std::move(_queue.front())); _queue.pop_front(); @@ -95,16 +95,16 @@ std::vector BedrockMetricPlugin::drainUpTo(size_t maxItems) return batch; } -std::vector BedrockMetricPlugin::waitAndDrain(size_t maxItems, uint64_t maxWaitMs) +vector BedrockMetricPlugin::waitAndDrain(size_t maxItems, uint64_t maxWaitMs) { - std::unique_lock lock(_mutex); + unique_lock lock(_mutex); if (_queue.empty()) { if (_stopping.load()) { return {}; } - _cv.wait_for(lock, std::chrono::milliseconds(maxWaitMs), [&]{ return _stopping.load() || !_queue.empty(); }); + _cv.wait_for(lock, chrono::milliseconds(maxWaitMs), [&]{ return _stopping.load() || !_queue.empty(); }); } - std::vector batch; + vector batch; batch.reserve(maxItems); while (!_queue.empty() && batch.size() < maxItems) { batch.emplace_back(std::move(_queue.front())); diff --git a/BedrockMetricPlugin.h b/BedrockMetricPlugin.h index 0944e01df..c15e28932 100644 --- a/BedrockMetricPlugin.h +++ b/BedrockMetricPlugin.h @@ -1,24 +1,34 @@ #pragma once +#include "libstuff/libstuff.h" #include -#include -#include -#include -#include #include -#include -#include -#include -#include "libstuff/SData.h" -#include "BedrockMetric.h" + +enum class MetricType { + Counter, + Gauge, + Histogram, + Timing, + Set, + Distribution +}; + +struct Metric { + string name; + MetricType type{MetricType::Counter}; + double value{0.0}; + vector> tags; + uint64_t timestampUnixMs{0}; + double sampleRate{1.0}; +}; class BedrockMetricPlugin { public: - static std::map> 
g_registeredMetricPluginList; + static map> g_registeredMetricPluginList; explicit BedrockMetricPlugin(const SData& args, size_t maxQueueSize = 100000); virtual ~BedrockMetricPlugin(); - virtual const std::string& getName() const = 0; + virtual const string& getName() const = 0; // Non-blocking enqueue. Returns false if dropped due to full queue or stopping. bool enqueue(Metric metric); @@ -35,18 +45,19 @@ class BedrockMetricPlugin { // For implementers: dequeue helpers to build network loop(s) bool tryDequeue(Metric& out); bool waitDequeue(Metric& out, uint64_t timeoutMs); - std::vector drainUpTo(size_t maxItems); - std::vector waitAndDrain(size_t maxItems, uint64_t maxWaitMs); + vector drainUpTo(size_t maxItems); + vector waitAndDrain(size_t maxItems, uint64_t maxWaitMs); const SData& _args; private: - mutable std::mutex _mutex; - std::condition_variable _cv; - std::deque _queue; - std::atomic _stopping{false}; + mutable mutex _mutex; + condition_variable _cv; + deque _queue; + atomic _stopping{false}; const size_t _maxQueueSize; - std::atomic _dropped{0}; + atomic _dropped{0}; }; + diff --git a/BedrockServer.h b/BedrockServer.h index 0585933b5..fc2767a1c 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -10,7 +10,6 @@ #include "BedrockBlockingCommandQueue.h" #include "BedrockTimeoutCommandQueue.h" #include "BedrockMetricPlugin.h" -#include "BedrockMetric.h" class SQLitePeer; class BedrockCore; From 8d968198788512957a595cd07f044c24aea32e78 Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Fri, 31 Oct 2025 12:52:28 -0400 Subject: [PATCH 05/10] Move plugin stopping together --- BedrockServer.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 7c13f46ec..6d51622ee 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -1366,10 +1366,7 @@ BedrockServer::BedrockServer(const SData& args_) } BedrockServer::~BedrockServer() { - // Stop metric plugins first to allow their threads to exit 
promptly. - for (auto& p : metricPlugins) { - p.second->stop(); - } + // Shut down the sync thread, (which will shut down worker threads in turn). SINFO("Closing sync thread '" << _syncThreadName << "'"); @@ -1387,6 +1384,11 @@ BedrockServer::~BedrockServer() { delete p.second; } + // Stop metric plugins to allow their threads to exit promptly. + for (auto& p : metricPlugins) { + p.second->stop(); + } + shutdownTimer.serverDestructor = chrono::steady_clock::now(); SINFO("Shutdown timing: " << "start->safeState=" << chrono::duration_cast(shutdownTimer.safeNodeState - shutdownTimer.shutdownStart) From 83588c73263f9984b3f4a88e0274e94baa513519 Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Fri, 31 Oct 2025 13:21:30 -0400 Subject: [PATCH 06/10] metrics(statsd): add StatsDMetricPlugin with per-batch UDP socket, default 1400B datagrams, .so symbol --- plugins/StatsDMetricPlugin.cpp | 193 +++++++++++++++++++++++++++++++++ plugins/StatsDMetricPlugin.h | 46 ++++++++ 2 files changed, 239 insertions(+) create mode 100644 plugins/StatsDMetricPlugin.cpp create mode 100644 plugins/StatsDMetricPlugin.h diff --git a/plugins/StatsDMetricPlugin.cpp b/plugins/StatsDMetricPlugin.cpp new file mode 100644 index 000000000..5b77dc93a --- /dev/null +++ b/plugins/StatsDMetricPlugin.cpp @@ -0,0 +1,193 @@ +#include "plugins/StatsDMetricPlugin.h" +#include +#include +#include + +StatsDMetricPlugin::StatsDMetricPlugin(const SData& args) + : BedrockMetricPlugin(args) +{ + _destHostPort = _args["-statsdServer"]; + if (_args.isSet("-statsdMaxDatagramBytes")) { + _maxDatagramBytes = max(512, _args.calcU64("-statsdMaxDatagramBytes")); + } + // Start worker + _worker = thread(&StatsDMetricPlugin::_networkLoop, this); +} + +StatsDMetricPlugin::~StatsDMetricPlugin() +{ + stop(); + if (_worker.joinable()) { + _worker.join(); + } +} + +const string& StatsDMetricPlugin::getName() const +{ + return _name; +} + +bool StatsDMetricPlugin::_resolveDestination() +{ + if (_destResolved) { + return true; + } + if 
(_destHostPort.empty()) { + SWARN("StatsD destination '-statsdServer' is not set; dropping metrics."); + return false; + } + + string host; + string portStr; + size_t colon = _destHostPort.rfind(':'); + if (colon == string::npos) { + SWARN("StatsD destination missing port: '" << _destHostPort << "'"); + return false; + } + host = _destHostPort.substr(0, colon); + portStr = _destHostPort.substr(colon + 1); + + addrinfo hints{}; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_DGRAM; + addrinfo* result = nullptr; + int rc = getaddrinfo(host.c_str(), portStr.c_str(), &hints, &result); + if (rc != 0 || !result) { + SWARN("getaddrinfo failed for StatsD server '" << _destHostPort << "': " << gai_strerror(rc)); + return false; + } + // Take first IPv4 result + memcpy(&_dest, result->ai_addr, sizeof(sockaddr_in)); + freeaddrinfo(result); + _destResolved = true; + return true; +} + +string StatsDMetricPlugin::_sanitizeName(const string& name) +{ + string out; + out.reserve(name.size()); + for (char c : name) { + if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_' || c == '.') { + out.push_back(c); + } else { + out.push_back('_'); + } + } + return out; +} + +string StatsDMetricPlugin::_sanitizeTagKey(const string& key) +{ + return _sanitizeName(key); +} + +string StatsDMetricPlugin::_sanitizeTagValue(const string& value) +{ + string out = value; + for (char& c : out) { + if (c == '|') c = '_'; + else if (c == ' ') c = '_'; + else if (c == ',') c = '_'; + else if (c == '\n') c = '_'; + else if (c == ':') c = '_'; + } + return out; +} + +string StatsDMetricPlugin::_formatLine(const Metric& m) const +{ + const string name = _sanitizeName(m.name); + string line = name + ":" + SToStr(m.value) + "|"; + switch (m.type) { + case MetricType::Counter: line += "c"; break; + case MetricType::Gauge: line += "g"; break; + case MetricType::Timing: line += "ms"; break; + case MetricType::Set: line += "s"; break; + case MetricType::Histogram: 
line += "g"; break; // map to gauge + case MetricType::Distribution: line += "g"; break; // map to gauge + } + if (m.sampleRate > 0.0 && m.sampleRate < 1.0) { + line += "|@" + SToStr(m.sampleRate); + } + if (!m.tags.empty()) { + line += "|#"; + bool first = true; + for (const auto& kv : m.tags) { + if (!first) { + line += ","; + } + first = false; + line += _sanitizeTagKey(kv.first); + if (!kv.second.empty()) { + line += ":" + _sanitizeTagValue(kv.second); + } + } + } + return line; +} + +void StatsDMetricPlugin::_sendBatch(vector& lines) +{ + if (lines.empty()) { + return; + } + if (!_resolveDestination()) { + return; + } + + // Build datagrams up to _maxDatagramBytes and send each via a fresh socket + string buffer; + buffer.reserve(min(_maxDatagramBytes, 4096)); + auto flush = [&](const string& payload){ + if (payload.empty()) return; + int s = socket(AF_INET, SOCK_DGRAM, 0); + if (s < 0) { + SWARN("StatsD: failed to create UDP socket, dropping batch"); + return; + } + ssize_t sent = sendto(s, payload.data(), payload.size(), 0, (sockaddr*)&_dest, sizeof(_dest)); + if (sent < 0 || (size_t)sent != payload.size()) { + SWARN("StatsD: sendto failed or partial, dropped: " << errno); + } + close(s); + }; + + for (const string& line : lines) { + const size_t needed = (buffer.empty() ? 0 : 1) + line.size(); + if (buffer.size() + needed > _maxDatagramBytes) { + flush(buffer); + buffer.clear(); + } + if (!buffer.empty()) buffer.push_back('\n'); + if (line.size() > _maxDatagramBytes) { + // If a single line is larger than the datagram size, drop it. 
+ continue; + } + buffer += line; + } + flush(buffer); +} + +void StatsDMetricPlugin::_networkLoop() +{ + while (!isStopping()) { + vector batch = waitAndDrain(_maxBatch, _maxWaitMs); + if (batch.empty()) { + continue; + } + vector lines; + lines.reserve(batch.size()); + for (const auto& m : batch) { + lines.emplace_back(_formatLine(m)); + } + _sendBatch(lines); + } +} + +extern "C" BedrockMetricPlugin* BEDROCK_METRIC_PLUGIN_REGISTER_STATSD(const SData& args) +{ + return new StatsDMetricPlugin(args); +} + + diff --git a/plugins/StatsDMetricPlugin.h b/plugins/StatsDMetricPlugin.h new file mode 100644 index 000000000..30c565e15 --- /dev/null +++ b/plugins/StatsDMetricPlugin.h @@ -0,0 +1,46 @@ +#pragma once +#include "libstuff/libstuff.h" +#include "BedrockMetricPlugin.h" + +class StatsDMetricPlugin : public BedrockMetricPlugin { + public: + explicit StatsDMetricPlugin(const SData& args); + ~StatsDMetricPlugin() override; + + const string& getName() const override; + + private: + // Worker thread for draining and sending metrics. + void _networkLoop(); + + // Format one Metric into a StatsD line. + string _formatLine(const Metric& m) const; + + // Join lines into datagrams, respecting max size, and send them. + void _sendBatch(vector& lines); + + // Parse and resolve the destination host:port. + bool _resolveDestination(); + + // Sanitization helpers. + static string _sanitizeName(const string& name); + static string _sanitizeTagKey(const string& key); + static string _sanitizeTagValue(const string& value); + + // Destination and config + string _destHostPort; // from -statsdServer + sockaddr_in _dest{}; // resolved destination + bool _destResolved{false}; + + size_t _maxDatagramBytes{1400}; // from -statsdMaxDatagramBytes (default 1400) + size_t _maxBatch{512}; // per drain + uint64_t _maxWaitMs{50}; // wait for batch accumulation + + thread _worker; + string _name{"STATSD"}; +}; + +// Dynamic loader entry point for .so usage. 
+extern "C" BedrockMetricPlugin* BEDROCK_METRIC_PLUGIN_REGISTER_STATSD(const SData& args); + + From 549ca8b361939f258f2735d1bc7ed85cf729b2a2 Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Fri, 31 Oct 2025 13:53:13 -0400 Subject: [PATCH 07/10] metrics(statsd): fix includes, use SFastBuffer correctly, avoid shutdown(), and adjust newline append --- plugins/StatsDMetricPlugin.cpp | 100 ++++++++++++--------------------- plugins/StatsDMetricPlugin.h | 6 +- 2 files changed, 40 insertions(+), 66 deletions(-) diff --git a/plugins/StatsDMetricPlugin.cpp b/plugins/StatsDMetricPlugin.cpp index 5b77dc93a..015e5bcc2 100644 --- a/plugins/StatsDMetricPlugin.cpp +++ b/plugins/StatsDMetricPlugin.cpp @@ -1,14 +1,23 @@ #include "plugins/StatsDMetricPlugin.h" -#include -#include -#include +#include "libstuff/SFastBuffer.h" +#include + +#undef SLOGPREFIX +#define SLOGPREFIX "{" << getName() << "} " + +const string StatsDMetricPlugin::name("StatsD"); + +extern "C" BedrockMetricPlugin* BEDROCK_METRIC_PLUGIN_REGISTER_STATSD(const SData& args) +{ + return new StatsDMetricPlugin(args); +} StatsDMetricPlugin::StatsDMetricPlugin(const SData& args) : BedrockMetricPlugin(args) { - _destHostPort = _args["-statsdServer"]; - if (_args.isSet("-statsdMaxDatagramBytes")) { - _maxDatagramBytes = max(512, _args.calcU64("-statsdMaxDatagramBytes")); + _destHostPort = args["-statsdServer"]; + if (args.isSet("-statsdMaxDatagramBytes")) { + _maxDatagramBytes = max(512, args.calcU64("-statsdMaxDatagramBytes")); } // Start worker _worker = thread(&StatsDMetricPlugin::_networkLoop, this); @@ -24,43 +33,7 @@ StatsDMetricPlugin::~StatsDMetricPlugin() const string& StatsDMetricPlugin::getName() const { - return _name; -} - -bool StatsDMetricPlugin::_resolveDestination() -{ - if (_destResolved) { - return true; - } - if (_destHostPort.empty()) { - SWARN("StatsD destination '-statsdServer' is not set; dropping metrics."); - return false; - } - - string host; - string portStr; - size_t colon = 
_destHostPort.rfind(':'); - if (colon == string::npos) { - SWARN("StatsD destination missing port: '" << _destHostPort << "'"); - return false; - } - host = _destHostPort.substr(0, colon); - portStr = _destHostPort.substr(colon + 1); - - addrinfo hints{}; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_DGRAM; - addrinfo* result = nullptr; - int rc = getaddrinfo(host.c_str(), portStr.c_str(), &hints, &result); - if (rc != 0 || !result) { - SWARN("getaddrinfo failed for StatsD server '" << _destHostPort << "': " << gai_strerror(rc)); - return false; - } - // Take first IPv4 result - memcpy(&_dest, result->ai_addr, sizeof(sockaddr_in)); - freeaddrinfo(result); - _destResolved = true; - return true; + return name; } string StatsDMetricPlugin::_sanitizeName(const string& name) @@ -132,25 +105,28 @@ void StatsDMetricPlugin::_sendBatch(vector& lines) if (lines.empty()) { return; } - if (!_resolveDestination()) { - return; - } // Build datagrams up to _maxDatagramBytes and send each via a fresh socket - string buffer; - buffer.reserve(min(_maxDatagramBytes, 4096)); - auto flush = [&](const string& payload){ - if (payload.empty()) return; - int s = socket(AF_INET, SOCK_DGRAM, 0); + SFastBuffer buffer; + + // Flushing is fire and forget, we don't check for a return from statsd + auto flush = [&](SFastBuffer& payload){ + if (payload.empty()) { + return; + } + int s = S_socket(_destHostPort, false, false, false); if (s < 0) { - SWARN("StatsD: failed to create UDP socket, dropping batch"); + SWARN("Failed to create socket to " << _destHostPort << ", dropping batch"); return; } - ssize_t sent = sendto(s, payload.data(), payload.size(), 0, (sockaddr*)&_dest, sizeof(_dest)); - if (sent < 0 || (size_t)sent != payload.size()) { - SWARN("StatsD: sendto failed or partial, dropped: " << errno); + bool result = S_sendconsume(s, payload); + if (!result) { + SWARN("Failed to send to " << _destHostPort << ", dropping batch"); + close(s); + return; } close(s); + return; }; for 
(const string& line : lines) { @@ -159,9 +135,14 @@ void StatsDMetricPlugin::_sendBatch(vector& lines) flush(buffer); buffer.clear(); } - if (!buffer.empty()) buffer.push_back('\n'); + + if (!buffer.empty()) { + buffer.append("\n", 1); + } + if (line.size() > _maxDatagramBytes) { // If a single line is larger than the datagram size, drop it. + SWARN("Dropping line larger than datagram size: " << line.size()); continue; } buffer += line; @@ -184,10 +165,3 @@ void StatsDMetricPlugin::_networkLoop() _sendBatch(lines); } } - -extern "C" BedrockMetricPlugin* BEDROCK_METRIC_PLUGIN_REGISTER_STATSD(const SData& args) -{ - return new StatsDMetricPlugin(args); -} - - diff --git a/plugins/StatsDMetricPlugin.h b/plugins/StatsDMetricPlugin.h index 30c565e15..89b6c8947 100644 --- a/plugins/StatsDMetricPlugin.h +++ b/plugins/StatsDMetricPlugin.h @@ -1,6 +1,7 @@ #pragma once #include "libstuff/libstuff.h" #include "BedrockMetricPlugin.h" +#include class StatsDMetricPlugin : public BedrockMetricPlugin { public: @@ -29,15 +30,14 @@ class StatsDMetricPlugin : public BedrockMetricPlugin { // Destination and config string _destHostPort; // from -statsdServer - sockaddr_in _dest{}; // resolved destination - bool _destResolved{false}; size_t _maxDatagramBytes{1400}; // from -statsdMaxDatagramBytes (default 1400) size_t _maxBatch{512}; // per drain uint64_t _maxWaitMs{50}; // wait for batch accumulation thread _worker; - string _name{"STATSD"}; + + static const string name; }; // Dynamic loader entry point for .so usage. 
From 8b1177b4182bd20bd6d22b72d0678b761cf3b449 Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Fri, 31 Oct 2025 21:34:47 +0000 Subject: [PATCH 08/10] Close sockets, add logs --- plugins/StatsDMetricPlugin.cpp | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/plugins/StatsDMetricPlugin.cpp b/plugins/StatsDMetricPlugin.cpp index 015e5bcc2..cbe67dc6b 100644 --- a/plugins/StatsDMetricPlugin.cpp +++ b/plugins/StatsDMetricPlugin.cpp @@ -1,5 +1,6 @@ #include "plugins/StatsDMetricPlugin.h" #include "libstuff/SFastBuffer.h" +#include "libstuff/SData.h" #include #undef SLOGPREFIX @@ -15,9 +16,10 @@ extern "C" BedrockMetricPlugin* BEDROCK_METRIC_PLUGIN_REGISTER_STATSD(const SDat StatsDMetricPlugin::StatsDMetricPlugin(const SData& args) : BedrockMetricPlugin(args) { + SDEBUG("Loaded StatsD metric plugin"); _destHostPort = args["-statsdServer"]; if (args.isSet("-statsdMaxDatagramBytes")) { - _maxDatagramBytes = max(512, args.calcU64("-statsdMaxDatagramBytes")); + _maxDatagramBytes = max(1400, args.calcU64("-statsdMaxDatagramBytes")); } // Start worker _worker = thread(&StatsDMetricPlugin::_networkLoop, this); @@ -70,7 +72,8 @@ string StatsDMetricPlugin::_sanitizeTagValue(const string& value) string StatsDMetricPlugin::_formatLine(const Metric& m) const { - const string name = _sanitizeName(m.name); + const string hostname = SGetHostName(); + const string name = _sanitizeName(hostname + "." 
+ m.name); string line = name + ":" + SToStr(m.value) + "|"; switch (m.type) { case MetricType::Counter: line += "c"; break; @@ -122,10 +125,10 @@ void StatsDMetricPlugin::_sendBatch(vector& lines) bool result = S_sendconsume(s, payload); if (!result) { SWARN("Failed to send to " << _destHostPort << ", dropping batch"); - close(s); + ::close(s); return; } - close(s); + ::close(s); return; }; @@ -148,11 +151,13 @@ void StatsDMetricPlugin::_sendBatch(vector& lines) buffer += line; } flush(buffer); + SDEBUG("Flushed " << lines.size() << " lines to " << _destHostPort); } void StatsDMetricPlugin::_networkLoop() { while (!isStopping()) { + SDEBUG("Waiting for batch"); vector batch = waitAndDrain(_maxBatch, _maxWaitMs); if (batch.empty()) { continue; From 8262c1b33f6d9f704b09789bf79507608544605a Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Fri, 31 Oct 2025 21:35:09 +0000 Subject: [PATCH 09/10] Feature complete POC testing --- BedrockMetricPlugin.h | 2 +- BedrockServer.cpp | 29 +++++++++++++++++++++++++++++ main.cpp | 4 ++++ test/lib/BedrockTester.cpp | 2 ++ 4 files changed, 36 insertions(+), 1 deletion(-) diff --git a/BedrockMetricPlugin.h b/BedrockMetricPlugin.h index c15e28932..f256f8c51 100644 --- a/BedrockMetricPlugin.h +++ b/BedrockMetricPlugin.h @@ -15,7 +15,7 @@ enum class MetricType { struct Metric { string name; MetricType type{MetricType::Counter}; - double value{0.0}; + uint64_t value{0}; vector> tags; uint64_t timestampUnixMs{0}; double sampleRate{1.0}; diff --git a/BedrockServer.cpp b/BedrockServer.cpp index 6d51622ee..af08f216f 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -82,6 +82,7 @@ void BedrockServer::recordMetric(const Metric& metric) { for (auto& p : metricPlugins) { Metric m = metric; + SDEBUG("Enqueuing metric " << m.name << " to " << p.second->getName()); p.second->enqueue(std::move(m)); } } @@ -498,6 +499,18 @@ void BedrockServer::sync() SINFO("Sync thread dequeued command " << command->request.methodLine << ". 
Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); + recordMetric(Metric{ + .name = "bedrock.syncThread.dequeue", + .type = MetricType::Counter, + .value = 1 + }); + + recordMetric(Metric{ + .name = "bedrock.syncThread.depth", + .type = MetricType::Gauge, + .value = _syncNodeQueuedCommands.size() + }); + if (command->timeout() < STimeNow()) { SINFO("Command '" << command->request.methodLine << "' timed out in sync thread queue, sending back to main queue."); _commandQueue.push(move(command)); @@ -1545,6 +1558,22 @@ void BedrockServer::_reply(unique_ptr& command) { // Finalize timing info even for commands we won't respond to (this makes this data available in logs). command->finalizeTimingInfo(); + recordMetric(Metric{ + .name = "bedrock.requestTime." + command->request.methodLine + ".prePeekTotal", + .type = MetricType::Timing, + .value = command->response.calcU64("prePeekTotal")/1000 + }); + recordMetric(Metric{ + .name = "bedrock.requestTime." + command->request.methodLine + ".peekTotal", + .type = MetricType::Timing, + .value = command->response.calcU64("peekTotal")/1000 + }); + recordMetric(Metric{ + .name = "bedrock.requestTime." + command->request.methodLine + ".processTotal", + .type = MetricType::Timing, + .value = command->response.calcU64("processTotal")/1000 + }); + // Don't reply to commands with pseudo-clients (i.e., commands that we generated by other commands, or using // `Connection: forget`. 
if (command->initiatingClientID < 0) { diff --git a/main.cpp b/main.cpp index f6c84469c..77a7eaa59 100644 --- a/main.cpp +++ b/main.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -56,6 +57,9 @@ set loadMetricPlugins(SData& args) { // Register built-in metric plugins here if any (none by default) // Example: BedrockMetricPlugin::g_registeredMetricPluginList.emplace(make_pair("STATSD", [](const SData& a){return new StatsDMetricPlugin(a);})); + BedrockMetricPlugin::g_registeredMetricPluginList.emplace(make_pair("STATSD", [](const SData& a){return new StatsDMetricPlugin(a);})); + + for (string pluginName : plugins) { if (BedrockMetricPlugin::g_registeredMetricPluginList.find(SToUpper(pluginName)) != BedrockMetricPlugin::g_registeredMetricPluginList.end()) { postProcessedNames.emplace(SToUpper(pluginName)); diff --git a/test/lib/BedrockTester.cpp b/test/lib/BedrockTester.cpp index cf767de5c..314413206 100644 --- a/test/lib/BedrockTester.cpp +++ b/test/lib/BedrockTester.cpp @@ -84,6 +84,8 @@ BedrockTester::BedrockTester(const map& args, {"-escalateOverHTTP", "true"}, {"-cacheSize", "1000"}, {"-parallelReplication", "true"}, + {"-metricPlugins", "STATSD"}, + {"-statsdServer", "10.0.130.11:9125"}, // Currently breaks only in Travis and needs debugging, which has been removed, maybe? 
//{"-logDirectlyToSyslogSocket", ""}, {"-testName", currentTestName}, From 5ef79aa306195141fb63a7009f0efdf79915eb8f Mon Sep 17 00:00:00 2001 From: Cole Eason Date: Fri, 21 Nov 2025 01:37:49 +0000 Subject: [PATCH 10/10] Replace all currently tracked metrics with first class metrics --- BedrockCommand.cpp | 13 +++++ BedrockCore.cpp | 7 +++ BedrockMetrics.cpp | 29 ++++++++++ BedrockMetrics.h | 10 ++++ BedrockServer.cpp | 100 +++++++++++++++++++++++++++++------ sqlitecluster/SQLite.cpp | 21 ++++++++ sqlitecluster/SQLiteNode.cpp | 4 ++ 7 files changed, 167 insertions(+), 17 deletions(-) create mode 100644 BedrockMetrics.cpp create mode 100644 BedrockMetrics.h diff --git a/BedrockCommand.cpp b/BedrockCommand.cpp index 807f99993..f48a7d548 100644 --- a/BedrockCommand.cpp +++ b/BedrockCommand.cpp @@ -289,6 +289,19 @@ void BedrockCommand::finalizeTimingInfo() { {"unaccountedTime", unaccountedTime}, }; + // Also include commit and queue breakdowns so the reply contains everything needed for metrics. + valuePairs.emplace("commitWorkerTime", commitWorkerTotal); + valuePairs.emplace("commitSyncTime", commitSyncTotal); + valuePairs.emplace("queueWorkerTime", queueWorkerTotal); + valuePairs.emplace("queueSyncTime", queueSyncTotal); + valuePairs.emplace("queueBlockingTime", queueBlockingTotal); + valuePairs.emplace("queuePageLockTime", queuePageLockTotal); + valuePairs.emplace("blockingCommitPrePeekTime", blockingPrePeekTotal); + valuePairs.emplace("blockingCommitPeekTime", blockingPeekTotal); + valuePairs.emplace("blockingCommitProcessTime", blockingProcessTotal); + valuePairs.emplace("blockingCommitPostProcessTime", blockingPostProcessTotal); + valuePairs.emplace("blockingCommitCommitTime", blockingCommitWorkerTotal); + // We also want to know what leader did if we're on a follower. 
uint64_t upstreamPeekTime = 0; uint64_t upstreamProcessTime = 0; diff --git a/BedrockCore.cpp b/BedrockCore.cpp index 04a3ae6ca..ab35c7272 100644 --- a/BedrockCore.cpp +++ b/BedrockCore.cpp @@ -2,6 +2,7 @@ #include "BedrockCore.h" #include "BedrockPlugin.h" #include "BedrockServer.h" +#include "BedrockMetrics.h" BedrockCore::BedrockCore(SQLite& db, const BedrockServer& server) : SQLiteCore(db), @@ -45,6 +46,12 @@ uint64_t BedrockCore::_getRemainingTime(const unique_ptr& comman // Already expired. if (adjustedTimeout <= 0 || (isProcessing && processTimeout <= 0)) { SALERT("Command " << command->request.methodLine << " timed out."); + // metric: command timed out + GlobalRecordMetric(Metric{ + .name = string("bedrock.commandTimedOut.") + command->request.methodLine, + .type = MetricType::Counter, + .value = 1 + }); STHROW("555 Timeout"); } diff --git a/BedrockMetrics.cpp b/BedrockMetrics.cpp new file mode 100644 index 000000000..d16160a34 --- /dev/null +++ b/BedrockMetrics.cpp @@ -0,0 +1,29 @@ +#include "BedrockMetrics.h" +#include + +namespace { +function g_recorder; +mutex g_mutex; +} + +void SetGlobalMetricRecorder(const function& recorder) +{ + // Installs `recorder` as the global metric sink; an empty function clears it. + // Every write to g_recorder, including the clear, happens under g_mutex because + // GlobalRecordMetric copies g_recorder under the same lock from other threads; + // writing it without the lock would be a data race. + lock_guard lock(g_mutex); + g_recorder = recorder; +} + +void GlobalRecordMetric(const Metric& metric) +{ + function local; + { + lock_guard lock(g_mutex); + local = g_recorder; + } + if (local) { + local(metric); + } +} diff --git a/BedrockMetrics.h b/BedrockMetrics.h new file mode 100644 index 000000000..4b435efd3 --- /dev/null +++ b/BedrockMetrics.h @@ -0,0 +1,10 @@ +// Minimal global metrics helper to allow emission from non-server code. +#pragma once +#include "BedrockMetricPlugin.h" +#include + +// Set the global metric recorder callback. +void SetGlobalMetricRecorder(const function& recorder); + +// Record a metric via the global recorder if configured.
+void GlobalRecordMetric(const Metric& metric); diff --git a/BedrockServer.cpp b/BedrockServer.cpp index af08f216f..0fd2106aa 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "BedrockMetrics.h" setBedrockServer::_blacklistedParallelCommands; shared_timed_mutex BedrockServer::_blacklistedParallelCommandMutex; @@ -451,6 +452,9 @@ void BedrockServer::sync() << " after failed sync commit. Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); _syncNodeQueuedCommands.push(move(command)); + // metrics: sync thread queue + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.enqueue", .type = MetricType::Counter, .value = 1 }); + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.depth", .type = MetricType::Gauge, .value = (uint64_t)_syncNodeQueuedCommands.size() }); } else { SERROR("Unexpected sync thread commit state."); } @@ -715,8 +719,16 @@ void BedrockServer::worker(int threadId) command = commandQueue.get(100000); SAUTOPREFIX(command->request); - SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, " - << commandQueue.size() << " commands in " << (threadId ? "" : "blocking") << " queue."); + SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, " + << commandQueue.size() << " commands in " << (threadId ? 
"" : "blocking") << " queue."); + // metrics: queue depths and blocking queue dequeues + if (threadId == 0) { + // blocking commit thread + recordMetric(Metric{ .name = "bedrock.blockingQueue.enqueue", .type = MetricType::Counter, .value = 1 }); + recordMetric(Metric{ .name = "bedrock.blockingQueue.depth", .type = MetricType::Gauge, .value = (uint64_t)commandQueue.size() }); + } else { + recordMetric(Metric{ .name = "bedrock.queue.worker", .type = MetricType::Gauge, .value = (uint64_t)commandQueue.size() }); + } runCommand(move(command), threadId == 0, false); } catch (const BedrockCommandQueue::timeout_error& e) { @@ -986,6 +998,9 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo SINFO("Sending non-parallel command " << command->request.methodLine << " to sync thread. Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); _syncNodeQueuedCommands.push(move(command)); + // metrics: sync thread queue + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.enqueue", .type = MetricType::Counter, .value = 1 }); + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.depth", .type = MetricType::Gauge, .value = (uint64_t)_syncNodeQueuedCommands.size() }); } else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) { SINFO("Escalated " << command->request.methodLine << " to leader and complete, responding."); _reply(command); @@ -1044,6 +1059,12 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo command->complete = true; } else { SINFO("Conflict or state change committing " << command->request.methodLine); + // metric: conflict committing + recordMetric(Metric{ + .name = string("bedrock.multiwrite.conflict.") + command->request.methodLine, + .type = MetricType::Counter, + .value = 1 + }); if (_enableConflictPageLocks) { lastConflictLocation = db.getLastConflictLocation(); @@ -1305,8 +1326,12 @@ BedrockServer::BedrockServer(const SData& args_) metricNames.emplace_back(p.first); } 
SINFO("Metric plugins enabled: " << SComposeList(metricNames)); + // Expose a global recorder for non-server code paths. + SetGlobalMetricRecorder([this](const Metric& m){ this->recordMetric(m); }); } else { SINFO("No metric plugins configured."); + // Ensure the global recorder is cleared when metrics are disabled. + SetGlobalMetricRecorder(nullptr); } // If `versionOverride` is set, we throw away what we just did and use the overridden value. @@ -1558,21 +1583,62 @@ void BedrockServer::_reply(unique_ptr& command) { // Finalize timing info even for commands we won't respond to (this makes this data available in logs). command->finalizeTimingInfo(); - recordMetric(Metric{ - .name = "bedrock.requestTime." + command->request.methodLine + ".prePeekTotal", - .type = MetricType::Timing, - .value = command->response.calcU64("prePeekTotal")/1000 - }); - recordMetric(Metric{ - .name = "bedrock.requestTime." + command->request.methodLine + ".peekTotal", - .type = MetricType::Timing, - .value = command->response.calcU64("peekTotal")/1000 - }); - recordMetric(Metric{ - .name = "bedrock.requestTime." + command->request.methodLine + ".processTotal", - .type = MetricType::Timing, - .value = command->response.calcU64("processTotal")/1000 - }); + // Emit full timing metrics (ms) and counts. 
+ auto metricNameBase = string("bedrock.requestTime.") + command->request.methodLine + "."; + auto emitTiming = [&](const string& suffix, uint64_t us){ + if (us) { + recordMetric(Metric{ + .name = metricNameBase + suffix, + .type = MetricType::Timing, + .value = us/1000 + }); + } + }; + emitTiming("prePeekTime", command->response.calcU64("prePeekTime")); + emitTiming("peekTime", command->response.calcU64("peekTime")); + emitTiming("processTime", command->response.calcU64("processTime")); + emitTiming("postProcessTime", command->response.calcU64("postProcessTime")); + emitTiming("totalTime", command->response.calcU64("totalTime")); + emitTiming("unaccountedTime", command->response.calcU64("unaccountedTime")); + emitTiming("commitWorkerTime", command->response.calcU64("commitWorkerTime")); + emitTiming("commitSyncTime", command->response.calcU64("commitSyncTime")); + emitTiming("queueWorkerTime", command->response.calcU64("queueWorkerTime")); + emitTiming("queueSyncTime", command->response.calcU64("queueSyncTime")); + emitTiming("queueBlockingTime", command->response.calcU64("queueBlockingTime")); + emitTiming("queuePageLockTime", command->response.calcU64("queuePageLockTime")); + emitTiming("blockingCommitPrePeekTime", command->response.calcU64("blockingCommitPrePeekTime")); + emitTiming("blockingCommitPeekTime", command->response.calcU64("blockingCommitPeekTime")); + emitTiming("blockingCommitProcessTime", command->response.calcU64("blockingCommitProcessTime")); + emitTiming("blockingCommitPostProcessTime", command->response.calcU64("blockingCommitPostProcessTime")); + emitTiming("blockingCommitCommitTime", command->response.calcU64("blockingCommitCommitTime")); + emitTiming("upstreamPeekTime", command->response.calcU64("upstreamPeekTime")); + emitTiming("upstreamProcessTime", command->response.calcU64("upstreamProcessTime")); + emitTiming("upstreamTotalTime", command->response.calcU64("upstreamTotalTime")); + emitTiming("upstreamUnaccountedTime", 
command->response.calcU64("upstreamUnaccountedTime")); + emitTiming("escalationTime", command->response.calcU64("escalationTime")); + + // Emit counts as counters. + if (command->prePeekCount) { + recordMetric(Metric{ .name = metricNameBase + "prePeekCount", .type = MetricType::Counter, .value = (uint64_t)command->prePeekCount }); + } + if (command->peekCount) { + recordMetric(Metric{ .name = metricNameBase + "peekCount", .type = MetricType::Counter, .value = (uint64_t)command->peekCount }); + } + if (command->processCount) { + recordMetric(Metric{ .name = metricNameBase + "processCount", .type = MetricType::Counter, .value = (uint64_t)command->processCount }); + } + if (command->postProcessCount) { + recordMetric(Metric{ .name = metricNameBase + "postProcessCount", .type = MetricType::Counter, .value = (uint64_t)command->postProcessCount }); + } + + // Slow command special-case + if (command->response.calcU64("totalTime")/1000 > 30000) { + recordMetric(Metric{ + .name = string("bedrock.slowCommand.") + command->request.methodLine + ".totalTime", + .type = MetricType::Timing, + .value = command->response.calcU64("totalTime")/1000 + }); + } // Don't reply to commands with pseudo-clients (i.e., commands that we generated by other commands, or using // `Connection: forget`. 
diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 3c29ed468..f52cd958d 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -6,6 +6,7 @@ #include #include #include +#include "BedrockMetrics.h" #define DBINFO(_MSG_) SINFO("{" << _filename << "} " << _MSG_) @@ -441,6 +442,9 @@ bool SQLite::beginTransaction(SQLite::TRANSACTION_TYPE type) { _sharedData.openTransactionCount++; SINFO("Beginning transaction - open transaction count: " << (_sharedData.openTransactionCount)); + // metrics: open DB transactions + GlobalRecordMetric(Metric{ .name = "bedrock.openDatabaseTransactions.enqueue", .type = MetricType::Counter, .value = 1 }); + GlobalRecordMetric(Metric{ .name = "bedrock.openDatabaseTransactions.depth", .type = MetricType::Gauge, .value = (uint64_t)_sharedData.openTransactionCount }); uint64_t before = STimeNow(); _insideTransaction = !SQuery(_db, "starting db transaction", "BEGIN CONCURRENT"); @@ -739,6 +743,8 @@ bool SQLite::prepare(uint64_t* transactionID, string* transactionhash) { auto end = STimeNow(); if (end - start > 5'000) { SINFO("Waited " << (end - start) << "us for commit lock."); + // metrics: commit lock wait + GlobalRecordMetric(Metric{ .name = "bedrock.waitingCommitLock", .type = MetricType::Timing, .value = (uint64_t)((end - start)/1000) }); } _sharedData._commitLockTimer.start("SHARED"); _mutexLocked = true; @@ -879,6 +885,10 @@ int SQLite::commit(const string& description, function* preCheckpointCal auto end = STimeNow(); SINFO("Checkpoint with type=" << _checkpointMode << " complete with " << framesCheckpointed << " frames checkpointed of " << _sharedData.outstandingFramesToCheckpoint << " frames outstanding in " << (end - start) << "us."); + GlobalRecordMetric(Metric{ .name = "bedrock.checkpointedFrames", .type = MetricType::Gauge, .value = (uint64_t)framesCheckpointed }); + GlobalRecordMetric(Metric{ .name = "bedrock.totalFrames", .type = MetricType::Gauge, .value = 
(uint64_t)_sharedData.outstandingFramesToCheckpoint }); + GlobalRecordMetric(Metric{ .name = "bedrock.checkpointingTime", .type = MetricType::Timing, .value = (uint64_t)((end - start)/1000) }); + // It might not actually be 0, but we'll just let sqlite tell us what it is next time _walHookCallback runs. _sharedData.outstandingFramesToCheckpoint = 0; } @@ -887,6 +897,13 @@ int SQLite::commit(const string& description, function* preCheckpointCal SINFO(description << " COMMIT " << SToStr(_sharedData.commitCount) << " complete in " << time << ". Wrote " << (endPages - startPages) << " pages. WAL file size is " << sz << " bytes. " << _readQueryCount << " read queries attempted, " << _writeQueryCount << " write queries attempted, " << _cacheHits << " served from cache. Used journal " << _journalName); + + + if (SContains(description, "LEADING")) { + GlobalRecordMetric(Metric{ .name = "bedrock.multiwrite.successful.anyCommand", .type = MetricType::Counter, .value = 1 }); + } + GlobalRecordMetric(Metric{ .name = string("bedrock.commitedQueries.") + _journalName + ".read.enqueue", .type = MetricType::Counter, .value = (uint64_t)_readQueryCount }); + GlobalRecordMetric(Metric{ .name = string("bedrock.commitedQueries.") + _journalName + ".write.enqueue", .type = MetricType::Counter, .value = (uint64_t)_writeQueryCount }); _readQueryCount = 0; _writeQueryCount = 0; _cacheHits = 0; @@ -957,6 +974,10 @@ void SQLite::rollback() { } _queryCache.clear(); SINFO("[performance] Transaction rollback with " << _readQueryCount << " read queries attempted, " << _writeQueryCount << " write queries attempted, " << _cacheHits << " served from cache."); + GlobalRecordMetric(Metric{ .name = "bedrock.uncommitedQueries.read.enqueue", .type = MetricType::Counter, .value = (uint64_t)_readQueryCount }); + GlobalRecordMetric(Metric{ .name = "bedrock.uncommitedQueries.write.enqueue", .type = MetricType::Counter, .value = (uint64_t)_writeQueryCount }); + GlobalRecordMetric(Metric{ .name = 
"bedrock.uncommitedQueries.cacheHits.enqueue", .type = MetricType::Counter, .value = (uint64_t)_cacheHits }); + _readQueryCount = 0; _writeQueryCount = 0; _cacheHits = 0; diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 837d8953b..19f5e929c 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "BedrockMetrics.h" // Convenience class for maintaining connections with a mesh of peers #define PDEBUG(_MSG_) SDEBUG("->{" << peer->name << "} " << _MSG_) @@ -2329,6 +2330,9 @@ void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const S float applyTimeMS = (float)applyTimeUS / 1000.0; PINFO("[performance] Replicated transaction " << message.calcU64("NewCount") << ", sent by leader at " << leaderSentTimestamp << ", transit/dequeue time: " << transitTimeMS << "ms, applied in: " << applyTimeMS << "ms, should COMMIT next."); + + GlobalRecordMetric(Metric{ .name = "bedrock.replication.dequeueTime", .type = MetricType::Timing, .value = transitTimeUS/1000 }); + GlobalRecordMetric(Metric{ .name = "bedrock.replication.queryTime", .type = MetricType::Timing, .value = applyTimeUS/1000 }); } int SQLiteNode::_handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uint64_t commandCommitCount, const string& commandCommitHash) {