Skip to content

Commit

Permalink
fix: better reconnection logic & safety (#204)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Sep 29, 2023
1 parent 0627da4 commit 05a2d83
Show file tree
Hide file tree
Showing 7 changed files with 577 additions and 475 deletions.
1 change: 1 addition & 0 deletions libwebrtc/src/data_channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ impl DataChannel {
impl Debug for DataChannel {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("DataChannel")
.field("id", &self.id())
.field("label", &self.label())
.field("state", &self.state())
.finish()
Expand Down
62 changes: 52 additions & 10 deletions livekit-ffi/src/server/room.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ use parking_lot::Mutex;
use std::collections::HashSet;
use std::slice;
use std::sync::Arc;
use tokio::sync::Mutex as AsyncMutex;
use std::time::Duration;
use tokio::sync::{broadcast, mpsc};
use tokio::sync::{oneshot, Mutex as AsyncMutex};
use tokio::task::JoinHandle;

use super::FfiDataBuffer;
Expand Down Expand Up @@ -361,17 +362,46 @@ async fn data_task(
}
}

// The utility of this struct is to know the state we're currently processing
// (The room could have successfully reconnected while we're still processing the previous event,
// but we still didn't receive the reconnected event). The listening task is always late from
// the room tasks
struct ActualState {
reconnecting: bool,
}

/// Forward events to the ffi client
async fn room_task(
server: &'static FfiServer,
inner: Arc<RoomInner>,
mut events: mpsc::UnboundedReceiver<livekit::RoomEvent>,
mut close_rx: broadcast::Receiver<()>,
) {
let present_state = Arc::new(Mutex::new(ActualState {
reconnecting: false,
}));

loop {
tokio::select! {
Some(event) = events.recv() => {
forward_event(server, &inner, event).await;
let debug = format!("{:?}", event);
let inner = inner.clone();
let present_state = present_state.clone();
let (tx, rx) = oneshot::channel();
let task = tokio::spawn(async move {
forward_event(server, &inner, event, present_state).await;
let _ = tx.send(());
});

// Monitor sync/async blockings
tokio::select! {
_ = rx => {},
_ = tokio::time::sleep(Duration::from_secs(10)) => {
log::error!("signal_event taking too much time: {}", debug);
}
}

task.await.unwrap();
},
_ = close_rx.recv() => {
break;
Expand All @@ -387,7 +417,12 @@ async fn room_task(
.await;
}

async fn forward_event(server: &'static FfiServer, inner: &Arc<RoomInner>, event: RoomEvent) {
async fn forward_event(
server: &'static FfiServer,
inner: &Arc<RoomInner>,
event: RoomEvent,
present_state: Arc<Mutex<ActualState>>,
) {
let send_event = |event: proto::room_event::Message| {
server.send_event(proto::ffi_event::Message::RoomEvent(proto::RoomEvent {
room_handle: inner.handle_id,
Expand Down Expand Up @@ -427,15 +462,20 @@ async fn forward_event(server: &'static FfiServer, inner: &Arc<RoomInner>, event
track: _,
participant: _,
} => {
// Make sure to send the event *after* the async callback of the PublishTrackRequest
// Wait for the PublishTrack callback to be sent (waiting time is really short, so it is fine to not spawn a new task)
let sid = publication.sid();
loop {
if inner.pending_published_tracks.lock().remove(&sid) {
break;
// If we're currently reconnecting, users can't publish tracks, if we receive this
// event it means the RoomEngine is republishing tracks to finish the reconnection
// process. (So we're not waiting for any PublishCallback)
if !present_state.lock().reconnecting {
// Make sure to send the event *after* the async callback of the PublishTrackRequest
// Wait for the PublishTrack callback to be sent (waiting time is really short, so it is fine to not spawn a new task)
loop {
if inner.pending_published_tracks.lock().remove(&sid) {
break;
}
log::info!("waiting for the PublishTrack callback to be sent");
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
log::info!("waiting for the PublishTrack callback to be sent");
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}

let ffi_publication = FfiPublication {
Expand Down Expand Up @@ -648,12 +688,14 @@ async fn forward_event(server: &'static FfiServer, inner: &Arc<RoomInner>, event
.await;
}
RoomEvent::Reconnecting => {
present_state.lock().reconnecting = true;
let _ = send_event(proto::room_event::Message::Reconnecting(
proto::Reconnecting {},
))
.await;
}
RoomEvent::Reconnected => {
present_state.lock().reconnecting = false;
let _ = send_event(proto::room_event::Message::Reconnected(
proto::Reconnected {},
))
Expand Down
3 changes: 3 additions & 0 deletions livekit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ rustls-tls-native-roots = ["livekit-api/rustls-tls-native-roots"]
rustls-tls-webpki-roots = ["livekit-api/rustls-tls-webpki-roots"]
__rustls-tls = ["livekit-api/__rustls-tls"]

# internal features (used by livekit-ffi)
__lk-internal = []

[dependencies]
livekit-api = { path = "../livekit-api", version = "0.2.0", default-features = false, features = ["signal-client"] }
libwebrtc = { path = "../libwebrtc", version = "0.2.0" }
Expand Down
Loading

0 comments on commit 05a2d83

Please sign in to comment.