diff --git a/reduks-async/src/main/kotlin/com/beyondeye/reduks/experimental/activity/AsyncReduksActivity.kt b/reduks-async/src/main/kotlin/com/beyondeye/reduks/experimental/activity/AsyncReduksActivity.kt index 0976d00..75c192e 100644 --- a/reduks-async/src/main/kotlin/com/beyondeye/reduks/experimental/activity/AsyncReduksActivity.kt +++ b/reduks-async/src/main/kotlin/com/beyondeye/reduks/experimental/activity/AsyncReduksActivity.kt @@ -42,7 +42,7 @@ abstract class AsyncReduksActivity( reduks.store.errorLogFn=defaultReduksInternalLogger } - override fun storeCreator(): StoreCreator = AsyncStore.Creator(cscope=activity_cscope,reduceContext = Dispatchers.Default,subscribeContext = Dispatchers.Main) + override fun storeCreator(): StoreCreator = AsyncStore.Creator(cscope=activity_cscope,reduceDispatcher = Dispatchers.Default,subscribeDispatcher = Dispatchers.Main) //override for making this function visible to inheritors override fun onStop() { diff --git a/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/AsyncStore.kt b/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/AsyncStore.kt index 0622413..e120810 100644 --- a/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/AsyncStore.kt +++ b/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/AsyncStore.kt @@ -11,24 +11,24 @@ import kotlin.coroutines.CoroutineContext /** * Store that use kotlin coroutine channels for notifying asynchronously to store subscribers about * state changes. - * By default the subscribeContext (the coroutine context used by subscribers when notified of store changes + * By default the subscribeDispatcher (the coroutine context used by subscribers when notified of store changes * is the Android UI thread, because usually subscribers need to update views according to state changes * More in general you can use any single thread context, for example: - * val subscribeContext=newSingleThreadContext("SubscribeThread") + * val subscribeDispatcher=newSingleThreadContext("SubscribeThread") */ // class AsyncStore(initialState: S, private var reducer: Reducer, val cscope: CoroutineScope, - subscribeContext: CoroutineContext, - val reduceContext: CoroutineContext =Dispatchers.Default + subscribeDispatcher: CoroutineDispatcher, + val reduceDispatcher: CoroutineDispatcher =Dispatchers.Default ) : Store { class Creator( val cscope: CoroutineScope, - val subscribeContext: CoroutineContext, - val reduceContext: CoroutineContext =Dispatchers.Default, + val subscribeDispatcher: CoroutineDispatcher, + val reduceDispatcher: CoroutineDispatcher =Dispatchers.Default, val withStandardMiddleware:Boolean=true) : StoreCreator { override fun create(reducer: Reducer, initialState: S): Store { - val res = AsyncStore(initialState, reducer,cscope,subscribeContext,reduceContext) + val res = AsyncStore(initialState, reducer,cscope,subscribeDispatcher,reduceDispatcher) return if (!withStandardMiddleware) res else @@ -39,7 +39,7 @@ class AsyncStore(initialState: S, private var reducer: Reducer, override var errorLogFn: ((String) -> Unit)?=null private var deferredState: Deferred = cscope.async { initialState } override val state:S - get() = runBlocking(reduceContext) { + get() = runBlocking(reduceDispatcher) { deferredState.await() } private var subscribers= listOf>() @@ -56,7 +56,7 @@ class AsyncStore(initialState: S, private var reducer: Reducer, //get a reference of current deferred so that we make sure that all reduce action are actually executed in the correct order val curDeferredState = deferredState //update deferredState with result of async job of running the reducer - deferredState = cscope.async(reduceContext) { + deferredState = cscope.async(reduceDispatcher) { val startState = curDeferredState.await() val newState = try { reducer.reduce(startState, action) //return newState @@ -69,11 +69,11 @@ class AsyncStore(initialState: S, private var reducer: Reducer, //after creating the new deferredState, handle notification of subscribers once this new //deferredState is resolved val nextDeferredState=deferredState - cscope.launch(reduceContext) { + cscope.launch(reduceDispatcher) { nextDeferredState.await() //NOTE THAT IF THE ObserveContext is a single thread(the ui thread) // then subscribers will be notified sequentially of state changes in the correct order - withContext(subscribeContext) { + withContext(subscribeDispatcher) { notifySubscribers() } } diff --git a/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/AsyncActionMiddleWare.kt b/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/AsyncActionMiddleWare.kt index c96a86a..f48b3cb 100644 --- a/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/AsyncActionMiddleWare.kt +++ b/reduks-core-async/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/AsyncActionMiddleWare.kt @@ -1,8 +1,10 @@ package com.beyondeye.reduks.experimental.middlewares import com.beyondeye.reduks.* -import kotlinx.coroutines.experimental.Deferred -import kotlinx.coroutines.experimental.async +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Deferred +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.async sealed class AsyncAction(val payloadTypename:String): Action { @@ -29,10 +31,11 @@ sealed class AsyncAction(val payloadTypename:String): Action { * it seems redundant to store both type name and define this class as template class * unfortunately this is required because of type erasure in java/kotlin generics */ - class Started(payloadTypename:String, val promise: Deferred): AsyncAction(payloadTypename) { - constructor(type:String,body: () -> PayloadType):this(type, async { body() }) + class Started(payloadTypename:String, val promise: Deferred): + AsyncAction(payloadTypename) { + constructor(type:String, cscope: CoroutineScope,body: () -> PayloadType):this(type, cscope.async { body() }) suspend fun asCompleted() = Completed(payloadTypename, promise.await()) - suspend fun asFailed() = Failed(payloadTypename, promise.getCompletionExceptionOrNull()!!) + fun asFailed() = Failed(payloadTypename, promise.getCompletionExceptionOrNull()!!) /** * block until we get back the result from the promise */ @@ -57,8 +60,8 @@ sealed class AsyncAction(val payloadTypename:String): Action { inline fun start( promise: Deferred): AsyncAction { return Started(PayloadType::class.java.canonicalName, promise) } - inline fun start(noinline body: () -> PayloadType): AsyncAction { - return Started(PayloadType::class.java.canonicalName, body) + inline fun start(cscope: CoroutineScope,noinline body: () -> PayloadType): AsyncAction { + return Started(PayloadType::class.java.canonicalName,cscope, body) } } } @@ -75,11 +78,11 @@ sealed class AsyncAction(val payloadTypename:String): Action { * * Created by daely on 5/17/2016. */ -class AsyncActionMiddleWare : Middleware { +class AsyncActionMiddleWare(val action_cscope: CoroutineScope=GlobalScope) : Middleware { override fun dispatch(store: Store, nextDispatcher: (Any)->Any, action: Any):Any { if(action is AsyncAction.Started<*>) { //queue some async actions when the promise resolves - async { + action_cscope.async { try { action.promise.await() store.dispatch(action.asCompleted()) diff --git a/reduks-core-async/src/test/kotlin/com/beyondeye/reduks/middlewares/CoroutinesAsyncActionMiddlewareTest.kt b/reduks-core-async/src/test/kotlin/com/beyondeye/reduks/middlewares/CoroutinesAsyncActionMiddlewareTest.kt index 761a5db..08efb30 100644 --- a/reduks-core-async/src/test/kotlin/com/beyondeye/reduks/middlewares/CoroutinesAsyncActionMiddlewareTest.kt +++ b/reduks-core-async/src/test/kotlin/com/beyondeye/reduks/middlewares/CoroutinesAsyncActionMiddlewareTest.kt @@ -5,7 +5,8 @@ import com.beyondeye.reduks.ReducerFn import com.beyondeye.reduks.StoreSubscriberFn import com.beyondeye.reduks.experimental.middlewares.AsyncAction import com.beyondeye.reduks.experimental.middlewares.AsyncActionMiddleWare -import kotlinx.coroutines.experimental.newSingleThreadContext +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.newSingleThreadContext import org.assertj.core.api.Assertions import org.junit.Test @@ -58,7 +59,7 @@ class CoroutinesAsyncActionMiddlewareTest { } @Test fun test_an_async_action_for_a_very_difficult_and_computation_heavy_operation() { - val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //don't use android ui thread: otherwise exception if not running on android + val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //don't use android ui thread: otherwise exception if not running on android store.applyMiddleware(AsyncActionMiddleWare()) //subscribe before dispatch!! @@ -75,14 +76,14 @@ class CoroutinesAsyncActionMiddlewareTest { } } }) //on state change - val asyncAction = AsyncAction.start { 2 + 2 } + val asyncAction = AsyncAction.start(GlobalScope) { 2 + 2 } store.dispatch(asyncAction) Thread.sleep(100) //wait for async action to be dispatched TODO: use thunk instead!! store.dispatch(EndAction()) } @Test fun test_two_async_actions_with_different_payload_type() { - val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //false: otherwise exception if not running on android + val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //false: otherwise exception if not running on android store.applyMiddleware(AsyncActionMiddleWare()) //subscribe before dispatch!! @@ -103,9 +104,9 @@ class CoroutinesAsyncActionMiddlewareTest { } } }) //on state change - val asyncAction = AsyncAction.start { 2 + 2 } + val asyncAction = AsyncAction.start(GlobalScope) { 2 + 2 } store.dispatch(asyncAction) - val asyncAction2 = AsyncAction.start { "2 + 2" } + val asyncAction2 = AsyncAction.start(GlobalScope) { "2 + 2" } store.dispatch(asyncAction2) Thread.sleep(100) ////need to wait, because otherwise the end action will be dispatched before we complete the two async actions store.dispatch(EndAction()) @@ -114,7 +115,7 @@ class CoroutinesAsyncActionMiddlewareTest { @Test fun test_an_async_action_for_a_very_difficult_and_computation_heavy_operation_that_fails() { - val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android + val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //custom subscribeDispatcher not UI: otherwise exception if not running on android store.applyMiddleware(AsyncActionMiddleWare()) //subscribe before dispatch! @@ -133,7 +134,7 @@ class CoroutinesAsyncActionMiddlewareTest { } } ) - val asyncAction = AsyncAction.start { + val asyncAction = AsyncAction.start(GlobalScope) { throw Exception(actionDifficultError) } store.dispatch(asyncAction) @@ -142,7 +143,7 @@ class CoroutinesAsyncActionMiddlewareTest { @Test fun test_that_normal_actions_pass_through_the_middleware() { - val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android + val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //custom subscribeDispatcher not UI: otherwise exception if not running on android store.applyMiddleware(AsyncActionMiddleWare()) diff --git a/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaProcessor.kt b/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaCmdProcessor.kt similarity index 89% rename from reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaProcessor.kt rename to reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaCmdProcessor.kt index 2b872cf..4631efc 100644 --- a/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaProcessor.kt +++ b/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaCmdProcessor.kt @@ -2,16 +2,15 @@ package com.beyondeye.reduks.experimental.middlewares.saga import com.beyondeye.reduks.Selector import com.beyondeye.reduks.SelectorBuilder -import kotlinx.coroutines.experimental.cancel -import kotlinx.coroutines.experimental.channels.Channel -import kotlinx.coroutines.experimental.channels.ReceiveChannel -import kotlinx.coroutines.experimental.channels.SendChannel -import kotlinx.coroutines.experimental.channels.actor -import kotlinx.coroutines.experimental.delay +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.channels.actor +import kotlinx.coroutines.delay import java.lang.ref.WeakReference import java.util.concurrent.CancellationException -import java.util.concurrent.TimeUnit -import kotlin.coroutines.experimental.CoroutineContext /** * extension function for yelding a state field: create a state selector on the fly, so you don't need to create it @@ -23,7 +22,7 @@ inline suspend infix fun SagaYeldSingle.selectField(noi return this.select(selValue) } -class SagaYeldSingle(private val sagaProcessor: SagaProcessor){ +class SagaYeldSingle(private val sagaCmdProcessor: SagaCmdProcessor){ suspend infix fun put(value:Any) { _yieldSingle(OpCode.Put(value)) } @@ -45,7 +44,7 @@ class SagaYeldSingle(private val sagaProcessor: SagaProcessor){ * yield a command to execute a child Saga in the same context of the current one (the parent saga): execution * of the parent saga is suspended until child saga completion. Exceptions in child saga execution * will bubble up to parent. - * The parent saga and child saga share the same [SagaProcessor] and the same incoming actions channel. + * The parent saga and child saga share the same [SagaCmdProcessor] and the same incoming actions channel. * In other words, incoming store actions processed in the child saga will be removed from the actions queue of parent saga */ suspend infix fun call(childSaga: SagaFn):R { @@ -186,9 +185,9 @@ function* saga() { //----------------------------------------------- suspend fun _yieldSingle(opcode: OpCode):Any { - sagaProcessor.inChannel.send(opcode) + sagaCmdProcessor.inChannel.send(opcode) if(opcode !is OpCode.OpCodeWithResult) return Unit - return sagaProcessor.outChannel.receive() + return sagaCmdProcessor.outChannel.receive() } } //----------------------------------------------- @@ -226,9 +225,9 @@ suspend inline fun SagaYeldSingle.throttle(delayMs:Long,han _yieldSingle(OpCode.Throttle(B::class.java,delayMs,handlerSaga)) } //----------------------------------------------- -class Saga(sagaProcessor: SagaProcessor) { - @JvmField val yield_ = SagaYeldSingle(sagaProcessor) -// val yieldAll= SagaYeldAll(sagaProcessor) +class Saga(sagaCmdProcessor: SagaCmdProcessor) { + @JvmField val yield_ = SagaYeldSingle(sagaCmdProcessor) +// val yieldAll= SagaYeldAll(sagaCmdProcessor) fun sagaFn(name: String, fn0: suspend Saga.() -> R) = SagaFn0(name, fn0) @@ -246,7 +245,7 @@ sealed class OpCode { abstract val sagaLabel:String abstract fun filterSaga(filterSagaName:String):SagaFn0 } - class Delay(val time: Long,val unit: TimeUnit = TimeUnit.MILLISECONDS): OpCodeWithResult() + class Delay(val timeMsecs: Long): OpCodeWithResult() class Put(val value:Any): OpCode() class Take(val type:Class): OpCodeWithResult() /** @@ -322,19 +321,27 @@ sealed class OpCode { // class Cancelled:OpCode() } -class SagaProcessor( +class SagaCmdProcessor( val sagaName:String, sagaMiddleWare: SagaMiddleWare, - private val dispatcherActor: SendChannel = actor { } + + internal val linkedSagaParentScope: CoroutineScope, + private val dispatcherActor: SendChannel = linkedSagaParentScope.actor { } ) { - internal var linkedSagaCoroutineContext: CoroutineContext?=null + /** + * this is needed when we want to execute the OpCode.CancelSelf, + * this is job associated of the sagafn that is executed by the saga + * that is started in [SagaMiddleWare._runSaga] + */ + internal var linkedSagaJob: Job?=null private var childCounter:Long=0 private val sm=WeakReference(sagaMiddleWare) /** * channel for communication between Saga and Saga processor + * (RENDEZVOUS channel) */ val inChannel : Channel = Channel() val outChannel : Channel = Channel() @@ -352,7 +359,7 @@ class SagaProcessor( // print(e.message) // } finally{ - + val a=1 } } @@ -360,7 +367,7 @@ class SagaProcessor( for(a in inChannel) { when(a) { is OpCode.Delay -> { - delay(a.time,a.unit) + delay(a.timeMsecs) outChannel.send(Unit) } is OpCode.Put -> @@ -389,6 +396,8 @@ class SagaProcessor( val cs: SagaFn = a.childSaga as SagaFn val childSagaName=buildChildSagaName("_call_",cs.name) val childTask=sagaMiddleware._runSaga(cs,this,childSagaName, SAGATYPE_CHILD_CALL) + //NOTE that here we are not sending the childTask to the outChannel (like in Fork, Spawn) + // this way, the main saga will be blocked waiting (yield) until the child saga complete and finally send its result to the outChannel } } is OpCode.Fork<*, *> -> { @@ -412,10 +421,15 @@ class SagaProcessor( is OpCode.SagaFinished<*> -> { //add handling of result (if not unit) than need to back to parent saga (resolve task) promise //also handling sending exception - outChannel.send(a.result) //if this is saga call that finished, don't stop processor!! - if(!a.isChildCall) { + if(a.isChildCall) { + //end of saga that was started with yield_ Call: return the result to the main saga as result of yield_ Call + outChannel.send(a.result) //if this is saga call that finished, don't stop processor!! + } else + { + //End of saga that was started with Spawn or Fork: no result need to be returned to the main saga: + //the main saga can monitor the child saga with the SagaTask that was returned on its start sm.get()?.updataSagaDataAfterProcessorFinished(sagaName) - return + return //EXIT Command processor loop that basically kill the coroutine of the actor that is handling the sagaprocessor } } is OpCode.JoinTasks -> { @@ -432,7 +446,7 @@ class SagaProcessor( a.tasks.forEach { it.cancel() } } is OpCode.CancelSelf -> { - linkedSagaCoroutineContext?.cancel() + linkedSagaJob?.cancel() } // is OpCode.Cancelled -> { // val res= sm?.get()?.isCancelled(sagaName) ?:false diff --git a/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaData.kt b/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaData.kt index 3a3bbad..00bb254 100644 --- a/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaData.kt +++ b/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaData.kt @@ -9,10 +9,10 @@ import kotlinx.coroutines.channels.SendChannel */ internal data class SagaData( /** - * note: this is actually the actor coroutine of the SagaProcessor associated with the saga + * note: this is actually the actor coroutine of the SagaCmdProcessor associated with the saga */ val inputActionsChannel: SendChannel?, - val sagaProcessor: SagaProcessor?, + val sagaCmdProcessor: SagaCmdProcessor?, val sagaJob: Deferred?, val sagaParentName:String, val sagaJobResult:R?=null @@ -20,8 +20,9 @@ internal data class SagaData( { // fun sagaProcessorCoroutine():AbstractCoroutine =inputActionsChannel as AbstractCoroutine fun sagaProcessorJob(): Job =inputActionsChannel as Job - fun isCancelled() = sagaJob == null || sagaJob.isCancelled - fun isCompleted() = sagaJob == null || sagaJob.isCompleted + + fun isCancelled() = if (sagaJob == null) sagaJobResult == null else sagaJob.isCancelled + fun isCompleted() = if(sagaJob == null) sagaJobResult!=null else sagaJob.isCompleted suspend fun await():R { return sagaJobResult?:sagaJob?.await()!! } diff --git a/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaMiddleWare.kt b/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaMiddleWare.kt index 1a26cfe..30e7934 100644 --- a/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaMiddleWare.kt +++ b/reduks-saga/src/main/kotlin/com/beyondeye/reduks/experimental/middlewares/saga/SagaMiddleWare.kt @@ -1,12 +1,11 @@ package com.beyondeye.reduks.experimental.middlewares.saga import com.beyondeye.reduks.* -import kotlinx.coroutines.experimental.* -import kotlinx.coroutines.experimental.channels.* +import kotlinx.coroutines.channels.SendChannel +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.* import java.lang.ref.WeakReference import java.util.concurrent.CancellationException -import kotlin.coroutines.experimental.CoroutineContext -import kotlin.coroutines.experimental.coroutineContext /** * a port of saga middleware @@ -14,22 +13,27 @@ import kotlin.coroutines.experimental.coroutineContext * https://github.com/redux-saga/redux-saga/ * https://redux-saga.js.org/ * + * NOTE: [root_scope] is assumed be a [supervisorScope] so that if some its children fails, it is not automatically cancelled, see coroutine docs * Created by daely on 12/15/2017. */ -class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineContext= DefaultDispatcher) : Middleware { - /** - * root coroutine context used to create all coroutines used to run and handle sagas - */ - val rootSagaCoroutineContext= newCoroutineContext(rootCoroutineContext) +class SagaMiddleWare(store_:Store, parent_scope: CoroutineScope, val sagaMiddlewareDispatcher:CoroutineDispatcher= Dispatchers.Default) : Middleware { private val dispatcherActor: SendChannel private val incomingActionsDistributionActor: SendChannel private var sagaMap:Map> + private val sagaMiddlewareRootJob:Job get() = sagaMiddlewareRootScope.coroutineContext[Job]!! + private val sagaMiddlewareRootScope:CoroutineScope internal val store:WeakReference> init { + //create a supervisor root job that is a child of parent_scope job for all coroutines started by the saga middleware + //this is a job that IS NOT cancelled if one its children is cancelled + //see https://github.com/Kotlin/kotlinx.coroutines/blob/master/docs/exception-handling.md#supervision + val sagaMiddlewareRootJob=SupervisorJob(parent_scope.coroutineContext[Job]) + //create a root scope for all coroutine launched by Saga middleware, using the specified sagaMiddlewareDispatcher + sagaMiddlewareRootScope=CoroutineScope(sagaMiddlewareDispatcher + sagaMiddlewareRootJob) store=WeakReference(store_) sagaMap = mapOf() //use an actor for dispatching so that we ensure we preserve action order - dispatcherActor = actor(rootSagaCoroutineContext) { + dispatcherActor = sagaMiddlewareRootScope.actor { for (a in channel) { //loop over incoming message try { //don't let exception bubble up to sagas store.get()?.dispatch?.invoke(a) @@ -40,7 +44,7 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte } //use an actor for distributing incoming actions so we ensure we preserve action order //define a channel with unlimited capacity, because we don't want the action dispatcher - incomingActionsDistributionActor = actor(rootSagaCoroutineContext,Channel.UNLIMITED) { + incomingActionsDistributionActor = sagaMiddlewareRootScope.actor(capacity = Channel.UNLIMITED) { for (a in channel) { // iterate over incoming actions //distribute incoming actions to sagas for (saga in sagaMap.values) { @@ -53,7 +57,7 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte override fun dispatch(store: Store, nextDispatcher: (Any)->Any, action: Any):Any { val res=nextDispatcher(action) //hit the reducers before processing actions in saga middleware! //use actor here to make sure that actions are distributed in the right order - launch(rootSagaCoroutineContext) { + sagaMiddlewareRootScope.launch { incomingActionsDistributionActor.send(action) } return res @@ -85,7 +89,7 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte if(!existingSaga.isCompleted()) throw IllegalArgumentException("saga with name: $sagaName already running: use replaceSaga() instead") } - _runSaga(SagaFn0(sagaName, sagafn),null,sagaName, SagaProcessor.SAGATYPE_SPAWN) + _runSaga(SagaFn0(sagaName, sagafn),null,sagaName, SagaCmdProcessor.SAGATYPE_SPAWN) } /** * replace an existing saga with the specified name. If a saga with the specified name exists and its is running, it wll be cancelled @@ -94,53 +98,77 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte if(sagaMap.containsKey(sagaName)) { stopSaga(sagaName) } - _runSaga(SagaFn0(sagaName, sagafn),null,sagaName, SagaProcessor.SAGATYPE_SPAWN) + _runSaga(SagaFn0(sagaName, sagafn),null,sagaName, SagaCmdProcessor.SAGATYPE_SPAWN) } /** - * child saga type can be one of [SagaProcessor.SAGATYPE_CHILD_CALL],[SagaProcessor.SAGATYPE_CHILD_FORK],[SagaProcessor.SAGATYPE_SPAWN] + * child saga type can be one of [SagaCmdProcessor.SAGATYPE_CHILD_CALL],[SagaCmdProcessor.SAGATYPE_CHILD_FORK],[SagaCmdProcessor.SAGATYPE_SPAWN] */ - internal fun _runSaga(sagafn: SagaFn, parentSagaProcessor: SagaProcessor?, sagaName: String, childType:Int): SagaTask { + internal fun _runSaga(sagafn: SagaFn, parentSagaCmdProcessor: SagaCmdProcessor?, sagaName: String, childType:Int): SagaTask { - if(childType!= SagaProcessor.SAGATYPE_SPAWN &&parentSagaProcessor==null) - throw IllegalArgumentException("Only when spawning independent(top level) sagas parentSagaProcessor can be null!") + if(childType!= SagaCmdProcessor.SAGATYPE_SPAWN &&parentSagaCmdProcessor==null) + throw IllegalArgumentException("Only when spawning independent(top level) sagas parentSagaCmdProcessor can be null!") - val parentSagaCoroutineContext = parentSagaProcessor?.linkedSagaCoroutineContext ?: rootSagaCoroutineContext - val newSagaParentCoroutineContext = when (childType) { - SagaProcessor.SAGATYPE_CHILD_CALL, SagaProcessor.SAGATYPE_CHILD_FORK -> parentSagaCoroutineContext - SagaProcessor.SAGATYPE_SPAWN -> rootSagaCoroutineContext - else -> rootSagaCoroutineContext + + // +// val parentSagaRootJob:Job + val parentSagaScope:CoroutineScope + if(parentSagaCmdProcessor==null) { + parentSagaScope = sagaMiddlewareRootScope + } else { + parentSagaScope = CoroutineScope(parentSagaCmdProcessor.linkedSagaJob!!+sagaMiddlewareDispatcher) } - //------- - var sagaInputActionsChannel: SendChannel?=null - var sagaProcessor: SagaProcessor?=null + val newSagaRootScope:CoroutineScope + var newSagaInputActionsChannel: SendChannel?=null + var newSagaCmdProcessor: SagaCmdProcessor?=null when(childType) { - SagaProcessor.SAGATYPE_CHILD_CALL -> { - sagaInputActionsChannel = null //use parent saga action channel - sagaProcessor = parentSagaProcessor //use parent saga processor + /** + * see [SagaYeldSingle.call] + */ + SagaCmdProcessor.SAGATYPE_CHILD_CALL-> { + newSagaRootScope = parentSagaScope + newSagaInputActionsChannel = null //use parent saga action channel + //if child call, we are reusing the parent saga processor + newSagaCmdProcessor = parentSagaCmdProcessor //use parent saga processor } - SagaProcessor.SAGATYPE_CHILD_FORK, SagaProcessor.SAGATYPE_SPAWN -> { - sagaProcessor = SagaProcessor(sagaName, this, dispatcherActor) - //define the saga processor receive channel, that is used to receive actions from dispatcher - //to have unlimited buffer, because we don't want to block the dispatcher - sagaInputActionsChannel = actor(rootSagaCoroutineContext, Channel.UNLIMITED) { - sagaProcessor.start(this) + /** + * see [SagaYeldSingle.fork] + */ + SagaCmdProcessor.SAGATYPE_CHILD_FORK -> { + newSagaRootScope = parentSagaScope + //------ + newSagaCmdProcessor = SagaCmdProcessor(sagaName, this, newSagaRootScope,dispatcherActor) + //define the saga processor receive channel, that is used to receive actions from dispatcher to have unlimited buffer, because we don't want to block the dispatcher + newSagaInputActionsChannel = newSagaRootScope.actor(capacity = Channel.UNLIMITED) { + newSagaCmdProcessor!!.start(this) } } + /** + * see [SagaYeldSingle.spawn] + */ + SagaCmdProcessor.SAGATYPE_SPAWN -> { + newSagaRootScope = sagaMiddlewareRootScope + //------ + newSagaCmdProcessor = SagaCmdProcessor(sagaName, this, newSagaRootScope,dispatcherActor) + //define the saga processor receive channel, that is used to receive actions from dispatcher to have unlimited buffer, because we don't want to block the dispatcher + newSagaInputActionsChannel = newSagaRootScope.actor(capacity = Channel.UNLIMITED) { + newSagaCmdProcessor.start(this) + } + } + else -> throw NotImplementedError("This must not happen") } - val newSaga = Saga(sagaProcessor!!) + val newSaga = Saga(newSagaCmdProcessor!!) //start lazily, so that we have time to insert sagaData in sagaMap map //because otherwise stopSaga() at the end of the sagaJob will not work - val sagaDeferred = async(newSagaParentCoroutineContext, start = CoroutineStart.LAZY) { - val isChildCall = (childType == SagaProcessor.SAGATYPE_CHILD_CALL) - if(!isChildCall) //if child call, don't reassign linked coroutine context, because we are reusing the parent saga processor - sagaProcessor.linkedSagaCoroutineContext= coroutineContext + val sagaDeferred = newSagaRootScope.async( start = CoroutineStart.LAZY) { + val isChildCall = (childType == SagaCmdProcessor.SAGATYPE_CHILD_CALL) val sagaResult = try { val res=sagafn.invoke(newSaga) //a parent coroutine will automatically wait for its children to complete execution, but - //we want to handle this manually because we have coordinate with saga associated sagaProcessor - coroutineContext[Job]?.children?.forEach { it.join() } + //we want to handle this manually because we have to coordinate with saga associated sagaCmdProcessor + val job=coroutineContext[Job] + job?.children?.forEach { it.join() } res } catch (e: Throwable) { e @@ -162,11 +190,17 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte @Suppress("UNCHECKED_CAST") sagaResult as R } + //if the new saga is not a child saga, it has its new associated SagaCmdProcessor, so we need to + //initialize the linkedSagaJob in the associated SagaCmdProcessor + if (childType != SagaCmdProcessor.SAGATYPE_CHILD_CALL) { + newSagaCmdProcessor.linkedSagaJob= sagaDeferred + } + val parentSagaName=when(childType) { - SagaProcessor.SAGATYPE_SPAWN -> "" //if spawn, don't return any result and don't register this saga as child - else -> parentSagaProcessor!!.sagaName + SagaCmdProcessor.SAGATYPE_SPAWN -> "" //if spawn, don't return any result and don't register this saga as child + else -> parentSagaCmdProcessor!!.sagaName } - addSagaData(sagaName, SagaData(sagaInputActionsChannel, sagaProcessor, sagaDeferred, parentSagaName)) + addSagaData(sagaName, SagaData(newSagaInputActionsChannel, newSagaCmdProcessor, sagaDeferred, parentSagaName)) //we are ready to start now sagaDeferred.start() return SagaTaskFromDeferred(sagafn.name, sagaDeferred) @@ -199,8 +233,8 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte updateSagaData(sagaName) { finishedSaga-> if(finishedSaga==null) throw Exception("this must not happen") - launch(rootSagaCoroutineContext) { - finishedSaga.sagaProcessor?.inChannel?.send(OpCode.SagaFinished(result, isChildCall)) + sagaMiddlewareRootScope.launch { + finishedSaga.sagaCmdProcessor?.inChannel?.send(OpCode.SagaFinished(result, isChildCall)) } finishedSaga.copy(sagaJob = null,sagaJobResult = result) } @@ -209,7 +243,7 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte updateSagaData(sagaName) { sagaData -> if(sagaData==null) return@updateSagaData null sagaData.inputActionsChannel?.close() - sagaData.copy(sagaProcessor = null,inputActionsChannel = null) + sagaData.copy(sagaCmdProcessor = null,inputActionsChannel = null) } } /** @@ -221,7 +255,7 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte if(!isCompleted()) { sagaJob?.cancel() inputActionsChannel?.close() - sagaProcessor?.stop() + sagaCmdProcessor?.stop() } } val childSagasNames = getSagaChildrenNames(sagaName) @@ -237,8 +271,8 @@ class SagaMiddleWare(store_:Store, rootCoroutineContext:CoroutineConte * cancelled as a result of this invocation and `false` if if it was already * cancelled or completed. See [Job.cancel] for details. */ - fun stopAll(): Boolean { - return rootSagaCoroutineContext.cancel() + fun stopAll() { + return sagaMiddlewareRootJob.cancel() } private fun getSagaChildrenNames(sagaName: String): List { val childSagasNames = sagaMap.entries.filter { it.value.sagaParentName == sagaName }.map { it.key } diff --git a/reduks-saga/src/test/kotlin/com/beyondeye/reduks/middlewares/SagaMiddlewareTest.kt b/reduks-saga/src/test/kotlin/com/beyondeye/reduks/middlewares/SagaMiddlewareTest.kt index ed43958..3cb9302 100644 --- a/reduks-saga/src/test/kotlin/com/beyondeye/reduks/middlewares/SagaMiddlewareTest.kt +++ b/reduks-saga/src/test/kotlin/com/beyondeye/reduks/middlewares/SagaMiddlewareTest.kt @@ -5,9 +5,10 @@ import com.beyondeye.reduks.SelectorBuilder import com.beyondeye.reduks.StoreSubscriberFn import com.beyondeye.reduks.experimental.AsyncStore import com.beyondeye.reduks.experimental.middlewares.saga.* -import kotlinx.coroutines.experimental.delay -import kotlinx.coroutines.experimental.newSingleThreadContext -import kotlinx.coroutines.experimental.runBlocking +import kotlinx.coroutines.GlobalScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.runBlocking import org.assertj.core.api.Assertions.* import org.junit.Ignore import org.junit.Test @@ -53,8 +54,10 @@ class SagaMiddlewareTest { @Test fun testSagaPut() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), + reducer, subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val lock = CountDownLatch(1) @@ -81,8 +84,10 @@ class SagaMiddlewareTest { } @Test fun testSagaTake() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) sagaMiddleware.runSaga("incr") { //wait for SagaAction.Plus type of action @@ -118,8 +123,10 @@ class SagaMiddlewareTest { @Test fun testSagaSelect() { - val store = AsyncStore(TestState(incrCounter = 0,decrCounter = 0), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(incrCounter = 0,decrCounter = 0), + reducer, subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) sagaMiddleware.runSaga("select") { val selb=SelectorBuilder() @@ -150,8 +157,10 @@ class SagaMiddlewareTest { } @Test fun testSagaSelectField() { - val store = AsyncStore(TestState(incrCounter = 0,decrCounter = 0), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(incrCounter = 0,decrCounter = 0), + reducer, subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) sagaMiddleware.runSaga("select") { val initialIncrValue= yield_ selectField {incrCounter} @@ -180,8 +189,10 @@ class SagaMiddlewareTest { @Test fun testSagaDelay() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), + reducer, subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val expectedDelay=1500L sagaMiddleware.runSaga("delay") { @@ -209,8 +220,10 @@ class SagaMiddlewareTest { } @Test fun testSagaCall() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val totIncr=333 @@ -249,8 +262,10 @@ class SagaMiddlewareTest { } @Test fun testSagaForkSpawnAndJoin() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val totIncr=333 @@ -301,8 +316,10 @@ class SagaMiddlewareTest { } @Test fun testSagaForkSpawnAndCancelChildren() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val canceledIncr=333 @@ -338,7 +355,6 @@ class SagaMiddlewareTest { yield_ put(ActualAction.IncrementCounter(actualIncr)) yield_ put(ActualAction.DecrementCounter(actualDecr)) - } lock.await(100,TimeUnit.SECONDS) val state=store.state @@ -348,8 +364,10 @@ class SagaMiddlewareTest { } @Test fun testSagaSpawnForkParentCancel() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val forkIncr=333 @@ -392,8 +410,10 @@ class SagaMiddlewareTest { } @Test fun TestSagaForkExceptionInParentTerminateChildren() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val forkIncr=333 @@ -436,8 +456,10 @@ class SagaMiddlewareTest { } @Test fun testSagaForkParentAutomaticallyWaitForChildCompletion() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val forkIncr=333 @@ -457,6 +479,7 @@ class SagaMiddlewareTest { val childSagaIncr2= sagaFn("childSagaIncr2") { p1:Int-> yield_ delay delayMs yield_ put ActualAction.IncrementCounter(p1) + val at_end_of_childSagaIncr2=0 } //childSagaDecr execution time should be equal the sum of its execution time and childSagaIncr, //because parent saga should automatically wait for children to complete @@ -464,6 +487,7 @@ class SagaMiddlewareTest { yield_ delay delayMs yield_ put ActualAction.IncrementCounter(p1) val incr2Task = yield_ fork childSagaIncr2.withArgs(forkIncr) + val at_end_of_childSagaIncr1=0 } val incr1Task = yield_ fork childSagaIncr1.withArgs(forkIncr) val runtime=measureTimeMillis { @@ -481,12 +505,14 @@ class SagaMiddlewareTest { assertThat(state.incrCounter).isEqualTo(2*forkIncr) } - //this test currently fails + //this test currently fails )sometimes @Ignore @Test fun testSagaTakeEvery() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) val initialDelaySecs:Double=0.1 sagaMiddleware.runSaga("main") { @@ -532,8 +558,10 @@ class SagaMiddlewareTest { @Ignore @Test fun testSagaTakeLatest() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) sagaMiddleware.runSaga("incr") { yield_ takeLatest{ a:SagaAction.SetPlus -> @@ -571,12 +599,14 @@ class SagaMiddlewareTest { assertThat(state.decrCounter).isEqualTo(-321) //one action failed // store.dispatch(EndAction()) } - //this test currently fails + //this test currently fails, part of the times @Ignore @Test fun testSagaThrottle() { - val store = AsyncStore(TestState(), reducer, subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android - val sagaMiddleware = SagaMiddleWare(store) + val store = AsyncStore(TestState(), reducer, + subscribeDispatcher = newSingleThreadContext("SubscribeThread"), + cscope = GlobalScope) //custom subscribeDispatcher not UI: otherwise exception if not running on android + val sagaMiddleware = SagaMiddleWare(store,GlobalScope) store.applyMiddleware(sagaMiddleware) sagaMiddleware.runSaga("incr") { yield_.throttle(100){ a:SagaAction.SetPlus ->