From 007adc5c381782dbe0541fb1f88d3a0f77506a6f Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Wed, 11 Dec 2024 11:55:56 +0100 Subject: [PATCH] Fixed tests and Router use of UnwatchTxConfirmed - Fixup so Router does not send `UnwatchTxConfirmed` only for the spending txs that will never confirm. Channel also has a `WatchTxConfirmed` event that may trigger later and should not be removed. - Fixup channel integration tests to generate enough blocks to deeply confirm a channel has closed once before waiting for the channel to close. - Fixup router tests to check that funding tx spend is unwatched after it's spending tx confirms --- .../scala/fr/acinq/eclair/router/Router.scala | 2 +- .../fr/acinq/eclair/router/Validation.scala | 5 +-- .../integration/ChannelIntegrationSpec.scala | 36 ++++++------------- .../fr/acinq/eclair/router/RouterSpec.scala | 35 +++++++++++------- 4 files changed, 37 insertions(+), 41 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index c0a905101b..ecdf76c217 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -269,7 +269,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) => d.spentChannels.get(spendingTx.txid) match { - case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, shortChannelId) + case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId) case None => stay() } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index 6556eb0aef..6b2f3623e8 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -267,7 +267,7 @@ object Validation { } else d1 } - def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { + def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get log.info("funding tx for channelId={} was spent", shortChannelId) @@ -294,7 +294,8 @@ object Validation { // we will re-add a spliced channel as a new channel later when we receive the announcement watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet - spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + // stop watching the spending txs that will never confirm, but continue to watch the tx that spends the parent channel + (spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId)) val spentChannels1 = d.spentChannels -- spendingTxs d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala index 8f6f663e53..bf7f5985f0 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/ChannelIntegrationSpec.scala @@ -194,14 +194,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) - - // generate enough blocks so the router will know the channel has been closed and not spliced - generateBlocks(12) awaitAnnouncements(1) } @@ -238,14 +235,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF, sender) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) - - // generate enough blocks so the router will know the channel has been closed and not spliced - generateBlocks(12) awaitAnnouncements(1) } @@ -294,14 +288,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF, sender) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make txs confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) - - // generate enough blocks so the router will know the channel has been closed and not spliced - generateBlocks(12) awaitAnnouncements(1) } @@ -353,14 +344,11 @@ abstract class ChannelIntegrationSpec extends IntegrationSpec { val receivedByF = listReceivedByAddress(finalAddressF, sender) (receivedByF diff previouslyReceivedByF).size == expectedTxCountF && (receivedByC diff previouslyReceivedByC).size == expectedTxCountC }, max = 30 seconds, interval = 1 second) - // we generate blocks to make tx confirm - generateBlocks(2, Some(minerAddress)) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12, Some(minerAddress)) // and we wait for the channel to close awaitCond(stateListenerC.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) awaitCond(stateListenerF.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) - - // generate enough blocks so the router will know the channel has been closed and not spliced - generateBlocks(12) awaitAnnouncements(1) } @@ -599,15 +587,13 @@ class StandardChannelIntegrationSpec extends ChannelIntegrationSpec { bitcoinClient.getMempool().pipeTo(sender.ref) sender.expectMsgType[Seq[Transaction]].exists(_.txIn.head.outPoint.txid == fundingOutpoint.txid) }, max = 20 seconds, interval = 1 second) - generateBlocks(3) + // we generate enough blocks for the channel to be deeply confirmed + generateBlocks(12) awaitCond(stateListener.expectMsgType[ChannelStateChanged](max = 60 seconds).currentState == CLOSED, max = 60 seconds) - bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 10).pipeTo(sender.ref) + bitcoinClient.lookForSpendingTx(None, fundingOutpoint.txid, fundingOutpoint.index.toInt, limit = 12).pipeTo(sender.ref) val closingTx = sender.expectMsgType[Transaction] assert(closingTx.txOut.map(_.publicKeyScript).toSet == Set(finalPubKeyScriptC, finalPubKeyScriptF)) - - // generate enough blocks so the router will know the channel has been closed and not spliced - generateBlocks(12) awaitAnnouncements(1) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index e4978e5b6e..b523ffdb12 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -319,10 +319,15 @@ class RouterSpec extends BaseRouterSpec { probe.expectMsg(PublicNode(node_b, 2, publicChannelCapacity * 2)) } + def fundingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = { + val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))) + val fundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + fundingTx + } + def spendingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = { val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))) - val previousFundingTx = Transaction(version = 2, txIn = Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) - val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(previousFundingTx, 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(fundingTx(node1, node2, capacity), 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) nextFundingTx } @@ -335,6 +340,7 @@ class RouterSpec extends BaseRouterSpec { watcher.expectMsgType[WatchTxConfirmed] router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) eventListener.expectMsg(ChannelLost(scid_ab)) + assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx(funding_a, funding_b).txid) assert(nodeParams.db.network.getChannel(scid_ab).isEmpty) // a doesn't have any channels, b still has one with c eventListener.expectMsg(NodeLost(a)) @@ -345,6 +351,7 @@ class RouterSpec extends BaseRouterSpec { router ! WatchExternalChannelSpentTriggered(scid_cd, spendingTx(funding_c, funding_d)) watcher.expectMsgType[WatchTxConfirmed] router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_c, funding_d)) + assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx(funding_c, funding_d).txid) eventListener.expectMsg(ChannelLost(scid_cd)) assert(nodeParams.db.network.getChannel(scid_cd).isEmpty) // d doesn't have any channels, c still has one with b @@ -356,6 +363,7 @@ class RouterSpec extends BaseRouterSpec { router ! WatchExternalChannelSpentTriggered(scid_bc, spendingTx(funding_b, funding_c)) watcher.expectMsgType[WatchTxConfirmed] router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_b, funding_c)) + assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx(funding_b, funding_c).txid) eventListener.expectMsg(ChannelLost(scid_bc)) assert(nodeParams.db.network.getChannel(scid_bc).isEmpty) // now b and c do not have any channels @@ -374,10 +382,10 @@ class RouterSpec extends BaseRouterSpec { val priv_funding_u = randomKey() val scid_au = RealShortChannelId(fixture.nodeParams.currentBlockHeight - 5000, 5, 0) val ann = channelAnnouncement(scid_au, priv_a, priv_u, priv_funding_a, priv_funding_u) - val fundingTx = Transaction(2, Nil, Seq(TxOut(500_000 sat, write(pay2wsh(Scripts.multiSig2of2(funding_a, priv_funding_u.publicKey))))), 0) + val fundingTx_au = fundingTx(funding_a, priv_funding_u.publicKey, 500_000 sat) router ! PeerRoutingMessage(TestProbe().ref, remoteNodeId, ann) watcher.expectMsgType[ValidateRequest] - watcher.send(router, ValidateResult(ann, Right((fundingTx, UtxoStatus.Unspent)))) + watcher.send(router, ValidateResult(ann, Right((fundingTx_au, UtxoStatus.Unspent)))) watcher.expectMsgType[WatchExternalChannelSpent] eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(ann, 500_000 sat, None, None) :: Nil)) awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).nonEmpty)) @@ -392,6 +400,7 @@ class RouterSpec extends BaseRouterSpec { router ! WatchExternalChannelSpentTriggered(scid_au, spendingTx(funding_a, priv_funding_u.publicKey)) assert(watcher.expectMsgType[WatchTxConfirmed].txId == spendingTx(funding_a, priv_funding_u.publicKey).txid) router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, priv_funding_u.publicKey)) + assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTx_au.txid) eventListener.expectMsg(ChannelLost(scid_au)) eventListener.expectMsg(NodeLost(priv_u.publicKey)) awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).isEmpty)) @@ -905,12 +914,12 @@ class RouterSpec extends BaseRouterSpec { val scid = RealShortChannelId(fixture.nodeParams.currentBlockHeight - 5000, 5, 0) val capacity = 1_000_000.sat val ann = channelAnnouncement(scid, priv_a, priv_c, priv_funding_a, priv_funding_c) - val fundingTx = Transaction(2, Nil, Seq(TxOut(capacity, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_c))))), 0) val peerConnection = TestProbe() + val fundingTx_ac = fundingTx(funding_a, funding_c, capacity) peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann)) watcher.expectMsgType[ValidateRequest] - watcher.send(router, ValidateResult(ann, Right((fundingTx, UtxoStatus.Unspent)))) + watcher.send(router, ValidateResult(ann, Right((fundingTx_ac, UtxoStatus.Unspent)))) peerConnection.expectMsg(GossipDecision.Accepted(ann)) probe.send(router, GetChannels) assert(probe.expectMsgType[Iterable[ChannelAnnouncement]].exists(_.shortChannelId == scid)) @@ -928,7 +937,7 @@ class RouterSpec extends BaseRouterSpec { val staleUpdate = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, scid, CltvExpiryDelta(72), 1 msat, 10 msat, 100, htlcMaximum, timestamp = staleTimestamp) peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, staleUpdate)) peerConnection.expectMsg(GossipDecision.Stale(staleUpdate)) - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, None, None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, None, None))) // We receive a non-stale channel update for one side of the channel. val update_ac_1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv_a, c, scid, CltvExpiryDelta(72), 1 msat, 10 msat, 100, htlcMaximum, timestamp = TimestampSecond.now() - 3.days) @@ -936,9 +945,9 @@ class RouterSpec extends BaseRouterSpec { peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update_ac_1)) peerConnection.expectNoMessage(100 millis) if (update_ac_1.channelFlags.isNode1) { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_1), None, None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_1), None, None))) } else { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, Some(update_ac_1), None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, Some(update_ac_1), None))) } probe.send(router, GetRouterData) val routerData1 = probe.expectMsgType[Data] @@ -953,9 +962,9 @@ class RouterSpec extends BaseRouterSpec { peerConnection.expectMsg(GossipDecision.RelatedChannelPruned(update_ac_2)) peerConnection.expectNoMessage(100 millis) if (update_ac_2.channelFlags.isNode1) { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_2), None, None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_2), None, None))) } else { - assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, None, Some(update_ac_2), None))) + assert(nodeParams.db.network.getChannel(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, None, Some(update_ac_2), None))) } probe.send(router, GetRouterData) val routerData2 = probe.expectMsgType[Data] @@ -972,9 +981,9 @@ class RouterSpec extends BaseRouterSpec { assert(routerData3.channels.contains(scid)) assert(!routerData3.prunedChannels.contains(scid)) if (update_ac_2.channelFlags.isNode1) { - assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ac_2), Some(update_ca), None))) + assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ac_2), Some(update_ca), None))) } else { - assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx.txid, capacity, Some(update_ca), Some(update_ac_2), None))) + assert(routerData3.channels.get(scid).contains(PublicChannel(ann, fundingTx_ac.txid, capacity, Some(update_ca), Some(update_ac_2), None))) } assert(routerData3.graphWithBalances.graph.containsEdge(ChannelDesc(update_ac_2, ann))) assert(routerData3.graphWithBalances.graph.containsEdge(ChannelDesc(update_ca, ann)))