Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure successful message propagation in case of disconnection mid-handshake #2725

Merged
merged 2 commits into from
Feb 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 33 additions & 13 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,7 +903,9 @@ impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
if require_disconnected && self.is_connected {
return false
}
self.channel_by_id.iter().filter(|(_, phase)| matches!(phase, ChannelPhase::Funded(_))).count() == 0
!self.channel_by_id.iter().any(|(_, phase)|
matches!(phase, ChannelPhase::Funded(_) | ChannelPhase::UnfundedOutboundV1(_))
)
&& self.monitor_update_blocked_actions.is_empty()
&& self.in_flight_monitor_updates.is_empty()
}
Expand Down Expand Up @@ -8905,10 +8907,12 @@ where
}
&mut chan.context
},
// Unfunded channels will always be removed.
ChannelPhase::UnfundedOutboundV1(chan) => {
&mut chan.context
// We retain UnfundedOutboundV1 channel for some time in case
// peer unexpectedly disconnects, and intends to reconnect again.
ChannelPhase::UnfundedOutboundV1(_) => {
return true;
},
// Unfunded inbound channels will always be removed.
ChannelPhase::UnfundedInboundV1(chan) => {
&mut chan.context
},
Expand Down Expand Up @@ -9047,15 +9051,31 @@ where
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;

peer_state.channel_by_id.iter_mut().filter_map(|(_, phase)|
if let ChannelPhase::Funded(chan) = phase { Some(chan) } else { None }
).for_each(|chan| {
let logger = WithChannelContext::from(&self.logger, &chan.context);
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
node_id: chan.context.get_counterparty_node_id(),
msg: chan.get_channel_reestablish(&&logger),
});
});
for (_, phase) in peer_state.channel_by_id.iter_mut() {
match phase {
ChannelPhase::Funded(chan) => {
let logger = WithChannelContext::from(&self.logger, &chan.context);
pending_msg_events.push(events::MessageSendEvent::SendChannelReestablish {
node_id: chan.context.get_counterparty_node_id(),
msg: chan.get_channel_reestablish(&&logger),
});
}

ChannelPhase::UnfundedOutboundV1(chan) => {
pending_msg_events.push(events::MessageSendEvent::SendOpenChannel {
node_id: chan.context.get_counterparty_node_id(),
msg: chan.get_open_channel(self.chain_hash),
});
}

ChannelPhase::UnfundedInboundV1(_) => {
// Since unfunded inbound channel maps are cleared upon disconnecting a peer,
// they are not persisted and won't be recovered after a crash.
// Therefore, they shouldn't exist at this point.
debug_assert!(false);
}
}
}
}

return NotifyOption::SkipPersistHandleEvents;
Expand Down
149 changes: 127 additions & 22 deletions lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3695,7 +3695,7 @@ fn test_dup_events_on_peer_disconnect() {
#[test]
fn test_peer_disconnected_before_funding_broadcasted() {
// Test that channels are closed with `ClosureReason::DisconnectedPeer` if the peer disconnects
// before the funding transaction has been broadcasted.
// before the funding transaction has been broadcasted, and doesn't reconnect back within time.
let chanmon_cfgs = create_chanmon_cfgs(2);
let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
Expand Down Expand Up @@ -3724,12 +3724,19 @@ fn test_peer_disconnected_before_funding_broadcasted() {
assert_eq!(nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap().len(), 0);
}

// Ensure that the channel is closed with `ClosureReason::DisconnectedPeer` when the peers are
// disconnected before the funding transaction was broadcasted.
// The peers disconnect before the funding is broadcasted.
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());

check_closed_event!(&nodes[0], 2, ClosureReason::DisconnectedPeer, true
// The time for peers to reconnect expires.
for _ in 0..UNFUNDED_CHANNEL_AGE_LIMIT_TICKS {
nodes[0].node.timer_tick_occurred();
}

// Ensure that the channel is closed with `ClosureReason::HolderForceClosed`
// when the peers are disconnected and do not reconnect before the funding
// transaction is broadcasted.
check_closed_event!(&nodes[0], 2, ClosureReason::HolderForceClosed, true
, [nodes[1].node.get_our_node_id()], 1000000);
check_closed_event!(&nodes[1], 1, ClosureReason::DisconnectedPeer, false
, [nodes[0].node.get_our_node_id()], 1000000);
Expand Down Expand Up @@ -10506,6 +10513,90 @@ fn test_remove_expired_inbound_unfunded_channels() {
check_closed_event(&nodes[1], 1, ClosureReason::HolderForceClosed, false, &[nodes[0].node.get_our_node_id()], 100000);
}

#[test]
fn test_channel_close_when_not_timely_accepted() {
	// Checks that an outbound channel whose `open_channel` was never answered survives a peer
	// disconnection, but is force-closed once `UNFUNDED_CHANNEL_AGE_LIMIT_TICKS` timer ticks
	// elapse without the peer reconnecting.
	let chanmon_cfgs = create_chanmon_cfgs(2);
	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

	let initiator_id = nodes[0].node.get_our_node_id();
	let acceptor_id = nodes[1].node.get_our_node_id();

	// Node 0 starts opening a channel to node 1; the handshake is interrupted before node 1
	// gets the chance to reply with `accept_channel`.
	let temp_chan_id = nodes[0].node.create_channel(acceptor_id, 100000, 10001, 42, None, None).unwrap();
	let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, acceptor_id);
	assert_eq!(open_channel_msg.temporary_channel_id, temp_chan_id);

	nodes[0].node.peer_disconnected(&acceptor_id);
	nodes[1].node.peer_disconnected(&initiator_id);

	// The initiator must still be holding on to its outbound channel right after disconnecting.
	assert_eq!(nodes[0].node.list_channels().len(), 1);

	// From node 1's point of view the channel was inbound, so it is gone straight away.
	assert!(nodes[1].node.list_channels().is_empty());

	// Let the whole reconnection grace period elapse on the initiator.
	(0..UNFUNDED_CHANNEL_AGE_LIMIT_TICKS).for_each(|_| {
		nodes[0].node.timer_tick_occurred();
	});

	// The peer never came back in time, so the initiator has force-closed the channel.
	check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [acceptor_id], 100000);
	assert!(nodes[0].node.list_channels().is_empty());

	// No `accept_channel` was ever received and the channel is closed, so the peer entry
	// itself must have been pruned from `per_peer_state`. The read guard is a temporary and
	// is released at the end of the statement.
	assert_eq!(nodes[0].node.per_peer_state.read().unwrap().len(), 0);
}

#[test]
fn test_rebroadcast_open_channel_when_reconnect_mid_handshake() {
	// Checks that when the peers disconnect before `accept_channel` is sent and then
	// reconnect, the initiator queues the very same `open_channel` message again.
	let chanmon_cfgs = create_chanmon_cfgs(2);
	let node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
	let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
	let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

	let initiator_id = nodes[0].node.get_our_node_id();
	let acceptor_id = nodes[1].node.get_our_node_id();

	// Node 0 starts opening a channel to node 1; the handshake is interrupted before node 1
	// gets the chance to reply with `accept_channel`.
	let temp_chan_id = nodes[0].node.create_channel(acceptor_id, 100000, 10001, 42, None, None).unwrap();
	let open_channel_msg = get_event_msg!(nodes[0], MessageSendEvent::SendOpenChannel, acceptor_id);
	assert_eq!(open_channel_msg.temporary_channel_id, temp_chan_id);

	nodes[0].node.peer_disconnected(&acceptor_id);
	nodes[1].node.peer_disconnected(&initiator_id);

	// The initiator must still be holding on to its outbound channel right after disconnecting.
	assert_eq!(nodes[0].node.list_channels().len(), 1);

	// From node 1's point of view the channel was inbound, so it is gone straight away.
	assert!(nodes[1].node.list_channels().is_empty());

	// Bring the connection back up: node 0 is told about node 1 first, then the reverse.
	let init_from_acceptor = msgs::Init {
		features: nodes[1].node.init_features(), networks: None, remote_network_address: None
	};
	nodes[0].node.peer_connected(&acceptor_id, &init_from_acceptor, true).unwrap();

	let init_from_initiator = msgs::Init {
		features: nodes[0].node.init_features(), networks: None, remote_network_address: None
	};
	nodes[1].node.peer_connected(&initiator_id, &init_from_initiator, false).unwrap();

	// Exactly one message must be pending on node 0: the original `open_channel`, unchanged.
	let pending_msgs = nodes[0].node.get_and_clear_pending_msg_events();
	assert_eq!(pending_msgs.len(), 1);
	if let MessageSendEvent::SendOpenChannel { msg: resent_open_channel, .. } = &pending_msgs[0] {
		assert_eq!(resent_open_channel, &open_channel_msg);
	} else {
		panic!("Unexpected message.");
	}
}

fn do_test_multi_post_event_actions(do_reload: bool) {
// Tests handling multiple post-Event actions at once.
// There is specific code in ChannelManager to handle channels where multiple post-Event
Expand Down Expand Up @@ -10662,7 +10753,9 @@ fn test_batch_channel_open() {
}

#[test]
fn test_disconnect_in_funding_batch() {
fn test_close_in_funding_batch() {
// This test ensures that if one of the channels
// in the batch closes, the complete batch will close.
let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
Expand All @@ -10686,14 +10779,39 @@ fn test_disconnect_in_funding_batch() {
// The transaction should not have been broadcast before all channels are ready.
assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);

// The remaining peer in the batch disconnects.
nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());

// The channels in the batch will close immediately.
// Force-close the channel for which we've completed the initial monitor.
let funding_txo_1 = OutPoint { txid: tx.txid(), index: 0 };
let funding_txo_2 = OutPoint { txid: tx.txid(), index: 1 };
let channel_id_1 = ChannelId::v1_from_funding_outpoint(funding_txo_1);
let channel_id_2 = ChannelId::v1_from_funding_outpoint(funding_txo_2);

nodes[0].node.force_close_broadcasting_latest_txn(&channel_id_1, &nodes[1].node.get_our_node_id()).unwrap();

// The monitor should become closed.
check_added_monitors(&nodes[0], 1);
{
let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
assert_eq!(monitor_updates_1.len(), 1);
assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
}

let msg_events = nodes[0].node.get_and_clear_pending_msg_events();
match msg_events[0] {
MessageSendEvent::HandleError { .. } => (),
_ => panic!("Unexpected message."),
}

// We broadcast the commitment transaction as part of the force-close.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Heh, this is kinda dumb, maybe we should fix that, but it's not super critical and certainly unrelated to this PR.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Super interested in understanding the issue here! And probably might give it a try if it's not a super biggie!

{
let broadcasted_txs = nodes[0].tx_broadcaster.txn_broadcast();
assert_eq!(broadcasted_txs.len(), 1);
assert!(broadcasted_txs[0].txid() != tx.txid());
assert_eq!(broadcasted_txs[0].input.len(), 1);
assert_eq!(broadcasted_txs[0].input[0].previous_output.txid, tx.txid());
}

// All channels in the batch should close immediately.
check_closed_events(&nodes[0], &[
ExpectedCloseEvent {
channel_id: Some(channel_id_1),
Expand All @@ -10711,19 +10829,6 @@ fn test_disconnect_in_funding_batch() {
},
]);

// The monitor should become closed.
check_added_monitors(&nodes[0], 1);
{
let mut monitor_updates = nodes[0].chain_monitor.monitor_updates.lock().unwrap();
let monitor_updates_1 = monitor_updates.get(&channel_id_1).unwrap();
assert_eq!(monitor_updates_1.len(), 1);
assert_eq!(monitor_updates_1[0].update_id, CLOSED_CHANNEL_UPDATE_ID);
}

// The funding transaction should not have been broadcast, and therefore, we don't need
// to broadcast a force-close transaction for the closed monitor.
assert_eq!(nodes[0].tx_broadcaster.txn_broadcast().len(), 0);

// Ensure the channels don't exist anymore.
assert!(nodes[0].node.list_channels().is_empty());
}
Expand Down
Loading