Skip to content

Commit f61fe88

Browse files
Merge pull request #1153 from Sean-Der/pacing-handler
Add PacingHandler
2 parents 40dcebc + 118a692 commit f61fe88

File tree

5 files changed

+135
-2
lines changed

5 files changed

+135
-2
lines changed

CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES
8787
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
8888
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
8989
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
90+
${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp
9091
)
9192

9293
set(LIBDATACHANNEL_HEADERS
@@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS
123124
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
124125
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
125126
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
127+
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp
126128
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h
127129
)
128130

include/rtc/pacinghandler.hpp

+50
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
/**
2+
* Copyright (c) 2024 Sean DuBois <sean@siobud.com>
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
#ifndef RTC_PACING_HANDLER_H
10+
#define RTC_PACING_HANDLER_H
11+
12+
#if RTC_ENABLE_MEDIA
13+
14+
#include "mediahandler.hpp"
15+
#include "utils.hpp"
16+
17+
#include <atomic>
18+
#include <queue>
19+
20+
namespace rtc {
21+
22+
// Paced sending of RTP packets. Takes a stream of RTP packets that can an
23+
// uneven bitrate. It then delivers these packets in a smoother manner by
24+
// sending a fixed size of them on an interval
25+
class RTC_CPP_EXPORT PacingHandler : public MediaHandler {
26+
public:
27+
PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval);
28+
29+
void outgoing(message_vector &messages, const message_callback &send) override;
30+
31+
private:
32+
std::atomic<bool> mHaveScheduled = false;
33+
34+
double mBytesPerSecond;
35+
double mBudget;
36+
37+
std::chrono::milliseconds mSendInterval;
38+
std::chrono::time_point<std::chrono::high_resolution_clock> mLastRun;
39+
40+
std::mutex mMutex;
41+
std::queue<message_ptr> mRtpBuffer;
42+
43+
void schedule(const message_callback &send);
44+
};
45+
46+
} // namespace rtc
47+
48+
#endif // RTC_ENABLE_MEDIA
49+
50+
#endif // RTC_PACING_HANDLER_H

include/rtc/rtc.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "h265rtppacketizer.hpp"
3535
#include "mediahandler.hpp"
3636
#include "plihandler.hpp"
37+
#include "pacinghandler.hpp"
3738
#include "rtcpnackresponder.hpp"
3839
#include "rtcpreceivingsession.hpp"
3940
#include "rtcpsrreporter.hpp"

src/impl/track.cpp

+10-2
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,11 @@ void Track::incoming(message_ptr message) {
142142

143143
message_vector messages{std::move(message)};
144144
if (auto handler = getMediaHandler())
145-
handler->incomingChain(messages, [this](message_ptr m) { transportSend(m); });
145+
handler->incomingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
146+
if (auto locked = weak_this.lock()) {
147+
transportSend(m);
148+
}
149+
});
146150

147151
for (auto &m : messages) {
148152
// Tail drop if queue is full
@@ -175,7 +179,11 @@ bool Track::outgoing(message_ptr message) {
175179

176180
if (handler) {
177181
message_vector messages{std::move(message)};
178-
handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); });
182+
handler->outgoingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
183+
if (auto locked = weak_this.lock()) {
184+
transportSend(m);
185+
}
186+
});
179187
bool ret = false;
180188
for (auto &m : messages)
181189
ret = transportSend(std::move(m));

src/pacinghandler.cpp

+72
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Copyright (c) 2024 Sean DuBois <sean@siobud.com>
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
#if RTC_ENABLE_MEDIA
10+
11+
#include <memory>
12+
13+
#include "pacinghandler.hpp"
14+
15+
#include "impl/internals.hpp"
16+
#include "impl/threadpool.hpp"
17+
18+
namespace rtc {
19+
20+
PacingHandler::PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval)
21+
: mBytesPerSecond(bitsPerSecond / 8), mBudget(0), mSendInterval(sendInterval){};
22+
23+
void PacingHandler::schedule(const message_callback &send) {
24+
if (!mHaveScheduled.exchange(true)) {
25+
return;
26+
}
27+
28+
impl::ThreadPool::Instance().schedule(mSendInterval, [this, weak_this = weak_from_this(),
29+
send]() {
30+
if (auto locked = weak_this.lock()) {
31+
const std::lock_guard<std::mutex> lock(mMutex);
32+
mHaveScheduled.store(false);
33+
34+
// Update the budget and cap it
35+
auto newBudget =
36+
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - mLastRun)
37+
.count() *
38+
mBytesPerSecond;
39+
auto maxBudget = std::chrono::duration<double>(mSendInterval).count() * mBytesPerSecond;
40+
mBudget = std::min(mBudget + newBudget, maxBudget);
41+
mLastRun = std::chrono::high_resolution_clock::now();
42+
43+
// Send packets while there is budget, allow a single partial packet over budget
44+
while (!mRtpBuffer.empty() && mBudget > 0) {
45+
auto size = int(mRtpBuffer.front()->size());
46+
send(std::move(mRtpBuffer.front()));
47+
mRtpBuffer.pop();
48+
mBudget -= size;
49+
}
50+
51+
if (!mRtpBuffer.empty()) {
52+
schedule(send);
53+
}
54+
}
55+
});
56+
}
57+
58+
void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {
59+
60+
std::lock_guard<std::mutex> lock(mMutex);
61+
62+
for (auto &m : messages) {
63+
mRtpBuffer.push(std::move(m));
64+
}
65+
messages.clear();
66+
67+
schedule(send);
68+
}
69+
70+
} // namespace rtc
71+
72+
#endif /* RTC_ENABLE_MEDIA */

0 commit comments

Comments
 (0)