From 9e524e65be61ded5df55c32fe2f17ea441a310a1 Mon Sep 17 00:00:00 2001 From: Alexander Nozik Date: Fri, 20 Nov 2020 18:20:36 +0300 Subject: [PATCH] Fix for #51 --- examples/src/main/kotlin/misc/dynamicBars.kt | 2 +- .../plotly/server/MetaChangeCollector.kt | 20 +++++++++---------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/examples/src/main/kotlin/misc/dynamicBars.kt b/examples/src/main/kotlin/misc/dynamicBars.kt index 54ba6fee..59700c1b 100644 --- a/examples/src/main/kotlin/misc/dynamicBars.kt +++ b/examples/src/main/kotlin/misc/dynamicBars.kt @@ -51,7 +51,7 @@ fun main() { while (isActive) { repeat(10) { columnIndex -> repeat(3) { seriesIndex -> - delay(100) + delay(200) traces["Series $seriesIndex"]?.let {bar-> println("Updating ${bar.name}, Column $columnIndex") //TODO replace with dynamic data API diff --git a/plotlykt-server/src/main/kotlin/kscience/plotly/server/MetaChangeCollector.kt b/plotlykt-server/src/main/kotlin/kscience/plotly/server/MetaChangeCollector.kt index 50e25e0a..d90dcb50 100644 --- a/plotlykt-server/src/main/kotlin/kscience/plotly/server/MetaChangeCollector.kt +++ b/plotlykt-server/src/main/kotlin/kscience/plotly/server/MetaChangeCollector.kt @@ -3,11 +3,10 @@ package kscience.plotly.server import hep.dataforge.meta.* import hep.dataforge.names.Name import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.delay import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.flatMapMerge import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.flow.transform import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock @@ -18,17 +17,17 @@ import kscience.plotly.Plot * A change collector that combines all emitted configuration changes until read, than drops all collected changes * and starts new batch. */ -class MetaChangeCollector { +public class MetaChangeCollector { private val mutex = Mutex() private var state = Config() - suspend fun collect(name: Name, newItem: MetaItem<*>?) { + public suspend fun collect(name: Name, newItem: MetaItem<*>?) { mutex.withLock { state[name] = newItem } } - suspend fun read(): Meta { + public suspend fun read(): Meta { return if (!state.isEmpty()) { mutex.withLock { state.seal().also { @@ -64,13 +63,12 @@ private fun Config.flowChanges(scope: CoroutineScope, updateInterval: Long): Flo } } -@OptIn(FlowPreview::class) public fun Plot.collectUpdates(plotId: String, scope: CoroutineScope, updateInterval: Long): Flow { - return config.flowChanges(scope, updateInterval).flatMapMerge { change -> - flow { - change["layout"].node?.let { emit(Update.Layout(plotId, it)) } - change.getIndexed("data").values.mapNotNull { it.node }.forEachIndexed { index, metaItem -> - emit(Update.Trace(plotId, index, metaItem)) + return config.flowChanges(scope, updateInterval).transform { change -> + change["layout"].node?.let { emit(Update.Layout(plotId, it)) } + change.getIndexed("data").forEach { (index, metaItem) -> + if (metaItem is MetaItem.NodeItem) { + emit(Update.Trace(plotId, index?.toInt() ?: 0, metaItem.node)) } } }