Skip to content

Commit

Permalink
Delay considering a channel closed when seeing an on-chain spend
Browse files Browse the repository at this point in the history
Issue #2437

When an external channel is spent, add it to the `spentChannels` list instead of immediately removing it from the graph.

RBF attempts can produce multiple spending txs in the mempool for the same channel.

The `spendChannels` list maps the txid of the spending tx to the scid of the spent channel.

When a channel announcement is validated with a funding tx on the `spentChannels` list, consider the new channel a splice of the corresponding spent channel.

A splice updates the graph edges to preserve balance estimate information in the graph.

If a spending tx from the `spentChannels` list is deeply buried before appearing in a valid channel announcement, remove the corresponding spent channel edge from the graph. Also remove any corresponding spending tx entries on the `spentChannels` list.
  • Loading branch information
remyers committed Nov 28, 2024
1 parent a0b5834 commit d7b9bea
Show file tree
Hide file tree
Showing 10 changed files with 285 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,6 @@ object ZmqWatcher {
def hints: Set[TxId]
}

/**
* Watch for the first transaction spending the given outpoint. We assume that txid is already confirmed or in the
* mempool (i.e. the outpoint exists).
*
* NB: an event will be triggered only once when we see a transaction that spends the given outpoint. If you want to
* react to the transaction spending the outpoint, you should use [[WatchSpent]] instead.
*/
sealed trait WatchSpentBasic[T <: WatchSpentBasicTriggered] extends Watch[T] {
/** TxId of the outpoint to watch. */
def txId: TxId
/** Index of the outpoint to watch. */
def outputIndex: Int
}

/** This event is sent when a [[WatchConfirmed]] condition is met. */
sealed trait WatchConfirmedTriggered extends WatchTriggered {
/** Block in which the transaction was confirmed. */
Expand All @@ -134,11 +120,10 @@ object ZmqWatcher {
def spendingTx: Transaction
}

/** This event is sent when a [[WatchSpentBasic]] condition is met. */
sealed trait WatchSpentBasicTriggered extends WatchTriggered

case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpentBasic[WatchExternalChannelSpentTriggered]
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId) extends WatchSpentBasicTriggered
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] {
override def hints: Set[TxId] = Set.empty
}
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered

case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered]
case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered
Expand Down Expand Up @@ -194,7 +179,6 @@ object ZmqWatcher {
private def utxo(w: GenericWatch): Option[OutPoint] = {
w match {
case w: WatchSpent[_] => Some(OutPoint(w.txId, w.outputIndex))
case w: WatchSpentBasic[_] => Some(OutPoint(w.txId, w.outputIndex))
case _ => None
}
}
Expand Down Expand Up @@ -242,7 +226,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
.flatMap(watchedUtxos.get)
.flatten
.foreach {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx))
case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx))
case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(tx))
case _: WatchPublished => // nothing to do
Expand Down Expand Up @@ -336,9 +320,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
val result = w match {
case _ if watches.contains(w) =>
Ignore // we ignore duplicates
case w: WatchSpentBasic[_] =>
checkSpentBasic(w)
Keep
case w: WatchSpent[_] =>
checkSpent(w)
Keep
Expand Down Expand Up @@ -379,17 +360,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
}

private def checkSpentBasic(w: WatchSpentBasic[_ <: WatchSpentBasicTriggered]): Future[Unit] = {
// NB: we assume parent tx was published, we just need to make sure this particular output has not been spent
client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect {
case false =>
log.info(s"output=${w.txId}:${w.outputIndex} has already been spent")
w match {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
}
}
}

private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = {
// first let's see if the parent tx was published or not
client.getTxConfirmations(w.txId).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route}
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

Expand Down Expand Up @@ -195,6 +195,18 @@ case class BalanceEstimate private(low: MilliSatoshi,
)
}

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = {
val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity)
val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi
copy(
// a capacity decrease will decrease the low bound, but not below 0
low = (low + capacityDelta.min(0 msat)).max(0 msat),
// a capacity increase will increase the high bound, but not above the capacity of the largest channel
high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi),
capacities = newCapacities
)
}

/**
* Estimate the probability that we can successfully send `amount` through the channel
*
Expand Down Expand Up @@ -263,6 +275,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima
defaultHalfLife
)

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates(
balances.updatedWith((desc.a, desc.b)) {
case None => None
case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity))
},
defaultHalfLife
)

def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi)(implicit log: LoggingAdapter): BalancesEstimates = {
get(hop.nodeId, hop.nextNodeId).foreach { balance =>
val estimatedProbability = balance.canSend(amount, TimestampSecond.now())
Expand Down Expand Up @@ -305,6 +325,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances:
descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)),
)

def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = {
GraphWithBalanceEstimates(
graph.updateChannel(desc, newShortChannelId, newCapacity),
balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity)
)
}

def routeCouldRelay(route: Route)(implicit log: LoggingAdapter): GraphWithBalanceEstimates = {
val (balances1, _) = route.hops.foldRight((balances, route.amount)) {
case (hop, (balances, amount)) =>
Expand Down
20 changes: 20 additions & 0 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,26 @@ object Graph {
descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge))
}

/**
* Update the shortChannelId and capacity of edges corresponding to the given channel-desc,
* both edges (corresponding to both directions) are updated.
*
* @param desc the channel description for the channel to update
* @param newShortChannelId the new shortChannelId for this channel
* @param newCapacity the new capacity of the channel
* @return a new graph with updated vertexes
*/
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = {
val newDesc = desc.copy(shortChannelId = newShortChannelId)
val updatedVertices =
vertices
.updatedWith(desc.b)(_.map(vertexB => vertexB.copy(incomingEdges = vertexB.incomingEdges - desc +
(newDesc -> vertexB.incomingEdges(desc).copy(desc = newDesc, capacity = newCapacity)))))
.updatedWith(desc.a)(_.map(vertexA => vertexA.copy(incomingEdges = vertexA.incomingEdges - desc.reversed +
(newDesc.reversed -> vertexA.incomingEdges(desc.reversed).copy(desc = newDesc.reversed, capacity = newCapacity)))))
DirectedGraph(updatedVertices)
}

/**
* @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId
*/
Expand Down
24 changes: 19 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,11 @@ import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.CurrentBlockHeight
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered, WatchTxConfirmed, WatchTxConfirmedTriggered}
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.ANNOUNCEMENTS_MINCONF
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
Expand Down Expand Up @@ -64,6 +66,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
context.system.eventStream.subscribe(self, classOf[LocalChannelUpdate])
context.system.eventStream.subscribe(self, classOf[LocalChannelDown])
context.system.eventStream.subscribe(self, classOf[AvailableBalanceChanged])
context.system.eventStream.subscribe(self, classOf[CurrentBlockHeight])
context.system.eventStream.publish(SubscriptionsComplete(this.getClass))

startTimerWithFixedDelay(TickBroadcast.toString, TickBroadcast, nodeParams.routerConf.routerBroadcastInterval)
Expand Down Expand Up @@ -113,7 +116,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
scid2PrivateChannels = Map.empty,
excludedChannels = Map.empty,
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
sync = Map.empty)
sync = Map.empty,
spentChannels = Map.empty)
startWith(NORMAL, data)
}

Expand Down Expand Up @@ -259,8 +263,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
case Event(r: ValidateResult, d) =>
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)

case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
val txId = d.channels.getOrElse(shortChannelId, d.prunedChannels(shortChannelId)).fundingTxId
log.info("funding tx txId={} of channelId={} has been spent - delay removing it from the graph until {} blocks after the spend confirms", txId, shortChannelId, ANNOUNCEMENTS_MINCONF * 2)
watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2)
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))

case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
case None => stay()
}

case Event(n: NodeAnnouncement, d: Data) =>
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n)
Expand Down Expand Up @@ -757,7 +770,8 @@ object Router {
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
Expand Down
44 changes: 40 additions & 4 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.router.Graph.GraphStructure.GraphEdge
import fr.acinq.eclair.router.Monitoring.Metrics
import fr.acinq.eclair.router.Router._
import fr.acinq.eclair.router.Validation.{addPublicChannel, splicePublicChannel}
import fr.acinq.eclair.transactions.Scripts
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{BlockHeight, Logs, MilliSatoshiLong, NodeParams, RealShortChannelId, ShortChannelId, TxCoordinates}
Expand Down Expand Up @@ -113,7 +114,10 @@ object Validation {
log.debug("validation successful for shortChannelId={}", c.shortChannelId)
remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c)))
val capacity = tx.txOut(outputIndex).amount
Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
d0.spentChannels.get(tx.txid) match {
case Some(parentScid) => Some(splicePublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, d0.channels(parentScid)))
case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None))
}
}
case ValidateResult(c, Right((tx, fundingTxStatus: UtxoStatus.Spent))) =>
if (fundingTxStatus.spendingTxConfirmed) {
Expand Down Expand Up @@ -156,6 +160,38 @@ object Validation {
}
}

private def splicePublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, spliceTxId: TxId, capacity: Satoshi, parentChannel: PublicChannel)(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val fundingOutputIndex = outputIndex(ann.shortChannelId)
watcher ! WatchExternalChannelSpent(ctx.self, spliceTxId, fundingOutputIndex, ann.shortChannelId)
ctx.system.eventStream.publish(ChannelsDiscovered(SingleChannelDiscovered(ann, capacity, None, None) :: Nil))
nodeParams.db.network.addChannel(ann, spliceTxId, capacity)
nodeParams.db.network.removeChannel(parentChannel.shortChannelId)
val pubChan = PublicChannel(
ann = ann,
fundingTxId = spliceTxId,
capacity = capacity,
update_1_opt = None,
update_2_opt = None,
meta_opt = parentChannel.meta_opt
)
log.debug("replacing parent channel scid={} with splice channel scid={}; splice channel={}", parentChannel.shortChannelId, ann.shortChannelId, pubChan)
// we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid
log.debug("updating the graph for shortChannelId={}", pubChan.shortChannelId)
val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity)
d.copy(
// we also add the splice scid -> channelId and remove the parent scid -> channelId mappings
channels = d.channels + (pubChan.shortChannelId -> pubChan) - parentChannel.shortChannelId,
// we also add the newly validated channels to the rebroadcast queue
rebroadcast = d.rebroadcast.copy(
// we rebroadcast the splice channel to our peers
channels = d.rebroadcast.channels + (pubChan.ann -> d.awaiting.getOrElse(pubChan.ann, if (pubChan.nodeId1 == nodeParams.nodeId || pubChan.nodeId2 == nodeParams.nodeId) Seq(LocalGossip) else Nil).toSet),
),
graphWithBalances = graph1,
spentChannels = d.spentChannels.filter(_._2 != parentChannel.shortChannelId)
)
}

private def addPublicChannel(d: Data, nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Command], ann: ChannelAnnouncement, fundingTxId: TxId, capacity: Satoshi, privChan_opt: Option[PrivateChannel])(implicit ctx: ActorContext, log: DiagnosticLoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val fundingOutputIndex = outputIndex(ann.shortChannelId)
Expand Down Expand Up @@ -217,7 +253,7 @@ object Validation {
def handleChannelSpent(d: Data, db: NetworkDb, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.ann
log.info("funding tx of channelId={} has been spent", shortChannelId)
log.info("funding tx for channelId={} was spent", shortChannelId)
// we need to remove nodes that aren't tied to any channels anymore
val channels1 = d.channels - shortChannelId
val prunedChannels1 = d.prunedChannels - shortChannelId
Expand All @@ -236,7 +272,7 @@ object Validation {
db.removeNode(nodeId)
ctx.system.eventStream.publish(NodeLost(nodeId))
}
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1)
d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = d.spentChannels.filter(_._2 != shortChannelId))
}

def handleNodeAnnouncement(d: Data, db: NetworkDb, origins: Set[GossipOrigin], n: NodeAnnouncement, wasStashed: Boolean = false)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
Expand Down Expand Up @@ -545,7 +581,7 @@ object Validation {
val scid2PrivateChannels1 = d.scid2PrivateChannels - lcd.shortIds.localAlias.toLong -- lcd.shortIds.real.toOption.map(_.toLong)
// a local channel has permanently gone down
if (lcd.shortIds.real.toOption.exists(d.channels.contains)) {
// the channel was public, we will receive (or have already received) a WatchEventSpentBasic event, that will trigger a clean up of the channel
// the channel was public, we will receive (or have already received) a WatchSpent event, that will trigger a clean up of the channel
// so let's not do anything here
d.copy(scid2PrivateChannels = scid2PrivateChannels1)
} else if (d.privateChannels.contains(lcd.channelId)) {
Expand Down
Loading

0 comments on commit d7b9bea

Please sign in to comment.