Flow.zip breaks if Flow1 impl catches & rethrows a CancellationException, but not if Flow2 does it #4507

@sameb

Describe the bug

Flow.zip breaks if the Flow1 implementation catches a CancellationException and rethrows a different one (for example, a wrapper around it), but not if Flow2 does the same.

The problem boils down to the zip implementation: it cancels the Job with an AbortFlowException, and then expects the user code that the exception passes through to surface that exact same AbortFlowException. That isn't a fair assumption, especially since the second flow is not held to the same expectation. The implementation should instead check which exception the job was cancelled with. (In doing so, it should take care to preserve exceptions thrown from the transform lambda, as well as any exceptions thrown from the first coroutine in scenarios where the job wasn't cancelled by zip's intentional abort.)
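To make the two strategies concrete, here is a toy model that uses only the Kotlin standard library. It is a sketch under stated assumptions, not the kotlinx.coroutines implementation: `ToyJob`, `AbortSignal`, `WrappedCancellation`, and both `collect…` functions are hypothetical stand-ins invented for illustration. It contrasts expecting the abort exception itself to come back out of user code with asking the job what it was cancelled with.

```kotlin
import kotlin.coroutines.cancellation.CancellationException

// Hypothetical stand-in for the internal AbortFlowException.
class AbortSignal(val owner: Any) : CancellationException("no more elements needed")

// Hypothetical stand-in for a Job: it only remembers what it was cancelled with.
class ToyJob {
    var cancellationCause: Throwable? = null
        private set

    fun cancel(cause: Throwable) {
        if (cancellationCause == null) cancellationCause = cause
    }
}

// Wrapper exception, analogous to InstrumentedCancellationException in the reproducer.
class WrappedCancellation(override val cause: Throwable) : CancellationException("wrapped")

// User code that wraps every CancellationException it observes, as f1 does.
fun wrappingUpstream(signal: Throwable): Nothing =
    try {
        throw signal
    } catch (ex: CancellationException) {
        throw WrappedCancellation(ex)
    }

// Strategy A: expect the abort signal itself to come back out of user code.
fun collectExpectingSameException(job: ToyJob, upstream: () -> Unit): String =
    try {
        upstream()
        "completed"
    } catch (e: AbortSignal) {
        if (e.owner !== job) throw e
        "aborted quietly"
    }

// Strategy B: on any cancellation, ask the job what it was cancelled with.
// Non-cancellation exceptions are not caught here, so they propagate unchanged.
fun collectCheckingJob(job: ToyJob, upstream: () -> Unit): String =
    try {
        upstream()
        "completed"
    } catch (e: CancellationException) {
        val cause = job.cancellationCause
        if (cause is AbortSignal && cause.owner === job) "aborted quietly" else throw e
    }

// Runs a block and reports either its result or the class of the exception it threw.
fun describe(block: () -> String): String =
    try {
        block()
    } catch (e: Throwable) {
        "failed with ${e.javaClass.simpleName}"
    }

fun main() {
    val job = ToyJob()
    val abort = AbortSignal(job)
    job.cancel(abort)

    // The wrapper hides the abort signal from strategy A.
    println(describe { collectExpectingSameException(job) { wrappingUpstream(abort) } })
    // Strategy B still recognises the intentional abort.
    println(describe { collectCheckingJob(job) { wrappingUpstream(abort) } })
    // A failure unrelated to the abort is preserved by strategy B.
    println(describe { collectCheckingJob(ToyJob()) { error("boom") } })
}
```

In this model, strategy A lets the wrapper escape as a failure, while strategy B treats it as the intentional abort and still rethrows anything that did not come from its own cancellation. A real fix would also need to account for how exceptions from the transform lambda reach the same catch site, which this toy does not model.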

Provide a Reproducer

Given the code:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.seconds

fun main() = runBlocking {
    val f1 = flow {
        try {
            emit(1)
            emit(2)
            delay(1.seconds)
            emit(3)
        } catch (ex: CancellationException) {
            // if (true) throw ex // Re-throwing the exception as-is makes things work
            // .. but throwing a _different_ exception breaks the zip impl
            throw InstrumentedCancellationException(ex)
        }
    }
    val f2 = flow {
        emit("one")
        emit("two")
    }

    try {
        val z = f1.zip(f2, { v1, v2 -> v1 to v2 })
        println("f1.zip(f2) == ${z.toList()}")
    } catch (ex: Exception) {
        println("f1.zip(f2) failed")
        ex.printStackTrace()
    }


    try {
        val z = f2.zip(f1, { v1, v2 -> v1 to v2 })
        println("f2.zip(f1) == ${z.toList()}")
    } catch (ex: Exception) {
        println("f2.zip(f1) failed")
        ex.printStackTrace()
    }
}

class InstrumentedCancellationException(override val cause: Throwable) : CancellationException()

... f1.zip(f2) fails and prints an exception, but f2.zip(f1) succeeds. The stack trace printed from f1.zip(f2) is:

InstrumentedCancellationException
	at FileKt$main$1$f1$1.invokeSuspend(File.kt:14)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:280)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at FileKt.main(File.kt:5)
	at FileKt.main(File.kt)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.base/java.lang.reflect.Method.invoke(Unknown Source)
	at executors.JavaRunnerExecutor$Companion.main(JavaRunnerExecutor.kt:27)
	at executors.JavaRunnerExecutor.main(JavaRunnerExecutor.kt)
Caused by: kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
	at kotlinx.coroutines.flow.internal.CombineKt$zipImpl$1$1$1.invoke(Combine.kt:108)
	at kotlinx.coroutines.flow.internal.CombineKt$zipImpl$1$1$1.invoke(Combine.kt:106)
	at kotlinx.coroutines.channels.BufferedChannel.invokeCloseHandler(BufferedChannel.kt:1823)
	at kotlinx.coroutines.channels.BufferedChannel.closeOrCancelImpl(BufferedChannel.kt:1800)
	at kotlinx.coroutines.channels.BufferedChannel.close(BufferedChannel.kt:1754)
	at kotlinx.coroutines.channels.SendChannel$DefaultImpls.close$default(Channel.kt:98)
	at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:143)
	at kotlinx.coroutines.channels.ProducerCoroutine.onCompleted(Produce.kt:136)
	at kotlinx.coroutines.AbstractCoroutine.onCompletionInternal(AbstractCoroutine.kt:93)
	at kotlinx.coroutines.JobSupport.tryFinalizeSimpleState(JobSupport.kt:296)
	at kotlinx.coroutines.JobSupport.tryMakeCompleting(JobSupport.kt:860)
	at kotlinx.coroutines.JobSupport.makeCompletingOnce$kotlinx_coroutines_core(JobSupport.kt:832)
	at kotlinx.coroutines.AbstractCoroutine.resumeWith(AbstractCoroutine.kt:100)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:46)
	at kotlinx.coroutines.flow.internal.SafeCollector.invokeSuspend(SafeCollector.kt:48)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:108)
	... 14 more
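
As the commented-out line in the reproducer notes, rethrowing the caught exception as-is avoids the failure. A possible workaround sketch along those lines, assuming the instrumentation can be done as a side effect rather than by wrapping: `observedCancellations` and `instrumentedFlow` are hypothetical names introduced for illustration, and the snippet needs kotlinx.coroutines on the classpath, like the reproducer.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.time.Duration.Companion.seconds

// Hypothetical instrumentation sink: records observed cancellations
// without replacing the exception that is in flight.
val observedCancellations = mutableListOf<String>()

// Same shape as f1 in the reproducer, but the catch block rethrows the original instance.
fun instrumentedFlow(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        delay(1.seconds)
        emit(3)
    } catch (ex: CancellationException) {
        observedCancellations += ex.javaClass.simpleName
        throw ex // same instance, so zip can still recognise its own abort
    }
}

fun main() = runBlocking {
    val f2 = flowOf("one", "two")
    val zipped = instrumentedFlow().zip(f2) { v1, v2 -> v1 to v2 }
    println("zip == ${zipped.toList()}")
    println("observed cancellations: $observedCancellations")
}
```

This only sidesteps the problem in user code. It does not help when the wrapping happens in a library that the caller cannot change, which is why the report asks for the zip implementation to check the job's cancellation cause instead.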

(This was filed as https://youtrack.jetbrains.com/issue/KT-80239/Flow.zip-breaks-if-Flow1-impl-catchs-rethrows-CancellationException-but-not-if-Flow2-does-it, but a colleague told me that reactive stuff is done outside the mainline kt stdlib and is tracked here.)

Metadata

Labels: bug, docs (KDoc and API reference), flow