Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make feerates resolution asynchronous #654

Merged
merged 1 commit into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions src/commonMain/kotlin/fr/acinq/lightning/channel/states/Channel.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import fr.acinq.lightning.serialization.Encryption.from
import fr.acinq.lightning.transactions.Transactions.TransactionWithInputInfo.ClosingTx
import fr.acinq.lightning.utils.*
import fr.acinq.lightning.wire.*
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first

/*
* Channel is implemented as a finite state machine
Expand All @@ -35,11 +38,16 @@ data class StaticParams(val nodeParams: NodeParams, val remoteNodeId: PublicKey)
data class ChannelContext(
val staticParams: StaticParams,
val currentBlockHeight: Int,
val currentOnChainFeerates: OnChainFeerates,
val onChainFeerates: StateFlow<OnChainFeerates?>,
override val logger: MDCLogger
) : LoggingContext {
val keyManager: KeyManager get() = staticParams.nodeParams.keyManager
val privateKey: PrivateKey get() = staticParams.nodeParams.nodePrivateKey
suspend fun currentOnChainFeerates(): OnChainFeerates {
logger.info { "retrieving feerates" }
return onChainFeerates.filterNotNull().first()
.also { logger.info { "using feerates=$it" } }
}
}

/** Channel state. */
Expand Down Expand Up @@ -181,7 +189,7 @@ sealed class ChannelState {
ChannelAction.Blockchain.SendWatch(WatchConfirmed(channelId, tx.tx, staticParams.nodeParams.minDepthBlocks.toLong(), BITCOIN_TX_CONFIRMED(tx.tx)))
)

internal fun ChannelContext.handleLocalError(cmd: ChannelCommand, t: Throwable): Pair<ChannelState, List<ChannelAction>> {
internal suspend fun ChannelContext.handleLocalError(cmd: ChannelCommand, t: Throwable): Pair<ChannelState, List<ChannelAction>> {
when (cmd) {
is ChannelCommand.MessageReceived -> logger.error(t) { "error on message ${cmd.message::class.simpleName}" }
is ChannelCommand.WatchReceived -> logger.error { "error on watch event ${cmd.watch.event::class.simpleName}" }
Expand All @@ -199,7 +207,7 @@ sealed class ChannelState {
return Pair(Aborted, actions)
}

fun forceClose(state: ChannelStateWithCommitments): Pair<ChannelState, List<ChannelAction>> {
suspend fun forceClose(state: ChannelStateWithCommitments): Pair<ChannelState, List<ChannelAction>> {
val error = Error(state.channelId, t.message)
return state.run { spendLocalCurrent().run { copy(second = second + ChannelAction.Message.Send(error)) } }
}
Expand Down Expand Up @@ -253,7 +261,7 @@ sealed class ChannelState {
}
}

fun ChannelContext.handleRemoteError(e: Error): Pair<ChannelState, List<ChannelAction>> {
suspend fun ChannelContext.handleRemoteError(e: Error): Pair<ChannelState, List<ChannelAction>> {
// see BOLT 1: only print out data verbatim if is composed of printable ASCII characters
logger.error { "peer sent error: ascii='${e.toAscii()}' bin=${e.data.toHex()}" }
return when (this@ChannelState) {
Expand Down Expand Up @@ -386,7 +394,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
/**
* Analyze and react to a potential force-close transaction spending one of our funding transactions.
*/
internal fun ChannelContext.handlePotentialForceClose(w: WatchEventSpent): Pair<ChannelStateWithCommitments, List<ChannelAction>> = when {
internal suspend fun ChannelContext.handlePotentialForceClose(w: WatchEventSpent): Pair<ChannelStateWithCommitments, List<ChannelAction>> = when {
w.event != BITCOIN_FUNDING_SPENT -> Pair(this@ChannelStateWithCommitments, listOf())
commitments.all.any { it.fundingTxId == w.tx.txid } -> Pair(this@ChannelStateWithCommitments, listOf()) // if the spending tx is itself a funding tx, this is a splice and there is nothing to do
w.tx.txid == commitments.latest.localCommit.publishableTxs.commitTx.tx.txid -> spendLocalCurrent()
Expand Down Expand Up @@ -414,11 +422,11 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
}
}

internal fun ChannelContext.handleRemoteSpentCurrent(commitTx: Transaction, commitment: FullCommitment): Pair<Closing, List<ChannelAction>> {
internal suspend fun ChannelContext.handleRemoteSpentCurrent(commitTx: Transaction, commitment: FullCommitment): Pair<Closing, List<ChannelAction>> {
logger.warning { "they published their current commit in txid=${commitTx.txid}" }
require(commitTx.txid == commitment.remoteCommit.txid) { "txid mismatch" }

val remoteCommitPublished = claimRemoteCommitTxOutputs(channelKeys(), commitment, commitment.remoteCommit, commitTx, currentOnChainFeerates)
val remoteCommitPublished = claimRemoteCommitTxOutputs(channelKeys(), commitment, commitment.remoteCommit, commitTx, currentOnChainFeerates())

val nextState = when (this@ChannelStateWithCommitments) {
is Closing -> [email protected](remoteCommitPublished = remoteCommitPublished)
Expand All @@ -441,13 +449,13 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
})
}

internal fun ChannelContext.handleRemoteSpentNext(commitTx: Transaction, commitment: FullCommitment): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
internal suspend fun ChannelContext.handleRemoteSpentNext(commitTx: Transaction, commitment: FullCommitment): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
logger.warning { "they published their next commit in txid=${commitTx.txid}" }
require(commitment.nextRemoteCommit != null) { "next remote commit must be defined" }
val remoteCommit = commitment.nextRemoteCommit.commit
require(commitTx.txid == remoteCommit.txid) { "txid mismatch" }

val remoteCommitPublished = claimRemoteCommitTxOutputs(channelKeys(), commitment, remoteCommit, commitTx, currentOnChainFeerates)
val remoteCommitPublished = claimRemoteCommitTxOutputs(channelKeys(), commitment, remoteCommit, commitTx, currentOnChainFeerates())

val nextState = when (this@ChannelStateWithCommitments) {
is Closing -> copy(nextRemoteCommitPublished = remoteCommitPublished)
Expand All @@ -470,11 +478,11 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
})
}

internal fun ChannelContext.handleRemoteSpentOther(tx: Transaction): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
internal suspend fun ChannelContext.handleRemoteSpentOther(tx: Transaction): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
logger.warning { "funding tx spent in txid=${tx.txid}" }
return getRemotePerCommitmentSecret(channelKeys(), commitments.params, commitments.remotePerCommitmentSecrets, tx)?.let { (remotePerCommitmentSecret, commitmentNumber) ->
logger.warning { "txid=${tx.txid} was a revoked commitment, publishing the penalty tx" }
val revokedCommitPublished = claimRevokedRemoteCommitTxOutputs(channelKeys(), commitments.params, remotePerCommitmentSecret, tx, currentOnChainFeerates)
val revokedCommitPublished = claimRevokedRemoteCommitTxOutputs(channelKeys(), commitments.params, remotePerCommitmentSecret, tx, currentOnChainFeerates())
val ex = FundingTxSpent(channelId, tx.txid)
val error = Error(channelId, ex.message)
val nextState = when (this@ChannelStateWithCommitments) {
Expand Down Expand Up @@ -505,7 +513,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
when (this@ChannelStateWithCommitments) {
is WaitForRemotePublishFutureCommitment -> {
logger.warning { "they published their future commit (because we asked them to) in txid=${tx.txid}" }
val remoteCommitPublished = claimRemoteCommitMainOutput(channelKeys(), commitments.params, tx, currentOnChainFeerates.claimMainFeerate)
val remoteCommitPublished = claimRemoteCommitMainOutput(channelKeys(), commitments.params, tx, currentOnChainFeerates().claimMainFeerate)
val nextState = Closing(
commitments = commitments,
waitingSinceBlock = currentBlockHeight.toLong(),
Expand Down Expand Up @@ -535,7 +543,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
}
else -> {
logger.warning { "they published an alternative commitment with feerate=${remoteCommit.spec.feerate} txid=${tx.txid}" }
val remoteCommitPublished = claimRemoteCommitMainOutput(channelKeys(), commitments.params, tx, currentOnChainFeerates.claimMainFeerate)
val remoteCommitPublished = claimRemoteCommitMainOutput(channelKeys(), commitments.params, tx, currentOnChainFeerates().claimMainFeerate)
val nextState = when (this@ChannelStateWithCommitments) {
is Closing -> [email protected](remoteCommitPublished = remoteCommitPublished)
is Negotiating -> Closing(commitments, waitingSinceBlock = currentBlockHeight.toLong(), mutualCloseProposed = closingTxProposed.flatten().map { it.unsignedTx }, remoteCommitPublished = remoteCommitPublished)
Expand All @@ -552,7 +560,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
}
}

internal fun ChannelContext.spendLocalCurrent(): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
internal suspend fun ChannelContext.spendLocalCurrent(): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
val outdatedCommitment = when (this@ChannelStateWithCommitments) {
is WaitForRemotePublishFutureCommitment -> true
is Closing -> [email protected] != null
Expand All @@ -568,7 +576,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
channelKeys(),
commitments.latest,
commitTx,
currentOnChainFeerates
currentOnChainFeerates()
)
val nextState = when (this@ChannelStateWithCommitments) {
is Closing -> copy(localCommitPublished = localCommitPublished)
Expand Down Expand Up @@ -596,7 +604,7 @@ sealed class ChannelStateWithCommitments : PersistedChannelState() {
* Check HTLC timeout in our commitment and our remote's.
* If HTLCs are at risk, we will publish our local commitment and close the channel.
*/
internal fun ChannelContext.checkHtlcTimeout(): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
internal suspend fun ChannelContext.checkHtlcTimeout(): Pair<ChannelStateWithCommitments, List<ChannelAction>> {
logger.info { "checking htlcs timeout at blockHeight=${currentBlockHeight}" }
val timedOutOutgoing = commitments.timedOutOutgoingHtlcs(currentBlockHeight.toLong())
val almostTimedOutIncoming = commitments.almostTimedOutIncomingHtlcs(currentBlockHeight.toLong(), staticParams.nodeParams.fulfillSafetyBeforeTimeoutBlocks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ data class Closing(

val revokedCommitPublishActions = mutableListOf<ChannelAction>()
val revokedCommitPublished1 = revokedCommitPublished.map { rev ->
val (newRevokedCommitPublished, penaltyTxs) = claimRevokedHtlcTxOutputs(channelKeys(), commitments.params, rev, watch.tx, currentOnChainFeerates)
val (newRevokedCommitPublished, penaltyTxs) = claimRevokedHtlcTxOutputs(channelKeys(), commitments.params, rev, watch.tx, currentOnChainFeerates())
penaltyTxs.forEach {
revokedCommitPublishActions += ChannelAction.Blockchain.PublishTx(it)
revokedCommitPublishActions += ChannelAction.Blockchain.SendWatch(WatchSpent(channelId, watch.tx, it.input.outPoint.index.toInt(), BITCOIN_OUTPUT_SPENT))
Expand Down Expand Up @@ -300,7 +300,7 @@ data class Closing(
is ChannelCommand.Closing.GetHtlcInfosResponse -> {
val index = revokedCommitPublished.indexOfFirst { it.commitTx.txid == cmd.revokedCommitTxId }
if (index >= 0) {
val revokedCommitPublished1 = claimRevokedRemoteCommitTxHtlcOutputs(channelKeys(), commitments.params, revokedCommitPublished[index], currentOnChainFeerates, cmd.htlcInfos)
val revokedCommitPublished1 = claimRevokedRemoteCommitTxHtlcOutputs(channelKeys(), commitments.params, revokedCommitPublished[index], currentOnChainFeerates(), cmd.htlcInfos)
val nextState = copy(revokedCommitPublished = revokedCommitPublished.updated(index, revokedCommitPublished1))
val actions = buildList {
add(ChannelAction.Storage.StoreState(nextState))
Expand Down Expand Up @@ -339,14 +339,14 @@ data class Closing(
logger.info { "got valid payment preimage, recalculating transactions to redeem the corresponding htlc on-chain" }
val commitments1 = result.value.first
val localCommitPublished1 = localCommitPublished?.let {
claimCurrentLocalCommitTxOutputs(channelKeys(), commitments1.latest, it.commitTx, currentOnChainFeerates)
claimCurrentLocalCommitTxOutputs(channelKeys(), commitments1.latest, it.commitTx, currentOnChainFeerates())
}
val remoteCommitPublished1 = remoteCommitPublished?.let {
claimRemoteCommitTxOutputs(channelKeys(), commitments1.latest, commitments1.latest.remoteCommit, it.commitTx, currentOnChainFeerates)
claimRemoteCommitTxOutputs(channelKeys(), commitments1.latest, commitments1.latest.remoteCommit, it.commitTx, currentOnChainFeerates())
}
val nextRemoteCommitPublished1 = nextRemoteCommitPublished?.let {
val remoteCommit = commitments1.latest.nextRemoteCommit?.commit ?: error("next remote commit must be defined")
claimRemoteCommitTxOutputs(channelKeys(), commitments1.latest, remoteCommit, it.commitTx, currentOnChainFeerates)
claimRemoteCommitTxOutputs(channelKeys(), commitments1.latest, remoteCommit, it.commitTx, currentOnChainFeerates())
}
val republishList = buildList {
val minDepth = staticParams.nodeParams.minDepthBlocks.toLong()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ data class Negotiating(
}
else -> {
val theirFeeRange = cmd.message.tlvStream.get<ClosingSignedTlv.FeeRange>()
val ourFeeRange = closingFeerates ?: ClosingFeerates(currentOnChainFeerates.mutualCloseFeerate)
val ourFeeRange = closingFeerates ?: ClosingFeerates(currentOnChainFeerates().mutualCloseFeerate)
when {
theirFeeRange != null && !commitments.params.localParams.isInitiator -> {
// if we are not the initiator and they proposed a fee range, we pick a value in that range and they should accept it without further negotiation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ data class Normal(
commitments1.latest,
localShutdown.scriptPubKey.toByteArray(),
remoteShutdown.scriptPubKey.toByteArray(),
closingFeerates ?: ClosingFeerates(currentOnChainFeerates.mutualCloseFeerate),
closingFeerates ?: ClosingFeerates(currentOnChainFeerates().mutualCloseFeerate),
)
listOf(listOf(ClosingTxProposed(closingTx, closingSigned)))
} else {
Expand Down Expand Up @@ -319,7 +319,7 @@ data class Normal(
commitments1.latest,
localShutdown.scriptPubKey.toByteArray(),
cmd.message.scriptPubKey.toByteArray(),
closingFeerates ?: ClosingFeerates(currentOnChainFeerates.mutualCloseFeerate),
closingFeerates ?: ClosingFeerates(currentOnChainFeerates().mutualCloseFeerate),
)
val nextState = Negotiating(
commitments1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ data class ShuttingDown(
commitments1.latest,
localShutdown.scriptPubKey.toByteArray(),
remoteShutdown.scriptPubKey.toByteArray(),
closingFeerates ?: ClosingFeerates(currentOnChainFeerates.mutualCloseFeerate)
closingFeerates ?: ClosingFeerates(currentOnChainFeerates().mutualCloseFeerate)
)
val nextState = Negotiating(
commitments1,
Expand Down Expand Up @@ -99,7 +99,7 @@ data class ShuttingDown(
commitments1.latest,
localShutdown.scriptPubKey.toByteArray(),
remoteShutdown.scriptPubKey.toByteArray(),
closingFeerates ?: ClosingFeerates(currentOnChainFeerates.mutualCloseFeerate)
closingFeerates ?: ClosingFeerates(currentOnChainFeerates().mutualCloseFeerate)
)
val nextState = Negotiating(
commitments1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ data class Syncing(val state: PersistedChannelState, val channelReestablishSent:
state.commitments.latest,
state.localShutdown.scriptPubKey.toByteArray(),
state.remoteShutdown.scriptPubKey.toByteArray(),
state.closingFeerates ?: ClosingFeerates(currentOnChainFeerates.mutualCloseFeerate)
state.closingFeerates ?: ClosingFeerates(currentOnChainFeerates().mutualCloseFeerate)
)
val closingTxProposed1 = state.closingTxProposed + listOf(listOf(ClosingTxProposed(closingTx, closingSigned)))
val nextState = state.copy(closingTxProposed = closingTxProposed1)
Expand Down
2 changes: 1 addition & 1 deletion src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ class Peer(
val ctx = ChannelContext(
StaticParams(nodeParams, remoteNodeId),
currentTipFlow.filterNotNull().first().first,
onChainFeeratesFlow.filterNotNull().first(),
onChainFeeratesFlow,
logger = MDCLogger(
logger = _channelLogger,
staticMdc = mapOf("remoteNodeId" to remoteNodeId) + state.mdc()
Expand Down
Loading
Loading