Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework and simplify electrum client, miniwallet and watcher #411

Merged
merged 4 commits into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import fr.acinq.lightning.blockchain.fee.FeeratePerKB
import fr.acinq.lightning.blockchain.fee.FeeratePerKw
import fr.acinq.lightning.utils.*
import fr.acinq.secp256k1.Hex
import kotlinx.serialization.DeserializationStrategy
import kotlinx.serialization.KSerializer
import kotlinx.serialization.SerializationException
import kotlinx.serialization.descriptors.SerialDescriptor
Expand Down Expand Up @@ -156,24 +157,23 @@ data class EstimateFees(val confirmations: Int) : ElectrumRequest(confirmations)

data class EstimateFeeResponse(val confirmations: Int, val feerate: FeeratePerKw?) : ElectrumResponse

sealed interface ElectrumSubscriptionResponse : ElectrumResponse

data class ScriptHashSubscription(val scriptHash: ByteVector32) : ElectrumRequest(scriptHash) {
override val method: String = "blockchain.scripthash.subscribe"
}

data class ScriptHashSubscriptionResponse(val scriptHash: ByteVector32, val status: String = "") : ElectrumResponse
data class ScriptHashSubscriptionResponse(val scriptHash: ByteVector32, val status: String = "") : ElectrumSubscriptionResponse

object HeaderSubscription : ElectrumRequest() {
override val method: String = "blockchain.headers.subscribe"
}

data class HeaderSubscriptionResponse(val blockHeight: Int, val header: BlockHeader) : ElectrumResponse
data class HeaderSubscriptionResponse(val blockHeight: Int, val header: BlockHeader) : ElectrumSubscriptionResponse

/**
* Other Electrum responses
*/
data class TransactionHistory(val history: List<TransactionHistoryItem>) : ElectrumResponse
data class AddressStatus(val address: String, val status: String?) : ElectrumResponse
data class ServerError(val request: ElectrumRequest, val error: JsonRPCError) : ElectrumResponse

/**
Expand All @@ -184,10 +184,10 @@ data class ServerError(val request: ElectrumRequest, val error: JsonRPCError) :
* The former are correlated 1:1 with JSON RPC requests, based on request id. The latter are not: one
* subscription can yield an indefinite number of notifications.
*/
object ElectrumResponseDeserializer : KSerializer<Either<ElectrumResponse, JsonRPCResponse>> {
object ElectrumResponseDeserializer : DeserializationStrategy<Either<ElectrumSubscriptionResponse, JsonRPCResponse>> {
private val json = Json { ignoreUnknownKeys = true }

override fun deserialize(decoder: Decoder): Either<ElectrumResponse, JsonRPCResponse> {
override fun deserialize(decoder: Decoder): Either<ElectrumSubscriptionResponse, JsonRPCResponse> {
// Decoder -> JsonInput
val input = decoder as? JsonDecoder
?: throw SerializationException("This class can be loaded only by JSON")
Expand Down Expand Up @@ -221,10 +221,6 @@ object ElectrumResponseDeserializer : KSerializer<Either<ElectrumResponse, JsonR
}
}

override fun serialize(encoder: Encoder, value: Either<ElectrumResponse, JsonRPCResponse>) {
throw SerializationException("This ($value) is not meant to be serialized!")
}

override val descriptor: SerialDescriptor
get() = buildClassSerialDescriptor("fr.acinq.lightning.utils.Either")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private sealed interface WalletCommand {
*/
class ElectrumMiniWallet(
val chainHash: ByteVector32,
private val client: ElectrumClient.Caller,
private val client: ElectrumClient,
private val scope: CoroutineScope,
loggerFactory: LoggerFactory,
private val name: String = ""
Expand Down Expand Up @@ -130,77 +130,69 @@ class ElectrumMiniWallet(
mailbox.send(WalletCommand.Companion.AddAddress(bitcoinAddress))
}
}

init {
suspend fun WalletState.processSubscriptionResponse(msg: ScriptHashSubscriptionResponse): WalletState {
val bitcoinAddress = scriptHashes[msg.scriptHash]
return when {
bitcoinAddress == null || msg.status.isEmpty() -> this
else -> {
val unspents = client.getScriptHashUnspents(msg.scriptHash)
val newUtxos = unspents.minus((_walletStateFlow.value.addresses[bitcoinAddress] ?: emptyList()).toSet())
// request new parent txs
val parentTxs = newUtxos.map { utxo ->
val tx = client.getTx(utxo.txid)
logger.mdcinfo { "received parent transaction with txid=${tx.txid}" }
tx
}
val nextWalletState = this.copy(addresses = this.addresses + (bitcoinAddress to unspents), parentTxs = this.parentTxs + parentTxs.associateBy { it.txid })
logger.mdcinfo { "${unspents.size} utxo(s) for address=$bitcoinAddress balance=${nextWalletState.totalBalance}" }
unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
nextWalletState
}
}
}

suspend fun subscribe(bitcoinAddress: String): Triple<ByteVector32, String, ScriptHashSubscriptionResponse> {
val pubkeyScript = ByteVector(Script.write(Bitcoin.addressToPublicKeyScript(chainHash, bitcoinAddress)))
val scriptHash = ElectrumClient.computeScriptHash(pubkeyScript)
logger.info { "subscribing to address=$bitcoinAddress pubkeyScript=$pubkeyScript scriptHash=$scriptHash" }
val response = client.startScriptHashSubscription(scriptHash)
return Triple(scriptHash, bitcoinAddress, response)
}

launch {
// listen to connection events
client.connectionState
.filterIsInstance<Connection.ESTABLISHED>()
.collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
client.connectionState.filterIsInstance<Connection.ESTABLISHED>().collect { mailbox.send(WalletCommand.Companion.ElectrumConnected) }
}
launch {
// listen to subscriptions events
client.notifications
.collect { mailbox.send(WalletCommand.Companion.ElectrumNotification(it)) }
client.notifications.collect { mailbox.send(WalletCommand.Companion.ElectrumNotification(it)) }
}
launch {
mailbox.consumeAsFlow().collect {
when (it) {
is WalletCommand.Companion.ElectrumConnected -> {
logger.mdcinfo { "electrum connected" }
scriptHashes.values.forEach { subscribe(it) }
scriptHashes.values.forEach { scriptHash ->
val (_, _, response) = subscribe(scriptHash)
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(response)
}
}

is WalletCommand.Companion.ElectrumNotification -> {
// NB: we ignore responses for unknown script_hashes (electrum client doesn't maintain a list of subscribers so we receive all subscriptions)
when (val msg = it.msg) {
is ScriptHashSubscriptionResponse -> {
scriptHashes[msg.scriptHash]?.let { bitcoinAddress ->
if (msg.status.isNotEmpty()) {
logger.mdcinfo { "non-empty status for address=$bitcoinAddress, requesting utxos" }
client.sendElectrumRequest(ScriptHashListUnspent(msg.scriptHash))
}
}
}

is ScriptHashListUnspentResponse -> {
scriptHashes[msg.scriptHash]?.let { address ->
val newUtxos = msg.unspents.minus((_walletStateFlow.value.addresses[address] ?: emptyList()).toSet())
// request new parent txs
newUtxos.forEach { utxo -> client.sendElectrumRequest(GetTransaction(utxo.txid)) }
val walletState = _walletStateFlow.value.copy(addresses = _walletStateFlow.value.addresses + (address to msg.unspents))
logger.mdcinfo { "${msg.unspents.size} utxo(s) for address=$address balance=${walletState.totalBalance}" }
msg.unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
// publish the updated balance
_walletStateFlow.value = walletState
}
}

is GetTransactionResponse -> {
val walletState = _walletStateFlow.value.copy(parentTxs = _walletStateFlow.value.parentTxs + (msg.tx.txid to msg.tx))
logger.mdcinfo { "received parent transaction with txid=${msg.tx.txid}" }
_walletStateFlow.value = walletState

}

else -> {} // ignore other electrum msgs
if (it.msg is ScriptHashSubscriptionResponse) {
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(it.msg)
}
}

is WalletCommand.Companion.AddAddress -> {
logger.mdcinfo { "adding new address=${it.bitcoinAddress}" }
scriptHashes = scriptHashes + subscribe(it.bitcoinAddress)
val (scriptHash, address, response) = subscribe(it.bitcoinAddress)
scriptHashes = scriptHashes + (scriptHash to address)
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(response)
}
}
}
}
}

private fun subscribe(bitcoinAddress: String): Pair<ByteVector32, String> {
val pubkeyScript = ByteVector(Script.write(Bitcoin.addressToPublicKeyScript(chainHash, bitcoinAddress)))
val scriptHash = ElectrumClient.computeScriptHash(pubkeyScript)
logger.info { "subscribing to address=$bitcoinAddress pubkeyScript=$pubkeyScript scriptHash=$scriptHash" }
client.sendElectrumRequest(ScriptHashSubscription(scriptHash))
return scriptHash to bitcoinAddress
}
}
Loading