From 660daaacdf862743b318ee85673900f91591d036 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 30 May 2023 18:11:47 +0000 Subject: [PATCH 1/8] Fix `held_by_thread` in `no-std` to return instead of panicing Our `no-std` locks simply panic if a lock cannot be taken as there should be no lock contention in a single-threaded environment. However, the `held_by_thread` debug methods were delegating to the lock methods which resulted in a panic when asserting that a lock *is* held by the current thread. Instead, they are updated here to call the relevant `RefCell` testing methods. --- lightning/src/sync/nostd_sync.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lightning/src/sync/nostd_sync.rs b/lightning/src/sync/nostd_sync.rs index 08d54a939be..27cfb9b8f78 100644 --- a/lightning/src/sync/nostd_sync.rs +++ b/lightning/src/sync/nostd_sync.rs @@ -49,7 +49,7 @@ impl Mutex { impl<'a, T: 'a> LockTestExt<'a> for Mutex { #[inline] fn held_by_thread(&self) -> LockHeldState { - if self.lock().is_err() { return LockHeldState::HeldByThread; } + if self.inner.try_borrow_mut().is_err() { return LockHeldState::HeldByThread; } else { return LockHeldState::NotHeldByThread; } } type ExclLock = MutexGuard<'a, T>; @@ -115,7 +115,7 @@ impl RwLock { impl<'a, T: 'a> LockTestExt<'a> for RwLock { #[inline] fn held_by_thread(&self) -> LockHeldState { - if self.write().is_err() { return LockHeldState::HeldByThread; } + if self.inner.try_borrow_mut().is_err() { return LockHeldState::HeldByThread; } else { return LockHeldState::NotHeldByThread; } } type ExclLock = RwLockWriteGuard<'a, T>; From 3ce1a5e0876bf8b6d898fc75f0f595a102634a92 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 24 May 2023 03:45:30 +0000 Subject: [PATCH 2/8] Move the `ShutdownResult` type alias to `channel.rs` This allows us to make the `force_shutdown` definition less verbose --- lightning/src/ln/channel.rs | 8 +++++++- lightning/src/ln/channelmanager.rs | 4 +--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 11f0261d677..2f8f418f27f 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -432,6 +432,12 @@ pub(super) struct ReestablishResponses { pub shutdown_msg: Option, } +/// The return type of `force_shutdown` +pub(crate) type ShutdownResult = ( + Option<(OutPoint, ChannelMonitorUpdate)>, + Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])> +); + /// If the majority of the channels funds are to the fundee and the initiator holds only just /// enough funds to cover their reserve value, channels are at risk of getting "stuck". Because the /// initiator controls the feerate, if they then go to increase the channel fee, they may have no @@ -6228,7 +6234,7 @@ impl Channel { /// those explicitly stated to be allowed after shutdown completes, eg some simple getters). /// Also returns the list of payment_hashes for channels which we can safely fail backwards /// immediately (others we will have to allow to time out). - pub fn force_shutdown(&mut self, should_broadcast: bool) -> (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>) { + pub fn force_shutdown(&mut self, should_broadcast: bool) -> ShutdownResult { // Note that we MUST only generate a monitor update that indicates force-closure - we're // called during initialization prior to the chain_monitor in the encompassing ChannelManager // being fully configured in some cases. Thus, its likely any monitor events we generate will diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 45d4209930e..c06a5872df5 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -40,7 +40,7 @@ use crate::events::{Event, EventHandler, EventsProvider, MessageSendEvent, Messa // Since this struct is returned in `list_channels` methods, expose it here in case users want to // construct one themselves. use crate::ln::{inbound_payment, PaymentHash, PaymentPreimage, PaymentSecret}; -use crate::ln::channel::{Channel, ChannelError, ChannelUpdateStatus, UpdateFulfillCommitFetch}; +use crate::ln::channel::{Channel, ChannelError, ChannelUpdateStatus, ShutdownResult, UpdateFulfillCommitFetch}; use crate::ln::features::{ChannelFeatures, ChannelTypeFeatures, InitFeatures, NodeFeatures}; #[cfg(any(feature = "_test_utils", test))] use crate::ln::features::InvoiceFeatures; @@ -359,8 +359,6 @@ pub enum FailureCode { IncorrectOrUnknownPaymentDetails = 0x4000 | 15, } -type ShutdownResult = (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>); - /// Error type returned across the peer_state mutex boundary. When an Err is generated for a /// Channel, we generally end up with a ChannelError::Close for which we have to close the channel /// immediately (ie with no further calls on it made). Thus, this step happens inside a From 34d5f2afc46d7caf47dd85c68d570cd6091a431d Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 10 May 2023 00:45:08 +0000 Subject: [PATCH 3/8] Return the counterparty node_id as a part of a force-shutdown res In the coming commits we'll need the counterparty node_id when handling a background monitor update as we may need to resume normal channel operation as a result. Thus, we go ahead and pipe it through from the shutdown end, as it makes the codepaths consistent. Sadly, the monitor-originated shutdown case doesn't allow for a required counterparty node_id as some versions of LDK didn't have it present in the ChannelMonitor. --- lightning/src/ln/channel.rs | 4 +-- lightning/src/ln/channelmanager.rs | 43 +++++++++++++++++++++++------- 2 files changed, 36 insertions(+), 11 deletions(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 2f8f418f27f..18ad49f3271 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -434,7 +434,7 @@ pub(super) struct ReestablishResponses { /// The return type of `force_shutdown` pub(crate) type ShutdownResult = ( - Option<(OutPoint, ChannelMonitorUpdate)>, + Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])> ); @@ -6263,7 +6263,7 @@ impl Channel { // See test_duplicate_chan_id and test_pre_lockin_no_chan_closed_update for more. if self.channel_state & (ChannelState::FundingSent as u32 | ChannelState::ChannelReady as u32 | ChannelState::ShutdownComplete as u32) != 0 { self.latest_monitor_update_id = CLOSED_CHANNEL_UPDATE_ID; - Some((funding_txo, ChannelMonitorUpdate { + Some((self.get_counterparty_node_id(), funding_txo, ChannelMonitorUpdate { update_id: self.latest_monitor_update_id, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast }], })) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index c06a5872df5..29d2ae81671 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -499,11 +499,26 @@ struct ClaimablePayments { /// for some reason. They are handled in timer_tick_occurred, so may be processed with /// quite some time lag. enum BackgroundEvent { - /// Handle a ChannelMonitorUpdate + /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from + /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the non-closing variant needs a public key + /// to handle channel resumption, whereas if the channel has been force-closed we do not need + /// the counterparty node_id. /// /// Note that any such events are lost on shutdown, so in general they must be updates which /// are regenerated on startup. - MonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), + ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), + /// Handle a ChannelMonitorUpdate which may or may not close the channel. In general this + /// should be used rather than [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in + /// cases where the `counterparty_node_id` is not available as the channel has closed from a + /// [`ChannelMonitor`] error the other variant is acceptable. + /// + /// Note that any such events are lost on shutdown, so in general they must be updates which + /// are regenerated on startup. + MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: PublicKey, + funding_txo: OutPoint, + update: ChannelMonitorUpdate + }, } #[derive(Debug)] @@ -2193,7 +2208,7 @@ where let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id }; self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver); } - if let Some((funding_txo, monitor_update)) = monitor_update_option { + if let Some((_, funding_txo, monitor_update)) = monitor_update_option { // There isn't anything we can do if we get an update failure - we're already // force-closing. The monitor update on the required in-memory copy should broadcast // the latest local state, which is the best we can do anyway. Thus, it is safe to @@ -3774,7 +3789,12 @@ where for event in background_events.drain(..) { match event { - BackgroundEvent::MonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { + BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((funding_txo, update)) => { + // The channel has already been closed, so no use bothering to care about the + // monitor updating completing. + let _ = self.chain_monitor.update_channel(funding_txo, &update); + }, + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { funding_txo, update, .. } => { // The channel has already been closed, so no use bothering to care about the // monitor updating completing. let _ = self.chain_monitor.update_channel(funding_txo, &update); @@ -5689,12 +5709,15 @@ where // Channel::force_shutdown tries to make us do) as we may still be in initialization, // so we track the update internally and handle it when the user next calls // timer_tick_occurred, guaranteeing we're running normally. - if let Some((funding_txo, update)) = failure.0.take() { + if let Some((counterparty_node_id, funding_txo, update)) = failure.0.take() { assert_eq!(update.updates.len(), 1); if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] { assert!(should_broadcast); } else { unreachable!(); } - self.pending_background_events.lock().unwrap().push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup((funding_txo, update))); + self.pending_background_events.lock().unwrap().push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, funding_txo, update + }); } self.finish_force_close_channel(failure); } @@ -7767,8 +7790,10 @@ where log_error!(args.logger, " The ChannelMonitor for channel {} is at update_id {} but the ChannelManager is at update_id {}.", log_bytes!(channel.channel_id()), monitor.get_latest_update_id(), channel.get_latest_monitor_update_id()); let (monitor_update, mut new_failed_htlcs) = channel.force_shutdown(true); - if let Some(monitor_update) = monitor_update { - pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup(monitor_update)); + if let Some((counterparty_node_id, funding_txo, update)) = monitor_update { + pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id, funding_txo, update + }); } failed_htlcs.append(&mut new_failed_htlcs); channel_closures.push_back((events::Event::ChannelClosed { @@ -7843,7 +7868,7 @@ where update_id: CLOSED_CHANNEL_UPDATE_ID, updates: vec![ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast: true }], }; - pending_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); + pending_background_events.push(BackgroundEvent::ClosingMonitorUpdateRegeneratedOnStartup((*funding_txo, monitor_update))); } } From a2989129a79a150e12115c490d361fc701755c75 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 10 May 2023 17:15:29 +0000 Subject: [PATCH 4/8] Make `AChannelManager` trait slightly more generic and always on Rather than letting `AChannelManager` be bounded by all traits being `Sized` we make them explicitly `?Sized`. We also make the trait no longer test-only as it will be used in a coming commit. --- lightning/src/ln/channelmanager.rs | 42 ++++++++++++++++-------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 29d2ae81671..1bdd8d6e000 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -656,40 +656,44 @@ pub type SimpleArcChannelManager = ChannelManager< /// This is not exported to bindings users as Arcs don't make sense in bindings pub type SimpleRefChannelManager<'a, 'b, 'c, 'd, 'e, 'f, 'g, 'h, M, T, F, L> = ChannelManager<&'a M, &'b T, &'c KeysManager, &'c KeysManager, &'c KeysManager, &'d F, &'e DefaultRouter<&'f NetworkGraph<&'g L>, &'g L, &'h Mutex, &'g L>>>, &'g L>; +macro_rules! define_test_pub_trait { ($vis: vis) => { /// A trivial trait which describes any [`ChannelManager`] used in testing. -#[cfg(any(test, feature = "_test_utils"))] -pub trait AChannelManager { - type Watch: chain::Watch; +$vis trait AChannelManager { + type Watch: chain::Watch + ?Sized; type M: Deref; - type Broadcaster: BroadcasterInterface; + type Broadcaster: BroadcasterInterface + ?Sized; type T: Deref; - type EntropySource: EntropySource; + type EntropySource: EntropySource + ?Sized; type ES: Deref; - type NodeSigner: NodeSigner; + type NodeSigner: NodeSigner + ?Sized; type NS: Deref; - type Signer: WriteableEcdsaChannelSigner; - type SignerProvider: SignerProvider; + type Signer: WriteableEcdsaChannelSigner + Sized; + type SignerProvider: SignerProvider + ?Sized; type SP: Deref; - type FeeEstimator: FeeEstimator; + type FeeEstimator: FeeEstimator + ?Sized; type F: Deref; - type Router: Router; + type Router: Router + ?Sized; type R: Deref; - type Logger: Logger; + type Logger: Logger + ?Sized; type L: Deref; fn get_cm(&self) -> &ChannelManager; } +} } #[cfg(any(test, feature = "_test_utils"))] +define_test_pub_trait!(pub); +#[cfg(not(any(test, feature = "_test_utils")))] +define_test_pub_trait!(pub(crate)); impl AChannelManager for ChannelManager where - M::Target: chain::Watch<::Signer> + Sized, - T::Target: BroadcasterInterface + Sized, - ES::Target: EntropySource + Sized, - NS::Target: NodeSigner + Sized, - SP::Target: SignerProvider + Sized, - F::Target: FeeEstimator + Sized, - R::Target: Router + Sized, - L::Target: Logger + Sized, + M::Target: chain::Watch<::Signer>, + T::Target: BroadcasterInterface, + ES::Target: EntropySource, + NS::Target: NodeSigner, + SP::Target: SignerProvider, + F::Target: FeeEstimator, + R::Target: Router, + L::Target: Logger, { type Watch = M::Target; type M = M; From acbe41abe28b9434bbadc2216e4ed5847f753a76 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 10 May 2023 05:39:26 +0000 Subject: [PATCH 5/8] Handle `BackgroundEvent`s replaying non-closing monitor updates `BackgroundEvent` was used to store `ChannelMonitorUpdate`s which result in a channel force-close, avoiding relying on `ChannelMonitor`s having been loaded while `ChannelManager` block-connection methods are called during startup. In the coming commit(s) we'll also generate non-channel-closing `ChannelMonitorUpdate`s during startup, which will need to be replayed prior to any other `ChannelMonitorUpdate`s generated from normal operation. In the next commit we'll handle that by handling `BackgroundEvent`s immediately after locking the `total_consistency_lock`. --- lightning/src/ln/channelmanager.rs | 45 ++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 1bdd8d6e000..303ba7f7927 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -500,17 +500,20 @@ struct ClaimablePayments { /// quite some time lag. enum BackgroundEvent { /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from - /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the non-closing variant needs a public key - /// to handle channel resumption, whereas if the channel has been force-closed we do not need - /// the counterparty node_id. + /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public + /// key to handle channel resumption, whereas if the channel has been force-closed we do not + /// need the counterparty node_id. /// /// Note that any such events are lost on shutdown, so in general they must be updates which /// are regenerated on startup. ClosingMonitorUpdateRegeneratedOnStartup((OutPoint, ChannelMonitorUpdate)), - /// Handle a ChannelMonitorUpdate which may or may not close the channel. In general this - /// should be used rather than [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in - /// cases where the `counterparty_node_id` is not available as the channel has closed from a - /// [`ChannelMonitor`] error the other variant is acceptable. + /// Handle a ChannelMonitorUpdate which may or may not close the channel and may unblock the + /// channel to continue normal operation. + /// + /// In general this should be used rather than + /// [`Self::ClosingMonitorUpdateRegeneratedOnStartup`], however in cases where the + /// `counterparty_node_id` is not available as the channel has closed from a [`ChannelMonitor`] + /// error the other variant is acceptable. /// /// Note that any such events are lost on shutdown, so in general they must be updates which /// are regenerated on startup. @@ -3798,10 +3801,30 @@ where // monitor updating completing. let _ = self.chain_monitor.update_channel(funding_txo, &update); }, - BackgroundEvent::MonitorUpdateRegeneratedOnStartup { funding_txo, update, .. } => { - // The channel has already been closed, so no use bothering to care about the - // monitor updating completing. - let _ = self.chain_monitor.update_channel(funding_txo, &update); + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { counterparty_node_id, funding_txo, update } => { + let update_res = self.chain_monitor.update_channel(funding_txo, &update); + + let res = { + let per_peer_state = self.per_peer_state.read().unwrap(); + if let Some(peer_state_mutex) = per_peer_state.get(&counterparty_node_id) { + let mut peer_state_lock = peer_state_mutex.lock().unwrap(); + let peer_state = &mut *peer_state_lock; + match peer_state.channel_by_id.entry(funding_txo.to_channel_id()) { + hash_map::Entry::Occupied(mut chan) => { + handle_new_monitor_update!(self, update_res, update.update_id, peer_state_lock, peer_state, per_peer_state, chan) + }, + hash_map::Entry::Vacant(_) => Ok(()), + } + } else { Ok(()) } + }; + // TODO: If this channel has since closed, we're likely providing a payment + // preimage update, which we must ensure is durable! We currently don't, + // however, ensure that. + if res.is_err() { + log_error!(self.logger, + "Failed to provide ChannelMonitorUpdate to closed channel! This likely lost us a payment preimage!"); + } + let _ = handle_error!(self, res, counterparty_node_id); }, } } From e5070c488090476ed61910316dd13fb45a58f442 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Thu, 6 Apr 2023 19:56:01 +0000 Subject: [PATCH 6/8] Process background events when taking the total_consistency_lock When we generated a `ChannelMonitorUpdate` during `ChannelManager` deserialization, we must ensure that it gets processed before any other `ChannelMonitorUpdate`s. The obvious hook for this is when taking the `total_consistency_lock`, which makes it unlikely we'll regress by forgetting this. Here we add that call in the `PersistenceNotifierGuard`, with a test-only atomic bool to test that this criteria is met. --- lightning/src/ln/channelmanager.rs | 176 +++++++++++++--------- lightning/src/ln/payment_tests.rs | 8 + lightning/src/ln/priv_short_conf_tests.rs | 2 + lightning/src/ln/reload_tests.rs | 6 + lightning/src/ln/reorg_tests.rs | 11 +- 5 files changed, 128 insertions(+), 75 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 303ba7f7927..7880fa8fbfe 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -495,9 +495,10 @@ struct ClaimablePayments { pending_claiming_payments: HashMap, } -/// Events which we process internally but cannot be procsesed immediately at the generation site -/// for some reason. They are handled in timer_tick_occurred, so may be processed with -/// quite some time lag. +/// Events which we process internally but cannot be processed immediately at the generation site +/// usually because we're running pre-full-init. They are handled immediately once we detect we are +/// running normally, and specifically must be processed before any other non-background +/// [`ChannelMonitorUpdate`]s are applied. enum BackgroundEvent { /// Handle a ChannelMonitorUpdate which closes the channel. This is only separated from /// [`Self::MonitorUpdateRegeneratedOnStartup`] as the maybe-non-closing variant needs a public @@ -982,7 +983,18 @@ where pending_events: Mutex)>>, /// A simple atomic flag to ensure only one task at a time can be processing events asynchronously. pending_events_processor: AtomicBool, + + /// If we are running during init (either directly during the deserialization method or in + /// block connection methods which run after deserialization but before normal operation) we + /// cannot provide the user with [`ChannelMonitorUpdate`]s through the normal update flow - + /// prior to normal operation the user may not have loaded the [`ChannelMonitor`]s into their + /// [`ChainMonitor`] and thus attempting to update it will fail or panic. + /// + /// Thus, we place them here to be handled as soon as possible once we are running normally. + /// /// See `ChannelManager` struct-level documentation for lock order requirements. + /// + /// [`ChainMonitor`]: crate::chain::chainmonitor::ChainMonitor pending_background_events: Mutex>, /// Used when we have to take a BIG lock to make sure everything is self-consistent. /// Essentially just when we're serializing ourselves out. @@ -992,6 +1004,9 @@ where /// Notifier the lock contains sends out a notification when the lock is released. total_consistency_lock: RwLock<()>, + #[cfg(debug_assertions)] + background_events_processed_since_startup: AtomicBool, + persistence_notifier: Notifier, entropy_source: ES, @@ -1018,6 +1033,7 @@ pub struct ChainParameters { } #[derive(Copy, Clone, PartialEq)] +#[must_use] enum NotifyOption { DoPersist, SkipPersist, @@ -1041,10 +1057,20 @@ struct PersistenceNotifierGuard<'a, F: Fn() -> NotifyOption> { } impl<'a> PersistenceNotifierGuard<'a, fn() -> NotifyOption> { // We don't care what the concrete F is here, it's unused - fn notify_on_drop(lock: &'a RwLock<()>, notifier: &'a Notifier) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { - PersistenceNotifierGuard::optionally_notify(lock, notifier, || -> NotifyOption { NotifyOption::DoPersist }) + fn notify_on_drop(cm: &'a C) -> PersistenceNotifierGuard<'a, impl Fn() -> NotifyOption> { + let read_guard = cm.get_cm().total_consistency_lock.read().unwrap(); + let _ = cm.get_cm().process_background_events(); // We always persist + + PersistenceNotifierGuard { + persistence_notifier: &cm.get_cm().persistence_notifier, + should_persist: || -> NotifyOption { NotifyOption::DoPersist }, + _read_guard: read_guard, + } + } + /// Note that if any [`ChannelMonitorUpdate`]s are possibly generated, + /// [`ChannelManager::process_background_events`] MUST be called first. fn optionally_notify NotifyOption>(lock: &'a RwLock<()>, notifier: &'a Notifier, persist_check: F) -> PersistenceNotifierGuard<'a, F> { let read_guard = lock.read().unwrap(); @@ -1708,6 +1734,9 @@ macro_rules! handle_new_monitor_update { // update_maps_on_chan_removal needs to be able to take id_to_peer, so make sure we can in // any case so that it won't deadlock. debug_assert_ne!($self.id_to_peer.held_by_thread(), LockHeldState::HeldByThread); + #[cfg(debug_assertions)] { + debug_assert!($self.background_events_processed_since_startup.load(Ordering::Acquire)); + } match $update_res { ChannelMonitorUpdateStatus::InProgress => { log_debug!($self.logger, "ChannelMonitor update for {} in flight, holding messages until the update completes.", @@ -1754,6 +1783,10 @@ macro_rules! process_events_body { // persists happen while processing monitor events. let _read_guard = $self.total_consistency_lock.read().unwrap(); + // Because `handle_post_event_actions` may send `ChannelMonitorUpdate`s to the user we must + // ensure any startup-generated background events are handled first. + if $self.process_background_events() == NotifyOption::DoPersist { result = NotifyOption::DoPersist; } + // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. if $self.process_pending_monitor_events() { @@ -1863,6 +1896,8 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(Vec::new()), total_consistency_lock: RwLock::new(()), + #[cfg(debug_assertions)] + background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), entropy_source, @@ -1931,7 +1966,7 @@ where return Err(APIError::APIMisuseError { err: format!("Channel value must be at least 1000 satoshis. It was {}", channel_value_satoshis) }); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); // We want to make sure the lock is actually acquired by PersistenceNotifierGuard. debug_assert!(&self.total_consistency_lock.try_write().is_err()); @@ -2085,7 +2120,7 @@ where } fn close_channel_internal(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, target_feerate_sats_per_1000_weight: Option, override_shutdown_script: Option) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>; let result: Result<(), _> = loop { @@ -2258,7 +2293,7 @@ where } fn force_close_sending_error(&self, channel_id: &[u8; 32], counterparty_node_id: &PublicKey, broadcast: bool) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); match self.force_close_channel_with_peer(channel_id, counterparty_node_id, None, broadcast) { Ok(counterparty_node_id) => { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -2868,7 +2903,7 @@ where /// [`ChannelMonitorUpdateStatus::InProgress`]: crate::chain::ChannelMonitorUpdateStatus::InProgress pub fn send_payment_with_route(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result<(), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments .send_payment_with_route(route, payment_hash, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| @@ -2879,7 +2914,7 @@ where /// `route_params` and retry failed payment paths based on `retry_strategy`. pub fn send_payment(&self, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result<(), RetryableSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments .send_payment(payment_hash, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), @@ -2892,7 +2927,7 @@ where #[cfg(test)] pub(super) fn test_send_payment_internal(&self, route: &Route, payment_hash: PaymentHash, recipient_onion: RecipientOnionFields, keysend_preimage: Option, payment_id: PaymentId, recv_value_msat: Option, onion_session_privs: Vec<[u8; 32]>) -> Result<(), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.test_send_payment_internal(route, payment_hash, recipient_onion, keysend_preimage, payment_id, recv_value_msat, onion_session_privs, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -2927,7 +2962,7 @@ where /// [`Event::PaymentFailed`]: events::Event::PaymentFailed /// [`Event::PaymentSent`]: events::Event::PaymentSent pub fn abandon_payment(&self, payment_id: PaymentId) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.abandon_payment(payment_id, PaymentFailureReason::UserAbandoned, &self.pending_events); } @@ -2948,7 +2983,7 @@ where /// [`send_payment`]: Self::send_payment pub fn send_spontaneous_payment(&self, route: &Route, payment_preimage: Option, recipient_onion: RecipientOnionFields, payment_id: PaymentId) -> Result { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment_with_route( route, payment_preimage, recipient_onion, payment_id, &self.entropy_source, &self.node_signer, best_block_height, @@ -2965,7 +3000,7 @@ where /// [`PaymentParameters::for_keysend`]: crate::routing::router::PaymentParameters::for_keysend pub fn send_spontaneous_payment_with_retry(&self, payment_preimage: Option, recipient_onion: RecipientOnionFields, payment_id: PaymentId, route_params: RouteParameters, retry_strategy: Retry) -> Result { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_spontaneous_payment(payment_preimage, recipient_onion, payment_id, retry_strategy, route_params, &self.router, self.list_usable_channels(), || self.compute_inflight_htlcs(), &self.entropy_source, &self.node_signer, best_block_height, @@ -2979,7 +3014,7 @@ where /// us to easily discern them from real payments. pub fn send_probe(&self, path: Path) -> Result<(PaymentHash, PaymentId), PaymentSendFailure> { let best_block_height = self.best_block.read().unwrap().height(); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); self.pending_outbound_payments.send_probe(path, self.probing_cookie_secret, &self.entropy_source, &self.node_signer, best_block_height, |path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv| self.send_payment_along_path(path, payment_hash, recipient_onion, total_value, cur_height, payment_id, keysend_preimage, session_priv)) @@ -3090,7 +3125,7 @@ where /// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady /// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); for inp in funding_transaction.input.iter() { if inp.witness.is_empty() { @@ -3170,9 +3205,7 @@ where }); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop( - &self.total_consistency_lock, &self.persistence_notifier, - ); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let per_peer_state = self.per_peer_state.read().unwrap(); let peer_state_mutex = per_peer_state.get(counterparty_node_id) .ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?; @@ -3225,7 +3258,7 @@ where // TODO: when we move to deciding the best outbound channel at forward time, only take // `next_node_id` and not `next_hop_channel_id` pub fn forward_intercepted_htlc(&self, intercept_id: InterceptId, next_hop_channel_id: &[u8; 32], next_node_id: PublicKey, amt_to_forward_msat: u64) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let next_hop_scid = { let peer_state_lock = self.per_peer_state.read().unwrap(); @@ -3281,7 +3314,7 @@ where /// /// [`HTLCIntercepted`]: events::Event::HTLCIntercepted pub fn fail_intercepted_htlc(&self, intercept_id: InterceptId) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let payment = self.pending_intercepted_htlcs.lock().unwrap().remove(&intercept_id) .ok_or_else(|| APIError::APIMisuseError { @@ -3310,7 +3343,7 @@ where /// Should only really ever be called in response to a PendingHTLCsForwardable event. /// Will likely generate further events. pub fn process_pending_htlc_forwards(&self) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut new_events = VecDeque::new(); let mut failed_forwards = Vec::new(); @@ -3781,17 +3814,19 @@ where events.append(&mut new_events); } - /// Free the background events, generally called from timer_tick_occurred. - /// - /// Exposed for testing to allow us to process events quickly without generating accidental - /// BroadcastChannelUpdate events in timer_tick_occurred. + /// Free the background events, generally called from [`PersistenceNotifierGuard`] constructors. /// /// Expects the caller to have a total_consistency_lock read lock. - fn process_background_events(&self) -> bool { + fn process_background_events(&self) -> NotifyOption { + debug_assert_ne!(self.total_consistency_lock.held_by_thread(), LockHeldState::NotHeldByThread); + + #[cfg(debug_assertions)] + self.background_events_processed_since_startup.store(true, Ordering::Release); + let mut background_events = Vec::new(); mem::swap(&mut *self.pending_background_events.lock().unwrap(), &mut background_events); if background_events.is_empty() { - return false; + return NotifyOption::SkipPersist; } for event in background_events.drain(..) { @@ -3828,13 +3863,14 @@ where }, } } - true + NotifyOption::DoPersist } #[cfg(any(test, feature = "_test_utils"))] /// Process background events, for functional testing pub fn test_process_background_events(&self) { - self.process_background_events(); + let _lck = self.total_consistency_lock.read().unwrap(); + let _ = self.process_background_events(); } fn update_channel_fee(&self, chan_id: &[u8; 32], chan: &mut Channel<::Signer>, new_feerate: u32) -> NotifyOption { @@ -3864,7 +3900,7 @@ where /// it wants to detect). Thus, we have a variant exposed here for its benefit. pub fn maybe_update_chan_fees(&self) { PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = NotifyOption::SkipPersist; + let mut should_persist = self.process_background_events(); let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -3900,8 +3936,7 @@ where /// [`ChannelConfig`]: crate::util::config::ChannelConfig pub fn timer_tick_occurred(&self) { PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut should_persist = NotifyOption::SkipPersist; - if self.process_background_events() { should_persist = NotifyOption::DoPersist; } + let mut should_persist = self.process_background_events(); let new_feerate = self.fee_estimator.bounded_sat_per_1000_weight(ConfirmationTarget::Normal); @@ -4073,7 +4108,7 @@ where /// /// See [`FailureCode`] for valid failure codes. pub fn fail_htlc_backwards_with_reason(&self, payment_hash: &PaymentHash, failure_code: FailureCode) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let removed_source = self.claimable_payments.lock().unwrap().claimable_payments.remove(payment_hash); if let Some(payment) = removed_source { @@ -4250,7 +4285,7 @@ where pub fn claim_funds(&self, payment_preimage: PaymentPreimage) { let payment_hash = PaymentHash(Sha256::hash(&payment_preimage.0).into_inner()); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut sources = { let mut claimable_payments = self.claimable_payments.lock().unwrap(); @@ -4651,7 +4686,7 @@ where } fn do_accept_inbound_channel(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, accept_0conf: bool, user_channel_id: u128) -> Result<(), APIError> { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let peers_without_funded_channels = self.peers_without_funded_channels(|peer| !peer.channel_by_id.is_empty()); let per_peer_state = self.per_peer_state.read().unwrap(); @@ -5598,13 +5633,8 @@ where /// update events as a separate process method here. #[cfg(fuzzing)] pub fn process_monitor_events(&self) { - PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - if self.process_pending_monitor_events() { - NotifyOption::DoPersist - } else { - NotifyOption::SkipPersist - } - }); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); + self.process_pending_monitor_events(); } /// Check the holding cell in each channel and free any pending HTLCs in them if possible. @@ -5759,7 +5789,7 @@ where let payment_secret = PaymentSecret(self.entropy_source.get_secure_random_bytes()); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut payment_secrets = self.pending_inbound_payments.lock().unwrap(); match payment_secrets.entry(payment_hash) { hash_map::Entry::Vacant(e) => { @@ -6113,7 +6143,7 @@ where fn get_and_clear_pending_msg_events(&self) -> Vec { let events = RefCell::new(Vec::new()); PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { - let mut result = NotifyOption::SkipPersist; + let mut result = self.process_background_events(); // TODO: This behavior should be documented. It's unintuitive that we query // ChannelMonitors when clearing other events. @@ -6194,7 +6224,8 @@ where } fn block_disconnected(&self, header: &BlockHeader, height: u32) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); let new_height = height - 1; { let mut best_block = self.best_block.write().unwrap(); @@ -6228,7 +6259,8 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "{} transactions included in block {} at height {} provided", txdata.len(), block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(Some(height), |channel| channel.transactions_confirmed(&block_hash, height, txdata, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger) .map(|(a, b)| (a, Vec::new(), b))); @@ -6247,8 +6279,8 @@ where let block_hash = header.block_hash(); log_trace!(self.logger, "New best block: {} at height {}", block_hash, height); - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); - + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); *self.best_block.write().unwrap() = BestBlock::new(block_hash, height); self.do_chain_event(Some(height), |channel| channel.best_block_updated(height, header.time, self.genesis_hash.clone(), &self.node_signer, &self.default_configuration, &self.logger)); @@ -6291,7 +6323,8 @@ where } fn transaction_unconfirmed(&self, txid: &Txid) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, + &self.persistence_notifier, || -> NotifyOption { NotifyOption::DoPersist }); self.do_chain_event(None, |channel| { if let Some(funding_txo) = channel.get_funding_txo() { if funding_txo.txid == *txid { @@ -6535,7 +6568,7 @@ where L::Target: Logger, { fn handle_open_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::OpenChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_open_channel(counterparty_node_id, msg), *counterparty_node_id); } @@ -6546,7 +6579,7 @@ where } fn handle_accept_channel(&self, counterparty_node_id: &PublicKey, msg: &msgs::AcceptChannel) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_accept_channel(counterparty_node_id, msg), *counterparty_node_id); } @@ -6557,74 +6590,75 @@ where } fn handle_funding_created(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingCreated) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_funding_created(counterparty_node_id, msg), *counterparty_node_id); } fn handle_funding_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::FundingSigned) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_funding_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_channel_ready(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReady) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_channel_ready(counterparty_node_id, msg), *counterparty_node_id); } fn handle_shutdown(&self, counterparty_node_id: &PublicKey, msg: &msgs::Shutdown) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_shutdown(counterparty_node_id, msg), *counterparty_node_id); } fn handle_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_closing_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_add_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateAddHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_add_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fulfill_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFulfillHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fulfill_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fail_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fail_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fail_malformed_htlc(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFailMalformedHTLC) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fail_malformed_htlc(counterparty_node_id, msg), *counterparty_node_id); } fn handle_commitment_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::CommitmentSigned) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_commitment_signed(counterparty_node_id, msg), *counterparty_node_id); } fn handle_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_revoke_and_ack(counterparty_node_id, msg), *counterparty_node_id); } fn handle_update_fee(&self, counterparty_node_id: &PublicKey, msg: &msgs::UpdateFee) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_update_fee(counterparty_node_id, msg), *counterparty_node_id); } fn handle_announcement_signatures(&self, counterparty_node_id: &PublicKey, msg: &msgs::AnnouncementSignatures) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_announcement_signatures(counterparty_node_id, msg), *counterparty_node_id); } fn handle_channel_update(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelUpdate) { PersistenceNotifierGuard::optionally_notify(&self.total_consistency_lock, &self.persistence_notifier, || { + let force_persist = self.process_background_events(); if let Ok(persist) = handle_error!(self, self.internal_channel_update(counterparty_node_id, msg), *counterparty_node_id) { - persist + if force_persist == NotifyOption::DoPersist { NotifyOption::DoPersist } else { persist } } else { NotifyOption::SkipPersist } @@ -6632,12 +6666,12 @@ where } fn handle_channel_reestablish(&self, counterparty_node_id: &PublicKey, msg: &msgs::ChannelReestablish) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let _ = handle_error!(self, self.internal_channel_reestablish(counterparty_node_id, msg), *counterparty_node_id); } fn peer_disconnected(&self, counterparty_node_id: &PublicKey) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); let mut failed_channels = Vec::new(); let mut per_peer_state = self.per_peer_state.write().unwrap(); let remove_peer = { @@ -6719,7 +6753,7 @@ where return Err(()); } - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); // If we have too many peers connected which don't have funded channels, disconnect the // peer immediately (as long as it doesn't have funded channels). If we have a bunch of @@ -6802,7 +6836,7 @@ where } fn handle_error(&self, counterparty_node_id: &PublicKey, msg: &msgs::ErrorMessage) { - let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(&self.total_consistency_lock, &self.persistence_notifier); + let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self); if msg.channel_id == [0; 32] { let channel_ids: Vec<[u8; 32]> = { @@ -8363,6 +8397,8 @@ where pending_events_processor: AtomicBool::new(false), pending_background_events: Mutex::new(pending_background_events), total_consistency_lock: RwLock::new(()), + #[cfg(debug_assertions)] + background_events_processed_since_startup: AtomicBool::new(false), persistence_notifier: Notifier::new(), entropy_source: args.entropy_source, diff --git a/lightning/src/ln/payment_tests.rs b/lightning/src/ln/payment_tests.rs index ba7da5d5e6b..ba8731bd581 100644 --- a/lightning/src/ln/payment_tests.rs +++ b/lightning/src/ln/payment_tests.rs @@ -609,6 +609,9 @@ fn do_test_completed_payment_not_retryable_on_reload(use_dust: bool) { reload_node!(nodes[0], test_default_channel_config(), nodes_0_serialized, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], second_persister, second_new_chain_monitor, second_nodes_0_deserialized); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + nodes[0].node.test_process_background_events(); + check_added_monitors(&nodes[0], 1); + reconnect_nodes(&nodes[0], &nodes[1], (true, true), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); // Now resend the payment, delivering the HTLC and actually claiming it this time. This ensures @@ -634,6 +637,9 @@ fn do_test_completed_payment_not_retryable_on_reload(use_dust: bool) { reload_node!(nodes[0], test_default_channel_config(), nodes_0_serialized, &[&chan_0_monitor_serialized, &chan_1_monitor_serialized], third_persister, third_new_chain_monitor, third_nodes_0_deserialized); nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id()); + nodes[0].node.test_process_background_events(); + check_added_monitors(&nodes[0], 1); + reconnect_nodes(&nodes[0], &nodes[1], (false, false), (0, 0), (0, 0), (0, 0), (0, 0), (0, 0), (false, false)); match nodes[0].node.send_payment_with_route(&new_route, payment_hash, RecipientOnionFields::secret_only(payment_secret), payment_id) { @@ -782,6 +788,7 @@ fn do_test_dup_htlc_onchain_fails_on_reload(persist_manager_post_event: bool, co let height = nodes[0].blocks.lock().unwrap().len() as u32 - 1; nodes[0].chain_monitor.chain_monitor.block_connected(&claim_block, height); assert!(nodes[0].node.get_and_clear_pending_events().is_empty()); + check_added_monitors(&nodes[0], 1); } #[test] @@ -2869,6 +2876,7 @@ fn do_no_missing_sent_on_midpoint_reload(persist_manager_with_payment: bool) { reload_node!(nodes[0], test_default_channel_config(), &nodes[0].node.encode(), &[&chan_0_monitor_serialized], persister_c, chain_monitor_c, nodes_0_deserialized_c); let events = nodes[0].node.get_and_clear_pending_events(); assert!(events.is_empty()); + check_added_monitors(&nodes[0], 1); } #[test] diff --git a/lightning/src/ln/priv_short_conf_tests.rs b/lightning/src/ln/priv_short_conf_tests.rs index cfcc46dfeda..29789eea2e2 100644 --- a/lightning/src/ln/priv_short_conf_tests.rs +++ b/lightning/src/ln/priv_short_conf_tests.rs @@ -853,10 +853,12 @@ fn test_0conf_channel_reorg() { err: "Funding transaction was un-confirmed. Locked at 0 confs, now have 0 confs.".to_owned() }); check_closed_broadcast!(nodes[0], true); + check_added_monitors(&nodes[0], 1); check_closed_event!(&nodes[1], 1, ClosureReason::ProcessingError { err: "Funding transaction was un-confirmed. Locked at 0 confs, now have 0 confs.".to_owned() }); check_closed_broadcast!(nodes[1], true); + check_added_monitors(&nodes[1], 1); } #[test] diff --git a/lightning/src/ln/reload_tests.rs b/lightning/src/ln/reload_tests.rs index 9b694c7b869..6bb755f8c91 100644 --- a/lightning/src/ln/reload_tests.rs +++ b/lightning/src/ln/reload_tests.rs @@ -774,6 +774,9 @@ fn do_test_partial_claim_before_restart(persist_both_monitors: bool) { if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[1] { } else { panic!(); } if persist_both_monitors { if let Event::ChannelClosed { reason: ClosureReason::OutdatedChannelManager, .. } = events[2] { } else { panic!(); } + check_added_monitors(&nodes[3], 2); + } else { + check_added_monitors(&nodes[3], 1); } // On restart, we should also get a duplicate PaymentClaimed event as we persisted the @@ -1047,6 +1050,9 @@ fn removed_payment_no_manager_persistence() { _ => panic!("Unexpected event"), } + nodes[1].node.test_process_background_events(); + check_added_monitors(&nodes[1], 1); + // Now that the ChannelManager has force-closed the channel which had the HTLC removed, it is // now forgotten everywhere. The ChannelManager should have, as a side-effect of reload, // learned that the HTLC is gone from the ChannelMonitor and added it to the to-fail-back set. diff --git a/lightning/src/ln/reorg_tests.rs b/lightning/src/ln/reorg_tests.rs index e8f0c125943..46ffcf152df 100644 --- a/lightning/src/ln/reorg_tests.rs +++ b/lightning/src/ln/reorg_tests.rs @@ -302,8 +302,6 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ let relevant_txids = nodes[0].node.get_relevant_txids(); assert_eq!(relevant_txids.len(), 0); - handle_announce_close_broadcast_events(&nodes, 0, 1, true, "Channel closed because of an exception: Funding transaction was un-confirmed. Locked at 6 confs, now have 0 confs."); - check_added_monitors!(nodes[1], 1); { let per_peer_state = nodes[0].node.per_peer_state.read().unwrap(); let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap(); @@ -349,8 +347,6 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ let relevant_txids = nodes[0].node.get_relevant_txids(); assert_eq!(relevant_txids.len(), 0); - handle_announce_close_broadcast_events(&nodes, 0, 1, true, "Channel closed because of an exception: Funding transaction was un-confirmed. Locked at 6 confs, now have 0 confs."); - check_added_monitors!(nodes[1], 1); { let per_peer_state = nodes[0].node.per_peer_state.read().unwrap(); let peer_state = per_peer_state.get(&nodes[1].node.get_our_node_id()).unwrap().lock().unwrap(); @@ -364,7 +360,12 @@ fn do_test_unconf_chan(reload_node: bool, reorg_after_reload: bool, use_funding_ nodes[0].node.test_process_background_events(); // Required to free the pending background monitor update check_added_monitors!(nodes[0], 1); let expected_err = "Funding transaction was un-confirmed. Locked at 6 confs, now have 0 confs."; - check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(format!("Channel closed because of an exception: {}", expected_err)) }); + if reorg_after_reload || !reload_node { + handle_announce_close_broadcast_events(&nodes, 0, 1, true, "Channel closed because of an exception: Funding transaction was un-confirmed. Locked at 6 confs, now have 0 confs."); + check_added_monitors!(nodes[1], 1); + check_closed_event!(nodes[1], 1, ClosureReason::CounterpartyForceClosed { peer_msg: UntrustedString(format!("Channel closed because of an exception: {}", expected_err)) }); + } + check_closed_event!(nodes[0], 1, ClosureReason::ProcessingError { err: expected_err.to_owned() }); assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 1); nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().clear(); From 785bdb84cb3fb0305cad2edc30e2df03b6988c9b Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 4 Apr 2023 21:45:37 +0000 Subject: [PATCH 7/8] Reapply pending `ChannelMonitorUpdate`s on startup If a `ChannelMonitorUpdate` was created and given to the user but left uncompleted when the `ChannelManager` is persisted prior to a restart, the user likely lost the `ChannelMonitorUpdate`(s). Thus, we need to replay them for the user, which we do here using the new `BackgroundEvent::MonitorUpdateRegeneratedOnStartup` variant. --- lightning/src/ln/channel.rs | 15 +++++++++++++++ lightning/src/ln/channelmanager.rs | 23 ++++++++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/lightning/src/ln/channel.rs b/lightning/src/ln/channel.rs index 18ad49f3271..955f519c2f6 100644 --- a/lightning/src/ln/channel.rs +++ b/lightning/src/ln/channel.rs @@ -5050,10 +5050,25 @@ impl Channel { self.pending_monitor_updates.is_empty() } + pub fn complete_all_mon_updates_through(&mut self, update_id: u64) { + self.pending_monitor_updates.retain(|upd| { + if upd.update.update_id <= update_id { + assert!(!upd.blocked, "Completed update must have flown"); + false + } else { true } + }); + } + pub fn complete_one_mon_update(&mut self, update_id: u64) { self.pending_monitor_updates.retain(|upd| upd.update.update_id != update_id); } + /// Returns an iterator over all unblocked monitor updates which have not yet completed. + pub fn uncompleted_unblocked_mon_updates(&self) -> impl Iterator { + self.pending_monitor_updates.iter() + .filter_map(|upd| if upd.blocked { None } else { Some(&upd.update) }) + } + /// Returns true if funding_created was sent/received. pub fn is_funding_initiated(&self) -> bool { self.channel_state >= ChannelState::FundingSent as u32 diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 7880fa8fbfe..2735aac5c51 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -7882,7 +7882,10 @@ where } } } else { - log_info!(args.logger, "Successfully loaded channel {}", log_bytes!(channel.channel_id())); + log_info!(args.logger, "Successfully loaded channel {} at update_id {} against monitor at update id {}", + log_bytes!(channel.channel_id()), channel.get_latest_monitor_update_id(), + monitor.get_latest_update_id()); + channel.complete_all_mon_updates_through(monitor.get_latest_update_id()); if let Some(short_channel_id) = channel.get_short_channel_id() { short_to_chan_info.insert(short_channel_id, (channel.get_counterparty_node_id(), channel.channel_id())); } @@ -7996,6 +7999,24 @@ where } } + for (node_id, peer_mtx) in per_peer_state.iter() { + let peer_state = peer_mtx.lock().unwrap(); + for (_, chan) in peer_state.channel_by_id.iter() { + for update in chan.uncompleted_unblocked_mon_updates() { + if let Some(funding_txo) = chan.get_funding_txo() { + log_trace!(args.logger, "Replaying ChannelMonitorUpdate {} for channel {}", + update.update_id, log_bytes!(funding_txo.to_channel_id())); + pending_background_events.push( + BackgroundEvent::MonitorUpdateRegeneratedOnStartup { + counterparty_node_id: *node_id, funding_txo, update: update.clone(), + }); + } else { + return Err(DecodeError::InvalidValue); + } + } + } + } + let _last_node_announcement_serial: u32 = Readable::read(reader)?; // Only used < 0.0.111 let highest_seen_timestamp: u32 = Readable::read(reader)?; From 394f54da314864a1a368d4c4f1b5c503da85c7ce Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Fri, 7 Apr 2023 00:31:39 +0000 Subject: [PATCH 8/8] Add infra to block `ChannelMonitorUpdate`s on forwarded claims When we forward a payment and receive an `update_fulfill_htlc` message from the downstream channel, we immediately claim the HTLC on the upstream channel, before even doing a `commitment_signed` dance on the downstream channel. This implies that our `ChannelMonitorUpdate`s "go out" in the right order - first we ensure we'll get our money by writing the preimage down, then we write the update that resolves giving money on the downstream node. This is safe as long as `ChannelMonitorUpdate`s complete in the order in which they are generated, but of course looking forward we want to support asynchronous updates, which may complete in any order. Here we add infrastructure to handle downstream `ChannelMonitorUpdate`s which are blocked on an upstream preimage-containing one. We don't yet actually do the blocking which will come in a future commit. --- lightning/src/ln/channelmanager.rs | 158 ++++++++++++++++++++++++----- 1 file changed, 131 insertions(+), 27 deletions(-) diff --git a/lightning/src/ln/channelmanager.rs b/lightning/src/ln/channelmanager.rs index 2735aac5c51..0ec8bddf656 100644 --- a/lightning/src/ln/channelmanager.rs +++ b/lightning/src/ln/channelmanager.rs @@ -532,13 +532,31 @@ pub(crate) enum MonitorUpdateCompletionAction { /// this payment. Note that this is only best-effort. On restart it's possible such a duplicate /// event can be generated. PaymentClaimed { payment_hash: PaymentHash }, - /// Indicates an [`events::Event`] should be surfaced to the user. - EmitEvent { event: events::Event }, + /// Indicates an [`events::Event`] should be surfaced to the user and possibly resume the + /// operation of another channel. + /// + /// This is usually generated when we've forwarded an HTLC and want to block the outbound edge + /// from completing a monitor update which removes the payment preimage until the inbound edge + /// completes a monitor update containing the payment preimage. In that case, after the inbound + /// edge completes, we will surface an [`Event::PaymentForwarded`] as well as unblock the + /// outbound edge. + EmitEventAndFreeOtherChannel { + event: events::Event, + downstream_counterparty_and_funding_outpoint: Option<(PublicKey, OutPoint, RAAMonitorUpdateBlockingAction)>, + }, } impl_writeable_tlv_based_enum_upgradable!(MonitorUpdateCompletionAction, (0, PaymentClaimed) => { (0, payment_hash, required) }, - (2, EmitEvent) => { (0, event, upgradable_required) }, + (2, EmitEventAndFreeOtherChannel) => { + (0, event, upgradable_required), + // LDK prior to 0.0.116 did not have this field as the monitor update application order was + // required by clients. If we downgrade to something prior to 0.0.116 this may result in + // monitor updates which aren't properly blocked or resumed, however that's fine - we don't + // support async monitor updates even in LDK 0.0.116 and once we do we'll require no + // downgrades to prior versions. + (1, downstream_counterparty_and_funding_outpoint, option), + }, ); #[derive(Clone, Debug, PartialEq, Eq)] @@ -555,6 +573,36 @@ impl_writeable_tlv_based_enum!(EventCompletionAction, }; ); +#[derive(Clone, PartialEq, Eq, Debug)] +/// If something is blocked on the completion of an RAA-generated [`ChannelMonitorUpdate`] we track +/// the blocked action here. See enum variants for more info. +pub(crate) enum RAAMonitorUpdateBlockingAction { + /// A forwarded payment was claimed. We block the downstream channel completing its monitor + /// update which removes the HTLC preimage until the upstream channel has gotten the preimage + /// durably to disk. + ForwardedPaymentInboundClaim { + /// The upstream channel ID (i.e. the inbound edge). + channel_id: [u8; 32], + /// The HTLC ID on the inbound edge. + htlc_id: u64, + }, +} + +impl RAAMonitorUpdateBlockingAction { + #[allow(unused)] + fn from_prev_hop_data(prev_hop: &HTLCPreviousHopData) -> Self { + Self::ForwardedPaymentInboundClaim { + channel_id: prev_hop.outpoint.to_channel_id(), + htlc_id: prev_hop.htlc_id, + } + } +} + +impl_writeable_tlv_based_enum!(RAAMonitorUpdateBlockingAction, + (0, ForwardedPaymentInboundClaim) => { (0, channel_id, required), (2, htlc_id, required) } +;); + + /// State we hold per-peer. pub(super) struct PeerState { /// `temporary_channel_id` or `channel_id` -> `channel`. @@ -583,6 +631,11 @@ pub(super) struct PeerState { /// to funding appearing on-chain), the downstream `ChannelMonitor` set is required to ensure /// duplicates do not occur, so such channels should fail without a monitor update completing. monitor_update_blocked_actions: BTreeMap<[u8; 32], Vec>, + /// If another channel's [`ChannelMonitorUpdate`] needs to complete before a channel we have + /// with this peer can complete an RAA [`ChannelMonitorUpdate`] (e.g. because the RAA update + /// will remove a preimage that needs to be durably in an upstream channel first), we put an + /// entry here to note that the channel with the key's ID is blocked on a set of actions. + actions_blocking_raa_monitor_updates: BTreeMap<[u8; 32], Vec>, /// The peer is currently connected (i.e. we've seen a /// [`ChannelMessageHandler::peer_connected`] and no corresponding /// [`ChannelMessageHandler::peer_disconnected`]. @@ -4490,16 +4543,16 @@ where Some(claimed_htlc_value - forwarded_htlc_value) } else { None }; - let prev_channel_id = Some(prev_outpoint.to_channel_id()); - let next_channel_id = Some(next_channel_id); - - Some(MonitorUpdateCompletionAction::EmitEvent { event: events::Event::PaymentForwarded { - fee_earned_msat, - claim_from_onchain_tx: from_onchain, - prev_channel_id, - next_channel_id, - outbound_amount_forwarded_msat: forwarded_htlc_value_msat, - }}) + Some(MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + event: events::Event::PaymentForwarded { + fee_earned_msat, + claim_from_onchain_tx: from_onchain, + prev_channel_id: Some(prev_outpoint.to_channel_id()), + next_channel_id: Some(next_channel_id), + outbound_amount_forwarded_msat: forwarded_htlc_value_msat, + }, + downstream_counterparty_and_funding_outpoint: None, + }) } else { None } }); if let Err((pk, err)) = res { @@ -4526,8 +4579,13 @@ where }, None)); } }, - MonitorUpdateCompletionAction::EmitEvent { event } => { + MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + event, downstream_counterparty_and_funding_outpoint + } => { self.pending_events.lock().unwrap().push_back((event, None)); + if let Some((node_id, funding_outpoint, blocker)) = downstream_counterparty_and_funding_outpoint { + self.handle_monitor_update_release(node_id, funding_outpoint, Some(blocker)); + } }, } } @@ -5374,6 +5432,24 @@ where } } + /// Checks whether [`ChannelMonitorUpdate`]s generated by the receipt of a remote + /// [`msgs::RevokeAndACK`] should be held for the given channel until some other event + /// completes. Note that this needs to happen in the same [`PeerState`] mutex as any release of + /// the [`ChannelMonitorUpdate`] in question. + fn raa_monitor_updates_held(&self, + actions_blocking_raa_monitor_updates: &BTreeMap<[u8; 32], Vec>, + channel_funding_outpoint: OutPoint, counterparty_node_id: PublicKey + ) -> bool { + actions_blocking_raa_monitor_updates + .get(&channel_funding_outpoint.to_channel_id()).map(|v| !v.is_empty()).unwrap_or(false) + || self.pending_events.lock().unwrap().iter().any(|(_, action)| { + action == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate { + channel_funding_outpoint, + counterparty_node_id, + }) + }) + } + fn internal_revoke_and_ack(&self, counterparty_node_id: &PublicKey, msg: &msgs::RevokeAndACK) -> Result<(), MsgHandleErrInternal> { let (htlcs_to_fail, res) = { let per_peer_state = self.per_peer_state.read().unwrap(); @@ -6038,25 +6114,37 @@ where self.pending_outbound_payments.clear_pending_payments() } - fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint) { + /// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an + /// [`Event`] being handled) completes, this should be called to restore the channel to normal + /// operation. It will double-check that nothing *else* is also blocking the same channel from + /// making progress and then any blocked [`ChannelMonitorUpdate`]s fly. + fn handle_monitor_update_release(&self, counterparty_node_id: PublicKey, channel_funding_outpoint: OutPoint, mut completed_blocker: Option) { let mut errors = Vec::new(); loop { let per_peer_state = self.per_peer_state.read().unwrap(); if let Some(peer_state_mtx) = per_peer_state.get(&counterparty_node_id) { let mut peer_state_lck = peer_state_mtx.lock().unwrap(); let peer_state = &mut *peer_state_lck; - if self.pending_events.lock().unwrap().iter() - .any(|(_ev, action_opt)| action_opt == &Some(EventCompletionAction::ReleaseRAAChannelMonitorUpdate { - channel_funding_outpoint, counterparty_node_id - })) - { - // Check that, while holding the peer lock, we don't have another event - // blocking any monitor updates for this channel. If we do, let those - // events be the ones that ultimately release the monitor update(s). - log_trace!(self.logger, "Delaying monitor unlock for channel {} as another event is pending", + + if let Some(blocker) = completed_blocker.take() { + // Only do this on the first iteration of the loop. + if let Some(blockers) = peer_state.actions_blocking_raa_monitor_updates + .get_mut(&channel_funding_outpoint.to_channel_id()) + { + blockers.retain(|iter| iter != &blocker); + } + } + + if self.raa_monitor_updates_held(&peer_state.actions_blocking_raa_monitor_updates, + channel_funding_outpoint, counterparty_node_id) { + // Check that, while holding the peer lock, we don't have anything else + // blocking monitor updates for this channel. If we do, release the monitor + // update(s) when those blockers complete. + log_trace!(self.logger, "Delaying monitor unlock for channel {} as another channel's mon update needs to complete first", log_bytes!(&channel_funding_outpoint.to_channel_id()[..])); break; } + if let hash_map::Entry::Occupied(mut chan) = peer_state.channel_by_id.entry(channel_funding_outpoint.to_channel_id()) { debug_assert_eq!(chan.get().get_funding_txo().unwrap(), channel_funding_outpoint); if let Some((monitor_update, further_update_exists)) = chan.get_mut().unblock_next_blocked_monitor_update() { @@ -6098,7 +6186,7 @@ where EventCompletionAction::ReleaseRAAChannelMonitorUpdate { channel_funding_outpoint, counterparty_node_id } => { - self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint); + self.handle_monitor_update_release(counterparty_node_id, channel_funding_outpoint, None); } } } @@ -6774,6 +6862,7 @@ where latest_features: init_msg.features.clone(), pending_msg_events: Vec::new(), monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: true, })); }, @@ -7970,6 +8059,7 @@ where latest_features: Readable::read(reader)?, pending_msg_events: Vec::new(), monitor_update_blocked_actions: BTreeMap::new(), + actions_blocking_raa_monitor_updates: BTreeMap::new(), is_connected: false, }; per_peer_state.insert(peer_pubkey, Mutex::new(peer_state)); @@ -8051,7 +8141,7 @@ where let mut claimable_htlc_purposes = None; let mut claimable_htlc_onion_fields = None; let mut pending_claiming_payments = Some(HashMap::new()); - let mut monitor_update_blocked_actions_per_peer = Some(Vec::new()); + let mut monitor_update_blocked_actions_per_peer: Option>)>> = Some(Vec::new()); let mut events_override = None; read_tlv_fields!(reader, { (1, pending_outbound_payments_no_retry, option), @@ -8376,7 +8466,21 @@ where } for (node_id, monitor_update_blocked_actions) in monitor_update_blocked_actions_per_peer.unwrap() { - if let Some(peer_state) = per_peer_state.get_mut(&node_id) { + if let Some(peer_state) = per_peer_state.get(&node_id) { + for (_, actions) in monitor_update_blocked_actions.iter() { + for action in actions.iter() { + if let MonitorUpdateCompletionAction::EmitEventAndFreeOtherChannel { + downstream_counterparty_and_funding_outpoint: + Some((blocked_node_id, blocked_channel_outpoint, blocking_action)), .. + } = action { + if let Some(blocked_peer_state) = per_peer_state.get(&blocked_node_id) { + blocked_peer_state.lock().unwrap().actions_blocking_raa_monitor_updates + .entry(blocked_channel_outpoint.to_channel_id()) + .or_insert_with(Vec::new).push(blocking_action.clone()); + } + } + } + } peer_state.lock().unwrap().monitor_update_blocked_actions = monitor_update_blocked_actions; } else { log_error!(args.logger, "Got blocked actions without a per-peer-state for {}", node_id);