-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Description
Describe the bug
Flow.zip breaks if the Flow1 implementation catches a CancellationException and rethrows a different one, but not if Flow2 does the same.
The problem boils down to the zip implementation: it cancels the Job with an AbortFlowException, but then expects the user code of the flow that the exception passes through to surface that exact same AbortFlowException. That isn't a fair assumption, especially since the second flow is not held to the same expectation. The implementation should instead check which exception the job was cancelled with. (Note that in doing so, it should be careful to preserve exceptions thrown from the transform lambda, as well as any exceptions thrown from the first coroutine in scenarios where the job wasn't cancelled due to zip's intentional abort.)
Provide a Reproducer
Given the code:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.seconds
/**
 * Reproducer: zips two flows in both orders and prints either the collected
 * result or the failure for each ordering.
 *
 * `f1` catches the [CancellationException] it receives and throws a different
 * one ([InstrumentedCancellationException]) in its place; `f2` just emits two
 * values and completes.
 */
fun main() = runBlocking {
// First flow: emits more elements than f2 and suspends before the third one.
val f1 = flow {
try {
emit(1)
emit(2)
delay(1.seconds)
emit(3)
} catch(ex: CancellationException) {
//if(true) throw ex // Re-throwing the exception as-is makes things work
// .. but throwing a _different_ exception breaks the zip impl
throw InstrumentedCancellationException(ex)
}
}
// Second flow: emits two values and completes normally, with no exception handling.
val f2 = flow {
emit("one")
emit("two")
}
// Case 1: f1 is the receiver (first) flow. This is the ordering reported to fail.
try {
val z = f1.zip(f2, { v1, v2 -> v1 to v2 })
println("f1.zip(f2) == ${z.toList()}")
} catch (ex: Exception) {
println("f1.zip(f2) failed")
ex.printStackTrace()
}
// Case 2: same flows with the roles swapped. This ordering is reported to succeed.
try {
val z = f2.zip(f1, { v1, v2 -> v1 to v2 })
println("f2.zip(f1) == ${z.toList()}")
} catch (ex: Exception) {
println("f2.zip(f1) failed")
ex.printStackTrace()
}
}
/**
 * A [CancellationException] that carries the originally caught exception as its [cause].
 *
 * The reproducer throws this from `f1` so that the exception leaving the flow is a
 * different instance from the one that was caught.
 */
class InstrumentedCancellationException(override val cause: Throwable) : CancellationException()
... f1.zip(f2) fails and an exception is printed, but f2.zip(f1) succeeds. The stack trace printed by f1.zip(f2) is:
InstrumentedCancellationException
at FileKt$main$1$f1$1.invokeSuspend(File.kt:14)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:280)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at FileKt.main(File.kt:5)
at FileKt.main(File.kt)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at executors.JavaRunnerExecutor$Companion.main(JavaRunnerExecutor.kt:27)
at executors.JavaRunnerExecutor.main(JavaRunnerExecutor.kt)
Caused by: kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
at kotlinx.coroutines.flow.internal.CombineKt$zipImpl$1$1$1.invoke(Combine.kt:108)
at kotlinx.coroutines.flow.internal.CombineKt$zipImpl$1$1$1.invoke(Combine.kt:106)
at kotlinx.coroutines.channels.BufferedChannel.invokeCloseHandler(BufferedChannel.kt:1823)
at kotlinx.coroutines.channels.BufferedChannel.closeOrCancelImpl(BufferedChannel.kt:1800)
at kotlinx.coroutines.channels.BufferedChannel.close(BufferedChannel.kt:1754)
at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:98)
at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:143)
at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:136)
at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:93)
at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:296)
at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:860)
at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:832)
at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
at kotlinx.coroutines.flow.internal.SafeCollector.invokeSuspend(SafeCollector.kt:48)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
... 14 more
(This was filed as https://youtrack.jetbrains.com/issue/KT-80239/Flow.zip-breaks-if-Flow1-impl-catchs-rethrows-CancellationException-but-not-if-Flow2-does-it, but a colleague told me that reactive stuff is done outside the mainline kt stdlib and is tracked here.)