From df331409af2e0466f45f4360b5d705df5385f99f Mon Sep 17 00:00:00 2001 From: pm47 Date: Tue, 19 Mar 2024 18:23:30 +0100 Subject: [PATCH 1/9] introduce IWatcher/IClient interfaces --- .../fr/acinq/lightning/blockchain/IClient.kt | 46 +++++++++ .../fr/acinq/lightning/blockchain/IWatcher.kt | 13 +++ .../electrum/ElectrumClientExtensions.kt | 38 -------- .../blockchain/electrum/ElectrumWatcher.kt | 12 ++- .../blockchain/electrum/IElectrumClient.kt | 7 +- .../kotlin/fr/acinq/lightning/io/Peer.kt | 96 +++++++++++-------- .../acinq/lightning/tests/io/peer/builders.kt | 2 +- 7 files changed, 127 insertions(+), 87 deletions(-) create mode 100644 src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt create mode 100644 src/commonMain/kotlin/fr/acinq/lightning/blockchain/IWatcher.kt diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt new file mode 100644 index 000000000..c33424539 --- /dev/null +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt @@ -0,0 +1,46 @@ +package fr.acinq.lightning.blockchain + +import fr.acinq.bitcoin.Satoshi +import fr.acinq.bitcoin.TxId +import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.channel.Commitments +import fr.acinq.lightning.channel.LocalFundingStatus +import fr.acinq.lightning.logging.MDCLogger +import fr.acinq.lightning.transactions.Transactions +import fr.acinq.lightning.utils.sat + +interface IClient { + suspend fun getConfirmations(txId: TxId): Int? + + /** Estimate the feerate required for a transaction to be confirmed in the next [confirmations] blocks. */ + suspend fun estimateFees(confirmations: Int): FeeratePerKw? +} + +/** + * @weight must be the total estimated weight of the splice tx, otherwise the feerate estimation will be wrong + */ +suspend fun IClient.computeSpliceCpfpFeerate(commitments: Commitments, targetFeerate: FeeratePerKw, spliceWeight: Int, logger: MDCLogger): Pair { + val (parentsWeight, parentsFees) = commitments.all + .takeWhile { getConfirmations(it.fundingTxId).let { confirmations -> confirmations == null || confirmations == 0 } } // we check for null in case the tx has been evicted + .fold(Pair(0, 0.sat)) { (parentsWeight, parentsFees), commitment -> + val weight = when (commitment.localFundingStatus) { + // weight will be underestimated if the transaction is not fully signed + is LocalFundingStatus.UnconfirmedFundingTx -> commitment.localFundingStatus.signedTx?.weight() ?: commitment.localFundingStatus.sharedTx.tx.buildUnsignedTx().weight() + is LocalFundingStatus.ConfirmedFundingTx -> commitment.localFundingStatus.signedTx.weight() + } + Pair(parentsWeight + weight, parentsFees + commitment.localFundingStatus.fee) + } + val totalWeight = parentsWeight + spliceWeight + val totalFees = Transactions.weight2fee(targetFeerate, totalWeight) + val projectedFee = totalFees - parentsFees + val projectedFeerate = Transactions.fee2rate(projectedFee, spliceWeight) + // if ancestors have a higher feerate than target, min feerate could be negative + val actualFeerate = maxOf(projectedFeerate, targetFeerate) + val actualFee = Transactions.weight2fee(actualFeerate, spliceWeight) + logger.info { "targetFeerate=$targetFeerate spliceWeight=$spliceWeight" } + logger.info { "parentsWeight=$parentsWeight parentsFees=$parentsFees" } + logger.info { "totalWeight=$totalWeight totalFees=$totalFees" } + logger.info { "projectedFeerate=$projectedFeerate projectedFee=$projectedFee" } + logger.info { "actualFeerate=$actualFeerate actualFee=$actualFee" } + return Pair(actualFeerate, actualFee) +} \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IWatcher.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IWatcher.kt new file mode 100644 index 000000000..9ebaf4d3f --- /dev/null +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IWatcher.kt @@ -0,0 +1,13 @@ +package fr.acinq.lightning.blockchain + +import fr.acinq.bitcoin.Transaction +import fr.acinq.lightning.blockchain.electrum.IElectrumClient +import kotlinx.coroutines.flow.Flow + +interface IWatcher { + fun openWatchNotificationsFlow(): Flow + + suspend fun watch(watch: Watch) + + suspend fun publish(tx: Transaction) +} \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt index 1e51088a8..377acaad6 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt @@ -1,16 +1,6 @@ package fr.acinq.lightning.blockchain.electrum -import fr.acinq.bitcoin.Satoshi import fr.acinq.bitcoin.Transaction -import fr.acinq.bitcoin.TxId -import fr.acinq.lightning.blockchain.fee.FeeratePerKw -import fr.acinq.lightning.channel.Commitments -import fr.acinq.lightning.channel.LocalFundingStatus -import fr.acinq.lightning.logging.* -import fr.acinq.lightning.transactions.Transactions -import fr.acinq.lightning.utils.sat - -suspend fun IElectrumClient.getConfirmations(txId: TxId): Int? = getTx(txId)?.let { tx -> getConfirmations(tx) } /** * @return the number of confirmations, zero if the transaction is in the mempool, null if the transaction is not found @@ -28,31 +18,3 @@ suspend fun IElectrumClient.getConfirmations(tx: Transaction): Int? { } } -/** - * @weight must be the total estimated weight of the splice tx, otherwise the feerate estimation will be wrong - */ -suspend fun IElectrumClient.computeSpliceCpfpFeerate(commitments: Commitments, targetFeerate: FeeratePerKw, spliceWeight: Int, logger: MDCLogger): Pair { - val (parentsWeight, parentsFees) = commitments.all - .takeWhile { getConfirmations(it.fundingTxId).let { confirmations -> confirmations == null || confirmations == 0 } } // we check for null in case the tx has been evicted - .fold(Pair(0, 0.sat)) { (parentsWeight, parentsFees), commitment -> - val weight = when (commitment.localFundingStatus) { - // weight will be underestimated if the transaction is not fully signed - is LocalFundingStatus.UnconfirmedFundingTx -> commitment.localFundingStatus.signedTx?.weight() ?: commitment.localFundingStatus.sharedTx.tx.buildUnsignedTx().weight() - is LocalFundingStatus.ConfirmedFundingTx -> commitment.localFundingStatus.signedTx.weight() - } - Pair(parentsWeight + weight, parentsFees + commitment.localFundingStatus.fee) - } - val totalWeight = parentsWeight + spliceWeight - val totalFees = Transactions.weight2fee(targetFeerate, totalWeight) - val projectedFee = totalFees - parentsFees - val projectedFeerate = Transactions.fee2rate(projectedFee, spliceWeight) - // if ancestors have a higher feerate than target, min feerate could be negative - val actualFeerate = maxOf(projectedFeerate, targetFeerate) - val actualFee = Transactions.weight2fee(actualFeerate, spliceWeight) - logger.info { "targetFeerate=$targetFeerate spliceWeight=$spliceWeight" } - logger.info { "parentsWeight=$parentsWeight parentsFees=$parentsFees" } - logger.info { "totalWeight=$totalWeight totalFees=$totalFees" } - logger.info { "projectedFeerate=$projectedFeerate projectedFee=$projectedFee" } - logger.info { "actualFeerate=$actualFeerate actualFee=$actualFee" } - return Pair(actualFeerate, actualFee) -} diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt index 4f0425f2f..b3f1f4c1a 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt @@ -3,7 +3,9 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.ByteVector32 import fr.acinq.bitcoin.Transaction import fr.acinq.lightning.blockchain.* -import fr.acinq.lightning.logging.* +import fr.acinq.lightning.logging.LoggerFactory +import fr.acinq.lightning.logging.debug +import fr.acinq.lightning.logging.info import fr.acinq.lightning.transactions.Scripts import fr.acinq.lightning.utils.currentTimestampMillis import kotlinx.coroutines.* @@ -15,24 +17,24 @@ import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.consumeAsFlow import kotlin.math.max -class ElectrumWatcher(val client: IElectrumClient, val scope: CoroutineScope, loggerFactory: LoggerFactory) : CoroutineScope by scope { +class ElectrumWatcher(val client: IElectrumClient, val scope: CoroutineScope, loggerFactory: LoggerFactory) : IWatcher, CoroutineScope by scope { private val logger = loggerFactory.newLogger(this::class) private val mailbox = Channel(Channel.BUFFERED) private val _notificationsFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND) - fun openWatchNotificationsFlow(): Flow = _notificationsFlow.asSharedFlow() + override fun openWatchNotificationsFlow(): Flow = _notificationsFlow.asSharedFlow() // this is used by a Swift watch-tower module in the Phoenix iOS app to tell when the watcher is up-to-date // the value that is emitted in the time elapsed (in milliseconds) since the watcher is ready and idle private val _uptodateFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND) fun openUpToDateFlow(): Flow = _uptodateFlow.asSharedFlow() - suspend fun watch(watch: Watch) { + override suspend fun watch(watch: Watch) { mailbox.send(WatcherCommand.AddWatch(watch)) } - suspend fun publish(tx: Transaction) { + override suspend fun publish(tx: Transaction) { mailbox.send(WatcherCommand.Publish(tx)) } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt index 64cb4ac5a..72d022068 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt @@ -4,12 +4,13 @@ import fr.acinq.bitcoin.BlockHeader import fr.acinq.bitcoin.ByteVector32 import fr.acinq.bitcoin.Transaction import fr.acinq.bitcoin.TxId +import fr.acinq.lightning.blockchain.IClient import fr.acinq.lightning.blockchain.fee.FeeratePerKw import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.StateFlow /** Note to implementers: methods exposed through this interface must *not* throw exceptions. */ -interface IElectrumClient { +interface IElectrumClient : IClient { val notifications: Flow val connectionStatus: StateFlow @@ -38,7 +39,7 @@ interface IElectrumClient { suspend fun broadcastTransaction(tx: Transaction): TxId /** Estimate the feerate required for a transaction to be confirmed in the next [confirmations] blocks. */ - suspend fun estimateFees(confirmations: Int): FeeratePerKw? + override suspend fun estimateFees(confirmations: Int): FeeratePerKw? /******************** Subscriptions ********************/ @@ -47,4 +48,6 @@ interface IElectrumClient { /** Subscribe to headers for new blocks found. */ suspend fun startHeaderSubscription(): HeaderSubscriptionResponse + + override suspend fun getConfirmations(txId: TxId): Int? = getTx(txId)?.let { tx -> getConfirmations(tx) } } \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index 5841196ef..dc54788ee 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -3,7 +3,11 @@ package fr.acinq.lightning.io import fr.acinq.bitcoin.* import fr.acinq.bitcoin.utils.Either import fr.acinq.lightning.* +import fr.acinq.lightning.blockchain.IClient +import fr.acinq.lightning.blockchain.IWatcher +import fr.acinq.lightning.blockchain.IClient.* import fr.acinq.lightning.blockchain.WatchEvent +import fr.acinq.lightning.blockchain.computeSpliceCpfpFeerate import fr.acinq.lightning.blockchain.electrum.* import fr.acinq.lightning.blockchain.fee.FeeratePerByte import fr.acinq.lightning.blockchain.fee.FeeratePerKw @@ -129,7 +133,8 @@ data class PhoenixAndroidLegacyInfoEvent(val info: PhoenixAndroidLegacyInfo) : P class Peer( val nodeParams: NodeParams, val walletParams: WalletParams, - val watcher: ElectrumWatcher, + val client: IClient, + val watcher: IWatcher, val db: Databases, socketBuilder: TcpSocket.Builder?, scope: CoroutineScope, @@ -215,8 +220,8 @@ class Peer( } } - val finalWallet = FinalWallet(nodeParams.chain, nodeParams.keyManager.finalOnChainWallet, watcher.client, scope, nodeParams.loggerFactory) - val swapInWallet = SwapInWallet(nodeParams.chain, nodeParams.keyManager.swapInOnChainWallet, watcher.client, scope, nodeParams.loggerFactory) + val finalWallet = (client as? IElectrumClient)?.let { electrumClient -> FinalWallet(nodeParams.chain, nodeParams.keyManager.finalOnChainWallet, electrumClient, scope, nodeParams.loggerFactory) } + val swapInWallet = (client as? IElectrumClient)?.let { electrumClient -> SwapInWallet(nodeParams.chain, nodeParams.keyManager.swapInOnChainWallet, electrumClient, scope, nodeParams.loggerFactory) } private var swapInJob: Job? = null @@ -225,17 +230,22 @@ class Peer( init { logger.info { "initializing peer" } launch { - watcher.client.notifications.filterIsInstance() - .collect { msg -> - currentTipFlow.value = msg.blockHeight to msg.header - } + when(client) { + is IElectrumClient -> client.notifications.filterIsInstance() + .collect { msg -> + currentTipFlow.value = msg.blockHeight to msg.header + } + } + } launch { - watcher.client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.collect { - // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED - // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. - // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) - updateEstimateFees() + when(client) { + is IElectrumClient -> client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.collect { + // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED + // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. + // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) + updateEstimateFees() + } } } launch { @@ -282,12 +292,11 @@ class Peer( } private suspend fun updateEstimateFees() { - watcher.client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.first() val sortedFees = listOf( - watcher.client.estimateFees(2), - watcher.client.estimateFees(6), - watcher.client.estimateFees(18), - watcher.client.estimateFees(144), + client.estimateFees(2), + client.estimateFees(6), + client.estimateFees(18), + client.estimateFees(144), ) logger.info { "on-chain fees: $sortedFees" } // TODO: If some feerates are null, we may implement a retry @@ -463,27 +472,32 @@ class Peer( * Warning: not thread-safe! */ suspend fun startWatchSwapInWallet() { - logger.info { "starting swap-in watch job" } - if (swapInJob != null) { - logger.info { "swap-in watch job already started" } - return - } - logger.info { "waiting for peer to be ready" } - waitForPeerReady() - swapInJob = launch { - swapInWallet.wallet.walletStateFlow - .combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> Pair(walletState, currentTip.first) } - .combine(swapInFeeratesFlow.filterNotNull()) { (walletState, currentTip), feerate -> Triple(walletState, currentTip, feerate) } - .combine(nodeParams.liquidityPolicy) { (walletState, currentTip, feerate), policy -> TrySwapInFlow(currentTip, walletState, feerate, policy) } - .collect { w -> - // Local mutual close txs from pre-splice channels can be used as zero-conf inputs for swap-in to facilitate migration - val mutualCloseTxs = channels.values - .filterIsInstance() - .filterNot { it.commitments.params.channelFeatures.hasFeature(Feature.DualFunding) } - .flatMap { state -> state.mutualClosePublished.map { closingTx -> closingTx.tx.txid } } - val trustedTxs = trustedSwapInTxs + mutualCloseTxs - swapInCommands.send(SwapInCommand.TrySwapIn(w.currentBlockHeight, w.walletState, walletParams.swapInParams, trustedTxs)) + when { + swapInWallet == null -> logger.warning { "swap-in wallet unavailable" } + else -> { + logger.info { "starting swap-in watch job" } + if (swapInJob != null) { + logger.info { "swap-in watch job already started" } + return } + logger.info { "waiting for peer to be ready" } + waitForPeerReady() + swapInJob = launch { + swapInWallet.wallet.walletStateFlow + .combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> Pair(walletState, currentTip.first) } + .combine(swapInFeeratesFlow.filterNotNull()) { (walletState, currentTip), feerate -> Triple(walletState, currentTip, feerate) } + .combine(nodeParams.liquidityPolicy) { (walletState, currentTip, feerate), policy -> TrySwapInFlow(currentTip, walletState, feerate, policy) } + .collect { w -> + // Local mutual close txs from pre-splice channels can be used as zero-conf inputs for swap-in to facilitate migration + val mutualCloseTxs = channels.values + .filterIsInstance() + .filterNot { it.commitments.params.channelFeatures.hasFeature(Feature.DualFunding) } + .flatMap { state -> state.mutualClosePublished.map { closingTx -> closingTx.tx.txid } } + val trustedTxs = trustedSwapInTxs + mutualCloseTxs + swapInCommands.send(SwapInCommand.TrySwapIn(w.currentBlockHeight, w.walletState, walletParams.swapInParams, trustedTxs)) + } + } + } } } @@ -530,7 +544,7 @@ class Peer( .firstOrNull { it.commitments.availableBalanceForSend() > amount } ?.let { channel -> val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = emptyList(), localOutputs = listOf(TxOut(amount, scriptPubKey))) - val (actualFeerate, miningFee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) + val (actualFeerate, miningFee) = client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) Pair(actualFeerate, ChannelCommand.Commitment.Splice.Fees(miningFee, 0.msat)) } } @@ -549,7 +563,7 @@ class Peer( .find { it.channelId == channelId } ?.let { channel -> val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = emptyList(), localOutputs = emptyList()) - val (actualFeerate, miningFee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) + val (actualFeerate, miningFee) = client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) Pair(actualFeerate, ChannelCommand.Commitment.Splice.Fees(miningFee, 0.msat)) } } @@ -565,7 +579,7 @@ class Peer( ?.let { channel -> val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = emptyList(), localOutputs = emptyList()) + leaseRate.fundingWeight // The mining fee below pays for the entirety of the splice transaction, including inputs and outputs from the liquidity provider. - val (actualFeerate, miningFee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) + val (actualFeerate, miningFee) = client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) // The mining fee in the lease only covers the remote node's inputs and outputs, they are already included in the mining fee above. val leaseFees = leaseRate.fees(actualFeerate, amount, amount) Pair(actualFeerate, ChannelCommand.Commitment.Splice.Fees(miningFee, leaseFees.serviceFee.toMilliSatoshi())) @@ -1161,7 +1175,7 @@ class Peer( // we have a channel and we are connected (otherwise state would be Offline/Syncing) val targetFeerate = swapInFeeratesFlow.filterNotNull().first() val weight = FundingContributions.computeWeightPaid(isInitiator = true, commitment = channel.commitments.active.first(), walletInputs = cmd.walletInputs, localOutputs = emptyList()) - val (feerate, fee) = watcher.client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) + val (feerate, fee) = client.computeSpliceCpfpFeerate(channel.commitments, targetFeerate, spliceWeight = weight, logger) logger.info { "requesting splice-in using balance=${cmd.walletInputs.balance} feerate=$feerate fee=$fee" } nodeParams.liquidityPolicy.value.maybeReject(cmd.walletInputs.balance.toMilliSatoshi(), fee.toMilliSatoshi(), LiquidityEvents.Source.OnChainWallet, logger)?.let { rejected -> diff --git a/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt b/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt index c82c3bde3..7a9e81219 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt @@ -186,7 +186,7 @@ suspend fun buildPeer( ): Peer { val electrum = ElectrumClient(scope, nodeParams.loggerFactory) val watcher = ElectrumWatcher(electrum, scope, nodeParams.loggerFactory) - val peer = Peer(nodeParams, walletParams, watcher, databases, TcpSocket.Builder(), scope) + val peer = Peer(nodeParams, walletParams, watcher.client, watcher, databases, TcpSocket.Builder(), scope) peer.currentTipFlow.value = currentTip peer.onChainFeeratesFlow.value = OnChainFeerates( fundingFeerate = FeeratePerKw(FeeratePerByte(5.sat)), From 067d6e4be9204f9031d2a0e6dcdeea7efc6fb5ad Mon Sep 17 00:00:00 2001 From: pm47 Date: Tue, 19 Mar 2024 20:41:44 +0100 Subject: [PATCH 2/9] add MempoolSpace Watcher/Client --- build.gradle.kts | 19 ++- .../blockchain/mempool/MempoolSpaceClient.kt | 126 ++++++++++++++++++ .../blockchain/mempool/MempoolSpaceWatcher.kt | 99 ++++++++++++++ .../kotlin/fr/acinq/lightning/io/Peer.kt | 39 ++++-- .../mempool/MempoolSpaceWatcherTest.kt | 68 ++++++++++ 5 files changed, 330 insertions(+), 21 deletions(-) create mode 100644 src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt create mode 100644 src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcher.kt create mode 100644 src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt diff --git a/build.gradle.kts b/build.gradle.kts index d708728f8..43b29c96e 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -89,16 +89,16 @@ kotlin { api("co.touchlab:kermit:$kermitLoggerVersion") api(ktor("network")) api(ktor("network-tls")) - } - } - - commonTest { - dependencies { implementation(ktor("client-core")) implementation(ktor("client-auth")) implementation(ktor("client-json")) implementation(ktor("client-content-negotiation")) implementation(ktor("serialization-kotlinx-json")) + } + } + + commonTest { + dependencies { implementation(kotlin("test-common")) implementation(kotlin("test-annotations-common")) implementation("org.kodein.memory:klio-files:0.12.0") @@ -123,14 +123,19 @@ kotlin { } if (currentOs.isMacOsX) { - iosTest { + iosMain { dependencies { implementation(ktor("client-ios")) } } + macosMain { + dependencies { + implementation(ktor("client-darwin")) + } + } } - linuxTest { + linuxMain { dependencies { implementation(ktor("client-curl")) } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt new file mode 100644 index 000000000..721fd28ab --- /dev/null +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt @@ -0,0 +1,126 @@ +package fr.acinq.lightning.blockchain.mempool + +import fr.acinq.bitcoin.Transaction +import fr.acinq.bitcoin.TxId +import fr.acinq.lightning.blockchain.IClient +import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.logging.LoggerFactory +import fr.acinq.lightning.logging.debug +import fr.acinq.lightning.logging.warning +import io.ktor.client.* +import io.ktor.client.call.* +import io.ktor.client.plugins.* +import io.ktor.client.plugins.contentnegotiation.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import io.ktor.serialization.kotlinx.json.* +import kotlinx.serialization.ExperimentalSerializationApi +import kotlinx.serialization.Serializable +import kotlinx.serialization.json.Json + +class MempoolSpaceClient(val mempoolUrl: Url, loggerFactory: LoggerFactory) : IClient { + + private val logger = loggerFactory.newLogger(this::class) + + @OptIn(ExperimentalSerializationApi::class) + val client = HttpClient { + install(ContentNegotiation) { + json(json = Json { + prettyPrint = true + isLenient = true + explicitNulls = false // convert absent fields to null + ignoreUnknownKeys = true + }) + } + install(DefaultRequest) { + url { + takeFrom(mempoolUrl) + } + } + } + + suspend fun publish(tx: Transaction) { + val res = client.post("api/tx") { + contentType(ContentType.Text.Plain) + setBody(tx.toString()) + } + if (!res.status.isSuccess()) { + logger.warning { "tx publish failed for txid=${tx.txid}: ${res.bodyAsText()}" } + } + } + + suspend fun getTransaction(txId: TxId): Transaction? { + return kotlin.runCatching { + val res = client.get("api/tx/$txId/hex") + if (res.status.isSuccess()) { + Transaction.read(res.bodyAsText()) + } else null + }.onFailure { logger.warning(it) { "error in getTransaction " } } + .getOrNull() + } + + suspend fun getTransactionMerkleProof(txId: TxId): MempoolSpaceTransactionMerkleProofResponse? { + return kotlin.runCatching { + val res = client.get("api/tx/$txId/merkle-proof") + if (res.status.isSuccess()) { + val txStatus: MempoolSpaceTransactionMerkleProofResponse = res.body() + txStatus + } else null + }.onFailure { logger.warning(it) { "error in getTransactionMerkleProof " } } + .getOrNull() + } + + suspend fun getOutspend(txId: TxId, outputIndex: Int): Transaction? { + return kotlin.runCatching { + logger.debug { "checking output $txId:$outputIndex" } + val res: MempoolSpaceOutspendResponse = client.get("api/tx/$txId/outspend/$outputIndex").body() + res.txid?.let { getTransaction(TxId(it)) } + }.onFailure { logger.warning(it) { "error in getOutspend " } } + .getOrNull() + } + + suspend fun getBlockTipHeight(): Int? { + return kotlin.runCatching { + val res = client.get("api/blocks/tip/height") + if (res.status.isSuccess()) { + res.bodyAsText().toInt() + } else null + }.onFailure { logger.warning(it) { "error in getBlockTipHeight " } } + .getOrNull() + } + + override suspend fun getConfirmations(txId: TxId): Int? { + return kotlin.runCatching { + val confirmedAtBlockHeight = getTransactionMerkleProof(txId)?.block_height + val currentBlockHeight = getBlockTipHeight() + when { + confirmedAtBlockHeight != null && currentBlockHeight != null -> currentBlockHeight - confirmedAtBlockHeight + 1 + else -> null + } + }.onFailure { logger.warning(it) { "error in getConfirmations" } } + .getOrNull() + } + + override suspend fun estimateFees(confirmations: Int): FeeratePerKw? { + TODO("Not yet implemented") + } + + companion object { + val OfficialMempoolMainnet = Url("https://mempool.space") + val OfficialMempoolTestnet = Url("https://mempool.space/testnet/") + } +} + +@Serializable +data class MempoolSpaceOutspendResponse( + val spent: Boolean, + val txid: String?, + val vin: Int?, +) + +@Serializable +data class MempoolSpaceTransactionMerkleProofResponse( + val block_height: Int, + val pos: Int +) \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcher.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcher.kt new file mode 100644 index 000000000..7652361b2 --- /dev/null +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcher.kt @@ -0,0 +1,99 @@ +package fr.acinq.lightning.blockchain.mempool + +import fr.acinq.bitcoin.Transaction +import fr.acinq.lightning.blockchain.* +import fr.acinq.lightning.logging.LoggerFactory +import fr.acinq.lightning.logging.debug +import fr.acinq.lightning.logging.info +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.consumeAsFlow +import kotlinx.coroutines.launch +import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes + +class MempoolSpaceWatcher(val client: MempoolSpaceClient, val scope: CoroutineScope, loggerFactory: LoggerFactory, val pollingInterval: Duration = 10.minutes) : IWatcher { + + private val logger = loggerFactory.newLogger(this::class) + private val mailbox = Channel(Channel.BUFFERED) + + private val _notificationsFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND) + override fun openWatchNotificationsFlow(): Flow = _notificationsFlow.asSharedFlow() + + + init { + scope.launch { + mailbox.consumeAsFlow().collect { watch -> + when (watch) { + is WatchSpent -> scope.launch { + logger.info { "add watch-spent on ${watch.txId}:${watch.outputIndex}" } + val spendingTxs = mutableSetOf() + while (true) { + when (val spendingTx = client.getOutspend(watch.txId, watch.outputIndex)) { + null -> delay(pollingInterval) + else -> { + // There may be multiple txs spending the same outpoint, due to RBFs, etc. We notify + // each of them once. + if (!spendingTxs.contains(spendingTx)) { + logger.info { "${watch.txId}:${watch.outputIndex} was spent by txId=${spendingTx.txid}" } + _notificationsFlow.emit(WatchEventSpent(watch.channelId, watch.event, spendingTx)) + spendingTxs.add(spendingTx) + } + // We keep watching for spending transactions until one of them confirms + if ((client.getConfirmations(spendingTx.txid) ?: 0) > 3) { + logger.info { "transaction txId=${spendingTx.txid} spending ${watch.txId}:${watch.outputIndex} has confirmed" } + break + } + } + } + } + logger.debug { "terminating watch-spent on ${watch.txId}:${watch.outputIndex}" } + } + is WatchConfirmed -> scope.launch { + logger.info { "add watch-confirmed on ${watch.txId}" } + while (true) { + val merkleProof = client.getTransactionMerkleProof(watch.txId) + val currentBlockHeight = client.getBlockTipHeight() + when { + merkleProof == null || currentBlockHeight == null -> {} + else -> { + val confirmations = (currentBlockHeight - merkleProof.block_height + 1) + logger.info { "${watch.txId} has $confirmations/${watch.minDepth} confirmations" } + when { + confirmations < watch.minDepth -> {} + else -> { + val tx = client.getTransaction(watch.txId) + when { + tx == null -> {} + else -> { + logger.info { "${watch.txId} has reached min depth" } + _notificationsFlow.emit(WatchEventConfirmed(watch.channelId, watch.event, merkleProof.block_height, merkleProof.pos, tx)) + break + } + } + } + } + } + } + delay(pollingInterval) + } + logger.debug { "terminating watch-confirmed on ${watch.txId}" } + } + } + } + } + } + + override suspend fun watch(watch: Watch) { + mailbox.send(watch) + } + + override suspend fun publish(tx: Transaction) { + client.publish(tx) + } +} \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index dc54788ee..117a5190f 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -12,6 +12,7 @@ import fr.acinq.lightning.blockchain.electrum.* import fr.acinq.lightning.blockchain.fee.FeeratePerByte import fr.acinq.lightning.blockchain.fee.FeeratePerKw import fr.acinq.lightning.blockchain.fee.OnChainFeerates +import fr.acinq.lightning.blockchain.mempool.MempoolSpaceClient import fr.acinq.lightning.channel.* import fr.acinq.lightning.channel.states.* import fr.acinq.lightning.crypto.noise.* @@ -34,6 +35,7 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED import kotlinx.coroutines.channels.onFailure import kotlinx.coroutines.flow.* import kotlin.time.Duration +import kotlin.time.Duration.Companion.minutes import kotlin.time.Duration.Companion.seconds sealed class PeerCommand @@ -96,6 +98,7 @@ data class PurgeExpiredPayments(val fromCreatedAt: Long, val toCreatedAt: Long) data class SendOnionMessage(val message: OnionMessage) : PeerCommand() sealed class PeerEvent + @Deprecated("Replaced by NodeEvents", replaceWith = ReplaceWith("PaymentEvents.PaymentReceived", "fr.acinq.lightning.PaymentEvents")) data class PaymentReceived(val incomingPayment: IncomingPayment, val received: IncomingPayment.Received) : PeerEvent() data class PaymentProgress(val request: SendPayment, val fees: MilliSatoshi) : PeerEvent() @@ -230,22 +233,35 @@ class Peer( init { logger.info { "initializing peer" } launch { - when(client) { + when (client) { is IElectrumClient -> client.notifications.filterIsInstance() .collect { msg -> currentTipFlow.value = msg.blockHeight to msg.header } + is MempoolSpaceClient -> while (true) { + runCatching { + client.getBlockTipHeight()?.let { currentBlockHeight -> + logger.debug { "current block height is $currentBlockHeight"} + currentTipFlow.value = currentBlockHeight to BlockHeader(0, BlockHash(ByteVector32.Zeroes), ByteVector32.Zeroes, 0, 0, 0) + } + } + delay(1.minutes) + } } } launch { - when(client) { + when (client) { is IElectrumClient -> client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.collect { // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) updateEstimateFees() } + is MempoolSpaceClient -> while (true) { + updateEstimateFees() + delay(3.minutes) + } } } launch { @@ -292,20 +308,15 @@ class Peer( } private suspend fun updateEstimateFees() { - val sortedFees = listOf( - client.estimateFees(2), - client.estimateFees(6), - client.estimateFees(18), - client.estimateFees(144), - ) - logger.info { "on-chain fees: $sortedFees" } // TODO: If some feerates are null, we may implement a retry - onChainFeeratesFlow.value = OnChainFeerates( - fundingFeerate = sortedFees[3] ?: FeeratePerKw(FeeratePerByte(2.sat)), - mutualCloseFeerate = sortedFees[2] ?: FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = sortedFees[1] ?: FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = sortedFees[0] ?: FeeratePerKw(FeeratePerByte(50.sat)) + val onChainFeerates = OnChainFeerates( + fundingFeerate = (client.estimateFees(144) ?: onChainFeeratesFlow.value?.fundingFeerate) ?: FeeratePerKw(FeeratePerByte(2.sat)), + mutualCloseFeerate = (client.estimateFees(18) ?: onChainFeeratesFlow.value?.mutualCloseFeerate) ?: FeeratePerKw(FeeratePerByte(10.sat)), + claimMainFeerate = (client.estimateFees(6) ?: onChainFeeratesFlow.value?.claimMainFeerate) ?: FeeratePerKw(FeeratePerByte(20.sat)), + fastFeerate = (client.estimateFees(2) ?: onChainFeeratesFlow.value?.fastFeerate) ?: FeeratePerKw(FeeratePerByte(50.sat)) ) + logger.info { "on-chain fees: $onChainFeerates" } + onChainFeeratesFlow.value = onChainFeerates } data class ConnectionJob(val job: Job, val socket: TcpSocket) { diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt new file mode 100644 index 000000000..90e41d110 --- /dev/null +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt @@ -0,0 +1,68 @@ +package fr.acinq.lightning.blockchain.mempool + +import fr.acinq.bitcoin.ByteVector +import fr.acinq.bitcoin.Transaction +import fr.acinq.bitcoin.TxId +import fr.acinq.lightning.Lightning.randomBytes32 +import fr.acinq.lightning.blockchain.* +import fr.acinq.lightning.blockchain.mempool.MempoolSpaceClient.Companion.OfficialMempoolTestnet +import fr.acinq.lightning.tests.utils.LightningTestSuite +import fr.acinq.lightning.tests.utils.runSuspendTest +import fr.acinq.lightning.tests.utils.testLoggerFactory +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.first +import kotlin.test.Test +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.time.Duration.Companion.seconds + +class MempoolSpaceWatcherTest : LightningTestSuite() { + + @Test + fun `publish a transaction`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) + val watcher = MempoolSpaceWatcher(client, scope = this, testLoggerFactory) + val tx = + Transaction.read("0200000000010146c398e70cceaf9d8f734e603bc53e4c4c0605ab46cb1b5807a62c90f5aed50d0100000000feffffff023c0fc10c010000001600145033f65b590f2065fe55414213f1d25ab20b6c4f487d1700000000001600144b812d5ef41fc433654d186463d41b458821ff740247304402202438dc18801919baa64eb18f7e925ab6acdedc3751ea58ea164a26723b79fd39022060b46c1d277714c640cdc8512c36c862ffc646e7ff62438ef5cc847a5990bbf801210247b49d9e6b0089a1663668829e573c629c936eb430c043af9634aa57cf97a33cbee81f00") + watcher.publish(tx) + } + + @Test + fun `watch-spent on a transaction`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) + val watcher = MempoolSpaceWatcher(client, scope = this, testLoggerFactory) + + val notifications = watcher.openWatchNotificationsFlow() + + val watch = WatchSpent( + channelId = randomBytes32(), + txId = TxId("b97167ea09da62daaa1d3198460fc4c204a553cb3e5c80ab48f5b75a870f15c5"), + outputIndex = 0, + publicKeyScript = ByteVector.empty, + event = BITCOIN_FUNDING_SPENT + ) + watcher.watch(watch) + val event = assertIs(notifications.first()) + assertEquals(TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0"), event.tx.txid) + delay(2.seconds) + } + + @Test + fun `watch-confirmed on a transaction`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) + val watcher = MempoolSpaceWatcher(client, scope = this, testLoggerFactory) + + val notifications = watcher.openWatchNotificationsFlow() + + val watch = WatchConfirmed( + channelId = randomBytes32(), + txId = TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0"), + publicKeyScript = ByteVector.empty, + event = BITCOIN_FUNDING_DEPTHOK, + minDepth = 5 + ) + watcher.watch(watch) + val event = assertIs(notifications.first()) + assertEquals(TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0"), event.tx.txid) + } +} \ No newline at end of file From 0724e56a08c59785bbc4bde0aa80437aa1bec919 Mon Sep 17 00:00:00 2001 From: pm47 Date: Tue, 19 Mar 2024 21:39:12 +0100 Subject: [PATCH 3/9] refactor feerates logic We use slow/medium/fast/fastest as a standard for feerates, instead of explicit block targets. Note: electrum and mempool.space backends have a different logic for updating feerates. --- .../fr/acinq/lightning/blockchain/IClient.kt | 11 +++++++- .../blockchain/electrum/ElectrumClient.kt | 5 +++- .../blockchain/electrum/IElectrumClient.kt | 14 +++++++++- .../lightning/blockchain/fee/FeeEstimator.kt | 10 ++++++- .../blockchain/mempool/MempoolSpaceClient.kt | 27 ++++++++++++++++--- .../kotlin/fr/acinq/lightning/io/Peer.kt | 27 +++++++------------ .../mempool/MempoolSpaceClientTest.kt | 26 ++++++++++++++++++ 7 files changed, 96 insertions(+), 24 deletions(-) create mode 100644 src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt index c33424539..b01d29405 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt @@ -2,6 +2,7 @@ package fr.acinq.lightning.blockchain import fr.acinq.bitcoin.Satoshi import fr.acinq.bitcoin.TxId +import fr.acinq.lightning.blockchain.fee.FeeratePerByte import fr.acinq.lightning.blockchain.fee.FeeratePerKw import fr.acinq.lightning.channel.Commitments import fr.acinq.lightning.channel.LocalFundingStatus @@ -13,9 +14,17 @@ interface IClient { suspend fun getConfirmations(txId: TxId): Int? /** Estimate the feerate required for a transaction to be confirmed in the next [confirmations] blocks. */ - suspend fun estimateFees(confirmations: Int): FeeratePerKw? + suspend fun getFeerates(): Feerates? } +data class Feerates( + val minimum: FeeratePerByte, + val slow: FeeratePerByte, + val medium: FeeratePerByte, + val fast: FeeratePerByte, + val fastest: FeeratePerByte +) + /** * @weight must be the total estimated weight of the splice tx, otherwise the feerate estimation will be wrong */ diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt index fb846b19b..5930b54d6 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt @@ -5,7 +5,10 @@ import fr.acinq.bitcoin.utils.Either import fr.acinq.lightning.blockchain.fee.FeeratePerKw import fr.acinq.lightning.io.TcpSocket import fr.acinq.lightning.io.send -import fr.acinq.lightning.logging.* +import fr.acinq.lightning.logging.LoggerFactory +import fr.acinq.lightning.logging.debug +import fr.acinq.lightning.logging.info +import fr.acinq.lightning.logging.warning import fr.acinq.lightning.utils.* import kotlinx.coroutines.* import kotlinx.coroutines.channels.BufferOverflow diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt index 72d022068..713339daa 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt @@ -4,7 +4,9 @@ import fr.acinq.bitcoin.BlockHeader import fr.acinq.bitcoin.ByteVector32 import fr.acinq.bitcoin.Transaction import fr.acinq.bitcoin.TxId +import fr.acinq.lightning.blockchain.Feerates import fr.acinq.lightning.blockchain.IClient +import fr.acinq.lightning.blockchain.fee.FeeratePerByte import fr.acinq.lightning.blockchain.fee.FeeratePerKw import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.StateFlow @@ -39,7 +41,7 @@ interface IElectrumClient : IClient { suspend fun broadcastTransaction(tx: Transaction): TxId /** Estimate the feerate required for a transaction to be confirmed in the next [confirmations] blocks. */ - override suspend fun estimateFees(confirmations: Int): FeeratePerKw? + suspend fun estimateFees(confirmations: Int): FeeratePerKw? /******************** Subscriptions ********************/ @@ -50,4 +52,14 @@ interface IElectrumClient : IClient { suspend fun startHeaderSubscription(): HeaderSubscriptionResponse override suspend fun getConfirmations(txId: TxId): Int? = getTx(txId)?.let { tx -> getConfirmations(tx) } + + override suspend fun getFeerates(): Feerates? { + return Feerates( + minimum = estimateFees(144)?.let { FeeratePerByte(it) } ?: return null, + slow = estimateFees(18)?.let { FeeratePerByte(it) } ?: return null, + medium = estimateFees(6)?.let { FeeratePerByte(it) } ?: return null, + fast = estimateFees(2)?.let { FeeratePerByte(it) } ?: return null, + fastest = estimateFees(1)?.let { FeeratePerByte(it) } ?: return null, + ) + } } \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt index 51da51594..5821bd53d 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt @@ -1,6 +1,7 @@ package fr.acinq.lightning.blockchain.fee import fr.acinq.bitcoin.Satoshi +import fr.acinq.lightning.blockchain.Feerates import fr.acinq.lightning.utils.sat interface FeeEstimator { @@ -15,7 +16,14 @@ interface FeeEstimator { * @param claimMainFeerate feerate used to claim our main output when a channel is force-closed (typically configured by the user, based on their preference). * @param fastFeerate feerate used to claim outputs quickly to avoid loss of funds: this one should not be set by the user (we should look at current on-chain fees). */ -data class OnChainFeerates(val fundingFeerate: FeeratePerKw, val mutualCloseFeerate: FeeratePerKw, val claimMainFeerate: FeeratePerKw, val fastFeerate: FeeratePerKw) +data class OnChainFeerates(val fundingFeerate: FeeratePerKw, val mutualCloseFeerate: FeeratePerKw, val claimMainFeerate: FeeratePerKw, val fastFeerate: FeeratePerKw) { + constructor(feerates: Feerates) : this( + fundingFeerate = FeeratePerKw(feerates.slow), + mutualCloseFeerate = FeeratePerKw(feerates.medium), + claimMainFeerate = FeeratePerKw(feerates.medium), + fastFeerate = FeeratePerKw(feerates.fast), + ) +} data class FeerateTolerance(val ratioLow: Double, val ratioHigh: Double) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt index 721fd28ab..8a341fb96 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt @@ -2,11 +2,13 @@ package fr.acinq.lightning.blockchain.mempool import fr.acinq.bitcoin.Transaction import fr.acinq.bitcoin.TxId +import fr.acinq.lightning.blockchain.Feerates import fr.acinq.lightning.blockchain.IClient -import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.blockchain.fee.FeeratePerByte import fr.acinq.lightning.logging.LoggerFactory import fr.acinq.lightning.logging.debug import fr.acinq.lightning.logging.warning +import fr.acinq.lightning.utils.sat import io.ktor.client.* import io.ktor.client.call.* import io.ktor.client.plugins.* @@ -102,8 +104,18 @@ class MempoolSpaceClient(val mempoolUrl: Url, loggerFactory: LoggerFactory) : IC .getOrNull() } - override suspend fun estimateFees(confirmations: Int): FeeratePerKw? { - TODO("Not yet implemented") + override suspend fun getFeerates(): Feerates? { + return kotlin.runCatching { + val res: MempoolSpaceRecommendedFeerates = client.get("api/v1/fees/recommended").body() + Feerates( + minimum = FeeratePerByte(res.minimumFee.sat), + slow = FeeratePerByte(res.economyFee.sat), + medium = FeeratePerByte(res.hourFee.sat), + fast = FeeratePerByte(res.halfHourFee.sat), + fastest = FeeratePerByte(res.fastestFee.sat), + ) + }.onFailure { logger.warning(it) { "error in getFeerates " } } + .getOrNull() } companion object { @@ -123,4 +135,13 @@ data class MempoolSpaceOutspendResponse( data class MempoolSpaceTransactionMerkleProofResponse( val block_height: Int, val pos: Int +) + +@Serializable +data class MempoolSpaceRecommendedFeerates( + val fastestFee: Int, + val halfHourFee: Int, + val hourFee: Int, + val economyFee: Int, + val minimumFee: Int ) \ No newline at end of file diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index 117a5190f..c0b398bfc 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -5,11 +5,9 @@ import fr.acinq.bitcoin.utils.Either import fr.acinq.lightning.* import fr.acinq.lightning.blockchain.IClient import fr.acinq.lightning.blockchain.IWatcher -import fr.acinq.lightning.blockchain.IClient.* import fr.acinq.lightning.blockchain.WatchEvent import fr.acinq.lightning.blockchain.computeSpliceCpfpFeerate import fr.acinq.lightning.blockchain.electrum.* -import fr.acinq.lightning.blockchain.fee.FeeratePerByte import fr.acinq.lightning.blockchain.fee.FeeratePerKw import fr.acinq.lightning.blockchain.fee.OnChainFeerates import fr.acinq.lightning.blockchain.mempool.MempoolSpaceClient @@ -241,7 +239,7 @@ class Peer( is MempoolSpaceClient -> while (true) { runCatching { client.getBlockTipHeight()?.let { currentBlockHeight -> - logger.debug { "current block height is $currentBlockHeight"} + logger.debug { "current block height is $currentBlockHeight" } currentTipFlow.value = currentBlockHeight to BlockHeader(0, BlockHash(ByteVector32.Zeroes), ByteVector32.Zeroes, 0, 0, 0) } } @@ -251,15 +249,22 @@ class Peer( } launch { + suspend fun updateFeerates() { + client.getFeerates()?.let { feerates -> + logger.info { "on-chain fees: $feerates" } + onChainFeeratesFlow.value = OnChainFeerates(feerates) + } ?: logger.error { "cannot retrieve feerates!" } + } + when (client) { is IElectrumClient -> client.connectionStatus.filter { it is ElectrumConnectionStatus.Connected }.collect { // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) - updateEstimateFees() + updateFeerates() } is MempoolSpaceClient -> while (true) { - updateEstimateFees() + updateFeerates() delay(3.minutes) } } @@ -307,18 +312,6 @@ class Peer( } } - private suspend fun updateEstimateFees() { - // TODO: If some feerates are null, we may implement a retry - val onChainFeerates = OnChainFeerates( - fundingFeerate = (client.estimateFees(144) ?: onChainFeeratesFlow.value?.fundingFeerate) ?: FeeratePerKw(FeeratePerByte(2.sat)), - mutualCloseFeerate = (client.estimateFees(18) ?: onChainFeeratesFlow.value?.mutualCloseFeerate) ?: FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = (client.estimateFees(6) ?: onChainFeeratesFlow.value?.claimMainFeerate) ?: FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = (client.estimateFees(2) ?: onChainFeeratesFlow.value?.fastFeerate) ?: FeeratePerKw(FeeratePerByte(50.sat)) - ) - logger.info { "on-chain fees: $onChainFeerates" } - onChainFeeratesFlow.value = onChainFeerates - } - data class ConnectionJob(val job: Job, val socket: TcpSocket) { fun cancel() { job.cancel() diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt new file mode 100644 index 000000000..f42d3a299 --- /dev/null +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt @@ -0,0 +1,26 @@ +package fr.acinq.lightning.blockchain.mempool + +import fr.acinq.lightning.blockchain.mempool.MempoolSpaceClient.Companion.OfficialMempoolMainnet +import fr.acinq.lightning.blockchain.mempool.MempoolSpaceClient.Companion.OfficialMempoolTestnet +import fr.acinq.lightning.tests.utils.LightningTestSuite +import fr.acinq.lightning.tests.utils.runSuspendTest +import fr.acinq.lightning.tests.utils.testLoggerFactory +import kotlin.test.Test +import kotlin.test.assertNotNull + +class MempoolSpaceClientTest : LightningTestSuite() { + + @Test + fun `retrieve feerates -- testnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) + val feerates = client.getFeerates() + assertNotNull(feerates) + } + + @Test + fun `retrieve feerates -- mainnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolMainnet, testLoggerFactory) + val feerates = client.getFeerates() + assertNotNull(feerates) + } +} \ No newline at end of file From 12d23104ddab86059e616d6b5f019ea2800ccdff Mon Sep 17 00:00:00 2001 From: pm47 Date: Fri, 22 Mar 2024 17:01:42 +0100 Subject: [PATCH 4/9] add more mempool tests --- .../mempool/MempoolSpaceClientTest.kt | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt index f42d3a299..92f18cda5 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClientTest.kt @@ -1,12 +1,15 @@ package fr.acinq.lightning.blockchain.mempool +import fr.acinq.bitcoin.TxId import fr.acinq.lightning.blockchain.mempool.MempoolSpaceClient.Companion.OfficialMempoolMainnet import fr.acinq.lightning.blockchain.mempool.MempoolSpaceClient.Companion.OfficialMempoolTestnet import fr.acinq.lightning.tests.utils.LightningTestSuite import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.tests.utils.testLoggerFactory import kotlin.test.Test +import kotlin.test.assertEquals import kotlin.test.assertNotNull +import kotlin.test.assertTrue class MempoolSpaceClientTest : LightningTestSuite() { @@ -23,4 +26,52 @@ class MempoolSpaceClientTest : LightningTestSuite() { val feerates = client.getFeerates() assertNotNull(feerates) } + + @Test + fun `get tx -- testnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) + val res = client.getTransaction(TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0")) + assertNotNull(res) + assertEquals(TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0"), res.txid) + } + + @Test + fun `get tx -- mainnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolMainnet, testLoggerFactory) + val res = client.getTransaction(TxId("8a391488ed266191e3243ea7ac55d358080a3434865b14994747f6cdca38b640")) + assertNotNull(res) + assertEquals(TxId("8a391488ed266191e3243ea7ac55d358080a3434865b14994747f6cdca38b640"), res.txid) + } + + @Test + fun `get tx confirmations -- testnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) + val res = client.getConfirmations(TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0")) + assertNotNull(res) + assertTrue(res > 0) + } + + @Test + fun `get tx confirmations -- mainnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolMainnet, testLoggerFactory) + val res = client.getConfirmations(TxId("8a391488ed266191e3243ea7ac55d358080a3434865b14994747f6cdca38b640")) + assertNotNull(res) + assertTrue(res > 0) + } + + @Test + fun `get spending tx -- testnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) + val res = client.getOutspend(TxId("b97167ea09da62daaa1d3198460fc4c204a553cb3e5c80ab48f5b75a870f15c5"), 0) + assertNotNull(res) + assertEquals(TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0"), res.txid) + } + + @Test + fun `get spending tx -- mainnet`() = runSuspendTest { + val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolMainnet, testLoggerFactory) + val res = client.getOutspend(TxId("308c09d986000be7f05ba776b38204317bf928b70db65bf175af7f2036951649"), 3) + assertNotNull(res) + assertEquals(TxId("8a391488ed266191e3243ea7ac55d358080a3434865b14994747f6cdca38b640"), res.txid) + } } \ No newline at end of file From c4d0218ee9bcb965f92756e98941727a7eb731b9 Mon Sep 17 00:00:00 2001 From: pm47 Date: Mon, 3 Jun 2024 15:29:53 +0200 Subject: [PATCH 5/9] review nits --- .../fr/acinq/lightning/blockchain/IClient.kt | 1 - .../electrum/ElectrumClientExtensions.kt | 20 ------------------- .../blockchain/electrum/IElectrumClient.kt | 16 +++++++++++++++ .../lightning/blockchain/fee/FeeEstimator.kt | 2 +- 4 files changed, 17 insertions(+), 22 deletions(-) delete mode 100644 src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt index b01d29405..71d76fbb0 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/IClient.kt @@ -13,7 +13,6 @@ import fr.acinq.lightning.utils.sat interface IClient { suspend fun getConfirmations(txId: TxId): Int? - /** Estimate the feerate required for a transaction to be confirmed in the next [confirmations] blocks. */ suspend fun getFeerates(): Feerates? } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt deleted file mode 100644 index 377acaad6..000000000 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientExtensions.kt +++ /dev/null @@ -1,20 +0,0 @@ -package fr.acinq.lightning.blockchain.electrum - -import fr.acinq.bitcoin.Transaction - -/** - * @return the number of confirmations, zero if the transaction is in the mempool, null if the transaction is not found - */ -suspend fun IElectrumClient.getConfirmations(tx: Transaction): Int? { - return when (val status = connectionStatus.value) { - is ElectrumConnectionStatus.Connected -> { - val currentBlockHeight = status.height - val scriptHash = ElectrumClient.computeScriptHash(tx.txOut.first().publicKeyScript) - val scriptHashHistory = getScriptHashHistory(scriptHash) - val item = scriptHashHistory.find { it.txid == tx.txid } - item?.let { if (item.blockHeight > 0) currentBlockHeight - item.blockHeight + 1 else 0 } - } - else -> null - } -} - diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt index 713339daa..4f5991044 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/IElectrumClient.kt @@ -51,6 +51,22 @@ interface IElectrumClient : IClient { /** Subscribe to headers for new blocks found. */ suspend fun startHeaderSubscription(): HeaderSubscriptionResponse + /** + * @return the number of confirmations, zero if the transaction is in the mempool, null if the transaction is not found + */ + suspend fun getConfirmations(tx: Transaction): Int? { + return when (val status = connectionStatus.value) { + is ElectrumConnectionStatus.Connected -> { + val currentBlockHeight = status.height + val scriptHash = ElectrumClient.computeScriptHash(tx.txOut.first().publicKeyScript) + val scriptHashHistory = getScriptHashHistory(scriptHash) + val item = scriptHashHistory.find { it.txid == tx.txid } + item?.let { if (item.blockHeight > 0) currentBlockHeight - item.blockHeight + 1 else 0 } + } + else -> null + } + } + override suspend fun getConfirmations(txId: TxId): Int? = getTx(txId)?.let { tx -> getConfirmations(tx) } override suspend fun getFeerates(): Feerates? { diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt index 5821bd53d..a8165a629 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/fee/FeeEstimator.kt @@ -18,7 +18,7 @@ interface FeeEstimator { */ data class OnChainFeerates(val fundingFeerate: FeeratePerKw, val mutualCloseFeerate: FeeratePerKw, val claimMainFeerate: FeeratePerKw, val fastFeerate: FeeratePerKw) { constructor(feerates: Feerates) : this( - fundingFeerate = FeeratePerKw(feerates.slow), + fundingFeerate = FeeratePerKw(feerates.medium), mutualCloseFeerate = FeeratePerKw(feerates.medium), claimMainFeerate = FeeratePerKw(feerates.medium), fastFeerate = FeeratePerKw(feerates.fast), From 80a8e9f798af1847f6d27127b3ca1433a97bc617 Mon Sep 17 00:00:00 2001 From: pm47 Date: Mon, 3 Jun 2024 15:46:04 +0200 Subject: [PATCH 6/9] factor calls to mempool.space api --- .../blockchain/mempool/MempoolSpaceClient.kt | 103 +++++++++--------- 1 file changed, 49 insertions(+), 54 deletions(-) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt index 8a341fb96..09db7c322 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceClient.kt @@ -52,73 +52,68 @@ class MempoolSpaceClient(val mempoolUrl: Url, loggerFactory: LoggerFactory) : IC } } - suspend fun getTransaction(txId: TxId): Transaction? { - return kotlin.runCatching { - val res = client.get("api/tx/$txId/hex") - if (res.status.isSuccess()) { - Transaction.read(res.bodyAsText()) - } else null - }.onFailure { logger.warning(it) { "error in getTransaction " } } + /** Helper method to factor error handling and logging for api calls. */ + private suspend fun tryWithLogs(f: suspend () -> T?): T? { + return kotlin.runCatching { f() } + .onFailure { logger.warning(it) { "mempool.space api error: " } } .getOrNull() } - suspend fun getTransactionMerkleProof(txId: TxId): MempoolSpaceTransactionMerkleProofResponse? { - return kotlin.runCatching { - val res = client.get("api/tx/$txId/merkle-proof") - if (res.status.isSuccess()) { - val txStatus: MempoolSpaceTransactionMerkleProofResponse = res.body() - txStatus - } else null - }.onFailure { logger.warning(it) { "error in getTransactionMerkleProof " } } - .getOrNull() + suspend fun getTransaction(txId: TxId): Transaction? = tryWithLogs { + val res = client.get("api/tx/$txId/hex") + if (res.status.isSuccess()) { + Transaction.read(res.bodyAsText()) + } else null } - suspend fun getOutspend(txId: TxId, outputIndex: Int): Transaction? { - return kotlin.runCatching { - logger.debug { "checking output $txId:$outputIndex" } - val res: MempoolSpaceOutspendResponse = client.get("api/tx/$txId/outspend/$outputIndex").body() - res.txid?.let { getTransaction(TxId(it)) } - }.onFailure { logger.warning(it) { "error in getOutspend " } } - .getOrNull() + /** + * Returns a merkle inclusion proof for the transaction using Electrum's blockchain.transaction.get_merkle + * format. + * */ + suspend fun getTransactionMerkleProof(txId: TxId): MempoolSpaceTransactionMerkleProofResponse? = tryWithLogs { + val res = client.get("api/tx/$txId/merkle-proof") + if (res.status.isSuccess()) { + val txStatus: MempoolSpaceTransactionMerkleProofResponse = res.body() + txStatus + } else null } - suspend fun getBlockTipHeight(): Int? { - return kotlin.runCatching { - val res = client.get("api/blocks/tip/height") - if (res.status.isSuccess()) { - res.bodyAsText().toInt() - } else null - }.onFailure { logger.warning(it) { "error in getBlockTipHeight " } } - .getOrNull() + /** Returns the spending status of a transaction output. */ + suspend fun getOutspend(txId: TxId, outputIndex: Int): Transaction? = tryWithLogs { + logger.debug { "checking output $txId:$outputIndex" } + val res: MempoolSpaceOutspendResponse = client.get("api/tx/$txId/outspend/$outputIndex").body() + res.txid?.let { getTransaction(TxId(it)) } } - override suspend fun getConfirmations(txId: TxId): Int? { - return kotlin.runCatching { - val confirmedAtBlockHeight = getTransactionMerkleProof(txId)?.block_height - val currentBlockHeight = getBlockTipHeight() - when { - confirmedAtBlockHeight != null && currentBlockHeight != null -> currentBlockHeight - confirmedAtBlockHeight + 1 - else -> null - } - }.onFailure { logger.warning(it) { "error in getConfirmations" } } - .getOrNull() + /** Returns the height of the last block. */ + suspend fun getBlockTipHeight(): Int? = tryWithLogs { + val res = client.get("api/blocks/tip/height") + if (res.status.isSuccess()) { + res.bodyAsText().toInt() + } else null } - override suspend fun getFeerates(): Feerates? { - return kotlin.runCatching { - val res: MempoolSpaceRecommendedFeerates = client.get("api/v1/fees/recommended").body() - Feerates( - minimum = FeeratePerByte(res.minimumFee.sat), - slow = FeeratePerByte(res.economyFee.sat), - medium = FeeratePerByte(res.hourFee.sat), - fast = FeeratePerByte(res.halfHourFee.sat), - fastest = FeeratePerByte(res.fastestFee.sat), - ) - }.onFailure { logger.warning(it) { "error in getFeerates " } } - .getOrNull() + override suspend fun getConfirmations(txId: TxId): Int? = tryWithLogs { + val confirmedAtBlockHeight = getTransactionMerkleProof(txId)?.block_height + val currentBlockHeight = getBlockTipHeight() + when { + confirmedAtBlockHeight != null && currentBlockHeight != null -> currentBlockHeight - confirmedAtBlockHeight + 1 + else -> null + } + } + + override suspend fun getFeerates(): Feerates? = tryWithLogs { + val res: MempoolSpaceRecommendedFeerates = client.get("api/v1/fees/recommended").body() + Feerates( + minimum = FeeratePerByte(res.minimumFee.sat), + slow = FeeratePerByte(res.economyFee.sat), + medium = FeeratePerByte(res.hourFee.sat), + fast = FeeratePerByte(res.halfHourFee.sat), + fastest = FeeratePerByte(res.fastestFee.sat), + ) } - companion object { + companion object { val OfficialMempoolMainnet = Url("https://mempool.space") val OfficialMempoolTestnet = Url("https://mempool.space/testnet/") } From 61a200ae1c852778f4413c2ff5e7de597182b84a Mon Sep 17 00:00:00 2001 From: pm47 Date: Mon, 3 Jun 2024 16:07:51 +0200 Subject: [PATCH 7/9] simpler way to disable tests on ios simulator --- build.gradle.kts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/build.gradle.kts b/build.gradle.kts index 43b29c96e..ab65bbdf1 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,4 +1,5 @@ import org.jetbrains.kotlin.gradle.plugin.mpp.KotlinNativeTarget +import org.jetbrains.kotlin.gradle.targets.native.tasks.KotlinNativeSimulatorTest import org.jetbrains.kotlin.gradle.targets.native.tasks.KotlinNativeTest plugins { @@ -311,6 +312,13 @@ tasks it.filter.excludeTestsMatching("*SwapInWalletTestsCommon") } +// Those tests do not work with the ios simulator +tasks + .filterIsInstance() + .map { + it.filter.excludeTestsMatching("*MempoolSpace*Test") + } + // Make NS_FORMAT_ARGUMENT(1) a no-op // This fixes an issue when building PhoenixCrypto using XCode 13 // More on this: https://youtrack.jetbrains.com/issue/KT-48807#focus=Comments-27-5210791.0-0 From a325366bf124e27e142ebe144ab6de6763af896a Mon Sep 17 00:00:00 2001 From: pm47 Date: Mon, 3 Jun 2024 16:35:22 +0200 Subject: [PATCH 8/9] remove header from block tip flow --- .../kotlin/fr/acinq/lightning/io/Peer.kt | 20 +++++++++---------- .../fr/acinq/lightning/io/peer/PeerTest.kt | 6 +++--- .../acinq/lightning/tests/io/peer/builders.kt | 2 +- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index c0b398bfc..e730d8a7f 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -197,7 +197,7 @@ class Peer( private val ourInit = Init(features.initFeatures(), initTlvStream) private var theirInit: Init? = null - val currentTipFlow = MutableStateFlow?>(null) + val currentTipFlow = MutableStateFlow(null) val onChainFeeratesFlow = MutableStateFlow(null) val swapInFeeratesFlow = MutableStateFlow(null) @@ -206,7 +206,7 @@ class Peer( val state = this val ctx = ChannelContext( StaticParams(nodeParams, remoteNodeId), - currentTipFlow.filterNotNull().first().first, + currentTipFlow.filterNotNull().first(), onChainFeeratesFlow, logger = MDCLogger( logger = _channelLogger, @@ -234,13 +234,13 @@ class Peer( when (client) { is IElectrumClient -> client.notifications.filterIsInstance() .collect { msg -> - currentTipFlow.value = msg.blockHeight to msg.header + currentTipFlow.value = msg.blockHeight } is MempoolSpaceClient -> while (true) { runCatching { client.getBlockTipHeight()?.let { currentBlockHeight -> logger.debug { "current block height is $currentBlockHeight" } - currentTipFlow.value = currentBlockHeight to BlockHeader(0, BlockHash(ByteVector32.Zeroes), ByteVector32.Zeroes, 0, 0, 0) + currentTipFlow.value = currentBlockHeight } } delay(1.minutes) @@ -488,7 +488,7 @@ class Peer( waitForPeerReady() swapInJob = launch { swapInWallet.wallet.walletStateFlow - .combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> Pair(walletState, currentTip.first) } + .combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> Pair(walletState, currentTip) } .combine(swapInFeeratesFlow.filterNotNull()) { (walletState, currentTip), feerate -> Triple(walletState, currentTip, feerate) } .combine(nodeParams.liquidityPolicy) { (walletState, currentTip, feerate), policy -> TrySwapInFlow(currentTip, walletState, feerate, policy) } .collect { w -> @@ -635,7 +635,7 @@ class Peer( .filterIsInstance() .firstOrNull() ?.let { channel -> - val leaseStart = currentTipFlow.filterNotNull().first().first + val leaseStart = currentTipFlow.filterNotNull().first() val spliceCommand = ChannelCommand.Commitment.Splice.Request( replyTo = CompletableDeferred(), spliceIn = null, @@ -735,7 +735,7 @@ class Peer( is ChannelAction.ProcessCmdRes.AddSettledFail -> { val currentTip = currentTipFlow.filterNotNull().first() - when (val result = outgoingPaymentHandler.processAddSettled(actualChannelId, action, _channels, currentTip.first)) { + when (val result = outgoingPaymentHandler.processAddSettled(actualChannelId, action, _channels, currentTip)) { is OutgoingPaymentHandler.Progress -> { _eventsFlow.emit(PaymentProgress(result.request, result.fees)) result.actions.forEach { input.send(it) } @@ -857,7 +857,7 @@ class Peer( } private suspend fun processIncomingPayment(item: Either) { - val currentBlockHeight = currentTipFlow.filterNotNull().first().first + val currentBlockHeight = currentTipFlow.filterNotNull().first() val result = when (item) { is Either.Right -> incomingPaymentHandler.process(item.value, currentBlockHeight) is Either.Left -> incomingPaymentHandler.process(item.value, currentBlockHeight) @@ -1153,7 +1153,7 @@ class Peer( else -> null } } - offerManager.receiveMessage(msg, remoteChannelUpdates, currentTipFlow.filterNotNull().first().first)?.let { + offerManager.receiveMessage(msg, remoteChannelUpdates, currentTipFlow.filterNotNull().first())?.let { when (it) { is OnionMessageAction.PayInvoice -> input.send(PayInvoice(it.payOffer.paymentId, it.payOffer.amount, LightningOutgoingPayment.Details.Blinded(it.invoice, it.payOffer.payerKey), it.payOffer.trampolineFeesOverride)) is OnionMessageAction.SendMessage -> input.send(SendOnionMessage(it.message)) @@ -1256,7 +1256,7 @@ class Peer( } is PayInvoice -> { val currentTip = currentTipFlow.filterNotNull().first() - when (val result = outgoingPaymentHandler.sendPayment(cmd, _channels, currentTip.first)) { + when (val result = outgoingPaymentHandler.sendPayment(cmd, _channels, currentTip)) { is OutgoingPaymentHandler.Progress -> { _eventsFlow.emit(PaymentProgress(result.request, result.fees)) result.actions.forEach { input.send(it) } diff --git a/src/commonTest/kotlin/fr/acinq/lightning/io/peer/PeerTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/io/peer/PeerTest.kt index eca8203ba..d46551f65 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/io/peer/PeerTest.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/io/peer/PeerTest.kt @@ -336,7 +336,7 @@ class PeerTest : LightningTestSuite() { alice0.staticParams.nodeParams.copy(checkHtlcTimeoutAfterStartupDelay = 5.seconds), TestConstants.Alice.walletParams, databases = InMemoryDatabases().also { it.channels.addOrUpdateChannel(alice1.state) }, - currentTip = htlc.cltvExpiry.toLong().toInt() to Block.RegtestGenesisBlock.header + currentTip = htlc.cltvExpiry.toLong().toInt() ) val initChannels = peer.channelsFlow.first { it.values.isNotEmpty() } @@ -406,7 +406,7 @@ class PeerTest : LightningTestSuite() { bob0.staticParams.nodeParams.copy(checkHtlcTimeoutAfterStartupDelay = 5.seconds), TestConstants.Bob.walletParams, databases = InMemoryDatabases(), // NB: empty database (Bob has lost its channel state) - currentTip = htlc.cltvExpiry.toLong().toInt() to Block.RegtestGenesisBlock.header + currentTip = htlc.cltvExpiry.toLong().toInt() ) // Simulate a reconnection with Alice. @@ -436,7 +436,7 @@ class PeerTest : LightningTestSuite() { bob0.staticParams.nodeParams.copy(checkHtlcTimeoutAfterStartupDelay = 5.seconds), TestConstants.Bob.walletParams, databases = InMemoryDatabases().also { it.channels.addOrUpdateChannel(bob0.state) }, // NB: outdated channel data - currentTip = htlc.cltvExpiry.toLong().toInt() to Block.RegtestGenesisBlock.header + currentTip = htlc.cltvExpiry.toLong().toInt() ) // Simulate a reconnection with Alice. diff --git a/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt b/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt index 7a9e81219..ed5dda669 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/tests/io/peer/builders.kt @@ -182,7 +182,7 @@ suspend fun buildPeer( nodeParams: NodeParams, walletParams: WalletParams, databases: InMemoryDatabases = InMemoryDatabases(), - currentTip: Pair = 0 to Block.RegtestGenesisBlock.header + currentTip: Int = 0 ): Peer { val electrum = ElectrumClient(scope, nodeParams.loggerFactory) val watcher = ElectrumWatcher(electrum, scope, nodeParams.loggerFactory) From c8d1a0e9fbaaa7eb2438d7ce1ecf8acc6cd8ece0 Mon Sep 17 00:00:00 2001 From: pm47 Date: Mon, 3 Jun 2024 16:43:35 +0200 Subject: [PATCH 9/9] tests cleanup --- .../blockchain/mempool/MempoolSpaceWatcherTest.kt | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt index 90e41d110..beab577d6 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/mempool/MempoolSpaceWatcherTest.kt @@ -18,15 +18,6 @@ import kotlin.time.Duration.Companion.seconds class MempoolSpaceWatcherTest : LightningTestSuite() { - @Test - fun `publish a transaction`() = runSuspendTest { - val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) - val watcher = MempoolSpaceWatcher(client, scope = this, testLoggerFactory) - val tx = - Transaction.read("0200000000010146c398e70cceaf9d8f734e603bc53e4c4c0605ab46cb1b5807a62c90f5aed50d0100000000feffffff023c0fc10c010000001600145033f65b590f2065fe55414213f1d25ab20b6c4f487d1700000000001600144b812d5ef41fc433654d186463d41b458821ff740247304402202438dc18801919baa64eb18f7e925ab6acdedc3751ea58ea164a26723b79fd39022060b46c1d277714c640cdc8512c36c862ffc646e7ff62438ef5cc847a5990bbf801210247b49d9e6b0089a1663668829e573c629c936eb430c043af9634aa57cf97a33cbee81f00") - watcher.publish(tx) - } - @Test fun `watch-spent on a transaction`() = runSuspendTest { val client = MempoolSpaceClient(mempoolUrl = OfficialMempoolTestnet, testLoggerFactory) @@ -44,6 +35,9 @@ class MempoolSpaceWatcherTest : LightningTestSuite() { watcher.watch(watch) val event = assertIs(notifications.first()) assertEquals(TxId("5693d68997abfacb65cc7e6019e8ed2edb3f9f2260ae8d221e8726b8fe870ae0"), event.tx.txid) + // Right after checking whether the watched utxo is spent, a 2nd call is made by the watcher + // to find out whether the spending tx is confirmed, and the watch can be cleaned up. We give + // some time for that call to complete, in order to prevent a coroutine cancellation stack trace. delay(2.seconds) }