diff --git a/src/session/data.rs b/src/session/data.rs index d237d3e..1283747 100644 --- a/src/session/data.rs +++ b/src/session/data.rs @@ -1,7 +1,8 @@ -use std::{net::Ipv4Addr, sync::Arc}; +use std::{net::Ipv4Addr, sync::Arc, task::Context, time::Duration}; use parking_lot::{RwLock, RwLockReadGuard}; use serde::Serialize; +use tokio::time::{interval_at, Instant, Interval, MissedTickBehavior}; use crate::{ database::entities::Player, @@ -30,8 +31,8 @@ use super::{ }; pub struct SessionData { - /// Extended session data for authenticated sessions - ext: RwLock>, + /// Extended session data, writable data + ext: RwLock, /// IP address associated with the session addr: Ipv4Addr, @@ -41,15 +42,119 @@ pub struct SessionData { association: Option, } +struct SessionDataExt { + /// Data for authorized sessions + auth: Option, + + /// Keep-alive data for the session + keep_alive: SessionDataKeepAlive, +} + +impl SessionDataExt { + fn new() -> Self { + Self { + auth: None, + keep_alive: SessionDataKeepAlive::new(), + } + } +} + +pub struct SessionDataKeepAlive { + /// Last time a keep-alive message was received through the tunnel + pub last_keep_alive: Instant, + + /// Time that has been granted as a grace period to allow the + /// session to go without a keep-alive message for + pub extended_grace: Duration, + + /// Interval for polling connection alive checks + pub keep_alive_interval: Interval, +} + +/// Delay between each keep-alive check +pub const KEEP_ALIVE_DELAY: Duration = Duration::from_secs(15); + +/// When this duration elapses between keep-alive checks for a connection +/// the connection is considered to be dead (4 missed keep-alive check intervals) +pub const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(KEEP_ALIVE_DELAY.as_secs() * 4); + +impl SessionDataKeepAlive { + fn new() -> Self { + let now = Instant::now(); + + // Create the interval to track keep alive checking + let keep_alive_start = now + KEEP_ALIVE_DELAY; + let mut keep_alive_interval = interval_at(keep_alive_start, KEEP_ALIVE_DELAY); + + keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + + Self { + last_keep_alive: Instant::now(), + extended_grace: Duration::from_secs(0), + keep_alive_interval, + } + } +} + impl SessionData { + /// Creates new session data pub fn new(addr: Ipv4Addr, association: Option) -> Self { Self { - ext: Default::default(), + ext: RwLock::new(SessionDataExt::new()), addr, association, } } + /// Polls the keep alive check to see if its ready and if the connection is dead + pub fn poll_keep_alive_dead(&self, cx: &mut Context<'_>) -> bool { + let keep_alive = &mut self.ext.write().keep_alive; + + // Not ready to perform a keep-alive check + if !keep_alive.keep_alive_interval.poll_tick(cx).is_ready() { + return false; + } + + // Check the keep alive state + let now = Instant::now(); + let last_alive = keep_alive + .last_keep_alive + // Get time since last keep alive message + .duration_since(now) + // Remove current grace period from the elapsed time + .saturating_sub(keep_alive.extended_grace); + + // Connection to the client has timed out as no keep alive messages were + // given by the client + last_alive > KEEP_ALIVE_TIMEOUT + } + + /// Sets the connection as alive + pub fn set_alive(&self) { + let keep_alive = &mut self.ext.write().keep_alive; + + // Clear existing grace period + keep_alive.extended_grace = Duration::from_secs(0); + + // Mark current alive period + keep_alive.last_keep_alive = Instant::now(); + } + + /// Grants a grace period duration where the client is allowed to not send any keep-alive + /// messages and won't be timed-out for doing so + pub fn set_keep_alive_grace(&self, grace: Duration) { + let keep_alive = &mut self.ext.write().keep_alive; + let now = Instant::now() + .checked_add(grace) + .expect("reached limit of time"); + + // Delay next keep alive check + keep_alive.keep_alive_interval.reset_at(now); + + // Apply grace period to next check + keep_alive.extended_grace = grace; + } + pub fn get_addr(&self) -> Ipv4Addr { self.addr } @@ -59,16 +164,16 @@ impl SessionData { } // Read from the underlying session data - fn read(&self) -> RwLockReadGuard<'_, Option> { + fn read(&self) -> RwLockReadGuard<'_, SessionDataExt> { self.ext.read() } /// Writes to the underlying session data without publishing the changes fn write_silent(&self, update: F) -> Option where - F: FnOnce(&mut SessionDataExt) -> O, + F: FnOnce(&mut SessionDataAuth) -> O, { - self.ext.write().as_mut().map(update) + self.ext.write().auth.as_mut().map(update) } /// Writes to the underlying session data, publishes changes to @@ -76,25 +181,26 @@ impl SessionData { #[inline] fn write_publish(&self, update: F) -> Option where - F: FnOnce(&mut SessionDataExt) -> O, + F: FnOnce(&mut SessionDataAuth) -> O, { - self.ext.write().as_mut().map(|data| { + self.ext.write().auth.as_mut().map(|data| { let value = update(data); data.publish_update(); value }) } - /// Clears the underlying session data - pub fn clear(&self) { - self.ext.write().take(); + /// Clears the underlying authenticated session data + pub fn clear_auth(&self) { + self.ext.write().auth.take(); } /// Starts a session from the provided player association - pub fn start_session(&self, player: SessionPlayerAssociation) -> Arc { + pub fn set_auth(&self, player: SessionPlayerAssociation) -> Arc { self.ext .write() - .insert(SessionDataExt::new(player)) + .auth + .insert(SessionDataAuth::new(player)) // Obtain the player to return .player_assoc .player @@ -104,6 +210,7 @@ impl SessionData { /// Gets the currently authenticated player pub fn get_player(&self) -> Option> { self.read() + .auth .as_ref() .map(|value| value.player_assoc.player.clone()) } @@ -111,6 +218,7 @@ impl SessionData { /// Obtains the parts required to create a game player pub fn get_game_player_data(&self) -> Option<(Arc, Arc)> { self.read() + .auth .as_ref() .map(|value| (value.player_assoc.player.clone(), value.net.clone())) } @@ -134,7 +242,7 @@ impl SessionData { /// Obtains the network data for the session pub fn network_info(&self) -> Option> { - self.read().as_ref().map(|value| value.net.clone()) + self.read().auth.as_ref().map(|value| value.net.clone()) } /// Sets the game the session is currently apart of @@ -158,7 +266,7 @@ impl SessionData { pub fn get_game(&self) -> Option<(GameID, GameRef)> { let guard = self.read(); - let data = guard.as_ref()?; + let data = guard.auth.as_ref()?; let game_data = data.game.as_ref()?; let game_ref = match game_data.game_ref.upgrade() { @@ -178,7 +286,7 @@ impl SessionData { } pub fn get_lookup_response(&self) -> Option { - self.read().as_ref().map(|data| LookupResponse { + self.read().auth.as_ref().map(|data| LookupResponse { player: data.player_assoc.player.clone(), extended_data: data.ext_data(), }) @@ -196,7 +304,7 @@ impl SessionData { } /// Extended session data, present when the user is authenticated -struct SessionDataExt { +struct SessionDataAuth { /// Session -> Player association, currently authenticated player player_assoc: Arc, /// Networking information for current session @@ -207,7 +315,7 @@ struct SessionDataExt { subscribers: Vec, } -impl SessionDataExt { +impl SessionDataAuth { fn new(player: SessionPlayerAssociation) -> Self { Self { player_assoc: Arc::new(player), diff --git a/src/session/mod.rs b/src/session/mod.rs index 51bc4d4..ff8dd8b 100644 --- a/src/session/mod.rs +++ b/src/session/mod.rs @@ -258,6 +258,12 @@ impl SessionFuture<'_> { /// Polls the read state, the poll ready state returns whether /// the future should continue fn poll_read_state(&mut self, cx: &mut Context<'_>) -> Poll<()> { + // Poll checking if the connection has timed-out + if self.session.data.poll_keep_alive_dead(cx) { + self.stop = true; + return Poll::Ready(()); + } + match &mut self.read_state { ReadState::Recv => { // Try receive a packet from the write channel @@ -330,7 +336,7 @@ impl Drop for SessionFuture<'_> { fn drop(&mut self) { // Clear session data, speeds up process of ending the session // prevents session data being accessed while shutting down - self.session.data.clear(); + self.session.data.clear_auth(); } } diff --git a/src/session/models/util.rs b/src/session/models/util.rs index a677dd2..5f1c26f 100644 --- a/src/session/models/util.rs +++ b/src/session/models/util.rs @@ -1,6 +1,7 @@ use super::Port; use crate::{ config::{QosServerConfig, RuntimeConfig}, + session::data::KEEP_ALIVE_DELAY, utils::types::PlayerID, }; use bitflags::bitflags; @@ -89,7 +90,6 @@ impl TdfSerialize for TickerServer { /// Origin auth source? pub const AUTH_SOURCE: &str = "303107"; pub const BLAZE_VERSION: &str = "Blaze 3.15.08.0 (CL# 1905397)\n"; -pub const PING_PERIOD: &str = "15s"; /// Alias used for ping sites pub const PING_SITE_ALIAS: &str = "ea-sjc"; @@ -114,13 +114,15 @@ impl TdfSerialize for PreAuthResponse { ); w.tag_str_empty(b"CNGN"); + let ping_period = format!("{}s", KEEP_ALIVE_DELAY.as_secs()); + // Client configuration provided by the server w.group(b"CONF", |w| { w.tag_map_tuples( b"CONF", &[ // Client to server ping period - ("pingPeriod", PING_PERIOD), + ("pingPeriod", ping_period.as_str()), // VOIP headset update rate ("voipHeadsetUpdateRate", "1000"), // XLSP (Xbox Live Server Platform) diff --git a/src/session/routes/auth.rs b/src/session/routes/auth.rs index cf9de49..493bb68 100644 --- a/src/session/routes/auth.rs +++ b/src/session/routes/auth.rs @@ -64,7 +64,7 @@ pub async fn handle_login( let player = sessions.add_session(player, Arc::downgrade(&session)); // Update the session stored player - let player = session.data.start_session(player); + let player = session.data.set_auth(player); let session_token: String = sessions.create_token(player.id); @@ -100,7 +100,7 @@ pub async fn handle_silent_login( let player = sessions.add_session(player, Arc::downgrade(&session)); // Update the session stored player - let player = session.data.start_session(player); + let player = session.data.set_auth(player); Ok(Blaze(AuthResponse { player, @@ -137,7 +137,7 @@ pub async fn handle_origin_login( let player = sessions.add_session(player, Arc::downgrade(&session)); // Update the session stored player - let player = session.data.start_session(player); + let player = session.data.set_auth(player); let session_token: String = sessions.create_token(player.id); @@ -157,7 +157,7 @@ pub async fn handle_origin_login( /// Content: {} /// ``` pub async fn handle_logout(session: SessionLink) { - session.data.clear(); + session.data.clear_auth(); } // Skip formatting these entitlement creations @@ -363,7 +363,7 @@ pub async fn handle_create_account( // Create the session association let player = sessions.add_session(player, Arc::downgrade(&session)); - let player = session.data.start_session(player); + let player = session.data.set_auth(player); let session_token = sessions.create_token(player.id); diff --git a/src/session/routes/user_sessions.rs b/src/session/routes/user_sessions.rs index 17b5e23..10ff0e5 100644 --- a/src/session/routes/user_sessions.rs +++ b/src/session/routes/user_sessions.rs @@ -85,7 +85,7 @@ pub async fn handle_resume_session( } let player = sessions.add_session(player, Arc::downgrade(&session)); - let player = session.data.start_session(player); + let player = session.data.set_auth(player); Ok(Blaze(AuthResponse { player, diff --git a/src/session/routes/util.rs b/src/session/routes/util.rs index 9f8c721..ac91d82 100644 --- a/src/session/routes/util.rs +++ b/src/session/routes/util.rs @@ -121,7 +121,9 @@ pub async fn handle_post_auth( /// Content: {} /// ``` /// -pub async fn handle_ping() -> Blaze { +pub async fn handle_ping(session: SessionLink) -> Blaze { + session.data.set_alive(); + let server_time = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or(Duration::ZERO) @@ -420,12 +422,19 @@ fn data_config() -> TdfMap { /// } /// ``` pub async fn handle_suspend_user_ping( + session: SessionLink, Blaze(SuspendPingRequest { time_value }): Blaze, ) -> BlazeError { let res = match time_value.cmp(&90000000) { Ordering::Less => UtilError::SuspendPingTimeTooSmall, Ordering::Greater => UtilError::SuspendPingTimeTooLarge, - Ordering::Equal => UtilError::PingSuspended, + Ordering::Equal => { + session + .data + .set_keep_alive_grace(Duration::from_micros(time_value as u64)); + + UtilError::PingSuspended + } }; res.into() }