diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala index 2b28bb0415..3eddf8de63 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Setup.scala @@ -367,7 +367,7 @@ class Setup(val datadir: File, // we want to make sure the handler for post-restart broken HTLCs has finished initializing. _ <- postRestartCleanUpInitialized.future - txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcher, bitcoinClient) + txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient) channelFactory = Peer.SimpleChannelFactory(nodeParams, watcher, relayer, bitcoinClient, txPublisherFactory) pendingChannelsRateLimiter = system.spawn(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, channels)).onFailure(typed.SupervisorStrategy.resume), name = "pending-channels-rate-limiter") peerFactory = Switchboard.SimplePeerFactory(nodeParams, bitcoinClient, channelFactory, pendingChannelsRateLimiter, register, router.toTyped) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala index 2ec66eaf1d..d5126f4a8e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcher.scala @@ -63,6 +63,7 @@ object ZmqWatcher { private case class PublishBlockHeight(current: BlockHeight) extends Command private case class ProcessNewBlock(blockId: BlockId) extends Command private case class ProcessNewTransaction(tx: Transaction) extends Command + private case class SetWatchHint(w: GenericWatch, hint: WatchHint) extends Command final case class ValidateRequest(replyTo: ActorRef[ValidateResult], ann: ChannelAnnouncement) extends Command final case class ValidateResult(c: ChannelAnnouncement, fundingTx: Either[Throwable, (Transaction, UtxoStatus)]) @@ -155,7 +156,8 @@ object ZmqWatcher { case class WatchFundingDeeplyBuried(replyTo: ActorRef[WatchFundingDeeplyBuriedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchFundingDeeplyBuriedTriggered] case class WatchFundingDeeplyBuriedTriggered(blockHeight: BlockHeight, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered - case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchTxConfirmedTriggered] + case class RelativeDelay(parentTxId: TxId, delay: Long) + case class WatchTxConfirmed(replyTo: ActorRef[WatchTxConfirmedTriggered], txId: TxId, minDepth: Long, delay_opt: Option[RelativeDelay] = None) extends WatchConfirmed[WatchTxConfirmedTriggered] case class WatchTxConfirmedTriggered(blockHeight: BlockHeight, txIndex: Int, tx: Transaction) extends WatchConfirmedTriggered case class WatchParentTxConfirmed(replyTo: ActorRef[WatchParentTxConfirmedTriggered], txId: TxId, minDepth: Long) extends WatchConfirmed[WatchParentTxConfirmedTriggered] @@ -167,6 +169,13 @@ object ZmqWatcher { private sealed trait AddWatchResult private case object Keep extends AddWatchResult private case object Ignore extends AddWatchResult + + sealed trait WatchHint + /** + * In some cases we don't need to check watches every time a block is found and only need to check again after we + * reach a specific block height. This is for example the case for transactions with a CSV delay. + */ + private case class CheckAfterBlock(blockHeight: BlockHeight) extends WatchHint // @formatter:on def apply(nodeParams: NodeParams, blockCount: AtomicLong, client: BitcoinCoreClient): Behavior[Command] = @@ -178,7 +187,7 @@ object ZmqWatcher { timers.startSingleTimer(TickNewBlock, 1 second) // we start a timer in case we don't receive ZMQ block events timers.startSingleTimer(TickBlockTimeout, blockTimeout) - new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Set.empty[GenericWatch], Map.empty[OutPoint, Set[GenericWatch]]) + new ZmqWatcher(nodeParams, blockCount, client, context, timers).watching(Map.empty[GenericWatch, Option[WatchHint]], Map.empty[OutPoint, Set[GenericWatch]]) } } @@ -224,7 +233,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client private val watchdog = context.spawn(Behaviors.supervise(BlockchainWatchdog(nodeParams, 150 seconds)).onFailure(SupervisorStrategy.resume), "blockchain-watchdog") - private def watching(watches: Set[GenericWatch], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = { + private def watching(watches: Map[GenericWatch, Option[WatchHint]], watchedUtxos: Map[OutPoint, Set[GenericWatch]]): Behavior[Command] = { Behaviors.receiveMessage { case ProcessNewTransaction(tx) => log.debug("analyzing txid={} tx={}", tx.txid, tx) @@ -239,7 +248,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client case _: WatchPublished => // nothing to do case _: WatchConfirmed[_] => // nothing to do } - watches.collect { + watches.keySet.collect { case w: WatchPublished if w.txId == tx.txid => context.self ! TriggerEvent(w.replyTo, w, WatchPublishedTriggered(tx)) } Behaviors.same @@ -279,21 +288,32 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client case Failure(t) => GetBlockCountFailed(t) case Success(currentHeight) => PublishBlockHeight(currentHeight) } - // TODO: beware of the herd effect - KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) { - Future.sequence(watches.collect { - case w: WatchPublished => checkPublished(w) - case w: WatchConfirmed[_] => checkConfirmed(w) - }) - } Behaviors.same case PublishBlockHeight(currentHeight) => log.debug("setting blockHeight={}", currentHeight) blockHeight.set(currentHeight.toLong) context.system.eventStream ! EventStream.Publish(CurrentBlockHeight(currentHeight)) + // TODO: should we try to mitigate the herd effect and not check all watches immediately? + KamonExt.timeFuture(Metrics.NewBlockCheckConfirmedDuration.withoutTags()) { + Future.sequence(watches.collect { + case (w: WatchPublished, _) => checkPublished(w) + case (w: WatchConfirmed[_], hint) => + hint match { + case Some(CheckAfterBlock(delayUntilBlock)) if currentHeight < delayUntilBlock => Future.successful(()) + case _ => checkConfirmed(w, currentHeight) + } + }) + } Behaviors.same + case SetWatchHint(w, hint) => + val watches1 = watches.get(w) match { + case Some(_) => watches + (w -> Some(hint)) + case None => watches + } + watching(watches1, watchedUtxos) + case TriggerEvent(replyTo, watch, event) => if (watches.contains(watch)) { log.debug("triggering {}", watch) @@ -323,7 +343,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client checkSpent(w) Keep case w: WatchConfirmed[_] => - checkConfirmed(w) + checkConfirmed(w, BlockHeight(blockHeight.get())) Keep case w: WatchPublished => checkPublished(w) @@ -333,14 +353,14 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client case Keep => log.debug("adding watch {}", w) context.watchWith(w.replyTo, StopWatching(w.replyTo)) - watching(watches + w, addWatchedUtxos(watchedUtxos, w)) + watching(watches + (w -> None), addWatchedUtxos(watchedUtxos, w)) case Ignore => Behaviors.same } case StopWatching(origin) => - // we remove watches associated to dead actors - val deprecatedWatches = watches.filter(_.replyTo == origin) + // We remove watches associated to dead actors. + val deprecatedWatches = watches.keySet.filter(_.replyTo == origin) val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) } watching(watches -- deprecatedWatches, watchedUtxos1) @@ -353,7 +373,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client Behaviors.same case r: ListWatches => - r.replyTo ! watches + r.replyTo ! watches.keySet Behaviors.same } @@ -414,7 +434,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client client.getTransaction(w.txId).map(tx => context.self ! TriggerEvent(w.replyTo, w, WatchPublishedTriggered(tx))) } - private def checkConfirmed(w: WatchConfirmed[_ <: WatchConfirmedTriggered]): Future[Unit] = { + private def checkConfirmed(w: WatchConfirmed[_ <: WatchConfirmedTriggered], currentHeight: BlockHeight): Future[Unit] = { log.debug("checking confirmations of txid={}", w.txId) // NB: this is very inefficient since internally we call `getrawtransaction` three times, but it doesn't really // matter because this only happens once, when the watched transaction has reached min_depth @@ -431,7 +451,33 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client } } } - case _ => Future.successful((): Unit) + case Some(confirmations) => + // Once the transaction is confirmed, we don't need to check again at every new block, we only need to check + // again once we should have reached the minimum depth to verify that there hasn't been a reorg. + context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + w.minDepth - confirmations)) + Future.successful(()) + case None => + w match { + case WatchTxConfirmed(_, _, _, Some(relativeDelay)) => + log.debug("txId={} has a relative delay of {} blocks, checking parentTxId={}", w.txId, relativeDelay.delay, relativeDelay.parentTxId) + // Note how we add one block to avoid an off-by-one: + // - if the parent is confirmed at block P + // - the CSV delay is D and the minimum depth is M + // - the first block that can include the child is P + D + // - the first block at which we can reach minimum depth is P + D + M + // - if we are currently at block P + N, the parent has C = N + 1 confirmations + // - we want to check at block P + N + D + M + 1 - C = P + N + D + M + 1 - (N + 1) = P + D + M + val delay = relativeDelay.delay + w.minDepth + 1 + client.getTxConfirmations(relativeDelay.parentTxId).map(_.getOrElse(0)).collect { + case confirmations if confirmations < delay => context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + delay - confirmations)) + } + case _ => + // The transaction is unconfirmed: we don't need to check again at every new block: we can check only once + // every minDepth blocks, which is more efficient. If the transaction is included at the current height in + // a reorg, we will trigger the watch one block later than expected, but this is fine. + context.self ! SetWatchHint(w, CheckAfterBlock(currentHeight + w.minDepth)) + Future.successful(()) + } } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala index e55d86d612..1146511f40 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/Channel.scala @@ -115,9 +115,9 @@ object Channel { def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command] } - case class SimpleTxPublisherFactory(nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], bitcoinClient: BitcoinCoreClient) extends TxPublisherFactory { + case class SimpleTxPublisherFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient) extends TxPublisherFactory { override def spawnTxPublisher(context: ActorContext, remoteNodeId: PublicKey): typed.ActorRef[TxPublisher.Command] = { - context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient, watcher))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher") + context.spawn(Behaviors.supervise(TxPublisher(nodeParams, remoteNodeId, TxPublisher.SimpleChildFactory(nodeParams, bitcoinClient))).onFailure(typed.SupervisorStrategy.restart), "tx-publisher") } } @@ -1714,7 +1714,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder with val (localCommitPublished1, claimHtlcTx_opt) = Closing.LocalClose.claimHtlcDelayedOutput(localCommitPublished, keyManager, d.commitments.latest, tx, nodeParams.currentFeerates, nodeParams.onChainFeeConf, d.finalScriptPubKey) claimHtlcTx_opt.foreach(claimHtlcTx => { txPublisher ! PublishFinalTx(claimHtlcTx, claimHtlcTx.fee, None) - blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.channelConf.minDepthBlocks) + blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.channelConf.minDepthBlocks, Some(RelativeDelay(tx.txid, d.commitments.params.remoteParams.toSelfDelay.toInt.toLong))) }) Closing.updateLocalCommitPublished(localCommitPublished1, tx) }), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala index 728d3cfa40..31103a737b 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ErrorHandlers.scala @@ -18,10 +18,10 @@ package fr.acinq.eclair.channel.fsm import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{ActorRef, FSM} -import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Transaction} +import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, SatoshiLong, Transaction, TxId} import fr.acinq.eclair.NotificationsLogger import fr.acinq.eclair.NotificationsLogger.NotifyNodeOperator -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchOutputSpent, WatchTxConfirmed} +import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{RelativeDelay, WatchOutputSpent, WatchTxConfirmed} import fr.acinq.eclair.channel.Helpers.Closing import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fsm.Channel.UnhandledExceptionStrategy @@ -164,9 +164,9 @@ trait ErrorHandlers extends CommonHandlers { /** * This helper method will watch txs only if they haven't yet reached minDepth */ - private def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, Transaction]): Unit = { + private def watchConfirmedIfNeeded(txs: Iterable[Transaction], irrevocablySpent: Map[OutPoint, Transaction], relativeDelays: Map[TxId, RelativeDelay]): Unit = { val (skip, process) = txs.partition(Closing.inputsAlreadySpent(_, irrevocablySpent)) - process.foreach(tx => blockchain ! WatchTxConfirmed(self, tx.txid, nodeParams.channelConf.minDepthBlocks)) + process.foreach(tx => blockchain ! WatchTxConfirmed(self, tx.txid, nodeParams.channelConf.minDepthBlocks, relativeDelays.get(tx.txid))) skip.foreach(tx => log.debug(s"no need to watch txid=${tx.txid}, it has already been confirmed")) } @@ -219,21 +219,23 @@ trait ErrorHandlers extends CommonHandlers { List(PublishFinalTx(commitTx, commitment.commitInput.outPoint, "commit-tx", Closing.commitTxFee(commitment.commitInput, commitTx, localPaysCommitTxFees), None)) ++ (claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None))) case _: Transactions.AnchorOutputsCommitmentFormat => val redeemableHtlcTxs = htlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitment)) - val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitment) } + val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !localCommitPublished.isConfirmed => PublishReplaceableTx(tx, commitment) } List(PublishFinalTx(commitTx, commitment.commitInput.outPoint, "commit-tx", Closing.commitTxFee(commitment.commitInput, commitTx, localPaysCommitTxFees), None)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None)) } publishIfNeeded(publishQueue, irrevocablySpent) - // we watch: - // - the commitment tx itself, so that we can handle the case where we don't have any outputs - // - 'final txs' that send funds to our wallet and that spend outputs that only us control + // We watch: + // - the commitment tx itself, so that we can handle the case where we don't have any outputs + // - 'final txs' that send funds to our wallet and that spend outputs that only us control + // Our 'final txs" have a long relative delay: we provide that information to the watcher for efficiency. + val relativeDelays = (claimMainDelayedOutputTx ++ claimHtlcDelayedTxs).map(tx => tx.tx.txid -> RelativeDelay(tx.input.outPoint.txid, commitment.remoteParams.toSelfDelay.toInt.toLong)).toMap val watchConfirmedQueue = List(commitTx) ++ claimMainDelayedOutputTx.map(_.tx) ++ claimHtlcDelayedTxs.map(_.tx) - watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent) + watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays) - // we watch outputs of the commitment tx that both parties may spend - // we also watch our local anchor: this ensures that we will correctly detect when it's confirmed and count its fees - // in the audit DB, even if we restart before confirmation - val watchSpentQueue = htlcTxs.keys ++ claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => tx.input.outPoint } + // We watch outputs of the commitment tx that both parties may spend. + // We also watch our local anchor: this ensures that we will correctly detect when it's confirmed and count its fees + // in the audit DB, even if we restart before confirmation. + val watchSpentQueue = htlcTxs.keys ++ claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !localCommitPublished.isConfirmed => tx.input.outPoint } watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent) } @@ -274,18 +276,18 @@ trait ErrorHandlers extends CommonHandlers { def doPublish(remoteCommitPublished: RemoteCommitPublished, commitment: FullCommitment): Unit = { import remoteCommitPublished._ - val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitment) } + val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx if !remoteCommitPublished.isConfirmed => PublishReplaceableTx(tx, commitment) } val redeemableHtlcTxs = claimHtlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitment)) val publishQueue = claimLocalAnchor ++ claimMainOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)).toSeq ++ redeemableHtlcTxs publishIfNeeded(publishQueue, irrevocablySpent) - // we watch: + // We watch: // - the commitment tx itself, so that we can handle the case where we don't have any outputs // - 'final txs' that send funds to our wallet and that spend outputs that only us control val watchConfirmedQueue = List(commitTx) ++ claimMainOutputTx.map(_.tx) - watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent) + watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays = Map.empty) - // we watch outputs of the commitment tx that both parties may spend + // We watch outputs of the commitment tx that both parties may spend. val watchSpentQueue = claimHtlcTxs.keys watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent) } @@ -336,13 +338,13 @@ trait ErrorHandlers extends CommonHandlers { val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishFinalTx(tx, tx.fee, None)) publishIfNeeded(publishQueue, irrevocablySpent) - // we watch: + // We watch: // - the commitment tx itself, so that we can handle the case where we don't have any outputs // - 'final txs' that send funds to our wallet and that spend outputs that only us control val watchConfirmedQueue = List(commitTx) ++ claimMainOutputTx.map(_.tx) - watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent) + watchConfirmedIfNeeded(watchConfirmedQueue, irrevocablySpent, relativeDelays = Map.empty) - // we watch outputs of the commitment tx that both parties may spend + // We watch outputs of the commitment tx that both parties may spend. val watchSpentQueue = (mainPenaltyTx ++ htlcPenaltyTxs).map(_.input.outPoint) watchSpentIfNeeded(commitTx, watchSpentQueue, irrevocablySpent) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala index e712e49f3c..6dba5078d2 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala @@ -19,7 +19,6 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.eclair.NodeParams -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.CheckTx @@ -50,12 +49,12 @@ object FinalTxPublisher { case object Stop extends Command // @formatter:on - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkTimeLocks() + case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, context, timers, txPublishContext).checkTimeLocks() case Stop => Behaviors.stopped } } @@ -69,7 +68,6 @@ private class FinalTxPublisher(nodeParams: NodeParams, replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishFinalTx, bitcoinClient: BitcoinCoreClient, - watcher: ActorRef[ZmqWatcher.Command], context: ActorContext[FinalTxPublisher.Command], timers: TimerScheduler[FinalTxPublisher.Command], txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { @@ -79,7 +77,7 @@ private class FinalTxPublisher(nodeParams: NodeParams, private val log = context.log def checkTimeLocks(): Behavior[Command] = { - val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor") + val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, bitcoinClient, txPublishContext), "time-locks-monitor") timeLocksChecker ! CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.tx, cmd.desc) Behaviors.receiveMessagePartial { case TimeLocksOk => checkParentPublished() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala index b913c9f3f0..38385f31c1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala @@ -19,7 +19,6 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Transaction} -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.blockchain.fee.{ConfirmationTarget, FeeratePerKw, FeeratesPerKw} import fr.acinq.eclair.channel.publish.ReplaceableTxFunder.FundedTx @@ -61,12 +60,12 @@ object ReplaceableTxPublisher { // Timer key to ensure we don't have multiple concurrent timers running. private case object BumpFeeKey - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkPreconditions() + case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, context, timers, txPublishContext).checkPreconditions() case Stop => Behaviors.stopped } } @@ -108,7 +107,6 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishReplaceableTx, bitcoinClient: BitcoinCoreClient, - watcher: ActorRef[ZmqWatcher.Command], context: ActorContext[ReplaceableTxPublisher.Command], timers: TimerScheduler[ReplaceableTxPublisher.Command], txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { @@ -141,7 +139,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, // There are no time locks on anchor transactions, we can claim them right away. case _: ClaimLocalAnchorWithWitnessData => chooseFeerate(txWithWitnessData) case _ => - val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor") + val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, bitcoinClient, txPublishContext), "time-locks-monitor") timeLocksChecker ! TxTimeLocksMonitor.CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.txInfo.tx, cmd.desc) Behaviors.receiveMessagePartial { case TimeLocksOk => chooseFeerate(txWithWitnessData) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala index 6903519824..0255dd9eb3 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala @@ -22,7 +22,6 @@ import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, OutPoint, Satoshi, Transaction, TxId} import fr.acinq.eclair.blockchain.CurrentBlockHeight -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.blockchain.fee.ConfirmationTarget import fr.acinq.eclair.channel.FullCommitment @@ -140,13 +139,13 @@ object TxPublisher { // @formatter:on } - case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command]) extends ChildFactory { + case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient) extends ChildFactory { // @formatter:off override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublishContext): ActorRef[FinalTxPublisher.Command] = { - context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"final-tx-${txPublishContext.id}") + context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, txPublishContext), s"final-tx-${txPublishContext.id}") } override def spawnReplaceableTxPublisher(context: ActorContext[Command], txPublishContext: TxPublishContext): ActorRef[ReplaceableTxPublisher.Command] = { - context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"replaceable-tx-${txPublishContext.id}") + context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, txPublishContext), s"replaceable-tx-${txPublishContext.id}") } // @formatter:on } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala index 96976260b9..2b4762de5f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala @@ -20,15 +20,14 @@ import akka.actor.typed.eventstream.EventStream import akka.actor.typed.scaladsl.{ActorContext, Behaviors, TimerScheduler} import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.scalacompat.{Transaction, TxId} -import fr.acinq.eclair.blockchain.CurrentBlockHeight -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} +import fr.acinq.eclair.blockchain.{CurrentBlockHeight, OnChainChannelFunder} import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.{BlockHeight, NodeParams} +import scala.concurrent.ExecutionContext import scala.concurrent.duration.DurationLong -import scala.util.Random +import scala.util.{Failure, Random, Success} /** * Created by t-bast on 10/06/2021. @@ -47,15 +46,16 @@ object TxTimeLocksMonitor { case class CheckTx(replyTo: ActorRef[TimeLocksOk], tx: Transaction, desc: String) extends Command final case class WrappedCurrentBlockHeight(currentBlockHeight: BlockHeight) extends Command private case object CheckRelativeTimeLock extends Command - private case class ParentTxConfirmed(parentTxId: TxId) extends Command + private case class ParentTxStatus(parentTxId: TxId, confirmations_opt: Option[Int]) extends Command + private case class GetTxConfirmationsFailed(parentTxId: TxId, reason: Throwable) extends Command // @formatter:on - def apply(nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: OnChainChannelFunder, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case cmd: CheckTx => new TxTimeLocksMonitor(nodeParams, cmd, watcher, context, timers).checkAbsoluteTimeLock() + case cmd: CheckTx => new TxTimeLocksMonitor(nodeParams, cmd, bitcoinClient, context, timers).checkAbsoluteTimeLock() } } } @@ -66,68 +66,103 @@ object TxTimeLocksMonitor { private class TxTimeLocksMonitor(nodeParams: NodeParams, cmd: TxTimeLocksMonitor.CheckTx, - watcher: ActorRef[ZmqWatcher.Command], + bitcoinClient: OnChainChannelFunder, context: ActorContext[TxTimeLocksMonitor.Command], - timers: TimerScheduler[TxTimeLocksMonitor.Command]) { + timers: TimerScheduler[TxTimeLocksMonitor.Command])(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { import TxTimeLocksMonitor._ private val log = context.log - def checkAbsoluteTimeLock(): Behavior[Command] = { - val blockHeight = nodeParams.currentBlockHeight + private def checkAbsoluteTimeLock(): Behavior[Command] = { val cltvTimeout = Scripts.cltvTimeout(cmd.tx) - if (blockHeight < cltvTimeout) { - log.info("delaying publication of {} until block={} (current block={})", cmd.desc, cltvTimeout, blockHeight) - val messageAdapter = context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight)) - context.system.eventStream ! EventStream.Subscribe(messageAdapter) - Behaviors.receiveMessagePartial { - case WrappedCurrentBlockHeight(currentBlockHeight) => - if (cltvTimeout <= currentBlockHeight) { - context.system.eventStream ! EventStream.Unsubscribe(messageAdapter) - timers.startSingleTimer(CheckRelativeTimeLock, (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis) - Behaviors.same - } else { - Behaviors.same - } - case CheckRelativeTimeLock => checkRelativeTimeLocks() - } + context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))) + if (nodeParams.currentBlockHeight < cltvTimeout) { + log.info("delaying publication of {} until block={} (current block={})", cmd.desc, cltvTimeout, nodeParams.currentBlockHeight) + waitForAbsoluteTimeLock(cltvTimeout) } else { checkRelativeTimeLocks() } } - def checkRelativeTimeLocks(): Behavior[Command] = { + private def waitForAbsoluteTimeLock(cltvTimeout: BlockHeight): Behavior[Command] = { + Behaviors.receiveMessagePartial { + case WrappedCurrentBlockHeight(currentBlockHeight) if cltvTimeout <= currentBlockHeight => + timers.startSingleTimer(CheckRelativeTimeLock, (1 + Random.nextLong(nodeParams.channelConf.maxTxPublishRetryDelay.toMillis)).millis) + Behaviors.same + case WrappedCurrentBlockHeight(_) => Behaviors.same + case CheckRelativeTimeLock => checkRelativeTimeLocks() + } + } + + private def checkRelativeTimeLocks(): Behavior[Command] = { val csvTimeouts = Scripts.csvTimeouts(cmd.tx) if (csvTimeouts.nonEmpty) { - val watchConfirmedResponseMapper: ActorRef[WatchParentTxConfirmedTriggered] = context.messageAdapter(w => ParentTxConfirmed(w.tx.txid)) - csvTimeouts.foreach { + val parentTxs = csvTimeouts.map { case (parentTxId, csvTimeout) => - log.info("{} has a relative timeout of {} blocks, watching parentTxId={}", cmd.desc, csvTimeout, parentTxId) - watcher ! WatchParentTxConfirmed(watchConfirmedResponseMapper, parentTxId, minDepth = csvTimeout) + log.info("{} has a relative timeout of {} blocks, checking confirmations for parentTxId={}", cmd.desc, csvTimeout, parentTxId) + checkConfirmations(parentTxId) + parentTxId -> RelativeLockStatus(csvTimeout, nodeParams.currentBlockHeight + csvTimeout) } - waitForParentsToConfirm(csvTimeouts.keySet) + waitForRelativeTimeLocks(parentTxs) } else { notifySender() } } - def waitForParentsToConfirm(parentTxIds: Set[TxId]): Behavior[Command] = { + private case class RelativeLockStatus(csvTimeout: Long, checkAfterBlock: BlockHeight) + + private def waitForRelativeTimeLocks(parentTxs: Map[TxId, RelativeLockStatus]): Behavior[Command] = { Behaviors.receiveMessagePartial { - case ParentTxConfirmed(parentTxId) => - log.debug("parent tx of {} has been confirmed (parent txid={})", cmd.desc, parentTxId) - val remainingParentTxIds = parentTxIds - parentTxId - if (remainingParentTxIds.isEmpty) { - log.info("all parent txs of {} have been confirmed", cmd.desc) - notifySender() - } else { - log.debug("some parent txs of {} are still unconfirmed (parent txids={})", cmd.desc, remainingParentTxIds.mkString(",")) - waitForParentsToConfirm(remainingParentTxIds) + case ParentTxStatus(parentTxId, confirmations_opt) => + parentTxs.get(parentTxId) match { + case Some(status) => confirmations_opt match { + case Some(confirmations) if status.csvTimeout <= confirmations => + log.debug("parentTxId={} of {} has reached enough confirmations", parentTxId, cmd.desc) + val remainingParentTxs = parentTxs - parentTxId + if (remainingParentTxs.isEmpty) { + log.info("all parent txs of {} have reached enough confirmations", cmd.desc) + notifySender() + } else { + log.debug("some parent txs of {} don't have enough confirmations yet (parentTxIds={})", cmd.desc, remainingParentTxs.keySet.mkString(",")) + waitForRelativeTimeLocks(remainingParentTxs) + } + case Some(confirmations) => + log.debug("parentTxId={} doesn't have enough confirmations, retrying in {} blocks", parentTxId, status.csvTimeout - confirmations) + val status1 = status.copy(checkAfterBlock = nodeParams.currentBlockHeight + status.csvTimeout - confirmations) + waitForRelativeTimeLocks(parentTxs + (parentTxId -> status1)) + case None => + log.debug("parentTxId={} is unconfirmed, retrying in {} blocks", parentTxId, status.csvTimeout) + val status1 = status.copy(checkAfterBlock = nodeParams.currentBlockHeight + status.csvTimeout) + waitForRelativeTimeLocks(parentTxs + (parentTxId -> status1)) + } + case None => + log.debug("ignoring duplicate parentTxId={}", parentTxId) + Behaviors.same + } + case GetTxConfirmationsFailed(parentTxId, reason) => + log.warn("could not get tx confirmations for parentTxId={}, retrying ({})", parentTxId, reason.getMessage) + checkConfirmations(parentTxId) + Behaviors.same + case WrappedCurrentBlockHeight(currentBlockHeight) => + log.debug("received new block (height={})", currentBlockHeight) + parentTxs.collect { + case (parentTxId, status) if status.checkAfterBlock <= currentBlockHeight => + log.debug("checking confirmations for parentTxId={} ({} <= {})", parentTxId, status.checkAfterBlock, currentBlockHeight) + checkConfirmations(parentTxId) } + Behaviors.same + } + } + + private def checkConfirmations(parentTxId: TxId): Unit = { + context.pipeToSelf(bitcoinClient.getTxConfirmations(parentTxId)) { + case Success(confirmations_opt) => ParentTxStatus(parentTxId, confirmations_opt) + case Failure(reason) => GetTxConfirmationsFailed(parentTxId, reason) } } - def notifySender(): Behavior[Command] = { + private def notifySender(): Behavior[Command] = { log.debug("time locks satisfied for {}", cmd.desc) cmd.replyTo ! TimeLocksOk() Behaviors.stopped diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala index 3cc68fb47e..d0aaad7724 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/blockchain/bitcoind/ZmqWatcherSpec.scala @@ -227,6 +227,39 @@ class ZmqWatcherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bitcoind }) } + test("watch for confirmed transactions with relative delay") { + withWatcher(f => { + import f._ + + // We simulate a transaction with a 3-blocks CSV delay. + val (priv, address) = createExternalAddress() + val parentTx = sendToAddress(address, 50.millibtc, probe) + val tx = createSpendP2WPKH(parentTx, priv, priv.publicKey, 5_000 sat, 3, 0) + val delay = RelativeDelay(parentTx.txid, 3) + + watcher ! WatchTxConfirmed(probe.ref, tx.txid, 6, Some(delay)) + probe.expectNoMessage(100 millis) + + // We make the parent tx confirm to satisfy the CSV delay and publish the delayed transaction. + generateBlocks(3) + bitcoinClient.publishTransaction(tx).pipeTo(probe.ref) + probe.expectMsg(tx.txid) + probe.expectNoMessage(100 millis) + + // The delayed transaction confirms, but hasn't reached its minimum depth. + generateBlocks(3) + probe.expectNoMessage(100 millis) + + // The delayed transaction reaches its minimum depth. + generateBlocks(3) + assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) + + // If we watch the transaction when it's already confirmed, we immediately receive the WatchEventConfirmed. + watcher ! WatchTxConfirmed(probe.ref, tx.txid, 3, Some(delay.copy(delay = 720))) + assert(probe.expectMsgType[WatchTxConfirmedTriggered].tx.txid == tx.txid) + }) + } + test("watch for spent transactions") { withWatcher(f => { import f._ diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala index 8d8d59f9b8..d25e6d1ff8 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala @@ -24,7 +24,6 @@ import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Transaction, TxId} import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.WatcherSpec.createSpendP2WPKH import fr.acinq.eclair.blockchain.bitcoind.BitcoindService -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.FinalTxPublisher.{Publish, Stop} import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason.ConflictingTxConfirmed @@ -48,14 +47,13 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi stopBitcoind() } - case class Fixture(bitcoinClient: BitcoinCoreClient, publisher: ActorRef[FinalTxPublisher.Command], watcher: TestProbe, probe: TestProbe) + case class Fixture(bitcoinClient: BitcoinCoreClient, publisher: ActorRef[FinalTxPublisher.Command], probe: TestProbe) def createFixture(): Fixture = { val probe = TestProbe() - val watcher = TestProbe() val bitcoinClient = new BitcoinCoreClient(bitcoinrpcclient) - val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) - Fixture(bitcoinClient, publisher, watcher, probe) + val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) + Fixture(bitcoinClient, publisher, probe) } def getMempool(bitcoinClient: BitcoinCoreClient, probe: TestProbe): Seq[Transaction] = { @@ -78,17 +76,13 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi val (priv, address) = createExternalAddress() val parentTx = sendToAddress(address, 125_000 sat, probe) + createBlocks(5, probe) + val tx = createSpendP2WPKH(parentTx, priv, priv.publicKey, 2_500 sat, sequence = 5, lockTime = 0) val cmd = PublishFinalTx(tx, tx.txIn.head.outPoint, "tx-time-locks", 0 sat, None) publisher ! Publish(probe.ref, cmd) - val w = watcher.expectMsgType[WatchParentTxConfirmed] - assert(w.txId == parentTx.txid) - assert(w.minDepth == 5) - createBlocks(5, probe) - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, parentTx) - - // Once time locks are satisfied, the transaction should be published: + // Time locks are satisfied, the transaction should be published: waitTxInMempool(bitcoinClient, tx.txid, probe) createBlocks(1, probe) probe.expectNoMessage(100 millis) // we don't notify the sender until min depth has been reached @@ -113,7 +107,6 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi publisher ! Publish(probe.ref, cmd) // Since the parent is not published yet, we can't publish the child tx either: - watcher.expectNoMessage(100 millis) assert(!getMempool(bitcoinClient, probe).map(_.txid).contains(tx.txid)) // Once the parent tx is published, it will unblock publication of the child tx: diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala index 580efbd02b..dc44a122da 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala @@ -75,7 +75,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w def createPublisher(): ActorRef[ReplaceableTxPublisher.Command] = createPublisher(alice.underlyingActor.nodeParams) def createPublisher(nodeParams: NodeParams): ActorRef[ReplaceableTxPublisher.Command] = { - system.spawnAnonymous(ReplaceableTxPublisher(nodeParams, wallet, alice2blockchain.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) + system.spawnAnonymous(ReplaceableTxPublisher(nodeParams, wallet, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) } def aliceBlockHeight(): BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight @@ -170,7 +170,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w generateBlocks(1) // Execute our test. - val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, alice2blockchain.ref, TxPublishContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString) + val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, TxPublishContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString) val f = Fixture(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, walletClient, walletRpcClient, publisher, probe) // We set a high fastest feerate, to ensure that by default we're not limited by this. f.setFeerate(FeeratePerKw(100_000 sat), blockTarget = 1) @@ -1035,12 +1035,10 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(10.5 millibtc), ChannelTypes.AnchorOutputsZeroFeeHtlcTx()) { f => import f._ - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight()) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight()) val htlcSuccessPublisher = createPublisher() setFeerate(FeeratePerKw(75_000 sat), blockTarget = 1) htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val result = probe.expectMsgType[TxRejected] assert(result.cmd == htlcSuccess) @@ -1054,8 +1052,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcSuccessTx = getMempoolTxs(1).head val htlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, htlcSuccessTx.weight.toInt) assert(htlcSuccessTargetFee * 0.9 <= htlcSuccessTx.fees && htlcSuccessTx.fees <= htlcSuccessTargetFee * 1.2, s"actualFee=${htlcSuccessTx.fees} targetFee=$htlcSuccessTargetFee") @@ -1084,8 +1080,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) setFeerate(targetFeerate) // the feerate is higher than what it was when the channel force-closed - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcTimeoutTx = getMempoolTxs(1).head val htlcTimeoutTargetFee = Transactions.weight2fee(targetFeerate, htlcTimeoutTx.weight.toInt) assert(htlcTimeoutTargetFee * 0.9 <= htlcTimeoutTx.fees && htlcTimeoutTx.fees <= htlcTimeoutTargetFee * 1.2, s"actualFee=${htlcTimeoutTx.fees} targetFee=$htlcTimeoutTargetFee") @@ -1222,15 +1216,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val initialFeerate = FeeratePerKw(15_000 sat) setFeerate(initialFeerate) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 30) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 30) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx) val htlcSuccessTxId1 = listener.expectMsgType[TransactionPublished].tx.txid val htlcSuccessTx1 = getMempoolTxs(1).head val htlcSuccessInputs1 = getMempool().head.txIn.map(_.outPoint).toSet @@ -1258,15 +1250,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val initialFeerate = FeeratePerKw(3_000 sat) setFeerate(initialFeerate) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 15) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 15) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx) val htlcSuccessTxId1 = listener.expectMsgType[TransactionPublished].tx.txid val htlcSuccessTx1 = getMempoolTxs(1).head val htlcSuccessInputs1 = getMempool().head.txIn.map(_.outPoint).toSet @@ -1294,15 +1284,13 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val initialFeerate = FeeratePerKw(10_000 sat) setFeerate(initialFeerate) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 6) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 6) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) val htlcSuccessPublisher = createPublisher() htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(aliceBlockHeight(), 0, commitTx) val htlcSuccessTxId = listener.expectMsgType[TransactionPublished].tx.txid var htlcSuccessTx = getMempoolTxs(1).head assert(htlcSuccessTx.txid == htlcSuccessTxId) @@ -1327,7 +1315,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val feerate = FeeratePerKw(15_000 sat) setFeerate(feerate, fastest = feerate) // The confirmation target for htlc-timeout corresponds to their CLTV: we should claim them asap once the htlc has timed out. - val (commitTx, _, htlcTimeout) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) + val (_, _, htlcTimeout) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) @@ -1336,8 +1324,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w htlcTimeoutPublisher ! Publish(probe.ref, htlcTimeout) generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcTimeoutTxId1 = listener.expectMsgType[TransactionPublished].tx.txid val htlcTimeoutTx1 = getMempoolTxs(1).head val htlcTimeoutInputs1 = getMempool().head.txIn.map(_.outPoint).toSet @@ -1363,7 +1349,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(15 millibtc, 10 millibtc, 5 millibtc), ChannelTypes.AnchorOutputsZeroFeeHtlcTx()) { f => import f._ - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 144) // The HTLC confirmation target is far away, but we have less safe utxos than the configured threshold. // We will target a 1-block confirmation to get a safe utxo back as soon as possible. val highSafeThresholdParams = alice.underlyingActor.nodeParams.modify(_.onChainFeeConf.safeUtxosThreshold).setTo(10) @@ -1373,8 +1359,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val htlcSuccessPublisher = createPublisher(highSafeThresholdParams) htlcSuccessPublisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val htlcSuccessTx = getMempoolTxs(1).head val htlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, htlcSuccessTx.weight.toInt) assert(htlcSuccessTargetFee * 0.9 <= htlcSuccessTx.fees && htlcSuccessTx.fees <= htlcSuccessTargetFee * 1.1, s"actualFee=${htlcSuccessTx.fees} targetFee=$htlcSuccessTargetFee") @@ -1390,16 +1374,12 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 18) val publisher1 = createPublisher() publisher1 ! Publish(probe.ref, htlcSuccess) - val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) getMempoolTxs(1) // we try to publish the htlc-success again (can be caused by a node restart): it will fail to replace the existing // one in the mempool but we must ensure we don't leave some utxos locked. val publisher2 = createPublisher() publisher2 ! Publish(probe.ref, htlcSuccess) - val w2 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w2.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) val result = probe.expectMsgType[TxRejected] assert(result.reason == ConflictingTxUnconfirmed) getMempoolTxs(1) // the previous htlc-success tx is still in the mempool @@ -1424,10 +1404,8 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w import f._ setFeerate(FeeratePerKw(5_000 sat)) - val (commitTx, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 48) + val (_, htlcSuccess, _) = closeChannelWithHtlcs(f, aliceBlockHeight() + 48) publisher ! Publish(probe.ref, htlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, commitTx) getMempoolTxs(1) // We unlock utxos before stopping. @@ -1545,8 +1523,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w val claimHtlcSuccessPublisher = createPublisher() claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess) - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val claimHtlcSuccessTx = getMempoolTxs(1).head val claimHtlcSuccessTargetFee = Transactions.weight2fee(targetFeerate, claimHtlcSuccessTx.weight.toInt) assert(claimHtlcSuccessTargetFee * 0.9 <= claimHtlcSuccessTx.fees && claimHtlcSuccessTx.fees <= claimHtlcSuccessTargetFee * 1.1, s"actualFee=${claimHtlcSuccessTx.fees} targetFee=$claimHtlcSuccessTargetFee") @@ -1574,8 +1550,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) setFeerate(targetFeerate) // the feerate is higher than what it was when the channel force-closed - val w = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val claimHtlcTimeoutTx = getMempoolTxs(1).head val claimHtlcTimeoutTargetFee = Transactions.weight2fee(targetFeerate, claimHtlcTimeoutTx.weight.toInt) assert(claimHtlcTimeoutTargetFee * 0.9 <= claimHtlcTimeoutTx.fees && claimHtlcTimeoutTx.fees <= claimHtlcTimeoutTargetFee * 1.1, s"actualFee=${claimHtlcTimeoutTx.fees} targetFee=$claimHtlcTimeoutTargetFee") @@ -1683,13 +1657,11 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(11 millibtc), ChannelTypes.AnchorOutputs()) { f => import f._ - val (remoteCommitTx, claimHtlcSuccess, claimHtlcTimeout) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) + val (_, claimHtlcSuccess, claimHtlcTimeout) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) setFeerate(FeeratePerKw(50_000 sat)) val claimHtlcSuccessPublisher = createPublisher() claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess) - val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val result1 = probe.expectMsgType[TxRejected] assert(result1.cmd == claimHtlcSuccess) assert(result1.reason == TxSkipped(retryNextBlock = true)) @@ -1699,8 +1671,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w claimHtlcTimeoutPublisher ! Publish(probe.ref, claimHtlcTimeout) generateBlocks(144) system.eventStream.publish(CurrentBlockHeight(currentBlockHeight(probe))) - val w2 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w2.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val result2 = probe.expectMsgType[TxRejected] assert(result2.cmd == claimHtlcTimeout) assert(result2.reason == TxSkipped(retryNextBlock = true)) @@ -1771,7 +1741,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w withFixture(Seq(11 millibtc), ChannelTypes.AnchorOutputs()) { f => import f._ - val (remoteCommitTx, claimHtlcSuccess, _) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) + val (_, claimHtlcSuccess, _) = remoteCloseChannelWithHtlcs(f, aliceBlockHeight() + 300, nextCommit = false) val listener = TestProbe() system.eventStream.subscribe(listener.ref, classOf[TransactionPublished]) @@ -1779,8 +1749,6 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w setFeerate(FeeratePerKw(5_000 sat)) val claimHtlcSuccessPublisher = createPublisher() claimHtlcSuccessPublisher ! Publish(probe.ref, claimHtlcSuccess) - val w1 = alice2blockchain.expectMsgType[WatchParentTxConfirmed] - w1.replyTo ! WatchParentTxConfirmedTriggered(currentBlockHeight(probe), 0, remoteCommitTx) val claimHtlcSuccessTx = getMempoolTxs(1).head assert(listener.expectMsgType[TransactionPublished].tx.txid == claimHtlcSuccessTx.txid) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala index c71f57d491..bebd58cade 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala @@ -19,28 +19,47 @@ package fr.acinq.eclair.channel.publish import akka.actor.typed.ActorRef import akka.actor.typed.scaladsl.adapter.{ClassicActorSystemOps, actorRefAdapter} import akka.testkit.TestProbe -import fr.acinq.bitcoin.scalacompat.{OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut} -import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} +import fr.acinq.bitcoin.scalacompat.{OutPoint, SatoshiLong, Script, Transaction, TxId, TxIn, TxOut} +import fr.acinq.eclair.blockchain.NoOpOnChainWallet import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.{CheckTx, TimeLocksOk, WrappedCurrentBlockHeight} -import fr.acinq.eclair.{BlockHeight, NodeParams, TestConstants, TestKitBaseClass, randomKey} +import fr.acinq.eclair.{NodeParams, TestConstants, TestKitBaseClass, randomKey} import org.scalatest.Outcome import org.scalatest.funsuite.FixtureAnyFunSuiteLike import java.util.UUID import scala.concurrent.duration.DurationInt +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.Success class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike { - case class FixtureParam(nodeParams: NodeParams, monitor: ActorRef[TxTimeLocksMonitor.Command], watcher: TestProbe, probe: TestProbe) + case class FixtureParam(nodeParams: NodeParams, monitor: ActorRef[TxTimeLocksMonitor.Command], bitcoinClient: BitcoinTestClient, probe: TestProbe) + + case class BitcoinTestClient() extends NoOpOnChainWallet { + private val requests = collection.concurrent.TrieMap.empty[TxId, Promise[Option[Int]]] + + override def getTxConfirmations(txId: TxId)(implicit ec: ExecutionContext): Future[Option[Int]] = { + val p = Promise[Option[Int]]() + requests += (txId -> p) + p.future + } + + def hasRequest(txId: TxId): Boolean = requests.contains(txId) + + def completeRequest(txId: TxId, confirmations_opt: Option[Int]): Unit = { + requests.get(txId).map(_.complete(Success(confirmations_opt))) + requests -= txId + } + } override def withFixture(test: OneArgTest): Outcome = { within(max = 30 seconds) { val nodeParams = TestConstants.Alice.nodeParams val probe = TestProbe() - val watcher = TestProbe() - val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) - withFixture(test.toNoArgTest(FixtureParam(nodeParams, monitor, watcher, probe))) + val bitcoinClient = BitcoinTestClient() + val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, bitcoinClient, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) + withFixture(test.toNoArgTest(FixtureParam(nodeParams, monitor, bitcoinClient, probe))) } } @@ -65,12 +84,26 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val tx = Transaction(2, TxIn(OutPoint(parentTx, 0), Nil, 3) :: Nil, TxOut(25_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0) monitor ! CheckTx(probe.ref, tx, "relative-delay") - val w = watcher.expectMsgType[WatchParentTxConfirmed] - assert(w.txId == parentTx.txid) - assert(w.minDepth == 3) + // The parent transaction is unconfirmed: we will check again 3 blocks later (the value of the CSV timeout). + awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx.txid, None) probe.expectNoMessage(100 millis) - w.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1) + assert(!bitcoinClient.hasRequest(parentTx.txid)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2) + assert(!bitcoinClient.hasRequest(parentTx.txid)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3) + awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis) + // This time the parent transaction has 1 confirmation: we will check again in two more blocks. + bitcoinClient.completeRequest(parentTx.txid, Some(1)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1) + probe.expectNoMessage(100 millis) + assert(!bitcoinClient.hasRequest(parentTx.txid)) + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2) + awaitCond(bitcoinClient.hasRequest(parentTx.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx.txid, Some(3)) + probe.expectMsg(TimeLocksOk()) } @@ -81,23 +114,33 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val parentTx2 = Transaction(2, Nil, TxOut(45_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0) val tx = Transaction( 2, - TxIn(OutPoint(parentTx1, 0), Nil, 3) :: TxIn(OutPoint(parentTx1, 1), Nil, 1) :: TxIn(OutPoint(parentTx2, 0), Nil, 1) :: Nil, + TxIn(OutPoint(parentTx1, 0), Nil, 3) :: TxIn(OutPoint(parentTx1, 1), Nil, 1) :: TxIn(OutPoint(parentTx2, 0), Nil, 2) :: Nil, TxOut(50_000 sat, Script.pay2wpkh(randomKey().publicKey)) :: Nil, 0 ) monitor ! CheckTx(probe.ref, tx, "many-relative-delays") - // We send a single watch for parentTx1, with the max of the two delays. - val w1 = watcher.expectMsgType[WatchParentTxConfirmed] - val w2 = watcher.expectMsgType[WatchParentTxConfirmed] - watcher.expectNoMessage(100 millis) - assert(Seq(w1, w2).map(w => (w.txId, w.minDepth)).toSet == Set((parentTx1.txid, 3), (parentTx2.txid, 1))) + // The first parent transaction is unconfirmed. + awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx1.txid, None) + // The second parent transaction is confirmed, but is still missing one confirmation. + awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx2.txid, Some(1)) probe.expectNoMessage(100 millis) - w1.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx1) + // A new block is found: the second parent transaction has enough confirmations. + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 1) + awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis) + assert(!bitcoinClient.hasRequest(parentTx1.txid)) + bitcoinClient.completeRequest(parentTx2.txid, Some(2)) probe.expectNoMessage(100 millis) - w2.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(1105), 0, parentTx2) + // Two more blocks are found: the first parent transaction now has enough confirmations. + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3) + awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis) + assert(!bitcoinClient.hasRequest(parentTx2.txid)) + bitcoinClient.completeRequest(parentTx1.txid, Some(3)) + probe.expectMsg(TimeLocksOk()) } @@ -114,19 +157,18 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik ) monitor ! CheckTx(probe.ref, tx, "absolute-and-relative-delays") - // We set watches on parent txs only once the absolute delay is over. - watcher.expectNoMessage(100 millis) - monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3) - val w1 = watcher.expectMsgType[WatchParentTxConfirmed] - val w2 = watcher.expectMsgType[WatchParentTxConfirmed] - watcher.expectNoMessage(100 millis) - assert(Seq(w1, w2).map(w => (w.txId, w.minDepth)).toSet == Set((parentTx1.txid, 3), (parentTx2.txid, 6))) - probe.expectNoMessage(100 millis) + // We watch parent txs only once the absolute delay is over. + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 2) + assert(!bitcoinClient.hasRequest(parentTx1.txid)) + assert(!bitcoinClient.hasRequest(parentTx2.txid)) - w1.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(651), 0, parentTx1) + // When the absolute delay is over, we check parent transactions. + monitor ! WrappedCurrentBlockHeight(nodeParams.currentBlockHeight + 3) + awaitCond(bitcoinClient.hasRequest(parentTx1.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx1.txid, Some(5)) probe.expectNoMessage(100 millis) - - w2.replyTo ! WatchParentTxConfirmedTriggered(BlockHeight(1105), 0, parentTx2) + awaitCond(bitcoinClient.hasRequest(parentTx2.txid), interval = 100 millis) + bitcoinClient.completeRequest(parentTx2.txid, Some(10)) probe.expectMsg(TimeLocksOk()) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala index ebc97c182a..da20320206 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/ChannelStateTestsHelperMethods.scala @@ -564,7 +564,11 @@ trait ChannelStateTestsBase extends Assertions with Eventually { // we watch the confirmation of the "final" transactions that send funds to our wallets (main delayed output and 2nd stage htlc transactions) assert(s2blockchain.expectMsgType[WatchTxConfirmed].txId == commitTx.txid) - localCommitPublished.claimMainDelayedOutputTx.foreach(claimMain => assert(s2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMain.tx.txid)) + localCommitPublished.claimMainDelayedOutputTx.foreach(claimMain => { + val watchConfirmed = s2blockchain.expectMsgType[WatchTxConfirmed] + assert(watchConfirmed.txId == claimMain.tx.txid) + assert(watchConfirmed.delay_opt.map(_.parentTxId).contains(publishedLocalCommitTx.txid)) + }) // we watch outputs of the commitment tx that both parties may spend and anchor outputs val watchedOutputIndexes = localCommitPublished.htlcTxs.keySet.map(_.index) ++ localCommitPublished.claimAnchorTxs.collect { case tx: ClaimLocalAnchorOutputTx => tx.input.outPoint.index } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala index 4b2e0a4ad7..8703d33c48 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/h/ClosingStateSpec.scala @@ -21,7 +21,6 @@ import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.ScriptFlags import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Crypto, OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut} -import fr.acinq.eclair.Features.StaticRemoteKey import fr.acinq.eclair.TestUtils.randomTxId import fr.acinq.eclair.blockchain.DummyOnChainWallet import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ @@ -37,6 +36,7 @@ import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.transactions.{Scripts, Transactions} import fr.acinq.eclair.wire.protocol._ import fr.acinq.eclair.{BlockHeight, CltvExpiry, CltvExpiryDelta, Features, MilliSatoshiLong, TestConstants, TestKitBaseClass, TimestampSecond, randomBytes32, randomKey} +import org.scalatest.Inside.inside import org.scalatest.funsuite.FixtureAnyFunSuiteLike import org.scalatest.{Outcome, Tag} import scodec.bits.ByteVector @@ -710,14 +710,12 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with // Alice receives the preimage for the incoming HTLC. alice ! CMD_FULFILL_HTLC(incomingHtlc.id, preimage, commit = true) - assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx]) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimMainTx.txid) assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[HtlcTimeoutTx]) assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[HtlcSuccessTx]) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMainTx.txid) alice2blockchain.expectMsgType[WatchOutputSpent] alice2blockchain.expectMsgType[WatchOutputSpent] - alice2blockchain.expectMsgType[WatchOutputSpent] alice2blockchain.expectNoMessage(100 millis) val closingState2 = alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.get assert(getHtlcSuccessTxs(closingState2).length == 1) @@ -726,11 +724,17 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with // The HTLC txs confirms, so we publish 3rd-stage txs. alice ! WatchTxConfirmedTriggered(BlockHeight(201), 0, htlcTimeoutTx) val claimHtlcTimeoutDelayedTx = alice2blockchain.expectMsgType[PublishFinalTx].tx - assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcTimeoutDelayedTx.txid) + inside(alice2blockchain.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == claimHtlcTimeoutDelayedTx.txid) + assert(w.delay_opt.map(_.parentTxId).contains(htlcTimeoutTx.txid)) + } Transaction.correctlySpends(claimHtlcTimeoutDelayedTx, Seq(htlcTimeoutTx), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS) alice ! WatchTxConfirmedTriggered(BlockHeight(201), 0, htlcSuccessTx) val claimHtlcSuccessDelayedTx = alice2blockchain.expectMsgType[PublishFinalTx].tx - assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcSuccessDelayedTx.txid) + inside(alice2blockchain.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == claimHtlcSuccessDelayedTx.txid) + assert(w.delay_opt.map(_.parentTxId).contains(htlcSuccessTx.txid)) + } Transaction.correctlySpends(claimHtlcSuccessDelayedTx, Seq(htlcSuccessTx), ScriptFlags.STANDARD_SCRIPT_VERIFY_FLAGS) // We simulate a node restart after a feerate increase. @@ -742,14 +746,12 @@ class ClosingStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with awaitCond(alice.stateName == CLOSING) // We re-publish closing transactions. - assert(alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.isInstanceOf[ClaimLocalAnchorOutputTx]) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimMainTx.txid) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimHtlcTimeoutDelayedTx.txid) assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid == claimHtlcSuccessDelayedTx.txid) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimMainTx.txid) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcTimeoutDelayedTx.txid) assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId == claimHtlcSuccessDelayedTx.txid) - alice2blockchain.expectMsgType[WatchOutputSpent] // We replay the HTLC fulfillment: nothing happens since we already published a 3rd-stage transaction. alice ! CMD_FULFILL_HTLC(incomingHtlc.id, preimage, commit = true) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala index bbb153a276..9d8bd4854f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/fixtures/MinimalNodeFixture.scala @@ -96,7 +96,7 @@ object MinimalNodeFixture extends Assertions with Eventually with IntegrationPat val offerManager = system.spawn(OfferManager(nodeParams, router, 1 minute), "offer-manager") val paymentHandler = system.actorOf(PaymentHandler.props(nodeParams, register, offerManager), "payment-handler") val relayer = system.actorOf(Relayer.props(nodeParams, router, register, paymentHandler), "relayer") - val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, watcherTyped, bitcoinClient) + val txPublisherFactory = Channel.SimpleTxPublisherFactory(nodeParams, bitcoinClient) val channelFactory = Peer.SimpleChannelFactory(nodeParams, watcherTyped, relayer, wallet, txPublisherFactory) val pendingChannelsRateLimiter = system.spawnAnonymous(Behaviors.supervise(PendingChannelsRateLimiter(nodeParams, router.toTyped, Seq())).onFailure(typed.SupervisorStrategy.resume)) val peerFactory = Switchboard.SimplePeerFactory(nodeParams, wallet, channelFactory, pendingChannelsRateLimiter, register, router.toTyped)