Skip to content

Commit

Permalink
feat: full keep-alive impl, separate session data and rename, support…
Browse files Browse the repository at this point in the history
… ping keep-alive grace period
  • Loading branch information
jacobtread committed Oct 13, 2024
1 parent 04c1632 commit 0a5566e
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 30 deletions.
146 changes: 127 additions & 19 deletions src/session/data.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use std::{net::Ipv4Addr, sync::Arc};
use std::{net::Ipv4Addr, sync::Arc, task::Context, time::Duration};

use parking_lot::{RwLock, RwLockReadGuard};
use serde::Serialize;
use tokio::time::{interval_at, Instant, Interval, MissedTickBehavior};

use crate::{
database::entities::Player,
Expand Down Expand Up @@ -30,8 +31,8 @@ use super::{
};

pub struct SessionData {
/// Extended session data for authenticated sessions
ext: RwLock<Option<SessionDataExt>>,
/// Extended session data, writable data
ext: RwLock<SessionDataExt>,

/// IP address associated with the session
addr: Ipv4Addr,
Expand All @@ -41,15 +42,119 @@ pub struct SessionData {
association: Option<AssociationId>,
}

struct SessionDataExt {
/// Data for authorized sessions
auth: Option<SessionDataAuth>,

/// Keep-alive data for the session
keep_alive: SessionDataKeepAlive,
}

impl SessionDataExt {
fn new() -> Self {
Self {
auth: None,
keep_alive: SessionDataKeepAlive::new(),
}
}
}

pub struct SessionDataKeepAlive {
/// Last time a keep-alive message was received through the tunnel
pub last_keep_alive: Instant,

/// Time that has been granted as a grace period to allow the
/// session to go without a keep-alive message for
pub extended_grace: Duration,

/// Interval for polling connection alive checks
pub keep_alive_interval: Interval,
}

/// Delay between each keep-alive check
pub const KEEP_ALIVE_DELAY: Duration = Duration::from_secs(15);

/// When this duration elapses between keep-alive checks for a connection
/// the connection is considered to be dead (4 missed keep-alive check intervals)
pub const KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(KEEP_ALIVE_DELAY.as_secs() * 4);

impl SessionDataKeepAlive {
fn new() -> Self {
let now = Instant::now();

// Create the interval to track keep alive checking
let keep_alive_start = now + KEEP_ALIVE_DELAY;
let mut keep_alive_interval = interval_at(keep_alive_start, KEEP_ALIVE_DELAY);

keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

Self {
last_keep_alive: Instant::now(),
extended_grace: Duration::from_secs(0),
keep_alive_interval,
}
}
}

impl SessionData {
/// Creates new session data
pub fn new(addr: Ipv4Addr, association: Option<AssociationId>) -> Self {
Self {
ext: Default::default(),
ext: RwLock::new(SessionDataExt::new()),
addr,
association,
}
}

/// Polls the keep alive check to see if its ready and if the connection is dead
pub fn poll_keep_alive_dead(&self, cx: &mut Context<'_>) -> bool {
let keep_alive = &mut self.ext.write().keep_alive;

// Not ready to perform a keep-alive check
if !keep_alive.keep_alive_interval.poll_tick(cx).is_ready() {
return false;
}

// Check the keep alive state
let now = Instant::now();
let last_alive = keep_alive
.last_keep_alive
// Get time since last keep alive message
.duration_since(now)
// Remove current grace period from the elapsed time
.saturating_sub(keep_alive.extended_grace);

// Connection to the client has timed out as no keep alive messages were
// given by the client
last_alive > KEEP_ALIVE_TIMEOUT
}

/// Sets the connection as alive
pub fn set_alive(&self) {
let keep_alive = &mut self.ext.write().keep_alive;

// Clear existing grace period
keep_alive.extended_grace = Duration::from_secs(0);

// Mark current alive period
keep_alive.last_keep_alive = Instant::now();
}

/// Grants a grace period duration where the client is allowed to not send any keep-alive
/// messages and won't be timed-out for doing so
pub fn set_keep_alive_grace(&self, grace: Duration) {
let keep_alive = &mut self.ext.write().keep_alive;
let now = Instant::now()
.checked_add(grace)
.expect("reached limit of time");

// Delay next keep alive check
keep_alive.keep_alive_interval.reset_at(now);

// Apply grace period to next check
keep_alive.extended_grace = grace;
}

pub fn get_addr(&self) -> Ipv4Addr {
self.addr
}
Expand All @@ -59,42 +164,43 @@ impl SessionData {
}

// Read from the underlying session data
fn read(&self) -> RwLockReadGuard<'_, Option<SessionDataExt>> {
fn read(&self) -> RwLockReadGuard<'_, SessionDataExt> {
self.ext.read()
}

/// Writes to the underlying session data without publishing the changes
fn write_silent<F, O>(&self, update: F) -> Option<O>
where
F: FnOnce(&mut SessionDataExt) -> O,
F: FnOnce(&mut SessionDataAuth) -> O,
{
self.ext.write().as_mut().map(update)
self.ext.write().auth.as_mut().map(update)
}

/// Writes to the underlying session data, publishes changes to
/// subscribers
#[inline]
fn write_publish<F, O>(&self, update: F) -> Option<O>
where
F: FnOnce(&mut SessionDataExt) -> O,
F: FnOnce(&mut SessionDataAuth) -> O,
{
self.ext.write().as_mut().map(|data| {
self.ext.write().auth.as_mut().map(|data| {
let value = update(data);
data.publish_update();
value
})
}

/// Clears the underlying session data
pub fn clear(&self) {
self.ext.write().take();
/// Clears the underlying authenticated session data
pub fn clear_auth(&self) {
self.ext.write().auth.take();
}

/// Starts a session from the provided player association
pub fn start_session(&self, player: SessionPlayerAssociation) -> Arc<Player> {
pub fn set_auth(&self, player: SessionPlayerAssociation) -> Arc<Player> {
self.ext
.write()
.insert(SessionDataExt::new(player))
.auth
.insert(SessionDataAuth::new(player))
// Obtain the player to return
.player_assoc
.player
Expand All @@ -104,13 +210,15 @@ impl SessionData {
/// Gets the currently authenticated player
pub fn get_player(&self) -> Option<Arc<Player>> {
self.read()
.auth
.as_ref()
.map(|value| value.player_assoc.player.clone())
}

/// Obtains the parts required to create a game player
pub fn get_game_player_data(&self) -> Option<(Arc<Player>, Arc<NetData>)> {
self.read()
.auth
.as_ref()
.map(|value| (value.player_assoc.player.clone(), value.net.clone()))
}
Expand All @@ -134,7 +242,7 @@ impl SessionData {

/// Obtains the network data for the session
pub fn network_info(&self) -> Option<Arc<NetData>> {
self.read().as_ref().map(|value| value.net.clone())
self.read().auth.as_ref().map(|value| value.net.clone())
}

/// Sets the game the session is currently apart of
Expand All @@ -158,7 +266,7 @@ impl SessionData {
pub fn get_game(&self) -> Option<(GameID, GameRef)> {
let guard = self.read();

let data = guard.as_ref()?;
let data = guard.auth.as_ref()?;
let game_data = data.game.as_ref()?;

let game_ref = match game_data.game_ref.upgrade() {
Expand All @@ -178,7 +286,7 @@ impl SessionData {
}

pub fn get_lookup_response(&self) -> Option<LookupResponse> {
self.read().as_ref().map(|data| LookupResponse {
self.read().auth.as_ref().map(|data| LookupResponse {
player: data.player_assoc.player.clone(),
extended_data: data.ext_data(),
})
Expand All @@ -196,7 +304,7 @@ impl SessionData {
}

/// Extended session data, present when the user is authenticated
struct SessionDataExt {
struct SessionDataAuth {
/// Session -> Player association, currently authenticated player
player_assoc: Arc<SessionPlayerAssociation>,
/// Networking information for current session
Expand All @@ -207,7 +315,7 @@ struct SessionDataExt {
subscribers: Vec<SessionSubscription>,
}

impl SessionDataExt {
impl SessionDataAuth {
fn new(player: SessionPlayerAssociation) -> Self {
Self {
player_assoc: Arc::new(player),
Expand Down
8 changes: 7 additions & 1 deletion src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,12 @@ impl SessionFuture<'_> {
/// Polls the read state, the poll ready state returns whether
/// the future should continue
fn poll_read_state(&mut self, cx: &mut Context<'_>) -> Poll<()> {
// Poll checking if the connection has timed-out
if self.session.data.poll_keep_alive_dead(cx) {
self.stop = true;
return Poll::Ready(());
}

match &mut self.read_state {
ReadState::Recv => {
// Try receive a packet from the write channel
Expand Down Expand Up @@ -330,7 +336,7 @@ impl Drop for SessionFuture<'_> {
fn drop(&mut self) {
// Clear session data, speeds up process of ending the session
// prevents session data being accessed while shutting down
self.session.data.clear();
self.session.data.clear_auth();
}
}

Expand Down
6 changes: 4 additions & 2 deletions src/session/models/util.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::Port;
use crate::{
config::{QosServerConfig, RuntimeConfig},
session::data::KEEP_ALIVE_DELAY,
utils::types::PlayerID,
};
use bitflags::bitflags;
Expand Down Expand Up @@ -89,7 +90,6 @@ impl TdfSerialize for TickerServer {
/// Origin auth source?
pub const AUTH_SOURCE: &str = "303107";
pub const BLAZE_VERSION: &str = "Blaze 3.15.08.0 (CL# 1905397)\n";
pub const PING_PERIOD: &str = "15s";

/// Alias used for ping sites
pub const PING_SITE_ALIAS: &str = "ea-sjc";
Expand All @@ -114,13 +114,15 @@ impl TdfSerialize for PreAuthResponse {
);
w.tag_str_empty(b"CNGN");

let ping_period = format!("{}s", KEEP_ALIVE_DELAY.as_secs());

// Client configuration provided by the server
w.group(b"CONF", |w| {
w.tag_map_tuples(
b"CONF",
&[
// Client to server ping period
("pingPeriod", PING_PERIOD),
("pingPeriod", ping_period.as_str()),
// VOIP headset update rate
("voipHeadsetUpdateRate", "1000"),
// XLSP (Xbox Live Server Platform)
Expand Down
10 changes: 5 additions & 5 deletions src/session/routes/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ pub async fn handle_login(
let player = sessions.add_session(player, Arc::downgrade(&session));

// Update the session stored player
let player = session.data.start_session(player);
let player = session.data.set_auth(player);

let session_token: String = sessions.create_token(player.id);

Expand Down Expand Up @@ -100,7 +100,7 @@ pub async fn handle_silent_login(
let player = sessions.add_session(player, Arc::downgrade(&session));

// Update the session stored player
let player = session.data.start_session(player);
let player = session.data.set_auth(player);

Ok(Blaze(AuthResponse {
player,
Expand Down Expand Up @@ -137,7 +137,7 @@ pub async fn handle_origin_login(
let player = sessions.add_session(player, Arc::downgrade(&session));

// Update the session stored player
let player = session.data.start_session(player);
let player = session.data.set_auth(player);

let session_token: String = sessions.create_token(player.id);

Expand All @@ -157,7 +157,7 @@ pub async fn handle_origin_login(
/// Content: {}
/// ```
pub async fn handle_logout(session: SessionLink) {
session.data.clear();
session.data.clear_auth();
}

// Skip formatting these entitlement creations
Expand Down Expand Up @@ -363,7 +363,7 @@ pub async fn handle_create_account(
// Create the session association
let player = sessions.add_session(player, Arc::downgrade(&session));

let player = session.data.start_session(player);
let player = session.data.set_auth(player);

let session_token = sessions.create_token(player.id);

Expand Down
2 changes: 1 addition & 1 deletion src/session/routes/user_sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ pub async fn handle_resume_session(
}

let player = sessions.add_session(player, Arc::downgrade(&session));
let player = session.data.start_session(player);
let player = session.data.set_auth(player);

Ok(Blaze(AuthResponse {
player,
Expand Down
13 changes: 11 additions & 2 deletions src/session/routes/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ pub async fn handle_post_auth(
/// Content: {}
/// ```
///
pub async fn handle_ping() -> Blaze<PingResponse> {
pub async fn handle_ping(session: SessionLink) -> Blaze<PingResponse> {
session.data.set_alive();

let server_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or(Duration::ZERO)
Expand Down Expand Up @@ -420,12 +422,19 @@ fn data_config() -> TdfMap<String, String> {
/// }
/// ```
pub async fn handle_suspend_user_ping(
session: SessionLink,
Blaze(SuspendPingRequest { time_value }): Blaze<SuspendPingRequest>,
) -> BlazeError {
let res = match time_value.cmp(&90000000) {
Ordering::Less => UtilError::SuspendPingTimeTooSmall,
Ordering::Greater => UtilError::SuspendPingTimeTooLarge,
Ordering::Equal => UtilError::PingSuspended,
Ordering::Equal => {
session
.data
.set_keep_alive_grace(Duration::from_micros(time_value as u64));

UtilError::PingSuspended
}
};
res.into()
}
Expand Down

0 comments on commit 0a5566e

Please sign in to comment.