-
Notifications
You must be signed in to change notification settings - Fork 108
Expand file tree
/
Copy pathBedrockBlockingCommandQueue.cpp
More file actions
150 lines (127 loc) · 4.94 KB
/
Copy pathBedrockBlockingCommandQueue.cpp
File metadata and controls
150 lines (127 loc) · 4.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
#include <BedrockBlockingCommandQueue.h>
/**
 * Start-timing callback handed to the base queue by this class's constructor.
 * Forwards to the command's own `startTiming` with the `QUEUE_BLOCKING` tag.
 *
 * @param command The command to start timing; dereferenced unconditionally, so it must be non-null.
 */
void BedrockBlockingCommandQueue::startTiming(unique_ptr<BedrockCommand>& command)
{
    BedrockCommand& target = *command;
    target.startTiming(BedrockCommand::QUEUE_BLOCKING);
}
/**
 * Stop-timing callback handed to the base queue by this class's constructor.
 * Forwards to the command's own `stopTiming` with the `QUEUE_BLOCKING` tag.
 *
 * @param command The command to stop timing; dereferenced unconditionally, so it must be non-null.
 */
void BedrockBlockingCommandQueue::stopTiming(unique_ptr<BedrockCommand>& command)
{
    BedrockCommand& target = *command;
    target.stopTiming(BedrockCommand::QUEUE_BLOCKING);
}
/**
 * Constructs the blocking queue, passing this class's `startTiming` / `stopTiming` functions to the
 * base `BedrockCommandQueue` as its two timing callbacks.
 */
BedrockBlockingCommandQueue::BedrockBlockingCommandQueue()
    : BedrockCommandQueue(
          function<void(unique_ptr<BedrockCommand>&)>(&BedrockBlockingCommandQueue::startTiming),
          function<void(unique_ptr<BedrockCommand>&)>(&BedrockBlockingCommandQueue::stopTiming))
{
}
/**
 * Enqueues a command after accounting for it against its rate-limit identifier.
 *
 * When a non-zero per-identifier maximum is configured and the command has a non-empty
 * `blockingQueueRateLimitIdentifier`, that identifier's count is incremented under `_rateLimitMutex`.
 * Going over the maximum is currently only logged; the command is still enqueued.
 *
 * If the base-class push throws, `_emptyTime` and the count increment are rolled back and the
 * exception is rethrown to the caller.
 *
 * @param command The command to enqueue; ownership is moved into the base queue.
 */
void BedrockBlockingCommandQueue::push(unique_ptr<BedrockCommand>&& command)
{
    // Take a copy up front: `command` is moved from below, yet the catch handler still needs the identifier.
    const string identifier = command->blockingQueueRateLimitIdentifier;
    const size_t limit = _maxPerIdentifier.load();
    const bool tracked = limit > 0 && !identifier.empty();

    if (tracked) {
        lock_guard<decltype(_rateLimitMutex)> guard(_rateLimitMutex);

        // Once the blocking queue has sat empty for 30 seconds, any leftover counts are discarded.
        const uint64_t emptiedAt = _emptyTime.load();
        if (emptiedAt > 0 && STimeNow() - emptiedAt >= 30'000'000) {
            _identifierCounts.clear();
        }

        size_t& queued = _identifierCounts[identifier];
        ++queued;
        if (queued > limit) {
            SINFO("Blocking queue rate limit: rejecting '" << command->request.methodLine
                  << "' for identifier '" << identifier << "' (count=" << queued
                  << ", threshold=" << limit << ")");
            // TODO: enable enforcement after monitoring confirms thresholds are correct in production.
            // --queued;
            // STHROW("503 Blocking queue rate limited");
        }
    }

    // The queue is about to become non-empty, so zero the empty timestamp; the 30-second
    // auto-reset window must not fire again until the queue has drained. Keep the old value
    // around in case the push fails.
    const uint64_t priorEmptyTime = _emptyTime.exchange(0);

    try {
        // The base class takes its own (non-recursive) `_queueMutex` here.
        BedrockCommandQueue::push(move(command));
    } catch (...) {
        // Nothing was enqueued: put the empty timestamp back so the auto-reset timer isn't lost,
        // and undo the count increment made above.
        _emptyTime.store(priorEmptyTime);
        if (tracked) {
            lock_guard<decltype(_rateLimitMutex)> guard(_rateLimitMutex);
            _decrementIdentifierCount(identifier);
        }
        throw;
    }
}
/**
 * Removes the next command via the base class, then updates the rate-limit count for its
 * identifier and records in `_emptyTime` when the queue has just become empty.
 *
 * Called by `BedrockCommandQueue::get()` with the base `_queueMutex` held, so calling any base
 * method that reacquires `_queueMutex` from here would deadlock.
 *
 * @return The command taken from the queue.
 */
unique_ptr<BedrockCommand> BedrockBlockingCommandQueue::_dequeue()
{
    unique_ptr<BedrockCommand> next = BedrockCommandQueue::_dequeue();

    // A command leaving the queue no longer counts against its identifier. Nothing is
    // decremented while the per-identifier maximum is zero.
    const auto& identifier = next->blockingQueueRateLimitIdentifier;
    if (!identifier.empty() && _maxPerIdentifier.load() > 0) {
        lock_guard<decltype(_rateLimitMutex)> guard(_rateLimitMutex);
        _decrementIdentifierCount(identifier);
    }

    // Stamp the time the queue drained, unless a timestamp is already set.
    if (_queue.empty() && _emptyTime.load() == 0) {
        _emptyTime.store(STimeNow());
    }

    return next;
}
/**
 * Resets the rate-limit bookkeeping and then clears the underlying base queue.
 * The count returned by `clearRateLimits()` is discarded.
 */
void BedrockBlockingCommandQueue::clear()
{
    // Rate-limit state first, then the base queue.
    static_cast<void>(clearRateLimits());
    BedrockCommandQueue::clear();
}
/**
 * Drops every per-identifier count and zeroes `_emptyTime`, all under `_rateLimitMutex`.
 *
 * @return The number of identifiers that had an entry before the reset.
 */
size_t BedrockBlockingCommandQueue::clearRateLimits()
{
    lock_guard<decltype(_rateLimitMutex)> guard(_rateLimitMutex);

    // Capture the size before wiping so the caller learns how many entries were removed.
    const size_t removed = _identifierCounts.size();
    _identifierCounts.clear();
    _emptyTime.store(0);

    return removed;
}
/**
 * Builds a snapshot of the rate-limit state for reporting.
 *
 * As a side effect, if the queue has been empty for 30 seconds the stored counts are cleared
 * before the snapshot is taken (the same expiry rule `push` applies).
 *
 * Keys in the returned table:
 *  - `blockingRateLimitThreshold`: the configured per-identifier maximum.
 *  - `blockedIdentifiers`: how many identifiers currently exceed that maximum. A maximum of 0
 *    means rate limiting is disabled (see `push`), so this is reported as 0 in that case rather
 *    than counting every identifier with a leftover entry.
 *  - `blockingQueueIdentifierCounts`: JSON object of identifier -> count; omitted when there are none.
 *
 * @return The populated state table.
 */
STable BedrockBlockingCommandQueue::getState()
{
    // Copy the counts under the lock so the formatting below runs without holding it.
    map<string, size_t> countsCopy;
    {
        lock_guard<decltype(_rateLimitMutex)> lock(_rateLimitMutex);
        const uint64_t emptyTime = _emptyTime.load();
        if (emptyTime > 0 && STimeNow() - emptyTime >= 30'000'000) {
            _identifierCounts.clear();
        }
        countsCopy = _identifierCounts;
    }

    const size_t maxPerIdentifier = _maxPerIdentifier.load();

    // A threshold of 0 disables rate limiting; without this guard every identifier with a
    // non-zero count would satisfy `count > 0` and be misreported as blocked.
    const bool limitEnabled = maxPerIdentifier > 0;

    size_t blockedCount = 0;
    STable countsTable;
    for (const auto& p : countsCopy) {
        countsTable[p.first] = to_string(p.second);
        if (limitEnabled && p.second > maxPerIdentifier) {
            blockedCount++;
        }
    }

    STable content;
    content["blockingRateLimitThreshold"] = to_string(maxPerIdentifier);
    content["blockedIdentifiers"] = to_string(blockedCount);
    if (!countsTable.empty()) {
        content["blockingQueueIdentifierCounts"] = SComposeJSONObject(countsTable);
    }
    return content;
}
/**
 * Atomically replaces the per-identifier maximum.
 * Elsewhere in this file a value of 0 causes `push` and `_dequeue` to skip rate-limit accounting.
 *
 * @param value The new maximum.
 * @return The maximum that was in effect before this call.
 */
size_t BedrockBlockingCommandQueue::setMaxRequestsPerIdentifier(size_t value)
{
    const size_t previous = _maxPerIdentifier.exchange(value);
    return previous;
}
/**
 * Lowers the stored count for `identifier` by one, removing the entry entirely once it would
 * reach zero. Does nothing if the identifier has no entry.
 *
 * Takes no lock itself; every caller in this file holds `_rateLimitMutex` around the call.
 *
 * @param identifier The rate-limit identifier whose count should be decremented.
 */
void BedrockBlockingCommandQueue::_decrementIdentifierCount(const string& identifier)
{
    const auto entry = _identifierCounts.find(identifier);
    if (entry == _identifierCounts.end()) {
        // Unknown identifier: nothing to undo.
        return;
    }

    if (entry->second > 1) {
        --entry->second;
        return;
    }

    // Last outstanding command for this identifier; drop the entry instead of keeping a zero.
    _identifierCounts.erase(entry);
}