Skip to content

Commit

Permalink
fix: muted events & capture_audio response (#171)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Sep 4, 2023
1 parent 0edb535 commit d1f8d4f
Show file tree
Hide file tree
Showing 10 changed files with 202 additions and 84 deletions.
4 changes: 0 additions & 4 deletions examples/wgpu_room/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ impl LkApp {
RoomEvent::Disconnected { reason: _ } => {
self.video_renderers.clear();
}
RoomEvent::E2eeStateChanged { participant, state } => {
let identity = participant.identity();
log::info!("e2ee state changed {} - {:?}", identity, state)
}
_ => {}
}
}
Expand Down
4 changes: 2 additions & 2 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ fn on_publish_track(
Ok(ffi_participant.room.publish_track(server, publish))
}

// Unpublish a local track
fn on_unpublish_track(
server: &'static FfiServer,
unpublish: proto::UnpublishTrackRequest,
Expand Down Expand Up @@ -398,8 +399,7 @@ fn on_capture_audio_frame(
push: proto::CaptureAudioFrameRequest,
) -> FfiResult<proto::CaptureAudioFrameResponse> {
let source = server.retrieve_handle::<audio_source::FfiAudioSource>(push.source_handle)?;
source.capture_frame(server, push)?;
Ok(proto::CaptureAudioFrameResponse::default())
source.capture_frame(server, push)
}

/// Create a new audio resampler
Expand Down
133 changes: 94 additions & 39 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub enum RoomEvent {
publication: RemoteTrackPublication,
participant: RemoteParticipant,
},
// TODO(theomonnom): Should we also add track for muted events?
TrackMuted {
participant: Participant,
publication: TrackPublication,
Expand Down Expand Up @@ -242,6 +243,28 @@ impl Room {
}
});

local_participant.on_track_muted({
let dispatcher = dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackMuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

local_participant.on_track_unmuted({
let dispatcher = dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackUnmuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

let room_info = join_response.room.unwrap();
let inner = Arc::new(RoomSession {
sid: room_info.sid.try_into().unwrap(),
Expand Down Expand Up @@ -759,53 +782,85 @@ impl RoomSession {
metadata,
);

let dispatcher = self.dispatcher.clone();
participant.on_track_published(move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackPublished {
participant,
publication,
});
participant.on_track_published({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackPublished {
participant,
publication,
});
}
});

let dispatcher = self.dispatcher.clone();
participant.on_track_unpublished(move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackUnpublished {
participant,
publication,
});
participant.on_track_unpublished({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackUnpublished {
participant,
publication,
});
}
});

let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
participant.on_track_subscribed(move |participant, publication, track| {
let event = RoomEvent::TrackSubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_subscribed(track, publication, participant);
dispatcher.dispatch(&event);
participant.on_track_subscribed({
let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
move |participant, publication, track| {
let event = RoomEvent::TrackSubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_subscribed(track, publication, participant);
dispatcher.dispatch(&event);
}
});

let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
participant.on_track_unsubscribed(move |participant, publication, track| {
let event = RoomEvent::TrackUnsubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_unsubscribed(track, publication, participant);
dispatcher.dispatch(&event);
participant.on_track_unsubscribed({
let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
move |participant, publication, track| {
let event = RoomEvent::TrackUnsubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_unsubscribed(track, publication, participant);
dispatcher.dispatch(&event);
}
});

let dispatcher = self.dispatcher.clone();
participant.on_track_subscription_failed(move |participant, track_sid, error| {
dispatcher.dispatch(&RoomEvent::TrackSubscriptionFailed {
participant,
track_sid,
error,
});
participant.on_track_subscription_failed({
let dispatcher = self.dispatcher.clone();
move |participant, track_sid, error| {
dispatcher.dispatch(&RoomEvent::TrackSubscriptionFailed {
participant,
track_sid,
error,
});
}
});

participant.on_track_muted({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackMuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

participant.on_track_unmuted({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackUnmuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

self.participants.write().insert(sid, participant.clone());
Expand Down
25 changes: 18 additions & 7 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,40 @@ impl LocalParticipant {
super::set_connection_quality(&self.inner, &Participant::Local(self.clone()), quality);
}

#[allow(dead_code)]
pub(crate) fn on_local_track_published(
&self,
handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
) {
*self.local.events.local_track_published.lock() = Some(Box::new(handler));
}

#[allow(dead_code)]
pub(crate) fn on_local_track_unpublished(
&self,
handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
) {
*self.local.events.local_track_unpublished.lock() = Some(Box::new(handler));
}

pub(crate) fn on_track_muted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_muted(&self.inner, handler)
}

pub(crate) fn on_track_unmuted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_unmuted(&self.inner, handler)
}

pub(crate) fn add_publication(&self, publication: TrackPublication) {
super::add_publication(&self.inner, &Participant::Local(self.clone()), publication);
}

#[allow(dead_code)]
pub(crate) fn remove_publication(&self, sid: &TrackSid) {
super::remove_publication(&self.inner, &Participant::Local(self.clone()), sid);
pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
super::remove_publication(&self.inner, &Participant::Local(self.clone()), sid)
}

pub(crate) fn published_tracks_info(&self) -> Vec<proto::TrackPublishedResponse> {
Expand Down Expand Up @@ -210,10 +221,10 @@ impl LocalParticipant {

pub async fn unpublish_track(
&self,
track: &TrackSid,
sid: &TrackSid,
// _stop_on_unpublish: bool,
) -> RoomResult<LocalTrackPublication> {
let publication = self.inner.tracks.write().remove(track);
let publication = self.remove_publication(sid);
if let Some(TrackPublication::Local(publication)) = publication {
let track = publication.track().unwrap();
let sender = track.transceiver().unwrap().sender();
Expand Down
48 changes: 33 additions & 15 deletions livekit/src/room/participant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Participant {
pub(crate) fn set_audio_level(self: &Self, level: f32) -> ();
pub(crate) fn set_connection_quality(self: &Self, quality: ConnectionQuality) -> ();
pub(crate) fn add_publication(self: &Self, publication: TrackPublication) -> ();
pub(crate) fn remove_publication(self: &Self, sid: &TrackSid) -> ();
pub(crate) fn remove_publication(self: &Self, sid: &TrackSid) -> Option<TrackPublication>;
);

pub fn tracks(&self) -> HashMap<TrackSid, TrackPublication> {
Expand All @@ -79,8 +79,8 @@ struct ParticipantInfo {
pub connection_quality: ConnectionQuality,
}

type TrackMutedHandler = Box<dyn Fn(Participant, TrackPublication, Track) + Send>;
type TrackUnmutedHandler = Box<dyn Fn(Participant, TrackPublication, Track) + Send>;
type TrackMutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;
type TrackUnmutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;

#[derive(Default)]
struct ParticipantEvents {
Expand Down Expand Up @@ -154,6 +154,20 @@ pub(super) fn set_connection_quality(
inner.info.write().connection_quality = quality;
}

pub(super) fn on_track_muted(
inner: &Arc<ParticipantInner>,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
*inner.events.track_muted.lock() = Some(Box::new(handler));
}

pub(super) fn on_track_unmuted(
inner: &Arc<ParticipantInner>,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
*inner.events.track_unmuted.lock() = Some(Box::new(handler));
}

pub(super) fn remove_publication(
inner: &Arc<ParticipantInner>,
_participant: &Participant,
Expand All @@ -163,8 +177,8 @@ pub(super) fn remove_publication(
let publication = tracks.remove(sid);
if let Some(publication) = publication.clone() {
// remove events
publication.on_muted(|_, _| {});
publication.on_unmuted(|_, _| {});
publication.on_muted(|_| {});
publication.on_unmuted(|_| {});
} else {
// shouldn't happen (internal)
log::warn!("could not find publication to remove: {:?}", sid);
Expand All @@ -181,19 +195,23 @@ pub(super) fn add_publication(
let mut tracks = inner.tracks.write();
tracks.insert(publication.sid(), publication.clone());

let events = inner.events.clone();
let particiant = participant.clone();
publication.on_muted(move |publication, track| {
if let Some(cb) = events.track_muted.lock().as_ref() {
cb(particiant.clone(), publication, track);
publication.on_muted({
let events = inner.events.clone();
let participant = participant.clone();
move |publication| {
if let Some(cb) = events.track_muted.lock().as_ref() {
cb(participant.clone(), publication);
}
}
});

let events = inner.events.clone();
let participant = participant.clone();
publication.on_unmuted(move |publication, track| {
if let Some(cb) = events.track_unmuted.lock().as_ref() {
cb(participant.clone(), publication, track);
publication.on_unmuted({
let events = inner.events.clone();
let participant = participant.clone();
move |publication| {
if let Some(cb) = events.track_unmuted.lock().as_ref() {
cb(participant.clone(), publication);
}
}
});
}
20 changes: 18 additions & 2 deletions livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,20 @@ impl RemoteParticipant {
Some(Box::new(track_subscription_failed));
}

pub(crate) fn on_track_muted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_muted(&self.inner, handler)
}

pub(crate) fn on_track_unmuted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_unmuted(&self.inner, handler)
}

pub(crate) fn set_speaking(&self, speaking: bool) {
super::set_speaking(&self.inner, &Participant::Remote(self.clone()), speaking);
}
Expand Down Expand Up @@ -329,11 +343,11 @@ impl RemoteParticipant {
});
}

pub(crate) fn remove_publication(&self, sid: &TrackSid) {
pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
let publication =
super::remove_publication(&self.inner, &Participant::Remote(self.clone()), sid);

if let Some(publication) = publication {
if let Some(publication) = publication.clone() {
let TrackPublication::Remote(publication) = publication else {
panic!("expected remote publication");
};
Expand All @@ -342,6 +356,8 @@ impl RemoteParticipant {
publication.on_subscribed(|_, _| {});
publication.on_unsubscribed(|_, _| {});
}

publication
}

pub fn get_track_publication(&self, sid: &TrackSid) -> Option<RemoteTrackPublication> {
Expand Down
Loading

0 comments on commit d1f8d4f

Please sign in to comment.