diff --git a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt index 46ccd731d..cadaa38cf 100644 --- a/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt +++ b/src/commonMain/kotlin/fr/acinq/lightning/blockchain/electrum/ElectrumWatcher.kt @@ -8,35 +8,26 @@ import fr.acinq.lightning.utils.currentTimestampMillis import kotlinx.coroutines.* import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.* +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.consumeAsFlow import org.kodein.log.LoggerFactory import org.kodein.log.newLogger import kotlin.math.max -sealed class NotifyEvent -class NotifyWatchEvent(val watchEvent: WatchEvent) : NotifyEvent() -class NotifyUpToDateEvent(val millis: Long) : NotifyEvent() - class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, loggerFactory: LoggerFactory) : CoroutineScope by scope { private val logger = loggerFactory.newLogger(this::class) private val mailbox = Channel(Channel.BUFFERED) - private val _notificationsFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND) - val notificationsFlow: SharedFlow = _notificationsFlow - fun openWatchNotificationsFlow(): Flow = _notificationsFlow.mapNotNull { - when (it) { - is NotifyWatchEvent -> it.watchEvent - else -> null - } - } + private val _notificationsFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND) + fun openWatchNotificationsFlow(): Flow = _notificationsFlow.asSharedFlow() - fun openUpToDateFlow(): Flow = _notificationsFlow.mapNotNull { - when (it) { - is NotifyUpToDateEvent -> it.millis - else -> null - } - } + // this is used by a Swift watch-tower module in the Phoenix iOS app to tell when the watcher is up-to-date + // the value that is emitted in the time elapsed (in milliseconds) since the watcher is ready and idle + private val _uptodateFlow = MutableSharedFlow(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND) + fun openUpToDateFlow(): Flow = _uptodateFlow.asSharedFlow() suspend fun watch(watch: Watch) { mailbox.send(WatcherCommand.AddWatch(watch)) @@ -87,7 +78,7 @@ class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, log .filter { it.txId == outPoint.txid && it.outputIndex == outPoint.index.toInt() } .map { w -> logger.info { "output ${w.txId}:${w.outputIndex} spent by transaction ${tx.txid}" } - _notificationsFlow.emit(NotifyWatchEvent(WatchEventSpent(w.channelId, w.event, tx))) + _notificationsFlow.emit(WatchEventSpent(w.channelId, w.event, tx)) } } } @@ -103,7 +94,7 @@ class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, log val merkle = client.getMerkle(w.txId, item.blockHeight) val confirmations = state.height - merkle.block_height + 1 logger.info { "txid=${w.txId} had confirmations=$confirmations in block=${merkle.block_height} pos=${merkle.pos}" } - _notificationsFlow.emit(NotifyWatchEvent(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!))) + _notificationsFlow.emit(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!)) // check whether we have transactions to publish when (val event = w.event) { @@ -276,9 +267,10 @@ class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, log is WatcherCommand.NotifyIfReady -> { if (state.isConnected) { state.idleSince?.let { - if (it < currentTimestampMillis() - 5000) { + val now = currentTimestampMillis() + if (now > it + 5000) { // no requests in progress and watcher has been idle for more than 5s - _notificationsFlow.emit(NotifyUpToDateEvent(currentTimestampMillis())) + _uptodateFlow.emit(now) } } }