Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Delay considering a channel closed when seeing an on-chain spend #2936

Merged
merged 13 commits into from
Dec 13, 2024
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -104,20 +104,6 @@ object ZmqWatcher {
def hints: Set[TxId]
}

/**
* Watch for the first transaction spending the given outpoint. We assume that txid is already confirmed or in the
* mempool (i.e. the outpoint exists).
*
* NB: an event will be triggered only once when we see a transaction that spends the given outpoint. If you want to
* react to the transaction spending the outpoint, you should use [[WatchSpent]] instead.
*/
sealed trait WatchSpentBasic[T <: WatchSpentBasicTriggered] extends Watch[T] {
/** TxId of the outpoint to watch. */
def txId: TxId
/** Index of the outpoint to watch. */
def outputIndex: Int
}

/** This event is sent when a [[WatchConfirmed]] condition is met. */
sealed trait WatchConfirmedTriggered extends WatchTriggered {
/** Block in which the transaction was confirmed. */
Expand All @@ -134,11 +120,9 @@ object ZmqWatcher {
def spendingTx: Transaction
}

/** This event is sent when a [[WatchSpentBasic]] condition is met. */
sealed trait WatchSpentBasicTriggered extends WatchTriggered

case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpentBasic[WatchExternalChannelSpentTriggered]
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId) extends WatchSpentBasicTriggered
case class WatchExternalChannelSpent(replyTo: ActorRef[WatchExternalChannelSpentTriggered], txId: TxId, outputIndex: Int, shortChannelId: RealShortChannelId) extends WatchSpent[WatchExternalChannelSpentTriggered] { override def hints: Set[TxId] = Set.empty }
case class WatchExternalChannelSpentTriggered(shortChannelId: RealShortChannelId, spendingTx: Transaction) extends WatchSpentTriggered
case class UnwatchExternalChannelSpent(txId: TxId, outputIndex: Int) extends Command

case class WatchFundingSpent(replyTo: ActorRef[WatchFundingSpentTriggered], txId: TxId, outputIndex: Int, hints: Set[TxId]) extends WatchSpent[WatchFundingSpentTriggered]
case class WatchFundingSpentTriggered(spendingTx: Transaction) extends WatchSpentTriggered
Expand Down Expand Up @@ -197,7 +181,6 @@ object ZmqWatcher {
private def utxo(w: GenericWatch): Option[OutPoint] = {
w match {
case w: WatchSpent[_] => Some(OutPoint(w.txId, w.outputIndex))
case w: WatchSpentBasic[_] => Some(OutPoint(w.txId, w.outputIndex))
case _ => None
}
}
Expand Down Expand Up @@ -245,7 +228,7 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
.flatMap(watchedUtxos.get)
.flatten
.foreach {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId, tx))
case w: WatchFundingSpent => context.self ! TriggerEvent(w.replyTo, w, WatchFundingSpentTriggered(tx))
case w: WatchOutputSpent => context.self ! TriggerEvent(w.replyTo, w, WatchOutputSpentTriggered(tx))
case _: WatchPublished => // nothing to do
Expand Down Expand Up @@ -339,9 +322,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
val result = w match {
case _ if watches.contains(w) =>
Ignore // we ignore duplicates
case w: WatchSpentBasic[_] =>
checkSpentBasic(w)
Keep
case w: WatchSpent[_] =>
checkSpent(w)
Keep
Expand Down Expand Up @@ -375,6 +355,11 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
watching(watches -- deprecatedWatches, watchedUtxos)

case UnwatchExternalChannelSpent(txId, outputIndex) =>
val deprecatedWatches = watches.keySet.collect { case w: WatchExternalChannelSpent if w.txId == txId && w.outputIndex == outputIndex => w }
val watchedUtxos1 = deprecatedWatches.foldLeft(watchedUtxos) { case (m, w) => removeWatchedUtxos(m, w) }
watching(watches -- deprecatedWatches, watchedUtxos1)

case ValidateRequest(replyTo, ann) =>
client.validate(ann).map(replyTo ! _)
Behaviors.same
Expand All @@ -390,17 +375,6 @@ private class ZmqWatcher(nodeParams: NodeParams, blockHeight: AtomicLong, client
}
}

private def checkSpentBasic(w: WatchSpentBasic[_ <: WatchSpentBasicTriggered]): Future[Unit] = {
// NB: we assume parent tx was published, we just need to make sure this particular output has not been spent
client.isTransactionOutputSpendable(w.txId, w.outputIndex, includeMempool = true).collect {
case false =>
log.info(s"output=${w.txId}:${w.outputIndex} has already been spent")
w match {
case w: WatchExternalChannelSpent => context.self ! TriggerEvent(w.replyTo, w, WatchExternalChannelSpentTriggered(w.shortChannelId))
}
}
}

private def checkSpent(w: WatchSpent[_ <: WatchSpentTriggered]): Future[Unit] = {
// first let's see if the parent tx was published or not
client.getTxConfirmations(w.txId).collect {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,8 @@ case class Commitments(params: ChannelParams,
val remoteCommitIndex = active.head.remoteCommit.index
val nextRemoteCommitIndex = remoteCommitIndex + 1

// While we have multiple active commitments, we use the most restrictive one.
val capacity = active.map(_.capacity).min
lazy val availableBalanceForSend: MilliSatoshi = active.map(_.availableBalanceForSend(params, changes)).min
lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import fr.acinq.bitcoin.scalacompat.{Satoshi, SatoshiLong}
import fr.acinq.eclair.router.Graph.GraphStructure.{DirectedGraph, GraphEdge}
import fr.acinq.eclair.router.Router.{ChannelDesc, ChannelHop, Route}
import fr.acinq.eclair.wire.protocol.NodeAnnouncement
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}
import fr.acinq.eclair.{MilliSatoshi, MilliSatoshiLong, RealShortChannelId, ShortChannelId, TimestampSecond, TimestampSecondLong, ToMilliSatoshiConversion}

import scala.concurrent.duration.{DurationInt, FiniteDuration}

Expand Down Expand Up @@ -195,6 +195,18 @@ case class BalanceEstimate private(low: MilliSatoshi,
)
}

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalanceEstimate = {
t-bast marked this conversation as resolved.
Show resolved Hide resolved
val newCapacities = capacities - desc.shortChannelId + (newShortChannelId -> newCapacity)
val capacityDelta = (newCapacity - capacities.getOrElse(desc.shortChannelId, newCapacity)).toMilliSatoshi
copy(
// a capacity decrease will decrease the low bound, but not below 0
low = (low + capacityDelta.min(0 msat)).max(0 msat),
// a capacity increase will increase the high bound, but not above the capacity of the largest channel
high = (high + capacityDelta.max(0 msat)).min(newCapacities.values.maxOption.getOrElse(0 sat).toMilliSatoshi),
capacities = newCapacities
)
}

/**
* Estimate the probability that we can successfully send `amount` through the channel
*
Expand Down Expand Up @@ -263,6 +275,14 @@ case class BalancesEstimates(balances: Map[(PublicKey, PublicKey), BalanceEstima
defaultHalfLife
)

def updateEdge(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): BalancesEstimates = BalancesEstimates(
balances.updatedWith((desc.a, desc.b)) {
case None => None
case Some(balance) => Some(balance.updateEdge(desc, newShortChannelId, newCapacity))
},
defaultHalfLife
)

def channelCouldSend(hop: ChannelHop, amount: MilliSatoshi)(implicit log: LoggingAdapter): BalancesEstimates = {
get(hop.nodeId, hop.nextNodeId).foreach { balance =>
val estimatedProbability = balance.canSend(amount, TimestampSecond.now())
Expand Down Expand Up @@ -305,6 +325,13 @@ case class GraphWithBalanceEstimates(graph: DirectedGraph, private val balances:
descList.foldLeft(balances)((acc, edge) => acc.removeEdge(edge).removeEdge(edge.reversed)),
)

def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): GraphWithBalanceEstimates = {
GraphWithBalanceEstimates(
graph.updateChannel(desc, newShortChannelId, newCapacity),
balances.updateEdge(desc, newShortChannelId, newCapacity).updateEdge(desc.reversed, newShortChannelId, newCapacity)
)
}

def routeCouldRelay(route: Route)(implicit log: LoggingAdapter): GraphWithBalanceEstimates = {
val (balances1, _) = route.hops.foldRight((balances, route.amount)) {
case (hop, (balances, amount)) =>
Expand Down
26 changes: 25 additions & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/router/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,15 @@ object Graph {
}
}

case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge])
case class Vertex(features: Features[NodeFeature], incomingEdges: Map[ChannelDesc, GraphEdge]) {
def update(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): Vertex =
incomingEdges.get(desc) match {
case None => this
case Some(edge) =>
val updatedEdge = edge.copy(desc = desc.copy(shortChannelId = newShortChannelId), capacity = newCapacity)
copy(incomingEdges = incomingEdges - desc + (desc.copy(shortChannelId = newShortChannelId) -> updatedEdge))
}
}

/** A graph data structure that uses an adjacency list, stores the incoming edges of the neighbors */
case class DirectedGraph(private val vertices: Map[PublicKey, Vertex]) {
Expand Down Expand Up @@ -678,6 +686,22 @@ object Graph {
descList.foldLeft(this)((acc, edge) => acc.removeChannel(edge))
}

/**
* Update the shortChannelId and capacity of edges corresponding to the given channel-desc,
* both edges (corresponding to both directions) are updated.
*
* @param desc the channel description for the channel to update
* @param newShortChannelId the new shortChannelId for this channel
* @param newCapacity the new capacity of the channel
* @return a new graph with updated vertexes
*/
def updateChannel(desc: ChannelDesc, newShortChannelId: RealShortChannelId, newCapacity: Satoshi): DirectedGraph = {
DirectedGraph(vertices
.updatedWith(desc.b)(_.map(_.update(desc, newShortChannelId, newCapacity)))
t-bast marked this conversation as resolved.
Show resolved Hide resolved
.updatedWith(desc.a)(_.map(_.update(desc.reversed, newShortChannelId, newCapacity)))
)
}

/**
* @return For edges to be considered equal they must have the same in/out vertices AND same shortChannelId
*/
Expand Down
33 changes: 19 additions & 14 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ import fr.acinq.bitcoin.scalacompat.{BlockHash, ByteVector32, Satoshi, TxId}
import fr.acinq.eclair.Logs.LogCategory
import fr.acinq.eclair._
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{ValidateResult, WatchExternalChannelSpent, WatchExternalChannelSpentTriggered}
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._
import fr.acinq.eclair.channel._
import fr.acinq.eclair.channel.fsm.Channel.ANNOUNCEMENTS_MINCONF
import fr.acinq.eclair.crypto.TransportHandler
import fr.acinq.eclair.db.NetworkDb
import fr.acinq.eclair.io.Peer.PeerRoutingMessage
Expand Down Expand Up @@ -113,7 +114,8 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
scid2PrivateChannels = Map.empty,
excludedChannels = Map.empty,
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
sync = Map.empty)
sync = Map.empty,
spentChannels = Map.empty)
startWith(NORMAL, data)
}

Expand Down Expand Up @@ -259,8 +261,17 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
case Event(r: ValidateResult, d) =>
stay() using Validation.handleChannelValidationResponse(d, nodeParams, watcher, r)

case Event(WatchExternalChannelSpentTriggered(shortChannelId), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
stay() using Validation.handleChannelSpent(d, nodeParams.db.network, shortChannelId)
case Event(WatchExternalChannelSpentTriggered(shortChannelId, spendingTx), d) if d.channels.contains(shortChannelId) || d.prunedChannels.contains(shortChannelId) =>
val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId
log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid)
watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2)
remyers marked this conversation as resolved.
Show resolved Hide resolved
t-bast marked this conversation as resolved.
Show resolved Hide resolved
stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId))

case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) =>
d.spentChannels.get(spendingTx.txid) match {
case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId)
case None => stay()
}

case Event(n: NodeAnnouncement, d: Data) =>
stay() using Validation.handleNodeAnnouncement(d, nodeParams.db.network, Set(LocalGossip), n)
Expand Down Expand Up @@ -409,9 +420,9 @@ object Router {
def getBalanceSameSideAs(u: ChannelUpdate): Option[MilliSatoshi] = if (u.channelFlags.isNode1) meta_opt.map(_.balance1) else meta_opt.map(_.balance2)
def updateChannelUpdateSameSideAs(u: ChannelUpdate): PublicChannel = if (u.channelFlags.isNode1) copy(update_1_opt = Some(u)) else copy(update_2_opt = Some(u))
def updateBalances(commitments: Commitments): PublicChannel = if (commitments.localNodeId == ann.nodeId1) {
copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive)))
copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForSend, commitments.availableBalanceForReceive)))
} else {
copy(meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend)))
copy(capacity = commitments.capacity, meta_opt = Some(ChannelMeta(commitments.availableBalanceForReceive, commitments.availableBalanceForSend)))
}
def applyChannelUpdate(update: Either[LocalChannelUpdate, RemoteChannelUpdate]): PublicChannel = update match {
case Left(lcu) => updateChannelUpdateSameSideAs(lcu.channelUpdate).updateBalances(lcu.commitments)
Expand Down Expand Up @@ -573,7 +584,6 @@ object Router {
def +(ignoreNode: PublicKey): Ignore = copy(nodes = nodes + ignoreNode)
def ++(ignoreNodes: Set[PublicKey]): Ignore = copy(nodes = nodes ++ ignoreNodes)
def +(ignoreChannel: ChannelDesc): Ignore = copy(channels = channels + ignoreChannel)
def emptyNodes(): Ignore = copy(nodes = Set.empty)
def emptyChannels(): Ignore = copy(channels = Set.empty)
// @formatter:on
}
Expand Down Expand Up @@ -622,12 +632,6 @@ object Router {
/** Full route including the final hop, if any. */
val fullRoute: Seq[Hop] = hops ++ finalHop_opt.toSeq

/**
* Fee paid for the trampoline hop, if any.
* Note that when using MPP to reach the trampoline node, the trampoline fee must be counted only once.
*/
val trampolineFee: MilliSatoshi = finalHop_opt.collect { case hop: NodeHop => hop.fee(amount) }.getOrElse(0 msat)

/**
* Fee paid for the blinded route, if any.
* Note that when we are the introduction node for the blinded route, we cannot easily compute the fee without the
Expand Down Expand Up @@ -757,7 +761,8 @@ object Router {
scid2PrivateChannels: Map[Long, ByteVector32], // real scid or alias to channel_id, only to be used for private channels
excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing] // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
Expand Down
Loading
Loading