Skip to content

Commit 8c8aa31

Browse files
Merge pull request #1123 from Sean-Der/opus
Add OpusRtpDepacketizer
2 parents 437a758 + 1439a65 commit 8c8aa31

14 files changed

+197
-39
lines changed

CMakeLists.txt

+3
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ set(LIBDATACHANNEL_SOURCES
7575
${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizationconfig.cpp
7676
${CMAKE_CURRENT_SOURCE_DIR}/src/rtcpsrreporter.cpp
7777
${CMAKE_CURRENT_SOURCE_DIR}/src/rtppacketizer.cpp
78+
${CMAKE_CURRENT_SOURCE_DIR}/src/rtpdepacketizer.cpp
7879
${CMAKE_CURRENT_SOURCE_DIR}/src/h264rtppacketizer.cpp
7980
${CMAKE_CURRENT_SOURCE_DIR}/src/h264rtpdepacketizer.cpp
8081
${CMAKE_CURRENT_SOURCE_DIR}/src/nalunit.cpp
@@ -99,6 +100,7 @@ set(LIBDATACHANNEL_HEADERS
99100
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/common.hpp
100101
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/global.hpp
101102
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/message.hpp
103+
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/frameinfo.hpp
102104
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/peerconnection.hpp
103105
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/reliability.hpp
104106
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtc.h
@@ -110,6 +112,7 @@ set(LIBDATACHANNEL_HEADERS
110112
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizationconfig.hpp
111113
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtcpsrreporter.hpp
112114
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtppacketizer.hpp
115+
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/rtpdepacketizer.hpp
113116
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264rtppacketizer.hpp
114117
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/h264rtpdepacketizer.hpp
115118
${CMAKE_CURRENT_SOURCE_DIR}/include/rtc/nalunit.hpp

include/rtc/frameinfo.hpp

+23
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
/**
2+
* Copyright (c) 2019-2020 Paul-Louis Ageneau
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
#ifndef RTC_FRAMEINFO_H
10+
#define RTC_FRAMEINFO_H
11+
12+
#include "common.hpp"
13+
14+
namespace rtc {
15+
16+
struct RTC_CPP_EXPORT FrameInfo {
17+
FrameInfo(uint32_t timestamp) : timestamp(timestamp){};
18+
uint32_t timestamp = 0; // RTP Timestamp
19+
};
20+
21+
} // namespace rtc
22+
23+
#endif // RTC_FRAMEINFO_H

include/rtc/h264rtpdepacketizer.hpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ class RTC_CPP_EXPORT H264RtpDepacketizer : public MediaHandler {
3232
private:
3333
std::vector<message_ptr> mRtpBuffer;
3434

35-
message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt);
35+
message_vector buildFrames(message_vector::iterator firstPkt, message_vector::iterator lastPkt,
36+
uint32_t timestamp);
3637
};
3738

3839
} // namespace rtc

include/rtc/message.hpp

+7-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
#define RTC_MESSAGE_H
1111

1212
#include "common.hpp"
13+
#include "frameinfo.hpp"
1314
#include "reliability.hpp"
1415

1516
#include <functional>
@@ -32,6 +33,7 @@ struct RTC_CPP_EXPORT Message : binary {
3233
unsigned int stream = 0; // Stream id (SCTP stream or SSRC)
3334
unsigned int dscp = 0; // Differentiated Services Code Point
3435
shared_ptr<Reliability> reliability;
36+
shared_ptr<FrameInfo> frameInfo;
3537
};
3638

3739
using message_ptr = shared_ptr<Message>;
@@ -44,10 +46,12 @@ inline size_t message_size_func(const message_ptr &m) {
4446

4547
template <typename Iterator>
4648
message_ptr make_message(Iterator begin, Iterator end, Message::Type type = Message::Binary,
47-
unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr) {
49+
unsigned int stream = 0, shared_ptr<Reliability> reliability = nullptr,
50+
shared_ptr<FrameInfo> frameInfo = nullptr) {
4851
auto message = std::make_shared<Message>(begin, end, type);
4952
message->stream = stream;
5053
message->reliability = reliability;
54+
message->frameInfo = frameInfo;
5155
return message;
5256
}
5357

@@ -57,7 +61,8 @@ RTC_CPP_EXPORT message_ptr make_message(size_t size, Message::Type type = Messag
5761

5862
RTC_CPP_EXPORT message_ptr make_message(binary &&data, Message::Type type = Message::Binary,
5963
unsigned int stream = 0,
60-
shared_ptr<Reliability> reliability = nullptr);
64+
shared_ptr<Reliability> reliability = nullptr,
65+
shared_ptr<FrameInfo> frameInfo = nullptr);
6166

6267
RTC_CPP_EXPORT message_ptr make_message(size_t size, message_ptr orig);
6368

include/rtc/rtc.hpp

+1
Original file line numberDiff line numberDiff line change
@@ -38,5 +38,6 @@
3838
#include "rtcpreceivingsession.hpp"
3939
#include "rtcpsrreporter.hpp"
4040
#include "rtppacketizer.hpp"
41+
#include "rtpdepacketizer.hpp"
4142

4243
#endif // RTC_ENABLE_MEDIA

include/rtc/rtpdepacketizer.hpp

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
/**
2+
* Copyright (c) 2024 Paul-Louis Ageneau
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at https://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
#ifndef RTC_RTP_DEPACKETIZER_H
10+
#define RTC_RTP_DEPACKETIZER_H
11+
12+
#if RTC_ENABLE_MEDIA
13+
14+
#include "mediahandler.hpp"
15+
#include "message.hpp"
16+
17+
namespace rtc {
18+
19+
class RTC_CPP_EXPORT RtpDepacketizer : public MediaHandler {
20+
public:
21+
RtpDepacketizer() = default;
22+
virtual ~RtpDepacketizer() = default;
23+
24+
virtual void incoming(message_vector &messages, const message_callback &send) override;
25+
};
26+
27+
using OpusRtpDepacketizer = RtpDepacketizer;
28+
using AACRtpDepacketizer = RtpDepacketizer;
29+
30+
} // namespace rtc
31+
32+
#endif /* RTC_ENABLE_MEDIA */
33+
34+
#endif /* RTC_RTP_DEPACKETIZER_H */

include/rtc/track.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ class RTC_CPP_EXPORT Track final : private CheshireCat<impl::Track>, public Chan
4141
bool isClosed(void) const override;
4242
size_t maxMessageSize() const override;
4343

44+
void onFrame(std::function<void(binary data, FrameInfo frame)> callback);
45+
4446
bool requestKeyframe();
4547
bool requestBitrate(unsigned int bitrate);
4648

src/h264rtpdepacketizer.cpp

+26-21
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,10 @@ const uint8_t naluTypeSTAPA = 24;
3232
const uint8_t naluTypeFUA = 28;
3333

3434
message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
35-
message_vector::iterator end) {
35+
message_vector::iterator end, uint32_t timestamp) {
3636
message_vector out = {};
3737
auto fua_buffer = std::vector<std::byte>{};
38+
auto frameInfo = std::make_shared<FrameInfo>(timestamp);
3839

3940
for (auto it = begin; it != end; it++) {
4041
auto pkt = it->get();
@@ -58,11 +59,13 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
5859
fua_buffer.at(0) =
5960
std::byte(nalUnitHeader.idc() | nalUnitFragmentHeader.unitType());
6061

61-
out.push_back(make_message(std::move(fua_buffer)));
62+
out.push_back(
63+
make_message(std::move(fua_buffer), Message::Binary, 0, nullptr, frameInfo));
6264
fua_buffer.clear();
6365
}
6466
} else if (nalUnitHeader.unitType() > 0 && nalUnitHeader.unitType() < 24) {
65-
out.push_back(make_message(pkt->begin() + headerSize, pkt->end()));
67+
out.push_back(make_message(pkt->begin() + headerSize, pkt->end(), Message::Binary, 0,
68+
nullptr, frameInfo));
6669
} else if (nalUnitHeader.unitType() == naluTypeSTAPA) {
6770
auto currOffset = stapaHeaderSize + headerSize;
6871

@@ -76,11 +79,11 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
7679
throw std::runtime_error("STAP-A declared size is larger then buffer");
7780
}
7881

79-
out.push_back(
80-
make_message(pkt->begin() + currOffset, pkt->begin() + currOffset + naluSize));
82+
out.push_back(make_message(pkt->begin() + currOffset,
83+
pkt->begin() + currOffset + naluSize, Message::Binary, 0,
84+
nullptr, frameInfo));
8185
currOffset += naluSize;
8286
}
83-
8487
} else {
8588
throw std::runtime_error("Unknown H264 RTP Packetization");
8689
}
@@ -90,20 +93,22 @@ message_vector H264RtpDepacketizer::buildFrames(message_vector::iterator begin,
9093
}
9194

9295
void H264RtpDepacketizer::incoming(message_vector &messages, const message_callback &) {
93-
for (auto message : messages) {
94-
if (message->type == Message::Control) {
95-
continue; // RTCP
96-
}
97-
98-
if (message->size() < sizeof(RtpHeader)) {
99-
PLOG_VERBOSE << "RTP packet is too small, size=" << message->size();
100-
continue;
101-
}
102-
103-
mRtpBuffer.push_back(message);
104-
}
105-
106-
messages.clear();
96+
messages.erase(std::remove_if(messages.begin(), messages.end(),
97+
[&](message_ptr message) {
98+
if (message->type == Message::Control) {
99+
return false;
100+
}
101+
102+
if (message->size() < sizeof(RtpHeader)) {
103+
PLOG_VERBOSE << "RTP packet is too small, size="
104+
<< message->size();
105+
return true;
106+
}
107+
108+
mRtpBuffer.push_back(std::move(message));
109+
return true;
110+
}),
111+
messages.end());
107112

108113
while (mRtpBuffer.size() != 0) {
109114
uint32_t current_timestamp = 0;
@@ -128,7 +133,7 @@ void H264RtpDepacketizer::incoming(message_vector &messages, const message_callb
128133
auto begin = mRtpBuffer.begin();
129134
auto end = mRtpBuffer.begin() + (packets_in_timestamp - 1);
130135

131-
auto frames = buildFrames(begin, end + 1);
136+
auto frames = buildFrames(begin, end + 1, current_timestamp);
132137
messages.insert(messages.end(), frames.begin(), frames.end());
133138
mRtpBuffer.erase(mRtpBuffer.begin(), mRtpBuffer.begin() + packets_in_timestamp);
134139
}

src/impl/channel.hpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ struct Channel {
2828
virtual void triggerAvailable(size_t count);
2929
virtual void triggerBufferedAmount(size_t amount);
3030

31-
void flushPendingMessages();
31+
virtual void flushPendingMessages();
3232
void resetOpenCallback();
3333
void resetCallbacks();
3434

@@ -43,7 +43,7 @@ struct Channel {
4343
std::atomic<size_t> bufferedAmount = 0;
4444
std::atomic<size_t> bufferedAmountLowThreshold = 0;
4545

46-
private:
46+
protected:
4747
std::atomic<bool> mOpenTriggered = false;
4848
};
4949

src/impl/track.cpp

+37-11
Original file line numberDiff line numberDiff line change
@@ -75,24 +75,23 @@ void Track::close() {
7575
resetCallbacks();
7676
}
7777

78+
message_variant Track::trackMessageToVariant(message_ptr message) {
79+
if (message->type == Message::Control)
80+
return to_variant(*message); // The same message may be frowarded into multiple Tracks
81+
else
82+
return to_variant(std::move(*message));
83+
}
84+
7885
optional<message_variant> Track::receive() {
7986
if (auto next = mRecvQueue.pop()) {
80-
message_ptr message = *next;
81-
if (message->type == Message::Control)
82-
return to_variant(**next); // The same message may be frowarded into multiple Tracks
83-
else
84-
return to_variant(std::move(*message));
87+
return trackMessageToVariant(*next);
8588
}
8689
return nullopt;
8790
}
8891

8992
optional<message_variant> Track::peek() {
9093
if (auto next = mRecvQueue.peek()) {
91-
message_ptr message = *next;
92-
if (message->type == Message::Control)
93-
return to_variant(**next); // The same message may be forwarded into multiple Tracks
94-
else
95-
return to_variant(std::move(*message));
94+
return trackMessageToVariant(*next);
9695
}
9796
return nullopt;
9897
}
@@ -217,7 +216,7 @@ void Track::setMediaHandler(shared_ptr<MediaHandler> handler) {
217216
mMediaHandler = handler;
218217
}
219218

220-
if(handler)
219+
if (handler)
221220
handler->media(description());
222221
}
223222

@@ -226,4 +225,31 @@ shared_ptr<MediaHandler> Track::getMediaHandler() {
226225
return mMediaHandler;
227226
}
228227

228+
void Track::onFrame(std::function<void(binary data, FrameInfo frame)> callback) {
229+
frameCallback = callback;
230+
flushPendingMessages();
231+
}
232+
233+
void Track::flushPendingMessages() {
234+
if (!mOpenTriggered)
235+
return;
236+
237+
while (messageCallback || frameCallback) {
238+
auto next = mRecvQueue.pop();
239+
if (!next)
240+
break;
241+
242+
auto message = next.value();
243+
try {
244+
if (message->frameInfo != nullptr && frameCallback) {
245+
frameCallback(std::move(*message), std::move(*message->frameInfo));
246+
} else if (message->frameInfo == nullptr && messageCallback) {
247+
messageCallback(trackMessageToVariant(message));
248+
}
249+
} catch (const std::exception &e) {
250+
PLOG_WARNING << "Uncaught exception in callback: " << e.what();
251+
}
252+
}
253+
}
254+
229255
} // namespace rtc::impl

src/impl/track.hpp

+6
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ class Track final : public std::enable_shared_from_this<Track>, public Channel {
3838
optional<message_variant> receive() override;
3939
optional<message_variant> peek() override;
4040
size_t availableAmount() const override;
41+
void flushPendingMessages() override;
42+
message_variant trackMessageToVariant(message_ptr message);
43+
44+
void onFrame(std::function<void(binary data, FrameInfo frame)> callback);
4145

4246
bool isOpen() const;
4347
bool isClosed() const;
@@ -71,6 +75,8 @@ class Track final : public std::enable_shared_from_this<Track>, public Channel {
7175
std::atomic<bool> mIsClosed = false;
7276

7377
Queue<message_ptr> mRecvQueue;
78+
79+
synchronized_callback<binary, FrameInfo> frameCallback;
7480
};
7581

7682
} // namespace rtc::impl

src/message.cpp

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,16 @@ message_ptr make_message(size_t size, Message::Type type, unsigned int stream,
1919
}
2020

2121
message_ptr make_message(binary &&data, Message::Type type, unsigned int stream,
22-
shared_ptr<Reliability> reliability) {
22+
shared_ptr<Reliability> reliability, shared_ptr<FrameInfo> frameInfo) {
2323
auto message = std::make_shared<Message>(std::move(data), type);
2424
message->stream = stream;
2525
message->reliability = reliability;
26+
message->frameInfo = frameInfo;
2627
return message;
2728
}
2829

2930
message_ptr make_message(size_t size, message_ptr orig) {
30-
if(!orig)
31+
if (!orig)
3132
return nullptr;
3233

3334
auto message = std::make_shared<Message>(size, orig->type);

0 commit comments

Comments
 (0)