Skip to content

Commit

Permalink
Fix lack of "consumerlayerschange" event when all streams in the prod…
Browse files Browse the repository at this point in the history
…ucer die (#1122)
  • Loading branch information
ibc authored Jul 19, 2023
1 parent 1e3a57c commit 2f6bfca
Show file tree
Hide file tree
Showing 17 changed files with 117 additions and 78 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

### Next

* `SimulcastConsumer`: Fix lack of "consumerlayerschange" event when all streams in the producer die ([PR #1122](https://github.com/versatica/mediasoup/pull/1122)).


### 3.12.6

Expand Down
9 changes: 5 additions & 4 deletions worker/include/RTC/Consumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "RTC/RtpHeaderExtensionIds.hpp"
#include "RTC/RtpPacket.hpp"
#include "RTC/RtpStream.hpp"
#include "RTC/RtpStreamRecv.hpp"
#include "RTC/RtpStreamSend.hpp"
#include "RTC/Shared.hpp"
#include <absl/container/flat_hash_set.h>
Expand Down Expand Up @@ -129,12 +130,12 @@ namespace RTC
}
void ProducerPaused();
void ProducerResumed();
virtual void ProducerRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) = 0;
virtual void ProducerNewRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) = 0;
virtual void ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) = 0;
virtual void ProducerNewRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) = 0;
void ProducerRtpStreamScores(const std::vector<uint8_t>* scores);
virtual void ProducerRtpStreamScore(
RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) = 0;
virtual void ProducerRtcpSenderReport(RTC::RtpStream* rtpStream, bool first) = 0;
RTC::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) = 0;
virtual void ProducerRtcpSenderReport(RTC::RtpStreamRecv* rtpStream, bool first) = 0;
void ProducerClosed();
void SetExternallyManagedBitrate()
{
Expand Down
10 changes: 5 additions & 5 deletions worker/include/RTC/PipeConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#define MS_RTC_PIPE_CONSUMER_HPP

#include "RTC/Consumer.hpp"
#include "RTC/RtpStreamSend.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"

Expand All @@ -23,10 +22,11 @@ namespace RTC
void FillJson(json& jsonObject) const override;
void FillJsonStats(json& jsonArray) const override;
void FillJsonScore(json& jsonObject) const override;
void ProducerRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStream* rtpStream, bool first) override;
void ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(
RTC::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStreamRecv* rtpStream, bool first) override;
uint8_t GetBitratePriority() const override;
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
Expand Down
9 changes: 6 additions & 3 deletions worker/include/RTC/Producer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,14 @@ namespace RTC
virtual void OnProducerPaused(RTC::Producer* producer) = 0;
virtual void OnProducerResumed(RTC::Producer* producer) = 0;
virtual void OnProducerNewRtpStream(
RTC::Producer* producer, RTC::RtpStream* rtpStream, uint32_t mappedSsrc) = 0;
RTC::Producer* producer, RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) = 0;
virtual void OnProducerRtpStreamScore(
RTC::Producer* producer, RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) = 0;
RTC::Producer* producer,
RTC::RtpStreamRecv* rtpStream,
uint8_t score,
uint8_t previousScore) = 0;
virtual void OnProducerRtcpSenderReport(
RTC::Producer* producer, RTC::RtpStream* rtpStream, bool first) = 0;
RTC::Producer* producer, RTC::RtpStreamRecv* rtpStream, bool first) = 0;
virtual void OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet) = 0;
virtual void OnProducerSendRtcpPacket(RTC::Producer* producer, RTC::RTCP::Packet* packet) = 0;
virtual void OnProducerNeedWorstRemoteFractionLost(
Expand Down
11 changes: 7 additions & 4 deletions worker/include/RTC/Router.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
#include "RTC/Producer.hpp"
#include "RTC/RtpObserver.hpp"
#include "RTC/RtpPacket.hpp"
#include "RTC/RtpStream.hpp"
#include "RTC/RtpStreamRecv.hpp"
#include "RTC/Shared.hpp"
#include "RTC/Transport.hpp"
#include "RTC/WebRtcServer.hpp"
Expand Down Expand Up @@ -65,16 +65,19 @@ namespace RTC
void OnTransportProducerNewRtpStream(
RTC::Transport* transport,
RTC::Producer* producer,
RTC::RtpStream* rtpStream,
RTC::RtpStreamRecv* rtpStream,
uint32_t mappedSsrc) override;
void OnTransportProducerRtpStreamScore(
RTC::Transport* transport,
RTC::Producer* producer,
RTC::RtpStream* rtpStream,
RTC::RtpStreamRecv* rtpStream,
uint8_t score,
uint8_t previousScore) override;
void OnTransportProducerRtcpSenderReport(
RTC::Transport* transport, RTC::Producer* producer, RTC::RtpStream* rtpStream, bool first) override;
RTC::Transport* transport,
RTC::Producer* producer,
RTC::RtpStreamRecv* rtpStream,
bool first) override;
void OnTransportProducerRtpPacketReceived(
RTC::Transport* transport, RTC::Producer* producer, RTC::RtpPacket* packet) override;
void OnTransportNeedWorstRemoteFractionLost(
Expand Down
4 changes: 4 additions & 0 deletions worker/include/RTC/RtpStreamRecv.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ namespace RTC
{
return this->transmissionCounter.GetLayerBitrate(nowMs, spatialLayer, temporalLayer);
}
bool HasRtpInactivityCheckEnabled() const
{
return this->useRtpInactivityCheck;
}

private:
void CalculateJitter(uint32_t rtpTimestamp);
Expand Down
16 changes: 9 additions & 7 deletions worker/include/RTC/SimpleConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#define MS_RTC_SIMPLE_CONSUMER_HPP

#include "RTC/Consumer.hpp"
#include "RTC/RtpStreamSend.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"

Expand All @@ -29,14 +28,17 @@ namespace RTC
return (
RTC::Consumer::IsActive() &&
this->producerRtpStream &&
(this->producerRtpStream->GetScore() > 0u || this->producerRtpStream->HasDtx())
// If there is no RTP inactivity check do not consider the stream
// inactive despite it has score 0.
(this->producerRtpStream->GetScore() > 0u || !this->producerRtpStream->HasRtpInactivityCheckEnabled())
);
// clang-format on
}
void ProducerRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStream* rtpStream, bool first) override;
void ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(
RTC::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStreamRecv* rtpStream, bool first) override;
uint8_t GetBitratePriority() const override;
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
Expand Down Expand Up @@ -78,7 +80,7 @@ namespace RTC
RTC::RtpStreamSend* rtpStream{ nullptr };
// Others.
std::vector<RTC::RtpStreamSend*> rtpStreams;
RTC::RtpStream* producerRtpStream{ nullptr };
RTC::RtpStreamRecv* producerRtpStream{ nullptr };
bool keyFrameSupported{ false };
bool syncRequired{ false };
RTC::SeqManager<uint16_t> rtpSeqManager;
Expand Down
24 changes: 13 additions & 11 deletions worker/include/RTC/SimulcastConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#include "RTC/Codecs/PayloadDescriptorHandler.hpp"
#include "RTC/Consumer.hpp"
#include "RTC/RtpStreamSend.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"

Expand Down Expand Up @@ -41,18 +40,21 @@ namespace RTC
std::any_of(
this->producerRtpStreams.begin(),
this->producerRtpStreams.end(),
[](const RTC::RtpStream* rtpStream)
[](const RTC::RtpStreamRecv* rtpStream)
{
return (rtpStream != nullptr && (rtpStream->GetScore() > 0u || rtpStream->HasDtx()));
// If there is no RTP inactivity check do not consider the stream
// inactive despite it has score 0.
return (rtpStream != nullptr && (rtpStream->GetScore() > 0u || !rtpStream->HasRtpInactivityCheckEnabled()));
}
)
);
// clang-format on
}
void ProducerRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStream* rtpStream, bool first) override;
void ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(
RTC::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStreamRecv* rtpStream, bool first) override;
uint8_t GetBitratePriority() const override;
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
Expand Down Expand Up @@ -90,9 +92,9 @@ namespace RTC
bool CanSwitchToSpatialLayer(int16_t spatialLayer) const;
void EmitScore() const;
void EmitLayersChange() const;
RTC::RtpStream* GetProducerCurrentRtpStream() const;
RTC::RtpStream* GetProducerTargetRtpStream() const;
RTC::RtpStream* GetProducerTsReferenceRtpStream() const;
RTC::RtpStreamRecv* GetProducerCurrentRtpStream() const;
RTC::RtpStreamRecv* GetProducerTargetRtpStream() const;
RTC::RtpStreamRecv* GetProducerTsReferenceRtpStream() const;

/* Pure virtual methods inherited from RtpStreamSend::Listener. */
public:
Expand All @@ -105,7 +107,7 @@ namespace RTC
// Others.
absl::flat_hash_map<uint32_t, int16_t> mapMappedSsrcSpatialLayer;
std::vector<RTC::RtpStreamSend*> rtpStreams;
std::vector<RTC::RtpStream*> producerRtpStreams; // Indexed by spatial layer.
std::vector<RTC::RtpStreamRecv*> producerRtpStreams; // Indexed by spatial layer.
bool syncRequired{ false };
int16_t spatialLayerToSync{ -1 };
bool lastSentPacketHasMarker{ false };
Expand Down
16 changes: 9 additions & 7 deletions worker/include/RTC/SvcConsumer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

#include "RTC/Codecs/PayloadDescriptorHandler.hpp"
#include "RTC/Consumer.hpp"
#include "RTC/RtpStreamSend.hpp"
#include "RTC/SeqManager.hpp"
#include "RTC/Shared.hpp"
#include <map>
Expand Down Expand Up @@ -40,14 +39,17 @@ namespace RTC
return (
RTC::Consumer::IsActive() &&
this->producerRtpStream &&
(this->producerRtpStream->GetScore() > 0u || this->producerRtpStream->HasDtx())
// If there is no RTP inactivity check do not consider the stream
// inactive despite it has score 0.
(this->producerRtpStream->GetScore() > 0u || !this->producerRtpStream->HasRtpInactivityCheckEnabled())
);
// clang-format on
}
void ProducerRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStream* rtpStream, bool first) override;
void ProducerRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerNewRtpStream(RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void ProducerRtpStreamScore(
RTC::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore) override;
void ProducerRtcpSenderReport(RTC::RtpStreamRecv* rtpStream, bool first) override;
uint8_t GetBitratePriority() const override;
uint32_t IncreaseLayer(uint32_t bitrate, bool considerLoss) override;
void ApplyLayers() override;
Expand Down Expand Up @@ -93,7 +95,7 @@ namespace RTC
RTC::RtpStreamSend* rtpStream{ nullptr };
// Others.
std::vector<RTC::RtpStreamSend*> rtpStreams;
RTC::RtpStream* producerRtpStream{ nullptr };
RTC::RtpStreamRecv* producerRtpStream{ nullptr };
bool syncRequired{ false };
RTC::SeqManager<uint16_t> rtpSeqManager;
int16_t preferredSpatialLayer{ -1 };
Expand Down
18 changes: 12 additions & 6 deletions worker/include/RTC/Transport.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,16 +69,19 @@ namespace RTC
virtual void OnTransportProducerNewRtpStream(
RTC::Transport* transport,
RTC::Producer* producer,
RTC::RtpStream* rtpStream,
RTC::RtpStreamRecv* rtpStream,
uint32_t mappedSsrc) = 0;
virtual void OnTransportProducerRtpStreamScore(
RTC::Transport* transport,
RTC::Producer* producer,
RTC::RtpStream* rtpStream,
RTC::RtpStreamRecv* rtpStream,
uint8_t score,
uint8_t previousScore) = 0;
virtual void OnTransportProducerRtcpSenderReport(
RTC::Transport* transport, RTC::Producer* producer, RTC::RtpStream* rtpStream, bool first) = 0;
RTC::Transport* transport,
RTC::Producer* producer,
RTC::RtpStreamRecv* rtpStream,
bool first) = 0;
virtual void OnTransportProducerRtpPacketReceived(
RTC::Transport* transport, RTC::Producer* producer, RTC::RtpPacket* packet) = 0;
virtual void OnTransportNeedWorstRemoteFractionLost(
Expand Down Expand Up @@ -204,11 +207,14 @@ namespace RTC
void OnProducerPaused(RTC::Producer* producer) override;
void OnProducerResumed(RTC::Producer* producer) override;
void OnProducerNewRtpStream(
RTC::Producer* producer, RTC::RtpStream* rtpStream, uint32_t mappedSsrc) override;
RTC::Producer* producer, RTC::RtpStreamRecv* rtpStream, uint32_t mappedSsrc) override;
void OnProducerRtpStreamScore(
RTC::Producer* producer, RTC::RtpStream* rtpStream, uint8_t score, uint8_t previousScore) override;
RTC::Producer* producer,
RTC::RtpStreamRecv* rtpStream,
uint8_t score,
uint8_t previousScore) override;
void OnProducerRtcpSenderReport(
RTC::Producer* producer, RTC::RtpStream* rtpStream, bool first) override;
RTC::Producer* producer, RTC::RtpStreamRecv* rtpStream, bool first) override;
void OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RtpPacket* packet) override;
void OnProducerSendRtcpPacket(RTC::Producer* producer, RTC::RTCP::Packet* packet) override;
void OnProducerNeedWorstRemoteFractionLost(
Expand Down
8 changes: 4 additions & 4 deletions worker/src/RTC/PipeConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,29 +137,29 @@ namespace RTC
}
}

void PipeConsumer::ProducerRtpStream(RTC::RtpStream* /*rtpStream*/, uint32_t /*mappedSsrc*/)
void PipeConsumer::ProducerRtpStream(RTC::RtpStreamRecv* /*rtpStream*/, uint32_t /*mappedSsrc*/)
{
MS_TRACE();

// Do nothing.
}

void PipeConsumer::ProducerNewRtpStream(RTC::RtpStream* /*rtpStream*/, uint32_t /*mappedSsrc*/)
void PipeConsumer::ProducerNewRtpStream(RTC::RtpStreamRecv* /*rtpStream*/, uint32_t /*mappedSsrc*/)
{
MS_TRACE();

// Do nothing.
}

void PipeConsumer::ProducerRtpStreamScore(
RTC::RtpStream* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/)
RTC::RtpStreamRecv* /*rtpStream*/, uint8_t /*score*/, uint8_t /*previousScore*/)
{
MS_TRACE();

// Do nothing.
}

void PipeConsumer::ProducerRtcpSenderReport(RTC::RtpStream* /*rtpStream*/, bool /*first*/)
void PipeConsumer::ProducerRtcpSenderReport(RTC::RtpStreamRecv* /*rtpStream*/, bool /*first*/)
{
MS_TRACE();

Expand Down
11 changes: 7 additions & 4 deletions worker/src/RTC/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,10 @@ namespace RTC
}
}

// Only perform RTP inactivity check on simulcast.
auto useRtpInactivityCheck = this->type == RtpParameters::Type::SIMULCAST;
// Only perform RTP inactivity check on simulcast and only if there are
// more than 1 stream.
auto useRtpInactivityCheck =
this->type == RtpParameters::Type::SIMULCAST && this->rtpMapping.encodings.size() > 1;

// Create a RtpStreamRecv for receiving a media stream.
auto* rtpStream = new RTC::RtpStreamRecv(this, params, SendNackDelay, useRtpInactivityCheck);
Expand Down Expand Up @@ -1255,7 +1257,7 @@ namespace RTC
auto mappedSsrc = this->mapRtpStreamMappedSsrc.at(rtpStream);

// Notify the listener.
this->listener->OnProducerNewRtpStream(this, static_cast<RTC::RtpStream*>(rtpStream), mappedSsrc);
this->listener->OnProducerNewRtpStream(this, rtpStream, mappedSsrc);
}

inline void Producer::PreProcessRtpPacket(RTC::RtpPacket* packet)
Expand Down Expand Up @@ -1637,7 +1639,8 @@ namespace RTC
this->rtpStreamScores[rtpStream->GetEncodingIdx()] = score;

// Notify the listener.
this->listener->OnProducerRtpStreamScore(this, rtpStream, score, previousScore);
this->listener->OnProducerRtpStreamScore(
this, static_cast<RTC::RtpStreamRecv*>(rtpStream), score, previousScore);

// Emit the score event.
EmitScore();
Expand Down
9 changes: 6 additions & 3 deletions worker/src/RTC/Router.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -659,7 +659,10 @@ namespace RTC
}

inline void Router::OnTransportProducerNewRtpStream(
RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpStream* rtpStream, uint32_t mappedSsrc)
RTC::Transport* /*transport*/,
RTC::Producer* producer,
RTC::RtpStreamRecv* rtpStream,
uint32_t mappedSsrc)
{
MS_TRACE();

Expand All @@ -674,7 +677,7 @@ namespace RTC
inline void Router::OnTransportProducerRtpStreamScore(
RTC::Transport* /*transport*/,
RTC::Producer* producer,
RTC::RtpStream* rtpStream,
RTC::RtpStreamRecv* rtpStream,
uint8_t score,
uint8_t previousScore)
{
Expand All @@ -689,7 +692,7 @@ namespace RTC
}

inline void Router::OnTransportProducerRtcpSenderReport(
RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpStream* rtpStream, bool first)
RTC::Transport* /*transport*/, RTC::Producer* producer, RTC::RtpStreamRecv* rtpStream, bool first)
{
MS_TRACE();

Expand Down
Loading

0 comments on commit 2f6bfca

Please sign in to comment.