Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions BedrockCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,19 @@ void BedrockCommand::finalizeTimingInfo() {
{"unaccountedTime", unaccountedTime},
};

// Also include commit and queue breakdowns so the reply contains everything needed for metrics.
valuePairs.emplace("commitWorkerTime", commitWorkerTotal);
valuePairs.emplace("commitSyncTime", commitSyncTotal);
valuePairs.emplace("queueWorkerTime", queueWorkerTotal);
valuePairs.emplace("queueSyncTime", queueSyncTotal);
valuePairs.emplace("queueBlockingTime", queueBlockingTotal);
valuePairs.emplace("queuePageLockTime", queuePageLockTotal);
valuePairs.emplace("blockingCommitPrePeekTime", blockingPrePeekTotal);
valuePairs.emplace("blockingCommitPeekTime", blockingPeekTotal);
valuePairs.emplace("blockingCommitProcessTime", blockingProcessTotal);
valuePairs.emplace("blockingCommitPostProcessTime", blockingPostProcessTotal);
valuePairs.emplace("blockingCommitCommitTime", blockingCommitWorkerTotal);

// We also want to know what leader did if we're on a follower.
uint64_t upstreamPeekTime = 0;
uint64_t upstreamProcessTime = 0;
Expand Down
7 changes: 7 additions & 0 deletions BedrockCore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "BedrockCore.h"
#include "BedrockPlugin.h"
#include "BedrockServer.h"
#include "BedrockMetrics.h"

BedrockCore::BedrockCore(SQLite& db, const BedrockServer& server) :
SQLiteCore(db),
Expand Down Expand Up @@ -45,6 +46,12 @@ uint64_t BedrockCore::_getRemainingTime(const unique_ptr<BedrockCommand>& comman
// Already expired.
if (adjustedTimeout <= 0 || (isProcessing && processTimeout <= 0)) {
SALERT("Command " << command->request.methodLine << " timed out.");
// metric: command timed out
GlobalRecordMetric(Metric{
.name = string("bedrock.commandTimedOut.") + command->request.methodLine,
.type = MetricType::Counter,
.value = 1
});
STHROW("555 Timeout");
}

Expand Down
116 changes: 116 additions & 0 deletions BedrockMetricPlugin.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
#include "BedrockMetricPlugin.h"
#include "libstuff/libstuff.h"

// Out-of-class definition of the static registry declared in BedrockMetricPlugin.h.
// Each entry maps a string key to a factory that takes the args and returns a raw
// BedrockMetricPlugin pointer.
// NOTE(review): the key is presumably the plugin name and the caller presumably owns
// the returned pointer -- confirm against the code that populates and consumes this map.
map<string, function<BedrockMetricPlugin*(const SData& args)>> BedrockMetricPlugin::g_registeredMetricPluginList;

// Builds a plugin bound to `args` with a queue capped at `maxQueueSize` entries.
// `_args` is a reference member, so only the reference is stored here; the SData
// passed in must stay alive for as long as this object does.
BedrockMetricPlugin::BedrockMetricPlugin(const SData& args, size_t maxQueueSize)
    : _args(args),
      _maxQueueSize(maxQueueSize)
{
    // Nothing else to set up: the queue, flags and counters use their in-class defaults.
}

// Flags the plugin as stopping on destruction so any thread blocked in one of the
// wait helpers is woken.
// NOTE(review): derived classes presumably join their own worker threads before this
// base destructor runs -- confirm in the concrete plugins.
BedrockMetricPlugin::~BedrockMetricPlugin()
{
    this->stop();
}

// Non-blocking enqueue of a single metric.
//
// Returns true when the metric was appended to the queue and one waiter was notified.
// Returns false when the plugin is stopping (the metric is discarded and not counted
// as dropped) or when the queue already holds `_maxQueueSize` entries (the metric is
// discarded and the dropped counter is incremented).
bool BedrockMetricPlugin::enqueue(Metric metric)
{
    // Once shutdown has begun we accept nothing more.
    if (isStopping()) {
        return false;
    }

    {
        lock_guard<mutex> guard(_mutex);
        const bool full = _queue.size() >= _maxQueueSize;
        if (full) {
            _dropped.fetch_add(1, memory_order_relaxed);
            return false;
        }
        _queue.push_back(std::move(metric));
    }

    // Notify after releasing the lock so the woken consumer can take it immediately.
    _cv.notify_one();
    return true;
}

size_t BedrockMetricPlugin::queueSize() const
{
unique_lock<mutex> lock(_mutex);
return _queue.size();
}

// Returns how many metrics enqueue() has rejected because the queue was full.
// The counter is read with relaxed ordering, so it is a statistic rather than a
// synchronization point.
uint64_t BedrockMetricPlugin::droppedCount() const
{
    const uint64_t total = _dropped.load(memory_order_relaxed);
    return total;
}

void BedrockMetricPlugin::stop()
{
bool expected = false;
if (_stopping.compare_exchange_strong(expected, true)) {
_cv.notify_all();
}
}

bool BedrockMetricPlugin::isStopping() const
{
return _stopping.load();
}

// Non-blocking pop of the oldest queued metric.
//
// On success the metric is moved into `out` and true is returned. When the queue is
// empty `out` is left untouched and false is returned.
bool BedrockMetricPlugin::tryDequeue(Metric& out)
{
    lock_guard<mutex> guard(_mutex);
    const bool hasItem = !_queue.empty();
    if (hasItem) {
        out = std::move(_queue.front());
        _queue.pop_front();
    }
    return hasItem;
}

// Blocking pop of the oldest queued metric.
//
// If the queue is empty and the plugin is not stopping, waits up to `timeoutMs`
// milliseconds for a metric to arrive or for stop() to be called. Returns true and
// moves the metric into `out` when one is available; returns false (leaving `out`
// untouched) when the queue is still empty afterwards.
bool BedrockMetricPlugin::waitDequeue(Metric& out, uint64_t timeoutMs)
{
    unique_lock<mutex> guard(_mutex);

    // Only sleep when there is nothing to hand out and shutdown has not begun.
    if (_queue.empty() && !_stopping.load()) {
        const auto wakeCondition = [this] { return _stopping.load() || !_queue.empty(); };
        _cv.wait_for(guard, chrono::milliseconds(timeoutMs), wakeCondition);
    }

    if (!_queue.empty()) {
        out = std::move(_queue.front());
        _queue.pop_front();
        return true;
    }
    return false;
}

// Non-blocking batch pop: removes up to `maxItems` metrics from the front of the
// queue and returns them in queue order. Returns an empty vector when the queue is
// empty or `maxItems` is 0.
//
// The batch is sized from the number of items actually available rather than from
// `maxItems`, so a "drain everything" limit such as SIZE_MAX neither throws
// std::length_error from reserve() nor allocates far more than is needed.
vector<Metric> BedrockMetricPlugin::drainUpTo(size_t maxItems)
{
    vector<Metric> batch;
    unique_lock<mutex> lock(_mutex);

    const size_t available = _queue.size();
    const size_t take = maxItems < available ? maxItems : available;
    batch.reserve(take);
    for (size_t i = 0; i < take; ++i) {
        batch.emplace_back(std::move(_queue.front()));
        _queue.pop_front();
    }
    return batch;
}

// Blocking batch pop.
//
// If the queue is empty and the plugin is not stopping, waits up to `maxWaitMs`
// milliseconds for a metric to arrive or for stop() to be called. Then removes up to
// `maxItems` metrics from the front of the queue and returns them in queue order.
// Returns an empty vector when nothing is queued (including when the plugin is
// stopping with an empty queue).
//
// The batch is sized from the number of items actually available rather than from
// `maxItems`, so a very large limit neither throws std::length_error from reserve()
// nor allocates far more than is needed.
vector<Metric> BedrockMetricPlugin::waitAndDrain(size_t maxItems, uint64_t maxWaitMs)
{
    unique_lock<mutex> lock(_mutex);
    if (_queue.empty()) {
        if (_stopping.load()) {
            return {};
        }
        _cv.wait_for(lock, chrono::milliseconds(maxWaitMs), [&]{ return _stopping.load() || !_queue.empty(); });
    }

    const size_t available = _queue.size();
    const size_t take = maxItems < available ? maxItems : available;

    vector<Metric> batch;
    batch.reserve(take);
    for (size_t i = 0; i < take; ++i) {
        batch.emplace_back(std::move(_queue.front()));
        _queue.pop_front();
    }
    return batch;
}


63 changes: 63 additions & 0 deletions BedrockMetricPlugin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
#pragma once
#include "libstuff/libstuff.h"
#include <deque>
#include <condition_variable>

// Kind of measurement carried by a Metric record.
// NOTE(review): the names look like StatsD/DogStatsD-style metric types -- confirm
// the exact semantics with the concrete metric plugins.
enum class MetricType
{
    Counter,       // Default type for a freshly constructed Metric.
    Gauge,
    Histogram,
    Timing,
    Set,
    Distribution,
};

// A single metric sample handed to a BedrockMetricPlugin queue.
// Plain aggregate: every field has a default, so callers may set only what they need.
struct Metric
{
    // Metric name.
    string name;

    // Kind of metric; defaults to Counter.
    MetricType type = MetricType::Counter;

    // Sample value; defaults to 0.
    uint64_t value = 0;

    // Key/value tag pairs attached to the sample; empty by default.
    vector<pair<string, string>> tags;

    // NOTE(review): presumably Unix epoch time in milliseconds, with 0 meaning
    // "not set" -- confirm with the code that fills this in.
    uint64_t timestampUnixMs = 0;

    // Sample rate; defaults to 1.0.
    double sampleRate = 1.0;
};

// Abstract base for metric plugins. It owns a bounded, mutex-protected queue of
// Metric records: producers call enqueue(), and implementers pull records off with
// the protected dequeue helpers.
class BedrockMetricPlugin
{
public:
    // Registry of factories, keyed by string. Each factory takes the args and returns
    // a raw BedrockMetricPlugin pointer.
    // NOTE(review): ownership of the returned pointer is not visible here -- confirm
    // with the code that calls these factories.
    static map<string, function<BedrockMetricPlugin*(const SData& args)>> g_registeredMetricPluginList;

    // `args` is stored by reference (see `_args`), so it must outlive this object.
    // `maxQueueSize` caps the number of queued metrics; enqueue() rejects beyond it.
    explicit BedrockMetricPlugin(const SData& args, size_t maxQueueSize = 100000);

    // Calls stop().
    virtual ~BedrockMetricPlugin();

    // Name of the concrete plugin; must be supplied by every implementation.
    virtual const string& getName() const = 0;

    // Non-blocking enqueue. Returns false if dropped due to full queue or stopping.
    bool enqueue(Metric metric);

    // Observability.
    // Number of metrics currently queued.
    size_t queueSize() const;
    // Number of metrics rejected because the queue was full.
    uint64_t droppedCount() const;

    // Lifecycle.
    // Sets the stopping flag and wakes all waiters; repeated calls are harmless.
    void stop();
    // True once stop() has been called.
    bool isStopping() const;

protected:
    // For implementers: dequeue helpers to build network loop(s).

    // Pops one metric without waiting; false when the queue is empty.
    bool tryDequeue(Metric& out);
    // Pops one metric, waiting up to `timeoutMs` milliseconds for one to arrive.
    bool waitDequeue(Metric& out, uint64_t timeoutMs);
    // Pops up to `maxItems` metrics without waiting.
    vector<Metric> drainUpTo(size_t maxItems);
    // Pops up to `maxItems` metrics, waiting up to `maxWaitMs` milliseconds if the queue is empty.
    vector<Metric> waitAndDrain(size_t maxItems, uint64_t maxWaitMs);

    // Reference to the args passed to the constructor (not a copy).
    const SData& _args;

private:
    // Protects `_queue`; mutable so const accessors can lock it.
    mutable mutex _mutex;
    // Signalled when a metric is enqueued or when stop() is called.
    condition_variable _cv;
    // Pending metrics, oldest at the front.
    deque<Metric> _queue;
    // Set once by stop().
    atomic<bool> _stopping{false};
    // Upper bound on `_queue.size()` enforced by enqueue().
    const size_t _maxQueueSize;
    // Count of metrics rejected by enqueue() because the queue was full.
    atomic<uint64_t> _dropped{0};
};



29 changes: 29 additions & 0 deletions BedrockMetrics.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "BedrockMetrics.h"
#include <mutex>

// File-local state for the global metric recorder.
namespace {
// Callback that GlobalRecordMetric forwards metrics to; empty when no recorder is installed.
function<void(const Metric&)> g_recorder;
// Mutex used to serialize access to g_recorder.
mutex g_mutex;
}

void SetGlobalMetricRecorder(const function<void(const Metric&)>& recorder)
{
if (!recorder) {
g_recorder = nullptr;
return;
}
lock_guard<mutex> lock(g_mutex);
g_recorder = recorder;
}

void GlobalRecordMetric(const Metric& metric)
{
function<void(const Metric&)> local;
{
lock_guard<mutex> lock(g_mutex);
local = g_recorder;
}
if (local) {
local(metric);
}
}
10 changes: 10 additions & 0 deletions BedrockMetrics.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Minimal global metrics helper to allow emission from non-server code.
#pragma once
#include "BedrockMetricPlugin.h"
#include <functional>

// Set the global metric recorder callback.
// Passing an empty function clears the recorder, after which GlobalRecordMetric
// does nothing until a new recorder is set.
void SetGlobalMetricRecorder(const function<void(const Metric&)>& recorder);

// Record a metric via the global recorder if configured.
// When no recorder is configured the call returns without doing anything.
void GlobalRecordMetric(const Metric& metric);
Loading
Loading