Skip to content

Commit

Permalink
Lock local changes to assert transactions (#203)
Browse files Browse the repository at this point in the history
  • Loading branch information
skhugh authored Jun 13, 2024
1 parent f0163c7 commit f1a80fd
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 114 deletions.
2 changes: 1 addition & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
minSdk = "24"
compileSdk = "34"
targetSdk = "34"
agp = "8.4.1"
agp = "8.4.2"
connectKotlin = "0.6.1"
okhttp = "4.12.0"
coroutines = "1.8.1"
Expand Down
50 changes: 50 additions & 0 deletions yorkie/src/androidTest/kotlin/dev/yorkie/core/ClientTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -840,4 +840,54 @@ class ClientTest {
collectJobs.forEach(Job::cancel)
}
}

@Test
fun test_duplicated_local_changes_not_sent_to_server() {
withTwoClientsAndDocuments(detachDocuments = false) { c1, c2, d1, d2, _ ->
val d1Events = mutableListOf<Document.Event>()
val d2Events = mutableListOf<Document.Event>()
val collectJobs = listOf(
launch(start = CoroutineStart.UNDISPATCHED) {
d1.events.filter { it is RemoteChange || it is LocalChange }
.collect(d1Events::add)
},
launch(start = CoroutineStart.UNDISPATCHED) {
d2.events.filter { it is RemoteChange || it is LocalChange }
.collect(d2Events::add)
},
)

listOf(
d1.updateAsync { root, _ ->
root.setNewTree(
"t",
element("doc") {
element("p") { text { "12" } }
element("p") { text { "34" } }
},
)
},
c1.syncAsync(),
c1.syncAsync(),
c1.syncAsync(),
c1.detachAsync(d1),
)

withTimeout(GENERAL_TIMEOUT) {
while (d2Events.isEmpty()) {
delay(50)
}
}
assertIs<RemoteChange>(d2Events.first())
assertTreesXmlEquals("<doc><p>12</p><p>34</p></doc>", d1, d2)

delay(500)

assertEquals(1, d2Events.size)

c2.detachAsync(d2).await()

collectJobs.forEach(Job::cancel)
}
}
}
200 changes: 109 additions & 91 deletions yorkie/src/main/kotlin/dev/yorkie/core/Client.kt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import okhttp3.OkHttpClient

Expand Down Expand Up @@ -109,6 +111,11 @@ public class Client @VisibleForTesting internal constructor(
"x-shard-key" to listOf("${options.apiKey.orEmpty()}/$value"),
)

private val mutexForDocuments = mutableMapOf<Document.Key, Mutex>()

private val Document.mutex
get() = mutexForDocuments.getOrPut(key) { Mutex() }

public constructor(
host: String,
options: Options = Options(),
Expand Down Expand Up @@ -236,32 +243,35 @@ public class Client @VisibleForTesting internal constructor(
SyncResult(
document,
runCatching {
val request = pushPullChangesRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = documentID
pushOnly = syncMode == SyncMode.RealtimePushOnly
}
val response = service.pushPullChanges(
request,
document.key.documentBasedRequestHeader,
).getOrThrow()
val responsePack = response.changePack.toChangePack()
// NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with
// PushPull, ignore the response when the syncMode is PushOnly.
val currentSyncMode = attachments.value[document.key]?.syncMode
if (responsePack.hasChanges &&
currentSyncMode == SyncMode.RealtimePushOnly ||
currentSyncMode == SyncMode.RealtimeSyncOff
) {
return@runCatching
}
document.mutex.withLock {
val request = pushPullChangesRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = documentID
pushOnly = syncMode == SyncMode.RealtimePushOnly
}
val response = service.pushPullChanges(
request,
document.key.documentBasedRequestHeader,
).getOrThrow()
val responsePack = response.changePack.toChangePack()
// NOTE(7hong13, chacha912, hackerwins): If syncLoop already executed with
// PushPull, ignore the response when the syncMode is PushOnly.
val currentSyncMode = attachments.value[document.key]?.syncMode
if (responsePack.hasChanges &&
currentSyncMode == SyncMode.RealtimePushOnly ||
currentSyncMode == SyncMode.RealtimeSyncOff
) {
return@runCatching
}

document.applyChangePack(responsePack)
// NOTE(chacha912): If a document has been removed, watchStream should
// be disconnected to not receive an event for that document.
if (document.status == DocumentStatus.Removed) {
attachments.value -= document.key
document.applyChangePack(responsePack)
// NOTE(chacha912): If a document has been removed, watchStream should
// be disconnected to not receive an event for that document.
if (document.status == DocumentStatus.Removed) {
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
}
}.onFailure {
coroutineContext.ensureActive()
Expand Down Expand Up @@ -478,36 +488,38 @@ public class Client @VisibleForTesting internal constructor(
require(document.status == DocumentStatus.Detached) {
"document is not detached"
}
document.setActor(requireClientId())
document.updateAsync { _, presence ->
presence.put(initialPresence)
}.await()

val request = attachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
}
val response = service.attachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
document.mutex.withLock {
document.setActor(requireClientId())
document.updateAsync { _, presence ->
presence.put(initialPresence)
}.await()

if (document.status == DocumentStatus.Removed) {
return@async SUCCESS
}
val request = attachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
}
val response = service.attachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)

document.status = DocumentStatus.Attached
attachments.value += document.key to Attachment(
document,
response.documentId,
syncMode,
)
waitForInitialization(document.key)
if (document.status == DocumentStatus.Removed) {
return@async SUCCESS
}

document.status = DocumentStatus.Attached
attachments.value += document.key to Attachment(
document,
response.documentId,
syncMode,
)
waitForInitialization(document.key)
}
SUCCESS
}
}
Expand All @@ -525,30 +537,33 @@ public class Client @VisibleForTesting internal constructor(
check(isActive) {
"client is not active"
}
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")
document.mutex.withLock {
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")

document.updateAsync { _, presence ->
presence.clear()
}.await()
document.updateAsync { _, presence ->
presence.clear()
}.await()

val request = detachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = attachment.documentID
}
val response = service.detachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
if (document.status != DocumentStatus.Removed) {
document.status = DocumentStatus.Detached
attachments.value -= document.key
val request = detachDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack().toPBChangePack()
documentId = attachment.documentID
}
val response = service.detachDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
if (document.status != DocumentStatus.Removed) {
document.status = DocumentStatus.Detached
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
}
SUCCESS
}
Expand Down Expand Up @@ -586,24 +601,27 @@ public class Client @VisibleForTesting internal constructor(
check(isActive) {
"client is not active"
}
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")
document.mutex.withLock {
val attachment = attachments.value[document.key]
?: throw IllegalArgumentException("document is not attached")

val request = removeDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack(forceRemove = true).toPBChangePack()
documentId = attachment.documentID
}
val response = service.removeDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
val request = removeDocumentRequest {
clientId = requireClientId().value
changePack = document.createChangePack(forceRemove = true).toPBChangePack()
documentId = attachment.documentID
}
val response = service.removeDocument(
request,
document.key.documentBasedRequestHeader,
).getOrElse {
ensureActive()
return@async Result.failure(it)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
attachments.value -= document.key
mutexForDocuments.remove(document.key)
}
val pack = response.changePack.toChangePack()
document.applyChangePack(pack)
attachments.value -= document.key
SUCCESS
}
}
Expand Down
38 changes: 16 additions & 22 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ import kotlinx.coroutines.flow.filterNot
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext

/**
Expand Down Expand Up @@ -128,8 +126,6 @@ public class Document(
.takeIf { status == DocumentStatus.Attached }
.orEmpty()

private val changeMutex = Mutex()

/**
* Executes the given [updater] to update this document.
*/
Expand Down Expand Up @@ -259,29 +255,27 @@ public class Document(
* 3. Do Garbage collection.
*/
internal suspend fun applyChangePack(pack: ChangePack): Unit = withContext(dispatcher) {
changeMutex.withLock {
if (pack.hasSnapshot) {
applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot))
} else if (pack.hasChanges) {
applyChanges(pack.changes)
}
if (pack.hasSnapshot) {
applySnapshot(pack.checkPoint.serverSeq, checkNotNull(pack.snapshot))
} else if (pack.hasChanges) {
applyChanges(pack.changes)
}

val iterator = localChanges.iterator()
while (iterator.hasNext()) {
val change = iterator.next()
if (change.id.clientSeq > pack.checkPoint.clientSeq) {
break
}
iterator.remove()
val iterator = localChanges.iterator()
while (iterator.hasNext()) {
val change = iterator.next()
if (change.id.clientSeq > pack.checkPoint.clientSeq) {
break
}
iterator.remove()
}

checkPoint = checkPoint.forward(pack.checkPoint)
checkPoint = checkPoint.forward(pack.checkPoint)

pack.minSyncedTicket?.let(::garbageCollect)
pack.minSyncedTicket?.let(::garbageCollect)

if (pack.isRemoved) {
status = DocumentStatus.Removed
}
if (pack.isRemoved) {
status = DocumentStatus.Removed
}
}

Expand Down

0 comments on commit f1a80fd

Please sign in to comment.