diff --git a/.github/workflows/cont_integration.yml b/.github/workflows/cont_integration.yml index 3dfb9906c..9db71a9d0 100644 --- a/.github/workflows/cont_integration.yml +++ b/.github/workflows/cont_integration.yml @@ -30,7 +30,7 @@ jobs: - name: Pin dependencies for MSRV if: matrix.rust.version == '1.63.0' run: | - cargo update -p zip --precise "0.6.2" + cargo update -p zstd-sys --precise "2.0.8+zstd.1.5.5" cargo update -p time --precise "0.3.20" cargo update -p jobserver --precise "0.1.26" cargo update -p home --precise "0.5.5" diff --git a/crates/esplora/src/async_ext.rs b/crates/esplora/src/async_ext.rs index 649cd6891..d5ba4a0e0 100644 --- a/crates/esplora/src/async_ext.rs +++ b/crates/esplora/src/async_ext.rs @@ -2,14 +2,14 @@ use async_trait::async_trait; use bdk_chain::collections::btree_map; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, - collections::{BTreeMap, BTreeSet}, + collections::BTreeMap, local_chain::{self, CheckPoint}, BlockId, ConfirmationTimeHeightAnchor, TxGraph, }; use esplora_client::{Error, TxStatus}; use futures::{stream::FuturesOrdered, TryStreamExt}; -use crate::{anchor_from_status, ASSUME_FINAL_DEPTH}; +use crate::anchor_from_status; /// Trait to extend the functionality of [`esplora_client::AsyncClient`]. /// @@ -26,6 +26,12 @@ pub trait EsploraAsyncExt { /// /// The result of this method can be applied to [`LocalChain::apply_update`]. /// + /// ## Consistency + /// + /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org + /// during the call. The size of re-org we can tollerate is server dependent but will be at + /// least 10. + /// /// [`LocalChain`]: bdk_chain::local_chain::LocalChain /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update @@ -85,21 +91,22 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { local_tip: CheckPoint, request_heights: impl IntoIterator + Send> + Send, ) -> Result { - let request_heights = request_heights.into_iter().collect::>(); - let new_tip_height = self.get_height().await?; - - // atomically fetch blocks from esplora - let mut fetched_blocks = { - let heights = (0..=new_tip_height).rev(); - let hashes = self - .get_blocks(Some(new_tip_height)) - .await? - .into_iter() - .map(|b| b.id); - heights.zip(hashes).collect::>() - }; + // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are + // consistent. + let mut fetched_blocks = self + .get_blocks(None) + .await? + .into_iter() + .map(|b| (b.time.height, b.id)) + .collect::>(); + let new_tip_height = fetched_blocks + .keys() + .last() + .copied() + .expect("must have atleast one block"); - // fetch heights that the caller is interested in + // Fetch blocks of heights that the caller is interested in, skipping blocks that are + // already fetched when constructing `fetched_blocks`. for height in request_heights { // do not fetch blocks higher than remote tip if height > new_tip_height { @@ -107,81 +114,37 @@ impl EsploraAsyncExt for esplora_client::AsyncClient { } // only fetch what is missing if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) { - let hash = self.get_block_hash(height).await?; - entry.insert(hash); + // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent + // with the chain at the time of `get_blocks` above (there could have been a deep + // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's + // not possible to have a re-org deeper than that. + entry.insert(self.get_block_hash(height).await?); } } - // find the earliest point of agreement between local chain and fetched chain - let earliest_agreement_cp = { - let mut earliest_agreement_cp = Option::::None; - - let local_tip_height = local_tip.height(); - for local_cp in local_tip.iter() { - let local_block = local_cp.block_id(); - - // the updated hash (block hash at this height after the update), can either be: - // 1. a block that already existed in `fetched_blocks` - // 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH - // 3. otherwise we can freshly fetch the block from remote, which is safe as it - // is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the - // remote tip - let updated_hash = match fetched_blocks.entry(local_block.height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => *entry.insert( - if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH { - local_block.hash - } else { - self.get_block_hash(local_block.height).await? - }, - ), - }; - - // since we may introduce blocks below the point of agreement, we cannot break - // here unconditionally - we only break if we guarantee there are no new heights - // below our current local checkpoint - if local_block.hash == updated_hash { - earliest_agreement_cp = Some(local_cp); - - let first_new_height = *fetched_blocks - .keys() - .next() - .expect("must have at least one new block"); - if first_new_height >= local_block.height { - break; - } - } + // Ensure `fetched_blocks` can create an update that connects with the original chain by + // finding a "Point of Agreement". + for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { + if height > new_tip_height { + continue; } - earliest_agreement_cp - }; - - let tip = { - // first checkpoint to use for the update chain - let first_cp = match earliest_agreement_cp { - Some(cp) => cp, - None => { - let (&height, &hash) = fetched_blocks - .iter() - .next() - .expect("must have at least one new block"); - CheckPoint::new(BlockId { height, hash }) + let fetched_hash = match fetched_blocks.entry(height) { + btree_map::Entry::Occupied(entry) => *entry.get(), + btree_map::Entry::Vacant(entry) => { + *entry.insert(self.get_block_hash(height).await?) } }; - // transform fetched chain into the update chain - fetched_blocks - // we exclude anything at or below the first cp of the update chain otherwise - // building the chain will fail - .split_off(&(first_cp.height() + 1)) - .into_iter() - .map(|(height, hash)| BlockId { height, hash }) - .fold(first_cp, |prev_cp, block| { - prev_cp.push(block).expect("must extend checkpoint") - }) - }; + + // We have found point of agreement so the update will connect! + if fetched_hash == local_hash { + break; + } + } Ok(local_chain::Update { - tip, + tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from)) + .expect("must be in height order"), introduce_older_blocks: true, }) } diff --git a/crates/esplora/src/blocking_ext.rs b/crates/esplora/src/blocking_ext.rs index 493c4b8a7..8f9fcd59c 100644 --- a/crates/esplora/src/blocking_ext.rs +++ b/crates/esplora/src/blocking_ext.rs @@ -1,7 +1,7 @@ use std::thread::JoinHandle; use bdk_chain::collections::btree_map; -use bdk_chain::collections::{BTreeMap, BTreeSet}; +use bdk_chain::collections::BTreeMap; use bdk_chain::{ bitcoin::{BlockHash, OutPoint, ScriptBuf, Txid}, local_chain::{self, CheckPoint}, @@ -9,7 +9,7 @@ use bdk_chain::{ }; use esplora_client::{Error, TxStatus}; -use crate::{anchor_from_status, ASSUME_FINAL_DEPTH}; +use crate::anchor_from_status; /// Trait to extend the functionality of [`esplora_client::BlockingClient`]. /// @@ -24,6 +24,12 @@ pub trait EsploraExt { /// /// The result of this method can be applied to [`LocalChain::apply_update`]. /// + /// ## Consistency + /// + /// The chain update returned is guaranteed to be consistent as long as there is not a *large* re-org + /// during the call. The size of re-org we can tollerate is server dependent but will be at + /// least 10. + /// /// [`LocalChain`]: bdk_chain::local_chain::LocalChain /// [`LocalChain::tip`]: bdk_chain::local_chain::LocalChain::tip /// [`LocalChain::apply_update`]: bdk_chain::local_chain::LocalChain::apply_update @@ -78,20 +84,21 @@ impl EsploraExt for esplora_client::BlockingClient { local_tip: CheckPoint, request_heights: impl IntoIterator, ) -> Result { - let request_heights = request_heights.into_iter().collect::>(); - let new_tip_height = self.get_height()?; - - // atomically fetch blocks from esplora - let mut fetched_blocks = { - let heights = (0..=new_tip_height).rev(); - let hashes = self - .get_blocks(Some(new_tip_height))? - .into_iter() - .map(|b| b.id); - heights.zip(hashes).collect::>() - }; + // Fetch latest N (server dependent) blocks from Esplora. The server guarantees these are + // consistent. + let mut fetched_blocks = self + .get_blocks(None)? + .into_iter() + .map(|b| (b.time.height, b.id)) + .collect::>(); + let new_tip_height = fetched_blocks + .keys() + .last() + .copied() + .expect("must atleast have one block"); - // fetch heights that the caller is interested in + // Fetch blocks of heights that the caller is interested in, skipping blocks that are + // already fetched when constructing `fetched_blocks`. for height in request_heights { // do not fetch blocks higher than remote tip if height > new_tip_height { @@ -99,81 +106,35 @@ impl EsploraExt for esplora_client::BlockingClient { } // only fetch what is missing if let btree_map::Entry::Vacant(entry) = fetched_blocks.entry(height) { - let hash = self.get_block_hash(height)?; - entry.insert(hash); + // ❗The return value of `get_block_hash` is not strictly guaranteed to be consistent + // with the chain at the time of `get_blocks` above (there could have been a deep + // re-org). Since `get_blocks` returns 10 (or so) blocks we are assuming that it's + // not possible to have a re-org deeper than that. + entry.insert(self.get_block_hash(height)?); } } - // find the earliest point of agreement between local chain and fetched chain - let earliest_agreement_cp = { - let mut earliest_agreement_cp = Option::::None; - - let local_tip_height = local_tip.height(); - for local_cp in local_tip.iter() { - let local_block = local_cp.block_id(); - - // the updated hash (block hash at this height after the update), can either be: - // 1. a block that already existed in `fetched_blocks` - // 2. a block that exists locally and at least has a depth of ASSUME_FINAL_DEPTH - // 3. otherwise we can freshly fetch the block from remote, which is safe as it - // is guaranteed that this would be at or below ASSUME_FINAL_DEPTH from the - // remote tip - let updated_hash = match fetched_blocks.entry(local_block.height) { - btree_map::Entry::Occupied(entry) => *entry.get(), - btree_map::Entry::Vacant(entry) => *entry.insert( - if local_tip_height - local_block.height >= ASSUME_FINAL_DEPTH { - local_block.hash - } else { - self.get_block_hash(local_block.height)? - }, - ), - }; - - // since we may introduce blocks below the point of agreement, we cannot break - // here unconditionally - we only break if we guarantee there are no new heights - // below our current local checkpoint - if local_block.hash == updated_hash { - earliest_agreement_cp = Some(local_cp); - - let first_new_height = *fetched_blocks - .keys() - .next() - .expect("must have at least one new block"); - if first_new_height >= local_block.height { - break; - } - } + // Ensure `fetched_blocks` can create an update that connects with the original chain by + // finding a "Point of Agreement". + for (height, local_hash) in local_tip.iter().map(|cp| (cp.height(), cp.hash())) { + if height > new_tip_height { + continue; } - earliest_agreement_cp - }; - - let tip = { - // first checkpoint to use for the update chain - let first_cp = match earliest_agreement_cp { - Some(cp) => cp, - None => { - let (&height, &hash) = fetched_blocks - .iter() - .next() - .expect("must have at least one new block"); - CheckPoint::new(BlockId { height, hash }) - } + let fetched_hash = match fetched_blocks.entry(height) { + btree_map::Entry::Occupied(entry) => *entry.get(), + btree_map::Entry::Vacant(entry) => *entry.insert(self.get_block_hash(height)?), }; - // transform fetched chain into the update chain - fetched_blocks - // we exclude anything at or below the first cp of the update chain otherwise - // building the chain will fail - .split_off(&(first_cp.height() + 1)) - .into_iter() - .map(|(height, hash)| BlockId { height, hash }) - .fold(first_cp, |prev_cp, block| { - prev_cp.push(block).expect("must extend checkpoint") - }) - }; + + // We have found point of agreement so the update will connect! + if fetched_hash == local_hash { + break; + } + } Ok(local_chain::Update { - tip, + tip: CheckPoint::from_block_ids(fetched_blocks.into_iter().map(BlockId::from)) + .expect("must be in height order"), introduce_older_blocks: true, }) } diff --git a/crates/esplora/src/lib.rs b/crates/esplora/src/lib.rs index 727c8c53b..535167ff2 100644 --- a/crates/esplora/src/lib.rs +++ b/crates/esplora/src/lib.rs @@ -31,8 +31,6 @@ mod async_ext; #[cfg(feature = "async")] pub use async_ext::*; -const ASSUME_FINAL_DEPTH: u32 = 15; - fn anchor_from_status(status: &TxStatus) -> Option { if let TxStatus { block_height: Some(height), diff --git a/crates/esplora/tests/blocking_ext.rs b/crates/esplora/tests/blocking_ext.rs index 50b19d1cc..b91231d1d 100644 --- a/crates/esplora/tests/blocking_ext.rs +++ b/crates/esplora/tests/blocking_ext.rs @@ -1,15 +1,31 @@ +use bdk_chain::local_chain::LocalChain; +use bdk_chain::BlockId; use bdk_esplora::EsploraExt; use electrsd::bitcoind::bitcoincore_rpc::RpcApi; use electrsd::bitcoind::{self, anyhow, BitcoinD}; use electrsd::{Conf, ElectrsD}; use esplora_client::{self, BlockingClient, Builder}; -use std::collections::{BTreeMap, HashSet}; +use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::str::FromStr; use std::thread::sleep; use std::time::Duration; use bdk_chain::bitcoin::{Address, Amount, BlockHash, Txid}; +macro_rules! h { + ($index:literal) => {{ + bdk_chain::bitcoin::hashes::Hash::hash($index.as_bytes()) + }}; +} + +macro_rules! local_chain { + [ $(($height:expr, $block_hash:expr)), * ] => {{ + #[allow(unused_mut)] + bdk_chain::local_chain::LocalChain::from_blocks([$(($height, $block_hash).into()),*].into_iter().collect()) + .expect("chain must have genesis block") + }}; +} + struct TestEnv { bitcoind: BitcoinD, #[allow(dead_code)] @@ -39,6 +55,20 @@ impl TestEnv { }) } + fn reset_electrsd(mut self) -> anyhow::Result { + let mut electrs_conf = Conf::default(); + electrs_conf.http_enabled = true; + let electrs_exe = + electrsd::downloaded_exe_path().expect("electrs version feature must be enabled"); + let electrsd = ElectrsD::with_conf(electrs_exe, &self.bitcoind, &electrs_conf)?; + + let base_url = format!("http://{}", &electrsd.esplora_url.clone().unwrap()); + let client = Builder::new(base_url.as_str()).build_blocking()?; + self.electrsd = electrsd; + self.client = client; + Ok(self) + } + fn mine_blocks( &self, count: usize, @@ -202,3 +232,180 @@ pub fn test_update_tx_graph_gap_limit() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn update_local_chain() -> anyhow::Result<()> { + const TIP_HEIGHT: u32 = 50; + + let env = TestEnv::new()?; + let blocks = { + let bitcoind_client = &env.bitcoind.client; + assert_eq!(bitcoind_client.get_block_count()?, 1); + [ + (0, bitcoind_client.get_block_hash(0)?), + (1, bitcoind_client.get_block_hash(1)?), + ] + .into_iter() + .chain((2..).zip(env.mine_blocks((TIP_HEIGHT - 1) as usize, None)?)) + .collect::>() + }; + // so new blocks can be seen by Electrs + let env = env.reset_electrsd()?; + + struct TestCase { + name: &'static str, + chain: LocalChain, + request_heights: &'static [u32], + exp_update_heights: &'static [u32], + } + + let test_cases = [ + TestCase { + name: "request_later_blocks", + chain: local_chain![(0, blocks[&0]), (21, blocks[&21])], + request_heights: &[22, 25, 28], + exp_update_heights: &[21, 22, 25, 28], + }, + TestCase { + name: "request_prev_blocks", + chain: local_chain![(0, blocks[&0]), (1, blocks[&1]), (5, blocks[&5])], + request_heights: &[4], + exp_update_heights: &[4, 5], + }, + TestCase { + name: "request_prev_blocks_2", + chain: local_chain![(0, blocks[&0]), (1, blocks[&1]), (10, blocks[&10])], + request_heights: &[4, 6], + exp_update_heights: &[4, 6, 10], + }, + TestCase { + name: "request_later_and_prev_blocks", + chain: local_chain![(0, blocks[&0]), (7, blocks[&7]), (11, blocks[&11])], + request_heights: &[8, 9, 15], + exp_update_heights: &[8, 9, 11, 15], + }, + TestCase { + name: "request_tip_only", + chain: local_chain![(0, blocks[&0]), (5, blocks[&5]), (49, blocks[&49])], + request_heights: &[TIP_HEIGHT], + exp_update_heights: &[49], + }, + TestCase { + name: "request_nothing", + chain: local_chain![(0, blocks[&0]), (13, blocks[&13]), (23, blocks[&23])], + request_heights: &[], + exp_update_heights: &[23], + }, + TestCase { + name: "request_nothing_during_reorg", + chain: local_chain![(0, blocks[&0]), (13, blocks[&13]), (23, h!("23"))], + request_heights: &[], + exp_update_heights: &[13, 23], + }, + TestCase { + name: "request_nothing_during_reorg_2", + chain: local_chain![ + (0, blocks[&0]), + (21, blocks[&21]), + (22, h!("22")), + (23, h!("23")) + ], + request_heights: &[], + exp_update_heights: &[21, 22, 23], + }, + TestCase { + name: "request_prev_blocks_during_reorg", + chain: local_chain![ + (0, blocks[&0]), + (21, blocks[&21]), + (22, h!("22")), + (23, h!("23")) + ], + request_heights: &[17, 20], + exp_update_heights: &[17, 20, 21, 22, 23], + }, + TestCase { + name: "request_later_blocks_during_reorg", + chain: local_chain![ + (0, blocks[&0]), + (9, blocks[&9]), + (22, h!("22")), + (23, h!("23")) + ], + request_heights: &[25, 27], + exp_update_heights: &[9, 22, 23, 25, 27], + }, + TestCase { + name: "request_later_blocks_during_reorg_2", + chain: local_chain![(0, blocks[&0]), (9, h!("9"))], + request_heights: &[10], + exp_update_heights: &[0, 9, 10], + }, + TestCase { + name: "request_later_and_prev_blocks_during_reorg", + chain: local_chain![(0, blocks[&0]), (1, blocks[&1]), (9, h!("9"))], + request_heights: &[8, 11], + exp_update_heights: &[1, 8, 9, 11], + }, + ]; + + for (i, t) in test_cases.into_iter().enumerate() { + println!("Case {}: {}", i, t.name); + let mut chain = t.chain; + + let update = env + .client + .update_local_chain(chain.tip(), t.request_heights.iter().copied()) + .map_err(|err| { + anyhow::format_err!("[{}:{}] `update_local_chain` failed: {}", i, t.name, err) + })?; + + let update_blocks = update + .tip + .iter() + .map(|cp| cp.block_id()) + .collect::>(); + + let exp_update_blocks = t + .exp_update_heights + .iter() + .map(|&height| { + let hash = blocks[&height]; + BlockId { height, hash } + }) + .chain( + // Electrs Esplora `get_block` call fetches 10 blocks which is included in the + // update + blocks + .range(TIP_HEIGHT - 9..) + .map(|(&height, &hash)| BlockId { height, hash }), + ) + .collect::>(); + + assert_eq!( + update_blocks, exp_update_blocks, + "[{}:{}] unexpected update", + i, t.name + ); + + let _ = chain + .apply_update(update) + .unwrap_or_else(|err| panic!("[{}:{}] update failed to apply: {}", i, t.name, err)); + + // all requested heights must exist in the final chain + for height in t.request_heights { + let exp_blockhash = blocks.get(height).expect("block must exist in bitcoind"); + assert_eq!( + chain.blocks().get(height), + Some(exp_blockhash), + "[{}:{}] block {}:{} must exist in final chain", + i, + t.name, + height, + exp_blockhash + ); + } + } + + Ok(()) +}