From 8de3704a831d669b1763bca7a4d8e5fadbbd605d Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Mon, 19 Jun 2023 14:39:42 +0900 Subject: [PATCH] remove targetEventStreams and make events() return Flow --- .../kotlin/dev/yorkie/document/Document.kt | 51 ++++++++----------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index 78ba9f599..30dbddade 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -23,13 +23,11 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.filterNot import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.withContext /** @@ -44,8 +42,6 @@ public class Document(public val key: Key) { private val eventStream = MutableSharedFlow() public val events = eventStream.asSharedFlow() - private val targetEventStreams = mutableMapOf>() - @Volatile private var root: CrdtRoot = CrdtRoot(CrdtObject(InitialTimeTicket, rht = ElementRht())) @@ -110,33 +106,28 @@ public class Document(public val key: Key) { /** * Subscribes to events on the document with the specific [targetPath]. */ - public fun events(targetPath: String): SharedFlow { - return targetEventStreams.getOrElse(targetPath) { - events.filterNot { it is Event.Snapshot && targetPath != "&" } - .mapNotNull { event -> - when (event) { - is Event.Snapshot -> event - is Event.RemoteChange -> { - event.changeInfos.filterTargetChangeInfos(targetPath) - .takeIf { it.isNotEmpty() } - ?.let { - Event.RemoteChange(it) - } - } - - is Event.LocalChange -> { - event.changeInfos.filterTargetChangeInfos(targetPath) - .takeIf { it.isNotEmpty() } - ?.let { - Event.LocalChange(it) - } - } + public fun events(targetPath: String): Flow { + return events.filterNot { it is Event.Snapshot && targetPath != "&" } + .mapNotNull { event -> + when (event) { + is Event.Snapshot -> event + is Event.RemoteChange -> { + event.changeInfos.filterTargetChangeInfos(targetPath) + .takeIf { it.isNotEmpty() } + ?.let { + Event.RemoteChange(it) + } + } + + is Event.LocalChange -> { + event.changeInfos.filterTargetChangeInfos(targetPath) + .takeIf { it.isNotEmpty() } + ?.let { + Event.LocalChange(it) + } } - }.shareIn(scope, SharingStarted.Eagerly) - .also { - targetEventStreams[targetPath] = it } - } + } } private fun List.filterTargetChangeInfos(targetPath: String) =