Skip to content

Commit

Permalink
feat: Stop executors not in registry
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Jan 8, 2024
1 parent 7f436ea commit 6adfd2d
Showing 1 changed file with 31 additions and 1 deletion.
32 changes: 31 additions & 1 deletion coordinator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ async fn main() -> anyhow::Result<()> {

loop {
let indexer_registry = registry.fetch().await?;
synchronise_executors(&indexer_registry, &mut executors_handler).await?;

synchronise_executors(&indexer_registry, &mut executors_handler).await?;
synchronise_block_streams(&indexer_registry, &redis_client, &mut block_stream_handler)
.await?;
}
Expand Down Expand Up @@ -72,6 +72,10 @@ async fn synchronise_executors(
}
}

for active_executor in active_executors {
executors_handler.stop(active_executor.executor_id).await?;
}

Ok(())
}

Expand Down Expand Up @@ -310,6 +314,32 @@ mod tests {
.await
.unwrap();
}

#[tokio::test]
async fn stops_executors_not_in_registry() {
let indexer_registry = HashMap::from([]);

let mut executors_handler = ExecutorsHandler::default();
executors_handler.expect_list().returning(|| {
Ok(vec![runner::ExecutorInfo {
executor_id: "executor_id".to_string(),
account_id: "morgs.near".to_string(),
function_name: "test".to_string(),
status: "running".to_string(),
version: 2,
}])
});

executors_handler
.expect_stop()
.with(predicate::eq("executor_id".to_string()))
.returning(|_| Ok(()))
.once();

synchronise_executors(&indexer_registry, &mut executors_handler)
.await
.unwrap();
}
}

mod block_stream {
Expand Down

0 comments on commit 6adfd2d

Please sign in to comment.