Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add PacingHandler #1153

Merged
merged 4 commits into from
Apr 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ set(LIBDATACHANNEL_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/src/rtp.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/capi.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/plihandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/src/pacinghandler.cpp
)

set(LIBDATACHANNEL_HEADERS
Expand Down Expand Up @@ -123,6 +124,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpnackresponder.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/utils.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/plihandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/pacinghandler.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/version.h
)

Expand Down
50 changes: 50 additions & 0 deletions include/rtc/pacinghandler.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/**
* Copyright (c) 2024 Sean DuBois <sean@siobud.com>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef RTC_PACING_HANDLER_H
#define RTC_PACING_HANDLER_H

#if RTC_ENABLE_MEDIA

#include "mediahandler.hpp"
#include "utils.hpp"

#include <atomic>
#include <queue>

namespace rtc {

// Paced sending of RTP packets. Takes a stream of RTP packets that can an
// uneven bitrate. It then delivers these packets in a smoother manner by
// sending a fixed size of them on an interval
class RTC_CPP_EXPORT PacingHandler : public MediaHandler {
public:
PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval);

void outgoing(message_vector &messages, const message_callback &send) override;

private:
std::atomic<bool> mHaveScheduled = false;

double mBytesPerSecond;
double mBudget;

std::chrono::milliseconds mSendInterval;
std::chrono::time_point<std::chrono::high_resolution_clock> mLastRun;

std::mutex mMutex;
std::queue<message_ptr> mRtpBuffer;

void schedule(const message_callback &send);
};

} // namespace rtc

#endif // RTC_ENABLE_MEDIA

#endif // RTC_PACING_HANDLER_H
1 change: 1 addition & 0 deletions include/rtc/rtc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "h265rtppacketizer.hpp"
#include "mediahandler.hpp"
#include "plihandler.hpp"
#include "pacinghandler.hpp"
#include "rtcpnackresponder.hpp"
#include "rtcpreceivingsession.hpp"
#include "rtcpsrreporter.hpp"
Expand Down
12 changes: 10 additions & 2 deletions src/impl/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,11 @@ void Track::incoming(message_ptr message) {

message_vector messages{std::move(message)};
if (auto handler = getMediaHandler())
handler->incomingChain(messages, [this](message_ptr m) { transportSend(m); });
handler->incomingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
if (auto locked = weak_this.lock()) {
transportSend(m);
}
});

for (auto &m : messages) {
// Tail drop if queue is full
Expand Down Expand Up @@ -175,7 +179,11 @@ bool Track::outgoing(message_ptr message) {

if (handler) {
message_vector messages{std::move(message)};
handler->outgoingChain(messages, [this](message_ptr m) { transportSend(m); });
handler->outgoingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
if (auto locked = weak_this.lock()) {
transportSend(m);
}
});
bool ret = false;
for (auto &m : messages)
ret = transportSend(std::move(m));
Expand Down
72 changes: 72 additions & 0 deletions src/pacinghandler.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright (c) 2024 Sean DuBois <sean@siobud.com>
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#if RTC_ENABLE_MEDIA

#include <memory>

#include "pacinghandler.hpp"

#include "impl/internals.hpp"
#include "impl/threadpool.hpp"

namespace rtc {

PacingHandler::PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval)
: mBytesPerSecond(bitsPerSecond / 8), mBudget(0), mSendInterval(sendInterval){};

void PacingHandler::schedule(const message_callback &send) {
if (!mHaveScheduled.exchange(true)) {
return;
}

impl::ThreadPool::Instance().schedule(mSendInterval, [this, weak_this = weak_from_this(),
send]() {
if (auto locked = weak_this.lock()) {
const std::lock_guard<std::mutex> lock(mMutex);
mHaveScheduled.store(false);

// Update the budget and cap it
auto newBudget =
std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - mLastRun)
.count() *
mBytesPerSecond;
auto maxBudget = std::chrono::duration<double>(mSendInterval).count() * mBytesPerSecond;
mBudget = std::min(mBudget + newBudget, maxBudget);
mLastRun = std::chrono::high_resolution_clock::now();

// Send packets while there is budget, allow a single partial packet over budget
while (!mRtpBuffer.empty() && mBudget > 0) {
auto size = int(mRtpBuffer.front()->size());
send(std::move(mRtpBuffer.front()));
mRtpBuffer.pop();
mBudget -= size;
}

if (!mRtpBuffer.empty()) {
schedule(send);
}
}
});
}

void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {

std::lock_guard<std::mutex> lock(mMutex);

for (auto &m : messages) {
mRtpBuffer.push(std::move(m));
}
messages.clear();

schedule(send);
}

} // namespace rtc

#endif /* RTC_ENABLE_MEDIA */
Loading