Skip to content

Commit

Permalink
Continue emitting progress updates until import is fully completed
Browse files Browse the repository at this point in the history
This is needed in case there are multiple rounds of imports.
  • Loading branch information
shesek committed Jan 6, 2021
1 parent fdd46f3 commit 5ba2a0b
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 21 deletions.
2 changes: 1 addition & 1 deletion src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ fn wait_bitcoind(rpc: &RpcClient, progress_tx: Option<mpsc::Sender<Progress>>) -
const INTERVAL: time::Duration = time::Duration::from_secs(7);

let bcinfo = rpc.wait_blockchain_sync(progress_tx.clone(), INTERVAL)?;
let walletinfo = rpc.wait_wallet_scan(progress_tx, INTERVAL, false)?;
let walletinfo = rpc.wait_wallet_scan(progress_tx, None, INTERVAL)?;

let netinfo = rpc.get_network_info()?;
info!(
Expand Down
17 changes: 13 additions & 4 deletions src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ impl Indexer {
let mut changelog = Changelog::new(false);
let mut synced_tip;

spawn_send_progress_thread(self.rpc.clone(), progress_tx);
let shutdown_progress_thread = spawn_send_progress_thread(self.rpc.clone(), progress_tx);

while {
synced_tip = self.sync_transactions(&mut changelog)?;
self.watcher.do_imports(&self.rpc, /*rescan=*/ true)?
} { /* do while */ }

shutdown_progress_thread.send(()).unwrap();

self.sync_mempool(/*force_refresh=*/ true)?;

let stats = self.store.stats();
Expand Down Expand Up @@ -427,16 +429,23 @@ impl fmt::Display for IndexChange {
fn spawn_send_progress_thread(
rpc: Arc<RpcClient>,
progress_tx: Option<mpsc::Sender<Progress>>,
) -> thread::JoinHandle<()> {
) -> mpsc::SyncSender<()> {
const DELAY: time::Duration = time::Duration::from_millis(250);
const INTERVAL: time::Duration = time::Duration::from_millis(1500);

let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);

thread::spawn(move || {
// allow some time for the indexer to start the first set of imports
thread::sleep(DELAY);

if let Err(e) = rpc.wait_wallet_scan(progress_tx, INTERVAL, true) {
if shutdown_rx.try_recv() != Err(mpsc::TryRecvError::Empty) {
return;
}
if let Err(e) = rpc.wait_wallet_scan(progress_tx, Some(shutdown_rx), INTERVAL) {
warn!("getwalletinfo failed: {:?}", e);
}
})
});

shutdown_tx
}
44 changes: 28 additions & 16 deletions src/util/bitcoincore_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,21 @@ pub trait RpcApiExt: RpcApi {
fn wait_wallet_scan(
&self,
progress_tx: Option<mpsc::Sender<Progress>>,
shutdown_rx: Option<mpsc::Receiver<()>>,
interval: time::Duration,
mut wait_for_scanning: bool,
) -> RpcResult<json::GetWalletInfoResult> {
let start = time::Instant::now();
// Stop if the shutdown signal was received or if the channel was disconnected
let should_shutdown = || {
shutdown_rx
.as_ref()
.map_or(false, |rx| rx.try_recv() != Err(mpsc::TryRecvError::Empty))
};

Ok(loop {
let info = self.get_wallet_info()?;
if should_shutdown() {
break info;
}
match info.scanning {
None => {
warn!("Your bitcoin node does not report the `scanning` status in `getwalletinfo`. It is recommended to upgrade to Bitcoin Core v0.19+ to enable this.");
Expand All @@ -109,36 +118,39 @@ pub trait RpcApiExt: RpcApi {
break info;
}
}
// wait_wallet_scan() could be called before scanning actually started,
// give it a few seconds to start up before giving up
if !wait_for_scanning || start.elapsed().as_secs() > 3 {
// Stop as soon as scanning is completed if no explicit shutdown_rx was given,
// or continue until the shutdown signal is received if it was.
if shutdown_rx.is_none() {
break info;
}
}
Some(ScanningDetails::Scanning { progress, duration }) => {
wait_for_scanning = false;
let duration = duration as u64;
let progress_n = progress as f32;
Some(ScanningDetails::Scanning {
progress: progress_n,
duration,
}) => {
let eta = if progress_n > 0.0 {
(duration as f32 / progress_n) as u64 - duration
(duration as f32 / progress_n) as u64 - duration as u64
} else {
0
};

info!(target: "bwt",
"waiting for bitcoind to finish scanning [done {:.1}%, running for {}m, eta {}m]",
progress_n * 100.0, duration / 60, eta / 60
);

if let Some(ref progress_tx) = progress_tx {
let progress = Progress::Scan { progress_n, eta };
if progress_tx.send(progress).is_err() {
break info;
}
} else {
info!(target: "bwt",
"waiting for bitcoind to finish scanning [done {:.1}%, running for {}m, eta {}m]",
progress_n * 100.0, duration / 60, eta / 60
);
}
}
};
}
thread::sleep(interval);
if should_shutdown() {
break info;
}
})
}
}
Expand Down

0 comments on commit 5ba2a0b

Please sign in to comment.