diff --git a/livekit-ffi/protocol/ffi.proto b/livekit-ffi/protocol/ffi.proto index 6b0b1a77..e6c2e7f4 100644 --- a/livekit-ffi/protocol/ffi.proto +++ b/livekit-ffi/protocol/ffi.proto @@ -65,28 +65,29 @@ message FfiRequest { SetSubscribedRequest set_subscribed = 8; UpdateLocalMetadataRequest update_local_metadata = 9; UpdateLocalNameRequest update_local_name = 10; + GetSessionStatsRequest get_session_stats = 11; // Track - CreateVideoTrackRequest create_video_track = 11; - CreateAudioTrackRequest create_audio_track = 12; - GetStatsRequest get_stats = 13; + CreateVideoTrackRequest create_video_track = 12; + CreateAudioTrackRequest create_audio_track = 13; + GetStatsRequest get_stats = 14; // Video - AllocVideoBufferRequest alloc_video_buffer = 14; - NewVideoStreamRequest new_video_stream = 15; - NewVideoSourceRequest new_video_source = 16; - CaptureVideoFrameRequest capture_video_frame = 17; - ToI420Request to_i420 = 18; - ToArgbRequest to_argb = 19; + AllocVideoBufferRequest alloc_video_buffer = 15; + NewVideoStreamRequest new_video_stream = 16; + NewVideoSourceRequest new_video_source = 17; + CaptureVideoFrameRequest capture_video_frame = 18; + ToI420Request to_i420 = 19; + ToArgbRequest to_argb = 20; // Audio - AllocAudioBufferRequest alloc_audio_buffer = 20; - NewAudioStreamRequest new_audio_stream = 21; - NewAudioSourceRequest new_audio_source = 22; - CaptureAudioFrameRequest capture_audio_frame = 23; - NewAudioResamplerRequest new_audio_resampler = 24; - RemixAndResampleRequest remix_and_resample = 25; - E2eeRequest e2ee = 26; + AllocAudioBufferRequest alloc_audio_buffer = 21; + NewAudioStreamRequest new_audio_stream = 22; + NewAudioSourceRequest new_audio_source = 23; + CaptureAudioFrameRequest capture_audio_frame = 24; + NewAudioResamplerRequest new_audio_resampler = 25; + RemixAndResampleRequest remix_and_resample = 26; + E2eeRequest e2ee = 27; } } @@ -104,28 +105,29 @@ message FfiResponse { SetSubscribedResponse set_subscribed = 8; UpdateLocalMetadataResponse update_local_metadata = 9; UpdateLocalNameResponse update_local_name = 10; + GetSessionStatsResponse get_session_stats = 11; // Track - CreateVideoTrackResponse create_video_track = 11; - CreateAudioTrackResponse create_audio_track = 12; - GetStatsResponse get_stats = 13; + CreateVideoTrackResponse create_video_track = 12; + CreateAudioTrackResponse create_audio_track = 13; + GetStatsResponse get_stats = 14; // Video - AllocVideoBufferResponse alloc_video_buffer = 14; - NewVideoStreamResponse new_video_stream = 15; - NewVideoSourceResponse new_video_source = 16; - CaptureVideoFrameResponse capture_video_frame = 17; - ToI420Response to_i420 = 18; - ToArgbResponse to_argb = 19; + AllocVideoBufferResponse alloc_video_buffer = 15; + NewVideoStreamResponse new_video_stream = 16; + NewVideoSourceResponse new_video_source = 17; + CaptureVideoFrameResponse capture_video_frame = 18; + ToI420Response to_i420 = 19; + ToArgbResponse to_argb = 20; // Audio - AllocAudioBufferResponse alloc_audio_buffer = 20; - NewAudioStreamResponse new_audio_stream = 21; - NewAudioSourceResponse new_audio_source = 22; - CaptureAudioFrameResponse capture_audio_frame = 23; - NewAudioResamplerResponse new_audio_resampler = 24; - RemixAndResampleResponse remix_and_resample = 25; - E2eeResponse e2ee = 26; + AllocAudioBufferResponse alloc_audio_buffer = 21; + NewAudioStreamResponse new_audio_stream = 22; + NewAudioSourceResponse new_audio_source = 23; + CaptureAudioFrameResponse capture_audio_frame = 24; + NewAudioResamplerResponse new_audio_resampler = 25; + RemixAndResampleResponse remix_and_resample = 26; + E2eeResponse e2ee = 27; } } @@ -149,6 +151,7 @@ message FfiEvent { UpdateLocalNameCallback update_local_name = 13; GetStatsCallback get_stats = 14; LogBatch logs = 15; + GetSessionStatsCallback get_session_stats = 16; } } diff --git a/livekit-ffi/protocol/room.proto b/livekit-ffi/protocol/room.proto index 64f61444..1d201c05 100644 --- a/livekit-ffi/protocol/room.proto +++ b/livekit-ffi/protocol/room.proto @@ -22,6 +22,7 @@ import "handle.proto"; import "participant.proto"; import "track.proto"; import "video_frame.proto"; +import "stats.proto"; // Connect to a new LiveKit room message ConnectRequest { @@ -130,6 +131,19 @@ message SetSubscribedRequest { } message SetSubscribedResponse {} +message GetSessionStatsRequest { + uint64 room_handle = 1; +} +message GetSessionStatsResponse { + uint64 async_id = 1; +} +message GetSessionStatsCallback { + uint64 async_id = 1; + optional string error = 2; + repeated RtcStats publisher_stats = 3; + repeated RtcStats subscriber_stats = 4; +} + // // Options diff --git a/livekit-ffi/src/livekit.proto.rs b/livekit-ffi/src/livekit.proto.rs index 4dcc9729..548d05b8 100644 --- a/livekit-ffi/src/livekit.proto.rs +++ b/livekit-ffi/src/livekit.proto.rs @@ -2232,6 +2232,30 @@ pub struct SetSubscribedRequest { #[derive(Clone, PartialEq, ::prost::Message)] pub struct SetSubscribedResponse { } +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetSessionStatsRequest { + #[prost(uint64, tag="1")] + pub room_handle: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetSessionStatsResponse { + #[prost(uint64, tag="1")] + pub async_id: u64, +} +#[allow(clippy::derive_partial_eq_without_eq)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetSessionStatsCallback { + #[prost(uint64, tag="1")] + pub async_id: u64, + #[prost(string, optional, tag="2")] + pub error: ::core::option::Option<::prost::alloc::string::String>, + #[prost(message, repeated, tag="3")] + pub publisher_stats: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag="4")] + pub subscriber_stats: ::prost::alloc::vec::Vec, +} // // Options // @@ -3022,7 +3046,7 @@ impl AudioSourceType { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiRequest { - #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")] + #[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiRequest`. @@ -3049,40 +3073,42 @@ pub mod ffi_request { UpdateLocalMetadata(super::UpdateLocalMetadataRequest), #[prost(message, tag="10")] UpdateLocalName(super::UpdateLocalNameRequest), - /// Track #[prost(message, tag="11")] - CreateVideoTrack(super::CreateVideoTrackRequest), + GetSessionStats(super::GetSessionStatsRequest), + /// Track #[prost(message, tag="12")] - CreateAudioTrack(super::CreateAudioTrackRequest), + CreateVideoTrack(super::CreateVideoTrackRequest), #[prost(message, tag="13")] + CreateAudioTrack(super::CreateAudioTrackRequest), + #[prost(message, tag="14")] GetStats(super::GetStatsRequest), /// Video - #[prost(message, tag="14")] - AllocVideoBuffer(super::AllocVideoBufferRequest), #[prost(message, tag="15")] - NewVideoStream(super::NewVideoStreamRequest), + AllocVideoBuffer(super::AllocVideoBufferRequest), #[prost(message, tag="16")] - NewVideoSource(super::NewVideoSourceRequest), + NewVideoStream(super::NewVideoStreamRequest), #[prost(message, tag="17")] - CaptureVideoFrame(super::CaptureVideoFrameRequest), + NewVideoSource(super::NewVideoSourceRequest), #[prost(message, tag="18")] - ToI420(super::ToI420Request), + CaptureVideoFrame(super::CaptureVideoFrameRequest), #[prost(message, tag="19")] + ToI420(super::ToI420Request), + #[prost(message, tag="20")] ToArgb(super::ToArgbRequest), /// Audio - #[prost(message, tag="20")] - AllocAudioBuffer(super::AllocAudioBufferRequest), #[prost(message, tag="21")] - NewAudioStream(super::NewAudioStreamRequest), + AllocAudioBuffer(super::AllocAudioBufferRequest), #[prost(message, tag="22")] - NewAudioSource(super::NewAudioSourceRequest), + NewAudioStream(super::NewAudioStreamRequest), #[prost(message, tag="23")] - CaptureAudioFrame(super::CaptureAudioFrameRequest), + NewAudioSource(super::NewAudioSourceRequest), #[prost(message, tag="24")] - NewAudioResampler(super::NewAudioResamplerRequest), + CaptureAudioFrame(super::CaptureAudioFrameRequest), #[prost(message, tag="25")] - RemixAndResample(super::RemixAndResampleRequest), + NewAudioResampler(super::NewAudioResamplerRequest), #[prost(message, tag="26")] + RemixAndResample(super::RemixAndResampleRequest), + #[prost(message, tag="27")] E2ee(super::E2eeRequest), } } @@ -3090,7 +3116,7 @@ pub mod ffi_request { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiResponse { - #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")] + #[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiResponse`. @@ -3117,40 +3143,42 @@ pub mod ffi_response { UpdateLocalMetadata(super::UpdateLocalMetadataResponse), #[prost(message, tag="10")] UpdateLocalName(super::UpdateLocalNameResponse), - /// Track #[prost(message, tag="11")] - CreateVideoTrack(super::CreateVideoTrackResponse), + GetSessionStats(super::GetSessionStatsResponse), + /// Track #[prost(message, tag="12")] - CreateAudioTrack(super::CreateAudioTrackResponse), + CreateVideoTrack(super::CreateVideoTrackResponse), #[prost(message, tag="13")] + CreateAudioTrack(super::CreateAudioTrackResponse), + #[prost(message, tag="14")] GetStats(super::GetStatsResponse), /// Video - #[prost(message, tag="14")] - AllocVideoBuffer(super::AllocVideoBufferResponse), #[prost(message, tag="15")] - NewVideoStream(super::NewVideoStreamResponse), + AllocVideoBuffer(super::AllocVideoBufferResponse), #[prost(message, tag="16")] - NewVideoSource(super::NewVideoSourceResponse), + NewVideoStream(super::NewVideoStreamResponse), #[prost(message, tag="17")] - CaptureVideoFrame(super::CaptureVideoFrameResponse), + NewVideoSource(super::NewVideoSourceResponse), #[prost(message, tag="18")] - ToI420(super::ToI420Response), + CaptureVideoFrame(super::CaptureVideoFrameResponse), #[prost(message, tag="19")] + ToI420(super::ToI420Response), + #[prost(message, tag="20")] ToArgb(super::ToArgbResponse), /// Audio - #[prost(message, tag="20")] - AllocAudioBuffer(super::AllocAudioBufferResponse), #[prost(message, tag="21")] - NewAudioStream(super::NewAudioStreamResponse), + AllocAudioBuffer(super::AllocAudioBufferResponse), #[prost(message, tag="22")] - NewAudioSource(super::NewAudioSourceResponse), + NewAudioStream(super::NewAudioStreamResponse), #[prost(message, tag="23")] - CaptureAudioFrame(super::CaptureAudioFrameResponse), + NewAudioSource(super::NewAudioSourceResponse), #[prost(message, tag="24")] - NewAudioResampler(super::NewAudioResamplerResponse), + CaptureAudioFrame(super::CaptureAudioFrameResponse), #[prost(message, tag="25")] - RemixAndResample(super::RemixAndResampleResponse), + NewAudioResampler(super::NewAudioResamplerResponse), #[prost(message, tag="26")] + RemixAndResample(super::RemixAndResampleResponse), + #[prost(message, tag="27")] E2ee(super::E2eeResponse), } } @@ -3160,7 +3188,7 @@ pub mod ffi_response { #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct FfiEvent { - #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15")] + #[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16")] pub message: ::core::option::Option, } /// Nested message and enum types in `FfiEvent`. @@ -3198,6 +3226,8 @@ pub mod ffi_event { GetStats(super::GetStatsCallback), #[prost(message, tag="15")] Logs(super::LogBatch), + #[prost(message, tag="16")] + GetSessionStats(super::GetSessionStatsCallback), } } /// Stop all rooms synchronously (Do we need async here?). diff --git a/livekit-ffi/src/server/requests.rs b/livekit-ffi/src/server/requests.rs index d9e93b2e..fe0905dc 100644 --- a/livekit-ffi/src/server/requests.rs +++ b/livekit-ffi/src/server/requests.rs @@ -736,6 +736,54 @@ fn on_e2ee_request( Ok(proto::E2eeResponse { message: Some(msg) }) } +fn on_get_session_stats( + server: &'static FfiServer, + get_session_stats: proto::GetSessionStatsRequest, +) -> FfiResult { + let ffi_room = server + .retrieve_handle::(get_session_stats.room_handle)? + .clone(); + let async_id = server.next_id(); + + server.async_runtime.spawn(async move { + match ffi_room.inner.room.get_stats().await { + Ok(stats) => { + let _ = server + .send_event(proto::ffi_event::Message::GetSessionStats( + proto::GetSessionStatsCallback { + async_id, + error: None, + publisher_stats: stats + .publisher_stats + .into_iter() + .map(Into::into) + .collect(), + subscriber_stats: stats + .subscriber_stats + .into_iter() + .map(Into::into) + .collect(), + }, + )) + .await; + } + Err(err) => { + let _ = server + .send_event(proto::ffi_event::Message::GetSessionStats( + proto::GetSessionStatsCallback { + async_id, + error: Some(err.to_string()), + ..Default::default() + }, + )) + .await; + } + } + }); + + Ok(proto::GetSessionStatsResponse { async_id }) +} + #[allow(clippy::field_reassign_with_default)] // Avoid uggly format pub fn handle_request( server: &'static FfiServer, @@ -823,6 +871,12 @@ pub fn handle_request( proto::ffi_request::Message::E2ee(e2ee) => { proto::ffi_response::Message::E2ee(on_e2ee_request(server, e2ee)?) } + proto::ffi_request::Message::GetSessionStats(get_session_stats) => { + proto::ffi_response::Message::GetSessionStats(on_get_session_stats( + server, + get_session_stats, + )?) + } }); Ok(res) diff --git a/livekit/src/room/mod.rs b/livekit/src/room/mod.rs index a120de50..28ed2e8c 100644 --- a/livekit/src/room/mod.rs +++ b/livekit/src/room/mod.rs @@ -16,7 +16,7 @@ use self::e2ee::manager::E2eeManager; use self::e2ee::E2eeOptions; use crate::participant::ConnectionQuality; use crate::prelude::*; -use crate::rtc_engine::{EngineError, EngineOptions}; +use crate::rtc_engine::{EngineError, EngineOptions, SessionStats}; use crate::rtc_engine::{EngineEvent, EngineEvents, EngineResult, RtcEngine}; use libwebrtc::native::frame_cryptor::EncryptionState; use libwebrtc::prelude::{ @@ -451,6 +451,10 @@ impl Room { self.inner.rtc_engine.simulate_scenario(scenario).await } + pub async fn get_stats(&self) -> EngineResult { + self.inner.rtc_engine.get_stats().await + } + pub fn subscribe(&self) -> mpsc::UnboundedReceiver { self.inner.dispatcher.register() } diff --git a/livekit/src/rtc_engine/mod.rs b/livekit/src/rtc_engine/mod.rs index d48a39fe..4bdf52f2 100644 --- a/livekit/src/rtc_engine/mod.rs +++ b/livekit/src/rtc_engine/mod.rs @@ -35,6 +35,8 @@ use tokio::sync::{RwLock as AsyncRwLock, RwLockReadGuard as AsyncRwLockReadGuard use tokio::task::JoinHandle; use tokio::time::{interval, Interval}; +pub use self::rtc_session::SessionStats; + pub mod lk_runtime; mod peer_transport; mod rtc_events; @@ -248,6 +250,11 @@ impl RtcEngine { // on fail } + pub async fn get_stats(&self) -> EngineResult { + let session = self.inner.running_handle.read().session.clone(); + session.get_stats().await + } + pub fn session(&self) -> Arc { self.inner.running_handle.read().session.clone() } diff --git a/livekit/src/rtc_engine/rtc_session.rs b/livekit/src/rtc_engine/rtc_session.rs index f45ed028..628f4325 100644 --- a/livekit/src/rtc_engine/rtc_session.rs +++ b/livekit/src/rtc_engine/rtc_session.rs @@ -23,6 +23,7 @@ use crate::rtc_engine::rtc_events::{RtcEvent, RtcEvents}; use crate::track::LocalTrack; use crate::DataPacketKind; use libwebrtc::prelude::*; +use libwebrtc::stats::RtcStats; use livekit_api::signal_client::{SignalClient, SignalEvent, SignalEvents}; use livekit_protocol as proto; use parking_lot::Mutex; @@ -50,6 +51,12 @@ pub const PUBLISHER_NEGOTIATION_FREQUENCY: Duration = Duration::from_millis(150) pub type SessionEmitter = mpsc::UnboundedSender; pub type SessionEvents = mpsc::UnboundedReceiver; +#[derive(Debug, Clone)] +pub struct SessionStats { + pub publisher_stats: Vec, + pub subscriber_stats: Vec, +} + #[derive(Debug)] pub enum SessionEvent { ParticipantUpdate { @@ -227,14 +234,18 @@ impl RtcSession { self.inner.has_published.load(Ordering::Acquire) } - pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult { - self.inner.add_track(req).await - } - pub fn remove_track(&self, sender: RtpSender) -> EngineResult<()> { self.inner.remove_track(sender) } + pub fn publisher_negotiation_needed(&self) { + self.inner.publisher_negotiation_needed() + } + + pub async fn add_track(&self, req: proto::AddTrackRequest) -> EngineResult { + self.inner.add_track(req).await + } + pub async fn create_sender( &self, track: LocalTrack, @@ -244,10 +255,6 @@ impl RtcSession { self.inner.create_sender(track, options, encodings).await } - pub fn publisher_negotiation_needed(&self) { - self.inner.publisher_negotiation_needed() - } - /// Close the PeerConnections and the SignalClient pub async fn close(&self) { // Close the tasks @@ -287,6 +294,27 @@ impl RtcSession { self.inner.simulate_scenario(scenario).await } + pub async fn get_stats(&self) -> EngineResult { + let publisher_stats = self + .inner + .publisher_pc + .peer_connection() + .get_stats() + .await?; + + let subscriber_stats = self + .inner + .subscriber_pc + .peer_connection() + .get_stats() + .await?; + + Ok(SessionStats { + publisher_stats, + subscriber_stats, + }) + } + pub fn publisher(&self) -> &PeerTransport { &self.inner.publisher_pc }