Skip to content

Commit

Permalink
migration to coroutines 1.0.0 completed
Browse files Browse the repository at this point in the history
  • Loading branch information
beyondeye authored and daely committed Nov 8, 2018
1 parent 9bfa2b7 commit 0885c23
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ abstract class AsyncReduksActivity<S:Any>(
reduks.store.errorLogFn=defaultReduksInternalLogger
}

override fun <T> storeCreator(): StoreCreator<T> = AsyncStore.Creator<T>(cscope=activity_cscope,reduceContext = Dispatchers.Default,subscribeContext = Dispatchers.Main)
override fun <T> storeCreator(): StoreCreator<T> = AsyncStore.Creator<T>(cscope=activity_cscope,reduceDispatcher = Dispatchers.Default,subscribeDispatcher = Dispatchers.Main)

//override for making this function visible to inheritors
override fun onStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,24 @@ import kotlin.coroutines.CoroutineContext
/**
* Store that use kotlin coroutine channels for notifying asynchronously to store subscribers about
* state changes.
* By default the subscribeContext (the coroutine context used by subscribers when notified of store changes
* By default the subscribeDispatcher (the coroutine context used by subscribers when notified of store changes
* is the Android UI thread, because usually subscribers need to update views according to state changes
* More in general you can use any single thread context, for example:
* val subscribeContext=newSingleThreadContext("SubscribeThread")
* val subscribeDispatcher=newSingleThreadContext("SubscribeThread")
*/
//
class AsyncStore<S>(initialState: S, private var reducer: Reducer<S>,
val cscope: CoroutineScope,
subscribeContext: CoroutineContext,
val reduceContext: CoroutineContext =Dispatchers.Default
subscribeDispatcher: CoroutineDispatcher,
val reduceDispatcher: CoroutineDispatcher =Dispatchers.Default
) : Store<S> {
class Creator<S>(
val cscope: CoroutineScope,
val subscribeContext: CoroutineContext,
val reduceContext: CoroutineContext =Dispatchers.Default,
val subscribeDispatcher: CoroutineDispatcher,
val reduceDispatcher: CoroutineDispatcher =Dispatchers.Default,
val withStandardMiddleware:Boolean=true) : StoreCreator<S> {
override fun create(reducer: Reducer<S>, initialState: S): Store<S> {
val res = AsyncStore<S>(initialState, reducer,cscope,subscribeContext,reduceContext)
val res = AsyncStore<S>(initialState, reducer,cscope,subscribeDispatcher,reduceDispatcher)
return if (!withStandardMiddleware)
res
else
Expand All @@ -39,7 +39,7 @@ class AsyncStore<S>(initialState: S, private var reducer: Reducer<S>,
override var errorLogFn: ((String) -> Unit)?=null
private var deferredState: Deferred<S> = cscope.async { initialState }
override val state:S
get() = runBlocking(reduceContext) {
get() = runBlocking(reduceDispatcher) {
deferredState.await()
}
private var subscribers= listOf<StoreSubscriber<S>>()
Expand All @@ -56,7 +56,7 @@ class AsyncStore<S>(initialState: S, private var reducer: Reducer<S>,
//get a reference of current deferred so that we make sure that all reduce action are actually executed in the correct order
val curDeferredState = deferredState
//update deferredState with result of async job of running the reducer
deferredState = cscope.async(reduceContext) {
deferredState = cscope.async(reduceDispatcher) {
val startState = curDeferredState.await()
val newState = try {
reducer.reduce(startState, action) //return newState
Expand All @@ -69,11 +69,11 @@ class AsyncStore<S>(initialState: S, private var reducer: Reducer<S>,
//after creating the new deferredState, handle notification of subscribers once this new
//deferredState is resolved
val nextDeferredState=deferredState
cscope.launch(reduceContext) {
cscope.launch(reduceDispatcher) {
nextDeferredState.await()
//NOTE THAT IF THE ObserveContext is a single thread(the ui thread)
// then subscribers will be notified sequentially of state changes in the correct order
withContext(subscribeContext) {
withContext(subscribeDispatcher) {
notifySubscribers()
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.beyondeye.reduks.experimental.middlewares

import com.beyondeye.reduks.*
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.async
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async


sealed class AsyncAction(val payloadTypename:String): Action {
Expand All @@ -29,10 +31,11 @@ sealed class AsyncAction(val payloadTypename:String): Action {
* it seems redundant to store both type name and define this class as template class
* unfortunately this is required because of type erasure in java/kotlin generics
*/
class Started<PayloadType :Any>(payloadTypename:String, val promise: Deferred<PayloadType>): AsyncAction(payloadTypename) {
constructor(type:String,body: () -> PayloadType):this(type, async { body() })
class Started<PayloadType :Any>(payloadTypename:String, val promise: Deferred<PayloadType>):
AsyncAction(payloadTypename) {
constructor(type:String, cscope: CoroutineScope,body: () -> PayloadType):this(type, cscope.async { body() })
suspend fun asCompleted() = Completed(payloadTypename, promise.await())
suspend fun asFailed() = Failed(payloadTypename, promise.getCompletionExceptionOrNull()!!)
fun asFailed() = Failed(payloadTypename, promise.getCompletionExceptionOrNull()!!)
/**
* block until we get back the result from the promise
*/
Expand All @@ -57,8 +60,8 @@ sealed class AsyncAction(val payloadTypename:String): Action {
inline fun <reified PayloadType:Any> start( promise: Deferred<PayloadType>): AsyncAction {
return Started<PayloadType>(PayloadType::class.java.canonicalName, promise)
}
inline fun <reified PayloadType:Any> start(noinline body: () -> PayloadType): AsyncAction {
return Started<PayloadType>(PayloadType::class.java.canonicalName, body)
inline fun <reified PayloadType:Any> start(cscope: CoroutineScope,noinline body: () -> PayloadType): AsyncAction {
return Started<PayloadType>(PayloadType::class.java.canonicalName,cscope, body)
}
}
}
Expand All @@ -75,11 +78,11 @@ sealed class AsyncAction(val payloadTypename:String): Action {
*
* Created by daely on 5/17/2016.
*/
class AsyncActionMiddleWare<S> : Middleware<S> {
class AsyncActionMiddleWare<S>(val action_cscope: CoroutineScope=GlobalScope) : Middleware<S> {
override fun dispatch(store: Store<S>, nextDispatcher: (Any)->Any, action: Any):Any {
if(action is AsyncAction.Started<*>) {
//queue some async actions when the promise resolves
async {
action_cscope.async {
try {
action.promise.await()
store.dispatch(action.asCompleted())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import com.beyondeye.reduks.ReducerFn
import com.beyondeye.reduks.StoreSubscriberFn
import com.beyondeye.reduks.experimental.middlewares.AsyncAction
import com.beyondeye.reduks.experimental.middlewares.AsyncActionMiddleWare
import kotlinx.coroutines.experimental.newSingleThreadContext
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.newSingleThreadContext
import org.assertj.core.api.Assertions
import org.junit.Test

Expand Down Expand Up @@ -58,7 +59,7 @@ class CoroutinesAsyncActionMiddlewareTest {
}
@Test
fun test_an_async_action_for_a_very_difficult_and_computation_heavy_operation() {
val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //don't use android ui thread: otherwise exception if not running on android
val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //don't use android ui thread: otherwise exception if not running on android
store.applyMiddleware(AsyncActionMiddleWare())

//subscribe before dispatch!!
Expand All @@ -75,14 +76,14 @@ class CoroutinesAsyncActionMiddlewareTest {
}
}
}) //on state change
val asyncAction = AsyncAction.start { 2 + 2 }
val asyncAction = AsyncAction.start(GlobalScope) { 2 + 2 }
store.dispatch(asyncAction)
Thread.sleep(100) //wait for async action to be dispatched TODO: use thunk instead!!
store.dispatch(EndAction())
}
@Test
fun test_two_async_actions_with_different_payload_type() {
val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //false: otherwise exception if not running on android
val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //false: otherwise exception if not running on android
store.applyMiddleware(AsyncActionMiddleWare())

//subscribe before dispatch!!
Expand All @@ -103,9 +104,9 @@ class CoroutinesAsyncActionMiddlewareTest {
}
}
}) //on state change
val asyncAction = AsyncAction.start { 2 + 2 }
val asyncAction = AsyncAction.start(GlobalScope) { 2 + 2 }
store.dispatch(asyncAction)
val asyncAction2 = AsyncAction.start { "2 + 2" }
val asyncAction2 = AsyncAction.start(GlobalScope) { "2 + 2" }
store.dispatch(asyncAction2)
Thread.sleep(100) ////need to wait, because otherwise the end action will be dispatched before we complete the two async actions
store.dispatch(EndAction())
Expand All @@ -114,7 +115,7 @@ class CoroutinesAsyncActionMiddlewareTest {
@Test
fun test_an_async_action_for_a_very_difficult_and_computation_heavy_operation_that_fails() {

val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android
val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //custom subscribeDispatcher not UI: otherwise exception if not running on android
store.applyMiddleware(AsyncActionMiddleWare())

//subscribe before dispatch!
Expand All @@ -133,7 +134,7 @@ class CoroutinesAsyncActionMiddlewareTest {
}
}
)
val asyncAction = AsyncAction.start<Int> {
val asyncAction = AsyncAction.start<Int>(GlobalScope) {
throw Exception(actionDifficultError)
}
store.dispatch(asyncAction)
Expand All @@ -142,7 +143,7 @@ class CoroutinesAsyncActionMiddlewareTest {
@Test
fun test_that_normal_actions_pass_through_the_middleware() {

val store = AsyncStore(TestState(), reducer,subscribeContext = newSingleThreadContext("SubscribeThread")) //custom subscribeContext not UI: otherwise exception if not running on android
val store = AsyncStore(TestState(), reducer,GlobalScope,subscribeDispatcher = newSingleThreadContext("SubscribeThread")) //custom subscribeDispatcher not UI: otherwise exception if not running on android
store.applyMiddleware(AsyncActionMiddleWare())


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ package com.beyondeye.reduks.experimental.middlewares.saga

import com.beyondeye.reduks.Selector
import com.beyondeye.reduks.SelectorBuilder
import kotlinx.coroutines.experimental.cancel
import kotlinx.coroutines.experimental.channels.Channel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.channels.actor
import kotlinx.coroutines.experimental.delay
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.delay
import java.lang.ref.WeakReference
import java.util.concurrent.CancellationException
import java.util.concurrent.TimeUnit
import kotlin.coroutines.experimental.CoroutineContext

/**
* extension function for yelding a state field: create a state selector on the fly, so you don't need to create it
Expand All @@ -23,7 +22,7 @@ inline suspend infix fun <reified S:Any,I:Any> SagaYeldSingle<S>.selectField(noi
return this.select(selValue)
}

class SagaYeldSingle<S:Any>(private val sagaProcessor: SagaProcessor<S>){
class SagaYeldSingle<S:Any>(private val sagaCmdProcessor: SagaCmdProcessor<S>){
suspend infix fun put(value:Any) {
_yieldSingle(OpCode.Put(value))
}
Expand All @@ -45,7 +44,7 @@ class SagaYeldSingle<S:Any>(private val sagaProcessor: SagaProcessor<S>){
* yield a command to execute a child Saga in the same context of the current one (the parent saga): execution
* of the parent saga is suspended until child saga completion. Exceptions in child saga execution
* will bubble up to parent.
* The parent saga and child saga share the same [SagaProcessor] and the same incoming actions channel.
* The parent saga and child saga share the same [SagaCmdProcessor] and the same incoming actions channel.
* In other words, incoming store actions processed in the child saga will be removed from the actions queue of parent saga
*/
suspend infix fun <S:Any,R:Any> call(childSaga: SagaFn<S, R>):R {
Expand Down Expand Up @@ -186,9 +185,9 @@ function* saga() {

//-----------------------------------------------
suspend fun _yieldSingle(opcode: OpCode):Any {
sagaProcessor.inChannel.send(opcode)
sagaCmdProcessor.inChannel.send(opcode)
if(opcode !is OpCode.OpCodeWithResult) return Unit
return sagaProcessor.outChannel.receive()
return sagaCmdProcessor.outChannel.receive()
}
}
//-----------------------------------------------
Expand Down Expand Up @@ -226,9 +225,9 @@ suspend inline fun <S:Any,reified B> SagaYeldSingle<S>.throttle(delayMs:Long,han
_yieldSingle(OpCode.Throttle<S, B>(B::class.java,delayMs,handlerSaga))
}
//-----------------------------------------------
class Saga<S:Any>(sagaProcessor: SagaProcessor<S>) {
@JvmField val yield_ = SagaYeldSingle(sagaProcessor)
// val yieldAll= SagaYeldAll(sagaProcessor)
class Saga<S:Any>(sagaCmdProcessor: SagaCmdProcessor<S>) {
@JvmField val yield_ = SagaYeldSingle(sagaCmdProcessor)
// val yieldAll= SagaYeldAll(sagaCmdProcessor)

fun <R:Any> sagaFn(name: String, fn0: suspend Saga<S>.() -> R) =
SagaFn0(name, fn0)
Expand All @@ -246,7 +245,7 @@ sealed class OpCode {
abstract val sagaLabel:String
abstract fun filterSaga(filterSagaName:String):SagaFn0<S,Unit>
}
class Delay(val time: Long,val unit: TimeUnit = TimeUnit.MILLISECONDS): OpCodeWithResult()
class Delay(val timeMsecs: Long): OpCodeWithResult()
class Put(val value:Any): OpCode()
class Take<B>(val type:Class<B>): OpCodeWithResult()
/**
Expand Down Expand Up @@ -322,19 +321,27 @@ sealed class OpCode {
// class Cancelled:OpCode()
}

class SagaProcessor<S:Any>(
class SagaCmdProcessor<S:Any>(
val sagaName:String,
sagaMiddleWare: SagaMiddleWare<S>,
private val dispatcherActor: SendChannel<Any> = actor { }

internal val linkedSagaParentScope: CoroutineScope,
private val dispatcherActor: SendChannel<Any> = linkedSagaParentScope.actor { }
)
{
internal var linkedSagaCoroutineContext: CoroutineContext?=null
/**
* this is needed when we want to execute the OpCode.CancelSelf,
* this is job associated of the sagafn that is executed by the saga
* that is started in [SagaMiddleWare._runSaga]
*/
internal var linkedSagaJob: Job?=null
private var childCounter:Long=0
private val sm=WeakReference(sagaMiddleWare)


/**
* channel for communication between Saga and Saga processor
* (RENDEZVOUS channel)
*/
val inChannel : Channel<Any> = Channel()
val outChannel : Channel<Any> = Channel()
Expand All @@ -352,15 +359,15 @@ class SagaProcessor<S:Any>(
// print(e.message)
// }
finally{

val a=1
}
}

private suspend fun processingLoop(inputActions: ReceiveChannel<Any>) {
for(a in inChannel) {
when(a) {
is OpCode.Delay -> {
delay(a.time,a.unit)
delay(a.timeMsecs)
outChannel.send(Unit)
}
is OpCode.Put ->
Expand Down Expand Up @@ -389,6 +396,8 @@ class SagaProcessor<S:Any>(
val cs: SagaFn<S, Any> = a.childSaga as SagaFn<S, Any>
val childSagaName=buildChildSagaName("_call_",cs.name)
val childTask=sagaMiddleware._runSaga(cs,this,childSagaName, SAGATYPE_CHILD_CALL)
//NOTE that here we are not sending the childTask to the outChannel (like in Fork, Spawn)
// this way, the main saga will be blocked waiting (yield) until the child saga complete and finally send its result to the outChannel
}
}
is OpCode.Fork<*, *> -> {
Expand All @@ -412,10 +421,15 @@ class SagaProcessor<S:Any>(
is OpCode.SagaFinished<*> -> {
//add handling of result (if not unit) than need to back to parent saga (resolve task) promise
//also handling sending exception
outChannel.send(a.result) //if this is saga call that finished, don't stop processor!!
if(!a.isChildCall) {
if(a.isChildCall) {
//end of saga that was started with yield_ Call: return the result to the main saga as result of yield_ Call
outChannel.send(a.result) //if this is saga call that finished, don't stop processor!!
} else
{
//End of saga that was started with Spawn or Fork: no result need to be returned to the main saga:
//the main saga can monitor the child saga with the SagaTask that was returned on its start
sm.get()?.updataSagaDataAfterProcessorFinished(sagaName)
return
return //EXIT Command processor loop that basically kill the coroutine of the actor that is handling the sagaprocessor
}
}
is OpCode.JoinTasks -> {
Expand All @@ -432,7 +446,7 @@ class SagaProcessor<S:Any>(
a.tasks.forEach { it.cancel() }
}
is OpCode.CancelSelf -> {
linkedSagaCoroutineContext?.cancel()
linkedSagaJob?.cancel()
}
// is OpCode.Cancelled -> {
// val res= sm?.get()?.isCancelled(sagaName) ?:false
Expand Down
Loading

0 comments on commit 0885c23

Please sign in to comment.