From 8e04fe1e62e2be3e46e25a18beeb18d5877d6244 Mon Sep 17 00:00:00 2001 From: Ross Savage Date: Thu, 5 Dec 2024 13:14:06 +0100 Subject: [PATCH] Handle Recoverable state --- lib/core/src/chain_swap.rs | 10 +- lib/core/src/persist/receive.rs | 11 ++ lib/core/src/persist/send.rs | 4 +- lib/core/src/receive_swap.rs | 8 +- lib/core/src/restore.rs | 4 +- lib/core/src/sdk.rs | 314 ++++++++++++++++++++------------ lib/core/src/send_swap.rs | 8 +- 7 files changed, 227 insertions(+), 132 deletions(-) diff --git a/lib/core/src/chain_swap.rs b/lib/core/src/chain_swap.rs index 7c82feeca..e2aed6c1b 100644 --- a/lib/core/src/chain_swap.rs +++ b/lib/core/src/chain_swap.rs @@ -129,7 +129,7 @@ impl ChainSwapHandler { .persister .list_chain_swaps()? .into_iter() - .filter(|s| s.direction == Direction::Incoming) + .filter(|s| s.direction == Direction::Incoming && s.state != PaymentState::Recoverable) .collect(); info!( "Rescanning {} incoming Chain Swap(s) user lockup txs at height {}", @@ -745,7 +745,7 @@ impl ChainSwapHandler { pub(crate) async fn update_swap_info( &self, swap_update: &ChainSwapUpdate, - ) -> Result<(), PaymentError> { + ) -> Result<(PaymentState, PaymentState), PaymentError> { info!("Updating Chain swap {swap_update:?}"); let swap = self.fetch_chain_swap_by_id(&swap_update.swap_id)?; Self::validate_state_transition(swap.state, swap_update.to_state)?; @@ -768,7 +768,7 @@ impl ChainSwapHandler { if updated_swap != swap { payment_id.and_then(|payment_id| self.subscription_notifier.send(payment_id).ok()); } - Ok(()) + Ok((swap.state, updated_swap.state)) } async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> { @@ -1101,7 +1101,9 @@ impl ChainSwapHandler { to_state: PaymentState, ) -> Result<(), PaymentError> { match (from_state, to_state) { - (Recoverable, Pending | Refundable | RefundPending | Failed | Complete) => Ok(()), + (Recoverable, Created | Pending | Refundable | RefundPending | Failed | Complete) => { + Ok(()) + } (_, Recoverable) => Err(PaymentError::Generic { err: format!("Cannot transition from {from_state:?} to Recoverable state"), }), diff --git a/lib/core/src/persist/receive.rs b/lib/core/src/persist/receive.rs index 49e93b9dd..9f0f17439 100644 --- a/lib/core/src/persist/receive.rs +++ b/lib/core/src/persist/receive.rs @@ -174,6 +174,17 @@ impl Persister { self.list_receive_swaps_where(&con, where_clause) } + pub(crate) fn list_recoverable_receive_swaps(&self) -> Result> { + let con = self.get_connection()?; + let where_clause = vec![get_where_clause_state_in(&[ + PaymentState::Created, + PaymentState::Pending, + PaymentState::Recoverable, + ])]; + + self.list_receive_swaps_where(&con, where_clause) + } + // Only set the Receive Swap claim_tx_id if not set, otherwise return an error pub(crate) fn set_receive_swap_claim_tx_id( &self, diff --git a/lib/core/src/persist/send.rs b/lib/core/src/persist/send.rs index ca318b5fa..46924feb4 100644 --- a/lib/core/src/persist/send.rs +++ b/lib/core/src/persist/send.rs @@ -186,12 +186,12 @@ impl Persister { self.list_send_swaps_where(&con, where_clause) } - pub(crate) fn list_pending_and_ongoing_send_swaps(&self) -> Result> { + pub(crate) fn list_recoverable_send_swaps(&self) -> Result> { let con = self.get_connection()?; let where_clause = vec![get_where_clause_state_in(&[ - PaymentState::Created, PaymentState::Pending, PaymentState::RefundPending, + PaymentState::Recoverable, ])]; self.list_send_swaps_where(&con, where_clause) } diff --git a/lib/core/src/receive_swap.rs b/lib/core/src/receive_swap.rs index 7aa49b77d..dd905c8c4 100644 --- a/lib/core/src/receive_swap.rs +++ b/lib/core/src/receive_swap.rs @@ -250,7 +250,7 @@ impl ReceiveSwapHandler { lockup_tx_id: Option<&str>, mrh_tx_id: Option<&str>, mrh_amount_sat: Option, - ) -> Result<(), PaymentError> { + ) -> Result<(PaymentState, PaymentState), PaymentError> { info!( "Transitioning Receive swap {} to {:?} (claim_tx_id = {:?}, lockup_tx_id = {:?}, mrh_tx_id = {:?})", swap_id, to_state, claim_tx_id, lockup_tx_id, mrh_tx_id @@ -280,7 +280,7 @@ impl ReceiveSwapHandler { if updated_swap != swap { payment_id.and_then(|payment_id| self.subscription_notifier.send(payment_id).ok()); } - Ok(()) + Ok((swap.state, updated_swap.state)) } async fn claim(&self, swap_id: &str) -> Result<(), PaymentError> { @@ -365,7 +365,9 @@ impl ReceiveSwapHandler { to_state: PaymentState, ) -> Result<(), PaymentError> { match (from_state, to_state) { - (Recoverable, Pending | Refundable | RefundPending | Failed | Complete) => Ok(()), + (Recoverable, Created | Pending | Refundable | RefundPending | Failed | Complete) => { + Ok(()) + } (_, Recoverable) => Err(PaymentError::Generic { err: format!("Cannot transition from {from_state:?} to Recoverable state"), }), diff --git a/lib/core/src/restore.rs b/lib/core/src/restore.rs index 21b9176ef..8fb5392f5 100644 --- a/lib/core/src/restore.rs +++ b/lib/core/src/restore.rs @@ -172,14 +172,14 @@ pub(crate) struct RecoveredOnchainData { impl LiquidSdk { pub(crate) async fn get_monitored_swaps_list(&self, partial_sync: bool) -> Result { - let receive_swaps = self.persister.list_ongoing_receive_swaps()?; + let receive_swaps = self.persister.list_recoverable_receive_swaps()?; match partial_sync { false => { let bitcoin_height = self.bitcoin_chain_service.lock().await.tip()?.height as u32; let liquid_height = self.liquid_chain_service.lock().await.tip().await?; let final_swap_states = [PaymentState::Complete, PaymentState::Failed]; - let send_swaps = self.persister.list_pending_and_ongoing_send_swaps()?; + let send_swaps = self.persister.list_recoverable_send_swaps()?; let chain_swaps: Vec = self .persister .list_chain_swaps()? diff --git a/lib/core/src/sdk.rs b/lib/core/src/sdk.rs index bfa2f85f9..5894a14b4 100644 --- a/lib/core/src/sdk.rs +++ b/lib/core/src/sdk.rs @@ -1631,12 +1631,16 @@ impl LiquidSdk { None => { debug!("Timeout occurred without payment, set swap to timed out"); match swap { - Swap::Send(_) => self.send_swap_handler.update_swap_info(&expected_swap_id, TimedOut, None, None, None).await?, - Swap::Chain(_) => self.chain_swap_handler.update_swap_info(&ChainSwapUpdate { - swap_id: expected_swap_id, - to_state: TimedOut, - ..Default::default() - }).await?, + Swap::Send(_) => { + self.send_swap_handler.update_swap_info(&expected_swap_id, TimedOut, None, None, None).await?; + }, + Swap::Chain(_) => { + self.chain_swap_handler.update_swap_info(&ChainSwapUpdate { + swap_id: expected_swap_id, + to_state: TimedOut, + ..Default::default() + }).await?; + }, _ => () } return Err(PaymentError::PaymentTimeout) @@ -2258,141 +2262,215 @@ impl LiquidSdk { .sum::(); debug!("Onchain wallet balance: {wallet_amount_sat} sats"); - // Loop over the recovered chain data for monitored receive swaps + // Loop over the recovered chain data for Created, Pending or Recoverable receive swaps for (swap_id, receive_data) in recovered_onchain_data.receive { - if let Some(to_state) = receive_data.derive_partial_state() { - let lockup_tx_id = receive_data.lockup_tx_id.map(|h| h.txid.to_string()); - let claim_tx_id = receive_data.claim_tx_id.clone().map(|h| h.txid.to_string()); - let mrh_tx_id = receive_data.mrh_tx_id.clone().map(|h| h.txid.to_string()); - let history_updates = vec![receive_data.claim_tx_id, receive_data.mrh_tx_id] - .into_iter() - .flatten() - .collect::>(); - for history in history_updates { - if let Some(tx) = tx_map.remove(&history.txid) { - self.persister - .insert_or_update_payment_with_wallet_tx(&tx)?; + let to_state: PaymentState = receive_data + .derive_partial_state() + .unwrap_or(PaymentState::Created); + let lockup_tx_id = receive_data.lockup_tx_id.map(|h| h.txid.to_string()); + let claim_tx_id = receive_data.claim_tx_id.clone().map(|h| h.txid.to_string()); + let mrh_tx_id = receive_data.mrh_tx_id.clone().map(|h| h.txid.to_string()); + let history_updates = vec![receive_data.claim_tx_id, receive_data.mrh_tx_id] + .into_iter() + .flatten() + .collect::>(); + for history in history_updates { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } + } + let update_swap_info_res = self + .receive_swap_handler + .update_swap_info( + &swap_id, + to_state, + claim_tx_id.as_deref(), + lockup_tx_id.as_deref(), + mrh_tx_id.as_deref(), + receive_data.mrh_amount_sat, + ) + .await; + match update_swap_info_res { + Ok((PaymentState::Recoverable, PaymentState::Created)) + | Ok((PaymentState::Recoverable, PaymentState::Pending)) => { + info!("Tracking recoverable swap: {swap_id}"); + if let Err(e) = self.status_stream.track_swap_id(&swap_id) { + error!("Unable to track swap id {swap_id}: {e}"); } } - _ = self - .receive_swap_handler - .update_swap_info( - &swap_id, - to_state, - claim_tx_id.as_deref(), - lockup_tx_id.as_deref(), - mrh_tx_id.as_deref(), - receive_data.mrh_amount_sat, - ) - .await; + _ => {} } } - // Loop over the recovered chain data for monitored send swaps + // Loop over the recovered chain data for Created, Pending or Recoverable send swaps for (swap_id, send_data) in recovered_onchain_data.send { - if let Some(to_state) = send_data.derive_partial_state() { - let lockup_tx_id = send_data.lockup_tx_id.clone().map(|h| h.txid.to_string()); - let refund_tx_id = send_data.refund_tx_id.clone().map(|h| h.txid.to_string()); - let history_updates = vec![send_data.lockup_tx_id, send_data.refund_tx_id] - .into_iter() - .flatten() - .collect::>(); - for history in history_updates { - if let Some(tx) = tx_map.remove(&history.txid) { - self.persister - .insert_or_update_payment_with_wallet_tx(&tx)?; + let to_state: PaymentState = send_data + .derive_partial_state() + .unwrap_or(PaymentState::Created); + let lockup_tx_id = send_data.lockup_tx_id.clone().map(|h| h.txid.to_string()); + let refund_tx_id = send_data.refund_tx_id.clone().map(|h| h.txid.to_string()); + let history_updates = vec![send_data.lockup_tx_id, send_data.refund_tx_id] + .into_iter() + .flatten() + .collect::>(); + for history in history_updates { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } + } + let update_swap_info_res = self + .send_swap_handler + .update_swap_info( + &swap_id, + to_state, + None, + lockup_tx_id.as_deref(), + refund_tx_id.as_deref(), + ) + .await; + match update_swap_info_res { + Ok((PaymentState::Recoverable, PaymentState::Created)) + | Ok((PaymentState::Recoverable, PaymentState::Pending)) => { + info!("Tracking recoverable swap: {swap_id}"); + if let Err(e) = self.status_stream.track_swap_id(&swap_id) { + error!("Unable to track swap id {swap_id}: {e}"); } } - _ = self - .send_swap_handler - .update_swap_info( - &swap_id, - to_state, - None, - lockup_tx_id.as_deref(), - refund_tx_id.as_deref(), - ) - .await; + _ => {} } } // Loop over the recovered chain data for monitored chain receive swaps for (swap_id, chain_receive_data) in recovered_onchain_data.chain_receive { - if let Some(to_state) = chain_receive_data.derive_partial_state() { - let server_lockup_tx_id = chain_receive_data - .lbtc_server_lockup_tx_id - .map(|h| h.txid.to_string()); - let user_lockup_tx_id = chain_receive_data - .btc_user_lockup_tx_id - .map(|h| h.txid.to_string()); - let claim_tx_id = chain_receive_data - .lbtc_claim_tx_id - .clone() - .map(|h| h.txid.to_string()); - let refund_tx_id = chain_receive_data - .btc_refund_tx_id - .map(|h| h.txid.to_string()); - if let Some(history) = chain_receive_data.lbtc_claim_tx_id { - if let Some(tx) = tx_map.remove(&history.txid) { - self.persister - .insert_or_update_payment_with_wallet_tx(&tx)?; + let to_state: PaymentState = chain_receive_data + .derive_partial_state() + .or_else(|| { + // When the state cannot be derived from onchain data then use the swap state. + // If the swap state is Recoverable, set it to Created and let the status loop + // determine the state. + self.persister + .fetch_chain_swap_by_id(&swap_id) + .map(|maybe_swap| { + maybe_swap.map(|swap| match swap.state { + PaymentState::Recoverable => PaymentState::Created, + state => state, + }) + }) + .ok() + .flatten() + }) + .unwrap_or(PaymentState::Created); + let server_lockup_tx_id = chain_receive_data + .lbtc_server_lockup_tx_id + .map(|h| h.txid.to_string()); + let user_lockup_tx_id = chain_receive_data + .btc_user_lockup_tx_id + .map(|h| h.txid.to_string()); + let claim_tx_id = chain_receive_data + .lbtc_claim_tx_id + .clone() + .map(|h| h.txid.to_string()); + let refund_tx_id = chain_receive_data + .btc_refund_tx_id + .map(|h| h.txid.to_string()); + if let Some(history) = chain_receive_data.lbtc_claim_tx_id { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } + } + let update_swap_info_res = self + .chain_swap_handler + .update_swap_info(&ChainSwapUpdate { + swap_id: swap_id.clone(), + to_state, + server_lockup_tx_id, + user_lockup_tx_id, + claim_address: chain_receive_data.lbtc_claim_address, + claim_tx_id, + refund_tx_id, + }) + .await; + match update_swap_info_res { + Ok((PaymentState::Recoverable, PaymentState::Created)) + | Ok((PaymentState::Recoverable, PaymentState::Pending)) => { + info!("Tracking recoverable swap: {swap_id}"); + if let Err(e) = self.status_stream.track_swap_id(&swap_id) { + error!("Unable to track swap id {swap_id}: {e}"); } } - _ = self - .chain_swap_handler - .update_swap_info(&ChainSwapUpdate { - swap_id, - to_state, - server_lockup_tx_id, - user_lockup_tx_id, - claim_address: chain_receive_data.lbtc_claim_address, - claim_tx_id, - refund_tx_id, - }) - .await; + _ => {} } } // Loop over the recovered chain data for monitored chain send swaps for (swap_id, chain_send_data) in recovered_onchain_data.chain_send { - if let Some(to_state) = chain_send_data.derive_partial_state() { - let server_lockup_tx_id = chain_send_data - .btc_server_lockup_tx_id - .map(|h| h.txid.to_string()); - let user_lockup_tx_id = chain_send_data - .lbtc_user_lockup_tx_id - .clone() - .map(|h| h.txid.to_string()); - let claim_tx_id = chain_send_data.btc_claim_tx_id.map(|h| h.txid.to_string()); - let refund_tx_id = chain_send_data - .lbtc_refund_tx_id - .clone() - .map(|h| h.txid.to_string()); - let history_updates = vec![ - chain_send_data.lbtc_user_lockup_tx_id, - chain_send_data.lbtc_refund_tx_id, - ] - .into_iter() - .flatten() - .collect::>(); - for history in history_updates { - if let Some(tx) = tx_map.remove(&history.txid) { - self.persister - .insert_or_update_payment_with_wallet_tx(&tx)?; + let to_state: PaymentState = chain_send_data + .derive_partial_state() + .or_else(|| { + // When the state cannot be derived from onchain data then use the swap state. + // If the swap state is Recoverable, set it to Created and let the status loop + // determine the state. + self.persister + .fetch_chain_swap_by_id(&swap_id) + .map(|maybe_swap| { + maybe_swap.map(|swap| match swap.state { + PaymentState::Recoverable => PaymentState::Created, + state => state, + }) + }) + .ok() + .flatten() + }) + .unwrap_or(PaymentState::Created); + let server_lockup_tx_id = chain_send_data + .btc_server_lockup_tx_id + .map(|h| h.txid.to_string()); + let user_lockup_tx_id = chain_send_data + .lbtc_user_lockup_tx_id + .clone() + .map(|h| h.txid.to_string()); + let claim_tx_id = chain_send_data.btc_claim_tx_id.map(|h| h.txid.to_string()); + let refund_tx_id = chain_send_data + .lbtc_refund_tx_id + .clone() + .map(|h| h.txid.to_string()); + let history_updates = vec![ + chain_send_data.lbtc_user_lockup_tx_id, + chain_send_data.lbtc_refund_tx_id, + ] + .into_iter() + .flatten() + .collect::>(); + for history in history_updates { + if let Some(tx) = tx_map.remove(&history.txid) { + self.persister + .insert_or_update_payment_with_wallet_tx(&tx)?; + } + } + let update_swap_info_res = self + .chain_swap_handler + .update_swap_info(&ChainSwapUpdate { + swap_id: swap_id.clone(), + to_state, + server_lockup_tx_id, + user_lockup_tx_id, + claim_address: None, + claim_tx_id, + refund_tx_id, + }) + .await; + match update_swap_info_res { + Ok((PaymentState::Recoverable, PaymentState::Created)) + | Ok((PaymentState::Recoverable, PaymentState::Pending)) => { + info!("Tracking recoverable swap: {swap_id}"); + if let Err(e) = self.status_stream.track_swap_id(&swap_id) { + error!("Unable to track swap id {swap_id}: {e}"); } } - _ = self - .chain_swap_handler - .update_swap_info(&ChainSwapUpdate { - swap_id, - to_state, - server_lockup_tx_id, - user_lockup_tx_id, - claim_address: None, - claim_tx_id, - refund_tx_id, - }) - .await; + _ => {} } } diff --git a/lib/core/src/send_swap.rs b/lib/core/src/send_swap.rs index 1d1307829..1126f8260 100644 --- a/lib/core/src/send_swap.rs +++ b/lib/core/src/send_swap.rs @@ -262,7 +262,7 @@ impl SendSwapHandler { preimage: Option<&str>, lockup_tx_id: Option<&str>, refund_tx_id: Option<&str>, - ) -> Result<(), PaymentError> { + ) -> Result<(PaymentState, PaymentState), PaymentError> { info!( "Transitioning Send swap {} to {:?} (lockup_tx_id = {:?}, refund_tx_id = {:?})", swap_id, to_state, lockup_tx_id, refund_tx_id @@ -285,7 +285,7 @@ impl SendSwapHandler { if updated_swap != swap { payment_id.and_then(|payment_id| self.subscription_notifier.send(payment_id).ok()); } - Ok(()) + Ok((swap.state, updated_swap.state)) } async fn cooperate_claim(&self, send_swap: &SendSwap) -> Result<(), PaymentError> { @@ -513,7 +513,9 @@ impl SendSwapHandler { to_state: PaymentState, ) -> Result<(), PaymentError> { match (from_state, to_state) { - (Recoverable, Pending | Refundable | RefundPending | Failed | Complete) => Ok(()), + (Recoverable, Created | Pending | Refundable | RefundPending | Failed | Complete) => { + Ok(()) + } (_, Recoverable) => Err(PaymentError::Generic { err: format!("Cannot transition from {from_state:?} to Recoverable state"), }),