
[SUPPORT] Checkpoint Fails with Timeout while waiting for instant initialize When Migrating Data from Kudu to Hudi COW Bucketed Table #12589

Open
pengxianzi opened this issue Jan 7, 2025 · 11 comments


pengxianzi commented Jan 7, 2025

Problem Description
We encountered the following issues while using Apache Hudi for data migration and real-time writing:

Scenario 1:

Migrating data from Kudu to a Hudi MOR bucketed table, then writing data from MySQL via Kafka to the Hudi MOR bucketed table works fine.

Scenario 2:

Migrating data from Kudu to a Hudi COW bucketed table, then writing data from MySQL via Kafka to the Hudi COW bucketed table fails to generate commits, and the checkpoint fails.

Error Log
Here is the error log when the task fails:

org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - bucket_write: default_database.table_cow, asynchronous part of checkpoint 1 could not be completed.

java.util.concurrent.CancellationException: null

org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator bucket_write: default_database.table_cow (3/4). Failure reason: Checkpoint was declined.

Caused by: org.apache.hudi.exception.HoodieException: Timeout(1201000ms) while waiting for instant initialize

switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator bucket_write: default_database.table_cow (3/4)#0.

Configuration Parameters
Here is our Hudi table configuration:

options.put("hoodie.upsert.shuffle.parallelism", "20");
options.put("hoodie.insert.shuffle.parallelism", "20");
options.put("write.operation", "upsert");
options.put(FlinkOptions.TABLE_TYPE.key(), name);
options.put(FlinkOptions.PRECOMBINE_FIELD.key(),precombing);
options.put(FlinkOptions.PRE_COMBINE.key(), "true");
options.put("hoodie.clean.automatic", "true");
options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
options.put("hoodie.cleaner.commits.retained", "5");
options.put("hoodie.clean.async", "true");
options.put("hoodie.archive.min.commits", "20");
options.put("hoodie.archive.max.commits", "30");
options.put("hoodie.clean.parallelism", "20");
options.put("hoodie.archive.parallelism", "20");
options.put("hoodie.write.concurrency.mode","optimistic_concurrency_control");
options.put("write.tasks", "20");
options.put("index.type","BUCKET");
options.put("hoodie.bucket.index.num.buckets","80");
options.put("hoodie.index.bucket.engine","SIMPLE");

Checkpoint Configuration
We tested various checkpoint timeout and interval configurations, but the issue persists:

env.getCheckpointConfig().setCheckpointTimeout(5601000L);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60 * 1000L);

Steps to Reproduce
Migrate data from Kudu to a Hudi COW bucketed table.

Write data from MySQL via Kafka to the Hudi COW bucketed table.

The task fails with the error Timeout while waiting for instant initialize.

Expected Behavior
The task should generate commits normally, and the checkpoint should succeed.

Actual Behavior
The task fails, no commits are generated, and the checkpoint fails with an error.

Hudi version: 0.14.0

Spark version: 2.4.7

Hive version: 3.1.3

Hadoop version: 3.1.1

Further Questions
Checkpoint Timeout Issue:
The error log mentions Timeout while waiting for instant initialize. Is this related to the initialization mechanism of the Hudi COW table? Are there ways to optimize the initialization time?

COW Table Write Performance:
Is the write performance of COW tables slower than MOR tables? Are there optimization suggestions for COW tables?

Impact of Bucketed Table:
Does the bucketed table have a specific impact on the write performance of COW tables? Are there optimization configurations for bucketed tables?

Checkpoint Configuration:
We tried various checkpoint timeout and interval configurations, but the issue persists. Are there recommended checkpoint configurations?
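For reference, a minimal sketch of the Flink checkpoint knobs that interact with the Hudi writer; all values below are illustrative assumptions, not recommendations from this thread:

// Illustrative checkpoint settings; values are assumptions, not tested advice.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60 * 1000L);                             // checkpoint interval
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L); // must outlast the slowest Hudi commit
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30 * 1000L);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);        // the Hudi writer commits one instant per checkpoint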

Summary
We would like to know:

Why does the Hudi COW bucketed table encounter checkpoint timeout issues during writes?

Are there optimization suggestions for COW table write performance?

Does the bucketed table have a specific impact on COW table write performance?

Are there recommended checkpoint configurations?

Thank you for your help!

@danny0405
Contributor

@pengxianzi How many records are there in the Kudu table? Can you switch to a MOR table instead? MOR is more suitable for large tables.

And if possible, can you upgrade to 0.14.1 or 0.15.0, which fix some stability bugs?

@pengxianzi
Author

pengxianzi commented Jan 7, 2025

> @pengxianzi How many records are there in the Kudu table? Can you switch to a MOR table instead? MOR is more suitable for large tables.
>
> And if possible, can you upgrade to 0.14.1 or 0.15.0, which fix some stability bugs?

Thank you for your response! Here is our further feedback:

Regarding Upgrading Hudi Version:
We are currently unable to upgrade the Hudi version to resolve the issue, as our production environment relies on version 0.14.0.

Regarding Data Volume:
The data volume migrated from Kudu to the Hudi COW bucketed table is typically in the tens of millions of records. We tried increasing the number of buckets and the parallelism, and kept the file size within each bucket below 100 MB, but writes still fail.

Regarding Switching to MOR Table:
We also have a MOR table with hundreds of millions of records, divided into 1000 buckets, but it encounters similar issues. Therefore, the problem does not seem to be related to the table type (COW or MOR).

Regarding the Impact of Kudu Table:
We believe this issue is not related to the Kudu table, as we have smaller tables that can be written normally. The problem seems to primarily occur with large tables.

@danny0405
Contributor

danny0405 commented Jan 7, 2025

For migration, maybe you can use bulk_insert to write the history dataset from Kudu in batch execution mode; you can then ingest into this Hudi table after switching to the upsert operation.

Writes are slow for COW because each checkpoint triggers a rewrite of the whole table; the same is true for MOR compaction.

So maybe you can migrate the existing dataset from Kudu using the bulk_insert operation, and do streaming upsert with the incremental inputs. If the dataset itself is huge, partitioning the table by datetime should also help, because that would significantly reduce the scope of each rewrite.
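A minimal sketch of that two-phase approach, reusing the options style from earlier in this issue (the operation switch and the "dt" partition field are illustrative assumptions, not settings from this thread):

// Phase 1 (one-off batch job): load the Kudu history with bulk_insert.
options.put(FlinkOptions.OPERATION.key(), "bulk_insert");
// If the dataset is huge, a datetime partition field shrinks the rewrite scope;
// "dt" is a placeholder column name.
options.put(FlinkOptions.PARTITION_PATH_FIELD.key(), "dt");

// Phase 2 (separate streaming job on the same table): switch to upsert
// for the incremental Kafka input.
options.put(FlinkOptions.OPERATION.key(), "upsert");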

@pengxianzi
Author

> For migration, maybe you can use bulk_insert to write the history dataset from Kudu in batch execution mode; you can then ingest into this Hudi table after switching to the upsert operation.
>
> Writes are slow for COW because each checkpoint triggers a rewrite of the whole table; the same is true for MOR compaction.
>
> So maybe you can migrate the existing dataset from Kudu using the bulk_insert operation, and do streaming upsert with the incremental inputs. If the dataset itself is huge, partitioning the table by datetime should also help, because that would significantly reduce the scope of each rewrite.

Thank you for your suggestions! Our current approach for large tables aligns with your recommendations:

We use the bulk_insert operation to migrate historical data from Kudu to the Hudi table.

We switch to the upsert operation for incremental data writes.

However, we encountered the following issues during implementation:

Necessity of Bucketing: Without bucketing, data duplication occurs during Flink writes. Only after adding bucketing is the data duplication issue resolved.

@danny0405
Contributor

You are right; bulk_insert also supports the bucket index.
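For what it's worth, a sketch of keeping the bucket index consistent across both jobs; the values shown are the ones from the configuration earlier in this issue, and treating them as a required pairing is an assumption:

// Both the bulk_insert job and the later upsert job should use identical
// bucket index settings so they agree on the bucketed file layout.
options.put("index.type", "BUCKET");
options.put("hoodie.bucket.index.num.buckets", "80"); // must match across jobs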

@pengxianzi
Author

> You are right; bulk_insert also supports the bucket index.

We are still facing write failures. How can we solve this problem?

@danny0405
Contributor

What kind of failures did you encounter?

@pengxianzi
Author

@danny0405 I couldn't find the reason. The error log indicates that the checkpoint failed:

org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable [] - bucket_write: default_database.loan_withdraw_order_new_cow, asynchronous part of checkpoint 1 could not be completed.
java.util.concurrent.CancellationException: null
at java.util.concurrent.FutureTask.report(FutureTask.java:122) [?:1.8.0_232]
at java.util.concurrent.FutureTask.get(FutureTask.java:192) [?:1.8.0_232]
at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:636) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:60) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.AsyncCheckpointRunnable.run(AsyncCheckpointRunnable.java:128) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]

org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 2 for operator bucket_write: default_database.loan_withdraw_order_new_cow (3/4). Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$13(StreamTask.java:1089) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1084) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:903) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.processBarrier(CheckpointBarrierHandler.java:250) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:61) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:231) [flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]

Caused by: org.apache.hudi.exception.HoodieException: Timeout(1201000ms) while waiting for instant initialize

at org.apache.hudi.sink.utils.TimeWait.waitFor(TimeWait.java:57) ~[hudi-flink1.13-bundle-0.14.0.jar:0.14.0]
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.instantToWrite(AbstractStreamWriteFunction.java:269) ~[hudi-flink1.13-bundle-0.14.0.jar:0.14.0]
at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:452) ~[hudi-flink1.13-bundle-0.14.0.jar:0.14.0]
at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:137) ~[hudi-flink1.13-bundle-0.14.0.jar:0.14.0]
at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.snapshotState(BucketStreamWriteFunction.java:101) ~[hudi-flink1.13-bundle-0.14.0.jar:0.14.0]
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167) ~[hudi-flink1.13-bundle-0.14.0.jar:0.14.0]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118) ~[flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99) ~[flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89) ~[flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218) ~[flink-1.0-SNAPSHOT-jar-with-dependencies.jar:?]
... 29 more

switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 2 for operator bucket_write: default_database.loan_withdraw_order_new_cow (3/4)#0.

at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:298)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:43)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:158)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:616)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)

@pengxianzi
Author

Files Generated in HDFS but No Commits: During task execution, new files are generated in HDFS, but there are no commit records in the Hudi CLI.

Checkpoint Timeout Failure: Flink checkpoints time out and fail without a specific reason in the logs. Extending the checkpoint interval may allow writes to succeed, but this reduces the update frequency and hurts real-time performance.

Why are new files generated in HDFS, but no commit records appear in the Hudi CLI?

How can we solve the checkpoint timeout issue and ensure successful writes while keeping the data fresh in near real time?

@pengxianzi pengxianzi changed the title Checkpoint Fails with Timeout while waiting for instant initialize When Migrating Data from Kudu to Hudi COW Bucketed Table [SUPPORT]Checkpoint Fails with Timeout while waiting for instant initialize When Migrating Data from Kudu to Hudi COW Bucketed Table Jan 8, 2025
@pengxianzi
Author

@danny0405
We may have discovered a new issue. Previously, some large tables could be written successfully. However, there is now a MOR table with over 100 million records whose schema includes a composite primary key. I configured it as follows:

String primaryKey = "id,age,name,address,phone,email";

builder.pk(primaryKey)
       .options(options);

However, I am unsure if this configuration is effective. I couldn't find any documentation regarding composite primary keys on the Hudi official website. Additionally, when checking through the Hudi CLI, the total records written and total updates written counts are the same, which makes me suspect that the composite primary key configuration has not been applied correctly.
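For reference, a minimal sketch of the explicit option that builder.pk(...) is expected to populate; FlinkOptions.RECORD_KEY_FIELD maps to hoodie.datasource.write.recordkey.field and takes a comma-separated list (treat this as an assumption to verify against your table, not confirmed behavior):

// Explicitly configuring the composite record key instead of relying on pk(...).
options.put(FlinkOptions.RECORD_KEY_FIELD.key(), "id,age,name,address,phone,email");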

The Flink job is running normally, but no files are being written.

Looking forward to your response.

@danny0405
Contributor

You can check the hoodie.properties file for the option hoodie.table.recordkey.fields to see the record key fields that were actually persisted.
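A minimal sketch of that check, assuming an HDFS table base path (the path below is a placeholder, not from this thread):

import java.io.InputStream;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckRecordKeyFields {
    public static void main(String[] args) throws Exception {
        // Placeholder base path; point this at the real table location.
        Path props = new Path("hdfs:///warehouse/default_database/table_cow/.hoodie/hoodie.properties");
        try (FileSystem fs = FileSystem.get(props.toUri(), new Configuration());
             InputStream in = fs.open(props)) {
            Properties p = new Properties();
            p.load(in);
            // Prints the persisted record key fields, e.g. "id,age,name,address,phone,email".
            System.out.println(p.getProperty("hoodie.table.recordkey.fields"));
        }
    }
}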

A COW bucket-index table is prone to timeouts because each checkpoint triggers a rewrite of the whole table if the table is not well partitioned.
