Conversation

Contributor

@JiaqiWang18 commented Aug 25, 2025

What changes were proposed in this pull request?

Propagate LogicalPlan.PLAN_ID_TAG to the resolved logical plan during SDP analysis, so that when the whole plan is sent to Spark for analysis it carries the correct plan id.

Why are the changes needed?

Spark Connect attaches a plan id to each logical plan. In SDP, we take part of the logical plan and analyze it independently to resolve table references correctly. When this happens, the logical plan id is lost, which causes resolution errors when the plan is later sent to Spark for complete analysis.
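
For illustration, a minimal sketch (not from this PR; the plan id value here is made up) of how a plan id is attached to and read back from a logical plan via LogicalPlan.PLAN_ID_TAG:

import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Spark Connect tags the root plan of each DataFrame with a plan id so that
// column references such as df.col("id") can be matched back to that plan.
val u = UnresolvedRelation(Seq("src"))
u.setTagValue(LogicalPlan.PLAN_ID_TAG, 1L)
assert(u.getTagValue(LogicalPlan.PLAN_ID_TAG).contains(1L))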

For example, groupBy and rollup would fail with sql.AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like df1.select(df2.col("a"))

from pyspark import pipelines as dp
from pyspark.sql.functions import col, sum, count

@dp.materialized_view
def groupby_result():
    return spark.read.table("src").groupBy("id").count()

This happens because we take the below unresolved logical plan:

'Aggregate ['id], ['id, 'count(1) AS count#7]
+- 'UnresolvedRelation [src], [], false

We then perform independent analysis on the UnresolvedRelation part to identify the table. During this analysis, the plan id is lost:

'Aggregate ['id], ['id, 'count(1) AS count#7]
+- SubqueryAlias spark_catalog.default.src        
   +- Relation spark_catalog.default.src[id#9L] parquet

So when the above partially resolved logical plan is sent to Spark for analysis, it tries to resolve the id attribute in the aggregate operation against the SubqueryAlias children, and fails because the children no longer carry the same plan id.
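
In sketch form, the remedy (mirroring the diff shown further down, where u is the original UnresolvedRelation and resolved is its independently analyzed replacement) is to carry the tag over:

u.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach { id =>
  // Re-attach the Spark Connect plan id from the original unresolved node
  // to the root of the independently resolved subplan.
  resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
}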

Does this PR introduce any user-facing change?

No

How was this patch tested?

Tests

Was this patch authored or co-authored using generative AI tooling?

@JiaqiWang18 force-pushed the SPARK-53377-sdp-groupBy-rollup-tests branch from 394a29e to 2a7fdc0 on August 25, 2025 21:49
@JiaqiWang18
Contributor Author

@anishm-db @gengliangwang

@github-actions bot removed the PYTHON label on Aug 27, 2025
@JiaqiWang18 changed the title from [SPARK-53377][SDP] Tests for groupby and rollup to [SPARK-53421][SPARK-53377][SDP] Propagate Logical Plan ID in SDP Analysis on Aug 28, 2025
// Spark Connect requires the PLAN_ID_TAG to be propagated to the resolved plan
// to allow correct analysis of the parent plan that contains this subquery
u.getTagValue(LogicalPlan.PLAN_ID_TAG).foreach(
  id => resolved.setTagValue(LogicalPlan.PLAN_ID_TAG, id)
)
Contributor Author

We only attach the plan id to the top-level resolved logical plan. I am wondering if this is sufficient; do we also need to attach it to the children of resolved?

Contributor

+1, will wait for someone else to chime in

Contributor

the plan id tag only needs to be on the root plan of a DataFrame

Contributor

to avoid other potential issues, can we copy all tree node tags from the original plan?

Contributor

For normal queries, if the DataFrame is spark.table("view_name"), the analyzer also copies all the tags from the UnresolvedRelation to the root node of the resolved view query plan.

Contributor Author

@cloud-fan We were directly using setTagValue(LogicalPlan.PLAN_ID_TAG ..) instead of copyTagsFrom because the resolved plan contains a dataset_id tag and copyTagsFrom expects the destination to not contain any tags (src).

I think it makes sense to also copy over the other tags, so I introduced a new mergeTagsFrom method. I don't expect there to be tags that are defined in both plans with conflicting values.
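
As a rough sketch (hypothetical; the actual method in this PR may differ), mergeTagsFrom on TreeNode could look like:

// Hypothetical sketch: unlike copyTagsFrom, which only copies when this
// node's tag map is empty, mergeTagsFrom adds the other node's tags on top
// of any existing ones (the other node's value wins on a key conflict).
def mergeTagsFrom(other: BaseType): Unit = {
  tags ++= other.tags
}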

@@ -434,6 +434,66 @@ class PythonPipelineSuite
.map(_.identifier) == Seq(graphIdentifier("a"), graphIdentifier("something")))
}

test("groupby and rollup works with internal datasets (col, str, index)") {
Contributor

does the test actually try index?

Contributor Author

Referencing by index still triggers eager analysis, and we disallow this behavior in query functions:

@dp.materialized_view
def groupby_with_index_result():
    return spark.read.table("src").groupBy(1).agg(
        sum(col(id_col)).alias("sum_id"),
        count("*").alias("cnt")
    )

Will reach out to the Spark Connect folks and check with them.


// groupBy: each variant should have exactly one row per id [0, 1, 2]
groupbyDfs.foreach { df =>
  assert(df.select("id").collect().map(_.getLong(0)).toSet == Set(0L, 1L, 2L))
}
Contributor

nit: use PipelineTest.checkAnswer in all the tests instead of directly calling df.collect

Contributor Author

I just realized we actually don't extend PipelineTest in any of the children of SparkDeclarativePipelinesServerTest.

Contributor

Ah, didn't realize either. That's fine then; not worth refactoring or changing inheritance just for it.


@JiaqiWang18
Contributor Author

cc @cloud-fan

@JiaqiWang18
Contributor Author

reason for adding mergeTagsFrom

@JiaqiWang18 requested a review from cloud-fan on August 29, 2025 05:20
@cloud-fan
Contributor

thanks, merging to master!

@cloud-fan closed this in c459d71 on Sep 1, 2025