Skip to content

Commit

Permalink
remove targetEventStreams and make events() return Flow
Browse files Browse the repository at this point in the history
  • Loading branch information
7hong13 committed Jun 19, 2023
1 parent 68602fb commit 8de3704
Showing 1 changed file with 21 additions and 30 deletions.
51 changes: 21 additions & 30 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.withContext

/**
Expand All @@ -44,8 +42,6 @@ public class Document(public val key: Key) {
private val eventStream = MutableSharedFlow<Event>()
public val events = eventStream.asSharedFlow()

private val targetEventStreams = mutableMapOf<String, SharedFlow<Event>>()

@Volatile
private var root: CrdtRoot = CrdtRoot(CrdtObject(InitialTimeTicket, rht = ElementRht()))

Expand Down Expand Up @@ -110,33 +106,28 @@ public class Document(public val key: Key) {
/**
* Subscribes to events on the document with the specific [targetPath].
*/
public fun events(targetPath: String): SharedFlow<Event> {
return targetEventStreams.getOrElse(targetPath) {
events.filterNot { it is Event.Snapshot && targetPath != "&" }
.mapNotNull { event ->
when (event) {
is Event.Snapshot -> event
is Event.RemoteChange -> {
event.changeInfos.filterTargetChangeInfos(targetPath)
.takeIf { it.isNotEmpty() }
?.let {
Event.RemoteChange(it)
}
}

is Event.LocalChange -> {
event.changeInfos.filterTargetChangeInfos(targetPath)
.takeIf { it.isNotEmpty() }
?.let {
Event.LocalChange(it)
}
}
public fun events(targetPath: String): Flow<Event> {
return events.filterNot { it is Event.Snapshot && targetPath != "&" }
.mapNotNull { event ->
when (event) {
is Event.Snapshot -> event
is Event.RemoteChange -> {
event.changeInfos.filterTargetChangeInfos(targetPath)
.takeIf { it.isNotEmpty() }
?.let {
Event.RemoteChange(it)
}
}

is Event.LocalChange -> {
event.changeInfos.filterTargetChangeInfos(targetPath)
.takeIf { it.isNotEmpty() }
?.let {
Event.LocalChange(it)
}
}
}.shareIn(scope, SharingStarted.Eagerly)
.also {
targetEventStreams[targetPath] = it
}
}
}
}

private fun List<Event.ChangeInfo>.filterTargetChangeInfos(targetPath: String) =
Expand Down

0 comments on commit 8de3704

Please sign in to comment.