Skip to content

Commit 4387c6c

Browse files
committed
Improve coro::condition_variable_base<strategy_based_on_io_scheduler>
performance with many waiters
1 parent 950a900 commit 4387c6c

File tree

2 files changed

+67
-124
lines changed

2 files changed

+67
-124
lines changed

include/coro/condition_variable.hpp

+20-8
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,17 @@
99
#endif
1010

1111
#include "coro/mutex.hpp"
12+
#include <unordered_set>
13+
14+
#if (!defined(__clang__) && (defined(__GNUC__) && __GNUC__ <= 13)) || (defined(__clang__) && __clang_major__ < 18)
15+
16+
template<>
17+
struct std::hash<std::coroutine_handle<>>
18+
{
19+
std::size_t operator()(const std::coroutine_handle<>& h) const noexcept { return ptrdiff_t(h.address()); }
20+
};
21+
22+
#endif
1223

1324
namespace coro
1425
{
@@ -190,7 +201,6 @@ class strategy_based_on_io_scheduler
190201

191202
strategy_based_on_io_scheduler& m_strategy;
192203
std::coroutine_handle<> m_awaiting_coroutine;
193-
std::atomic<wait_operation*> m_next{nullptr};
194204
};
195205

196206
void notify_one() noexcept;
@@ -210,9 +220,9 @@ class strategy_based_on_io_scheduler
210220
/// A scheduler is needed to suspend coroutines and then wake them up upon notification or timeout.
211221
std::weak_ptr<io_scheduler> m_scheduler;
212222

213-
/// A list of grabbed internal waiters that are only accessed by the notify'er or task that was cancelled due to
223+
/// A set of grabbed internal waiters that are only accessed by the notify'er or task that was cancelled due to
214224
/// timeout.
215-
std::atomic<wait_operation*> m_internal_waiters{nullptr};
225+
std::unordered_set<std::coroutine_handle<>> m_internal_waiters;
216226

217227
/// An atomic-based mutex analog to prevent race conditions between the notify'er and the task being cancelled on
218228
/// timeout. unlocked == nullptr
@@ -233,15 +243,17 @@ class strategy_based_on_io_scheduler
233243
class wait_operation_guard
234244
{
235245
public:
236-
explicit wait_operation_guard(strategy_based_on_io_scheduler* cv) noexcept;
246+
explicit wait_operation_guard(strategy_based_on_io_scheduler* strategy) noexcept;
237247
~wait_operation_guard();
238248
operator bool() const noexcept;
239-
wait_operation* value() const noexcept;
240-
void set_value(wait_operation* value) noexcept;
249+
std::coroutine_handle<> value() const noexcept;
250+
std::unordered_set<std::coroutine_handle<>> values() const noexcept;
251+
void set_value(std::coroutine_handle<> value) noexcept;
252+
void set_values(std::unordered_set<std::coroutine_handle<>> values) noexcept;
241253

242254
private:
243-
strategy_based_on_io_scheduler* m_cv{};
244-
wait_operation* m_value{};
255+
strategy_based_on_io_scheduler* m_strategy{};
256+
std::unordered_set<std::coroutine_handle<>> m_values{};
245257
};
246258

247259
/// Extract one waiter from @ref m_internal_waiters

src/condition_variable.cpp

+47-116
Original file line numberDiff line numberDiff line change
@@ -54,71 +54,14 @@ void detail::strategy_based_on_io_scheduler::unlock() noexcept
5454

5555
void detail::strategy_based_on_io_scheduler::insert_waiter(wait_operation* waiter) noexcept
5656
{
57-
while (true)
58-
{
59-
wait_operation* current = m_internal_waiters.load(std::memory_order::acquire);
60-
waiter->m_next.store(current, std::memory_order::release);
61-
62-
if (!m_internal_waiters.compare_exchange_weak(
63-
current, waiter, std::memory_order::release, std::memory_order::acquire))
64-
{
65-
continue;
66-
}
67-
68-
break;
69-
}
57+
std::lock_guard<detail::strategy_based_on_io_scheduler> guard{*this};
58+
m_internal_waiters.insert(waiter->m_awaiting_coroutine);
7059
}
7160

7261
bool detail::strategy_based_on_io_scheduler::extract_waiter(wait_operation* waiter) noexcept
7362
{
7463
std::lock_guard<detail::strategy_based_on_io_scheduler> guard{*this};
75-
bool result{};
76-
77-
while (true)
78-
{
79-
wait_operation* current = m_internal_waiters.load(std::memory_order::acquire);
80-
81-
if (!current)
82-
{
83-
break;
84-
}
85-
86-
wait_operation* next = current->m_next.load(std::memory_order::acquire);
87-
88-
if (current == waiter)
89-
{
90-
if (!m_internal_waiters.compare_exchange_weak(
91-
current, next, std::memory_order::release, std::memory_order::acquire))
92-
{
93-
continue;
94-
}
95-
}
96-
97-
while (next && next != waiter)
98-
{
99-
current = next;
100-
next = current->m_next.load(std::memory_order::acquire);
101-
}
102-
103-
if (!next)
104-
{
105-
break;
106-
}
107-
108-
wait_operation* new_next = waiter->m_next.load(std::memory_order::acquire);
109-
110-
if (!current->m_next.compare_exchange_strong(
111-
next, new_next, std::memory_order::release, std::memory_order::acquire))
112-
{
113-
continue;
114-
}
115-
116-
waiter->m_next.store(nullptr, std::memory_order::release);
117-
result = true;
118-
break;
119-
}
120-
121-
return result;
64+
return m_internal_waiters.erase(waiter->m_awaiting_coroutine);
12265
}
12366

12467
detail::strategy_based_on_io_scheduler::strategy_based_on_io_scheduler(std::shared_ptr<io_scheduler> io_scheduler)
@@ -133,9 +76,14 @@ void detail::strategy_based_on_io_scheduler::notify_one() noexcept
13376

13477
if (auto waiter_guard = extract_one())
13578
{
136-
if (auto sched = m_scheduler.lock())
79+
auto h = waiter_guard.value();
80+
81+
if (h && !h.done())
13782
{
138-
sched->resume(waiter_guard.value()->m_awaiting_coroutine);
83+
if (auto sched = m_scheduler.lock())
84+
{
85+
sched->resume(h);
86+
}
13987
}
14088
}
14189
}
@@ -146,68 +94,37 @@ void detail::strategy_based_on_io_scheduler::notify_all() noexcept
14694

14795
if (auto waiter_guard = extract_all())
14896
{
97+
auto values = waiter_guard.values();
98+
14999
if (auto sched = m_scheduler.lock())
150100
{
151-
auto* waiter = waiter_guard.value();
152-
do
101+
for (const auto& h : values)
153102
{
154-
auto* next = waiter->m_next.load(std::memory_order::acquire);
155-
sched->resume(waiter->m_awaiting_coroutine);
156-
waiter = next;
157-
} while (waiter);
103+
if (h && !h.done())
104+
{
105+
sched->resume(h);
106+
}
107+
}
158108
}
159109
}
160110
}
161111

162112
detail::strategy_based_on_io_scheduler::wait_operation_guard detail::strategy_based_on_io_scheduler::extract_all()
163113
{
164114
wait_operation_guard result{this};
165-
166-
while (true)
167-
{
168-
auto* current = m_internal_waiters.load(std::memory_order::acquire);
169-
if (!current)
170-
{
171-
break;
172-
}
173-
174-
if (!m_internal_waiters.compare_exchange_weak(
175-
current, nullptr, std::memory_order::release, std::memory_order::acquire))
176-
{
177-
continue;
178-
}
179-
180-
result.set_value(current);
181-
break;
182-
}
183-
115+
result.set_values(m_internal_waiters);
116+
m_internal_waiters.clear();
184117
return result;
185118
}
186119

187120
detail::strategy_based_on_io_scheduler::wait_operation_guard detail::strategy_based_on_io_scheduler::extract_one()
188121
{
189122
wait_operation_guard result{this};
190-
191-
while (true)
123+
if (!m_internal_waiters.empty())
192124
{
193-
auto* current = m_internal_waiters.load(std::memory_order::acquire);
194-
if (!current)
195-
{
196-
break;
197-
}
198-
199-
auto* next = current->m_next.load(std::memory_order::acquire);
200-
if (!m_internal_waiters.compare_exchange_weak(
201-
current, next, std::memory_order::release, std::memory_order::acquire))
202-
{
203-
continue;
204-
}
205-
206-
current->m_next.store(nullptr, std::memory_order::release);
207-
result.set_value(current);
208-
break;
125+
result.set_value(*m_internal_waiters.begin());
126+
m_internal_waiters.erase(m_internal_waiters.begin());
209127
}
210-
211128
return result;
212129
}
213130

@@ -234,31 +151,45 @@ void detail::strategy_based_on_io_scheduler::wait_operation::await_resume() noex
234151
}
235152

236153
detail::strategy_based_on_io_scheduler::wait_operation_guard::wait_operation_guard(
237-
detail::strategy_based_on_io_scheduler* cv) noexcept
238-
: m_cv(cv)
154+
detail::strategy_based_on_io_scheduler* strategy) noexcept
155+
: m_strategy(strategy)
239156
{
240-
m_cv->lock();
157+
m_strategy->lock();
241158
}
242159

243160
detail::strategy_based_on_io_scheduler::wait_operation_guard::~wait_operation_guard()
244161
{
245-
m_cv->unlock();
162+
m_strategy->unlock();
246163
}
247164

248165
detail::strategy_based_on_io_scheduler::wait_operation_guard::operator bool() const noexcept
249166
{
250-
return m_value;
167+
return !m_values.empty();
168+
}
169+
170+
void detail::strategy_based_on_io_scheduler::wait_operation_guard::set_value(std::coroutine_handle<> value) noexcept
171+
{
172+
m_values = {value};
173+
}
174+
175+
std::coroutine_handle<> detail::strategy_based_on_io_scheduler::wait_operation_guard::value() const noexcept
176+
{
177+
if (m_values.empty())
178+
return nullptr;
179+
180+
return *m_values.begin();
251181
}
252182

253-
void detail::strategy_based_on_io_scheduler::wait_operation_guard::set_value(wait_operation* value) noexcept
183+
void detail::strategy_based_on_io_scheduler::wait_operation_guard::set_values(
184+
std::unordered_set<std::coroutine_handle<>> values) noexcept
254185
{
255-
m_value = value;
186+
m_values = values;
256187
}
257188

258-
detail::strategy_based_on_io_scheduler::wait_operation*
259-
detail::strategy_based_on_io_scheduler::wait_operation_guard::value() const noexcept
189+
std::unordered_set<std::coroutine_handle<>>
190+
detail::strategy_based_on_io_scheduler::wait_operation_guard::values() const noexcept
260191
{
261-
return m_value;
192+
return m_values;
262193
}
263194

264195
#endif

0 commit comments

Comments
 (0)