@@ -440,23 +440,28 @@ void PeerConnection::forwardMessage(message_ptr message) {
440
440
return ;
441
441
442
442
const uint16_t stream = uint16_t (message->stream );
443
- auto channel = findDataChannel (stream);
443
+ auto [ channel, found] = findDataChannel (stream);
444
444
445
445
if (DataChannel::IsOpenMessage (message)) {
446
+ if (found) {
447
+ // The stream is already used, the receiver must close the DataChannel
448
+ PLOG_WARNING << " Got open message on already used stream " << stream;
449
+ if (channel && channel->isOpen ())
450
+ channel->close ();
451
+ else
452
+ sctpTransport->closeStream (message->stream );
453
+
454
+ return ;
455
+ }
456
+
446
457
const uint16_t remoteParity = (iceTransport->role () == Description::Role::Active) ? 1 : 0 ;
447
458
if (stream % 2 != remoteParity) {
448
- // The odd/even rule is violated, close the DataChannel
459
+ // The odd/even rule is violated, the receiver must close the DataChannel
449
460
PLOG_WARNING << " Got open message violating the odd/even rule on stream " << stream;
450
461
sctpTransport->closeStream (message->stream );
451
462
return ;
452
463
}
453
464
454
- if (channel && channel->isOpen ()) {
455
- PLOG_WARNING << " Got open message on stream " << stream
456
- << " for an already open DataChannel, closing it first" ;
457
- channel->close ();
458
- }
459
-
460
465
channel = std::make_shared<IncomingDataChannel>(weak_from_this (), sctpTransport);
461
466
channel->assignStream (stream);
462
467
channel->openCallback =
@@ -465,8 +470,7 @@ void PeerConnection::forwardMessage(message_ptr message) {
465
470
std::unique_lock lock (mDataChannelsMutex ); // we are going to emplace
466
471
mDataChannels .emplace (stream, channel);
467
472
}
468
-
469
- if (!channel) {
473
+ else if (!found) {
470
474
if (message->type == Message::Reset)
471
475
return ; // ignore
472
476
@@ -476,8 +480,18 @@ void PeerConnection::forwardMessage(message_ptr message) {
476
480
return ;
477
481
}
478
482
479
- // Forward the message
480
- channel->incoming (message);
483
+ if (message->type == Message::Reset) {
484
+ // Incoming stream is reset, unregister it
485
+ removeDataChannel (stream);
486
+ }
487
+
488
+ if (channel) {
489
+ // Forward the message
490
+ channel->incoming (message);
491
+ } else {
492
+ // DataChannel was destroyed, ignore
493
+ PLOG_DEBUG << " Ignored message on stream " << stream << " , DataChannel is destroyed" ;
494
+ }
481
495
}
482
496
483
497
void PeerConnection::forwardMedia (message_ptr message) {
@@ -571,12 +585,12 @@ void PeerConnection::forwardMedia(message_ptr message) {
571
585
}
572
586
573
587
void PeerConnection::forwardBufferedAmount (uint16_t stream, size_t amount) {
574
- if (auto channel = findDataChannel (stream))
588
+ [[maybe_unused]] auto [channel, found] = findDataChannel (stream);
589
+ if (channel)
575
590
channel->triggerBufferedAmount (amount);
576
591
}
577
592
578
593
shared_ptr<DataChannel> PeerConnection::emplaceDataChannel (string label, DataChannelInit init) {
579
- cleanupDataChannels ();
580
594
std::unique_lock lock (mDataChannelsMutex ); // we are going to emplace
581
595
582
596
// If the DataChannel is user-negotiated, do not negotiate it in-band
@@ -613,13 +627,17 @@ shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataCha
613
627
return channel;
614
628
}
615
629
616
- shared_ptr<DataChannel> PeerConnection::findDataChannel (uint16_t stream) {
630
+ std::pair< shared_ptr<DataChannel>, bool > PeerConnection::findDataChannel (uint16_t stream) {
617
631
std::shared_lock lock (mDataChannelsMutex ); // read-only
618
632
if (auto it = mDataChannels .find (stream); it != mDataChannels .end ())
619
- if (auto channel = it->second .lock ())
620
- return channel;
633
+ return std::make_pair (it->second .lock (), true );
634
+ else
635
+ return std::make_pair (nullptr , false );
636
+ }
621
637
622
- return nullptr ;
638
+ bool PeerConnection::removeDataChannel (uint16_t stream) {
639
+ std::unique_lock lock (mDataChannelsMutex ); // we are going to erase
640
+ return mDataChannels .erase (stream) != 0 ;
623
641
}
624
642
625
643
uint16_t PeerConnection::maxDataChannelStream () const {
@@ -650,8 +668,7 @@ void PeerConnection::assignDataChannels() {
650
668
if (stream > maxStream)
651
669
throw std::runtime_error (" Too many DataChannels" );
652
670
653
- auto it = mDataChannels .find (stream);
654
- if (it == mDataChannels .end () || !it->second .lock ())
671
+ if (mDataChannels .find (stream) == mDataChannels .end ())
655
672
break ;
656
673
657
674
stream += 2 ;
@@ -691,19 +708,6 @@ void PeerConnection::iterateDataChannels(
691
708
}
692
709
}
693
710
694
- void PeerConnection::cleanupDataChannels () {
695
- std::unique_lock lock (mDataChannelsMutex ); // we are going to erase
696
- auto it = mDataChannels .begin ();
697
- while (it != mDataChannels .end ()) {
698
- if (!it->second .lock ()) {
699
- it = mDataChannels .erase (it);
700
- continue ;
701
- }
702
-
703
- ++it;
704
- }
705
- }
706
-
707
711
void PeerConnection::openDataChannels () {
708
712
if (auto transport = std::atomic_load (&mSctpTransport ))
709
713
iterateDataChannels ([&](shared_ptr<DataChannel> channel) {
0 commit comments