From 67ce2ee90bc6744597c8170c81aaccbb49e599c1 Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Tue, 13 Jun 2023 22:31:22 +0900 Subject: [PATCH 1/8] add OperationInfo classes --- .../kotlin/dev/yorkie/document/Document.kt | 37 +++++------ .../dev/yorkie/document/change/Change.kt | 5 +- .../dev/yorkie/document/crdt/CrdtRoot.kt | 3 +- .../dev/yorkie/document/crdt/CrdtText.kt | 27 ++++---- .../dev/yorkie/document/json/JsonText.kt | 2 +- .../yorkie/document/operation/AddOperation.kt | 11 +++- .../document/operation/EditOperation.kt | 25 ++++++-- .../document/operation/IncreaseOperation.kt | 17 ++++- .../document/operation/MoveOperation.kt | 13 +++- .../yorkie/document/operation/Operation.kt | 2 +- .../document/operation/OperationInfo.kt | 63 +++++++++++++++++++ .../document/operation/RemoveOperation.kt | 14 ++++- .../document/operation/SelectOperation.kt | 14 ++++- .../yorkie/document/operation/SetOperation.kt | 6 +- .../document/operation/StyleOperation.kt | 19 +++++- 15 files changed, 199 insertions(+), 59 deletions(-) create mode 100644 yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index f3b1e28d0..4e6580d11 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -12,12 +12,13 @@ import dev.yorkie.document.crdt.CrdtObject import dev.yorkie.document.crdt.CrdtRoot import dev.yorkie.document.crdt.ElementRht import dev.yorkie.document.json.JsonObject +import dev.yorkie.document.operation.InternalOpInfo +import dev.yorkie.document.operation.OperationInfo import dev.yorkie.document.time.ActorID import dev.yorkie.document.time.TimeTicket import dev.yorkie.document.time.TimeTicket.Companion.InitialTimeTicket import dev.yorkie.util.YorkieLogger import dev.yorkie.util.createSingleThreadDispatcher -import dev.yorkie.util.findPrefixes import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.SupervisorJob @@ -25,7 +26,6 @@ import kotlinx.coroutines.async import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.withContext -import org.apache.commons.collections4.trie.PatriciaTrie /** * A CRDT-based data type. @@ -91,10 +91,11 @@ public class Document(public val key: Key) { return@async true } val change = context.getChange() - change.execute(root) + val internalOpInfos = change.execute(root) localChanges += change changeID = change.id - eventStream.emit(change.asLocal()) + val changeInfos = listOf(change.toChangeInfo(internalOpInfos)) + eventStream.emit(Event.LocalChange(changeInfos)) true } } @@ -147,9 +148,9 @@ public class Document(public val key: Key) { val clone = ensureClone() val changesInfo = changes.map { it.execute(clone) - it.execute(root) + val internalOpInfos = it.execute(root) changeID = changeID.syncLamport(it.id.lamport) - it.toChangeInfo() + it.toChangeInfo(internalOpInfos) } if (changesInfo.isEmpty()) { return @@ -203,20 +204,16 @@ public class Document(public val key: Key) { return root.garbageCollect(ticket) } - private fun Change.createPaths(): List { - val pathTrie = PatriciaTrie() - operations.forEach { operation -> - val createdAt = operation.effectedCreatedAt - val subPaths = root.createSubPaths(createdAt).drop(1) - subPaths.forEach { subPath -> pathTrie[subPath] = subPath } + private fun Change.toChangeInfo(internalOpInfos: List) = + Event.ChangeInfo(message.orEmpty(), internalOpInfos.map { it.toOperationInfo() }) + + private fun InternalOpInfo.toOperationInfo(): OperationInfo { + val path = root.createSubPaths(element).joinToString(".") + return operationInfo.apply { + this.path = path } - return pathTrie.findPrefixes().map { "." + it.joinToString(".") } } - private fun Change.asLocal() = Event.LocalChange(listOf(toChangeInfo())) - - private fun Change.toChangeInfo() = Event.ChangeInfo(this, createPaths()) - public fun toJson(): String { return root.toJson() } @@ -243,11 +240,11 @@ public class Document(public val key: Key) { ) : Event /** - * Represents a pair of [Change] and the JsonPath of the changed element. + * Represents the modification made during a document update and the message passed. */ public class ChangeInfo( - public val change: Change, - @Suppress("unused") public val paths: List, + public val message: String, + public val operations: List, ) } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt b/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt index 3b3e9817e..90a7cc468 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt @@ -1,6 +1,7 @@ package dev.yorkie.document.change import dev.yorkie.document.crdt.CrdtRoot +import dev.yorkie.document.operation.InternalOpInfo import dev.yorkie.document.operation.Operation import dev.yorkie.document.time.ActorID @@ -20,8 +21,8 @@ public data class Change internal constructor( id = id.setActor(actorID) } - internal fun execute(root: CrdtRoot) { - operations.forEach { + internal fun execute(root: CrdtRoot): List { + return operations.flatMap { it.execute(root) } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtRoot.kt b/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtRoot.kt index ab5aa4b56..bf431a13f 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtRoot.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtRoot.kt @@ -49,11 +49,10 @@ internal class CrdtRoot(val rootObject: CrdtObject) { while (true) { val parent = pair.parent ?: break val currentCreatedAt = pair.element.createdAt - var subPath = parent.subPathOf(currentCreatedAt) + val subPath = parent.subPathOf(currentCreatedAt) if (subPath == null) { YorkieLogger.e(TAG, "fail to find the given element: $currentCreatedAt") } else { - subPath = subPath.replace(Regex("/[\$.]/g"), "\\$&") subPaths.add(0, subPath) } pair = elementPairMapByCreatedAt[parent.createdAt] ?: break diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtText.kt b/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtText.kt index 24a3a83f7..6173cb157 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtText.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/crdt/CrdtText.kt @@ -41,7 +41,7 @@ internal data class CrdtText( executedAt: TimeTicket, attributes: Map? = null, latestCreatedAtMapByActor: Map? = null, - ): Map { + ): Pair, List> { val textValue = if (value.isNotEmpty()) { TextValue(value).apply { attributes?.forEach { setAttribute(it.key, it.value, executedAt) } @@ -72,15 +72,12 @@ internal data class CrdtText( } selectPrev(RgaTreeSplitNodeRange(caretPos, caretPos), executedAt)?.let { changes.add(it) } handleChanges(changes) - return latestCreatedAtMap + return latestCreatedAtMap to changes } private fun selectPrev(range: RgaTreeSplitNodeRange, executedAt: TimeTicket): TextChange? { - val prevSelection = selectionMap[executedAt.actorID] ?: run { - selectionMap[executedAt.actorID] = Selection(range.first, range.second, executedAt) - return null - } - return if (prevSelection.executedAt < executedAt) { + val prevSelection = selectionMap[executedAt.actorID] + return if (prevSelection == null || prevSelection.executedAt < executedAt) { selectionMap[executedAt.actorID] = Selection(range.first, range.second, executedAt) val (from, to) = rgaTreeSplit.findIndexesFromRange(range) TextChange(TextChangeType.Selection, executedAt.actorID, from, to) @@ -98,7 +95,7 @@ internal data class CrdtText( range: RgaTreeSplitNodeRange, attributes: Map, executedAt: TimeTicket, - ) { + ): List { // 1. Split nodes with from and to. val toRight = rgaTreeSplit.findNodeWithSplit(range.second, executedAt).second val fromRight = rgaTreeSplit.findNodeWithSplit(range.first, executedAt).second @@ -120,16 +117,20 @@ internal data class CrdtText( } handleChanges(changes) + return changes } /** * Stores that the given [range] has been selected. */ - fun select(range: RgaTreeSplitNodeRange, executedAt: TimeTicket) { - if (remoteChangeLock) return - - val change = selectPrev(range, executedAt) ?: return - handleChanges(listOf(change)) + fun select(range: RgaTreeSplitNodeRange, executedAt: TimeTicket): TextChange? { + return if (remoteChangeLock) { + null + } else { + selectPrev(range, executedAt)?.also { + handleChanges(listOf(it)) + } + } } /** diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonText.kt b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonText.kt index 41dd535c1..712518318 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonText.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonText.kt @@ -43,7 +43,7 @@ public class JsonText internal constructor( val range = target.createRange(fromIndex, toIndex) val executedAt = context.issueTimeTicket() - val maxCreatedAtMapByActor = target.edit(range, content, executedAt, attributes) + val maxCreatedAtMapByActor = target.edit(range, content, executedAt, attributes).first context.push( EditOperation( fromPos = range.first, diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt index 000e0478f..b8299a6f8 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt @@ -25,15 +25,22 @@ internal data class AddOperation( /** * Executes this [AddOperation] on the given [root]. */ - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtArray) { + return if (parentObject is CrdtArray) { val copiedValue = value.deepCopy() parentObject.insertAfter(prevCreatedAt, copiedValue) root.registerElement(copiedValue, parentObject) + listOf( + InternalOpInfo( + parentCreatedAt, + OperationInfo.AddOpInfo(parentObject.subPathOf(effectedCreatedAt).toInt()), + ), + ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "fail to execute, only array can execute add") + emptyList() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt index 84ee2eb22..499e67c54 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt @@ -4,6 +4,8 @@ import dev.yorkie.document.crdt.CrdtRoot import dev.yorkie.document.crdt.CrdtText import dev.yorkie.document.crdt.RgaTreeSplitNodePos import dev.yorkie.document.crdt.RgaTreeSplitNodeRange +import dev.yorkie.document.crdt.TextChangeType +import dev.yorkie.document.crdt.TextWithAttributes import dev.yorkie.document.time.ActorID import dev.yorkie.document.time.TimeTicket import dev.yorkie.util.YorkieLogger @@ -27,24 +29,39 @@ internal data class EditOperation( override val effectedCreatedAt: TimeTicket get() = parentCreatedAt - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtText) { - parentObject.edit( + return if (parentObject is CrdtText) { + val changes = parentObject.edit( RgaTreeSplitNodeRange(fromPos, toPos), content, executedAt, attributes, maxCreatedAtMapByActor, - ) + ).second if (fromPos != toPos) { root.registerTextWithGarbage(parentObject) } + changes.map { (type, _, from, to, content, attributes) -> + if (type == TextChangeType.Content) { + InternalOpInfo( + parentCreatedAt, + OperationInfo.EditOpInfo( + from, + to, + TextWithAttributes(content.orEmpty() to attributes.orEmpty()), + ), + ) + } else { + InternalOpInfo(parentCreatedAt, OperationInfo.SelectOpInfo(from, to)) + } + } } else { if (parentObject == null) { YorkieLogger.e(TAG, "fail to find $parentCreatedAt") } YorkieLogger.e(TAG, "fail to execute, only Text can execute edit") + emptyList() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt index 1254e9188..7aca0794f 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt @@ -3,6 +3,7 @@ package dev.yorkie.document.operation import dev.yorkie.document.crdt.CrdtCounter import dev.yorkie.document.crdt.CrdtElement import dev.yorkie.document.crdt.CrdtPrimitive +import dev.yorkie.document.crdt.CrdtPrimitive.Type import dev.yorkie.document.crdt.CrdtRoot import dev.yorkie.document.time.TimeTicket import dev.yorkie.util.YorkieLogger @@ -26,14 +27,26 @@ internal data class IncreaseOperation( /** * Executes this [IncreaseOperation] on the given [root]. */ - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtCounter) { + return if (parentObject is CrdtCounter) { val copiedValue = value.deepCopy() as CrdtPrimitive parentObject.increase(copiedValue) + val increasedValue = if (copiedValue.type == Type.Integer) { + copiedValue.value as Int + } else { + copiedValue.value as Long + } + listOf( + InternalOpInfo( + effectedCreatedAt, + OperationInfo.IncreaseOpInfo(increasedValue), + ), + ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "fail to execute, only Counter can execute increase") + emptyList() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt index 90699d044..954f11f1f 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt @@ -24,13 +24,22 @@ internal data class MoveOperation( /** * Executes this [MoveOperation] on the given [root]. */ - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtArray) { + return if (parentObject is CrdtArray) { + val previousIndex = parentObject.subPathOf(createdAt).toInt() parentObject.moveAfter(prevCreatedAt, createdAt, executedAt) + val index = parentObject.subPathOf(createdAt).toInt() + listOf( + InternalOpInfo( + parentCreatedAt, + OperationInfo.MoveOpInfo(previousIndex = previousIndex, index = index), + ), + ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "fail to execute, only array can execute move") + emptyList() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt index 0bc3a4611..76fca08a0 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt @@ -22,7 +22,7 @@ internal abstract class Operation { /** * Executes this [Operation] on the given [root]. */ - abstract fun execute(root: CrdtRoot) + abstract fun execute(root: CrdtRoot): List /** * Sets the given [ActorID] to this [Operation]. diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt new file mode 100644 index 000000000..eb9f4b15c --- /dev/null +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt @@ -0,0 +1,63 @@ +package dev.yorkie.document.operation + +import dev.yorkie.document.crdt.TextWithAttributes +import dev.yorkie.document.time.TimeTicket + +/** + * [OperationInfo] represents the information of an operation. + * It is used to inform to the user what kind of operation was executed. + */ +public sealed class OperationInfo { + + public abstract var path: String + + public data class AddOpInfo(val index: Int, override var path: String = INITIAL_PATH) : + OperationInfo() + + public data class MoveOpInfo( + val previousIndex: Int, + val index: Int, + override var path: String = INITIAL_PATH, + ) : OperationInfo() + + public data class SetOpInfo(val key: String, override var path: String = INITIAL_PATH) : + OperationInfo() + + public data class RemoveOpInfo( + val key: String?, + val index: Int?, + override var path: String = INITIAL_PATH, + ) : OperationInfo() + + public data class IncreaseOpInfo(val value: Number, override var path: String = INITIAL_PATH) : + OperationInfo() + + public data class EditOpInfo( + val from: Int, + val to: Int, + val value: TextWithAttributes, + override var path: String = INITIAL_PATH, + ) : OperationInfo() + + public data class StyleOpInfo( + val from: Int, + val to: Int, + val value: TextWithAttributes, + override var path: String = INITIAL_PATH, + ) : OperationInfo() + + public data class SelectOpInfo( + val from: Int, + val to: Int, + override var path: String = INITIAL_PATH, + ) : OperationInfo() + + companion object { + const val INITIAL_PATH = "initial path" + } +} + +internal data class InternalOpInfo( + val element: TimeTicket, + val operationInfo: OperationInfo, +) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt index 7d237c83e..099595673 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt @@ -1,5 +1,6 @@ package dev.yorkie.document.operation +import dev.yorkie.document.crdt.CrdtArray import dev.yorkie.document.crdt.CrdtContainer import dev.yorkie.document.crdt.CrdtRoot import dev.yorkie.document.time.TimeTicket @@ -23,14 +24,23 @@ internal data class RemoveOperation( /** * Executes this [RemoveOperation] on the given [root]. */ - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtContainer) { + return if (parentObject is CrdtContainer) { + val key = parentObject.subPathOf(createdAt) val element = parentObject.remove(createdAt, executedAt) root.registerRemovedElement(element) + val index = if (parentObject is CrdtArray) key?.toInt() else null + listOf( + InternalOpInfo( + effectedCreatedAt, + OperationInfo.RemoveOpInfo(key, index), + ), + ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "only object and array can execute remove: $parentObject") + emptyList() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt index 749d91004..7d37c033e 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt @@ -20,13 +20,21 @@ internal data class SelectOperation( /** * Returns the created time of the effected element. */ - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtText) { - parentObject.select(RgaTreeSplitNodeRange(fromPos, toPos), executedAt) + return if (parentObject is CrdtText) { + val change = parentObject.select(RgaTreeSplitNodeRange(fromPos, toPos), executedAt) + ?: return emptyList() + listOf( + InternalOpInfo( + parentCreatedAt, + OperationInfo.SelectOpInfo(from = change.from, to = change.to), + ), + ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "fail to execute, only Text, RichText can execute select") + emptyList() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt index 96e7f38a0..564929ad5 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt @@ -26,15 +26,17 @@ internal data class SetOperation( /** * Executes this [SetOperation] on the given [root]. */ - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtObject) { + return if (parentObject is CrdtObject) { val copiedValue = value.deepCopy() parentObject[key] = copiedValue root.registerElement(copiedValue, parentObject) + listOf(InternalOpInfo(parentCreatedAt, OperationInfo.SetOpInfo(key))) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "fail to execute, only object can execute set") + emptyList() } } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt index 51d672902..b9b609271 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt @@ -4,6 +4,7 @@ import dev.yorkie.document.crdt.CrdtRoot import dev.yorkie.document.crdt.CrdtText import dev.yorkie.document.crdt.RgaTreeSplitNodePos import dev.yorkie.document.crdt.RgaTreeSplitNodeRange +import dev.yorkie.document.crdt.TextWithAttributes import dev.yorkie.document.time.TimeTicket import dev.yorkie.util.YorkieLogger @@ -18,13 +19,25 @@ internal data class StyleOperation( override val effectedCreatedAt: TimeTicket get() = parentCreatedAt - override fun execute(root: CrdtRoot) { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) - if (parentObject is CrdtText) { - parentObject.style(RgaTreeSplitNodeRange(fromPos, toPos), attributes, executedAt) + return if (parentObject is CrdtText) { + val changes = + parentObject.style(RgaTreeSplitNodeRange(fromPos, toPos), attributes, executedAt) + changes.map { + InternalOpInfo( + parentCreatedAt, + OperationInfo.StyleOpInfo( + it.from, + it.to, + TextWithAttributes(it.content.orEmpty() to it.attributes.orEmpty()), + ), + ) + } } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "fail to execute, only Text can execute style") + emptyList() } } From 7ff2e7b2945b6496748731e20d416843fe47d673 Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Thu, 15 Jun 2023 17:56:13 +0900 Subject: [PATCH 2/8] implement document subscription with the specific target path --- .../kotlin/dev/yorkie/core/ClientTest.kt | 28 +++-- .../kotlin/dev/yorkie/document/Document.kt | 100 +++++++++++++++--- .../dev/yorkie/document/change/Change.kt | 4 +- .../dev/yorkie/document/json/JsonArray.kt | 14 +++ .../dev/yorkie/document/json/JsonElement.kt | 1 - .../dev/yorkie/document/json/JsonPrimitive.kt | 1 - .../yorkie/document/operation/AddOperation.kt | 9 +- .../document/operation/EditOperation.kt | 17 ++- .../document/operation/IncreaseOperation.kt | 9 +- .../document/operation/MoveOperation.kt | 9 +- .../yorkie/document/operation/Operation.kt | 2 +- .../document/operation/OperationInfo.kt | 11 +- .../document/operation/RemoveOperation.kt | 9 +- .../document/operation/SelectOperation.kt | 9 +- .../yorkie/document/operation/SetOperation.kt | 8 +- .../document/operation/StyleOperation.kt | 18 ++-- .../kotlin/dev/yorkie/api/ConverterTest.kt | 5 +- .../dev/yorkie/document/DocumentTest.kt | 31 +++--- 18 files changed, 177 insertions(+), 108 deletions(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt index 9cee22008..6bad425ae 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt @@ -10,11 +10,9 @@ import dev.yorkie.document.Document import dev.yorkie.document.Document.Event.LocalChange import dev.yorkie.document.Document.Event.RemoteChange import dev.yorkie.document.change.CheckPoint -import dev.yorkie.document.crdt.CrdtPrimitive import dev.yorkie.document.json.JsonCounter import dev.yorkie.document.json.JsonPrimitive -import dev.yorkie.document.operation.RemoveOperation -import dev.yorkie.document.operation.SetOperation +import dev.yorkie.document.operation.OperationInfo import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Job import kotlinx.coroutines.delay @@ -108,20 +106,18 @@ class ClientTest { assertIs(syncEvent.result) val localSetEvent = assertIs(document1Events.first()) - val localSetOperation = assertIs( - localSetEvent.changeInfos.first().change.operations.first(), + val localSetOperation = assertIs( + localSetEvent.changeInfos.first().operations.first(), ) assertEquals("k1", localSetOperation.key) - assertEquals("v1", (localSetOperation.value as CrdtPrimitive).value) - assertEquals(".k.1", localSetEvent.changeInfos.first().paths.first()) + assertEquals("$", localSetEvent.changeInfos.first().operations.first().path) document1Events.clear() val remoteSetEvent = assertIs(document2Events.first()) - val remoteSetOperation = assertIs( - remoteSetEvent.changeInfos.first().change.operations.first(), + val remoteSetOperation = assertIs( + remoteSetEvent.changeInfos.first().operations.first(), ) assertEquals("k1", remoteSetOperation.key) - assertEquals("v1", (remoteSetOperation.value as CrdtPrimitive).value) document2Events.clear() val root2 = document2.getRoot() @@ -146,16 +142,16 @@ class ClientTest { assertTrue(root1.keys.isEmpty()) val remoteRemoveEvent = assertIs(document1Events.first()) - val remoteRemoveOperation = assertIs( - remoteRemoveEvent.changeInfos.first().change.operations.first(), + val remoteRemoveOperation = assertIs( + remoteRemoveEvent.changeInfos.first().operations.first(), ) - assertEquals(localSetOperation.effectedCreatedAt, remoteRemoveOperation.createdAt) + assertEquals(localSetOperation.executedAt, remoteRemoveOperation.executedAt) val localRemoveEvent = assertIs(document2Events.first()) - val localRemoveOperation = assertIs( - localRemoveEvent.changeInfos.first().change.operations.first(), + val localRemoveOperation = assertIs( + localRemoveEvent.changeInfos.first().operations.first(), ) - assertEquals(remoteSetOperation.effectedCreatedAt, localRemoveOperation.createdAt) + assertEquals(remoteSetOperation.executedAt, localRemoveOperation.executedAt) assertEquals(1, document1.clone?.getGarbageLength()) assertEquals(1, document2.clone?.getGarbageLength()) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index 4e6580d11..78ba9f599 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -11,8 +11,8 @@ import dev.yorkie.document.change.CheckPoint import dev.yorkie.document.crdt.CrdtObject import dev.yorkie.document.crdt.CrdtRoot import dev.yorkie.document.crdt.ElementRht +import dev.yorkie.document.json.JsonElement import dev.yorkie.document.json.JsonObject -import dev.yorkie.document.operation.InternalOpInfo import dev.yorkie.document.operation.OperationInfo import dev.yorkie.document.time.ActorID import dev.yorkie.document.time.TimeTicket @@ -24,7 +24,12 @@ import kotlinx.coroutines.Deferred import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.flow.filterNot +import kotlinx.coroutines.flow.mapNotNull +import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.withContext /** @@ -39,6 +44,8 @@ public class Document(public val key: Key) { private val eventStream = MutableSharedFlow() public val events = eventStream.asSharedFlow() + private val targetEventStreams = mutableMapOf>() + @Volatile private var root: CrdtRoot = CrdtRoot(CrdtObject(InitialTimeTicket, rht = ElementRht())) @@ -91,15 +98,82 @@ public class Document(public val key: Key) { return@async true } val change = context.getChange() - val internalOpInfos = change.execute(root) + val operationInfos = change.execute(root) localChanges += change changeID = change.id - val changeInfos = listOf(change.toChangeInfo(internalOpInfos)) + val changeInfos = listOf(change.toChangeInfo(operationInfos)) eventStream.emit(Event.LocalChange(changeInfos)) true } } + /** + * Subscribes to events on the document with the specific [targetPath]. + */ + public fun events(targetPath: String): SharedFlow { + return targetEventStreams.getOrElse(targetPath) { + events.filterNot { it is Event.Snapshot && targetPath != "&" } + .mapNotNull { event -> + when (event) { + is Event.Snapshot -> event + is Event.RemoteChange -> { + event.changeInfos.filterTargetChangeInfos(targetPath) + .takeIf { it.isNotEmpty() } + ?.let { + Event.RemoteChange(it) + } + } + + is Event.LocalChange -> { + event.changeInfos.filterTargetChangeInfos(targetPath) + .takeIf { it.isNotEmpty() } + ?.let { + Event.LocalChange(it) + } + } + } + }.shareIn(scope, SharingStarted.Eagerly) + .also { + targetEventStreams[targetPath] = it + } + } + } + + private fun List.filterTargetChangeInfos(targetPath: String) = + mapNotNull { (message, operations) -> + val targetOps = operations.filter { isSameElementOrChildOf(it.path, targetPath) } + if (targetOps.isEmpty()) { + null + } else { + Event.ChangeInfo(message, targetOps) + } + } + + private fun isSameElementOrChildOf(element: String, parent: String): Boolean { + return if (parent == element) { + true + } else { + val nodePath = element.split(".") + val targetPath = parent.split(".") + targetPath.withIndex().all { (index, path) -> path == nodePath.getOrNull(index) } + } + } + + /** + * Returns the [JsonElement] corresponding to the [path]. + */ + public suspend fun getValueByPath(path: String): JsonElement? { + require(path.startsWith("$")) { + "the path must start with \"$\"" + } + val paths = path.split(".").drop(1) + var value = getRoot() + paths.dropLast(1).forEach { key -> + value = value[key] as? JsonObject ?: return null + } + return value.getOrNull(paths.last()) + } + /** * Applies the given [pack] into this document. * 1. Remove local changes applied to server. @@ -148,9 +222,9 @@ public class Document(public val key: Key) { val clone = ensureClone() val changesInfo = changes.map { it.execute(clone) - val internalOpInfos = it.execute(root) + val operationInfos = it.execute(root) changeID = changeID.syncLamport(it.id.lamport) - it.toChangeInfo(internalOpInfos) + it.toChangeInfo(operationInfos) } if (changesInfo.isEmpty()) { return @@ -204,21 +278,19 @@ public class Document(public val key: Key) { return root.garbageCollect(ticket) } - private fun Change.toChangeInfo(internalOpInfos: List) = - Event.ChangeInfo(message.orEmpty(), internalOpInfos.map { it.toOperationInfo() }) + private fun Change.toChangeInfo(operationInfos: List) = + Event.ChangeInfo(message.orEmpty(), operationInfos.map { it.updatePath() }) - private fun InternalOpInfo.toOperationInfo(): OperationInfo { - val path = root.createSubPaths(element).joinToString(".") - return operationInfo.apply { - this.path = path - } + private fun OperationInfo.updatePath(): OperationInfo { + val path = root.createSubPaths(executedAt).joinToString(".") + return apply { this.path = path } } public fun toJson(): String { return root.toJson() } - public interface Event { + public sealed interface Event { /** * An event that occurs when a snapshot is received from the server. @@ -242,7 +314,7 @@ public class Document(public val key: Key) { /** * Represents the modification made during a document update and the message passed. */ - public class ChangeInfo( + public data class ChangeInfo( public val message: String, public val operations: List, ) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt b/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt index 90a7cc468..b23c33e9c 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt @@ -1,7 +1,7 @@ package dev.yorkie.document.change import dev.yorkie.document.crdt.CrdtRoot -import dev.yorkie.document.operation.InternalOpInfo +import dev.yorkie.document.operation.OperationInfo import dev.yorkie.document.operation.Operation import dev.yorkie.document.time.ActorID @@ -21,7 +21,7 @@ public data class Change internal constructor( id = id.setActor(actorID) } - internal fun execute(root: CrdtRoot): List { + internal fun execute(root: CrdtRoot): List { return operations.flatMap { it.execute(root) } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonArray.kt b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonArray.kt index 0caa35a24..ad4403e59 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonArray.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonArray.kt @@ -7,6 +7,7 @@ import dev.yorkie.document.crdt.CrdtObject import dev.yorkie.document.crdt.CrdtPrimitive import dev.yorkie.document.crdt.ElementRht import dev.yorkie.document.operation.AddOperation +import dev.yorkie.document.operation.MoveOperation import dev.yorkie.document.operation.RemoveOperation import dev.yorkie.document.time.TimeTicket import java.util.Date @@ -122,6 +123,19 @@ public class JsonArray internal constructor( return deleted.toJsonElement(context) } + public fun moveAfter(prevCreatedAt: TimeTicket, createdAt: TimeTicket) { + val executedAt = context.issueTimeTicket() + target.moveAfter(prevCreatedAt, createdAt, executedAt) + context.push( + MoveOperation( + parentCreatedAt = target.createdAt, + prevCreatedAt = prevCreatedAt, + createdAt = createdAt, + executedAt = executedAt, + ), + ) + } + override fun contains(element: JsonElement): Boolean { return target.asSequence().map { it.toJsonElement(context) }.contains(element) } diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonElement.kt b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonElement.kt index 2a3d8f049..35e2cd2ba 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonElement.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonElement.kt @@ -28,7 +28,6 @@ public abstract class JsonElement { CrdtCounter::class.java to JsonCounter::class.java, ) - @Suppress("UNCHECKED_CAST") internal inline fun CrdtElement.toJsonElement( context: ChangeContext, ): T { diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonPrimitive.kt b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonPrimitive.kt index 4bc1afda5..aaabe6949 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonPrimitive.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/json/JsonPrimitive.kt @@ -2,7 +2,6 @@ package dev.yorkie.document.json import com.google.protobuf.ByteString import dev.yorkie.document.crdt.CrdtPrimitive -import dev.yorkie.document.crdt.CrdtPrimitive.Type import java.util.Date /** diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt index b8299a6f8..cc33a40f8 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/AddOperation.kt @@ -25,17 +25,16 @@ internal data class AddOperation( /** * Executes this [AddOperation] on the given [root]. */ - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtArray) { val copiedValue = value.deepCopy() parentObject.insertAfter(prevCreatedAt, copiedValue) root.registerElement(copiedValue, parentObject) listOf( - InternalOpInfo( - parentCreatedAt, - OperationInfo.AddOpInfo(parentObject.subPathOf(effectedCreatedAt).toInt()), - ), + OperationInfo.AddOpInfo(parentObject.subPathOf(effectedCreatedAt).toInt()).apply { + executedAt = parentCreatedAt + }, ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt index 499e67c54..dfe676e1f 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/EditOperation.kt @@ -29,7 +29,7 @@ internal data class EditOperation( override val effectedCreatedAt: TimeTicket get() = parentCreatedAt - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtText) { val changes = parentObject.edit( @@ -44,16 +44,15 @@ internal data class EditOperation( } changes.map { (type, _, from, to, content, attributes) -> if (type == TextChangeType.Content) { - InternalOpInfo( - parentCreatedAt, - OperationInfo.EditOpInfo( - from, - to, - TextWithAttributes(content.orEmpty() to attributes.orEmpty()), - ), + OperationInfo.EditOpInfo( + from, + to, + TextWithAttributes(content.orEmpty() to attributes.orEmpty()), ) } else { - InternalOpInfo(parentCreatedAt, OperationInfo.SelectOpInfo(from, to)) + OperationInfo.SelectOpInfo(from, to) + }.apply { + executedAt = parentCreatedAt } } } else { diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt index 7aca0794f..da4e208f1 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/IncreaseOperation.kt @@ -27,7 +27,7 @@ internal data class IncreaseOperation( /** * Executes this [IncreaseOperation] on the given [root]. */ - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtCounter) { val copiedValue = value.deepCopy() as CrdtPrimitive @@ -38,10 +38,9 @@ internal data class IncreaseOperation( copiedValue.value as Long } listOf( - InternalOpInfo( - effectedCreatedAt, - OperationInfo.IncreaseOpInfo(increasedValue), - ), + OperationInfo.IncreaseOpInfo(increasedValue).apply { + executedAt = effectedCreatedAt + }, ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt index 954f11f1f..a061f066b 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/MoveOperation.kt @@ -24,17 +24,16 @@ internal data class MoveOperation( /** * Executes this [MoveOperation] on the given [root]. */ - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtArray) { val previousIndex = parentObject.subPathOf(createdAt).toInt() parentObject.moveAfter(prevCreatedAt, createdAt, executedAt) val index = parentObject.subPathOf(createdAt).toInt() listOf( - InternalOpInfo( - parentCreatedAt, - OperationInfo.MoveOpInfo(previousIndex = previousIndex, index = index), - ), + OperationInfo.MoveOpInfo(previousIndex = previousIndex, index = index).apply { + executedAt = parentCreatedAt + }, ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt index 76fca08a0..61f53b297 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/Operation.kt @@ -22,7 +22,7 @@ internal abstract class Operation { /** * Executes this [Operation] on the given [root]. */ - abstract fun execute(root: CrdtRoot): List + abstract fun execute(root: CrdtRoot): List /** * Sets the given [ActorID] to this [Operation]. diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt index eb9f4b15c..34905b0a4 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/OperationInfo.kt @@ -11,6 +11,8 @@ public sealed class OperationInfo { public abstract var path: String + internal var executedAt: TimeTicket = TimeTicket.InitialTimeTicket + public data class AddOpInfo(val index: Int, override var path: String = INITIAL_PATH) : OperationInfo() @@ -42,7 +44,7 @@ public sealed class OperationInfo { public data class StyleOpInfo( val from: Int, val to: Int, - val value: TextWithAttributes, + val attributes: Map, override var path: String = INITIAL_PATH, ) : OperationInfo() @@ -53,11 +55,6 @@ public sealed class OperationInfo { ) : OperationInfo() companion object { - const val INITIAL_PATH = "initial path" + private const val INITIAL_PATH = "initial path" } } - -internal data class InternalOpInfo( - val element: TimeTicket, - val operationInfo: OperationInfo, -) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt index 099595673..e74662678 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/RemoveOperation.kt @@ -24,7 +24,7 @@ internal data class RemoveOperation( /** * Executes this [RemoveOperation] on the given [root]. */ - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtContainer) { val key = parentObject.subPathOf(createdAt) @@ -32,10 +32,9 @@ internal data class RemoveOperation( root.registerRemovedElement(element) val index = if (parentObject is CrdtArray) key?.toInt() else null listOf( - InternalOpInfo( - effectedCreatedAt, - OperationInfo.RemoveOpInfo(key, index), - ), + OperationInfo.RemoveOpInfo(key, index).apply { + executedAt = effectedCreatedAt + }, ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt index 7d37c033e..8407a7b1d 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SelectOperation.kt @@ -20,16 +20,15 @@ internal data class SelectOperation( /** * Returns the created time of the effected element. */ - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtText) { val change = parentObject.select(RgaTreeSplitNodeRange(fromPos, toPos), executedAt) ?: return emptyList() listOf( - InternalOpInfo( - parentCreatedAt, - OperationInfo.SelectOpInfo(from = change.from, to = change.to), - ), + OperationInfo.SelectOpInfo(from = change.from, to = change.to).apply { + executedAt = parentCreatedAt + }, ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt index 564929ad5..f3e289f6b 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/SetOperation.kt @@ -26,13 +26,17 @@ internal data class SetOperation( /** * Executes this [SetOperation] on the given [root]. */ - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtObject) { val copiedValue = value.deepCopy() parentObject[key] = copiedValue root.registerElement(copiedValue, parentObject) - listOf(InternalOpInfo(parentCreatedAt, OperationInfo.SetOpInfo(key))) + listOf( + OperationInfo.SetOpInfo(key).apply { + executedAt = parentCreatedAt + }, + ) } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") YorkieLogger.e(TAG, "fail to execute, only object can execute set") diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt b/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt index b9b609271..3b510b600 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/operation/StyleOperation.kt @@ -4,7 +4,6 @@ import dev.yorkie.document.crdt.CrdtRoot import dev.yorkie.document.crdt.CrdtText import dev.yorkie.document.crdt.RgaTreeSplitNodePos import dev.yorkie.document.crdt.RgaTreeSplitNodeRange -import dev.yorkie.document.crdt.TextWithAttributes import dev.yorkie.document.time.TimeTicket import dev.yorkie.util.YorkieLogger @@ -19,20 +18,19 @@ internal data class StyleOperation( override val effectedCreatedAt: TimeTicket get() = parentCreatedAt - override fun execute(root: CrdtRoot): List { + override fun execute(root: CrdtRoot): List { val parentObject = root.findByCreatedAt(parentCreatedAt) return if (parentObject is CrdtText) { val changes = parentObject.style(RgaTreeSplitNodeRange(fromPos, toPos), attributes, executedAt) changes.map { - InternalOpInfo( - parentCreatedAt, - OperationInfo.StyleOpInfo( - it.from, - it.to, - TextWithAttributes(it.content.orEmpty() to it.attributes.orEmpty()), - ), - ) + OperationInfo.StyleOpInfo( + it.from, + it.to, + it.attributes.orEmpty(), + ).apply { + executedAt = parentCreatedAt + } } } else { parentObject ?: YorkieLogger.e(TAG, "fail to find $parentCreatedAt") diff --git a/yorkie/src/test/kotlin/dev/yorkie/api/ConverterTest.kt b/yorkie/src/test/kotlin/dev/yorkie/api/ConverterTest.kt index b3d85db2d..dc03440f7 100644 --- a/yorkie/src/test/kotlin/dev/yorkie/api/ConverterTest.kt +++ b/yorkie/src/test/kotlin/dev/yorkie/api/ConverterTest.kt @@ -24,6 +24,7 @@ import dev.yorkie.document.operation.EditOperation import dev.yorkie.document.operation.IncreaseOperation import dev.yorkie.document.operation.MoveOperation import dev.yorkie.document.operation.Operation +import dev.yorkie.document.operation.OperationInfo import dev.yorkie.document.operation.RemoveOperation import dev.yorkie.document.operation.SelectOperation import dev.yorkie.document.operation.SetOperation @@ -371,9 +372,7 @@ class ConverterTest { override var executedAt: TimeTicket, override val effectedCreatedAt: TimeTicket, ) : Operation() { - override fun execute(root: CrdtRoot) { - println("should throw IllegalArgumentException") - } + override fun execute(root: CrdtRoot): List = emptyList() } private class TestCrdtElement( diff --git a/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt b/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt index d54317304..d6e0b8c15 100644 --- a/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt +++ b/yorkie/src/test/kotlin/dev/yorkie/document/DocumentTest.kt @@ -1,11 +1,10 @@ package dev.yorkie.document import dev.yorkie.assertJsonContentEquals -import dev.yorkie.document.crdt.CrdtPrimitive import dev.yorkie.document.json.JsonArray import dev.yorkie.document.json.JsonText -import dev.yorkie.document.operation.RemoveOperation -import dev.yorkie.document.operation.SetOperation +import dev.yorkie.document.operation.OperationInfo.RemoveOpInfo +import dev.yorkie.document.operation.OperationInfo.SetOpInfo import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.launch import kotlinx.coroutines.test.UnconfinedTestDispatcher @@ -142,17 +141,15 @@ class DocumentTest { assertEquals(1, events.size) var event = events.first() assertIs(event) - var change = event.changeInfos.first().change - assertEquals(2, change.operations.size) - assertTrue(change.operations.all { it is SetOperation }) + var operations = event.changeInfos.first().operations + assertEquals(2, operations.size) + assertTrue(operations.all { it is SetOpInfo }) - val firstSet = change.operations.first() as SetOperation + val firstSet = operations.first() as SetOpInfo assertEquals("k1", firstSet.key) - assertEquals(1, (firstSet.value as CrdtPrimitive).value) - val secondSet = change.operations.last() as SetOperation + val secondSet = operations.last() as SetOpInfo assertEquals("k2", secondSet.key) - assertTrue((secondSet.value as CrdtPrimitive).value as Boolean) target.updateAsync { it.remove("k2") @@ -162,15 +159,15 @@ class DocumentTest { assertEquals(2, events.size) event = events.last() assertIs(event) - change = event.changeInfos.first().change - assertEquals(2, change.operations.size) - assertTrue(change.operations.all { it is RemoveOperation }) + operations = event.changeInfos.first().operations + assertEquals(2, operations.size) + assertTrue(operations.all { it is RemoveOpInfo }) - val firstRemove = change.operations.first() as RemoveOperation - assertEquals(secondSet.effectedCreatedAt, firstRemove.createdAt) + val firstRemove = operations.first() as RemoveOpInfo + assertEquals(secondSet.executedAt, firstRemove.executedAt) - val secondRemove = change.operations.last() as RemoveOperation - assertEquals(firstSet.effectedCreatedAt, secondRemove.createdAt) + val secondRemove = operations.last() as RemoveOpInfo + assertEquals(firstSet.executedAt, secondRemove.executedAt) assertTrue(target.hasLocalChanges) From 6380254b13a335c81d898900f2462b255e7e2f0c Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Thu, 15 Jun 2023 17:56:31 +0900 Subject: [PATCH 3/8] add instrumentation tests on document subscription --- .../kotlin/dev/yorkie/core/DocumentTest.kt | 415 ++++++++++++++++++ 1 file changed, 415 insertions(+) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt index f84bc7634..b1837433f 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt @@ -1,9 +1,26 @@ package dev.yorkie.core import androidx.test.ext.junit.runners.AndroidJUnit4 +import dev.yorkie.assertJsonContentEquals import dev.yorkie.document.Document import dev.yorkie.document.Document.DocumentStatus +import dev.yorkie.document.Document.Event +import dev.yorkie.document.Document.Event.ChangeInfo +import dev.yorkie.document.crdt.TextWithAttributes +import dev.yorkie.document.json.JsonArray +import dev.yorkie.document.json.JsonCounter +import dev.yorkie.document.json.JsonObject +import dev.yorkie.document.json.JsonPrimitive +import dev.yorkie.document.json.JsonText +import dev.yorkie.document.operation.OperationInfo +import dev.yorkie.document.operation.OperationInfo.* +import kotlinx.coroutines.CoroutineStart +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.withTimeout import org.junit.Test import org.junit.runner.RunWith import java.util.UUID @@ -198,4 +215,402 @@ class DocumentTest { client.deactivateAsync().await() } } + + @Test + fun test_document_event_stream() { + withTwoClientsAndDocuments { _, _, document1, document2, _ -> + val document1Ops = mutableListOf() + val document2Ops = mutableListOf() + val collectJobs = listOf( + launch(start = CoroutineStart.UNDISPATCHED) { + document1.events.filterIsInstance() + .collect { + document1Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + launch(start = CoroutineStart.UNDISPATCHED) { + document2.events.filterIsInstance() + .collect { + document2Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + ) + + document1.updateAsync { + it.setNewCounter("counter", 100) + it.setNewArray("todos").apply { + put("todo1") + put("todo2") + put("todo3") + } + it.setNewText("content").edit( + 0, + 0, + "hello world", + mapOf( + "italic" to "true", + ), + ) + it.setNewObject("obj").apply { + set("name", "josh") + set("age", 14) + setNewArray("food").apply { + put("apple") + put("grape") + } + setNewObject("score").apply { + set("english", 80) + set("math", 90) + } + setNewObject("score").apply { + set("science", 100) + } + remove("food") + } + }.await() + + withTimeout(1_000) { + while (document2Ops.size < 4) { + delay(50) + } + } + + document2.updateAsync { + it.getAs("counter").increase(1) + it.getAs("todos").apply { + put("todo4") + val prevItem = requireNotNull(getAs(1)) + val currItem = requireNotNull(getAs(0)) + moveAfter(prevItem.target.id, currItem.target.id) + } + it.getAs("content").apply { + select(0, 5) + style(0, 5, mapOf("bold" to "true")) + } + }.await() + + withTimeout(1_000) { + while (document1Ops.size < 3) { + delay(50) + } + } + + assertJsonContentEquals(document1.toJson(), document2.toJson()) + val expectedDocument1Ops = listOf( + IncreaseOpInfo(path = "$.counter", value = 1), + AddOpInfo(path = "$.todos", index = 3), + MoveOpInfo(path = "$.todos", index = 1, previousIndex = 0), + SelectOpInfo(path = "$.content", from = 0, to = 5), + StyleOpInfo( + path = "$.content", + from = 0, + to = 5, + attributes = mapOf("bold" to "true"), + ), + ) + val expectedDocument2Ops = listOf( + SetOpInfo(path = "$", key = "counter"), + SetOpInfo(path = "$", key = "todos"), + AddOpInfo(path = "$.todos", index = 0), + AddOpInfo(path = "$.todos", index = 1), + AddOpInfo(path = "$.todos", index = 2), + SetOpInfo(path = "$", key = "content"), + EditOpInfo( + from = 0, + to = 0, + value = TextWithAttributes("hello world" to mapOf("italic" to "true")), + path = "$.content", + ), + SelectOpInfo(path = "$.content", from = 11, to = 11), + SetOpInfo(path = "$", key = "obj"), + SetOpInfo(path = "$.obj", key = "name"), + SetOpInfo(path = "$.obj", key = "age"), + SetOpInfo(path = "$.obj", key = "food"), + AddOpInfo(path = "$.obj.food", index = 0), + AddOpInfo(path = "$.obj.food", index = 1), + SetOpInfo(path = "$.obj", key = "score"), + SetOpInfo(path = "$.obj.score", key = "english"), + SetOpInfo(path = "$.obj.score", key = "math"), + SetOpInfo(path = "$.obj", key = "score"), + SetOpInfo(path = "$.obj.score", key = "science"), + RemoveOpInfo(path = "$.obj", key = "food", index = null), + ) + assertEquals(expectedDocument1Ops, document1Ops) + assertEquals(expectedDocument2Ops, document2Ops) + + collectJobs.forEach(Job::cancel) + } + } + + @Test + fun test_document_event_stream_with_specific_topic() { + withTwoClientsAndDocuments { _, _, document1, document2, _ -> + val document1Ops = mutableListOf() + val document1TodosOps = mutableListOf() + val document1CounterOps = mutableListOf() + val collectJobs = mapOf( + "events" to launch(start = CoroutineStart.UNDISPATCHED) { + document1.events.filterIsInstance() + .collect { + document1Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + "todos" to launch(start = CoroutineStart.UNDISPATCHED) { + document1.events("$.todos").filterIsInstance() + .collect { + document1TodosOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + "counter" to launch(start = CoroutineStart.UNDISPATCHED) { + document1.events("$.counter").filterIsInstance() + .collect { + document1CounterOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + ) + + document2.updateAsync { + it.setNewCounter("counter", 0) + it.setNewArray("todos").apply { + put("todo1") + put("todo2") + } + }.await() + + withTimeout(1_000) { + // ops: counter, todos, todoOps: todos + while (document1Ops.size < 2 || document1TodosOps.isEmpty()) { + delay(50) + } + } + + assertEquals( + listOf( + SetOpInfo(path = "$", key = "counter"), + SetOpInfo(path = "$", key = "todos"), + AddOpInfo(path = "$.todos", index = 0), + AddOpInfo(path = "$.todos", index = 1), + ), + document1Ops, + ) + assertEquals( + listOf( + AddOpInfo(path = "$.todos", index = 0), + AddOpInfo(path = "$.todos", index = 1), + ), + document1TodosOps, + ) + document1Ops.clear() + document1TodosOps.clear() + + document2.updateAsync { + it.getAs("counter").increase(10) + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty() || document1CounterOps.isEmpty()) { + delay(50) + } + } + + assertEquals(IncreaseOpInfo(path = "$.counter", value = 10), document1Ops.first()) + assertEquals( + IncreaseOpInfo(path = "$.counter", value = 10), + document1CounterOps.first(), + ) + document1Ops.clear() + document1CounterOps.clear() + + document2.updateAsync { + it.getAs("todos").put("todo3") + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty() || document1TodosOps.isEmpty()) { + delay(50) + } + } + + assertEquals(AddOpInfo(path = "$.todos", index = 2), document1Ops.first()) + assertEquals(AddOpInfo(path = "$.todos", index = 2), document1TodosOps.first()) + document1Ops.clear() + document1TodosOps.clear() + + collectJobs["todos"]?.cancel() + document2.updateAsync { + it.getAs("todos").put("todo4") + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty()) { + delay(50) + } + } + + assertEquals(AddOpInfo(path = "$.todos", index = 3), document1Ops.first()) + assert(document1TodosOps.isEmpty()) + document1Ops.clear() + + collectJobs["counter"]?.cancel() + document2.updateAsync { + it.getAs("counter").increase(10) + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty()) { + delay(50) + } + } + + assertEquals(IncreaseOpInfo(path = "$.counter", value = 10), document1Ops.first()) + assert(document1CounterOps.isEmpty()) + + collectJobs.values.forEach(Job::cancel) + } + } + + @Test + fun test_document_event_stream_with_nested_topic() { + withTwoClientsAndDocuments { _, _, document1, document2, _ -> + val document1Ops = mutableListOf() + val document1TodosOps = mutableListOf() + val document1ObjOps = mutableListOf() + val collectJobs = mapOf( + "events" to launch(start = CoroutineStart.UNDISPATCHED) { + document1.events.filterIsInstance() + .collect { + document1Ops.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + "todos" to launch(start = CoroutineStart.UNDISPATCHED) { + document1.events("$.todos.0").filterIsInstance() + .collect { + document1TodosOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + "obj" to launch(start = CoroutineStart.UNDISPATCHED) { + document1.events("$.obj.c1").filterIsInstance() + .collect { + document1ObjOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + } + }, + ) + + document2.updateAsync { + it.setNewArray("todos").putNewObject().apply { + set("text", "todo1") + set("completed", false) + } + it.setNewObject("obj").setNewObject("c1").apply { + set("name", "josh") + set("age", 14) + } + }.await() + + withTimeout(1_000) { + while (document1Ops.size < 2 + || document1TodosOps.isEmpty() + || document1ObjOps.isEmpty() + ) { + println(document1Ops) + delay(50) + } + } + + assertEquals( + listOf( + SetOpInfo(path = "$", key = "todos"), + AddOpInfo(path = "$.todos", index = 0), + SetOpInfo(path = "$.todos.0", key = "text"), + SetOpInfo(path = "$.todos.0", key = "completed"), + SetOpInfo(path = "$", key = "obj"), + SetOpInfo(path = "$.obj", key = "c1"), + SetOpInfo(path = "$.obj.c1", key = "name"), + SetOpInfo(path = "$.obj.c1", key = "age"), + ), + document1Ops, + ) + assertEquals( + listOf( + SetOpInfo(path = "$.todos.0", key = "text"), + SetOpInfo(path = "$.todos.0", key = "completed"), + ), + document1TodosOps, + ) + assertEquals( + listOf( + SetOpInfo(path = "$.obj.c1", key = "name"), + SetOpInfo(path = "$.obj.c1", key = "age"), + ), + document1ObjOps, + ) + document1Ops.clear() + document1TodosOps.clear() + document1ObjOps.clear() + + document2.updateAsync { + it.getAs("obj").getAs("c1")["name"] = "john" + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty() || document1ObjOps.isEmpty()) { + delay(50) + } + } + + assertEquals(SetOpInfo(path = "$.obj.c1", key = "name"), document1Ops.first()) + assertEquals(SetOpInfo(path = "$.obj.c1", key = "name"), document1ObjOps.first()) + document1Ops.clear() + document1ObjOps.clear() + + document2.updateAsync { + it.getAs("todos").getAs(0)?.set("completed", true) + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty() || document1TodosOps.isEmpty()) { + delay(50) + } + } + + assertEquals(SetOpInfo(path = "$.todos.0", key = "completed"), document1Ops.first()) + assertEquals( + SetOpInfo(path = "$.todos.0", key = "completed"), + document1TodosOps.first(), + ) + document1Ops.clear() + document1TodosOps.clear() + + collectJobs["todos"]?.cancel() + document2.updateAsync { + it.getAs("todos").getAs(0)?.set("text", "todo_1") + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty()) { + delay(50) + } + } + + assertEquals(SetOpInfo(path = "$.todos.0", key = "text"), document1Ops.first()) + assert(document1TodosOps.isEmpty()) + document1Ops.clear() + + collectJobs["obj"]?.cancel() + document2.updateAsync { + it.getAs("obj").getAs("c1")["age"] = 15 + }.await() + + withTimeout(1_000) { + while (document1Ops.isEmpty()) { + delay(50) + } + } + + assertEquals(SetOpInfo(path = "$.obj.c1", key = "age"), document1Ops.first()) + assert(document1ObjOps.isEmpty()) + + collectJobs.values.forEach(Job::cancel) + } + } } From a277130637edfe7a4706535d7285c86c4fb592e7 Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Thu, 15 Jun 2023 17:58:06 +0900 Subject: [PATCH 4/8] format fixed --- .../androidTest/kotlin/dev/yorkie/core/DocumentTest.kt | 10 ++++++---- .../main/kotlin/dev/yorkie/document/change/Change.kt | 2 +- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt index b1837433f..4289c2a61 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt @@ -364,7 +364,9 @@ class DocumentTest { "counter" to launch(start = CoroutineStart.UNDISPATCHED) { document1.events("$.counter").filterIsInstance() .collect { - document1CounterOps.addAll(it.changeInfos.flatMap(ChangeInfo::operations)) + document1CounterOps.addAll( + it.changeInfos.flatMap(ChangeInfo::operations), + ) } }, ) @@ -508,9 +510,9 @@ class DocumentTest { }.await() withTimeout(1_000) { - while (document1Ops.size < 2 - || document1TodosOps.isEmpty() - || document1ObjOps.isEmpty() + while (document1Ops.size < 2 || + document1TodosOps.isEmpty() || + document1ObjOps.isEmpty() ) { println(document1Ops) delay(50) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt b/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt index b23c33e9c..8b68ae9e5 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/change/Change.kt @@ -1,8 +1,8 @@ package dev.yorkie.document.change import dev.yorkie.document.crdt.CrdtRoot -import dev.yorkie.document.operation.OperationInfo import dev.yorkie.document.operation.Operation +import dev.yorkie.document.operation.OperationInfo import dev.yorkie.document.time.ActorID /** From c88a30a1116996c1e5e585194a659a30f1411ac4 Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Thu, 15 Jun 2023 18:01:17 +0900 Subject: [PATCH 5/8] remove wildcard --- .../androidTest/kotlin/dev/yorkie/core/DocumentTest.kt | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt index 4289c2a61..1b1bb0a4d 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt @@ -13,7 +13,14 @@ import dev.yorkie.document.json.JsonObject import dev.yorkie.document.json.JsonPrimitive import dev.yorkie.document.json.JsonText import dev.yorkie.document.operation.OperationInfo -import dev.yorkie.document.operation.OperationInfo.* +import dev.yorkie.document.operation.OperationInfo.AddOpInfo +import dev.yorkie.document.operation.OperationInfo.EditOpInfo +import dev.yorkie.document.operation.OperationInfo.IncreaseOpInfo +import dev.yorkie.document.operation.OperationInfo.MoveOpInfo +import dev.yorkie.document.operation.OperationInfo.RemoveOpInfo +import dev.yorkie.document.operation.OperationInfo.SelectOpInfo +import dev.yorkie.document.operation.OperationInfo.SetOpInfo +import dev.yorkie.document.operation.OperationInfo.StyleOpInfo import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Job import kotlinx.coroutines.delay From fb3217752e466dc57a15ca6cf110e5489f95c688 Mon Sep 17 00:00:00 2001 From: Jeehyun Kim Date: Mon, 19 Jun 2023 10:31:10 +0900 Subject: [PATCH 6/8] Fix PresenceInfo inconsistency and test failures (#106) * fix realTimeAttachments() * adjust timeout limit --- .../kotlin/dev/yorkie/core/ClientTest.kt | 24 ++++++++++++------- .../kotlin/dev/yorkie/core/DocumentTest.kt | 24 +++++++++---------- .../src/main/kotlin/dev/yorkie/core/Client.kt | 9 +++---- 3 files changed, 32 insertions(+), 25 deletions(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt index 6bad425ae..a21823120 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt @@ -17,6 +17,7 @@ import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Job import kotlinx.coroutines.delay import kotlinx.coroutines.flow.dropWhile +import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.filterNot import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.mapNotNull @@ -93,7 +94,7 @@ class ClientTest { it["k1"] = "v1" }.await() - withTimeout(1_000L) { + withTimeout(2_000) { while (client2Events.none { it is DocumentSynced }) { delay(50) } @@ -247,7 +248,7 @@ class ClientTest { it["version"] = "v2" }.await() client1.syncAsync().await() - withTimeout(1_000) { + withTimeout(2_000) { while (client2Events.size < 2) { delay(50) } @@ -353,7 +354,7 @@ class ClientTest { val document1Events = mutableListOf() val document2Events = mutableListOf() - val document3Events = mutableListOf() + val document3Ops = mutableListOf() val collectJobs = listOf( launch(start = CoroutineStart.UNDISPATCHED) { document1.events.collect(document1Events::add) @@ -362,7 +363,9 @@ class ClientTest { document2.events.collect(document2Events::add) }, launch(start = CoroutineStart.UNDISPATCHED) { - document3.events.collect(document3Events::add) + document3.events.filterIsInstance().collect { event -> + document3Ops.addAll(event.changeInfos.flatMap { it.operations }) + } }, ) @@ -373,9 +376,12 @@ class ClientTest { document2.updateAsync { it["c2"] = 0 }.await() - withTimeout(1_000L) { + withTimeout(1_000) { // size should be 2 since it has local-change and remote-change - while (document1Events.size < 2 || document2Events.size < 2) { + while (document1Events.size < 2 || + document2Events.size < 2 || + document3Ops.size < 2 + ) { delay(50) } } @@ -393,10 +399,10 @@ class ClientTest { document2.updateAsync { it["c2"] = 1 }.await() - withTimeout(1_000L) { + withTimeout(1_000) { while (document1Events.size < 3 || document2Events.size < 3 || - document3Events.size < 2 + document3Ops.size < 4 ) { delay(50) } @@ -408,7 +414,7 @@ class ClientTest { // 04. c1 and c2 sync with push-pull mode. client1.resumeRemoteChanges(document1) client2.resumeRemoteChanges(document2) - withTimeout(1_000L) { + withTimeout(2_000) { while (document1Events.size < 4 || document2Events.size < 4) { delay(50) } diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt index 1b1bb0a4d..a18a2ac2e 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/DocumentTest.kt @@ -276,7 +276,7 @@ class DocumentTest { } }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document2Ops.size < 4) { delay(50) } @@ -296,7 +296,7 @@ class DocumentTest { } }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.size < 3) { delay(50) } @@ -386,7 +386,7 @@ class DocumentTest { } }.await() - withTimeout(1_000) { + withTimeout(2_000) { // ops: counter, todos, todoOps: todos while (document1Ops.size < 2 || document1TodosOps.isEmpty()) { delay(50) @@ -416,7 +416,7 @@ class DocumentTest { it.getAs("counter").increase(10) }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty() || document1CounterOps.isEmpty()) { delay(50) } @@ -434,7 +434,7 @@ class DocumentTest { it.getAs("todos").put("todo3") }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty() || document1TodosOps.isEmpty()) { delay(50) } @@ -450,7 +450,7 @@ class DocumentTest { it.getAs("todos").put("todo4") }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty()) { delay(50) } @@ -465,7 +465,7 @@ class DocumentTest { it.getAs("counter").increase(10) }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty()) { delay(50) } @@ -516,7 +516,7 @@ class DocumentTest { } }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.size < 2 || document1TodosOps.isEmpty() || document1ObjOps.isEmpty() @@ -561,7 +561,7 @@ class DocumentTest { it.getAs("obj").getAs("c1")["name"] = "john" }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty() || document1ObjOps.isEmpty()) { delay(50) } @@ -576,7 +576,7 @@ class DocumentTest { it.getAs("todos").getAs(0)?.set("completed", true) }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty() || document1TodosOps.isEmpty()) { delay(50) } @@ -595,7 +595,7 @@ class DocumentTest { it.getAs("todos").getAs(0)?.set("text", "todo_1") }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty()) { delay(50) } @@ -610,7 +610,7 @@ class DocumentTest { it.getAs("obj").getAs("c1")["age"] = 15 }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Ops.isEmpty()) { delay(50) } diff --git a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt index ac46a62d1..2f6c77922 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/core/Client.kt @@ -391,10 +391,10 @@ public class Client @VisibleForTesting internal constructor( return@async false } - fun realTimeAttachments() = attachments.value.filter { it.value.isRealTimeSync } + fun realTimeAttachments() = attachments.value.filter { it.value.isRealTimeSync }.keys realTimeAttachments().forEach { - waitForInitialization(it.key) + waitForInitialization(it) } presenceInfo = presenceInfo.copy( @@ -403,12 +403,12 @@ public class Client @VisibleForTesting internal constructor( ) realTimeAttachments().takeUnless { it.isEmpty() } - ?.forEach { (key, attachment) -> + ?.forEach { key -> try { service.updatePresence( updatePresenceRequest { client = toPBClient() - documentId = attachment.documentID + documentId = attachments.value[key]?.documentID ?: return@forEach }, documentBasedRequestHeader(key), ) @@ -416,6 +416,7 @@ public class Client @VisibleForTesting internal constructor( YorkieLogger.e("Client.updatePresence", e.stackTraceToString()) return@async false } + val attachment = attachments.value[key] ?: return@forEach val newPeers = attachment.peerPresences + (requireClientId() to presenceInfo) attachments.value += key to attachment.copy(peerPresences = newPeers) } ?: return@async true From 68602fbeebbfb6082bc786d9f10f9dfe91dfcf42 Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Mon, 19 Jun 2023 11:32:51 +0900 Subject: [PATCH 7/8] adjust timeout limit --- yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt index a21823120..9562eeeae 100644 --- a/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt +++ b/yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt @@ -376,7 +376,7 @@ class ClientTest { document2.updateAsync { it["c2"] = 0 }.await() - withTimeout(1_000) { + withTimeout(2_000) { // size should be 2 since it has local-change and remote-change while (document1Events.size < 2 || document2Events.size < 2 || @@ -399,7 +399,7 @@ class ClientTest { document2.updateAsync { it["c2"] = 1 }.await() - withTimeout(1_000) { + withTimeout(2_000) { while (document1Events.size < 3 || document2Events.size < 3 || document3Ops.size < 4 From 8de3704a831d669b1763bca7a4d8e5fadbbd605d Mon Sep 17 00:00:00 2001 From: jee-hyun-kim Date: Mon, 19 Jun 2023 14:39:42 +0900 Subject: [PATCH 8/8] remove targetEventStreams and make events() return Flow --- .../kotlin/dev/yorkie/document/Document.kt | 51 ++++++++----------- 1 file changed, 21 insertions(+), 30 deletions(-) diff --git a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt index 78ba9f599..30dbddade 100644 --- a/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt +++ b/yorkie/src/main/kotlin/dev/yorkie/document/Document.kt @@ -23,13 +23,11 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async +import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow -import kotlinx.coroutines.flow.SharedFlow -import kotlinx.coroutines.flow.SharingStarted import kotlinx.coroutines.flow.asSharedFlow import kotlinx.coroutines.flow.filterNot import kotlinx.coroutines.flow.mapNotNull -import kotlinx.coroutines.flow.shareIn import kotlinx.coroutines.withContext /** @@ -44,8 +42,6 @@ public class Document(public val key: Key) { private val eventStream = MutableSharedFlow() public val events = eventStream.asSharedFlow() - private val targetEventStreams = mutableMapOf>() - @Volatile private var root: CrdtRoot = CrdtRoot(CrdtObject(InitialTimeTicket, rht = ElementRht())) @@ -110,33 +106,28 @@ public class Document(public val key: Key) { /** * Subscribes to events on the document with the specific [targetPath]. */ - public fun events(targetPath: String): SharedFlow { - return targetEventStreams.getOrElse(targetPath) { - events.filterNot { it is Event.Snapshot && targetPath != "&" } - .mapNotNull { event -> - when (event) { - is Event.Snapshot -> event - is Event.RemoteChange -> { - event.changeInfos.filterTargetChangeInfos(targetPath) - .takeIf { it.isNotEmpty() } - ?.let { - Event.RemoteChange(it) - } - } - - is Event.LocalChange -> { - event.changeInfos.filterTargetChangeInfos(targetPath) - .takeIf { it.isNotEmpty() } - ?.let { - Event.LocalChange(it) - } - } + public fun events(targetPath: String): Flow { + return events.filterNot { it is Event.Snapshot && targetPath != "&" } + .mapNotNull { event -> + when (event) { + is Event.Snapshot -> event + is Event.RemoteChange -> { + event.changeInfos.filterTargetChangeInfos(targetPath) + .takeIf { it.isNotEmpty() } + ?.let { + Event.RemoteChange(it) + } + } + + is Event.LocalChange -> { + event.changeInfos.filterTargetChangeInfos(targetPath) + .takeIf { it.isNotEmpty() } + ?.let { + Event.LocalChange(it) + } } - }.shareIn(scope, SharingStarted.Eagerly) - .also { - targetEventStreams[targetPath] = it } - } + } } private fun List.filterTargetChangeInfos(targetPath: String) =