Skip to content

Commit

Permalink
Update scid when splice funding tx confirms
Browse files Browse the repository at this point in the history
A zero conf splice will send splice_locked when funding tx is published, but not update it's scid until the fund tx confirms.

Channel balances and max htlc amount will not update until local sends to and receives from it's remote peer splice_locked.
  • Loading branch information
remyers committed Nov 26, 2024
1 parent d1109fa commit 929f85d
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,6 @@ object ZmqWatcher {
def hints: Set[TxId]
}

/**
* Watch for the first transaction spending the given outpoint. We assume that txid is already confirmed or in the
* mempool (i.e. the outpoint exists).
*
* NB: an event will be triggered only once when we see a transaction that spends the given outpoint. If you want to
* react to the transaction spending the outpoint, you should use [[WatchSpent]] instead.
*/
sealed trait WatchSpentBasic[T <: WatchSpentBasicTriggered] extends Watch[T] {
/** TxId of the outpoint to watch. */
def txId: TxId
/** Index of the outpoint to watch. */
def outputIndex: Int
}

/** This event is sent when a [[WatchConfirmed]] condition is met. */
sealed trait WatchConfirmedTriggered extends WatchTriggered {
/** Block in which the transaction was confirmed. */
Expand All @@ -134,11 +120,10 @@ object ZmqWatcher {
def spendingTx: Transaction
}

/** This event is sent when a [[WatchSpentBasic]] condition is met. */
sealed trait WatchSpentBasicTriggered extends WatchTriggered

case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpentBasic[WatchExternalChannelSpentTriggered]
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId) extends WatchSpentBasicTriggered
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] {
override def hints: Set[TxId] = Set.empty
}
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered

case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered]
case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered
Expand Down Expand Up @@ -194,7 +179,6 @@ object ZmqWatcher {
private def utxo(w: GenericWatch): Option[OutPoint] = {
w match {
case w: WatchSpent[_] => Some(OutPoint(w.txId, w.outputIndex))
case w: WatchSpentBasic[_] => Some(OutPoint(w.txId, w.outputIndex))
case _ => None
}
}
Expand Down Expand Up @@ -242,7 +226,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
.flatMap(watchedUtxos.get)
.flatten
.foreach {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx))
case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx))
case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(tx))
case _: WatchPublished => // nothing to do
Expand Down Expand Up @@ -336,9 +320,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
val result = w match {
case _ if watches.contains(w) =>
Ignore // we ignore duplicates
case w: WatchSpentBasic[_] =>
checkSpentBasic(w)
Keep
case w: WatchSpent[_] =>
checkSpent(w)
Keep
Expand Down Expand Up @@ -379,17 +360,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
}

private def checkSpentBasic(w: WatchSpentBasic[_ <: WatchSpentBasicTriggered]): Future[Unit] = {
// NB: we assume parent tx was published, we just need to make sure this particular output has not been spent
client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect {
case false =>
log.info(s"output=${w.txId}:${w.outputIndex} has already been spent")
w match {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
}
}
}

private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = {
// first let's see if the parent tx was published or not
client.getTxConfirmations(w.txId).collect {
Expand Down
37 changes: 13 additions & 24 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchTxConfirmed, WatchTxConfirmedTriggered}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.db.NetworkDb
Expand Down Expand Up @@ -76,10 +76,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
{
log.info("loading network announcements from db...")
val (pruned, channels) = db.listChannels().partition { case (_, pc) => pc.isStale(nodeParams.currentBlockHeight) }
val nodeIds = (pruned.values ++ channels.values).flatMap(pc => pc.ann.nodeId1 :: pc.ann.nodeId2 :: Nil).toSet
val (isolatedNodes, nodes) = db.listNodes().partition(n => !nodeIds.contains(n.nodeId))
log.info("removed {} isolated nodes from db", isolatedNodes.size)
isolatedNodes.foreach(n => db.removeNode(n.nodeId))
val nodes = db.listNodes()
Metrics.Nodes.withoutTags().update(nodes.size)
Metrics.Channels.withoutTags().update(channels.size)
log.info("loaded from db: channels={} nodes={}", channels.size, nodes.size)
Expand Down Expand Up @@ -265,22 +262,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
case Event(r: ValidateResult, d) =>
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)

case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
log.info("funding tx of channelId={} has been spent - delay removing it from the graph for 12 blocks", shortChannelId)
// remove the channel from the db so it will not be added to the graph if a restart occurs before 12 blocks elapse
db.removeChannel(shortChannelId)
stay() using d.copy(spentChannels = d.spentChannels + (shortChannelId -> nodeParams.currentBlockHeight))

case Event(c: CurrentBlockHeight, d) =>
val spentChannels1 = d.spentChannels.filter {
// spent channels may be confirmed as a splice; wait 12 blocks before removing them from the graph
case (_, blockHeight) if blockHeight >= c.blockHeight + 12 => true
case (shortChannelId, _) => self ! HandleChannelSpent(shortChannelId); false
}
stay() using d.copy(spentChannels = spentChannels1)
case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
val txId = d.channels.getOrElse(shortChannelId, d.prunedChannels(shortChannelId)).fundingTxId
log.info("funding tx txId={} of channelId={} has been spent - delay removing it from the graph until {} blocks after the spend confirms", txId, shortChannelId, nodeParams.channelConf.minDepthBlocks)
watcher ! WatchTxConfirmed(self, spendingTx.txid, nodeParams.channelConf.minDepthBlocks)
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))

case Event(HandleChannelSpent(shortChannelId), d: Data) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
case None => stay()
}

case Event(n: NodeAnnouncement, d: Data) =>
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n)
Expand Down Expand Up @@ -778,7 +770,7 @@ object Router {
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[RealShortChannelId, BlockHeight], // channels with funding txs spent less than 12 blocks ago
spentChannels: Map[TxId, RealShortChannelId], // channels with spent funding txs that are not deeply buried yet
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
Expand Down Expand Up @@ -819,7 +811,4 @@ object Router {
/** We have tried to relay this amount from this channel and it failed. */
case class ChannelCouldNotRelay(amount: MilliSatoshi, hop: ChannelHop)

/** Funding Tx of the channel id has been spent and not updated with a splice within 12 blocks. */
private case class HandleChannelSpent(shortChannelId: RealShortChannelId)

}
29 changes: 13 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.router.Graph.GraphStructure.GraphEdge
import fr.acinq.eclair.router.Monitoring.Metrics
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.Validation.{addPublicChannel, splicePublicChannel}
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, Logs, MilliSatoshiLong, NodeParams, RealShortChannelId, ShortChannelId, TxCoordinates}
Expand Down Expand Up @@ -111,17 +112,11 @@ object Validation {
None
} else {
log.debug("validation successful for shortChannelId={}", c.shortChannelId)
val sharedInputTxId_opt = tx.txIn.find(_.signatureScript == fundingOutputScript).map(_.outPoint.txid)
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
val capacity = tx.txOut(outputIndex).amount
// if a channel spends the shared output of a recently spent channel, then it is a splice
sharedInputTxId_opt match {
d0.spentChannels.get(tx.txid) match {
case Some(parentScid) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid)))
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
case Some(sharedInputTxId) =>
d0.spentChannels.find(spent => d0.channels.get(spent._1).exists(_.fundingTxId == sharedInputTxId)) match {
case Some((parentScid, _)) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid)))
case None => log.error("channel shortChannelId={} is a splice, but not matching channel found!", c.shortChannelId); None
}
}
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
Expand Down Expand Up @@ -165,16 +160,16 @@ object Validation {
}
}

private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, spliceTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val fundingOutputIndex = outputIndex(ann.shortChannelId)
watcher ! WatchExternalChannelSpent(ctx.self, fundingTxId, fundingOutputIndex, ann.shortChannelId)
watcher ! WatchExternalChannelSpent(ctx.self, spliceTxId, fundingOutputIndex, ann.shortChannelId)
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil))
nodeParams.db.network.addChannel(ann, fundingTxId, capacity)
nodeParams.db.network.addChannel(ann, spliceTxId, capacity)
nodeParams.db.network.removeChannel(parentChannel.shortChannelId)
val pubChan = PublicChannel(
ann = ann,
fundingTxId = fundingTxId,
fundingTxId = spliceTxId,
capacity = capacity,
update_1_opt = parentChannel.update_1_opt,
update_2_opt = parentChannel.update_2_opt,
Expand All @@ -192,7 +187,8 @@ object Validation {
// we rebroadcast the splice channel to our peers
channels = d.rebroadcast.channels + (pubChan.ann -> d.awaiting.getOrElse(pubChan.ann, if (pubChan.nodeId1 == nodeParams.nodeId || pubChan.nodeId2 == nodeParams.nodeId) Seq(LocalGossip) else Nil).toSet),
),
graphWithBalances = graph1
graphWithBalances = graph1,
spentChannels = d.spentChannels.filter(_._1 != spliceTxId)
)
}

Expand Down Expand Up @@ -257,7 +253,7 @@ object Validation {
def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.ann
log.info("funding tx of channelId={} has been spent", shortChannelId)
log.info("funding tx for channelId={} was spent", shortChannelId)
// we need to remove nodes that aren't tied to any channels anymore
val channels1 = d.channels - shortChannelId
val prunedChannels1 = d.prunedChannels - shortChannelId
Expand All @@ -276,7 +272,8 @@ object Validation {
db.removeNode(nodeId)
ctx.system.eventStream.publish(NodeLost(nodeId))
}
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1)
val spentChannels1 = d.spentChannels.filter(_._2 != shortChannelId)
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1)
}

def handleNodeAnnouncement(d: Data, db: NetworkDb, origins: Set[GossipOrigin], n: NodeAnnouncement, wasStashed: Boolean = false)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
Expand Down Expand Up @@ -585,7 +582,7 @@ object Validation {
val scid2PrivateChannels1 = d.scid2PrivateChannels - lcd.shortIds.localAlias.toLong -- lcd.shortIds.real.toOption.map(_.toLong)
// a local channel has permanently gone down
if (lcd.shortIds.real.toOption.exists(d.channels.contains)) {
// the channel was public, we will receive (or have already received) a WatchEventSpentBasic event, that will trigger a clean up of the channel
// the channel was public, we will receive (or have already received) a WatchSpent event, that will trigger a clean up of the channel
// so let's not do anything here
d.copy(scid2PrivateChannels = scid2PrivateChannels1)
} else if (d.privateChannels.contains(lcd.channelId)) {
Expand Down
Loading

0 comments on commit 929f85d

Please sign in to comment.