Skip to content

Commit

Permalink
Use join sets for other tokio task sets (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
madmikeross authored Dec 18, 2023
1 parent 5a45fd3 commit d2815d5
Showing 1 changed file with 105 additions and 83 deletions.
188 changes: 105 additions & 83 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,54 +158,57 @@ async fn refresh_eve_scout_system_relations(
) -> Result<(), ReplicationError> {
drop_system_connections(&graph, "Thera").await?;
drop_system_connections(&graph, "Turnur").await?;
let signatures = get_public_signatures(client.clone()).await?;
let wormhole_saves: Vec<_> = signatures

let mut set = JoinSet::new();

get_public_signatures(client.clone())
.await?
.iter()
.filter(|sig| sig.signature_type == "wormhole")
.map(|wormhole| tokio::spawn(save_wormhole(graph.clone(), wormhole.clone())))
.collect();
futures::future::try_join_all(wormhole_saves).await?;
Ok(())
.for_each(|wormhole| {
set.spawn(save_wormhole(graph.clone(), wormhole.clone()));
});

error_if_any_member_has_error(&mut set)
.await
.unwrap()
.map_err(TargetError)
}

async fn pull_all_stargates(client: Client, graph: Arc<Graph>) -> Result<(), ReplicationError> {
let saved_system_ids = get_all_system_ids(graph.clone()).await?;
let stargate_pulls: Vec<_> = saved_system_ids
let mut set = JoinSet::new();

get_all_system_ids(graph.clone())
.await?
.iter()
.map(|&system_id| {
tokio::spawn(pull_system_stargates(
.for_each(|&system_id| {
set.spawn(pull_system_stargates(
client.clone(),
graph.clone(),
system_id,
))
})
.collect();
futures::future::try_join_all(stargate_pulls).await?;
Ok(())
));
});

error_if_any_member_has_error(&mut set).await.unwrap()
}

async fn pull_all_systems(client: Client, graph: Arc<Graph>) -> Result<(), ReplicationError> {
let system_ids = get_system_ids(&client).await.unwrap();
println!("Received {} system ids from ESI", system_ids.len());

let mut set = JoinSet::new();

system_ids.iter().for_each(|&system_id| {
println!("Spawning task to pull {} system if its missing", system_id);
set.spawn(pull_system_if_missing(
client.clone(),
graph.clone(),
system_id,
));
});

while let Some(res) = set.join_next().await {
if let Err(e) = res.unwrap() {
return Err(e);
}
}
get_system_ids(&client)
.await
.unwrap()
.iter()
.for_each(|&system_id| {
println!("Spawning task to pull {} system if its missing", system_id);
set.spawn(pull_system_if_missing(
client.clone(),
graph.clone(),
system_id,
));
});

Ok(())
error_if_any_member_has_error(&mut set).await.unwrap()
}

async fn pull_system_if_missing(
Expand All @@ -214,9 +217,8 @@ async fn pull_system_if_missing(
system_id: i64,
) -> Result<(), ReplicationError> {
println!("Checking if system_id {} exists in the database", system_id);
let result = system_id_exists(graph.clone(), system_id).await;

match result {
match system_id_exists(graph.clone(), system_id).await {
Ok(exists) => {
if exists {
println!(
Expand Down Expand Up @@ -304,76 +306,96 @@ async fn pull_system_stargates(
graph: Arc<Graph>,
system_id: i64,
) -> Result<(), ReplicationError> {
let system = get_system(graph.clone(), system_id).await?;
let stargate_pulls: Vec<_> = system
let mut set = JoinSet::new();

get_system(graph.clone(), system_id)
.await?
.unwrap()
.stargates
.iter()
.map(|&stargate_id| {
tokio::spawn(pull_stargate_if_missing(
.for_each(|&stargate_id| {
set.spawn(pull_stargate_if_missing(
client.clone(),
graph.clone(),
stargate_id,
))
})
.collect();
futures::future::try_join_all(stargate_pulls).await?;
Ok(())
));
});

error_if_any_member_has_error(&mut set).await.unwrap()
}

async fn error_if_any_member_has_error<T: 'static>(
set: &mut JoinSet<Result<(), T>>,
) -> Option<Result<(), T>> {
while let Some(res) = set.join_next().await {
if let Err(e) = res.unwrap() {
return Some(Err(e));
}
}
None
}

async fn pull_system_kills(client: Client, graph: Arc<Graph>) -> Result<i32, ReplicationError> {
let response = get_system_kills(&client).await?;
let galaxy_kills: i32 = response.system_kills.iter().map(|s| s.ship_kills).sum();
let kill_saves: Vec<_> = response
.system_kills
.iter()
.map(|system_kill| {
tokio::spawn(set_last_hour_system_kills(
graph.clone(),
system_kill.system_id,
system_kill.ship_kills,
))
})
.collect();
futures::future::try_join_all(kill_saves).await?;
Ok(galaxy_kills)

let mut set = JoinSet::new();

response.system_kills.iter().for_each(|system_kill| {
set.spawn(set_last_hour_system_kills(
graph.clone(),
system_kill.system_id,
system_kill.ship_kills,
));
});

error_if_any_member_has_error(&mut set)
.await
.unwrap()
.map_err(TargetError)
.map(|_| galaxy_kills)
}

async fn pull_system_jumps(client: Client, graph: Arc<Graph>) -> Result<i32, ReplicationError> {
let response = get_system_jumps(&client).await?;
let galaxy_jumps: i32 = response.system_jumps.iter().map(|s| s.ship_jumps).sum();
let jump_saves: Vec<_> = response
.system_jumps
.iter()
.map(|system_jump| {
tokio::spawn(set_last_hour_system_jumps(
graph.clone(),
system_jump.system_id,
system_jump.ship_jumps,
))
})
.collect();
futures::future::try_join_all(jump_saves).await?;
Ok(galaxy_jumps)

let mut set = JoinSet::new();

response.system_jumps.iter().for_each(|system_jump| {
set.spawn(set_last_hour_system_jumps(
graph.clone(),
system_jump.system_id,
system_jump.ship_jumps,
));
});

error_if_any_member_has_error(&mut set)
.await
.unwrap()
.map_err(TargetError)
.map(|_| galaxy_jumps)
}

async fn pull_system_risk(client: Client, graph: Arc<Graph>) -> Result<(), ReplicationError> {
let galaxy_kills = pull_system_kills(client.clone(), graph.clone()).await?;
let galaxy_jumps = pull_system_jumps(client.clone(), graph.clone()).await?;
let system_ids = get_all_system_ids(graph.clone()).await?;
let risk_saves: Vec<_> = system_ids
.iter()
.map(|&system_id| {
tokio::spawn(set_system_jump_risk(
graph.clone(),
system_id,
galaxy_jumps,
galaxy_kills,
))
})
.collect();
futures::future::try_join_all(risk_saves).await?;
Ok(())
let mut set = JoinSet::new();

system_ids.iter().for_each(|&system_id| {
set.spawn(set_system_jump_risk(
graph.clone(),
system_id,
galaxy_jumps,
galaxy_kills,
));
});

error_if_any_member_has_error(&mut set)
.await
.unwrap()
.map_err(TargetError)
}

async fn pull_stargate_if_missing(
Expand Down

0 comments on commit d2815d5

Please sign in to comment.