diff --git a/libwebrtc/src/data_channel.rs b/libwebrtc/src/data_channel.rs
index 5f59c24c..0ff57cae 100644
--- a/libwebrtc/src/data_channel.rs
+++ b/libwebrtc/src/data_channel.rs
@@ -109,6 +109,7 @@ impl DataChannel {
 impl Debug for DataChannel {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("DataChannel")
+            .field("id", &self.id())
             .field("label", &self.label())
             .field("state", &self.state())
             .finish()
diff --git a/livekit-ffi/src/server/room.rs b/livekit-ffi/src/server/room.rs
index e4613afd..20effd37 100644
--- a/livekit-ffi/src/server/room.rs
+++ b/livekit-ffi/src/server/room.rs
@@ -19,8 +19,9 @@ use parking_lot::Mutex;
 use std::collections::HashSet;
 use std::slice;
 use std::sync::Arc;
-use tokio::sync::Mutex as AsyncMutex;
+use std::time::Duration;
 use tokio::sync::{broadcast, mpsc};
+use tokio::sync::{oneshot, Mutex as AsyncMutex};
 use tokio::task::JoinHandle;
 
 use super::FfiDataBuffer;
@@ -361,6 +362,14 @@ async fn data_task(
     }
 }
 
+// This struct tracks the state we're currently processing
+// (the room could have successfully reconnected while we're still processing the previous
+// event, but we haven't received the Reconnected event yet). The listening task always lags
+// behind the room tasks.
+struct ActualState {
+    reconnecting: bool,
+}
+
 /// Forward events to the ffi client
 async fn room_task(
     server: &'static FfiServer,
     inner: Arc<RoomInner>,
     mut events: mpsc::UnboundedReceiver<RoomEvent>,
     mut close_rx: broadcast::Receiver<()>,
 ) {
+    let present_state = Arc::new(Mutex::new(ActualState {
+        reconnecting: false,
+    }));
+
     loop {
         tokio::select! {
             Some(event) = events.recv() => {
-                forward_event(server, &inner, event).await;
+                let debug = format!("{:?}", event);
+                let inner = inner.clone();
+                let present_state = present_state.clone();
+                let (tx, rx) = oneshot::channel();
+                let task = tokio::spawn(async move {
+                    forward_event(server, &inner, event, present_state).await;
+                    let _ = tx.send(());
+                });
+
+                // Monitor sync/async blocking
+                tokio::select! {
+                    _ = rx => {},
+                    _ = tokio::time::sleep(Duration::from_secs(10)) => {
+                        log::error!("forward_event taking too much time: {}", debug);
+                    }
+                }
+
+                task.await.unwrap();
             },
             _ = close_rx.recv() => {
                 break;
             }
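The watchdog added to `room_task` above is a reusable tokio pattern: run the handler in its own task, race a oneshot completion signal against a timer, and still join the task afterwards so events stay ordered. A minimal sketch of that idea, assuming only tokio (names are illustrative, not SDK API):

```rust
use std::time::Duration;
use tokio::sync::oneshot;

async fn forward_one_event() {
    // stand-in for the real forward_event handler
    tokio::time::sleep(Duration::from_millis(100)).await;
}

#[tokio::main]
async fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let task = tokio::spawn(async move {
        forward_one_event().await;
        let _ = tx.send(());
    });

    // If the handler doesn't signal completion within 10s, report it,
    // but keep waiting for the task so event ordering is preserved.
    tokio::select! {
        _ = rx => {}
        _ = tokio::time::sleep(Duration::from_secs(10)) => {
            eprintln!("event handler is taking too long");
        }
    }

    task.await.unwrap();
}
```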
@@ -387,7 +417,12 @@
         .await;
 }
 
-async fn forward_event(server: &'static FfiServer, inner: &Arc<RoomInner>, event: RoomEvent) {
+async fn forward_event(
+    server: &'static FfiServer,
+    inner: &Arc<RoomInner>,
+    event: RoomEvent,
+    present_state: Arc<Mutex<ActualState>>,
+) {
     let send_event = |event: proto::room_event::Message| {
         server.send_event(proto::ffi_event::Message::RoomEvent(proto::RoomEvent {
             room_handle: inner.handle_id,
@@ -427,15 +462,20 @@ async fn forward_event(
             track: _,
             participant: _,
         } => {
-            // Make sure to send the event *after* the async callback of the PublishTrackRequest
-            // Wait for the PublishTrack callback to be sent (waiting time is really short, so it is fine to not spawn a new task)
             let sid = publication.sid();
-            loop {
-                if inner.pending_published_tracks.lock().remove(&sid) {
-                    break;
+            // If we're currently reconnecting, users can't publish tracks; if we receive this
+            // event, it means the RoomEngine is republishing tracks to finish the reconnection
+            // process. (So we're not waiting for any PublishTrack callback)
+            if !present_state.lock().reconnecting {
+                // Make sure to send the event *after* the async callback of the PublishTrackRequest
+                // Wait for the PublishTrack callback to be sent (waiting time is really short, so it is fine not to spawn a new task)
+                loop {
+                    if inner.pending_published_tracks.lock().remove(&sid) {
+                        break;
+                    }
+                    log::info!("waiting for the PublishTrack callback to be sent");
+                    tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
                 }
-                log::info!("waiting for the PublishTrack callback to be sent");
-                tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
             }
 
             let ffi_publication = FfiPublication {
@@ -648,12 +688,14 @@ async fn forward_event(
             .await;
         }
         RoomEvent::Reconnecting => {
+            present_state.lock().reconnecting = true;
             let _ = send_event(proto::room_event::Message::Reconnecting(
                 proto::Reconnecting {},
             ))
             .await;
         }
         RoomEvent::Reconnected => {
+            present_state.lock().reconnecting = false;
             let _ = send_event(proto::room_event::Message::Reconnected(
                 proto::Reconnected {},
             ))
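The TrackPublished handler combines two mechanisms: a shared `reconnecting` flag and a polling wait on the pending-publication set. A minimal sketch of that combination, with toy state standing in for the FFI server's fields (assumes tokio and parking_lot, as in the diff):

```rust
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use parking_lot::Mutex;

#[tokio::main]
async fn main() {
    let reconnecting = Arc::new(Mutex::new(false));
    let pending: Arc<Mutex<HashSet<String>>> =
        Arc::new(Mutex::new(HashSet::from(["TR_1234".to_owned()])));

    let sid = "TR_1234".to_owned();
    // Skip the wait entirely while a reconnection is republishing tracks
    if !*reconnecting.lock() {
        // Poll until the PublishTrack callback has removed the sid
        loop {
            if pending.lock().remove(&sid) {
                break;
            }
            tokio::time::sleep(Duration::from_millis(50)).await;
        }
    }
    println!("TrackPublished can now be forwarded");
}
```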
diff --git a/livekit/Cargo.toml b/livekit/Cargo.toml
index 34016461..12cee5a1 100644
--- a/livekit/Cargo.toml
+++ b/livekit/Cargo.toml
@@ -18,6 +18,9 @@ rustls-tls-native-roots = ["livekit-api/rustls-tls-native-roots"]
 rustls-tls-webpki-roots = ["livekit-api/rustls-tls-webpki-roots"]
 __rustls-tls = ["livekit-api/__rustls-tls"]
 
+# internal features (used by livekit-ffi)
+__lk-internal = []
+
 [dependencies]
 livekit-api = { path = "../livekit-api", version = "0.2.0", default-features = false, features = ["signal-client"] }
 libwebrtc = { path = "../libwebrtc", version = "0.2.0" }
diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs
index 03609c26..09007e2e 100644
--- a/livekit/src/room/mod.rs
+++ b/livekit/src/room/mod.rs
@@ -19,11 +19,15 @@ use crate::prelude::*;
 use crate::rtc_engine::{EngineError, EngineOptions};
 use crate::rtc_engine::{EngineEvent, EngineEvents, EngineResult, RtcEngine};
 use libwebrtc::native::frame_cryptor::EncryptionState;
-use libwebrtc::prelude::{ContinualGatheringPolicy, IceTransportsType, RtcConfiguration};
+use libwebrtc::prelude::{
+    ContinualGatheringPolicy, IceTransportsType, MediaStream, MediaStreamTrack, RtcConfiguration,
+};
+use libwebrtc::rtp_transceiver::RtpTransceiver;
 use livekit_api::signal_client::SignalOptions;
 use livekit_protocol as proto;
 use livekit_protocol::observer::Dispatcher;
 use parking_lot::RwLock;
+use proto::SignalTarget;
 use std::collections::HashMap;
 use std::fmt::Debug;
 use std::sync::Arc;
@@ -123,7 +127,6 @@ pub enum RoomEvent {
     Connected {
         /// Initial participants & their tracks prior to joining the room
        /// We're not returning this directly inside Room::connect because it is unlikely to be used
-        /// and will break the current API.
         participants_with_tracks: Vec<(RemoteParticipant, Vec<RemoteTrackPublication>)>,
     },
     Disconnected {
@@ -200,7 +203,7 @@ impl Room {
         options: RoomOptions,
     ) -> RoomResult<(Self, mpsc::UnboundedReceiver<RoomEvent>)> {
         let e2ee_manager = E2eeManager::new(options.e2ee.clone());
-        let (rtc_engine, engine_events) = RtcEngine::connect(
+        let (rtc_engine, join_response, engine_events) = RtcEngine::connect(
             url,
             token,
             EngineOptions {
@@ -214,7 +217,6 @@ impl Room {
         .await?;
         let rtc_engine = Arc::new(rtc_engine);
 
-        let join_response = rtc_engine.last_info().join_response;
         if let Some(key_provider) = e2ee_manager.key_provider() {
             key_provider.set_sif_trailer(join_response.sif_trailer);
         }
@@ -496,44 +498,19 @@ impl RoomSession {
             EngineEvent::MediaTrack {
                 track,
                 stream,
-                receiver: _,
                 transceiver,
-            } => {
-                let stream_id = stream.id();
-                let lk_stream_id = unpack_stream_id(&stream_id);
-                if lk_stream_id.is_none() {
-                    Err(RoomError::Internal(format!(
-                        "MediaTrack event with invalid track_id: {:?}",
-                        &stream_id
-                    )))?;
-                }
-
-                let (participant_sid, track_sid) = lk_stream_id.unwrap();
-                let participant_sid = participant_sid.to_owned().try_into().unwrap();
-                let track_sid = track_sid.to_owned().try_into().unwrap();
-                let remote_participant = self.get_participant_by_sid(&participant_sid);
-
-                if let Some(remote_participant) = remote_participant {
-                    tokio::spawn(async move {
-                        remote_participant
-                            .add_subscribed_media_track(track_sid, track, transceiver)
-                            .await;
-                    });
-                } else {
-                    // The server should send participant updates before sending a new offer
-                    // So this should never happen.
-                    Err(RoomError::Internal(format!(
-                        "AddTrack event with invalid participant_sid: {:?}",
-                        participant_sid
-                    )))?;
-                }
-            }
+            } => self.handle_media_track(track, stream, transceiver),
             EngineEvent::Resuming(tx) => self.handle_resuming(tx),
             EngineEvent::Resumed(tx) => self.handle_resumed(tx),
-            EngineEvent::SignalResumed(tx) => self.handle_signal_resumed(tx),
+            EngineEvent::SignalResumed {
+                reconnect_response,
+                tx,
+            } => self.handle_signal_resumed(reconnect_response, tx),
             EngineEvent::Restarting(tx) => self.handle_restarting(tx),
             EngineEvent::Restarted(tx) => self.handle_restarted(tx),
-            EngineEvent::SignalRestarted(tx) => self.handle_signal_restarted(tx),
+            EngineEvent::SignalRestarted { join_response, tx } => {
+                self.handle_signal_restarted(join_response, tx)
+            }
             EngineEvent::Disconnected { reason } => self.handle_disconnected(reason),
             EngineEvent::Data {
                 payload,
@@ -635,6 +612,39 @@ impl RoomSession {
         }
     }
 
+    fn handle_media_track(
+        &self,
+        track: MediaStreamTrack,
+        stream: MediaStream,
+        transceiver: RtpTransceiver,
+    ) {
+        let stream_id = stream.id();
+        let lk_stream_id = unpack_stream_id(&stream_id);
+        if lk_stream_id.is_none() {
+            log::error!("received track with an invalid track_id: {:?}", &stream_id);
+            return;
+        }
+
+        let (participant_sid, track_sid) = lk_stream_id.unwrap();
+        let participant_sid = participant_sid.to_owned().try_into().unwrap();
+        let track_sid = track_sid.to_owned().try_into().unwrap();
+        let remote_participant = self.get_participant_by_sid(&participant_sid);
+
+        if let Some(remote_participant) = remote_participant {
+            tokio::spawn(async move {
+                remote_participant
+                    .add_subscribed_media_track(track_sid, track, transceiver)
+                    .await;
+            });
+        } else {
+            // The server should send participant updates before sending a new offer,
+            // so this should never happen
+            log::error!(
+                "received track from an unknown participant: {:?}",
+                participant_sid
+            );
+        }
+    }
+
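`handle_media_track` relies on `unpack_stream_id`. A hedged sketch of what that helper is assumed to do — LiveKit composes the MediaStream id as `participant_sid|track_sid` — illustrative, not the SDK's exact implementation:

```rust
// Hypothetical helper mirroring unpack_stream_id: split on the first '|'.
fn unpack_stream_id(stream_id: &str) -> Option<(&str, &str)> {
    let mut split = stream_id.splitn(2, '|');
    Some((split.next()?, split.next()?))
}

fn main() {
    assert_eq!(
        unpack_stream_id("PA_abcd|TR_1234"),
        Some(("PA_abcd", "TR_1234"))
    );
    // Ids without the separator are rejected, which triggers the log::error! path above
    assert_eq!(unpack_stream_id("not-a-livekit-stream"), None);
}
```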
     /// Active speakers changed
     /// Update the participants & sort the active_speakers by audio_level
     fn handle_speakers_changed(&self, speakers_info: Vec<proto::SpeakerInfo>) {
@@ -693,10 +703,15 @@ impl RoomSession {
     }
 
     async fn send_sync_state(self: &Arc<Self>) {
-        let last_info = self.rtc_engine.last_info();
         let auto_subscribe = self.options.auto_subscribe;
+        let session = self.rtc_engine.session();
 
-        if last_info.subscriber_answer.is_none() {
+        if session
+            .subscriber()
+            .peer_connection()
+            .current_local_description()
+            .is_none()
+        {
             log::warn!("skipping sendSyncState, no subscriber answer");
             return;
         }
@@ -710,8 +725,59 @@ impl RoomSession {
             }
         }
 
-        let answer = last_info.subscriber_answer.unwrap();
-        let offer = last_info.subscriber_offer.unwrap();
+        let answer = session
+            .subscriber()
+            .peer_connection()
+            .current_local_description()
+            .unwrap();
+
+        let offer = session
+            .subscriber()
+            .peer_connection()
+            .current_remote_description()
+            .unwrap();
+
+        let mut dcs = Vec::with_capacity(4);
+        if session.has_published() {
+            let lossy_dc = session
+                .data_channel(SignalTarget::Publisher, DataPacketKind::Lossy)
+                .unwrap();
+            let reliable_dc = session
+                .data_channel(SignalTarget::Publisher, DataPacketKind::Reliable)
+                .unwrap();
+
+            dcs.push(proto::DataChannelInfo {
+                label: lossy_dc.label(),
+                id: lossy_dc.id() as u32,
+                target: proto::SignalTarget::Publisher as i32,
+            });
+
+            dcs.push(proto::DataChannelInfo {
+                label: reliable_dc.label(),
+                id: reliable_dc.id() as u32,
+                target: proto::SignalTarget::Publisher as i32,
+            });
+        }
+
+        if let Some(lossy_dc) =
+            session.data_channel(SignalTarget::Subscriber, DataPacketKind::Lossy)
+        {
+            dcs.push(proto::DataChannelInfo {
+                label: lossy_dc.label(),
+                id: lossy_dc.id() as u32,
+                target: proto::SignalTarget::Subscriber as i32,
+            });
+        }
+
+        if let Some(reliable_dc) =
+            session.data_channel(SignalTarget::Subscriber, DataPacketKind::Reliable)
+        {
+            dcs.push(proto::DataChannelInfo {
+                label: reliable_dc.label(),
+                id: reliable_dc.id() as u32,
+                target: proto::SignalTarget::Subscriber as i32,
+            });
+        }
 
         let sync_state = proto::SyncState {
             answer: Some(proto::SessionDescription {
@@ -728,17 +794,13 @@ impl RoomSession {
                 participant_tracks: Vec::new(),
             }),
             publish_tracks: self.local_participant.published_tracks_info(),
-            data_channels: last_info.data_channels_info,
+            data_channels: dcs,
         };
 
         log::info!("sending sync state {:?}", sync_state);
-        if let Err(err) = self
-            .rtc_engine
+        self.rtc_engine
             .send_request(proto::signal_request::Message::SyncState(sync_state))
-            .await
-        {
-            log::error!("failed to send sync state: {:?}", err);
-        }
+            .await;
     }
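The `DataChannelInfo` collection above follows a simple rule: publisher channels always exist once something was published, subscriber channels are optional until the server opens them. A compact sketch of that rule with toy types (the real code uses `DataChannel` and `proto::DataChannelInfo`):

```rust
#[derive(Clone)]
struct Dc {
    label: String,
    id: u16,
}

struct DcInfo {
    label: String,
    id: u32,
    target: &'static str,
}

fn collect_dc_info(
    publisher: Option<(Dc, Dc)>, // (lossy, reliable), present once something was published
    sub_lossy: Option<Dc>,
    sub_reliable: Option<Dc>,
) -> Vec<DcInfo> {
    let mut dcs = Vec::with_capacity(4);
    if let Some((lossy, reliable)) = publisher {
        for dc in [lossy, reliable] {
            dcs.push(DcInfo { label: dc.label, id: dc.id as u32, target: "publisher" });
        }
    }
    // Subscriber channels are only reported if the server already opened them
    for dc in [sub_lossy, sub_reliable].into_iter().flatten() {
        dcs.push(DcInfo { label: dc.label, id: dc.id as u32, target: "subscriber" });
    }
    dcs
}

fn main() {
    let dcs = collect_dc_info(None, Some(Dc { label: "_lossy".into(), id: 1 }), None);
    assert_eq!(dcs.len(), 1);
}
```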
 
     fn handle_resuming(self: &Arc<Self>, tx: oneshot::Sender<()>) {
@@ -756,7 +818,11 @@ impl RoomSession {
         let _ = tx.send(());
     }
 
-    fn handle_signal_resumed(self: &Arc<Self>, tx: oneshot::Sender<()>) {
+    fn handle_signal_resumed(
+        self: &Arc<Self>,
+        _reconnect_response: proto::ReconnectResponse,
+        tx: oneshot::Sender<()>,
+    ) {
         tokio::spawn({
             let session = self.clone();
             async move {
@@ -784,42 +850,62 @@ impl RoomSession {
     }
 
     fn handle_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
-        self.update_connection_state(ConnectionState::Connected);
-        self.dispatcher.dispatch(&RoomEvent::Reconnected);
-        let _ = tx.send(());
-    }
-
-    fn handle_signal_restarted(self: &Arc<Self>, tx: oneshot::Sender<()>) {
-        let join_response = self.rtc_engine.last_info().join_response;
-        self.local_participant
-            .update_info(join_response.participant.unwrap()); // The sid may have changed
-
-        self.handle_participant_update(join_response.other_participants);
-
-        // unpublish & republish tracks
+        // Unpublish and republish every track
+        // At this point we know that the RtcSession was successfully restarted
         let published_tracks = self.local_participant.tracks();
 
-        // Should I create a new task?
+        // Spawning a new task because we need to wait for the RtcEngine to release the
+        // reconnection lock.
         tokio::spawn({
             let session = self.clone();
             async move {
-                for (_, publication) in published_tracks {
-                    let track = publication.track();
+                let mut set = tokio::task::JoinSet::new();
 
-                    let _ = session
-                        .local_participant
-                        .unpublish_track(&publication.sid())
-                        .await;
+                for (_, publication) in published_tracks {
+                    let track = publication.track().unwrap();
+
+                    let lp = session.local_participant.clone();
+                    let republish = async move {
+                        // Only really used to send the LocalTrackUnpublished event (we don't
+                        // need to remove the RtpSender, since we know we're using a new
+                        // RtcSession and therefore new PeerConnections)
+                        let _ = lp.unpublish_track(&publication.sid()).await;
+                        if let Err(err) = lp
+                            .publish_track(track.clone(), publication.publish_options())
+                            .await
+                        {
+                            log::error!(
+                                "failed to republish track {} after rtc_engine restarted: {}",
+                                track.name(),
+                                err
+                            )
+                        }
+                    };
 
-                    let _ = session
-                        .local_participant
-                        .publish_track(track.unwrap(), publication.publish_options())
-                        .await;
+                    set.spawn(republish);
                 }
+
+                // Wait for the tracks to be republished before dispatching the Reconnected event
+                while set.join_next().await.is_some() {}
+
+                session.update_connection_state(ConnectionState::Connected);
+                session.dispatcher.dispatch(&RoomEvent::Reconnected);
             }
         });
+    }
 
+    fn handle_signal_restarted(
+        self: &Arc<Self>,
+        join_response: proto::JoinResponse,
+        tx: oneshot::Sender<()>,
+    ) {
+        self.local_participant
+            .update_info(join_response.participant.unwrap()); // The sid may have changed
+
+        self.handle_participant_update(join_response.other_participants);
         let _ = tx.send(());
     }
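The republish step fans out over a `JoinSet` and only dispatches `Reconnected` once every track is done. A minimal sketch of that wait-for-all pattern, assuming plain tokio:

```rust
use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut set = JoinSet::new();
    for sid in ["TR_1", "TR_2", "TR_3"] {
        set.spawn(async move {
            // stand-in for unpublish_track + publish_track
            println!("republishing {sid}");
        });
    }

    // Block the Reconnected dispatch until every republish finished
    while set.join_next().await.is_some() {}
    println!("all tracks republished, room is Connected again");
}
```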
#[error("internal webrtc failure")] Rtc(#[from] RtcError), - #[error("failed to parse sdp")] - Parse(#[from] SdpParseError), - #[error("serde error")] - Serde(#[from] serde_json::Error), - #[error("failed to send data to the datachannel")] - Data(#[from] DataChannelError), #[error("connection error: {0}")] - Connection(String), - #[error("decode error")] - Decode(#[from] prost::DecodeError), + Connection(Cow<'static, str>), // Connectivity issues (Failed to connect/reconnect) #[error("internal error: {0}")] - Internal(String), // Unexpected error + Internal(Cow<'static, str>), // Unexpected error, generally we can't recover +} + +#[derive(Default, Debug, Clone)] +pub struct EngineOptions { + pub rtc_config: RtcConfiguration, + pub signal_options: SignalOptions, } #[derive(Debug)] @@ -83,7 +84,6 @@ pub enum EngineEvent { MediaTrack { track: MediaStreamTrack, stream: MediaStream, - receiver: RtpReceiver, transceiver: RtpTransceiver, }, Data { @@ -97,116 +97,75 @@ pub enum EngineEvent { ConnectionQuality { updates: Vec, }, - /// The following events are used to notify the room about the reconnection state /// Since the room needs to also sync state in a good timing with the server. /// We synchronize the state with a one-shot channel. Resuming(oneshot::Sender<()>), Resumed(oneshot::Sender<()>), - SignalResumed(oneshot::Sender<()>), + SignalResumed { + reconnect_response: proto::ReconnectResponse, + tx: oneshot::Sender<()>, + }, Restarting(oneshot::Sender<()>), Restarted(oneshot::Sender<()>), - SignalRestarted(oneshot::Sender<()>), - + SignalRestarted { + join_response: proto::JoinResponse, + tx: oneshot::Sender<()>, + }, Disconnected { reason: DisconnectReason, }, } -pub const RECONNECT_ATTEMPTS: u32 = 10; -pub const RECONNECT_INTERVAL: Duration = Duration::from_secs(5); - -/// Represents a running RTCSession with the ability to close the session +/// Represents a running RtcSession with the ability to close the session /// and the engine_task #[derive(Debug)] struct EngineHandle { - session: RtcSession, - engine_task: JoinHandle<()>, - close_sender: oneshot::Sender<()>, -} - -#[derive(Default, Debug, Clone)] -pub struct EngineOptions { - pub rtc_config: RtcConfiguration, - pub signal_options: SignalOptions, -} - -#[derive(Default, Debug, Clone)] -pub struct LastInfo { - // The join response is updated each time a full reconnect is done - pub join_response: proto::JoinResponse, - - // The last offer/answer exchanged during the last session - pub subscriber_offer: Option, - pub subscriber_answer: Option, - - pub data_channels_info: Vec, + session: Arc, + closed: bool, + reconnecting: bool, + + // If full_reconnect is true, the next attempt will not try to resume + // and will instead do a full reconnect + full_reconnect: bool, + engine_task: Option<(JoinHandle<()>, oneshot::Sender<()>)>, } struct EngineInner { // Keep a strong reference to LkRuntime to avoid creating a new RtcRuntime or PeerConnection factory accross multiple Rtc sessions - #[allow(dead_code)] lk_runtime: Arc, - engine_emitter: EngineEmitter, - + engine_tx: EngineEmitter, options: EngineOptions, - // Last/current session states (needed by the room) - last_info: Mutex, - running_handle: AsyncRwLock>, + close_notifier: Arc, + running_handle: RwLock, - // Reconnecting fields - closed: AtomicBool, // True if closed or the reconnection failed (Note that this is false when reconnecting or resuming) - reconnecting: AtomicBool, - full_reconnect: AtomicBool, // If true, the next reconnect attempt will skip resume and directly try a 
 
 struct EngineInner {
     // Keep a strong reference to LkRuntime to avoid creating a new RtcRuntime or PeerConnection factory across multiple Rtc sessions
-    #[allow(dead_code)]
     lk_runtime: Arc<LkRuntime>,
-    engine_emitter: EngineEmitter,
-
+    engine_tx: EngineEmitter,
     options: EngineOptions,
 
-    // Last/current session states (needed by the room)
-    last_info: Mutex<LastInfo>,
-    running_handle: AsyncRwLock<Option<EngineHandle>>,
+    close_notifier: Arc<Notify>,
+    running_handle: RwLock<EngineHandle>,
 
-    // Reconnecting fields
-    closed: AtomicBool, // True if closed or the reconnection failed (Note that this is false when reconnecting or resuming)
-    reconnecting: AtomicBool,
-    full_reconnect: AtomicBool, // If true, the next reconnect attempt will skip resume and directly try a full reconnect
-    reconnect_interval: AsyncMutex<Interval>,
-    reconnect_notifier: Arc<Notify>, // Called when the reconnection task finished, successful or not
+    // The lock is write guarded for the whole reconnection time.
+    // We can simply wait for reconnection by trying to acquire a read lock.
+    // (This also prevents a new reconnection from happening while a read guard is still held)
+    reconnecting_lock: AsyncRwLock<()>,
+    reconnecting_interval: AsyncMutex<Interval>,
 }
 
-impl Debug for EngineInner {
-    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
-        f.debug_struct("EngineInner")
-            .field("closed", &self.closed)
-            .field("reconnecting", &self.reconnecting)
-            .field("full_reconnect", &self.full_reconnect)
-            .finish()
-    }
-}
-
-#[derive(Debug)]
 pub struct RtcEngine {
     inner: Arc<EngineInner>,
 }
 
+impl Debug for RtcEngine {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RtcEngine").finish()
+    }
+}
+
 impl RtcEngine {
     pub async fn connect(
         url: &str,
         token: &str,
         options: EngineOptions,
-    ) -> EngineResult<(Self, EngineEvents)> {
-        let (engine_emitter, engine_events) = mpsc::channel(8);
-
-        let mut reconnect_interval = interval(RECONNECT_INTERVAL);
-        reconnect_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
-
-        let inner = Arc::new(EngineInner {
-            lk_runtime: LkRuntime::instance(),
-            running_handle: Default::default(),
-            engine_emitter,
-
-            options: options.clone(),
-
-            last_info: Default::default(),
-            closed: Default::default(),
-            reconnecting: Default::default(),
-            full_reconnect: Default::default(),
-
-            reconnect_interval: AsyncMutex::new(reconnect_interval),
-            reconnect_notifier: Arc::new(Notify::new()),
-        });
-
-        inner.connect(url, token, options).await?;
-        Ok((Self { inner }, engine_events))
+    ) -> EngineResult<(Self, proto::JoinResponse, EngineEvents)> {
+        let (inner, join_response, engine_events) =
+            EngineInner::connect(url, token, options).await?;
+        Ok((Self { inner }, join_response, engine_events))
     }
 
     pub async fn close(&self) {
@@ -218,32 +177,36 @@ impl RtcEngine {
         data: &proto::DataPacket,
         kind: DataPacketKind,
     ) -> EngineResult<()> {
-        // Make sure we are connected before trying to send data
-        self.inner.wait_reconnection().await?;
-        let handle = self.inner.running_handle.read().await;
-        let session = &handle.as_ref().unwrap().session;
+        let (session, _r_lock) = {
+            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
+            (handle.session.clone(), _r_lock)
+        };
+
         session.publish_data(data, kind).await
     }
 
     pub async fn simulate_scenario(&self, scenario: SimulateScenario) -> EngineResult<()> {
-        self.inner.wait_reconnection().await?;
-        let handle = self.inner.running_handle.read().await;
-        let session = &handle.as_ref().unwrap().session;
+        let (session, _r_lock) = {
+            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
+            (handle.session.clone(), _r_lock)
+        };
         session.simulate_scenario(scenario).await
     }
 
     pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult<proto::TrackInfo> {
-        self.inner.wait_reconnection().await?;
-        let handle = self.inner.running_handle.read().await;
-        let session = &handle.as_ref().unwrap().session;
+        let (session, _r_lock) = {
+            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
+            (handle.session.clone(), _r_lock)
+        };
         session.add_track(req).await
     }
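The `reconnecting_lock` comment above describes an RwLock used as a gate rather than as data protection. A sketch of the idea with tokio's `RwLock`: the reconnect task holds the write half for the whole attempt, so any caller that needs a usable session simply waits for a read guard.

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

#[tokio::main]
async fn main() {
    let reconnecting_lock = Arc::new(RwLock::new(()));

    // The reconnection task holds the write half for the whole attempt
    let task = {
        let lock = reconnecting_lock.clone();
        tokio::spawn(async move {
            let _w = lock.write().await;
            tokio::time::sleep(Duration::from_millis(100)).await; // "reconnecting"
        })
    };

    tokio::time::sleep(Duration::from_millis(10)).await; // let the writer acquire first
    // Anyone needing a usable session just waits for a read guard
    let _r = reconnecting_lock.read().await;
    println!("reconnection finished, session is safe to use");
    task.await.unwrap();
}
```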
 
     pub async fn remove_track(&self, sender: RtpSender) -> EngineResult<()> {
-        self.inner.wait_reconnection().await?;
-        let handle = self.inner.running_handle.read().await;
-        let session = &handle.as_ref().unwrap().session;
-        session.remove_track(sender).await
+        // We don't need to wait for the reconnection
+        let session = self.inner.running_handle.read().session.clone();
+        session.remove_track(sender).await // TODO(theomonnom): Ignore errors where this
+                                           // RtpSender is bound to the old session. (Can
+                                           // happen with bad timing, and it is safe to ignore)
     }
 
     pub async fn create_sender(
@@ -252,40 +215,76 @@ impl RtcEngine {
         options: TrackPublishOptions,
         encodings: Vec<RtpEncodingParameters>,
     ) -> EngineResult<RtpTransceiver> {
-        self.inner.wait_reconnection().await?;
-        let handle = self.inner.running_handle.read().await;
-        let session = &handle.as_ref().unwrap().session;
+        // When creating a new RtpSender, make sure we're always using the latest session
+        let (session, _r_lock) = {
+            let (handle, _r_lock) = self.inner.wait_reconnection().await?;
+            (handle.session.clone(), _r_lock)
+        };
+
         session.create_sender(track, options, encodings).await
     }
 
     pub fn publisher_negotiation_needed(&self) {
         let inner = self.inner.clone();
         tokio::spawn(async move {
-            if inner.wait_reconnection().await.is_ok() {
-                let handle = inner.running_handle.read().await;
-                let session = &handle.as_ref().unwrap().session;
-                session.publisher_negotiation_needed()
+            if let Ok((handle, _)) = inner.wait_reconnection().await {
                handle.session.publisher_negotiation_needed()
             }
         });
     }
 
-    pub async fn send_request(&self, msg: proto::signal_request::Message) -> EngineResult<()> {
-        let handle = self.inner.running_handle.read().await;
-
-        if let Some(handle) = handle.as_ref() {
-            handle.session.signal_client().send(msg).await;
-        } else {
-            // Should be OK to ignore (full reconnect)
-        }
-        Ok(())
+    pub async fn send_request(&self, msg: proto::signal_request::Message) {
+        // Getting the current session is OK to do without waiting for reconnection:
+        // the SignalClient will attempt to queue the message if the session is not connected.
+        // Also, on full_reconnect, every message is OK to ignore (since this is another RtcSession)
+        let session = self.inner.running_handle.read().session.clone();
+        session.signal_client().send(msg).await // Returns () and automatically queues the message
+                                                // on failure
     }
 
-    pub fn last_info(&self) -> LastInfo {
-        self.inner.last_info.lock().clone()
+    pub fn session(&self) -> Arc<RtcSession> {
+        self.inner.running_handle.read().session.clone()
     }
 }
 
 impl EngineInner {
+    async fn connect(
+        url: &str,
+        token: &str,
+        options: EngineOptions,
+    ) -> EngineResult<(Arc<Self>, proto::JoinResponse, EngineEvents)> {
+        let lk_runtime = LkRuntime::instance();
+
+        let (session, join_response, session_events) =
+            RtcSession::connect(url, token, options.clone()).await?;
+        let (engine_tx, engine_rx) = mpsc::unbounded_channel();
+
+        session.wait_pc_connection().await?;
+
+        let inner = Arc::new(Self {
+            lk_runtime,
+            engine_tx,
+            close_notifier: Arc::new(Notify::new()),
+            running_handle: RwLock::new(EngineHandle {
+                session: Arc::new(session),
+                closed: false,
+                reconnecting: false,
+                full_reconnect: false,
+                engine_task: None,
+            }),
+            options,
+            reconnecting_lock: AsyncRwLock::default(),
+            reconnecting_interval: AsyncMutex::new(interval(RECONNECT_INTERVAL)),
+        });
+
+        // Start initial tasks
+        let (close_tx, close_rx) = oneshot::channel();
+        let session_task = tokio::spawn(Self::engine_task(inner.clone(), session_events, close_rx));
+        inner.running_handle.write().engine_task = Some((session_task, close_tx));
+
+        Ok((inner, join_response, engine_rx))
+    }
+
     async fn engine_task(
         self: Arc<Self>,
         mut session_events: SessionEvents,
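The comment on `send_request` leans on "queue on failure" semantics: while the signal transport is down, messages are stashed instead of erroring, and flushed on reconnect. A hedged sketch of that behavior with a toy sender — the names here are illustrative, not the actual SignalClient internals:

```rust
use std::collections::VecDeque;

struct SignalSender {
    connected: bool,
    queue: VecDeque<String>,
}

impl SignalSender {
    // Never fails: messages are queued while disconnected and flushed later
    fn send(&mut self, msg: String) {
        if self.connected {
            println!("sent: {msg}");
        } else {
            self.queue.push_back(msg);
        }
    }

    fn flush_on_reconnect(&mut self) {
        self.connected = true;
        while let Some(msg) = self.queue.pop_front() {
            println!("flushed: {msg}");
        }
    }
}

fn main() {
    let mut sender = SignalSender { connected: false, queue: VecDeque::new() };
    sender.send("sync_state".into()); // queued, not an error
    sender.flush_on_reconnect();
}
```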
@@ -334,10 +333,10 @@ impl EngineInner {
             } => {
                 log::info!("received session close: {}, {:?}", source, reason);
                 if can_reconnect {
-                    self.try_reconnect(retry_now, full_reconnect);
+                    self.reconnection_needed(retry_now, full_reconnect);
                 } else {
                     // Spawning a new task because the close function waits for the engine_task to
-                    // finish. (Where this function is called from)
+                    // finish. (So it doesn't make sense to await it here)
                     tokio::spawn({
                         let inner = self.clone();
                         async move {
@@ -351,209 +350,166 @@ impl EngineInner {
                 payload,
                 kind,
             } => {
-                let _ = self
-                    .engine_emitter
-                    .send(EngineEvent::Data {
-                        participant_sid,
-                        payload,
-                        kind,
-                    })
-                    .await;
+                let _ = self.engine_tx.send(EngineEvent::Data {
+                    participant_sid,
+                    payload,
+                    kind,
+                });
             }
             SessionEvent::MediaTrack {
                 track,
                 stream,
-                receiver,
                 transceiver,
             } => {
-                let _ = self
-                    .engine_emitter
-                    .send(EngineEvent::MediaTrack {
-                        track,
-                        stream,
-                        receiver,
-                        transceiver,
-                    })
-                    .await;
+                let _ = self.engine_tx.send(EngineEvent::MediaTrack {
+                    track,
+                    stream,
+                    transceiver,
+                });
             }
             SessionEvent::ParticipantUpdate { updates } => {
                 let _ = self
-                    .engine_emitter
-                    .send(EngineEvent::ParticipantUpdate { updates })
-                    .await;
+                    .engine_tx
+                    .send(EngineEvent::ParticipantUpdate { updates });
             }
             SessionEvent::SpeakersChanged { speakers } => {
                 let _ = self
-                    .engine_emitter
-                    .send(EngineEvent::SpeakersChanged { speakers })
-                    .await;
+                    .engine_tx
+                    .send(EngineEvent::SpeakersChanged { speakers });
             }
             SessionEvent::ConnectionQuality { updates } => {
                 let _ = self
-                    .engine_emitter
-                    .send(EngineEvent::ConnectionQuality { updates })
-                    .await;
+                    .engine_tx
+                    .send(EngineEvent::ConnectionQuality { updates });
             }
         }
 
         Ok(())
     }
 
-    async fn connect(
-        self: &Arc<Self>,
-        url: &str,
-        token: &str,
-        options: EngineOptions,
-    ) -> EngineResult<()> {
-        let mut running_handle = self.running_handle.write().await;
-        if running_handle.is_some() {
-            panic!("engine is already connected");
-        }
-
-        let (session, session_events) = RtcSession::connect(url, token, options).await?;
-
-        let (close_sender, close_receiver) = oneshot::channel();
-        let engine_task = tokio::spawn(self.clone().engine_task(session_events, close_receiver));
+    /// Close the engine
+    /// The RtcSession is not removed, so we can still access e.g. the stats
+    async fn close(&self, reason: DisconnectReason) {
+        let (session, engine_task) = {
+            let mut running_handle = self.running_handle.write();
+            running_handle.closed = true;
 
-        let engine_handle = EngineHandle {
-            session,
-            engine_task,
-            close_sender,
+            let session = running_handle.session.clone();
+            let engine_task = running_handle.engine_task.take();
+            (session, engine_task)
         };
 
-        *running_handle = Some(engine_handle);
-
-        // Always update the join response after a new session is created (first session or full reconnect)
-        drop(running_handle);
-        self.update_last_info().await;
-
-        Ok(())
-    }
-
-    async fn update_last_info(&self) {
-        if let Some(handle) = self.running_handle.read().await.as_ref() {
-            let mut last_info = self.last_info.lock();
-            let subscriber_pc = handle.session.subscriber().peer_connection();
-
-            last_info.join_response = handle.session.signal_client().join_response();
-            last_info.subscriber_offer = subscriber_pc.current_remote_description();
-            last_info.subscriber_answer = subscriber_pc.current_local_description();
-            last_info.data_channels_info = handle.session.data_channels_info();
+        if let Some((engine_task, close_tx)) = engine_task {
+            session.close().await;
+            let _ = close_tx.send(());
+            let _ = engine_task.await;
         }
+
+        let _ = self.engine_tx.send(EngineEvent::Disconnected { reason });
     }
 
-    async fn terminate_session(&self) {
-        if let Some(handle) = self.running_handle.write().await.take() {
-            handle.session.close().await;
-            let _ = handle.close_sender.send(());
-            let _ = handle.engine_task.await;
-        }
-    }
-
-    async fn close(&self, reason: DisconnectReason) {
-        self.closed.store(true, Ordering::Release);
-        self.terminate_session().await;
-        let _ = self
-            .engine_emitter
-            .send(EngineEvent::Disconnected { reason })
-            .await;
-    }
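The new `close` follows a standard shutdown handshake: signal the task over a oneshot channel, then await its `JoinHandle` so it has fully terminated before `Disconnected` is emitted. A minimal sketch of that handshake:

```rust
use tokio::sync::oneshot;

#[tokio::main]
async fn main() {
    let (close_tx, close_rx) = oneshot::channel::<()>();
    let engine_task = tokio::spawn(async move {
        // run until asked to stop
        let _ = close_rx.await;
        println!("engine task terminated");
    });

    // close(): signal, then join, then emit Disconnected
    let _ = close_tx.send(());
    let _ = engine_task.await;
    println!("Disconnected event can now be sent");
}
```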
 
-    // Wait for the reconnection task to finish
-    // Return directly if no open RTCSession
-    async fn wait_reconnection(&self) -> EngineResult<()> {
-        if self.closed.load(Ordering::SeqCst) {
-            Err(EngineError::Connection("engine is closed".to_owned()))?
-        }
-
-        if self.reconnecting.load(Ordering::Acquire) {
-            // If currently reconnecting, wait for the reconnect task to finish
-            self.reconnect_notifier.notified().await;
-        }
-
-        // reconnect_task is finished here, so it is fine to try to read the RwLock here (should be a short lock)
-        // (the reconnection logic can lock the running_handle for a long time, e.g when resuming)
-
-        if self.running_handle.read().await.is_none() {
-            Err(EngineError::Connection("reconnection failed".to_owned()))?
+    /// When waiting for reconnection, this ensures we're always using the latest session.
+    async fn wait_reconnection(
+        &self,
+    ) -> EngineResult<(RwLockReadGuard<EngineHandle>, AsyncRwLockReadGuard<()>)> {
+        let r_lock = self.reconnecting_lock.read().await;
+        let running_handle = self.running_handle.read();
+
+        if running_handle.closed {
+            // Reconnection may have failed
+            // TODO(theomonnom): More precise error?
+            return Err(EngineError::Connection("engine is closed".into()));
         }
 
-        Ok(())
+        Ok((running_handle, r_lock))
     }
 
     /// Start the reconnect task if not already started
     /// Ask to retry directly if `retry_now` is true
     /// Ask for a full reconnect if `full_reconnect` is true
-    fn try_reconnect(self: &Arc<Self>, retry_now: bool, full_reconnect: bool) {
-        if self.closed.load(Ordering::Acquire) {
-            return;
-        }
-
-        self.full_reconnect.store(full_reconnect, Ordering::Release);
-        let inner = self.clone();
-        if retry_now {
-            tokio::spawn(async move {
-                inner.reconnect_interval.lock().await.reset();
-            });
-        }
-
-        if self
-            .reconnecting
-            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
-            .is_err()
-        {
+    fn reconnection_needed(self: &Arc<Self>, retry_now: bool, full_reconnect: bool) {
+        let mut running_handle = self.running_handle.write();
+        if running_handle.reconnecting {
+            // If we're already reconnecting, just update the interval to restart a new
+            // attempt ASAP
+            running_handle.full_reconnect = full_reconnect;
+
+            if retry_now {
+                let inner = self.clone();
+                tokio::spawn(async move {
+                    inner.reconnecting_interval.lock().await.reset();
+                });
+            }
+
             return;
         }
 
+        running_handle.reconnecting = true;
+        running_handle.full_reconnect = full_reconnect;
+
         tokio::spawn({
             let inner = self.clone();
             async move {
-                // Reconnection logic
-                inner.reconnect_interval.lock().await.reset();
-                inner
-                    .full_reconnect
-                    .store(full_reconnect, Ordering::Release);
-
-                let res = inner.reconnect_task().await; // Wait for the reconnection task to finish
-                inner.reconnecting.store(false, Ordering::Release);
-
-                if res.is_ok() {
-                    log::info!("RtcEngine successfully recovered")
-                } else {
-                    log::error!("failed to reconnect after {} attempts", RECONNECT_ATTEMPTS);
-                    inner.close(DisconnectReason::UnknownReason).await;
+                // Hold the reconnection lock for the whole reconnection time
+                let _r_lock = inner.reconnecting_lock.write().await;
+
+                // The close function can send a signal to cancel the reconnection
+                let close_notifier = inner.close_notifier.clone();
+                let close_receiver = close_notifier.notified();
+                tokio::pin!(close_receiver);
+
+                tokio::select! {
+                    _ = &mut close_receiver => {
+                        log::info!("reconnection cancelled");
+                        return;
+                    }
+                    res = inner.reconnect_task() => {
+                        if res.is_err() {
+                            log::error!("failed to reconnect");
+                            inner.close(DisconnectReason::UnknownReason).await;
+                        } else {
+                            log::info!("RtcEngine successfully recovered")
+                        }
+                    }
                 }
 
-                inner.reconnect_notifier.notify_waiters();
+                let mut running_handle = inner.running_handle.write();
+                running_handle.reconnecting = false;
+
+                // r_lock is now dropped
             }
         });
     }
 
     /// Run every time the PeerConnection or the SignalClient is closed
     /// We first try to resume the connection; if that fails, we start a full reconnect.
+    /// NOTE: The reconnect_task must be cancellation safe
     async fn reconnect_task(self: &Arc<Self>) -> EngineResult<()> {
        // Get the latest connection info from the signal_client (including the refreshed token because the initial join token may have expired)
         let (url, token) = {
-            let running_handle = self.running_handle.read().await;
-            let signal_client = running_handle.as_ref().unwrap().session.signal_client();
+            let running_handle = self.running_handle.read();
+            let signal_client = running_handle.session.signal_client();
             (
                 signal_client.url(),
                 signal_client.token(), // Refreshed token
             )
         };
 
-        // Update last info before trying to reconnect/resume
-        self.update_last_info().await;
-
         for i in 0..RECONNECT_ATTEMPTS {
-            if self.closed.load(Ordering::Acquire) {
-                // The user closed the RTCEngine, cancel the reconnection task
-                return Ok(());
+            let (is_closed, full_reconnect) = {
+                let running_handle = self.running_handle.read();
+                (running_handle.closed, running_handle.full_reconnect)
+            };
+
+            if is_closed {
+                return Err(EngineError::Connection(
+                    "attempt cancelled, engine is closed".into(),
+                ));
             }
 
-            if self.full_reconnect.load(Ordering::SeqCst) {
+            if full_reconnect {
                 if i == 0 {
                     let (tx, rx) = oneshot::channel();
-                    let _ = self.engine_emitter.send(EngineEvent::Restarting(tx)).await;
+                    let _ = self.engine_tx.send(EngineEvent::Restarting(tx));
                     let _ = rx.await;
                 }
 
@@ -565,14 +521,14 @@ impl EngineInner {
                     log::error!("restarting connection failed: {}", err);
                 } else {
                     let (tx, rx) = oneshot::channel();
-                    let _ = self.engine_emitter.send(EngineEvent::Restarted(tx)).await;
+                    let _ = self.engine_tx.send(EngineEvent::Restarted(tx));
                     let _ = rx.await;
                     return Ok(());
                 }
             } else {
                 if i == 0 {
                     let (tx, rx) = oneshot::channel();
-                    let _ = self.engine_emitter.send(EngineEvent::Resuming(tx)).await;
+                    let _ = self.engine_tx.send(EngineEvent::Resuming(tx));
                     let _ = rx.await;
                 }
 
@@ -580,57 +536,85 @@ impl EngineInner {
                 if let Err(err) = self.try_resume_connection().await {
                     log::error!("resuming connection failed: {}", err);
                     if let EngineError::Signal(_) = err {
-                        self.full_reconnect.store(true, Ordering::SeqCst);
+                        let mut running_handle = self.running_handle.write();
+                        running_handle.full_reconnect = true;
                     }
                 } else {
                     let (tx, rx) = oneshot::channel();
-                    let _ = self.engine_emitter.send(EngineEvent::Resumed(tx)).await;
+                    let _ = self.engine_tx.send(EngineEvent::Resumed(tx));
                     let _ = rx.await;
                     return Ok(());
                 }
             }
 
-            self.reconnect_interval.lock().await.tick().await;
+            self.reconnecting_interval.lock().await.tick().await;
         }
 
-        Err(EngineError::Connection("failed to reconnect".to_owned()))
+        Err(EngineError::Connection(
+            format!("failed to reconnect after {} attempts", RECONNECT_ATTEMPTS).into(),
+        ))
     }
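The cancellation-safety note exists because `reconnect_task` now runs inside a `tokio::select!` against a `Notify`: if the close branch wins, the in-flight reconnect future is dropped mid-await. A sketch of that cancellation pattern in isolation:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let close_notifier = Arc::new(Notify::new());

    let task = {
        let notifier = close_notifier.clone();
        tokio::spawn(async move {
            tokio::select! {
                _ = notifier.notified() => println!("reconnection cancelled"),
                // stand-in for reconnect_task(); it is dropped mid-await if the
                // close branch wins, hence it must be cancellation safe
                _ = tokio::time::sleep(Duration::from_secs(30)) => println!("reconnected"),
            }
        })
    };

    close_notifier.notify_one(); // e.g. close() was called; stores a permit
    task.await.unwrap();
}
```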
 
     /// Try to recover the connection by doing a full reconnect.
-    /// It recreates a new RtcSession
+    /// It recreates a new RtcSession (new peer connection, new signal client, new data channels, etc.)
     async fn try_restart_connection(
         self: &Arc<Self>,
         url: &str,
         token: &str,
         options: EngineOptions,
     ) -> EngineResult<()> {
-        self.terminate_session().await;
-        self.connect(url, token, options).await?;
+        // Close the current RtcSession and the current tasks
+        let (session, engine_task) = {
+            let mut running_handle = self.running_handle.write();
+            let session = running_handle.session.clone();
+            let engine_task = running_handle.engine_task.take();
+            (session, engine_task)
+        };
+
+        if let Some((engine_task, close_tx)) = engine_task {
+            session.close().await;
+            let _ = close_tx.send(());
+            let _ = engine_task.await;
+        }
+
+        let (new_session, join_response, session_events) =
+            RtcSession::connect(url, token, options).await?;
 
+        // On SignalRestarted, the room will try to unpublish the local tracks
+        // NOTE: Operations that use the rtc_session at this point will not use the new one
         let (tx, rx) = oneshot::channel();
         let _ = self
-            .engine_emitter
-            .send(EngineEvent::SignalRestarted(tx))
-            .await;
+            .engine_tx
+            .send(EngineEvent::SignalRestarted { join_response, tx });
         let _ = rx.await;
 
-        let handle = self.running_handle.read().await;
-        let session = &handle.as_ref().unwrap().session;
-        session.wait_pc_connection().await
+        new_session.wait_pc_connection().await?;
+
+        // Only replace the current session if the new one succeeds.
+        // This is important so we can still use the old session if the new one failed
+        // (for example, if we still want to get the stats of the old session).
+        // The drawback is that the new session can't be used inside the SignalRestarted event.
+        let mut handle = self.running_handle.write();
+        handle.session = Arc::new(new_session);
+
+        let (close_tx, close_rx) = oneshot::channel();
+        let task = tokio::spawn(self.clone().engine_task(session_events, close_rx));
+        handle.engine_task = Some((task, close_tx));
+
+        Ok(())
     }
 
     /// Try to restart the current session
     async fn try_resume_connection(&self) -> EngineResult<()> {
-        let handle = self.running_handle.read().await;
-        let session = &handle.as_ref().unwrap().session;
-
-        session.restart().await?;
+        let session = self.running_handle.read().session.clone();
+        let reconnect_response = session.restart().await?;
 
         let (tx, rx) = oneshot::channel();
-        let _ = self
-            .engine_emitter
-            .send(EngineEvent::SignalResumed(tx))
-            .await;
+        let _ = self.engine_tx.send(EngineEvent::SignalResumed {
+            reconnect_response,
+            tx,
+        });
 
         // With SignalResumed, the room will send a SyncState message to the server
         let _ = rx.await;
diff --git a/livekit/src/rtc_engine/rtc_events.rs b/livekit/src/rtc_engine/rtc_events.rs
index 1df2f5eb..4ed470a8 100644
--- a/livekit/src/rtc_engine/rtc_events.rs
+++ b/livekit/src/rtc_engine/rtc_events.rs
@@ -42,7 +42,6 @@ pub enum RtcEvent {
         target: proto::SignalTarget,
     },
     Track {
-        receiver: RtpReceiver,
         streams: Vec<MediaStream>,
         track: MediaStreamTrack,
         transceiver: RtpTransceiver,
@@ -101,7 +100,6 @@ fn on_data_channel(
 fn on_track(target: proto::SignalTarget, emitter: RtcEmitter) -> rtc::peer_connection::OnTrack {
     Box::new(move |event| {
         let _ = emitter.send(RtcEvent::Track {
-            receiver: event.receiver,
             streams: event.streams,
             track: event.track,
             transceiver: event.transceiver,
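The long comment in `try_restart_connection` describes a "swap only on success" strategy: build and verify the replacement session first, and only then replace the shared handle, so a failed restart leaves the old session inspectable. A sketch of that strategy with a toy session type (assumes parking_lot, as in the diff):

```rust
use std::sync::Arc;
use parking_lot::RwLock;

struct RtcSessionStub {
    id: u32,
}

// Swap only after the replacement is fully connected, so a failed restart
// leaves the previous (still inspectable) session in place.
fn install_new_session(current: &RwLock<Arc<RtcSessionStub>>, new_session: RtcSessionStub) {
    *current.write() = Arc::new(new_session);
}

fn main() {
    let current = RwLock::new(Arc::new(RtcSessionStub { id: 1 }));
    // ... RtcSession::connect + wait_pc_connection succeed before this point ...
    install_new_session(&current, RtcSessionStub { id: 2 });
    assert_eq!(current.read().id, 2);
}
```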
diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs
index ed3c7694..94c9c89e 100644
--- a/livekit/src/rtc_engine/rtc_session.rs
+++ b/livekit/src/rtc_engine/rtc_session.rs
@@ -28,6 +28,7 @@ use livekit_protocol as proto;
 use parking_lot::Mutex;
 use prost::Message;
 use proto::debouncer::{self, Debouncer};
+use proto::SignalTarget;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::convert::TryInto;
@@ -61,7 +62,6 @@ pub enum SessionEvent {
     MediaTrack {
         track: MediaStreamTrack,
         stream: MediaStream,
-        receiver: RtpReceiver,
         transceiver: RtpTransceiver,
     },
     SpeakersChanged {
@@ -105,32 +105,32 @@ struct SessionInner {
 
     // Keep a strong reference to the subscriber datachannels,
     // so we can receive data from other participants
-    subscriber_dc: Mutex<Vec<DataChannel>>,
+    sub_lossy_dc: Mutex<Option<DataChannel>>,
+    sub_reliable_dc: Mutex<Option<DataChannel>>,
 
     closed: AtomicBool,
     emitter: SessionEmitter,
 
     options: EngineOptions,
     negotiation_debouncer: Mutex<Option<Debouncer>>,
 }
 
-impl Debug for SessionInner {
-    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
-        f.debug_struct("SessionInner")
-            .field("has_published", &self.has_published)
-            .field("closed", &self.closed)
-            .finish()
-    }
-}
-
 /// This struct holds a WebRTC session
 /// The session changes at every reconnection
 ///
 /// RTCSession is also responsible for the signaling and the negotiation
-#[derive(Debug)]
 pub struct RtcSession {
     inner: Arc<SessionInner>,
+    handle: Mutex<Option<SessionHandle>>,
+}
+
+impl Debug for RtcSession {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("RtcSession").finish()
+    }
+}
+
+struct SessionHandle {
     close_tx: watch::Sender<bool>, // false = is_running
     signal_task: JoinHandle<()>,
     rtc_task: JoinHandle<()>,
 }
@@ -141,8 +141,8 @@ impl RtcSession {
         url: &str,
         token: &str,
         options: EngineOptions,
-    ) -> EngineResult<(Self, SessionEvents)> {
-        let (session_emitter, session_events) = mpsc::unbounded_channel();
+    ) -> EngineResult<(Self, proto::JoinResponse, SessionEvents)> {
+        let (emitter, session_events) = mpsc::unbounded_channel();
 
         let (signal_client, join_response, signal_events) =
             SignalClient::connect(url, token, options.signal_options.clone()).await?;
@@ -150,7 +150,7 @@ impl RtcSession {
         log::info!("received JoinResponse: {:?}", join_response);
 
         let (rtc_emitter, rtc_events) = mpsc::unbounded_channel();
-        let rtc_config = make_rtc_config_join(join_response, options.rtc_config.clone());
+        let rtc_config = make_rtc_config_join(join_response.clone(), options.rtc_config.clone());
 
         let lk_runtime = LkRuntime::instance();
         let mut publisher_pc = PeerTransport::new(
@@ -197,9 +197,10 @@ impl RtcSession {
             pending_tracks: Default::default(),
             lossy_dc,
             reliable_dc,
-            subscriber_dc: Default::default(),
+            sub_lossy_dc: Mutex::new(None),
+            sub_reliable_dc: Mutex::new(None),
             closed: Default::default(),
-            emitter: session_emitter,
+            emitter,
             options,
             negotiation_debouncer: Default::default(),
         });
@@ -208,17 +209,17 @@ impl RtcSession {
         let signal_task = tokio::spawn(inner.clone().signal_task(signal_events, close_rx.clone()));
         let rtc_task = tokio::spawn(inner.clone().rtc_session_task(rtc_events, close_rx));
 
-        inner.wait_pc_connection().await?;
+        let handle = Mutex::new(Some(SessionHandle {
+            close_tx,
+            signal_task,
+            rtc_task,
+        }));
 
-        Ok((
-            Self {
-                inner,
-                close_tx,
-                signal_task,
-                rtc_task,
-            },
-            session_events,
-        ))
+        Ok((Self { inner, handle }, join_response, session_events))
+    }
+
+    pub fn has_published(&self) -> bool {
+        self.inner.has_published.load(Ordering::Acquire)
     }
 
     pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult<proto::TrackInfo> {
@@ -243,11 +244,14 @@ impl RtcSession {
     }
 
     /// Close the PeerConnections and the SignalClient
-    pub async fn close(self) {
+    pub async fn close(&self) {
         // Close the tasks
-        let _ = self.close_tx.send(true);
-        let _ = self.rtc_task.await;
-        let _ = self.signal_task.await;
+        let handle = self.handle.lock().take();
+        if let Some(handle) = handle {
+            let _ = handle.close_tx.send(true);
+            let _ = handle.rtc_task.await;
+            let _ = handle.signal_task.await;
+        }
 
         // Close the PeerConnections after the task
         // So if a sensitive operation is running, we can wait for it
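The `close(self)` to `close(&self)` change works because the task handles now live in a `Mutex<Option<SessionHandle>>`: closing `take()`s them out through a shared reference, and a second close becomes a no-op. A minimal sketch of that ownership trick:

```rust
use parking_lot::Mutex;

struct SessionHandle {
    name: &'static str, // stand-in for close_tx + the task JoinHandles
}

struct RtcSessionStub {
    handle: Mutex<Option<SessionHandle>>,
}

impl RtcSessionStub {
    // &self instead of self: the tasks are taken out of the Mutex,
    // and a second close is a harmless no-op
    fn close(&self) {
        if let Some(handle) = self.handle.lock().take() {
            println!("closing {}", handle.name);
        }
    }
}

fn main() {
    let session = RtcSessionStub {
        handle: Mutex::new(Some(SessionHandle { name: "session tasks" })),
    };
    session.close();
    session.close(); // already closed
}
```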
 
     pub async fn publish_data(
         &self,
         data: &proto::DataPacket,
         kind: DataPacketKind,
     ) -> Result<(), EngineError> {
         self.inner.publish_data(data, kind).await
     }
 
-    pub async fn restart(&self) -> EngineResult<()> {
+    pub async fn restart(&self) -> EngineResult<proto::ReconnectResponse> {
         self.inner.restart().await
     }
 
@@ -278,7 +282,6 @@ impl RtcSession {
         self.inner.simulate_scenario(scenario).await
     }
 
-    #[allow(dead_code)]
     pub fn publisher(&self) -> &PeerTransport {
         &self.inner.publisher_pc
     }
@@ -291,13 +294,8 @@ impl RtcSession {
         &self.inner.signal_client
     }
 
-    #[allow(dead_code)]
-    pub fn data_channel(&self, kind: DataPacketKind) -> &DataChannel {
-        self.inner.data_channel(kind)
-    }
-
-    pub fn data_channels_info(&self) -> Vec<proto::DataChannelInfo> {
-        self.inner.data_channels_info()
+    pub fn data_channel(&self, target: SignalTarget, kind: DataPacketKind) -> Option<DataChannel> {
+        self.inner.data_channel(target, kind)
     }
 }
 
@@ -395,12 +393,13 @@ impl SessionInner {
             proto::signal_response::Message::Answer(answer) => {
                 log::debug!("received publisher answer: {:?}", answer);
                 let answer =
-                    SessionDescription::parse(&answer.sdp, answer.r#type.parse().unwrap())?;
+                    SessionDescription::parse(&answer.sdp, answer.r#type.parse().unwrap()).unwrap(); // Unwrap is ok, the server shouldn't give us an invalid sdp
                 self.publisher_pc.set_remote_description(answer).await?;
             }
             proto::signal_response::Message::Offer(offer) => {
                 log::debug!("received subscriber offer: {:?}", offer);
-                let offer = SessionDescription::parse(&offer.sdp, offer.r#type.parse().unwrap())?;
+                let offer =
+                    SessionDescription::parse(&offer.sdp, offer.r#type.parse().unwrap()).unwrap();
                 let answer = self
                     .subscriber_pc
                     .create_anwser(offer, AnswerOptions::default())
@@ -418,8 +417,10 @@ impl SessionInner {
             proto::signal_response::Message::Trickle(trickle) => {
                 let target = trickle.target();
                 let ice_candidate = {
-                    let json = serde_json::from_str::<IceCandidateJson>(&trickle.candidate_init)?;
-                    IceCandidate::parse(&json.sdp_mid, json.sdp_m_line_index, &json.candidate)?
+                    let json =
+                        serde_json::from_str::<IceCandidateJson>(&trickle.candidate_init).unwrap();
+                    IceCandidate::parse(&json.sdp_mid, json.sdp_m_line_index, &json.candidate)
+                        .unwrap()
                 };
 
                 log::debug!("remote ice_candidate {:?} {:?}", ice_candidate, target);
@@ -481,7 +482,8 @@ impl SessionInner {
                         sdp_mid: ice_candidate.sdp_mid(),
                         sdp_m_line_index: ice_candidate.sdp_mline_index(),
                         candidate: ice_candidate.candidate(),
-                    })?,
+                    })
+                    .unwrap(),
                     target: target as i32,
                 },
             ))
@@ -506,7 +508,13 @@ impl SessionInner {
                 target,
             } => {
                 log::debug!("received data channel: {:?} {:?}", data_channel, target);
-                self.subscriber_dc.lock().push(data_channel);
+                if target == SignalTarget::Subscriber {
+                    if data_channel.label() == LOSSY_DC_LABEL {
+                        self.sub_lossy_dc.lock().replace(data_channel);
+                    } else if data_channel.label() == RELIABLE_DC_LABEL {
+                        self.sub_reliable_dc.lock().replace(data_channel);
+                    }
+                }
             }
             RtcEvent::Offer { offer, target: _ } => {
                 // Send the publisher offer to the server
@@ -521,7 +529,6 @@ impl SessionInner {
                     .await;
             }
             RtcEvent::Track {
-                receiver,
                 mut streams,
                 track,
                 transceiver,
@@ -531,7 +538,6 @@ impl SessionInner {
                     let _ = self.emitter.send(SessionEvent::MediaTrack {
                         stream: streams.remove(0),
                         track,
-                        receiver,
                        transceiver,
                     });
                 } else {
@@ -541,11 +547,11 @@ impl SessionInner {
             RtcEvent::Data { data, binary } => {
                 if !binary {
                     Err(EngineError::Internal(
-                        "text messages aren't supported".to_string(),
+                        "text messages aren't supported".into(),
                     ))?;
                 }
 
-                let data = proto::DataPacket::decode(&*data)?;
+                let data = proto::DataPacket::decode(&*data).unwrap();
                 match data.value.as_ref().unwrap() {
                     proto::data_packet::Value::User(user) => {
                         let _ = self.emitter.send(SessionEvent::Data {
@@ -568,7 +574,7 @@ impl SessionInner {
         {
             let mut pendings_tracks = self.pending_tracks.lock();
             if pendings_tracks.contains_key(&req.cid) {
                Err(EngineError::Internal("track already published".into()))?;
             }
 
             pendings_tracks.insert(cid.clone(), tx);
@@ -583,12 +589,10 @@ impl SessionInner {
             Ok(info) = rx => Ok(info),
             _ = sleep(TRACK_PUBLISH_TIMEOUT) => {
                 self.pending_tracks.lock().remove(&cid);
-                Err(EngineError::Internal("track publication timed out, no response received from the server".to_string()))
+                Err(EngineError::Internal("track publication timed out, no response received from the server".into()))
             },
             else => {
-                Err(EngineError::Internal(
-                    "track publication cancelled".to_string(),
-                ))
+                Err(EngineError::Internal("track publication cancelled".into()))
             }
         }
     }
@@ -782,18 +786,21 @@ impl SessionInner {
         kind: DataPacketKind,
     ) -> Result<(), EngineError> {
         self.ensure_publisher_connected(kind).await?;
-        self.data_channel(kind)
+        self.data_channel(SignalTarget::Publisher, kind)
+            .unwrap()
             .send(&data.encode_to_vec(), true)
-            .map_err(Into::into)
+            .map_err(|err| {
+                EngineError::Internal(format!("failed to send data packet {:?}", err).into())
+            })
     }
 
     /// This reconnection is more seamless compared to the full reconnection implemented in ['RTCEngine']
-    async fn restart(&self) -> EngineResult<()> {
+    async fn restart(&self) -> EngineResult<proto::ReconnectResponse> {
         let reconnect_response = self.signal_client.restart().await?;
         log::info!("received reconnect response: {:?}", reconnect_response);
 
         let rtc_config =
-            make_rtc_config_reconnect(reconnect_response, self.options.rtc_config.clone());
+            make_rtc_config_reconnect(reconnect_response.clone(), self.options.rtc_config.clone());
+
         self.publisher_pc
             .peer_connection()
             .set_configuration(rtc_config.clone())?;
         self.subscriber_pc
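The subscriber data channels are now routed into dedicated slots by their label rather than pushed into a Vec. A sketch of that routing with toy types (the `_lossy`/`_reliable` label values are assumptions matching the constants' apparent roles, not verified SDK values):

```rust
use parking_lot::Mutex;

const LOSSY_DC_LABEL: &str = "_lossy";
const RELIABLE_DC_LABEL: &str = "_reliable";

#[derive(Clone, Debug)]
struct Dc {
    label: &'static str,
}

struct Channels {
    sub_lossy_dc: Mutex<Option<Dc>>,
    sub_reliable_dc: Mutex<Option<Dc>>,
}

impl Channels {
    // Route an announced subscriber channel into the right slot by label
    fn on_data_channel(&self, dc: Dc) {
        match dc.label {
            LOSSY_DC_LABEL => {
                self.sub_lossy_dc.lock().replace(dc);
            }
            RELIABLE_DC_LABEL => {
                self.sub_reliable_dc.lock().replace(dc);
            }
            _ => {} // unknown labels are ignored
        }
    }
}

fn main() {
    let channels = Channels {
        sub_lossy_dc: Mutex::new(None),
        sub_reliable_dc: Mutex::new(None),
    };
    channels.on_data_channel(Dc { label: "_reliable" });
    assert!(channels.sub_reliable_dc.lock().is_some());
}
```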
@@ -801,7 +808,7 @@ impl SessionInner {
             .peer_connection()
             .set_configuration(rtc_config)?;
 
-        Ok(())
+        Ok(reconnect_response)
     }
 
     async fn restart_publisher(&self) -> EngineResult<()> {
@@ -823,7 +830,7 @@ impl SessionInner {
             || (self.has_published.load(Ordering::Acquire) && !self.publisher_pc.is_connected())
         {
             if self.closed.load(Ordering::Acquire) {
-                return Err(EngineError::Connection("closed".to_string()));
+                return Err(EngineError::Connection("closed".into()));
             }
 
             tokio::time::sleep(Duration::from_millis(50)).await;
@@ -835,7 +842,7 @@ impl SessionInner {
         tokio::select! {
             res = wait_connected => res,
             _ = sleep(ICE_CONNECT_TIMEOUT) => {
                let err = EngineError::Connection("wait_pc_connection timed out".into());
                 Err(err)
             }
         }
@@ -879,7 +886,7 @@ impl SessionInner {
             self.publisher_negotiation_needed();
         }
 
-        let dc = self.data_channel(kind);
+        let dc = self.data_channel(SignalTarget::Publisher, kind).unwrap();
         if dc.state() == DataState::Open {
             return Ok(());
         }
@@ -888,7 +895,7 @@ impl SessionInner {
         let wait_connected = async {
             while !self.publisher_pc.is_connected() || dc.state() != DataState::Open {
                 if self.closed.load(Ordering::Acquire) {
-                    return Err(EngineError::Connection("closed".to_string()));
+                    return Err(EngineError::Connection("closed".into()));
                 }
 
                 tokio::time::sleep(Duration::from_millis(50)).await;
@@ -900,49 +907,30 @@ impl SessionInner {
         tokio::select! {
             res = wait_connected => res,
             _ = sleep(ICE_CONNECT_TIMEOUT) => {
-                let err = EngineError::Connection("could not establish publisher connection: timeout".to_string());
+                let err = EngineError::Connection("could not establish publisher connection: timeout".into());
                 log::error!("{}", err);
                 Err(err)
             }
         }
     }
 
-    fn data_channel(&self, kind: DataPacketKind) -> &DataChannel {
-        if kind == DataPacketKind::Reliable {
-            &self.reliable_dc
+    fn data_channel(&self, target: SignalTarget, kind: DataPacketKind) -> Option<DataChannel> {
+        if target == SignalTarget::Publisher {
+            if kind == DataPacketKind::Reliable {
+                Some(self.reliable_dc.clone())
+            } else {
+                Some(self.lossy_dc.clone())
+            }
+        } else if target == SignalTarget::Subscriber {
+            if kind == DataPacketKind::Reliable {
+                self.sub_reliable_dc.lock().clone()
+            } else {
+                self.sub_lossy_dc.lock().clone()
+            }
         } else {
-            &self.lossy_dc
+            unreachable!()
         }
     }
-
-    /// Used to send client states to the server on migration
-    fn data_channels_info(&self) -> Vec<proto::DataChannelInfo> {
-        let mut vec = Vec::with_capacity(4);
-
-        if self.has_published.load(Ordering::Acquire) {
-            vec.push(proto::DataChannelInfo {
-                label: self.lossy_dc.label(),
-                id: self.lossy_dc.id() as u32,
-                target: proto::SignalTarget::Publisher as i32,
-            });
-
-            vec.push(proto::DataChannelInfo {
-                label: self.reliable_dc.label(),
-                id: self.reliable_dc.id() as u32,
-                target: proto::SignalTarget::Publisher as i32,
-            });
-        }
-
-        for dc in self.subscriber_dc.lock().iter() {
-            vec.push(proto::DataChannelInfo {
-                label: dc.label(),
-                id: dc.id() as u32,
-                target: proto::SignalTarget::Subscriber as i32,
-            });
-        }
-
-        vec
-    }
 }
 
 macro_rules! make_rtc_config {