[SPARK-53421][SPARK-53377][SDP] Propagate Logical Plan ID in SDP Analysis #52121
Conversation
// Spark Connect requires the PLAN_ID_TAG to be propagated to the resolved plan
// to allow correct analysis of the parent plan that contains this subquery
u.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(
  id => resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
We only attach the plan id to the top-level `resolved` logical plan. I am wondering if this is sufficient; do we need to also attach it to the children of `resolved`?
+1, will wait for someone else to chime in
The plan id tag only needs to be on the root plan of a DataFrame.
to avoid other potential issues, can we copy all tree node tags from the original plan?
For normal queries, if the DataFrame is `spark.table("view_name")`, the analyzer also copies all the tags from `UnresolvedRelation` to the root node of the resolved view query plan.
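As a rough illustration of that pattern (a minimal sketch using Spark's public tree node tag API; the helper name `propagateTags` is made up and this is not the analyzer's actual code):

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object TagPropagationSketch {
  // After resolving an UnresolvedRelation into a concrete plan, carry the tree node
  // tags (including Spark Connect's PLAN_ID_TAG) over to the root of the resolved plan.
  def propagateTags(u: UnresolvedRelation, resolved: LogicalPlan): LogicalPlan = {
    resolved.copyTagsFrom(u) // copies only when `resolved` has no tags of its own yet
    resolved
  }
}
```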
@cloud-fan We were directly using `setTagValue(LogicalPlan.PLAN_ID_TAG ..)` instead of `copyTagsFrom` because the resolved plan contains a `dataset_id` tag and `copyTagsFrom` expects the destination to not contain any tags (src). I think it makes sense to also copy over other tags, so I introduced a new `mergeTagsFrom` method. I don't expect there will be tags that are defined in both and have conflicting values.
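For context, the difference between the two helpers can be sketched with a simplified tag map (a hypothetical illustration; the actual `mergeTagsFrom` added in this PR lives on Spark's `TreeNode` and may differ in detail):

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's TreeNodeTag and TreeNode, just to contrast the two behaviors.
final case class Tag(name: String)

final class Node {
  val tags: mutable.Map[Tag, Any] = mutable.Map.empty

  // copyTagsFrom-style: only copies when the destination has no tags yet, so it would
  // skip a resolved plan that already carries e.g. a dataset_id tag.
  def copyTagsFrom(other: Node): Unit =
    if (tags.isEmpty) tags ++= other.tags

  // mergeTagsFrom-style: merges the source tags into whatever is already present
  // (on a key conflict the source value wins).
  def mergeTagsFrom(other: Node): Unit =
    tags ++= other.tags
}
```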
@@ -434,6 +434,66 @@ class PythonPipelineSuite
      .map(_.identifier) == Seq(graphIdentifier("a"), graphIdentifier("something")))
  }

  test("groupby and rollup works with internal datasets (col, str, index)") {
does the test actually try index?
Referencing by index is still doing eager analysis, and we disallow this behavior in query functions:

```python
@dp.materialized_view
def groupby_with_index_result():
    return spark.read.table("src").groupBy(1).agg(
        sum(col(id_col)).alias("sum_id"),
        count("*").alias("cnt")
    )
```

Will reach out to Spark Connect and check with them.
// groupBy: each variant should have exactly one row per id [0,1,2]
groupbyDfs.foreach { df =>
  assert(df.select("id").collect().map(_.getLong(0)).toSet == Set(0L, 1L, 2L))
nit: use `PipelineTest.checkAnswer` in all the tests instead of directly calling `df.collect`
I just realized we actually don't extend `PipelineTest` in any of the children of `SparkDeclarativePipelinesServerTest`.
Ah, didn't realize either. That's fine then, not worth refactoring or changing inheritance just for it.
// to allow correct analysis of the parent plan that contains this subquery
u.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(
  id => resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
)
Also, we are directly using `setTagValue(LogicalPlan.PLAN_ID_TAG ..)` instead of `copyTagsFrom` because the resolved plan contains a `dataset_id` tag and `copyTagsFrom` expects the destination to not contain any tags (src).
cc @cloud-fan
reason for adding `mergeTagsFrom`
thanks, merging to master!
What changes were proposed in this pull request?
Propagate `LogicalPlan.PLAN_ID_TAG` to the resolved logical plan during SDP analysis, so that when the whole plan is sent to Spark for analysis, it contains the correct plan id.

Why are the changes needed?
Spark Connect attaches a plan id to each logical plan. In SDP, we take part of the logical plan and analyze it independently to resolve table references correctly. When this happens, the logical plan id is lost, which causes resolution errors when the plan is sent to Spark for complete analysis.
For example, group by and rollup functions would fail with:
sql.AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like df1.select(df2.col("a"))
This happens because we take the below unresolved logical plan:
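Roughly, such a plan has the following shape (an illustrative sketch based on the groupBy example above; the exact operators and ids in the PR may differ):

```
'Aggregate ['id], ['sum('id) AS sum_id, 'count(1) AS cnt]   <- column references carry the plan id of the source DataFrame
+- 'UnresolvedRelation [src]                                 <- root tagged with PLAN_ID_TAG by Spark Connect
```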
Perform independent analysis on the `UnresolvedRelation` part to identify the table. During this analysis, the plan id is lost.
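After that step, the partially resolved plan looks roughly like this (again an illustrative sketch, not the PR's exact output):

```
'Aggregate ['id], ['sum('id) AS sum_id, 'count(1) AS cnt]   <- still references the original plan id
+- SubqueryAlias src                                         <- resolved independently; PLAN_ID_TAG no longer present
   +- Relation spark_catalog.default.src[id]
```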
So when the above partially resolved logical plan is sent to Spark for analysis, it tries to resolve the `id` attribute in the aggregate operation with respect to the `SubqueryAlias` children, and fails because the children no longer contain the same plan id.

Does this PR introduce any user-facing change?
No
How was this patch tested?
Tests
Was this patch authored or co-authored using generative AI tooling?