
Spark: MERGE INTO Statements with only WHEN NOT MATCHED Clauses are always executed at Snapshot Isolation #12653

Open
hussein-awala opened this issue Mar 26, 2025 · 10 comments
Labels
bug Something isn't working spark

Comments

@hussein-awala
Member

Apache Iceberg version

1.8.1 (latest release)

Query engine

Spark

Please describe the bug 🐞

When running two concurrent MERGE INTO operations on an Apache Iceberg table, I expect them to be idempotent -- meaning Iceberg should either detect conflicts and resolve them or fail one of the jobs to prevent data inconsistencies.

However, Iceberg determines the operation type dynamically based on the result of the join condition, which can lead to unexpected behavior:

  • If a match is found, Iceberg treats it as an overwrite operation and fails the second job due to conflicting commits.
  • If no match is found, Iceberg considers it an append operation and attempts to resolve conflicts by creating a new manifest for appended data, as explained in the Cost of Retries doc.

This behavior introduces a problem:
If the dataset is large enough and neither job finds a match, both will proceed with appending data independently, causing duplicate records.

Reproduction Steps

Running the following query in concurrent jobs can result in duplicate data if no matching records exist in dest:

MERGE INTO dest  
USING src  
ON dest.id = src.id  
WHEN NOT MATCHED THEN  
  INSERT *
-- even with an update action, we'll have the same issue
-- WHEN MATCHED THEN
--  UPDATE SET *

I initially expected the operation type to be determined by the query itself (i.e., always an append for a query with no UPDATE action). However, through testing I found that Iceberg decides the operation type at runtime, based on the actual join results. This makes MERGE INTO non-idempotent, leading to unintended duplicate inserts.

Expected Behavior

Iceberg should ensure idempotency for MERGE INTO, preventing duplicate data when no matches are found.

Additional Context

  • Iceberg version: 1.8.1
  • Iceberg catalog: Glue catalog (type glue) with S3 FileIO
  • Spark version: 3.5.5

Would love to hear if others have encountered this or if there's a recommended workaround.

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
@hussein-awala hussein-awala added the bug Something isn't working label Mar 26, 2025
@RussellSpitzer
Member

What isolation level are you using? Duplication (double inserts) would be expected if the isolation level were snapshot, but it should be forbidden if the isolation level were serializable. I'll check the operation setup as well.

@hussein-awala
Member Author

The write.merge.isolation-level, write.update.isolation-level, and write.delete.isolation-level properties are all set to serializable.
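
For reference, these are Iceberg table properties; a minimal sketch of setting them, assuming the dest table from the repro above:

ALTER TABLE dest SET TBLPROPERTIES (
  'write.merge.isolation-level'  = 'serializable',
  'write.update.isolation-level' = 'serializable',
  'write.delete.isolation-level' = 'serializable'
);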

but forbidden if the isolation level was serializable?

The problem is how Iceberg handles a merge operation that only contains new data (an append), and how Iceberg resolves conflicts for append operations.

I will let you check the operation setup, thanks!

@RussellSpitzer
Member

I am a little confused about the "operation" setup, because the validation rules used by the Row Delta operations are unrelated to the Append paths as far as I know, although something may have changed. See CopyOnWriteOperation for example.

@RussellSpitzer
Member

Ok, I think I see the issue; it's inside the Spark code. Let me finish testing this.

@RussellSpitzer
Member

RussellSpitzer commented Mar 26, 2025

I believe the issue is related to the optimization here:

https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L47-L75

The code above takes a MergeIntoCommand which only does "NOT MATCHED INSERT" and rewrites the command into an AppendData.byPosition instead of a RowDelta, as is done here:

https://github.com/apache/spark/blob/597cafc05d9a4ecd5a111d1dc9c92fb37c77ce3f/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala#L129-L138

AppendData is treated in Apache Iceberg as a batch write, which always operates under what is equivalent to "Snapshot Isolation", regardless of what has been configured for RowDeltas. This means MERGE INTO commands that have only WHEN NOT MATCHED INSERT clauses run at the wrong isolation level, leading to the issue above.
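
In other words, a sketch of the rewrite's effect (the plan names refer to the Spark code linked above):

-- Only NOT MATCHED clauses: rewritten to AppendData.byPosition and
-- committed as a plain append, i.e. effectively snapshot isolation.
MERGE INTO dest USING src ON dest.id = src.id
WHEN NOT MATCHED THEN INSERT *;

-- At least one MATCHED clause: rewritten to a row-level plan that
-- honors write.merge.isolation-level (e.g. serializable).
MERGE INTO dest USING src ON dest.id = src.id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;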

Repro (TestMerge.java):

  @TestTemplate
  public void testMergeIntoEmptyTableSerializableIsolationNotMatchedOnly() throws InterruptedException {
    createAndInitTable("id INT, dep STRING");
    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' = '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");

    createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
        + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
        + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

    ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

    String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN NOT MATCHED THEN INSERT *";

    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());
    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

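    // Under the bug described above, both merges append independently and each
    // source row is inserted twice, so this assertion fails with duplicate rows.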
    ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
        row(1, "emp-id-1"), // new
        row(2, "emp-id-2"), // new
        row(3, "emp-id-3")  // new
      );

    assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
  }

@RussellSpitzer
Member

RussellSpitzer commented Mar 26, 2025

@hussein-awala not sure about your tests, but I find that everything works as expected if the query has a no-op WHEN MATCHED clause:

  @TestTemplate
  public void testMergeIntoEmptyTableSerializableIsolation() throws InterruptedException {
    createAndInitTable("id INT, dep STRING");
    sql("ALTER TABLE %s SET TBLPROPERTIES('%s' = '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");

    createOrReplaceView(
      "source",
      "id INT, dep STRING",
      "{ \"id\": 1, \"dep\": \"emp-id-1\" }\n"
        + "{ \"id\": 2, \"dep\": \"emp-id-2\" }\n"
        + "{ \"id\": 3, \"dep\": \"emp-id-3\" }");

    ExecutorService executorService =
      MoreExecutors.getExitingExecutorService(
        (ThreadPoolExecutor) Executors.newFixedThreadPool(2));

    String query = "MERGE INTO %s AS t USING source AS s ON t.id == s.id WHEN MATCHED THEN UPDATE SET dep = 'foo' WHEN NOT MATCHED THEN INSERT *";

    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());
    executorService.submit(() ->
      spark.sql(
        String.format(query, tableName)).collect());

    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.MINUTES);

    ImmutableList<Object[]> expectedRows =
      ImmutableList.of(
        row(1, "emp-id-1"), // new
        row(2, "emp-id-2"), // new
        row(3, "emp-id-3")  // new
      );

    assertEquals(
      "Should have expected rows",
      expectedRows,
      sql("SELECT * FROM %s ORDER BY id", tableName));
  }

@hussein-awala
Member Author

Thanks @RussellSpitzer for this investigation. All my tests only had a no-match clause. For the query with a match clause, I had just checked the operation in the commit metrics and assumed I would have the same issue.
I will test it tomorrow and see whether adding a match condition resolves the issue.

@hussein-awala
Member Author

@RussellSpitzer indeed, it detects a conflict when I add a WHEN MATCHED clause. I used WHEN MATCHED AND 1!=1 THEN UPDATE SET * to avoid updating the data, although it still always rewrites the data file (the same data without any updates).
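
For reference, the full workaround query, using the dest/src names from the repro above:

MERGE INTO dest
USING src
ON dest.id = src.id
-- no-op match clause: forces a row-level plan that honors
-- write.merge.isolation-level
WHEN MATCHED AND 1 != 1 THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *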

For this issue, do you think we should add a note in the documentation and/or add your tests to detect a change in behavior?

@RussellSpitzer
Member

This is a serious bug and it needs to be fixed. @aokolnychyi and I were discussing it yesterday and trying to figure out what the right place to fix this is.

@RussellSpitzer changed the title Mar 27, 2025: "Reliability: MERGE INTO is not idempotent when no matches are found" → "Spark: MERGE INTO WHEN NOT MATCHED INSERT Statements always executed at Snapshot Isolation due to Sparks Handling of RewriteMergeIntoTable" → "Spark: MERGE INTO Statements with only WHEN NOT MATCHED Clauses are always executed at Snapshot Isolation"
@RussellSpitzer
Member

@aokolnychyi had some preliminary ideas on how to fix this; it will require a Spark change though.
