Skip to content

Commit

Permalink
Copy the fixed version of init::synchronize_listeners
Browse files Browse the repository at this point in the history
We previously discovered a minor bug that resulted in
`synchronize_listeners` not calling the actual implementation of
`Listen::block_connected`. While we will fix this upstream
(lightningdevkit/rust-lightning#3354), we now
copy over the fixed code in question to not be blocked on the next LDK
release. We also slightly adjusted it to account for types/APIs that are
only visible inside of `lightning-block-sync`.

We intend to drop the newly added module as soon as we can upgrade to
the next LDK release shipping the fixed version.
  • Loading branch information
tnull committed Oct 14, 2024
1 parent 8bf54fa commit 93314b5
Show file tree
Hide file tree
Showing 2 changed files with 394 additions and 2 deletions.
391 changes: 391 additions & 0 deletions src/chain/block_sync_init.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,391 @@
// This file is Copyright its original authors, visible in version control history.
//
// This file is licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. You may not use this file except in
// accordance with one or both of these licenses.

//! This module contains code that was copied from `lightning_block_sync` and slightly adjusted to account for
//! crate-visible types/APIs.
// TODO: Drop this version when upgrading to the LDK release shipping
// https://github.com/lightningdevkit/rust-lightning/pull/3354.

use lightning::chain::Listen;

use lightning_block_sync::poll::{ChainPoller, Poll, Validate, ValidatedBlockHeader};
use lightning_block_sync::{BlockData, BlockSource, BlockSourceError, BlockSourceResult, Cache};

use bitcoin::block::Header;
use bitcoin::hash_types::BlockHash;
use bitcoin::network::Network;

use lightning::chain;

use std::ops::Deref;

/// Returns a validated block header of the source's best chain tip.
///
/// Upon success, the returned header can be used to initialize [`SpvClient`]. Useful during a fresh
/// start when there are no chain listeners to sync yet.
///
/// [`SpvClient`]: crate::SpvClient
async fn validate_best_block_header<B: Deref>(
block_source: B,
) -> BlockSourceResult<ValidatedBlockHeader>
where
B::Target: BlockSource,
{
let (best_block_hash, best_block_height) = block_source.get_best_block().await?;
block_source.get_header(&best_block_hash, best_block_height).await?.validate(best_block_hash)
}

/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
///
/// Upon success, the returned header can be used to initialize [`SpvClient`]. In the case of
/// failure, each listener may be left at a different block hash than the one it was originally
/// paired with.
///
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
/// switching to [`SpvClient`]. For example:
///
/// ```
/// use bitcoin::hash_types::BlockHash;
/// use bitcoin::network::Network;
///
/// use lightning::chain;
/// use lightning::chain::Watch;
/// use lightning::chain::chainmonitor;
/// use lightning::chain::chainmonitor::ChainMonitor;
/// use lightning::chain::channelmonitor::ChannelMonitor;
/// use lightning::chain::chaininterface::BroadcasterInterface;
/// use lightning::chain::chaininterface::FeeEstimator;
/// use lightning::sign;
/// use lightning::sign::{EntropySource, NodeSigner, SignerProvider};
/// use lightning::ln::channelmanager::{ChannelManager, ChannelManagerReadArgs};
/// use lightning::routing::router::Router;
/// use lightning::util::config::UserConfig;
/// use lightning::util::logger::Logger;
/// use lightning::util::ser::ReadableArgs;
///
/// use lightning_block_sync::*;
///
/// use lightning::io::Cursor;
///
/// async fn init_sync<
/// B: BlockSource,
/// ES: EntropySource,
/// NS: NodeSigner,
/// SP: SignerProvider,
/// T: BroadcasterInterface,
/// F: FeeEstimator,
/// R: Router,
/// L: Logger,
/// C: chain::Filter,
/// P: chainmonitor::Persist<SP::EcdsaSigner>,
/// >(
/// block_source: &B,
/// chain_monitor: &ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P>,
/// config: UserConfig,
/// entropy_source: &ES,
/// node_signer: &NS,
/// signer_provider: &SP,
/// tx_broadcaster: &T,
/// fee_estimator: &F,
/// router: &R,
/// logger: &L,
/// persister: &P,
/// ) {
/// // Read a serialized channel monitor paired with the block hash when it was persisted.
/// let serialized_monitor = "...";
/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<SP::EcdsaSigner>)>::read(
/// &mut Cursor::new(&serialized_monitor), (entropy_source, signer_provider)).unwrap();
///
/// // Read the channel manager paired with the block hash when it was persisted.
/// let serialized_manager = "...";
/// let (manager_block_hash, mut manager) = {
/// let read_args = ChannelManagerReadArgs::new(
/// entropy_source,
/// node_signer,
/// signer_provider,
/// fee_estimator,
/// chain_monitor,
/// tx_broadcaster,
/// router,
/// logger,
/// config,
/// vec![&mut monitor],
/// );
/// <(BlockHash, ChannelManager<&ChainMonitor<SP::EcdsaSigner, &C, &T, &F, &L, &P>, &T, &ES, &NS, &SP, &F, &R, &L>)>::read(
/// &mut Cursor::new(&serialized_manager), read_args).unwrap()
/// };
///
/// // Synchronize any channel monitors and the channel manager to be on the best block.
/// let mut cache = UnboundedCache::new();
/// let mut monitor_listener = (monitor, &*tx_broadcaster, &*fee_estimator, &*logger);
/// let listeners = vec![
/// (monitor_block_hash, &monitor_listener as &dyn chain::Listen),
/// (manager_block_hash, &manager as &dyn chain::Listen),
/// ];
/// let chain_tip = init::synchronize_listeners(
/// block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
///
/// // Allow the chain monitor to watch any channels.
/// let monitor = monitor_listener.0;
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
///
/// // Create an SPV client to notify the chain monitor and channel manager of block events.
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
/// let mut chain_listener = (chain_monitor, &manager);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
/// }
/// ```
///
/// [`SpvClient`]: crate::SpvClient
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
pub(super) async fn synchronize_listeners<
B: Deref + Sized + Send + Sync,
C: Cache,
L: chain::Listen + ?Sized,
>(
block_source: B, network: Network, header_cache: &mut C,
mut chain_listeners: Vec<(BlockHash, &L)>,
) -> BlockSourceResult<ValidatedBlockHeader>
where
B::Target: BlockSource,
{
let best_header = validate_best_block_header(&*block_source).await?;

// Fetch the header for the block hash paired with each listener.
let mut chain_listeners_with_old_headers = Vec::new();
for (old_block_hash, chain_listener) in chain_listeners.drain(..) {
let old_header = match header_cache.look_up(&old_block_hash) {
Some(header) => *header,
None => {
block_source.get_header(&old_block_hash, None).await?.validate(old_block_hash)?
},
};
chain_listeners_with_old_headers.push((old_header, chain_listener))
}

// Find differences and disconnect blocks for each listener individually.
let mut chain_poller = ChainPoller::new(block_source, network);
let mut chain_listeners_at_height = Vec::new();
let mut most_common_ancestor = None;
let mut most_connected_blocks = Vec::new();
for (old_header, chain_listener) in chain_listeners_with_old_headers.drain(..) {
// Disconnect any stale blocks, but keep them in the cache for the next iteration.
let header_cache = &mut ReadOnlyCache(header_cache);
let (common_ancestor, connected_blocks) = {
let chain_listener = &DynamicChainListener(chain_listener);
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
let difference =
chain_notifier.find_difference(best_header, &old_header, &mut chain_poller).await?;
chain_notifier.disconnect_blocks(difference.disconnected_blocks);
(difference.common_ancestor, difference.connected_blocks)
};

// Keep track of the most common ancestor and all blocks connected across all listeners.
chain_listeners_at_height.push((common_ancestor.height, chain_listener));
if connected_blocks.len() > most_connected_blocks.len() {
most_common_ancestor = Some(common_ancestor);
most_connected_blocks = connected_blocks;
}
}

// Connect new blocks for all listeners at once to avoid re-fetching blocks.
if let Some(common_ancestor) = most_common_ancestor {
let chain_listener = &ChainListenerSet(chain_listeners_at_height);
let mut chain_notifier = ChainNotifier { header_cache, chain_listener };
chain_notifier
.connect_blocks(common_ancestor, most_connected_blocks, &mut chain_poller)
.await
.map_err(|(e, _)| e)?;
}

Ok(best_header)
}

/// A wrapper to make a cache read-only.
///
/// Used to prevent losing headers that may be needed to disconnect blocks common to more than one
/// listener.
struct ReadOnlyCache<'a, C: Cache>(&'a mut C);

impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> {
self.0.look_up(block_hash)
}

fn block_connected(&mut self, _block_hash: BlockHash, _block_header: ValidatedBlockHeader) {
unreachable!()
}

fn block_disconnected(&mut self, _block_hash: &BlockHash) -> Option<ValidatedBlockHeader> {
None
}
}

/// Wrapper for supporting dynamically sized chain listeners.
struct DynamicChainListener<'a, L: chain::Listen + ?Sized>(&'a L);

impl<'a, L: chain::Listen + ?Sized> chain::Listen for DynamicChainListener<'a, L> {
fn filtered_block_connected(
&self, _header: &Header, _txdata: &chain::transaction::TransactionData, _height: u32,
) {
unreachable!()
}

fn block_disconnected(&self, header: &Header, height: u32) {
self.0.block_disconnected(header, height)
}
}

/// A set of dynamically sized chain listeners, each paired with a starting block height.
struct ChainListenerSet<'a, L: chain::Listen + ?Sized>(Vec<(u32, &'a L)>);

impl<'a, L: chain::Listen + ?Sized> chain::Listen for ChainListenerSet<'a, L> {
fn block_connected(&self, block: &bitcoin::Block, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.block_connected(block, height);
}
}
}

fn filtered_block_connected(
&self, header: &Header, txdata: &chain::transaction::TransactionData, height: u32,
) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.filtered_block_connected(header, txdata, height);
}
}
}

fn block_disconnected(&self, _header: &Header, _height: u32) {
unreachable!()
}
}

/// Notifies [listeners] of blocks that have been connected or disconnected from the chain.
///
/// [listeners]: lightning::chain::Listen
struct ChainNotifier<'a, C: Cache, L: Deref>
where
L::Target: chain::Listen,
{
/// Cache for looking up headers before fetching from a block source.
header_cache: &'a mut C,

/// Listener that will be notified of connected or disconnected blocks.
chain_listener: L,
}

/// Changes made to the chain between subsequent polls that transformed it from having one chain tip
/// to another.
///
/// Blocks are given in height-descending order. Therefore, blocks are first disconnected in order
/// before new blocks are connected in reverse order.
struct ChainDifference {
/// The most recent ancestor common between the chain tips.
///
/// If there are any disconnected blocks, this is where the chain forked.
common_ancestor: ValidatedBlockHeader,

/// Blocks that were disconnected from the chain since the last poll.
disconnected_blocks: Vec<ValidatedBlockHeader>,

/// Blocks that were connected to the chain since the last poll.
connected_blocks: Vec<ValidatedBlockHeader>,
}

impl<'a, C: Cache, L: Deref> ChainNotifier<'a, C, L>
where
L::Target: chain::Listen,
{
/// Returns the changes needed to produce the chain with `current_header` as its tip from the
/// chain with `prev_header` as its tip.
///
/// Walks backwards from `current_header` and `prev_header`, finding the common ancestor.
async fn find_difference<P: Poll>(
&self, current_header: ValidatedBlockHeader, prev_header: &ValidatedBlockHeader,
chain_poller: &mut P,
) -> BlockSourceResult<ChainDifference> {
let mut disconnected_blocks = Vec::new();
let mut connected_blocks = Vec::new();
let mut current = current_header;
let mut previous = *prev_header;
loop {
// Found the common ancestor.
if current.to_best_block().block_hash == previous.to_best_block().block_hash {
break;
}

// Walk back the chain, finding blocks needed to connect and disconnect. Only walk back
// the header with the greater height, or both if equal heights.
let current_height = current.height;
let previous_height = previous.height;
if current_height <= previous_height {
disconnected_blocks.push(previous);
previous = self.look_up_previous_header(chain_poller, &previous).await?;
}
if current_height >= previous_height {
connected_blocks.push(current);
current = self.look_up_previous_header(chain_poller, &current).await?;
}
}

let common_ancestor = current;
Ok(ChainDifference { common_ancestor, disconnected_blocks, connected_blocks })
}

/// Returns the previous header for the given header, either by looking it up in the cache or
/// fetching it if not found.
async fn look_up_previous_header<P: Poll>(
&self, chain_poller: &mut P, header: &ValidatedBlockHeader,
) -> BlockSourceResult<ValidatedBlockHeader> {
match self.header_cache.look_up(&header.header.prev_blockhash) {
Some(prev_header) => Ok(*prev_header),
None => chain_poller.look_up_previous_header(header).await,
}
}

/// Notifies the chain listeners of disconnected blocks.
fn disconnect_blocks(&mut self, mut disconnected_blocks: Vec<ValidatedBlockHeader>) {
for header in disconnected_blocks.drain(..) {
if let Some(cached_header) =
self.header_cache.block_disconnected(&header.to_best_block().block_hash)
{
assert_eq!(cached_header, header);
}
self.chain_listener.block_disconnected(&header.header, header.height);
}
}

/// Notifies the chain listeners of connected blocks.
async fn connect_blocks<P: Poll>(
&mut self, mut new_tip: ValidatedBlockHeader,
mut connected_blocks: Vec<ValidatedBlockHeader>, chain_poller: &mut P,
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
for header in connected_blocks.drain(..).rev() {
let height = header.height;
let block_data =
chain_poller.fetch_block(&header).await.map_err(|e| (e, Some(new_tip)))?;
match block_data.deref() {
BlockData::FullBlock(block) => {
self.chain_listener.block_connected(block, height);
},
BlockData::HeaderOnly(header) => {
self.chain_listener.filtered_block_connected(header, &[], height);
},
}

self.header_cache.block_connected(header.to_best_block().block_hash, header);
new_tip = header;
}

Ok(())
}
}
Loading

0 comments on commit 93314b5

Please sign in to comment.