-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathBlockingQueue.cpp
More file actions
119 lines (107 loc) · 2.55 KB
/
BlockingQueue.cpp
File metadata and controls
119 lines (107 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// BlockingQueue.cpp : Defines the entry point for the console application.
//
#include "BlockingQueue.h"
#include <string>
template<typename T>
class BlockingQueue<T>::sync {
public:
mutable std::mutex mutex_;
std::condition_variable condition_;
};
template<typename T>
BlockingQueue<T>::BlockingQueue()
: sync_(new sync()) {
}
template<typename T>
void BlockingQueue<T>::push(const T& t)
{
std::unique_lock<std::mutex> lock(sync_->mutex_);
queue_.push(t);
lock.unlock();
sync_->condition_.notify_one();
}
template<typename T>
bool BlockingQueue<T>::try_pop(T* t)
{
std::unique_lock<std::mutex> lock(sync_->mutex_);
if (queue_.empty()) {
return false;
}
*t = queue_.front();
queue_.pop();
return true;
}
template<typename T>
T BlockingQueue<T>::pop(const string& log_on_wait)
{
std::unique_lock<std::mutex> lock(sync_->mutex_);
while (queue_.empty())
{
if (!log_on_wait.empty()) {
fprintf(stdout, "%s\n", log_on_wait.c_str());
}
sync_->condition_.wait(lock);
}
T t = queue_.front();
queue_.pop();
return t;
}
template<typename T>
bool BlockingQueue<T>::try_peek(T* t)
{
std::unique_lock<std::mutex> lock(sync_->mutex_);
if (queue_.empty()) {
return false;
}
*t = queue_.front();
return true;
}
template<typename T>
T BlockingQueue<T>::peek()
{
std::unique_lock<std::mutex> lock(sync_->mutex_);
while (queue_.empty()) {
sync_->condition_.wait(lock);
}
return queue_.front();
}
template<typename T>
size_t BlockingQueue<T>::size() const {
std::unique_lock<std::mutex> lock(sync_->mutex_);
return queue_.size();
}
#ifdef BLOCKING_QUEUE_TEST
template class BlockingQueue<int>;
BlockingQueue<int> g_blok_queue;
void provider(int id)
{
for (int i = 0; i < 10; i++) {
fprintf(stderr, "producer %d.\n", id);
g_blok_queue.push(id);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
void consumer(int id)
{
while (1)
{
//std::cout << "con:" << id << std::endl;
string s = string("Consumer ") + std::to_string(id) + string(" waiting...");
fprintf(stderr, "con(%d): %d.\n", id, g_blok_queue.pop(s));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
int main()
{
std::thread p1(provider, 1);
std::thread p2(provider, 2);
std::thread p3(provider, 3);
std::thread s1(consumer, 1);
std::thread s2(consumer, 2);
p1.join();
s1.join();
p2.join();
s2.join();
return 0;
}
#endif