Skip to content

Commit cc8fc4f

Browse files
committed
Add PacingHandler
MediaHandler that can be used to pace packet delivery Resolves #1017
1 parent fbd8f23 commit cc8fc4f

File tree

4 files changed

+149
-0
lines changed

4 files changed

+149
-0
lines changed

CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES
8787
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
8888
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
8989
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
90+
${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp
9091
)
9192

9293
set(LIBDATACHANNEL_HEADERS
@@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS
123124
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
124125
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
125126
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
127+
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp
126128
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h
127129
)
128130

include/rtc/pacinghandler.hpp

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright (c) 2020 Staz Modrzynski
3+
* Copyright (c) 2020 Paul-Louis Ageneau
4+
*
5+
* This Source Code Form is subject to the terms of the Mozilla Public
6+
* License, v. 2.0. If a copy of the MPL was not distributed with this
7+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
8+
*/
9+
10+
#ifndef RTC_PACING_HANDLER_H
11+
#define RTC_PACING_HANDLER_H
12+
13+
#if RTC_ENABLE_MEDIA
14+
15+
#include "mediahandler.hpp"
16+
#include "utils.hpp"
17+
18+
#include <atomic>
19+
#include <deque>
20+
21+
namespace rtc {
22+
23+
// Paced sending of RTP packets. Takes a stream of RTP packets that can an
24+
// uneven bitrate. It then delivers these packets in a smoother manner by
25+
// sending a fixed size of them on an interval
26+
class RTC_CPP_EXPORT PacingHandler : public MediaHandler {
27+
public:
28+
static constexpr float DefaultOverageFactor = 2.0;
29+
PacingHandler(float bytesPerMillisecond, std::chrono::milliseconds sendInterval,
30+
float overageFactor = DefaultOverageFactor);
31+
32+
void outgoing(message_vector &messages, const message_callback &send) override;
33+
34+
private:
35+
float mBytesPerMillisecond;
36+
std::chrono::milliseconds mSendInterval;
37+
38+
std::atomic<bool> mHaveScheduled = false;
39+
std::chrono::time_point<std::chrono::high_resolution_clock> mLastRun;
40+
41+
size_t mOverage;
42+
float mOverageFactor;
43+
44+
class RTC_CPP_EXPORT Storage {
45+
public:
46+
std::mutex mMutex;
47+
std::deque<message_ptr> mRtpBuffer = {};
48+
};
49+
50+
const shared_ptr<Storage> mStorage;
51+
52+
void schedule(const message_callback &send);
53+
};
54+
55+
} // namespace rtc
56+
57+
#endif // RTC_ENABLE_MEDIA
58+
59+
#endif // RTC_PACING_HANDLER_H

include/rtc/rtc.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "h265rtppacketizer.hpp"
3535
#include "mediahandler.hpp"
3636
#include "plihandler.hpp"
37+
#include "pacinghandler.hpp"
3738
#include "rtcpnackresponder.hpp"
3839
#include "rtcpreceivingsession.hpp"
3940
#include "rtcpsrreporter.hpp"

src/pacinghandler.cpp

+87
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* Copyright (c) 2020 Filip Klembara (in2core)
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
#if RTC_ENABLE_MEDIA
10+
11+
#include <memory>
12+
13+
#include "pacinghandler.hpp"
14+
15+
#include "impl/internals.hpp"
16+
#include "impl/threadpool.hpp"
17+
18+
namespace rtc {
19+
20+
PacingHandler::PacingHandler(float bitsPerSecond, std::chrono::milliseconds sendInterval,
21+
float overageFactor)
22+
: mBytesPerMillisecond((bitsPerSecond / 1000) / 8), mSendInterval(sendInterval),
23+
mOverageFactor(overageFactor), mStorage(std::make_shared<Storage>()){};
24+
25+
void PacingHandler::schedule(const message_callback &send) {
26+
if (!mHaveScheduled.exchange(true)) {
27+
return;
28+
}
29+
30+
impl::ThreadPool::Instance().schedule(mSendInterval, [weak_this = weak_from_this(), send]() {
31+
if (auto locked = std::dynamic_pointer_cast<PacingHandler>(weak_this.lock())) {
32+
const std::lock_guard<std::mutex> lock(locked->mStorage->mMutex);
33+
locked->mHaveScheduled.store(false);
34+
35+
// set byteBudget to how many milliseconds have elapsed since last run
36+
size_t byteBudget = locked->mSendInterval.count();
37+
if (locked->mLastRun !=
38+
std::chrono::time_point<std::chrono::high_resolution_clock>::min()) {
39+
byteBudget = std::chrono::duration_cast<std::chrono::milliseconds>(
40+
std::chrono::high_resolution_clock::now() - locked->mLastRun)
41+
.count();
42+
}
43+
44+
// byteBudget is now the total amount of bytes that we can send since last run
45+
byteBudget *= locked->mBytesPerMillisecond;
46+
47+
size_t amountSent = 0;
48+
while (!locked->mStorage->mRtpBuffer.empty()) {
49+
auto pktSize = locked->mStorage->mRtpBuffer.front()->size();
50+
51+
// If overage is available spend it on current packet
52+
// Otherwise don't send anymore packets
53+
if ((amountSent + pktSize) >= byteBudget) {
54+
if (locked->mOverage > pktSize) {
55+
locked->mOverage -= pktSize;
56+
} else {
57+
return;
58+
}
59+
}
60+
61+
send(std::move(locked->mStorage->mRtpBuffer.front()));
62+
amountSent += pktSize;
63+
locked->mStorage->mRtpBuffer.pop_front();
64+
}
65+
66+
// The remaining byteBudget is added to the overage
67+
locked->mOverage += (byteBudget - amountSent);
68+
locked->mOverage = std::min(static_cast<size_t>(byteBudget * locked->mOverageFactor),
69+
locked->mOverage);
70+
locked->mLastRun = std::chrono::high_resolution_clock::now();
71+
}
72+
});
73+
}
74+
75+
void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {
76+
77+
std::lock_guard<std::mutex> lock(mStorage->mMutex);
78+
79+
std::move(std::begin(messages), std::end(messages), std::back_inserter(mStorage->mRtpBuffer));
80+
messages.clear();
81+
82+
schedule(send);
83+
}
84+
85+
} // namespace rtc
86+
87+
#endif /* RTC_ENABLE_MEDIA */

0 commit comments

Comments
 (0)