Support setting compression_codec table properties for Iceberg Connector #25755

Open · SongChujun wants to merge 2 commits into master from iceberg_avro_write_compression_level_support
Conversation

SongChujun (Member) commented May 8, 2025

This PR adds support for setting the compression_codec table property for the Iceberg connector.

Users can now run the following command to create a new table with compression_codec set to ZSTD:

CREATE TABLE example (
    c1 INTEGER,
    c2 DATE,
    c3 DOUBLE
)
WITH (
    format = 'AVRO',
    partitioning = ARRAY['c1', 'c2'],
    sorted_by = ARRAY['c3'],
    compression_codec = 'ZSTD'
);

Users can also change compression_codec via an ALTER TABLE ... SET PROPERTIES statement.

Example

ALTER TABLE example SET PROPERTIES compression_codec = 'GZIP';

The compression_codec that users specify as a table property will take precedence over the session variable iceberg.compression-codec. If the user changes the file format without changing compression_codec, the new format inherits the compression_codec that was set for the original file format, provided one was set.

If the user tries to set a compression_codec that is incompatible with the file format, the system will throw an exception.
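
A minimal Java sketch of the precedence rule described above (hypothetical helper and simplified enum for illustration; not the PR's actual code):

import java.util.Optional;

enum Codec { NONE, SNAPPY, LZ4, ZSTD, GZIP }

class CodecPrecedence
{
    // The table property, when present, wins over the
    // iceberg.compression-codec session/config default.
    static Codec effectiveCodec(Optional<Codec> tableProperty, Codec sessionDefault)
    {
        return tableProperty.orElse(sessionDefault);
    }

    public static void main(String[] args)
    {
        System.out.println(effectiveCodec(Optional.of(Codec.ZSTD), Codec.SNAPPY)); // ZSTD
        System.out.println(effectiveCodec(Optional.empty(), Codec.SNAPPY));        // SNAPPY
    }
}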

The compatibility matrix for compression_codec and file format is the following:

Codec     ORC   Parquet   Avro
NONE      yes   yes       yes
SNAPPY    yes   yes       yes
LZ4       yes   no        no
ZSTD      yes   yes       yes
GZIP      yes   yes       yes


@cla-bot cla-bot bot added the cla-signed label May 8, 2025
@github-actions github-actions bot added the iceberg Iceberg connector label May 8, 2025
@SongChujun SongChujun force-pushed the iceberg_avro_write_compression_level_support branch 3 times, most recently from 36c996a to b5e8d55 on May 8, 2025 22:16
@SongChujun SongChujun added the enhancement New feature or request label May 8, 2025
@SongChujun SongChujun requested a review from pettyjamesm May 8, 2025 23:45
Optional<HiveCompressionCodec> oldCompressionCodec = getHiveCompressionCodec(oldFileFormat, existingProperties);
Optional<HiveCompressionCodec> newCompressionCodec = IcebergTableProperties.getHiveCompressionCodec(inputProperties);

Optional<HiveCompressionCodec> compressionCodec = newCompressionCodec.or(() -> oldCompressionCodec);
Member:

Nit: no need for a lambda here, just newCompressionCodec.orElse(oldCompressionCodec)

Member Author (@SongChujun), May 16, 2025:

orElse requires the parameter to be T and returns T, but we are operating on an Optional and want to return an Optional, so we want to use or instead of orElse.

Member:

Ah, I missed that. Still, you want to avoid spinning up a lambda for a cheap operation like this. Maybe a cleaner way to write it is newCompressionCodec.isPresent() ? newCompressionCodec : oldCompressionCodec, like we do later on for compression level.
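
To make the distinction concrete, a small self-contained sketch (variable names hypothetical) of the three variants discussed in this thread:

import java.util.Optional;

class OrVsOrElse
{
    public static void main(String[] args)
    {
        Optional<String> newCodec = Optional.empty();
        Optional<String> oldCodec = Optional.of("GZIP");

        // or() takes a Supplier<Optional<T>> and returns Optional<T>,
        // so it can fall back from one Optional to another:
        Optional<String> viaOr = newCodec.or(() -> oldCodec);

        // orElse() takes a plain T and returns T, unwrapping the Optional:
        String unwrapped = newCodec.orElse("GZIP");

        // The lambda-free alternative suggested above:
        Optional<String> viaTernary = newCodec.isPresent() ? newCodec : oldCodec;

        System.out.println(viaOr);      // Optional[GZIP]
        System.out.println(unwrapped);  // GZIP
        System.out.println(viaTernary); // Optional[GZIP]
    }
}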

{
    return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
            .map(OptionalInt::of)
            .orElseGet(OptionalInt::empty);
Member:

Nit: no need for a lambda here, just orElse(OptionalInt.empty())

Member Author:

done
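
For reference, a runnable sketch of the simplified lookup (constant name and map contents assumed for illustration):

import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;

class CompressionLevelLookup
{
    static final String COMPRESSION_LEVEL = "compression_level";

    static OptionalInt getCompressionLevel(Map<String, Object> tableProperties)
    {
        return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
                .map(OptionalInt::of)
                .orElse(OptionalInt.empty()); // no Supplier lambda needed for a constant
    }

    public static void main(String[] args)
    {
        Map<String, Object> withLevel = Map.of(COMPRESSION_LEVEL, 5);
        System.out.println(getCompressionLevel(withLevel)); // OptionalInt[5]
        System.out.println(getCompressionLevel(Map.of()));  // OptionalInt.empty
    }
}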

{
    if (compressionCodec.isPresent()) {
        if (!isCompressionCodecSupportedForFormat(fileFormat, compressionCodec.get())) {
            throw new TrinoException(NOT_SUPPORTED, format("Compression codec LZ4 not supported for %s", fileFormat.humanName()));
Member:

Error message here mentions LZ4 explicitly, but it seems like this should be a parameter based on compressionCodec.get()

Member Author (@SongChujun), May 16, 2025:

changed to pass the compressionCodec.get() instead of LZ4

    throw new TrinoException(INVALID_TABLE_PROPERTY, "write_compression must be set when compression_level is set");
}
else {
    if (!(VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY.contains(fileFormat)
Member:

I wonder whether this branch would actually be easier to read and understand as an enum switch statement. That way we would fail to compile if a new enum were added and we would know we needed to add new handling at that point.

Member Author:

done


public static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
{
    return !((fileFormat.equals(AVRO) || fileFormat.equals(PARQUET)) && codec.equals(HiveCompressionCodec.LZ4));
Member:

Let's rewrite this as a switch over the file formats and compression codecs so that it's easier to read and understand the valid combinations.

Member Author:

done
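
A sketch of what the switch-based version might look like (the enums here are simplified stand-ins for Trino's IcebergFileFormat and HiveCompressionCodec; not the final code from the PR):

enum IcebergFileFormat { ORC, PARQUET, AVRO }

enum HiveCompressionCodec { NONE, SNAPPY, LZ4, ZSTD, GZIP }

class CodecSupport
{
    static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
    {
        // Exhaustive switch expression: adding a new IcebergFileFormat
        // constant fails compilation here until a new case is handled.
        return switch (fileFormat) {
            case ORC -> true;                                        // ORC accepts all five codecs
            case PARQUET, AVRO -> codec != HiveCompressionCodec.LZ4; // everything except LZ4
        };
    }
}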

}
String compressionLevelProperty = getCompressionLevelName(fileFormat);
String compressionLevelValue = storageProperties.get(compressionLevelProperty);
if (!Strings.isNullOrEmpty(compressionLevelValue)) {
Member:

I would just inline compressionLevelValue != null && !compressionLevelValue.isEmpty() instead of bringing in Strings to do that

Member Author:

done

@@ -471,19 +486,19 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
return columns.buildOrThrow();
}

public static List<Types.NestedField> primitiveFields(Schema schema)
public static List<NestedField> primitiveFields(Schema schema)
Member:

Let's move these cleanup / refactoring changes to a separate commit


String parquetCompressionValue = txn.table().properties().get(PARQUET_COMPRESSION);
if (parquetCompressionValue != null && parquetCompressionValue.isEmpty()) {
    txn.updateProperties()
Member:

Is this related to the iceberg parquet compression handling bug? If so, we'll want an inline comment explaining what's going on here

Member Author:

done

Member:

Could you clarify what bug is being referred to here? I responded at #24851 (comment) to the concern raised there.

Member Author:

If we let Iceberg set the table property write.parquet.compression-codec to zstd by default, this PR would introduce backward-incompatible behavior. Suppose a user never set write.parquet.compression-codec as a table property, but sets the compression codec to snappy in their Iceberg config or session property. When they write a new file on a previous version of Trino, the codec used would be snappy. But because this change also reads the codec from table properties when writing a file, the codec used would now be zstd instead of snappy, which is not backward compatible. So we want to stop Iceberg from setting this table property when it was not set by the user.
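
To make the concern concrete, a small sketch (hypothetical names, simplified types) of the behavior change described above:

import java.util.Optional;

enum Codec { SNAPPY, ZSTD }

class CompatIllustration
{
    // Before this PR: the writer codec came only from the Iceberg config / session property.
    static Codec before(Codec sessionOrConfigCodec)
    {
        return sessionOrConfigCodec;
    }

    // After this PR: the table property is consulted first, so an
    // Iceberg-defaulted write.parquet.compression-codec=zstd would win.
    static Codec after(Optional<Codec> tablePropertyCodec, Codec sessionOrConfigCodec)
    {
        return tablePropertyCodec.orElse(sessionOrConfigCodec);
    }

    public static void main(String[] args)
    {
        Codec session = Codec.SNAPPY;
        Optional<Codec> icebergDefault = Optional.of(Codec.ZSTD); // never set by the user
        System.out.println(before(session));                // SNAPPY
        System.out.println(after(icebergDefault, session)); // ZSTD: silent behavior change
    }
}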


// Iceberg will set write.parquet.compression-codec to zstd by default if this property is not set: https://github.com/trinodb/trino/issues/20401,
// but we don't want to set this property if this is not explicitly set by customer via set table properties.
if (!(fileFormat == IcebergFileFormat.PARQUET && compressionCodec.isPresent())) {
Member:

We'll definitely want a test around this quirk

Member Author:

It should be covered implicitly by the existing tests, but I still added a dedicated test for it.

Member Author:

assertUpdate(format("CREATE TABLE %s WITH (format = 'AVRO') AS SELECT * FROM nation", tableName), "SELECT count(*) FROM nation");

assertQuery(format("SELECT COUNT(*) FROM \"%s$properties\" WHERE key = 'write.parquet.compression-codec'", tableName), "SELECT 0");

@pettyjamesm pettyjamesm requested a review from raunaqmorarka May 9, 2025 17:22
assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-codec'", tableName), "VALUES 'ZSTD'");

assertUpdate(format("ALTER TABLE %s SET PROPERTIES compression_level = 5", tableName));
assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-level'", tableName), ("VALUES 5"));
Member:

Looks like checkstyle is complaining about this line, but giving the wrong line number in the error.

@SongChujun SongChujun marked this pull request as ready for review May 9, 2025 18:28
@SongChujun SongChujun self-assigned this May 9, 2025
Copilot AI left a comment:

Pull Request Overview

This PR adds support for specifying the write_compression and compression_level table properties for the Iceberg connector and updates related tests and compression property validations.

  • Adds new DDL support and property validations for write_compression and compression_level.
  • Updates tests to verify new expected compression property values.
  • Adjusts compression property handling across table creation, metadata, and file writer implementations.

Reviewed Changes

Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.

Summary per file:

  • plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java: Updated expected values in JSON assertions.
  • plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java: Modified test expectations for column_sizes and compression properties.
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java: Revised table properties creation and transaction handling.
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java: Introduced new methods for compression level and codec extraction.
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java: Added new table properties and validation for compression settings.
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java: Updated table property and compression handling during metadata updates.
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java: Adjusted writer creation to incorporate storage properties for compression.
  • plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java: Updated Avro writer to accept and use a compression level.
Comments suppressed due to low confidence (1)

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java:897

  • The method 'createTableProperties' is expected to return a Map<String, String>, but it is returning a Transaction object. Please update the return value to the actual table properties map or adjust the method signature accordingly to ensure type consistency.
return txn;

@SongChujun SongChujun force-pushed the iceberg_avro_write_compression_level_support branch 2 times, most recently from ffaf8b7 to 4d27522 on May 16, 2025 15:58
@@ -56,16 +63,30 @@ public class IcebergTableProperties
public static final String SORTED_BY_PROPERTY = "sorted_by";
public static final String LOCATION_PROPERTY = "location";
public static final String FORMAT_VERSION_PROPERTY = "format_version";

public static final String WRITE_COMPRESSION = "write_compression";
Member:

Property name should be compression_codec to maintain consistency with equivalent iceberg table property and the connector session property.

Member:

The order of precedence should be: session property > table property > catalog config property
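
For illustration, the suggested order sketched as a resolution chain (hypothetical helper and simplified enum; not code from the PR):

import java.util.Optional;

enum Codec { SNAPPY, ZSTD, GZIP }

class PrecedenceChain
{
    // Suggested order: session property > table property > catalog config property.
    static Codec resolve(Optional<Codec> sessionProperty, Optional<Codec> tableProperty, Codec catalogConfigDefault)
    {
        return sessionProperty
                .or(() -> tableProperty)
                .orElse(catalogConfigDefault);
    }

    public static void main(String[] args)
    {
        System.out.println(resolve(Optional.of(Codec.GZIP), Optional.of(Codec.ZSTD), Codec.SNAPPY)); // GZIP
        System.out.println(resolve(Optional.empty(), Optional.of(Codec.ZSTD), Codec.SNAPPY));        // ZSTD
        System.out.println(resolve(Optional.empty(), Optional.empty(), Codec.SNAPPY));               // SNAPPY
    }
}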

Member Author:

I can change the property name. But I don't understand why the session property should take precedence over the table property; if a user sets a table property for a particular table, shouldn't it take precedence over the default from the session property?

@@ -220,6 +252,19 @@ public static IcebergFileFormat getFileFormat(Map<String, Object> tablePropertie
return (IcebergFileFormat) tableProperties.get(FILE_FORMAT_PROPERTY);
}

public static Optional<HiveCompressionCodec> getHiveCompressionCodec(Map<String, Object> inputProperties)
Member:

getCompressionCodec

Member Author:

will update the name

}
return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, Optional.ofNullable(tableLocation), createTableProperties(session, tableMetadata, allowedExtraProperties));

// If user doesn't set compression-codec for parquet, we need to remove write.parquet.compression-codec property,
Member:

If user doesn't set anything, our default for iceberg.compression-codec catalog config property should apply.

Member Author:

Already replied in a previous comment.

public static final String MAX_COMMIT_RETRY = "max_commit_retry";
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String OBJECT_STORE_LAYOUT_ENABLED_PROPERTY = "object_store_layout_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";
public static final Set<IcebergFileFormat> VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY = ImmutableSet.of(IcebergFileFormat.AVRO);
Member:

It feels very odd to add a table property which is supported only for the AVRO file format. When we're adding something as a Trino property, the expectation is that Trino is actually able to implement it for the typical use case (ORC/Parquet in this case). I suggest dropping the compression_level changes unless we actually implement it in the Trino ORC and Parquet writers. Trino already honors the Iceberg table property write.avro.compression-level without these changes, and you can still use ALTER TABLE tableName SET PROPERTIES extra_properties ... to change this property through Trino. So we don't strictly need this to be a Trino property.

Member Author:

Sure, I can remove support for setting compression_level.


tableCompressionProperties.forEach(propertiesBuilder::put);

// Iceberg will set write.parquet.compression-codec to zstd by default if this property is not set: https://github.com/trinodb/trino/issues/20401,
Member:

The issue you've linked to is closed; we need to either re-open it (with an explanation why) or file a new issue and refer to that.

Member Author:

Sure, we may re-open it after we reach agreement on #25755 (comment)

tableCompressionProperties.forEach(propertiesBuilder::put);

// Iceberg will set write.parquet.compression-codec to zstd by default if this property is not set: https://github.com/trinodb/trino/issues/20401,
// but we don't want to set this property if this is not explicitly set by customer via set table properties.
Member:

Why? If the user has not set the table property, the catalog config property for the compression codec still applies.

Member Author:

explained in a previous comment


@SongChujun SongChujun force-pushed the iceberg_avro_write_compression_level_support branch from 4d27522 to 4c0b43f on May 22, 2025 15:05
@github-actions github-actions bot added the docs label May 22, 2025
@SongChujun SongChujun changed the title Support setting write_compression and compression_level table properties for Iceberg Connector Support setting compression_codec table properties for Iceberg Connector May 22, 2025
@SongChujun SongChujun force-pushed the iceberg_avro_write_compression_level_support branch from 4c0b43f to 5d80c4b on May 22, 2025 17:48
@SongChujun SongChujun force-pushed the iceberg_avro_write_compression_level_support branch from 5d80c4b to e53d72e on May 22, 2025 20:33
@SongChujun SongChujun requested a review from raunaqmorarka May 23, 2025 14:03