
[HUDI-8824] MIT should error out for some assignment clause patterns #12584

Open · wants to merge 2 commits into master from the HUDI-8824 branch

Conversation

@Davis-Zhang-Onehouse (Contributor) commented on Jan 7, 2025

Change Logs

For the insert and update clauses of MERGE INTO (MIT), in some cases where the primary key/precombine field value is not set explicitly, MIT now errors out in the query analysis phase.
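
For illustration, a sketch of a statement the new guard rejects, assuming a MOR table with record key "id" and precombine field "ts" (names invented for this example, not taken from the PR): the update clause omits the record key assignment "id = s.id", so the query now fails during analysis instead of at write time.

  // targetTable is a placeholder for a Hudi MOR table created elsewhere.
  spark.sql(
    s"""
       | merge into $targetTable t
       | using (select 1 as id, 'a1' as name, 1000L as ts) s
       | on t.id = s.id
       | when matched then update set name = s.name, ts = s.ts
       | when not matched then insert (id, name, ts) values (s.id, s.name, s.ts)
       |""".stripMargin)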

This PR also includes changes that ensure the precombine field's data type is the same across the source and target tables (unrelated test maintenance).

Impact

Better user-facing error messages, and a guard against cases we don't support.

Risk level (write none, low, medium, or high below)

none

Documentation Update

MIT documentation updates are all tracked in https://issues.apache.org/jira/browse/HUDI-8527.

Contributor's checklist

  • Read through contributor's guide
  • Change Logs and Impact were stated clearly
  • Adequate tests were added if applicable
  • CI passed

@github-actions bot added the size:L label (PR with lines of changes in (300, 1000]) on Jan 7, 2025
@Davis-Zhang-Onehouse force-pushed the HUDI-8824 branch 3 times, most recently from 538edc9 to 441d8eb on January 7, 2025 at 21:47
Comment on lines 485 to 491
val isPartialUpdateAction = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
&& UPSERT_OPERATION_OPT_VAL == getOperationType(parameters)
&& parameters.getOrElse(
ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
&& updatingActions.nonEmpty)
isPartialUpdateAction
Contributor:

nit: inline the statement
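
For reference, a sketch of the inlined form, using the same identifiers as the snippet above (the boolean expression is returned directly instead of being bound to a val):

  targetTableType == MOR_TABLE_TYPE_OPT_VAL &&
    UPSERT_OPERATION_OPT_VAL == getOperationType(parameters) &&
    parameters.getOrElse(
      ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
      ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean &&
    updatingActions.nonEmpty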

@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
assert(insert.assignments.length <= targetTableSchema.length,
s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))

// Precombine field and primary key field must be present in the assignment clause of all insert actions.
Contributor:

Suggested change:
- // Precombine field and primary key field must be present in the assignment clause of all insert actions.
+ // Precombine field and record key field must be present in the assignment clause of all insert actions.

sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
Seq(field),
"pre-combine field",
Contributor:

Suggested change:
- "pre-combine field",
+ "precombine field",

if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
-   logWarning(s"Updates without precombine can have nondeterministic behavior")
+   logWarning(s"Updates without pre-combine can have nondeterministic behavior")
Contributor:

no need to change

@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
assert(insert.assignments.length <= targetTableSchema.length,
s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
s"targetTable field size[${targetTableSchema.length}]"))

// Precombine field and primary key field must be present in the assignment clause of all insert actions.
Contributor:

I assume the insert action should always include all columns, so such a check may not be necessary; i.e., missing any column would already lead to an insert failure?

Contributor:

Let's keep the checks so it's strict.
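
To make the case under discussion concrete, a sketch of an insert action the strict check catches (same placeholder table as in the description, record key "id", precombine field "ts"): the insert lists its columns explicitly and leaves out "ts", which without the analysis-time check would only surface at write time, if at all.

  // The insert assignment omits the precombine field ts entirely.
  spark.sql(
    s"""
       | merge into $targetTable t
       | using (select 2 as id, 'a2' as name, 1000L as ts) s
       | on t.id = s.id
       | when not matched then insert (id, name) values (s.id, s.name)
       |""".stripMargin)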

sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"primaryKey field",
Contributor:

use record key field to be consistent

// Precombine field and primary key field must be present in the assignment clause of all insert actions.
// Check has no effect if we don't have such fields in target table or we don't have insert actions
insertActions.foreach(action =>
hoodieCatalogTable.preCombineKey.foreach(
Contributor:

let's only validate precombine if the merge mode is EVENT_TIME_ORDERING.
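
A minimal sketch of that gating, with stand-ins for Hudi's merge-mode enum and the existing check (the names here are illustrative, not the PR's actual identifiers):

  sealed trait RecordMergeMode
  case object EventTimeOrdering extends RecordMergeMode
  case object CommitTimeOrdering extends RecordMergeMode

  // Only event-time ordering uses the precombine field to order record
  // versions, so the assignment validation can be skipped for other modes.
  def shouldValidatePrecombine(mode: RecordMergeMode, precombineKey: Option[String]): Boolean =
    mode == EventTimeOrdering && precombineKey.nonEmpty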

@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
# CREATE TABLE

create table h1 (
- id bigint,
+ id int,
Contributor:

Why do we need this change?

@@ -33,7 +33,7 @@ class TestMergeIntoLogOnlyTable extends HoodieSparkSqlTestBase {
| id int,
| name string,
| price double,
- | ts long
+ | ts int
Contributor:

Similar here

@@ -1232,7 +1233,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
| id int,
| name string,
| value int,
- | ts long
+ | ts int
Contributor:

Let's not change the type of the precombine field.

@@ -40,7 +40,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
| id int,
| name string,
| price double,
- | ts long
+ | ts int
Contributor:

Since we also have "(int) 0" as the natural ordering, making this an integer can mask issues around the precombine field and merging logic.

@@ -1051,7 +1052,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
s"""
| merge into $tableName
| using (
- | select 1 as id, 'a1' as name, 10 as price, $dataValue as c, '1' as flag
+ | select 1 as id, 'a1' as name, 10 as price, cast($dataValue as $dataType) as c, '1' as flag
Contributor:

Use 1L if there is any type mismatch in the tests. The user is expected to provide the data with the correct types.
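
In other words, the source select can carry a correctly typed literal directly, e.g. a BIGINT literal via the L suffix (a sketch of the alternative the reviewer describes):

  // Instead of cast($dataValue as $dataType), supply a long literal directly.
  val source = "select 1 as id, 'a1' as name, 10 as price, 1L as c, '1' as flag"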

Seq("cow", "mor").foreach { tableType =>
withRecordType()(withTempDir { tmp =>
log.info(s"Testing table type $tableType")
spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = false")
Contributor:

We should unset the config after the test or set it back to its default so that it does not affect other tests in the same Spark session (since this SET statement has global impact).
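
One way to scope it, sketched with Spark SQL's RESET (the exact cleanup hook available in the test base is assumed here):

  val key = DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key
  spark.sql(s"set $key = false")
  try {
    // ... test body ...
  } finally {
    // RESET restores the session default so later tests are unaffected.
    spark.sql(s"reset $key")
  }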

spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, '2021-03-21')")

// Test 1: Update statements where at least one misses primary key assignment
if (tableType.equals("mor")) {
Contributor:

Could we also test the same SQL statements on a COW table and make sure they pass with data validation (i.e., the case where the new constraint does not apply)?
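
A sketch of what that could look like in the same test (the merge statement and expected row below are hypothetical):

  if (tableType.equals("cow")) {
    // The constraint does not apply on COW, so the same statement should
    // pass analysis; then validate the written data.
    spark.sql(mergeMissingRecordKeyAssignment) // hypothetical statement from Test 1
    checkAnswer(s"select id, name, price, ts, dt from $tableName")(
      Seq(1, "a1", 10.0, 1000, "2021-03-21"))
  }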

@Davis-Zhang-Onehouse force-pushed the HUDI-8824 branch 3 times, most recently from 3f5a535 to 631494d on January 9, 2025 at 18:41