Skip to content

Commit

Permalink
Fixed tests and Router use of UnwatchTxConfirmed
Browse files Browse the repository at this point in the history
- Fixup so Router does not send `UnwatchTxConfirmed` only for the spending txs that will never confirm. Channel also has a `WatchTxConfirmed` event that may trigger later and should not be removed.
- Fixup channel integration tests to generate enough blocks to deeply confirm a channel has closed once before waiting for the channel to close.
- Fixup router tests to check that funding tx spend is unwatched after it's spending tx confirms
  • Loading branch information
remyers committed Dec 11, 2024
1 parent 0f250b1 commit 007adc5
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm

case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, shortChannelId)
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId)
case None => stay()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ object Validation {
} else d1
}

def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get
log.info("funding tx for channelId={} was spent", shortChannelId)
Expand All @@ -294,7 +294,8 @@ object Validation {
// we will re-add a spliced channel as a new channel later when we receive the announcement
watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId))
val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet
spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId))
// stop watching the spending txs that will never confirm, but continue to watch the tx that spends the parent channel
(spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId))
val spentChannels1 = d.spentChannels -- spendingTxs
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make txs confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -238,14 +235,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF, sender)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make txs confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -294,14 +288,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF, sender)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make txs confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -353,14 +344,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec {
val receivedByF = listReceivedByAddress(finalAddressF, sender)
(receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC
}, max = 30 seconds, interval = 1 second)
// we generate blocks to make tx confirm
generateBlocks(2, Some(minerAddress))
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12, Some(minerAddress))
// and we wait for the channel to close
awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)
awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down Expand Up @@ -599,15 +587,13 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec {
bitcoinClient.getMempool().pipeTo(sender.ref)
sender.expectMsgType[Seq[Transaction]].exists(_.txIn.head.outPoint.txid == fundingOutpoint.txid)
}, max = 20 seconds, interval = 1 second)
generateBlocks(3)
// we generate enough blocks for the channel to be deeply confirmed
generateBlocks(12)
awaitCond(stateListener.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds)

bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 10).pipeTo(sender.ref)
bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 12).pipeTo(sender.ref)
val closingTx = sender.expectMsgType[Transaction]
assert(closingTx.txOut.map(_.publicKeyScript).toSet == Set(finalPubKeyScriptC, finalPubKeyScriptF))

// generate enough blocks so the router will know the channel has been closed and not spliced
generateBlocks(12)
awaitAnnouncements(1)
}

Expand Down
35 changes: 22 additions & 13 deletions eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -319,10 +319,15 @@ class RouterSpec extends BaseRouterSpec {
probe.expectMsg(PublicNode(node_b, 2, publicChannelCapacity * 2))
}

def fundingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = {
val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2)))
val fundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0)
fundingTx
}

def spendingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = {
val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2)))
val previousFundingTx = Transaction(version = 2, txIn = Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0)
val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(previousFundingTx, 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0)
val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(fundingTx(node1, node2, capacity), 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0)
nextFundingTx
}

Expand All @@ -335,6 +340,7 @@ class RouterSpec extends BaseRouterSpec {
watcher.expectMsgType[WatchTxConfirmed]
router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b))
eventListener.expectMsg(ChannelLost(scid_ab))
assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx(funding_a, funding_b).txid)
assert(nodeParams.db.network.getChannel(scid_ab).isEmpty)
// a doesn't have any channels, b still has one with c
eventListener.expectMsg(NodeLost(a))
Expand All @@ -345,6 +351,7 @@ class RouterSpec extends BaseRouterSpec {
router ! WatchExternalChannelSpentTriggered(scid_cd, spendingTx(funding_c, funding_d))
watcher.expectMsgType[WatchTxConfirmed]
router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_c, funding_d))
assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx(funding_c, funding_d).txid)
eventListener.expectMsg(ChannelLost(scid_cd))
assert(nodeParams.db.network.getChannel(scid_cd).isEmpty)
// d doesn't have any channels, c still has one with b
Expand All @@ -356,6 +363,7 @@ class RouterSpec extends BaseRouterSpec {
router ! WatchExternalChannelSpentTriggered(scid_bc, spendingTx(funding_b, funding_c))
watcher.expectMsgType[WatchTxConfirmed]
router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_b, funding_c))
assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx(funding_b, funding_c).txid)
eventListener.expectMsg(ChannelLost(scid_bc))
assert(nodeParams.db.network.getChannel(scid_bc).isEmpty)
// now b and c do not have any channels
Expand All @@ -374,10 +382,10 @@ class RouterSpec extends BaseRouterSpec {
val priv_funding_u = randomKey()
val scid_au = RealShortChannelId(fixture.nodeParams.currentBlockHeight - 5000, 5, 0)
val ann = channelAnnouncement(scid_au, priv_a, priv_u, priv_funding_a, priv_funding_u)
val fundingTx = Transaction(2, Nil, Seq(TxOut(500_000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, priv_funding_u.publicKey))))), 0)
val fundingTx_au = fundingTx(funding_a, priv_funding_u.publicKey, 500_000 sat)
router ! PeerRoutingMessage(TestProbe().ref, remoteNodeId, ann)
watcher.expectMsgType[ValidateRequest]
watcher.send(router, ValidateResult(ann, Right((fundingTx, UtxoStatus.Unspent))))
watcher.send(router, ValidateResult(ann, Right((fundingTx_au, UtxoStatus.Unspent))))
watcher.expectMsgType[WatchExternalChannelSpent]
eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(ann, 500_000 sat, None, None) :: Nil))
awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).nonEmpty))
Expand All @@ -392,6 +400,7 @@ class RouterSpec extends BaseRouterSpec {
router ! WatchExternalChannelSpentTriggered(scid_au, spendingTx(funding_a, priv_funding_u.publicKey))
assert(watcher.expectMsgType[WatchTxConfirmed].txId == spendingTx(funding_a, priv_funding_u.publicKey).txid)
router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, priv_funding_u.publicKey))
assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx_au.txid)
eventListener.expectMsg(ChannelLost(scid_au))
eventListener.expectMsg(NodeLost(priv_u.publicKey))
awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).isEmpty))
Expand Down Expand Up @@ -905,12 +914,12 @@ class RouterSpec extends BaseRouterSpec {
val scid = RealShortChannelId(fixture.nodeParams.currentBlockHeight - 5000, 5, 0)
val capacity = 1_000_000.sat
val ann = channelAnnouncement(scid, priv_a, priv_c, priv_funding_a, priv_funding_c)
val fundingTx = Transaction(2, Nil, Seq(TxOut(capacity, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_c))))), 0)
val peerConnection = TestProbe()
val fundingTx_ac = fundingTx(funding_a, funding_c, capacity)
peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true }
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann))
watcher.expectMsgType[ValidateRequest]
watcher.send(router, ValidateResult(ann, Right((fundingTx, UtxoStatus.Unspent))))
watcher.send(router, ValidateResult(ann, Right((fundingTx_ac, UtxoStatus.Unspent))))
peerConnection.expectMsg(GossipDecision.Accepted(ann))
probe.send(router, GetChannels)
assert(probe.expectMsgType[Iterable[ChannelAnnouncement]].exists(_.shortChannelId == scid))
Expand All @@ -928,17 +937,17 @@ class RouterSpec extends BaseRouterSpec {
val staleUpdate = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, scid, CltvExpiryDelta(72), 1 msat, 10 msat, 100, htlcMaximum, timestamp = staleTimestamp)
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, staleUpdate))
peerConnection.expectMsg(GossipDecision.Stale(staleUpdate))
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, None, None)))
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, None, None)))

// We receive a non-stale channel update for one side of the channel.
val update_ac_1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, scid, CltvExpiryDelta(72), 1 msat, 10 msat, 100, htlcMaximum, timestamp = TimestampSecond.now() - 3.days)
peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update_ac_1))
peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update_ac_1))
peerConnection.expectNoMessage(100 millis)
if (update_ac_1.channelFlags.isNode1) {
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_1), None, None)))
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_1), None, None)))
} else {
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, Some(update_ac_1), None)))
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, Some(update_ac_1), None)))
}
probe.send(router, GetRouterData)
val routerData1 = probe.expectMsgType[Data]
Expand All @@ -953,9 +962,9 @@ class RouterSpec extends BaseRouterSpec {
peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update_ac_2))
peerConnection.expectNoMessage(100 millis)
if (update_ac_2.channelFlags.isNode1) {
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_2), None, None)))
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_2), None, None)))
} else {
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, Some(update_ac_2), None)))
assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, Some(update_ac_2), None)))
}
probe.send(router, GetRouterData)
val routerData2 = probe.expectMsgType[Data]
Expand All @@ -972,9 +981,9 @@ class RouterSpec extends BaseRouterSpec {
assert(routerData3.channels.contains(scid))
assert(!routerData3.prunedChannels.contains(scid))
if (update_ac_2.channelFlags.isNode1) {
assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_2), Some(update_ca), None)))
assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_2), Some(update_ca), None)))
} else {
assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ca), Some(update_ac_2), None)))
assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ca), Some(update_ac_2), None)))
}
assert(routerData3.graphWithBalances.graph.containsEdge(ChannelDesc(update_ac_2, ann)))
assert(routerData3.graphWithBalances.graph.containsEdge(ChannelDesc(update_ca, ann)))
Expand Down

0 comments on commit 007adc5

Please sign in to comment.