[SUPPORT] Queries are very memory intensive due to low read parallelism in HoodieMergeOnReadRDD #12434

Open
mzheng-plaid opened this issue Dec 5, 2024 · 5 comments


@mzheng-plaid

Describe the problem you faced

We have jobs that read from a MOR table using the following PySpark pseudo-code (event_table_rt is the MOR table):

from pyspark.sql import functions as F

# `spark` is an existing SparkSession; `users_path` points to the users CSV.
partitions = ["2023-11-13", "2023-11-14", "2023-11-15", "2023-11-16", "2023-11-17"]
event_df = spark.sql("select * from event_table_rt").filter(
    F.col("dt").isin(partitions)
)
user_df = spark.read.format("csv").option("header", "true").load(users_path)
# Broadcast-join the users table to keep only events for those users.
filtered_events_df = event_df.join(
    F.broadcast(user_df),
    on=event_df["user_id"] == user_df["id"],
    how="inner",
)
filtered_events_df.write.format("parquet").save("s3://...")

We're running into a bottleneck on HoodieMergeOnReadRDD (https://github.com/apache/hudi/blob/release-0.14.2/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieMergeOnReadRDD.scala#L37) where the number of tasks in the stage reading event_df seems to be non-configurable and (I think) equal to the number of files being read. This is causing massive disk/memory spill and bottlenecking performance.

Is it possible to configure the read parallelism to be higher or is this a fundamental limitation of Hudi with MOR tables? What is the recommendation for how to tune resourcing for readers of MOR tables?
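
One way to check the guess that the task count tracks the file count is to ask Spark how many partitions it plans for the scan. This is only a minimal sketch: `report_read_parallelism` is a hypothetical helper name, and it assumes `event_df` is the filtered DataFrame from the snippet above.

```python
def report_read_parallelism(df, label):
    """Print and return the number of partitions Spark plans for `df`.

    For a scan followed only by narrow operations (such as a filter),
    this matches the number of tasks in the read stage.
    """
    num_partitions = df.rdd.getNumPartitions()
    print(f"{label}: {num_partitions} read partitions")
    return num_partitions


# Hypothetical usage, assuming `event_df` from the pseudo-code above:
# report_read_parallelism(event_df, "event_table_rt")
```

Comparing that number with the count of base files in the selected partitions should show whether the two really match.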

Environment Description

  • Hudi version : 0.14.1-amzn-1 (EMR 7.2.0)

  • Spark version : 3.5.1

  • Hive version : 3.1.3

  • Hadoop version : 3.3.6

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

@ad1happy2go
Collaborator

@mzheng-plaid The parallelism should equal the number of file groups, since one task reads one Parquet file. I'd like to understand more: if the Parquet files and log files are properly sized, why would you face a bottleneck at the task level?

@mzheng-plaid
Author

We set these to:

    "hoodie.parquet.max.file.size": (512 * 1024 * 1024),  # 512 MB
    "hoodie.parquet.block.size": (512 * 1024 * 1024),  # 512 MB
    "hoodie.parquet.small.file.limit": (256 * 1024 * 1024),  # 256 MB

With plain Parquet there is spark.sql.files.maxPartitionBytes, and if I'm understanding apache/iceberg#8922 (comment) correctly, Iceberg also supports a split size. Does Hudi have any similar support for reading large files? As far as I can tell, spark.sql.files.maxPartitionBytes is ignored.
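
To put rough numbers on the difference, here is a back-of-the-envelope sketch in plain Python. It is a simplified model, not Spark's or Hudi's actual planning logic: it ignores Spark's file open cost and the bin-packing of small splits, and the 100-file count is a made-up example. The 128 MB split size is Spark's default for spark.sql.files.maxPartitionBytes.

```python
import math

MB = 1024 * 1024


def tasks_one_per_file(file_sizes_bytes):
    """One read task per file (file group), regardless of file size."""
    return len(file_sizes_bytes)


def tasks_with_split_size(file_sizes_bytes, split_bytes):
    """Simplified model: each file is cut into ceil(size / split_bytes) splits."""
    return sum(math.ceil(size / split_bytes) for size in file_sizes_bytes)


# Hypothetical example: 100 base files at the 512 MB cap configured above.
files = [512 * MB] * 100
print(tasks_one_per_file(files))               # 100
print(tasks_with_split_size(files, 128 * MB))  # 400
```

Under this model each task on the one-task-per-file path handles about four times as much data as a task on a 128 MB split, which would be consistent with the spill we are seeing.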

@ad1happy2go
Collaborator

@mzheng-plaid I don't think HoodieMergeOnReadRDD has a way to split file groups further during reads. In any case, it would be difficult for snapshot reads, since the log files have to be applied to the Parquet records.

@mzheng-plaid
Author

This is problematic even on the read-optimized table (i.e. just the base Parquet files), which is really surprising.

I tried:

  1. A read-optimized query on the Hudi table
  2. Calling spark.read.format("parquet").load({s3_path})

Just reading the Parquet files directly was much faster and less memory intensive (i.e. no spilling to disk) once I tuned spark.sql.files.maxPartitionBytes. I understand this reads multiple versions of the file groups, but it's surprising how much worse read performance is with Hudi.
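
For reference, the two reads being compared can be sketched roughly as below. This is an approximation rather than the exact job code: the helper names are hypothetical, the paths are placeholders passed in by the caller, and the read-optimized query is assumed to go through the Hudi datasource with hoodie.datasource.query.type set to read_optimized (querying a read-optimized table registered in the metastore would be another way to do it).

```python
def read_optimized_via_hudi(spark, table_path):
    """Read-optimized query through the Hudi datasource (base files only)."""
    return (
        spark.read.format("hudi")
        .option("hoodie.datasource.query.type", "read_optimized")
        .load(table_path)
    )


def read_parquet_directly(spark, parquet_path, max_partition_bytes):
    """Plain Parquet read that bypasses Hudi.

    This sees every file version under the path, not only the latest
    file slice, so results can contain stale or duplicate records.
    """
    spark.conf.set("spark.sql.files.maxPartitionBytes", str(max_partition_bytes))
    return spark.read.format("parquet").load(parquet_path)
```

The second function is only useful as a performance comparison, since it does not give Hudi's snapshot semantics.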

@mzheng-plaid
Author

@ad1happy2go bump on this. Is there any workaround for read-optimized queries? The behavior is surprising.

Projects
Status: Awaiting Triage