Skip to content

Commit d611ee4

Browse files
Changed Queue to non-blocking
1 parent ff5098c commit d611ee4

7 files changed

+22
-54
lines changed

src/impl/datachannel.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ void DataChannel::remoteClose() {
108108
}
109109

110110
optional<message_variant> DataChannel::receive() {
111-
auto next = mRecvQueue.tryPop();
111+
auto next = mRecvQueue.pop();
112112
return next ? std::make_optional(to_variant(std::move(**next))) : nullopt;
113113
}
114114

src/impl/dtlstransport.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -329,7 +329,7 @@ int DtlsTransport::TimeoutCallback(gnutls_transport_ptr_t ptr, unsigned int ms)
329329
DtlsTransport *t = static_cast<DtlsTransport *>(ptr);
330330
try {
331331
bool isReadable = t->mIncomingQueue.wait(
332-
ms != GNUTLS_INDEFINITE_TIMEOUT ? std::make_optional(milliseconds(ms)) : nullopt);
332+
ms != GNUTLS_INDEFINITE_TIMEOUT ? std::make_optional(milliseconds(ms)) : nullopt); // TODO
333333
return isReadable ? 1 : 0;
334334

335335
} catch (const std::exception &e) {
@@ -541,7 +541,7 @@ void DtlsTransport::runRecvLoop() {
541541
byte buffer[bufferSize];
542542
while (mIncomingQueue.running()) {
543543
// Process pending messages
544-
while (auto next = mIncomingQueue.tryPop()) {
544+
while (auto next = mIncomingQueue.pop()) {
545545
message_ptr message = std::move(*next);
546546
if (demuxMessage(message))
547547
continue;
@@ -599,7 +599,7 @@ void DtlsTransport::runRecvLoop() {
599599
}
600600
}
601601

602-
mIncomingQueue.wait(duration);
602+
mIncomingQueue.wait(duration); // TODO
603603
}
604604
} catch (const std::exception &e) {
605605
PLOG_ERROR << "DTLS recv: " << e.what();

src/impl/peerconnection.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -1110,7 +1110,7 @@ void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
11101110

11111111
void PeerConnection::triggerPendingDataChannels() {
11121112
while (dataChannelCallback) {
1113-
auto next = mPendingDataChannels.tryPop();
1113+
auto next = mPendingDataChannels.pop();
11141114
if (!next)
11151115
break;
11161116

@@ -1128,7 +1128,7 @@ void PeerConnection::triggerPendingDataChannels() {
11281128

11291129
void PeerConnection::triggerPendingTracks() {
11301130
while (trackCallback) {
1131-
auto next = mPendingTracks.tryPop();
1131+
auto next = mPendingTracks.pop();
11321132
if (!next)
11331133
break;
11341134

src/impl/processor.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ void Processor::join() {
2121

2222
void Processor::schedule() {
2323
std::unique_lock lock(mMutex);
24-
if (auto next = mTasks.tryPop()) {
24+
if (auto next = mTasks.pop()) {
2525
ThreadPool::Instance().enqueue(std::move(*next));
2626
} else {
2727
// No more tasks

src/impl/queue.hpp

+12-44
Original file line numberDiff line numberDiff line change
@@ -34,19 +34,14 @@ template <typename T> class Queue {
3434
size_t amount() const; // amount
3535
void push(T element);
3636
optional<T> pop();
37-
optional<T> tryPop();
3837
optional<T> peek();
3938
optional<T> exchange(T element);
40-
bool wait(const optional<std::chrono::milliseconds> &duration = nullopt);
4139

4240
private:
43-
void pushImpl(T element);
44-
optional<T> popImpl();
45-
4641
const size_t mLimit;
4742
size_t mAmount;
4843
std::queue<T> mQueue;
49-
std::condition_variable mPopCondition, mPushCondition;
44+
std::condition_variable mPushCondition;
5045
amount_function mAmountFunction;
5146
bool mStopping = false;
5247

@@ -66,7 +61,6 @@ template <typename T> Queue<T>::~Queue() { stop(); }
6661
template <typename T> void Queue<T>::stop() {
6762
std::lock_guard lock(mMutex);
6863
mStopping = true;
69-
mPopCondition.notify_all();
7064
mPushCondition.notify_all();
7165
}
7266

@@ -98,18 +92,22 @@ template <typename T> size_t Queue<T>::amount() const {
9892
template <typename T> void Queue<T>::push(T element) {
9993
std::unique_lock lock(mMutex);
10094
mPushCondition.wait(lock, [this]() { return !mLimit || mQueue.size() < mLimit || mStopping; });
101-
pushImpl(std::move(element));
95+
if (mStopping)
96+
return;
97+
98+
mAmount += mAmountFunction(element);
99+
mQueue.emplace(std::move(element));
102100
}
103101

104102
template <typename T> optional<T> Queue<T>::pop() {
105103
std::unique_lock lock(mMutex);
106-
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
107-
return popImpl();
108-
}
104+
if (mQueue.empty())
105+
return nullopt;
109106

110-
template <typename T> optional<T> Queue<T>::tryPop() {
111-
std::unique_lock lock(mMutex);
112-
return popImpl();
107+
mAmount -= mAmountFunction(mQueue.front());
108+
optional<T> element{std::move(mQueue.front())};
109+
mQueue.pop();
110+
return element;
113111
}
114112

115113
template <typename T> optional<T> Queue<T>::peek() {
@@ -126,36 +124,6 @@ template <typename T> optional<T> Queue<T>::exchange(T element) {
126124
return std::make_optional(std::move(element));
127125
}
128126

129-
template <typename T> bool Queue<T>::wait(const optional<std::chrono::milliseconds> &duration) {
130-
std::unique_lock lock(mMutex);
131-
if (duration) {
132-
return mPopCondition.wait_for(lock, *duration,
133-
[this]() { return !mQueue.empty() || mStopping; });
134-
} else {
135-
mPopCondition.wait(lock, [this]() { return !mQueue.empty() || mStopping; });
136-
return true;
137-
}
138-
}
139-
140-
template <typename T> void Queue<T>::pushImpl(T element) {
141-
if (mStopping)
142-
return;
143-
144-
mAmount += mAmountFunction(element);
145-
mQueue.emplace(std::move(element));
146-
mPopCondition.notify_one();
147-
}
148-
149-
template <typename T> optional<T> Queue<T>::popImpl() {
150-
if (mQueue.empty())
151-
return nullopt;
152-
153-
mAmount -= mAmountFunction(mQueue.front());
154-
optional<T> element{std::move(mQueue.front())};
155-
mQueue.pop();
156-
return element;
157-
}
158-
159127
} // namespace rtc::impl
160128

161129
#endif

src/impl/track.cpp

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ void Track::close() {
6363
}
6464

6565
optional<message_variant> Track::receive() {
66-
if (auto next = mRecvQueue.tryPop()) {
66+
if (auto next = mRecvQueue.pop()) {
6767
message_ptr message = *next;
6868
if (message->type == Message::Control)
6969
return to_variant(**next); // The same message may be frowarded into multiple Tracks

src/impl/websocket.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ bool WebSocket::isClosed() const { return state == State::Closed; }
133133
size_t WebSocket::maxMessageSize() const { return DEFAULT_MAX_MESSAGE_SIZE; }
134134

135135
optional<message_variant> WebSocket::receive() {
136-
while (auto next = mRecvQueue.tryPop()) {
136+
while (auto next = mRecvQueue.pop()) {
137137
message_ptr message = *next;
138138
if (message->type != Message::Control)
139139
return to_variant(std::move(*message));
@@ -147,7 +147,7 @@ optional<message_variant> WebSocket::peek() {
147147
if (message->type != Message::Control)
148148
return to_variant(std::move(*message));
149149

150-
mRecvQueue.tryPop();
150+
mRecvQueue.pop();
151151
}
152152
return nullopt;
153153
}

0 commit comments

Comments
 (0)