diff --git a/build.gradle.kts b/build.gradle.kts index 6ef7cbb70..621c0124d 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ plugins { allprojects { group = "fr.acinq.lightning" - version = "1.5.0" + version = "1.5.1-SNAPSHOT" repositories { // using the local maven repository with Kotlin Multi Platform can lead to build errors that are hard to diagnose. diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt index 31af75ba6..162520f96 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClient.kt @@ -1,8 +1,10 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* +import fr.acinq.lightning.blockchain.fee.FeeratePerByte +import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.blockchain.fee.OnChainFeerates import fr.acinq.lightning.io.TcpSocket -import fr.acinq.lightning.io.linesFlow import fr.acinq.lightning.io.send import fr.acinq.lightning.utils.* import kotlinx.coroutines.* @@ -23,14 +25,15 @@ sealed interface ElectrumClientCommand { sealed interface ElectrumConnectionStatus { data class Closed(val reason: TcpSocket.IOException?) : ElectrumConnectionStatus object Connecting : ElectrumConnectionStatus - data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader) : ElectrumConnectionStatus + data class Connected(val version: ServerVersionResponse, val height: Int, val header: BlockHeader, val onchainFeeRates: OnChainFeerates) : ElectrumConnectionStatus } @OptIn(ExperimentalCoroutinesApi::class) class ElectrumClient( socketBuilder: TcpSocket.Builder?, scope: CoroutineScope, - private val loggerFactory: LoggerFactory + private val loggerFactory: LoggerFactory, + exceptionHandler_opt: CoroutineExceptionHandler? = null ) : CoroutineScope by scope, IElectrumClient { private val logger = loggerFactory.newLogger(this::class) @@ -99,9 +102,11 @@ class ElectrumClient( } } - private fun establishConnection(serverAddress: ServerAddress) = launch(CoroutineExceptionHandler { _, exception -> + val exceptionHandler = exceptionHandler_opt ?: CoroutineExceptionHandler { _, exception -> logger.error(exception) { "error starting electrum client" } - }) { + } + + private fun establishConnection(serverAddress: ServerAddress) = launch(exceptionHandler) { _connectionStatus.value = ElectrumConnectionStatus.Connecting val socket: TcpSocket = try { val (host, port, tls) = serverAddress @@ -139,22 +144,41 @@ class ElectrumClient( } val flow = socket.linesFlow().map { json.decodeFromString(ElectrumResponseDeserializer, it) } - val version = ServerVersion() - sendRequest(version, 0) val rpcFlow = flow.filterIsInstance>().map { it.value } + var requestId = 0 + + val version = ServerVersion() + sendRequest(version, requestId++) val theirVersion = parseJsonResponse(version, rpcFlow.first()) require(theirVersion is ServerVersionResponse) { "invalid server version response $theirVersion" } logger.info { "server version $theirVersion" } - sendRequest(HeaderSubscription, 0) + + sendRequest(HeaderSubscription, requestId++) val header = parseJsonResponse(HeaderSubscription, rpcFlow.first()) require(header is HeaderSubscriptionResponse) { "invalid header subscription response $header" } + + suspend fun estimateFee(confirmations: Int): EstimateFeeResponse { + val request = EstimateFees(confirmations) + sendRequest(request, requestId++) + val response = parseJsonResponse(request, rpcFlow.first()) + require(response is EstimateFeeResponse) { "invalid estimatefee response $response" } + return response + } + + val fees = listOf(estimateFee(2), estimateFee(6), estimateFee(18), estimateFee(144)) + logger.info { "onchain fees $fees" } + val feeRates = OnChainFeerates( + fundingFeerate = fees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), + mutualCloseFeerate = fees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), + claimMainFeerate = fees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), + fastFeerate = fees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) + ) _notifications.emit(header) - _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header) + _connectionStatus.value = ElectrumConnectionStatus.Connected(theirVersion, header.blockHeight, header.header, feeRates) logger.info { "server tip $header" } // pending requests map val requestMap = mutableMapOf>>() - var requestId = 0 // reset mailbox mailbox.cancel(CancellationException("connection in progress")) diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt index ac58377af..355865217 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt @@ -195,11 +195,11 @@ class Peer( } } launch { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.collect { - // onchain fees are retrieved punctually, when electrum status moves to Connection.ESTABLISHED - // since the application is not running most of the time, and when it is, it will be only for a few minutes, this is good enough. - // (for a node that is online most of the time things would be different and we would need to re-evaluate onchain fee estimates on a regular basis) - updateEstimateFees() + watcher.client.connectionStatus.filterIsInstance().collect { + // Onchain fees are retrieved once when we establish a connection to an electrum server. + // It is acceptable since the application will typically not be running more than a few minutes at a time. + // (for a node that is online most of the time things would be different, and we would need to re-evaluate onchain fee estimates on a regular basis) + onChainFeeratesFlow.value = it.onchainFeeRates } } launch { @@ -257,24 +257,6 @@ class Peer( } } - private suspend fun updateEstimateFees() { - watcher.client.connectionState.filter { it == Connection.ESTABLISHED }.first() - val sortedFees = listOf( - watcher.client.estimateFees(2), - watcher.client.estimateFees(6), - watcher.client.estimateFees(18), - watcher.client.estimateFees(144), - ) - logger.info { "on-chain fees: $sortedFees" } - // TODO: If some feerates are null, we may implement a retry - onChainFeeratesFlow.value = OnChainFeerates( - fundingFeerate = sortedFees[3].feerate ?: FeeratePerKw(FeeratePerByte(2.sat)), - mutualCloseFeerate = sortedFees[2].feerate ?: FeeratePerKw(FeeratePerByte(10.sat)), - claimMainFeerate = sortedFees[1].feerate ?: FeeratePerKw(FeeratePerByte(20.sat)), - fastFeerate = sortedFees[0].feerate ?: FeeratePerKw(FeeratePerByte(50.sat)) - ) - } - fun connect() { if (connectionState.value is Connection.CLOSED) establishConnection() else logger.warning { "Peer is already connecting / connected" } diff --git a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt index 7d9e01f0e..68aab9ba4 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/io/TcpSocket.kt @@ -21,6 +21,18 @@ interface TcpSocket { suspend fun receiveFully(buffer: ByteArray, offset: Int, length: Int) suspend fun receiveAvailable(buffer: ByteArray, offset: Int, length: Int): Int + fun linesFlow(): Flow { + return flow { + val buffer = ByteArray(8192) + while (true) { + val size = receiveAvailable(buffer) + emit(buffer.subArray(size)) + } + } + .decodeToString() + .splitByLines() + } + suspend fun startTls(tls: TLS): TcpSocket fun close() @@ -71,14 +83,3 @@ internal expect object PlatformSocketBuilder : TcpSocket.Builder suspend fun TcpSocket.receiveFully(size: Int): ByteArray = ByteArray(size).also { receiveFully(it) } - -fun TcpSocket.linesFlow(): Flow = - flow { - val buffer = ByteArray(8192) - while (true) { - val size = receiveAvailable(buffer) - emit(buffer.subArray(size)) - } - } - .decodeToString() - .splitByLines() \ No newline at end of file diff --git a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt index b68bcd5b4..4718f5b4c 100644 --- a/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt +++ b/src/commonTest/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumClientTest.kt @@ -2,15 +2,16 @@ package fr.acinq.lightning.blockchain.electrum import fr.acinq.bitcoin.* import fr.acinq.lightning.blockchain.fee.FeeratePerKw +import fr.acinq.lightning.io.TcpSocket import fr.acinq.lightning.tests.utils.LightningTestSuite import fr.acinq.lightning.tests.utils.runSuspendTest import fr.acinq.lightning.utils.Connection +import fr.acinq.lightning.utils.ServerAddress import fr.acinq.lightning.utils.toByteVector32 import fr.acinq.secp256k1.Hex -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.flow.first -import kotlinx.coroutines.joinAll -import kotlinx.coroutines.launch +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.kodein.log.LoggerFactory import kotlin.test.* import kotlin.time.Duration.Companion.seconds @@ -177,4 +178,45 @@ class ElectrumClientTest : LightningTestSuite() { client.stop() } + + @OptIn(DelicateCoroutinesApi::class) + @Test + fun `catch coroutine errors`() { + val myCustomError = "this is a test error" + + class MyTcpSopcket(val socket: TcpSocket) : TcpSocket by socket { + override fun linesFlow(): Flow { + return super.linesFlow().map { + // during the handshake with the electrum server we first ask for the server version, then headers, fee rates + // so id == 2 means we're asking for fee rates, and here we return an error + val sendError = it.contains("\"id\": 2") + if (sendError) { + """{"jsonrpc": "2.0", "error": {"code": 42, "message": "$myCustomError"}, "id": 2}""" + } else { + it + } + } + } + } + + class MyBuilder(val builder: TcpSocket.Builder) : TcpSocket.Builder { + override suspend fun connect(host: String, port: Int, tls: TcpSocket.TLS, loggerFactory: LoggerFactory): TcpSocket { + val socket = builder.connect(host, port, tls, loggerFactory) + return MyTcpSopcket(socket) + } + } + + runBlocking { + val builder = MyBuilder(TcpSocket.Builder()) + val errorFlow = MutableStateFlow(null) + val myErrorHandler = CoroutineExceptionHandler { _, e -> errorFlow.value = e } + val client = ElectrumClient(builder, GlobalScope, LoggerFactory.default, myErrorHandler) + client.connect(ServerAddress("electrum.acinq.co", 50002, TcpSocket.TLS.UNSAFE_CERTIFICATES)) + client.connectionState.first { it is Connection.CLOSED } + client.connectionState.first { it is Connection.ESTABLISHING } + val error = errorFlow.filterNotNull().first() + assertTrue(error.message!!.contains(myCustomError)) + client.stop() + } + } }