Commit a8410e8

Support setting compression-codec table property for Iceberg Connector
1 parent 618d44b

10 files changed: 443 additions, 105 deletions

docs/src/main/sphinx/connector/iceberg.md

Lines changed: 6 additions & 1 deletion
@@ -932,6 +932,10 @@ connector using a {doc}`WITH </sql/create-table-as>` clause.
   - Optionally specifies the format of table data files; either `PARQUET`,
     `ORC`, or `AVRO`. Defaults to the value of the `iceberg.file-format` catalog
     configuration property, which defaults to `PARQUET`.
+* - `compression_codec`
+  - Optionally specifies the compression codec used for writing the table; either `NONE`,
+    `ZSTD`, `SNAPPY`, `LZ4`, or `GZIP`. Defaults to the value of the
+    `iceberg.compression-codec` catalog configuration property, which defaults to `ZSTD`.
 * - `partitioning`
   - Optionally specifies table partitioning. If a table is partitioned by
     columns `c1` and `c2`, the partitioning property is `partitioning =
@@ -989,7 +993,7 @@ WITH (
     location = '/var/example_tables/test_table');
 ```
 
-The table definition below specifies to use ORC files, bloom filter index by columns
+The table definition below specifies to use ORC files with the SNAPPY compression codec, bloom filter index by columns
 `c1` and `c2`, fpp is 0.05, and a file system location of
 `/var/example_tables/test_table`:
 
@@ -1000,6 +1004,7 @@ CREATE TABLE test_table (
     c3 DOUBLE)
 WITH (
     format = 'ORC',
+    compression_codec = 'SNAPPY',
     location = '/var/example_tables/test_table',
     orc_bloom_filter_columns = ARRAY['c1', 'c2'],
     orc_bloom_filter_fpp = 0.05);

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java

Lines changed: 9 additions & 1 deletion
@@ -27,13 +27,16 @@
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 import static io.airlift.slice.SizeOf.instanceSize;
 import static io.trino.plugin.iceberg.IcebergAvroDataConversion.toIcebergRecords;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
+import static io.trino.plugin.iceberg.IcebergTableProperties.validateCompression;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
@@ -62,12 +65,17 @@ public IcebergAvroFileWriter(
         this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
         this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));
 
+        validateCompression(IcebergFileFormat.AVRO, Optional.of(hiveCompressionCodec));
+
+        Map<String, String> compressionProperties = new HashMap<>();
+        compressionProperties.put(AVRO_COMPRESSION, toIcebergAvroCompressionName(hiveCompressionCodec));
+
         try {
             avroWriter = Avro.write(file)
                     .schema(icebergSchema)
                     .createWriterFunc(DataWriter::create)
                     .named(AVRO_TABLE_NAME)
-                    .set(AVRO_COMPRESSION, toIcebergAvroCompressionName(hiveCompressionCodec))
+                    .setAll(compressionProperties)
                     .build();
         }
         catch (IOException e) {
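The constructor change validates the compression codec before the Avro writer is built, so an unsupported choice fails fast with a Trino error instead of surfacing from deep inside the Avro stack. Below is a minimal, self-contained sketch of that fail-fast pattern; the enum, the mapping function, and the literal `write.avro.compression-codec` key are stand-ins for Trino's `HiveCompressionCodec`, `toIcebergAvroCompressionName`, and Iceberg's `AVRO_COMPRESSION` constant.

```java
import java.util.HashMap;
import java.util.Map;

public class AvroCompressionSketch
{
    enum Codec { NONE, SNAPPY, ZSTD, LZ4, GZIP }

    // Illustrative mapping only; the commit delegates this to toIcebergAvroCompressionName.
    static String toAvroCompressionName(Codec codec)
    {
        return switch (codec) {
            case NONE -> "uncompressed";
            case SNAPPY -> "snappy";
            case ZSTD -> "zstd";
            case GZIP -> "gzip";
            // Rejecting LZ4 up front mirrors validateCompression(AVRO, ...) above.
            case LZ4 -> throw new IllegalArgumentException("Compression codec LZ4 not supported for Avro");
        };
    }

    public static void main(String[] args)
    {
        // Resolve and validate the codec before any writer is constructed.
        Codec codec = Codec.ZSTD;
        Map<String, String> compressionProperties = new HashMap<>();
        compressionProperties.put("write.avro.compression-codec", toAvroCompressionName(codec));
        System.out.println(compressionProperties); // {write.avro.compression-codec=zstd}
    }
}
```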

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java

Lines changed: 18 additions & 8 deletions
@@ -73,6 +73,7 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageValueCount;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
 import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
+import static io.trino.plugin.iceberg.IcebergUtil.getHiveCompressionCodec;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterColumns;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterFpp;
 import static io.trino.plugin.iceberg.IcebergUtil.getParquetBloomFilterColumns;
@@ -131,7 +132,7 @@ public IcebergFileWriter createDataFileWriter(
             // TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
             case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties);
             case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session));
-            case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, session);
+            case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, session, storageProperties);
         };
     }
 
@@ -145,7 +146,7 @@ public IcebergFileWriter createPositionDeleteWriter(
         return switch (fileFormat) {
             case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
             case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
-            case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
+            case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
         };
     }
 
@@ -177,7 +178,9 @@ private IcebergFileWriter createParquetWriter(
                     .setBloomFilterColumns(getParquetBloomFilterColumns(storageProperties))
                     .build();
 
-            HiveCompressionCodec hiveCompressionCodec = toCompressionCodec(getCompressionCodec(session));
+            HiveCompressionCodec compressionCodec = getHiveCompressionCodec(IcebergFileFormat.PARQUET, storageProperties)
+                    .orElseGet(() -> toCompressionCodec(getCompressionCodec(session)));
+
             return new IcebergParquetFileWriter(
                     metricsConfig,
                     outputFile,
@@ -188,8 +191,8 @@ private IcebergFileWriter createParquetWriter(
                     makeTypeMap(fileColumnTypes, fileColumnNames),
                     parquetWriterOptions,
                     IntStream.range(0, fileColumnNames.size()).toArray(),
-                    hiveCompressionCodec.getParquetCompressionCodec()
-                            .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(hiveCompressionCodec))),
+                    compressionCodec.getParquetCompressionCodec()
+                            .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(compressionCodec))),
                     nodeVersion.toString());
         }
         catch (IOException e) {
@@ -233,6 +236,9 @@ private IcebergFileWriter createOrcWriter(
                 });
         }
 
+        HiveCompressionCodec compressionCodec = getHiveCompressionCodec(IcebergFileFormat.ORC, storageProperties)
+                .orElseGet(() -> toCompressionCodec(getCompressionCodec(session)));
+
         return new IcebergOrcFileWriter(
                 metricsConfig,
                 icebergSchema,
@@ -241,7 +247,7 @@ private IcebergFileWriter createOrcWriter(
                 fileColumnNames,
                 fileColumnTypes,
                 toOrcType(icebergSchema),
-                toCompressionCodec(getCompressionCodec(session)).getOrcCompressionKind(),
+                compressionCodec.getOrcCompressionKind(),
                 withBloomFilterOptions(orcWriterOptions, storageProperties)
                         .withStripeMinSize(getOrcWriterMinStripeSize(session))
                         .withStripeMaxSize(getOrcWriterMaxStripeSize(session))
@@ -286,19 +292,23 @@ private IcebergFileWriter createAvroWriter(
             TrinoFileSystem fileSystem,
             Location outputPath,
             Schema icebergSchema,
-            ConnectorSession session)
+            ConnectorSession session,
+            Map<String, String> storageProperties)
     {
         Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);
 
         List<Type> columnTypes = icebergSchema.columns().stream()
                 .map(column -> toTrinoType(column.type(), typeManager))
                 .collect(toImmutableList());
 
+        HiveCompressionCodec compressionCodec = getHiveCompressionCodec(IcebergFileFormat.AVRO, storageProperties)
+                .orElseGet(() -> toCompressionCodec(getCompressionCodec(session)));
+
         return new IcebergAvroFileWriter(
                 new ForwardingOutputFile(fileSystem, outputPath),
                 rollbackAction,
                 icebergSchema,
                 columnTypes,
-                toCompressionCodec(getCompressionCodec(session)));
+                compressionCodec);
     }
 }
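All three writers now resolve the codec the same way: a value recorded in the table's storage properties wins, and the session-level `compression_codec` setting is only a fallback. The sketch below reproduces that `Optional`-based fallback chain in isolation; the enum, the property key, and the session default are stand-ins (`write.parquet.compression-codec` is assumed here to be what `getCompressionPropertyName` returns for Parquet).

```java
import java.util.Map;
import java.util.Optional;

public class CompressionPrecedenceSketch
{
    enum Codec { NONE, SNAPPY, ZSTD, LZ4, GZIP }

    // Mirrors the role of IcebergUtil.getHiveCompressionCodec: read the codec for a
    // format from the table's storage properties, if one was ever set.
    static Optional<Codec> fromStorageProperties(String propertyName, Map<String, String> storageProperties)
    {
        return Optional.ofNullable(storageProperties.get(propertyName))
                .map(Codec::valueOf);
    }

    public static void main(String[] args)
    {
        Codec sessionDefault = Codec.ZSTD; // stand-in for toCompressionCodec(getCompressionCodec(session))
        String key = "write.parquet.compression-codec";

        // Table created with compression_codec = 'SNAPPY': the table property wins.
        Codec chosen = fromStorageProperties(key, Map.of(key, "SNAPPY"))
                .orElseGet(() -> sessionDefault);
        System.out.println(chosen); // SNAPPY

        // No table-level codec: fall back to the session/catalog default.
        chosen = fromStorageProperties(key, Map.of())
                .orElseGet(() -> sessionDefault);
        System.out.println(chosen); // ZSTD
    }
}
```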

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 35 additions & 2 deletions
@@ -42,6 +42,7 @@
 import io.trino.plugin.base.filter.UtcConstraintExtractor;
 import io.trino.plugin.base.projection.ApplyProjectionUtil;
 import io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
+import io.trino.plugin.hive.HiveCompressionCodec;
 import io.trino.plugin.hive.HiveStorageFormat;
 import io.trino.plugin.hive.HiveWrittenPartitions;
 import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
@@ -292,6 +293,7 @@
 import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName;
 import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
 import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
+import static io.trino.plugin.iceberg.IcebergTableProperties.COMPRESSION_CODEC;
 import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
@@ -304,6 +306,7 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
+import static io.trino.plugin.iceberg.IcebergTableProperties.validateCompression;
 import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
 import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
 import static io.trino.plugin.iceberg.IcebergUtil.checkFormatForProperty;
@@ -315,7 +318,9 @@
 import static io.trino.plugin.iceberg.IcebergUtil.firstSnapshotAfter;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnMetadatas;
+import static io.trino.plugin.iceberg.IcebergUtil.getCompressionPropertyName;
 import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
+import static io.trino.plugin.iceberg.IcebergUtil.getHiveCompressionCodec;
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
@@ -421,6 +426,7 @@ public class IcebergMetadata
             .add(EXTRA_PROPERTIES_PROPERTY)
             .add(FILE_FORMAT_PROPERTY)
             .add(FORMAT_VERSION_PROPERTY)
+            .add(COMPRESSION_CODEC)
             .add(MAX_COMMIT_RETRY)
             .add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
             .add(DATA_LOCATION_PROPERTY)
@@ -2463,10 +2469,13 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
             }
         }
 
+        IcebergFileFormat oldFileFormat = getFileFormat(icebergTable.properties());
+        IcebergFileFormat newFileFormat = getFileFormat(icebergTable.properties());
+
         if (properties.containsKey(FILE_FORMAT_PROPERTY)) {
-            IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
+            newFileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
                     .orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty"));
-            updateProperties.defaultFormat(fileFormat.toIceberg());
+            updateProperties.defaultFormat(newFileFormat.toIceberg());
         }
 
         if (properties.containsKey(FORMAT_VERSION_PROPERTY)) {
@@ -2476,6 +2485,14 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
             updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
         }
 
+        Map<String, String> propertiesForCompression = calculateTableCompressionProperties(oldFileFormat, newFileFormat, icebergTable.properties(), properties.entrySet().stream()
+                .filter(e -> e.getValue().isPresent())
+                .collect(toImmutableMap(
+                        Map.Entry::getKey,
+                        e -> e.getValue().get())));
+
+        propertiesForCompression.forEach(updateProperties::set);
+
         if (properties.containsKey(MAX_COMMIT_RETRY)) {
             int formatVersion = (int) properties.get(MAX_COMMIT_RETRY)
                     .orElseThrow(() -> new IllegalArgumentException("The max_commit_retry property cannot be empty"));
@@ -2531,6 +2548,22 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
         commitTransaction(transaction, "set table properties");
     }
 
+    public static Map<String, String> calculateTableCompressionProperties(IcebergFileFormat oldFileFormat, IcebergFileFormat newFileFormat, Map<String, String> existingProperties, Map<String, Object> inputProperties)
+    {
+        ImmutableMap.Builder<String, String> newCompressionProperties = ImmutableMap.builder();
+
+        Optional<HiveCompressionCodec> oldCompressionCodec = getHiveCompressionCodec(oldFileFormat, existingProperties);
+        Optional<HiveCompressionCodec> newCompressionCodec = IcebergTableProperties.getCompressionCodec(inputProperties);
+
+        Optional<HiveCompressionCodec> compressionCodec = newCompressionCodec.or(() -> oldCompressionCodec);
+
+        validateCompression(newFileFormat, compressionCodec);
+
+        compressionCodec.ifPresent(hiveCompressionCodec -> newCompressionProperties.put(getCompressionPropertyName(newFileFormat), hiveCompressionCodec.name()));
+
+        return newCompressionProperties.buildOrThrow();
+    }
+
     private static void updatePartitioning(Table icebergTable, Transaction transaction, List<String> partitionColumns)
     {
         UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec();
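`calculateTableCompressionProperties` implements the merge rule for `ALTER TABLE ... SET PROPERTIES`: an explicitly supplied codec overrides the stored one, and whichever codec survives is validated against the (possibly changed) file format. A self-contained sketch of that rule, with stand-in enums in place of `IcebergFileFormat` and `HiveCompressionCodec`:

```java
import java.util.Optional;

public class SetPropertiesSketch
{
    enum Format { PARQUET, ORC, AVRO }
    enum Codec { NONE, SNAPPY, ZSTD, LZ4, GZIP }

    // Same rule as isCompressionCodecSupportedForFormat: of these formats, only ORC accepts LZ4.
    static boolean supported(Format format, Codec codec)
    {
        return codec != Codec.LZ4 || (format != Format.AVRO && format != Format.PARQUET);
    }

    // New codec overrides old; the survivor is validated against the new format.
    static Optional<Codec> resolve(Format newFormat, Optional<Codec> oldCodec, Optional<Codec> newCodec)
    {
        Optional<Codec> codec = newCodec.or(() -> oldCodec);
        codec.ifPresent(c -> {
            if (!supported(newFormat, c)) {
                throw new IllegalArgumentException("Compression codec " + c + " not supported for " + newFormat);
            }
        });
        return codec;
    }

    public static void main(String[] args)
    {
        // SET PROPERTIES compression_codec = 'GZIP' on a ZSTD table: GZIP wins.
        System.out.println(resolve(Format.ORC, Optional.of(Codec.ZSTD), Optional.of(Codec.GZIP)));

        // Switching an LZ4 ORC table to format = 'PARQUET' without a new codec fails validation,
        // because the stored codec carries over to the new format.
        try {
            resolve(Format.PARQUET, Optional.of(Codec.LZ4), Optional.empty());
        }
        catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```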

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergTableProperties.java

Lines changed: 41 additions & 0 deletions
@@ -16,6 +16,9 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.inject.Inject;
+import io.trino.plugin.hive.HiveCompressionCodec;
+import io.trino.plugin.hive.HiveCompressionCodecs;
+import io.trino.plugin.hive.HiveCompressionOption;
 import io.trino.plugin.hive.orc.OrcWriterConfig;
 import io.trino.spi.TrinoException;
 import io.trino.spi.session.PropertyMetadata;
@@ -33,7 +36,10 @@
 import static com.google.common.collect.ImmutableMap.toImmutableMap;
 import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MAX;
 import static io.trino.plugin.iceberg.IcebergConfig.FORMAT_VERSION_SUPPORT_MIN;
+import static io.trino.plugin.iceberg.IcebergFileFormat.AVRO;
+import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
 import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
+import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static io.trino.spi.session.PropertyMetadata.booleanProperty;
 import static io.trino.spi.session.PropertyMetadata.doubleProperty;
 import static io.trino.spi.session.PropertyMetadata.enumProperty;
@@ -56,6 +62,9 @@ public class IcebergTableProperties
     public static final String SORTED_BY_PROPERTY = "sorted_by";
     public static final String LOCATION_PROPERTY = "location";
     public static final String FORMAT_VERSION_PROPERTY = "format_version";
+
+    public static final String COMPRESSION_CODEC = "compression_codec";
+
     public static final String MAX_COMMIT_RETRY = "max_commit_retry";
     public static final String ORC_BLOOM_FILTER_COLUMNS_PROPERTY = "orc_bloom_filter_columns";
     public static final String ORC_BLOOM_FILTER_FPP_PROPERTY = "orc_bloom_filter_fpp";
@@ -66,6 +75,7 @@ public class IcebergTableProperties
 
     public static final Set<String> SUPPORTED_PROPERTIES = ImmutableSet.<String>builder()
             .add(FILE_FORMAT_PROPERTY)
+            .add(COMPRESSION_CODEC)
             .add(PARTITIONING_PROPERTY)
             .add(SORTED_BY_PROPERTY)
             .add(LOCATION_PROPERTY)
@@ -103,6 +113,12 @@ public IcebergTableProperties(
                         IcebergFileFormat.class,
                         icebergConfig.getFileFormat(),
                         false))
+                .add(enumProperty(
+                        COMPRESSION_CODEC,
+                        "Write compression codec for the table",
+                        HiveCompressionOption.class,
+                        null,
+                        false))
                 .add(new PropertyMetadata<>(
                         PARTITIONING_PROPERTY,
                         "Partition transforms",
@@ -220,6 +236,12 @@ public static IcebergFileFormat getFileFormat(Map<String, Object> tablePropertie
         return (IcebergFileFormat) tableProperties.get(FILE_FORMAT_PROPERTY);
     }
 
+    public static Optional<HiveCompressionCodec> getCompressionCodec(Map<String, Object> inputProperties)
+    {
+        return Optional.ofNullable((HiveCompressionOption) inputProperties.get(COMPRESSION_CODEC))
+                .map(HiveCompressionCodecs::toCompressionCodec);
+    }
+
     @SuppressWarnings("unchecked")
     public static List<String> getPartitioning(Map<String, Object> tableProperties)
     {
@@ -252,6 +274,25 @@ private static void validateFormatVersion(int version)
         }
     }
 
+    public static void validateCompression(IcebergFileFormat fileFormat, Optional<HiveCompressionCodec> compressionCodec)
+    {
+        if (compressionCodec.isPresent()) {
+            if (!isCompressionCodecSupportedForFormat(fileFormat, compressionCodec.get())) {
+                throw new TrinoException(NOT_SUPPORTED, format("Compression codec %s not supported for %s", compressionCodec.get(), fileFormat.humanName()));
+            }
+        }
+    }
+
+    public static boolean isCompressionCodecSupportedForFormat(IcebergFileFormat fileFormat, HiveCompressionCodec codec)
+    {
+        switch (codec) {
+            case LZ4:
+                return !(fileFormat == AVRO || fileFormat == PARQUET);
+            default:
+                return true;
+        }
+    }
+
     public static int getMaxCommitRetry(Map<String, Object> tableProperties)
     {
         return (int) tableProperties.getOrDefault(MAX_COMMIT_RETRY, COMMIT_NUM_RETRIES_DEFAULT);
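`isCompressionCodecSupportedForFormat` encodes a small support matrix: LZ4 is rejected for Avro and Parquet, and every other codec passes for every format. A quick stand-alone check that prints the full matrix (stand-in enums; the real method takes `IcebergFileFormat` and `HiveCompressionCodec`):

```java
public class SupportMatrixSketch
{
    enum Format { PARQUET, ORC, AVRO }
    enum Codec { NONE, SNAPPY, ZSTD, LZ4, GZIP }

    // Local re-implementation of the rule above.
    static boolean supported(Format format, Codec codec)
    {
        return switch (codec) {
            case LZ4 -> !(format == Format.AVRO || format == Format.PARQUET);
            default -> true;
        };
    }

    public static void main(String[] args)
    {
        // Prints the full matrix; only ORC accepts LZ4 under this rule.
        for (Codec codec : Codec.values()) {
            for (Format format : Format.values()) {
                System.out.printf("%-6s x %-7s -> %s%n", codec, format, supported(format, codec));
            }
        }
    }
}
```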
