From c81b742b2f3bb242588b534da64da9b78907b4e8 Mon Sep 17 00:00:00 2001 From: Artyom Shendrik Date: Tue, 1 Nov 2022 18:10:45 +0400 Subject: [PATCH] Leak-free transfer via channels, `Closeable` resources supported as State and SideEffects (strictly not recommended!) * Previous state will be properly closed on change * Side effects closed when not delivered For leak-free transfer details see "Undelivered elements" section in [Channel](https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-channel/) documentation. Also see: https://github.com/Kotlin/kotlinx.coroutines/issues/1936 Signed-off-by: Artyom Shendrik --- README.md | 4 + fluxo-core/api/android/fluxo-core.api | 17 +++- fluxo-core/api/jvm/fluxo-core.api | 17 +++- .../kotlin/kt/fluxo/core/InputStrategy.kt | 12 ++- .../kt/fluxo/core/intercept/FluxoEvent.kt | 33 ++++++- .../kt/fluxo/core/internal/FluxoStore.kt | 99 ++++++++++++++----- .../fluxo/core/strategy/LifoInputStrategy.kt | 5 +- 7 files changed, 151 insertions(+), 36 deletions(-) diff --git a/README.md b/README.md index 9ce4bdc6..9b49d797 100644 --- a/README.md +++ b/README.md @@ -21,6 +21,10 @@ * Full errors handling and behavior control. * Bootstrap for eager or lazy initialization. * Side jobs for long-running tasks. +* Leak-free transfer ([1](https://github.com/Kotlin/kotlinx.coroutines/issues/1936), delivery guarantees. +* Strictly not recommended, but `Closeable` resources supported as State and SideEffects + * Previous state will be properly closed on change + * Side effects closed when not delivered * Well tested. * Reactive streams compatibility through [coroutine wrappers](https://github.com/Kotlin/kotlinx.coroutines/tree/master/reactive): diff --git a/fluxo-core/api/android/fluxo-core.api b/fluxo-core/api/android/fluxo-core.api index 34ba9db1..4c4bb49b 100644 --- a/fluxo-core/api/android/fluxo-core.api +++ b/fluxo-core/api/android/fluxo-core.api @@ -70,7 +70,8 @@ public final class kt/fluxo/core/FluxoSettings { public abstract class kt/fluxo/core/InputStrategy { public static final field InBox Lkt/fluxo/core/InputStrategy$InBox; public fun ()V - public fun createQueue ()Lkotlinx/coroutines/channels/Channel; + public fun createQueue (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel; + public static synthetic fun createQueue$default (Lkt/fluxo/core/InputStrategy;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public fun getParallelProcessing ()Z public fun getRollbackOnCancellation ()Z public abstract fun processRequests (Lkotlin/jvm/functions/Function2;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -219,9 +220,10 @@ public final class kt/fluxo/core/intercept/FluxoEvent$IntentRejected : kt/fluxo/ public fun toString ()Ljava/lang/String; } -public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectDropped : kt/fluxo/core/intercept/FluxoEvent { - public fun (Lkt/fluxo/core/Store;Ljava/lang/Object;)V - public final fun getSideEffect ()Ljava/lang/Object; +public final class kt/fluxo/core/intercept/FluxoEvent$IntentUndelivered : kt/fluxo/core/intercept/FluxoEvent { + public fun (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V + public final fun getIntent ()Ljava/lang/Object; + public final fun getResent ()Z public fun toString ()Ljava/lang/String; } @@ -231,6 +233,13 @@ public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectEmitted : kt/flu public fun toString ()Ljava/lang/String; } +public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectUndelivered : kt/fluxo/core/intercept/FluxoEvent { + public fun (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V + public final fun getResent ()Z + public final fun getSideEffect ()Ljava/lang/Object; + public fun toString ()Ljava/lang/String; +} + public final class kt/fluxo/core/intercept/FluxoEvent$SideJobCancelled : kt/fluxo/core/intercept/FluxoEvent { public fun (Lkt/fluxo/core/Store;Ljava/lang/String;Lkt/fluxo/core/dsl/SideJobScope$RestartState;)V public final fun getKey ()Ljava/lang/String; diff --git a/fluxo-core/api/jvm/fluxo-core.api b/fluxo-core/api/jvm/fluxo-core.api index 302478d5..4963d74d 100644 --- a/fluxo-core/api/jvm/fluxo-core.api +++ b/fluxo-core/api/jvm/fluxo-core.api @@ -70,7 +70,8 @@ public final class kt/fluxo/core/FluxoSettings { public abstract class kt/fluxo/core/InputStrategy { public static final field InBox Lkt/fluxo/core/InputStrategy$InBox; public fun ()V - public fun createQueue ()Lkotlinx/coroutines/channels/Channel; + public fun createQueue (Lkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/channels/Channel; + public static synthetic fun createQueue$default (Lkt/fluxo/core/InputStrategy;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/channels/Channel; public fun getParallelProcessing ()Z public fun getRollbackOnCancellation ()Z public abstract fun processRequests (Lkotlin/jvm/functions/Function2;Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object; @@ -225,6 +226,13 @@ public final class kt/fluxo/core/intercept/FluxoEvent$IntentRejected : kt/fluxo/ public fun toString ()Ljava/lang/String; } +public final class kt/fluxo/core/intercept/FluxoEvent$IntentUndelivered : kt/fluxo/core/intercept/FluxoEvent { + public fun (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V + public final fun getIntent ()Ljava/lang/Object; + public final fun getResent ()Z + public fun toString ()Ljava/lang/String; +} + public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectDropped : kt/fluxo/core/intercept/FluxoEvent { public fun (Lkt/fluxo/core/Store;Ljava/lang/Object;)V public final fun getSideEffect ()Ljava/lang/Object; @@ -237,6 +245,13 @@ public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectEmitted : kt/flu public fun toString ()Ljava/lang/String; } +public final class kt/fluxo/core/intercept/FluxoEvent$SideEffectUndelivered : kt/fluxo/core/intercept/FluxoEvent { + public fun (Lkt/fluxo/core/Store;Ljava/lang/Object;Z)V + public final fun getResent ()Z + public final fun getSideEffect ()Ljava/lang/Object; + public fun toString ()Ljava/lang/String; +} + public final class kt/fluxo/core/intercept/FluxoEvent$SideJobCancelled : kt/fluxo/core/intercept/FluxoEvent { public fun (Lkt/fluxo/core/Store;Ljava/lang/String;Lkt/fluxo/core/dsl/SideJobScope$RestartState;)V public final fun getKey ()Ljava/lang/String; diff --git a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/InputStrategy.kt b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/InputStrategy.kt index 573846ee..c915d96f 100644 --- a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/InputStrategy.kt +++ b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/InputStrategy.kt @@ -11,8 +11,16 @@ import kotlin.jvm.JvmName public abstract class InputStrategy { - public open fun createQueue(): Channel { - return Channel(capacity = Channel.UNLIMITED, onBufferOverflow = BufferOverflow.SUSPEND) + /** + * + * @param onUndeliveredElement See "Undelivered elements" section in [Channel] documentation for details. + */ + public open fun createQueue(onUndeliveredElement: ((Request) -> Unit)? = null): Channel { + return Channel( + capacity = Channel.UNLIMITED, + onBufferOverflow = BufferOverflow.SUSPEND, + onUndeliveredElement = onUndeliveredElement, + ) } public open val parallelProcessing: Boolean get() = false diff --git a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/intercept/FluxoEvent.kt b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/intercept/FluxoEvent.kt index 8d0bfb61..2e22ae6f 100644 --- a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/intercept/FluxoEvent.kt +++ b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/intercept/FluxoEvent.kt @@ -2,6 +2,7 @@ package kt.fluxo.core.intercept +import kotlinx.coroutines.channels.Channel import kt.fluxo.core.Store import kt.fluxo.core.dsl.SideJobScope.RestartState import kt.fluxo.core.dsl.SideJobScope.RestartState.Restarted @@ -77,6 +78,20 @@ public sealed class FluxoEvent( override fun toString(): String = "Intent error: $store, $intent (${e.message ?: e})" } + /** + * When object transferred via [Channel] from one coroutine to another + * it can be lost if either send or receive operation cancelled in transit. + * This event signals about such case for an [intent]. + * + * See "Undelivered elements" section in [Channel] documentation for details. + * Also see [GitHub issue](https://github.com/Kotlin/kotlinx.coroutines/issues/1936). + * + * @param resent `true` if [intent] successfully resent to the [Channel] and can be delivered later + */ + class IntentUndelivered(store: Store, val intent: I, val resent: Boolean) : FluxoEvent(store) { + override fun toString(): String = "Intent undelivered: $store, $intent" + } + // endregion // region SideEffect @@ -85,8 +100,22 @@ public sealed class FluxoEvent( override fun toString(): String = "SideEffect emitted: $store, $sideEffect" } - class SideEffectDropped(store: Store, val sideEffect: SE) : FluxoEvent(store) { - override fun toString(): String = "SideEffect dropped: $store, $sideEffect" + /** + * When object transferred via [Channel] from one coroutine to another + * it can be lost if either send or receive operation cancelled in transit. + * This event signals about such case for a [sideEffect]. + * + * See "Undelivered elements" section in [Channel] documentation for details. + * Also see [GitHub issue](https://github.com/Kotlin/kotlinx.coroutines/issues/1936). + * + * @param resent `true` if [sideEffect] successfully resent to the [Channel] and can be delivered later + */ + class SideEffectUndelivered( + store: Store, + val sideEffect: SE, + val resent: Boolean, + ) : FluxoEvent(store) { + override fun toString(): String = "SideEffect undelivered: $store, $sideEffect" } // endregion diff --git a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/internal/FluxoStore.kt b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/internal/FluxoStore.kt index 85251d2e..95a2b2af 100644 --- a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/internal/FluxoStore.kt +++ b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/internal/FluxoStore.kt @@ -6,6 +6,7 @@ import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineExceptionHandler import kotlinx.coroutines.CoroutineName import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.CoroutineStart import kotlinx.coroutines.Deferred import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.InternalCoroutinesApi @@ -63,7 +64,7 @@ internal class FluxoStore( private val intentContext: CoroutineContext private val sideJobScope: CoroutineScope private val interceptorScope: CoroutineScope - private val requestsChannel: Channel> = conf.inputStrategy.createQueue() + private val requestsChannel: Channel> private val sideJobsMap = ConcurrentHashMap>() @@ -96,11 +97,7 @@ internal class FluxoStore( val parentExceptionHandler = conf.exceptionHandler ?: ctx[CoroutineExceptionHandler] val exceptionHandler = CoroutineExceptionHandler { context, e -> events.tryEmit(FluxoEvent.UnhandledError(this, e)) - if (parentExceptionHandler != null) { - parentExceptionHandler.handleException(context, e) - } else { - throw e - } + parentExceptionHandler?.handleException(context, e) ?: throw e } scope = CoroutineScope(ctx + exceptionHandler + job) intentContext = scope.coroutineContext + conf.intentContext + when { @@ -119,9 +116,44 @@ internal class FluxoStore( } } + // Leak-free transfer via channel + // https://github.com/Kotlin/kotlinx.coroutines/issues/1936 + // — send operation cancelled before it had a chance to actually send the element. + // — receive operation retrieved the element from the channel but cancelled when trying to return it the caller. + // — channel cancelled, in which case onUndeliveredElement called on every remaining element in the channel's buffer. + // See "Undelivered elements" section in Channel documentation for details. + requestsChannel = conf.inputStrategy.createQueue { + scope.launch(Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED) { + when (it) { + is StoreRequest.HandleIntent -> { + @Suppress("UNINITIALIZED_VARIABLE") + val resent = if (requestsChannel.trySend(it).isSuccess) true else { + it.intent.closeSafely() + false + } + if (isActive) { + events.emit(FluxoEvent.IntentUndelivered(this@FluxoStore, it.intent, resent = resent)) + } + } + + is StoreRequest.RestoreState -> { + if (isActive) { + updateState(it.state) + } + } + } + } + } sideEffectChannel = Channel(conf.sideEffectBufferSize, BufferOverflow.SUSPEND) { - scope.launch(Dispatchers.Unconfined) { - events.emit(FluxoEvent.SideEffectDropped(this@FluxoStore, it)) + scope.launch(Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED) { + @Suppress("UNINITIALIZED_VARIABLE") + val resent = if (sideEffectChannel.trySend(it).isSuccess) true else { + it.closeSafely() + false + } + if (isActive) { + events.emit(FluxoEvent.SideEffectUndelivered(this@FluxoStore, it, resent = resent)) + } } } sideEffectFlow = sideEffectChannel.receiveAsFlow().let { @@ -179,7 +211,7 @@ internal class FluxoStore( override fun send(intent: Intent) { start() val i = if (DEBUG || conf.debugChecks) debugIntentWrapper(intent) else intent - scope.launch(Dispatchers.Unconfined) { + scope.launch(Dispatchers.Unconfined, start = CoroutineStart.UNDISPATCHED) { @Suppress("DeferredResultUnused") sendAsync(i) } } @@ -200,6 +232,7 @@ internal class FluxoStore( if (prevValue != nextValue) { // event fired only if state changed events.emit(FluxoEvent.StateChanged(this, nextValue)) + prevValue.closeSafely() } return nextValue } @@ -211,6 +244,28 @@ internal class FluxoStore( events.emit(FluxoEvent.SideEffectEmitted(this@FluxoStore, sideEffect)) } + private fun handleException(e: Throwable, context: CoroutineContext) { + if (!conf.closeOnExceptions) { + val handler = scope.coroutineContext[CoroutineExceptionHandler] + if (handler != null) { + handler.handleException(context, e) + return + } + } + throw e + } + + private suspend fun Any?.closeSafely() { + if (this is Closeable) { + try { + close() // Close if Closeable resource + } catch (e: Throwable) { + handleException(e, currentCoroutineContext()) + events.emit(FluxoEvent.UnhandledError(this@FluxoStore, e)) + } + } + } + /** Will be called only once for each [FluxoStore] */ private fun launch() = scope.launch { // observe and process intents @@ -256,8 +311,8 @@ internal class FluxoStore( try { interceptorScope.start(eventsFlow) } catch (e: Throwable) { + handleException(e, currentCoroutineContext()) events.emit(FluxoEvent.UnhandledError(this@FluxoStore, e)) - conf.exceptionHandler?.handleException(currentCoroutineContext(), e) } } } @@ -307,12 +362,8 @@ internal class FluxoStore( } events.emit(FluxoEvent.IntentCancelled(this, intent)) } catch (e: Throwable) { + handleException(e, currentCoroutineContext()) events.emit(FluxoEvent.IntentError(this, intent, e)) - if (!conf.closeOnExceptions) { - conf.exceptionHandler?.handleException(currentCoroutineContext(), e) - } else { - throw e - } } finally { deferred?.complete(Unit) } @@ -371,12 +422,8 @@ internal class FluxoStore( } catch (_: CancellationException) { events.emit(FluxoEvent.SideJobCancelled(this@FluxoStore, key, restartState)) } catch (e: Throwable) { + handleException(e, currentCoroutineContext()) events.emit(FluxoEvent.SideJobError(this@FluxoStore, key, restartState, e)) - if (!conf.closeOnExceptions) { - conf.exceptionHandler?.handleException(currentCoroutineContext(), e) - } else { - throw e - } } } ) @@ -404,12 +451,8 @@ internal class FluxoStore( } catch (_: CancellationException) { events.emit(FluxoEvent.BootstrapperCancelled(this@FluxoStore, bootstrapper)) } catch (e: Throwable) { + handleException(e, currentCoroutineContext()) events.emit(FluxoEvent.BootstrapperError(this@FluxoStore, bootstrapper, e)) - if (!conf.closeOnExceptions) { - conf.exceptionHandler?.handleException(currentCoroutineContext(), e) - } else { - throw e - } } } @@ -428,6 +471,8 @@ internal class FluxoStore( sideEffectChannel.close(ce) + // Interceptor scope shouldn't be closed immediately with scope, when interceptors set. + // It allows to process final events in interceptors. val interceptorScope = interceptorScope if (interceptorScope !== scope && interceptorScope.isActive) { val cancellationCause = cancellationCause @@ -436,6 +481,10 @@ internal class FluxoStore( interceptorScope.cancel(cancellationCause) } } + + interceptorScope.launch(Dispatchers.Unconfined, CoroutineStart.UNDISPATCHED) { + mutableState.value.closeSafely() + } } // endregion diff --git a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/strategy/LifoInputStrategy.kt b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/strategy/LifoInputStrategy.kt index 37852b21..44a2a521 100644 --- a/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/strategy/LifoInputStrategy.kt +++ b/fluxo-core/src/commonMain/kotlin/kt/fluxo/core/strategy/LifoInputStrategy.kt @@ -1,5 +1,6 @@ package kt.fluxo.core.strategy +import kotlinx.coroutines.channels.BufferOverflow import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collectLatest @@ -13,8 +14,8 @@ import kt.fluxo.core.dsl.InputStrategyScope */ internal object LifoInputStrategy : InputStrategy() { - override fun createQueue(): Channel { - return Channel(capacity = Channel.CONFLATED) + override fun createQueue(onUndeliveredElement: ((Request) -> Unit)?): Channel { + return Channel(capacity = Channel.CONFLATED, onBufferOverflow = BufferOverflow.SUSPEND, onUndeliveredElement = onUndeliveredElement) } override suspend fun (InputStrategyScope).processRequests(queue: Flow) {