Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consider shutdown signal during entire signer loop #936

Merged
merged 4 commits into from
Apr 17, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 54 additions & 36 deletions libs/sdk-core/src/greenlight/node_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ impl Greenlight {
})
}

async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> {
/// Create and, if necessary, upgrade the scheduler
async fn init_scheduler(&self) -> Result<SchedulerClient<tonic::transport::channel::Channel>> {
let channel = Endpoint::from_shared(utils::scheduler_uri())?
.tls_config(self.tls_config.client_tls_config())?
.tcp_keepalive(Some(Duration::from_secs(30)))
Expand Down Expand Up @@ -216,46 +217,63 @@ impl Greenlight {
break;
}

Ok(scheduler)
}

/// The core signer loop. Connects to the signer and keeps the connection alive.
///
/// Used as inner loop for `run_forever`.
async fn run_forever_inner(
&self,
mut scheduler: SchedulerClient<tonic::transport::channel::Channel>,
) -> Result<(), anyhow::Error> {
loop {
debug!("Start of the signer loop, getting node_info from scheduler");
let get_node = scheduler.get_node_info(NodeInfoRequest {
node_id: self.signer.node_id(),
// Purposely not using the `wait` parameter
wait: false,
});
tokio::select! {
info = get_node => {
let node_info = match info.map(|v| v.into_inner()) {
Ok(v) => {
debug!("Got node_info from scheduler: {:?}", v);
v
}
Err(e) => {
trace!(
"Got an error from the scheduler: {}. Sleeping before retrying",
e
);
sleep(Duration::from_millis(1000)).await;
continue;
}
};

if node_info.grpc_uri.is_empty() {
trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}
let node_info_res = scheduler
.get_node_info(NodeInfoRequest {
node_id: self.signer.node_id(),
// Purposely not using the `wait` parameter
wait: false,
})
.await;

if let Err(e) = self.signer.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?).await {
warn!("Error running against node: {}", e);
}
},
_ = shutdown.recv() => {
debug!("Received the signal to exit the signer loop");
break;
},
let node_info = match node_info_res.map(|v| v.into_inner()) {
Ok(v) => {
debug!("Got node_info from scheduler: {:?}", v);
v
}
Err(e) => {
trace!("Got an error from the scheduler: {e}. Sleeping before retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}
};

if node_info.grpc_uri.is_empty() {
trace!("Got an empty GRPC URI, node is not scheduled, sleeping and retrying");
sleep(Duration::from_millis(1000)).await;
continue;
}

if let Err(e) = self
.signer
.run_once(Uri::from_maybe_shared(node_info.grpc_uri)?)
.await
{
warn!("Error running against node: {e}");
}
}
}

async fn run_forever(&self, mut shutdown: mpsc::Receiver<()>) -> Result<(), anyhow::Error> {
let scheduler = self.init_scheduler().await?;
tokio::select! {
run_forever_inner_res = self.run_forever_inner(scheduler) => {
error!("Inner signer loop exited unexpectedly: {run_forever_inner_res:?}");
},
_ = shutdown.recv() => debug!("Received the signal to exit the signer loop")
};

info!("Exiting the signer loop");
Ok(())
}
Expand Down
Loading