Skip to content
This repository has been archived by the owner on Oct 25, 2024. It is now read-only.

Commit

Permalink
Allow subscribing with SVC params.
Browse files Browse the repository at this point in the history
  • Loading branch information
taste1981 committed Jul 12, 2021
1 parent fa8b907 commit 4211a9f
Show file tree
Hide file tree
Showing 6 changed files with 267 additions and 28 deletions.
96 changes: 95 additions & 1 deletion talk/owt/sdk/conference/conferenceclient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,95 @@ void ConferenceClient::Subscribe(
},
on_failure);
}
void ConferenceClient::UnPublish(
void ConferenceClient::Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& options,
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) {
RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr.";
return;
}
if (added_stream_type_.find(stream->Id()) == added_stream_type_.end()) {
std::string failure_message(
"Subscribing an invalid stream. Please check whether this stream is "
"removed.");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
if (!stream->VideoEnabled()) {
std::string failure_message(
"Stream without video is not allowed to be subcribed with simulcast/SVC constraints");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(
new Exception(ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
// Avoid subscribing the same stream twice.
{
std::lock_guard<std::mutex> lock(subscribe_pcs_mutex_);
// Search subscirbe pcs
auto it = std::find_if(
subscribe_pcs_.begin(), subscribe_pcs_.end(),
[&](std::shared_ptr<ConferencePeerConnectionChannel> o) -> bool {
return o->GetSubStreamId() == stream->Id();
});
if (it != subscribe_pcs_.end()) {
std::string failure_message(
"The same remote stream has already been subscribed. Subcribe after "
"it is unsubscribed");
if (on_failure != nullptr) {
event_queue_->PostTask([on_failure, failure_message]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, failure_message));
on_failure(std::move(e));
});
}
return;
}
}
// Reorder SDP according to perference list.
PeerConnectionChannelConfiguration config =
GetPeerConnectionChannelConfiguration();
for (auto codec : options.audio.codecs) {
config.audio.push_back(AudioEncodingParameters(codec, 0));
}
std::shared_ptr<ConferencePeerConnectionChannel> pcc(
new ConferencePeerConnectionChannel(config, signaling_channel_,
event_queue_));
pcc->AddObserver(*this);
{
std::lock_guard<std::mutex> lock(subscribe_pcs_mutex_);
subscribe_pcs_.push_back(pcc);
}
std::weak_ptr<ConferenceClient> weak_this = shared_from_this();
std::string stream_id = stream->Id();
pcc->Subscribe(
stream, options,
[on_success, weak_this, stream_id](std::string session_id) {
auto that = weak_this.lock();
if (!that)
return;
// map current pcc
if (on_success != nullptr) {
std::shared_ptr<ConferenceSubscription> cp(
new ConferenceSubscription(that, session_id, stream_id));
on_success(cp);
}
},
on_failure);
}
void ConferenceClient::UnPublish(
const std::string& session_id,
std::function<void()> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
Expand Down Expand Up @@ -1491,6 +1579,12 @@ void ConferenceClient::ParseStreamInfo(sio::message::ptr stream_info,
rid_obj->get_flag() == sio::message::flag_string) {
video_publication_settings.rid = rid_obj->get_string();
}
auto scalability_mode_obj = (*tit)->get_map()["scalabilityMode"];
if (scalability_mode_obj != nullptr &&
scalability_mode_obj->get_flag() == sio::message::flag_string) {
video_publication_settings.scalability_mode =
scalability_mode_obj->get_string();
}
auto trackid_obj = (*tit)->get_map()["id"];
if (trackid_obj != nullptr &&
trackid_obj->get_flag() == sio::message::flag_string) {
Expand Down
136 changes: 113 additions & 23 deletions talk/owt/sdk/conference/conferencepeerconnectionchannel.cc
Original file line number Diff line number Diff line change
Expand Up @@ -496,15 +496,6 @@ static bool SubOptionAllowed(
// specifies codec, though signaling allows specifying sample rate and channel
// number.

// If rid is specified, search in publication_settings for rid;
if (subscribe_options.video.rid != "") {
for (auto video_setting : publication_settings.video) {
if (video_setting.rid == subscribe_options.video.rid)
return true;
}
return false;
}

bool resolution_supported = (subscribe_options.video.resolution.width == 0 &&
subscribe_options.video.resolution.height == 0);
bool frame_rate_supported = (subscribe_options.video.frameRate == 0);
Expand Down Expand Up @@ -647,19 +638,7 @@ void ConferencePeerConnectionChannel::Subscribe(
video_options->get_map()["mid"] = sio::string_message::create("1");
}
auto publication_settings = stream->Settings();
if (subscribe_options.video.rid != "") {
for (auto video_setting : publication_settings.video) {
if (video_setting.rid == subscribe_options.video.rid) {
std::string track_id = video_setting.track_id;
video_options->get_map()["from"] =
sio::string_message::create(track_id);
break;
}
}
} else {
video_options->get_map()["from"] =
sio::string_message::create(stream->Id());
}
video_options->get_map()["from"] = sio::string_message::create(stream->Id());
sio::message::ptr video_spec = sio::object_message::create();
sio::message::ptr resolution_options = sio::object_message::create();
if (subscribe_options.video.resolution.width != 0 &&
Expand Down Expand Up @@ -691,10 +670,121 @@ void ConferencePeerConnectionChannel::Subscribe(
sio::int_message::create(subscribe_options.video.frameRate);
}
video_options->get_map()["parameters"] = video_spec;
tracks_options->get_vector().push_back(video_options);
}

media_options->get_map()["tracks"] = tracks_options;
sio_options->get_map()["media"] = media_options;
sio::message::ptr transport_ptr = sio::object_message::create();
transport_ptr->get_map()["type"] = sio::string_message::create("webrtc");
sio_options->get_map()["transport"] = transport_ptr;

signaling_channel_->SendInitializationMessage(
sio_options, "", stream->Id(),
[this](std::string session_id, std::string transport_id) {
// Pre-set the session's ID.
SetSessionId(session_id);
CreateOffer();
},
on_failure); // TODO: on_failure
subscribed_stream_ = stream;
}

void ConferencePeerConnectionChannel::Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& subscribe_options,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure) {
if (!CheckNullPointer((uintptr_t)stream.get(), on_failure)) {
RTC_LOG(LS_ERROR) << "Remote stream cannot be nullptr.";
return;
}
if (subscribe_success_callback_) {
if (on_failure) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown, "Subscribing this stream."));
on_failure(std::move(e));
});
}
}
if ((subscribe_options.video.rid == "") &&
(subscribe_options.video.spatialLayerId == -1) &&
(subscribe_options.video.temporalLayerId == -1)) {
if (on_failure) {
event_queue_->PostTask([on_failure]() {
std::unique_ptr<Exception> e(new Exception(
ExceptionType::kConferenceUnknown,
"Either rid/spatialLayer/temporalLayer needs to be set for subscribing."));
on_failure(std::move(e));
});
}
}
subscribe_success_callback_ = on_success;
failure_callback_ = on_failure;
int audio_track_count = 0, video_track_count = 0;
if (stream->has_audio_ && !subscribe_options.audio.disabled) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly;
AddTransceiver(cricket::MediaType::MEDIA_TYPE_AUDIO, transceiver_init);
audio_track_count = 1;
}
if (stream->has_video_) {
webrtc::RtpTransceiverInit transceiver_init;
transceiver_init.direction = webrtc::RtpTransceiverDirection::kRecvOnly;
AddTransceiver(cricket::MediaType::MEDIA_TYPE_VIDEO, transceiver_init);
video_track_count = 1;
}
sio::message::ptr sio_options = sio::object_message::create();
sio::message::ptr media_options = sio::object_message::create();
sio::message::ptr tracks_options = sio::array_message::create();
if (audio_track_count > 0) {
sio::message::ptr audio_options = sio::object_message::create();
audio_options->get_map()["type"] = sio::string_message::create("audio");
audio_options->get_map()["mid"] = sio::string_message::create("0");
audio_options->get_map()["from"] =
sio::string_message::create(stream->Id());
tracks_options->get_vector().push_back(audio_options);
}
if (video_track_count > 0) {
sio::message::ptr video_options = sio::object_message::create();
video_options->get_map()["type"] = sio::string_message::create("video");
if (audio_track_count == 0) {
video_options->get_map()["mid"] = sio::string_message::create("0");
} else {
video_options->get_map()["mid"] = sio::string_message::create("1");
}
auto publication_settings = stream->Settings();
if (subscribe_options.video.rid != "") {
for (auto video_setting : publication_settings.video) {
if (video_setting.rid == subscribe_options.video.rid) {
std::string track_id = video_setting.track_id;
video_options->get_map()["from"] =
sio::string_message::create(track_id);
break;
}
}
} else {
video_options->get_map()["from"] =
sio::string_message::create(stream->Id());
}
sio::message::ptr layer_spec = sio::object_message::create();
if (subscribe_options.video.spatialLayerId >= 0) {
sio::message::ptr spatial_layer_options =
sio::int_message::create(subscribe_options.video.spatialLayerId);
layer_spec->get_map()["spatialLayer"] = spatial_layer_options;
}
if (subscribe_options.video.temporalLayerId >= 0) {
sio::message::ptr temporal_layer_options =
sio::int_message::create(subscribe_options.video.temporalLayerId);
layer_spec->get_map()["temporallLayer"] = temporal_layer_options;
}
if (subscribe_options.video.rid != "") {
video_options->get_map()["simulcastRid"] =
sio::message::ptr rid_options =
sio::string_message::create(subscribe_options.video.rid);
layer_spec->get_map()["rid"] = rid_options;
}
video_options->get_map()["parameters"] = layer_spec;
tracks_options->get_vector().push_back(video_options);
}

Expand Down
7 changes: 6 additions & 1 deletion talk/owt/sdk/conference/conferencepeerconnectionchannel.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,17 @@ class ConferencePeerConnectionChannel
const std::string& session_id,
std::function<void()> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
// Subscribe a stream from the conference.
// Subscribe a non-simulcast/non-SVC stream from the conference.
void Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions& options,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
// Subscribe a simulcast or SVC stream from the conference.
void Subscribe(std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& options,
std::function<void(std::string)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
// Unsubscribe a remote stream from the conference.
void Unsubscribe(
const std::string& session_id,
Expand Down
1 change: 1 addition & 0 deletions talk/owt/sdk/include/cpp/owt/base/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct VideoPublicationSettings {
unsigned long keyframe_interval;
std::string rid;
std::string track_id;
std::string scalability_mode;
};

#ifdef OWT_ENABLE_QUIC
Expand Down
22 changes: 21 additions & 1 deletion talk/owt/sdk/include/cpp/owt/conference/conferenceclient.h
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,10 @@ class ConferenceClient final
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
/**
@brief Subscribe a stream from the current room.
@brief Subscribe a stream with transcoding from the current room.
@details Should only be called on stream that is published without simulcast
or SVC opotions. If |stream| is a simulcast stream or an SVC stream,
subscription will fail.
@param stream The remote stream to be subscribed.
@param options Options for subscribing the stream.
@param onSuccess Success callback with a stream that contains media stream.
Expand All @@ -325,6 +328,23 @@ class ConferenceClient final
const SubscribeOptions& options,
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
/**
@brief Subscribe a simulcast or SVC stream from the current room with preferred rid
and/or temporal/spatial layers.
@details rid and temporal/spatial layer ID can be specified together. If rid in
|options| is not empty and |stream| is not a simulcast stream, subscribe will fail;
If temporalLayerId/spatialLayerId is larger than -1 in |options| and |stream| is not
an SVC stream, subscribe will fail; If both rid and temporal/spatialLyerId are specified,
temporalLayerId/spatialLayerId only applies to simulcast stream associated with rid.
@param stream The remote stream to be subscribed.
@param options Simulcast and SVC stream subscribe options for subscribing the stream.
@param onSuccess Success callback with a stream that contains media stream.
*/
void Subscribe(
std::shared_ptr<RemoteStream> stream,
const SubscribeOptions2& options,
std::function<void(std::shared_ptr<ConferenceSubscription>)> on_success,
std::function<void(std::unique_ptr<Exception>)> on_failure);
/**
@brief Send messsage to all participants in the conference.
@param message The message to be sent.
Expand Down
33 changes: 31 additions & 2 deletions talk/owt/sdk/include/cpp/owt/conference/subscribeoptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@ struct VideoSubscriptionConstraints {
resolution(0, 0),
frameRate(0),
bitrateMultiplier(0),
keyFrameInterval(0),
rid("") {}
keyFrameInterval(0) {}
bool disabled;
std::vector<owt::base::VideoCodecParameters> codecs;
owt::base::Resolution resolution;
double frameRate;
double bitrateMultiplier;
unsigned long keyFrameInterval;
};
/// Simulcast and SVC stream subscription constranits.
struct VideoSubscriptionConstraints2 {
explicit VideoSubscriptionConstraints2()
: rid(""), spatialLayerId(-1), temporalLayerId(-1) {}
std::string rid;
int spatialLayerId;
int temporalLayerId;
};

#ifdef OWT_ENABLE_QUIC
Expand All @@ -57,6 +63,12 @@ struct SubscribeOptions {
DataSubscriptionConstraints data;
#endif
};

/// SVC and Simulcast stream subscribe options
struct SubscribeOptions2 {
AudioSubscriptionConstraints audio;
VideoSubscriptionConstraints2 video;
};
/// Video subscription update constrains used by subscription's ApplyOptions
/// API.
struct VideoSubscriptionUpdateConstraints {
Expand All @@ -73,11 +85,28 @@ struct VideoSubscriptionUpdateConstraints {
double bitrateMultiplier;
unsigned long keyFrameInterval;
};
/// Simulcast and SVC stream subscription update constraints
struct VideoSubscriptionUpdateConstraints2 {
/**
@brief Construct VideoSubscriptionUpdateConstraints with default value.
*/
explicit VideoSubscriptionUpdateConstraints2()
: rid(""), spatialLayerId(-1), temporalLayerId(-1) {}
std::string rid;
int spatialLayerId;
int temporalLayerId;
};
/// Subscription update option used by subscription's ApplyOptions API.
struct SubscriptionUpdateOptions {
/// Options for updating a subscription.
VideoSubscriptionUpdateConstraints video;
};
/// Simulcast and SVC stream subcription update option used by subscription's
/// ApplyOptions API.
struct SubscriptionUpdateOptions2 {
/// Options for updating a subscription.
VideoSubscriptionUpdateConstraints2 video;
};
} // namespace conference
} // namespace owt
#endif // OWT_CONFERENCE_SUBSCRIBEOPTIONS_H_

0 comments on commit 4211a9f

Please sign in to comment.