From 5ba2a0be03ec5ea331c70574ccb7163739065712 Mon Sep 17 00:00:00 2001 From: Nadav Ivgi Date: Mon, 28 Dec 2020 06:33:27 +0200 Subject: [PATCH] Continue emitting progress updates until import is fully completed This is needed in case there are multiple rounds of imports. --- src/app.rs | 2 +- src/indexer.rs | 17 ++++++++++---- src/util/bitcoincore_ext.rs | 44 +++++++++++++++++++++++-------------- 3 files changed, 42 insertions(+), 21 deletions(-) diff --git a/src/app.rs b/src/app.rs index 342caa1..c7d5111 100644 --- a/src/app.rs +++ b/src/app.rs @@ -225,7 +225,7 @@ fn wait_bitcoind(rpc: &RpcClient, progress_tx: Option>) - const INTERVAL: time::Duration = time::Duration::from_secs(7); let bcinfo = rpc.wait_blockchain_sync(progress_tx.clone(), INTERVAL)?; - let walletinfo = rpc.wait_wallet_scan(progress_tx, INTERVAL, false)?; + let walletinfo = rpc.wait_wallet_scan(progress_tx, None, INTERVAL)?; let netinfo = rpc.get_network_info()?; info!( diff --git a/src/indexer.rs b/src/indexer.rs index cd691ef..b5c2630 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -52,13 +52,15 @@ impl Indexer { let mut changelog = Changelog::new(false); let mut synced_tip; - spawn_send_progress_thread(self.rpc.clone(), progress_tx); + let shutdown_progress_thread = spawn_send_progress_thread(self.rpc.clone(), progress_tx); while { synced_tip = self.sync_transactions(&mut changelog)?; self.watcher.do_imports(&self.rpc, /*rescan=*/ true)? } { /* do while */ } + shutdown_progress_thread.send(()).unwrap(); + self.sync_mempool(/*force_refresh=*/ true)?; let stats = self.store.stats(); @@ -427,16 +429,23 @@ impl fmt::Display for IndexChange { fn spawn_send_progress_thread( rpc: Arc, progress_tx: Option>, -) -> thread::JoinHandle<()> { +) -> mpsc::SyncSender<()> { const DELAY: time::Duration = time::Duration::from_millis(250); const INTERVAL: time::Duration = time::Duration::from_millis(1500); + let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1); + thread::spawn(move || { // allow some time for the indexer to start the first set of imports thread::sleep(DELAY); - if let Err(e) = rpc.wait_wallet_scan(progress_tx, INTERVAL, true) { + if shutdown_rx.try_recv() != Err(mpsc::TryRecvError::Empty) { + return; + } + if let Err(e) = rpc.wait_wallet_scan(progress_tx, Some(shutdown_rx), INTERVAL) { warn!("getwalletinfo failed: {:?}", e); } - }) + }); + + shutdown_tx } diff --git a/src/util/bitcoincore_ext.rs b/src/util/bitcoincore_ext.rs index b03ca44..56cbbed 100644 --- a/src/util/bitcoincore_ext.rs +++ b/src/util/bitcoincore_ext.rs @@ -87,12 +87,21 @@ pub trait RpcApiExt: RpcApi { fn wait_wallet_scan( &self, progress_tx: Option>, + shutdown_rx: Option>, interval: time::Duration, - mut wait_for_scanning: bool, ) -> RpcResult { - let start = time::Instant::now(); + // Stop if the shutdown signal was received or if the channel was disconnected + let should_shutdown = || { + shutdown_rx + .as_ref() + .map_or(false, |rx| rx.try_recv() != Err(mpsc::TryRecvError::Empty)) + }; + Ok(loop { let info = self.get_wallet_info()?; + if should_shutdown() { + break info; + } match info.scanning { None => { warn!("Your bitcoin node does not report the `scanning` status in `getwalletinfo`. It is recommended to upgrade to Bitcoin Core v0.19+ to enable this."); @@ -109,36 +118,39 @@ pub trait RpcApiExt: RpcApi { break info; } } - // wait_wallet_scan() could be called before scanning actually started, - // give it a few seconds to start up before giving up - if !wait_for_scanning || start.elapsed().as_secs() > 3 { + // Stop as soon as scanning is completed if no explicit shutdown_rx was given, + // or continue until the shutdown signal is received if it was. + if shutdown_rx.is_none() { break info; } } - Some(ScanningDetails::Scanning { progress, duration }) => { - wait_for_scanning = false; - let duration = duration as u64; - let progress_n = progress as f32; + Some(ScanningDetails::Scanning { + progress: progress_n, + duration, + }) => { let eta = if progress_n > 0.0 { - (duration as f32 / progress_n) as u64 - duration + (duration as f32 / progress_n) as u64 - duration as u64 } else { 0 }; - info!(target: "bwt", - "waiting for bitcoind to finish scanning [done {:.1}%, running for {}m, eta {}m]", - progress_n * 100.0, duration / 60, eta / 60 - ); - if let Some(ref progress_tx) = progress_tx { let progress = Progress::Scan { progress_n, eta }; if progress_tx.send(progress).is_err() { break info; } + } else { + info!(target: "bwt", + "waiting for bitcoind to finish scanning [done {:.1}%, running for {}m, eta {}m]", + progress_n * 100.0, duration / 60, eta / 60 + ); } } - }; + } thread::sleep(interval); + if should_shutdown() { + break info; + } }) } }