Skip to content

Commit

Permalink
feat: session stats (#266)
Browse files Browse the repository at this point in the history
  • Loading branch information
theomonnom authored Dec 9, 2023
1 parent 74904a4 commit b7d4c88
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 76 deletions.
67 changes: 35 additions & 32 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,29 @@ message FfiRequest {
SetSubscribedRequest set_subscribed = 8;
UpdateLocalMetadataRequest update_local_metadata = 9;
UpdateLocalNameRequest update_local_name = 10;
GetSessionStatsRequest get_session_stats = 11;

// Track
CreateVideoTrackRequest create_video_track = 11;
CreateAudioTrackRequest create_audio_track = 12;
GetStatsRequest get_stats = 13;
CreateVideoTrackRequest create_video_track = 12;
CreateAudioTrackRequest create_audio_track = 13;
GetStatsRequest get_stats = 14;

// Video
AllocVideoBufferRequest alloc_video_buffer = 14;
NewVideoStreamRequest new_video_stream = 15;
NewVideoSourceRequest new_video_source = 16;
CaptureVideoFrameRequest capture_video_frame = 17;
ToI420Request to_i420 = 18;
ToArgbRequest to_argb = 19;
AllocVideoBufferRequest alloc_video_buffer = 15;
NewVideoStreamRequest new_video_stream = 16;
NewVideoSourceRequest new_video_source = 17;
CaptureVideoFrameRequest capture_video_frame = 18;
ToI420Request to_i420 = 19;
ToArgbRequest to_argb = 20;

// Audio
AllocAudioBufferRequest alloc_audio_buffer = 20;
NewAudioStreamRequest new_audio_stream = 21;
NewAudioSourceRequest new_audio_source = 22;
CaptureAudioFrameRequest capture_audio_frame = 23;
NewAudioResamplerRequest new_audio_resampler = 24;
RemixAndResampleRequest remix_and_resample = 25;
E2eeRequest e2ee = 26;
AllocAudioBufferRequest alloc_audio_buffer = 21;
NewAudioStreamRequest new_audio_stream = 22;
NewAudioSourceRequest new_audio_source = 23;
CaptureAudioFrameRequest capture_audio_frame = 24;
NewAudioResamplerRequest new_audio_resampler = 25;
RemixAndResampleRequest remix_and_resample = 26;
E2eeRequest e2ee = 27;
}
}

Expand All @@ -104,28 +105,29 @@ message FfiResponse {
SetSubscribedResponse set_subscribed = 8;
UpdateLocalMetadataResponse update_local_metadata = 9;
UpdateLocalNameResponse update_local_name = 10;
GetSessionStatsResponse get_session_stats = 11;

// Track
CreateVideoTrackResponse create_video_track = 11;
CreateAudioTrackResponse create_audio_track = 12;
GetStatsResponse get_stats = 13;
CreateVideoTrackResponse create_video_track = 12;
CreateAudioTrackResponse create_audio_track = 13;
GetStatsResponse get_stats = 14;

// Video
AllocVideoBufferResponse alloc_video_buffer = 14;
NewVideoStreamResponse new_video_stream = 15;
NewVideoSourceResponse new_video_source = 16;
CaptureVideoFrameResponse capture_video_frame = 17;
ToI420Response to_i420 = 18;
ToArgbResponse to_argb = 19;
AllocVideoBufferResponse alloc_video_buffer = 15;
NewVideoStreamResponse new_video_stream = 16;
NewVideoSourceResponse new_video_source = 17;
CaptureVideoFrameResponse capture_video_frame = 18;
ToI420Response to_i420 = 19;
ToArgbResponse to_argb = 20;

// Audio
AllocAudioBufferResponse alloc_audio_buffer = 20;
NewAudioStreamResponse new_audio_stream = 21;
NewAudioSourceResponse new_audio_source = 22;
CaptureAudioFrameResponse capture_audio_frame = 23;
NewAudioResamplerResponse new_audio_resampler = 24;
RemixAndResampleResponse remix_and_resample = 25;
E2eeResponse e2ee = 26;
AllocAudioBufferResponse alloc_audio_buffer = 21;
NewAudioStreamResponse new_audio_stream = 22;
NewAudioSourceResponse new_audio_source = 23;
CaptureAudioFrameResponse capture_audio_frame = 24;
NewAudioResamplerResponse new_audio_resampler = 25;
RemixAndResampleResponse remix_and_resample = 26;
E2eeResponse e2ee = 27;
}
}

Expand All @@ -149,6 +151,7 @@ message FfiEvent {
UpdateLocalNameCallback update_local_name = 13;
GetStatsCallback get_stats = 14;
LogBatch logs = 15;
GetSessionStatsCallback get_session_stats = 16;
}
}

Expand Down
14 changes: 14 additions & 0 deletions livekit-ffi/protocol/room.proto
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import "handle.proto";
import "participant.proto";
import "track.proto";
import "video_frame.proto";
import "stats.proto";

// Connect to a new LiveKit room
message ConnectRequest {
Expand Down Expand Up @@ -130,6 +131,19 @@ message SetSubscribedRequest {
}
message SetSubscribedResponse {}

message GetSessionStatsRequest {
uint64 room_handle = 1;
}
message GetSessionStatsResponse {
uint64 async_id = 1;
}
message GetSessionStatsCallback {
uint64 async_id = 1;
optional string error = 2;
repeated RtcStats publisher_stats = 3;
repeated RtcStats subscriber_stats = 4;
}


//
// Options
Expand Down
100 changes: 65 additions & 35 deletions livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2232,6 +2232,30 @@ pub struct SetSubscribedRequest {
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct SetSubscribedResponse {
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSessionStatsRequest {
#[prost(uint64, tag="1")]
pub room_handle: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSessionStatsResponse {
#[prost(uint64, tag="1")]
pub async_id: u64,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct GetSessionStatsCallback {
#[prost(uint64, tag="1")]
pub async_id: u64,
#[prost(string, optional, tag="2")]
pub error: ::core::option::Option<::prost::alloc::string::String>,
#[prost(message, repeated, tag="3")]
pub publisher_stats: ::prost::alloc::vec::Vec<RtcStats>,
#[prost(message, repeated, tag="4")]
pub subscriber_stats: ::prost::alloc::vec::Vec<RtcStats>,
}
//
// Options
//
Expand Down Expand Up @@ -3022,7 +3046,7 @@ impl AudioSourceType {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiRequest {
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
#[prost(oneof="ffi_request::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")]
pub message: ::core::option::Option<ffi_request::Message>,
}
/// Nested message and enum types in `FfiRequest`.
Expand All @@ -3049,48 +3073,50 @@ pub mod ffi_request {
UpdateLocalMetadata(super::UpdateLocalMetadataRequest),
#[prost(message, tag="10")]
UpdateLocalName(super::UpdateLocalNameRequest),
/// Track
#[prost(message, tag="11")]
CreateVideoTrack(super::CreateVideoTrackRequest),
GetSessionStats(super::GetSessionStatsRequest),
/// Track
#[prost(message, tag="12")]
CreateAudioTrack(super::CreateAudioTrackRequest),
CreateVideoTrack(super::CreateVideoTrackRequest),
#[prost(message, tag="13")]
CreateAudioTrack(super::CreateAudioTrackRequest),
#[prost(message, tag="14")]
GetStats(super::GetStatsRequest),
/// Video
#[prost(message, tag="14")]
AllocVideoBuffer(super::AllocVideoBufferRequest),
#[prost(message, tag="15")]
NewVideoStream(super::NewVideoStreamRequest),
AllocVideoBuffer(super::AllocVideoBufferRequest),
#[prost(message, tag="16")]
NewVideoSource(super::NewVideoSourceRequest),
NewVideoStream(super::NewVideoStreamRequest),
#[prost(message, tag="17")]
CaptureVideoFrame(super::CaptureVideoFrameRequest),
NewVideoSource(super::NewVideoSourceRequest),
#[prost(message, tag="18")]
ToI420(super::ToI420Request),
CaptureVideoFrame(super::CaptureVideoFrameRequest),
#[prost(message, tag="19")]
ToI420(super::ToI420Request),
#[prost(message, tag="20")]
ToArgb(super::ToArgbRequest),
/// Audio
#[prost(message, tag="20")]
AllocAudioBuffer(super::AllocAudioBufferRequest),
#[prost(message, tag="21")]
NewAudioStream(super::NewAudioStreamRequest),
AllocAudioBuffer(super::AllocAudioBufferRequest),
#[prost(message, tag="22")]
NewAudioSource(super::NewAudioSourceRequest),
NewAudioStream(super::NewAudioStreamRequest),
#[prost(message, tag="23")]
CaptureAudioFrame(super::CaptureAudioFrameRequest),
NewAudioSource(super::NewAudioSourceRequest),
#[prost(message, tag="24")]
NewAudioResampler(super::NewAudioResamplerRequest),
CaptureAudioFrame(super::CaptureAudioFrameRequest),
#[prost(message, tag="25")]
RemixAndResample(super::RemixAndResampleRequest),
NewAudioResampler(super::NewAudioResamplerRequest),
#[prost(message, tag="26")]
RemixAndResample(super::RemixAndResampleRequest),
#[prost(message, tag="27")]
E2ee(super::E2eeRequest),
}
}
/// This is the output of livekit_ffi_request function.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiResponse {
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26")]
#[prost(oneof="ffi_response::Message", tags="2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27")]
pub message: ::core::option::Option<ffi_response::Message>,
}
/// Nested message and enum types in `FfiResponse`.
Expand All @@ -3117,40 +3143,42 @@ pub mod ffi_response {
UpdateLocalMetadata(super::UpdateLocalMetadataResponse),
#[prost(message, tag="10")]
UpdateLocalName(super::UpdateLocalNameResponse),
/// Track
#[prost(message, tag="11")]
CreateVideoTrack(super::CreateVideoTrackResponse),
GetSessionStats(super::GetSessionStatsResponse),
/// Track
#[prost(message, tag="12")]
CreateAudioTrack(super::CreateAudioTrackResponse),
CreateVideoTrack(super::CreateVideoTrackResponse),
#[prost(message, tag="13")]
CreateAudioTrack(super::CreateAudioTrackResponse),
#[prost(message, tag="14")]
GetStats(super::GetStatsResponse),
/// Video
#[prost(message, tag="14")]
AllocVideoBuffer(super::AllocVideoBufferResponse),
#[prost(message, tag="15")]
NewVideoStream(super::NewVideoStreamResponse),
AllocVideoBuffer(super::AllocVideoBufferResponse),
#[prost(message, tag="16")]
NewVideoSource(super::NewVideoSourceResponse),
NewVideoStream(super::NewVideoStreamResponse),
#[prost(message, tag="17")]
CaptureVideoFrame(super::CaptureVideoFrameResponse),
NewVideoSource(super::NewVideoSourceResponse),
#[prost(message, tag="18")]
ToI420(super::ToI420Response),
CaptureVideoFrame(super::CaptureVideoFrameResponse),
#[prost(message, tag="19")]
ToI420(super::ToI420Response),
#[prost(message, tag="20")]
ToArgb(super::ToArgbResponse),
/// Audio
#[prost(message, tag="20")]
AllocAudioBuffer(super::AllocAudioBufferResponse),
#[prost(message, tag="21")]
NewAudioStream(super::NewAudioStreamResponse),
AllocAudioBuffer(super::AllocAudioBufferResponse),
#[prost(message, tag="22")]
NewAudioSource(super::NewAudioSourceResponse),
NewAudioStream(super::NewAudioStreamResponse),
#[prost(message, tag="23")]
CaptureAudioFrame(super::CaptureAudioFrameResponse),
NewAudioSource(super::NewAudioSourceResponse),
#[prost(message, tag="24")]
NewAudioResampler(super::NewAudioResamplerResponse),
CaptureAudioFrame(super::CaptureAudioFrameResponse),
#[prost(message, tag="25")]
RemixAndResample(super::RemixAndResampleResponse),
NewAudioResampler(super::NewAudioResamplerResponse),
#[prost(message, tag="26")]
RemixAndResample(super::RemixAndResampleResponse),
#[prost(message, tag="27")]
E2ee(super::E2eeResponse),
}
}
Expand All @@ -3160,7 +3188,7 @@ pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiEvent {
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15")]
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16")]
pub message: ::core::option::Option<ffi_event::Message>,
}
/// Nested message and enum types in `FfiEvent`.
Expand Down Expand Up @@ -3198,6 +3226,8 @@ pub mod ffi_event {
GetStats(super::GetStatsCallback),
#[prost(message, tag="15")]
Logs(super::LogBatch),
#[prost(message, tag="16")]
GetSessionStats(super::GetSessionStatsCallback),
}
}
/// Stop all rooms synchronously (Do we need async here?).
Expand Down
54 changes: 54 additions & 0 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,54 @@ fn on_e2ee_request(
Ok(proto::E2eeResponse { message: Some(msg) })
}

fn on_get_session_stats(
server: &'static FfiServer,
get_session_stats: proto::GetSessionStatsRequest,
) -> FfiResult<proto::GetSessionStatsResponse> {
let ffi_room = server
.retrieve_handle::<room::FfiRoom>(get_session_stats.room_handle)?
.clone();
let async_id = server.next_id();

server.async_runtime.spawn(async move {
match ffi_room.inner.room.get_stats().await {
Ok(stats) => {
let _ = server
.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: None,
publisher_stats: stats
.publisher_stats
.into_iter()
.map(Into::into)
.collect(),
subscriber_stats: stats
.subscriber_stats
.into_iter()
.map(Into::into)
.collect(),
},
))
.await;
}
Err(err) => {
let _ = server
.send_event(proto::ffi_event::Message::GetSessionStats(
proto::GetSessionStatsCallback {
async_id,
error: Some(err.to_string()),
..Default::default()
},
))
.await;
}
}
});

Ok(proto::GetSessionStatsResponse { async_id })
}

#[allow(clippy::field_reassign_with_default)] // Avoid uggly format
pub fn handle_request(
server: &'static FfiServer,
Expand Down Expand Up @@ -823,6 +871,12 @@ pub fn handle_request(
proto::ffi_request::Message::E2ee(e2ee) => {
proto::ffi_response::Message::E2ee(on_e2ee_request(server, e2ee)?)
}
proto::ffi_request::Message::GetSessionStats(get_session_stats) => {
proto::ffi_response::Message::GetSessionStats(on_get_session_stats(
server,
get_session_stats,
)?)
}
});

Ok(res)
Expand Down
Loading

0 comments on commit b7d4c88

Please sign in to comment.