From c9ce3b915dd98e84f3a1d5cd978c3c58a2026f4a Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Tue, 16 Apr 2024 21:35:42 +0200 Subject: [PATCH 1/4] Consider shutdown signal during entire signer loop --- libs/sdk-core/src/greenlight/node_api.rs | 66 +++++++++++++----------- 1 file changed, 36 insertions(+), 30 deletions(-) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 401153a39..977d3003b 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -172,7 +172,7 @@ impl Greenlight { }) } - async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> { + async fn run_forever_signer(&self) -> Result<(), anyhow::Error> { let channel = Endpoint::from_shared(utils::scheduler_uri())? .tls_config(self.tls_config.client_tls_config())? .tcp_keepalive(Some(Duration::from_secs(30))) @@ -218,44 +218,50 @@ impl Greenlight { loop { debug!("Start of the signer loop, getting node_info from scheduler"); - let get_node = scheduler.get_node_info(NodeInfoRequest { + let node_info_res = scheduler.get_node_info(NodeInfoRequest { node_id: self.signer.node_id(), // Purposely not using the `wait` parameter wait: false, - }); - tokio::select! { - info = get_node => { - let node_info = match info.map(|v| v.into_inner()) { - Ok(v) => { - debug!("Got node_info from scheduler: {:?}", v); - v - } - Err(e) => { - trace!( + }).await; + + let node_info = match node_info_res.map(|v| v.into_inner()) { + Ok(v) => { + debug!("Got node_info from scheduler: {:?}", v); + v + } + Err(e) => { + trace!( "Got an error from the scheduler: {}. Sleeping before retrying", e ); - sleep(Duration::from_millis(1000)).await; - continue; - } - }; + sleep(Duration::from_millis(1000)).await; + continue; + } + }; - if node_info.grpc_uri.is_empty() { - trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying"); - sleep(Duration::from_millis(1000)).await; - continue; - } + if node_info.grpc_uri.is_empty() { + trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying"); + sleep(Duration::from_millis(1000)).await; + continue; + } - if let Err(e) = self.signer.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?).await { - warn!("Error running against node: {}", e); - } - }, - _ = shutdown.recv() => { - debug!("Received the signal to exit the signer loop"); - break; - }, - }; + if let Err(e) = self.signer.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?).await { + warn!("Error running against node: {}", e); + } } + } + + async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> { + tokio::select! { + run_forever_res = self.run_forever_signer() => { + match run_forever_res { + Ok(_) => info!("Inner signer loop exited"), + Err(e) => error!("Inner signer loop exited with error: {e:?}"), + } + }, + _ = shutdown.recv() => debug!("Received the signal to exit the signer loop") + }; + info!("Exiting the signer loop"); Ok(()) } From e5b3ebfdda66f2557e2f9eced454840b5497348a Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Tue, 16 Apr 2024 21:47:03 +0200 Subject: [PATCH 2/4] Cargo fmt --- libs/sdk-core/src/greenlight/node_api.rs | 25 +++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 977d3003b..ca2f1f15b 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -218,11 +218,13 @@ impl Greenlight { loop { debug!("Start of the signer loop, getting node_info from scheduler"); - let node_info_res = scheduler.get_node_info(NodeInfoRequest { - node_id: self.signer.node_id(), - // Purposely not using the `wait` parameter - wait: false, - }).await; + let node_info_res = scheduler + .get_node_info(NodeInfoRequest { + node_id: self.signer.node_id(), + // Purposely not using the `wait` parameter + wait: false, + }) + .await; let node_info = match node_info_res.map(|v| v.into_inner()) { Ok(v) => { @@ -230,10 +232,7 @@ impl Greenlight { v } Err(e) => { - trace!( - "Got an error from the scheduler: {}. Sleeping before retrying", - e - ); + trace!("Got an error from the scheduler: {e}. Sleeping before retrying"); sleep(Duration::from_millis(1000)).await; continue; } @@ -245,8 +244,12 @@ impl Greenlight { continue; } - if let Err(e) = self.signer.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?).await { - warn!("Error running against node: {}", e); + if let Err(e) = self + .signer + .run_once(Uri::from_maybe_shared(node_info.grpc_uri)?) + .await + { + warn!("Error running against node: {e}"); } } } From b6bdf01fe2267e6388fe32baefdb4e757870469c Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Tue, 16 Apr 2024 21:53:31 +0200 Subject: [PATCH 3/4] Add comments to run_forever_inner() --- libs/sdk-core/src/greenlight/node_api.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index ca2f1f15b..8516a39d7 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -172,7 +172,10 @@ impl Greenlight { }) } - async fn run_forever_signer(&self) -> Result<(), anyhow::Error> { + /// The actual signer loop. Connects to, upgrades and keeps alive the connection to the signer. + /// + /// Used as inner loop for `run_forever`. + async fn run_forever_inner(&self) -> Result<(), anyhow::Error> { let channel = Endpoint::from_shared(utils::scheduler_uri())? .tls_config(self.tls_config.client_tls_config())? .tcp_keepalive(Some(Duration::from_secs(30))) @@ -256,7 +259,7 @@ impl Greenlight { async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> { tokio::select! { - run_forever_res = self.run_forever_signer() => { + run_forever_res = self.run_forever_inner() => { match run_forever_res { Ok(_) => info!("Inner signer loop exited"), Err(e) => error!("Inner signer loop exited with error: {e:?}"), From dce8142c586945f564df0be7a850666405df3106 Mon Sep 17 00:00:00 2001 From: ok300 <106775972+ok300@users.noreply.github.com> Date: Wed, 17 Apr 2024 08:11:53 +0200 Subject: [PATCH 4/4] Separate scheduler and signer loop --- libs/sdk-core/src/greenlight/node_api.rs | 24 +++++++++++++++--------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/libs/sdk-core/src/greenlight/node_api.rs b/libs/sdk-core/src/greenlight/node_api.rs index 8516a39d7..b5f5f848c 100644 --- a/libs/sdk-core/src/greenlight/node_api.rs +++ b/libs/sdk-core/src/greenlight/node_api.rs @@ -172,10 +172,8 @@ impl Greenlight { }) } - /// The actual signer loop. Connects to, upgrades and keeps alive the connection to the signer. - /// - /// Used as inner loop for `run_forever`. - async fn run_forever_inner(&self) -> Result<(), anyhow::Error> { + /// Create and, if necessary, upgrade the scheduler + async fn init_scheduler(&self) -> Result> { let channel = Endpoint::from_shared(utils::scheduler_uri())? .tls_config(self.tls_config.client_tls_config())? .tcp_keepalive(Some(Duration::from_secs(30))) @@ -219,6 +217,16 @@ impl Greenlight { break; } + Ok(scheduler) + } + + /// The core signer loop. Connects to the signer and keeps the connection alive. + /// + /// Used as inner loop for `run_forever`. + async fn run_forever_inner( + &self, + mut scheduler: SchedulerClient, + ) -> Result<(), anyhow::Error> { loop { debug!("Start of the signer loop, getting node_info from scheduler"); let node_info_res = scheduler @@ -258,12 +266,10 @@ impl Greenlight { } async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> { + let scheduler = self.init_scheduler().await?; tokio::select! { - run_forever_res = self.run_forever_inner() => { - match run_forever_res { - Ok(_) => info!("Inner signer loop exited"), - Err(e) => error!("Inner signer loop exited with error: {e:?}"), - } + run_forever_inner_res = self.run_forever_inner(scheduler) => { + error!("Inner signer loop exited unexpectedly: {run_forever_inner_res:?}"); }, _ = shutdown.recv() => debug!("Received the signal to exit the signer loop") };