Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: muted events & capture_audio response #171

Merged
merged 4 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions examples/wgpu_room/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,6 @@ impl LkApp {
RoomEvent::Disconnected { reason: _ } => {
self.video_renderers.clear();
}
RoomEvent::E2eeStateChanged { participant, state } => {
let identity = participant.identity();
log::info!("e2ee state changed {} - {:?}", identity, state)
}
_ => {}
}
}
Expand Down
4 changes: 2 additions & 2 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ fn on_publish_track(
Ok(ffi_participant.room.publish_track(server, publish))
}

// Unpublish a local track
fn on_unpublish_track(
server: &'static FfiServer,
unpublish: proto::UnpublishTrackRequest,
Expand Down Expand Up @@ -398,8 +399,7 @@ fn on_capture_audio_frame(
push: proto::CaptureAudioFrameRequest,
) -> FfiResult<proto::CaptureAudioFrameResponse> {
let source = server.retrieve_handle::<audio_source::FfiAudioSource>(push.source_handle)?;
source.capture_frame(server, push)?;
Ok(proto::CaptureAudioFrameResponse::default())
source.capture_frame(server, push)
}

/// Create a new audio resampler
Expand Down
133 changes: 94 additions & 39 deletions livekit/src/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ pub enum RoomEvent {
publication: RemoteTrackPublication,
participant: RemoteParticipant,
},
// TODO(theomonnom): Should we also add track for muted events?
TrackMuted {
participant: Participant,
publication: TrackPublication,
Expand Down Expand Up @@ -242,6 +243,28 @@ impl Room {
}
});

local_participant.on_track_muted({
let dispatcher = dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackMuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

local_participant.on_track_unmuted({
let dispatcher = dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackUnmuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

let room_info = join_response.room.unwrap();
let inner = Arc::new(RoomSession {
sid: room_info.sid.try_into().unwrap(),
Expand Down Expand Up @@ -759,53 +782,85 @@ impl RoomSession {
metadata,
);

let dispatcher = self.dispatcher.clone();
participant.on_track_published(move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackPublished {
participant,
publication,
});
participant.on_track_published({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackPublished {
participant,
publication,
});
}
});

let dispatcher = self.dispatcher.clone();
participant.on_track_unpublished(move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackUnpublished {
participant,
publication,
});
participant.on_track_unpublished({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
dispatcher.dispatch(&RoomEvent::TrackUnpublished {
participant,
publication,
});
}
});

let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
participant.on_track_subscribed(move |participant, publication, track| {
let event = RoomEvent::TrackSubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_subscribed(track, publication, participant);
dispatcher.dispatch(&event);
participant.on_track_subscribed({
let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
move |participant, publication, track| {
let event = RoomEvent::TrackSubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_subscribed(track, publication, participant);
dispatcher.dispatch(&event);
}
});

let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
participant.on_track_unsubscribed(move |participant, publication, track| {
let event = RoomEvent::TrackUnsubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_unsubscribed(track, publication, participant);
dispatcher.dispatch(&event);
participant.on_track_unsubscribed({
let dispatcher = self.dispatcher.clone();
let e2ee_manager = self.e2ee_manager.clone();
move |participant, publication, track| {
let event = RoomEvent::TrackUnsubscribed {
participant: participant.clone(),
track: track.clone(),
publication: publication.clone(),
};
e2ee_manager.on_track_unsubscribed(track, publication, participant);
dispatcher.dispatch(&event);
}
});

let dispatcher = self.dispatcher.clone();
participant.on_track_subscription_failed(move |participant, track_sid, error| {
dispatcher.dispatch(&RoomEvent::TrackSubscriptionFailed {
participant,
track_sid,
error,
});
participant.on_track_subscription_failed({
let dispatcher = self.dispatcher.clone();
move |participant, track_sid, error| {
dispatcher.dispatch(&RoomEvent::TrackSubscriptionFailed {
participant,
track_sid,
error,
});
}
});

participant.on_track_muted({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackMuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

participant.on_track_unmuted({
let dispatcher = self.dispatcher.clone();
move |participant, publication| {
let event = RoomEvent::TrackUnmuted {
participant,
publication,
};
dispatcher.dispatch(&event);
}
});

self.participants.write().insert(sid, participant.clone());
Expand Down
25 changes: 18 additions & 7 deletions livekit/src/room/participant/local_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,29 +97,40 @@ impl LocalParticipant {
super::set_connection_quality(&self.inner, &Participant::Local(self.clone()), quality);
}

#[allow(dead_code)]
pub(crate) fn on_local_track_published(
&self,
handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
) {
*self.local.events.local_track_published.lock() = Some(Box::new(handler));
}

#[allow(dead_code)]
pub(crate) fn on_local_track_unpublished(
&self,
handler: impl Fn(LocalParticipant, LocalTrackPublication) + Send + 'static,
) {
*self.local.events.local_track_unpublished.lock() = Some(Box::new(handler));
}

pub(crate) fn on_track_muted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_muted(&self.inner, handler)
}

pub(crate) fn on_track_unmuted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_unmuted(&self.inner, handler)
}

pub(crate) fn add_publication(&self, publication: TrackPublication) {
super::add_publication(&self.inner, &Participant::Local(self.clone()), publication);
}

#[allow(dead_code)]
pub(crate) fn remove_publication(&self, sid: &TrackSid) {
super::remove_publication(&self.inner, &Participant::Local(self.clone()), sid);
pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
super::remove_publication(&self.inner, &Participant::Local(self.clone()), sid)
}

pub(crate) fn published_tracks_info(&self) -> Vec<proto::TrackPublishedResponse> {
Expand Down Expand Up @@ -210,10 +221,10 @@ impl LocalParticipant {

pub async fn unpublish_track(
&self,
track: &TrackSid,
sid: &TrackSid,
// _stop_on_unpublish: bool,
) -> RoomResult<LocalTrackPublication> {
let publication = self.inner.tracks.write().remove(track);
let publication = self.remove_publication(sid);
if let Some(TrackPublication::Local(publication)) = publication {
let track = publication.track().unwrap();
let sender = track.transceiver().unwrap().sender();
Expand Down
48 changes: 33 additions & 15 deletions livekit/src/room/participant/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Participant {
pub(crate) fn set_audio_level(self: &Self, level: f32) -> ();
pub(crate) fn set_connection_quality(self: &Self, quality: ConnectionQuality) -> ();
pub(crate) fn add_publication(self: &Self, publication: TrackPublication) -> ();
pub(crate) fn remove_publication(self: &Self, sid: &TrackSid) -> ();
pub(crate) fn remove_publication(self: &Self, sid: &TrackSid) -> Option<TrackPublication>;
);

pub fn tracks(&self) -> HashMap<TrackSid, TrackPublication> {
Expand All @@ -79,8 +79,8 @@ struct ParticipantInfo {
pub connection_quality: ConnectionQuality,
}

type TrackMutedHandler = Box<dyn Fn(Participant, TrackPublication, Track) + Send>;
type TrackUnmutedHandler = Box<dyn Fn(Participant, TrackPublication, Track) + Send>;
type TrackMutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;
type TrackUnmutedHandler = Box<dyn Fn(Participant, TrackPublication) + Send>;

#[derive(Default)]
struct ParticipantEvents {
Expand Down Expand Up @@ -154,6 +154,20 @@ pub(super) fn set_connection_quality(
inner.info.write().connection_quality = quality;
}

pub(super) fn on_track_muted(
inner: &Arc<ParticipantInner>,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
*inner.events.track_muted.lock() = Some(Box::new(handler));
}

pub(super) fn on_track_unmuted(
inner: &Arc<ParticipantInner>,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
*inner.events.track_unmuted.lock() = Some(Box::new(handler));
}

pub(super) fn remove_publication(
inner: &Arc<ParticipantInner>,
_participant: &Participant,
Expand All @@ -163,8 +177,8 @@ pub(super) fn remove_publication(
let publication = tracks.remove(sid);
if let Some(publication) = publication.clone() {
// remove events
publication.on_muted(|_, _| {});
publication.on_unmuted(|_, _| {});
publication.on_muted(|_| {});
publication.on_unmuted(|_| {});
} else {
// shouldn't happen (internal)
log::warn!("could not find publication to remove: {:?}", sid);
Expand All @@ -181,19 +195,23 @@ pub(super) fn add_publication(
let mut tracks = inner.tracks.write();
tracks.insert(publication.sid(), publication.clone());

let events = inner.events.clone();
let particiant = participant.clone();
publication.on_muted(move |publication, track| {
if let Some(cb) = events.track_muted.lock().as_ref() {
cb(particiant.clone(), publication, track);
publication.on_muted({
let events = inner.events.clone();
let participant = participant.clone();
move |publication| {
if let Some(cb) = events.track_muted.lock().as_ref() {
cb(participant.clone(), publication);
}
}
});

let events = inner.events.clone();
let participant = participant.clone();
publication.on_unmuted(move |publication, track| {
if let Some(cb) = events.track_unmuted.lock().as_ref() {
cb(participant.clone(), publication, track);
publication.on_unmuted({
let events = inner.events.clone();
let participant = participant.clone();
move |publication| {
if let Some(cb) = events.track_unmuted.lock().as_ref() {
cb(participant.clone(), publication);
}
}
});
}
20 changes: 18 additions & 2 deletions livekit/src/room/participant/remote_participant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,20 @@ impl RemoteParticipant {
Some(Box::new(track_subscription_failed));
}

pub(crate) fn on_track_muted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_muted(&self.inner, handler)
}

pub(crate) fn on_track_unmuted(
&self,
handler: impl Fn(Participant, TrackPublication) + Send + 'static,
) {
super::on_track_unmuted(&self.inner, handler)
}

pub(crate) fn set_speaking(&self, speaking: bool) {
super::set_speaking(&self.inner, &Participant::Remote(self.clone()), speaking);
}
Expand Down Expand Up @@ -329,11 +343,11 @@ impl RemoteParticipant {
});
}

pub(crate) fn remove_publication(&self, sid: &TrackSid) {
pub(crate) fn remove_publication(&self, sid: &TrackSid) -> Option<TrackPublication> {
let publication =
super::remove_publication(&self.inner, &Participant::Remote(self.clone()), sid);

if let Some(publication) = publication {
if let Some(publication) = publication.clone() {
let TrackPublication::Remote(publication) = publication else {
panic!("expected remote publication");
};
Expand All @@ -342,6 +356,8 @@ impl RemoteParticipant {
publication.on_subscribed(|_, _| {});
publication.on_unsubscribed(|_, _| {});
}

publication
}

pub fn get_track_publication(&self, sid: &TrackSid) -> Option<RemoteTrackPublication> {
Expand Down
Loading