Skip to content

Commit

Permalink
Report additional failing turbines (#233)
Browse files Browse the repository at this point in the history
* Failing test

* Build support for outerFailingFlowIsReported

* Fix nested testIn with backgroundScope

* Add stack traces

* Report only unhandled exceptions instead of all unhandled events

* Update src/commonMain/kotlin/app/cash/turbine/Turbine.kt

---------

Co-authored-by: Jake Wharton <[email protected]>
  • Loading branch information
jingibus and JakeWharton authored Jun 12, 2023
1 parent c7c3a68 commit 61b1872
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 21 deletions.
69 changes: 60 additions & 9 deletions src/commonMain/kotlin/app/cash/turbine/Turbine.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package app.cash.turbine

import kotlin.time.Duration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
Expand Down Expand Up @@ -204,8 +205,8 @@ internal class ChannelTurbine<T>(

override suspend fun awaitError(): Throwable = withTurbineTimeout { channel.awaitError(name = name) }

override fun ensureAllEventsConsumed() {
if (ignoreRemainingEvents) return
internal fun reportUnconsumedEvents(): UnconsumedEventReport<T> {
if (ignoreRemainingEvents) return UnconsumedEventReport(emptyList())

val unconsumed = mutableListOf<Event<T>>()
var cause: Throwable? = null
Expand All @@ -219,17 +220,67 @@ internal class ChannelTurbine<T>(
break
}
}
if (unconsumed.isNotEmpty()) {

return UnconsumedEventReport(
name = name,
unconsumed = unconsumed,
cause = cause,
)
}

override fun ensureAllEventsConsumed() {
val report = reportUnconsumedEvents()

if (report.unconsumed.isNotEmpty()) {
throw TurbineAssertionError(
buildString {
append("Unconsumed events found".qualifiedBy(name))
append(":")
for (event in unconsumed) {
append("\n - $event")
}
report.describe(this)
},
cause,
report.cause,
)
}
}
}

internal data class UnconsumedEventReport<T>(
val unconsumed: List<Event<T>>,
val name: String? = null,
val cause: Throwable? = null,
) {
fun describe(builder: StringBuilder) {
with(builder) {
append("Unconsumed events found".qualifiedBy(name))
append(":")
for (event in unconsumed) {
append("\n - $event")
}
}
}

fun describeException(builder: StringBuilder) {
with(builder) {
cause?.let { cause ->
append("Unconsumed exception found".qualifiedBy(name))
append(":")
appendLine(
"""
|
|
|Stack trace:
""".trimMargin(),
)
append(cause.stackTraceToString())
appendLine()
}
}
}

fun stripCancellations(): UnconsumedEventReport<T> =
UnconsumedEventReport(
unconsumed = unconsumed.filter {
(it as? Event.Error)?.throwable !is CancellationException
},
name = name,
cause = cause?.takeUnless { it is CancellationException },
)
}
26 changes: 26 additions & 0 deletions src/commonMain/kotlin/app/cash/turbine/coroutines.kt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,32 @@ internal fun assertCallingContextIsNotSuspended() {
}
}

internal class TurbineRegistryElement(val registry: MutableList<ChannelTurbine<*>>) : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<TurbineRegistryElement>

override val key: CoroutineContext.Key<*> = Key
}

/**
* Internal tool to report turbines that have been spun up within a given scope.
*
* If reportTurbines is nested within another reportTurbines, the outer scope wins:
* no turbines will be registered from the inner scope.
*/
internal suspend fun <T> reportTurbines(registry: MutableList<ChannelTurbine<*>>, block: suspend () -> T): T {
val enclosingRegistryElement = currentCoroutineContext()[TurbineRegistryElement]
return if (enclosingRegistryElement != null) {
block()
} else {
withContext(TurbineRegistryElement(registry)) {
block()
}
}
}

internal fun CoroutineScope.reportTurbine(turbine: ChannelTurbine<*>) =
coroutineContext[TurbineRegistryElement]?.registry?.add(turbine)

internal class TurbineTimeoutElement(
val timeout: Duration,
) : CoroutineContext.Element {
Expand Down
90 changes: 78 additions & 12 deletions src/commonMain/kotlin/app/cash/turbine/flow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package app.cash.turbine

import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.time.Duration
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
Expand All @@ -24,12 +26,41 @@ import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.Channel.Factory.UNLIMITED
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.plus
import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.UnconfinedTestDispatcher

public interface TurbineTestContext<T> : ReceiveTurbine<T> {
public fun <R> Flow<R>.testIn(
scope: CoroutineScope,
timeout: Duration? = null,
name: String? = null,
): ReceiveTurbine<R>
}

internal class TurbineTestContextImpl<T>(
turbine: Turbine<T>,
turbineContext: CoroutineContext,
) : TurbineTestContext<T>, ReceiveTurbine<T> by turbine {
private val turbineElements = (turbineContext[TurbineRegistryElement] ?: EmptyCoroutineContext) +
(turbineContext[TurbineTimeoutElement] ?: EmptyCoroutineContext)
override fun <R> Flow<R>.testIn(
scope: CoroutineScope,
timeout: Duration?,
name: String?,
): ReceiveTurbine<R> =
testInInternal(
this@testIn,
timeout = timeout,
name = name,
scope = scope + turbineElements,
)
}

/**
* Terminal flow operator that collects events from given flow and allows the [validate] lambda to
* consume and assert properties on them in order. If any exception occurs during validation the
Expand All @@ -49,19 +80,48 @@ import kotlinx.coroutines.test.UnconfinedTestDispatcher
public suspend fun <T> Flow<T>.test(
timeout: Duration? = null,
name: String? = null,
validate: suspend ReceiveTurbine<T>.() -> Unit,
validate: suspend TurbineTestContext<T>.() -> Unit,
) {
coroutineScope {
collectTurbineIn(this, null, name).apply {
if (timeout != null) {
withTurbineTimeout(timeout) {
validate()
val turbineRegistry = mutableListOf<ChannelTurbine<*>>()
reportTurbines(turbineRegistry) {
coroutineScope {
collectTurbineIn(this, null, name).apply {
try {
val testContext = TurbineTestContextImpl(this@apply, currentCoroutineContext())
if (timeout != null) {
withTurbineTimeout(timeout) {
testContext.validate()
}
} else {
testContext.validate()
}
cancel()
ensureAllEventsConsumed()
} catch (e: Throwable) {
// The exception needs to be reraised. However, if there are any unconsumed events
// from other turbines (including this one), those may indicate an underlying problem.
// So: create a report with all the registered turbines, and include exception as cause
val reportsWithExceptions = turbineRegistry.map {
it.reportUnconsumedEvents()
// The exception will have cancelled its job hierarchy, producing cancellation exceptions
// in its wake. These aren't meaningful test feedback
.stripCancellations()
}
.filter { it.cause != null }
if (reportsWithExceptions.isEmpty()) {
throw e
} else {
throw TurbineAssertionError(
buildString {
reportsWithExceptions.forEach {
it.describeException(this@buildString)
}
},
e,
)
}
}
} else {
validate()
}
cancel()
ensureAllEventsConsumed()
}
}
}
Expand Down Expand Up @@ -89,12 +149,16 @@ public fun <T> Flow<T>.testIn(
timeout: Duration? = null,
name: String? = null,
): ReceiveTurbine<T> {
return testInInternal(this, timeout, scope, name)
}

private fun <T> testInInternal(flow: Flow<T>, timeout: Duration?, scope: CoroutineScope, name: String?): Turbine<T> {
if (timeout != null) {
// Eager check to throw early rather than in a subsequent 'await' call.
checkTimeout(timeout)
}

val turbine = collectTurbineIn(scope, timeout, name)
val turbine = flow.collectTurbineIn(scope, timeout, name)

scope.coroutineContext.job.invokeOnCompletion { exception ->
if (debug) println("Scope ending ${exception ?: ""}")
Expand All @@ -121,7 +185,9 @@ private fun <T> Flow<T>.collectTurbineIn(scope: CoroutineScope, timeout: Duratio
channel = collectIntoChannel(this)
}

return ChannelTurbine(channel, job, timeout, name)
return ChannelTurbine(channel, job, timeout, name).also {
scope.reportTurbine(it)
}
}

internal fun <T> Flow<T>.collectIntoChannel(scope: CoroutineScope): Channel<T> {
Expand Down
68 changes: 68 additions & 0 deletions src/commonTest/kotlin/app/cash/turbine/FlowTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package app.cash.turbine

import kotlin.coroutines.cancellation.CancellationException
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.test.assertFailsWith
import kotlin.test.assertFalse
Expand Down Expand Up @@ -753,4 +754,71 @@ class FlowTest {
assertTrue(awaitError() is CancellationException)
}
}

@Test
fun outerFailingFlowIsReported() = runTest {
val expected = CustomThrowable("hi")

val actual = assertFailsWith<AssertionError> {
flow<Nothing> {
throw expected
}.test(name = "outer") {
Turbine<Unit>(name = "inner").awaitItem()
}
}

val expectedPrefix = """
|Unconsumed exception found for outer:
|
|Stack trace:
""".trimMargin()
assertEquals(
actual.message?.startsWith(
expectedPrefix,
),
true,
"Expected to start with:\n\n$expectedPrefix\n\nBut was:\n\n${actual.message}",
)
assertContains(
actual.message!!,
"CustomThrowable: hi",
)
assertEquals(actual.cause?.message, "No value produced for inner in 3s")
}

@Test
fun innerFailingFlowIsReported() = runTest {
val expected = CustomThrowable("hi")

val actual = assertFailsWith<AssertionError> {
neverFlow().test(name = "outer") {
flow<Nothing> {
throw expected
}.testIn(backgroundScope, name = "inner failing")

Turbine<Unit>(name = "inner").awaitItem()
}
}

val expectedPrefix = """
|Unconsumed exception found for inner failing:
|
|Stack trace:
""".trimMargin()
assertEquals(
actual.message?.startsWith(
expectedPrefix,
),
true,
"Expected to start with:\n\n$expectedPrefix\n\nBut was:\n\n${actual.message}",
)
assertContains(
actual.message!!,
"CustomThrowable: hi",
)
assertEquals(
actual.cause?.message,
"No value produced for inner in 3s",
)
}
}

0 comments on commit 61b1872

Please sign in to comment.