-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlock_thread_pool.h
89 lines (77 loc) · 2.78 KB
/
lock_thread_pool.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#pragma once
#include <cstdio>

#include "base/thread_pool.h"
#include "block_queue.h"
#include "async_logger.h"
namespace learnlog {
namespace base {
// Thread pool built on the blocking queue `block_queue`; it processes
// async_msg items. Messages enter through msg_q_.enqueue() and leave through
// msg_q_.dequeue(). The total space occupied by the queue does not change.
class lock_thread_pool final: public thread_pool {
public:
    // Constructs the pool and immediately launches `threads_num_` workers.
    //
    // All four arguments are forwarded unchanged to the thread_pool base
    // constructor (together with the `lock` tag). Each worker thread runs
    // start_func_(), then thread_pool::worker_loop_(), then stop_func_().
    // NOTE(review): start_func_/stop_func_/threads_num_/msg_q_size_ are base
    // members not visible here; presumably they are populated from these
    // arguments — confirm against thread_pool.
    lock_thread_pool(size_t queue_size, size_t threads_num,
                     const std::function<void()>& on_thread_start,
                     const std::function<void()>& on_thread_stop)
        : thread_pool(queue_size, lock, threads_num, on_thread_start, on_thread_stop),
          msg_q_(msg_q_size_) {
        // Workers are started here, in the most-derived constructor body, so
        // msg_q_ is already constructed before any worker can touch it.
        for (size_t i = 0; i < threads_num_; ++i) {
            threads_.emplace_back([this] {
                start_func_();
                this->thread_pool::worker_loop_();
                stop_func_();
            });
        }
    }

    // Same as the four-argument constructor, with a no-op stop callback.
    lock_thread_pool(size_t queue_size, size_t threads_num,
                     const std::function<void()>& on_thread_start)
        : lock_thread_pool(queue_size, threads_num, on_thread_start, []{}) {}

    // Same as the four-argument constructor, with no-op start and stop
    // callbacks and defaulted queue size / thread count.
    lock_thread_pool(size_t queue_size = default_queue_size,
                     size_t threads_num = default_threads_num)
        : lock_thread_pool(queue_size, threads_num, []{}, []{}) {}

    // Shuts the pool down: enqueues one `terminate` message per worker, then
    // joins every worker thread.
    //
    // A destructor is implicitly noexcept, so letting an exception leave it
    // would call std::terminate. Failures are therefore reported on stderr
    // and never rethrown. If enqueueing fails, the join loop is skipped
    // (workers that never receive `terminate` could otherwise block forever).
    ~lock_thread_pool() override {
        try {
            for (size_t i = 0; i < threads_num_; ++i) {
                enqueue_async_msg_(async_msg(async_msg_type::terminate));
            }
            for (auto& worker : threads_) {
                // join() on a non-joinable thread throws; skip such threads
                // so the remaining workers are still joined.
                if (worker.joinable()) {
                    worker.join();
                }
            }
        }
        catch (const std::exception& e) {
            std::fprintf(stderr, "learnlog: lock_thread_pool shutdown failed: %s\n",
                         e.what());
        }
        catch (...) {
            std::fprintf(stderr, "learnlog: lock_thread_pool shutdown failed: unknown exception\n");
        }
    }

    // Returns msg_q_.get_override_count().
    size_t override_count() { return msg_q_.get_override_count(); }

    // Calls msg_q_.reset_override_count().
    void reset_override_count() { msg_q_.reset_override_count(); }

    // Returns msg_q_.get_discard_count().
    size_t discard_count() { return msg_q_.get_discard_count(); }

    // Calls msg_q_.reset_discard_count().
    void reset_discard_count() { msg_q_.reset_discard_count(); }

private:
    // Moves `amsg` into the blocking queue.
    void enqueue_async_msg_(async_msg&& amsg) override {
        msg_q_.enqueue(std::move(amsg));
    }

    // Takes the next message out of the blocking queue into `amsg`.
    void dequeue_async_msg_(async_msg& amsg) override {
        msg_q_.dequeue(amsg);
    }

    // Message queue shared between producers and the worker threads; sized
    // from the base-class member msg_q_size_.
    block_queue<async_msg> msg_q_;
};
} // namespace base
} // namespace learnlog