Skip to content

Commit

Permalink
Restore ElectrumWatcher.openUpToDateFlow()
Browse files Browse the repository at this point in the history
It is used by a watch-tower module written in Swift in the iOS app.
Code has bee simplified and there are now 2 separate flows:
- one for watch events
- one for "up-to-date" events (which are just the number of milliseconds since the watcher is ready and idle)
  • Loading branch information
sstone committed Mar 28, 2023
1 parent 08640a6 commit 6ad2580
Showing 1 changed file with 15 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,26 @@ import fr.acinq.lightning.utils.currentTimestampMillis
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.consumeAsFlow
import org.kodein.log.LoggerFactory
import org.kodein.log.newLogger
import kotlin.math.max

sealed class NotifyEvent
class NotifyWatchEvent(val watchEvent: WatchEvent) : NotifyEvent()
class NotifyUpToDateEvent(val millis: Long) : NotifyEvent()

class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, loggerFactory: LoggerFactory) : CoroutineScope by scope {

private val logger = loggerFactory.newLogger(this::class)
private val mailbox = Channel<WatcherCommand>(Channel.BUFFERED)
private val _notificationsFlow = MutableSharedFlow<NotifyEvent>(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND)
val notificationsFlow: SharedFlow<NotifyEvent> = _notificationsFlow

fun openWatchNotificationsFlow(): Flow<WatchEvent> = _notificationsFlow.mapNotNull {
when (it) {
is NotifyWatchEvent -> it.watchEvent
else -> null
}
}
private val _notificationsFlow = MutableSharedFlow<WatchEvent>(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND)
fun openWatchNotificationsFlow(): Flow<WatchEvent> = _notificationsFlow.asSharedFlow()

fun openUpToDateFlow(): Flow<Long> = _notificationsFlow.mapNotNull {
when (it) {
is NotifyUpToDateEvent -> it.millis
else -> null
}
}
// this is used by a Swift watch-tower module in the Phoenix iOS app to tell when the watcher is up-to-date
// the value that is emitted in the time elapsed (in milliseconds) since the watcher is ready and idle
private val _uptodateFlow = MutableSharedFlow<Long>(replay = 0, extraBufferCapacity = 64, onBufferOverflow = BufferOverflow.SUSPEND)
fun openUpToDateFlow(): Flow<Long> = _uptodateFlow.asSharedFlow()

suspend fun watch(watch: Watch) {
mailbox.send(WatcherCommand.AddWatch(watch))
Expand Down Expand Up @@ -87,7 +78,7 @@ class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, log
.filter { it.txId == outPoint.txid && it.outputIndex == outPoint.index.toInt() }
.map { w ->
logger.info { "output ${w.txId}:${w.outputIndex} spent by transaction ${tx.txid}" }
_notificationsFlow.emit(NotifyWatchEvent(WatchEventSpent(w.channelId, w.event, tx)))
_notificationsFlow.emit(WatchEventSpent(w.channelId, w.event, tx))
}
}
}
Expand All @@ -103,7 +94,7 @@ class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, log
val merkle = client.getMerkle(w.txId, item.blockHeight)
val confirmations = state.height - merkle.block_height + 1
logger.info { "txid=${w.txId} had confirmations=$confirmations in block=${merkle.block_height} pos=${merkle.pos}" }
_notificationsFlow.emit(NotifyWatchEvent(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!)))
_notificationsFlow.emit(WatchEventConfirmed(w.channelId, w.event, merkle.block_height, merkle.pos, txMap[w.txId]!!))

// check whether we have transactions to publish
when (val event = w.event) {
Expand Down Expand Up @@ -276,9 +267,10 @@ class ElectrumWatcher(val client: ElectrumClient, val scope: CoroutineScope, log
is WatcherCommand.NotifyIfReady -> {
if (state.isConnected) {
state.idleSince?.let {
if (it < currentTimestampMillis() - 5000) {
val now = currentTimestampMillis()
if (now > it + 5000) {
// no requests in progress and watcher has been idle for more than 5s
_notificationsFlow.emit(NotifyUpToDateEvent(currentTimestampMillis()))
_uptodateFlow.emit(now)
}
}
}
Expand Down

0 comments on commit 6ad2580

Please sign in to comment.