Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Judge: fix game state deserialization... #216

Merged
merged 8 commits into from
Apr 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 105 additions & 1 deletion micro-changelog/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion micro-changelog/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "micro_changelog"
version = "0.1.1"
version = "0.1.2"
authors = ["terkwood <[email protected]>"]
edition = "2018"

Expand All @@ -11,3 +11,5 @@ redis_conn_pool = { git = "https://github.com/Terkwood/BUGOUT", branch = "unstab
redis_streams = { git = "https://github.com/Terkwood/BUGOUT", branch = "unstable" }
bincode = "1.2.1"
uuid = { version = "0.8.1", features = ["serde"] }
log = "0.4.8"
env_logger = "0.7.1"
2 changes: 2 additions & 0 deletions micro-changelog/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ COPY . /var/BUGOUT/micro-changelog/.

RUN cargo install --path .

ENV RUST_LOG info

CMD ["micro_changelog"]
4 changes: 4 additions & 0 deletions micro-changelog/dev-cp.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#!/bin/bash

docker cp Cargo.toml bugout_micro-changelog_1:/var/BUGOUT/micro-changelog/.
docker cp src bugout_micro-changelog_1:/var/BUGOUT/micro-changelog/.
7 changes: 6 additions & 1 deletion micro-changelog/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
extern crate bincode;
extern crate env_logger;
extern crate log;
pub extern crate micro_model_moves;
extern crate redis_streams;

mod model;
pub mod repo;
pub mod stream;
Expand All @@ -9,6 +12,8 @@ pub use redis_conn_pool;
pub use redis_conn_pool::{r2d2, r2d2_redis, redis, RedisHostUrl};
use repo::redis_key::KeyProvider;

use log::info;

pub struct Components {
pub pool: redis_conn_pool::Pool,
pub redis_key_provider: KeyProvider,
Expand All @@ -17,7 +22,7 @@ pub struct Components {
impl Default for Components {
fn default() -> Self {
let pool = redis_conn_pool::create(RedisHostUrl::default());
println!("Connected to redis");
info!("Connected to redis");
Components {
pool,
redis_key_provider: KeyProvider::default(),
Expand Down
10 changes: 6 additions & 4 deletions micro-changelog/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
const NAME: &'static str = env!("CARGO_PKG_NAME");
const VERSION: &'static str = env!("CARGO_PKG_VERSION");

use micro_changelog::stream;
use micro_changelog::Components;
use stream::StreamTopics;

use log::info;

const VERSION: &'static str = env!("CARGO_PKG_VERSION");

fn main() {
println!("🔢 {:<8} {}", NAME, VERSION);
env_logger::init();
info!("🔢 {}", VERSION);
stream::process(StreamTopics::default(), &Components::default())
}
8 changes: 4 additions & 4 deletions micro-changelog/src/repo/game_states_repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ pub fn fetch(game_id: &GameId, components: &Components) -> Result<Option<GameSta
}

pub fn write(
game_id: GameId,
game_id: &GameId,
game_state: &GameState,
components: &Components,
) -> Result<String, WriteErr> {
) -> Result<String, WriteErr> {
let mut conn = components.pool.get().unwrap();

let key = components.redis_key_provider.game_states(&game_id);
let key = components.redis_key_provider.game_states(game_id);
let done = conn.set(&key, game_state.serialize()?)?;
// Touch TTL whenever you set the record
conn.expire(key, EXPIRY_SECS)?;

Ok(done)
}
78 changes: 41 additions & 37 deletions micro-changelog/src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ use micro_model_moves::*;
use redis_conn_pool::redis;
pub use topics::StreamTopics;
use xread::*;

use log::{error, info};

pub fn process(topics: StreamTopics, components: &crate::Components) {
println!("Processing {:#?}", topics);
info!("Processing {:#?}", topics);
loop {
match entry_id_repo::fetch_all(components) {
Ok(entry_ids) => match xread_sorted(entry_ids, &topics, &components.pool) {
Expand All @@ -18,58 +21,59 @@ pub fn process(topics: StreamTopics, components: &crate::Components) {
match time_ordered_event {
(entry_id, StreamData::MA(move_acc)) => {
match update_game_state(&move_acc, &components) {
Err(e) => println!("err updating game state {:?}", e),
Err(e) => error!("err updating game state {:?}", e),
Ok(gs) => {
// These next two ops are concurrent in the kafka impl
if let Err(e) = xadd_game_states_changelog(
&move_acc.game_id,
gs,
&topics.game_states_changelog,
components,
) {
error!("could not XADD to game state changelog {:?}", e)
}

if let Err(e) = xadd_move_made(
&move_acc,
&topics.move_made_ev,
&components,
)
.and_then(|_| {
xadd_game_states_changelog(
&move_acc.game_id,
gs,
&topics.game_states_changelog,
components,
)
}) {
println!("err in XADDs {:?}", e)
} else {
if let Err(e) = entry_id_repo::update(
EntryIdType::MoveAcceptedEvent,
entry_id,
&components,
) {
println!(
"err saving entry id for move accepted {:?}",
e
)
}
) {
error!("err in XADD move made {:?}", e)
}

if let Err(e) = entry_id_repo::update(
EntryIdType::MoveAcceptedEvent,
entry_id,
&components,
) {
error!("err saving entry id for move accepted {:?}", e)
}
}
}
}
(entry_id, StreamData::GS(game_id, gs)) => {
if let Err(e) = game_states_repo::write(game_id, &gs, &components) {
println!("Error saving game state {:#?}", e)
if let Err(e) = game_states_repo::write(&game_id, &gs, &components)
{
error!("Error saving game state {:#?}", e)
} else {
println!("loop game STATE");
if let Err(e) = entry_id_repo::update(
EntryIdType::GameStateChangelog,
entry_id,
&components,
) {
println!("Error saving entry ID for game state {:#?}", e)
}
info!("wrote game state: {:?} {:?}", game_id, gs);
}

if let Err(e) = entry_id_repo::update(
EntryIdType::GameStateChangelog,
entry_id,
&components,
) {
error!("Error saving entry ID for game state {:#?}", e)
}
}
}
}
}
Err(e) => println!("Redis err in xread: {:#?}", e),
Err(e) => error!("Redis err in xread: {:#?}", e),
},
Err(FetchErr::Deser) => println!("Unable to deserialize entry IDs"),
Err(FetchErr::Redis(r)) => println!("Redis err {:#?}", r),
Err(FetchErr::Deser) => error!("Unable to deserialize entry IDs"),
Err(FetchErr::Redis(r)) => error!("Redis err {:#?}", r),
}
}
}
Expand Down Expand Up @@ -102,7 +106,7 @@ fn update_game_state(
og.moves.push(move_acc.clone());
og
})?;
game_states_repo::write(game_id, &new_game_state, &components)?;
game_states_repo::write(&game_id, &new_game_state, &components)?;
Ok(new_game_state)
}

Expand Down
Loading