Skip to content

Commit b192072

Browse files
committed
Add PacingHandler
MediaHandler that can be used to pace packet delivery
1 parent fbd8f23 commit b192072

File tree

4 files changed

+114
-0
lines changed

4 files changed

+114
-0
lines changed

CMakeLists.txt

+2
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES
8787
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
8888
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
8989
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
90+
${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp
9091
)
9192

9293
set(LIBDATACHANNEL_HEADERS
@@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS
123124
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
124125
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
125126
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
127+
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp
126128
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h
127129
)
128130

include/rtc/pacinghandler.hpp

+52
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
/**
2+
* Copyright (c) 2020 Staz Modrzynski
3+
* Copyright (c) 2020 Paul-Louis Ageneau
4+
*
5+
* This Source Code Form is subject to the terms of the Mozilla Public
6+
* License, v. 2.0. If a copy of the MPL was not distributed with this
7+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
8+
*/
9+
10+
#ifndef RTC_PACING_HANDLER_H
11+
#define RTC_PACING_HANDLER_H
12+
13+
#if RTC_ENABLE_MEDIA
14+
15+
#include "mediahandler.hpp"
16+
#include "utils.hpp"
17+
18+
#include <atomic>
19+
#include <deque>
20+
21+
namespace rtc {
22+
23+
// Paced sending of RTP packets. Takes a stream of RTP packets that can an
24+
// uneven bitrate. It then delivers these packets in a smoother manner by
25+
// sending a fixed size of them on an interval
26+
class RTC_CPP_EXPORT PacingHandler : public MediaHandler {
27+
public:
28+
PacingHandler(size_t windowSize, std::chrono::milliseconds sendInterval);
29+
30+
void outgoing(message_vector &messages, const message_callback &send) override;
31+
32+
private:
33+
std::atomic<bool> mHaveScheduled = false;
34+
size_t mWindowSize;
35+
std::chrono::milliseconds mSendInterval;
36+
37+
class RTC_CPP_EXPORT Storage {
38+
public:
39+
std::mutex mMutex;
40+
std::deque<message_ptr> mRtpBuffer = {};
41+
};
42+
43+
const shared_ptr<Storage> mStorage;
44+
45+
void schedule(const message_callback &send);
46+
};
47+
48+
} // namespace rtc
49+
50+
#endif // RTC_ENABLE_MEDIA
51+
52+
#endif // RTC_PACING_HANDLER_H

include/rtc/rtc.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "h265rtppacketizer.hpp"
3535
#include "mediahandler.hpp"
3636
#include "plihandler.hpp"
37+
#include "pacinghandler.hpp"
3738
#include "rtcpnackresponder.hpp"
3839
#include "rtcpreceivingsession.hpp"
3940
#include "rtcpsrreporter.hpp"

src/pacinghandler.cpp

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/**
2+
* Copyright (c) 2020 Filip Klembara (in2core)
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
#if RTC_ENABLE_MEDIA
10+
11+
#include <memory>
12+
13+
#include "pacinghandler.hpp"
14+
15+
#include "impl/internals.hpp"
16+
#include "impl/threadpool.hpp"
17+
18+
namespace rtc {
19+
20+
PacingHandler::PacingHandler(size_t windowSize, std::chrono::milliseconds sendInterval)
21+
: mWindowSize(windowSize), mSendInterval(sendInterval), mStorage(std::make_shared<Storage>()){};
22+
23+
void PacingHandler::schedule(const message_callback &send) {
24+
if (!mHaveScheduled.exchange(true)) {
25+
return;
26+
}
27+
28+
impl::ThreadPool::Instance().schedule(mSendInterval, [weak_this = weak_from_this(), send]() {
29+
if (auto locked = std::dynamic_pointer_cast<PacingHandler>(weak_this.lock())) {
30+
const std::lock_guard<std::mutex> lock(locked->mStorage->mMutex);
31+
locked->mHaveScheduled.store(false);
32+
33+
size_t amountSent = 0;
34+
while (!locked->mStorage->mRtpBuffer.empty()) {
35+
amountSent += locked->mStorage->mRtpBuffer.front()->size();
36+
if (amountSent >= locked->mWindowSize) {
37+
return;
38+
}
39+
40+
send(std::move(locked->mStorage->mRtpBuffer.front()));
41+
locked->mStorage->mRtpBuffer.pop_front();
42+
}
43+
}
44+
});
45+
}
46+
47+
void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {
48+
49+
std::lock_guard<std::mutex> lock(mStorage->mMutex);
50+
51+
std::move(std::begin(messages), std::end(messages), std::back_inserter(mStorage->mRtpBuffer));
52+
messages.clear();
53+
54+
schedule(send);
55+
}
56+
57+
} // namespace rtc
58+
59+
#endif /* RTC_ENABLE_MEDIA */

0 commit comments

Comments
 (0)