Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(client): ♻️ Move decoder out of ClientCoreContext; fix IDR race condition #2421

Merged
merged 4 commits into from
Sep 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
340 changes: 226 additions & 114 deletions alvr/client_core/src/c_api.rs

Large diffs are not rendered by default.

80 changes: 12 additions & 68 deletions alvr/client_core/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![allow(clippy::if_same_then_else)]

use crate::{
decoder::{self, DecoderConfig, DecoderSink, DecoderSource},
logging_backend::{LogMirrorData, LOG_CHANNEL_SENDER},
platform,
sockets::AnnouncerSocket,
Expand Down Expand Up @@ -64,8 +63,7 @@ pub struct ConnectionContext {
pub tracking_sender: Mutex<Option<StreamSender<Tracking>>>,
pub statistics_sender: Mutex<Option<StreamSender<ClientStatistics>>>,
pub statistics_manager: Mutex<Option<StatisticsManager>>,
pub decoder_sink: Mutex<Option<DecoderSink>>,
pub decoder_source: Mutex<Option<DecoderSource>>,
pub decoder_callback: Mutex<Option<Box<dyn FnMut(Duration, &[u8]) -> bool + Send>>>,
pub head_pose_queue: RwLock<VecDeque<(Duration, Pose)>>,
pub last_good_head_pose: RwLock<Pose>,
pub view_params: RwLock<[ViewParams; 2]>,
Expand Down Expand Up @@ -274,9 +272,8 @@ fn connection_pipeline(

let video_receive_thread = thread::spawn({
let ctx = Arc::clone(&ctx);
let event_queue = Arc::clone(&event_queue);
move || {
let mut stream_corrupted = false;
let mut stream_corrupted = true;
while is_streaming(&ctx) {
let data = match video_receiver.recv(STREAMING_RECV_TIMEOUT) {
Ok(data) => data,
Expand All @@ -302,37 +299,14 @@ fn connection_pipeline(
}

if !stream_corrupted || !settings.connection.avoid_video_glitching {
if capabilities.external_decoder {
let mut head_pose = *ctx.last_good_head_pose.read();
for (timestamp, pose) in &*ctx.head_pose_queue.read() {
if *timestamp == header.timestamp {
head_pose = *pose;
break;
}
}

let view_params = &ctx.view_params.read();
event_queue.lock().push_back(ClientCoreEvent::FrameReady {
timestamp: header.timestamp,
view_params: [
ViewParams {
pose: head_pose * view_params[0].pose,
fov: view_params[0].fov,
},
ViewParams {
pose: head_pose * view_params[1].pose,
fov: view_params[1].fov,
},
],
nal: nal.to_vec(),
});
} else if !ctx
.decoder_sink
let submitted = ctx
.decoder_callback
.lock()
.as_mut()
.map(|sink| sink.push_nal(header.timestamp, nal))
.unwrap_or(false)
{
.map(|callback| callback(header.timestamp, nal))
.unwrap_or(false);

if !submitted {
stream_corrupted = true;
if let Some(sender) = &mut *ctx.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
Expand Down Expand Up @@ -490,39 +464,12 @@ fn connection_pipeline(

match maybe_packet {
Ok(ServerControlPacket::DecoderConfig(config)) => {
if capabilities.external_decoder {
event_queue
.lock()
.push_back(ClientCoreEvent::DecoderConfig {
codec: config.codec,
config_nal: config.config_buffer,
});
} else if ctx.decoder_sink.lock().is_none() {
Vixea marked this conversation as resolved.
Show resolved Hide resolved
let config = DecoderConfig {
event_queue
.lock()
.push_back(ClientCoreEvent::DecoderConfig {
codec: config.codec,
force_software_decoder: settings.video.force_software_decoder,
max_buffering_frames: settings.video.max_buffering_frames,
buffering_history_weight: settings.video.buffering_history_weight,
options: settings.video.mediacodec_extra_options.clone(),
config_buffer: config.config_buffer,
};

let (sink, source) = decoder::create_decoder(config, {
let ctx = Arc::clone(&ctx);
move |target_timestamp| {
if let Some(stats) = &mut *ctx.statistics_manager.lock() {
stats.report_frame_decoded(target_timestamp);
}
}
config_nal: config.config_buffer,
});

*ctx.decoder_sink.lock() = Some(sink);
*ctx.decoder_source.lock() = Some(source);

if let Some(sender) = &mut *ctx.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
}
}
}
Ok(ServerControlPacket::Restarting) => {
info!("{SERVER_RESTART_MESSAGE}");
Expand Down Expand Up @@ -600,9 +547,6 @@ fn connection_pipeline(
.lock()
.push_back(ClientCoreEvent::StreamingStopped);

*ctx.decoder_sink.lock() = None;
*ctx.decoder_source.lock() = None;

// Remove lock to allow threads to properly exit:
drop(connection_state_lock);

Expand Down
8 changes: 4 additions & 4 deletions alvr/client_core/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alvr_common::anyhow::Result;
use alvr_session::{CodecType, MediacodecDataType};
use std::time::Duration;

#[derive(Clone, Default)]
#[derive(Clone, Default, PartialEq)]
pub struct DecoderConfig {
pub codec: CodecType,
pub force_software_decoder: bool,
Expand Down Expand Up @@ -37,21 +37,21 @@ pub struct DecoderSource {

impl DecoderSource {
/// If a frame is available, return the timestamp and the AHardwareBuffer.
pub fn get_frame(&mut self) -> Result<Option<(Duration, *mut std::ffi::c_void)>> {
pub fn get_frame(&mut self) -> Option<(Duration, *mut std::ffi::c_void)> {
#[cfg(target_os = "android")]
{
self.inner.dequeue_frame()
}
#[cfg(not(target_os = "android"))]
alvr_common::anyhow::bail!("Not implemented");
None
}
}

// report_frame_decoded: (target_timestamp: Duration) -> ()
#[allow(unused_variables)]
pub fn create_decoder(
config: DecoderConfig,
report_frame_decoded: impl Fn(Duration) + Send + 'static,
report_frame_decoded: impl Fn(Result<Duration>) + Send + Sync + 'static,
) -> (DecoderSink, DecoderSource) {
#[cfg(target_os = "android")]
{
Expand Down
99 changes: 35 additions & 64 deletions alvr/client_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

mod c_api;
mod connection;
mod decoder;
mod logging_backend;
mod platform;
mod sockets;
Expand All @@ -17,6 +16,7 @@ mod storage;
#[cfg(target_os = "android")]
mod audio;

pub mod decoder;
pub mod graphics;

use alvr_common::{
Expand Down Expand Up @@ -67,24 +67,12 @@ pub enum ClientCoreEvent {
codec: CodecType,
config_nal: Vec<u8>,
},
FrameReady {
timestamp: Duration,
view_params: [ViewParams; 2],
nal: Vec<u8>,
},
}

pub struct DecodedFrame {
pub timestamp: Duration,
pub view_params: [ViewParams; 2],
pub buffer_ptr: *mut std::ffi::c_void,
}

// Note: this struct may change without breaking network protocol changes
#[derive(Clone)]
pub struct ClientCapabilities {
pub default_view_resolution: UVec2,
pub external_decoder: bool,
pub refresh_rates: Vec<f32>,
pub foveated_encoding: bool,
pub encoder_high_profile: bool,
Expand Down Expand Up @@ -329,31 +317,45 @@ impl ClientCoreContext {
}
}

pub fn get_frame(&self) -> Option<DecodedFrame> {
dbg_client_core!("get_frame");
/// The callback should return true if the frame was successfully submitted to the decoder
pub fn set_decoder_input_callback(
&self,
callback: Box<dyn FnMut(Duration, &[u8]) -> bool + Send>,
) {
dbg_client_core!("set_decoder_input_callback");

let mut decoder_source_lock = self.connection_context.decoder_source.lock();
let decoder_source = decoder_source_lock.as_mut()?;
*self.connection_context.decoder_callback.lock() = Some(callback);

let (frame_timestamp, buffer_ptr) = match decoder_source.get_frame() {
Ok(maybe_pair) => maybe_pair?,
Err(e) => {
error!("Error getting frame, restarting connection: {}", e);
if let Some(sender) = &mut *self.connection_context.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
}
}

// The connection loop observes changes on this value
*self.connection_context.state.write() = ConnectionState::Disconnecting;
pub fn report_frame_decoded(&self, timestamp: Duration) {
zmerp marked this conversation as resolved.
Show resolved Hide resolved
dbg_client_core!("report_frame_decoded");

return None;
}
};
if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_frame_decoded(timestamp);
}
}

pub fn report_fatal_decoder_error(&self, error: &str) {
error!("Fatal decoder error, restarting connection: {error}");

// The connection loop observes changes on this value
*self.connection_context.state.write() = ConnectionState::Disconnecting;
}

pub fn report_compositor_start(&self, timestamp: Duration) -> [ViewParams; 2] {
dbg_client_core!("report_compositor_start");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_compositor_start(frame_timestamp);
stats.report_compositor_start(timestamp);
}

let mut head_pose = *self.connection_context.last_good_head_pose.read();
for (timestamp, pose) in &*self.connection_context.head_pose_queue.read() {
if *timestamp == frame_timestamp {
for (ts, pose) in &*self.connection_context.head_pose_queue.read() {
if *ts == timestamp {
head_pose = *pose;
break;
}
Expand All @@ -370,48 +372,17 @@ impl ClientCoreContext {
},
];

Some(DecodedFrame {
timestamp: frame_timestamp,
view_params,
buffer_ptr,
})
}

/// Call only with external decoder
pub fn request_idr(&self) {
dbg_client_core!("request_idr");

if let Some(sender) = &mut *self.connection_context.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
}
}

/// Call only with external decoder
pub fn report_frame_decoded(&self, target_timestamp: Duration) {
dbg_client_core!("report_frame_decoded");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_frame_decoded(target_timestamp);
}
}

/// Call only with external decoder
pub fn report_compositor_start(&self, target_timestamp: Duration) {
dbg_client_core!("report_compositor_start");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_compositor_start(target_timestamp);
}
view_params
}

pub fn report_submit(&self, target_timestamp: Duration, vsync_queue: Duration) {
pub fn report_submit(&self, timestamp: Duration, vsync_queue: Duration) {
dbg_client_core!("report_submit");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_submit(target_timestamp, vsync_queue);
stats.report_submit(timestamp, vsync_queue);

if let Some(sender) = &mut *self.connection_context.statistics_sender.lock() {
if let Some(stats) = stats.summary(target_timestamp) {
if let Some(stats) = stats.summary(timestamp) {
sender.send_header(&stats).ok();
} else {
warn!("Statistics summary not ready!");
Expand Down
Loading
Loading