diff --git a/lib/bindings/langs/flutter/breez_sdk_liquidFFI/include/breez_sdk_liquidFFI.h b/lib/bindings/langs/flutter/breez_sdk_liquidFFI/include/breez_sdk_liquidFFI.h index 87abfcedd..7227569c8 100644 --- a/lib/bindings/langs/flutter/breez_sdk_liquidFFI/include/breez_sdk_liquidFFI.h +++ b/lib/bindings/langs/flutter/breez_sdk_liquidFFI/include/breez_sdk_liquidFFI.h @@ -408,6 +408,12 @@ uint16_t uniffi_breez_sdk_liquid_bindings_checksum_method_signer_slip77_master_b ); uint16_t uniffi_breez_sdk_liquid_bindings_checksum_method_signer_hmac_sha256(void +); +uint16_t uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_encrypt(void + +); +uint16_t uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_decrypt(void + ); uint32_t ffi_breez_sdk_liquid_bindings_uniffi_contract_version(void diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index b5d4468a6..7c82feeca 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -1,11 +1,11 @@ -use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use boltz_client::{ boltz::{self}, swaps::boltz::{ChainSwapStates, CreateChainResponse, SwapUpdateTxDetails}, - Address, ElementsLockTime, LockTime, Secp256k1, Serialize, ToHex, + ElementsLockTime, Secp256k1, Serialize, ToHex, }; use futures_util::TryFutureExt; use log::{debug, error, info, warn}; @@ -14,9 +14,9 @@ use lwk_wollet::{ hashes::hex::DisplayHex, History, }; -use tokio::sync::{broadcast, watch, Mutex}; -use tokio::time::MissedTickBehavior; +use tokio::sync::{broadcast, Mutex}; +use crate::model::{BlockListener, ChainSwapUpdate}; use crate::{ chain::{bitcoin::BitcoinChainService, liquid::LiquidChainService}, ensure_sdk, @@ -46,6 +46,27 @@ pub(crate) struct ChainSwapHandler { subscription_notifier: broadcast::Sender, } +#[async_trait] +impl BlockListener for ChainSwapHandler { + async fn on_bitcoin_block(&self, height: u32) { + if let Err(e) = self.rescan_incoming_refunds(height, false).await { + error!("Error rescanning incoming refunds: {e:?}"); + } + if let Err(e) = self.claim_outgoing(height).await { + error!("Error claiming outgoing: {e:?}"); + } + } + + async fn on_liquid_block(&self, height: u32) { + if let Err(e) = self.refund_outgoing(height).await { + warn!("Error refunding outgoing: {e:?}"); + } + if let Err(e) = self.claim_incoming(height).await { + error!("Error claiming incoming: {e:?}"); + } + } +} + impl ChainSwapHandler { pub(crate) fn new( config: Config, @@ -67,38 +88,6 @@ impl ChainSwapHandler { }) } - pub(crate) async fn start(self: Arc, mut shutdown: watch::Receiver<()>) { - let cloned = self.clone(); - tokio::spawn(async move { - let mut bitcoin_rescan_interval = tokio::time::interval(Duration::from_secs(60 * 10)); - let mut liquid_rescan_interval = tokio::time::interval(Duration::from_secs(60)); - bitcoin_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - liquid_rescan_interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - - loop { - tokio::select! { - _ = bitcoin_rescan_interval.tick() => { - if let Err(e) = cloned.rescan_incoming_user_lockup_txs(false).await { - error!("Error checking incoming user txs: {e:?}"); - } - if let Err(e) = cloned.rescan_outgoing_claim_txs().await { - error!("Error checking outgoing server txs: {e:?}"); - } - }, - _ = liquid_rescan_interval.tick() => { - if let Err(e) = cloned.rescan_incoming_server_lockup_txs().await { - error!("Error checking incoming server txs: {e:?}"); - } - }, - _ = shutdown.changed() => { - info!("Received shutdown signal, exiting chain swap loop"); - return; - } - } - } - }); - } - pub(crate) fn subscribe_payment_updates(&self) -> broadcast::Receiver { self.subscription_notifier.subscribe() } @@ -106,10 +95,7 @@ impl ChainSwapHandler { /// Handles status updates from Boltz for Chain swaps pub(crate) async fn on_new_status(&self, update: &boltz::Update) -> Result<()> { let id = &update.id; - let swap = self - .persister - .fetch_chain_swap_by_id(id)? - .ok_or(anyhow!("No ongoing Chain Swap found for ID {id}"))?; + let swap = self.fetch_chain_swap_by_id(id)?; if let Some(sync_state) = self.persister.get_sync_state_by_data_id(&swap.id)? { if !sync_state.is_local { @@ -134,11 +120,11 @@ impl ChainSwapHandler { } } - pub(crate) async fn rescan_incoming_user_lockup_txs( + pub(crate) async fn rescan_incoming_refunds( &self, + height: u32, ignore_monitoring_block_height: bool, ) -> Result<()> { - let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; let chain_swaps: Vec = self .persister .list_chain_swaps()? @@ -148,15 +134,11 @@ impl ChainSwapHandler { info!( "Rescanning {} incoming Chain Swap(s) user lockup txs at height {}", chain_swaps.len(), - current_height + height ); for swap in chain_swaps { if let Err(e) = self - .rescan_incoming_chain_swap_user_lockup_tx( - &swap, - current_height, - ignore_monitoring_block_height, - ) + .rescan_incoming_user_lockup_balance(&swap, height, ignore_monitoring_block_height) .await { error!( @@ -173,7 +155,7 @@ impl ChainSwapHandler { /// - `current_height`: the tip /// - `ignore_monitoring_block_height`: if true, it rescans an expired swap even after the /// cutoff monitoring block height - async fn rescan_incoming_chain_swap_user_lockup_tx( + async fn rescan_incoming_user_lockup_balance( &self, swap: &ChainSwap, current_height: u32, @@ -209,8 +191,12 @@ impl ChainSwapHandler { "Incoming Chain Swap {} has {} unspent sats. Setting the swap to refundable", swap.id, script_balance.confirmed ); - self.update_swap_info(&swap.id, Refundable, None, None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: swap.id.clone(), + to_state: Refundable, + ..Default::default() + }) + .await?; } else if script_balance.confirmed == 0 { // If the funds sent to the lockup script address are spent then set the // state back to Complete/Failed. @@ -224,16 +210,19 @@ impl ChainSwapHandler { "Incoming Chain Swap {} has 0 unspent sats. Setting the swap to {:?}", swap.id, to_state ); - self.update_swap_info(&swap.id, to_state, None, None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: swap.id.clone(), + to_state, + ..Default::default() + }) + .await?; } } } Ok(()) } - pub(crate) async fn rescan_incoming_server_lockup_txs(&self) -> Result<()> { - let current_height = self.liquid_chain_service.lock().await.tip().await?; + async fn claim_incoming(&self, height: u32) -> Result<()> { let chain_swaps: Vec = self .persister .list_chain_swaps()? @@ -245,95 +234,72 @@ impl ChainSwapHandler { info!( "Rescanning {} incoming Chain Swap(s) server lockup txs at height {}", chain_swaps.len(), - current_height + height ); for swap in chain_swaps { - if let Err(e) = self - .rescan_incoming_chain_swap_server_lockup_tx(&swap) - .await - { + if let Err(e) = self.claim_confirmed_server_lockup(&swap).await { error!( "Error rescanning server lockup of incoming Chain Swap {}: {e:?}", - swap.id + swap.id, ); } } Ok(()) } - async fn rescan_incoming_chain_swap_server_lockup_tx(&self, swap: &ChainSwap) -> Result<()> { - let Some(tx_id) = swap.server_lockup_tx_id.clone() else { - // Skip the rescan if there is no server_lockup_tx_id yet - return Ok(()); - }; - let swap_id = &swap.id; - let swap_script = swap.get_claim_swap_script()?; - let script_history = self.fetch_liquid_script_history(&swap_script).await?; - let tx_history = script_history - .iter() - .find(|h| h.txid.to_hex().eq(&tx_id)) - .ok_or(anyhow!( - "Server lockup tx for incoming Chain Swap {swap_id} was not found, txid={tx_id}" - ))?; - if tx_history.height > 0 { - info!("Incoming Chain Swap {swap_id} server lockup tx is confirmed"); - self.claim(swap_id) - .await - .map_err(|e| anyhow!("Could not claim Chain Swap {swap_id}: {e:?}"))?; - } - Ok(()) - } - - pub(crate) async fn rescan_outgoing_claim_txs(&self) -> Result<()> { - let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; + async fn claim_outgoing(&self, height: u32) -> Result<()> { let chain_swaps: Vec = self .persister .list_chain_swaps()? .into_iter() .filter(|s| { - s.direction == Direction::Outgoing && s.state == Pending && s.claim_tx_id.is_some() + s.direction == Direction::Outgoing && s.state == Pending && s.claim_tx_id.is_none() }) .collect(); info!( - "Rescanning {} outgoing Chain Swap(s) claim txs at height {}", + "Rescanning {} outgoing Chain Swap(s) server lockup txs at height {}", chain_swaps.len(), - current_height + height ); for swap in chain_swaps { - if let Err(e) = self.rescan_outgoing_chain_swap_claim_tx(&swap).await { - error!("Error rescanning outgoing Chain Swap {}: {e:?}", swap.id); + if let Err(e) = self.claim_confirmed_server_lockup(&swap).await { + error!( + "Error rescanning server lockup of outgoing Chain Swap {}: {e:?}", + swap.id + ); } } Ok(()) } - async fn rescan_outgoing_chain_swap_claim_tx(&self, swap: &ChainSwap) -> Result<()> { - if let Some(claim_address) = &swap.claim_address { - let address = Address::from_str(claim_address)?; - let claim_tx_id = swap.claim_tx_id.clone().ok_or(anyhow!("No claim tx id"))?; - let script_pubkey = address.assume_checked().script_pubkey(); - let script_history = self - .bitcoin_chain_service - .lock() + async fn claim_confirmed_server_lockup(&self, swap: &ChainSwap) -> Result<()> { + let Some(tx_id) = swap.server_lockup_tx_id.clone() else { + // Skip the rescan if there is no server_lockup_tx_id yet + return Ok(()); + }; + let swap_id = &swap.id; + let swap_script = swap.get_claim_swap_script()?; + let script_history = match swap.direction { + Direction::Incoming => self.fetch_liquid_script_history(&swap_script).await, + Direction::Outgoing => self.fetch_bitcoin_script_history(&swap_script).await, + }?; + let tx_history = script_history + .iter() + .find(|h| h.txid.to_hex().eq(&tx_id)) + .ok_or(anyhow!( + "Server lockup tx for Chain Swap {swap_id} was not found, txid={tx_id}" + ))?; + if tx_history.height > 0 { + info!("Chain Swap {swap_id} server lockup tx is confirmed"); + self.claim(swap_id) .await - .get_script_history(script_pubkey.as_script())?; - let claim_tx_history = script_history - .iter() - .find(|h| h.txid.to_hex().eq(&claim_tx_id) && h.height > 0); - if claim_tx_history.is_some() { - info!( - "Outgoing Chain Swap {} claim tx is confirmed. Setting the swap to Complete", - swap.id - ); - self.update_swap_info(&swap.id, Complete, None, None, None, None) - .await?; - } + .map_err(|e| anyhow!("Could not claim Chain Swap {swap_id}: {e:?}"))?; } Ok(()) } async fn on_new_incoming_status(&self, swap: &ChainSwap, update: &boltz::Update) -> Result<()> { - let id = &update.id; + let id = update.id.clone(); let status = &update.status; let swap_state = ChainSwapStates::from_str(status) .map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?; @@ -346,11 +312,16 @@ impl ChainSwapHandler { if let Some(zero_conf_rejected) = update.zero_conf_rejected { info!("Is zero conf rejected for Chain Swap {id}: {zero_conf_rejected}"); self.persister - .update_chain_swap_accept_zero_conf(id, !zero_conf_rejected)?; + .update_chain_swap_accept_zero_conf(&id, !zero_conf_rejected)?; } if let Some(transaction) = update.transaction.clone() { - self.update_swap_info(id, Pending, None, Some(&transaction.id), None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id, + to_state: Pending, + user_lockup_tx_id: Some(transaction.id), + ..Default::default() + }) + .await?; } Ok(()) } @@ -379,11 +350,16 @@ impl ChainSwapHandler { } info!("Server lockup mempool transaction was verified for incoming Chain Swap {}", swap.id); - self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id.clone(), + to_state: Pending, + server_lockup_tx_id: Some(transaction.id), + ..Default::default() + }) + .await?; if swap.accept_zero_conf { - self.claim(id).await.map_err(|e| { + self.claim(&id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -416,13 +392,18 @@ impl ChainSwapHandler { // Set the server_lockup_tx_id if it is verified or not. // If it is not yet confirmed, then it will be claimed after confirmation // in rescan_incoming_chain_swap_server_lockup_tx() - self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id.clone(), + to_state: Pending, + server_lockup_tx_id: Some(transaction.id.clone()), + ..Default::default() + }) + .await?; match verify_res { Ok(_) => { info!("Server lockup transaction was verified for incoming Chain Swap {}", swap.id); - self.claim(id).await.map_err(|e| { + self.claim(&id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -459,13 +440,21 @@ impl ChainSwapHandler { match self.verify_user_lockup_tx(swap).await { Ok(_) => { info!("Chain Swap {id} user lockup tx was broadcast. Setting the swap to refundable."); - self.update_swap_info(id, Refundable, None, None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id, + to_state: Refundable, + ..Default::default() + }) + .await?; } Err(_) => { info!("Chain Swap {id} user lockup tx was never broadcast. Resolving payment as failed."); - self.update_swap_info(id, Failed, None, None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id, + to_state: Failed, + ..Default::default() + }) + .await?; } } } @@ -484,7 +473,7 @@ impl ChainSwapHandler { } async fn on_new_outgoing_status(&self, swap: &ChainSwap, update: &boltz::Update) -> Result<()> { - let id = &update.id; + let id = update.id.clone(); let status = &update.status; let swap_state = ChainSwapStates::from_str(status) .map_err(|_| anyhow!("Invalid ChainSwapState for Chain Swap {id}: {status}"))?; @@ -501,7 +490,7 @@ impl ChainSwapHandler { // Create the user lockup tx (_, None) => { let create_response = swap.get_boltz_create_response()?; - let user_lockup_tx = self.lockup_funds(id, &create_response).await?; + let user_lockup_tx = self.lockup_funds(&id, &create_response).await?; let lockup_tx_id = user_lockup_tx.txid().to_string(); let lockup_tx_fees_sat: u64 = user_lockup_tx.all_fees().values().sum(); @@ -517,7 +506,12 @@ impl ChainSwapHandler { is_confirmed: false, }, None, None)?; - self.update_swap_info(id, Pending, None, Some(&lockup_tx_id), None, None) + self.update_swap_info(&ChainSwapUpdate { + swap_id: id, + to_state: Pending, + user_lockup_tx_id: Some(lockup_tx_id), + ..Default::default() + }) .await?; }, @@ -532,11 +526,16 @@ impl ChainSwapHandler { if let Some(zero_conf_rejected) = update.zero_conf_rejected { info!("Is zero conf rejected for Chain Swap {id}: {zero_conf_rejected}"); self.persister - .update_chain_swap_accept_zero_conf(id, !zero_conf_rejected)?; + .update_chain_swap_accept_zero_conf(&id, !zero_conf_rejected)?; } if let Some(transaction) = update.transaction.clone() { - self.update_swap_info(id, Pending, None, Some(&transaction.id), None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id, + to_state: Pending, + user_lockup_tx_id: Some(transaction.id), + ..Default::default() + }) + .await?; } Ok(()) } @@ -565,11 +564,16 @@ impl ChainSwapHandler { } info!("Server lockup mempool transaction was verified for outgoing Chain Swap {}", swap.id); - self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id.clone(), + to_state: Pending, + server_lockup_tx_id: Some(transaction.id), + ..Default::default() + }) + .await?; if swap.accept_zero_conf { - self.claim(id).await.map_err(|e| { + self.claim(&id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -612,9 +616,14 @@ impl ChainSwapHandler { "Server lockup transaction was verified for outgoing Chain Swap {}", swap.id ); - self.update_swap_info(id, Pending, Some(&transaction.id), None, None, None) - .await?; - self.claim(id).await.map_err(|e| { + self.update_swap_info(&ChainSwapUpdate { + swap_id: id.clone(), + to_state: Pending, + server_lockup_tx_id: Some(transaction.id), + ..Default::default() + }) + .await?; + self.claim(&id).await.map_err(|e| { error!("Could not cooperate Chain Swap {id} claim: {e}"); anyhow!("Could not post claim details. Err: {e:?}") })?; @@ -655,20 +664,22 @@ impl ChainSwapHandler { // Set the payment state to `RefundPending`. This ensures that the // background thread will pick it up and try to refund it // periodically - self.update_swap_info( - &swap.id, - RefundPending, - None, - None, - None, - refund_tx_id.as_deref(), - ) + self.update_swap_info(&ChainSwapUpdate { + swap_id: id, + to_state: RefundPending, + refund_tx_id, + ..Default::default() + }) .await?; } None => { warn!("Chain Swap {id} user lockup tx was never broadcast. Resolving payment as failed."); - self.update_swap_info(id, Failed, None, None, None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: id, + to_state: Failed, + ..Default::default() + }) + .await?; } } } @@ -721,52 +732,47 @@ impl ChainSwapHandler { Ok(lockup_tx) } + fn fetch_chain_swap_by_id(&self, swap_id: &str) -> Result { + self.persister + .fetch_chain_swap_by_id(swap_id) + .map_err(|_| PaymentError::PersistError)? + .ok_or(PaymentError::Generic { + err: format!("Chain Swap not found {swap_id}"), + }) + } + /// Transitions a Chain swap to a new state pub(crate) async fn update_swap_info( &self, - swap_id: &str, - to_state: PaymentState, - server_lockup_tx_id: Option<&str>, - user_lockup_tx_id: Option<&str>, - claim_tx_id: Option<&str>, - refund_tx_id: Option<&str>, + swap_update: &ChainSwapUpdate, ) -> Result<(), PaymentError> { - info!("Transitioning Chain swap {swap_id} to {to_state:?} (server_lockup_tx_id = {:?}, user_lockup_tx_id = {:?}, claim_tx_id = {:?}), refund_tx_id = {:?})", server_lockup_tx_id, user_lockup_tx_id, claim_tx_id, refund_tx_id); + info!("Updating Chain swap {swap_update:?}"); + let swap = self.fetch_chain_swap_by_id(&swap_update.swap_id)?; + Self::validate_state_transition(swap.state, swap_update.to_state)?; + self.persister.try_handle_chain_swap_update(swap_update)?; + let updated_swap = self.fetch_chain_swap_by_id(&swap_update.swap_id)?; - let swap: ChainSwap = self - .persister - .fetch_chain_swap_by_id(swap_id) - .map_err(|_| PaymentError::PersistError)? - .ok_or(PaymentError::Generic { - err: format!("Chain Swap not found {swap_id}"), - })?; + // Only notify subscribers if the swap changes let payment_id = match swap.direction { - Direction::Incoming => claim_tx_id.map(|c| c.to_string()).or(swap.claim_tx_id), - Direction::Outgoing => user_lockup_tx_id + Direction::Incoming => swap_update + .claim_tx_id + .clone() + .map(|c| c.to_string()) + .or(swap.claim_tx_id.clone()), + Direction::Outgoing => swap_update + .user_lockup_tx_id + .clone() .map(|c| c.to_string()) - .or(swap.user_lockup_tx_id), + .or(swap.user_lockup_tx_id.clone()), }; - - Self::validate_state_transition(swap.state, to_state)?; - self.persister.try_handle_chain_swap_update( - swap_id, - to_state, - server_lockup_tx_id, - user_lockup_tx_id, - claim_tx_id, - refund_tx_id, - )?; - if let Some(payment_id) = payment_id { - let _ = self.subscription_notifier.send(payment_id); + if updated_swap != swap { + payment_id.and_then(|payment_id| self.subscription_notifier.send(payment_id).ok()); } Ok(()) } async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> { - let swap = self - .persister - .fetch_chain_swap_by_id(swap_id)? - .ok_or(anyhow!("No Chain Swap found for ID {swap_id}"))?; + let swap = self.fetch_chain_swap_by_id(swap_id)?; ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed); debug!("Initiating claim for Chain Swap {swap_id}"); @@ -819,33 +825,34 @@ impl ChainSwapHandler { match broadcast_res { Ok(claim_tx_id) => { - if swap.direction == Direction::Incoming { - // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while - // This makes the tx known to the SDK (get_info, list_payments) instantly - self.persister.insert_or_update_payment( - PaymentTxData { - tx_id: claim_tx_id.clone(), - timestamp: Some(utils::now()), - amount_sat: swap.receiver_amount_sat, - fees_sat: 0, - payment_type: PaymentType::Receive, - is_confirmed: false, - }, - None, - None, - )?; - } + let payment_id = match swap.direction { + Direction::Incoming => { + // We insert a pseudo-claim-tx in case LWK fails to pick up the new mempool tx for a while + // This makes the tx known to the SDK (get_info, list_payments) instantly + self.persister.insert_or_update_payment( + PaymentTxData { + tx_id: claim_tx_id.clone(), + timestamp: Some(utils::now()), + amount_sat: swap.receiver_amount_sat, + fees_sat: 0, + payment_type: PaymentType::Receive, + is_confirmed: false, + }, + None, + None, + )?; + Some(claim_tx_id.clone()) + } + Direction::Outgoing => swap.user_lockup_tx_id, + }; info!("Successfully broadcast claim tx {claim_tx_id} for Chain Swap {swap_id}"); - self.update_swap_info( - &swap.id, - Pending, - None, - None, - Some(&claim_tx_id), - None, - ) - .await + // The claim_tx_id is already set by set_chain_swap_claim_tx_id. Manually trigger notifying + // subscribers as update_swap_info will not recognise a change to the swap + payment_id.and_then(|payment_id| { + self.subscription_notifier.send(payment_id).ok() + }); + Ok(()) } Err(err) => { // Multiple attempts to broadcast have failed. Unset the swap claim_tx_id @@ -960,14 +967,12 @@ impl ChainSwapHandler { // After refund tx is broadcasted, set the payment state to `RefundPending`. This ensures: // - the swap is not shown in `list-refundables` anymore // - the background thread will move it to Failed once the refund tx confirms - self.update_swap_info( - &swap.id, - RefundPending, - None, - None, - None, - Some(&refund_tx_id), - ) + self.update_swap_info(&ChainSwapUpdate { + swap_id: swap.id, + to_state: RefundPending, + refund_tx_id: Some(refund_tx_id.clone()), + ..Default::default() + }) .await?; Ok(refund_tx_id) @@ -1036,111 +1041,58 @@ impl ChainSwapHandler { Ok(refund_tx_id) } - async fn check_swap_expiry(&self, swap: &ChainSwap) -> Result { - let swap_creation_time = UNIX_EPOCH + Duration::from_secs(swap.created_at as u64); - let duration_since_creation_time = SystemTime::now().duration_since(swap_creation_time)?; - if duration_since_creation_time.as_secs() < 60 * 10 { - return Ok(false); - } - - match swap.direction { - Direction::Incoming => { - let swap_script = swap.get_lockup_swap_script()?.as_bitcoin_script()?; - let current_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; - let locktime_from_height = - LockTime::from_height(current_height).map_err(|e| PaymentError::Generic { - err: format!("Error getting locktime from height {current_height:?}: {e}",), - })?; - - info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime); - Ok(swap_script.locktime.is_implied_by(locktime_from_height)) - } - Direction::Outgoing => { - let swap_script = swap.get_lockup_swap_script()?.as_liquid_script()?; - let current_height = self.liquid_chain_service.lock().await.tip().await?; - let locktime_from_height = ElementsLockTime::from_height(current_height)?; - - info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime); - Ok(utils::is_locktime_expired( - locktime_from_height, - swap_script.locktime, - )) - } - } - } - - pub(crate) async fn track_refunds_and_refundables(&self) -> Result<(), PaymentError> { - let pending_swaps = self.persister.list_pending_chain_swaps()?; + async fn refund_outgoing(&self, height: u32) -> Result<(), PaymentError> { + // Get all pending outgoing chain swaps with no refund tx + let pending_swaps: Vec = self + .persister + .list_pending_chain_swaps()? + .into_iter() + .filter(|s| s.direction == Direction::Outgoing && s.refund_tx_id.is_none()) + .collect(); for swap in pending_swaps { - if swap.refund_tx_id.is_some() { - continue; - } - - let has_swap_expired = self.check_swap_expiry(&swap).await.unwrap_or(false); - - if !has_swap_expired && swap.state == Pending { - continue; - } - - match swap.direction { - // Track refunds - Direction::Outgoing => { - let refund_tx_id_result: Result = match swap.state { - Pending => self.refund_outgoing_swap(&swap, false).await, - RefundPending => match has_swap_expired { - true => { - self.refund_outgoing_swap(&swap, true) - .or_else(|e| { - warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}"); - self.refund_outgoing_swap(&swap, false) - }) - .await - } - false => self.refund_outgoing_swap(&swap, true).await, - }, - _ => { - continue; + let swap_script = swap.get_lockup_swap_script()?.as_liquid_script()?; + let locktime_from_height = ElementsLockTime::from_height(height) + .map_err(|e| PaymentError::Generic { err: e.to_string() })?; + info!("Checking Chain Swap {} expiration: locktime_from_height = {locktime_from_height:?}, swap_script.locktime = {:?}", swap.id, swap_script.locktime); + let has_swap_expired = + utils::is_locktime_expired(locktime_from_height, swap_script.locktime); + if has_swap_expired || swap.state == RefundPending { + let refund_tx_id_res = match swap.state { + Pending => self.refund_outgoing_swap(&swap, false).await, + RefundPending => match has_swap_expired { + true => { + self.refund_outgoing_swap(&swap, true) + .or_else(|e| { + warn!("Failed to initiate cooperative refund, switching to non-cooperative: {e:?}"); + self.refund_outgoing_swap(&swap, false) + }) + .await } - }; - - if let Ok(refund_tx_id) = refund_tx_id_result { - let update_swap_info_result = self - .update_swap_info( - &swap.id, - RefundPending, - None, - None, - None, - Some(&refund_tx_id), - ) - .await; - if let Err(err) = update_swap_info_result { - warn!( - "Could not update outgoing Chain swap {} information, error: {err:?}", - swap.id - ); - }; + false => self.refund_outgoing_swap(&swap, true).await, + }, + _ => { + continue; } - } + }; - // Track refundables by verifying that the expiry has elapsed, and set the state of the incoming swap to `Refundable` - Direction::Incoming => { - if swap.user_lockup_tx_id.is_some() && has_swap_expired { - let update_swap_info_result = self - .update_swap_info(&swap.id, Refundable, None, None, None, None) - .await; - - if let Err(err) = update_swap_info_result { - warn!( - "Could not update Chain swap {} information, error: {err:?}", - swap.id - ); - } - } + if let Ok(refund_tx_id) = refund_tx_id_res { + let update_swap_info_res = self + .update_swap_info(&ChainSwapUpdate { + swap_id: swap.id.clone(), + to_state: RefundPending, + refund_tx_id: Some(refund_tx_id), + ..Default::default() + }) + .await; + if let Err(err) = update_swap_info_res { + warn!( + "Could not update outgoing Chain swap {} information: {err:?}", + swap.id + ); + }; } } } - Ok(()) } @@ -1333,8 +1285,13 @@ impl ChainSwapHandler { .ok_or(anyhow!("Script history has no transactions"))? .txid .to_hex(); - self.update_swap_info(&chain_swap.id, Pending, None, Some(&txid), None, None) - .await?; + self.update_swap_info(&ChainSwapUpdate { + swap_id: chain_swap.id.clone(), + to_state: Pending, + user_lockup_tx_id: Some(txid.clone()), + ..Default::default() + }) + .await?; Ok(txid) } } @@ -1384,7 +1341,7 @@ mod tests { use crate::{ model::{ - Direction, + ChainSwapUpdate, Direction, PaymentState::{self, *}, }, test_utils::{ @@ -1424,7 +1381,11 @@ mod tests { persister.insert_chain_swap(&chain_swap)?; assert!(chain_swap_handler - .update_swap_info(&chain_swap.id, *allowed_state, None, None, None, None) + .update_swap_info(&ChainSwapUpdate { + swap_id: chain_swap.id, + to_state: *allowed_state, + ..Default::default() + }) .await .is_ok()); } @@ -1448,7 +1409,11 @@ mod tests { persister.insert_chain_swap(&chain_swap)?; assert!(chain_swap_handler - .update_swap_info(&chain_swap.id, *disallowed_state, None, None, None, None) + .update_swap_info(&ChainSwapUpdate { + swap_id: chain_swap.id, + to_state: *disallowed_state, + ..Default::default() + }) .await .is_err()); } diff --git a/lib/core/src/model.rs b/lib/core/src/model.rs index 49ddf8950..3db5ab625 100644 --- a/lib/core/src/model.rs +++ b/lib/core/src/model.rs @@ -1,7 +1,7 @@ use std::path::PathBuf; use anyhow::{anyhow, Result}; - +use async_trait::async_trait; use boltz_client::{ bitcoin::ScriptBuf, network::Chain, @@ -565,6 +565,13 @@ pub enum GetPaymentRequest { Lightning { payment_hash: String }, } +/// Trait that can be used to react to new blocks from Bitcoin and Liquid chains +#[async_trait] +pub(crate) trait BlockListener: Send + Sync { + async fn on_bitcoin_block(&self, height: u32); + async fn on_liquid_block(&self, height: u32); +} + // A swap enum variant #[derive(Clone, Debug)] pub(crate) enum Swap { @@ -629,7 +636,7 @@ impl FromSql for Direction { /// A chain swap /// /// See -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub(crate) struct ChainSwap { pub(crate) id: String, pub(crate) direction: Direction, @@ -758,8 +765,19 @@ impl ChainSwap { } } +#[derive(Clone, Debug, Default)] +pub(crate) struct ChainSwapUpdate { + pub(crate) swap_id: String, + pub(crate) to_state: PaymentState, + pub(crate) server_lockup_tx_id: Option, + pub(crate) user_lockup_tx_id: Option, + pub(crate) claim_address: Option, + pub(crate) claim_tx_id: Option, + pub(crate) refund_tx_id: Option, +} + /// A submarine swap, used for Send -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub(crate) struct SendSwap { pub(crate) id: String, /// Bolt11 or Bolt12 invoice. This is determined by whether `bolt12_offer` is set or not. @@ -844,7 +862,7 @@ impl SendSwap { } /// A reverse swap, used for Receive -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq)] pub(crate) struct ReceiveSwap { pub(crate) id: String, pub(crate) preimage: String, @@ -860,12 +878,8 @@ pub(crate) struct ReceiveSwap { pub(crate) claim_fees_sat: u64, /// Persisted as soon as a claim tx is broadcast pub(crate) claim_tx_id: Option, - /// Persisted only when the lockup tx is broadcast - pub(crate) lockup_tx_id: Option, /// The address reserved for a magic routing hint payment pub(crate) mrh_address: String, - /// The script pubkey for a magic routing hint payment - pub(crate) mrh_script_pubkey: String, /// Persisted only if a transaction is sent to the `mrh_address` pub(crate) mrh_tx_id: Option, /// Until the lockup tx is seen in the mempool, it contains the swap creation time. @@ -953,8 +967,9 @@ pub struct RefundableSwap { } /// The payment state of an individual payment. -#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Hash)] +#[derive(Clone, Copy, Debug, Default, Eq, PartialEq, Serialize, Hash)] pub enum PaymentState { + #[default] Created = 0, /// ## Receive Swaps diff --git a/lib/core/src/persist/chain.rs b/lib/core/src/persist/chain.rs index 9be65cffe..563eebadc 100644 --- a/lib/core/src/persist/chain.rs +++ b/lib/core/src/persist/chain.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use anyhow::Result; use boltz_client::swaps::boltz::{ChainSwapDetails, CreateChainResponse}; use rusqlite::{named_params, params, Connection, Row, TransactionBehavior}; @@ -190,63 +188,29 @@ impl Persister { pub(crate) fn list_chain_swaps_by_state( &self, - con: &Connection, states: Vec, ) -> Result> { + let con = self.get_connection()?; let where_clause = vec![get_where_clause_state_in(&states)]; - self.list_chain_swaps_where(con, where_clause) + self.list_chain_swaps_where(&con, where_clause) } - pub(crate) fn list_ongoing_chain_swaps(&self, con: &Connection) -> Result> { - self.list_chain_swaps_by_state(con, vec![PaymentState::Created, PaymentState::Pending]) + pub(crate) fn list_ongoing_chain_swaps(&self) -> Result> { + let con = self.get_connection()?; + let where_clause = vec![get_where_clause_state_in(&[ + PaymentState::Created, + PaymentState::Pending, + ])]; + + self.list_chain_swaps_where(&con, where_clause) } pub(crate) fn list_pending_chain_swaps(&self) -> Result> { - let con: Connection = self.get_connection()?; - self.list_chain_swaps_by_state( - &con, - vec![PaymentState::Pending, PaymentState::RefundPending], - ) + self.list_chain_swaps_by_state(vec![PaymentState::Pending, PaymentState::RefundPending]) } pub(crate) fn list_refundable_chain_swaps(&self) -> Result> { - let con: Connection = self.get_connection()?; - self.list_chain_swaps_by_state(&con, vec![PaymentState::Refundable]) - } - - /// Pending Chain swaps, indexed by refund tx id - pub(crate) fn list_pending_chain_swaps_by_refund_tx_id( - &self, - ) -> Result> { - let res: HashMap = self - .list_pending_chain_swaps()? - .iter() - .filter_map(|pending_chain_swap| { - pending_chain_swap - .refund_tx_id - .as_ref() - .map(|refund_tx_id| (refund_tx_id.clone(), pending_chain_swap.clone())) - }) - .collect(); - Ok(res) - } - - /// This only returns the swaps that have a claim tx, skipping the pending ones that are being refunded. - pub(crate) fn list_pending_chain_swaps_by_claim_tx_id( - &self, - ) -> Result> { - let con: Connection = self.get_connection()?; - let res: HashMap = self - .list_chain_swaps_by_state(&con, vec![PaymentState::Pending])? - .iter() - .filter_map(|pending_chain_swap| { - pending_chain_swap - .claim_tx_id - .as_ref() - .map(|claim_tx_id| (claim_tx_id.clone(), pending_chain_swap.clone())) - }) - .collect(); - Ok(res) + self.list_chain_swaps_by_state(vec![PaymentState::Refundable]) } pub(crate) fn update_chain_swap_accept_zero_conf( @@ -332,14 +296,9 @@ impl Persister { pub(crate) fn try_handle_chain_swap_update( &self, - swap_id: &str, - to_state: PaymentState, - server_lockup_tx_id: Option<&str>, - user_lockup_tx_id: Option<&str>, - claim_tx_id: Option<&str>, - refund_tx_id: Option<&str>, + swap_update: &ChainSwapUpdate, ) -> Result<(), PaymentError> { - // Do not overwrite server_lockup_tx_id, user_lockup_tx_id, claim_tx_id, refund_tx_id + // Do not overwrite server_lockup_tx_id, user_lockup_tx_id, claim_address, claim_tx_id, refund_tx_id let mut con = self.get_connection()?; let tx = con.transaction_with_behavior(TransactionBehavior::Immediate)?; @@ -358,6 +317,12 @@ impl Persister { ELSE user_lockup_tx_id END, + claim_address = + CASE + WHEN claim_address IS NULL THEN :claim_address + ELSE claim_address + END, + claim_tx_id = CASE WHEN claim_tx_id IS NULL THEN :claim_tx_id @@ -374,12 +339,13 @@ impl Persister { WHERE id = :id", named_params! { - ":id": swap_id, - ":server_lockup_tx_id": server_lockup_tx_id, - ":user_lockup_tx_id": user_lockup_tx_id, - ":claim_tx_id": claim_tx_id, - ":refund_tx_id": refund_tx_id, - ":state": to_state, + ":id": swap_update.swap_id, + ":server_lockup_tx_id": swap_update.server_lockup_tx_id, + ":user_lockup_tx_id": swap_update.user_lockup_tx_id, + ":claim_address": swap_update.claim_address, + ":claim_tx_id": swap_update.claim_tx_id, + ":refund_tx_id": swap_update.refund_tx_id, + ":state": swap_update.to_state, }, )?; diff --git a/lib/core/src/persist/migrations.rs b/lib/core/src/persist/migrations.rs index 462c9e62d..acac55e45 100644 --- a/lib/core/src/persist/migrations.rs +++ b/lib/core/src/persist/migrations.rs @@ -207,5 +207,6 @@ pub(crate) fn current_migrations() -> Vec<&'static str> { schema_version TEXT NOT NULL, data BLOB NOT NULL ) STRICT;", + "ALTER TABLE receive_swaps DROP COLUMN mrh_script_pubkey;", ] } diff --git a/lib/core/src/persist/mod.rs b/lib/core/src/persist/mod.rs index 9adea4419..8658fb75d 100644 --- a/lib/core/src/persist/mod.rs +++ b/lib/core/src/persist/mod.rs @@ -7,7 +7,7 @@ pub(crate) mod receive; pub(crate) mod send; pub(crate) mod sync; -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::{fs::create_dir_all, path::PathBuf, str::FromStr}; use crate::error::PaymentError; @@ -15,9 +15,11 @@ use crate::lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription}; use crate::model::*; use crate::{get_invoice_description, utils}; use anyhow::{anyhow, Result}; +use lwk_wollet::WalletTx; use migrations::current_migrations; use rusqlite::{params, params_from_iter, Connection, OptionalExtension, Row, ToSql}; use rusqlite_migration::{Migrations, M}; +use sdk_common::bitcoin::hashes::hex::ToHex; use tokio::sync::mpsc::Sender; const DEFAULT_DB_FILENAME: &str = "storage.sql"; @@ -93,6 +95,35 @@ impl Persister { } } + pub(crate) fn insert_or_update_payment_with_wallet_tx( + &self, + tx: &WalletTx, + ) -> Result<(), PaymentError> { + let tx_id = tx.txid.to_string(); + let is_tx_confirmed = tx.height.is_some(); + let amount_sat = tx.balance.values().sum::(); + let maybe_script_pubkey = tx + .outputs + .iter() + .find(|output| output.is_some()) + .and_then(|output| output.clone().map(|o| o.script_pubkey.to_hex())); + self.insert_or_update_payment( + PaymentTxData { + tx_id: tx_id.clone(), + timestamp: tx.timestamp, + amount_sat: amount_sat.unsigned_abs(), + fees_sat: tx.fee, + payment_type: match amount_sat >= 0 { + true => PaymentType::Receive, + false => PaymentType::Send, + }, + is_confirmed: is_tx_confirmed, + }, + maybe_script_pubkey, + None, + ) + } + pub(crate) fn insert_or_update_payment( &self, ptx: PaymentTxData, @@ -138,19 +169,18 @@ impl Persister { } pub(crate) fn list_ongoing_swaps(&self) -> Result> { - let con = self.get_connection()?; let ongoing_send_swaps: Vec = self - .list_ongoing_send_swaps(&con)? + .list_ongoing_send_swaps()? .into_iter() .map(Swap::Send) .collect(); let ongoing_receive_swaps: Vec = self - .list_ongoing_receive_swaps(&con)? + .list_ongoing_receive_swaps()? .into_iter() .map(Swap::Receive) .collect(); let ongoing_chain_swaps: Vec = self - .list_ongoing_chain_swaps(&con)? + .list_ongoing_chain_swaps()? .into_iter() .map(Swap::Chain) .collect(); @@ -478,6 +508,28 @@ impl Persister { .collect(); Ok(payments) } + + pub fn get_payments_by_tx_id( + &self, + req: &ListPaymentsRequest, + ) -> Result> { + let res: HashMap = self + .get_payments(req)? + .into_iter() + .flat_map(|payment| { + // Index payments by both tx_id (lockup/claim) and refund_tx_id + let mut res = vec![]; + if let Some(tx_id) = payment.tx_id.clone() { + res.push((tx_id, payment.clone())); + } + if let Some(refund_tx_id) = payment.get_refund_tx_id() { + res.push((refund_tx_id, payment)); + } + res + }) + .collect(); + Ok(res) + } } fn filter_to_where_clause(req: &ListPaymentsRequest) -> (String, Vec>) { diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index b237db9e3..49e93b9dd 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use anyhow::Result; use boltz_client::swaps::boltz::CreateReverseResponse; use rusqlite::{named_params, params, Connection, Row, TransactionBehavior}; @@ -33,10 +31,9 @@ impl Persister { created_at, claim_fees_sat, mrh_address, - mrh_script_pubkey, state ) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( &receive_swap.id, id_hash, @@ -50,7 +47,6 @@ impl Persister { &receive_swap.created_at, &receive_swap.claim_fees_sat, &receive_swap.mrh_address, - &receive_swap.mrh_script_pubkey, &receive_swap.state, ), )?; @@ -99,9 +95,7 @@ impl Persister { rs.receiver_amount_sat, rs.claim_fees_sat, rs.claim_tx_id, - rs.lockup_tx_id, rs.mrh_address, - rs.mrh_script_pubkey, rs.mrh_tx_id, rs.created_at, rs.state @@ -144,12 +138,10 @@ impl Persister { receiver_amount_sat: row.get(8)?, claim_fees_sat: row.get(9)?, claim_tx_id: row.get(10)?, - lockup_tx_id: row.get(11)?, - mrh_address: row.get(12)?, - mrh_script_pubkey: row.get(13)?, - mrh_tx_id: row.get(14)?, - created_at: row.get(15)?, - state: row.get(16)?, + mrh_address: row.get(11)?, + mrh_tx_id: row.get(12)?, + created_at: row.get(13)?, + state: row.get(14)?, }) } @@ -172,66 +164,14 @@ impl Persister { Ok(ongoing_receive) } - pub(crate) fn list_ongoing_receive_swaps(&self, con: &Connection) -> Result> { + pub(crate) fn list_ongoing_receive_swaps(&self) -> Result> { + let con = self.get_connection()?; let where_clause = vec![get_where_clause_state_in(&[ PaymentState::Created, PaymentState::Pending, ])]; - self.list_receive_swaps_where(con, where_clause) - } - - pub(crate) fn list_pending_receive_swaps(&self) -> Result> { - let con: Connection = self.get_connection()?; - let query = Self::list_receive_swaps_query(vec!["state = ?1".to_string()]); - let res = con - .prepare(&query)? - .query_map( - params![PaymentState::Pending], - Self::sql_row_to_receive_swap, - )? - .map(|i| i.unwrap()) - .collect(); - Ok(res) - } - - /// Ongoing Receive Swaps with no claim or lockup transactions, indexed by mrh_script_pubkey - pub(crate) fn list_ongoing_receive_swaps_by_mrh_script_pubkey( - &self, - ) -> Result> { - let con: Connection = self.get_connection()?; - let res = self - .list_ongoing_receive_swaps(&con)? - .iter() - .filter_map(|swap| { - match ( - swap.lockup_tx_id.clone(), - swap.claim_tx_id.clone(), - swap.mrh_script_pubkey.is_empty(), - ) { - (None, None, false) => Some((swap.mrh_script_pubkey.clone(), swap.clone())), - _ => None, - } - }) - .collect(); - Ok(res) - } - - /// Pending Receive Swaps, indexed by claim_tx_id - pub(crate) fn list_pending_receive_swaps_by_claim_tx_id( - &self, - ) -> Result> { - let res = self - .list_pending_receive_swaps()? - .iter() - .filter_map(|pending_receive_swap| { - pending_receive_swap - .claim_tx_id - .as_ref() - .map(|claim_tx_id| (claim_tx_id.clone(), pending_receive_swap.clone())) - }) - .collect(); - Ok(res) + self.list_receive_swaps_where(&con, where_clause) } // Only set the Receive Swap claim_tx_id if not set, otherwise return an error @@ -418,13 +358,9 @@ mod tests { // List ongoing receive swaps storage.insert_receive_swap(&new_receive_swap(Some(PaymentState::Pending)))?; - let ongoing_swaps = storage.list_ongoing_receive_swaps(&con)?; + let ongoing_swaps = storage.list_ongoing_receive_swaps()?; assert_eq!(ongoing_swaps.len(), 4); - // List pending receive swaps - let ongoing_swaps = storage.list_pending_receive_swaps()?; - assert_eq!(ongoing_swaps.len(), 1); - Ok(()) } diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index 4a2f11178..ca318b5fa 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use anyhow::Result; use boltz_client::swaps::boltz::CreateSubmarineResponse; use rusqlite::{named_params, params, Connection, Row}; @@ -169,13 +167,14 @@ impl Persister { Ok(ongoing_send) } - pub(crate) fn list_ongoing_send_swaps(&self, con: &Connection) -> Result> { + pub(crate) fn list_ongoing_send_swaps(&self) -> Result> { + let con = self.get_connection()?; let where_clause = vec![get_where_clause_state_in(&[ PaymentState::Created, PaymentState::Pending, ])]; - self.list_send_swaps_where(con, where_clause) + self.list_send_swaps_where(&con, where_clause) } pub(crate) fn list_pending_send_swaps(&self) -> Result> { @@ -187,21 +186,14 @@ impl Persister { self.list_send_swaps_where(&con, where_clause) } - /// Pending Send swaps, indexed by refund tx id - pub(crate) fn list_pending_send_swaps_by_refund_tx_id( - &self, - ) -> Result> { - let res: HashMap = self - .list_pending_send_swaps()? - .iter() - .filter_map(|pending_send_swap| { - pending_send_swap - .refund_tx_id - .as_ref() - .map(|refund_tx_id| (refund_tx_id.clone(), pending_send_swap.clone())) - }) - .collect(); - Ok(res) + pub(crate) fn list_pending_and_ongoing_send_swaps(&self) -> Result> { + let con = self.get_connection()?; + let where_clause = vec![get_where_clause_state_in(&[ + PaymentState::Created, + PaymentState::Pending, + PaymentState::RefundPending, + ])]; + self.list_send_swaps_where(&con, where_clause) } pub(crate) fn try_handle_send_swap_update( @@ -385,7 +377,7 @@ mod tests { // List ongoing send swaps storage.insert_send_swap(&new_send_swap(Some(PaymentState::Pending)))?; - let ongoing_swaps = storage.list_ongoing_send_swaps(&con)?; + let ongoing_swaps = storage.list_ongoing_send_swaps()?; assert_eq!(ongoing_swaps.len(), 4); // List pending send swaps diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 730befc9e..7aa49b77d 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -62,10 +62,7 @@ impl ReceiveSwapHandler { let status = &update.status; let swap_state = RevSwapStates::from_str(status) .map_err(|_| anyhow!("Invalid RevSwapState for Receive Swap {id}: {status}"))?; - let receive_swap = self - .persister - .fetch_receive_swap_by_id(id)? - .ok_or(anyhow!("No ongoing Receive Swap found for ID {id}"))?; + let receive_swap = self.fetch_receive_swap_by_id(id)?; info!("Handling Receive Swap transition to {swap_state:?} for swap {id}"); @@ -236,6 +233,14 @@ impl ReceiveSwapHandler { } } + fn fetch_receive_swap_by_id(&self, swap_id: &str) -> Result { + self.persister + .fetch_receive_swap_by_id(swap_id) + .map_err(|_| PaymentError::PersistError)? + .ok_or(PaymentError::Generic { + err: format!("Receive Swap not found {swap_id}"), + }) + } /// Transitions a Receive swap to a new state pub(crate) async fn update_swap_info( &self, @@ -250,20 +255,7 @@ impl ReceiveSwapHandler { "Transitioning Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})", swap_id, to_state, claim_tx_id, lockup_tx_id, mrh_tx_id ); - - let swap = self - .persister - .fetch_receive_swap_by_id(swap_id) - .map_err(|_| PaymentError::PersistError)? - .ok_or(PaymentError::Generic { - err: format!("Receive Swap not found {swap_id}"), - })?; - let payment_id = claim_tx_id - .or(lockup_tx_id) - .or(mrh_tx_id) - .map(|id| id.to_string()) - .or(swap.claim_tx_id); - + let swap = self.fetch_receive_swap_by_id(swap_id)?; Self::validate_state_transition(swap.state, to_state)?; self.persister.try_handle_receive_swap_update( swap_id, @@ -273,18 +265,26 @@ impl ReceiveSwapHandler { mrh_tx_id, mrh_amount_sat, )?; + let updated_swap = self.fetch_receive_swap_by_id(swap_id)?; - if let Some(payment_id) = payment_id { - let _ = self.subscription_notifier.send(payment_id); + if mrh_tx_id.is_some() { + self.persister.delete_reserved_address(&swap.mrh_address)?; + } + + // Only notify subscribers if the swap changes + let payment_id = claim_tx_id + .or(mrh_tx_id) + .map(|id| id.to_string()) + .or(swap.claim_tx_id.clone()) + .or(swap.mrh_tx_id.clone()); + if updated_swap != swap { + payment_id.and_then(|payment_id| self.subscription_notifier.send(payment_id).ok()); } Ok(()) } async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> { - let swap = self - .persister - .fetch_receive_swap_by_id(swap_id)? - .ok_or(anyhow!("No Receive Swap found for ID {swap_id}"))?; + let swap = self.fetch_receive_swap_by_id(swap_id)?; ensure_sdk!(swap.claim_tx_id.is_none(), PaymentError::AlreadyClaimed); info!("Initiating claim for Receive Swap {swap_id}"); @@ -335,15 +335,10 @@ impl ReceiveSwapHandler { )?; info!("Successfully broadcast claim tx {claim_tx_id} for Receive Swap {swap_id}"); - self.update_swap_info( - swap_id, - Pending, - Some(&claim_tx_id), - None, - None, - None, - ) - .await + // The claim_tx_id is already set by set_receive_swap_claim_tx_id. Manually trigger notifying + // subscribers as update_swap_info will not recognise a change to the swap + _ = self.subscription_notifier.send(claim_tx_id); + Ok(()) } Err(err) => { // Multiple attempts to broadcast have failed. Unset the swap claim_tx_id diff --git a/lib/core/src/restore.rs b/lib/core/src/restore.rs index d3f51170a..21b9176ef 100644 --- a/lib/core/src/restore.rs +++ b/lib/core/src/restore.rs @@ -3,8 +3,11 @@ use std::collections::HashMap; use anyhow::{anyhow, Result}; -use log::{error, info}; -use lwk_wollet::elements::Txid; +use boltz_client::ElementsAddress; +use log::{debug, error, warn}; +use lwk_wollet::elements::{secp256k1_zkp, AddressParams, Txid}; +use lwk_wollet::elements_miniscript::slip77::MasterBlindingKey; +use lwk_wollet::hashes::hex::{DisplayHex, FromHex}; use lwk_wollet::WalletTx; use crate::prelude::*; @@ -29,135 +32,185 @@ impl TxMap { } } -trait PartialSwapState { +pub(crate) trait PartialSwapState { /// Determine partial swap state, based on recovered chain data. /// /// This is a partial state, which means it may be incomplete because it's based on partial /// information. Some swap states cannot be determined based only on chain data. - /// - /// For example, it cannot distinguish between [PaymentState::Created] and [PaymentState::TimedOut], - /// and in some cases, between [PaymentState::Created] and [PaymentState::Failed]. - fn derive_partial_state(&self) -> PaymentState; + /// In these cases we do not assume any swap state. + fn derive_partial_state(&self) -> Option; } pub(crate) struct RecoveredOnchainDataSend { - lockup_tx_id: Option, - claim_tx_id: Option, - refund_tx_id: Option, + pub(crate) lockup_tx_id: Option, + pub(crate) claim_tx_id: Option, + pub(crate) refund_tx_id: Option, } impl PartialSwapState for RecoveredOnchainDataSend { - fn derive_partial_state(&self) -> PaymentState { + fn derive_partial_state(&self) -> Option { match &self.lockup_tx_id { Some(_) => match &self.claim_tx_id { - Some(_) => PaymentState::Complete, + Some(_) => Some(PaymentState::Complete), None => match &self.refund_tx_id { Some(refund_tx_id) => match refund_tx_id.confirmed() { - true => PaymentState::Failed, - false => PaymentState::RefundPending, + true => Some(PaymentState::Failed), + false => Some(PaymentState::RefundPending), }, - None => PaymentState::Pending, + None => Some(PaymentState::Pending), }, }, - // For Send swaps, no lockup could mean both Created or TimedOut. - // However, we're in Created for a very short period of time in the originating instance, - // after which we expect Pending or TimedOut. Therefore here we default to TimedOut. - None => PaymentState::TimedOut, + // We have no onchain data to support deriving the state as the swap could + // potentially be Created, TimedOut or Failed after expiry. In this case we return None. + None => None, } } } pub(crate) struct RecoveredOnchainDataReceive { - lockup_tx_id: Option, - claim_tx_id: Option, + pub(crate) lockup_tx_id: Option, + pub(crate) claim_tx_id: Option, + pub(crate) mrh_tx_id: Option, + pub(crate) mrh_amount_sat: Option, } impl PartialSwapState for RecoveredOnchainDataReceive { - fn derive_partial_state(&self) -> PaymentState { - match (&self.lockup_tx_id, &self.claim_tx_id) { - (Some(_), Some(claim_tx_id)) => match claim_tx_id.confirmed() { - true => PaymentState::Complete, - false => PaymentState::Pending, + fn derive_partial_state(&self) -> Option { + match &self.lockup_tx_id { + Some(_) => match &self.claim_tx_id { + Some(claim_tx_id) => match claim_tx_id.confirmed() { + true => Some(PaymentState::Complete), + false => Some(PaymentState::Pending), + }, + None => Some(PaymentState::Pending), + }, + None => match &self.mrh_tx_id { + Some(mrh_tx_id) => match mrh_tx_id.confirmed() { + true => Some(PaymentState::Complete), + false => Some(PaymentState::Pending), + }, + // We have no onchain data to support deriving the state as the swap could + // potentially be Created or Failed after expiry. In this case we return None. + None => None, }, - (Some(_), None) => PaymentState::Pending, - // TODO How to distinguish between Failed and Created (if in both cases, no lockup or claim tx present) - // See https://docs.boltz.exchange/v/api/lifecycle#reverse-submarine-swaps - _ => PaymentState::Created, } } } pub(crate) struct RecoveredOnchainDataChainSend { /// LBTC tx initiated by the SDK (the "user" as per Boltz), sending funds to the swap funding address. - lbtc_user_lockup_tx_id: Option, + pub(crate) lbtc_user_lockup_tx_id: Option, /// LBTC tx initiated by the SDK to itself, in case the initial funds have to be refunded. - lbtc_refund_tx_id: Option, + pub(crate) lbtc_refund_tx_id: Option, /// BTC tx locking up funds by the swapper - btc_server_lockup_tx_id: Option, + pub(crate) btc_server_lockup_tx_id: Option, /// BTC tx that claims to the final BTC destination address. The final step in a successful swap. - btc_claim_tx_id: Option, + pub(crate) btc_claim_tx_id: Option, } impl PartialSwapState for RecoveredOnchainDataChainSend { - fn derive_partial_state(&self) -> PaymentState { + fn derive_partial_state(&self) -> Option { match &self.lbtc_user_lockup_tx_id { Some(_) => match &self.btc_claim_tx_id { Some(btc_claim_tx_id) => match btc_claim_tx_id.confirmed() { - true => PaymentState::Complete, - false => PaymentState::Pending, + true => Some(PaymentState::Complete), + false => Some(PaymentState::Pending), }, None => match &self.lbtc_refund_tx_id { Some(tx) => match tx.confirmed() { - true => PaymentState::Failed, - false => PaymentState::RefundPending, + true => Some(PaymentState::Failed), + false => Some(PaymentState::RefundPending), }, - None => PaymentState::Created, + None => Some(PaymentState::Pending), }, }, - // For Send swaps, no lockup could mean both Created or TimedOut. - // However, we're in Created for a very short period of time in the originating instance, - // after which we expect Pending or TimedOut. Therefore here we default to TimedOut. - None => PaymentState::TimedOut, + // We have no onchain data to support deriving the state as the swap could + // potentially be Created, TimedOut or Failed after expiry. In this case we return None. + None => None, } } } pub(crate) struct RecoveredOnchainDataChainReceive { /// LBTC tx locking up funds by the swapper - lbtc_server_lockup_tx_id: Option, + pub(crate) lbtc_server_lockup_tx_id: Option, /// LBTC tx that claims to our wallet. The final step in a successful swap. - lbtc_server_claim_tx_id: Option, + pub(crate) lbtc_claim_tx_id: Option, + /// LBTC tx out address for the claim tx. + pub(crate) lbtc_claim_address: Option, /// BTC tx initiated by the payer (the "user" as per Boltz), sending funds to the swap funding address. - btc_user_lockup_tx_id: Option, + pub(crate) btc_user_lockup_tx_id: Option, /// BTC tx initiated by the SDK to a user-chosen address, in case the initial funds have to be refunded. - btc_refund_tx_id: Option, + pub(crate) btc_refund_tx_id: Option, } impl PartialSwapState for RecoveredOnchainDataChainReceive { - fn derive_partial_state(&self) -> PaymentState { + fn derive_partial_state(&self) -> Option { match &self.btc_user_lockup_tx_id { - Some(_) => match &self.lbtc_server_claim_tx_id { - Some(lbtc_server_claim_tx_id) => match lbtc_server_claim_tx_id.confirmed() { - true => PaymentState::Complete, - false => PaymentState::Pending, + Some(_) => match &self.lbtc_claim_tx_id { + Some(lbtc_claim_tx_id) => match lbtc_claim_tx_id.confirmed() { + true => Some(PaymentState::Complete), + false => Some(PaymentState::Pending), }, None => match &self.btc_refund_tx_id { Some(tx) => match tx.confirmed() { - true => PaymentState::Failed, - false => PaymentState::RefundPending, + true => Some(PaymentState::Failed), + false => Some(PaymentState::RefundPending), }, - None => PaymentState::Created, + None => Some(PaymentState::Pending), }, }, - None => PaymentState::Created, + // We have no onchain data to support deriving the state as the swap could + // potentially be Created or Failed after expiry. In this case we return None. + None => None, } } } pub(crate) struct RecoveredOnchainData { - send: HashMap, - receive: HashMap, - chain_send: HashMap, - chain_receive: HashMap, + pub(crate) send: HashMap, + pub(crate) receive: HashMap, + pub(crate) chain_send: HashMap, + pub(crate) chain_receive: HashMap, } impl LiquidSdk { + pub(crate) async fn get_monitored_swaps_list(&self, partial_sync: bool) -> Result { + let receive_swaps = self.persister.list_ongoing_receive_swaps()?; + match partial_sync { + false => { + let bitcoin_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; + let liquid_height = self.liquid_chain_service.lock().await.tip().await?; + let final_swap_states = [PaymentState::Complete, PaymentState::Failed]; + + let send_swaps = self.persister.list_pending_and_ongoing_send_swaps()?; + let chain_swaps: Vec = self + .persister + .list_chain_swaps()? + .into_iter() + .filter(|swap| match swap.direction { + Direction::Incoming => { + bitcoin_height + <= swap.timeout_block_height + + CHAIN_SWAP_MONITORING_PERIOD_BITCOIN_BLOCKS + } + Direction::Outgoing => { + !final_swap_states.contains(&swap.state) + && liquid_height <= swap.timeout_block_height + } + }) + .collect(); + let (send_chain_swaps, receive_chain_swaps): (Vec, Vec) = + chain_swaps + .into_iter() + .partition(|swap| swap.direction == Direction::Outgoing); + SwapsList::all( + send_swaps, + receive_swaps, + send_chain_swaps, + receive_chain_swaps, + ) + } + true => SwapsList::receive_only(receive_swaps), + } + } + /// For each swap, recovers data from chain services. /// /// The returned data include txs and the partial swap state. See [PartialSwapState::derive_partial_state]. @@ -169,12 +222,14 @@ impl LiquidSdk { /// /// - `tx_map`: all known onchain txs of this wallet at this time, essentially our own LWK cache. /// - `swaps`: immutable data of the swaps for which we want to recover onchain data. + /// - `partial_sync`: recovers related scripts like MRH when true, otherwise recovers all scripts. pub(crate) async fn recover_from_onchain( &self, tx_map: TxMap, swaps: SwapsList, + partial_sync: bool, ) -> Result { - let histories = self.fetch_swaps_histories(&swaps).await?; + let histories = self.fetch_swaps_histories(&swaps, partial_sync).await?; let recovered_send_data = self .recover_send_swap_tx_ids(&tx_map, histories.send) @@ -186,14 +241,14 @@ impl LiquidSdk { .recover_send_chain_swap_tx_ids( &tx_map, histories.send_chain, - &swaps.send_chain_swap_immutable_db_by_swap_id, + &swaps.send_chain_swap_immutable_data_by_swap_id, ) .await?; let recovered_chain_receive_data = self .recover_receive_chain_swap_tx_ids( &tx_map, histories.receive_chain, - &swaps.receive_chain_swap_immutable_db_by_swap_id, + &swaps.receive_chain_swap_immutable_data_by_swap_id, ) .await?; @@ -205,7 +260,7 @@ impl LiquidSdk { }) } - /// Reconstruct Send Swap tx IDs from the onchain data and the immutable DB data + /// Reconstruct Send Swap tx IDs from the onchain data and the immutable data async fn recover_send_swap_tx_ids( &self, tx_map: &TxMap, @@ -213,6 +268,8 @@ impl LiquidSdk { ) -> Result> { let mut res: HashMap = HashMap::new(); for (swap_id, history) in send_histories_by_swap_id { + debug!("[Recover Send] Checking swap {swap_id}"); + // If a history tx is one of our outgoing txs, it's a lockup tx let lockup_tx_id = history .iter() @@ -248,7 +305,7 @@ impl LiquidSdk { Ok(res) } - /// Reconstruct Receive Swap tx IDs from the onchain data and the immutable DB data + /// Reconstruct Receive Swap tx IDs from the onchain data and the immutable data async fn recover_receive_swap_tx_ids( &self, tx_map: &TxMap, @@ -256,13 +313,25 @@ impl LiquidSdk { ) -> Result> { let mut res: HashMap = HashMap::new(); for (swap_id, history) in receive_histories_by_swap_id { - let (lockup_tx_id, claim_tx_id) = match history.len() { + debug!("[Recover Receive] Checking swap {swap_id}"); + + let mrh_tx_id = history + .lbtc_mrh_script_history + .iter() + .find(|&tx| tx_map.incoming_tx_map.contains_key::(&tx.txid)) + .cloned(); + let mrh_amount_sat = mrh_tx_id + .clone() + .and_then(|h| tx_map.incoming_tx_map.get(&h.txid)) + .map(|tx| tx.balance.values().sum::().unsigned_abs()); + + let (lockup_tx_id, claim_tx_id) = match history.lbtc_claim_script_history.len() { // Only lockup tx available - 1 => (Some(history[0].clone()), None), + 1 => (Some(history.lbtc_claim_script_history[0].clone()), None), 2 => { - let first = history[0].clone(); - let second = history[1].clone(); + let first = history.lbtc_claim_script_history[0].clone(); + let second = history.lbtc_claim_script_history[1].clone(); if tx_map.incoming_tx_map.contains_key::(&first.txid) { // If the first tx is a known incoming tx, it's the claim tx and the second is the lockup @@ -288,40 +357,51 @@ impl LiquidSdk { // If neither is confirmed, this is an edge-case (false, false) => { - error!("Found unconfirmed lockup and refund txs while recovering data for Receive Swap {swap_id}"); + warn!("Found unconfirmed lockup and refund txs while recovering data for Receive Swap {swap_id}"); (None, None) } } } } n => { - error!("Script history with unexpected length {n} found while recovering data for Receive Swap {swap_id}"); + warn!("Script history with length {n} found while recovering data for Receive Swap {swap_id}"); (None, None) } }; - res.insert( - swap_id, - RecoveredOnchainDataReceive { + // Take only the lockup_tx_id and claim_tx_id if either are set, + // otherwise take the mrh_tx_id and mrh_amount_sat + let recovered_onchain_data = match (lockup_tx_id.as_ref(), claim_tx_id.as_ref()) { + (Some(_), None) | (Some(_), Some(_)) => RecoveredOnchainDataReceive { lockup_tx_id, claim_tx_id, + mrh_tx_id: None, + mrh_amount_sat: None, }, - ); + _ => RecoveredOnchainDataReceive { + lockup_tx_id: None, + claim_tx_id: None, + mrh_tx_id, + mrh_amount_sat, + }, + }; + + res.insert(swap_id, recovered_onchain_data); } Ok(res) } - /// Reconstruct Chain Send Swap tx IDs from the onchain data and the immutable DB data + /// Reconstruct Chain Send Swap tx IDs from the onchain data and the immutable data async fn recover_send_chain_swap_tx_ids( &self, tx_map: &TxMap, chain_send_histories_by_swap_id: HashMap, - send_chain_swap_immutable_db_by_swap_id: &HashMap, + send_chain_swap_immutable_data_by_swap_id: &HashMap, ) -> Result> { let mut res: HashMap = HashMap::new(); for (swap_id, history) in chain_send_histories_by_swap_id { - info!("[Recover Chain Send] Checking swap {swap_id}"); + debug!("[Recover Chain Send] Checking swap {swap_id}"); // If a history tx is one of our outgoing txs, it's a lockup tx let lbtc_user_lockup_tx_id = history @@ -352,7 +432,7 @@ impl LiquidSdk { let first_tx_id = history.btc_claim_script_history[0].clone(); let second_tx_id = history.btc_claim_script_history[1].clone(); - let btc_lockup_script = send_chain_swap_immutable_db_by_swap_id + let btc_lockup_script = send_chain_swap_immutable_data_by_swap_id .get(&swap_id) .map(|imm| imm.claim_script.clone()) .ok_or_else(|| { @@ -371,7 +451,7 @@ impl LiquidSdk { } } n => { - error!("BTC script history with unexpected length {n} found while recovering data for Chain Send Swap {swap_id}"); + warn!("BTC script history with length {n} found while recovering data for Chain Send Swap {swap_id}"); (None, None) } }; @@ -390,23 +470,38 @@ impl LiquidSdk { Ok(res) } - /// Reconstruct Chain Receive Swap tx IDs from the onchain data and the immutable DB data + /// Reconstruct Chain Receive Swap tx IDs from the onchain data and the immutable data data async fn recover_receive_chain_swap_tx_ids( &self, tx_map: &TxMap, chain_receive_histories_by_swap_id: HashMap, - receive_chain_swap_immutable_db_by_swap_id: &HashMap, + receive_chain_swap_immutable_data_by_swap_id: &HashMap< + String, + ReceiveChainSwapImmutableData, + >, ) -> Result> { + let blinding_key = MasterBlindingKey::from_hex( + &self + .signer + .slip77_master_blinding_key()? + .to_lower_hex_string(), + )?; + let secp = secp256k1_zkp::Secp256k1::new(); + let mut res: HashMap = HashMap::new(); for (swap_id, history) in chain_receive_histories_by_swap_id { - info!("[Recover Chain Receive] Checking swap {swap_id}"); + debug!("[Recover Chain Receive] Checking swap {swap_id}"); - let (lbtc_server_lockup_tx_id, lbtc_server_claim_tx_id) = match history + let (lbtc_server_lockup_tx_id, lbtc_claim_tx_id, lbtc_claim_address) = match history .lbtc_claim_script_history .len() { // Only lockup tx available - 1 => (Some(history.lbtc_claim_script_history[0].clone()), None), + 1 => ( + Some(history.lbtc_claim_script_history[0].clone()), + None, + None, + ), 2 => { let first = &history.lbtc_claim_script_history[0]; @@ -418,11 +513,35 @@ impl LiquidSdk { true => (second, first), false => (first, second), }; - (Some(lockup_tx_id.clone()), Some(claim_tx_id.clone())) + + // Get the claim address from the claim tx output + let claim_address = tx_map + .incoming_tx_map + .get(&claim_tx_id.txid) + .and_then(|tx| { + tx.outputs + .iter() + .find(|output| output.is_some()) + .and_then(|output| output.clone().map(|o| o.script_pubkey)) + }) + .and_then(|script| { + ElementsAddress::from_script( + &script, + Some(blinding_key.blinding_key(&secp, &script)), + &AddressParams::LIQUID, + ) + .map(|addr| addr.to_string()) + }); + + ( + Some(lockup_tx_id.clone()), + Some(claim_tx_id.clone()), + claim_address, + ) } n => { - error!("L-BTC script history with unexpected length {n} found while recovering data for Chain Receive Swap {swap_id}"); - (None, None) + warn!("L-BTC script history with length {n} found while recovering data for Chain Receive Swap {swap_id}"); + (None, None, None) } }; @@ -445,7 +564,7 @@ impl LiquidSdk { let first_tx_id = history.btc_lockup_script_history[0].clone(); let second_tx_id = history.btc_lockup_script_history[1].clone(); - let btc_lockup_script = receive_chain_swap_immutable_db_by_swap_id + let btc_lockup_script = receive_chain_swap_immutable_data_by_swap_id .get(&swap_id) .map(|imm| imm.lockup_script.clone()) .ok_or_else(|| { @@ -466,14 +585,14 @@ impl LiquidSdk { } } n => { - error!("BTC script history with unexpected length {n} found while recovering data for Chain Receive Swap {swap_id}"); + warn!("BTC script history with length {n} found while recovering data for Chain Receive Swap {swap_id}"); (None, None) } }; // The second BTC tx is only a refund in case we didn't claim. // If we claimed, then the second BTC tx was an internal BTC server claim tx, which we're not tracking. - let btc_refund_tx_id = match lbtc_server_claim_tx_id.is_some() { + let btc_refund_tx_id = match lbtc_claim_tx_id.is_some() { true => None, false => btc_second_tx_id, }; @@ -482,7 +601,8 @@ impl LiquidSdk { swap_id, RecoveredOnchainDataChainReceive { lbtc_server_lockup_tx_id, - lbtc_server_claim_tx_id, + lbtc_claim_tx_id, + lbtc_claim_address, btc_user_lockup_tx_id, btc_refund_tx_id, }, @@ -493,14 +613,15 @@ impl LiquidSdk { } } -/// Methods to simulate the immutable DB data available from real-time sync +/// Methods to simulate the immutable data data available from real-time sync // TODO Remove once real-time sync is integrated pub(crate) mod immutable { use std::collections::HashMap; + use std::str::FromStr; use anyhow::{anyhow, ensure, Result}; - use boltz_client::{BtcSwapScript, LBtcSwapScript}; - use log::{error, info}; + use boltz_client::{BtcSwapScript, ElementsAddress, LBtcSwapScript}; + use log::{debug, error}; use lwk_wollet::elements::Txid; use lwk_wollet::History; @@ -511,7 +632,6 @@ pub(crate) mod immutable { type LBtcScript = lwk_wollet::elements::Script; pub(crate) type SendSwapHistory = Vec; - pub(crate) type ReceiveSwapHistory = Vec; #[derive(Clone)] pub(crate) struct HistoryTxId { @@ -544,15 +664,22 @@ pub(crate) mod immutable { #[derive(Clone)] pub(crate) struct SendSwapImmutableData { pub(crate) swap_id: String, - pub(crate) swap_script: LBtcSwapScript, - pub(crate) script: LBtcScript, + pub(crate) lockup_swap_script: LBtcSwapScript, + pub(crate) lockup_script: LBtcScript, } #[derive(Clone)] pub(crate) struct ReceiveSwapImmutableData { pub(crate) swap_id: String, - pub(crate) swap_script: LBtcSwapScript, - pub(crate) script: LBtcScript, + pub(crate) timeout_block_height: u32, + pub(crate) claim_swap_script: LBtcSwapScript, + pub(crate) claim_script: LBtcScript, + pub(crate) mrh_script: Option, + } + + pub(crate) struct ReceiveSwapHistory { + pub(crate) lbtc_claim_script_history: Vec, + pub(crate) lbtc_mrh_script_history: Vec, } #[derive(Clone)] @@ -585,24 +712,48 @@ pub(crate) mod immutable { pub(crate) btc_lockup_script_txs: Vec, } - /// Swap data received from the immutable DB + /// Swap immutable data pub(crate) struct SwapsList { - pub(crate) send_swap_immutable_db_by_swap_id: HashMap, - pub(crate) receive_swap_immutable_db_by_swap_id_: HashMap, - pub(crate) send_chain_swap_immutable_db_by_swap_id: + pub(crate) send_swap_immutable_data_by_swap_id: HashMap, + pub(crate) receive_swap_immutable_data_by_swap_id: + HashMap, + pub(crate) send_chain_swap_immutable_data_by_swap_id: HashMap, - pub(crate) receive_chain_swap_immutable_db_by_swap_id: + pub(crate) receive_chain_swap_immutable_data_by_swap_id: HashMap, } impl SwapsList { + pub(crate) fn all( + send_swaps: Vec, + receive_swaps: Vec, + send_chain_swaps: Vec, + receive_chain_swaps: Vec, + ) -> Result { + SwapsList::init( + send_swaps, + receive_swaps, + send_chain_swaps, + receive_chain_swaps, + ) + } + + pub(crate) fn receive_only(receive_swaps: Vec) -> Result { + SwapsList::init( + Default::default(), + receive_swaps, + Default::default(), + Default::default(), + ) + } + fn init( send_swaps: Vec, receive_swaps: Vec, send_chain_swaps: Vec, receive_chain_swaps: Vec, ) -> Result { - let send_swap_immutable_db_by_swap_id: HashMap = + let send_swap_immutable_data_by_swap_id: HashMap = send_swaps .iter() .filter_map(|swap| match swap.get_swap_script() { @@ -611,8 +762,8 @@ pub(crate) mod immutable { swap.id.clone(), SendSwapImmutableData { swap_id: swap.id.clone(), - swap_script: swap_script.clone(), - script: address.script_pubkey(), + lockup_swap_script: swap_script.clone(), + lockup_script: address.script_pubkey(), }, )), None => { @@ -626,29 +777,29 @@ pub(crate) mod immutable { } }) .collect(); - let send_swap_immutable_db_size = send_swap_immutable_db_by_swap_id.len(); - info!("Send Swap immutable DB: {send_swap_immutable_db_size} rows"); - - let receive_swap_immutable_db_by_swap_id_: HashMap = + let receive_swap_immutable_data_by_swap_id: HashMap = receive_swaps .iter() .filter_map(|swap| { let swap_id = &swap.id; - + let create_response = swap.get_boltz_create_response().ok()?; let swap_script = swap .get_swap_script() - .map_err(|e| { + .inspect_err(|e| { error!("Failed to get swap script for Receive Swap {swap_id}: {e}") }) .ok()?; + let mrh_address = ElementsAddress::from_str(&swap.mrh_address).ok(); match &swap_script.funding_addrs { Some(address) => Some(( swap.id.clone(), ReceiveSwapImmutableData { swap_id: swap.id.clone(), - swap_script: swap_script.clone(), - script: address.script_pubkey(), + timeout_block_height: create_response.timeout_block_height, + claim_swap_script: swap_script.clone(), + claim_script: address.script_pubkey(), + mrh_script: mrh_address.map(|s| s.script_pubkey()), }, )), None => { @@ -658,10 +809,7 @@ pub(crate) mod immutable { } }) .collect(); - let receive_swap_immutable_db_size = receive_swap_immutable_db_by_swap_id_.len(); - info!("Receive Swap immutable DB: {receive_swap_immutable_db_size} rows"); - - let send_chain_swap_immutable_db_by_swap_id: HashMap = + let send_chain_swap_immutable_data_by_swap_id: HashMap = send_chain_swaps.iter().filter_map(|swap| { let swap_id = &swap.id; @@ -694,10 +842,7 @@ pub(crate) mod immutable { } }) .collect(); - let send_chain_swap_immutable_db_size = send_chain_swap_immutable_db_by_swap_id.len(); - info!("Send Chain Swap immutable DB: {send_chain_swap_immutable_db_size} rows"); - - let receive_chain_swap_immutable_db_by_swap_id: HashMap = + let receive_chain_swap_immutable_data_by_swap_id: HashMap = receive_chain_swaps.iter().filter_map(|swap| { let swap_id = &swap.id; @@ -728,23 +873,34 @@ pub(crate) mod immutable { } }) .collect(); - let receive_chain_swap_immutable_db_size = - receive_chain_swap_immutable_db_by_swap_id.len(); - info!("Receive Chain Swap immutable DB: {receive_chain_swap_immutable_db_size} rows"); + + let send_swap_immutable_data_size = send_swap_immutable_data_by_swap_id.len(); + let receive_swap_immutable_data_size = receive_swap_immutable_data_by_swap_id.len(); + let send_chain_swap_immutable_data_size = + send_chain_swap_immutable_data_by_swap_id.len(); + let receive_chain_swap_immutable_data_size = + receive_chain_swap_immutable_data_by_swap_id.len(); + debug!( + "Immutable data items: send {}, receive {}, chain send {}, chain receive {}", + send_swap_immutable_data_size, + receive_swap_immutable_data_size, + send_chain_swap_immutable_data_size, + receive_chain_swap_immutable_data_size + ); Ok(SwapsList { - send_swap_immutable_db_by_swap_id, - receive_swap_immutable_db_by_swap_id_, - send_chain_swap_immutable_db_by_swap_id, - receive_chain_swap_immutable_db_by_swap_id, + send_swap_immutable_data_by_swap_id, + receive_swap_immutable_data_by_swap_id, + send_chain_swap_immutable_data_by_swap_id, + receive_chain_swap_immutable_data_by_swap_id, }) } fn send_swaps_by_script(&self) -> HashMap { - self.send_swap_immutable_db_by_swap_id + self.send_swap_immutable_data_by_swap_id .clone() .into_values() - .map(|imm| (imm.script.clone(), imm)) + .map(|imm| (imm.lockup_script.clone(), imm)) .collect() } @@ -765,11 +921,19 @@ pub(crate) mod immutable { data } - fn receive_swaps_by_script(&self) -> HashMap { - self.receive_swap_immutable_db_by_swap_id_ + fn receive_swaps_by_claim_script(&self) -> HashMap { + self.receive_swap_immutable_data_by_swap_id .clone() .into_values() - .map(|imm| (imm.script.clone(), imm)) + .map(|imm| (imm.claim_script.clone(), imm)) + .collect() + } + + fn receive_swaps_by_mrh_script(&self) -> HashMap { + self.receive_swap_immutable_data_by_swap_id + .clone() + .into_values() + .filter_map(|imm| imm.mrh_script.clone().map(|mrh_script| (mrh_script, imm))) .collect() } @@ -777,14 +941,57 @@ pub(crate) mod immutable { &self, lbtc_script_to_history_map: &HashMap>, ) -> HashMap { - let receive_swaps_by_script = self.receive_swaps_by_script(); + let receive_swaps_by_claim_script = self.receive_swaps_by_claim_script(); + let receive_swaps_by_mrh_script = self.receive_swaps_by_mrh_script(); let mut data: HashMap = HashMap::new(); lbtc_script_to_history_map .iter() .for_each(|(lbtc_script, lbtc_script_history)| { - if let Some(imm) = receive_swaps_by_script.get(lbtc_script) { - data.insert(imm.swap_id.clone(), lbtc_script_history.clone()); + if let Some(imm) = receive_swaps_by_claim_script.get(lbtc_script) { + // The MRH script history filtered by the swap timeout block height + let mrh_script_history = imm + .mrh_script + .clone() + .and_then(|mrh_script| { + lbtc_script_to_history_map.get(&mrh_script).map(|h| { + h.iter() + .filter(|&tx_history| { + tx_history.height < imm.timeout_block_height as i32 + }) + .cloned() + .collect::>() + }) + }) + .unwrap_or_default(); + data.insert( + imm.swap_id.clone(), + ReceiveSwapHistory { + lbtc_claim_script_history: lbtc_script_history.clone(), + lbtc_mrh_script_history: mrh_script_history, + }, + ); + } + if let Some(imm) = receive_swaps_by_mrh_script.get(lbtc_script) { + let claim_script_history = lbtc_script_to_history_map + .get(&imm.claim_script) + .cloned() + .unwrap_or_default(); + // The MRH script history filtered by the swap timeout block height + let mrh_script_history = lbtc_script_history + .iter() + .filter(|&tx_history| { + tx_history.height < imm.timeout_block_height as i32 + }) + .cloned() + .collect::>(); + data.insert( + imm.swap_id.clone(), + ReceiveSwapHistory { + lbtc_claim_script_history: claim_script_history, + lbtc_mrh_script_history: mrh_script_history, + }, + ); } }); data @@ -793,7 +1000,7 @@ pub(crate) mod immutable { fn send_chain_swaps_by_lbtc_lockup_script( &self, ) -> HashMap { - self.send_chain_swap_immutable_db_by_swap_id + self.send_chain_swap_immutable_data_by_swap_id .clone() .into_values() .map(|imm| (imm.lockup_script.clone(), imm)) @@ -838,7 +1045,7 @@ pub(crate) mod immutable { fn receive_chain_swaps_by_lbtc_claim_script( &self, ) -> HashMap { - self.receive_chain_swap_immutable_db_by_swap_id + self.receive_chain_swap_immutable_data_by_swap_id .clone() .into_values() .map(|imm| (imm.claim_script.clone(), imm)) @@ -881,55 +1088,65 @@ pub(crate) mod immutable { data } - fn get_all_swap_lbtc_scripts(&self) -> Vec { - let send_swap_scripts: Vec = self - .send_swap_immutable_db_by_swap_id - .clone() - .into_values() - .map(|imm| imm.script) - .collect(); - let receive_swap_scripts: Vec = self - .receive_swap_immutable_db_by_swap_id_ - .clone() - .into_values() - .map(|imm| imm.script) - .collect(); - let send_chain_swap_lbtc_lockup_scripts: Vec = self - .send_chain_swap_immutable_db_by_swap_id - .clone() - .into_values() - .map(|imm| imm.lockup_script) - .collect(); - let receive_chain_swap_lbtc_claim_scripts: Vec = self - .receive_chain_swap_immutable_db_by_swap_id + fn get_swap_lbtc_scripts(&self, partial_sync: bool) -> Vec { + let receive_swap_lbtc_mrh_scripts: Vec = self + .receive_swap_immutable_data_by_swap_id .clone() .into_values() - .map(|imm| imm.claim_script) + .filter_map(|imm| imm.mrh_script) .collect(); - - let mut swap_scripts = send_swap_scripts.clone(); - swap_scripts.extend(receive_swap_scripts.clone()); - swap_scripts.extend(send_chain_swap_lbtc_lockup_scripts.clone()); - swap_scripts.extend(receive_chain_swap_lbtc_claim_scripts.clone()); + let mut swap_scripts = receive_swap_lbtc_mrh_scripts.clone(); + if !partial_sync { + let send_swap_scripts: Vec = self + .send_swap_immutable_data_by_swap_id + .clone() + .into_values() + .map(|imm| imm.lockup_script) + .collect(); + let receive_swap_lbtc_claim_scripts: Vec = self + .receive_swap_immutable_data_by_swap_id + .clone() + .into_values() + .map(|imm| imm.claim_script) + .collect(); + let send_chain_swap_lbtc_lockup_scripts: Vec = self + .send_chain_swap_immutable_data_by_swap_id + .clone() + .into_values() + .map(|imm| imm.lockup_script) + .collect(); + let receive_chain_swap_lbtc_claim_scripts: Vec = self + .receive_chain_swap_immutable_data_by_swap_id + .clone() + .into_values() + .map(|imm| imm.claim_script) + .collect(); + swap_scripts.extend(send_swap_scripts.clone()); + swap_scripts.extend(receive_swap_lbtc_claim_scripts.clone()); + swap_scripts.extend(send_chain_swap_lbtc_lockup_scripts.clone()); + swap_scripts.extend(receive_chain_swap_lbtc_claim_scripts.clone()); + } swap_scripts } - fn get_all_swap_btc_scripts(&self) -> Vec { - let send_chain_swap_btc_claim_scripts: Vec = self - .send_chain_swap_immutable_db_by_swap_id - .clone() - .into_values() - .map(|imm| imm.claim_script) - .collect(); - let receive_chain_swap_btc_lockup_scripts: Vec = self - .receive_chain_swap_immutable_db_by_swap_id - .clone() - .into_values() - .map(|imm| imm.lockup_script) - .collect(); - - let mut swap_scripts = send_chain_swap_btc_claim_scripts.clone(); - swap_scripts.extend(receive_chain_swap_btc_lockup_scripts.clone()); + fn get_swap_btc_scripts(&self, partial_sync: bool) -> Vec { + let mut swap_scripts = vec![]; + if !partial_sync { + let send_chain_swap_btc_claim_scripts: Vec = self + .send_chain_swap_immutable_data_by_swap_id + .clone() + .into_values() + .map(|imm| imm.claim_script) + .collect(); + let receive_chain_swap_btc_lockup_scripts: Vec = self + .receive_chain_swap_immutable_data_by_swap_id + .clone() + .into_values() + .map(|imm| imm.lockup_script) + .collect(); + swap_scripts.extend(send_chain_swap_btc_claim_scripts.clone()); + swap_scripts.extend(receive_chain_swap_btc_lockup_scripts.clone()); + } swap_scripts } } @@ -951,7 +1168,7 @@ pub(crate) mod immutable { .into_iter() .partition(|swap| swap.direction == Direction::Outgoing); - SwapsList::init( + SwapsList::all( send_swaps, receive_swaps, send_chain_swaps, @@ -963,8 +1180,9 @@ pub(crate) mod immutable { pub(crate) async fn fetch_swaps_histories( &self, swaps_list: &SwapsList, + partial_sync: bool, ) -> Result { - let swap_lbtc_scripts = swaps_list.get_all_swap_lbtc_scripts(); + let swap_lbtc_scripts = swaps_list.get_swap_lbtc_scripts(partial_sync); let lbtc_script_histories = self .liquid_chain_service @@ -985,7 +1203,7 @@ pub(crate) mod immutable { .map(|(k, v)| (k, v.into_iter().map(HistoryTxId::from).collect())) .collect(); - let swap_btc_scripts = swaps_list.get_all_swap_btc_scripts(); + let swap_btc_scripts = swaps_list.get_swap_btc_scripts(partial_sync); let btc_script_histories = self .bitcoin_chain_service .lock() diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index 85ef01bd2..bfa2f85f9 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1,4 +1,3 @@ -use std::collections::HashMap; use std::time::Instant; use std::{fs, path::PathBuf, str::FromStr, sync::Arc, time::Duration}; @@ -13,11 +12,13 @@ use futures_util::{StreamExt, TryFutureExt}; use lnurl::auth::SdkLnurlAuthSigner; use log::{debug, error, info, warn}; use lwk_wollet::bitcoin::base64::Engine as _; -use lwk_wollet::elements::{AssetId, Txid}; +use lwk_wollet::elements::AssetId; use lwk_wollet::elements_miniscript::elements::bitcoin::bip32::Xpub; use lwk_wollet::hashes::{sha256, Hash}; use lwk_wollet::secp256k1::ThirtyTwoByteHash; -use lwk_wollet::{ElementsNetwork, WalletTx}; +use lwk_wollet::ElementsNetwork; +use restore::immutable::HistoryTxId; +use restore::{PartialSwapState, TxMap}; use sdk_common::bitcoin::hashes::hex::ToHex; use sdk_common::input_parser::InputType; use sdk_common::liquid::LiquidAddressData; @@ -284,23 +285,6 @@ impl LiquidSdk { /// /// Internal method. Should only be used as part of [LiquidSdk::start]. async fn start_background_tasks(self: &Arc) -> SdkResult<()> { - // Periodically run sync() in the background - let sdk_clone = self.clone(); - let mut shutdown_rx_sync_loop = self.shutdown_receiver.clone(); - tokio::spawn(async move { - loop { - _ = sdk_clone.sync().await; - - tokio::select! { - _ = tokio::time::sleep(Duration::from_secs(30)) => {} - _ = shutdown_rx_sync_loop.changed() => { - info!("Received shutdown signal, exiting periodic sync loop"); - return; - } - } - } - }); - let reconnect_handler = Box::new(SwapperReconnectHandler::new( self.persister.clone(), self.status_stream.clone(), @@ -309,16 +293,12 @@ impl LiquidSdk { .clone() .start(reconnect_handler, self.shutdown_receiver.clone()) .await; - self.chain_swap_handler - .clone() - .start(self.shutdown_receiver.clone()) - .await; self.sync_service .clone() .start(self.shutdown_receiver.clone()) .await?; + self.track_new_blocks().await; self.track_swap_updates().await; - self.track_pending_swaps().await; Ok(()) } @@ -341,6 +321,61 @@ impl LiquidSdk { Ok(()) } + async fn track_new_blocks(self: &Arc) { + let cloned = self.clone(); + tokio::spawn(async move { + let mut current_liquid_block: u32 = 0; + let mut current_bitcoin_block: u32 = 0; + let mut shutdown_receiver = cloned.shutdown_receiver.clone(); + let mut interval = tokio::time::interval(Duration::from_secs(10)); + interval.set_missed_tick_behavior(MissedTickBehavior::Skip); + loop { + tokio::select! { + _ = interval.tick() => { + // Get the Liquid tip and process a new block + let liquid_tip_res = cloned.liquid_chain_service.lock().await.tip().await; + match liquid_tip_res { + Ok(height) => { + debug!("Got Liquid tip: {height}"); + if height > current_liquid_block { + // Sync on new Liquid block + _ = cloned.sync().await; + // Update swap handlers + cloned.chain_swap_handler.on_liquid_block(height).await; + cloned.send_swap_handler.on_liquid_block(height).await; + current_liquid_block = height; + } else { + // Partial sync of wallet txs + _ = cloned.sync_payments_with_chain_data(true).await; + } + }, + Err(e) => error!("Failed to fetch Liquid tip {e}"), + }; + // Get the Bitcoin tip and process a new block + let bitcoin_tip_res = cloned.bitcoin_chain_service.lock().await.tip().map(|tip| tip.height as u32); + match bitcoin_tip_res { + Ok(height) => { + debug!("Got Bitcoin tip: {height}"); + if height > current_bitcoin_block { + // Update swap handlers + cloned.chain_swap_handler.on_bitcoin_block(height).await; + cloned.send_swap_handler.on_bitcoin_block(height).await; + current_bitcoin_block = height; + } + }, + Err(e) => error!("Failed to fetch Bitcoin tip {e}"), + }; + } + + _ = shutdown_receiver.changed() => { + info!("Received shutdown signal, exiting track blocks loop"); + return; + } + } + } + }); + } + async fn track_swap_updates(self: &Arc) { let cloned = self.clone(); tokio::spawn(async move { @@ -399,31 +434,6 @@ impl LiquidSdk { }); } - async fn track_pending_swaps(self: &Arc) { - let cloned = self.clone(); - tokio::spawn(async move { - let mut shutdown_receiver = cloned.shutdown_receiver.clone(); - let mut interval = tokio::time::interval(Duration::from_secs(60)); - interval.set_missed_tick_behavior(MissedTickBehavior::Skip); - loop { - tokio::select! { - _ = interval.tick() => { - if let Err(err) = cloned.send_swap_handler.track_refunds().await { - warn!("Could not refund expired swaps, error: {err:?}"); - } - if let Err(err) = cloned.chain_swap_handler.track_refunds_and_refundables().await { - warn!("Could not refund expired swaps, error: {err:?}"); - } - }, - _ = shutdown_receiver.changed() => { - info!("Received shutdown signal, exiting pending swaps loop"); - return; - } - } - } - }); - } - async fn notify_event_listeners(&self, e: SdkEvent) -> Result<()> { self.event_manager.notify(e).await; Ok(()) @@ -463,25 +473,42 @@ impl LiquidSdk { Pending => { match &payment.details.get_swap_id() { Some(swap_id) => match self.persister.fetch_swap_by_id(swap_id)? { - Swap::Chain(ChainSwap { claim_tx_id, .. }) - | Swap::Receive(ReceiveSwap { claim_tx_id, .. }) => { - match claim_tx_id { - Some(_) => { - // The claim tx has now been broadcast - self.notify_event_listeners( - SdkEvent::PaymentWaitingConfirmation { - details: payment, - }, - ) - .await? - } - None => { - // The lockup tx is in the mempool/confirmed - self.notify_event_listeners( - SdkEvent::PaymentPending { details: payment }, - ) - .await? - } + Swap::Chain(ChainSwap { claim_tx_id, .. }) => { + if claim_tx_id.is_some() { + // The claim tx has now been broadcast + self.notify_event_listeners( + SdkEvent::PaymentWaitingConfirmation { + details: payment, + }, + ) + .await? + } else { + // The lockup tx is in the mempool/confirmed + self.notify_event_listeners(SdkEvent::PaymentPending { + details: payment, + }) + .await? + } + } + Swap::Receive(ReceiveSwap { + claim_tx_id, + mrh_tx_id, + .. + }) => { + if claim_tx_id.is_some() || mrh_tx_id.is_some() { + // The a claim or mrh tx has now been broadcast + self.notify_event_listeners( + SdkEvent::PaymentWaitingConfirmation { + details: payment, + }, + ) + .await? + } else { + // The lockup tx is in the mempool/confirmed + self.notify_event_listeners(SdkEvent::PaymentPending { + details: payment, + }) + .await? } } Swap::Send(_) => { @@ -1605,7 +1632,11 @@ impl LiquidSdk { debug!("Timeout occurred without payment, set swap to timed out"); match swap { Swap::Send(_) => self.send_swap_handler.update_swap_info(&expected_swap_id, TimedOut, None, None, None).await?, - Swap::Chain(_) => self.chain_swap_handler.update_swap_info(&expected_swap_id, TimedOut, None, None, None, None).await?, + Swap::Chain(_) => self.chain_swap_handler.update_swap_info(&ChainSwapUpdate { + swap_id: expected_swap_id, + to_state: TimedOut, + ..Default::default() + }).await?, _ => () } return Err(PaymentError::PaymentTimeout) @@ -1901,9 +1932,7 @@ impl LiquidSdk { receiver_amount_sat, claim_fees_sat: reverse_pair.fees.claim_estimate(), claim_tx_id: None, - lockup_tx_id: None, mrh_address: mrh_addr_str, - mrh_script_pubkey: mrh_addr.to_unconfidential().script_pubkey().to_hex(), mrh_tx_id: None, created_at: utils::now(), state: PaymentState::Created, @@ -2120,8 +2149,14 @@ impl LiquidSdk { /// (within last [CHAIN_SWAP_MONITORING_PERIOD_BITCOIN_BLOCKS] blocks = ~30 days), calling this /// is not necessary as it happens automatically in the background. pub async fn rescan_onchain_swaps(&self) -> SdkResult<()> { + let height = self + .bitcoin_chain_service + .lock() + .await + .tip() + .map(|tip| tip.height as u32)?; self.chain_swap_handler - .rescan_incoming_user_lockup_txs(true) + .rescan_incoming_refunds(height, true) .await?; Ok(()) } @@ -2204,136 +2239,189 @@ impl LiquidSdk { /// This method fetches the chain tx data (onchain and mempool) using LWK. For every wallet tx, /// it inserts or updates a corresponding entry in our Payments table. - async fn sync_payments_with_chain_data(&self, with_scan: bool) -> Result<()> { - let payments_before_sync: HashMap = self - .list_payments(&ListPaymentsRequest::default()) - .await? - .into_iter() - .flat_map(|payment| { - // Index payments by both tx_id (lockup/claim) and refund_tx_id - let mut res = vec![]; - if let Some(tx_id) = payment.tx_id.clone() { - res.push((tx_id, payment.clone())); - } - if let Some(refund_tx_id) = payment.get_refund_tx_id() { - res.push((refund_tx_id, payment)); + async fn sync_payments_with_chain_data(&self, partial_sync: bool) -> Result<()> { + self.onchain_wallet.full_scan().await?; + + let mut tx_map = self.onchain_wallet.transactions_by_tx_id().await?; + let swaps_list = self.get_monitored_swaps_list(partial_sync).await?; + let recovered_onchain_data = self + .recover_from_onchain( + TxMap::from_raw_tx_map(tx_map.clone()), + swaps_list, + partial_sync, + ) + .await?; + + let wallet_amount_sat = tx_map + .values() + .map(|tx| tx.balance.values().sum::()) + .sum::(); + debug!("Onchain wallet balance: {wallet_amount_sat} sats"); + + // Loop over the recovered chain data for monitored receive swaps + for (swap_id, receive_data) in recovered_onchain_data.receive { + if let Some(to_state) = receive_data.derive_partial_state() { + let lockup_tx_id = receive_data.lockup_tx_id.map(|h| h.txid.to_string()); + let claim_tx_id = receive_data.claim_tx_id.clone().map(|h| h.txid.to_string()); + let mrh_tx_id = receive_data.mrh_tx_id.clone().map(|h| h.txid.to_string()); + let history_updates = vec![receive_data.claim_tx_id, receive_data.mrh_tx_id] + .into_iter() + .flatten() + .collect::>(); + for history in history_updates { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } } - res - }) - .collect(); - if with_scan { - self.onchain_wallet.full_scan().await?; + _ = self + .receive_swap_handler + .update_swap_info( + &swap_id, + to_state, + claim_tx_id.as_deref(), + lockup_tx_id.as_deref(), + mrh_tx_id.as_deref(), + receive_data.mrh_amount_sat, + ) + .await; + } } - let pending_receive_swaps_by_claim_tx_id = - self.persister.list_pending_receive_swaps_by_claim_tx_id()?; - let ongoing_receive_swaps_by_mrh_script_pubkey = self - .persister - .list_ongoing_receive_swaps_by_mrh_script_pubkey()?; - let pending_send_swaps_by_refund_tx_id = - self.persister.list_pending_send_swaps_by_refund_tx_id()?; - let pending_chain_swaps_by_claim_tx_id = - self.persister.list_pending_chain_swaps_by_claim_tx_id()?; - let pending_chain_swaps_by_refund_tx_id = - self.persister.list_pending_chain_swaps_by_refund_tx_id()?; - - let tx_map: HashMap = self - .onchain_wallet - .transactions() - .await? - .iter() - .map(|tx| (tx.txid, tx.clone())) - .collect(); - - for tx in tx_map.values() { - let tx_id = tx.txid.to_string(); - let is_tx_confirmed = tx.height.is_some(); - let amount_sat = tx.balance.values().sum::(); - let maybe_script_pubkey = tx - .outputs - .iter() - .find(|output| output.is_some()) - .and_then(|output| output.clone().map(|o| o.script_pubkey.to_hex())); - let mrh_script_pubkey = maybe_script_pubkey.clone().unwrap_or_default(); - - self.persister.insert_or_update_payment( - PaymentTxData { - tx_id: tx_id.clone(), - timestamp: tx.timestamp, - amount_sat: amount_sat.unsigned_abs(), - fees_sat: tx.fee, - payment_type: match amount_sat >= 0 { - true => PaymentType::Receive, - false => PaymentType::Send, - }, - is_confirmed: is_tx_confirmed, - }, - maybe_script_pubkey, - None, - )?; - - if let Some(swap) = pending_receive_swaps_by_claim_tx_id.get(&tx_id) { - if is_tx_confirmed { - self.receive_swap_handler - .update_swap_info(&swap.id, Complete, None, None, None, None) - .await?; + // Loop over the recovered chain data for monitored send swaps + for (swap_id, send_data) in recovered_onchain_data.send { + if let Some(to_state) = send_data.derive_partial_state() { + let lockup_tx_id = send_data.lockup_tx_id.clone().map(|h| h.txid.to_string()); + let refund_tx_id = send_data.refund_tx_id.clone().map(|h| h.txid.to_string()); + let history_updates = vec![send_data.lockup_tx_id, send_data.refund_tx_id] + .into_iter() + .flatten() + .collect::>(); + for history in history_updates { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } } - } else if let Some(swap) = - ongoing_receive_swaps_by_mrh_script_pubkey.get(&mrh_script_pubkey) - { - // Update the swap status according to the MRH tx confirmation state - let to_state = match is_tx_confirmed { - true => Complete, - false => Pending, - }; - self.receive_swap_handler + _ = self + .send_swap_handler .update_swap_info( - &swap.id, + &swap_id, to_state, None, - None, - Some(&tx_id), - Some(amount_sat.unsigned_abs()), + lockup_tx_id.as_deref(), + refund_tx_id.as_deref(), ) - .await?; - // Remove the used MRH address from the reserved addresses - self.persister.delete_reserved_address(&swap.mrh_address)?; - } else if let Some(swap) = pending_send_swaps_by_refund_tx_id.get(&tx_id) { - if is_tx_confirmed { - self.send_swap_handler - .update_swap_info(&swap.id, Failed, None, None, None) - .await?; - } - } else if let Some(swap) = pending_chain_swaps_by_claim_tx_id.get(&tx_id) { - if is_tx_confirmed { - self.chain_swap_handler - .update_swap_info(&swap.id, Complete, None, None, None, None) - .await?; + .await; + } + } + + // Loop over the recovered chain data for monitored chain receive swaps + for (swap_id, chain_receive_data) in recovered_onchain_data.chain_receive { + if let Some(to_state) = chain_receive_data.derive_partial_state() { + let server_lockup_tx_id = chain_receive_data + .lbtc_server_lockup_tx_id + .map(|h| h.txid.to_string()); + let user_lockup_tx_id = chain_receive_data + .btc_user_lockup_tx_id + .map(|h| h.txid.to_string()); + let claim_tx_id = chain_receive_data + .lbtc_claim_tx_id + .clone() + .map(|h| h.txid.to_string()); + let refund_tx_id = chain_receive_data + .btc_refund_tx_id + .map(|h| h.txid.to_string()); + if let Some(history) = chain_receive_data.lbtc_claim_tx_id { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } } - } else if let Some(swap) = pending_chain_swaps_by_refund_tx_id.get(&tx_id) { - if is_tx_confirmed { - self.chain_swap_handler - .update_swap_info(&swap.id, Failed, None, None, None, None) - .await?; + _ = self + .chain_swap_handler + .update_swap_info(&ChainSwapUpdate { + swap_id, + to_state, + server_lockup_tx_id, + user_lockup_tx_id, + claim_address: chain_receive_data.lbtc_claim_address, + claim_tx_id, + refund_tx_id, + }) + .await; + } + } + + // Loop over the recovered chain data for monitored chain send swaps + for (swap_id, chain_send_data) in recovered_onchain_data.chain_send { + if let Some(to_state) = chain_send_data.derive_partial_state() { + let server_lockup_tx_id = chain_send_data + .btc_server_lockup_tx_id + .map(|h| h.txid.to_string()); + let user_lockup_tx_id = chain_send_data + .lbtc_user_lockup_tx_id + .clone() + .map(|h| h.txid.to_string()); + let claim_tx_id = chain_send_data.btc_claim_tx_id.map(|h| h.txid.to_string()); + let refund_tx_id = chain_send_data + .lbtc_refund_tx_id + .clone() + .map(|h| h.txid.to_string()); + let history_updates = vec![ + chain_send_data.lbtc_user_lockup_tx_id, + chain_send_data.lbtc_refund_tx_id, + ] + .into_iter() + .flatten() + .collect::>(); + for history in history_updates { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } } - } else { - // Payments that are not directly associated with a swap - match payments_before_sync.get(&tx_id) { - None => { - // A completely new payment brought in by this sync, in mempool or confirmed - // Covers events: - // - onchain Receive Pending and Complete - // - onchain Send Complete + _ = self + .chain_swap_handler + .update_swap_info(&ChainSwapUpdate { + swap_id, + to_state, + server_lockup_tx_id, + user_lockup_tx_id, + claim_address: None, + claim_tx_id, + refund_tx_id, + }) + .await; + } + } + + let payments = self + .persister + .get_payments_by_tx_id(&ListPaymentsRequest::default())?; + + for tx in tx_map.values() { + let tx_id = tx.txid.to_string(); + let maybe_payment = payments.get(&tx_id); + match maybe_payment { + // When no payment is found or its a Liquid payment + None + | Some(Payment { + details: PaymentDetails::Liquid { .. }, + .. + }) => { + let updated_needed = maybe_payment.map_or(true, |payment| { + payment.status == Pending && tx.height.is_some() + }); + if updated_needed { + // An unknown tx which needs inserting or a known Liquid payment tx + // that was in the mempool, but is now confirmed + self.persister.insert_or_update_payment_with_wallet_tx(tx)?; self.emit_payment_updated(Some(tx_id)).await?; } - Some(payment_before_sync) => { - if payment_before_sync.status == Pending && is_tx_confirmed { - // A know payment that was in the mempool, but is now confirmed - // Covers events: Send and Receive direct onchain payments transitioning to Complete - self.emit_payment_updated(Some(tx_id)).await?; - } - } } + + _ => {} } } @@ -2394,12 +2482,12 @@ impl LiquidSdk { match is_first_sync { true => { self.event_manager.pause_notifications(); - self.sync_payments_with_chain_data(true).await?; + self.sync_payments_with_chain_data(false).await?; self.event_manager.resume_notifications(); self.persister.set_is_first_sync_complete(true)?; } false => { - self.sync_payments_with_chain_data(true).await?; + self.sync_payments_with_chain_data(false).await?; } } let duration_ms = Instant::now().duration_since(t0).as_millis(); diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index f6d370bab..1d1307829 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -2,6 +2,7 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::{str::FromStr, sync::Arc}; use anyhow::{anyhow, Result}; +use async_trait::async_trait; use boltz_client::swaps::boltz; use boltz_client::swaps::{boltz::CreateSubmarineResponse, boltz::SubSwapStates}; use boltz_client::util::secrets::Preimage; @@ -14,7 +15,7 @@ use lwk_wollet::hashes::{sha256, Hash}; use tokio::sync::{broadcast, Mutex}; use crate::chain::liquid::LiquidChainService; -use crate::model::{Config, PaymentState::*, SendSwap}; +use crate::model::{BlockListener, Config, PaymentState::*, SendSwap}; use crate::prelude::{PaymentTxData, PaymentType, Swap}; use crate::swapper::Swapper; use crate::wallet::OnchainWallet; @@ -35,6 +36,17 @@ pub(crate) struct SendSwapHandler { subscription_notifier: broadcast::Sender, } +#[async_trait] +impl BlockListener for SendSwapHandler { + async fn on_bitcoin_block(&self, _height: u32) {} + + async fn on_liquid_block(&self, _height: u32) { + if let Err(err) = self.check_refunds().await { + warn!("Could not refund expired swaps, error: {err:?}"); + } + } +} + impl SendSwapHandler { pub(crate) fn new( config: Config, @@ -64,11 +76,7 @@ impl SendSwapHandler { let status = &update.status; let swap_state = SubSwapStates::from_str(status) .map_err(|_| anyhow!("Invalid SubSwapState for Send Swap {id}: {status}"))?; - let swap = self - .persister - .fetch_send_swap_by_id(id)? - .ok_or(anyhow!("No ongoing Send Swap found for ID {id}"))?; - + let swap = self.fetch_send_swap_by_id(id)?; info!("Handling Send Swap transition to {swap_state:?} for swap {id}"); // See https://docs.boltz.exchange/v/api/lifecycle#normal-submarine-swaps @@ -237,6 +245,15 @@ impl SendSwapHandler { Ok(lockup_tx) } + fn fetch_send_swap_by_id(&self, swap_id: &str) -> Result { + self.persister + .fetch_send_swap_by_id(swap_id) + .map_err(|_| PaymentError::PersistError)? + .ok_or(PaymentError::Generic { + err: format!("Send Swap not found {swap_id}"), + }) + } + /// Transitions a Send swap to a new state pub(crate) async fn update_swap_info( &self, @@ -246,17 +263,11 @@ impl SendSwapHandler { lockup_tx_id: Option<&str>, refund_tx_id: Option<&str>, ) -> Result<(), PaymentError> { - info!("Transitioning Send swap {swap_id} to {to_state:?} (lockup_tx_id = {lockup_tx_id:?}, refund_tx_id = {refund_tx_id:?})"); - - let swap: SendSwap = self - .persister - .fetch_send_swap_by_id(swap_id) - .map_err(|_| PaymentError::PersistError)? - .ok_or(PaymentError::Generic { - err: format!("Send Swap not found {swap_id}"), - })?; - let payment_id = lockup_tx_id.map(|c| c.to_string()).or(swap.lockup_tx_id); - + info!( + "Transitioning Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})", + swap_id, to_state, lockup_tx_id, refund_tx_id + ); + let swap = self.fetch_send_swap_by_id(swap_id)?; Self::validate_state_transition(swap.state, to_state)?; self.persister.try_handle_send_swap_update( swap_id, @@ -265,8 +276,14 @@ impl SendSwapHandler { lockup_tx_id, refund_tx_id, )?; - if let Some(payment_id) = payment_id { - let _ = self.subscription_notifier.send(payment_id); + let updated_swap = self.fetch_send_swap_by_id(swap_id)?; + + // Only notify subscribers if the swap changes + let payment_id = lockup_tx_id + .map(|c| c.to_string()) + .or(swap.lockup_tx_id.clone()); + if updated_swap != swap { + payment_id.and_then(|payment_id| self.subscription_notifier.send(payment_id).ok()); } Ok(()) } @@ -485,7 +502,7 @@ impl SendSwapHandler { // Attempts refunding all payments whose state is `RefundPending` and with no // refund_tx_id field present - pub(crate) async fn track_refunds(&self) -> Result<(), PaymentError> { + pub(crate) async fn check_refunds(&self) -> Result<(), PaymentError> { let pending_swaps = self.persister.list_pending_send_swaps()?; self.try_refund_all(&pending_swaps).await; Ok(()) diff --git a/lib/core/src/test_utils/persist.rs b/lib/core/src/test_utils/persist.rs index fcad33ef8..34e0a438a 100644 --- a/lib/core/src/test_utils/persist.rs +++ b/lib/core/src/test_utils/persist.rs @@ -101,9 +101,7 @@ pub(crate) fn new_receive_swap(payment_state: Option) -> ReceiveSw receiver_amount_sat: 587, claim_fees_sat: 200, claim_tx_id: None, - lockup_tx_id: None, mrh_address: "tlq1pq2amlulhea6ltq7x3eu9atsc2nnrer7yt7xve363zxedqwu2mk6ctcyv9awl8xf28cythreqklt5q0qqwsxzlm6wu4z6d574adl9zh2zmr0h85gt534n".to_string(), - mrh_script_pubkey: "tex1qnkznyyxwnxnkk0j94cnvq27h24jk6sqf0te55x".to_string(), mrh_tx_id: None, created_at: utils::now(), state: payment_state.unwrap_or(PaymentState::Created), diff --git a/lib/core/src/test_utils/wallet.rs b/lib/core/src/test_utils/wallet.rs index 3f1d32919..f9cc4d3ee 100644 --- a/lib/core/src/test_utils/wallet.rs +++ b/lib/core/src/test_utils/wallet.rs @@ -1,6 +1,6 @@ #![cfg(test)] -use std::str::FromStr; +use std::{collections::HashMap, str::FromStr}; use crate::{ error::PaymentError, @@ -13,7 +13,7 @@ use async_trait::async_trait; use boltz_client::{Keypair, Secp256k1}; use lazy_static::lazy_static; use lwk_wollet::{ - elements::{Address, Transaction}, + elements::{Address, Transaction, Txid}, elements_miniscript::ToPublicKey as _, secp256k1::Message, Tip, WalletTx, @@ -38,6 +38,10 @@ impl OnchainWallet for MockWallet { Ok(vec![]) } + async fn transactions_by_tx_id(&self) -> Result, PaymentError> { + Ok(Default::default()) + } + async fn build_tx( &self, _fee_rate: Option, diff --git a/lib/core/src/wallet.rs b/lib/core/src/wallet.rs index e66c9ad31..ab17510a1 100644 --- a/lib/core/src/wallet.rs +++ b/lib/core/src/wallet.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::fs::{self, create_dir_all}; use std::io::Write; use std::path::PathBuf; @@ -9,6 +10,7 @@ use boltz_client::ElementsAddress; use log::{debug, warn}; use lwk_common::Signer as LwkSigner; use lwk_common::{singlesig_desc, Singlesig}; +use lwk_wollet::elements::Txid; use lwk_wollet::{ elements::{hex::ToHex, Address, Transaction}, ElectrumClient, ElectrumUrl, ElementsNetwork, FsPersister, Tip, WalletTx, Wollet, @@ -36,6 +38,9 @@ pub trait OnchainWallet: Send + Sync { /// List all transactions in the wallet async fn transactions(&self) -> Result, PaymentError>; + /// List all transactions in the wallet mapped by tx id + async fn transactions_by_tx_id(&self) -> Result, PaymentError>; + /// Build a transaction to send funds to a recipient async fn build_tx( &self, @@ -179,6 +184,17 @@ impl OnchainWallet for LiquidOnchainWallet { }) } + /// List all transactions in the wallet mapped by tx id + async fn transactions_by_tx_id(&self) -> Result, PaymentError> { + let tx_map: HashMap = self + .transactions() + .await? + .iter() + .map(|tx| (tx.txid, tx.clone())) + .collect(); + Ok(tx_map) + } + /// Build a transaction to send funds to a recipient async fn build_tx( &self, diff --git a/packages/flutter/lib/flutter_breez_liquid_bindings_generated.dart b/packages/flutter/lib/flutter_breez_liquid_bindings_generated.dart index 34e339463..116cda308 100644 --- a/packages/flutter/lib/flutter_breez_liquid_bindings_generated.dart +++ b/packages/flutter/lib/flutter_breez_liquid_bindings_generated.dart @@ -3826,6 +3826,26 @@ class FlutterBreezLiquidBindings { late final _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_hmac_sha256 = _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_hmac_sha256Ptr.asFunction(); + int uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_encrypt() { + return _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_encrypt(); + } + + late final _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_encryptPtr = + _lookup>( + 'uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_encrypt'); + late final _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_encrypt = + _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_encryptPtr.asFunction(); + + int uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_decrypt() { + return _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_decrypt(); + } + + late final _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_decryptPtr = + _lookup>( + 'uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_decrypt'); + late final _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_decrypt = + _uniffi_breez_sdk_liquid_bindings_checksum_method_signer_ecies_decryptPtr.asFunction(); + int ffi_breez_sdk_liquid_bindings_uniffi_contract_version() { return _ffi_breez_sdk_liquid_bindings_uniffi_contract_version(); }