Skip to content

Commit

Permalink
refactor(client): ♻️ Move decoder out of ClientCoreContext; fix IDR r…
Browse files Browse the repository at this point in the history
…ace condition (#2421)
  • Loading branch information
zmerp authored Sep 28, 2024
1 parent 18bc500 commit 370455d
Show file tree
Hide file tree
Showing 9 changed files with 389 additions and 351 deletions.
340 changes: 226 additions & 114 deletions alvr/client_core/src/c_api.rs

Large diffs are not rendered by default.

80 changes: 12 additions & 68 deletions alvr/client_core/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#![allow(clippy::if_same_then_else)]

use crate::{
decoder::{self, DecoderConfig, DecoderSink, DecoderSource},
logging_backend::{LogMirrorData, LOG_CHANNEL_SENDER},
platform,
sockets::AnnouncerSocket,
Expand Down Expand Up @@ -64,8 +63,7 @@ pub struct ConnectionContext {
pub tracking_sender: Mutex<Option<StreamSender<Tracking>>>,
pub statistics_sender: Mutex<Option<StreamSender<ClientStatistics>>>,
pub statistics_manager: Mutex<Option<StatisticsManager>>,
pub decoder_sink: Mutex<Option<DecoderSink>>,
pub decoder_source: Mutex<Option<DecoderSource>>,
pub decoder_callback: Mutex<Option<Box<dyn FnMut(Duration, &[u8]) -> bool + Send>>>,
pub head_pose_queue: RwLock<VecDeque<(Duration, Pose)>>,
pub last_good_head_pose: RwLock<Pose>,
pub view_params: RwLock<[ViewParams; 2]>,
Expand Down Expand Up @@ -274,9 +272,8 @@ fn connection_pipeline(

let video_receive_thread = thread::spawn({
let ctx = Arc::clone(&ctx);
let event_queue = Arc::clone(&event_queue);
move || {
let mut stream_corrupted = false;
let mut stream_corrupted = true;
while is_streaming(&ctx) {
let data = match video_receiver.recv(STREAMING_RECV_TIMEOUT) {
Ok(data) => data,
Expand All @@ -302,37 +299,14 @@ fn connection_pipeline(
}

if !stream_corrupted || !settings.connection.avoid_video_glitching {
if capabilities.external_decoder {
let mut head_pose = *ctx.last_good_head_pose.read();
for (timestamp, pose) in &*ctx.head_pose_queue.read() {
if *timestamp == header.timestamp {
head_pose = *pose;
break;
}
}

let view_params = &ctx.view_params.read();
event_queue.lock().push_back(ClientCoreEvent::FrameReady {
timestamp: header.timestamp,
view_params: [
ViewParams {
pose: head_pose * view_params[0].pose,
fov: view_params[0].fov,
},
ViewParams {
pose: head_pose * view_params[1].pose,
fov: view_params[1].fov,
},
],
nal: nal.to_vec(),
});
} else if !ctx
.decoder_sink
let submitted = ctx
.decoder_callback
.lock()
.as_mut()
.map(|sink| sink.push_nal(header.timestamp, nal))
.unwrap_or(false)
{
.map(|callback| callback(header.timestamp, nal))
.unwrap_or(false);

if !submitted {
stream_corrupted = true;
if let Some(sender) = &mut *ctx.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
Expand Down Expand Up @@ -490,39 +464,12 @@ fn connection_pipeline(

match maybe_packet {
Ok(ServerControlPacket::DecoderConfig(config)) => {
if capabilities.external_decoder {
event_queue
.lock()
.push_back(ClientCoreEvent::DecoderConfig {
codec: config.codec,
config_nal: config.config_buffer,
});
} else if ctx.decoder_sink.lock().is_none() {
let config = DecoderConfig {
event_queue
.lock()
.push_back(ClientCoreEvent::DecoderConfig {
codec: config.codec,
force_software_decoder: settings.video.force_software_decoder,
max_buffering_frames: settings.video.max_buffering_frames,
buffering_history_weight: settings.video.buffering_history_weight,
options: settings.video.mediacodec_extra_options.clone(),
config_buffer: config.config_buffer,
};

let (sink, source) = decoder::create_decoder(config, {
let ctx = Arc::clone(&ctx);
move |target_timestamp| {
if let Some(stats) = &mut *ctx.statistics_manager.lock() {
stats.report_frame_decoded(target_timestamp);
}
}
config_nal: config.config_buffer,
});

*ctx.decoder_sink.lock() = Some(sink);
*ctx.decoder_source.lock() = Some(source);

if let Some(sender) = &mut *ctx.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
}
}
}
Ok(ServerControlPacket::Restarting) => {
info!("{SERVER_RESTART_MESSAGE}");
Expand Down Expand Up @@ -600,9 +547,6 @@ fn connection_pipeline(
.lock()
.push_back(ClientCoreEvent::StreamingStopped);

*ctx.decoder_sink.lock() = None;
*ctx.decoder_source.lock() = None;

// Remove lock to allow threads to properly exit:
drop(connection_state_lock);

Expand Down
8 changes: 4 additions & 4 deletions alvr/client_core/src/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use alvr_common::anyhow::Result;
use alvr_session::{CodecType, MediacodecDataType};
use std::time::Duration;

#[derive(Clone, Default)]
#[derive(Clone, Default, PartialEq)]
pub struct DecoderConfig {
pub codec: CodecType,
pub force_software_decoder: bool,
Expand Down Expand Up @@ -37,21 +37,21 @@ pub struct DecoderSource {

impl DecoderSource {
/// If a frame is available, return the timestamp and the AHardwareBuffer.
pub fn get_frame(&mut self) -> Result<Option<(Duration, *mut std::ffi::c_void)>> {
pub fn get_frame(&mut self) -> Option<(Duration, *mut std::ffi::c_void)> {
#[cfg(target_os = "android")]
{
self.inner.dequeue_frame()
}
#[cfg(not(target_os = "android"))]
alvr_common::anyhow::bail!("Not implemented");
None
}
}

// report_frame_decoded: (target_timestamp: Duration) -> ()
#[allow(unused_variables)]
pub fn create_decoder(
config: DecoderConfig,
report_frame_decoded: impl Fn(Duration) + Send + 'static,
report_frame_decoded: impl Fn(Result<Duration>) + Send + Sync + 'static,
) -> (DecoderSink, DecoderSource) {
#[cfg(target_os = "android")]
{
Expand Down
99 changes: 35 additions & 64 deletions alvr/client_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

mod c_api;
mod connection;
mod decoder;
mod logging_backend;
mod platform;
mod sockets;
Expand All @@ -17,6 +16,7 @@ mod storage;
#[cfg(target_os = "android")]
mod audio;

pub mod decoder;
pub mod graphics;

use alvr_common::{
Expand Down Expand Up @@ -67,24 +67,12 @@ pub enum ClientCoreEvent {
codec: CodecType,
config_nal: Vec<u8>,
},
FrameReady {
timestamp: Duration,
view_params: [ViewParams; 2],
nal: Vec<u8>,
},
}

pub struct DecodedFrame {
pub timestamp: Duration,
pub view_params: [ViewParams; 2],
pub buffer_ptr: *mut std::ffi::c_void,
}

// Note: this struct may change without breaking network protocol changes
#[derive(Clone)]
pub struct ClientCapabilities {
pub default_view_resolution: UVec2,
pub external_decoder: bool,
pub refresh_rates: Vec<f32>,
pub foveated_encoding: bool,
pub encoder_high_profile: bool,
Expand Down Expand Up @@ -329,31 +317,45 @@ impl ClientCoreContext {
}
}

pub fn get_frame(&self) -> Option<DecodedFrame> {
dbg_client_core!("get_frame");
/// The callback should return true if the frame was successfully submitted to the decoder
pub fn set_decoder_input_callback(
&self,
callback: Box<dyn FnMut(Duration, &[u8]) -> bool + Send>,
) {
dbg_client_core!("set_decoder_input_callback");

let mut decoder_source_lock = self.connection_context.decoder_source.lock();
let decoder_source = decoder_source_lock.as_mut()?;
*self.connection_context.decoder_callback.lock() = Some(callback);

let (frame_timestamp, buffer_ptr) = match decoder_source.get_frame() {
Ok(maybe_pair) => maybe_pair?,
Err(e) => {
error!("Error getting frame, restarting connection: {}", e);
if let Some(sender) = &mut *self.connection_context.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
}
}

// The connection loop observes changes on this value
*self.connection_context.state.write() = ConnectionState::Disconnecting;
pub fn report_frame_decoded(&self, timestamp: Duration) {
dbg_client_core!("report_frame_decoded");

return None;
}
};
if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_frame_decoded(timestamp);
}
}

pub fn report_fatal_decoder_error(&self, error: &str) {
error!("Fatal decoder error, restarting connection: {error}");

// The connection loop observes changes on this value
*self.connection_context.state.write() = ConnectionState::Disconnecting;
}

pub fn report_compositor_start(&self, timestamp: Duration) -> [ViewParams; 2] {
dbg_client_core!("report_compositor_start");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_compositor_start(frame_timestamp);
stats.report_compositor_start(timestamp);
}

let mut head_pose = *self.connection_context.last_good_head_pose.read();
for (timestamp, pose) in &*self.connection_context.head_pose_queue.read() {
if *timestamp == frame_timestamp {
for (ts, pose) in &*self.connection_context.head_pose_queue.read() {
if *ts == timestamp {
head_pose = *pose;
break;
}
Expand All @@ -370,48 +372,17 @@ impl ClientCoreContext {
},
];

Some(DecodedFrame {
timestamp: frame_timestamp,
view_params,
buffer_ptr,
})
}

/// Call only with external decoder
pub fn request_idr(&self) {
dbg_client_core!("request_idr");

if let Some(sender) = &mut *self.connection_context.control_sender.lock() {
sender.send(&ClientControlPacket::RequestIdr).ok();
}
}

/// Call only with external decoder
pub fn report_frame_decoded(&self, target_timestamp: Duration) {
dbg_client_core!("report_frame_decoded");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_frame_decoded(target_timestamp);
}
}

/// Call only with external decoder
pub fn report_compositor_start(&self, target_timestamp: Duration) {
dbg_client_core!("report_compositor_start");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_compositor_start(target_timestamp);
}
view_params
}

pub fn report_submit(&self, target_timestamp: Duration, vsync_queue: Duration) {
pub fn report_submit(&self, timestamp: Duration, vsync_queue: Duration) {
dbg_client_core!("report_submit");

if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() {
stats.report_submit(target_timestamp, vsync_queue);
stats.report_submit(timestamp, vsync_queue);

if let Some(sender) = &mut *self.connection_context.statistics_sender.lock() {
if let Some(stats) = stats.summary(target_timestamp) {
if let Some(stats) = stats.summary(timestamp) {
sender.send_header(&stats).ok();
} else {
warn!("Statistics summary not ready!");
Expand Down
Loading

0 comments on commit 370455d

Please sign in to comment.