-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlockfree_thread_pool.h
137 lines (123 loc) · 4.51 KB
/
lockfree_thread_pool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#pragma once
#include "base/thread_pool.h"
#include "concurrentqueue/blockingconcurrentqueue.h"
#include "async_logger.h"
#include <unordered_map>
namespace learnlog {
namespace base {
// 使用无锁阻塞队列 BlockingConcurrentQueue 的线程池,处理 async_msg,
// 循环尝试 q.try_enqueue() 入队,阻塞等待 q.wait_dequeue() 出队,
// 队列的总占用空间不变,初始化后不再额外申请内存
class lockfree_thread_pool final: public thread_pool {
public:
using c_token_uni_ptr = std::unique_ptr<moodycamel::ConsumerToken>;
lockfree_thread_pool(size_t queue_size, size_t threads_num,
const std::function<void()>& on_thread_start,
const std::function<void()>& on_thread_stop)
: thread_pool(queue_size, lockfree, threads_num, on_thread_start, on_thread_stop),
msg_q_(msg_q_size_) {
for (size_t i = 0; i < threads_num_; ++i) {
threads_.emplace_back([this] {
start_func_();
#ifdef LEARNLOG_USE_TLS
token_ = learnlog::make_unique<moodycamel::ConsumerToken>(msg_q_);
#else
size_t tid = os::thread_id();
{
std::lock_guard<std::mutex> lock(hash_mutex_);
tokens_[tid] = learnlog::make_unique<moodycamel::ConsumerToken>(msg_q_);
}
#endif
this->thread_pool::worker_loop_();
stop_func_();
});
}
}
lockfree_thread_pool(size_t queue_size, size_t threads_num,
const std::function<void()>& on_thread_start)
: thread_pool(queue_size, lockfree, threads_num, on_thread_start, []{}),
msg_q_(msg_q_size_) {
for (size_t i = 0; i < threads_num_; ++i) {
threads_.emplace_back([this] {
start_func_();
#ifdef LEARNLOG_USE_TLS
token_ = learnlog::make_unique<moodycamel::ConsumerToken>(msg_q_);
#else
size_t tid = os::thread_id();
{
std::lock_guard<std::mutex> lock(hash_mutex_);
tokens_[tid] = learnlog::make_unique<moodycamel::ConsumerToken>(msg_q_);
}
#endif
this->thread_pool::worker_loop_();
stop_func_();
});
}
}
lockfree_thread_pool(size_t queue_size = default_queue_size,
size_t threads_num = default_threads_num)
: thread_pool(queue_size, lockfree, threads_num, []{}, []{}),
msg_q_(msg_q_size_) {
for (size_t i = 0; i < threads_num_; ++i) {
threads_.emplace_back([this] {
start_func_();
#ifdef LEARNLOG_USE_TLS
token_ = learnlog::make_unique<moodycamel::ConsumerToken>(msg_q_);
#else
size_t tid = os::thread_id();
{
std::lock_guard<std::mutex> lock(hash_mutex_);
tokens_[tid] = learnlog::make_unique<moodycamel::ConsumerToken>(msg_q_);
}
#endif
this->thread_pool::worker_loop_();
stop_func_();
});
}
}
~lockfree_thread_pool() override {
while (current_msg_count() != 0) {
// os::sleep_for_ms(1);
}
for (size_t i = 0; i < threads_num_; i++) {
enqueue_async_msg_(async_msg(async_msg_type::terminate));
}
try {
for (auto &t : threads_){
t.join();
}
}
catch(const std::exception& e) {
source_loc loc{__FILE__, __LINE__, __func__};
throw_learnlog_excpt(e.what(), os::get_errno(), loc);
}
}
size_t current_msg_count() { return msg_q_.size_approx(); }
private:
void enqueue_async_msg_(async_msg&& amsg) override {
while (!msg_q_.try_enqueue(std::move(amsg)))
continue;
}
void dequeue_async_msg_(async_msg& amsg) override {
#ifdef LEARNLOG_USE_TLS
msg_q_.wait_dequeue(*token_, amsg);
#else
size_t tid = os::thread_id();
moodycamel::ConsumerToken* token = nullptr;
{
std::lock_guard<std::mutex> lock(hash_mutex_);
token = tokens_[tid].get();
}
msg_q_.wait_dequeue(*token, amsg);
#endif
}
#ifdef LEARNLOG_USE_TLS
static thread_local c_token_uni_ptr token_;
#else
std::mutex hash_mutex_;
std::unordered_map<size_t, c_token_uni_ptr> tokens_;
#endif
moodycamel::BlockingConcurrentQueue<async_msg> msg_q_;
};
} // namespace base
} // namespace learnlog