Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clarify tx publisher context and logging #2131

Merged
merged 1 commit into from
Jan 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/Logs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,21 @@ object Logs {
* - for a channel-relay: the relay id
* - for a trampoline-relay: the relay id and the parent payment id of the outgoing payment
*/
def mdc(category_opt: Option[LogCategory] = None, remoteNodeId_opt: Option[PublicKey] = None, channelId_opt: Option[ByteVector32] = None, parentPaymentId_opt: Option[UUID] = None, paymentId_opt: Option[UUID] = None, paymentHash_opt: Option[ByteVector32] = None): Map[String, String] =
def mdc(category_opt: Option[LogCategory] = None,
remoteNodeId_opt: Option[PublicKey] = None,
channelId_opt: Option[ByteVector32] = None,
parentPaymentId_opt: Option[UUID] = None,
paymentId_opt: Option[UUID] = None,
paymentHash_opt: Option[ByteVector32] = None,
txPublishId_opt: Option[UUID] = None): Map[String, String] =
Seq(
category_opt.map(l => "category" -> s" ${l.category}"),
remoteNodeId_opt.map(n => "nodeId" -> s" n:$n"), // nb: we preformat MDC values so that there is no white spaces in logs when they are not defined
channelId_opt.map(c => "channelId" -> s" c:$c"),
parentPaymentId_opt.map(p => "parentPaymentId" -> s" p:$p"),
paymentId_opt.map(i => "paymentId" -> s" i:$i"),
paymentHash_opt.map(h => "paymentHash" -> s" h:$h")
paymentHash_opt.map(h => "paymentHash" -> s" h:$h"),
txPublishId_opt.map(t => "txPublishId" -> s" t:$t")
).flatten.toMap

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.channel.publish.TxTimeLocksMonitor.CheckTx

import scala.concurrent.ExecutionContext
Expand Down Expand Up @@ -50,12 +50,12 @@ object FinalTxPublisher {
case object Stop extends Command
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command], txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, loggingInfo).checkTimeLocks()
case Publish(replyTo, cmd) => new FinalTxPublisher(nodeParams, replyTo, cmd, bitcoinClient, watcher, context, timers, txPublishContext).checkTimeLocks()
case Stop => Behaviors.stopped
}
}
Expand All @@ -72,14 +72,14 @@ private class FinalTxPublisher(nodeParams: NodeParams,
watcher: ActorRef[ZmqWatcher.Command],
context: ActorContext[FinalTxPublisher.Command],
timers: TimerScheduler[FinalTxPublisher.Command],
loggingInfo: TxPublishLogContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {
txPublishContext: TxPublishContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {

import FinalTxPublisher._

private val log = context.log

def checkTimeLocks(): Behavior[Command] = {
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, loggingInfo), "time-locks-monitor")
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, txPublishContext), "time-locks-monitor")
timeLocksChecker ! CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.tx, cmd.desc)
Behaviors.receiveMessagePartial {
case TimeLocksOk => checkParentPublished()
Expand All @@ -106,21 +106,21 @@ private class FinalTxPublisher(nodeParams: NodeParams,
Behaviors.same
case UnknownFailure(reason) =>
log.error("could not check parent tx", reason)
sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure))
sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, TxPublisher.TxRejectedReason.UnknownTxFailure))
case Stop => Behaviors.stopped
}
case None => publish()
}
}

def publish(): Behavior[Command] = {
val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, loggingInfo), "mempool-tx-monitor")
val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, txPublishContext), "mempool-tx-monitor")
txMonitor ! MempoolTxMonitor.Publish(context.messageAdapter[MempoolTxMonitor.TxResult](WrappedTxResult), cmd.tx, cmd.input, cmd.desc, cmd.fee)
Behaviors.receiveMessagePartial {
case WrappedTxResult(txResult) =>
txResult match {
case _: MempoolTxMonitor.IntermediateTxResult => Behaviors.same
case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(loggingInfo.id, cmd, reason))
case MempoolTxMonitor.TxRejected(_, reason) => sendResult(TxPublisher.TxRejected(txPublishContext.id, cmd, reason))
case MempoolTxMonitor.TxDeeplyBuried(tx) => sendResult(TxPublisher.TxConfirmed(cmd, tx))
}
case Stop => Behaviors.stopped
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.{ByteVector32, OutPoint, Satoshi, Transaction}
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishLogContext, TxRejectedReason}
import fr.acinq.eclair.channel.publish.TxPublisher.{TxPublishContext, TxRejectedReason}
import fr.acinq.eclair.channel.{TransactionConfirmed, TransactionPublished}
import fr.acinq.eclair.{BlockHeight, NodeParams}

Expand Down Expand Up @@ -68,12 +68,12 @@ object MempoolTxMonitor {
case class TxRejected(txid: ByteVector32, reason: TxPublisher.TxRejectedReason) extends FinalTxResult
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case cmd: Publish => new MempoolTxMonitor(nodeParams, cmd, bitcoinClient, loggingInfo, context, timers).publish()
case cmd: Publish => new MempoolTxMonitor(nodeParams, cmd, bitcoinClient, txPublishContext, context, timers).publish()
}
}
}
Expand All @@ -85,7 +85,7 @@ object MempoolTxMonitor {
private class MempoolTxMonitor(nodeParams: NodeParams,
cmd: MempoolTxMonitor.Publish,
bitcoinClient: BitcoinCoreClient,
loggingInfo: TxPublishLogContext,
txPublishContext: TxPublishContext,
context: ActorContext[MempoolTxMonitor.Command],
timers: TimerScheduler[MempoolTxMonitor.Command])(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {

Expand All @@ -101,7 +101,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams,
Behaviors.receiveMessagePartial {
case PublishOk =>
log.debug("txid={} was successfully published, waiting for confirmation...", cmd.tx.txid)
context.system.eventStream ! EventStream.Publish(TransactionPublished(loggingInfo.channelId_opt.getOrElse(ByteVector32.Zeroes), loggingInfo.remoteNodeId, cmd.tx, cmd.fee, cmd.desc))
context.system.eventStream ! EventStream.Publish(TransactionPublished(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx, cmd.fee, cmd.desc))
waitForConfirmation()
case PublishFailed(reason) if reason.getMessage.contains("rejecting replacement") =>
log.info("could not publish tx: a conflicting mempool transaction is already in the mempool")
Expand Down Expand Up @@ -155,7 +155,7 @@ private class MempoolTxMonitor(nodeParams: NodeParams,
Behaviors.same
} else {
log.info("txid={} has reached min depth", cmd.tx.txid)
context.system.eventStream ! EventStream.Publish(TransactionConfirmed(loggingInfo.channelId_opt.getOrElse(ByteVector32.Zeroes), loggingInfo.remoteNodeId, cmd.tx))
context.system.eventStream ! EventStream.Publish(TransactionConfirmed(txPublishContext.channelId_opt.getOrElse(ByteVector32.Zeroes), txPublishContext.remoteNodeId, cmd.tx))
sendFinalResult(TxDeeplyBuried(cmd.tx), Some(messageAdapter))
}
case TxNotFound =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient.FundTransaction
import fr.acinq.eclair.blockchain.fee.FeeratePerKw
import fr.acinq.eclair.channel.Commitments
import fr.acinq.eclair.channel.publish.ReplaceableTxPrePublisher._
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.transactions.Transactions._
import fr.acinq.eclair.{NodeParams, NotificationsLogger}

Expand Down Expand Up @@ -68,9 +68,9 @@ object ReplaceableTxFunder {
case class FundingFailed(reason: TxPublisher.TxRejectedReason) extends FundingResult
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case FundTransaction(replyTo, cmd, tx, targetFeerate) =>
val txFunder = new ReplaceableTxFunder(nodeParams, replyTo, cmd, bitcoinClient, context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import akka.actor.typed.{ActorRef, Behavior}
import fr.acinq.bitcoin.{ByteVector32, ByteVector64, Crypto, Transaction}
import fr.acinq.eclair.NodeParams
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishLogContext
import fr.acinq.eclair.channel.publish.TxPublisher.TxPublishContext
import fr.acinq.eclair.channel.{Commitments, HtlcTxAndRemoteSig}
import fr.acinq.eclair.transactions.Transactions
import fr.acinq.eclair.transactions.Transactions._
Expand Down Expand Up @@ -96,9 +96,9 @@ object ReplaceableTxPrePublisher {
}
// @formatter:on

def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, loggingInfo: TxPublishLogContext): Behavior[Command] = {
def apply(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, txPublishContext: TxPublishContext): Behavior[Command] = {
Behaviors.setup { context =>
Behaviors.withMdc(loggingInfo.mdc()) {
Behaviors.withMdc(txPublishContext.mdc()) {
Behaviors.receiveMessagePartial {
case CheckPreconditions(replyTo, cmd) =>
val prePublisher = new ReplaceableTxPrePublisher(nodeParams, replyTo, cmd, bitcoinClient, context)
Expand Down
Loading