From 067bd41df2f3fb8caf5b22d1612a0f97dbe6cab1 Mon Sep 17 00:00:00 2001 From: Amr Yousef <5002609+amrfarid140@users.noreply.github.com> Date: Fri, 4 Oct 2024 19:51:05 +0100 Subject: [PATCH] Avoid deadlock in RealMutableStore (#658) * Add test case Signed-off-by: Amr Yousef * Always Release storeLock Signed-off-by: Amr Yousef * Update kermit to 2.0.4 (#655) Fixes #653 and #654 Signed-off-by: Scott Olcott Signed-off-by: Amr Yousef * Revert "Update kermit to 2.0.4 (#655)" This reverts commit 76f34d48973f8b810bf247219a933d9ff0d686a7. Signed-off-by: Amr Yousef --------- Signed-off-by: Amr Yousef Signed-off-by: Scott Olcott Co-authored-by: Scott Olcott --- .../store/store5/impl/RealMutableStore.kt | 11 ++- .../store5/StoreWithInMemoryCacheTests.kt | 92 +++++++++++++++++++ 2 files changed, 99 insertions(+), 4 deletions(-) diff --git a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt index f5dbb54f9..84fd02948 100644 --- a/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt +++ b/store/src/commonMain/kotlin/org/mobilenativefoundation/store/store5/impl/RealMutableStore.kt @@ -203,10 +203,13 @@ internal class RealMutableStore Output, ): Output { storeLock.lock() - val threadSafety = requireNotNull(keyToThreadSafety[key]) - val output = threadSafety.block() - storeLock.unlock() - return output + try { + val threadSafety = requireNotNull(keyToThreadSafety[key]) + val output = threadSafety.block() + return output + } finally { + storeLock.unlock() + } } private suspend fun conflictsMightExist(key: Key): Boolean { diff --git a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/StoreWithInMemoryCacheTests.kt b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/StoreWithInMemoryCacheTests.kt index b50ec9703..ba3a03289 100644 --- a/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/StoreWithInMemoryCacheTests.kt +++ b/store/src/commonTest/kotlin/org/mobilenativefoundation/store/store5/StoreWithInMemoryCacheTests.kt @@ -1,12 +1,21 @@ package org.mobilenativefoundation.store.store5 +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.Job +import kotlinx.coroutines.async +import kotlinx.coroutines.awaitAll +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.* import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.runTest import org.mobilenativefoundation.store.store5.impl.extensions.get import kotlin.test.Test import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotNull import kotlin.time.Duration.Companion.hours @FlowPreview @@ -39,4 +48,87 @@ class StoreWithInMemoryCacheTests { assertEquals("result", c) assertEquals("result", d) } + + @Test + fun storeDeadlock() = + testScope.runTest { + repeat(1000) { + val store = + StoreBuilder + .from( + fetcher = Fetcher.of { key: Int -> "fetcher_${key}" }, + sourceOfTruth = SourceOfTruth.Companion.of( + reader = { key -> + flow { + emit("source_of_truth_${key}") + } + }, + writer = { key: Int, local: String -> + + } + ) + ) + .disableCache() + .toMutableStoreBuilder( + converter = object : Converter { + override fun fromNetworkToLocal(network: String): String { + return network + } + + override fun fromOutputToLocal(output: String): String { + return output + } + }, + ) + .build( + updater = object : Updater { + var callCount = -1 + override suspend fun post(key: Int, value: String): UpdaterResult { + callCount += 1 + if (callCount % 2 == 0) { + throw IllegalArgumentException(key.toString() + "value:$value") + } else { + return UpdaterResult.Success.Untyped("") + } + } + + override val onCompletion: OnUpdaterCompletion? + get() = null + + } + ) + + val jobs = mutableListOf() + jobs.add( + store.stream(StoreReadRequest.cached(1, refresh = true)) + .mapNotNull { it.dataOrNull() } + .launchIn(CoroutineScope(Dispatchers.Default)) + ) + val job1 = store.stream(StoreReadRequest.cached(0, refresh = true)) + .mapNotNull { it.dataOrNull() } + .launchIn(CoroutineScope(Dispatchers.Default)) + jobs.add( + store.stream(StoreReadRequest.cached(2, refresh = true)) + .mapNotNull { it.dataOrNull() } + .launchIn(CoroutineScope(Dispatchers.Default))) + jobs.add( + store.stream(StoreReadRequest.cached(3, refresh = true)) + .mapNotNull { it.dataOrNull() } + .launchIn(CoroutineScope(Dispatchers.Default))) + job1.cancel() + assertEquals( + expected = "source_of_truth_0", + actual = store.stream(StoreReadRequest.cached(0, refresh = true)) + .mapNotNull { it.dataOrNull() }.first() + ) + jobs.forEach { + it.cancel() + assertEquals( + expected = "source_of_truth_0", + actual = store.stream(StoreReadRequest.cached(0, refresh = true)) + .mapNotNull { it.dataOrNull() }.first() + ) + } + } + } }