-
-
Notifications
You must be signed in to change notification settings - Fork 392
/
Copy paththreadpool.cpp
97 lines (80 loc) · 1.94 KB
/
threadpool.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#include "threadpool.hpp"
#include "utils.hpp"
namespace rtc::impl {
ThreadPool &ThreadPool::Instance() {
static ThreadPool *instance = new ThreadPool;
return *instance;
}
ThreadPool::ThreadPool() {}
ThreadPool::~ThreadPool() {}
int ThreadPool::count() const {
std::unique_lock lock(mWorkersMutex);
return int(mWorkers.size());
}
void ThreadPool::spawn(int count) {
std::unique_lock lock(mWorkersMutex);
while (count-- > 0)
mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
}
void ThreadPool::join() {
{
std::unique_lock lock(mMutex);
mWaitingCondition.wait(lock, [&]() { return mBusyWorkers == 0; });
mJoining = true;
mTasksCondition.notify_all();
}
std::unique_lock lock(mWorkersMutex);
for (auto &w : mWorkers)
w.join();
mWorkers.clear();
mJoining = false;
}
void ThreadPool::clear() {
std::unique_lock lock(mMutex);
while (!mTasks.empty())
mTasks.pop();
}
void ThreadPool::run() {
utils::this_thread::set_name("RTC worker");
++mBusyWorkers;
scope_guard guard([&]() { --mBusyWorkers; });
while (runOne()) {
}
}
bool ThreadPool::runOne() {
if (auto task = dequeue()) {
task();
return true;
}
return false;
}
std::function<void()> ThreadPool::dequeue() {
std::unique_lock lock(mMutex);
while (!mJoining) {
std::optional<clock::time_point> time;
if (!mTasks.empty()) {
time = mTasks.top().time;
if (*time <= clock::now()) {
auto func = std::move(mTasks.top().func);
mTasks.pop();
return func;
}
}
--mBusyWorkers;
scope_guard guard([&]() { ++mBusyWorkers; });
mWaitingCondition.notify_all();
if (time)
mTasksCondition.wait_until(lock, *time);
else
mTasksCondition.wait(lock);
}
return nullptr;
}
} // namespace rtc::impl