Skip to content

Commit

Permalink
Rework and simplify electrum client, miniwallet and watcher (#411)
Browse files Browse the repository at this point in the history
* Rework and simplify electrum client, miniwallet and watcher

CompletableDeferred<> is used to handle multiple concurrent calls and provide suspend methods which are much easier to use.
For example, to retrieve a transaction you can simply call `client.getTx(txid)` and you can chain such calls in your message handling loops:
this eliminate the need to send messages to yourself and create intermediate states that are just waiting for a response from the electrum client.

We also fix a bug where Phoenix would crash after being disconnected because it would try to use a closed socket to send Ping messages.

* Add a handler for exceptions thrown in electrum's main coroutine

* Add a specific ElectrumSubscriptionResponse type for subscription events

Apply suggestions from https://github.com/ACINQ/lightning-kmp/compare/electrum-client-rework-pm and add a new
ElectrumSubscriptionResponse type for subscription events.
  • Loading branch information
sstone authored Mar 28, 2023
1 parent 444ee58 commit d36d723
Show file tree
Hide file tree
Showing 10 changed files with 460 additions and 1,105 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import fr.acinq.lightning.blockchain.fee.FeeratePerKB
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.utils.*
import fr.acinq.secp256k1.Hex
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerializationException
import kotlinx.serialization.descriptors.SerialDescriptor
Expand Down Expand Up @@ -156,24 +157,23 @@ data class EstimateFees(val confirmations: Int) : ElectrumRequest(confirmations)

data class EstimateFeeResponse(val confirmations: Int, val feerate: FeeratePerKw?) : ElectrumResponse

sealed interface ElectrumSubscriptionResponse : ElectrumResponse

data class ScriptHashSubscription(val scriptHash: ByteVector32) : ElectrumRequest(scriptHash) {
override val method: String = "blockchain.scripthash.subscribe"
}

data class ScriptHashSubscriptionResponse(val scriptHash: ByteVector32, val status: String = "") : ElectrumResponse
data class ScriptHashSubscriptionResponse(val scriptHash: ByteVector32, val status: String = "") : ElectrumSubscriptionResponse

object HeaderSubscription : ElectrumRequest() {
override val method: String = "blockchain.headers.subscribe"
}

data class HeaderSubscriptionResponse(val blockHeight: Int, val header: BlockHeader) : ElectrumResponse
data class HeaderSubscriptionResponse(val blockHeight: Int, val header: BlockHeader) : ElectrumSubscriptionResponse

/**
* Other Electrum responses
*/
data class TransactionHistory(val history: List<TransactionHistoryItem>) : ElectrumResponse
data class AddressStatus(val address: String, val status: String?) : ElectrumResponse
data class ServerError(val request: ElectrumRequest, val error: JsonRPCError) : ElectrumResponse

/**
Expand All @@ -184,10 +184,10 @@ data class ServerError(val request: ElectrumRequest, val error: JsonRPCError) :
* The former are correlated 1:1 with JSON RPC requests, based on request id. The latter are not: one
* subscription can yield an indefinite number of notifications.
*/
object ElectrumResponseDeserializer : KSerializer<Either<ElectrumResponse, JsonRPCResponse>> {
object ElectrumResponseDeserializer : DeserializationStrategy<Either<ElectrumSubscriptionResponse, JsonRPCResponse>> {
private val json = Json { ignoreUnknownKeys = true }

override fun deserialize(decoder: Decoder): Either<ElectrumResponse, JsonRPCResponse> {
override fun deserialize(decoder: Decoder): Either<ElectrumSubscriptionResponse, JsonRPCResponse> {
// Decoder -> JsonInput
val input = decoder as? JsonDecoder
?: throw SerializationException("This class can be loaded only by JSON")
Expand Down Expand Up @@ -221,10 +221,6 @@ object ElectrumResponseDeserializer : KSerializer<Either<ElectrumResponse, JsonR
}
}

override fun serialize(encoder: Encoder, value: Either<ElectrumResponse, JsonRPCResponse>) {
throw SerializationException("This ($value) is not meant to be serialized!")
}

override val descriptor: SerialDescriptor
get() = buildClassSerialDescriptor("fr.acinq.lightning.utils.Either")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private sealed interface WalletCommand {
*/
class ElectrumMiniWallet(
val chainHash: ByteVector32,
private val client: ElectrumClient.Caller,
private val client: ElectrumClient,
private val scope: CoroutineScope,
loggerFactory: LoggerFactory,
private val name: String = ""
Expand Down Expand Up @@ -130,77 +130,69 @@ class ElectrumMiniWallet(
mailbox.send(WalletCommand.Companion.AddAddress(bitcoinAddress))
}
}

init {
suspend fun WalletState.processSubscriptionResponse(msg: ScriptHashSubscriptionResponse): WalletState {
val bitcoinAddress = scriptHashes[msg.scriptHash]
return when {
bitcoinAddress == null || msg.status.isEmpty() -> this
else -> {
val unspents = client.getScriptHashUnspents(msg.scriptHash)
val newUtxos = unspents.minus((_walletStateFlow.value.addresses[bitcoinAddress] ?: emptyList()).toSet())
// request new parent txs
val parentTxs = newUtxos.map { utxo ->
val tx = client.getTx(utxo.txid)
logger.mdcinfo { "received parent transaction with txid=${tx.txid}" }
tx
}
val nextWalletState = this.copy(addresses = this.addresses + (bitcoinAddress to unspents), parentTxs = this.parentTxs + parentTxs.associateBy { it.txid })
logger.mdcinfo { "${unspents.size} utxo(s) for address=$bitcoinAddress balance=${nextWalletState.totalBalance}" }
unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
nextWalletState
}
}
}

suspend fun subscribe(bitcoinAddress: String): Triple<ByteVector32, String, ScriptHashSubscriptionResponse> {
val pubkeyScript = ByteVector(Script.write(Bitcoin.addressToPublicKeyScript(chainHash, bitcoinAddress)))
val scriptHash = ElectrumClient.computeScriptHash(pubkeyScript)
logger.info { "subscribing to address=$bitcoinAddress pubkeyScript=$pubkeyScript scriptHash=$scriptHash" }
val response = client.startScriptHashSubscription(scriptHash)
return Triple(scriptHash, bitcoinAddress, response)
}

launch {
// listen to connection events
client.connectionState
.filterIsInstance<Connection.ESTABLISHED>()
.collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
client.connectionState.filterIsInstance<Connection.ESTABLISHED>().collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
}
launch {
// listen to subscriptions events
client.notifications
.collect { mailbox.send(WalletCommand.Companion.ElectrumNotification(it)) }
client.notifications.collect { mailbox.send(WalletCommand.Companion.ElectrumNotification(it)) }
}
launch {
mailbox.consumeAsFlow().collect {
when (it) {
is WalletCommand.Companion.ElectrumConnected -> {
logger.mdcinfo { "electrum connected" }
scriptHashes.values.forEach { subscribe(it) }
scriptHashes.values.forEach { scriptHash ->
val (_, _, response) = subscribe(scriptHash)
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(response)
}
}

is WalletCommand.Companion.ElectrumNotification -> {
// NB: we ignore responses for unknown script_hashes (electrum client doesn't maintain a list of subscribers so we receive all subscriptions)
when (val msg = it.msg) {
is ScriptHashSubscriptionResponse -> {
scriptHashes[msg.scriptHash]?.let { bitcoinAddress ->
if (msg.status.isNotEmpty()) {
logger.mdcinfo { "non-empty status for address=$bitcoinAddress, requesting utxos" }
client.sendElectrumRequest(ScriptHashListUnspent(msg.scriptHash))
}
}
}

is ScriptHashListUnspentResponse -> {
scriptHashes[msg.scriptHash]?.let { address ->
val newUtxos = msg.unspents.minus((_walletStateFlow.value.addresses[address] ?: emptyList()).toSet())
// request new parent txs
newUtxos.forEach { utxo -> client.sendElectrumRequest(GetTransaction(utxo.txid)) }
val walletState = _walletStateFlow.value.copy(addresses = _walletStateFlow.value.addresses + (address to msg.unspents))
logger.mdcinfo { "${msg.unspents.size} utxo(s) for address=$address balance=${walletState.totalBalance}" }
msg.unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
// publish the updated balance
_walletStateFlow.value = walletState
}
}

is GetTransactionResponse -> {
val walletState = _walletStateFlow.value.copy(parentTxs = _walletStateFlow.value.parentTxs + (msg.tx.txid to msg.tx))
logger.mdcinfo { "received parent transaction with txid=${msg.tx.txid}" }
_walletStateFlow.value = walletState

}

else -> {} // ignore other electrum msgs
if (it.msg is ScriptHashSubscriptionResponse) {
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(it.msg)
}
}

is WalletCommand.Companion.AddAddress -> {
logger.mdcinfo { "adding new address=${it.bitcoinAddress}" }
scriptHashes = scriptHashes + subscribe(it.bitcoinAddress)
val (scriptHash, address, response) = subscribe(it.bitcoinAddress)
scriptHashes = scriptHashes + (scriptHash to address)
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(response)
}
}
}
}
}

private fun subscribe(bitcoinAddress: String): Pair<ByteVector32, String> {
val pubkeyScript = ByteVector(Script.write(Bitcoin.addressToPublicKeyScript(chainHash, bitcoinAddress)))
val scriptHash = ElectrumClient.computeScriptHash(pubkeyScript)
logger.info { "subscribing to address=$bitcoinAddress pubkeyScript=$pubkeyScript scriptHash=$scriptHash" }
client.sendElectrumRequest(ScriptHashSubscription(scriptHash))
return scriptHash to bitcoinAddress
}
}
Loading

0 comments on commit d36d723

Please sign in to comment.