From e409e75c1885d0914ea2a4e94c2b76b8c4877950 Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Wed, 17 Apr 2024 09:39:33 +0200 Subject: [PATCH] Consider shutdown signal during entire signer loop (#936) * Consider shutdown signal during entire signer loop * Cargo fmt * Add comments to run_forever_inner() * Separate scheduler and signer loop --- libs/sdk-core/src/greenlight/node_api.rs | 90 ++++++++++++++---------- 1 file changed, 54 insertions(+), 36 deletions(-) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 64bfe65a4..80ea6fcd9 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -172,7 +172,8 @@ impl Greenlight { }) } - async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> { + /// Create and, if necessary, upgrade the scheduler + async fn init_scheduler(&self) -> Result<SchedulerClient<Channel>> { let channel = Endpoint::from_shared(utils::scheduler_uri())? .tls_config(self.tls_config.client_tls_config())? .tcp_keepalive(Some(Duration::from_secs(30))) @@ -216,46 +217,63 @@ impl Greenlight { break; } + Ok(scheduler) + } + + /// The core signer loop. Connects to the signer and keeps the connection alive. + /// + /// Used as inner loop for `run_forever`. + async fn run_forever_inner( + &self, + mut scheduler: SchedulerClient<Channel>, + ) -> Result<(), anyhow::Error> { loop { debug!("Start of the signer loop, getting node_info from scheduler"); - let get_node = scheduler.get_node_info(NodeInfoRequest { - node_id: self.signer.node_id(), - // Purposely not using the `wait` parameter - wait: false, - }); - tokio::select! { - info = get_node => { - let node_info = match info.map(|v| v.into_inner()) { - Ok(v) => { - debug!("Got node_info from scheduler: {:?}", v); - v - } - Err(e) => { - trace!( - "Got an error from the scheduler: {}. 
Sleeping before retrying", - e - ); - sleep(Duration::from_millis(1000)).await; - continue; - } - }; - - if node_info.grpc_uri.is_empty() { - trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying"); - sleep(Duration::from_millis(1000)).await; - continue; - } + let node_info_res = scheduler + .get_node_info(NodeInfoRequest { + node_id: self.signer.node_id(), + // Purposely not using the `wait` parameter + wait: false, + }) + .await; - if let Err(e) = self.signer.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?).await { - warn!("Error running against node: {}", e); - } - }, - _ = shutdown.recv() => { - debug!("Received the signal to exit the signer loop"); - break; - }, + let node_info = match node_info_res.map(|v| v.into_inner()) { + Ok(v) => { + debug!("Got node_info from scheduler: {:?}", v); + v + } + Err(e) => { + trace!("Got an error from the scheduler: {e}. Sleeping before retrying"); + sleep(Duration::from_millis(1000)).await; + continue; + } }; + + if node_info.grpc_uri.is_empty() { + trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying"); + sleep(Duration::from_millis(1000)).await; + continue; + } + + if let Err(e) = self + .signer + .run_once(Uri::from_maybe_shared(node_info.grpc_uri)?) + .await + { + warn!("Error running against node: {e}"); + } } + } + + async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> { + let scheduler = self.init_scheduler().await?; + tokio::select! { + run_forever_inner_res = self.run_forever_inner(scheduler) => { + error!("Inner signer loop exited unexpectedly: {run_forever_inner_res:?}"); + }, + _ = shutdown.recv() => debug!("Received the signal to exit the signer loop") + }; + info!("Exiting the signer loop"); Ok(()) }