Skip to content

Commit

Permalink
Merge pull request #1004 from TheBlueMatt/2021-07-forward-event
Browse files Browse the repository at this point in the history
Add a `PaymentForwarded` Event
  • Loading branch information
TheBlueMatt authored Aug 4, 2021
2 parents 09e1670 + 50f47ec commit 69ee486
Show file tree
Hide file tree
Showing 11 changed files with 249 additions and 113 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
# 0.0.100 - WIP

## Serialization Compatibility
* HTLCs which were in the process of being claimed on-chain when a pre-0.0.100
`ChannelMonitor` was serialized may generate `PaymentForwarded` events with
spurious `fee_earned_msat` values. This only applies to payments which were
unresolved at the time of the upgrade.
* 0.0.100 clients with pending PaymentForwarded events at serialization-time
will generate serialized `ChannelManager` objects which 0.0.99 and earlier
clients cannot read. The likelihood of this can be reduced by ensuring you
process all pending events immediately before serialization (as is done by
the `lightning-background-processor` crate).


# 0.0.99 - 2021-07-09

## API Updates
Expand Down
1 change: 1 addition & 0 deletions fuzz/src/chanmon_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
},
events::Event::PaymentSent { .. } => {},
events::Event::PaymentFailed { .. } => {},
events::Event::PaymentForwarded { .. } if $node == 1 => {},
events::Event::PendingHTLCsForwardable { .. } => {
nodes[$node].process_pending_htlc_forwards();
},
Expand Down
4 changes: 1 addition & 3 deletions fuzz/src/full_stack.rs
Original file line number Diff line number Diff line change
Expand Up @@ -596,12 +596,10 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {
//TODO: enhance by fetching random amounts from fuzz input?
payments_received.push(payment_hash);
},
Event::PaymentSent {..} => {},
Event::PaymentFailed {..} => {},
Event::PendingHTLCsForwardable {..} => {
should_forward = true;
},
Event::SpendableOutputs {..} => {},
_ => {},
}
}
}
Expand Down
78 changes: 44 additions & 34 deletions lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,10 +199,12 @@ pub enum MonitorEvent {
pub struct HTLCUpdate {
pub(crate) payment_hash: PaymentHash,
pub(crate) payment_preimage: Option<PaymentPreimage>,
pub(crate) source: HTLCSource
pub(crate) source: HTLCSource,
pub(crate) onchain_value_satoshis: Option<u64>,
}
impl_writeable_tlv_based!(HTLCUpdate, {
(0, payment_hash, required),
(1, onchain_value_satoshis, option),
(2, source, required),
(4, payment_preimage, option),
});
Expand Down Expand Up @@ -385,6 +387,7 @@ enum OnchainEvent {
HTLCUpdate {
source: HTLCSource,
payment_hash: PaymentHash,
onchain_value_satoshis: Option<u64>,
},
MaturingOutput {
descriptor: SpendableOutputDescriptor,
Expand All @@ -400,6 +403,7 @@ impl_writeable_tlv_based!(OnchainEventEntry, {
impl_writeable_tlv_based_enum!(OnchainEvent,
(0, HTLCUpdate) => {
(0, source, required),
(1, onchain_value_satoshis, option),
(2, payment_hash, required),
},
(1, MaturingOutput) => {
Expand Down Expand Up @@ -1574,6 +1578,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
event: OnchainEvent::HTLCUpdate {
source: (**source).clone(),
payment_hash: htlc.payment_hash.clone(),
onchain_value_satoshis: Some(htlc.amount_msat / 1000),
},
};
log_info!(logger, "Failing HTLC with payment_hash {} from {} counterparty commitment tx due to broadcast of revoked counterparty commitment transaction, waiting for confirmation (at height {})", log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
Expand Down Expand Up @@ -1641,6 +1646,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
event: OnchainEvent::HTLCUpdate {
source: (**source).clone(),
payment_hash: htlc.payment_hash.clone(),
onchain_value_satoshis: Some(htlc.amount_msat / 1000),
},
});
}
Expand Down Expand Up @@ -1779,27 +1785,6 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
let mut claim_requests = Vec::new();
let mut watch_outputs = Vec::new();

macro_rules! wait_threshold_conf {
($source: expr, $commitment_tx: expr, $payment_hash: expr) => {
self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
if entry.height != height { return true; }
match entry.event {
OnchainEvent::HTLCUpdate { source: ref update_source, .. } => {
*update_source != $source
},
_ => true,
}
});
let entry = OnchainEventEntry {
txid: commitment_txid,
height,
event: OnchainEvent::HTLCUpdate { source: $source, payment_hash: $payment_hash },
};
log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})", log_bytes!($payment_hash.0), $commitment_tx, entry.confirmation_threshold());
self.onchain_events_awaiting_threshold_conf.push(entry);
}
}

macro_rules! append_onchain_update {
($updates: expr, $to_watch: expr) => {
claim_requests = $updates.0;
Expand Down Expand Up @@ -1828,21 +1813,40 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
}

macro_rules! fail_dust_htlcs_after_threshold_conf {
($holder_tx: expr) => {
($holder_tx: expr, $commitment_tx: expr) => {
for &(ref htlc, _, ref source) in &$holder_tx.htlc_outputs {
if htlc.transaction_output_index.is_none() {
if let &Some(ref source) = source {
wait_threshold_conf!(source.clone(), "lastest", htlc.payment_hash.clone());
self.onchain_events_awaiting_threshold_conf.retain(|ref entry| {
if entry.height != height { return true; }
match entry.event {
OnchainEvent::HTLCUpdate { source: ref update_source, .. } => {
update_source != source
},
_ => true,
}
});
let entry = OnchainEventEntry {
txid: commitment_txid,
height,
event: OnchainEvent::HTLCUpdate {
source: source.clone(), payment_hash: htlc.payment_hash,
onchain_value_satoshis: Some(htlc.amount_msat / 1000)
},
};
log_trace!(logger, "Failing HTLC with payment_hash {} from {} holder commitment tx due to broadcast of transaction, waiting confirmation (at height{})",
log_bytes!(htlc.payment_hash.0), $commitment_tx, entry.confirmation_threshold());
self.onchain_events_awaiting_threshold_conf.push(entry);
}
}
}
}
}

if is_holder_tx {
fail_dust_htlcs_after_threshold_conf!(self.current_holder_commitment_tx);
fail_dust_htlcs_after_threshold_conf!(self.current_holder_commitment_tx, "latest");
if let &Some(ref holder_tx) = &self.prev_holder_signed_commitment_tx {
fail_dust_htlcs_after_threshold_conf!(holder_tx);
fail_dust_htlcs_after_threshold_conf!(holder_tx, "previous");
}
}

Expand Down Expand Up @@ -2090,7 +2094,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
// Produce actionable events from on-chain events having reached their threshold.
for entry in onchain_events_reaching_threshold_conf.drain(..) {
match entry.event {
OnchainEvent::HTLCUpdate { ref source, payment_hash } => {
OnchainEvent::HTLCUpdate { ref source, payment_hash, onchain_value_satoshis } => {
// Check for duplicate HTLC resolutions.
#[cfg(debug_assertions)]
{
Expand All @@ -2109,9 +2113,10 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {

log_debug!(logger, "HTLC {} failure update has got enough confirmations to be passed upstream", log_bytes!(payment_hash.0));
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
payment_hash: payment_hash,
payment_hash,
payment_preimage: None,
source: source.clone(),
onchain_value_satoshis,
}));
},
OnchainEvent::MaturingOutput { descriptor } => {
Expand Down Expand Up @@ -2328,7 +2333,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
if pending_htlc.payment_hash == $htlc_output.payment_hash && pending_htlc.amount_msat == $htlc_output.amount_msat {
if let &Some(ref source) = pending_source {
log_claim!("revoked counterparty commitment tx", false, pending_htlc, true);
payment_data = Some(((**source).clone(), $htlc_output.payment_hash));
payment_data = Some(((**source).clone(), $htlc_output.payment_hash, $htlc_output.amount_msat));
break;
}
}
Expand All @@ -2348,7 +2353,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
// transaction. This implies we either learned a preimage, the HTLC
// has timed out, or we screwed up. In any case, we should now
// resolve the source HTLC with the original sender.
payment_data = Some(((*source).clone(), htlc_output.payment_hash));
payment_data = Some(((*source).clone(), htlc_output.payment_hash, htlc_output.amount_msat));
} else if !$holder_tx {
check_htlc_valid_counterparty!(self.current_counterparty_commitment_txid, htlc_output);
if payment_data.is_none() {
Expand Down Expand Up @@ -2381,7 +2386,7 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {

// Check that scan_commitment, above, decided there is some source worth relaying an
// HTLC resolution backwards to and figure out whether we learned a preimage from it.
if let Some((source, payment_hash)) = payment_data {
if let Some((source, payment_hash, amount_msat)) = payment_data {
let mut payment_preimage = PaymentPreimage([0; 32]);
if accepted_preimage_claim {
if !self.pending_monitor_events.iter().any(
Expand All @@ -2390,7 +2395,8 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
payment_hash
payment_hash,
onchain_value_satoshis: Some(amount_msat / 1000),
}));
}
} else if offered_preimage_claim {
Expand All @@ -2402,7 +2408,8 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
self.pending_monitor_events.push(MonitorEvent::HTLCEvent(HTLCUpdate {
source,
payment_preimage: Some(payment_preimage),
payment_hash
payment_hash,
onchain_value_satoshis: Some(amount_msat / 1000),
}));
}
} else {
Expand All @@ -2418,7 +2425,10 @@ impl<Signer: Sign> ChannelMonitorImpl<Signer> {
let entry = OnchainEventEntry {
txid: tx.txid(),
height,
event: OnchainEvent::HTLCUpdate { source: source, payment_hash: payment_hash },
event: OnchainEvent::HTLCUpdate {
source, payment_hash,
onchain_value_satoshis: Some(amount_msat / 1000),
},
};
log_info!(logger, "Failing HTLC with payment_hash {} timeout by a spend tx, waiting for confirmation (at height {})", log_bytes!(payment_hash.0), entry.confirmation_threshold());
self.onchain_events_awaiting_threshold_conf.push(entry);
Expand Down
2 changes: 2 additions & 0 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ fn test_monitor_update_fail_reestablish() {
assert!(updates.update_fee.is_none());
assert_eq!(updates.update_fulfill_htlcs.len(), 1);
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &updates.update_fulfill_htlcs[0]);
expect_payment_forwarded!(nodes[1], Some(1000), false);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
commitment_signed_dance!(nodes[1], nodes[2], updates.commitment_signed, false);
Expand Down Expand Up @@ -2318,6 +2319,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
assert_eq!(fulfill_msg, cs_updates.update_fulfill_htlcs[0]);
}
nodes[1].node.handle_update_fulfill_htlc(&nodes[2].node.get_our_node_id(), &fulfill_msg);
expect_payment_forwarded!(nodes[1], Some(1000), false);
check_added_monitors!(nodes[1], 1);

let mut bs_updates = None;
Expand Down
32 changes: 21 additions & 11 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ pub struct CounterpartyForwardingInfo {
enum UpdateFulfillFetch {
NewClaim {
monitor_update: ChannelMonitorUpdate,
htlc_value_msat: u64,
msg: Option<msgs::UpdateFulfillHTLC>,
},
DuplicateClaim {},
Expand All @@ -320,6 +321,8 @@ pub enum UpdateFulfillCommitFetch {
NewClaim {
/// The ChannelMonitorUpdate which places the new payment preimage in the channel monitor
monitor_update: ChannelMonitorUpdate,
/// The value of the HTLC which was claimed, in msat.
htlc_value_msat: u64,
/// The update_fulfill message and commitment_signed message (if the claim was not placed
/// in the holding cell).
msgs: Option<(msgs::UpdateFulfillHTLC, msgs::CommitmentSigned)>,
Expand All @@ -337,6 +340,9 @@ pub enum UpdateFulfillCommitFetch {
// Holder designates channel data owned for the benefice of the user client.
// Counterparty designates channel data owned by the another channel participant entity.
pub(super) struct Channel<Signer: Sign> {
#[cfg(any(test, feature = "_test_utils"))]
pub(crate) config: ChannelConfig,
#[cfg(not(any(test, feature = "_test_utils")))]
config: ChannelConfig,

user_id: u64,
Expand Down Expand Up @@ -1276,6 +1282,7 @@ impl<Signer: Sign> Channel<Signer> {
// these, but for now we just have to treat them as normal.

let mut pending_idx = core::usize::MAX;
let mut htlc_value_msat = 0;
for (idx, htlc) in self.pending_inbound_htlcs.iter().enumerate() {
if htlc.htlc_id == htlc_id_arg {
assert_eq!(htlc.payment_hash, payment_hash_calc);
Expand All @@ -1295,6 +1302,7 @@ impl<Signer: Sign> Channel<Signer> {
}
}
pending_idx = idx;
htlc_value_msat = htlc.amount_msat;
break;
}
}
Expand Down Expand Up @@ -1336,7 +1344,7 @@ impl<Signer: Sign> Channel<Signer> {
// TODO: We may actually be able to switch to a fulfill here, though its
// rare enough it may not be worth the complexity burden.
debug_assert!(false, "Tried to fulfill an HTLC that was already failed");
return UpdateFulfillFetch::NewClaim { monitor_update, msg: None };
return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None };
}
},
_ => {}
Expand All @@ -1348,7 +1356,7 @@ impl<Signer: Sign> Channel<Signer> {
});
#[cfg(any(test, feature = "fuzztarget"))]
self.historical_inbound_htlc_fulfills.insert(htlc_id_arg);
return UpdateFulfillFetch::NewClaim { monitor_update, msg: None };
return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None };
}
#[cfg(any(test, feature = "fuzztarget"))]
self.historical_inbound_htlc_fulfills.insert(htlc_id_arg);
Expand All @@ -1358,14 +1366,15 @@ impl<Signer: Sign> Channel<Signer> {
if let InboundHTLCState::Committed = htlc.state {
} else {
debug_assert!(false, "Have an inbound HTLC we tried to claim before it was fully committed to");
return UpdateFulfillFetch::NewClaim { monitor_update, msg: None };
return UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None };
}
log_trace!(logger, "Upgrading HTLC {} to LocalRemoved with a Fulfill in channel {}!", log_bytes!(htlc.payment_hash.0), log_bytes!(self.channel_id));
htlc.state = InboundHTLCState::LocalRemoved(InboundHTLCRemovalReason::Fulfill(payment_preimage_arg.clone()));
}

UpdateFulfillFetch::NewClaim {
monitor_update,
htlc_value_msat,
msg: Some(msgs::UpdateFulfillHTLC {
channel_id: self.channel_id(),
htlc_id: htlc_id_arg,
Expand All @@ -1376,7 +1385,7 @@ impl<Signer: Sign> Channel<Signer> {

pub fn get_update_fulfill_htlc_and_commit<L: Deref>(&mut self, htlc_id: u64, payment_preimage: PaymentPreimage, logger: &L) -> Result<UpdateFulfillCommitFetch, (ChannelError, ChannelMonitorUpdate)> where L::Target: Logger {
match self.get_update_fulfill_htlc(htlc_id, payment_preimage, logger) {
UpdateFulfillFetch::NewClaim { mut monitor_update, msg: Some(update_fulfill_htlc) } => {
UpdateFulfillFetch::NewClaim { mut monitor_update, htlc_value_msat, msg: Some(update_fulfill_htlc) } => {
let (commitment, mut additional_update) = match self.send_commitment_no_status_check(logger) {
Err(e) => return Err((e, monitor_update)),
Ok(res) => res
Expand All @@ -1385,9 +1394,10 @@ impl<Signer: Sign> Channel<Signer> {
// strictly increasing by one, so decrement it here.
self.latest_monitor_update_id = monitor_update.update_id;
monitor_update.updates.append(&mut additional_update.updates);
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, msgs: Some((update_fulfill_htlc, commitment)) })
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: Some((update_fulfill_htlc, commitment)) })
},
UpdateFulfillFetch::NewClaim { monitor_update, msg: None } => Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, msgs: None }),
UpdateFulfillFetch::NewClaim { monitor_update, htlc_value_msat, msg: None } =>
Ok(UpdateFulfillCommitFetch::NewClaim { monitor_update, htlc_value_msat, msgs: None }),
UpdateFulfillFetch::DuplicateClaim {} => Ok(UpdateFulfillCommitFetch::DuplicateClaim {}),
}
}
Expand Down Expand Up @@ -2164,7 +2174,7 @@ impl<Signer: Sign> Channel<Signer> {

/// Marks an outbound HTLC which we have received update_fail/fulfill/malformed
#[inline]
fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, check_preimage: Option<PaymentHash>, fail_reason: Option<HTLCFailReason>) -> Result<&HTLCSource, ChannelError> {
fn mark_outbound_htlc_removed(&mut self, htlc_id: u64, check_preimage: Option<PaymentHash>, fail_reason: Option<HTLCFailReason>) -> Result<&OutboundHTLCOutput, ChannelError> {
for htlc in self.pending_outbound_htlcs.iter_mut() {
if htlc.htlc_id == htlc_id {
match check_preimage {
Expand All @@ -2183,13 +2193,13 @@ impl<Signer: Sign> Channel<Signer> {
OutboundHTLCState::AwaitingRemoteRevokeToRemove(_) | OutboundHTLCState::AwaitingRemovedRemoteRevoke(_) | OutboundHTLCState::RemoteRemoved(_) =>
return Err(ChannelError::Close(format!("Remote tried to fulfill/fail HTLC ({}) that they'd already fulfilled/failed", htlc_id))),
}
return Ok(&htlc.source);
return Ok(htlc);
}
}
Err(ChannelError::Close("Remote tried to fulfill/fail an HTLC we couldn't find".to_owned()))
}

pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<HTLCSource, ChannelError> {
pub fn update_fulfill_htlc(&mut self, msg: &msgs::UpdateFulfillHTLC) -> Result<(HTLCSource, u64), ChannelError> {
if (self.channel_state & (ChannelState::ChannelFunded as u32)) != (ChannelState::ChannelFunded as u32) {
return Err(ChannelError::Close("Got fulfill HTLC message when channel was not in an operational state".to_owned()));
}
Expand All @@ -2198,7 +2208,7 @@ impl<Signer: Sign> Channel<Signer> {
}

let payment_hash = PaymentHash(Sha256::hash(&msg.payment_preimage.0[..]).into_inner());
self.mark_outbound_htlc_removed(msg.htlc_id, Some(payment_hash), None).map(|source| source.clone())
self.mark_outbound_htlc_removed(msg.htlc_id, Some(payment_hash), None).map(|htlc| (htlc.source.clone(), htlc.amount_msat))
}

pub fn update_fail_htlc(&mut self, msg: &msgs::UpdateFailHTLC, fail_reason: HTLCFailReason) -> Result<(), ChannelError> {
Expand Down Expand Up @@ -2497,7 +2507,7 @@ impl<Signer: Sign> Channel<Signer> {
// in it hitting the holding cell again and we cannot change the state of a
// holding cell HTLC from fulfill to anything else.
let (update_fulfill_msg_option, mut additional_monitor_update) =
if let UpdateFulfillFetch::NewClaim { msg, monitor_update } = self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) {
if let UpdateFulfillFetch::NewClaim { msg, monitor_update, .. } = self.get_update_fulfill_htlc(htlc_id, *payment_preimage, logger) {
(msg, monitor_update)
} else { unreachable!() };
update_fulfill_htlcs.push(update_fulfill_msg_option.unwrap());
Expand Down
Loading

0 comments on commit 69ee486

Please sign in to comment.