diff --git a/alvr/client_core/src/connection.rs b/alvr/client_core/src/connection.rs index 649ce83b5f..a184d72a47 100644 --- a/alvr/client_core/src/connection.rs +++ b/alvr/client_core/src/connection.rs @@ -11,7 +11,7 @@ use crate::{ }; use alvr_audio::AudioDevice; use alvr_common::{ - debug, error, info, + dbg_connection, debug, error, info, parking_lot::{Condvar, Mutex, RwLock}, wait_rwlock, warn, AnyhowToCon, ConResult, ConnectionError, ConnectionState, LifecycleState, ALVR_VERSION, @@ -94,6 +94,8 @@ pub fn connection_lifecycle_loop( lifecycle_state: Arc>, event_queue: Arc>>, ) { + dbg_connection!("connection_lifecycle_loop: Begin"); + set_hud_message(&event_queue, INITIAL_MESSAGE); while *lifecycle_state.read() != LifecycleState::ShuttingDown { @@ -117,6 +119,8 @@ pub fn connection_lifecycle_loop( thread::sleep(CONNECTION_RETRY_INTERVAL); } + + dbg_connection!("connection_lifecycle_loop: End"); } fn connection_pipeline( @@ -125,6 +129,8 @@ fn connection_pipeline( lifecycle_state: Arc>, event_queue: Arc>>, ) -> ConResult { + dbg_connection!("connection_pipeline: Begin"); + let (mut proto_control_socket, server_ip) = { let config = Config::load(); let announcer_socket = AnnouncerSocket::new(&config.hostname).to_con()?; @@ -158,6 +164,7 @@ fn connection_pipeline( .input_sample_rate() .to_con()?; + dbg_connection!("connection_pipeline: Send stream capabilities"); proto_control_socket .send(&ClientConnectionResult::ConnectionAccepted { client_protocol_id: alvr_common::protocol_id_u64(), @@ -179,6 +186,7 @@ fn connection_pipeline( .to_con()?; let config_packet = proto_control_socket.recv::(HANDSHAKE_ACTION_TIMEOUT)?; + dbg_connection!("connection_pipeline: stream config received"); let (settings, negotiated_config) = alvr_packets::decode_stream_config(&config_packet).to_con()?; @@ -224,6 +232,7 @@ fn connection_pipeline( } } + dbg_connection!("connection_pipeline: create StreamSocket"); let stream_socket_builder = StreamSocketBuilder::listen_for_server( Duration::from_secs(1), settings.connection.stream_port, @@ -234,12 +243,14 @@ fn connection_pipeline( ) .to_con()?; + dbg_connection!("connection_pipeline: Send StreamReady"); if let Err(e) = control_sender.send(&ClientControlPacket::StreamReady) { info!("Server disconnected. Cause: {e:?}"); set_hud_message(&event_queue, SERVER_DISCONNECTED_MESSAGE); return Ok(()); } + dbg_connection!("connection_pipeline: accept connection"); let mut stream_socket = stream_socket_builder.accept_from_server( server_ip, settings.connection.stream_port, @@ -558,6 +569,8 @@ fn connection_pipeline( *connection_state_lock = ConnectionState::Streaming; + dbg_connection!("connection_pipeline: Unlock streams"); + // Make sure IPD and FoV are resent after reconnection // todo: send this data as part of the connection handshake ctx.view_params_queue.write().clear(); @@ -582,6 +595,8 @@ fn connection_pipeline( // Remove lock to allow threads to properly exit: drop(connection_state_lock); + dbg_connection!("connection_pipeline: Destroying streams"); + video_receive_thread.join().ok(); game_audio_thread.join().ok(); microphone_thread.join().ok(); @@ -590,5 +605,7 @@ fn connection_pipeline( control_receive_thread.join().ok(); stream_receive_thread.join().ok(); + dbg_connection!("connection_pipeline: End"); + Ok(()) } diff --git a/alvr/client_core/src/lib.rs b/alvr/client_core/src/lib.rs index 6b99a171e5..3a1e21cffa 100644 --- a/alvr/client_core/src/lib.rs +++ b/alvr/client_core/src/lib.rs @@ -20,7 +20,7 @@ mod audio; pub mod graphics; use alvr_common::{ - error, + dbg_client_core, error, glam::{UVec2, Vec2, Vec3}, parking_lot::{Mutex, RwLock}, warn, ConnectionState, DeviceMotion, LifecycleState, Pose, HEAD_ID, @@ -105,6 +105,8 @@ pub struct ClientCoreContext { impl ClientCoreContext { pub fn new(capabilities: ClientCapabilities) -> Self { + dbg_client_core!("Create"); + // Make sure to reset config in case of version compat mismatch. if Config::load().protocol_id != alvr_common::protocol_id() { // NB: Config::default() sets the current protocol ID @@ -112,9 +114,11 @@ impl ClientCoreContext { } #[cfg(target_os = "android")] - platform::try_get_permission(platform::MICROPHONE_PERMISSION); - #[cfg(target_os = "android")] - platform::set_wifi_lock(true); + { + dbg_client_core!("Getting permissions"); + platform::try_get_permission(platform::MICROPHONE_PERMISSION); + platform::set_wifi_lock(true); + } let lifecycle_state = Arc::new(RwLock::new(LifecycleState::Idle)); let event_queue = Arc::new(Mutex::new(VecDeque::new())); @@ -142,10 +146,14 @@ impl ClientCoreContext { } pub fn resume(&self) { + dbg_client_core!("resume"); + *self.lifecycle_state.write() = LifecycleState::Resumed; } pub fn pause(&self) { + dbg_client_core!("pause"); + let mut connection_state_lock = self.connection_context.state.write(); *self.lifecycle_state.write() = LifecycleState::Idle; @@ -160,10 +168,14 @@ impl ClientCoreContext { } pub fn poll_event(&self) -> Option { + dbg_client_core!("poll_event"); + self.event_queue.lock().pop_front() } pub fn send_battery(&self, device_id: u64, gauge_value: f32, is_plugged: bool) { + dbg_client_core!("send_battery"); + if let Some(sender) = &mut *self.connection_context.control_sender.lock() { sender .send(&ClientControlPacket::Battery(BatteryInfo { @@ -176,12 +188,16 @@ impl ClientCoreContext { } pub fn send_playspace(&self, area: Option) { + dbg_client_core!("send_playspace"); + if let Some(sender) = &mut *self.connection_context.control_sender.lock() { sender.send(&ClientControlPacket::PlayspaceSync(area)).ok(); } } pub fn send_active_interaction_profile(&self, device_id: u64, profile_id: u64) { + dbg_client_core!("send_active_interaction_profile"); + if let Some(sender) = &mut *self.connection_context.control_sender.lock() { sender .send(&ClientControlPacket::ActiveInteractionProfile { @@ -193,6 +209,8 @@ impl ClientCoreContext { } pub fn send_custom_interaction_profile(&self, device_id: u64, input_ids: HashSet) { + dbg_client_core!("send_custom_interaction_profile"); + if let Some(sender) = &mut *self.connection_context.control_sender.lock() { sender .send(&alvr_packets::encode_reserved_client_control_packet( @@ -206,6 +224,8 @@ impl ClientCoreContext { } pub fn send_buttons(&self, entries: Vec) { + dbg_client_core!("send_buttons"); + if let Some(sender) = &mut *self.connection_context.control_sender.lock() { sender.send(&ClientControlPacket::Buttons(entries)).ok(); } @@ -219,6 +239,8 @@ impl ClientCoreContext { hand_skeletons: [Option<[Pose; 26]>; 2], face_data: FaceData, ) { + dbg_client_core!("send_tracking"); + let last_ipd = { let mut view_params_queue_lock = self.connection_context.view_params_queue.write(); @@ -281,6 +303,8 @@ impl ClientCoreContext { } pub fn get_head_prediction_offset(&self) -> Duration { + dbg_client_core!("get_head_prediction_offset"); + if let Some(stats) = &*self.connection_context.statistics_manager.lock() { stats.average_total_pipeline_latency() } else { @@ -289,6 +313,8 @@ impl ClientCoreContext { } pub fn get_tracker_prediction_offset(&self) -> Duration { + dbg_client_core!("get_tracker_prediction_offset"); + if let Some(stats) = &*self.connection_context.statistics_manager.lock() { stats.tracker_prediction_offset() } else { @@ -297,6 +323,8 @@ impl ClientCoreContext { } pub fn get_frame(&self) -> Option { + dbg_client_core!("get_frame"); + let mut decoder_source_lock = self.connection_context.decoder_source.lock(); let decoder_source = decoder_source_lock.as_mut()?; @@ -333,6 +361,8 @@ impl ClientCoreContext { /// Call only with external decoder pub fn request_idr(&self) { + dbg_client_core!("request_idr"); + if let Some(sender) = &mut *self.connection_context.control_sender.lock() { sender.send(&ClientControlPacket::RequestIdr).ok(); } @@ -340,6 +370,8 @@ impl ClientCoreContext { /// Call only with external decoder pub fn report_frame_decoded(&self, target_timestamp: Duration) { + dbg_client_core!("report_frame_decoded"); + if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { stats.report_frame_decoded(target_timestamp); } @@ -347,12 +379,16 @@ impl ClientCoreContext { /// Call only with external decoder pub fn report_compositor_start(&self, target_timestamp: Duration) { + dbg_client_core!("report_compositor_start"); + if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { stats.report_compositor_start(target_timestamp); } } pub fn report_submit(&self, target_timestamp: Duration, vsync_queue: Duration) { + dbg_client_core!("report_submit"); + if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { stats.report_submit(target_timestamp, vsync_queue); @@ -369,6 +405,8 @@ impl ClientCoreContext { impl Drop for ClientCoreContext { fn drop(&mut self) { + dbg_client_core!("Drop"); + *self.lifecycle_state.write() = LifecycleState::ShuttingDown; if let Some(thread) = self.connection_thread.lock().take() { diff --git a/alvr/server_core/src/connection.rs b/alvr/server_core/src/connection.rs index fc1f85e7f2..a173df3999 100644 --- a/alvr/server_core/src/connection.rs +++ b/alvr/server_core/src/connection.rs @@ -11,7 +11,7 @@ use crate::{ }; use alvr_audio::AudioDevice; use alvr_common::{ - con_bail, debug, error, + con_bail, dbg_connection, debug, error, glam::{Quat, UVec2, Vec2, Vec3}, info, parking_lot::{Condvar, Mutex, RwLock}, @@ -240,6 +240,8 @@ pub fn contruct_openvr_config(session: &SessionConfig) -> OpenvrConfig { // Alternate connection trials with manual IPs and clients discovered on the local network pub fn handshake_loop(ctx: Arc, lifecycle_state: Arc>) { + dbg_connection!("handshake_loop: Begin"); + let mut welcome_socket = match WelcomeSocket::new() { Ok(socket) => socket, Err(e) => { @@ -249,6 +251,8 @@ pub fn handshake_loop(ctx: Arc, lifecycle_state: Arc, lifecycle_state: Arc clients, Err(e) => { @@ -347,10 +353,14 @@ pub fn handshake_loop(ctx: Arc, lifecycle_state: Arc>, mut client_ips: HashMap, ) -> ConResult { + dbg_connection!("try_connect: Finding client and creating control socket"); + let (proto_socket, client_ip) = ProtoControlSocket::connect_to( Duration::from_secs(1), PeerType::AnyClient(client_ips.keys().cloned().collect()), @@ -367,6 +379,8 @@ fn try_connect( con_bail!("unreachable"); }; + dbg_connection!("try_connect: Pushing new client connection thread"); + ctx.connection_threads.lock().push(thread::spawn({ let ctx = Arc::clone(&ctx); move || { @@ -405,11 +419,14 @@ fn connection_pipeline( client_hostname: String, client_ip: IpAddr, ) -> ConResult { + dbg_connection!("connection_pipeline: Begin"); + // This session lock will make sure settings and client list cannot be changed while connecting // to thos client, no other client can connect until handshake is finished. It will then be // temporarily relocked while shutting down the threads. let mut session_manager_lock = SESSION_MANAGER.write(); + dbg_connection!("connection_pipeline: Setting client state in session"); session_manager_lock.update_client_list( client_hostname.clone(), ClientListAction::SetConnectionState(ConnectionState::Connecting), @@ -421,6 +438,7 @@ fn connection_pipeline( let disconnect_notif = Arc::new(Condvar::new()); + dbg_connection!("connection_pipeline: Getting client status packet"); let connection_result = match proto_socket.recv(HANDSHAKE_ACTION_TIMEOUT) { Ok(r) => r, Err(ConnectionError::TryAgain(e)) => { @@ -465,6 +483,8 @@ fn connection_pipeline( con_bail!("Only streaming clients are supported for now"); }; + dbg_connection!("connection_pipeline: setting up negotiated streaming config"); + let settings = session_manager_lock.settings().clone(); fn get_view_res(config: FrameSize, default_res: UVec2) -> UVec2 { @@ -607,6 +627,7 @@ fn connection_pipeline( 0 }; + dbg_connection!("connection_pipeline: send streaming config"); let stream_config_packet = alvr_packets::encode_stream_config( session_manager_lock.session(), &NegotiatedStreamingConfig { @@ -641,6 +662,7 @@ fn connection_pipeline( crate::notify_restart_driver(); } + dbg_connection!("connection_pipeline: Send StartStream packet"); control_sender .send(&ServerControlPacket::StartStream) .to_con()?; @@ -649,6 +671,8 @@ fn connection_pipeline( if !matches!(signal, ClientControlPacket::StreamReady) { con_bail!("Got unexpected packet waiting for stream ack"); } + dbg_connection!("connection_pipeline: Got StreamReady packet"); + *ctx.statistics_manager.lock() = Some(StatisticsManager::new( settings.connection.statistics_history_size, Duration::from_secs_f32(1.0 / fps), @@ -661,6 +685,7 @@ fn connection_pipeline( *ctx.bitrate_manager.lock() = BitrateManager::new(settings.video.bitrate.history_size, fps); + dbg_connection!("connection_pipeline: StreamSocket connect_to_client"); let mut stream_socket = StreamSocketBuilder::connect_to_client( HANDSHAKE_ACTION_TIMEOUT, client_ip, @@ -1370,6 +1395,7 @@ fn connection_pipeline( } if settings.extra.capture.startup_video_recording { + info!("Creating recording file"); crate::create_recording_file(&ctx, session_manager_lock.settings()); } @@ -1382,7 +1408,9 @@ fn connection_pipeline( .send(ServerCoreEvent::ClientConnected) .ok(); + dbg_connection!("connection_pipeline: handshake finished; unlocking streams"); alvr_common::wait_rwlock(&disconnect_notif, &mut session_manager_lock); + dbg_connection!("connection_pipeline: Begin connection shutdown"); // This requests shutdown from threads *ctx.video_channel_sender.lock() = None; @@ -1414,6 +1442,7 @@ fn connection_pipeline( drop(session_manager_lock); // Ensure shutdown of threads + dbg_connection!("connection_pipeline: Shutdown threads"); video_send_thread.join().ok(); game_audio_thread.join().ok(); microphone_thread.join().ok(); @@ -1428,5 +1457,7 @@ fn connection_pipeline( .send(ServerCoreEvent::ClientDisconnected) .ok(); + dbg_connection!("connection_pipeline: End"); + Ok(()) } diff --git a/alvr/server_core/src/lib.rs b/alvr/server_core/src/lib.rs index 7a5dcb4794..9b9e89dc2d 100644 --- a/alvr/server_core/src/lib.rs +++ b/alvr/server_core/src/lib.rs @@ -18,7 +18,7 @@ pub use tracking::get_hand_skeleton_offsets; use crate::connection::VideoPacket; use alvr_common::{ - error, + dbg_server_core, error, glam::Vec2, once_cell::sync::Lazy, parking_lot::{Mutex, RwLock}, @@ -182,6 +182,8 @@ pub struct ServerCoreContext { impl ServerCoreContext { pub fn new() -> (Self, mpsc::Receiver) { + dbg_server_core!("Creating"); + if SESSION_MANAGER .read() .settings() @@ -229,6 +231,8 @@ impl ServerCoreContext { } pub fn start_connection(&self) { + dbg_server_core!("start_connection"); + // Note: Idle state is not used on the server side *self.lifecycle_state.write() = LifecycleState::Resumed; @@ -240,6 +244,8 @@ impl ServerCoreContext { } pub fn send_haptics(&self, haptics: Haptics) { + dbg_server_core!("send_haptics"); + let haptics_config = { let session_manager_lock = SESSION_MANAGER.read(); @@ -274,6 +280,8 @@ impl ServerCoreContext { } pub fn set_video_config_nals(&self, config_buffer: Vec, codec: CodecType) { + dbg_server_core!("set_video_config_nals"); + if let Some(sender) = &*self.connection_context.video_mirror_sender.lock() { sender.send(config_buffer.clone()).ok(); } @@ -289,6 +297,8 @@ impl ServerCoreContext { } pub fn send_video_nal(&self, target_timestamp: Duration, nal_buffer: Vec, is_idr: bool) { + dbg_server_core!("send_video_nal"); + // start in the corrupts state, the client didn't receive the initial IDR yet. static STREAM_CORRUPTED: AtomicBool = AtomicBool::new(true); static LAST_IDR_INSTANT: Lazy> = Lazy::new(|| Mutex::new(Instant::now())); @@ -373,6 +383,8 @@ impl ServerCoreContext { } pub fn get_dynamic_encoder_params(&self) -> Option { + dbg_server_core!("get_dynamic_encoder_params"); + let pair = { let session_manager_lock = SESSION_MANAGER.read(); self.connection_context @@ -393,12 +405,16 @@ impl ServerCoreContext { } pub fn report_composed(&self, target_timestamp: Duration, offset: Duration) { + dbg_server_core!("report_composed"); + if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { stats.report_frame_composed(target_timestamp, offset); } } pub fn report_present(&self, target_timestamp: Duration, offset: Duration) { + dbg_server_core!("report_present"); + if let Some(stats) = &mut *self.connection_context.statistics_manager.lock() { stats.report_frame_present(target_timestamp, offset); } @@ -417,6 +433,8 @@ impl ServerCoreContext { } pub fn duration_until_next_vsync(&self) -> Option { + dbg_server_core!("duration_until_next_vsync"); + self.connection_context .statistics_manager .lock() @@ -425,6 +443,8 @@ impl ServerCoreContext { } pub fn restart(self) { + dbg_server_core!("restart"); + self.is_restarting.set(true); // drop is called here for self @@ -433,9 +453,12 @@ impl ServerCoreContext { impl Drop for ServerCoreContext { fn drop(&mut self) { + dbg_server_core!("Drop"); + // Invoke connection runtimes shutdown *self.lifecycle_state.write() = LifecycleState::ShuttingDown; + dbg_server_core!("Setting clients as Disconnecting"); { let mut session_manager_lock = SESSION_MANAGER.write(); @@ -459,22 +482,26 @@ impl Drop for ServerCoreContext { } } + dbg_server_core!("Joining connection thread"); if let Some(thread) = self.connection_thread.write().take() { thread.join().ok(); } // apply openvr config for the next launch + dbg_server_core!("Setting restart settings chache"); { let mut session_manager_lock = SESSION_MANAGER.write(); session_manager_lock.session_mut().openvr_config = connection::contruct_openvr_config(session_manager_lock.session()); } + dbg_server_core!("Restore drivers registration backup"); if let Some(backup) = SESSION_MANAGER.write().session_mut().drivers_backup.take() { alvr_server_io::driver_registration(&backup.other_paths, true).ok(); alvr_server_io::driver_registration(&[backup.alvr_path], false).ok(); } + // todo: check if this is still needed while SESSION_MANAGER .read() .client_list()