From 929f85d688450edbc0ab474736bd35af8b1d062b Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Mon, 25 Nov 2024 11:48:58 +0100 Subject: [PATCH] Update scid when splice funding tx confirms A zero conf splice will send splice_locked when funding tx is published, but not update it's scid until the fund tx confirms. Channel balances and max htlc amount will not update until local sends to and receives from it's remote peer splice_locked. --- .../blockchain/bitcoind/ZmqWatcher.scala | 40 ++----------- .../scala/fr/acinq/eclair/router/Router.scala | 37 +++++------- .../fr/acinq/eclair/router/Validation.scala | 29 +++++----- .../blockchain/bitcoind/ZmqWatcherSpec.scala | 31 +++++----- .../acinq/eclair/router/BaseRouterSpec.scala | 8 +-- .../router/ChannelRouterIntegrationSpec.scala | 9 ++- .../fr/acinq/eclair/router/RouterSpec.scala | 56 ++++++++++++------- 7 files changed, 91 insertions(+), 119 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala index d5126f4a8e..4429da4c41 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala @@ -104,20 +104,6 @@ object ZmqWatcher { def hints: Set[TxId] } - /** - * Watch for the first transaction spending the given outpoint. We assume that txid is already confirmed or in the - * mempool (i.e. the outpoint exists). - * - * NB: an event will be triggered only once when we see a transaction that spends the given outpoint. If you want to - * react to the transaction spending the outpoint, you should use [[WatchSpent]] instead. - */ - sealed trait WatchSpentBasic[T <: WatchSpentBasicTriggered] extends Watch[T] { - /** TxId of the outpoint to watch. */ - def txId: TxId - /** Index of the outpoint to watch. */ - def outputIndex: Int - } - /** This event is sent when a [[WatchConfirmed]] condition is met. */ sealed trait WatchConfirmedTriggered extends WatchTriggered { /** Block in which the transaction was confirmed. */ @@ -134,11 +120,10 @@ object ZmqWatcher { def spendingTx: Transaction } - /** This event is sent when a [[WatchSpentBasic]] condition is met. */ - sealed trait WatchSpentBasicTriggered extends WatchTriggered - - case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpentBasic[WatchExternalChannelSpentTriggered] - case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId) extends WatchSpentBasicTriggered + case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { + override def hints: Set[TxId] = Set.empty + } + case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered] case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered @@ -194,7 +179,6 @@ object ZmqWatcher { private def utxo(w: GenericWatch): Option[OutPoint] = { w match { case w: WatchSpent[_] => Some(OutPoint(w.txId, w.outputIndex)) - case w: WatchSpentBasic[_] => Some(OutPoint(w.txId, w.outputIndex)) case _ => None } } @@ -242,7 +226,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client .flatMap(watchedUtxos.get) .flatten .foreach { - case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId)) + case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx)) case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx)) case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(tx)) case _: WatchPublished => // nothing to do @@ -336,9 +320,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client val result = w match { case _ if watches.contains(w) => Ignore // we ignore duplicates - case w: WatchSpentBasic[_] => - checkSpentBasic(w) - Keep case w: WatchSpent[_] => checkSpent(w) Keep @@ -379,17 +360,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client } } - private def checkSpentBasic(w: WatchSpentBasic[_ <: WatchSpentBasicTriggered]): Future[Unit] = { - // NB: we assume parent tx was published, we just need to make sure this particular output has not been spent - client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect { - case false => - log.info(s"output=${w.txId}:${w.outputIndex} has already been spent") - w match { - case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId)) - } - } - } - private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = { // first let's see if the parent tx was published or not client.getTxConfirmations(w.txId).collect { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index a27f244517..a6338fd460 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.Logs.LogCategory import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchTxConfirmed, WatchTxConfirmedTriggered} import fr.acinq.eclair.channel._ import fr.acinq.eclair.crypto.TransportHandler import fr.acinq.eclair.db.NetworkDb @@ -76,10 +76,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm { log.info("loading network announcements from db...") val (pruned, channels) = db.listChannels().partition { case (_, pc) => pc.isStale(nodeParams.currentBlockHeight) } - val nodeIds = (pruned.values ++ channels.values).flatMap(pc => pc.ann.nodeId1 :: pc.ann.nodeId2 :: Nil).toSet - val (isolatedNodes, nodes) = db.listNodes().partition(n => !nodeIds.contains(n.nodeId)) - log.info("removed {} isolated nodes from db", isolatedNodes.size) - isolatedNodes.foreach(n => db.removeNode(n.nodeId)) + val nodes = db.listNodes() Metrics.Nodes.withoutTags().update(nodes.size) Metrics.Channels.withoutTags().update(channels.size) log.info("loaded from db: channels={} nodes={}", channels.size, nodes.size) @@ -265,22 +262,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm case Event(r: ValidateResult, d) => stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r) - case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) => - log.info("funding tx of channelId={} has been spent - delay removing it from the graph for 12 blocks", shortChannelId) - // remove the channel from the db so it will not be added to the graph if a restart occurs before 12 blocks elapse - db.removeChannel(shortChannelId) - stay() using d.copy(spentChannels = d.spentChannels + (shortChannelId -> nodeParams.currentBlockHeight)) - - case Event(c: CurrentBlockHeight, d) => - val spentChannels1 = d.spentChannels.filter { - // spent channels may be confirmed as a splice; wait 12 blocks before removing them from the graph - case (_, blockHeight) if blockHeight >= c.blockHeight + 12 => true - case (shortChannelId, _) => self ! HandleChannelSpent(shortChannelId); false - } - stay() using d.copy(spentChannels = spentChannels1) + case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) => + val txId = d.channels.getOrElse(shortChannelId, d.prunedChannels(shortChannelId)).fundingTxId + log.info("funding tx txId={} of channelId={} has been spent - delay removing it from the graph until {} blocks after the spend confirms", txId, shortChannelId, nodeParams.channelConf.minDepthBlocks) + watcher ! WatchTxConfirmed(self, spendingTx.txid, nodeParams.channelConf.minDepthBlocks) + stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId)) - case Event(HandleChannelSpent(shortChannelId), d: Data) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) => - stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId) + case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) => + d.spentChannels.get(spendingTx.txid) match { + case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId) + case None => stay() + } case Event(n: NodeAnnouncement, d: Data) => stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n) @@ -778,7 +770,7 @@ object Router { excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graphWithBalances: GraphWithBalanceEstimates, sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message - spentChannels: Map[RealShortChannelId, BlockHeight], // channels with funding txs spent less than 12 blocks ago + spentChannels: Map[TxId, RealShortChannelId], // channels with spent funding txs that are not deeply buried yet ) { def resolve(scid: ShortChannelId): Option[KnownChannel] = { // let's assume this is a real scid @@ -819,7 +811,4 @@ object Router { /** We have tried to relay this amount from this channel and it failed. */ case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop) - /** Funding Tx of the channel id has been spent and not updated with a splice within 12 blocks. */ - private case class HandleChannelSpent(shortChannelId: RealShortChannelId) - } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index 18c86a1743..ca6e6887e0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -31,6 +31,7 @@ import fr.acinq.eclair.db.NetworkDb import fr.acinq.eclair.router.Graph.GraphStructure.GraphEdge import fr.acinq.eclair.router.Monitoring.Metrics import fr.acinq.eclair.router.Router._ +import fr.acinq.eclair.router.Validation.{addPublicChannel, splicePublicChannel} import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{BlockHeight, Logs, MilliSatoshiLong, NodeParams, RealShortChannelId, ShortChannelId, TxCoordinates} @@ -111,17 +112,11 @@ object Validation { None } else { log.debug("validation successful for shortChannelId={}", c.shortChannelId) - val sharedInputTxId_opt = tx.txIn.find(_.signatureScript == fundingOutputScript).map(_.outPoint.txid) remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))) val capacity = tx.txOut(outputIndex).amount - // if a channel spends the shared output of a recently spent channel, then it is a splice - sharedInputTxId_opt match { + d0.spentChannels.get(tx.txid) match { + case Some(parentScid) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid))) case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None)) - case Some(sharedInputTxId) => - d0.spentChannels.find(spent => d0.channels.get(spent._1).exists(_.fundingTxId == sharedInputTxId)) match { - case Some((parentScid, _)) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid))) - case None => log.error("channel shortChannelId={} is a splice, but not matching channel found!", c.shortChannelId); None - } } } case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) => @@ -165,16 +160,16 @@ object Validation { } } - private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = { + private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, spliceTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors val fundingOutputIndex = outputIndex(ann.shortChannelId) - watcher ! WatchExternalChannelSpent(ctx.self, fundingTxId, fundingOutputIndex, ann.shortChannelId) + watcher ! WatchExternalChannelSpent(ctx.self, spliceTxId, fundingOutputIndex, ann.shortChannelId) ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil)) - nodeParams.db.network.addChannel(ann, fundingTxId, capacity) + nodeParams.db.network.addChannel(ann, spliceTxId, capacity) nodeParams.db.network.removeChannel(parentChannel.shortChannelId) val pubChan = PublicChannel( ann = ann, - fundingTxId = fundingTxId, + fundingTxId = spliceTxId, capacity = capacity, update_1_opt = parentChannel.update_1_opt, update_2_opt = parentChannel.update_2_opt, @@ -192,7 +187,8 @@ object Validation { // we rebroadcast the splice channel to our peers channels = d.rebroadcast.channels + (pubChan.ann -> d.awaiting.getOrElse(pubChan.ann, if (pubChan.nodeId1 == nodeParams.nodeId || pubChan.nodeId2 == nodeParams.nodeId) Seq(LocalGossip) else Nil).toSet), ), - graphWithBalances = graph1 + graphWithBalances = graph1, + spentChannels = d.spentChannels.filter(_._1 != spliceTxId) ) } @@ -257,7 +253,7 @@ object Validation { def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.ann - log.info("funding tx of channelId={} has been spent", shortChannelId) + log.info("funding tx for channelId={} was spent", shortChannelId) // we need to remove nodes that aren't tied to any channels anymore val channels1 = d.channels - shortChannelId val prunedChannels1 = d.prunedChannels - shortChannelId @@ -276,7 +272,8 @@ object Validation { db.removeNode(nodeId) ctx.system.eventStream.publish(NodeLost(nodeId)) } - d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1) + val spentChannels1 = d.spentChannels.filter(_._2 != shortChannelId) + d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1) } def handleNodeAnnouncement(d: Data, db: NetworkDb, origins: Set[GossipOrigin], n: NodeAnnouncement, wasStashed: Boolean = false)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { @@ -585,7 +582,7 @@ object Validation { val scid2PrivateChannels1 = d.scid2PrivateChannels - lcd.shortIds.localAlias.toLong -- lcd.shortIds.real.toOption.map(_.toLong) // a local channel has permanently gone down if (lcd.shortIds.real.toOption.exists(d.channels.contains)) { - // the channel was public, we will receive (or have already received) a WatchEventSpentBasic event, that will trigger a clean up of the channel + // the channel was public, we will receive (or have already received) a WatchSpent event, that will trigger a clean up of the channel // so let's not do anything here d.copy(scid2PrivateChannels = scid2PrivateChannels1) } else if (d.privateChannels.contains(lcd.channelId)) { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index d0aaad7724..41ba5126cb 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -213,7 +213,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind watcher ! ListWatches(probe.ref) assert(probe.expectMsgType[Set[Watch[_]]].isEmpty) - // If we try to watch a transaction that has already been confirmed, we should immediately receive a WatchEventConfirmed. + // If we try to watch a transaction that has already been confirmed, we should immediately receive a WatchConfirmedTriggered event. watcher ! WatchFundingConfirmed(probe.ref, tx.txid, 1) assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx.txid == tx.txid) watcher ! WatchFundingConfirmed(probe.ref, tx.txid, 2) @@ -254,7 +254,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind generateBlocks(3) assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) - // If we watch the transaction when it's already confirmed, we immediately receive the WatchEventConfirmed. + // If we watch the transaction when it's already confirmed, we immediately receive the WatchConfirmedTriggered event. watcher ! WatchTxConfirmed(probe.ref, tx.txid, 3, Some(delay.copy(delay = 720))) assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) }) @@ -277,23 +277,25 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind assert(probe.expectMsgType[Set[Watch[_]]].size == 2) bitcoinClient.publishTransaction(tx1) - // tx and tx1 aren't confirmed yet, but we trigger the WatchEventSpent when we see tx1 in the mempool. + // tx and tx1 aren't confirmed yet, but we trigger the WatchSpentTriggered event when we see tx1 in the mempool. probe.expectMsgAllOf( - WatchExternalChannelSpentTriggered(RealShortChannelId(5)), + WatchExternalChannelSpentTriggered(RealShortChannelId(5), tx1), WatchFundingSpentTriggered(tx1) ) - // Let's confirm tx and tx1: seeing tx1 in a block should trigger WatchEventSpent again, but not WatchEventSpentBasic - // (which only triggers once). + // Let's confirm tx and tx1: seeing tx1 in a block should trigger both WatchSpentTriggered events again. bitcoinClient.getBlockHeight().pipeTo(probe.ref) val initialBlockHeight = probe.expectMsgType[BlockHeight] generateBlocks(1) - probe.expectMsg(WatchFundingSpentTriggered(tx1)) + probe.expectMsgAllOf( + WatchExternalChannelSpentTriggered(RealShortChannelId(5), tx1), + WatchFundingSpentTriggered(tx1) + ) probe.expectNoMessage(100 millis) watcher ! ListWatches(probe.ref) val watches1 = probe.expectMsgType[Set[Watch[_]]] - assert(watches1.size == 1) - assert(watches1.forall(_.isInstanceOf[WatchFundingSpent])) + assert(watches1.size == 2) + assert(watches1.forall(_.isInstanceOf[WatchSpent[_]])) // Let's submit tx2, and set a watch after it has been confirmed this time. bitcoinClient.publishTransaction(tx2) @@ -305,8 +307,8 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind watcher ! ListWatches(probe.ref) val watches2 = probe.expectMsgType[Set[Watch[_]]] - assert(watches2.size == 1) - assert(watches2.forall(_.isInstanceOf[WatchFundingSpent])) + assert(watches2.size == 2) + assert(watches2.forall(_.isInstanceOf[WatchSpent[_]])) watcher ! StopWatching(probe.ref) // We use hints and see if we can find tx2 @@ -325,7 +327,8 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind watcher ! StopWatching(probe.ref) watcher ! WatchExternalChannelSpent(probe.ref, tx1.txid, 0, RealShortChannelId(1)) - probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(1))) + probe.expectMsg(WatchExternalChannelSpentTriggered(RealShortChannelId(1), tx2)) + watcher ! StopWatching(probe.ref) watcher ! WatchFundingSpent(probe.ref, tx1.txid, 0, Set.empty) probe.expectMsg(WatchFundingSpentTriggered(tx2)) }) @@ -357,7 +360,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind probe.expectMsgAllOf(tx2.txid, WatchFundingSpentTriggered(tx2)) probe.expectNoMessage(100 millis) generateBlocks(1) - probe.expectMsg(WatchFundingSpentTriggered(tx2)) // tx2 is confirmed which triggers WatchEventSpent again + probe.expectMsg(WatchFundingSpentTriggered(tx2)) // tx2 is confirmed which triggers a WatchSpentTriggered event again generateBlocks(1) assert(probe.expectMsgType[WatchFundingConfirmedTriggered].tx == tx1) // tx1 now has 3 confirmations }) @@ -377,7 +380,7 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind // It may happen that transactions get included in a block without getting into our mempool first (e.g. a miner could // try to hide a revoked commit tx from the network until it gets confirmed, in an attempt to steal funds). // When we receive that block, we must send an event for every transaction inside it to analyze them and potentially - // trigger `WatchSpent` / `WatchSpentBasic`. + // trigger `WatchSpent`. generateBlocks(1) val txs = Seq( listener.fishForMessage() { case m: NewTransaction => Set(tx1.txid, tx2.txid).contains(m.tx.txid) }, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index 86086f7e89..3cf635a52a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -63,8 +63,8 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi val testNodeKeyManager = new LocalNodeKeyManager(seed, Block.RegtestGenesisBlock.hash) val testChannelKeyManager = new LocalChannelKeyManager(seed, Block.RegtestGenesisBlock.hash) - val (priv_a, priv_b, priv_c, priv_d, priv_e, priv_f, priv_g, priv_h, priv_isolated) = (testNodeKeyManager.nodeKey.privateKey, randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey()) - val (a, b, c, d, e, f, g, h, isolated) = (priv_a.publicKey, priv_b.publicKey, priv_c.publicKey, priv_d.publicKey, priv_e.publicKey, priv_f.publicKey, priv_g.publicKey, priv_h.publicKey, priv_isolated.publicKey) + val (priv_a, priv_b, priv_c, priv_d, priv_e, priv_f, priv_g, priv_h) = (testNodeKeyManager.nodeKey.privateKey, randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey()) + val (a, b, c, d, e, f, g, h) = (priv_a.publicKey, priv_b.publicKey, priv_c.publicKey, priv_d.publicKey, priv_e.publicKey, priv_f.publicKey, priv_g.publicKey, priv_h.publicKey) val (priv_funding_a, priv_funding_b, priv_funding_c, priv_funding_d, priv_funding_e, priv_funding_f, priv_funding_g, priv_funding_h) = (randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey(), randomKey()) val (funding_a, funding_b, funding_c, funding_d, funding_e, funding_f, funding_g, funding_h) = (priv_funding_a.publicKey, priv_funding_b.publicKey, priv_funding_c.publicKey, priv_funding_d.publicKey, priv_funding_e.publicKey, priv_funding_f.publicKey, priv_funding_g.publicKey, priv_funding_h.publicKey) @@ -140,10 +140,6 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi import com.softwaremill.quicklens._ val nodeParams = Alice.nodeParams .modify(_.nodeKeyManager).setTo(testNodeKeyManager) - - // an isolated node may exist in our db after a restart if 12-blocks had not elapsed after its last channel was removed - nodeParams.db.network.addNode(makeNodeAnnouncement(randomKey(), "node-I", Color(30, 10, -50), Nil, Features.empty)) - val router = system.actorOf(Router.props(nodeParams, watcher.ref)) // we announce channels peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, chan_ab)) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala index 9a31cfe7af..2550873c7b 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/ChannelRouterIntegrationSpec.scala @@ -2,8 +2,9 @@ package fr.acinq.eclair.router import akka.actor.ActorSystem import akka.testkit.{TestFSMRef, TestProbe} +import fr.acinq.bitcoin.scalacompat.Transaction import fr.acinq.eclair.blockchain.CurrentBlockHeight -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchFundingDeeplyBuriedTriggered} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchFundingDeeplyBuriedTriggered, WatchTxConfirmed, WatchTxConfirmedTriggered} import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} import fr.acinq.eclair.channel.{CMD_CLOSE, DATA_NORMAL} import fr.acinq.eclair.io.Peer.PeerRoutingMessage @@ -175,9 +176,11 @@ class ChannelRouterIntegrationSpec extends TestKitBaseClass with FixtureAnyFunSu channels.bob2alice.forward(channels.alice) if (testTags.contains(ChannelStateTestsTags.ChannelsPublic)) { // if the channel was public, the router asked the watcher to watch the funding tx and will be notified - val watchSpentBasic = channels.alice2blockchain.expectMsgType[WatchExternalChannelSpent] - watchSpentBasic.replyTo ! WatchExternalChannelSpentTriggered(watchSpentBasic.shortChannelId) + val watchSpent = channels.alice2blockchain.expectMsgType[WatchExternalChannelSpent] + watchSpent.replyTo ! WatchExternalChannelSpentTriggered(watchSpent.shortChannelId, Transaction(0, Nil, Nil, 0)) + val watchConfirmed = channels.alice2blockchain.expectMsgType[WatchTxConfirmed] channels.alice.underlying.system.eventStream.publish(CurrentBlockHeight(BlockHeight(400012))) + watchConfirmed.replyTo ! WatchTxConfirmedTriggered(BlockHeight(42000), 42, Transaction(0, Nil, Nil, 0)) } // router cleans up the channel awaitAssert { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index 778b2ddc28..06766812c8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -22,8 +22,7 @@ import akka.actor.typed.scaladsl.adapter.ClassicActorRefOps import akka.testkit.TestProbe import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.Script.{pay2wsh, write} -import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, OutPoint, SatoshiLong, Transaction, TxIn, TxOut} -import fr.acinq.eclair.blockchain.CurrentBlockHeight +import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, OutPoint, Satoshi, SatoshiLong, Transaction, TxIn, TxOut} import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel.{AvailableBalanceChanged, CommitmentsSpec, LocalChannelUpdate} import fr.acinq.eclair.crypto.TransportHandler @@ -319,14 +318,19 @@ class RouterSpec extends BaseRouterSpec { probe.expectMsg(PublicNode(node_b, 2, publicChannelCapacity * 2)) } + def spendingTx(node1: PublicKey, node2: PublicKey): Transaction = { + val originalFundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0) + Transaction(version = 0, txIn = TxIn(outPoint = OutPoint(originalFundingTx,0), signatureScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))), sequence = 0) :: Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node1)))) :: Nil, lockTime = 0) + } + test("properly announce lost channels and nodes") { fixture => import fixture._ val eventListener = TestProbe() system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) - router ! WatchExternalChannelSpentTriggered(scid_ab) - eventListener.expectNoMessage(100 millis) - system.eventStream.publish(CurrentBlockHeight(fixture.nodeParams.currentBlockHeight + 12)) + router ! WatchExternalChannelSpentTriggered(scid_ab, spendingTx(funding_a, funding_b)) + watcher.expectMsgType[WatchTxConfirmed] + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) eventListener.expectMsg(ChannelLost(scid_ab)) assert(nodeParams.db.network.getChannel(scid_ab).isEmpty) // a doesn't have any channels, b still has one with c @@ -335,9 +339,9 @@ class RouterSpec extends BaseRouterSpec { assert(nodeParams.db.network.getNode(b).nonEmpty) eventListener.expectNoMessage(200 milliseconds) - router ! WatchExternalChannelSpentTriggered(scid_cd) - eventListener.expectNoMessage(100 millis) - system.eventStream.publish(CurrentBlockHeight(fixture.nodeParams.currentBlockHeight + 12)) + router ! WatchExternalChannelSpentTriggered(scid_cd, spendingTx(funding_c, funding_d)) + watcher.expectMsgType[WatchTxConfirmed] + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_c, funding_d)) eventListener.expectMsg(ChannelLost(scid_cd)) assert(nodeParams.db.network.getChannel(scid_cd).isEmpty) // d doesn't have any channels, c still has one with b @@ -346,9 +350,9 @@ class RouterSpec extends BaseRouterSpec { assert(nodeParams.db.network.getNode(c).nonEmpty) eventListener.expectNoMessage(200 milliseconds) - router ! WatchExternalChannelSpentTriggered(scid_bc) - eventListener.expectNoMessage(100 millis) - system.eventStream.publish(CurrentBlockHeight(fixture.nodeParams.currentBlockHeight + 12)) + router ! WatchExternalChannelSpentTriggered(scid_bc, spendingTx(funding_b, funding_c)) + watcher.expectMsgType[WatchTxConfirmed] + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_b, funding_c)) eventListener.expectMsg(ChannelLost(scid_bc)) assert(nodeParams.db.network.getChannel(scid_bc).isEmpty) // now b and c do not have any channels @@ -371,6 +375,7 @@ class RouterSpec extends BaseRouterSpec { router ! PeerRoutingMessage(TestProbe().ref, remoteNodeId, ann) watcher.expectMsgType[ValidateRequest] watcher.send(router, ValidateResult(ann, Right((fundingTx, UtxoStatus.Unspent)))) + watcher.expectMsgType[WatchExternalChannelSpent] eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(ann, 500_000 sat, None, None) :: Nil)) awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).nonEmpty)) @@ -381,9 +386,9 @@ class RouterSpec extends BaseRouterSpec { awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).nonEmpty)) // The channel is closed, now we can remove it from the DB. - router ! WatchExternalChannelSpentTriggered(scid_au) - eventListener.expectNoMessage(100 millis) - system.eventStream.publish(CurrentBlockHeight(fixture.nodeParams.currentBlockHeight + 12)) + router ! WatchExternalChannelSpentTriggered(scid_au, spendingTx(funding_a, priv_funding_u.publicKey)) + assert(watcher.expectMsgType[WatchTxConfirmed].txId == spendingTx(funding_a, priv_funding_u.publicKey).txid) + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, priv_funding_u.publicKey)) eventListener.expectMsg(ChannelLost(scid_au)) eventListener.expectMsg(NodeLost(priv_u.publicKey)) awaitAssert(assert(nodeParams.db.network.getChannel(scid_au).isEmpty)) @@ -1121,6 +1126,11 @@ class RouterSpec extends BaseRouterSpec { } } + def spliceTx(node1: PublicKey, node2: PublicKey, newCapacity: Satoshi): Transaction = { + val originalFundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0) + Transaction(version = 0, txIn = TxIn(outPoint = OutPoint(originalFundingTx,0), signatureScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))), sequence = 0) :: Nil, txOut = TxOut(newCapacity, write(pay2wsh(Scripts.multiSig2of2(node1, node2)))) :: Nil, lockTime = 0) + } + test("update an existing channel after a splice") { fixture => import fixture._ @@ -1129,7 +1139,9 @@ class RouterSpec extends BaseRouterSpec { val peerConnection = TestProbe() // Channel ab is spent by a splice tx. - router ! WatchExternalChannelSpentTriggered(scid_ab) + val newCapacity = publicChannelCapacity - 100_000.sat + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx(funding_a, funding_b, newCapacity)) + watcher.expectMsgType[WatchTxConfirmed] eventListener.expectNoMessage(100 millis) // The splice of channel ab is announced. @@ -1138,14 +1150,10 @@ class RouterSpec extends BaseRouterSpec { peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn)) peerConnection.expectNoMessage(100 millis) assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn) - val postSpliceCapacity = publicChannelCapacity - 100_000.sat - val originalFundingTx = Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0) - val spliceFundingTx = Transaction(version = 0, txIn = TxIn(outPoint = OutPoint(originalFundingTx,0), signatureScript = write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b))), sequence = 0) :: Nil, txOut = TxOut(postSpliceCapacity, write(pay2wsh(Scripts.multiSig2of2(funding_a, funding_b)))) :: Nil, lockTime = 0) - watcher.send(router, ValidateResult(spliceAnn, Right(spliceFundingTx, UtxoStatus.Unspent))) + watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx(funding_a, funding_b, newCapacity), UtxoStatus.Unspent))) peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn)) peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn)) assert(peerConnection.sender() == router) - assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == spliceAnn.shortChannelId) // And the graph should be updated too. val sender = TestProbe() @@ -1155,7 +1163,13 @@ class RouterSpec extends BaseRouterSpec { val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty) assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) - assert(edge_ab.capacity == postSpliceCapacity && edge_ba.capacity == postSpliceCapacity) + assert(edge_ab.capacity == newCapacity && edge_ba.capacity == newCapacity) + + // The channel update for the splice is confirmed and the channel is not removed. + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, newCapacity, None, None) :: Nil)) + peerConnection.expectNoMessage(100 millis) + eventListener.expectNoMessage(100 millis) } test("after a reload, any node without channels should be removed from the database") { fixture =>