Skip to content

Commit

Permalink
Fix for #51
Browse files Browse the repository at this point in the history
  • Loading branch information
altavir committed Nov 20, 2020
1 parent f6da7d6 commit 9e524e6
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 12 deletions.
2 changes: 1 addition & 1 deletion examples/src/main/kotlin/misc/dynamicBars.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ fun main() {
while (isActive) {
repeat(10) { columnIndex ->
repeat(3) { seriesIndex ->
delay(100)
delay(200)
traces["Series $seriesIndex"]?.let {bar->
println("Updating ${bar.name}, Column $columnIndex")
//TODO replace with dynamic data API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package kscience.plotly.server
import hep.dataforge.meta.*
import hep.dataforge.names.Name
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
Expand All @@ -18,17 +17,17 @@ import kscience.plotly.Plot
* A change collector that combines all emitted configuration changes until read, than drops all collected changes
* and starts new batch.
*/
class MetaChangeCollector {
public class MetaChangeCollector {
private val mutex = Mutex()
private var state = Config()

suspend fun collect(name: Name, newItem: MetaItem<*>?) {
public suspend fun collect(name: Name, newItem: MetaItem<*>?) {
mutex.withLock {
state[name] = newItem
}
}

suspend fun read(): Meta {
public suspend fun read(): Meta {
return if (!state.isEmpty()) {
mutex.withLock {
state.seal().also {
Expand Down Expand Up @@ -64,13 +63,12 @@ private fun Config.flowChanges(scope: CoroutineScope, updateInterval: Long): Flo
}
}

@OptIn(FlowPreview::class)
public fun Plot.collectUpdates(plotId: String, scope: CoroutineScope, updateInterval: Long): Flow<Update> {
return config.flowChanges(scope, updateInterval).flatMapMerge { change ->
flow<Update> {
change["layout"].node?.let { emit(Update.Layout(plotId, it)) }
change.getIndexed("data").values.mapNotNull { it.node }.forEachIndexed { index, metaItem ->
emit(Update.Trace(plotId, index, metaItem))
return config.flowChanges(scope, updateInterval).transform { change ->
change["layout"].node?.let { emit(Update.Layout(plotId, it)) }
change.getIndexed("data").forEach { (index, metaItem) ->
if (metaItem is MetaItem.NodeItem) {
emit(Update.Trace(plotId, index?.toInt() ?: 0, metaItem.node))
}
}
}
Expand Down

0 comments on commit 9e524e6

Please sign in to comment.