-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathchronos.cc
More file actions
115 lines (88 loc) · 3.13 KB
/
chronos.cc
File metadata and controls
115 lines (88 loc) · 3.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
#include "chronos.hpp"
namespace cubbit
{
std::weak_ptr<chronos> chronos::_instance{};
chronos::chronos()
{
this->_start_jobs_thread();
}
chronos::chronos(std::map<category_type, int> configuration)
: _configuration(configuration)
{
for(auto& [category, limit] : this->_configuration)
this->_current_state[category] = 0;
this->_start_jobs_thread();
}
chronos::~chronos()
{
this->shutdown();
this->_pending_tasks.wait();
{
cubbit::unique_lock<cubbit::mutex> lock(this->_mutex);
this->_condition.wait(lock, [this]
{ return !this->_active; });
}
if(this->_jobs_thread.joinable())
this->_jobs_thread.join();
}
void chronos::shutdown()
{
cubbit::unique_lock<cubbit::mutex> lock(this->_mutex);
this->_shutdown = true;
this->_condition.notify_all();
}
void chronos::wait()
{
this->_pending_tasks.wait();
}
bool chronos::can_schedule(category_type category)
{
if(category == generic)
{
this->_pending_tasks.add();
return true;
}
if(this->_configuration.find(category) == this->_configuration.end())
return false;
{
cubbit::unique_lock<cubbit::mutex> lock(this->_mutex);
this->_condition.wait(lock, [&]
{ return this->_shutdown || this->_current_state[category] < this->_configuration[category]; });
if(this->_shutdown)
throw std::system_error(make_error_code(std::errc::operation_canceled), "Cannot schedule task");
this->_current_state[category]++;
this->_pending_tasks.add();
}
return true;
}
void chronos::_start_jobs_thread()
{
this->_jobs_thread = std::thread(
[this]
{
{
auto scheduler = std::make_unique<marl::Scheduler>(marl::Scheduler::Config::allCores());
scheduler->bind();
this->_active = true;
while(true)
{
cubbit::unique_lock<cubbit::mutex> lock(this->_mutex);
this->_condition.wait(lock, [&]
{ return this->_job_queue.size() > 0 || (this->_job_queue.empty() && this->_shutdown); });
if(this->_shutdown && this->_job_queue.empty())
break;
std::lock_guard<cubbit::mutex> lock_guard(this->_job_mutex);
auto& job = this->_job_queue.front();
marl::schedule(std::move(job));
this->_job_queue.pop();
}
scheduler->unbind();
}
this->_active = false;
this->_condition.notify_all();
});
#ifdef _GNU_SOURCE
pthread_setname_np(this->_jobs_thread.native_handle(), "chronos_queue");
#endif
}
} // namespace cubbit