Spark: MERGE INTO Statements with only WHEN NOT MATCHED Clauses are always executed at Snapshot Isolation #12653
Comments
What isolation level are you using? Duplication (double inserts) would be expected if the isolation level was snapshot, but forbidden if it was serializable. I'll check the operation setup as well.
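(For context, the isolation level for merges is configured per table. A minimal sketch of setting it, assuming a table named db.tbl; the property name write.merge.isolation-level is from the Iceberg table-properties documentation:

ALTER TABLE db.tbl SET TBLPROPERTIES (
  'write.merge.isolation-level' = 'serializable'  -- or 'snapshot'
);

)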
The problem is how Iceberg handles the merge operation when it only contains new data (an append), and how Iceberg resolves conflicts for that append operation. I will let you check the operation setup, thanks!
I am a little confused about the "operation" setup, because the validation rules used by the RowDelta operations are unrelated to the Append paths as far as I know, although something may have changed. See CopyOnWriteOperation for an example.
OK, I think I see the issue; it's inside the Spark code. Let me finish testing this.
The issue, I believe, is related to the optimizations here. The code above takes a MergeIntoCommand which only does "NOT MATCHED INSERT" and changes the command into an AppendData.byPosition instead of a RowDelta. AppendData is treated in Apache Iceberg as a batch write, which always operates under the equivalent of "Snapshot Isolation" regardless of what has been configured for RowDeltas. This means MERGE INTO commands with only NOT MATCHED INSERT clauses and no match clause run at the wrong isolation level, leading to the issue above.

Repro:

@TestTemplate
public void testMergeIntoEmptyTableSerializableIsolationNotMatchedOnly() throws InterruptedException {
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s SET TBLPROPERTIES('%s' = '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
  createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
          + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
          + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

  ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

  // Run the same insert-only MERGE concurrently from two threads
  String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN INSERT *";
  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());
  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());
  executorService.shutdown();
  executorService.awaitTermination(1, TimeUnit.MINUTES);

  // Under serializable isolation one of the commits should fail validation,
  // so each row should appear exactly once
  ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "emp-id-1"), // new
          row(2, "emp-id-2"), // new
          row(3, "emp-id-3") // new
      );
  assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
}
@hussein-awala not sure about your tests, but I find that everything works as expected if the query has a no-op MATCHED clause:

@TestTemplate
public void testMergeIntoEmptyTableSerializableIsolation() throws InterruptedException {
  createAndInitTable("id INT, dep STRING");
  sql("ALTER TABLE %s SET TBLPROPERTIES('%s' = '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
  createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
          + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
          + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

  ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
          (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

  // Same concurrent MERGE, but with a MATCHED clause so the command
  // is planned as a RowDelta and validated at the configured isolation level
  String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN UPDATE SET dep = 'foo' WHEN NOT MATCHED THEN INSERT *";
  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());
  executorService.submit(() -> spark.sql(String.format(query, tableName)).collect());
  executorService.shutdown();
  executorService.awaitTermination(1, TimeUnit.MINUTES);

  ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
          row(1, "emp-id-1"), // new
          row(2, "emp-id-2"), // new
          row(3, "emp-id-3") // new
      );
  assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
}
Thanks @RussellSpitzer for this investigation. All my tests only had a NOT MATCHED clause. For the query with a MATCHED clause, I just checked the operation in the commit metrics, and I assumed I would have the same issue.
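(As an aside, one way to see which operation a commit used is Iceberg's snapshots metadata table; a sketch, with db.tbl standing in for the target table:

SELECT committed_at, snapshot_id, operation
FROM db.tbl.snapshots
ORDER BY committed_at;

An insert-only merge that hit this path shows up as an "append" commit, while a merge planned as a RowDelta shows up as an "overwrite".)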
@RussellSpitzer indeed, it detects a conflict when I add a no-op MATCHED clause. For this issue, do you think we should add a note in the documentation and/or add your tests to detect a change in behavior?
This is a serious bug and it needs to be fixed. @aokolnychyi and I were discussing it yesterday and trying to figure out what the right place to fix this is. |
@aokolnychyi had some preliminary ideas on how to fix this; it will require a Spark change though.
Apache Iceberg version
1.8.1 (latest release)
Query engine
Spark
Please describe the bug 🐞
When running two concurrent MERGE INTO operations on an Apache Iceberg table, I expect them to be idempotent -- meaning Iceberg should either detect conflicts and resolve them, or fail one of the jobs to prevent data inconsistencies. However, Iceberg determines the operation type dynamically based on the result of the join condition, which can lead to unexpected behavior:
- If the join finds matching rows, the commit is a row-level operation that is validated at the configured isolation level.
- If no rows match, the commit is a plain append, which skips those conflict checks.
This behavior introduces a problem:
If the dataset is large enough and neither job finds a match, both will proceed with appending data independently, causing duplicate records.
Reproduction Steps
Running the following query in concurrent jobs can result in duplicate data if no matching records exist in dest:
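(The original query block did not survive extraction; the following is a representative sketch based on the insert-only MERGE used in the tests above -- the src and dest names are illustrative.)

MERGE INTO dest AS t
USING src AS s
ON t.id = s.id
WHEN NOT MATCHED THEN INSERT *;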
I initially expected the operation type to be determined by the query itself (i.e., always "append" for a query without an UPDATE action). However, through testing, I found that Iceberg decides the operation type at runtime, based on the actual join results. This makes MERGE INTO non-idempotent, leading to unintended duplicate inserts.
Expected Behavior
Iceberg should ensure idempotency for MERGE INTO, preventing duplicate data when no matches are found.
Additional Context
Catalog: Glue (glue) with S3 FileIO.
Would love to hear if others have encountered this or if there's a recommended workaround.
Willingness to contribute