From 370455d67ac0fb51224db0c7333405f7bed350d8 Mon Sep 17 00:00:00 2001
From: zarik5
Date: Sat, 28 Sep 2024 23:16:40 +0200
Subject: [PATCH] refactor(client): :recycle: Move decoder out of
 ClientCoreContext; fix IDR race condition (#2421)

---
 alvr/client_core/src/c_api.rs       | 340 ++++++++++++------
 alvr/client_core/src/connection.rs  |  80 +----
 alvr/client_core/src/decoder.rs     |   8 +-
 alvr/client_core/src/lib.rs         |  99 ++---
 .../src/platform/android/decoder.rs |  38 +-
 alvr/client_mock/src/main.rs        |  11 -
 alvr/client_openxr/src/lib.rs       |  52 +--
 alvr/client_openxr/src/stream.rs    | 110 ++++--
 alvr/session/src/settings.rs        |   2 +-
 9 files changed, 389 insertions(+), 351 deletions(-)

diff --git a/alvr/client_core/src/c_api.rs b/alvr/client_core/src/c_api.rs
index 255e6fda9d..51b0a62247 100644
--- a/alvr/client_core/src/c_api.rs
+++ b/alvr/client_core/src/c_api.rs
@@ -1,8 +1,10 @@
 use crate::{
+    decoder::{self, DecoderConfig, DecoderSource},
     graphics::{GraphicsContext, LobbyRenderer, RenderViewInput, StreamRenderer},
     storage, ClientCapabilities, ClientCoreContext, ClientCoreEvent,
 };
 use alvr_common::{
+    anyhow::Result,
     debug, error,
     glam::{Quat, UVec2, Vec2, Vec3},
     info,
@@ -11,10 +13,9 @@ use alvr_common::{
     warn, DeviceMotion, Fov, OptLazy, Pose,
 };
 use alvr_packets::{ButtonEntry, ButtonValue, FaceData, ViewParams};
-use alvr_session::{CodecType, FoveatedEncodingConfig};
+use alvr_session::{CodecType, FoveatedEncodingConfig, MediacodecDataType};
 use std::{
     cell::RefCell,
-    collections::VecDeque,
     ffi::{c_char, c_void, CStr, CString},
     ptr,
     rc::Rc,
@@ -26,9 +27,7 @@ static CLIENT_CORE_CONTEXT: OptLazy<ClientCoreContext> = alvr_common::lazy_mut_n
 static HUD_MESSAGE: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new("".into()));
 static SETTINGS: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new("".into()));
 static SERVER_VERSION: Lazy<Mutex<String>> = Lazy::new(|| Mutex::new("".into()));
-#[allow(clippy::type_complexity)]
-static NAL_QUEUE: Lazy<Mutex<VecDeque<(u64, [ViewParams; 2], Vec<u8>)>>> =
-    Lazy::new(|| Mutex::new(VecDeque::new()));
+static DECODER_CONFIG_BUFFER: Lazy<Mutex<Vec<u8>>> = Lazy::new(|| Mutex::new("".into()));
 
 // Core interface:
 
@@ -36,9 +35,8 @@ pub struct AlvrClientCapabilities {
     default_view_width: u32,
     default_view_height: u32,
-    external_decoder: bool,
     refresh_rates: *const f32,
-    refresh_rates_count: i32,
+    refresh_rates_count: u64,
     foveated_encoding: bool,
     encoder_high_profile: bool,
     encoder_10_bits: bool,
@@ -72,7 +70,14 @@ pub enum AlvrEvent {
     DecoderConfig {
         codec: AlvrCodec,
     },
-    FrameReady,
+}
+
+#[repr(C)]
+pub struct AlvrVideoFrameData {
+    callback_context: *mut c_void,
+    timestamp_ns: u64,
+    buffer_ptr: *const u8,
+    buffer_size: u64,
 }
 
 #[repr(C)]
@@ -262,13 +267,12 @@ pub unsafe extern "C" fn alvr_initialize(capabilities: AlvrClientCapabilities) {
     let refresh_rates = slice::from_raw_parts(
         capabilities.refresh_rates,
-        capabilities.refresh_rates_count as _,
+        capabilities.refresh_rates_count as usize,
     )
     .to_vec();
 
     let capabilities = ClientCapabilities {
         default_view_resolution,
-        external_decoder: capabilities.external_decoder,
         refresh_rates,
         foveated_encoding: capabilities.foveated_encoding,
         encoder_high_profile: capabilities.encoder_high_profile,
@@ -337,9 +341,7 @@ pub extern "C" fn alvr_poll_event(out_event: *mut AlvrEvent) -> bool {
                 amplitude,
             },
             ClientCoreEvent::DecoderConfig { codec, config_nal } => {
-                NAL_QUEUE
-                    .lock()
-                    .push_back((0, [ViewParams::default(); 2], config_nal));
+                *DECODER_CONFIG_BUFFER.lock() = config_nal;
 
                 AlvrEvent::DecoderConfig {
                     codec: match codec {
                         CodecType::H264 => AlvrCodec::H264,
                         CodecType::Hevc => AlvrCodec::Hevc,
                         CodecType::AV1 => AlvrCodec::AV1,
                     },
                 }
} - ClientCoreEvent::FrameReady { - timestamp, - view_params, - nal, - } => { - NAL_QUEUE - .lock() - .push_back((timestamp.as_nanos() as _, view_params, nal)); - - AlvrEvent::FrameReady - } }; unsafe { *out_event = event }; @@ -373,58 +364,6 @@ pub extern "C" fn alvr_poll_event(out_event: *mut AlvrEvent) -> bool { } } -/// Settings will be updated after receiving StreamingStarted event -#[no_mangle] -pub extern "C" fn alvr_get_settings_json(buffer: *mut c_char) -> u64 { - string_to_c_str(buffer, &SETTINGS.lock()) -} - -/// Will be updated after receiving StreamingStarted event -#[no_mangle] -pub extern "C" fn alvr_get_server_version(buffer: *mut c_char) -> u64 { - string_to_c_str(buffer, &SERVER_VERSION.lock()) -} - -/// Call only with external decoder -/// Returns the number of bytes of the next nal, or 0 if there are no nals ready. -/// If out_nal or out_timestamp_ns is null, no nal is dequeued. Use to get the nal allocation size. -/// Returns out_timestamp_ns == 0 if config NAL. -#[no_mangle] -pub extern "C" fn alvr_poll_nal( - out_timestamp_ns: *mut u64, - out_views_params: *mut AlvrViewParams, - out_nal: *mut c_char, -) -> u64 { - let mut queue_lock = NAL_QUEUE.lock(); - if let Some((timestamp_ns, view_params, data)) = queue_lock.pop_front() { - let nal_size = data.len(); - if !out_nal.is_null() && !out_timestamp_ns.is_null() { - unsafe { - *out_timestamp_ns = timestamp_ns; - - if !out_views_params.is_null() { - *out_views_params = AlvrViewParams { - pose: to_capi_pose(view_params[0].pose), - fov: to_capi_fov(view_params[0].fov), - }; - *out_views_params.offset(1) = AlvrViewParams { - pose: to_capi_pose(view_params[1].pose), - fov: to_capi_fov(view_params[1].fov), - }; - } - - ptr::copy_nonoverlapping(data.as_ptr(), out_nal as _, nal_size); - } - } else { - queue_lock.push_front((timestamp_ns, view_params, data)) - } - - nal_size as u64 - } else { - 0 - } -} - // Returns the length of the message. message_buffer can be null. 
#[no_mangle] pub extern "C" fn alvr_hud_message(message_buffer: *mut c_char) -> u64 { @@ -442,6 +381,32 @@ pub extern "C" fn alvr_hud_message(message_buffer: *mut c_char) -> u64 { cstring.as_bytes_with_nul().len() as u64 } +/// Settings will be updated after receiving StreamingStarted event +#[no_mangle] +pub extern "C" fn alvr_get_settings_json(out_buffer: *mut c_char) -> u64 { + string_to_c_str(out_buffer, &SETTINGS.lock()) +} + +/// Will be updated after receiving StreamingStarted event +#[no_mangle] +pub extern "C" fn alvr_get_server_version(out_buffer: *mut c_char) -> u64 { + string_to_c_str(out_buffer, &SERVER_VERSION.lock()) +} + +/// Returns the number of bytes of the decoder_buffer +#[no_mangle] +pub extern "C" fn alvr_get_decoder_config(out_buffer: *mut c_char) -> u64 { + let buffer = DECODER_CONFIG_BUFFER.lock(); + + let size = buffer.len(); + + if !out_buffer.is_null() { + unsafe { ptr::copy_nonoverlapping(buffer.as_ptr(), out_buffer as _, size) } + } + + size as u64 +} + #[no_mangle] pub extern "C" fn alvr_send_battery(device_id: u64, gauge_value: f32, is_plugged: bool) { if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { @@ -469,7 +434,7 @@ pub extern "C" fn alvr_send_custom_interaction_profile( input_ids_ptr: *const u64, input_ids_count: u64, ) { - let input_ids = unsafe { slice::from_raw_parts(input_ids_ptr, input_ids_count as _) }; + let input_ids = unsafe { slice::from_raw_parts(input_ids_ptr, input_ids_count as usize) }; if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { context.send_custom_interaction_profile(device_id, input_ids.iter().cloned().collect()); } @@ -528,7 +493,7 @@ pub extern "C" fn alvr_send_tracking( ptr::copy_nonoverlapping( device_motions, raw_motions.as_mut_ptr(), - device_motions_count as _, + device_motions_count as usize, ); } @@ -613,7 +578,7 @@ pub extern "C" fn alvr_send_tracking( #[no_mangle] pub extern "C" fn alvr_get_head_prediction_offset_ns() -> u64 { if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { - context.get_head_prediction_offset().as_nanos() as _ + context.get_head_prediction_offset().as_nanos() as u64 } else { 0 } @@ -622,68 +587,82 @@ pub extern "C" fn alvr_get_head_prediction_offset_ns() -> u64 { #[no_mangle] pub extern "C" fn alvr_get_tracker_prediction_offset_ns() -> u64 { if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { - context.get_tracker_prediction_offset().as_nanos() as _ + context.get_tracker_prediction_offset().as_nanos() as u64 } else { 0 } } +/// Safety: `context` must be thread safe and valid until the StreamingStopped event. 
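+/// The callback should return true if the NAL was successfully submitted to the decoder;
+/// returning false makes the connection loop flag the stream as corrupted and request an IDR.
+/// Installing the callback also triggers one IDR request, so streaming can start (or recover)
+/// as soon as a decoder is ready.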
#[no_mangle] -pub extern "C" fn alvr_report_submit(target_timestamp_ns: u64, vsync_queue_ns: u64) { +pub extern "C" fn alvr_set_decoder_input_callback( + callback_context: *mut c_void, + callback: extern "C" fn(AlvrVideoFrameData) -> bool, +) { + struct CallbackContext(*mut c_void); + unsafe impl Send for CallbackContext {} + + let callback_context = CallbackContext(callback_context); + if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { - context.report_submit( - Duration::from_nanos(target_timestamp_ns), - Duration::from_nanos(vsync_queue_ns), - ); + context.set_decoder_input_callback(Box::new(move |timestamp, buffer| { + // Make sure to capture the struct itself instead of just the pointer to make the + // borrow checker happy + let callback_context = &callback_context; + + callback(AlvrVideoFrameData { + callback_context: callback_context.0, + timestamp_ns: timestamp.as_nanos() as u64, + buffer_ptr: buffer.as_ptr(), + buffer_size: buffer.len() as u64, + }) + })); } } #[no_mangle] -pub extern "C" fn alvr_request_idr() { +pub extern "C" fn alvr_report_frame_decoded(target_timestamp_ns: u64) { if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { - context.request_idr(); + context.report_frame_decoded(Duration::from_nanos(target_timestamp_ns as u64)); } } #[no_mangle] -pub extern "C" fn alvr_report_frame_decoded(target_timestamp_ns: u64) { +pub extern "C" fn alvr_report_fatal_decoder_error(message: *const c_char) { if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { - context.report_frame_decoded(Duration::from_nanos(target_timestamp_ns as _)); + context.report_fatal_decoder_error(unsafe { CStr::from_ptr(message).to_str().unwrap() }); } } +/// out_view_params must be a vector of 2 elements +/// out_view_params is populated only if the core context is valid #[no_mangle] -pub extern "C" fn alvr_report_compositor_start(target_timestamp_ns: u64) { +pub unsafe extern "C" fn alvr_report_compositor_start( + target_timestamp_ns: u64, + out_view_params: *mut AlvrViewParams, +) { if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { - context.report_compositor_start(Duration::from_nanos(target_timestamp_ns as _)); + let view_params = + context.report_compositor_start(Duration::from_nanos(target_timestamp_ns as u64)); + + *out_view_params = AlvrViewParams { + pose: to_capi_pose(view_params[0].pose), + fov: to_capi_fov(view_params[0].fov), + }; + *out_view_params.offset(1) = AlvrViewParams { + pose: to_capi_pose(view_params[1].pose), + fov: to_capi_fov(view_params[1].fov), + }; } } -/// Returns frame timestamp in nanoseconds or -1 if no frame available. Returns an AHardwareBuffer -/// from out_buffer. 
#[no_mangle] -pub unsafe extern "C" fn alvr_get_frame( - view_params: *mut AlvrViewParams, - out_buffer: *mut *mut std::ffi::c_void, -) -> i64 { +pub extern "C" fn alvr_report_submit(target_timestamp_ns: u64, vsync_queue_ns: u64) { if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { - if let Some(decoded_frame) = context.get_frame() { - *view_params = AlvrViewParams { - pose: to_capi_pose(decoded_frame.view_params[0].pose), - fov: to_capi_fov(decoded_frame.view_params[0].fov), - }; - *view_params.offset(1) = AlvrViewParams { - pose: to_capi_pose(decoded_frame.view_params[1].pose), - fov: to_capi_fov(decoded_frame.view_params[1].fov), - }; - *out_buffer = decoded_frame.buffer_ptr as _; - - decoded_frame.timestamp.as_nanos() as _ - } else { - -1 - } - } else { - -1 + context.report_submit( + Duration::from_nanos(target_timestamp_ns), + Duration::from_nanos(vsync_queue_ns), + ); } } @@ -842,3 +821,136 @@ pub unsafe extern "C" fn alvr_render_stream_opengl( } }); } + +// Decoder-related interface + +static DECODER_SOURCE: OptLazy = alvr_common::lazy_mut_none(); + +#[repr(u8)] +pub enum AlvrMediacodecPropType { + Float, + Int32, + Int64, + String, +} + +#[repr(C)] +pub union AlvrMediacodecPropValue { + float_: f32, + int32: i32, + int64: i64, + string: *const c_char, +} + +#[repr(C)] +pub struct AlvrMediacodecOption { + key: *const c_char, + ty: AlvrMediacodecPropType, + value: AlvrMediacodecPropValue, +} + +#[repr(C)] +pub struct AlvrDecoderConfig { + codec: AlvrCodec, + force_software_decoder: bool, + max_buffering_frames: f32, + buffering_history_weight: f32, + options: *const AlvrMediacodecOption, + options_count: u64, + config_buffer: *const u8, + config_buffer_size: u64, +} + +/// alvr_initialize() must be called before alvr_create_decoder +#[no_mangle] +pub extern "C" fn alvr_create_decoder(config: AlvrDecoderConfig) { + let config = DecoderConfig { + codec: match config.codec { + AlvrCodec::H264 => CodecType::H264, + AlvrCodec::Hevc => CodecType::Hevc, + AlvrCodec::AV1 => CodecType::AV1, + }, + force_software_decoder: config.force_software_decoder, + max_buffering_frames: config.max_buffering_frames, + buffering_history_weight: config.buffering_history_weight, + options: if !config.options.is_null() { + let options = + unsafe { slice::from_raw_parts(config.options, config.options_count as usize) }; + options + .iter() + .map(|option| unsafe { + let key = CStr::from_ptr(option.key).to_str().unwrap(); + let value = match option.ty { + AlvrMediacodecPropType::Float => { + MediacodecDataType::Float(option.value.float_) + } + AlvrMediacodecPropType::Int32 => { + MediacodecDataType::Int32(option.value.int32) + } + AlvrMediacodecPropType::Int64 => { + MediacodecDataType::Int64(option.value.int64) + } + AlvrMediacodecPropType::String => MediacodecDataType::String( + CStr::from_ptr(option.value.string) + .to_str() + .unwrap() + .to_owned(), + ), + }; + + (key.to_string(), value) + }) + .collect() + } else { + vec![] + }, + config_buffer: unsafe { + slice::from_raw_parts(config.config_buffer, config.config_buffer_size as usize).to_vec() + }, + }; + + let (mut sink, source) = + decoder::create_decoder(config, |maybe_timestamp: Result| { + if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { + match maybe_timestamp { + Ok(timestamp) => context.report_frame_decoded(timestamp), + Err(e) => context.report_fatal_decoder_error(&e.to_string()), + } + } + }); + + *DECODER_SOURCE.lock() = Some(source); + + if let Some(context) = &*CLIENT_CORE_CONTEXT.lock() { + 
context.set_decoder_input_callback(Box::new(move |timestamp, buffer| { + sink.push_nal(timestamp, buffer) + })); + } +} + +#[no_mangle] +pub extern "C" fn alvr_destroy_decoder() { + *DECODER_SOURCE.lock() = None; +} + +// Returns true if the timestamp and buffer has been written to +#[no_mangle] +pub extern "C" fn alvr_get_frame( + out_timestamp_ns: *mut u64, + out_buffer_ptr: *mut *mut c_void, +) -> bool { + if let Some(source) = &mut *DECODER_SOURCE.lock() { + if let Some((timestamp, buffer_ptr)) = source.get_frame() { + unsafe { + *out_timestamp_ns = timestamp.as_nanos() as u64; + *out_buffer_ptr = buffer_ptr; + } + + true + } else { + false + } + } else { + false + } +} diff --git a/alvr/client_core/src/connection.rs b/alvr/client_core/src/connection.rs index 30a693340a..db6ee4074a 100644 --- a/alvr/client_core/src/connection.rs +++ b/alvr/client_core/src/connection.rs @@ -1,7 +1,6 @@ #![allow(clippy::if_same_then_else)] use crate::{ - decoder::{self, DecoderConfig, DecoderSink, DecoderSource}, logging_backend::{LogMirrorData, LOG_CHANNEL_SENDER}, platform, sockets::AnnouncerSocket, @@ -64,8 +63,7 @@ pub struct ConnectionContext { pub tracking_sender: Mutex>>, pub statistics_sender: Mutex>>, pub statistics_manager: Mutex>, - pub decoder_sink: Mutex>, - pub decoder_source: Mutex>, + pub decoder_callback: Mutex bool + Send>>>, pub head_pose_queue: RwLock>, pub last_good_head_pose: RwLock, pub view_params: RwLock<[ViewParams; 2]>, @@ -274,9 +272,8 @@ fn connection_pipeline( let video_receive_thread = thread::spawn({ let ctx = Arc::clone(&ctx); - let event_queue = Arc::clone(&event_queue); move || { - let mut stream_corrupted = false; + let mut stream_corrupted = true; while is_streaming(&ctx) { let data = match video_receiver.recv(STREAMING_RECV_TIMEOUT) { Ok(data) => data, @@ -302,37 +299,14 @@ fn connection_pipeline( } if !stream_corrupted || !settings.connection.avoid_video_glitching { - if capabilities.external_decoder { - let mut head_pose = *ctx.last_good_head_pose.read(); - for (timestamp, pose) in &*ctx.head_pose_queue.read() { - if *timestamp == header.timestamp { - head_pose = *pose; - break; - } - } - - let view_params = &ctx.view_params.read(); - event_queue.lock().push_back(ClientCoreEvent::FrameReady { - timestamp: header.timestamp, - view_params: [ - ViewParams { - pose: head_pose * view_params[0].pose, - fov: view_params[0].fov, - }, - ViewParams { - pose: head_pose * view_params[1].pose, - fov: view_params[1].fov, - }, - ], - nal: nal.to_vec(), - }); - } else if !ctx - .decoder_sink + let submitted = ctx + .decoder_callback .lock() .as_mut() - .map(|sink| sink.push_nal(header.timestamp, nal)) - .unwrap_or(false) - { + .map(|callback| callback(header.timestamp, nal)) + .unwrap_or(false); + + if !submitted { stream_corrupted = true; if let Some(sender) = &mut *ctx.control_sender.lock() { sender.send(&ClientControlPacket::RequestIdr).ok(); @@ -490,39 +464,12 @@ fn connection_pipeline( match maybe_packet { Ok(ServerControlPacket::DecoderConfig(config)) => { - if capabilities.external_decoder { - event_queue - .lock() - .push_back(ClientCoreEvent::DecoderConfig { - codec: config.codec, - config_nal: config.config_buffer, - }); - } else if ctx.decoder_sink.lock().is_none() { - let config = DecoderConfig { + event_queue + .lock() + .push_back(ClientCoreEvent::DecoderConfig { codec: config.codec, - force_software_decoder: settings.video.force_software_decoder, - max_buffering_frames: settings.video.max_buffering_frames, - buffering_history_weight: 
settings.video.buffering_history_weight, - options: settings.video.mediacodec_extra_options.clone(), - config_buffer: config.config_buffer, - }; - - let (sink, source) = decoder::create_decoder(config, { - let ctx = Arc::clone(&ctx); - move |target_timestamp| { - if let Some(stats) = &mut *ctx.statistics_manager.lock() { - stats.report_frame_decoded(target_timestamp); - } - } + config_nal: config.config_buffer, }); - - *ctx.decoder_sink.lock() = Some(sink); - *ctx.decoder_source.lock() = Some(source); - - if let Some(sender) = &mut *ctx.control_sender.lock() { - sender.send(&ClientControlPacket::RequestIdr).ok(); - } - } } Ok(ServerControlPacket::Restarting) => { info!("{SERVER_RESTART_MESSAGE}"); @@ -600,9 +547,6 @@ fn connection_pipeline( .lock() .push_back(ClientCoreEvent::StreamingStopped); - *ctx.decoder_sink.lock() = None; - *ctx.decoder_source.lock() = None; - // Remove lock to allow threads to properly exit: drop(connection_state_lock); diff --git a/alvr/client_core/src/decoder.rs b/alvr/client_core/src/decoder.rs index ec27945fba..0816e843ff 100644 --- a/alvr/client_core/src/decoder.rs +++ b/alvr/client_core/src/decoder.rs @@ -2,7 +2,7 @@ use alvr_common::anyhow::Result; use alvr_session::{CodecType, MediacodecDataType}; use std::time::Duration; -#[derive(Clone, Default)] +#[derive(Clone, Default, PartialEq)] pub struct DecoderConfig { pub codec: CodecType, pub force_software_decoder: bool, @@ -37,13 +37,13 @@ pub struct DecoderSource { impl DecoderSource { /// If a frame is available, return the timestamp and the AHardwareBuffer. - pub fn get_frame(&mut self) -> Result> { + pub fn get_frame(&mut self) -> Option<(Duration, *mut std::ffi::c_void)> { #[cfg(target_os = "android")] { self.inner.dequeue_frame() } #[cfg(not(target_os = "android"))] - alvr_common::anyhow::bail!("Not implemented"); + None } } @@ -51,7 +51,7 @@ impl DecoderSource { #[allow(unused_variables)] pub fn create_decoder( config: DecoderConfig, - report_frame_decoded: impl Fn(Duration) + Send + 'static, + report_frame_decoded: impl Fn(Result) + Send + Sync + 'static, ) -> (DecoderSink, DecoderSource) { #[cfg(target_os = "android")] { diff --git a/alvr/client_core/src/lib.rs b/alvr/client_core/src/lib.rs index df6ce97451..a1ef98f008 100644 --- a/alvr/client_core/src/lib.rs +++ b/alvr/client_core/src/lib.rs @@ -7,7 +7,6 @@ mod c_api; mod connection; -mod decoder; mod logging_backend; mod platform; mod sockets; @@ -17,6 +16,7 @@ mod storage; #[cfg(target_os = "android")] mod audio; +pub mod decoder; pub mod graphics; use alvr_common::{ @@ -67,24 +67,12 @@ pub enum ClientCoreEvent { codec: CodecType, config_nal: Vec, }, - FrameReady { - timestamp: Duration, - view_params: [ViewParams; 2], - nal: Vec, - }, -} - -pub struct DecodedFrame { - pub timestamp: Duration, - pub view_params: [ViewParams; 2], - pub buffer_ptr: *mut std::ffi::c_void, } // Note: this struct may change without breaking network protocol changes #[derive(Clone)] pub struct ClientCapabilities { pub default_view_resolution: UVec2, - pub external_decoder: bool, pub refresh_rates: Vec, pub foveated_encoding: bool, pub encoder_high_profile: bool, @@ -329,31 +317,45 @@ impl ClientCoreContext { } } - pub fn get_frame(&self) -> Option { - dbg_client_core!("get_frame"); + /// The callback should return true if the frame was successfully submitted to the decoder + pub fn set_decoder_input_callback( + &self, + callback: Box bool + Send>, + ) { + dbg_client_core!("set_decoder_input_callback"); - let mut decoder_source_lock = 
self.connection_context.decoder_source.lock(); - let decoder_source = decoder_source_lock.as_mut()?; + *self.connection_context.decoder_callback.lock() = Some(callback); - let (frame_timestamp, buffer_ptr) = match decoder_source.get_frame() { - Ok(maybe_pair) => maybe_pair?, - Err(e) => { - error!("Error getting frame, restarting connection: {}", e); + if let Some(sender) = &mut *self.connection_context.control_sender.lock() { + sender.send(&ClientControlPacket::RequestIdr).ok(); + } + } - // The connection loop observes changes on this value - *self.connection_context.state.write() = ConnectionState::Disconnecting; + pub fn report_frame_decoded(&self, timestamp: Duration) { + dbg_client_core!("report_frame_decoded"); - return None; - } - }; + if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { + stats.report_frame_decoded(timestamp); + } + } + + pub fn report_fatal_decoder_error(&self, error: &str) { + error!("Fatal decoder error, restarting connection: {error}"); + + // The connection loop observes changes on this value + *self.connection_context.state.write() = ConnectionState::Disconnecting; + } + + pub fn report_compositor_start(&self, timestamp: Duration) -> [ViewParams; 2] { + dbg_client_core!("report_compositor_start"); if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { - stats.report_compositor_start(frame_timestamp); + stats.report_compositor_start(timestamp); } let mut head_pose = *self.connection_context.last_good_head_pose.read(); - for (timestamp, pose) in &*self.connection_context.head_pose_queue.read() { - if *timestamp == frame_timestamp { + for (ts, pose) in &*self.connection_context.head_pose_queue.read() { + if *ts == timestamp { head_pose = *pose; break; } @@ -370,48 +372,17 @@ impl ClientCoreContext { }, ]; - Some(DecodedFrame { - timestamp: frame_timestamp, - view_params, - buffer_ptr, - }) - } - - /// Call only with external decoder - pub fn request_idr(&self) { - dbg_client_core!("request_idr"); - - if let Some(sender) = &mut *self.connection_context.control_sender.lock() { - sender.send(&ClientControlPacket::RequestIdr).ok(); - } - } - - /// Call only with external decoder - pub fn report_frame_decoded(&self, target_timestamp: Duration) { - dbg_client_core!("report_frame_decoded"); - - if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { - stats.report_frame_decoded(target_timestamp); - } - } - - /// Call only with external decoder - pub fn report_compositor_start(&self, target_timestamp: Duration) { - dbg_client_core!("report_compositor_start"); - - if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { - stats.report_compositor_start(target_timestamp); - } + view_params } - pub fn report_submit(&self, target_timestamp: Duration, vsync_queue: Duration) { + pub fn report_submit(&self, timestamp: Duration, vsync_queue: Duration) { dbg_client_core!("report_submit"); if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { - stats.report_submit(target_timestamp, vsync_queue); + stats.report_submit(timestamp, vsync_queue); if let Some(sender) = &mut *self.connection_context.statistics_sender.lock() { - if let Some(stats) = stats.summary(target_timestamp) { + if let Some(stats) = stats.summary(timestamp) { sender.send_header(&stats).ok(); } else { warn!("Statistics summary not ready!"); diff --git a/alvr/client_core/src/platform/android/decoder.rs b/alvr/client_core/src/platform/android/decoder.rs index 7d0e9a44ee..55f8802610 100644 --- 
a/alvr/client_core/src/platform/android/decoder.rs +++ b/alvr/client_core/src/platform/android/decoder.rs @@ -20,7 +20,7 @@ use std::{ ffi::c_void, ops::Deref, ptr, - sync::Arc, + sync::{Arc, Weak}, thread::{self, JoinHandle}, time::Duration, }; @@ -88,7 +88,6 @@ unsafe impl Send for QueuedImage {} // Access the image queue synchronously. pub struct VideoDecoderSource { running: Arc, - error: Arc>>, dequeue_thread: Option>, image_queue: Arc>>, config: DecoderConfig, @@ -99,11 +98,7 @@ unsafe impl Send for VideoDecoderSource {} impl VideoDecoderSource { // The application MUST finish using the returned buffer before calling this function again - pub fn dequeue_frame(&mut self) -> Result> { - if let Some(error) = self.error.lock().take() { - return Err(anyhow!(error)); - } - + pub fn dequeue_frame(&mut self) -> Option<(Duration, *mut c_void)> { let mut image_queue_lock = self.image_queue.lock(); if let Some(queued_image) = image_queue_lock.front() { @@ -124,7 +119,7 @@ impl VideoDecoderSource { if let Some(queued_image) = image_queue_lock.front_mut() { queued_image.in_use = true; - Ok(Some(( + Some(( queued_image.timestamp, queued_image .image @@ -132,11 +127,11 @@ impl VideoDecoderSource { .unwrap() .as_ptr() .cast(), - ))) + )) } else { // TODO: add back when implementing proper phase sync //warn!("Video frame queue underflow!"); - Ok(None) + None } } } @@ -193,10 +188,12 @@ fn decoder_attempt_setup( Ok(decoder) } +// Since we leak the ImageReader, and we pass frame_result_callback to it which contains a reference +// to ClientCoreContext, to avoid circular references we need to use a Weak reference. fn decoder_lifecycle( config: DecoderConfig, csd_0: Vec, - dequeued_frame_callback: impl Fn(Duration) + Send + 'static, + frame_result_callback: Weak) + Send + Sync + 'static>, running: Arc, decoder_sink: Arc>>, decoder_ready_notifier: Arc, @@ -220,7 +217,9 @@ fn decoder_lifecycle( Ok(AcquireResult::Image(image)) => { let timestamp = Duration::from_nanos(image.timestamp().unwrap() as u64); - dequeued_frame_callback(timestamp); + if let Some(callback) = frame_result_callback.upgrade() { + callback(Ok(timestamp)); + } image_queue_lock.push_back(QueuedImage { timestamp, @@ -305,7 +304,7 @@ fn decoder_lifecycle( error!("Decoder dequeue error: {e}"); } } - Ok(DequeuedOutputBufferInfoResult::TryAgainLater) => thread::yield_now(), + Ok(DequeuedOutputBufferInfoResult::TryAgainLater) => continue, Ok(i) => info!("Decoder dequeue event: {i:?}"), Err(e) => { error!("Decoder dequeue error: {e}"); @@ -337,10 +336,9 @@ fn decoder_lifecycle( pub fn video_decoder_split( config: DecoderConfig, csd_0: Vec, - dequeued_frame_callback: impl Fn(Duration) + Send + 'static, + frame_result_callback: impl Fn(Result) + Send + Sync + 'static, ) -> Result<(VideoDecoderSink, VideoDecoderSource)> { let running = Arc::new(RelaxedAtomic::new(true)); - let error = Arc::new(Mutex::new(None)); let decoder_sink = Arc::new(Mutex::new(None::)); let decoder_ready_notifier = Arc::new(Condvar::new()); let image_queue = Arc::new(Mutex::new(VecDeque::::new())); @@ -348,7 +346,6 @@ pub fn video_decoder_split( let dequeue_thread = thread::spawn({ let config = config.clone(); let running = Arc::clone(&running); - let error = Arc::clone(&error); let decoder_sink = Arc::clone(&decoder_sink); let decoder_ready_notifier = Arc::clone(&decoder_ready_notifier); let image_queue = Arc::clone(&image_queue); @@ -363,22 +360,24 @@ pub fn video_decoder_split( ) { Ok(reader) => reader, Err(e) => { - *error.lock() = Some(anyhow!("{e}")); + 
frame_result_callback(Err(anyhow!("{e}"))); return; } }; + let frame_result_callback = Arc::new(frame_result_callback); + if let Err(e) = decoder_lifecycle( config, csd_0, - dequeued_frame_callback, + Arc::downgrade(&frame_result_callback), running, decoder_sink, decoder_ready_notifier, Arc::clone(&image_queue), &mut image_reader, ) { - *error.lock() = Some(e); + frame_result_callback(Err(e)); } image_queue.lock().clear(); @@ -403,7 +402,6 @@ pub fn video_decoder_split( }; let source = VideoDecoderSource { running, - error, dequeue_thread: Some(dequeue_thread), image_queue, config, diff --git a/alvr/client_mock/src/main.rs b/alvr/client_mock/src/main.rs index 176f1b6135..4ffb9b9510 100644 --- a/alvr/client_mock/src/main.rs +++ b/alvr/client_mock/src/main.rs @@ -205,7 +205,6 @@ fn client_thread( ) { let capabilities = ClientCapabilities { default_view_resolution: UVec2::new(1920, 1832), - external_decoder: true, refresh_rates: vec![60.0, 72.0, 80.0, 90.0, 120.0], foveated_encoding: false, encoder_high_profile: false, @@ -270,16 +269,6 @@ fn client_thread( window_output.decoder_codec = Some(codec); } - ClientCoreEvent::FrameReady { timestamp, .. } => { - if streaming.value() && !got_decoder_config.value() { - client_core_context.request_idr(); - } - - window_output.current_frame_timestamp = timestamp; - - thread::sleep(Duration::from_millis(input_lock.emulated_decode_ms)); - client_core_context.report_frame_decoded(timestamp); - } } output_sender.send(window_output.clone()).ok(); diff --git a/alvr/client_openxr/src/lib.rs b/alvr/client_openxr/src/lib.rs index a4530419bd..6a52907387 100644 --- a/alvr/client_openxr/src/lib.rs +++ b/alvr/client_openxr/src/lib.rs @@ -20,17 +20,9 @@ use extra_extensions::{ }; use lobby::Lobby; use openxr as xr; -use std::{ - path::Path, - rc::Rc, - sync::Arc, - thread, - time::{Duration, Instant}, -}; +use std::{path::Path, rc::Rc, sync::Arc, thread, time::Duration}; use stream::StreamContext; -const DECODER_MAX_TIMEOUT_MULTIPLIER: f32 = 0.8; - fn from_xr_vec3(v: xr::Vector3f) -> Vec3 { Vec3::new(v.x, v.y, v.z) } @@ -247,7 +239,6 @@ pub fn entry_point() { let capabilities = ClientCapabilities { default_view_resolution, - external_decoder: false, refresh_rates, foveated_encoding: platform != Platform::Unknown, encoder_high_profile: platform != Platform::Unknown, @@ -318,8 +309,8 @@ pub fn entry_point() { lobby.update_reference_space(); - if let Some(context) = &mut stream_context { - context.update_reference_space(); + if let Some(stream) = &mut stream_context { + stream.update_reference_space(); } } xr::Event::PerfSettingsEXT(event) => { @@ -370,15 +361,13 @@ pub fn entry_point() { Rc::clone(&graphics_context), Arc::clone(&interaction_context), platform, - &new_config, + new_config.clone(), )); parsed_stream_config = Some(new_config); } } - ClientCoreEvent::StreamingStopped => { - stream_context = None; - } + ClientCoreEvent::StreamingStopped => stream_context = None, ClientCoreEvent::Haptics { device_id, duration, @@ -402,8 +391,10 @@ pub fn entry_point() { ) .unwrap(); } - ClientCoreEvent::DecoderConfig { .. } | ClientCoreEvent::FrameReady { .. 
} => { - panic!() + ClientCoreEvent::DecoderConfig { codec, config_nal } => { + if let Some(stream) = &mut stream_context { + stream.maybe_initialize_decoder(codec, config_nal); + } } } } @@ -435,29 +426,10 @@ pub fn entry_point() { } // todo: allow rendering lobby and stream layers at the same time and add cross fade - let (layer, display_time) = if let Some(context) = &mut stream_context { - let frame_poll_deadline = Instant::now() - + Duration::from_secs_f32( - frame_interval.as_secs_f32() * DECODER_MAX_TIMEOUT_MULTIPLIER, - ); - let mut frame_result = None; - while frame_result.is_none() && Instant::now() < frame_poll_deadline { - frame_result = core_context.get_frame(); - thread::yield_now(); - } - - let timestamp = frame_result - .as_ref() - .map(|r| r.timestamp) - .unwrap_or(vsync_time); - - let layer = context.render(frame_result, vsync_time); - - (layer, timestamp) + let (layer, display_time) = if let Some(stream) = &mut stream_context { + stream.render(frame_interval, vsync_time) } else { - let layer = lobby.render(frame_state.predicted_display_time); - - (layer, vsync_time) + (lobby.render(frame_state.predicted_display_time), vsync_time) }; graphics_context.make_current(); diff --git a/alvr/client_openxr/src/stream.rs b/alvr/client_openxr/src/stream.rs index 9f701e47bd..d6382e2925 100644 --- a/alvr/client_openxr/src/stream.rs +++ b/alvr/client_openxr/src/stream.rs @@ -4,21 +4,24 @@ use crate::{ XrContext, }; use alvr_client_core::{ + decoder::{self, DecoderConfig, DecoderSource}, graphics::{GraphicsContext, StreamRenderer}, - ClientCoreContext, DecodedFrame, Platform, + ClientCoreContext, Platform, }; use alvr_common::{ + anyhow::Result, error, glam::{UVec2, Vec2}, Pose, RelaxedAtomic, HAND_LEFT_ID, HAND_RIGHT_ID, HEAD_ID, }; use alvr_packets::{FaceData, StreamConfig, ViewParams}; use alvr_session::{ - BodyTrackingSourcesConfig, ClientsideFoveationConfig, ClientsideFoveationMode, EncoderConfig, - FaceTrackingSourcesConfig, FoveatedEncodingConfig, + BodyTrackingSourcesConfig, ClientsideFoveationConfig, ClientsideFoveationMode, CodecType, + EncoderConfig, FaceTrackingSourcesConfig, FoveatedEncodingConfig, MediacodecDataType, }; use openxr as xr; use std::{ + ptr, rc::Rc, sync::Arc, thread::{self, JoinHandle}, @@ -27,8 +30,9 @@ use std::{ // When the latency goes too high, if prediction offset is not capped tracking poll will fail. 
const MAX_PREDICTION: Duration = Duration::from_millis(70); +const DECODER_MAX_TIMEOUT_MULTIPLIER: f32 = 0.8; -#[derive(PartialEq)] +#[derive(PartialEq, Clone)] pub struct ParsedStreamConfig { pub view_resolution: UVec2, pub refresh_rate_hint: f32, @@ -38,6 +42,10 @@ pub struct ParsedStreamConfig { pub face_sources_config: Option, pub body_sources_config: Option, pub prefers_multimodal_input: bool, + pub force_software_decoder: bool, + pub max_buffering_frames: f32, + pub buffering_history_weight: f32, + pub decoder_options: Vec<(String, MediacodecDataType)>, } impl ParsedStreamConfig { @@ -76,6 +84,10 @@ impl ParsedStreamConfig { .as_option() .map(|c| c.multimodal_tracking) .unwrap_or(false), + force_software_decoder: config.settings.video.force_software_decoder, + max_buffering_frames: config.settings.video.max_buffering_frames, + buffering_history_weight: config.settings.video.buffering_history_weight, + decoder_options: config.settings.video.mediacodec_extra_options.clone(), } } } @@ -86,12 +98,12 @@ pub struct StreamContext { interaction_context: Arc, reference_space: Arc, swapchains: [xr::Swapchain; 2], - view_resolution: UVec2, - refresh_rate: f32, last_good_view_params: [ViewParams; 2], input_thread: Option>, input_thread_running: Arc, + config: ParsedStreamConfig, renderer: StreamRenderer, + decoder: Option<(DecoderConfig, DecoderSource)>, } impl StreamContext { @@ -101,7 +113,7 @@ impl StreamContext { gfx_ctx: Rc, interaction_ctx: Arc, platform: Platform, - config: &ParsedStreamConfig, + config: ParsedStreamConfig, ) -> StreamContext { if xr_ctx.instance.exts().fb_display_refresh_rate.is_some() { xr_ctx @@ -237,12 +249,12 @@ impl StreamContext { interaction_context: interaction_ctx, reference_space, swapchains, - view_resolution: config.view_resolution, - refresh_rate: config.refresh_rate_hint, last_good_view_params: [ViewParams::default(); 2], input_thread: Some(input_thread), input_thread_running, + config, renderer, + decoder: None, } } @@ -273,7 +285,7 @@ impl StreamContext { let xr_ctx = self.xr_context.clone(); let interaction_ctx = Arc::clone(&self.interaction_context); let reference_space = Arc::clone(&self.reference_space); - let refresh_rate = self.refresh_rate; + let refresh_rate = self.config.refresh_rate_hint; let running = Arc::clone(&self.input_thread_running); move || { stream_input_loop( @@ -288,26 +300,64 @@ impl StreamContext { })); } + pub fn maybe_initialize_decoder(&mut self, codec: CodecType, config_nal: Vec) { + let new_config = DecoderConfig { + codec, + force_software_decoder: self.config.force_software_decoder, + max_buffering_frames: self.config.max_buffering_frames, + buffering_history_weight: self.config.buffering_history_weight, + options: self.config.decoder_options.clone(), + config_buffer: config_nal, + }; + + let maybe_config = if let Some((config, _)) = &self.decoder { + (new_config != *config).then_some(new_config) + } else { + Some(new_config) + }; + + if let Some(config) = maybe_config { + let (mut sink, source) = decoder::create_decoder(config.clone(), { + let ctx = Arc::clone(&self.core_context); + move |maybe_timestamp: Result| match maybe_timestamp { + Ok(timestamp) => ctx.report_frame_decoded(timestamp), + Err(e) => ctx.report_fatal_decoder_error(&e.to_string()), + } + }); + self.decoder = Some((config, source)); + + self.core_context.set_decoder_input_callback(Box::new( + move |timestamp, buffer| -> bool { sink.push_nal(timestamp, buffer) }, + )); + } + } + pub fn render( &mut self, - decoded_frame: Option, + frame_interval: Duration, 
vsync_time: Duration, - ) -> CompositionLayerBuilder { - let timestamp; - let view_params; - let buffer_ptr; - if let Some(frame) = decoded_frame { - timestamp = frame.timestamp; - view_params = frame.view_params; - buffer_ptr = frame.buffer_ptr; - - self.last_good_view_params = frame.view_params; - } else { - timestamp = vsync_time; - view_params = self.last_good_view_params; - buffer_ptr = std::ptr::null_mut(); + ) -> (CompositionLayerBuilder, Duration) { + let frame_poll_deadline = Instant::now() + + Duration::from_secs_f32( + frame_interval.as_secs_f32() * DECODER_MAX_TIMEOUT_MULTIPLIER, + ); + let mut frame_result = None; + if let Some((_, source)) = &mut self.decoder { + while frame_result.is_none() && Instant::now() < frame_poll_deadline { + frame_result = source.get_frame(); + thread::sleep(Duration::from_micros(500)); + } } + let (timestamp, view_params, buffer_ptr) = + if let Some((timestamp, buffer_ptr)) = frame_result { + let view_params = self.core_context.report_compositor_start(timestamp); + + (timestamp, view_params, buffer_ptr) + } else { + (vsync_time, self.last_good_view_params, ptr::null_mut()) + }; + let left_swapchain_idx = self.swapchains[0].acquire_image().unwrap(); let right_swapchain_idx = self.swapchains[1].acquire_image().unwrap(); @@ -336,12 +386,12 @@ impl StreamContext { let rect = xr::Rect2Di { offset: xr::Offset2Di { x: 0, y: 0 }, extent: xr::Extent2Di { - width: self.view_resolution.x as _, - height: self.view_resolution.y as _, + width: self.config.view_resolution.x as _, + height: self.config.view_resolution.y as _, }, }; - CompositionLayerBuilder::new( + let layer = CompositionLayerBuilder::new( &self.reference_space, [ xr::CompositionLayerProjectionView::new() @@ -363,7 +413,9 @@ impl StreamContext { .image_rect(rect), ), ], - ) + ); + + (layer, timestamp) } } diff --git a/alvr/session/src/settings.rs b/alvr/session/src/settings.rs index 2c124d6fa0..569290a157 100644 --- a/alvr/session/src/settings.rs +++ b/alvr/session/src/settings.rs @@ -276,7 +276,7 @@ CABAC produces better compression but it's significantly slower and may lead to pub software: SoftwareEncodingConfig, } -#[derive(SettingsSchema, Serialize, Deserialize, Clone, Debug)] +#[derive(SettingsSchema, Serialize, Deserialize, Clone, Debug, PartialEq)] pub enum MediacodecDataType { Float(f32), Int32(i32),
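
Usage sketch (not part of the commit): the decoder now lives with the platform
layer instead of ClientCoreContext. The following condenses the new Rust-side
flow for an embedder that owns an Arc<ClientCoreContext>; the wrapper function
and variable names are invented for illustration, while every ALVR call is one
introduced or kept by this patch:

    use std::sync::Arc;

    use alvr_client_core::{
        decoder::{self, DecoderConfig, DecoderSource},
        ClientCoreContext,
    };

    // Called when poll_event() yields ClientCoreEvent::DecoderConfig,
    // after building a DecoderConfig from the settings and the config NAL.
    fn handle_decoder_config(
        core: &Arc<ClientCoreContext>,
        decoder: &mut Option<(DecoderConfig, DecoderSource)>,
        config: DecoderConfig,
    ) {
        // Recreate the decoder only if the config actually changed;
        // DecoderConfig derives PartialEq in this patch for exactly this check.
        if decoder.as_ref().map_or(true, |(old, _)| *old != config) {
            let (mut sink, source) = decoder::create_decoder(config.clone(), {
                let core = Arc::clone(core);
                move |result| match result {
                    Ok(timestamp) => core.report_frame_decoded(timestamp),
                    Err(e) => core.report_fatal_decoder_error(&e.to_string()),
                }
            });
            *decoder = Some((config, source));

            // Installing the input callback sends RequestIdr only after the
            // sink exists, which is what closes the IDR race fixed by this
            // commit: a keyframe can no longer arrive while no decoder is
            // registered to consume it.
            core.set_decoder_input_callback(Box::new(move |timestamp, nal| {
                sink.push_nal(timestamp, nal)
            }));
        }
    }

On the output side, frames are no longer delivered as FrameReady events: the
embedder polls DecoderSource::get_frame() each vsync and passes the returned
timestamp to report_compositor_start(), which now returns the view params to
render with (see StreamContext::render above).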
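
The equivalent C-ABI call order, sketched here in Rust for consistency with the
rest of the patch (a real client calls these through the generated C header;
the two buffering values below are placeholders, not recommendations):

    use std::{ffi::c_char, ptr};

    unsafe fn on_decoder_config_event(codec: AlvrCodec) {
        // After alvr_poll_event() yields AlvrEvent::DecoderConfig: call once
        // with a null pointer to query the size, then fetch the config NAL.
        let size = alvr_get_decoder_config(ptr::null_mut()) as usize;
        let mut config_nal = vec![0u8; size];
        alvr_get_decoder_config(config_nal.as_mut_ptr().cast::<c_char>());

        // alvr_create_decoder() wires the decoder sink into the core context
        // via set_decoder_input_callback(), so the IDR request happens here.
        alvr_create_decoder(AlvrDecoderConfig {
            codec,
            force_software_decoder: false,
            max_buffering_frames: 1.0,     // placeholder
            buffering_history_weight: 0.9, // placeholder
            options: ptr::null(),
            options_count: 0,
            config_buffer: config_nal.as_ptr(),
            config_buffer_size: size as u64,
        });
    }

    unsafe fn on_vsync() {
        // Poll for a decoded hardware buffer instead of waiting for the old
        // FrameReady event; alvr_destroy_decoder() replaces the teardown that
        // previously happened implicitly on StreamingStopped.
        let mut timestamp_ns = 0u64;
        let mut buffer = ptr::null_mut();
        if alvr_get_frame(&mut timestamp_ns, &mut buffer) {
            let mut view_params: [AlvrViewParams; 2] = std::mem::zeroed();
            alvr_report_compositor_start(timestamp_ns, view_params.as_mut_ptr());
            // ... composite `buffer` with `view_params`, then call
            // alvr_report_submit(timestamp_ns, vsync_queue_ns) as before.
        }
    }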