Skip to content

Commit

Permalink
1093: Launch side effects ATOMIC, await render complete
Browse files Browse the repository at this point in the history
Solves #1093

Add headlessIntegrationTest to use for the new test case
  • Loading branch information
steve-the-edwards committed Aug 10, 2023
1 parent b130a43 commit 1b1acda
Show file tree
Hide file tree
Showing 10 changed files with 452 additions and 18 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ android.useAndroidX=true
systemProp.org.gradle.internal.publish.checksums.insecure=true

GROUP=com.squareup.workflow1
VERSION_NAME=1.11.0-beta04-SNAPSHOT
VERSION_NAME=1.11.0-beta04-atomic-w-SNAPSHOT

POM_DESCRIPTION=Square Workflow

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,14 @@ import kotlin.reflect.typeOf
* )
* ```
*
* To create populate such functions from your `render` method, you first need to define a
* To create such functions from your `render` method, you first need to define a
* [WorkflowAction] to handle the event by changing state, emitting an output, or both. Then, just
* pass a lambda to your rendering that instantiates the action and passes it to
* [actionSink.send][Sink.send].
*
* ## Performing asynchronous work
*
* See [runningWorker].
* See [runningSideEffect] and [runningWorker].
*
* ## Composing children
*
Expand Down Expand Up @@ -92,8 +92,15 @@ public interface BaseRenderContext<out PropsT, StateT, in OutputT> {
* [cancelled](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html).
*
* The coroutine will run with the same [CoroutineContext][kotlin.coroutines.CoroutineContext]
* that the workflow runtime is running in. The side effect coroutine will not be started until
* _after_ the first render call than runs it returns.
* that the workflow runtime is running in.
* The coroutine is launched with [CoroutineStart.ATOMIC][kotlinx.coroutines.CoroutineStart.ATOMIC]
* start mode, which means that it will _start_ even if the scope is cancelled before it has a
* chance to dispatch. This is to guarantee that any time a [sideEffect] is declared running
* in any render pass, it will at least be started. If the backing scope is cancelled - it is no
* longer declared as running in a consecutive render pass, or the rendering [Workflow] is no
* longer rendered - then it will be cancelled at the first suspension point within [sideEffect].
*
*
*
* @param key The string key that is used to distinguish between side effects.
* @param sideEffect The suspend function that will be launched in a coroutine to perform the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,11 +111,6 @@ public interface Worker<out OutputT> {
* When the worker is torn down, the coroutine is cancelled.
* This coroutine is launched in the same scope as the workflow runtime, with a few changes:
*
* - The dispatcher is always set to [Unconfined][kotlinx.coroutines.Dispatchers.Unconfined] to
* minimize overhead for workers that don't care which thread they're executed on (e.g. logging
* side effects, workers that wrap third-party reactive libraries, etc.). If your work cares
* which thread it runs on, use [withContext][kotlinx.coroutines.withContext] or
* [flowOn][kotlinx.coroutines.flow.flowOn] to specify a dispatcher.
* - A [CoroutineName][kotlinx.coroutines.CoroutineName] that describes the `Worker` instance
* (via `toString`) and the key specified by the workflow running the worker.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import com.squareup.workflow1.internal.WorkflowRunner
import com.squareup.workflow1.internal.chained
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlin.coroutines.ContinuationInterceptor

/**
* Launches the [workflow] in a new coroutine in [scope] and returns a [StateFlow] of its
Expand Down Expand Up @@ -200,7 +202,7 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
}
}

// Pass on to the UI.
// Pass the rendering on to the UI.
renderingsAndSnapshots.value = nextRenderAndSnapshot
// And emit the Output.
sendOutput(actionResult, onOutput)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,19 @@ package com.squareup.workflow1.internal

import com.squareup.workflow1.internal.InlineLinkedList.InlineListNode
import kotlinx.coroutines.Job
import kotlinx.coroutines.sync.Mutex

/**
* Holds a [Job] that represents a running [side effect][RealRenderContext.runningSideEffect], as
* well as the key used to identify that side effect.
*
* Lastly, holds the [renderComplete] that is unlocked when render() is complete (and so the sink
* can be used).
*/
internal class SideEffectNode(
val key: String,
val job: Job
val job: Job,
val renderComplete: Mutex
) : InlineListNode<SideEffectNode> {

override var nextListNode: SideEffectNode? = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import com.squareup.workflow1.internal.RealRenderContext.SideEffectRunner
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart.LAZY
import kotlinx.coroutines.CoroutineStart.ATOMIC
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
Expand All @@ -29,6 +29,8 @@ import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.selects.SelectBuilder
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.coroutines.CoroutineContext

/**
Expand All @@ -40,7 +42,10 @@ import kotlin.coroutines.CoroutineContext
* worker coroutines. This context will override anything from the workflow's scope and any other
* hard-coded values added to worker contexts. It must not contain a [Job] element (it would violate
* structured concurrency).
*
* The opt-in for [ExperimentalCoroutinesApi] is for using a [ATOMIC] on side effect Jobs.
*/
@OptIn(ExperimentalCoroutinesApi::class)
internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
val id: WorkflowNodeId,
workflow: StatefulWorkflow<PropsT, StateT, OutputT, RenderingT>,
Expand Down Expand Up @@ -212,9 +217,9 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(

// Tear down workflows and workers that are obsolete.
subtreeManager.commitRenderedChildren()
// Side effect jobs are launched lazily, since they can send actions to the sink, and can only
// be started after context is frozen.
sideEffects.forEachStaging { it.job.start() }
// Let all staging side effects know that render is complete.
sideEffects.forEachStaging { if (it.renderComplete.isLocked) it.renderComplete.unlock() }
// Tear down side effects that are no longer declared running.
sideEffects.commitStaging { it.job.cancel() }

return rendering
Expand Down Expand Up @@ -260,7 +265,18 @@ internal class WorkflowNode<PropsT, StateT, OutputT, RenderingT>(
sideEffect: suspend CoroutineScope.() -> Unit
): SideEffectNode {
val scope = this + CoroutineName("sideEffect[$key] for $id")
val job = scope.launch(start = LAZY, block = sideEffect)
return SideEffectNode(key, job)
val renderComplete = Mutex(locked = true)
// Side effect jobs are ATOMIC because even if the side effect is run and then NOT run
// in consecutive render passes before the side effect can be dispatched, we still want it to
// start. Note that this means that side effects must be co-operative or they could
// unnecessarily hog runtime dispatch. We could force them to be so by adding an
// `if (!isActive) yield()`
// at the start of the sideEffect block, but that also might mean that expected side effects
// don't occur when the sideEffect is run at least once.
val job = scope.launch(start = ATOMIC, block = {
renderComplete.lock()
sideEffect()
})
return SideEffectNode(key, job, renderComplete)
}
}
22 changes: 22 additions & 0 deletions workflow-testing/api/workflow-testing.api
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
public final class com/squareup/workflow1/testing/HeadlessIntegrationTestKt {
public static final fun headlessIntegrationTest (Lcom/squareup/workflow1/Workflow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;)V
public static final fun headlessIntegrationTest (Lcom/squareup/workflow1/Workflow;Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;)V
public static synthetic fun headlessIntegrationTest$default (Lcom/squareup/workflow1/Workflow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
public static synthetic fun headlessIntegrationTest$default (Lcom/squareup/workflow1/Workflow;Lkotlinx/coroutines/flow/StateFlow;Lkotlin/coroutines/CoroutineContext;Ljava/util/List;Ljava/util/Set;Lkotlin/jvm/functions/Function2;JLkotlin/jvm/functions/Function2;ILjava/lang/Object;)V
}

public final class com/squareup/workflow1/testing/RenderIdempotencyChecker : com/squareup/workflow1/WorkflowInterceptor {
public static final field INSTANCE Lcom/squareup/workflow1/testing/RenderIdempotencyChecker;
public fun onInitialState (Ljava/lang/Object;Lcom/squareup/workflow1/Snapshot;Lkotlin/jvm/functions/Function2;Lcom/squareup/workflow1/WorkflowInterceptor$WorkflowSession;)Ljava/lang/Object;
Expand Down Expand Up @@ -155,3 +162,18 @@ public final class com/squareup/workflow1/testing/WorkflowTestRuntimeKt {
public static synthetic fun launchForTestingWith$default (Lcom/squareup/workflow1/StatefulWorkflow;Ljava/lang/Object;Lcom/squareup/workflow1/testing/WorkflowTestParams;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;ILjava/lang/Object;)Ljava/lang/Object;
}

public final class com/squareup/workflow1/testing/WorkflowTurbine {
public static final field Companion Lcom/squareup/workflow1/testing/WorkflowTurbine$Companion;
public static final field WORKFLOW_TEST_DEFAULT_TIMEOUT_MS J
public fun <init> (Ljava/lang/Object;Lapp/cash/turbine/ReceiveTurbine;)V
public final fun awaitNext (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun awaitNext$default (Lcom/squareup/workflow1/testing/WorkflowTurbine;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public final fun awaitNextRendering (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun awaitNextRenderingSatisfying (Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public final fun getFirstRendering ()Ljava/lang/Object;
public final fun skipRenderings (ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

public final class com/squareup/workflow1/testing/WorkflowTurbine$Companion {
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package com.squareup.workflow1.testing

import app.cash.turbine.ReceiveTurbine
import app.cash.turbine.test
import com.squareup.workflow1.RuntimeConfig
import com.squareup.workflow1.RuntimeConfigOptions
import com.squareup.workflow1.Workflow
import com.squareup.workflow1.WorkflowInterceptor
import com.squareup.workflow1.renderWorkflowIn
import com.squareup.workflow1.testing.WorkflowTurbine.Companion.WORKFLOW_TEST_DEFAULT_TIMEOUT_MS
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runTest
import kotlin.coroutines.CoroutineContext
import kotlin.time.Duration.Companion.milliseconds

/**
* This is a test harness to run integration tests for a Workflow tree. The parameters passed here are
* the same as those to start a Workflow runtime with [renderWorkflowIn] except for ignoring
* state persistence as that is not needed for this style of test.
*
* The [coroutineContext] rather than a [CoroutineScope] is passed so that this harness handles the
* scope for the Workflow runtime for you but you can still specify context for it. It defaults to
* [Dispatchers.Main.immediate]. If you wish to use a dispatcher that skips delays, use a
* [StandardTestDispatcher][kotlinx.coroutines.test.StandardTestDispatcher], so that the dispatcher
* will still guarantee ordering.
*
* A [testTimeout] may be specified to override the default [WORKFLOW_TEST_DEFAULT_TIMEOUT_MS] for
* any particular test. This is the max amount of time the test could spend waiting on a rendering.
*
* This will start the Workflow runtime (with params as passed) rooted at whatever Workflow
* it is called on and then create a [WorkflowTurbine] for its renderings and run [testCase] on that.
* [testCase] can thus drive the test scenario and assert against renderings.
*/
public fun <PropsT, OutputT, RenderingT> Workflow<PropsT, OutputT, RenderingT>.headlessIntegrationTest(
props: StateFlow<PropsT>,
coroutineContext: CoroutineContext = Dispatchers.Main.immediate,
interceptors: List<WorkflowInterceptor> = emptyList(),
runtimeConfig: RuntimeConfig = RuntimeConfigOptions.DEFAULT_CONFIG,
onOutput: suspend (OutputT) -> Unit = {},
testTimeout: Long = WORKFLOW_TEST_DEFAULT_TIMEOUT_MS,
testCase: suspend WorkflowTurbine<RenderingT>.() -> Unit
) {
val workflow = this

runTest(
context = coroutineContext,
timeout = testTimeout.milliseconds
) {
// We use a sub-scope so that we can cancel the Workflow runtime when we are done with it so that
// tests don't all have to do that themselves.
val workflowRuntimeScope = CoroutineScope(coroutineContext)
val renderings = renderWorkflowIn(
workflow = workflow,
props = props,
scope = workflowRuntimeScope,
interceptors = interceptors,
runtimeConfig = runtimeConfig,
onOutput = onOutput
)

val firstRendering = renderings.value.rendering

// Drop one as its provided separately via `firstRendering`.
renderings.drop(1).map {
it.rendering
}.test {
val workflowTurbine = WorkflowTurbine(
firstRendering,
this
)
workflowTurbine.testCase()
cancelAndIgnoreRemainingEvents()
}
workflowRuntimeScope.cancel()
}
}

/**
* Version of [headlessIntegrationTest] that does not require props. For Workflows that have [Unit]
* props type.
*/
@OptIn(ExperimentalCoroutinesApi::class)
public fun <OutputT, RenderingT> Workflow<Unit, OutputT, RenderingT>.headlessIntegrationTest(
coroutineContext: CoroutineContext = UnconfinedTestDispatcher(),
interceptors: List<WorkflowInterceptor> = emptyList(),
runtimeConfig: RuntimeConfig = RuntimeConfigOptions.DEFAULT_CONFIG,
onOutput: suspend (OutputT) -> Unit = {},
testTimeout: Long = WORKFLOW_TEST_DEFAULT_TIMEOUT_MS,
testCase: suspend WorkflowTurbine<RenderingT>.() -> Unit
): Unit = headlessIntegrationTest(
props = MutableStateFlow(Unit).asStateFlow(),
coroutineContext = coroutineContext,
interceptors = interceptors,
runtimeConfig = runtimeConfig,
onOutput = onOutput,
testTimeout = testTimeout,
testCase = testCase
)

/**
* Simple wrapper around a [ReceiveTurbine] of [RenderingT] to provide convenience helper methods specific
* to Workflow renderings.
*
* @property firstRendering The first rendering of the Workflow runtime is made synchronously. This is
* provided separately if any assertions or operations are needed from it.
*/
public class WorkflowTurbine<RenderingT>(
public val firstRendering: RenderingT,
private val receiveTurbine: ReceiveTurbine<RenderingT>
) {
private var usedFirst = false

/**
* Suspend waiting for the next rendering to be produced by the Workflow runtime. Note this includes
* the first (synchronously made) rendering.
*
* @return the rendering.
*/
public suspend fun awaitNextRendering(): RenderingT {
if (!usedFirst) {
usedFirst = true
return firstRendering
}
return receiveTurbine.awaitItem()
}

public suspend fun skipRenderings(count: Int) {
val skippedCount = if (!usedFirst) {
usedFirst = true
count - 1
} else {
count
}

if (skippedCount > 0) {
receiveTurbine.skipItems(skippedCount)
}
}

/**
* Suspend waiting for the next rendering to be produced by the Workflow runtime that satisfies the
* [predicate].
*
* @return the rendering.
*/
public suspend fun awaitNextRenderingSatisfying(
predicate: (RenderingT) -> Boolean
): RenderingT {
var rendering = awaitNextRendering()
while (!predicate(rendering)) {
rendering = awaitNextRendering()
}
return rendering
}

/**
* Suspend waiting for the next rendering which satisfies [precondition], can successfully be mapped
* using [map] and satisfies the [satisfying] predicate when called on the [T] rendering after it
* has been mapped.
*
* @return the mapped rendering as [T]
*/
public suspend fun <T> awaitNext(
precondition: (RenderingT) -> Boolean = { true },
map: (RenderingT) -> T,
satisfying: T.() -> Boolean = { true }
): T =
map(
awaitNextRenderingSatisfying {
precondition(it) &&
with(map(it)) {
this.satisfying()
}
}
)

public companion object {
/**
* Default timeout to use while waiting for renderings.
*/
public const val WORKFLOW_TEST_DEFAULT_TIMEOUT_MS: Long = 60_000L
}
}
Loading

0 comments on commit 1b1acda

Please sign in to comment.