Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

watch panic #305

Merged
merged 1 commit into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions livekit-ffi/protocol/ffi.proto
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ message FfiEvent {
GetStatsCallback get_stats = 14;
LogBatch logs = 15;
GetSessionStatsCallback get_session_stats = 16;
Panic panic = 17;
}
}

Expand Down Expand Up @@ -184,5 +185,11 @@ message LogBatch {
repeated LogRecord records = 1;
}

message Panic {
string message = 1;
}

// TODO(theomonnom): Debug messages (Print handles).



62 changes: 37 additions & 25 deletions livekit-ffi/src/cabi.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
use std::sync::Arc;

use prost::Message;
use server::FfiDataBuffer;
use std::{panic, sync::Arc};

use crate::{
proto,
server::{self, FfiConfig},
FfiHandleId, FFI_SERVER, INVALID_HANDLE,
FfiError, FfiHandleId, FFI_SERVER, INVALID_HANDLE,
};

/// # SAFTEY: The "C" callback must be threadsafe and not block
Expand Down Expand Up @@ -38,34 +37,47 @@ pub unsafe extern "C" fn livekit_ffi_request(
res_ptr: *mut *const u8,
res_len: *mut usize,
) -> FfiHandleId {
let data = unsafe { std::slice::from_raw_parts(data, len) };
let req = match proto::FfiRequest::decode(data) {
Ok(req) => req,
Err(err) => {
log::error!("failed to decode request: {:?}", err);
return INVALID_HANDLE;
let res = panic::catch_unwind(|| {
let data = unsafe { std::slice::from_raw_parts(data, len) };
let req = match proto::FfiRequest::decode(data) {
Ok(req) => req,
Err(err) => {
log::error!("failed to decode request: {:?}", err);
return INVALID_HANDLE;
}
};

let res = match server::requests::handle_request(&FFI_SERVER, req.clone()) {
Ok(res) => res,
Err(err) => {
log::error!("failed to handle request {:?}: {:?}", req, err);
return INVALID_HANDLE;
}
}
};
.encode_to_vec();

let res = match server::requests::handle_request(&FFI_SERVER, req.clone()) {
Ok(res) => res,
Err(err) => {
log::error!("failed to handle request {:?}: {:?}", req, err);
return INVALID_HANDLE;
unsafe {
*res_ptr = res.as_ptr();
*res_len = res.len();
}
}
.encode_to_vec();

unsafe {
*res_ptr = res.as_ptr();
*res_len = res.len();
}
let handle_id = FFI_SERVER.next_id();
let ffi_data = FfiDataBuffer { handle: handle_id, data: Arc::new(res) };

let handle_id = FFI_SERVER.next_id();
let ffi_data = FfiDataBuffer { handle: handle_id, data: Arc::new(res) };
FFI_SERVER.store_handle(handle_id, ffi_data);
handle_id
});

FFI_SERVER.store_handle(handle_id, ffi_data);
handle_id
match res {
Ok(handle_id) => handle_id,
Err(err) => {
log::error!("panic while handling request: {:?}", err);
FFI_SERVER.send_panic(Box::new(FfiError::InvalidRequest(
"panic while handling request".into(),
)));
INVALID_HANDLE
}
}
}

#[no_mangle]
Expand Down
10 changes: 9 additions & 1 deletion livekit-ffi/src/livekit.proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3023,7 +3023,7 @@ pub mod ffi_response {
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct FfiEvent {
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16")]
#[prost(oneof="ffi_event::Message", tags="1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17")]
pub message: ::core::option::Option<ffi_event::Message>,
}
/// Nested message and enum types in `FfiEvent`.
Expand Down Expand Up @@ -3063,6 +3063,8 @@ pub mod ffi_event {
Logs(super::LogBatch),
#[prost(message, tag="16")]
GetSessionStats(super::GetSessionStatsCallback),
#[prost(message, tag="17")]
Panic(super::Panic),
}
}
/// Stop all rooms synchronously (Do we need async here?).
Expand Down Expand Up @@ -3110,6 +3112,12 @@ pub struct LogBatch {
#[prost(message, repeated, tag="1")]
pub records: ::prost::alloc::vec::Vec<LogRecord>,
}
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Panic {
#[prost(string, tag="1")]
pub message: ::prost::alloc::string::String,
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)]
#[repr(i32)]
pub enum LogLevel {
Expand Down
3 changes: 2 additions & 1 deletion livekit-ffi/src/server/audio_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl FfiAudioSource {
}
.to_vec();

server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
// The data must be available as long as the client receive the callback.
match source {
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -104,6 +104,7 @@ impl FfiAudioSource {
_ => {}
}
});
server.watch_panic(handle);

Ok(proto::CaptureAudioFrameResponse { async_id })
}
Expand Down
3 changes: 2 additions & 1 deletion livekit-ffi/src/server/audio_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,13 @@ impl FfiAudioStream {
let audio_stream = Self { handle_id, stream_type, close_tx };

let native_stream = NativeAudioStream::new(rtc_track);
server.async_runtime.spawn(Self::native_audio_stream_task(
let handle = server.async_runtime.spawn(Self::native_audio_stream_task(
server,
handle_id,
native_stream,
close_rx,
));
server.watch_panic(handle);
Ok::<FfiAudioStream, FfiError>(audio_stream)
}
_ => return Err(FfiError::InvalidRequest("unsupported audio stream type".into())),
Expand Down
31 changes: 31 additions & 0 deletions livekit-ffi/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::{
error::Error,
sync::{
atomic::{AtomicU64, Ordering},
Arc,
Expand All @@ -23,8 +24,10 @@ use std::{

use dashmap::{mapref::one::MappedRef, DashMap};
use downcast_rs::{impl_downcast, Downcast};
use futures_util::Future;
use livekit::webrtc::{native::audio_resampler::AudioResampler, prelude::*};
use parking_lot::{deadlock, Mutex};
use tokio::task::JoinHandle;

use crate::{proto, proto::FfiEvent, FfiError, FfiHandleId, FfiResult, INVALID_HANDLE};

Expand Down Expand Up @@ -202,4 +205,32 @@ impl FfiServer {
pub fn drop_handle(&self, id: FfiHandleId) -> bool {
self.ffi_handles.remove(&id).is_some()
}

pub fn send_panic(&self, err: Box<dyn Error>) {
// Ok to block here, we're panicking anyway
// Mb send_event can now be sync since we're more confident about
// the callback function not blocking on Python
let _ = self.async_runtime.block_on(self.send_event(proto::ffi_event::Message::Panic(
proto::Panic { message: err.as_ref().to_string() },
)));
}

pub fn watch_panic<O>(&'static self, handle: JoinHandle<O>) -> JoinHandle<O>
where
O: Send + 'static,
{
let handle = self.async_runtime.spawn(async move {
match handle.await {
Ok(r) => r,
Err(e) => {
// Forward the panic to the client
// Recommended behaviour is to exit the process
log::error!("task panicked: {:?}", e);
self.send_panic(Box::new(e));
panic!("watch_panic: task panicked");
}
}
});
handle
}
}
15 changes: 7 additions & 8 deletions livekit-ffi/src/server/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ fn on_disconnect(
disconnect: proto::DisconnectRequest,
) -> FfiResult<proto::DisconnectResponse> {
let async_id = server.next_id();
server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
let ffi_room =
server.retrieve_handle::<room::FfiRoom>(disconnect.room_handle).unwrap().clone();

Expand All @@ -73,7 +73,7 @@ fn on_disconnect(
}))
.await;
});

server.watch_panic(handle);
Ok(proto::DisconnectResponse { async_id })
}

Expand Down Expand Up @@ -205,10 +205,8 @@ fn on_get_stats(
get_stats: proto::GetStatsRequest,
) -> FfiResult<proto::GetStatsResponse> {
let ffi_track = server.retrieve_handle::<FfiTrack>(get_stats.track_handle)?.clone();

let async_id = server.next_id();

server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
match ffi_track.track.get_stats().await {
Ok(stats) => {
let _ = server
Expand All @@ -230,7 +228,7 @@ fn on_get_stats(
}
}
});

server.watch_panic(handle);
Ok(proto::GetStatsResponse { async_id })
}

Expand Down Expand Up @@ -339,6 +337,7 @@ fn new_audio_resampler(
}

/// Remix and resample an audio frame
/// TODO: Deprecate this function
fn remix_and_resample(
server: &'static FfiServer,
remix: proto::RemixAndResampleRequest,
Expand Down Expand Up @@ -504,7 +503,7 @@ fn on_get_session_stats(
let ffi_room = server.retrieve_handle::<room::FfiRoom>(get_session_stats.room_handle)?.clone();
let async_id = server.next_id();

server.async_runtime.spawn(async move {
let handle = server.async_runtime.spawn(async move {
match ffi_room.inner.room.get_stats().await {
Ok(stats) => {
let _ = server
Expand Down Expand Up @@ -539,7 +538,7 @@ fn on_get_session_stats(
}
}
});

server.watch_panic(handle);
Ok(proto::GetSessionStatsResponse { async_id })
}

Expand Down
Loading
Loading