Skip to content

Commit

Permalink
Synchronize systems and stargates with ESI on startup
Browse files Browse the repository at this point in the history
  • Loading branch information
madmikeross committed Dec 23, 2023
1 parent 37c927a commit 97ff16a
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 30 deletions.
11 changes: 7 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ An application for finding optimal routes between systems in EVE Online.
Make sure you have Docker engine installed, then run `docker compose up`. This should build a container for Neo4j,
install the graph-data-science plugin, and also build a container for the eve-graph app.

On start, eve-graph will attempt to synchronize systems and stargates with ESI before accepting requests. If routing
isn't working properly, inspect the logs for the api container.

### Collecting data
You need to exercise the system refresh, stargate refresh, and wormhole refresh endpoints to hydrate the database
with data on first run. Also, every time you restart the database, the in memory "graph" of data that the gds plugin
uses will need to be rebuilt, calling to refresh wormholes also refreshes this "graph" (and you should call to refresh
wormholes regularly).
When you first start eve-graph, you will need to exercise the wormhole refresh endpoint to hydrate the database
ephemeral J-space connections. Also, every time you restart the database, the in memory "graph" of data that the gds
plugin uses will need to be rebuilt, calling to refresh wormholes also refreshes this "graph" (and you should call to
refresh wormholes regularly).

### Finding the shortest route
If you want to find the shortest route between two systems, say Cleyd and Jita, simply issue a get request to
Expand Down
61 changes: 48 additions & 13 deletions src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,39 +8,33 @@ use serde::{Deserialize, Serialize};
use crate::eve_scout::EveScoutSignature;

pub async fn get_graph_client_with_retry(max_retries: usize) -> Result<Arc<Graph>, Error> {
println!("Connecting to Neo4j");
let neo4j_container_name = "neo4j";
let uri = format!("bolt://{}:7687", neo4j_container_name);
let user = "neo4j";
let pass = "neo4jneo4j";

for attempt in 1..=max_retries {
println!("Trying to build a Neo4j graph client, attempt {}", attempt);
for _attempt in 1..=max_retries {
match Graph::new(uri.clone(), user, pass).await {
Ok(graph) => {
let graph = Arc::new(graph);
match check_neo4j_health(graph.clone()).await {
Ok(_) => {
println!("Successfully built a healthy Neo4j graph client");
println!("Connected to Neo4j");
return Ok(graph);
}
Err(err) => {
println!("Neo4j isn't ready yet: {}", err);
}
Err(err) => {}
};
}
Err(err) => println!("Failed to build a Neo4j graph client: {}", err),
Err(err) => {}
};

let seconds = 5;
println!(
"Waiting {} seconds before trying to build another Neo4j graph client",
seconds
);
tokio::time::sleep(Duration::from_secs(seconds)).await;
}

println!(
"Failed to get graph client after max of {} attempts",
"Failed to connect to Neo4j after the allowed {} attempts",
max_retries
);
Err(ConnectionError)
Expand All @@ -50,7 +44,7 @@ async fn check_neo4j_health(graph: Arc<Graph>) -> Result<(), Error> {
let test_query = "MATCH (n) RETURN n LIMIT 1";
let mut result = match graph.execute(query(test_query)).await {
Ok(row_stream) => row_stream,
Err(err) => {
Err(_err) => {
return Err(ConnectionError);
}
};
Expand Down Expand Up @@ -175,6 +169,27 @@ pub async fn get_all_system_ids(graph: Arc<Graph>) -> Result<Vec<i64>, Error> {
Ok(system_ids)
}

pub async fn get_saved_system_count(graph: &Arc<Graph>) -> Result<i64, Error> {
let get_system_count = "MATCH (s:System) RETURN COUNT(s) as count";
let mut result = graph.execute(query(get_system_count)).await?;
let row = result.next().await?;

match row {
None => Ok(0),
Some(row) => row.get::<i64>("count").map_err(DeserializationError),
}
}
pub async fn get_saved_stargate_count(graph: &Arc<Graph>) -> Result<i64, Error> {
let get_stargate_count = "MATCH (sg:Stargate) RETURN COUNT(sg) as count";
let mut result = graph.execute(query(get_stargate_count)).await?;
let row = result.next().await?;

match row {
None => Ok(0),
Some(row) => row.get::<i64>("count").map_err(DeserializationError),
}
}

#[derive(Debug, Serialize, Deserialize)]
pub struct Stargate {
pub destination_stargate_id: i64,
Expand Down Expand Up @@ -559,3 +574,23 @@ pub async fn find_safest_route(
None => Ok(None),
}
}

pub async fn remove_duplicate_systems(graph: Arc<Graph>) -> Result<(), Error> {
let remove_duplicates = "
MATCH (s:System)
WITH s.system_id AS systemId, COLLECT(s) AS duplicates, COUNT(*) AS count
WHERE count > 1
FOREACH (duplicate IN TAIL(duplicates) | DETACH DELETE duplicate)";

graph.run(query(remove_duplicates)).await
}

pub async fn remove_duplicate_stargates(graph: Arc<Graph>) -> Result<(), Error> {
let remove_duplicates = "
MATCH (s:Stargate)
WITH s.stargate_id AS stargateId, COLLECT(s) AS duplicates, COUNT(*) AS count
WHERE count > 1
FOREACH (duplicate IN TAIL(duplicates) | DETACH DELETE duplicate)";

graph.run(query(remove_duplicates)).await
}
94 changes: 81 additions & 13 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::cmp::Ordering;
use std::convert::Infallible;
use std::sync::Arc;

Expand Down Expand Up @@ -72,6 +73,17 @@ async fn main() {
.or(stargates_routes)
.recover(handle_rejection);

match synchronize_esi_systems(client.clone(), graph.clone()).await {
Ok(_) => {
// Stargate sync relies on systems being saved
match synchronize_esi_stargates(client.clone(), graph.clone()).await {
Ok(_) => {}
Err(err) => println!("Stargate synchronization failed {}", err),
}
}
Err(err) => println!("System synchronization failed {}", err),
}

println!("Serving routes on 8008");
warp::serve(service_routes).run(([0, 0, 0, 0], 8008)).await;
}
Expand Down Expand Up @@ -207,6 +219,7 @@ async fn refresh_eve_scout_system_relations(
}

async fn pull_all_stargates(client: Client, graph: Arc<Graph>) -> Result<(), ReplicationError> {
println!("Pulling all stargates from ESI");
let mut set = JoinSet::new();

get_all_system_ids(graph.clone())
Expand All @@ -223,15 +236,81 @@ async fn pull_all_stargates(client: Client, graph: Arc<Graph>) -> Result<(), Rep
error_if_any_member_has_error(&mut set).await.unwrap()
}

const EXPECTED_ESI_SYSTEM_COUNT: i64 = 8436;
async fn synchronize_esi_systems(
client: Client,
graph: Arc<Graph>,
) -> Result<(), ReplicationError> {
println!("Synchronizing systems with ESI");
let mut saved_count = get_saved_system_count(&graph).await?;
let max_attempts = 5;

for _attempt in 1..=max_attempts {
match saved_count.cmp(&EXPECTED_ESI_SYSTEM_COUNT) {
Ordering::Less => {
pull_all_systems(client.clone(), graph.clone()).await?;
saved_count = get_saved_system_count(&graph).await?;
}
Ordering::Equal => {
println!("Systems synchronized");
return Ok(());
}
Ordering::Greater => {
println!("Database has more systems than expected, removing any duplicates");
remove_duplicate_systems(graph.clone()).await?;
}
}
}

println!(
"Failed to synchronize saved system count {} to expected count {}",
saved_count, EXPECTED_ESI_SYSTEM_COUNT
);
Ok(())
}

const EXPECTED_ESI_STARGATE_COUNT: i64 = 13776;
async fn synchronize_esi_stargates(
client: Client,
graph: Arc<Graph>,
) -> Result<(), ReplicationError> {
println!("Synchronizing stargates with ESI");
let mut saved_count = get_saved_stargate_count(&graph).await?;
let max_attempts = 5;

for _attempt in 1..=max_attempts {
match saved_count.cmp(&EXPECTED_ESI_STARGATE_COUNT) {
Ordering::Less => {
pull_all_stargates(client.clone(), graph.clone()).await?;
saved_count = get_saved_stargate_count(&graph).await?;
}
Ordering::Equal => {
println!("Stargates synchronized");
return Ok(());
}
Ordering::Greater => {
println!("Database has more stargates than expected, removing any duplicates");
remove_duplicate_stargates(graph.clone()).await?;
}
}
}

println!(
"Failed to synchronize saved stargate count {} to expected count {}",
saved_count, EXPECTED_ESI_STARGATE_COUNT
);
Ok(())
}

async fn pull_all_systems(client: Client, graph: Arc<Graph>) -> Result<(), ReplicationError> {
println!("Pulling all systems from ESI");
let mut set = JoinSet::new();

get_system_ids(&client)
.await
.unwrap()
.iter()
.for_each(|&system_id| {
println!("Spawning task to pull {} system if its missing", system_id);
set.spawn(pull_system_if_missing(
client.clone(),
graph.clone(),
Expand All @@ -247,23 +326,14 @@ async fn pull_system_if_missing(
graph: Arc<Graph>,
system_id: i64,
) -> Result<(), ReplicationError> {
println!("Checking if system_id {} exists in the database", system_id);

match system_id_exists(graph.clone(), system_id).await {
Ok(exists) => {
if !exists {
println!(
"System {} does not already exist in the database",
system_id
);
pull_system(client, graph.clone(), system_id).await?;
}
Ok(())
}
Err(_) => {
println!("Error checking if system_id {} exists", system_id);
Err(TargetError(Error::ConnectionError))
}
Err(_) => Err(TargetError(Error::ConnectionError)),
}
}

Expand Down Expand Up @@ -309,10 +379,8 @@ async fn pull_system(
graph: Arc<Graph>,
system_id: i64,
) -> Result<(), ReplicationError> {
println!("Getting system {} from ESI", system_id);
let system_response = get_system_details(&client, system_id).await?;
let system = System::from(system_response);
println!("Saving system {} to database", system_id);
save_system(&graph, &system).await.map_err(TargetError)
}

Expand Down

0 comments on commit 97ff16a

Please sign in to comment.