diff --git a/Cargo.toml b/Cargo.toml index 3d9310ec6..48bc64cbb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,7 @@ lightning-net-tokio = { version = "0.0.125" } lightning-persister = { version = "0.0.125" } lightning-background-processor = { version = "0.0.125", features = ["futures"] } lightning-rapid-gossip-sync = { version = "0.0.125" } +lightning-block-sync = { version = "0.0.125", features = ["rpc-client", "tokio"] } lightning-transaction-sync = { version = "0.0.125", features = ["esplora-async-https", "time"] } lightning-liquidity = { version = "0.1.0-alpha.6", features = ["std"] } @@ -65,12 +66,15 @@ bitcoin = "0.32.2" bip39 = "2.0.0" bip21 = { version = "0.5", features = ["std"], default-features = false } +base64 = { version = "0.22.1", default-features = false, features = ["std"] } rand = "0.8.5" chrono = { version = "0.4", default-features = false, features = ["clock"] } tokio = { version = "1.37", default-features = false, features = [ "rt-multi-thread", "time", "sync", "macros" ] } esplora-client = { version = "0.9", default-features = false } libc = "0.2" uniffi = { version = "0.26.0", features = ["build"], optional = true } +serde = { version = "1.0.210", default-features = false, features = ["std", "derive"] } +serde_json = { version = "1.0.128", default-features = false, features = ["std"] } [target.'cfg(vss)'.dependencies] vss-client = "0.3" diff --git a/README.md b/README.md index 22ef1a1b2..02dcbf323 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,7 @@ fn main() { LDK Node currently comes with a decidedly opinionated set of design choices: - On-chain data is handled by the integrated [BDK][bdk] wallet. -- Chain data may currently be sourced from an [Esplora][esplora] server, while support for Electrum and `bitcoind` RPC will follow soon. +- Chain data may currently be sourced from the Bitcoin Core RPC interface or an [Esplora][esplora] server, while support for Electrum will follow soon. - Wallet and channel state may be persisted to an [SQLite][sqlite] database, to file system, or to a custom back-end to be implemented by the user. - Gossip data may be sourced via Lightning's peer-to-peer network or the [Rapid Gossip Sync](https://docs.rs/lightning-rapid-gossip-sync/*/lightning_rapid_gossip_sync/) protocol. - Entropy for the Lightning and on-chain wallets may be sourced from raw bytes or a [BIP39](https://github.com/bitcoin/bips/blob/master/bip-0039.mediawiki) mnemonic. In addition, LDK Node offers the means to generate and persist the entropy bytes to disk. diff --git a/bindings/ldk_node.udl b/bindings/ldk_node.udl index b4fc7ec79..5deb36915 100644 --- a/bindings/ldk_node.udl +++ b/bindings/ldk_node.udl @@ -36,6 +36,7 @@ interface Builder { void set_entropy_seed_bytes(sequence seed_bytes); void set_entropy_bip39_mnemonic(Mnemonic mnemonic, string? passphrase); void set_chain_source_esplora(string server_url, EsploraSyncConfig? config); + void set_chain_source_bitcoind_rpc(string rpc_host, u16 rpc_port, string rpc_user, string rpc_password); void set_gossip_source_p2p(); void set_gossip_source_rgs(string rgs_server_url); void set_liquidity_source_lsps2(SocketAddress address, PublicKey node_id, string? token); diff --git a/src/builder.rs b/src/builder.rs index 43171db1f..733a99960 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -78,6 +78,7 @@ use std::time::SystemTime; #[derive(Debug, Clone)] enum ChainDataSourceConfig { Esplora { server_url: String, sync_config: Option }, + BitcoindRpc { rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String }, } #[derive(Debug, Clone)] @@ -248,6 +249,16 @@ impl NodeBuilder { self } + /// Configures the [`Node`] instance to source its chain data from the given Bitcoin Core RPC + /// endpoint. + pub fn set_chain_source_bitcoind_rpc( + &mut self, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, + ) -> &mut Self { + self.chain_data_source_config = + Some(ChainDataSourceConfig::BitcoindRpc { rpc_host, rpc_port, rpc_user, rpc_password }); + self + } + /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer /// network. pub fn set_gossip_source_p2p(&mut self) -> &mut Self { @@ -479,6 +490,19 @@ impl ArcedNodeBuilder { self.inner.write().unwrap().set_chain_source_esplora(server_url, sync_config); } + /// Configures the [`Node`] instance to source its chain data from the given Bitcoin Core RPC + /// endpoint. + pub fn set_chain_source_bitcoind_rpc( + &self, rpc_host: String, rpc_port: u16, rpc_user: String, rpc_password: String, + ) { + self.inner.write().unwrap().set_chain_source_bitcoind_rpc( + rpc_host, + rpc_port, + rpc_user, + rpc_password, + ); + } + /// Configures the [`Node`] instance to source its gossip data from the Lightning peer-to-peer /// network. pub fn set_gossip_source_p2p(&self) { @@ -633,6 +657,21 @@ fn build_with_store_internal( Arc::clone(&node_metrics), )) }, + Some(ChainDataSourceConfig::BitcoindRpc { rpc_host, rpc_port, rpc_user, rpc_password }) => { + Arc::new(ChainSource::new_bitcoind_rpc( + rpc_host.clone(), + *rpc_port, + rpc_user.clone(), + rpc_password.clone(), + Arc::clone(&wallet), + Arc::clone(&fee_estimator), + Arc::clone(&tx_broadcaster), + Arc::clone(&kv_store), + Arc::clone(&config), + Arc::clone(&logger), + Arc::clone(&node_metrics), + )) + }, None => { // Default to Esplora client. let server_url = DEFAULT_ESPLORA_SERVER_URL.to_string(); diff --git a/src/chain/bitcoind_rpc.rs b/src/chain/bitcoind_rpc.rs new file mode 100644 index 000000000..6e7360601 --- /dev/null +++ b/src/chain/bitcoind_rpc.rs @@ -0,0 +1,394 @@ +// This file is Copyright its original authors, visible in version control history. +// +// This file is licensed under the Apache License, Version 2.0 or the MIT license , at your option. You may not use this file except in +// accordance with one or both of these licenses. + +use crate::types::{ChainMonitor, ChannelManager, Sweeper, Wallet}; + +use lightning::chain::Listen; + +use lightning_block_sync::http::HttpEndpoint; +use lightning_block_sync::http::JsonResponse; +use lightning_block_sync::poll::ValidatedBlockHeader; +use lightning_block_sync::rpc::{RpcClient, RpcError}; +use lightning_block_sync::{ + AsyncBlockSourceResult, BlockData, BlockHeaderData, BlockSource, Cache, +}; + +use serde::Serialize; + +use bitcoin::{BlockHash, FeeRate, Transaction, Txid}; + +use base64::prelude::{Engine, BASE64_STANDARD}; + +use std::collections::{HashMap, VecDeque}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; + +pub struct BitcoindRpcClient { + rpc_client: Arc, + latest_mempool_timestamp: AtomicU64, +} + +impl BitcoindRpcClient { + pub(crate) fn new(host: String, port: u16, rpc_user: String, rpc_password: String) -> Self { + let http_endpoint = HttpEndpoint::for_host(host.clone()).with_port(port); + let rpc_credentials = + BASE64_STANDARD.encode(format!("{}:{}", rpc_user.clone(), rpc_password.clone())); + + let rpc_client = Arc::new( + RpcClient::new(&rpc_credentials, http_endpoint) + .expect("RpcClient::new is actually infallible"), + ); + + let latest_mempool_timestamp = AtomicU64::new(0); + + Self { rpc_client, latest_mempool_timestamp } + } + + pub(crate) async fn broadcast_transaction(&self, tx: &Transaction) -> std::io::Result { + let tx_serialized = bitcoin::consensus::encode::serialize_hex(tx); + let tx_json = serde_json::json!(tx_serialized); + self.rpc_client.call_method::("sendrawtransaction", &vec![tx_json]).await + } + + pub(crate) async fn get_fee_estimate_for_target( + &self, num_blocks: usize, estimation_mode: FeeRateEstimationMode, + ) -> std::io::Result { + let num_blocks_json = serde_json::json!(num_blocks); + let estimation_mode_json = serde_json::json!(estimation_mode); + self.rpc_client + .call_method::( + "estimatesmartfee", + &vec![num_blocks_json, estimation_mode_json], + ) + .await + .map(|resp| resp.0) + } + + pub(crate) async fn get_mempool_minimum_fee_rate(&self) -> std::io::Result { + self.rpc_client + .call_method::("getmempoolinfo", &vec![]) + .await + .map(|resp| resp.0) + } + + pub(crate) async fn get_raw_transaction( + &self, txid: &Txid, + ) -> std::io::Result> { + let txid_hex = bitcoin::consensus::encode::serialize_hex(txid); + let txid_json = serde_json::json!(txid_hex); + match self + .rpc_client + .call_method::("getrawtransaction", &vec![txid_json]) + .await + { + Ok(resp) => Ok(Some(resp.0)), + Err(e) => match e.into_inner() { + Some(inner) => { + let rpc_error_res: Result, _> = inner.downcast(); + + match rpc_error_res { + Ok(rpc_error) => { + // Check if it's the 'not found' error code. + if rpc_error.code == -5 { + Ok(None) + } else { + Err(std::io::Error::new(std::io::ErrorKind::Other, rpc_error)) + } + }, + Err(_) => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to process getrawtransaction response", + )), + } + }, + None => Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to process getrawtransaction response", + )), + }, + } + } + + pub(crate) async fn get_raw_mempool(&self) -> std::io::Result> { + let verbose_flag_json = serde_json::json!(true); + self.rpc_client + .call_method::("getrawmempool", &vec![verbose_flag_json]) + .await + .map(|resp| resp.0) + } + + /// Get mempool transactions, alongside their first-seen unix timestamps. + /// + /// This method is an adapted version of `bdk_bitcoind_rpc::Emitter::mempool`. It emits each + /// transaction only once, unless we cannot assume the transaction's ancestors are already + /// emitted. + pub(crate) async fn get_mempool_transactions_and_timestamp_at_height( + &self, best_processed_height: u32, + ) -> std::io::Result> { + let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed); + let mut latest_time = prev_mempool_time; + + let mempool_entries = self.get_raw_mempool().await?; + let mut txs_to_emit = Vec::new(); + + for entry in mempool_entries { + if entry.time > latest_time { + latest_time = entry.time; + } + + // Avoid emitting transactions that are already emitted if we can guarantee + // blocks containing ancestors are already emitted. The bitcoind rpc interface + // provides us with the block height that the tx is introduced to the mempool. + // If we have already emitted the block of height, we can assume that all + // ancestor txs have been processed by the receiver. + let ancestor_within_height = entry.height <= best_processed_height; + let is_already_emitted = entry.time <= prev_mempool_time; + if is_already_emitted && ancestor_within_height { + continue; + } + + match self.get_raw_transaction(&entry.txid).await { + Ok(Some(tx)) => { + txs_to_emit.push((tx, entry.time)); + }, + Ok(None) => { + continue; + }, + Err(e) => return Err(e), + }; + } + + if !txs_to_emit.is_empty() { + self.latest_mempool_timestamp.store(latest_time, Ordering::Release); + } + Ok(txs_to_emit) + } +} + +impl BlockSource for BitcoindRpcClient { + fn get_header<'a>( + &'a self, header_hash: &'a BlockHash, height_hint: Option, + ) -> AsyncBlockSourceResult<'a, BlockHeaderData> { + Box::pin(async move { self.rpc_client.get_header(header_hash, height_hint).await }) + } + + fn get_block<'a>( + &'a self, header_hash: &'a BlockHash, + ) -> AsyncBlockSourceResult<'a, BlockData> { + Box::pin(async move { self.rpc_client.get_block(header_hash).await }) + } + + fn get_best_block<'a>(&'a self) -> AsyncBlockSourceResult<(BlockHash, Option)> { + Box::pin(async move { self.rpc_client.get_best_block().await }) + } +} + +pub(crate) struct FeeResponse(pub FeeRate); + +impl TryInto for JsonResponse { + type Error = std::io::Error; + fn try_into(self) -> std::io::Result { + if !self.0["errors"].is_null() { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + self.0["errors"].to_string(), + )); + } + let fee_rate_btc_per_kvbyte = self.0["feerate"] + .as_f64() + .ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Failed to parse fee rate"))?; + // Bitcoin Core gives us a feerate in BTC/KvB. + // Thus, we multiply by 25_000_000 (10^8 / 4) to get satoshis/kwu. + let fee_rate = { + let fee_rate_sat_per_kwu = (fee_rate_btc_per_kvbyte * 25_000_000.0).round() as u64; + FeeRate::from_sat_per_kwu(fee_rate_sat_per_kwu) + }; + Ok(FeeResponse(fee_rate)) + } +} + +pub struct MempoolMinFeeResponse(pub FeeRate); + +impl TryInto for JsonResponse { + type Error = std::io::Error; + fn try_into(self) -> std::io::Result { + let fee_rate_btc_per_kvbyte = self.0["mempoolminfee"] + .as_f64() + .ok_or(std::io::Error::new(std::io::ErrorKind::Other, "Failed to parse fee rate"))?; + // Bitcoin Core gives us a feerate in BTC/KvB. + // Thus, we multiply by 25_000_000 (10^8 / 4) to get satoshis/kwu. + let fee_rate = { + let fee_rate_sat_per_kwu = (fee_rate_btc_per_kvbyte * 25_000_000.0).round() as u64; + FeeRate::from_sat_per_kwu(fee_rate_sat_per_kwu) + }; + Ok(MempoolMinFeeResponse(fee_rate)) + } +} + +pub struct GetRawTransactionResponse(pub Transaction); + +impl TryInto for JsonResponse { + type Error = std::io::Error; + fn try_into(self) -> std::io::Result { + let tx = self + .0 + .as_str() + .ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawtransaction response", + )) + .and_then(|s| { + bitcoin::consensus::encode::deserialize_hex(s).map_err(|_| { + std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawtransaction response", + ) + }) + })?; + + Ok(GetRawTransactionResponse(tx)) + } +} + +pub struct GetRawMempoolResponse(Vec); + +impl TryInto for JsonResponse { + type Error = std::io::Error; + fn try_into(self) -> std::io::Result { + let mut mempool_transactions = Vec::new(); + let res = self.0.as_object().ok_or(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawmempool response", + ))?; + + for (k, v) in res { + let txid = match bitcoin::consensus::encode::deserialize_hex(k) { + Ok(txid) => txid, + Err(_) => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawmempool response", + )); + }, + }; + + let time = match v["time"].as_u64() { + Some(time) => time, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawmempool response", + )); + }, + }; + + let height = match v["height"].as_u64().and_then(|h| h.try_into().ok()) { + Some(height) => height, + None => { + return Err(std::io::Error::new( + std::io::ErrorKind::Other, + "Failed to parse getrawmempool response", + )); + }, + }; + let entry = RawMempoolEntry { txid, time, height }; + + mempool_transactions.push(entry); + } + + Ok(GetRawMempoolResponse(mempool_transactions)) + } +} + +#[derive(Debug, Clone)] +pub(crate) struct RawMempoolEntry { + /// The transaction id + txid: Txid, + /// Local time transaction entered pool in seconds since 1 Jan 1970 GMT + time: u64, + /// Block height when transaction entered pool + height: u32, +} + +#[derive(Debug, Clone, Serialize)] +#[serde(rename_all = "UPPERCASE")] +pub(crate) enum FeeRateEstimationMode { + Economical, + Conservative, +} + +const MAX_HEADER_CACHE_ENTRIES: usize = 100; + +pub(crate) struct BoundedHeaderCache { + header_map: HashMap, + recently_seen: VecDeque, +} + +impl BoundedHeaderCache { + pub(crate) fn new() -> Self { + let header_map = HashMap::new(); + let recently_seen = VecDeque::new(); + Self { header_map, recently_seen } + } +} + +impl Cache for BoundedHeaderCache { + fn look_up(&self, block_hash: &BlockHash) -> Option<&ValidatedBlockHeader> { + self.header_map.get(block_hash) + } + + fn block_connected(&mut self, block_hash: BlockHash, block_header: ValidatedBlockHeader) { + self.recently_seen.push_back(block_hash); + self.header_map.insert(block_hash, block_header); + + if self.header_map.len() >= MAX_HEADER_CACHE_ENTRIES { + // Keep dropping old entries until we've actually removed a header entry. + while let Some(oldest_entry) = self.recently_seen.pop_front() { + if self.header_map.remove(&oldest_entry).is_some() { + break; + } + } + } + } + + fn block_disconnected(&mut self, block_hash: &BlockHash) -> Option { + self.recently_seen.retain(|e| e != block_hash); + self.header_map.remove(block_hash) + } +} + +pub(crate) struct ChainListener { + pub(crate) onchain_wallet: Arc, + pub(crate) channel_manager: Arc, + pub(crate) chain_monitor: Arc, + pub(crate) output_sweeper: Arc, +} + +impl Listen for ChainListener { + fn filtered_block_connected( + &self, header: &bitcoin::block::Header, + txdata: &lightning::chain::transaction::TransactionData, height: u32, + ) { + self.onchain_wallet.filtered_block_connected(header, txdata, height); + self.channel_manager.filtered_block_connected(header, txdata, height); + self.chain_monitor.filtered_block_connected(header, txdata, height); + self.output_sweeper.filtered_block_connected(header, txdata, height); + } + fn block_connected(&self, block: &bitcoin::Block, height: u32) { + self.onchain_wallet.block_connected(block, height); + self.channel_manager.block_connected(block, height); + self.chain_monitor.block_connected(block, height); + self.output_sweeper.block_connected(block, height); + } + + fn block_disconnected(&self, header: &bitcoin::block::Header, height: u32) { + self.onchain_wallet.block_disconnected(header, height); + self.channel_manager.block_disconnected(header, height); + self.chain_monitor.block_disconnected(header, height); + self.output_sweeper.block_disconnected(header, height); + } +} diff --git a/src/chain/mod.rs b/src/chain/mod.rs index 7501c9809..af77e6bee 100644 --- a/src/chain/mod.rs +++ b/src/chain/mod.rs @@ -5,6 +5,11 @@ // http://opensource.org/licenses/MIT>, at your option. You may not use this file except in // accordance with one or both of these licenses. +mod bitcoind_rpc; + +use crate::chain::bitcoind_rpc::{ + BitcoindRpcClient, BoundedHeaderCache, ChainListener, FeeRateEstimationMode, +}; use crate::config::{ Config, EsploraSyncConfig, BDK_CLIENT_CONCURRENCY, BDK_CLIENT_STOP_GAP, BDK_WALLET_SYNC_TIMEOUT_SECS, FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS, LDK_WALLET_SYNC_TIMEOUT_SECS, @@ -13,18 +18,23 @@ use crate::config::{ }; use crate::fee_estimator::{ apply_post_estimation_adjustments, get_all_conf_targets, get_num_block_defaults_for_target, - OnchainFeeEstimator, + ConfirmationTarget, OnchainFeeEstimator, }; use crate::io::utils::write_node_metrics; use crate::logger::{log_bytes, log_error, log_info, log_trace, FilesystemLogger, Logger}; use crate::types::{Broadcaster, ChainMonitor, ChannelManager, DynStore, Sweeper, Wallet}; use crate::{Error, NodeMetrics}; -use lightning::chain::{Confirm, Filter}; +use lightning::chain::chaininterface::ConfirmationTarget as LdkConfirmationTarget; +use lightning::chain::{Confirm, Filter, Listen}; use lightning::util::ser::Writeable; use lightning_transaction_sync::EsploraSyncClient; +use lightning_block_sync::init::{synchronize_listeners, validate_best_block_header}; +use lightning_block_sync::poll::{ChainPoller, ChainTip, ValidatedBlockHeader}; +use lightning_block_sync::SpvClient; + use bdk_esplora::EsploraAsyncExt; use esplora_client::AsyncClient as EsploraAsyncClient; @@ -41,6 +51,8 @@ pub(crate) const DEFAULT_ESPLORA_SERVER_URL: &str = "https://blockstream.info/ap // The default Esplora client timeout we're using. pub(crate) const DEFAULT_ESPLORA_CLIENT_TIMEOUT_SECS: u64 = 10; +const CHAIN_POLLING_INTERVAL_SECS: u64 = 1; + pub(crate) enum WalletSyncStatus { Completed, InProgress { subscribers: tokio::sync::broadcast::Sender> }, @@ -109,6 +121,19 @@ pub(crate) enum ChainSource { logger: Arc, node_metrics: Arc>, }, + BitcoindRpc { + bitcoind_rpc_client: Arc, + header_cache: tokio::sync::Mutex, + latest_chain_tip: RwLock>, + onchain_wallet: Arc, + wallet_polling_status: Mutex, + fee_estimator: Arc, + tx_broadcaster: Arc, + kv_store: Arc, + config: Arc, + logger: Arc, + node_metrics: Arc>, + }, } impl ChainSource { @@ -141,6 +166,32 @@ impl ChainSource { } } + pub(crate) fn new_bitcoind_rpc( + host: String, port: u16, rpc_user: String, rpc_password: String, + onchain_wallet: Arc, fee_estimator: Arc, + tx_broadcaster: Arc, kv_store: Arc, config: Arc, + logger: Arc, node_metrics: Arc>, + ) -> Self { + let bitcoind_rpc_client = + Arc::new(BitcoindRpcClient::new(host, port, rpc_user, rpc_password)); + let header_cache = tokio::sync::Mutex::new(BoundedHeaderCache::new()); + let latest_chain_tip = RwLock::new(None); + let wallet_polling_status = Mutex::new(WalletSyncStatus::Completed); + Self::BitcoindRpc { + bitcoind_rpc_client, + header_cache, + latest_chain_tip, + onchain_wallet, + wallet_polling_status, + fee_estimator, + tx_broadcaster, + kv_store, + config, + logger, + node_metrics, + } + } + pub(crate) async fn continuously_sync_wallets( &self, mut stop_sync_receiver: tokio::sync::watch::Receiver<()>, channel_manager: Arc, chain_monitor: Arc, @@ -201,9 +252,144 @@ impl ChainSource { } } }, + Self::BitcoindRpc { + bitcoind_rpc_client, + header_cache, + latest_chain_tip, + onchain_wallet, + wallet_polling_status, + kv_store, + config, + logger, + node_metrics, + .. + } => { + // First register for the wallet polling status to make sure `Node::sync_wallets` calls + // wait on the result before proceeding. + { + let mut status_lock = wallet_polling_status.lock().unwrap(); + if status_lock.register_or_subscribe_pending_sync().is_some() { + debug_assert!(false, "Sync already in progress. This should never happen."); + } + } + + let channel_manager_best_block_hash = + channel_manager.current_best_block().block_hash; + let sweeper_best_block_hash = output_sweeper.current_best_block().block_hash; + let onchain_wallet_best_block_hash = onchain_wallet.current_best_block().block_hash; + + let mut chain_listeners = vec![ + ( + onchain_wallet_best_block_hash, + &**onchain_wallet as &(dyn Listen + Send + Sync), + ), + ( + channel_manager_best_block_hash, + &*channel_manager as &(dyn Listen + Send + Sync), + ), + (sweeper_best_block_hash, &*output_sweeper as &(dyn Listen + Send + Sync)), + ]; + + // TODO: Eventually we might want to see if we can synchronize `ChannelMonitor`s + // before giving them to `ChainMonitor` it the first place. However, this isn't + // trivial as we load them on initialization (in the `Builder`) and only gain + // network access during `start`. For now, we just make sure we get the worst known + // block hash and sychronize them via `ChainMonitor`. + if let Some(worst_channel_monitor_block_hash) = chain_monitor + .list_monitors() + .iter() + .flat_map(|(txo, _)| chain_monitor.get_monitor(*txo)) + .map(|m| m.current_best_block()) + .min_by_key(|b| b.height) + .map(|b| b.block_hash) + { + chain_listeners.push(( + worst_channel_monitor_block_hash, + &*chain_monitor as &(dyn Listen + Send + Sync), + )); + } + + loop { + let mut locked_header_cache = header_cache.lock().await; + match synchronize_listeners( + bitcoind_rpc_client.as_ref(), + config.network, + &mut *locked_header_cache, + chain_listeners.clone(), + ) + .await + { + Ok(chain_tip) => { + { + *latest_chain_tip.write().unwrap() = Some(chain_tip); + let unix_time_secs_opt = SystemTime::now() + .duration_since(UNIX_EPOCH) + .ok() + .map(|d| d.as_secs()); + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = + unix_time_secs_opt; + locked_node_metrics.latest_onchain_wallet_sync_timestamp = + unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&kv_store), + Arc::clone(&logger), + ) + .unwrap_or_else(|e| { + log_error!(logger, "Failed to persist node metrics: {}", e); + }); + } + break; + }, + + Err(e) => { + log_error!(logger, "Failed to synchronize chain listeners: {:?}", e); + tokio::time::sleep(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)) + .await; + }, + } + } + + // Now propagate the initial result to unblock waiting subscribers. + wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(Ok(())); + + let mut chain_polling_interval = + tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); + chain_polling_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + let mut fee_rate_update_interval = + tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); + // When starting up, we just blocked on updating, so skip the first tick. + fee_rate_update_interval.reset(); + fee_rate_update_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Start the polling loop. + loop { + tokio::select! { + _ = stop_sync_receiver.changed() => { + log_trace!( + logger, + "Stopping polling for new chain data.", + ); + return; + } + _ = chain_polling_interval.tick() => { + let _ = self.poll_and_update_listeners(Arc::clone(&channel_manager), Arc::clone(&chain_monitor), Arc::clone(&output_sweeper)).await; + } + _ = fee_rate_update_interval.tick() => { + let _ = self.update_fee_rate_estimates().await; + } + } + } + }, } } + // Synchronize the onchain wallet via transaction-based protocols (i.e., Esplora, Electrum, + // etc.) pub(crate) async fn sync_onchain_wallet(&self) -> Result<(), Error> { match self { Self::Esplora { @@ -319,9 +505,16 @@ impl ChainSource { res }, + Self::BitcoindRpc { .. } => { + // In BitcoindRpc mode we sync lightning and onchain wallet in one go by via + // `ChainPoller`. So nothing to do here. + unreachable!("Onchain wallet will be synced via chain polling") + }, } } + // Synchronize the Lightning wallet via transaction-based protocols (i.e., Esplora, Electrum, + // etc.) pub(crate) async fn sync_lightning_wallet( &self, channel_manager: Arc, chain_monitor: Arc, output_sweeper: Arc, @@ -411,6 +604,145 @@ impl ChainSource { res }, + Self::BitcoindRpc { .. } => { + // In BitcoindRpc mode we sync lightning and onchain wallet in one go by via + // `ChainPoller`. So nothing to do here. + unreachable!("Lightning wallet will be synced via chain polling") + }, + } + } + + pub(crate) async fn poll_and_update_listeners( + &self, channel_manager: Arc, chain_monitor: Arc, + output_sweeper: Arc, + ) -> Result<(), Error> { + match self { + Self::Esplora { .. } => { + // In Esplora mode we sync lightning and onchain wallets via + // `sync_onchain_wallet` and `sync_lightning_wallet`. So nothing to do here. + unreachable!("Listeners will be synced via transction-based syncing") + }, + Self::BitcoindRpc { + bitcoind_rpc_client, + header_cache, + latest_chain_tip, + onchain_wallet, + wallet_polling_status, + kv_store, + config, + logger, + node_metrics, + .. + } => { + let receiver_res = { + let mut status_lock = wallet_polling_status.lock().unwrap(); + status_lock.register_or_subscribe_pending_sync() + }; + + if let Some(mut sync_receiver) = receiver_res { + log_info!(logger, "Sync in progress, skipping."); + return sync_receiver.recv().await.map_err(|e| { + debug_assert!(false, "Failed to receive wallet polling result: {:?}", e); + log_error!(logger, "Failed to receive wallet polling result: {:?}", e); + Error::WalletOperationFailed + })?; + } + + let latest_chain_tip_opt = latest_chain_tip.read().unwrap().clone(); + let chain_tip = if let Some(tip) = latest_chain_tip_opt { + tip + } else { + match validate_best_block_header(bitcoind_rpc_client.as_ref()).await { + Ok(tip) => { + *latest_chain_tip.write().unwrap() = Some(tip); + tip + }, + Err(e) => { + log_error!(logger, "Failed to poll for chain data: {:?}", e); + let res = Err(Error::TxSyncFailed); + wallet_polling_status + .lock() + .unwrap() + .propagate_result_to_subscribers(res); + return res; + }, + } + }; + + let mut locked_header_cache = header_cache.lock().await; + let chain_poller = + ChainPoller::new(Arc::clone(&bitcoind_rpc_client), config.network); + let chain_listener = ChainListener { + onchain_wallet: Arc::clone(&onchain_wallet), + channel_manager: Arc::clone(&channel_manager), + chain_monitor, + output_sweeper, + }; + let mut spv_client = SpvClient::new( + chain_tip, + chain_poller, + &mut *locked_header_cache, + &chain_listener, + ); + let mut chain_polling_interval = + tokio::time::interval(Duration::from_secs(CHAIN_POLLING_INTERVAL_SECS)); + chain_polling_interval + .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + match spv_client.poll_best_tip().await { + Ok((ChainTip::Better(tip), true)) => { + *latest_chain_tip.write().unwrap() = Some(tip); + }, + Ok(_) => {}, + Err(e) => { + log_error!(logger, "Failed to poll for chain data: {:?}", e); + let res = Err(Error::TxSyncFailed); + wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + return res; + }, + } + + let cur_height = channel_manager.current_best_block().height; + match bitcoind_rpc_client + .get_mempool_transactions_and_timestamp_at_height(cur_height) + .await + { + Ok(unconfirmed_txs) => { + let _ = onchain_wallet.apply_unconfirmed_txs(unconfirmed_txs); + }, + Err(e) => { + log_error!(logger, "Failed to poll for mempool transactions: {:?}", e); + let res = Err(Error::TxSyncFailed); + wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + return res; + }, + } + + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_lightning_wallet_sync_timestamp = unix_time_secs_opt; + locked_node_metrics.latest_onchain_wallet_sync_timestamp = unix_time_secs_opt; + + let write_res = write_node_metrics( + &*locked_node_metrics, + Arc::clone(&kv_store), + Arc::clone(&logger), + ); + match write_res { + Ok(()) => (), + Err(e) => { + log_error!(logger, "Failed to persist node metrics: {}", e); + let res = Err(Error::PersistenceFailed); + wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + return res; + }, + } + + let res = Ok(()); + wallet_polling_status.lock().unwrap().propagate_result_to_subscribers(res); + res + }, } } @@ -504,6 +836,135 @@ impl ChainSource { )?; } + Ok(()) + }, + Self::BitcoindRpc { + bitcoind_rpc_client, + fee_estimator, + config, + kv_store, + logger, + node_metrics, + .. + } => { + macro_rules! get_fee_rate_update { + ($estimation_fut: expr) => {{ + let update_res = tokio::time::timeout( + Duration::from_secs(FEE_RATE_CACHE_UPDATE_TIMEOUT_SECS), + $estimation_fut, + ) + .await + .map_err(|e| { + log_error!(logger, "Updating fee rate estimates timed out: {}", e); + Error::FeerateEstimationUpdateTimeout + })?; + update_res + }}; + } + let confirmation_targets = get_all_conf_targets(); + + let mut new_fee_rate_cache = HashMap::with_capacity(10); + let now = Instant::now(); + for target in confirmation_targets { + let fee_rate_update_res = match target { + ConfirmationTarget::Lightning( + LdkConfirmationTarget::MinAllowedAnchorChannelRemoteFee, + ) => { + let estimation_fut = bitcoind_rpc_client.get_mempool_minimum_fee_rate(); + get_fee_rate_update!(estimation_fut) + }, + ConfirmationTarget::Lightning( + LdkConfirmationTarget::MaximumFeeEstimate, + ) => { + let num_blocks = get_num_block_defaults_for_target(target); + let estimation_mode = FeeRateEstimationMode::Conservative; + let estimation_fut = bitcoind_rpc_client + .get_fee_estimate_for_target(num_blocks, estimation_mode); + get_fee_rate_update!(estimation_fut) + }, + ConfirmationTarget::Lightning( + LdkConfirmationTarget::UrgentOnChainSweep, + ) => { + let num_blocks = get_num_block_defaults_for_target(target); + let estimation_mode = FeeRateEstimationMode::Conservative; + let estimation_fut = bitcoind_rpc_client + .get_fee_estimate_for_target(num_blocks, estimation_mode); + get_fee_rate_update!(estimation_fut) + }, + _ => { + // Otherwise, we default to economical block-target estimate. + let num_blocks = get_num_block_defaults_for_target(target); + let estimation_mode = FeeRateEstimationMode::Economical; + let estimation_fut = bitcoind_rpc_client + .get_fee_estimate_for_target(num_blocks, estimation_mode); + get_fee_rate_update!(estimation_fut) + }, + }; + + let fee_rate = match (fee_rate_update_res, config.network) { + (Ok(rate), _) => rate, + (Err(e), Network::Bitcoin) => { + // Strictly fail on mainnet. + log_error!(logger, "Failed to retrieve fee rate estimates: {}", e); + return Err(Error::FeerateEstimationUpdateFailed); + }, + (Err(e), n) if n == Network::Regtest || n == Network::Signet => { + // On regtest/signet we just fall back to the usual 1 sat/vb == 250 + // sat/kwu default. + log_error!( + logger, + "Failed to retrieve fee rate estimates: {}. Falling back to default of 1 sat/vb.", + e, + ); + FeeRate::from_sat_per_kwu(250) + }, + (Err(e), _) => { + // On testnet `estimatesmartfee` can be unreliable so we just skip in + // case of a failure, which will have us falling back to defaults. + log_error!( + logger, + "Failed to retrieve fee rate estimates: {}. Falling back to defaults.", + e, + ); + return Ok(()); + }, + }; + + // LDK 0.0.118 introduced changes to the `ConfirmationTarget` semantics that + // require some post-estimation adjustments to the fee rates, which we do here. + let adjusted_fee_rate = apply_post_estimation_adjustments(target, fee_rate); + + new_fee_rate_cache.insert(target, adjusted_fee_rate); + + log_trace!( + logger, + "Fee rate estimation updated for {:?}: {} sats/kwu", + target, + adjusted_fee_rate.to_sat_per_kwu(), + ); + } + + if fee_estimator.set_fee_rate_cache(new_fee_rate_cache) { + // We only log if the values changed, as it might be very spammy otherwise. + log_info!( + logger, + "Fee rate cache update finished in {}ms.", + now.elapsed().as_millis() + ); + } + + let unix_time_secs_opt = + SystemTime::now().duration_since(UNIX_EPOCH).ok().map(|d| d.as_secs()); + { + let mut locked_node_metrics = node_metrics.write().unwrap(); + locked_node_metrics.latest_fee_rate_cache_update_timestamp = unix_time_secs_opt; + write_node_metrics( + &*locked_node_metrics, + Arc::clone(&kv_store), + Arc::clone(&logger), + )?; + } + Ok(()) }, } @@ -582,6 +1043,60 @@ impl ChainSource { } } }, + Self::BitcoindRpc { bitcoind_rpc_client, tx_broadcaster, logger, .. } => { + // While it's a bit unclear when we'd be able to lean on Bitcoin Core >v28 + // features, we should eventually switch to use `submitpackage` via the + // `rust-bitcoind-json-rpc` crate rather than just broadcasting individual + // transactions. + let mut receiver = tx_broadcaster.get_broadcast_queue().await; + while let Some(next_package) = receiver.recv().await { + for tx in &next_package { + let txid = tx.compute_txid(); + let timeout_fut = tokio::time::timeout( + Duration::from_secs(TX_BROADCAST_TIMEOUT_SECS), + bitcoind_rpc_client.broadcast_transaction(tx), + ); + match timeout_fut.await { + Ok(res) => match res { + Ok(id) => { + debug_assert_eq!(id, txid); + log_trace!( + logger, + "Successfully broadcast transaction {}", + txid + ); + }, + Err(e) => { + log_error!( + logger, + "Failed to broadcast transaction {}: {}", + txid, + e + ); + log_trace!( + logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + }, + Err(e) => { + log_error!( + logger, + "Failed to broadcast transaction due to timeout {}: {}", + txid, + e + ); + log_trace!( + logger, + "Failed broadcast transaction bytes: {}", + log_bytes!(tx.encode()) + ); + }, + } + } + } + }, } } } @@ -590,11 +1105,13 @@ impl Filter for ChainSource { fn register_tx(&self, txid: &bitcoin::Txid, script_pubkey: &bitcoin::Script) { match self { Self::Esplora { tx_sync, .. } => tx_sync.register_tx(txid, script_pubkey), + Self::BitcoindRpc { .. } => (), } } fn register_output(&self, output: lightning::chain::WatchedOutput) { match self { Self::Esplora { tx_sync, .. } => tx_sync.register_output(output), + Self::BitcoindRpc { .. } => (), } } } diff --git a/src/fee_estimator.rs b/src/fee_estimator.rs index 0ecc71586..8db6a6050 100644 --- a/src/fee_estimator.rs +++ b/src/fee_estimator.rs @@ -44,8 +44,17 @@ impl OnchainFeeEstimator { Self { fee_rate_cache } } - pub(crate) fn set_fee_rate_cache(&self, fee_rate_cache: HashMap) { - *self.fee_rate_cache.write().unwrap() = fee_rate_cache; + // Updates the fee rate cache and returns if the new values changed. + pub(crate) fn set_fee_rate_cache( + &self, fee_rate_cache_update: HashMap, + ) -> bool { + let mut locked_fee_rate_cache = self.fee_rate_cache.write().unwrap(); + if fee_rate_cache_update != *locked_fee_rate_cache { + *locked_fee_rate_cache = fee_rate_cache_update; + true + } else { + false + } } } diff --git a/src/lib.rs b/src/lib.rs index 42b99406a..abf9b8b36 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1209,9 +1209,21 @@ impl Node { tokio::task::block_in_place(move || { tokio::runtime::Builder::new_multi_thread().enable_all().build().unwrap().block_on( async move { - chain_source.update_fee_rate_estimates().await?; - chain_source.sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper).await?; - chain_source.sync_onchain_wallet().await?; + match chain_source.as_ref() { + ChainSource::Esplora { .. } => { + chain_source.update_fee_rate_estimates().await?; + chain_source + .sync_lightning_wallet(sync_cman, sync_cmon, sync_sweeper) + .await?; + chain_source.sync_onchain_wallet().await?; + }, + ChainSource::BitcoindRpc { .. } => { + chain_source.update_fee_rate_estimates().await?; + chain_source + .poll_and_update_listeners(sync_cman, sync_cmon, sync_sweeper) + .await?; + }, + } Ok(()) }, ) diff --git a/src/wallet/mod.rs b/src/wallet/mod.rs index 30da1682d..494fcd768 100644 --- a/src/wallet/mod.rs +++ b/src/wallet/mod.rs @@ -7,12 +7,13 @@ use persist::KVStoreWalletPersister; -use crate::logger::{log_error, log_info, log_trace, Logger}; +use crate::logger::{log_debug, log_error, log_info, log_trace, Logger}; use crate::fee_estimator::{ConfirmationTarget, FeeEstimator}; use crate::Error; use lightning::chain::chaininterface::BroadcasterInterface; +use lightning::chain::{BestBlock, Listen}; use lightning::events::bump_transaction::{Utxo, WalletSource}; use lightning::ln::msgs::{DecodeError, UnsignedGossipMessage}; @@ -84,6 +85,11 @@ where self.inner.lock().unwrap().start_sync_with_revealed_spks().build() } + pub(crate) fn current_best_block(&self) -> BestBlock { + let checkpoint = self.inner.lock().unwrap().latest_checkpoint(); + BestBlock { block_hash: checkpoint.hash(), height: checkpoint.height() } + } + pub(crate) fn apply_update(&self, update: impl Into) -> Result<(), Error> { let mut locked_wallet = self.inner.lock().unwrap(); match locked_wallet.apply_update(update) { @@ -103,6 +109,21 @@ where } } + pub(crate) fn apply_unconfirmed_txs( + &self, unconfirmed_txs: Vec<(Transaction, u64)>, + ) -> Result<(), Error> { + let mut locked_wallet = self.inner.lock().unwrap(); + locked_wallet.apply_unconfirmed_txs(unconfirmed_txs); + + let mut locked_persister = self.persister.lock().unwrap(); + locked_wallet.persist(&mut locked_persister).map_err(|e| { + log_error!(self.logger, "Failed to persist wallet: {}", e); + Error::PersistenceFailed + })?; + + Ok(()) + } + pub(crate) fn create_funding_transaction( &self, output_script: ScriptBuf, amount: Amount, confirmation_target: ConfirmationTarget, locktime: LockTime, @@ -285,6 +306,66 @@ where } } +impl Listen for Wallet +where + B::Target: BroadcasterInterface, + E::Target: FeeEstimator, + L::Target: Logger, +{ + fn filtered_block_connected( + &self, _header: &bitcoin::block::Header, + _txdata: &lightning::chain::transaction::TransactionData, _height: u32, + ) { + debug_assert!(false, "Syncing filtered blocks is currently not supported"); + // As far as we can tell this would be a no-op anyways as we don't have to tell BDK about + // the header chain of intermediate blocks. According to the BDK team, it's sufficient to + // only connect full blocks starting from the last point of disagreement. + } + + fn block_connected(&self, block: &bitcoin::Block, height: u32) { + let mut locked_wallet = self.inner.lock().unwrap(); + + let pre_checkpoint = locked_wallet.latest_checkpoint(); + if pre_checkpoint.height() != height - 1 + || pre_checkpoint.hash() != block.header.prev_blockhash + { + log_debug!( + self.logger, + "Detected reorg while applying a connected block to on-chain wallet: new block with hash {} at height {}", + block.header.block_hash(), + height + ); + } + + match locked_wallet.apply_block(block, height) { + Ok(()) => (), + Err(e) => { + log_error!( + self.logger, + "Failed to apply connected block to on-chain wallet: {}", + e + ); + return; + }, + }; + + let mut locked_persister = self.persister.lock().unwrap(); + match locked_wallet.persist(&mut locked_persister) { + Ok(_) => (), + Err(e) => { + log_error!(self.logger, "Failed to persist on-chain wallet: {}", e); + return; + }, + }; + } + + fn block_disconnected(&self, _header: &bitcoin::block::Header, _height: u32) { + // This is a no-op as we don't have to tell BDK about disconnections. According to the BDK + // team, it's sufficient in case of a reorg to always connect blocks starting from the last + // point of disagreement. + } +} + impl WalletSource for Wallet where B::Target: BroadcasterInterface, diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 9c712286a..7c501d545 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -241,6 +241,12 @@ type TestNode = Arc; #[cfg(not(feature = "uniffi"))] type TestNode = Node; +#[derive(Clone)] +pub(crate) enum TestChainSource<'a> { + Esplora(&'a ElectrsD), + BitcoindRpc(&'a BitcoinD), +} + macro_rules! setup_builder { ($builder: ident, $config: expr) => { #[cfg(feature = "uniffi")] @@ -253,11 +259,12 @@ macro_rules! setup_builder { pub(crate) use setup_builder; pub(crate) fn setup_two_nodes( - electrsd: &ElectrsD, allow_0conf: bool, anchor_channels: bool, anchors_trusted_no_reserve: bool, + chain_source: &TestChainSource, allow_0conf: bool, anchor_channels: bool, + anchors_trusted_no_reserve: bool, ) -> (TestNode, TestNode) { println!("== Node A =="); let config_a = random_config(anchor_channels); - let node_a = setup_node(electrsd, config_a); + let node_a = setup_node(chain_source, config_a); println!("\n== Node B =="); let mut config_b = random_config(anchor_channels); @@ -272,17 +279,29 @@ pub(crate) fn setup_two_nodes( .trusted_peers_no_reserve .push(node_a.node_id()); } - let node_b = setup_node(electrsd, config_b); + let node_b = setup_node(chain_source, config_b); (node_a, node_b) } -pub(crate) fn setup_node(electrsd: &ElectrsD, config: Config) -> TestNode { - let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); - let mut sync_config = EsploraSyncConfig::default(); - sync_config.onchain_wallet_sync_interval_secs = 100000; - sync_config.lightning_wallet_sync_interval_secs = 100000; +pub(crate) fn setup_node(chain_source: &TestChainSource, config: Config) -> TestNode { setup_builder!(builder, config); - builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + match chain_source { + TestChainSource::Esplora(electrsd) => { + let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); + let mut sync_config = EsploraSyncConfig::default(); + sync_config.onchain_wallet_sync_interval_secs = 100000; + sync_config.lightning_wallet_sync_interval_secs = 100000; + builder.set_chain_source_esplora(esplora_url.clone(), Some(sync_config)); + }, + TestChainSource::BitcoindRpc(bitcoind) => { + let rpc_host = bitcoind.params.rpc_socket.ip().to_string(); + let rpc_port = bitcoind.params.rpc_socket.port(); + let values = bitcoind.params.get_cookie_values().unwrap().unwrap(); + let rpc_user = values.user; + let rpc_password = values.password; + builder.set_chain_source_bitcoind_rpc(rpc_host, rpc_port, rpc_user, rpc_password); + }, + } let test_sync_store = Arc::new(TestSyncStore::new(config.storage_dir_path.into())); let node = builder.build_with_store(test_sync_store).unwrap(); node.start().unwrap(); @@ -483,7 +502,7 @@ pub(crate) fn do_channel_full_cycle( node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); - let onchain_fee_buffer_sat = 1500; + let onchain_fee_buffer_sat = 5000; let node_a_anchor_reserve_sat = if expect_anchor_channel { 25_000 } else { 0 }; let node_a_upper_bound_sat = premine_amount_sat - node_a_anchor_reserve_sat - funding_amount_sat; diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 13f3ab0be..dc5c4b818 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -11,7 +11,7 @@ use common::{ do_channel_full_cycle, expect_channel_ready_event, expect_event, expect_payment_received_event, expect_payment_successful_event, generate_blocks_and_wait, open_channel, premine_and_distribute_funds, random_config, setup_bitcoind_and_electrsd, setup_builder, - setup_node, setup_two_nodes, wait_for_tx, TestSyncStore, + setup_node, setup_two_nodes, wait_for_tx, TestChainSource, TestSyncStore, }; use ldk_node::config::EsploraSyncConfig; @@ -21,49 +21,63 @@ use ldk_node::{Builder, Event, NodeError}; use lightning::ln::channelmanager::PaymentId; use lightning::util::persist::KVStore; -use bitcoin::{Amount, Network}; +use bitcoin::Amount; use std::sync::Arc; #[test] fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); +} + +#[test] +fn channel_full_cycle_bitcoind() { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + let chain_source = TestChainSource::BitcoindRpc(&bitcoind); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); } #[test] fn channel_full_cycle_force_close() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true); } #[test] fn channel_full_cycle_force_close_trusted_no_reserve() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, true); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, true); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true); } #[test] fn channel_full_cycle_0conf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, true, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, true, true, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, true, true, false) } #[test] fn channel_full_cycle_legacy_staticremotekey() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, false, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, false, false); } #[test] fn channel_open_fails_when_funds_insufficient() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); let addr_a = node_a.onchain_payment().new_address().unwrap(); let addr_b = node_b.onchain_payment().new_address().unwrap(); @@ -181,17 +195,6 @@ fn multi_hop_sending() { expect_payment_successful_event!(nodes[0], payment_id, Some(fee_paid_msat)); } -#[test] -fn connect_to_public_testnet_esplora() { - let mut config = random_config(true); - config.network = Network::Testnet; - setup_builder!(builder, config); - builder.set_chain_source_esplora("https://blockstream.info/testnet/api".to_string(), None); - let node = builder.build().unwrap(); - node.start().unwrap(); - node.stop().unwrap(); -} - #[test] fn start_stop_reinit() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); @@ -266,7 +269,8 @@ fn start_stop_reinit() { #[test] fn onchain_spend_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); let addr_a = node_a.onchain_payment().new_address().unwrap(); let addr_b = node_b.onchain_payment().new_address().unwrap(); @@ -315,7 +319,8 @@ fn onchain_spend_receive() { fn sign_verify_msg() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let config = random_config(true); - let node = setup_node(&electrsd, config); + let chain_source = TestChainSource::Esplora(&electrsd); + let node = setup_node(&chain_source, config); // Tests arbitrary message signing and later verification let msg = "OK computer".as_bytes(); @@ -332,7 +337,8 @@ fn connection_restart_behavior() { fn do_connection_restart_behavior(persist: bool) { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, false, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); let node_id_a = node_a.node_id(); let node_id_b = node_b.node_id(); @@ -383,7 +389,8 @@ fn do_connection_restart_behavior(persist: bool) { #[test] fn concurrent_connections_succeed() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); let node_a = Arc::new(node_a); let node_b = Arc::new(node_b); @@ -413,7 +420,8 @@ fn concurrent_connections_succeed() { #[test] fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); let address_a = node_a.onchain_payment().new_address().unwrap(); let premine_amount_sat = 5_000_000; @@ -620,7 +628,8 @@ fn simple_bolt12_send_receive() { #[test] fn generate_bip21_uri() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); let address_a = node_a.onchain_payment().new_address().unwrap(); let premined_sats = 5_000_000; @@ -661,7 +670,8 @@ fn generate_bip21_uri() { #[test] fn unified_qr_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - let (node_a, node_b) = setup_two_nodes(&electrsd, false, true, false); + let chain_source = TestChainSource::Esplora(&electrsd); + let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); let address_a = node_a.onchain_payment().new_address().unwrap(); let premined_sats = 5_000_000;