Skip to content

Commit

Permalink
Consider shutdown signal during entire signer loop (#936)
Browse files Browse the repository at this point in the history
* Consider shutdown signal during entire signer loop

* Cargo fmt

* Add comments to run_forever_inner()

* Separate scheduler and signer loop
  • Loading branch information
ok300 committed Apr 17, 2024
1 parent 30c0ae0 commit e409e75
Showing 1 changed file with 54 additions and 36 deletions.
90 changes: 54 additions & 36 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ impl Greenlight {
})
}

async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> {
/// Create and, if necessary, upgrade the scheduler
async fn init_scheduler(&self) -> Result<SchedulerClient<tonic::transport::channel::Channel>> {
let channel = Endpoint::from_shared(utils::scheduler_uri())?
.tls_config(self.tls_config.client_tls_config())?
.tcp_keepalive(Some(Duration::from_secs(30)))
Expand Down Expand Up @@ -216,46 +217,63 @@ impl Greenlight {
break;
}

Ok(scheduler)
}

/// The core signer loop. Connects to the signer and keeps the connection alive.
///
/// Used as inner loop for `run_forever`.
async fn run_forever_inner(
&self,
mut scheduler: SchedulerClient<tonic::transport::channel::Channel>,
) -> Result<(), anyhow::Error> {
loop {
debug!("Start of the signer loop, getting node_info from scheduler");
let get_node = scheduler.get_node_info(NodeInfoRequest {
node_id: self.signer.node_id(),
// Purposely not using the `wait` parameter
wait: false,
});
tokio::select! {
info = get_node => {
let node_info = match info.map(|v| v.into_inner()) {
Ok(v) => {
debug!("Got node_info from scheduler: {:?}", v);
v
}
Err(e) => {
trace!(
"Got an error from the scheduler: {}. Sleeping before retrying",
e
);
sleep(Duration::from_millis(1000)).await;
continue;
}
};

if node_info.grpc_uri.is_empty() {
trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}
let node_info_res = scheduler
.get_node_info(NodeInfoRequest {
node_id: self.signer.node_id(),
// Purposely not using the `wait` parameter
wait: false,
})
.await;

if let Err(e) = self.signer.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?).await {
warn!("Error running against node: {}", e);
}
},
_ = shutdown.recv() => {
debug!("Received the signal to exit the signer loop");
break;
},
let node_info = match node_info_res.map(|v| v.into_inner()) {
Ok(v) => {
debug!("Got node_info from scheduler: {:?}", v);
v
}
Err(e) => {
trace!("Got an error from the scheduler: {e}. Sleeping before retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}
};

if node_info.grpc_uri.is_empty() {
trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}

if let Err(e) = self
.signer
.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?)
.await
{
warn!("Error running against node: {e}");
}
}
}

async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> {
let scheduler = self.init_scheduler().await?;
tokio::select! {
run_forever_inner_res = self.run_forever_inner(scheduler) => {
error!("Inner signer loop exited unexpectedly: {run_forever_inner_res:?}");
},
_ = shutdown.recv() => debug!("Received the signal to exit the signer loop")
};

info!("Exiting the signer loop");
Ok(())
}
Expand Down

0 comments on commit e409e75

Please sign in to comment.