Skip to content

Commit

Permalink
indexer: Use listsinceblock instead of listtransactions
Browse files Browse the repository at this point in the history
This makes syncing more bandwidth-efficient and simplifies the implementation.

Resolves #33.
  • Loading branch information
shesek committed Jun 8, 2020
1 parent 073d363 commit f7a2906
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 148 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

## Unreleased

- Electrum: Implement `--electrum-skip-merkle` to avoid generating SPV proofs entirely, even when it's possible.
- Electrum: Implement `--electrum-skip-merkle` to avoid generating SPV proofs entirely, even when it's possible. (#34)

- Indexer: Use `listsinceblock` instead of `listtransactions`. This makes syncing more bandwidth-efficient and simplifies the implementation. (#33)

## 0.1.3 - 2020-06-02

Expand Down
199 changes: 52 additions & 147 deletions src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use bitcoincore_rpc::json::{
};
use bitcoincore_rpc::{Client as RpcClient, RpcApi};

use crate::error::{OptionExt, Result};
use crate::error::Result;
use crate::hd::{HDWatcher, KeyOrigin};
use crate::store::{FundingInfo, MemoryStore, SpendingInfo, TxEntry};
use crate::types::{BlockId, InPoint, ScriptHash, TxStatus};
Expand Down Expand Up @@ -117,72 +117,66 @@ impl Indexer {
}

fn sync_transactions(&mut self, changelog: &mut Changelog) -> Result<BlockId> {
let start_height = self
.tip
.as_ref()
.map_or(0, |BlockId(tip_height, _)| tip_height + 1);
let since_block = self.tip.as_ref().map(|tip| tip.1);

let result: ListSinceBlockResult = self.rpc.call(
"listsinceblock",
&[json!(since_block), json!(1), json!(true)],
)?;

let tip_hash = result.lastblock;
let tip_height = self.rpc.get_block_header_info(&tip_hash)?.height as u32;

let mut buffered_outgoing: HashMap<Txid, (i32, u64)> = HashMap::new();

let synced_tip = load_transactions_since(
&self.rpc.clone(),
start_height,
None,
&mut |chunk, tip_height, is_first| {
// reset buffered txs whenever load_transactions_since() starts over, we'll get these
// outgoing txs again later (with a potentially different, more updated, `confirmations` number)
if is_first {
buffered_outgoing.clear();
for ltx in result.removed {
// transactions that were re-added in the active chain will appear in `removed`
// but with a positive confirmation count, ignore these.
if ltx.info.confirmations < 0 {
if self.store.purge_tx(&ltx.info.txid) {
changelog.push(|| IndexChange::TransactionReplaced(ltx.info.txid));
}
}
}

for ltx in chunk {
if ltx.info.confirmations < 0 {
if self.store.purge_tx(&ltx.info.txid) {
changelog.push(|| IndexChange::TransactionReplaced(ltx.info.txid));
}
continue;
}

// "listtransactions" in fact lists transaction outputs and not transactions.
// for "receive" txs, it returns one entry per wallet-owned output in the tx.
// for "send" txs, it returns one entry for every output in the tx, owned or not.
match ltx.detail.category {
TxCategory::Receive => {
// incoming txouts are easy: bitcoind tells us the associated
// address and label, giving us all the information we need in
// order to save the txo to the index.
self.process_incoming_txo(ltx, tip_height, changelog);
}
TxCategory::Send => {
// outgoing txs are more tricky: bitcoind doesn't tell us which
// prevouts are being spent, so we have to fetch the transaction to
// determine it. we can't do that straightaway because prevouts being
// spent might not be indexed yet. instead, buffer outgoing txs and
// process them at the end, so that the parent txs funding the prevouts
// are guaranteed to get indexed first.
buffered_outgoing.entry(ltx.info.txid).or_insert_with(|| {
// "send" transactions must have a fee
let fee = ltx.detail.fee.unwrap().abs().as_sat() as u64;
(ltx.info.confirmations, fee)
});
}
// ignore mining-related transactions
TxCategory::Generate | TxCategory::Immature | TxCategory::Orphan => (),
};
for ltx in result.transactions {
// "listtransactions"/"listsinceblock" in fact lists transaction outputs and not transactions.
// for "receive" txs, it returns one entry per wallet-owned output in the tx.
// for "send" txs, it returns one entry for every output in the tx, owned or not.
match ltx.detail.category {
TxCategory::Receive => {
// incoming txouts are easy: bitcoind tells us the associated
// address and label, giving us all the information we need in
// order to save the txo to the index.
self.process_incoming_txo(ltx, tip_height, changelog);
}
},
)?;
TxCategory::Send => {
// indexing outgoing txs require fetching the list of spent prevouts and
// comparing them against the wallet's known funded outputs. we can't do that
// straightaway because the prevouts being spent might not be indexed yet, so
// the outgoing txs are buffered and processed at the end, after the txs funding
// the prevouts are guarranted to be indexed.
buffered_outgoing.entry(ltx.info.txid).or_insert_with(|| {
// "send" transactions must have a fee
let fee = ltx.detail.fee.unwrap().abs().as_sat() as u64;
(ltx.info.confirmations, fee)
});
}
// ignore mining-related transactions
TxCategory::Generate | TxCategory::Immature | TxCategory::Orphan => (),
};
}

for (txid, (confirmations, fee)) in buffered_outgoing {
let status = TxStatus::from_confirmations(confirmations, synced_tip.0);
let status = TxStatus::from_confirmations(confirmations, tip_height);
self.process_outgoing_tx(txid, status, fee, changelog)
.map_err(|err| warn!("failed processing outgoing payment: {:?}", err))
.ok();
}

// TODO: complete fee information for incoming-only txs

Ok(synced_tip)
Ok(BlockId(tip_height, tip_hash))
}

// upsert the transaction while collecting the changelog
Expand Down Expand Up @@ -408,99 +402,10 @@ impl fmt::Display for IndexChange {
}
}

const INIT_TX_PER_PAGE: usize = 500;
const DELTA_TX_PER_PAGE: usize = 25;
const MAX_TX_PER_PAGE: usize = 5000;

// Fetch all unconfirmed transactions + transactions confirmed at or after start_height
fn load_transactions_since(
rpc: &RpcClient,
start_height: u32,
init_per_page: Option<usize>,
chunk_handler: &mut dyn FnMut(Vec<ListTransactionResult>, u32, bool),
) -> Result<BlockId> {
let mut start_index = 0;
let mut per_page = init_per_page.unwrap_or_else(|| {
if start_height == 0 {
// start with larger pages if we're catching up for the first time
INIT_TX_PER_PAGE
} else {
DELTA_TX_PER_PAGE
}
});
let mut oldest_seen = None;

let tip_height = rpc.get_block_count()? as u32;
let tip_hash = rpc.get_block_hash(tip_height as u64)?;

assert!(start_height <= tip_height + 1, "start_height too far");
let max_confirmations = (tip_height + 1 - start_height) as i32;

if start_height <= tip_height {
info!(
"syncing transactions since height {} + mempool transactions (tip height={} hash={})",
start_height, tip_height, tip_hash,
);
} else {
debug!("syncing mempool transactions (no new blocks)");
}

loop {
debug!(
"fetching {} transactions starting at index {}",
per_page, start_index
);

let mut chunk =
rpc.list_transactions(None, Some(per_page), Some(start_index), Some(true))?;

// this is necessary because we rely on the tip height to derive the confirmed height
// from the number of confirmations
if tip_hash != rpc.get_best_block_hash()? {
warn!("tip changed while fetching transactions, retrying...");
return load_transactions_since(rpc, start_height, Some(per_page), chunk_handler);
}

// make sure we didn't miss any transactions by comparing the first entry of this page with
// the last entry of the last page (the "marker")
if let Some(oldest_seen) = &oldest_seen {
let marker = chunk.pop().or_err("missing marker tx")?;

if oldest_seen != &(marker.info.txid, marker.detail.vout) {
warn!("transaction set changed while fetching transactions, retrying...");
return load_transactions_since(rpc, start_height, Some(per_page), chunk_handler);
}
}
// update the marker
if let Some(oldest) = chunk.first() {
oldest_seen = Some((oldest.info.txid, oldest.detail.vout));
} else {
break;
}

let chunk: Vec<ListTransactionResult> = chunk
.into_iter()
.rev()
.take_while(|ltx| ltx.info.confirmations <= max_confirmations)
.collect();

let exhausted = if start_index == 0 {
chunk.len() < per_page
} else {
// account for the removed marker tx
chunk.len() < per_page - 1
};

chunk_handler(chunk, tip_height, start_index == 0);

if exhausted {
break;
}

// -1 so we'll get the last entry of this page as the first of the next, as a marker for sanity check
start_index = start_index + per_page - 1;
per_page = MAX_TX_PER_PAGE.min(per_page * 2);
}

Ok(BlockId(tip_height, tip_hash))
#[derive(Clone, PartialEq, Eq, Debug, Deserialize)]
struct ListSinceBlockResult {
transactions: Vec<ListTransactionResult>,
#[serde(default)]
removed: Vec<ListTransactionResult>,
lastblock: BlockHash,
}

0 comments on commit f7a2906

Please sign in to comment.