Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FrameInfo #1121

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ set(LIBDATACHANNEL_HEADERS
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/common.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/global.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/message.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/frameinfo.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/peerconnection.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h
Expand Down
23 changes: 23 additions & 0 deletions include/rtc/frameinfo.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright (c) 2019-2020 Paul-Louis Ageneau
*
* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
*/

#ifndef RTC_FRAMEINFO_H
#define RTC_FRAMEINFO_H

#include "common.hpp"

namespace rtc {

struct RTC_CPP_EXPORT FrameInfo {
FrameInfo(uint32_t timestamp) : timestamp(timestamp){};
uint32_t timestamp = 0; // RTP Timestamp
};

} // namespace rtc

#endif // RTC_FRAMEINFO_H
3 changes: 2 additions & 1 deletion include/rtc/h264rtpdepacketizer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ class RTC_CPP_EXPORT H264RtpDepacketizer : public MediaHandler {
private:
std::vector<message_ptr> mRtpBuffer;

message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt);
message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt,
uint32_t timestamp);
};

} // namespace rtc
Expand Down
9 changes: 7 additions & 2 deletions include/rtc/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#define RTC_MESSAGE_H

#include "common.hpp"
#include "frameinfo.hpp"
#include "reliability.hpp"

#include <functional>
Expand All @@ -32,6 +33,7 @@ struct RTC_CPP_EXPORT Message : binary {
unsigned int stream = 0; // Stream id (SCTP stream or SSRC)
unsigned int dscp = 0; // Differentiated Services Code Point
shared_ptr<Reliability> reliability;
shared_ptr<FrameInfo> frame_info;
};

using message_ptr = shared_ptr<Message>;
Expand All @@ -44,10 +46,12 @@ inline size_t message_size_func(const message_ptr &m) {

template <typename Iterator>
message_ptr make_message(Iterator begin, Iterator end, Message::Type type = Message::Binary,
unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr) {
unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr,
shared_ptr<FrameInfo> frame_info = nullptr) {
auto message = std::make_shared<Message>(begin, end, type);
message->stream = stream;
message->reliability = reliability;
message->frame_info = frame_info;
return message;
}

Expand All @@ -57,7 +61,8 @@ RTC_CPP_EXPORT message_ptr make_message(size_t size, Message::Type type = Messag

RTC_CPP_EXPORT message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
unsigned int stream = 0,
shared_ptr<Reliability> reliability = nullptr);
shared_ptr<Reliability> reliability = nullptr,
shared_ptr<FrameInfo> frame_info = nullptr);

RTC_CPP_EXPORT message_ptr make_message(size_t size, message_ptr orig);

Expand Down
2 changes: 2 additions & 0 deletions include/rtc/track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class RTC_CPP_EXPORT Track final : private CheshireCat<impl::Track>, public Chan
bool isClosed(void) const override;
size_t maxMessageSize() const override;

void onFrame(std::function<void(binary data, FrameInfo frame)> callback);

bool requestKeyframe();
bool requestBitrate(unsigned int bitrate);

Expand Down
47 changes: 26 additions & 21 deletions src/h264rtpdepacketizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ const uint8_t naluTypeSTAPA = 24;
const uint8_t naluTypeFUA = 28;

message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
message_vector::iterator end) {
message_vector::iterator end, uint32_t timestamp) {
message_vector out = {};
auto fua_buffer = std::vector<std::byte>{};
auto frame_info = std::make_shared<FrameInfo>(timestamp);

for (auto it = begin; it != end; it++) {
auto pkt = it->get();
Expand All @@ -58,11 +59,13 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
fua_buffer.at(0) =
std::byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType());

out.push_back(make_message(std::move(fua_buffer)));
out.push_back(
make_message(std::move(fua_buffer), Message::Binary, 0, nullptr, frame_info));
fua_buffer.clear();
}
} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
out.push_back(make_message(pkt->begin() + headerSize, pkt->end()));
out.push_back(make_message(pkt->begin() + headerSize, pkt->end(), Message::Binary, 0,
nullptr, frame_info));
} else if (nalUnitHeader.unitType() == naluTypeSTAPA) {
auto currOffset = stapaHeaderSize + headerSize;

Expand All @@ -76,11 +79,11 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
throw std::runtime_error("STAP-A declared size is larger then buffer");
}

out.push_back(
make_message(pkt->begin() + currOffset, pkt->begin() + currOffset + naluSize));
out.push_back(make_message(pkt->begin() + currOffset,
pkt->begin() + currOffset + naluSize, Message::Binary, 0,
nullptr, frame_info));
currOffset += naluSize;
}

} else {
throw std::runtime_error("Unknown H264 RTP Packetization");
}
Expand All @@ -90,20 +93,22 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
}

void H264RtpDepacketizer::incoming(message_vector &messages, const message_callback &) {
for (auto message : messages) {
if (message->type == Message::Control) {
continue; // RTCP
}

if (message->size() < sizeof(RtpHeader)) {
PLOG_VERBOSE << "RTP packet is too small, size=" << message->size();
continue;
}

mRtpBuffer.push_back(message);
}

messages.clear();
messages.erase(std::remove_if(messages.begin(), messages.end(),
[&](message_ptr message) {
if (message->type == Message::Control) {
return false;
}

if (message->size() < sizeof(RtpHeader)) {
PLOG_VERBOSE << "RTP packet is too small, size="
<< message->size();
return false;
}

mRtpBuffer.push_back(std::move(message));
return true;
}),
messages.end());

while (mRtpBuffer.size() != 0) {
uint32_t current_timestamp = 0;
Expand All @@ -128,7 +133,7 @@ void H264RtpDepacketizer::incoming(message_vector &messages, const message_callb
auto begin = mRtpBuffer.begin();
auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1);

auto frames = buildFrames(begin, end + 1);
auto frames = buildFrames(begin, end + 1, current_timestamp);
messages.insert(messages.end(), frames.begin(), frames.end());
mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp);
}
Expand Down
4 changes: 2 additions & 2 deletions src/impl/channel.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct Channel {
virtual void triggerAvailable(size_t count);
virtual void triggerBufferedAmount(size_t amount);

void flushPendingMessages();
virtual void flushPendingMessages();
void resetOpenCallback();
void resetCallbacks();

Expand All @@ -43,7 +43,7 @@ struct Channel {
std::atomic<size_t> bufferedAmount = 0;
std::atomic<size_t> bufferedAmountLowThreshold = 0;

private:
protected:
std::atomic<bool> mOpenTriggered = false;
};

Expand Down
48 changes: 37 additions & 11 deletions src/impl/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,24 +75,23 @@ void Track::close() {
resetCallbacks();
}

message_variant Track::trackMessageToVariant(message_ptr message) {
if (message->type == Message::Control)
return to_variant(*message); // The same message may be frowarded into multiple Tracks
else
return to_variant(std::move(*message));
}

optional<message_variant> Track::receive() {
if (auto next = mRecvQueue.pop()) {
message_ptr message = *next;
if (message->type == Message::Control)
return to_variant(**next); // The same message may be frowarded into multiple Tracks
else
return to_variant(std::move(*message));
return trackMessageToVariant(*next);
}
return nullopt;
}

optional<message_variant> Track::peek() {
if (auto next = mRecvQueue.peek()) {
message_ptr message = *next;
if (message->type == Message::Control)
return to_variant(**next); // The same message may be forwarded into multiple Tracks
else
return to_variant(std::move(*message));
return trackMessageToVariant(*next);
}
return nullopt;
}
Expand Down Expand Up @@ -217,7 +216,7 @@ void Track::setMediaHandler(shared_ptr<MediaHandler> handler) {
mMediaHandler = handler;
}

if(handler)
if (handler)
handler->media(description());
}

Expand All @@ -226,4 +225,31 @@ shared_ptr<MediaHandler> Track::getMediaHandler() {
return mMediaHandler;
}

void Track::onFrame(std::function<void(binary data, FrameInfo frame)> callback) {
frameCallback = callback;
flushPendingMessages();
}

void Track::flushPendingMessages() {
if (!mOpenTriggered)
return;

while (messageCallback || frameCallback) {
auto next = mRecvQueue.pop();
if (!next)
break;

auto message = next.value();
try {
if (message->frame_info != nullptr && frameCallback) {
frameCallback(std::move(*message), std::move(*message->frame_info));
} else if (message->frame_info == nullptr && messageCallback) {
messageCallback(trackMessageToVariant(message));
}
} catch (const std::exception &e) {
PLOG_WARNING << "Uncaught exception in callback: " << e.what();
}
}
}

} // namespace rtc::impl
6 changes: 6 additions & 0 deletions src/impl/track.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ class Track final : public std::enable_shared_from_this<Track>, public Channel {
optional<message_variant> receive() override;
optional<message_variant> peek() override;
size_t availableAmount() const override;
void flushPendingMessages() override;
message_variant trackMessageToVariant(message_ptr message);

void onFrame(std::function<void(binary data, FrameInfo frame)> callback);

bool isOpen() const;
bool isClosed() const;
Expand Down Expand Up @@ -71,6 +75,8 @@ class Track final : public std::enable_shared_from_this<Track>, public Channel {
std::atomic<bool> mIsClosed = false;

Queue<message_ptr> mRecvQueue;

synchronized_callback<binary, FrameInfo> frameCallback;
};

} // namespace rtc::impl
Expand Down
5 changes: 3 additions & 2 deletions src/message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ message_ptr make_message(size_t size, Message::Type type, unsigned int stream,
}

message_ptr make_message(binary &&data, Message::Type type, unsigned int stream,
shared_ptr<Reliability> reliability) {
shared_ptr<Reliability> reliability, shared_ptr<FrameInfo> frame_info) {
auto message = std::make_shared<Message>(std::move(data), type);
message->stream = stream;
message->reliability = reliability;
message->frame_info = frame_info;
return message;
}

message_ptr make_message(size_t size, message_ptr orig) {
if(!orig)
if (!orig)
return nullptr;

auto message = std::make_shared<Message>(size, orig->type);
Expand Down
4 changes: 4 additions & 0 deletions src/track.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,8 @@ bool Track::requestBitrate(unsigned int bitrate) {

shared_ptr<MediaHandler> Track::getMediaHandler() { return impl()->getMediaHandler(); }

void Track::onFrame(std::function<void(binary data, FrameInfo frame)> callback) {
impl()->onFrame(callback);
}

} // namespace rtc
Loading