Skip to content

Commit

Permalink
Rethrow AbortFlowException upon cancellation (#4034)
Browse files Browse the repository at this point in the history
collectWhile catches AbortFlowException even if the CoroutineScope is cancelled
while waiting on child tasks, causing it to fail to respond to cancel().
  • Loading branch information
jiaoxiaodong committed Oct 21, 2024
1 parent 1fcffbf commit 9bb7d70
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 1 deletion.
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.internal.*
import kotlin.coroutines.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.flow as safeFlow
import kotlinx.coroutines.flow.internal.unsafeFlow as flow
Expand Down Expand Up @@ -133,5 +134,8 @@ internal suspend inline fun <T> Flow<T>.collectWhile(crossinline predicate: susp
collect(collector)
} catch (e: AbortFlowException) {
e.checkOwnership(collector)
// If thrown in a CoroutineScope waiting on child tasks, this exception can override a later CancellationException
// from cancel(). Rethrow if it does.
if (!coroutineContext.isActive) { throw e }
}
}
19 changes: 18 additions & 1 deletion kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package kotlinx.coroutines.flow

import kotlinx.coroutines.testing.*
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineStart.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlin.test.*
import kotlin.time.*

class FirstTest : TestBase() {
@Test
Expand Down Expand Up @@ -173,4 +175,19 @@ class FirstTest : TestBase() {

assertFailsWith<CancellationException> { flow.first() }
}
}

@Test
fun test() = runTest {
val job = launch(start = UNDISPATCHED) {
flow {
coroutineScope {
emitAll(produce(start = UNDISPATCHED) {
channel.send(Unit)
})
}
}.first()
expectUnreached()
}
job.cancel()
}
}

0 comments on commit 9bb7d70

Please sign in to comment.