Support setting compression_codec table properties for Iceberg Connector #25755
Conversation
Force-pushed 36c996a to b5e8d55
Optional<HiveCompressionCodec> oldCompressionCodec = getHiveCompressionCodec(oldFileFormat, existingProperties);
Optional<HiveCompressionCodec> newCompressionCodec = IcebergTableProperties.getHiveCompressionCodec(inputProperties);

Optional<HiveCompressionCodec> compressionCodec = newCompressionCodec.or(() -> oldCompressionCodec);
Nit: no need for a lambda here, just newCompressionCodec.orElse(oldCompressionCodec)
orElse requires the parameter to be of type T (and returns T), but we are operating on an Optional and want to return an Optional, so we need to use or instead of orElse.
Ah, I missed that. Still, you want to avoid spinning up a lambda for a cheap operation like this. Maybe a cleaner way to write it is newCompressionCodec.isPresent() ? newCompressionCodec : oldCompressionCodec, like we do later on for compression level.
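For illustration, a minimal sketch of the variants discussed above (variable names taken from the diff; not the exact PR code):

// Lambda-based: defers evaluation of the fallback, but allocates a Supplier.
Optional<HiveCompressionCodec> viaOr = newCompressionCodec.or(() -> oldCompressionCodec);

// orElse does not compile here: it expects a HiveCompressionCodec, not an Optional<HiveCompressionCodec>.

// Ternary: no lambda, and both operands are already Optional<HiveCompressionCodec>.
Optional<HiveCompressionCodec> viaTernary = newCompressionCodec.isPresent() ? newCompressionCodec : oldCompressionCodec;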
{
    return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
            .map(OptionalInt::of)
            .orElseGet(OptionalInt::empty);
Nit: no need for a lambda here, just orElse(OptionalInt.empty())
done
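The resulting accessor would look roughly like this (a sketch; the method name is assumed, COMPRESSION_LEVEL is the property key from the diff above):

public static OptionalInt getCompressionLevel(Map<String, Object> tableProperties)
{
    // orElse is fine here because the fallback, OptionalInt.empty(), is a cheap constant; no lambda needed.
    return Optional.ofNullable((Integer) tableProperties.get(COMPRESSION_LEVEL))
            .map(OptionalInt::of)
            .orElse(OptionalInt.empty());
}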
{
    if (compressionCodec.isPresent()) {
        if (!isCompressionCodecSupportedForFormat(fileFormat, compressionCodec.get())) {
            throw new TrinoException(NOT_SUPPORTED, format("Compression codec LZ4 not supported for %s", fileFormat.humanName()));
Error message here mentions LZ4 explicitly, but it seems like this should be a parameter based on compressionCodec.get().
Changed to pass compressionCodec.get() instead of the hard-coded LZ4.
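That is, something along these lines (a sketch of the fix, not the exact commit):

// Report whichever codec was actually rejected instead of hard-coding LZ4.
throw new TrinoException(NOT_SUPPORTED,
        format("Compression codec %s not supported for %s", compressionCodec.get(), fileFormat.humanName()));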
    throw new TrinoException(INVALID_TABLE_PROPERTY, "write_compression must be set when compression_level is set");
}
else {
    if (!(VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY.contains(fileFormat)
I wonder whether this branch would actually be easier to read and understand as an enum switch statement. That way we would fail to compile if a new enum value were added, and we would know we needed to add new handling at that point.
done
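A sketch of what the switch-based version might look like (assuming IcebergFileFormat has exactly ORC, PARQUET, and AVRO, and that only AVRO supports compression_level, per VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY):

// Exhaustive switch expression: adding a new IcebergFileFormat constant breaks compilation here,
// forcing an explicit decision about compression_level handling for the new format.
boolean compressionLevelSupported = switch (fileFormat) {
    case AVRO -> true;
    case ORC, PARQUET -> false;
};
if (!compressionLevelSupported) {
    throw new TrinoException(INVALID_TABLE_PROPERTY,
            format("compression_level is not supported for %s", fileFormat.humanName()));
}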
public static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
{
    return !((fileFormat.equals(AVRO) || fileFormat.equals(PARQUET)) && codec.equals(HiveCompressionCodec.LZ4));
Let's rewrite this as a switch over the file formats and compression codecs so that it's easier to read and understand the valid combinations.
done
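A sketch of the switch-based rewrite (assuming IcebergFileFormat covers ORC, PARQUET, and AVRO, and that LZ4 is the only codec the original expression rejects):

public static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
{
    // Exhaustive switch expression: each format's valid codecs are spelled out in one place.
    return switch (fileFormat) {
        case ORC -> true;
        case PARQUET, AVRO -> codec != HiveCompressionCodec.LZ4;
    };
}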
}
String compressionLevelProperty = getCompressionLevelName(fileFormat);
String compressionLevelValue = storageProperties.get(compressionLevelProperty);
if (!Strings.isNullOrEmpty(compressionLevelValue)) {
I would just inline compressionLevelValue != null && !compressionLevelValue.isEmpty() instead of bringing in Strings to do that.
done
@@ -471,19 +486,19 @@ public static Map<PartitionField, Integer> getIdentityPartitions(PartitionSpec p
         return columns.buildOrThrow();
     }

-    public static List<Types.NestedField> primitiveFields(Schema schema)
+    public static List<NestedField> primitiveFields(Schema schema)
Let's move these cleanup / refactoring changes to a separate commit
String parquetCompressionValue = txn.table().properties().get(PARQUET_COMPRESSION);
if (parquetCompressionValue != null && parquetCompressionValue.isEmpty()) {
    txn.updateProperties()
Is this related to the iceberg parquet compression handling bug? If so, we'll want an inline comment explaining what's going on here
done
Could you clarify what bug is being referred to here? I responded at #24851 (comment) to the concern raised there.
If we let Iceberg write the table property write.parquet.compression-codec to zstd by default for us, then this PR will introduce some non-backward-compatible behavior. Suppose a user never set write.parquet.compression-codec as a table property, but sets the compression codec to snappy in their Iceberg config or session property. On a previous version of Trino, new files would be written with snappy. But because this change also reads the compression codec from table properties when writing a file, the codec used would now be zstd instead of snappy, which is not backward compatible. So we want to stop Iceberg from setting this table property when the user has not set it.
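For context, a rough sketch of the workaround under discussion (PARQUET_COMPRESSION is assumed to be Iceberg's write.parquet.compression-codec key, and userSetCodec is a hypothetical stand-in for "the user explicitly set a codec"):

// Iceberg's create-table path fills in write.parquet.compression-codec=zstd on its own;
// remove it unless the user explicitly asked for a codec, so Trino's catalog/session
// defaults keep applying to tables that never set the property.
if (txn.table().properties().containsKey(PARQUET_COMPRESSION) && !userSetCodec) {  // userSetCodec: hypothetical flag
    txn.updateProperties()
            .remove(PARQUET_COMPRESSION)
            .commit();
}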
// Iceberg will set write.parquet.compression-codec to zstd by default if this property is not set: https://github.com/trinodb/trino/issues/20401,
// but we don't want to set this property if this is not explicitly set by customer via set table properties.
if (!(fileFormat == IcebergFileFormat.PARQUET && compressionCodec.isPresent())) {
We'll definitely want a test around this quirk
It should be covered by the existing tests implicitly, but I still added a dedicated test for it.
assertUpdate(format("CREATE TABLE %s WITH (format = 'AVRO') AS SELECT * FROM nation", tableName), "SELECT count(*) FROM nation");
assertQuery(format("SELECT COUNT(*) FROM \"%s$properties\" WHERE key = 'write.parquet.compression-codec'", tableName), "SELECT 0");
assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-codec'", tableName), "VALUES 'ZSTD'"); | ||
|
||
assertUpdate(format("ALTER TABLE %s SET PROPERTIES compression_level = 5", tableName)); | ||
assertQuery(format("SELECT value FROM \"%s$properties\" WHERE key = 'write.avro.compression-level'", tableName), ("VALUES 5")); |
Looks like checkstyle is complaining about this line, but giving the wrong line number in the error.
Pull Request Overview
This PR adds support for specifying the write_compression and compression_level table properties for the Iceberg connector and updates related tests and compression property validations.
- Adds new DDL support and property validations for write_compression and compression_level.
- Updates tests to verify new expected compression property values.
- Adjusts compression property handling across table creation, metadata, and file writer implementations.
Reviewed Changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated no comments.
File | Description |
---|---|
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergV2.java | Updated expected values in JSON assertions. |
plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergSystemTables.java | Modified test expectations for column_sizes and compression properties. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java | Revised table properties creation and transaction handling. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java | Introduced new methods for compression level and codec extraction. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java | Added new table properties and validation for compression settings. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java | Updated table property and compression handling during metadata updates. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java | Adjusted writer creation to incorporate storage properties for compression. |
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java | Updated Avro writer to accept and use a compression level. |
Comments suppressed due to low confidence (1)
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/catalog/AbstractTrinoCatalog.java:897
- The method 'createTableProperties' is expected to return a Map<String, String>, but it is returning a Transaction object. Please update the return value to the actual table properties map or adjust the method signature accordingly to ensure type consistency.
return txn;
Force-pushed ffaf8b7 to 4d27522
@@ -56,16 +63,30 @@ public class IcebergTableProperties
     public static final String SORTED_BY_PROPERTY = "sorted_by";
     public static final String LOCATION_PROPERTY = "location";
     public static final String FORMAT_VERSION_PROPERTY = "format_version";
+
+    public static final String WRITE_COMPRESSION = "write_compression";
Property name should be compression_codec to maintain consistency with the equivalent Iceberg table property and the connector session property.
The order of precedence should be: session property > table property > catalog config property
I can change the property name. But I don't understand why the session property should have higher precedence than the table property; if a user sets a table property for a particular table, shouldn't it take precedence over the session-level default?
@@ -220,6 +252,19 @@ public static IcebergFileFormat getFileFormat(Map<String, Object> tableProperties
     return (IcebergFileFormat) tableProperties.get(FILE_FORMAT_PROPERTY);
 }

+public static Optional<HiveCompressionCodec> getHiveCompressionCodec(Map<String, Object> inputProperties)
getCompressionCodec
will update the name
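The renamed accessor would presumably mirror getFileFormat above, something like (a sketch; WRITE_COMPRESSION is the property key added in this PR):

public static Optional<HiveCompressionCodec> getCompressionCodec(Map<String, Object> tableProperties)
{
    // The properties map holds the already-decoded enum value, so a cast plus ofNullable suffices.
    return Optional.ofNullable((HiveCompressionCodec) tableProperties.get(WRITE_COMPRESSION));
}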
}
return catalog.newCreateTableTransaction(session, schemaTableName, schema, partitionSpec, sortOrder, Optional.ofNullable(tableLocation), createTableProperties(session, tableMetadata, allowedExtraProperties));

// If user doesn't set compression-codec for parquet, we need to remove write.parquet.compression-codec property,
If the user doesn't set anything, our default from the iceberg.compression-codec catalog config property should apply.
Already replied in a previous comment.
public static final String MAX_COMMIT_RETRY = "max_commit_retry";
public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
public static final String PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY = "parquet_bloom_filter_columns";
public static final String OBJECT_STORE_LAYOUT_ENABLED_PROPERTY = "object_store_layout_enabled";
public static final String DATA_LOCATION_PROPERTY = "data_location";
public static final String EXTRA_PROPERTIES_PROPERTY = "extra_properties";
public static final Set<IcebergFileFormat> VALID_ICEBERG_FILE_FORMATS_FOR_COMPRESSION_LEVEL_PROPERTY = ImmutableSet.of(IcebergFileFormat.AVRO);
It feels very odd to add a table property which is supported only for the AVRO file format. When we're adding something as a Trino property, the expectation is that Trino is able to actually implement it for the typical use case (ORC/Parquet in this case). I suggest dropping the compression_level changes unless we actually implement it in the Trino ORC and Parquet writers. Trino already adheres to the Iceberg table property write.avro.compression-level without these changes, and you can still use ALTER TABLE tableName SET PROPERTIES extra_properties ... to change this property through Trino. So we don't strictly need this to be a Trino property.
Sure, I can remove support for setting compression_level.
tableCompressionProperties.forEach(propertiesBuilder::put);

// Iceberg will set write.parquet.compression-codec to zstd by default if this property is not set: https://github.com/trinodb/trino/issues/20401,
The issue you've linked to is closed, we need to either re-open that (with explanation why) or file a new issue and refer to that.
Sure, we may re-open it after we reach agreement on #25755 (comment)
tableCompressionProperties.forEach(propertiesBuilder::put);

// Iceberg will set write.parquet.compression-codec to zstd by default if this property is not set: https://github.com/trinodb/trino/issues/20401,
// but we don't want to set this property if this is not explicitly set by customer via set table properties.
Why? If the user has not set the table property, the catalog config property for the compression codec still applies.
explained in a previous comment
Force-pushed 4d27522 to 4c0b43f
Force-pushed 4c0b43f to 5d80c4b
Force-pushed 5d80c4b to e53d72e
This PR adds support for setting the compression_codec table property for the Iceberg connector.

Users can now specify compression_codec when creating a table, for example (illustrative syntax, based on the tests in this PR):

CREATE TABLE nation_copy WITH (format = 'PARQUET', compression_codec = 'ZSTD') AS SELECT * FROM nation;

Users can also change compression_codec afterwards via an ALTER TABLE ... SET PROPERTIES statement, for example:

ALTER TABLE nation_copy SET PROPERTIES compression_codec = 'GZIP';

The write_compression users specify takes precedence over the iceberg.compression-codec session variable. If the user changes file_format without changing write_compression, the table inherits the write_compression set for the original file format, if one was set. If the user tries to set a write_compression that is inconsistent with the file format, the system throws an exception.

The compatibility matrix for write_compression and file format is the following:
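Based on the validation logic in this PR (only LZ4 is rejected, and only for the AVRO and PARQUET formats; see isCompressionCodecSupportedForFormat above), and assuming HiveCompressionCodec's values are NONE, SNAPPY, LZ4, ZSTD, and GZIP, the matrix is approximately:

Codec | ORC | PARQUET | AVRO |
---|---|---|---|
NONE | supported | supported | supported |
SNAPPY | supported | supported | supported |
LZ4 | supported | not supported | not supported |
ZSTD | supported | supported | supported |
GZIP | supported | supported | supported |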