Skip to content

Commit

Permalink
Rename TxPublishLogContext (#2131)
Browse files Browse the repository at this point in the history
This data class is actually used for more than logging, we also use it to
track the id of a publish request.

We also switch to a different MDC key for transaction publishing instead
of re-using the paymentId one.
  • Loading branch information
t-bast authored Jan 19, 2022
1 parent 58f9ebc commit 52a6ee9
Show file tree
Hide file tree
Showing 13 changed files with 86 additions and 79 deletions.
11 changes: 9 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ object Logs {
* - for a channel-relay: the relay id
* - for a trampoline-relay: the relay id and the parent payment id of the outgoing payment
*/
def mdc(category_opt: Option[LogCategory] = None, remoteNodeId_opt: Option[PublicKey] = None, channelId_opt: Option[ByteVector32] = None, parentPaymentId_opt: Option[UUID] = None, paymentId_opt: Option[UUID] = None, paymentHash_opt: Option[ByteVector32] = None): Map[String, String] =
def mdc(category_opt: Option[LogCategory] = None,
remoteNodeId_opt: Option[PublicKey] = None,
channelId_opt: Option[ByteVector32] = None,
parentPaymentId_opt: Option[UUID] = None,
paymentId_opt: Option[UUID] = None,
paymentHash_opt: Option[ByteVector32] = None,
txPublishId_opt: Option[UUID] = None): Map[String, String] =
Seq(
category_opt.map(l => "category" -> s" ${l.category}"),
remoteNodeId_opt.map(n => "nodeId" -> s" n:$n"), // nb: we preformat MDC values so that there is no white spaces in logs when they are not defined
channelId_opt.map(c => "channelId" -> s" c:$c"),
parentPaymentId_opt.map(p => "parentPaymentId" -> s" p:$p"),
paymentId_opt.map(i => "paymentId" -> s" i:$i"),
paymentHash_opt.map(h => "paymentHash" -> s" h:$h")
paymentHash_opt.map(h => "paymentHash" -> s" h:$h"),
txPublishId_opt.map(t => "txPublishId" -> s" t:$t")
).flatten.toMap

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.CheckTx

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -50,12 +50,12 @@ object FinalTxPublisher {
case object Stop extends Command
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, loggingInfo).checkTimeLocks()
case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkTimeLocks()
case Stop => Behaviors.stopped
}
}
Expand All @@ -72,14 +72,14 @@ private class FinalTxPublisher(nodeParams: NodeParams,
watcher: ActorRef[ZmqWatcher.Command],
context: ActorContext[FinalTxPublisher.Command],
timers: TimerScheduler[FinalTxPublisher.Command],
loggingInfo: TxPublishLogContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {
txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {

import FinalTxPublisher._

private val log = context.log

def checkTimeLocks(): Behavior[Command] = {
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, loggingInfo), "time-locks-monitor")
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor")
timeLocksChecker ! CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.tx, cmd.desc)
Behaviors.receiveMessagePartial {
case TimeLocksOk => checkParentPublished()
Expand All @@ -106,21 +106,21 @@ private class FinalTxPublisher(nodeParams: NodeParams,
Behaviors.same
case UnknownFailure(reason) =>
log.error("could not check parent tx", reason)
sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure))
sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure))
case Stop => Behaviors.stopped
}
case None => publish()
}
}

def publish(): Behavior[Command] = {
val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, loggingInfo), "mempool-tx-monitor")
val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, txPublishContext), "mempool-tx-monitor")
txMonitor ! MempoolTxMonitor.Publish(context.messageAdapter[MempoolTxMonitor.TxResult](WrappedTxResult), cmd.tx, cmd.input, cmd.desc, cmd.fee)
Behaviors.receiveMessagePartial {
case WrappedTxResult(txResult) =>
txResult match {
case _: MempoolTxMonitor.IntermediateTxResult => Behaviors.same
case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, reason))
case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, reason))
case MempoolTxMonitor.TxDeeplyBuried(tx) => sendResult(TxPublisher.TxConfirmed(cmd, tx))
}
case Stop => Behaviors.stopped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishLogContext, TxRejectedReason}
import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishContext, TxRejectedReason}
import fr.acinq.eclair.channel.{TransactionConfirmed, TransactionPublished}
import fr.acinq.eclair.{BlockHeight, NodeParams}

Expand Down Expand Up @@ -68,12 +68,12 @@ object MempoolTxMonitor {
case class TxRejected(txid: ByteVector32, reason: TxPublisher.TxRejectedReason) extends FinalTxResult
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case cmd: Publish => new MempoolTxMonitor(nodeParams, cmd, bitcoinClient, loggingInfo, context, timers).publish()
case cmd: Publish => new MempoolTxMonitor(nodeParams, cmd, bitcoinClient, txPublishContext, context, timers).publish()
}
}
}
Expand All @@ -85,7 +85,7 @@ object MempoolTxMonitor {
private class MempoolTxMonitor(nodeParams: NodeParams,
cmd: MempoolTxMonitor.Publish,
bitcoinClient: BitcoinCoreClient,
loggingInfo: TxPublishLogContext,
txPublishContext: TxPublishContext,
context: ActorContext[MempoolTxMonitor.Command],
timers: TimerScheduler[MempoolTxMonitor.Command])(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {

Expand All @@ -101,7 +101,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams,
Behaviors.receiveMessagePartial {
case PublishOk =>
log.debug("txid={} was successfully published, waiting for confirmation...", cmd.tx.txid)
context.system.eventStream ! EventStream.Publish(TransactionPublished(loggingInfo.channelId_opt.getOrElse(ByteVector32.Zeroes), loggingInfo.remoteNodeId, cmd.tx, cmd.fee, cmd.desc))
context.system.eventStream ! EventStream.Publish(TransactionPublished(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx, cmd.fee, cmd.desc))
waitForConfirmation()
case PublishFailed(reason) if reason.getMessage.contains("rejecting replacement") =>
log.info("could not publish tx: a conflicting mempool transaction is already in the mempool")
Expand Down Expand Up @@ -155,7 +155,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams,
Behaviors.same
} else {
log.info("txid={} has reached min depth", cmd.tx.txid)
context.system.eventStream ! EventStream.Publish(TransactionConfirmed(loggingInfo.channelId_opt.getOrElse(ByteVector32.Zeroes), loggingInfo.remoteNodeId, cmd.tx))
context.system.eventStream ! EventStream.Publish(TransactionConfirmed(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx))
sendFinalResult(TxDeeplyBuried(cmd.tx), Some(messageAdapter))
}
case TxNotFound =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient.FundTransaction
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.Commitments
import fr.acinq.eclair.channel.publish.ReplaceableTxPrePublisher._
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.{NodeParams, NotificationsLogger}

Expand Down Expand Up @@ -68,9 +68,9 @@ object ReplaceableTxFunder {
case class FundingFailed(reason: TxPublisher.TxRejectedReason) extends FundingResult
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case FundTransaction(replyTo, cmd, tx, targetFeerate) =>
val txFunder = new ReplaceableTxFunder(nodeParams, replyTo, cmd, bitcoinClient, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, Transaction}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.channel.{Commitments, HtlcTxAndRemoteSig}
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.transactions.Transactions._
Expand Down Expand Up @@ -96,9 +96,9 @@ object ReplaceableTxPrePublisher {
}
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case CheckPreconditions(replyTo, cmd) =>
val prePublisher = new ReplaceableTxPrePublisher(nodeParams, replyTo, cmd, bitcoinClient, context)
Expand Down
Loading

0 comments on commit 52a6ee9

Please sign in to comment.