flatMapLatest + combine + first leads to execution while cancelled #4034
Comments
Could not reproduce with coroutines 1.7.3 using Android instrumented tests. The output for me is:
Due to cancellation, the "After first"
Yes, that's right. It does not repro in an instrumented test for us either. But it does when you put that code in
Here is a complete example. Run it and see the output in logcat.
Ok, reproduced.
@dkhalanskyjb Can you remove the "waiting for clarification" label so it doesn't get auto closed?
collectWhile catches AbortFlowException even if the CoroutineScope is cancelled while waiting on child tasks, causing it to fail to respond to cancel().
The test case below reproduces this issue:
val job = launch(start = UNDISPATCHED) {
    flow {
        coroutineScope {
            emitAll(produce(start = UNDISPATCHED) {
                channel.send(Unit)
            })
        }
    }.first()
    expectUnreached()
}
job.cancel()
This is what happened:
This PR should fix this issue: #4254
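While a fix is pending, one defensive pattern is to re-check cancellation right after first() returns. This is not proposed anywhere in this thread, so treat it as an assumption rather than the maintainers' recommendation. The helper name firstOrCancel is hypothetical; currentCoroutineContext() and ensureActive() are existing kotlinx.coroutines calls.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper, not part of kotlinx.coroutines.
// Returns the first element, then rethrows cancellation if the caller
// was cancelled while first() was still running.
suspend fun <T> Flow<T>.firstOrCancel(): T {
    val value = first()
    // Throws CancellationException if the calling coroutine's Job is no longer active.
    currentCoroutineContext().ensureActive()
    return value
}

fun main() = runBlocking {
    // In a coroutine that is still active the helper behaves like first().
    println(flowOf(1, 2, 3).firstOrCancel())
}
```

With this guard, code placed after the call should not run in a cancelled coroutine even if first() itself returns normally, at the cost of one extra check per call.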
Thanks for the analysis! This doesn't explain the difference between Android and instrumented tests, but indeed, the reproducer is valid. It can be simplified further:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() {
    runBlocking {
        val job = launch(start = CoroutineStart.UNDISPATCHED) {
            flow {
                try {
                    println(1)
                    emit(Unit)
                } finally {
                    runCatching { yield() }
                }
            }.first()
            println(3)
            error("unreached")
        }
        println(2)
        job.cancel()
    }
}
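The simplified reproducer hinges on runCatching { yield() } in the finally block. As far as I can tell, yield() resumes with a CancellationException when the job is cancelled while it waits for dispatch, and runCatching swallows that exception, so the cancellation never reaches first(). The sketch below isolates only that step; the function name is hypothetical and it does not involve flows at all.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns true when runCatching swallowed the
// CancellationException that yield() resumed with.
fun cancellationSwallowedByRunCatching(): Boolean = runBlocking {
    var swallowed = false
    val job = launch(start = CoroutineStart.UNDISPATCHED) {
        // Suspends at yield(); the job is cancelled before it is resumed.
        val result = runCatching { yield() }
        swallowed = result.exceptionOrNull() is CancellationException
    }
    job.cancel()
    job.join()
    swallowed
}

fun main() {
    println(cancellationSwallowedByRunCatching())
}
```

The coroutine keeps executing after runCatching returns even though its job is already cancelled, which mirrors the situation the reproducer sets up inside the flow.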
If the instrumented test looks something like the one below, the issue won't reproduce. This is because
class ExampleInstrumentedTest {
    @Test
    fun useAppContext() {
        val dis = Dispatchers.Main.immediate
        val scope = CoroutineScope(dis)
        println("Before launch")
        scope.launch {
            flowOf(listOf(1, 2, 3))
                .flatMapLatest { numbers ->
                    combine(numbers.map { flowOf(it) }) { it.sum() }
                }
                .first()
            println("After first, scope is active: ${scope.isActive}")
        }
        println("After launch")
        scope.cancel()
    }
}
Describe the bug
Firstly, it's possible this is expected behavior but it certainly doesn't seem that way.
We have a flow that we collect using first(). This flow is a combination of multiple upstream flows but they all emit their items synchronously. The downstream flow limits the number of emissions to a single element then uses awaitCompletion() to avoid terminating the flow. This is the flow we call first() on.
Under some circumstances, we are seeing the first() call resuming after the scope has been cancelled, which is causing a crash for us.
If either the flatMapLatest or combine is removed, the code behaves as expected, although first() never resumes.
I know there are no real guarantees that code won't be running while cancelled. But there are a few things that make me think that this doesn't apply in this case:
Provide a Reproducer
This reproduces it on an Android device, but it's possible it can be reproduced on the JVM too.
I expect this to print:
Instead I'm seeing this: