diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala b/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala index 32d4882fd2..93d94c199f 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala @@ -43,14 +43,21 @@ object Logs { * - for a channel-relay: the relay id * - for a trampoline-relay: the relay id and the parent payment id of the outgoing payment */ - def mdc(category_opt: Option[LogCategory] = None, remoteNodeId_opt: Option[PublicKey] = None, channelId_opt: Option[ByteVector32] = None, parentPaymentId_opt: Option[UUID] = None, paymentId_opt: Option[UUID] = None, paymentHash_opt: Option[ByteVector32] = None): Map[String, String] = + def mdc(category_opt: Option[LogCategory] = None, + remoteNodeId_opt: Option[PublicKey] = None, + channelId_opt: Option[ByteVector32] = None, + parentPaymentId_opt: Option[UUID] = None, + paymentId_opt: Option[UUID] = None, + paymentHash_opt: Option[ByteVector32] = None, + txPublishId_opt: Option[UUID] = None): Map[String, String] = Seq( category_opt.map(l => "category" -> s" ${l.category}"), remoteNodeId_opt.map(n => "nodeId" -> s" n:$n"), // nb: we preformat MDC values so that there is no white spaces in logs when they are not defined channelId_opt.map(c => "channelId" -> s" c:$c"), parentPaymentId_opt.map(p => "parentPaymentId" -> s" p:$p"), paymentId_opt.map(i => "paymentId" -> s" i:$i"), - paymentHash_opt.map(h => "paymentHash" -> s" h:$h") + paymentHash_opt.map(h => "paymentHash" -> s" h:$h"), + txPublishId_opt.map(t => "txPublishId" -> s" t:$t") ).flatten.toMap /** diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala index d3ca8cd043..f019b994c1 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/FinalTxPublisher.scala @@ -21,7 +21,7 @@ import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.eclair.NodeParams import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient -import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext +import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.CheckTx import scala.concurrent.ExecutionContext @@ -50,12 +50,12 @@ object FinalTxPublisher { case object Stop extends Command // @formatter:on - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], loggingInfo: TxPublishLogContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => - Behaviors.withMdc(loggingInfo.mdc()) { + Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, loggingInfo).checkTimeLocks() + case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkTimeLocks() case Stop => Behaviors.stopped } } @@ -72,14 +72,14 @@ private class FinalTxPublisher(nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], context: ActorContext[FinalTxPublisher.Command], timers: TimerScheduler[FinalTxPublisher.Command], - loggingInfo: TxPublishLogContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { + txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { import FinalTxPublisher._ private val log = context.log def checkTimeLocks(): Behavior[Command] = { - val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, loggingInfo), "time-locks-monitor") + val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor") timeLocksChecker ! CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.tx, cmd.desc) Behaviors.receiveMessagePartial { case TimeLocksOk => checkParentPublished() @@ -106,7 +106,7 @@ private class FinalTxPublisher(nodeParams: NodeParams, Behaviors.same case UnknownFailure(reason) => log.error("could not check parent tx", reason) - sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure)) + sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure)) case Stop => Behaviors.stopped } case None => publish() @@ -114,13 +114,13 @@ private class FinalTxPublisher(nodeParams: NodeParams, } def publish(): Behavior[Command] = { - val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, loggingInfo), "mempool-tx-monitor") + val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, txPublishContext), "mempool-tx-monitor") txMonitor ! MempoolTxMonitor.Publish(context.messageAdapter[MempoolTxMonitor.TxResult](WrappedTxResult), cmd.tx, cmd.input, cmd.desc, cmd.fee) Behaviors.receiveMessagePartial { case WrappedTxResult(txResult) => txResult match { case _: MempoolTxMonitor.IntermediateTxResult => Behaviors.same - case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, reason)) + case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, reason)) case MempoolTxMonitor.TxDeeplyBuried(tx) => sendResult(TxPublisher.TxConfirmed(cmd, tx)) } case Stop => Behaviors.stopped diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala index 982bb38f3b..5c672ce288 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitor.scala @@ -22,7 +22,7 @@ import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, Transaction} import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient -import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishLogContext, TxRejectedReason} +import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishContext, TxRejectedReason} import fr.acinq.eclair.channel.{TransactionConfirmed, TransactionPublished} import fr.acinq.eclair.{BlockHeight, NodeParams} @@ -68,12 +68,12 @@ object MempoolTxMonitor { case class TxRejected(txid: ByteVector32, reason: TxPublisher.TxRejectedReason) extends FinalTxResult // @formatter:on - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => - Behaviors.withMdc(loggingInfo.mdc()) { + Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case cmd: Publish => new MempoolTxMonitor(nodeParams, cmd, bitcoinClient, loggingInfo, context, timers).publish() + case cmd: Publish => new MempoolTxMonitor(nodeParams, cmd, bitcoinClient, txPublishContext, context, timers).publish() } } } @@ -85,7 +85,7 @@ object MempoolTxMonitor { private class MempoolTxMonitor(nodeParams: NodeParams, cmd: MempoolTxMonitor.Publish, bitcoinClient: BitcoinCoreClient, - loggingInfo: TxPublishLogContext, + txPublishContext: TxPublishContext, context: ActorContext[MempoolTxMonitor.Command], timers: TimerScheduler[MempoolTxMonitor.Command])(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { @@ -101,7 +101,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams, Behaviors.receiveMessagePartial { case PublishOk => log.debug("txid={} was successfully published, waiting for confirmation...", cmd.tx.txid) - context.system.eventStream ! EventStream.Publish(TransactionPublished(loggingInfo.channelId_opt.getOrElse(ByteVector32.Zeroes), loggingInfo.remoteNodeId, cmd.tx, cmd.fee, cmd.desc)) + context.system.eventStream ! EventStream.Publish(TransactionPublished(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx, cmd.fee, cmd.desc)) waitForConfirmation() case PublishFailed(reason) if reason.getMessage.contains("rejecting replacement") => log.info("could not publish tx: a conflicting mempool transaction is already in the mempool") @@ -155,7 +155,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams, Behaviors.same } else { log.info("txid={} has reached min depth", cmd.tx.txid) - context.system.eventStream ! EventStream.Publish(TransactionConfirmed(loggingInfo.channelId_opt.getOrElse(ByteVector32.Zeroes), loggingInfo.remoteNodeId, cmd.tx)) + context.system.eventStream ! EventStream.Publish(TransactionConfirmed(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx)) sendFinalResult(TxDeeplyBuried(cmd.tx), Some(messageAdapter)) } case TxNotFound => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxFunder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxFunder.scala index 313c770fa4..9ed945bd56 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxFunder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxFunder.scala @@ -26,7 +26,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient.FundTransaction import fr.acinq.eclair.blockchain.fee.FeeratePerKw import fr.acinq.eclair.channel.Commitments import fr.acinq.eclair.channel.publish.ReplaceableTxPrePublisher._ -import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext +import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.{NodeParams, NotificationsLogger} @@ -68,9 +68,9 @@ object ReplaceableTxFunder { case class FundingFailed(reason: TxPublisher.TxRejectedReason) extends FundingResult // @formatter:on - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => - Behaviors.withMdc(loggingInfo.mdc()) { + Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { case FundTransaction(replyTo, cmd, tx, targetFeerate) => val txFunder = new ReplaceableTxFunder(nodeParams, replyTo, cmd, bitcoinClient, context) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPrePublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPrePublisher.scala index fa3f008af4..8b2b213162 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPrePublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPrePublisher.scala @@ -21,7 +21,7 @@ import akka.actor.typed.{ActorRef, Behavior} import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, Transaction} import fr.acinq.eclair.NodeParams import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient -import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext +import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.{Commitments, HtlcTxAndRemoteSig} import fr.acinq.eclair.transactions.Transactions import fr.acinq.eclair.transactions.Transactions._ @@ -96,9 +96,9 @@ object ReplaceableTxPrePublisher { } // @formatter:on - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => - Behaviors.withMdc(loggingInfo.mdc()) { + Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { case CheckPreconditions(replyTo, cmd) => val prePublisher = new ReplaceableTxPrePublisher(nodeParams, replyTo, cmd, bitcoinClient, context) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala index 7416eebedd..0ca1c38e90 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisher.scala @@ -24,7 +24,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.blockchain.fee.{FeeEstimator, FeeratePerKw} import fr.acinq.eclair.channel.publish.ReplaceableTxFunder.FundedTx import fr.acinq.eclair.channel.publish.ReplaceableTxPrePublisher.{ClaimLocalAnchorWithWitnessData, ReplaceableTxWithWitnessData} -import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext +import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.{BlockHeight, NodeParams} import scala.concurrent.duration.{DurationInt, DurationLong} @@ -60,12 +60,12 @@ object ReplaceableTxPublisher { // Timer key to ensure we don't have multiple concurrent timers running. private case object BumpFeeKey - def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], loggingInfo: TxPublishLogContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => - Behaviors.withMdc(loggingInfo.mdc()) { + Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { - case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, loggingInfo).checkPreconditions() + case Publish(replyTo, cmd) => new ReplaceableTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkPreconditions() case Stop => Behaviors.stopped } } @@ -98,7 +98,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], context: ActorContext[ReplaceableTxPublisher.Command], timers: TimerScheduler[ReplaceableTxPublisher.Command], - loggingInfo: TxPublishLogContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { + txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) { import ReplaceableTxPublisher._ @@ -107,13 +107,13 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, private var confirmBefore: BlockHeight = cmd.txInfo.confirmBefore def checkPreconditions(): Behavior[Command] = { - val prePublisher = context.spawn(ReplaceableTxPrePublisher(nodeParams, bitcoinClient, loggingInfo), "pre-publisher") + val prePublisher = context.spawn(ReplaceableTxPrePublisher(nodeParams, bitcoinClient, txPublishContext), "pre-publisher") prePublisher ! ReplaceableTxPrePublisher.CheckPreconditions(context.messageAdapter[ReplaceableTxPrePublisher.PreconditionsResult](WrappedPreconditionsResult), cmd) Behaviors.receiveMessagePartial { case WrappedPreconditionsResult(result) => result match { case ReplaceableTxPrePublisher.PreconditionsOk(txWithWitnessData) => checkTimeLocks(txWithWitnessData) - case ReplaceableTxPrePublisher.PreconditionsFailed(reason) => sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, reason), None) + case ReplaceableTxPrePublisher.PreconditionsFailed(reason) => sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, reason), None) } case UpdateConfirmationTarget(target) => confirmBefore = target @@ -127,7 +127,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, // There are no time locks on anchor transactions, we can claim them right away. case _: ClaimLocalAnchorWithWitnessData => fund(txWithWitnessData) case _ => - val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, loggingInfo), "time-locks-monitor") + val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor") timeLocksChecker ! TxTimeLocksMonitor.CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.txInfo.tx, cmd.desc) Behaviors.receiveMessagePartial { case TimeLocksOk => fund(txWithWitnessData) @@ -141,17 +141,17 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, def fund(txWithWitnessData: ReplaceableTxWithWitnessData): Behavior[Command] = { val targetFeerate = getFeerate(nodeParams.onChainFeeConf.feeEstimator, confirmBefore, nodeParams.currentBlockHeight) - val txFunder = context.spawn(ReplaceableTxFunder(nodeParams, bitcoinClient, loggingInfo), "tx-funder") + val txFunder = context.spawn(ReplaceableTxFunder(nodeParams, bitcoinClient, txPublishContext), "tx-funder") txFunder ! ReplaceableTxFunder.FundTransaction(context.messageAdapter[ReplaceableTxFunder.FundingResult](WrappedFundingResult), cmd, Right(txWithWitnessData), targetFeerate) Behaviors.receiveMessagePartial { case WrappedFundingResult(result) => result match { case ReplaceableTxFunder.TransactionReady(tx) => log.debug("publishing {} with confirmation target in {} blocks", cmd.desc, confirmBefore - nodeParams.currentBlockHeight) - val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, loggingInfo), s"mempool-tx-monitor-${tx.signedTx.txid}") + val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, txPublishContext), s"mempool-tx-monitor-${tx.signedTx.txid}") txMonitor ! MempoolTxMonitor.Publish(context.messageAdapter[MempoolTxMonitor.TxResult](WrappedTxResult), tx.signedTx, cmd.input, cmd.desc, tx.fee) wait(tx) - case ReplaceableTxFunder.FundingFailed(reason) => sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, reason), None) + case ReplaceableTxFunder.FundingFailed(reason) => sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, reason), None) } case UpdateConfirmationTarget(target) => confirmBefore = target @@ -189,7 +189,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, Behaviors.same case MempoolTxMonitor.TxRecentlyConfirmed(_, _) => Behaviors.same // just wait for the tx to be deeply buried case MempoolTxMonitor.TxDeeplyBuried(confirmedTx) => sendResult(TxPublisher.TxConfirmed(cmd, confirmedTx), None) - case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, reason), Some(Seq(tx.signedTx))) + case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, reason), Some(Seq(tx.signedTx))) } case BumpFee(targetFeerate) => fundReplacement(targetFeerate, tx) case UpdateConfirmationTarget(target) => @@ -202,7 +202,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, // Fund a replacement transaction because our previous attempt seems to be stuck in the mempool. def fundReplacement(targetFeerate: FeeratePerKw, previousTx: FundedTx): Behavior[Command] = { log.info("bumping {} fees: previous feerate={}, next feerate={}", cmd.desc, previousTx.feerate, targetFeerate) - val txFunder = context.spawn(ReplaceableTxFunder(nodeParams, bitcoinClient, loggingInfo), "tx-funder-rbf") + val txFunder = context.spawn(ReplaceableTxFunder(nodeParams, bitcoinClient, txPublishContext), "tx-funder-rbf") txFunder ! ReplaceableTxFunder.FundTransaction(context.messageAdapter[ReplaceableTxFunder.FundingResult](WrappedFundingResult), cmd, Left(previousTx), targetFeerate) Behaviors.receiveMessagePartial { case WrappedFundingResult(result) => @@ -232,7 +232,7 @@ private class ReplaceableTxPublisher(nodeParams: NodeParams, // Only one of them can be in the mempool, so we wait for the other to be rejected. Once that's done, we're back to a // situation where we have one transaction in the mempool and wait for it to confirm. def publishReplacement(previousTx: FundedTx, bumpedTx: FundedTx): Behavior[Command] = { - val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, loggingInfo), s"mempool-tx-monitor-${bumpedTx.signedTx.txid}") + val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, txPublishContext), s"mempool-tx-monitor-${bumpedTx.signedTx.txid}") txMonitor ! MempoolTxMonitor.Publish(context.messageAdapter[MempoolTxMonitor.TxResult](WrappedTxResult), bumpedTx.signedTx, cmd.input, cmd.desc, bumpedTx.fee) Behaviors.receiveMessagePartial { case WrappedTxResult(txResult) => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala index 0d4b783d38..edca99afde 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxPublisher.scala @@ -124,28 +124,28 @@ object TxPublisher { // @formatter:on // @formatter:off - case class ChannelLogContext(remoteNodeId: PublicKey, channelId_opt: Option[ByteVector32]) { + case class ChannelContext(remoteNodeId: PublicKey, channelId_opt: Option[ByteVector32]) { def mdc(): Map[String, String] = Logs.mdc(remoteNodeId_opt = Some(remoteNodeId), channelId_opt = channelId_opt) } - case class TxPublishLogContext(id: UUID, remoteNodeId: PublicKey, channelId_opt: Option[ByteVector32]) { - def mdc(): Map[String, String] = Logs.mdc(remoteNodeId_opt = Some(remoteNodeId), channelId_opt = channelId_opt, paymentId_opt = Some(id)) + case class TxPublishContext(id: UUID, remoteNodeId: PublicKey, channelId_opt: Option[ByteVector32]) { + def mdc(): Map[String, String] = Logs.mdc(txPublishId_opt = Some(id), remoteNodeId_opt = Some(remoteNodeId), channelId_opt = channelId_opt) } // @formatter:on trait ChildFactory { // @formatter:off - def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[FinalTxPublisher.Command] - def spawnReplaceableTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[ReplaceableTxPublisher.Command] + def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublishContext): ActorRef[FinalTxPublisher.Command] + def spawnReplaceableTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublishContext): ActorRef[ReplaceableTxPublisher.Command] // @formatter:on } case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command]) extends ChildFactory { // @formatter:off - override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[FinalTxPublisher.Command] = { - context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, watcher, loggingInfo), s"final-tx-${loggingInfo.id}") + override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublishContext): ActorRef[FinalTxPublisher.Command] = { + context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"final-tx-${txPublishContext.id}") } - override def spawnReplaceableTxPublisher(context: ActorContext[Command], loggingInfo: TxPublishLogContext): ActorRef[ReplaceableTxPublisher.Command] = { - context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, watcher, loggingInfo), s"replaceable-tx-${loggingInfo.id}") + override def spawnReplaceableTxPublisher(context: ActorContext[Command], txPublishContext: TxPublishContext): ActorRef[ReplaceableTxPublisher.Command] = { + context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, watcher, txPublishContext), s"replaceable-tx-${txPublishContext.id}") } // @formatter:on } @@ -190,7 +190,7 @@ object TxPublisher { Behaviors.withTimers { timers => Behaviors.withMdc(Logs.mdc(remoteNodeId_opt = Some(remoteNodeId))) { context.system.eventStream ! EventStream.Subscribe(context.messageAdapter[CurrentBlockHeight](cbc => WrappedCurrentBlockHeight(cbc.blockHeight))) - new TxPublisher(nodeParams, factory, context, timers).run(Map.empty, Seq.empty, ChannelLogContext(remoteNodeId, None)) + new TxPublisher(nodeParams, factory, context, timers).run(Map.empty, Seq.empty, ChannelContext(remoteNodeId, None)) } } } @@ -203,7 +203,7 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact private val log = context.log - private def run(pending: Map[OutPoint, PublishAttempts], retryNextBlock: Seq[PublishTx], channelInfo: ChannelLogContext): Behavior[Command] = { + private def run(pending: Map[OutPoint, PublishAttempts], retryNextBlock: Seq[PublishTx], channelContext: ChannelContext): Behavior[Command] = { Behaviors.receiveMessage { case cmd: PublishFinalTx => val attempts = pending.getOrElse(cmd.input, PublishAttempts.empty) @@ -214,9 +214,9 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact } else { val publishId = UUID.randomUUID() log.info("publishing {} txid={} spending {}:{} with id={} ({} other attempts)", cmd.desc, cmd.tx.txid, cmd.input.txid, cmd.input.index, publishId, attempts.count) - val actor = factory.spawnFinalTxPublisher(context, TxPublishLogContext(publishId, channelInfo.remoteNodeId, channelInfo.channelId_opt)) + val actor = factory.spawnFinalTxPublisher(context, TxPublishContext(publishId, channelContext.remoteNodeId, channelContext.channelId_opt)) actor ! FinalTxPublisher.Publish(context.self, cmd) - run(pending + (cmd.input -> attempts.add(FinalAttempt(publishId, cmd, actor))), retryNextBlock, channelInfo) + run(pending + (cmd.input -> attempts.add(FinalAttempt(publishId, cmd, actor))), retryNextBlock, channelContext) } case cmd: PublishReplaceableTx => @@ -235,21 +235,21 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact log.info("replaceable {} spending {}:{} has new confirmation target={} (previous={})", cmd.desc, cmd.input.txid, cmd.input.index, proposedConfirmationTarget, currentConfirmationTarget) currentAttempt.actor ! ReplaceableTxPublisher.UpdateConfirmationTarget(proposedConfirmationTarget) val attempts2 = attempts.copy(replaceableAttempt_opt = Some(currentAttempt.copy(confirmBefore = proposedConfirmationTarget))) - run(pending + (cmd.input -> attempts2), retryNextBlock, channelInfo) + run(pending + (cmd.input -> attempts2), retryNextBlock, channelContext) } case None => val publishId = UUID.randomUUID() log.info("publishing replaceable {} spending {}:{} with id={} ({} other attempts)", cmd.desc, cmd.input.txid, cmd.input.index, publishId, attempts.count) - val actor = factory.spawnReplaceableTxPublisher(context, TxPublishLogContext(publishId, channelInfo.remoteNodeId, channelInfo.channelId_opt)) + val actor = factory.spawnReplaceableTxPublisher(context, TxPublishContext(publishId, channelContext.remoteNodeId, channelContext.channelId_opt)) actor ! ReplaceableTxPublisher.Publish(context.self, cmd) val attempts2 = attempts.copy(replaceableAttempt_opt = Some(ReplaceableAttempt(publishId, cmd, proposedConfirmationTarget, actor))) - run(pending + (cmd.input -> attempts2), retryNextBlock, channelInfo) + run(pending + (cmd.input -> attempts2), retryNextBlock, channelContext) } case result: PublishTxResult => result match { case TxConfirmed(cmd, _) => pending.get(cmd.input).foreach(a => stopAttempts(a.attempts)) - run(pending - cmd.input, retryNextBlock, channelInfo) + run(pending - cmd.input, retryNextBlock, channelContext) case TxRejected(id, cmd, reason) => val (rejectedAttempts, remainingAttempts) = pending.getOrElse(cmd.input, PublishAttempts.empty).remove(id) stopAttempts(rejectedAttempts) @@ -260,33 +260,33 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact // been replaced. We should be able to retry right now with new wallet inputs (no need to wait for a new // block). timers.startSingleTimer(cmd, (1 + Random.nextLong(nodeParams.maxTxPublishRetryDelay.toMillis)).millis) - run(pending2, retryNextBlock, channelInfo) + run(pending2, retryNextBlock, channelContext) case TxRejectedReason.CouldNotFund => // We don't have enough funds at the moment to afford our target feerate, but it may change once pending // transactions confirm, so we retry when a new block is found. - run(pending2, retryNextBlock ++ rejectedAttempts.map(_.cmd), channelInfo) + run(pending2, retryNextBlock ++ rejectedAttempts.map(_.cmd), channelContext) case TxRejectedReason.TxSkipped(retry) => val retryNextBlock2 = if (retry) retryNextBlock ++ rejectedAttempts.map(_.cmd) else retryNextBlock - run(pending2, retryNextBlock2, channelInfo) + run(pending2, retryNextBlock2, channelContext) case TxRejectedReason.ConflictingTxUnconfirmed => cmd match { case _: PublishFinalTx => // Our transaction is not replaceable, and the mempool contains a transaction that pays more fees, so // it doesn't make sense to retry, we will keep getting rejected. - run(pending2, retryNextBlock, channelInfo) + run(pending2, retryNextBlock, channelContext) case _: PublishReplaceableTx => // The mempool contains a transaction that pays more fees, but as we get closer to the confirmation // target, we will try to publish with higher fees, so if the conflicting transaction doesn't confirm, // we should be able to replace it before we reach the confirmation target. - run(pending2, retryNextBlock ++ rejectedAttempts.map(_.cmd), channelInfo) + run(pending2, retryNextBlock ++ rejectedAttempts.map(_.cmd), channelContext) } case TxRejectedReason.ConflictingTxConfirmed => // Our transaction was double-spent by a competing transaction that has been confirmed, so it doesn't make // sense to retry. - run(pending2, retryNextBlock, channelInfo) + run(pending2, retryNextBlock, channelContext) case TxRejectedReason.UnknownTxFailure => // We don't automatically retry unknown failures, they should be investigated manually. - run(pending2, retryNextBlock, channelInfo) + run(pending2, retryNextBlock, channelContext) } } @@ -295,10 +295,10 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact log.info("{} transactions are still pending at block {}, retrying {} transactions that previously failed", pending.size, currentBlockHeight, retryNextBlock.length) retryNextBlock.foreach(cmd => timers.startSingleTimer(cmd, (1 + Random.nextLong(nodeParams.maxTxPublishRetryDelay.toMillis)).millis)) } - run(pending, Seq.empty, channelInfo) + run(pending, Seq.empty, channelContext) case SetChannelId(remoteNodeId, channelId) => - val channelInfo2 = channelInfo.copy(remoteNodeId = remoteNodeId, channelId_opt = Some(channelId)) + val channelInfo2 = channelContext.copy(remoteNodeId = remoteNodeId, channelId_opt = Some(channelId)) Behaviors.withMdc(channelInfo2.mdc()) { run(pending, retryNextBlock, channelInfo2) } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala index 3ebfcdc974..90332a07eb 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitor.scala @@ -23,7 +23,7 @@ import fr.acinq.bitcoin.{ByteVector32, Transaction} import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} -import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext +import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.transactions.Scripts import fr.acinq.eclair.{BlockHeight, NodeParams} @@ -50,10 +50,10 @@ object TxTimeLocksMonitor { private case class ParentTxConfirmed(parentTxId: ByteVector32) extends Command // @formatter:on - def apply(nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], loggingInfo: TxPublishLogContext): Behavior[Command] = { + def apply(nodeParams: NodeParams, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = { Behaviors.setup { context => Behaviors.withTimers { timers => - Behaviors.withMdc(loggingInfo.mdc()) { + Behaviors.withMdc(txPublishContext.mdc()) { Behaviors.receiveMessagePartial { case cmd: CheckTx => new TxTimeLocksMonitor(nodeParams, cmd, watcher, context, timers).checkAbsoluteTimeLock() } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala index 4b4b60592a..f007ac7d5f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/FinalTxPublisherSpec.scala @@ -28,7 +28,7 @@ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, W import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.FinalTxPublisher.{Publish, Stop} import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason.ConflictingTxConfirmed -import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, TxConfirmed, TxPublishLogContext, TxRejected} +import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, TxConfirmed, TxPublishContext, TxRejected} import fr.acinq.eclair.{TestConstants, TestKitBaseClass, randomKey} import org.scalatest.BeforeAndAfterAll import org.scalatest.funsuite.AnyFunSuiteLike @@ -54,7 +54,7 @@ class FinalTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi val probe = TestProbe() val watcher = TestProbe() val bitcoinClient = new BitcoinCoreClient(bitcoinrpcclient) - val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, watcher.ref, TxPublishLogContext(UUID.randomUUID(), randomKey().publicKey, None))) + val publisher = system.spawnAnonymous(FinalTxPublisher(TestConstants.Alice.nodeParams, bitcoinClient, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) Fixture(bitcoinClient, publisher, watcher, probe) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala index 2c17144c66..705f29aa78 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/MempoolTxMonitorSpec.scala @@ -27,7 +27,7 @@ import fr.acinq.eclair.blockchain.WatcherSpec.{createSpendManyP2WPKH, createSpen import fr.acinq.eclair.blockchain.bitcoind.BitcoindService import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient import fr.acinq.eclair.channel.publish.MempoolTxMonitor._ -import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext +import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.publish.TxPublisher.TxRejectedReason._ import fr.acinq.eclair.channel.{TransactionConfirmed, TransactionPublished} import fr.acinq.eclair.{TestConstants, TestKitBaseClass, randomBytes32, randomKey} @@ -54,7 +54,7 @@ class MempoolTxMonitorSpec extends TestKitBaseClass with AnyFunSuiteLike with Bi def createFixture(): Fixture = { val probe = TestProbe() val bitcoinClient = new BitcoinCoreClient(bitcoinrpcclient) - val monitor = system.spawnAnonymous(MempoolTxMonitor(TestConstants.Alice.nodeParams, bitcoinClient, TxPublishLogContext(UUID.randomUUID(), randomKey().publicKey, None))) + val monitor = system.spawnAnonymous(MempoolTxMonitor(TestConstants.Alice.nodeParams, bitcoinClient, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) val (priv, address) = createExternalAddress() val parentTx = sendToAddress(address, 125_000 sat, probe) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala index d29dc79307..e12c5dd326 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/ReplaceableTxPublisherSpec.scala @@ -67,7 +67,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w probe: TestProbe) { def createPublisher(): ActorRef[ReplaceableTxPublisher.Command] = { - system.spawnAnonymous(ReplaceableTxPublisher(alice.underlyingActor.nodeParams, wallet, alice2blockchain.ref, TxPublishLogContext(UUID.randomUUID(), randomKey().publicKey, None))) + system.spawnAnonymous(ReplaceableTxPublisher(alice.underlyingActor.nodeParams, wallet, alice2blockchain.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) } def aliceBlockHeight(): BlockHeight = alice.underlyingActor.nodeParams.currentBlockHeight @@ -141,7 +141,7 @@ class ReplaceableTxPublisherSpec extends TestKitBaseClass with AnyFunSuiteLike w generateBlocks(1) // Execute our test. - val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, alice2blockchain.ref, TxPublishLogContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString) + val publisher = system.spawn(ReplaceableTxPublisher(aliceNodeParams, walletClient, alice2blockchain.ref, TxPublishContext(testId, TestConstants.Bob.nodeParams.nodeId, None)), testId.toString) testFun(Fixture(alice, bob, alice2bob, bob2alice, alice2blockchain, bob2blockchain, walletClient, walletRpcClient, publisher, probe)) } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxPublisherSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxPublisherSpec.scala index a40dd55a79..add2d7908c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxPublisherSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxPublisherSpec.scala @@ -55,14 +55,14 @@ class TxPublisherSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike { case class FinalTxPublisherSpawned(id: UUID, actor: TestProbe) case class ReplaceableTxPublisherSpawned(id: UUID, actor: TestProbe) case class FakeChildFactory(factoryProbe: TestProbe) extends TxPublisher.ChildFactory { - override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublisher.TxPublishLogContext): ActorRef[FinalTxPublisher.Command] = { + override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublisher.TxPublishContext): ActorRef[FinalTxPublisher.Command] = { val actor = TestProbe() - factoryProbe.ref ! FinalTxPublisherSpawned(loggingInfo.id, actor) + factoryProbe.ref ! FinalTxPublisherSpawned(txPublishContext.id, actor) actor.ref } - override def spawnReplaceableTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublisher.TxPublishLogContext): ActorRef[ReplaceableTxPublisher.Command] = { + override def spawnReplaceableTxPublisher(context: ActorContext[TxPublisher.Command], txPublishContext: TxPublisher.TxPublishContext): ActorRef[ReplaceableTxPublisher.Command] = { val actor = TestProbe() - factoryProbe.ref ! ReplaceableTxPublisherSpawned(loggingInfo.id, actor) + factoryProbe.ref ! ReplaceableTxPublisherSpawned(txPublishContext.id, actor) actor.ref } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala index 7c64fcffac..3f0e55e671 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/publish/TxTimeLocksMonitorSpec.scala @@ -22,7 +22,7 @@ import akka.testkit.TestProbe import fr.acinq.bitcoin.{OutPoint, SatoshiLong, Script, Transaction, TxIn, TxOut} import fr.acinq.eclair.blockchain.CurrentBlockHeight import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{WatchParentTxConfirmed, WatchParentTxConfirmedTriggered} -import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext +import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.{CheckTx, TimeLocksOk} import fr.acinq.eclair.{BlockHeight, NodeParams, TestConstants, TestKitBaseClass, randomKey} import org.scalatest.Outcome @@ -40,7 +40,7 @@ class TxTimeLocksMonitorSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik val nodeParams = TestConstants.Alice.nodeParams val probe = TestProbe() val watcher = TestProbe() - val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, watcher.ref, TxPublishLogContext(UUID.randomUUID(), randomKey().publicKey, None))) + val monitor = system.spawnAnonymous(TxTimeLocksMonitor(nodeParams, watcher.ref, TxPublishContext(UUID.randomUUID(), randomKey().publicKey, None))) withFixture(test.toNoArgTest(FixtureParam(nodeParams, monitor, watcher, probe))) } }