Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: better reconnection logic & safety #204

Merged
merged 9 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions libwebrtc/src/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl DataChannel {
impl Debug for DataChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DataChannel")
.field("id", &self.id())
.field("label", &self.label())
.field("state", &self.state())
.finish()
Expand Down
1 change: 1 addition & 0 deletions livekit-api/src/signal_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ pub const PROTOCOL_VERSION: u32 = 8;
pub enum SignalError {
#[error("ws failure: {0}")]
WsError(#[from] WsError),

#[error("failed to parse the url {0}")]
UrlParse(#[from] url::ParseError),
#[error("client error: {0} - {1}")]
Expand Down
3 changes: 3 additions & 0 deletions livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ rustls-tls-native-roots = ["livekit-api/rustls-tls-native-roots"]
rustls-tls-webpki-roots = ["livekit-api/rustls-tls-webpki-roots"]
__rustls-tls = ["livekit-api/__rustls-tls"]

# internal features (used by livekit-ffi)
__lk-internal = []

[dependencies]
livekit-api = { path = "../livekit-api", version = "0.2.0", default-features = false, features = ["signal-client"] }
libwebrtc = { path = "../libwebrtc", version = "0.2.0" }
Expand Down
171 changes: 120 additions & 51 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ use crate::prelude::*;
use crate::rtc_engine::{EngineError, EngineOptions};
use crate::rtc_engine::{EngineEvent, EngineEvents, EngineResult, RtcEngine};
use libwebrtc::native::frame_cryptor::EncryptionState;
use libwebrtc::prelude::{ContinualGatheringPolicy, IceTransportsType, RtcConfiguration};
use libwebrtc::prelude::{
ContinualGatheringPolicy, IceTransportsType, MediaStream, MediaStreamTrack, RtcConfiguration,
};
use libwebrtc::rtp_transceiver::RtpTransceiver;
use livekit_api::signal_client::SignalOptions;
use livekit_protocol as proto;
use livekit_protocol::observer::Dispatcher;
use parking_lot::RwLock;
use proto::SignalTarget;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
Expand Down Expand Up @@ -123,7 +127,6 @@ pub enum RoomEvent {
Connected {
/// Initial participants & their tracks prior to joining the room
/// We're not returning this directly inside Room::connect because it is unlikely to be used
/// and will break the current API.
participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
},
Disconnected {
Expand Down Expand Up @@ -200,7 +203,7 @@ impl Room {
options: RoomOptions,
) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
let e2ee_manager = E2eeManager::new(options.e2ee.clone());
let (rtc_engine, engine_events) = RtcEngine::connect(
let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
url,
token,
EngineOptions {
Expand All @@ -214,7 +217,6 @@ impl Room {
.await?;
let rtc_engine = Arc::new(rtc_engine);

let join_response = rtc_engine.last_info().join_response;
if let Some(key_provider) = e2ee_manager.key_provider() {
key_provider.set_sif_trailer(join_response.sif_trailer);
}
Expand Down Expand Up @@ -496,44 +498,19 @@ impl RoomSession {
EngineEvent::MediaTrack {
track,
stream,
receiver: _,
transceiver,
} => {
let stream_id = stream.id();
let lk_stream_id = unpack_stream_id(&stream_id);
if lk_stream_id.is_none() {
Err(RoomError::Internal(format!(
"MediaTrack event with invalid track_id: {:?}",
&stream_id
)))?;
}

let (participant_sid, track_sid) = lk_stream_id.unwrap();
let participant_sid = participant_sid.to_owned().try_into().unwrap();
let track_sid = track_sid.to_owned().try_into().unwrap();
let remote_participant = self.get_participant_by_sid(&participant_sid);

if let Some(remote_participant) = remote_participant {
tokio::spawn(async move {
remote_participant
.add_subscribed_media_track(track_sid, track, transceiver)
.await;
});
} else {
// The server should send participant updates before sending a new offer
// So this should never happen.
Err(RoomError::Internal(format!(
"AddTrack event with invalid participant_sid: {:?}",
participant_sid
)))?;
}
}
} => self.handle_media_track(track, stream, transceiver),
EngineEvent::Resuming(tx) => self.handle_resuming(tx),
EngineEvent::Resumed(tx) => self.handle_resumed(tx),
EngineEvent::SignalResumed(tx) => self.handle_signal_resumed(tx),
EngineEvent::SignalResumed {
reconnect_response,
tx,
} => self.handle_signal_resumed(reconnect_response, tx),
EngineEvent::Restarting(tx) => self.handle_restarting(tx),
EngineEvent::Restarted(tx) => self.handle_restarted(tx),
EngineEvent::SignalRestarted(tx) => self.handle_signal_restarted(tx),
EngineEvent::SignalRestarted { join_response, tx } => {
self.handle_signal_restarted(join_response, tx)
}
EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
EngineEvent::Data {
payload,
Expand Down Expand Up @@ -635,6 +612,39 @@ impl RoomSession {
}
}

fn handle_media_track(
&self,
track: MediaStreamTrack,
stream: MediaStream,
transceiver: RtpTransceiver,
) {
let stream_id = stream.id();
let lk_stream_id = unpack_stream_id(&stream_id);
if lk_stream_id.is_none() {
log::error!("received track with an invalid track_id: {:?}", &stream_id);
return;
}

let (participant_sid, track_sid) = lk_stream_id.unwrap();
let participant_sid = participant_sid.to_owned().try_into().unwrap();
let track_sid = track_sid.to_owned().try_into().unwrap();
let remote_participant = self.get_participant_by_sid(&participant_sid);

if let Some(remote_participant) = remote_participant {
tokio::spawn(async move {
remote_participant
.add_subscribed_media_track(track_sid, track, transceiver)
.await;
});
} else {
// The server should send participant updates before sending a new offer, this should happen
log::error!(
"received track from an unknown participant: {:?}",
participant_sid
);
}
}

/// Active speakers changed
/// Update the participants & sort the active_speakers by audio_level
fn handle_speakers_changed(&self, speakers_info: Vec<proto::SpeakerInfo>) {
Expand Down Expand Up @@ -693,10 +703,15 @@ impl RoomSession {
}

async fn send_sync_state(self: &Arc<Self>) {
let last_info = self.rtc_engine.last_info();
let auto_subscribe = self.options.auto_subscribe;
let session = self.rtc_engine.session();

if last_info.subscriber_answer.is_none() {
if session
.subscriber()
.peer_connection()
.current_local_description()
.is_none()
{
log::warn!("skipping sendSyncState, no subscriber answer");
return;
}
Expand All @@ -710,8 +725,59 @@ impl RoomSession {
}
}

let answer = last_info.subscriber_answer.unwrap();
let offer = last_info.subscriber_offer.unwrap();
let answer = session
.subscriber()
.peer_connection()
.current_local_description()
.unwrap();

let offer = session
.subscriber()
.peer_connection()
.current_remote_description()
.unwrap();

let mut dcs = Vec::with_capacity(4);
if session.has_published() {
let lossy_dc = session
.data_channel(SignalTarget::Publisher, DataPacketKind::Lossy)
.unwrap();
let reliable_dc = session
.data_channel(SignalTarget::Publisher, DataPacketKind::Reliable)
.unwrap();

dcs.push(proto::DataChannelInfo {
label: lossy_dc.label(),
id: lossy_dc.id() as u32,
target: proto::SignalTarget::Publisher as i32,
});

dcs.push(proto::DataChannelInfo {
label: reliable_dc.label(),
id: reliable_dc.id() as u32,
target: proto::SignalTarget::Publisher as i32,
});
}

if let Some(lossy_dc) =
session.data_channel(SignalTarget::Subscriber, DataPacketKind::Lossy)
{
dcs.push(proto::DataChannelInfo {
label: lossy_dc.label(),
id: lossy_dc.id() as u32,
target: proto::SignalTarget::Subscriber as i32,
});
}

if let Some(reliable_dc) =
session.data_channel(SignalTarget::Subscriber, DataPacketKind::Reliable)
{
dcs.push(proto::DataChannelInfo {
label: reliable_dc.label(),
id: reliable_dc.id() as u32,
target: proto::SignalTarget::Subscriber as i32,
});
}

let sync_state = proto::SyncState {
answer: Some(proto::SessionDescription {
Expand All @@ -728,17 +794,13 @@ impl RoomSession {
participant_tracks: Vec::new(),
}),
publish_tracks: self.local_participant.published_tracks_info(),
data_channels: last_info.data_channels_info,
data_channels: dcs,
};

log::info!("sending sync state {:?}", sync_state);
if let Err(err) = self
.rtc_engine
self.rtc_engine
.send_request(proto::signal_request::Message::SyncState(sync_state))
.await
{
log::error!("failed to send sync state: {:?}", err);
}
.await;
}

fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
Expand All @@ -756,7 +818,11 @@ impl RoomSession {
let _ = tx.send(());
}

fn handle_signal_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
fn handle_signal_resumed(
self: &Arc<Self>,
_reconnect_repsonse: proto::ReconnectResponse,
tx: oneshot::Sender<()>,
) {
tokio::spawn({
let session = self.clone();
async move {
Expand Down Expand Up @@ -790,8 +856,11 @@ impl RoomSession {
let _ = tx.send(());
}

fn handle_signal_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
let join_response = self.rtc_engine.last_info().join_response;
fn handle_signal_restarted(
self: &Arc<Self>,
join_response: proto::JoinResponse,
tx: oneshot::Sender<()>,
) {
self.local_participant
.update_info(join_response.participant.unwrap()); // The sid may have changed

Expand Down
Loading
Loading