Skip to content

Commit

Permalink
Merge pull request #374 from G8XSU/replay-events
Browse files Browse the repository at this point in the history
Replay events on event handling failures due to persistence failures.
  • Loading branch information
tnull authored Oct 16, 2024
2 parents 26e61e8 + 5594560 commit cffdf7e
Showing 1 changed file with 133 additions and 101 deletions.
234 changes: 133 additions & 101 deletions src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -498,11 +498,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
return Ok(());
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}

if info.status == PaymentStatus::Succeeded
Expand All @@ -520,11 +522,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
return Ok(());
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}

let max_total_opening_fee_msat = match info.kind {
Expand Down Expand Up @@ -559,11 +563,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
return Ok(());
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}

// If this is known by the store but ChannelManager doesn't know the preimage,
Expand All @@ -577,22 +583,23 @@ where
"We would have registered the preimage if we knew"
);

self.event_queue
.add_event(Event::PaymentClaimable {
payment_id,
payment_hash,
claimable_amount_msat: amount_msat,
claim_deadline,
})
.unwrap_or_else(|e| {
let event = Event::PaymentClaimable {
payment_id,
payment_hash,
claimable_amount_msat: amount_msat,
claim_deadline,
};
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(
self.logger,
"Failed to push to event queue: {}",
e
);
panic!("Failed to push to event queue");
});
return Ok(());
return Err(ReplayEvent());
},
};
}
},
_ => {},
Expand Down Expand Up @@ -715,10 +722,13 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
match self.payment_store.update(&update) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};
}
},
LdkEvent::PaymentClaimed {
Expand Down Expand Up @@ -796,20 +806,22 @@ where
payment_id,
e
);
panic!("Failed to access payment store");
return Err(ReplayEvent());
},
}

self.event_queue
.add_event(Event::PaymentReceived {
payment_id: Some(payment_id),
payment_hash,
amount_msat,
})
.unwrap_or_else(|e| {
let event = Event::PaymentReceived {
payment_id: Some(payment_id),
payment_hash,
amount_msat,
};
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::PaymentSent {
payment_id,
Expand All @@ -832,10 +844,13 @@ where
..PaymentDetailsUpdate::new(payment_id)
};

self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
match self.payment_store.update(&update) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};

self.payment_store.get(&payment_id).map(|payment| {
log_info!(
Expand All @@ -852,17 +867,19 @@ where
hex_utils::to_string(&payment_preimage.0)
);
});
let event = Event::PaymentSuccessful {
payment_id: Some(payment_id),
payment_hash,
fee_paid_msat,
};

self.event_queue
.add_event(Event::PaymentSuccessful {
payment_id: Some(payment_id),
payment_hash,
fee_paid_msat,
})
.unwrap_or_else(|e| {
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::PaymentFailed { payment_id, payment_hash, reason, .. } => {
log_info!(
Expand All @@ -877,20 +894,23 @@ where
status: Some(PaymentStatus::Failed),
..PaymentDetailsUpdate::new(payment_id)
};
self.payment_store.update(&update).unwrap_or_else(|e| {
log_error!(self.logger, "Failed to access payment store: {}", e);
panic!("Failed to access payment store");
});
self.event_queue
.add_event(Event::PaymentFailed {
payment_id: Some(payment_id),
payment_hash,
reason,
})
.unwrap_or_else(|e| {
match self.payment_store.update(&update) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to access payment store: {}", e);
return Err(ReplayEvent());
},
};

let event =
Event::PaymentFailed { payment_id: Some(payment_id), payment_hash, reason };
match self.event_queue.add_event(event) {
Ok(_) => return Ok(()),
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},

LdkEvent::PaymentPathSuccessful { .. } => {},
Expand All @@ -915,12 +935,13 @@ where
}
},
LdkEvent::SpendableOutputs { outputs, channel_id } => {
self.output_sweeper
.track_spendable_outputs(outputs, channel_id, true, None)
.unwrap_or_else(|_| {
match self.output_sweeper.track_spendable_outputs(outputs, channel_id, true, None) {
Ok(_) => return Ok(()),
Err(_) => {
log_error!(self.logger, "Failed to track spendable outputs");
panic!("Failed to track spendable outputs");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::OpenChannelRequest {
temporary_channel_id,
Expand Down Expand Up @@ -1111,18 +1132,22 @@ where
channel_id,
counterparty_node_id,
);
self.event_queue
.add_event(Event::ChannelPending {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
former_temporary_channel_id: former_temporary_channel_id.unwrap(),
counterparty_node_id,
funding_txo,
})
.unwrap_or_else(|e| {

let event = Event::ChannelPending {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
former_temporary_channel_id: former_temporary_channel_id.unwrap(),
counterparty_node_id,
funding_txo,
};
match self.event_queue.add_event(event) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};

let network_graph = self.network_graph.read_only();
let channels =
self.channel_manager.list_channels_with_counterparty(&counterparty_node_id);
Expand Down Expand Up @@ -1164,16 +1189,19 @@ where
channel_id,
counterparty_node_id,
);
self.event_queue
.add_event(Event::ChannelReady {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id: Some(counterparty_node_id),
})
.unwrap_or_else(|e| {

let event = Event::ChannelReady {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id: Some(counterparty_node_id),
};
match self.event_queue.add_event(event) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::ChannelClosed {
channel_id,
Expand All @@ -1183,17 +1211,21 @@ where
..
} => {
log_info!(self.logger, "Channel {} closed due to: {}", channel_id, reason);
self.event_queue
.add_event(Event::ChannelClosed {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id,
reason: Some(reason),
})
.unwrap_or_else(|e| {

let event = Event::ChannelClosed {
channel_id,
user_channel_id: UserChannelId(user_channel_id),
counterparty_node_id,
reason: Some(reason),
};

match self.event_queue.add_event(event) {
Ok(_) => {},
Err(e) => {
log_error!(self.logger, "Failed to push to event queue: {}", e);
panic!("Failed to push to event queue");
});
return Err(ReplayEvent());
},
};
},
LdkEvent::DiscardFunding { .. } => {},
LdkEvent::HTLCIntercepted { .. } => {},
Expand Down

0 comments on commit cffdf7e

Please sign in to comment.