-
Notifications
You must be signed in to change notification settings - Fork 28.8k
[SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries #52094
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-52346][SDP] Fix counter initialization and increment logic for flow retries #52094
Conversation
@SCHJonathan @gengliangwang for review, thanks! |
Also @anishm-db |
Looks good to me. |
flowToNumConsecutiveFailure.put(flowIdentifier, flowToNumConsecutiveFailure(flowIdentifier) + 1) | ||
flowToNumConsecutiveFailure.updateWith(flowIdentifier) { | ||
case Some(count) => Some(count + 1) | ||
case None => Some(1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JiaqiWang18 shall we have a unit test for this one?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added unit test "consecutive failure event level is correct". flowToNumConsecutiveFailure
controls the log level and if it is incremented correctly, we should only see the last retry execution event having ERROR
level and all previous events having WARN
level .
val session = spark | ||
import session.implicits._ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious, what are we using from the implicits in this test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is used in
($"id" % 2).cast("int")
Thanks, merging to master |
What changes were proposed in this pull request?
Fixes flow failure counter increments by switching to
updateWith
. This prevents first increment from throwingNoSuchElementException
when key does not exist.Why are the changes needed?
Current impl
flowToNumConsecutiveFailure(flowIdentifier)
throws exception when a flow is retried for first time:Does this PR introduce any user-facing change?
No
How was this patch tested?
Run pipeline manually with flow failure to test.
Was this patch authored or co-authored using generative AI tooling?
No