Skip to content

Commit

Permalink
Emit wallet rescan and blockchain sync progress updates
Browse files Browse the repository at this point in the history
Over an mpsc channel, the FFI callback function, and to the console.

Refs #64
  • Loading branch information
shesek committed Nov 16, 2020
1 parent 041f4bb commit 55330b4
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 37 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
- Support binding on ephemeral port (`--http-addr 127.0.0.1:0`) (#63)

- Shutdown cleanly, via `SIGINT`/`SIGTERM` for CLI or a custom signal for library users (#62, #66)

- Emit wallet rescan and blockchain sync progress updates (via mpsc, [ffi](#64) and console)

- Renamed CLI options: `--http-server-addr` to `--http-addr`, `--electrum-rpc-addr` to `--electrum-addr`

Expand Down
2 changes: 1 addition & 1 deletion contrib/nodejs-bwt-daemon/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ let bwtd = await BwtDaemon({
gap_limit: 100,

// Progress notifications for history scanning (a full rescan from genesis can take 20-30 minutes)
progress_fn: progress => console.log('bwt progress %f%%', progress*100),
progress_cb: (type, progress, detail) => console.log('bwt progress %s %f%%', type, progress*100, detail),
})

// Get the assigned address/port for the Electrum/HTTP servers
Expand Down
2 changes: 1 addition & 1 deletion contrib/nodejs-bwt-daemon/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ const BwtDaemon = require('bwt-daemon')
electrum: true,
http: true,
verbose: 2,
progress_cb: progress => console.log('bwt progress %f%%', progress*100),
progress_cb: (type, progress, detail) => console.log('bwt progress %s %f%%', type, progress*100, detail),
})

console.log('bwt running', bwtd.electrum_addr, bwtd.http_addr)
Expand Down
8 changes: 6 additions & 2 deletions contrib/nodejs-bwt-daemon/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,12 @@ function init(options) {
reject(new Error(detail))
} else if (msg_type.startsWith('ready:')) {
services[msg_type.substr(6)] = detail
} else if (['booting', 'scanning', 'ready'].includes(msg_type)) {
opt_progress && opt_progress(progress)
} else if (msg_type == 'progress:sync') {
opt_progress && opt_progress('sync', progress, { tip_time: +detail })
} else if (msg_type == 'progress:scan') {
opt_progress && opt_progress('scan', progress, { eta: +detail })
} else if (['booting', 'ready'].includes(msg_type)) {
opt_progress && opt_progress(msg_type, progress, {})
}
}

Expand Down
2 changes: 1 addition & 1 deletion examples/use-from-rust.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() -> Result<()> {
config.setup_logger(); // optional

// Boot up bwt. The thread will be blocked until the initial sync is completed
let app = App::boot(config)?;
let app = App::boot(config, None)?;

// The index is now ready for querying
let query = app.query();
Expand Down
16 changes: 9 additions & 7 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ use std::{net, thread};

use bitcoincore_rpc::{self as rpc, Client as RpcClient, RpcApi};

use crate::util::{banner, debounce_sender, RpcApiExt};
use crate::util::bitcoincore_ext::{Progress, RpcApiExt};
use crate::util::{banner, debounce_sender};
use crate::{Config, Indexer, Query, Result, WalletWatcher};

#[cfg(feature = "electrum")]
Expand Down Expand Up @@ -32,7 +33,7 @@ pub struct App {
}

impl App {
pub fn boot(config: Config) -> Result<Self> {
pub fn boot(config: Config, progress_tx: Option<mpsc::Sender<Progress>>) -> Result<Self> {
debug!("{:?}", config);

let watcher = WalletWatcher::from_config(
Expand All @@ -55,14 +56,14 @@ impl App {
load_wallet(&rpc, bitcoind_wallet)?;
}

wait_bitcoind(&rpc)?;
wait_bitcoind(&rpc, progress_tx.clone())?;

if config.startup_banner {
println!("{}", banner::get_welcome_banner(&query, false)?);
}

// do an initial sync without keeping track of updates
indexer.write().unwrap().initial_sync()?;
indexer.write().unwrap().initial_sync(progress_tx)?;

let (sync_tx, sync_rx) = mpsc::channel();
// debounce sync message rate to avoid excessive indexing when bitcoind catches up
Expand Down Expand Up @@ -221,9 +222,9 @@ fn load_wallet(rpc: &RpcClient, name: &str) -> Result<()> {
}

// wait for bitcoind to sync and finish rescanning
fn wait_bitcoind(rpc: &RpcClient) -> Result<()> {
let bcinfo = rpc.wait_blockchain_sync()?;
let walletinfo = rpc.wait_wallet_scan()?;
fn wait_bitcoind(rpc: &RpcClient, progress_tx: Option<mpsc::Sender<Progress>>) -> Result<()> {
let bcinfo = rpc.wait_blockchain_sync(progress_tx.clone())?;
let walletinfo = rpc.wait_wallet_scan(progress_tx)?;

let netinfo = rpc.get_network_info()?;
info!(
Expand All @@ -234,6 +235,7 @@ fn wait_bitcoind(rpc: &RpcClient) -> Result<()> {
netinfo.protocol_version,
bcinfo.best_block_hash
);

trace!("{:?}", netinfo);
trace!("{:?}", bcinfo);
trace!("{:?}", walletinfo);
Expand Down
24 changes: 21 additions & 3 deletions src/indexer.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::{fmt, time};
use std::sync::{mpsc, Arc};
use std::{fmt, thread, time};

use serde::Serialize;

Expand All @@ -13,6 +13,7 @@ use bitcoincore_rpc::{Client as RpcClient, RpcApi};
use crate::error::Result;
use crate::store::{FundingInfo, MemoryStore, SpendingInfo, TxEntry};
use crate::types::{BlockId, InPoint, ScriptHash, TxStatus};
use crate::util::bitcoincore_ext::{Progress, RpcApiExt};
use crate::wallet::{KeyOrigin, WalletWatcher};

pub struct Indexer {
Expand Down Expand Up @@ -42,7 +43,7 @@ impl Indexer {

// continue to sync transactions and import addresses (with rescan) until no more new addresses
// need to be imported. the initial sync does not collect the Changelog and does not emit updates.
pub fn initial_sync(&mut self) -> Result<()> {
pub fn initial_sync(&mut self, progress_tx: Option<mpsc::Sender<Progress>>) -> Result<()> {
let timer = time::Instant::now();

info!("starting initial sync");
Expand All @@ -51,6 +52,8 @@ impl Indexer {
let mut changelog = Changelog::new(false);
let mut synced_tip;

spawn_send_progress_thread(self.rpc.clone(), progress_tx);

while {
synced_tip = self.sync_transactions(&mut changelog)?;
self.watcher.do_imports(&self.rpc, /*rescan=*/ true)?
Expand Down Expand Up @@ -415,3 +418,18 @@ impl fmt::Display for IndexChange {
write!(f, "{:?}", self)
}
}

// Spawn a thread to poll getwalletinfo, log progress and send progress updates via mpsc
fn spawn_send_progress_thread(
rpc: Arc<RpcClient>,
progress_tx: Option<mpsc::Sender<Progress>>,
) -> thread::JoinHandle<()> {
thread::spawn(move || {
// allow some time for the indexer to start the first set of imports
thread::sleep(time::Duration::from_secs(2));

if let Err(e) = rpc.wait_wallet_scan(progress_tx) {
warn!("getwalletinfo failed: {:?}", e);
}
})
}
57 changes: 44 additions & 13 deletions src/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ mod ffi {
use std::sync::{mpsc, Once};
use std::thread;

use crate::util::bitcoincore_ext::Progress;
use crate::{App, Config, Result};

const OK: i32 = 0;
const ERR: i32 = -1;

type Callback = extern "C" fn(*const c_char, f32, *const c_char);

#[repr(C)]
pub struct ShutdownHandler(mpsc::Sender<()>);
pub struct ShutdownHandler(mpsc::SyncSender<()>);

static INIT_LOGGER: Once = Once::new();

Expand All @@ -23,36 +26,36 @@ mod ffi {
#[no_mangle]
pub extern "C" fn bwt_start(
json_config: *const c_char,
callback_fn: extern "C" fn(*const c_char, f32, *const c_char),
callback_fn: Callback,
shutdown_out: *mut *const ShutdownHandler,
) -> i32 {
let json_config = unsafe { CStr::from_ptr(json_config) }.to_str().unwrap();

let callback = |msg_type: &str, progress: f32, detail: &str| {
callback_fn(cstring(msg_type), progress, cstring(detail))
};

let start = || -> Result<_> {
let config: Config = serde_json::from_str(json_config)?;
if config.verbose > 0 {
// The verbosity level cannot be changed once enabled.
INIT_LOGGER.call_once(|| config.setup_logger());
}

// TODO emit rescan progress updates with ETA from App::boot() and forward them
callback("booting", 0.0, "");
let app = App::boot(config)?;
let (progress_tx, progress_rx) = mpsc::channel();
let _progress_thread = spawn_recv_progress_thread(progress_rx, callback_fn.clone());

notify(callback_fn, "booting", 0.0, "");
let app = App::boot(config, Some(progress_tx))?;

// XXX progress_thread.join().unwrap();

#[cfg(feature = "electrum")]
if let Some(addr) = app.electrum_addr() {
callback("ready:electrum", 1.0, &addr.to_string());
notify(callback_fn, "ready:electrum", 1.0, &addr.to_string());
}
#[cfg(feature = "http")]
if let Some(addr) = app.http_addr() {
callback("ready:http", 1.0, &addr.to_string());
notify(callback_fn, "ready:http", 1.0, &addr.to_string());
}

callback("ready", 1.0, "");
notify(callback_fn, "ready", 1.0, "");

let shutdown_tx = app.sync_background();

Expand All @@ -66,7 +69,7 @@ mod ffi {
},
Err(e) => {
warn!("{:?}", e);
callback("error", 0.0, &e.to_string());
notify(callback_fn, "error", 0.0, &e.to_string());
ERR
}
}
Expand All @@ -82,7 +85,35 @@ mod ffi {
OK
}

fn notify(callback_fn: Callback, msg_type: &str, progress: f32, detail: &str) {
callback_fn(cstring(msg_type), progress, cstring(detail))
}

fn cstring(s: &str) -> *const c_char {
CString::new(s).unwrap().into_raw()
}

// Spawn a thread to receive mpsc progress updates and forward them to the callback_fn
fn spawn_recv_progress_thread(
progress_rx: mpsc::Receiver<Progress>,
callback_fn: Callback,
) -> thread::JoinHandle<()> {
thread::spawn(move || loop {
match progress_rx.recv() {
Ok(Progress::Sync {
progress_n,
tip_time,
}) => notify(
callback_fn,
"progress:sync",
progress_n,
&tip_time.to_string(),
),
Ok(Progress::Scan { progress_n, eta }) => {
notify(callback_fn, "progress:scan", progress_n, &eta.to_string())
}
Err(mpsc::RecvError) => break,
}
})
}
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() -> Result<()> {

config.setup_logger();

let app = App::boot(config)?;
let app = App::boot(config, None)?;
app.sync(None);

Ok(())
Expand Down
42 changes: 35 additions & 7 deletions src/util/bitcoincore_ext.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use serde::{de, Serialize};
use std::fmt::{self, Formatter};
use std::{thread, time};
use std::{sync::mpsc, thread, time};

use bitcoincore_rpc::json::{self, ImportMultiRescanSince, ScanningDetails};
use bitcoincore_rpc::{Client, Result as RpcResult, RpcApi};

const WAIT_INTERVAL: time::Duration = time::Duration::from_secs(10);
const WAIT_INTERVAL: time::Duration = time::Duration::from_secs(7);

// Extensions for rust-bitcoincore-rpc

Expand All @@ -30,7 +30,10 @@ pub trait RpcApiExt: RpcApi {
self.call("getmempoolinfo", &[])
}

fn wait_blockchain_sync(&self) -> RpcResult<json::GetBlockchainInfoResult> {
fn wait_blockchain_sync(
&self,
progress_tx: Option<mpsc::Sender<Progress>>,
) -> RpcResult<json::GetBlockchainInfoResult> {
Ok(loop {
let info = self.get_blockchain_info()?;

Expand All @@ -45,11 +48,23 @@ pub trait RpcApiExt: RpcApi {
info.blocks, info.headers, info.verification_progress * 100.0
);

if let Some(ref progress_tx) = progress_tx {
let progress = Progress::Sync {
progress_n: info.verification_progress as f32,
tip_time: info.median_time,
};
if progress_tx.send(progress).is_err() {
break info;
}
}
thread::sleep(WAIT_INTERVAL);
})
}

fn wait_wallet_scan(&self) -> RpcResult<json::GetWalletInfoResult> {
fn wait_wallet_scan(
&self,
progress_tx: Option<mpsc::Sender<Progress>>,
) -> RpcResult<json::GetWalletInfoResult> {
Ok(loop {
let info = self.get_wallet_info()?;
match info.scanning {
Expand All @@ -61,13 +76,20 @@ pub trait RpcApiExt: RpcApi {
Some(ScanningDetails::NotScanning(_)) => break info,
Some(ScanningDetails::Scanning { progress, duration }) => {
let duration = duration as u64;
let progress = progress as f64;
let eta = (duration as f64 / progress) as u64 - duration;
let progress_n = progress as f32;
let eta = (duration as f32 / progress_n) as u64 - duration;

info!(target: "bwt",
"waiting for bitcoind to finish scanning [done {:.1}%, running for {}m, eta {}m]",
progress * 100.0, duration / 60, eta / 60
progress_n * 100.0, duration / 60, eta / 60
);

if let Some(ref progress_tx) = progress_tx {
let progress = Progress::Scan { progress_n, eta };
if progress_tx.send(progress).is_err() {
break info;
}
}
}
};
thread::sleep(WAIT_INTERVAL);
Expand All @@ -77,6 +99,12 @@ pub trait RpcApiExt: RpcApi {

impl RpcApiExt for Client {}

#[derive(Debug, Copy, Clone)]
pub enum Progress {
Sync { progress_n: f32, tip_time: u64 },
Scan { progress_n: f32, eta: u64 },
}

#[derive(Clone, PartialEq, Eq, Debug, Deserialize, Serialize)]
pub struct GetBlockStatsResult {
pub height: u64,
Expand Down
1 change: 0 additions & 1 deletion src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,6 @@ impl WalletWatcher {
let has_imports = !import_reqs.is_empty();

if has_imports {
// TODO report syncing progress
info!(
"importing batch of {} addresses... (this may take awhile)",
import_reqs.len()
Expand Down

0 comments on commit 55330b4

Please sign in to comment.