diff --git a/BedrockCommand.cpp b/BedrockCommand.cpp index 807f99993..f48a7d548 100644 --- a/BedrockCommand.cpp +++ b/BedrockCommand.cpp @@ -289,6 +289,19 @@ void BedrockCommand::finalizeTimingInfo() { {"unaccountedTime", unaccountedTime}, }; + // Also include commit and queue breakdowns so the reply contains everything needed for metrics. + valuePairs.emplace("commitWorkerTime", commitWorkerTotal); + valuePairs.emplace("commitSyncTime", commitSyncTotal); + valuePairs.emplace("queueWorkerTime", queueWorkerTotal); + valuePairs.emplace("queueSyncTime", queueSyncTotal); + valuePairs.emplace("queueBlockingTime", queueBlockingTotal); + valuePairs.emplace("queuePageLockTime", queuePageLockTotal); + valuePairs.emplace("blockingCommitPrePeekTime", blockingPrePeekTotal); + valuePairs.emplace("blockingCommitPeekTime", blockingPeekTotal); + valuePairs.emplace("blockingCommitProcessTime", blockingProcessTotal); + valuePairs.emplace("blockingCommitPostProcessTime", blockingPostProcessTotal); + valuePairs.emplace("blockingCommitCommitTime", blockingCommitWorkerTotal); + // We also want to know what leader did if we're on a follower. uint64_t upstreamPeekTime = 0; uint64_t upstreamProcessTime = 0; diff --git a/BedrockCore.cpp b/BedrockCore.cpp index 04a3ae6ca..ab35c7272 100644 --- a/BedrockCore.cpp +++ b/BedrockCore.cpp @@ -2,6 +2,7 @@ #include "BedrockCore.h" #include "BedrockPlugin.h" #include "BedrockServer.h" +#include "BedrockMetrics.h" BedrockCore::BedrockCore(SQLite& db, const BedrockServer& server) : SQLiteCore(db), @@ -45,6 +46,12 @@ uint64_t BedrockCore::_getRemainingTime(const unique_ptr& comman // Already expired. if (adjustedTimeout <= 0 || (isProcessing && processTimeout <= 0)) { SALERT("Command " << command->request.methodLine << " timed out."); + // metric: command timed out + GlobalRecordMetric(Metric{ + .name = string("bedrock.commandTimedOut.") + command->request.methodLine, + .type = MetricType::Counter, + .value = 1 + }); STHROW("555 Timeout"); } diff --git a/BedrockMetricPlugin.cpp b/BedrockMetricPlugin.cpp new file mode 100644 index 000000000..28cc82f59 --- /dev/null +++ b/BedrockMetricPlugin.cpp @@ -0,0 +1,116 @@ +#include "BedrockMetricPlugin.h" +#include "libstuff/libstuff.h" + +map> BedrockMetricPlugin::g_registeredMetricPluginList; + +BedrockMetricPlugin::BedrockMetricPlugin(const SData& args, size_t maxQueueSize) + : _args(args), _maxQueueSize(maxQueueSize) +{ +} + +BedrockMetricPlugin::~BedrockMetricPlugin() +{ + stop(); +} + +bool BedrockMetricPlugin::enqueue(Metric metric) +{ + if (_stopping.load()) { + return false; + } + + { + unique_lock lock(_mutex); + if (_queue.size() >= _maxQueueSize) { + _dropped.fetch_add(1, memory_order_relaxed); + return false; + } + _queue.emplace_back(std::move(metric)); + } + _cv.notify_one(); + return true; +} + +size_t BedrockMetricPlugin::queueSize() const +{ + unique_lock lock(_mutex); + return _queue.size(); +} + +uint64_t BedrockMetricPlugin::droppedCount() const +{ + return _dropped.load(memory_order_relaxed); +} + +void BedrockMetricPlugin::stop() +{ + bool expected = false; + if (_stopping.compare_exchange_strong(expected, true)) { + _cv.notify_all(); + } +} + +bool BedrockMetricPlugin::isStopping() const +{ + return _stopping.load(); +} + +bool BedrockMetricPlugin::tryDequeue(Metric& out) +{ + unique_lock lock(_mutex); + if (_queue.empty()) { + return false; + } + out = std::move(_queue.front()); + _queue.pop_front(); + return true; +} + +bool BedrockMetricPlugin::waitDequeue(Metric& out, uint64_t timeoutMs) +{ + unique_lock lock(_mutex); + if (_queue.empty()) { + if (_stopping.load()) { + return false; + } + _cv.wait_for(lock, chrono::milliseconds(timeoutMs), [&]{ return _stopping.load() || !_queue.empty(); }); + } + if (_queue.empty()) { + return false; + } + out = std::move(_queue.front()); + _queue.pop_front(); + return true; +} + +vector BedrockMetricPlugin::drainUpTo(size_t maxItems) +{ + vector batch; + batch.reserve(maxItems); + unique_lock lock(_mutex); + while (!_queue.empty() && batch.size() < maxItems) { + batch.emplace_back(std::move(_queue.front())); + _queue.pop_front(); + } + return batch; +} + +vector BedrockMetricPlugin::waitAndDrain(size_t maxItems, uint64_t maxWaitMs) +{ + unique_lock lock(_mutex); + if (_queue.empty()) { + if (_stopping.load()) { + return {}; + } + _cv.wait_for(lock, chrono::milliseconds(maxWaitMs), [&]{ return _stopping.load() || !_queue.empty(); }); + } + vector batch; + batch.reserve(maxItems); + while (!_queue.empty() && batch.size() < maxItems) { + batch.emplace_back(std::move(_queue.front())); + _queue.pop_front(); + } + return batch; +} + + diff --git a/BedrockMetricPlugin.h b/BedrockMetricPlugin.h new file mode 100644 index 000000000..f256f8c51 --- /dev/null +++ b/BedrockMetricPlugin.h @@ -0,0 +1,63 @@ +#pragma once +#include "libstuff/libstuff.h" +#include +#include + +enum class MetricType { + Counter, + Gauge, + Histogram, + Timing, + Set, + Distribution +}; + +struct Metric { + string name; + MetricType type{MetricType::Counter}; + uint64_t value{0}; + vector> tags; + uint64_t timestampUnixMs{0}; + double sampleRate{1.0}; +}; + +class BedrockMetricPlugin { + public: + static map> g_registeredMetricPluginList; + + explicit BedrockMetricPlugin(const SData& args, size_t maxQueueSize = 100000); + virtual ~BedrockMetricPlugin(); + + virtual const string& getName() const = 0; + + // Non-blocking enqueue. Returns false if dropped due to full queue or stopping. + bool enqueue(Metric metric); + + // Observability + size_t queueSize() const; + uint64_t droppedCount() const; + + // Lifecycle + void stop(); + bool isStopping() const; + + protected: + // For implementers: dequeue helpers to build network loop(s) + bool tryDequeue(Metric& out); + bool waitDequeue(Metric& out, uint64_t timeoutMs); + vector drainUpTo(size_t maxItems); + vector waitAndDrain(size_t maxItems, uint64_t maxWaitMs); + + const SData& _args; + + private: + mutable mutex _mutex; + condition_variable _cv; + deque _queue; + atomic _stopping{false}; + const size_t _maxQueueSize; + atomic _dropped{0}; +}; + + + diff --git a/BedrockMetrics.cpp b/BedrockMetrics.cpp new file mode 100644 index 000000000..d16160a34 --- /dev/null +++ b/BedrockMetrics.cpp @@ -0,0 +1,29 @@ +#include "BedrockMetrics.h" +#include + +namespace { +function g_recorder; +mutex g_mutex; +} + +void SetGlobalMetricRecorder(const function& recorder) +{ + if (!recorder) { + g_recorder = nullptr; + return; + } + lock_guard lock(g_mutex); + g_recorder = recorder; +} + +void GlobalRecordMetric(const Metric& metric) +{ + function local; + { + lock_guard lock(g_mutex); + local = g_recorder; + } + if (local) { + local(metric); + } +} diff --git a/BedrockMetrics.h b/BedrockMetrics.h new file mode 100644 index 000000000..4b435efd3 --- /dev/null +++ b/BedrockMetrics.h @@ -0,0 +1,10 @@ +// Minimal global metrics helper to allow emission from non-server code. +#pragma once +#include "BedrockMetricPlugin.h" +#include + +// Set the global metric recorder callback. +void SetGlobalMetricRecorder(const function& recorder); + +// Record a metric via the global recorder if configured. +void GlobalRecordMetric(const Metric& metric); diff --git a/BedrockServer.cpp b/BedrockServer.cpp index e866a2221..0fd2106aa 100644 --- a/BedrockServer.cpp +++ b/BedrockServer.cpp @@ -18,6 +18,7 @@ #include #include #include +#include "BedrockMetrics.h" setBedrockServer::_blacklistedParallelCommands; shared_timed_mutex BedrockServer::_blacklistedParallelCommandMutex; @@ -78,6 +79,15 @@ shared_ptr BedrockServer::getDBPool() { return _dbPool; } +void BedrockServer::recordMetric(const Metric& metric) +{ + for (auto& p : metricPlugins) { + Metric m = metric; + SDEBUG("Enqueuing metric " << m.name << " to " << p.second->getName()); + p.second->enqueue(std::move(m)); + } +} + void BedrockServer::sync() { // Parse out the number of worker threads we'll use. The DB needs to know this because it will expect a @@ -442,6 +452,9 @@ void BedrockServer::sync() << " after failed sync commit. Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); _syncNodeQueuedCommands.push(move(command)); + // metrics: sync thread queue + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.enqueue", .type = MetricType::Counter, .value = 1 }); + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.depth", .type = MetricType::Gauge, .value = (uint64_t)_syncNodeQueuedCommands.size() }); } else { SERROR("Unexpected sync thread commit state."); } @@ -490,6 +503,18 @@ void BedrockServer::sync() SINFO("Sync thread dequeued command " << command->request.methodLine << ". Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); + recordMetric(Metric{ + .name = "bedrock.syncThread.dequeue", + .type = MetricType::Counter, + .value = 1 + }); + + recordMetric(Metric{ + .name = "bedrock.syncThread.depth", + .type = MetricType::Gauge, + .value = _syncNodeQueuedCommands.size() + }); + if (command->timeout() < STimeNow()) { SINFO("Command '" << command->request.methodLine << "' timed out in sync thread queue, sending back to main queue."); _commandQueue.push(move(command)); @@ -694,8 +719,16 @@ void BedrockServer::worker(int threadId) command = commandQueue.get(100000); SAUTOPREFIX(command->request); - SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, " - << commandQueue.size() << " commands in " << (threadId ? "" : "blocking") << " queue."); + SINFO("Dequeued command " << command->request.methodLine << " (" << command->id << ") in worker, " + << commandQueue.size() << " commands in " << (threadId ? "" : "blocking") << " queue."); + // metrics: queue depths and blocking queue dequeues + if (threadId == 0) { + // blocking commit thread + recordMetric(Metric{ .name = "bedrock.blockingQueue.enqueue", .type = MetricType::Counter, .value = 1 }); + recordMetric(Metric{ .name = "bedrock.blockingQueue.depth", .type = MetricType::Gauge, .value = (uint64_t)commandQueue.size() }); + } else { + recordMetric(Metric{ .name = "bedrock.queue.worker", .type = MetricType::Gauge, .value = (uint64_t)commandQueue.size() }); + } runCommand(move(command), threadId == 0, false); } catch (const BedrockCommandQueue::timeout_error& e) { @@ -965,6 +998,9 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo SINFO("Sending non-parallel command " << command->request.methodLine << " to sync thread. Sync thread has " << _syncNodeQueuedCommands.size() << " queued commands."); _syncNodeQueuedCommands.push(move(command)); + // metrics: sync thread queue + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.enqueue", .type = MetricType::Counter, .value = 1 }); + recordMetric(Metric{ .name = "bedrock.syncThreadQueue.depth", .type = MetricType::Gauge, .value = (uint64_t)_syncNodeQueuedCommands.size() }); } else if (_clusterMessengerCopy && _clusterMessengerCopy->runOnPeer(*command, true)) { SINFO("Escalated " << command->request.methodLine << " to leader and complete, responding."); _reply(command); @@ -1023,6 +1059,12 @@ void BedrockServer::runCommand(unique_ptr&& _command, bool isBlo command->complete = true; } else { SINFO("Conflict or state change committing " << command->request.methodLine); + // metric: conflict committing + recordMetric(Metric{ + .name = string("bedrock.multiwrite.conflict.") + command->request.methodLine, + .type = MetricType::Counter, + .value = 1 + }); if (_enableConflictPageLocks) { lastConflictLocation = db.getLastConflictLocation(); @@ -1266,6 +1308,32 @@ BedrockServer::BedrockServer(const SData& args_) } SINFO("Creating BedrockServer with plugins: " << SComposeList(pluginString)); + // Load metric plugins + list metricPluginNameList = SParseList(args["-metricPlugins"]); + if (!metricPluginNameList.empty()) { + SINFO("Loading metric plugins: " << args["-metricPlugins"]); + for (string& pluginName : metricPluginNameList) { + auto it = BedrockMetricPlugin::g_registeredMetricPluginList.find(SToUpper(pluginName)); + if (it == BedrockMetricPlugin::g_registeredMetricPluginList.end()) { + SERROR("Cannot find metric plugin '" << pluginName << "', aborting."); + } + unique_ptr metricPlugin(it->second(args)); + const string nameUpper = SToUpper(metricPlugin->getName()); + metricPlugins.emplace(nameUpper, std::move(metricPlugin)); + } + list metricNames; + for (auto& p : metricPlugins) { + metricNames.emplace_back(p.first); + } + SINFO("Metric plugins enabled: " << SComposeList(metricNames)); + // Expose a global recorder for non-server code paths. + SetGlobalMetricRecorder([this](const Metric& m){ this->recordMetric(m); }); + } else { + SINFO("No metric plugins configured."); + // Ensure the global recorder is cleared when metrics are disabled. + SetGlobalMetricRecorder(nullptr); + } + // If `versionOverride` is set, we throw away what we just did and use the overridden value. // We'll destruct, sort, and then reconstruct the version string passed in so we aren't relying // on the operator to know that they must be sorted. @@ -1336,6 +1404,8 @@ BedrockServer::BedrockServer(const SData& args_) } BedrockServer::~BedrockServer() { + + // Shut down the sync thread, (which will shut down worker threads in turn). SINFO("Closing sync thread '" << _syncThreadName << "'"); if (_syncThread.joinable()) { @@ -1352,6 +1422,11 @@ BedrockServer::~BedrockServer() { delete p.second; } + // Stop metric plugins to allow their threads to exit promptly. + for (auto& p : metricPlugins) { + p.second->stop(); + } + shutdownTimer.serverDestructor = chrono::steady_clock::now(); SINFO("Shutdown timing: " << "start->safeState=" << chrono::duration_cast(shutdownTimer.safeNodeState - shutdownTimer.shutdownStart) @@ -1508,6 +1583,63 @@ void BedrockServer::_reply(unique_ptr& command) { // Finalize timing info even for commands we won't respond to (this makes this data available in logs). command->finalizeTimingInfo(); + // Emit full timing metrics (ms) and counts. + auto metricNameBase = string("bedrock.requestTime.") + command->request.methodLine + "."; + auto emitTiming = [&](const string& suffix, uint64_t us){ + if (us) { + recordMetric(Metric{ + .name = metricNameBase + suffix, + .type = MetricType::Timing, + .value = us/1000 + }); + } + }; + emitTiming("prePeekTime", command->response.calcU64("prePeekTime")); + emitTiming("peekTime", command->response.calcU64("peekTime")); + emitTiming("processTime", command->response.calcU64("processTime")); + emitTiming("postProcessTime", command->response.calcU64("postProcessTime")); + emitTiming("totalTime", command->response.calcU64("totalTime")); + emitTiming("unaccountedTime", command->response.calcU64("unaccountedTime")); + emitTiming("commitWorkerTime", command->response.calcU64("commitWorkerTime")); + emitTiming("commitSyncTime", command->response.calcU64("commitSyncTime")); + emitTiming("queueWorkerTime", command->response.calcU64("queueWorkerTime")); + emitTiming("queueSyncTime", command->response.calcU64("queueSyncTime")); + emitTiming("queueBlockingTime", command->response.calcU64("queueBlockingTime")); + emitTiming("queuePageLockTime", command->response.calcU64("queuePageLockTime")); + emitTiming("blockingCommitPrePeekTime", command->response.calcU64("blockingCommitPrePeekTime")); + emitTiming("blockingCommitPeekTime", command->response.calcU64("blockingCommitPeekTime")); + emitTiming("blockingCommitProcessTime", command->response.calcU64("blockingCommitProcessTime")); + emitTiming("blockingCommitPostProcessTime", command->response.calcU64("blockingCommitPostProcessTime")); + emitTiming("blockingCommitCommitTime", command->response.calcU64("blockingCommitCommitTime")); + emitTiming("upstreamPeekTime", command->response.calcU64("upstreamPeekTime")); + emitTiming("upstreamProcessTime", command->response.calcU64("upstreamProcessTime")); + emitTiming("upstreamTotalTime", command->response.calcU64("upstreamTotalTime")); + emitTiming("upstreamUnaccountedTime", command->response.calcU64("upstreamUnaccountedTime")); + emitTiming("escalationTime", command->response.calcU64("escalationTime")); + + // Emit counts as counters. + if (command->prePeekCount) { + recordMetric(Metric{ .name = metricNameBase + "prePeekCount", .type = MetricType::Counter, .value = (uint64_t)command->prePeekCount }); + } + if (command->peekCount) { + recordMetric(Metric{ .name = metricNameBase + "peekCount", .type = MetricType::Counter, .value = (uint64_t)command->peekCount }); + } + if (command->processCount) { + recordMetric(Metric{ .name = metricNameBase + "processCount", .type = MetricType::Counter, .value = (uint64_t)command->processCount }); + } + if (command->postProcessCount) { + recordMetric(Metric{ .name = metricNameBase + "postProcessCount", .type = MetricType::Counter, .value = (uint64_t)command->postProcessCount }); + } + + // Slow command special-case + if (command->response.calcU64("totalTime")/1000 > 30000) { + recordMetric(Metric{ + .name = string("bedrock.slowCommand.") + command->request.methodLine + ".totalTime", + .type = MetricType::Timing, + .value = command->response.calcU64("totalTime")/1000 + }); + } + // Don't reply to commands with pseudo-clients (i.e., commands that we generated by other commands, or using // `Connection: forget`. if (command->initiatingClientID < 0) { diff --git a/BedrockServer.h b/BedrockServer.h index 53a4adab9..fc2767a1c 100644 --- a/BedrockServer.h +++ b/BedrockServer.h @@ -9,6 +9,7 @@ #include "BedrockConflictManager.h" #include "BedrockBlockingCommandQueue.h" #include "BedrockTimeoutCommandQueue.h" +#include "BedrockMetricPlugin.h" class SQLitePeer; class BedrockCore; @@ -242,6 +243,9 @@ class BedrockServer : public SQLiteServer { // Expose the DB pool to plugins. shared_ptr getDBPool(); + // Record a metric in all registered metric plugins. + void recordMetric(const Metric& metric); + private: // The name of the sync thread. static constexpr auto _syncThreadName = "sync"; @@ -334,6 +338,9 @@ class BedrockServer : public SQLiteServer { // synchronization. shared_ptr _clusterMessenger; + // Metrics plugins registry + map> metricPlugins; + // Functions for checking for and responding to status and control commands. bool _isStatusCommand(const unique_ptr& command); void _status(unique_ptr& command); diff --git a/main.cpp b/main.cpp index b791773d2..77a7eaa59 100644 --- a/main.cpp +++ b/main.cpp @@ -15,6 +15,8 @@ #include #include #include +#include +#include #include #include #include @@ -47,6 +49,54 @@ void RetrySystem(const string& command) { SERROR("Failed to run '" << command << "', aborting."); } +set loadMetricPlugins(SData& args) { + list plugins = SParseList(args["-metricPlugins"]); + + set postProcessedNames; + + // Register built-in metric plugins here if any (none by default) + // Example: BedrockMetricPlugin::g_registeredMetricPluginList.emplace(make_pair("STATSD", [](const SData& a){return new StatsDMetricPlugin(a);})); + + BedrockMetricPlugin::g_registeredMetricPluginList.emplace(make_pair("STATSD", [](const SData& a){return new StatsDMetricPlugin(a);})); + + + for (string pluginName : plugins) { + if (BedrockMetricPlugin::g_registeredMetricPluginList.find(SToUpper(pluginName)) != BedrockMetricPlugin::g_registeredMetricPluginList.end()) { + postProcessedNames.emplace(SToUpper(pluginName)); + continue; + } + + size_t slash = pluginName.rfind('/'); + size_t dot = pluginName.find('.', slash); + string name = pluginName.substr(slash + 1, dot - slash - 1); + string symbolName = "BEDROCK_METRIC_PLUGIN_REGISTER_" + SToUpper(name); + + if(postProcessedNames.find(SToUpper(name)) != postProcessedNames.end()) { + SWARN("Duplicate entry for metric plugin " << name << ", skipping."); + continue; + } + postProcessedNames.insert(SToUpper(name)); + + if (!SEndsWith(pluginName, ".so")) { + pluginName += ".so"; + } + + void* lib = dlopen(pluginName.c_str(), RTLD_NOW); + if(!lib) { + SWARN("Error loading bedrock metric plugin " << pluginName << ": " << dlerror()); + } else { + void* sym = dlsym(lib, symbolName.c_str()); + if (!sym) { + SWARN("Couldn't find symbol " << symbolName); + } else { + BedrockMetricPlugin::g_registeredMetricPluginList.emplace(make_pair(SToUpper(name), (BedrockMetricPlugin*(*)(const SData&))sym)); + } + } + } + + return postProcessedNames; +} + ///////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////// @@ -231,6 +281,7 @@ int main(int argc, char* argv[]) { cout << "-peerList See below" << endl; cout << "-priority See '-peerList Details' below (defaults to 100)" << endl; cout << "-plugins Enable these plugins (defaults to 'db,jobs,cache,mysql')" << endl; + cout << "-metricPlugins Enable metrics plugins (e.g., 'statsd,otel' or .so paths)" << endl; cout << "-cacheSize number of KB to allocate for a page cache (defaults to 1GB)" << endl; cout << "-workerThreads <#> Number of worker threads to start (min 1, defaults to # of cores)" << endl; cout << "-queryLog Set the query log filename (default 'queryLog.csv', SIGUSR2/SIGQUIT to " @@ -298,6 +349,7 @@ int main(int argc, char* argv[]) { SETDEFAULT("-nodeName", SGetHostName()); SETDEFAULT("-cacheSize", SToStr(0)); SETDEFAULT("-plugins", "db,jobs,cache,mysql"); + SETDEFAULT("-metricPlugins", ""); SETDEFAULT("-priority", "100"); SETDEFAULT("-maxJournalSize", "1000000"); SETDEFAULT("-queryLog", "queryLog.csv"); @@ -325,6 +377,7 @@ int main(int argc, char* argv[]) { } args["-plugins"] = SComposeList(loadPlugins(args)); + args["-metricPlugins"] = SComposeList(loadMetricPlugins(args)); // Set our soft limit to the same as our hard limit to allow for more file handles. struct rlimit limits; diff --git a/plugins/StatsDMetricPlugin.cpp b/plugins/StatsDMetricPlugin.cpp new file mode 100644 index 000000000..cbe67dc6b --- /dev/null +++ b/plugins/StatsDMetricPlugin.cpp @@ -0,0 +1,172 @@ +#include "plugins/StatsDMetricPlugin.h" +#include "libstuff/SFastBuffer.h" +#include "libstuff/SData.h" +#include + +#undef SLOGPREFIX +#define SLOGPREFIX "{" << getName() << "} " + +const string StatsDMetricPlugin::name("StatsD"); + +extern "C" BedrockMetricPlugin* BEDROCK_METRIC_PLUGIN_REGISTER_STATSD(const SData& args) +{ + return new StatsDMetricPlugin(args); +} + +StatsDMetricPlugin::StatsDMetricPlugin(const SData& args) + : BedrockMetricPlugin(args) +{ + SDEBUG("Loaded StatsD metric plugin"); + _destHostPort = args["-statsdServer"]; + if (args.isSet("-statsdMaxDatagramBytes")) { + _maxDatagramBytes = max(1400, args.calcU64("-statsdMaxDatagramBytes")); + } + // Start worker + _worker = thread(&StatsDMetricPlugin::_networkLoop, this); +} + +StatsDMetricPlugin::~StatsDMetricPlugin() +{ + stop(); + if (_worker.joinable()) { + _worker.join(); + } +} + +const string& StatsDMetricPlugin::getName() const +{ + return name; +} + +string StatsDMetricPlugin::_sanitizeName(const string& name) +{ + string out; + out.reserve(name.size()); + for (char c : name) { + if ((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_' || c == '.') { + out.push_back(c); + } else { + out.push_back('_'); + } + } + return out; +} + +string StatsDMetricPlugin::_sanitizeTagKey(const string& key) +{ + return _sanitizeName(key); +} + +string StatsDMetricPlugin::_sanitizeTagValue(const string& value) +{ + string out = value; + for (char& c : out) { + if (c == '|') c = '_'; + else if (c == ' ') c = '_'; + else if (c == ',') c = '_'; + else if (c == '\n') c = '_'; + else if (c == ':') c = '_'; + } + return out; +} + +string StatsDMetricPlugin::_formatLine(const Metric& m) const +{ + const string hostname = SGetHostName(); + const string name = _sanitizeName(hostname + "." + m.name); + string line = name + ":" + SToStr(m.value) + "|"; + switch (m.type) { + case MetricType::Counter: line += "c"; break; + case MetricType::Gauge: line += "g"; break; + case MetricType::Timing: line += "ms"; break; + case MetricType::Set: line += "s"; break; + case MetricType::Histogram: line += "g"; break; // map to gauge + case MetricType::Distribution: line += "g"; break; // map to gauge + } + if (m.sampleRate > 0.0 && m.sampleRate < 1.0) { + line += "|@" + SToStr(m.sampleRate); + } + if (!m.tags.empty()) { + line += "|#"; + bool first = true; + for (const auto& kv : m.tags) { + if (!first) { + line += ","; + } + first = false; + line += _sanitizeTagKey(kv.first); + if (!kv.second.empty()) { + line += ":" + _sanitizeTagValue(kv.second); + } + } + } + return line; +} + +void StatsDMetricPlugin::_sendBatch(vector& lines) +{ + if (lines.empty()) { + return; + } + + // Build datagrams up to _maxDatagramBytes and send each via a fresh socket + SFastBuffer buffer; + + // Flushing is fire and forget, we don't check for a return from statsd + auto flush = [&](SFastBuffer& payload){ + if (payload.empty()) { + return; + } + int s = S_socket(_destHostPort, false, false, false); + if (s < 0) { + SWARN("Failed to create socket to " << _destHostPort << ", dropping batch"); + return; + } + bool result = S_sendconsume(s, payload); + if (!result) { + SWARN("Failed to send to " << _destHostPort << ", dropping batch"); + ::close(s); + return; + } + ::close(s); + return; + }; + + for (const string& line : lines) { + const size_t needed = (buffer.empty() ? 0 : 1) + line.size(); + if (buffer.size() + needed > _maxDatagramBytes) { + flush(buffer); + buffer.clear(); + } + + if (!buffer.empty()) { + buffer.append("\n", 1); + } + + if (line.size() > _maxDatagramBytes) { + // If a single line is larger than the datagram size, drop it. + SWARN("Dropping line larger than datagram size: " << line.size()); + continue; + } + buffer += line; + } + flush(buffer); + SDEBUG("Flushed " << lines.size() << " lines to " << _destHostPort); +} + +void StatsDMetricPlugin::_networkLoop() +{ + while (!isStopping()) { + SDEBUG("Waiting for batch"); + vector batch = waitAndDrain(_maxBatch, _maxWaitMs); + if (batch.empty()) { + continue; + } + vector lines; + lines.reserve(batch.size()); + for (const auto& m : batch) { + lines.emplace_back(_formatLine(m)); + } + _sendBatch(lines); + } +} diff --git a/plugins/StatsDMetricPlugin.h b/plugins/StatsDMetricPlugin.h new file mode 100644 index 000000000..89b6c8947 --- /dev/null +++ b/plugins/StatsDMetricPlugin.h @@ -0,0 +1,46 @@ +#pragma once +#include "libstuff/libstuff.h" +#include "BedrockMetricPlugin.h" +#include + +class StatsDMetricPlugin : public BedrockMetricPlugin { + public: + explicit StatsDMetricPlugin(const SData& args); + ~StatsDMetricPlugin() override; + + const string& getName() const override; + + private: + // Worker thread for draining and sending metrics. + void _networkLoop(); + + // Format one Metric into a StatsD line. + string _formatLine(const Metric& m) const; + + // Join lines into datagrams, respecting max size, and send them. + void _sendBatch(vector& lines); + + // Parse and resolve the destination host:port. + bool _resolveDestination(); + + // Sanitization helpers. + static string _sanitizeName(const string& name); + static string _sanitizeTagKey(const string& key); + static string _sanitizeTagValue(const string& value); + + // Destination and config + string _destHostPort; // from -statsdServer + + size_t _maxDatagramBytes{1400}; // from -statsdMaxDatagramBytes (default 1400) + size_t _maxBatch{512}; // per drain + uint64_t _maxWaitMs{50}; // wait for batch accumulation + + thread _worker; + + static const string name; +}; + +// Dynamic loader entry point for .so usage. +extern "C" BedrockMetricPlugin* BEDROCK_METRIC_PLUGIN_REGISTER_STATSD(const SData& args); + + diff --git a/sqlitecluster/SQLite.cpp b/sqlitecluster/SQLite.cpp index 3c29ed468..f52cd958d 100644 --- a/sqlitecluster/SQLite.cpp +++ b/sqlitecluster/SQLite.cpp @@ -6,6 +6,7 @@ #include #include #include +#include "BedrockMetrics.h" #define DBINFO(_MSG_) SINFO("{" << _filename << "} " << _MSG_) @@ -441,6 +442,9 @@ bool SQLite::beginTransaction(SQLite::TRANSACTION_TYPE type) { _sharedData.openTransactionCount++; SINFO("Beginning transaction - open transaction count: " << (_sharedData.openTransactionCount)); + // metrics: open DB transactions + GlobalRecordMetric(Metric{ .name = "bedrock.openDatabaseTransactions.enqueue", .type = MetricType::Counter, .value = 1 }); + GlobalRecordMetric(Metric{ .name = "bedrock.openDatabaseTransactions.depth", .type = MetricType::Gauge, .value = (uint64_t)_sharedData.openTransactionCount }); uint64_t before = STimeNow(); _insideTransaction = !SQuery(_db, "starting db transaction", "BEGIN CONCURRENT"); @@ -739,6 +743,8 @@ bool SQLite::prepare(uint64_t* transactionID, string* transactionhash) { auto end = STimeNow(); if (end - start > 5'000) { SINFO("Waited " << (end - start) << "us for commit lock."); + // metrics: commit lock wait + GlobalRecordMetric(Metric{ .name = "bedrock.waitingCommitLock", .type = MetricType::Timing, .value = (uint64_t)((end - start)/1000) }); } _sharedData._commitLockTimer.start("SHARED"); _mutexLocked = true; @@ -879,6 +885,10 @@ int SQLite::commit(const string& description, function* preCheckpointCal auto end = STimeNow(); SINFO("Checkpoint with type=" << _checkpointMode << " complete with " << framesCheckpointed << " frames checkpointed of " << _sharedData.outstandingFramesToCheckpoint << " frames outstanding in " << (end - start) << "us."); + GlobalRecordMetric(Metric{ .name = "bedrock.checkpointedFrames", .type = MetricType::Gauge, .value = (uint64_t)framesCheckpointed }); + GlobalRecordMetric(Metric{ .name = "bedrock.totalFrames", .type = MetricType::Gauge, .value = (uint64_t)_sharedData.outstandingFramesToCheckpoint }); + GlobalRecordMetric(Metric{ .name = "bedrock.checkpointingTime", .type = MetricType::Timing, .value = (uint64_t)((end - start)/1000) }); + // It might not actually be 0, but we'll just let sqlite tell us what it is next time _walHookCallback runs. _sharedData.outstandingFramesToCheckpoint = 0; } @@ -887,6 +897,13 @@ int SQLite::commit(const string& description, function* preCheckpointCal SINFO(description << " COMMIT " << SToStr(_sharedData.commitCount) << " complete in " << time << ". Wrote " << (endPages - startPages) << " pages. WAL file size is " << sz << " bytes. " << _readQueryCount << " read queries attempted, " << _writeQueryCount << " write queries attempted, " << _cacheHits << " served from cache. Used journal " << _journalName); + + + if (SContains(description, "LEADING")) { + GlobalRecordMetric(Metric{ .name = "bedrock.multiwrite.successful.anyCommand", .type = MetricType::Counter, .value = 1 }); + } + GlobalRecordMetric(Metric{ .name = string("bedrock.commitedQueries.") + _journalName + ".read.enqueue", .type = MetricType::Counter, .value = (uint64_t)_readQueryCount }); + GlobalRecordMetric(Metric{ .name = string("bedrock.commitedQueries.") + _journalName + ".write.enqueue", .type = MetricType::Counter, .value = (uint64_t)_writeQueryCount }); _readQueryCount = 0; _writeQueryCount = 0; _cacheHits = 0; @@ -957,6 +974,10 @@ void SQLite::rollback() { } _queryCache.clear(); SINFO("[performance] Transaction rollback with " << _readQueryCount << " read queries attempted, " << _writeQueryCount << " write queries attempted, " << _cacheHits << " served from cache."); + GlobalRecordMetric(Metric{ .name = "bedrock.uncommitedQueries.read.enqueue", .type = MetricType::Counter, .value = (uint64_t)_readQueryCount }); + GlobalRecordMetric(Metric{ .name = "bedrock.uncommitedQueries.write.enqueue", .type = MetricType::Counter, .value = (uint64_t)_writeQueryCount }); + GlobalRecordMetric(Metric{ .name = "bedrock.uncommitedQueries.cacheHits.enqueue", .type = MetricType::Counter, .value = (uint64_t)_cacheHits }); + _readQueryCount = 0; _writeQueryCount = 0; _cacheHits = 0; diff --git a/sqlitecluster/SQLiteNode.cpp b/sqlitecluster/SQLiteNode.cpp index 837d8953b..19f5e929c 100644 --- a/sqlitecluster/SQLiteNode.cpp +++ b/sqlitecluster/SQLiteNode.cpp @@ -10,6 +10,7 @@ #include #include #include +#include "BedrockMetrics.h" // Convenience class for maintaining connections with a mesh of peers #define PDEBUG(_MSG_) SDEBUG("->{" << peer->name << "} " << _MSG_) @@ -2329,6 +2330,9 @@ void SQLiteNode::_handlePrepareTransaction(SQLite& db, SQLitePeer* peer, const S float applyTimeMS = (float)applyTimeUS / 1000.0; PINFO("[performance] Replicated transaction " << message.calcU64("NewCount") << ", sent by leader at " << leaderSentTimestamp << ", transit/dequeue time: " << transitTimeMS << "ms, applied in: " << applyTimeMS << "ms, should COMMIT next."); + + GlobalRecordMetric(Metric{ .name = "bedrock.replication.dequeueTime", .type = MetricType::Timing, .value = transitTimeUS/1000 }); + GlobalRecordMetric(Metric{ .name = "bedrock.replication.queryTime", .type = MetricType::Timing, .value = applyTimeUS/1000 }); } int SQLiteNode::_handleCommitTransaction(SQLite& db, SQLitePeer* peer, const uint64_t commandCommitCount, const string& commandCommitHash) { diff --git a/test/lib/BedrockTester.cpp b/test/lib/BedrockTester.cpp index cf767de5c..314413206 100644 --- a/test/lib/BedrockTester.cpp +++ b/test/lib/BedrockTester.cpp @@ -84,6 +84,8 @@ BedrockTester::BedrockTester(const map& args, {"-escalateOverHTTP", "true"}, {"-cacheSize", "1000"}, {"-parallelReplication", "true"}, + {"-metricPlugins", "STATSD"}, + {"-statsdServer", "10.0.130.11:9125"}, // Currently breaks only in Travis and needs debugging, which has been removed, maybe? //{"-logDirectlyToSyslogSocket", ""}, {"-testName", currentTestName},