Skip to content

Commit

Permalink
Wait for a node to sync before broadcasting protocol txs
Browse files Browse the repository at this point in the history
  • Loading branch information
sug0 committed Oct 19, 2023
1 parent 3f979bf commit 0ea463a
Showing 1 changed file with 37 additions and 0 deletions.
37 changes: 37 additions & 0 deletions apps/src/lib/node/ledger/broadcaster.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::net::SocketAddr;
use std::ops::ControlFlow;

use namada::types::control_flow::time;
use tokio::sync::mpsc::UnboundedReceiver;

use crate::facade::tendermint_rpc::{Client, HttpClient};
Expand All @@ -26,6 +28,41 @@ impl Broadcaster {
/// Loop forever, braodcasting messages that have been received
/// by the receiver
async fn run_loop(&mut self) {
let result = time::Sleep {
strategy: time::ExponentialBackoff {
base: 2,
as_duration: time::Duration::from_secs,
},
}
.run(|| async {
let status_result = time::Sleep {
strategy: time::Constant(time::Duration::from_secs(1)),
}
.timeout(
time::Instant::now() + time::Duration::from_secs(30),
|| async {
match self.client.status().await {
Ok(status) => ControlFlow::Break(status),
Err(_) => ControlFlow::Continue(()),
}
},
)
.await;
let status = match status_result {
Ok(status) => status,
Err(_) => return ControlFlow::Break(Err(())),
};
if status.sync_info.catching_up {
ControlFlow::Continue(())
} else {
ControlFlow::Break(Ok(()))
}
})
.await;
if let Err(()) = result {
tracing::error!("Broadcaster failed to connect to CometBFT node");
return;
}
loop {
if let Some(msg) = self.receiver.recv().await {
let _ = self.client.broadcast_tx_sync(msg.into()).await;
Expand Down

0 comments on commit 0ea463a

Please sign in to comment.