From 93314b5dab0e1581294bf31014a80fa9b569e4b8 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 14 Oct 2024 10:58:23 +0200 Subject: [PATCH] Copy the fixed version of `init::synchronize_listeners` We previously discovered a minor bug that resulted in `synchronize_listeners` not calling the actual implementation of `Listen::block_connected`. While we will fix this upstream (https://github.com/lightningdevkit/rust-lightning/pull/3354), we now copy over the fixed code in question to not be blocked on the next LDK release. We also slightly adjusted it to account for types/APIs that are only visible inside of `lightning-block-sync`. We intend to drop the newly added module as soon as we can upgrade to the next LDK release shipping the fixed version. --- src/chain/block_sync_init.rs | 391 +++++++++++++++++++++++++++++++++++ src/chain/mod.rs | 5 +- 2 files changed, 394 insertions(+), 2 deletions(-) create mode 100644 src/chain/block_sync_init.rs diff --git a/src/chain/block_sync_init.rs b/src/chain/block_sync_init.rs new file mode 100644 index 000000000..1bc7972a5 --- /dev/null +++ b/src/chain/block_sync_init.rs @@ -0,0 +1,391 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +//! This module contains code that was copied from `lightning_block_sync` and slightly adjusted to account for +//! crate-visible types/APIs. +// TODO: Drop this version when upgrading to the LDK release shipping +// https://github.com/lightningdevkit/rust-lightning/pull/3354. + +use lightning::chain::Listen; + +use lightning_block_sync::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader}; +use lightning_block_sync::{BlockData, BlockSource, BlockSourceError, BlockSourceResult, Cache}; + +use bitcoin::block::Header; +use bitcoin::hash_types::BlockHash; +use bitcoin::network::Network; + +use lightning::chain; + +use std::ops::Deref; + +/// Returns a validated block header of the source's best chain tip. +/// +/// Upon success, the returned header can be used to initialize [`SpvClient`]. Useful during a fresh +/// start when there are no chain listeners to sync yet. +/// +/// [`SpvClient`]: crate::SpvClient +async fn validate_best_block_header( + block_source: B, +) -> BlockSourceResult +where + B::Target: BlockSource, +{ + let (best_block_hash, best_block_height) = block_source.get_best_block().await?; + block_source.get_header(&best_block_hash, best_block_height).await?.validate(best_block_hash) +} + +/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each +/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip. +/// +/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of +/// failure, each listener may be left at a different block hash than the one it was originally +/// paired with. +/// +/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before +/// switching to [`SpvClient`]. For example: +/// +/// ``` +/// use bitcoin::hash_types::BlockHash; +/// use bitcoin::network::Network; +/// +/// use lightning::chain; +/// use lightning::chain::Watch; +/// use lightning::chain::chainmonitor; +/// use lightning::chain::chainmonitor::ChainMonitor; +/// use lightning::chain::channelmonitor::ChannelMonitor; +/// use lightning::chain::chaininterface::BroadcasterInterface; +/// use lightning::chain::chaininterface::FeeEstimator; +/// use lightning::sign; +/// use lightning::sign::{EntropySource, NodeSigner, SignerProvider}; +/// use lightning::ln::channelmanager::{ChannelManager, ChannelManagerReadArgs}; +/// use lightning::routing::router::Router; +/// use lightning::util::config::UserConfig; +/// use lightning::util::logger::Logger; +/// use lightning::util::ser::ReadableArgs; +/// +/// use lightning_block_sync::*; +/// +/// use lightning::io::Cursor; +/// +/// async fn init_sync< +/// B: BlockSource, +/// ES: EntropySource, +/// NS: NodeSigner, +/// SP: SignerProvider, +/// T: BroadcasterInterface, +/// F: FeeEstimator, +/// R: Router, +/// L: Logger, +/// C: chain::Filter, +/// P: chainmonitor::Persist, +/// >( +/// block_source: &B, +/// chain_monitor: &ChainMonitor, +/// config: UserConfig, +/// entropy_source: &ES, +/// node_signer: &NS, +/// signer_provider: &SP, +/// tx_broadcaster: &T, +/// fee_estimator: &F, +/// router: &R, +/// logger: &L, +/// persister: &P, +/// ) { +/// // Read a serialized channel monitor paired with the block hash when it was persisted. +/// let serialized_monitor = "..."; +/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor)>::read( +/// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap(); +/// +/// // Read the channel manager paired with the block hash when it was persisted. +/// let serialized_manager = "..."; +/// let (manager_block_hash, mut manager) = { +/// let read_args = ChannelManagerReadArgs::new( +/// entropy_source, +/// node_signer, +/// signer_provider, +/// fee_estimator, +/// chain_monitor, +/// tx_broadcaster, +/// router, +/// logger, +/// config, +/// vec![&mut monitor], +/// ); +/// <(BlockHash, ChannelManager<&ChainMonitor, &T, &ES, &NS, &SP, &F, &R, &L>)>::read( +/// &mut Cursor::new(&serialized_manager), read_args).unwrap() +/// }; +/// +/// // Synchronize any channel monitors and the channel manager to be on the best block. +/// let mut cache = UnboundedCache::new(); +/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger); +/// let listeners = vec![ +/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen), +/// (manager_block_hash, &manager as &dyn chain::Listen), +/// ]; +/// let chain_tip = init::synchronize_listeners( +/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap(); +/// +/// // Allow the chain monitor to watch any channels. +/// let monitor = monitor_listener.0; +/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor); +/// +/// // Create an SPV client to notify the chain monitor and channel manager of block events. +/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin); +/// let mut chain_listener = (chain_monitor, &manager); +/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener); +/// } +/// ``` +/// +/// [`SpvClient`]: crate::SpvClient +/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager +/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor +pub(super) async fn synchronize_listeners< + B: Deref + Sized + Send + Sync, + C: Cache, + L: chain::Listen + ?Sized, +>( + block_source: B, network: Network, header_cache: &mut C, + mut chain_listeners: Vec<(BlockHash, &L)>, +) -> BlockSourceResult +where + B::Target: BlockSource, +{ + let best_header = validate_best_block_header(&*block_source).await?; + + // Fetch the header for the block hash paired with each listener. + let mut chain_listeners_with_old_headers = Vec::new(); + for (old_block_hash, chain_listener) in chain_listeners.drain(..) { + let old_header = match header_cache.look_up(&old_block_hash) { + Some(header) => *header, + None => { + block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)? + }, + }; + chain_listeners_with_old_headers.push((old_header, chain_listener)) + } + + // Find differences and disconnect blocks for each listener individually. + let mut chain_poller = ChainPoller::new(block_source, network); + let mut chain_listeners_at_height = Vec::new(); + let mut most_common_ancestor = None; + let mut most_connected_blocks = Vec::new(); + for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) { + // Disconnect any stale blocks, but keep them in the cache for the next iteration. + let header_cache = &mut ReadOnlyCache(header_cache); + let (common_ancestor, connected_blocks) = { + let chain_listener = &DynamicChainListener(chain_listener); + let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + let difference = + chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?; + chain_notifier.disconnect_blocks(difference.disconnected_blocks); + (difference.common_ancestor, difference.connected_blocks) + }; + + // Keep track of the most common ancestor and all blocks connected across all listeners. + chain_listeners_at_height.push((common_ancestor.height, chain_listener)); + if connected_blocks.len() > most_connected_blocks.len() { + most_common_ancestor = Some(common_ancestor); + most_connected_blocks = connected_blocks; + } + } + + // Connect new blocks for all listeners at once to avoid re-fetching blocks. + if let Some(common_ancestor) = most_common_ancestor { + let chain_listener = &ChainListenerSet(chain_listeners_at_height); + let mut chain_notifier = ChainNotifier { header_cache, chain_listener }; + chain_notifier + .connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller) + .await + .map_err(|(e, _)| e)?; + } + + Ok(best_header) +} + +/// A wrapper to make a cache read-only. +/// +/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one +/// listener. +struct ReadOnlyCache<'a, C: Cache>(&'a mut C); + +impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.0.look_up(block_hash) + } + + fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) { + unreachable!() + } + + fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option { + None + } +} + +/// Wrapper for supporting dynamically sized chain listeners. +struct DynamicChainListener<'a, L: chain::Listen + ?Sized>(&'a L); + +impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L> { + fn filtered_block_connected( + &self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32, + ) { + unreachable!() + } + + fn block_disconnected(&self, header: &Header, height: u32) { + self.0.block_disconnected(header, height) + } +} + +/// A set of dynamically sized chain listeners, each paired with a starting block height. +struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>); + +impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> { + fn block_connected(&self, block: &bitcoin::Block, height: u32) { + for (starting_height, chain_listener) in self.0.iter() { + if height > *starting_height { + chain_listener.block_connected(block, height); + } + } + } + + fn filtered_block_connected( + &self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32, + ) { + for (starting_height, chain_listener) in self.0.iter() { + if height > *starting_height { + chain_listener.filtered_block_connected(header, txdata, height); + } + } + } + + fn block_disconnected(&self, _header: &Header, _height: u32) { + unreachable!() + } +} + +/// Notifies [listeners] of blocks that have been connected or disconnected from the chain. +/// +/// [listeners]: lightning::chain::Listen +struct ChainNotifier<'a, C: Cache, L: Deref> +where + L::Target: chain::Listen, +{ + /// Cache for looking up headers before fetching from a block source. + header_cache: &'a mut C, + + /// Listener that will be notified of connected or disconnected blocks. + chain_listener: L, +} + +/// Changes made to the chain between subsequent polls that transformed it from having one chain tip +/// to another. +/// +/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order +/// before new blocks are connected in reverse order. +struct ChainDifference { + /// The most recent ancestor common between the chain tips. + /// + /// If there are any disconnected blocks, this is where the chain forked. + common_ancestor: ValidatedBlockHeader, + + /// Blocks that were disconnected from the chain since the last poll. + disconnected_blocks: Vec, + + /// Blocks that were connected to the chain since the last poll. + connected_blocks: Vec, +} + +impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L> +where + L::Target: chain::Listen, +{ + /// Returns the changes needed to produce the chain with `current_header` as its tip from the + /// chain with `prev_header` as its tip. + /// + /// Walks backwards from `current_header` and `prev_header`, finding the common ancestor. + async fn find_difference( + &self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader, + chain_poller: &mut P, + ) -> BlockSourceResult { + let mut disconnected_blocks = Vec::new(); + let mut connected_blocks = Vec::new(); + let mut current = current_header; + let mut previous = *prev_header; + loop { + // Found the common ancestor. + if current.to_best_block().block_hash == previous.to_best_block().block_hash { + break; + } + + // Walk back the chain, finding blocks needed to connect and disconnect. Only walk back + // the header with the greater height, or both if equal heights. + let current_height = current.height; + let previous_height = previous.height; + if current_height <= previous_height { + disconnected_blocks.push(previous); + previous = self.look_up_previous_header(chain_poller, &previous).await?; + } + if current_height >= previous_height { + connected_blocks.push(current); + current = self.look_up_previous_header(chain_poller, ¤t).await?; + } + } + + let common_ancestor = current; + Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks }) + } + + /// Returns the previous header for the given header, either by looking it up in the cache or + /// fetching it if not found. + async fn look_up_previous_header( + &self, chain_poller: &mut P, header: &ValidatedBlockHeader, + ) -> BlockSourceResult { + match self.header_cache.look_up(&header.header.prev_blockhash) { + Some(prev_header) => Ok(*prev_header), + None => chain_poller.look_up_previous_header(header).await, + } + } + + /// Notifies the chain listeners of disconnected blocks. + fn disconnect_blocks(&mut self, mut disconnected_blocks: Vec) { + for header in disconnected_blocks.drain(..) { + if let Some(cached_header) = + self.header_cache.block_disconnected(&header.to_best_block().block_hash) + { + assert_eq!(cached_header, header); + } + self.chain_listener.block_disconnected(&header.header, header.height); + } + } + + /// Notifies the chain listeners of connected blocks. + async fn connect_blocks( + &mut self, mut new_tip: ValidatedBlockHeader, + mut connected_blocks: Vec, chain_poller: &mut P, + ) -> Result<(), (BlockSourceError, Option)> { + for header in connected_blocks.drain(..).rev() { + let height = header.height; + let block_data = + chain_poller.fetch_block(&header).await.map_err(|e| (e, Some(new_tip)))?; + match block_data.deref() { + BlockData::FullBlock(block) => { + self.chain_listener.block_connected(block, height); + }, + BlockData::HeaderOnly(header) => { + self.chain_listener.filtered_block_connected(header, &[], height); + }, + } + + self.header_cache.block_connected(header.to_best_block().block_hash, header); + new_tip = header; + } + + Ok(()) + } +} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 9781dcbc2..d53097cea 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -6,6 +6,9 @@ // accordance with one or both of these licenses. mod bitcoind_rpc; +mod block_sync_init; + +use crate::chain::bitcoind_rpc::BitcoindRpcClient; use crate::config::{ Config, EsploraSyncConfig, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, @@ -37,8 +40,6 @@ use std::collections::HashMap; use std::sync::{Arc, Mutex, RwLock}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; -use self::bitcoind_rpc::BitcoindRpcClient; - // The default Esplora server we're using. pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/api";