[HUDI-8824] MIT should error out for some assignment clause patterns #12584
base: master
Conversation
Force-pushed 538edc9 to 441d8eb
val isPartialUpdateAction = (targetTableType == MOR_TABLE_TYPE_OPT_VAL
  && UPSERT_OPERATION_OPT_VAL == getOperationType(parameters)
  && parameters.getOrElse(
    ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
    ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean
  && updatingActions.nonEmpty)
isPartialUpdateAction
nit: inline the statement
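For reference, the inlined form would just return the boolean expression directly (a sketch of the nit, not the committed code):

// Sketch: drop the intermediate val and return the expression directly.
targetTableType == MOR_TABLE_TYPE_OPT_VAL &&
  UPSERT_OPERATION_OPT_VAL == getOperationType(parameters) &&
  parameters.getOrElse(
    ENABLE_MERGE_INTO_PARTIAL_UPDATES.key,
    ENABLE_MERGE_INTO_PARTIAL_UPDATES.defaultValue.toString).toBoolean &&
  updatingActions.nonEmpty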
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
assert(insert.assignments.length <= targetTableSchema.length,
  s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
  s"targetTable field size[${targetTableSchema.length}]"))

// Precombine field and primary key field must be present in the assignment clause of all insert actions.
Suggested change:
- // Precombine field and primary key field must be present in the assignment clause of all insert actions.
+ // Precombine field and record key field must be present in the assignment clause of all insert actions.
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
Seq(field),
"pre-combine field",
"pre-combine field", | |
"precombine field", |
  if (hoodieCatalogTable.preCombineKey.isEmpty && updateActions.nonEmpty) {
-   logWarning(s"Updates without precombine can have nondeterministic behavior")
+   logWarning(s"Updates without pre-combine can have nondeterministic behavior")
no need to change
@@ -802,27 +813,55 @@ case class MergeIntoHoodieTableCommand(mergeInto: MergeIntoTable) extends Hoodie
assert(insert.assignments.length <= targetTableSchema.length,
  s"The number of insert assignments[${insert.assignments.length}] must be less than or equal to the " +
  s"targetTable field size[${targetTableSchema.length}]"))

// Precombine field and primary key field must be present in the assignment clause of all insert actions.
I assume the insert action should always include all columns, so such a check may not be necessary, i.e., missing any column would lead to insert failure already?
Let's keep the checks so it's strict.
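For illustration, the kind of statement the strict check rejects (hypothetical table and column names): the insert assignments list columns explicitly but omit the precombine field ts.

// Hypothetical example: the insert action omits the precombine field `ts`
// from its assignments, so the new analysis-phase check fails before any
// write is attempted, even though the insert itself could have succeeded.
spark.sql(
  s"""
     | merge into $tableName t
     | using (select 1 as id, 'a1' as name, 10 as price, 1000L as ts) s
     | on t.id = s.id
     | when not matched then
     |   insert (id, name, price) values (s.id, s.name, s.price)
     """.stripMargin)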
sparkSession.sessionState.conf.resolver,
mergeInto.targetTable,
hoodieCatalogTable.tableConfig.getRecordKeyFields.orElse(Array.empty),
"primaryKey field",
use record key field to be consistent
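i.e., in the same suggestion style as above:

- "primaryKey field",
+ "record key field",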
// Precombine field and primary key field must be present in the assignment clause of all insert actions.
// Check has no effect if we don't have such fields in target table or we don't have insert actions
insertActions.foreach(action =>
  hoodieCatalogTable.preCombineKey.foreach(
let's only validate precombine if the merge mode is EVENT_TIME_ORDERING
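One possible shape for that gating (a sketch; the RecordMergeMode lookup and the validation helper name are assumptions based on the surrounding context, not the exact code in this PR):

// Sketch: only run the precombine-assignment check when the table orders
// records by event time; under COMMIT_TIME_ORDERING the precombine field
// does not drive merge results.
if (hoodieCatalogTable.tableConfig.getRecordMergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
  insertActions.foreach(action =>
    hoodieCatalogTable.preCombineKey.foreach(field =>
      validateTargetTableAttrExistsInAssignments( // hypothetical helper name
        sparkSession.sessionState.conf.resolver,
        mergeInto.targetTable,
        Seq(field),
        "precombine field",
        action.assignments)))
}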
@@ -63,10 +63,10 @@ select id, name, price, cast(dt as string) from h0_p;
 # CREATE TABLE

 create table h1 (
-  id bigint,
+  id int,
Why do we need this change?
@@ -33,7 +33,7 @@ class TestMergeIntoLogOnlyTable extends HoodieSparkSqlTestBase {
   | id int,
   | name string,
   | price double,
-  | ts long
+  | ts int
Similar here
@@ -1232,7 +1233,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
   | id int,
   | name string,
   | value int,
-  | ts long
+  | ts int
Let's not change the types of precombine field.
@@ -40,7 +40,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
   | id int,
   | name string,
   | price double,
-  | ts long
+  | ts int
Since we also have "(int) 0" as the natural ordering, making this an integer can mask issues around the precombine field and merging logic.
@@ -1051,7 +1052,7 @@ class TestMergeIntoTable extends HoodieSparkSqlTestBase with ScalaAssertionSuppo
     s"""
        | merge into $tableName
        | using (
-       |   select 1 as id, 'a1' as name, 10 as price, $dataValue as c, '1' as flag
+       |   select 1 as id, 'a1' as name, 10 as price, cast($dataValue as $dataType) as c, '1' as flag
Use 1L if there is any type mismatch in the tests. The user is expected to provide the data with the correct types.
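In other words, a typed literal in the source query, roughly like this (the merge clauses around the quoted line are elided in the diff, so this completion is hypothetical):

// Sketch: write the long literal directly (1000L) for the precombine
// column `c` instead of cast($dataValue as $dataType).
spark.sql(
  s"""
     | merge into $tableName t0
     | using (
     |   select 1 as id, 'a1' as name, 10 as price, 1000L as c, '1' as flag
     | ) s0
     | on t0.id = s0.id
     | when matched and s0.flag = '1' then update set *
     """.stripMargin)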
Seq("cow", "mor").foreach { tableType => | ||
withRecordType()(withTempDir { tmp => | ||
log.info(s"Testing table type $tableType") | ||
spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = false") |
We should unset the config after the test or set the default of the config so that it does not affect other tests in the same Spark session (since this SET statement has global impact).
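For instance (a sketch; Spark SQL's RESET restores a config to its default):

// Sketch: scope the global SET to this test by restoring the default in a
// finally block, so other tests sharing the Spark session are unaffected.
try {
  spark.sql(s"set ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key} = false")
  // ... merge-into assertions for this table type ...
} finally {
  spark.sql(s"reset ${DataSourceWriteOptions.ENABLE_MERGE_INTO_PARTIAL_UPDATES.key}")
}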
spark.sql(s"insert into $tableName values (1, 'a1', 10, 1000, '2021-03-21')") | ||
|
||
// Test 1: Update statements where at least one misses primary key assignment | ||
if (tableType.equals("mor")) { |
Could we also test the same SQL statements on a COW table and make sure they pass with data validation (i.e., the case where the new constraint does not apply)?
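Something like this could cover the COW side (a sketch; the schema is assumed from the insert above, and checkAnswer follows the HoodieSparkSqlTestBase pattern):

// Sketch: on COW the new constraint does not apply, so an update that
// omits the record key assignment should still succeed; validate the data.
if (tableType.equals("cow")) {
  spark.sql(
    s"""
       | merge into $tableName t
       | using (select 1 as id, 'a2' as name, 12 as price, 1001L as ts) s
       | on t.id = s.id
       | when matched then update set name = s.name, price = s.price, ts = s.ts
       """.stripMargin)
  checkAnswer(s"select id, name, price, ts from $tableName")(
    Seq(1, "a2", 12.0, 1001))
}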
Force-pushed 3f5a535 to 631494d
Force-pushed 631494d to ffad811
Change Logs
For the insert and update clauses of MIT, in some cases where the primary key/precombine field value is not set explicitly, MIT now errors out in the query analysis phase.
Also includes changes that ensure the precombine key value data type is the same across the source and target tables (unrelated test maintenance).
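For example (hypothetical schema on a MOR table), an update clause that omits the record key assignment now fails during analysis instead of writing ambiguous data:

// Hypothetical illustration of the new guard: the update clause assigns
// only `name`, omitting the record key (id) and precombine field (ts),
// which the analyzer now rejects for the unsupported cases.
spark.sql(
  s"""
     | merge into $targetTable t
     | using (select 1 as id, 'a1' as name, 1000L as ts) s
     | on t.id = s.id
     | when matched then update set name = s.name
     """.stripMargin)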
Impact
Better user-facing error messages and a guard against cases we don't support.
Risk level (write none, low, medium or high below)
none
Documentation Update
MIT doc updates are all tracked in https://issues.apache.org/jira/browse/HUDI-8527
Contributor's checklist