Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement redis caching actor #4

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,4 @@ listenfd = "0.3"
failure = "0.1"
futures = "0.1"
url = "1.7"
redis = "0.24.0"
32 changes: 27 additions & 5 deletions server/src/actors/game_actor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
actors::ClientWsActor,
actors::{ClientWsActor, StoreActor},
game::{Game, TICKS_PER_SECOND},
models::messages::{ClientStop, PlayerGameCommand, ServerCommand},
models::messages::{ClientStop, PlayerGameCommand, ServerCommand, SetScoreboardCommand, SetPlayerInfoCommand},
};
use actix::{Actor, Addr, AsyncContext, Context, Handler, Message};
use futures::sync::oneshot;
Expand All @@ -15,6 +15,7 @@ use tokyo::models::*;

#[derive(Debug)]
pub struct GameActor {
store_actor_addr: Addr<StoreActor>,
connections: HashMap<String, Addr<ClientWsActor>>,
spectators: HashSet<Addr<ClientWsActor>>,
team_names: HashMap<u32, String>,
Expand All @@ -26,6 +27,7 @@ pub struct GameActor {
game_config: GameConfig,
max_players: u32,
time_limit_seconds: u32,
room_token: String,
}

#[derive(Debug)]
Expand All @@ -37,10 +39,11 @@ pub enum GameLoopCommand {
}

impl GameActor {
pub fn new(config: GameConfig, max_players: u32, time_limit_seconds: u32) -> GameActor {
pub fn new(config: GameConfig, store_actor_addr: Addr<StoreActor>, max_players: u32, time_limit_seconds: u32, room_token: String) -> GameActor {
let (msg_tx, msg_rx) = channel();

GameActor {
store_actor_addr,
connections: HashMap::new(),
spectators: HashSet::new(),
team_names: HashMap::new(),
Expand All @@ -52,17 +55,20 @@ impl GameActor {
game_config: config,
max_players,
time_limit_seconds,
room_token,
}
}
}

fn game_loop(
game_actor: Addr<GameActor>,
store_actor: Addr<StoreActor>,
msg_chan: Receiver<GameLoopCommand>,
mut cancel_chan: oneshot::Receiver<()>,
config: GameConfig,
max_players: u32,
time_limit_seconds: u32,
room_token: String,
) {
let mut loop_helper = LoopHelper::builder().build_with_target_rate(TICKS_PER_SECOND);

Expand Down Expand Up @@ -119,6 +125,12 @@ fn game_loop(
println!("Ending game!");
status = GameStatus::Finished;
game_over_at = None;

// store scoreboard on game end
store_actor.do_send(SetScoreboardCommand {
room_token: room_token.clone(),
scoreboard: game.state.scoreboard.clone(),
});
}

if status.is_running() {
Expand Down Expand Up @@ -180,6 +192,8 @@ impl Actor for GameActor {
info!("Game Actor started!");
let (cancel_tx, cancel_rx) = oneshot::channel();
let addr = ctx.address();
let store_actor_addr = self.store_actor_addr.clone();
let room_token = self.room_token.clone();

// "Take" the receiving end of the channel and give it
// to the game loop thread
Expand All @@ -189,7 +203,7 @@ impl Actor for GameActor {
let max_players = self.max_players;
let time_limit_seconds = self.time_limit_seconds;
std::thread::spawn(move || {
game_loop(addr, msg_rx, cancel_rx, config, max_players, time_limit_seconds);
game_loop(addr, store_actor_addr, msg_rx, cancel_rx, config, max_players, time_limit_seconds, room_token);
});

self.cancel_chan = Some(cancel_tx);
Expand All @@ -210,6 +224,8 @@ impl Handler<SocketEvent> for GameActor {
SocketEvent::Join(api_key, team_name, addr) => {
let key_clone = api_key.clone();
let addr_clone = addr.clone();
let cache_api_key = api_key.clone();
let cache_team_name = team_name.clone();

info!("person joined - {:?}", api_key);

Expand Down Expand Up @@ -250,6 +266,12 @@ impl Handler<SocketEvent> for GameActor {
for addr in self.connections.values().chain(self.spectators.iter()) {
addr.do_send(ServerToClient::TeamNames(self.team_names.clone()));
}

// Store player info to DB
let mut fields = HashMap::new();
fields.insert("api_key".to_string(), cache_api_key);
fields.insert("team_name".to_string(), cache_team_name);
self.store_actor_addr.do_send(SetPlayerInfoCommand { player_id, fields });
}
},
SocketEvent::Leave(api_key, addr) => {
Expand Down
2 changes: 2 additions & 0 deletions server/src/actors/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
pub mod client_ws_actor;
pub mod game_actor;
pub mod store_actor;

pub use client_ws_actor::ClientWsActor;
pub use game_actor::GameActor;

pub mod room_manager_actor;
pub use room_manager_actor::{CreateRoom, JoinRoom, ListRooms, RoomManagerActor};
pub use store_actor::StoreActor;
11 changes: 7 additions & 4 deletions server/src/actors/room_manager_actor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::actors::GameActor;
use crate::actors::{GameActor, StoreActor};
use actix::prelude::*;
use rand::{distributions::Alphanumeric, Rng};
use std::{
Expand All @@ -12,6 +12,7 @@ const TOKEN_LENGTH: usize = 8;
// RoomManagerActor is responsible for creating and managing rooms
pub struct RoomManagerActor {
config: GameConfig,
store_actor_addr: Addr<StoreActor>,
id_counter: u32,
rooms: HashMap<String, Room>,
}
Expand All @@ -29,6 +30,7 @@ struct Room {
impl Room {
pub fn new(
config: &GameConfig,
store_actor_addr: Addr<StoreActor>,
id: u32,
name: String,
max_players: u32,
Expand All @@ -37,15 +39,15 @@ impl Room {
) -> Room {
let game_cfg = GameConfig { bound_x: config.bound_x, bound_y: config.bound_y };

let game_actor = GameActor::new(game_cfg, max_players, time_limit_seconds);
let game_actor = GameActor::new(game_cfg, store_actor_addr, max_players, time_limit_seconds, token.clone());
let game_actor_addr = game_actor.start();
Room { id, name, max_players, time_limit_seconds, token, game: game_actor_addr }
}
}

impl RoomManagerActor {
pub fn new(cfg: GameConfig) -> RoomManagerActor {
RoomManagerActor { config: cfg, id_counter: 0, rooms: HashMap::new() }
pub fn new(cfg: GameConfig, store_actor_addr: Addr<StoreActor>) -> RoomManagerActor {
RoomManagerActor { config: cfg, id_counter: 0, rooms: HashMap::new(), store_actor_addr }
}

pub fn create_room(
Expand All @@ -66,6 +68,7 @@ impl RoomManagerActor {
token.clone(),
Room::new(
&self.config,
self.store_actor_addr.clone(),
self.id_counter,
name.to_string(),
max_players,
Expand Down
87 changes: 87 additions & 0 deletions server/src/actors/store_actor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
use std::collections::HashMap;

use actix::prelude::*;
use redis::{Client, Commands, Connection};

use crate::models::messages::{GetScoreboardCommand, SetPlayerInfoCommand, SetScoreboardCommand, GetMultiplePlayerInfo};

#[derive(Debug)]
pub struct StoreActor {
client: Client,
}

impl StoreActor {
pub fn new(redis_url: String) -> StoreActor {
let client = Client::open(redis_url).expect("Failed to create Redis client");
StoreActor { client }
}
}

impl Actor for StoreActor {
type Context = Context<Self>;
}

impl Handler<SetScoreboardCommand> for StoreActor {
type Result = Result<(), redis::RedisError>;

fn handle(&mut self, msg: SetScoreboardCommand, _: &mut Self::Context) -> Self::Result {
let mut con: Connection = self.client.get_connection()?;
let query_key = format!("room:{}:scoreboard", msg.room_token);
for (player_id, points) in &msg.scoreboard {
con.zadd(query_key.clone(), *points as f64, *player_id)?;
}
Ok(())
}
}

impl Handler<GetScoreboardCommand> for StoreActor {
type Result = Result<HashMap<u32, u32>, redis::RedisError>;

fn handle(&mut self, msg: GetScoreboardCommand, _: &mut Self::Context) -> Self::Result {
let mut con: Connection = self.client.get_connection()?;
let query_key = format!("room:{}:scoreboard", msg.0);
let scoreboard: Vec<(String, String)> = con.zrevrange_withscores(query_key, 0, -1)?;
let mut result = HashMap::new();
for (total_points_str, player_id_str) in scoreboard {
let player_id = player_id_str.parse::<u32>().unwrap_or_default();
let total_points = total_points_str.parse::<f64>().unwrap_or_default() as u32;
result.insert(player_id, total_points);
}

Ok(result)
}
}

impl Handler<SetPlayerInfoCommand> for StoreActor {
type Result = Result<String, redis::RedisError>;

fn handle(&mut self, msg: SetPlayerInfoCommand, _: &mut Self::Context) -> Self::Result {
let mut con: Connection = self.client.get_connection()?;

// Use hset_multiple to set multiple fields at the same time
let query_key = format!("player:{}:info", msg.player_id);
let fields: Vec<(&str, &str)> =
msg.fields.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect();
let result: redis::RedisResult<()> = con.hset_multiple(query_key, &fields);

result
.map_err(|e| e.into())
.map(|_| format!("Fields are set for player_id {}", msg.player_id))
}
}

impl Handler<GetMultiplePlayerInfo> for StoreActor {
type Result = Result<HashMap<u32, String>, redis::RedisError>;

fn handle(&mut self, msg: GetMultiplePlayerInfo, _: &mut Self::Context) -> Self::Result {
let mut con: Connection = self.client.get_connection()?;
let mut results = HashMap::new();
for key in msg.player_ids {
let hash_key: String = format!("player:{}:info", key);
let player_info: HashMap<String, String> = con.hgetall(&hash_key)?;
results.insert(key.clone(), serde_json::to_string(&player_info).unwrap());
}

Ok(results)
}
}
90 changes: 87 additions & 3 deletions server/src/controllers/api.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
use std::collections::HashMap;

use crate::{
actors::{ClientWsActor, CreateRoom, JoinRoom, ListRooms},
models::messages::ServerCommand,
actors::{ClientWsActor, CreateRoom, JoinRoom, ListRooms, StoreActor},
models::messages::{GetMultiplePlayerInfo, GetScoreboardCommand, ServerCommand},
AppState,
};
use actix_web::{http::StatusCode, HttpRequest, Query, State};
use actix::Addr;
use actix_web::{http::StatusCode, HttpRequest, Path, Query, State};
use futures::Future;

#[derive(Debug, Deserialize)]
Expand All @@ -13,6 +16,25 @@ pub struct QueryString {
name: String,
}

#[derive(Debug, Deserialize)]
pub struct PlayerInfo {
api_key: String,
team_name: String,
}

#[derive(Serialize, Deserialize)]
pub struct ScoreboardEntry {
player_id: u32,
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we expose more data? how about name, sth like that. We can't show the data just the id

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I just pushed my refactor to include more info (api_key, team_name) in scoreboard response.

total_points: u32,
api_key: String,
team_name: String,
}

#[derive(Serialize, Deserialize)]
struct ScoreboardResponse {
scoreboard: Vec<ScoreboardEntry>,
}

pub fn socket_handler(
(req, state, query): (HttpRequest<AppState>, State<AppState>, Query<QueryString>),
) -> Result<actix_web::HttpResponse, actix_web::Error> {
Expand Down Expand Up @@ -107,3 +129,65 @@ pub fn list_rooms_handler(
Err(_) => Err(actix_web::error::ErrorBadRequest("Failed to list rooms")),
}
}

fn get_scoreboard_player_info(player_ids: Vec<u32>, addr: Addr<StoreActor>) -> Result<HashMap<u32, PlayerInfo>, actix_web::Error> {
let result: Result<HashMap<u32, String>, redis::RedisError> =
addr.send(GetMultiplePlayerInfo { player_ids }).wait().unwrap();

match result {
Ok(players) => {
let mut player_infos: HashMap<u32, PlayerInfo> = HashMap::new();
for (id, player_data_json) in players {
let player_info: PlayerInfo = serde_json::from_str(&player_data_json).expect("Failed to deserialize JSON");
player_infos.insert(id, player_info);
}
Ok(player_infos)
},
Err(_) => Err(actix_web::error::ErrorBadRequest(String::from("failed to query player info data")))
}
}

pub fn get_room_scoreboard(
(_req, state, path): (HttpRequest<AppState>, State<AppState>, Path<String>),
) -> Result<actix_web::HttpResponse, actix_web::Error> {
let room_token = path.into_inner();
let result = state.store_actor_addr.send(GetScoreboardCommand(room_token)).wait().unwrap();
match result {
Ok(scoreboard) => {
let player_ids: Vec<u32> = scoreboard.keys().cloned().collect();
let player_info_map = get_scoreboard_player_info(player_ids, state.store_actor_addr.clone()).unwrap();
let scoreboard_response: ScoreboardResponse = ScoreboardResponse {
scoreboard: scoreboard
.into_iter()
.map(|(player_id, total_points)| {
player_info_map
.get(&player_id)
.map_or_else(
|| {
info!("Failed to query player info by player_id");
ScoreboardEntry {
player_id,
total_points,
api_key: String::from(""),
team_name: String::from(""),
}
},
|info| ScoreboardEntry {
player_id,
total_points,
api_key: info.api_key.clone(),
team_name: info.team_name.clone(),
},
)
})
.collect(),
};
let body = serde_json::to_string(&scoreboard_response)?;
Ok(actix_web::HttpResponse::with_body(StatusCode::OK, body))
},
Err(e) => Err(actix_web::error::ErrorBadRequest(format!(
"Failed to get room's scoreboard: {}",
e
))),
}
}
Loading