From 087e5f2292b2c82249f579e1286a1c985654222d Mon Sep 17 00:00:00 2001 From: Kyle Simpson Date: Wed, 24 Jan 2024 09:30:50 +0000 Subject: [PATCH] Driver: Fix transition to Live on connect (#222) This PR fixes a case where a call which changes channel or gracefully reconnects would have been stuck in the Idle state. SetConn events will now allow a transition straight back to Live if any tracks are found attached to a mixer. --- src/driver/scheduler/idle.rs | 14 ++++++++++---- src/driver/scheduler/task.rs | 5 +++-- src/driver/tasks/message/mixer.rs | 7 +++++-- src/driver/tasks/udp_rx/ssrc_state.rs | 2 +- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/driver/scheduler/idle.rs b/src/driver/scheduler/idle.rs index b0dd17b37..99d153741 100644 --- a/src/driver/scheduler/idle.rs +++ b/src/driver/scheduler/idle.rs @@ -86,12 +86,18 @@ impl Idle { self.tasks.insert(id, task); }, Ok(SchedulerMessage::Do(id, mix_msg)) => { - let now_live = mix_msg.is_mixer_now_live(); + let maybe_live = mix_msg.is_mixer_maybe_live(); if let Some(task) = self.tasks.get_mut(&id) { match task.handle_message(mix_msg) { - Ok(false) if now_live => { - let task = self.tasks.remove(&id).unwrap(); - self.schedule_mixer(task, id, None); + Ok(false) if maybe_live => { + if task.mixer.tracks.is_empty() { + // No tracks, likely due to SetConn. + // Recreate message forwarding task. + task.spawn_forwarder(self.tx.clone(), id); + } else { + let task = self.tasks.remove(&id).unwrap(); + self.schedule_mixer(task, id, None); + } }, Ok(false) => {}, Ok(true) | Err(()) => self.to_cull.push(id), diff --git a/src/driver/scheduler/task.rs b/src/driver/scheduler/task.rs index 6d90fb24d..1dbff46e7 100644 --- a/src/driver/scheduler/task.rs +++ b/src/driver/scheduler/task.rs @@ -116,7 +116,7 @@ impl ParkedMixer { _ = kill_rx.recv_async() => break, msg = remote_rx.recv_async() => { let exit = if let Ok(msg) = msg { - let remove_self = msg.is_mixer_now_live(); + let remove_self = msg.is_mixer_maybe_live(); tx.send_async(SchedulerMessage::Do(id, msg)).await.is_err() || remove_self } else { true @@ -135,7 +135,8 @@ impl ParkedMixer { pub fn handle_message(&mut self, msg: MixerMessage) -> Result { match msg { MixerMessage::SetConn(conn, ssrc) => { - // Overridden because + // Overridden because payload-specific fields are carried + // externally on `ParkedMixer`. self.ssrc = ssrc; self.rtp_sequence = random::(); self.rtp_timestamp = random::(); diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index f38d4abcd..e46b5e092 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -40,8 +40,11 @@ pub enum MixerMessage { } impl MixerMessage { - pub fn is_mixer_now_live(&self) -> bool { - matches!(self, Self::AddTrack(_) | Self::SetTrack(Some(_))) + pub fn is_mixer_maybe_live(&self) -> bool { + matches!( + self, + Self::AddTrack(_) | Self::SetTrack(Some(_)) | Self::SetConn(..) + ) } } diff --git a/src/driver/tasks/udp_rx/ssrc_state.rs b/src/driver/tasks/udp_rx/ssrc_state.rs index 2b9819e5a..5bb7f891e 100644 --- a/src/driver/tasks/udp_rx/ssrc_state.rs +++ b/src/driver/tasks/udp_rx/ssrc_state.rs @@ -141,7 +141,7 @@ impl SsrcState { let mut out = vec![0; self.decode_size.len()]; for _ in 0..missed_packets { - let missing_frame: Option = None; + let missing_frame: Option> = None; let dest_samples = (&mut out[..]) .try_into() .expect("Decode logic will cap decode buffer size at i32::MAX.");