diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e2f65e2..e9061801 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Change Log +## [Unreleased] - TBD + +### Changed + +- Optimize the implementation of `flowFromSuspend` and `flowFromNonSuspend`, + it is just an internal change, it does not affect the public API and behavior. + ## [0.7.4] - Nov 12, 2023 ### Changed diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt index d7318995..410009b0 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromNonSuspend.kt @@ -26,7 +26,6 @@ package com.hoc081098.flowext import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.flow /** * Creates a _cold_ flow that produces a single value from the given [function]. @@ -66,4 +65,11 @@ import kotlinx.coroutines.flow.flow * * @see flowFromSuspend */ -public fun flowFromNonSuspend(function: () -> T): Flow = flow { return@flow emit(function()) } +public fun flowFromNonSuspend(function: () -> T): Flow = + FlowFromNonSuspend(function) + +// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch, +// and we guarantee all Flow's constraints: context preservation and exception transparency. +private class FlowFromNonSuspend(private val function: () -> T) : Flow { + override suspend fun collect(collector: FlowCollector) = collector.emit(function()) +} diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt index af4f32c5..efe5f283 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/flowFromSuspend.kt @@ -26,7 +26,6 @@ package com.hoc081098.flowext import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector -import kotlinx.coroutines.flow.flow /** * Creates a _cold_ flow that produces a single value from the given [function]. @@ -71,4 +70,10 @@ import kotlinx.coroutines.flow.flow * @see flowFromNonSuspend */ public fun flowFromSuspend(function: suspend () -> T): Flow = - flow { return@flow emit(function()) } + FlowFromSuspend(function) + +// We don't need to use `AbstractFlow` here because we only emit a single value without a context switch, +// and we guarantee all Flow's constraints: context preservation and exception transparency. +private class FlowFromSuspend(private val function: suspend () -> T) : Flow { + override suspend fun collect(collector: FlowCollector) = collector.emit(function()) +} diff --git a/src/commonMain/kotlin/com/hoc081098/flowext/internal/Symbols.kt b/src/commonMain/kotlin/com/hoc081098/flowext/internal/Symbols.kt index e785a8d7..4c17faf8 100644 --- a/src/commonMain/kotlin/com/hoc081098/flowext/internal/Symbols.kt +++ b/src/commonMain/kotlin/com/hoc081098/flowext/internal/Symbols.kt @@ -25,9 +25,11 @@ package com.hoc081098.flowext.internal import com.hoc081098.flowext.utils.Symbol +import kotlin.jvm.JvmField /* * Symbol used to indicate that the flow is complete. * It should never leak to the outside world. */ +@JvmField internal val DONE_VALUE = Symbol("DONE_VALUE") diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt index 4937bcb0..6fc29360 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromNonSuspendTest.kt @@ -25,10 +25,16 @@ package com.hoc081098.flowext import com.hoc081098.flowext.utils.BaseTest +import com.hoc081098.flowext.utils.NamedDispatchers import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.assertFailsWith import com.hoc081098.flowext.utils.test import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.CancellationException import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.take @ExperimentalCoroutinesApi class FlowFromNonSuspendTest : BaseTest() { @@ -49,9 +55,38 @@ class FlowFromNonSuspendTest : BaseTest() { } @Test - fun deferFactoryThrows() = runTest { + fun flowFromNonSuspendFunctionThrows() = runTest { val testException = TestException() flowFromNonSuspend { throw testException }.test(listOf(Event.Error(testException))) } + + @Test + fun flowFromNonSuspendCancellation() = runTest { + fun throwsCancellationException(): Unit = throw CancellationException("Flow was cancelled") + assertFailsWith( + flowFromNonSuspend { + val i = 1 + 2 + throwsCancellationException() + i + 3 + }, + ) + } + + @Test + fun flowFromNonSuspenddTake() = runTest { + flowFromNonSuspend { 100 } + .take(1) + .test(listOf(Event.Value(100), Event.Complete)) + } + + @Test + fun testContextPreservation1() = runTest { + val flow = flowFromNonSuspend { + assertEquals("OK", NamedDispatchers.name()) + 42 + }.flowOn(NamedDispatchers("OK")) + + flow.test(listOf(Event.Value(42), Event.Complete)) + } } diff --git a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt index c7d1c4e0..decf96bf 100644 --- a/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt +++ b/src/commonTest/kotlin/com/hoc081098/flowext/FlowFromSuspendTest.kt @@ -25,11 +25,19 @@ package com.hoc081098.flowext import com.hoc081098.flowext.utils.BaseTest +import com.hoc081098.flowext.utils.NamedDispatchers import com.hoc081098.flowext.utils.TestException +import com.hoc081098.flowext.utils.assertFailsWith import com.hoc081098.flowext.utils.test import kotlin.test.Test +import kotlin.test.assertEquals +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.flowOn +import kotlinx.coroutines.flow.take +import kotlinx.coroutines.withContext @ExperimentalCoroutinesApi class FlowFromSuspendTest : BaseTest() { @@ -51,9 +59,64 @@ class FlowFromSuspendTest : BaseTest() { } @Test - fun deferFactoryThrows() = runTest { + fun flowFromSuspendFunctionThrows() = runTest { val testException = TestException() flowFromSuspend { throw testException }.test(listOf(Event.Error(testException))) } + + @Test + fun flowFromSuspendCancellation() = runTest { + fun throwsCancellationException(): Unit = throw CancellationException("Flow was cancelled") + assertFailsWith( + flowFromSuspend { + val i = 1 + 2 + throwsCancellationException() + i + 3 + }, + ) + } + + @Test + fun flowFromSuspendTake() = runTest { + flowFromSuspend { 100 }.take(1) + .test(listOf(Event.Value(100), Event.Complete)) + + flowFromSuspend { + delay(100) + 100 + } + .take(1) + .test(listOf(Event.Value(100), Event.Complete)) + } + + @Test + fun testContextPreservation1() = runTest { + val flow = flowFromSuspend { + assertEquals("OK", NamedDispatchers.name()) + + withContext(Dispatchers.Default) { delay(100) } + + assertEquals("OK", NamedDispatchers.name()) + 42 + }.flowOn(NamedDispatchers("OK")) + + flow.test(listOf(Event.Value(42), Event.Complete)) + } + + @Test + fun testContextPreservation2() = runTest { + val flow = flowFromSuspend { + assertEquals("OK", NamedDispatchers.name()) + + withContext(Dispatchers.Default) { + delay(100) + 42 + }.also { + assertEquals("OK", NamedDispatchers.name()) + } + }.flowOn(NamedDispatchers("OK")) + + flow.test(listOf(Event.Value(42), Event.Complete)) + } }