-
-
Notifications
You must be signed in to change notification settings - Fork 392
/
Copy pathprocessor.hpp
76 lines (59 loc) · 1.78 KB
/
processor.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
/**
* Copyright (c) 2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/
#ifndef RTC_IMPL_PROCESSOR_H
#define RTC_IMPL_PROCESSOR_H
#include "common.hpp"
#include "queue.hpp"
#include "threadpool.hpp"
#include <condition_variable>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
namespace rtc::impl {
// Processes tasks in order by delegating them to the thread pool
//
// enqueue() hands a task to ThreadPool when none is pending for this processor,
// otherwise it stores the task in mTasks. Each task calls schedule() when it
// finishes so that the next one gets chained.
// The class is neither copyable nor movable.
class Processor {
public:
// NOTE(review): limit is presumably the capacity of the task queue, with 0 meaning
// unbounded - the constructor is defined elsewhere, confirm there.
Processor(size_t limit = 0);
virtual ~Processor();
Processor(const Processor &) = delete;
Processor &operator=(const Processor &) = delete;
Processor(Processor &&) = delete;
Processor &operator=(Processor &&) = delete;
// NOTE(review): presumably blocks until every enqueued task has run - defined
// elsewhere, confirm there.
void join();
// Binds f to args and submits the result as the next task of this processor.
// Declared noexcept: an exception escaping it terminates the program.
template <class F, class... Args> void enqueue(F &&f, Args &&...args) noexcept;
private:
// Called at the end of every task created by enqueue().
// NOTE(review): expected to submit the next task from mTasks or reset mPending -
// defined elsewhere, confirm there.
void schedule();
// Tasks waiting for the currently pending task to finish
Queue<std::function<void()>> mTasks;
bool mPending = false; // true iff a task is pending in the thread pool
// Held by enqueue() while it reads and updates mPending and mTasks
mutable std::mutex mMutex;
// NOTE(review): not used in the visible code; presumably lets join() wait for
// schedule() to report completion - confirm in the implementation file.
std::condition_variable mCondition;
};
// Processor specialization reachable only through Instance().
// The constructor and destructor are private, so code outside this class can
// neither create nor destroy an instance.
// NOTE(review): presumably a process-wide singleton used to run tear-down work -
// Instance() is defined elsewhere, confirm there.
class TearDownProcessor final : public Processor {
public:
// Returns a reference to the TearDownProcessor instance
static TearDownProcessor &Instance();
private:
TearDownProcessor();
~TearDownProcessor();
};
/**
 * Submits a callable as the next task of this processor.
 *
 * f and args are bound together with std::bind, so they are copied or moved into
 * the task and invoked later rather than in the caller's context. If no task of
 * this processor is currently pending in the thread pool, the new task is handed
 * to ThreadPool immediately; otherwise it is stored in mTasks until schedule()
 * picks it up.
 *
 * The function is noexcept: an exception thrown while building or submitting the
 * task terminates the program.
 *
 * @param f    Callable to run
 * @param args Arguments bound to f
 */
template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args) noexcept {
	// Build the task before locking: binding copies or moves the caller's objects,
	// which runs arbitrary user code that should not execute while mMutex is held.
	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
	auto task = [this, bound = std::move(bound)]() mutable {
		scope_guard guard(std::bind(&Processor::schedule, this)); // chain the next task
		return bound();
	};

	// Only the bookkeeping on mPending and mTasks needs to be serialized
	std::lock_guard lock(mMutex);
	if (mPending) {
		// A task is already in flight: park this one until schedule() chains it
		mTasks.push(std::move(task));
		return;
	}
	ThreadPool::Instance().enqueue(std::move(task));
	mPending = true;
}
} // namespace rtc::impl
#endif