
Commit 4ba38c2

jackywang-db authored and gengliangwang committed
[SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries
### What changes were proposed in this pull request?

Fixes flow failure counter increments by switching to `updateWith`. This prevents the first increment from throwing `NoSuchElementException` when the key does not exist.

### Why are the changes needed?

The current implementation, `flowToNumConsecutiveFailure(flowIdentifier)`, throws an exception when a flow is retried for the first time:

```
java.util.NoSuchElementException: key not found: `spark_catalog`.`test_db`.`mv`
	at scala.collection.MapOps.default(Map.scala:289)
	at scala.collection.MapOps.default$(Map.scala:288)
	at scala.collection.AbstractMap.default(Map.scala:420)
	at scala.collection.MapOps.apply(Map.scala:176)
	at scala.collection.MapOps.apply$(Map.scala:175)
	at scala.collection.AbstractMap.apply(Map.scala:420)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.incrementFlowToNumConsecutiveFailure(GraphExecution.scala:52)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1(GraphExecution.scala:92)
	at org.apache.spark.sql.pipelines.graph.GraphExecution.$anonfun$planAndStartFlow$1$adapted(GraphExecution.scala:90)
	at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:484)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:840)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Ran the pipeline manually with a flow failure.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #52094 from JiaqiWang18/sdp-graphexecution-incrementflow-map-default.

Authored-by: Jacky Wang <jacky.wang@databricks.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
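As an illustration of both the failure mode and the fix, here is a minimal standalone sketch (hypothetical object and flow names, not the Spark source): `mutable.Map.apply` on a missing key falls back to `default`, which throws `NoSuchElementException`, whereas `updateWith` receives the current binding as an `Option` and can initialize the counter on the first failure.

```scala
import scala.collection.mutable

object FlowFailureCounterSketch {
  // Hypothetical stand-in for GraphExecution's flowToNumConsecutiveFailure map.
  private val failures = mutable.Map.empty[String, Int]

  // Pre-fix behavior: failures(flow) calls apply(), which invokes default()
  // for a missing key and throws NoSuchElementException on the first retry.
  def incrementBuggy(flow: String): Unit =
    failures.put(flow, failures(flow) + 1)

  // Post-fix behavior: updateWith sees None for a missing key and
  // initializes the counter to 1 instead of throwing.
  def increment(flow: String): Unit =
    failures.updateWith(flow) {
      case Some(count) => Some(count + 1)
      case None => Some(1)
    }

  def main(args: Array[String]): Unit = {
    increment("test_db.mv") // first failure: initializes the count to 1
    increment("test_db.mv") // second failure: increments to 2
    assert(failures("test_db.mv") == 2)
  }
}
```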
1 parent 183f972 · commit 4ba38c2

2 files changed: +34 −2 lines


sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphExecution.scala

Lines changed: 4 additions & 1 deletion

```diff
@@ -49,7 +49,10 @@ abstract class GraphExecution(

   /** Increments flow execution retry count for `flow`. */
   private def incrementFlowToNumConsecutiveFailure(flowIdentifier: TableIdentifier): Unit = {
-    flowToNumConsecutiveFailure.put(flowIdentifier, flowToNumConsecutiveFailure(flowIdentifier) + 1)
+    flowToNumConsecutiveFailure.updateWith(flowIdentifier) {
+      case Some(count) => Some(count + 1)
+      case None => Some(1)
+    }
   }

   /**
```

sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/graph/TriggeredGraphExecutionSuite.scala

Lines changed: 30 additions & 1 deletion

```diff
@@ -26,7 +26,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, TableC
 import org.apache.spark.sql.execution.streaming.runtime.MemoryStream
 import org.apache.spark.sql.pipelines.common.{FlowStatus, RunState}
 import org.apache.spark.sql.pipelines.graph.TriggeredGraphExecution.StreamState
-import org.apache.spark.sql.pipelines.logging.EventLevel
+import org.apache.spark.sql.pipelines.logging.{EventLevel, FlowProgress}
 import org.apache.spark.sql.pipelines.utils.{ExecutionTest, TestGraphRegistrationContext}
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
@@ -1027,4 +1027,33 @@ class TriggeredGraphExecutionSuite extends ExecutionTest with SharedSparkSession
       }
     )
   }
+
+  test("consecutive failure event level is correct") {
+    val session = spark
+    import session.implicits._
+
+    val pipelineDef = new TestGraphRegistrationContext(spark) {
+      registerMaterializedView(
+        "retry_test",
+        partitionCols = Some(Seq("nonexistent_col")),
+        query = dfFlowFunc(spark.range(5).withColumn("id_mod", ($"id" % 2).cast("int")))
+      )
+    }
+
+    val graph = pipelineDef.toDataflowGraph
+    val updateContext = TestPipelineUpdateContext(spark, graph)
+    updateContext.pipelineExecution.runPipeline()
+    updateContext.pipelineExecution.awaitCompletion()
+
+    val failedEvents = updateContext.eventBuffer.getEvents.filter { e =>
+      e.details.isInstanceOf[FlowProgress] &&
+        e.details.asInstanceOf[FlowProgress].status == FlowStatus.FAILED
+    }
+
+    val warnCount = failedEvents.count(_.level == EventLevel.WARN)
+    // flowToNumConsecutiveFailure controls that the last failure should be logged as ERROR
+    val errorCount = failedEvents.count(_.level == EventLevel.ERROR)
+
+    assert(warnCount == 2 && errorCount == 1)
+  }
 }
```
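The assertion encodes the retry policy's event-level contract: failures within the retry budget are logged at WARN, and the terminal failure at ERROR. A hedged sketch of that relationship (hypothetical helper and retry budget; the real logic lives in GraphExecution and consults flowToNumConsecutiveFailure):

```scala
object RetryLevelSketch {
  // Hypothetical helper: a flow failure is logged at WARN while retry
  // attempts remain, and at ERROR once the budget is exhausted.
  def levelFor(consecutiveFailures: Int, maxRetries: Int): String =
    if (consecutiveFailures <= maxRetries) "WARN" else "ERROR"

  def main(args: Array[String]): Unit = {
    // With a budget of 2 retries, three consecutive failures produce
    // WARN, WARN, ERROR -- matching warnCount == 2 && errorCount == 1.
    val levels = (1 to 3).map(levelFor(_, maxRetries = 2))
    assert(levels == Seq("WARN", "WARN", "ERROR"))
  }
}
```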
