
Conversation

@JiaqiWang18 (Contributor) commented Aug 22, 2025

What changes were proposed in this pull request?

Fixes the flow failure counter increment by switching to updateWith. This prevents the first increment from throwing a NoSuchElementException when the key does not exist yet.

Why are the changes needed?

The current implementation, flowToNumConsecutiveFailure(flowIdentifier), throws an exception when a flow is retried for the first time:

java.util.NoSuchElementException: key not found: `spark_catalog`.`test_db`.`mv`
	at scala.collection.MapOps.default(Map.scala:289)
	at scala.collection.MapOps.default$(Map.scala:288)
	at scala.collection.AbstractMap.default(Map.scala:420)
	at scala.collection.MapOps.apply(Map.scala:176)
	at scala.collection.MapOps.apply$(Map.scala:175)
	at scala.collection.AbstractMap.apply(Map.scala:420)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.incrementFlowToNumConsecutiveFailure(GraphExecution.scala:52)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1(GraphExecution.scala:92)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1$adapted(GraphExecution.scala:90)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
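
For illustration only, here is a minimal self-contained sketch of the failure mode and of the updateWith fix (plain Scala 2.13 collections; the map and method names mirror the code above, everything else is made up):

import scala.collection.mutable

object RetryCounterSketch {
  // Keyed by flow identifier; empty until the first failure is recorded.
  private val flowToNumConsecutiveFailure = mutable.Map.empty[String, Int]

  // Old pattern: apply() on a missing key throws NoSuchElementException,
  // so recording the very first failure for a flow blows up.
  // flowToNumConsecutiveFailure.put(flow, flowToNumConsecutiveFailure(flow) + 1)

  // New pattern: updateWith (Scala 2.13+) handles the missing key explicitly.
  def incrementFlowToNumConsecutiveFailure(flow: String): Unit = {
    flowToNumConsecutiveFailure.updateWith(flow) {
      case Some(count) => Some(count + 1)
      case None => Some(1)
    }
  }

  def main(args: Array[String]): Unit = {
    incrementFlowToNumConsecutiveFailure("`spark_catalog`.`test_db`.`mv`")
    incrementFlowToNumConsecutiveFailure("`spark_catalog`.`test_db`.`mv`")
    println(flowToNumConsecutiveFailure("`spark_catalog`.`test_db`.`mv`")) // prints 2
  }
}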

Does this PR introduce any user-facing change?

No

How was this patch tested?

Ran a pipeline manually with a flow failure to test.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the SQL label Aug 22, 2025
@JiaqiWang18 JiaqiWang18 changed the title [SDP] Fix counter initialization and increment logic for flow retries [SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries Aug 22, 2025

@JiaqiWang18 (Contributor Author)

@SCHJonathan @gengliangwang for review, thanks!

@JiaqiWang18 (Contributor Author)

Also @anishm-db

@itskals commented Aug 25, 2025

Looks good to me.

- flowToNumConsecutiveFailure.put(flowIdentifier, flowToNumConsecutiveFailure(flowIdentifier) + 1)
+ flowToNumConsecutiveFailure.updateWith(flowIdentifier) {
+   case Some(count) => Some(count + 1)
+   case None => Some(1)
+ }

Member

@JiaqiWang18 shall we have a unit test for this one?

Contributor Author

Added the unit test "consecutive failure event level is correct". flowToNumConsecutiveFailure controls the log level; if it is incremented correctly, we should see only the last retry execution event at ERROR level and all previous events at WARN level.
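
(For readers following along, the behavior that test asserts is roughly the following; a hypothetical REPL-style sketch, not the actual GraphExecution code, and logLevelForFlowFailure / maxRetryAttempts are made-up names:)

// Hypothetical sketch: only the failure that exhausts the retries is reported
// at ERROR; every earlier consecutive failure is reported at WARN.
def logLevelForFlowFailure(consecutiveFailures: Int, maxRetryAttempts: Int): String =
  if (consecutiveFailures > maxRetryAttempts) "ERROR" else "WARN"

// With maxRetryAttempts = 2, three consecutive failures log WARN, WARN, ERROR.
assert((1 to 3).map(n => logLevelForFlowFailure(n, maxRetryAttempts = 2)) == Seq("WARN", "WARN", "ERROR"))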

Comment on lines +1032 to +1033
val session = spark
import session.implicits._

Contributor

Just curious, what are we using from the implicits in this test?

Member

It is used in

($"id" % 2).cast("int")

@gengliangwang (Member)

Thanks, merging to master
