Commit ffaf8b7

Support setting write_compression and compression_level table properties for Iceberg Connector
1 parent 618d44b commit ffaf8b7

File tree

9 files changed: +714 -105 lines changed


plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergAvroFileWriter.java

Lines changed: 14 additions & 2 deletions
@@ -27,16 +27,21 @@

 import java.io.Closeable;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;

 import static io.airlift.slice.SizeOf.instanceSize;
 import static io.trino.plugin.iceberg.IcebergAvroDataConversion.toIcebergRecords;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_CLOSE_ERROR;
 import static io.trino.plugin.iceberg.IcebergErrorCode.ICEBERG_WRITER_OPEN_ERROR;
+import static io.trino.plugin.iceberg.IcebergTableProperties.validateCompression;
 import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
 import static java.util.Objects.requireNonNull;
 import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION;
+import static org.apache.iceberg.TableProperties.AVRO_COMPRESSION_LEVEL;

 public final class IcebergAvroFileWriter
         implements IcebergFileWriter
@@ -56,18 +61,25 @@ public IcebergAvroFileWriter(
             Closeable rollbackAction,
             Schema icebergSchema,
             List<Type> types,
-            HiveCompressionCodec hiveCompressionCodec)
+            HiveCompressionCodec hiveCompressionCodec,
+            OptionalInt compressionLevel)
     {
         this.rollbackAction = requireNonNull(rollbackAction, "rollbackAction null");
         this.icebergSchema = requireNonNull(icebergSchema, "icebergSchema is null");
         this.types = ImmutableList.copyOf(requireNonNull(types, "types is null"));

+        validateCompression(IcebergFileFormat.AVRO, Optional.of(hiveCompressionCodec), compressionLevel);
+
+        Map<String, String> compressionProperties = new HashMap<>();
+        compressionProperties.put(AVRO_COMPRESSION, toIcebergAvroCompressionName(hiveCompressionCodec));
+        compressionLevel.ifPresent(level -> compressionProperties.put(AVRO_COMPRESSION_LEVEL, Integer.toString(level)));
+
         try {
             avroWriter = Avro.write(file)
                     .schema(icebergSchema)
                     .createWriterFunc(DataWriter::create)
                     .named(AVRO_TABLE_NAME)
-                    .set(AVRO_COMPRESSION, toIcebergAvroCompressionName(hiveCompressionCodec))
+                    .setAll(compressionProperties)
                     .build();
         }
         catch (IOException e) {
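
For context, a minimal standalone sketch of what the new constructor feeds into Iceberg's Avro writer builder: without a compression level the property map matches the previous single .set(AVRO_COMPRESSION, ...) call, and a present level adds one more entry that .setAll(...) forwards. The literal property keys and the codec/level values below are illustrative assumptions (the expected string values of Iceberg's TableProperties.AVRO_COMPRESSION and AVRO_COMPRESSION_LEVEL constants), not part of this diff.

import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

class AvroCompressionPropertiesSketch
{
    // Assumed string values of Iceberg's AVRO_COMPRESSION / AVRO_COMPRESSION_LEVEL constants
    private static final String AVRO_COMPRESSION = "write.avro.compression-codec";
    private static final String AVRO_COMPRESSION_LEVEL = "write.avro.compression-level";

    static Map<String, String> compressionProperties(String icebergCodecName, OptionalInt compressionLevel)
    {
        Map<String, String> properties = new HashMap<>();
        properties.put(AVRO_COMPRESSION, icebergCodecName);
        // The level entry is only added when a compression level was configured
        compressionLevel.ifPresent(level -> properties.put(AVRO_COMPRESSION_LEVEL, Integer.toString(level)));
        return properties;
    }

    public static void main(String[] args)
    {
        // Equivalent to the old single .set(AVRO_COMPRESSION, ...) call
        System.out.println(compressionProperties("zstd", OptionalInt.empty()));
        // With compression_level set, .setAll(...) also forwards the level to the Avro writer
        System.out.println(compressionProperties("zstd", OptionalInt.of(3)));
    }
}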

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergFileWriterFactory.java

Lines changed: 22 additions & 8 deletions
@@ -48,6 +48,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.function.Supplier;
 import java.util.stream.IntStream;

@@ -73,6 +74,8 @@
 import static io.trino.plugin.iceberg.IcebergSessionProperties.getParquetWriterPageValueCount;
 import static io.trino.plugin.iceberg.IcebergSessionProperties.isOrcWriterValidate;
 import static io.trino.plugin.iceberg.IcebergTableProperties.ORC_BLOOM_FILTER_FPP_PROPERTY;
+import static io.trino.plugin.iceberg.IcebergUtil.getCompressionLevel;
+import static io.trino.plugin.iceberg.IcebergUtil.getHiveCompressionCodec;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterColumns;
 import static io.trino.plugin.iceberg.IcebergUtil.getOrcBloomFilterFpp;
 import static io.trino.plugin.iceberg.IcebergUtil.getParquetBloomFilterColumns;
@@ -131,7 +134,7 @@ public IcebergFileWriter createDataFileWriter(
             // TODO use metricsConfig https://github.com/trinodb/trino/issues/9791
             case PARQUET -> createParquetWriter(MetricsConfig.getDefault(), fileSystem, outputPath, icebergSchema, session, storageProperties);
             case ORC -> createOrcWriter(metricsConfig, fileSystem, outputPath, icebergSchema, session, storageProperties, getOrcStringStatisticsLimit(session));
-            case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, session);
+            case AVRO -> createAvroWriter(fileSystem, outputPath, icebergSchema, session, storageProperties);
         };
     }

@@ -145,7 +148,7 @@ public IcebergFileWriter createPositionDeleteWriter(
         return switch (fileFormat) {
             case PARQUET -> createParquetWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
             case ORC -> createOrcWriter(FULL_METRICS_CONFIG, fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties, DataSize.ofBytes(Integer.MAX_VALUE));
-            case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session);
+            case AVRO -> createAvroWriter(fileSystem, outputPath, POSITION_DELETE_SCHEMA, session, storageProperties);
         };
     }

@@ -177,7 +180,9 @@ private IcebergFileWriter createParquetWriter(
                     .setBloomFilterColumns(getParquetBloomFilterColumns(storageProperties))
                     .build();

-            HiveCompressionCodec hiveCompressionCodec = toCompressionCodec(getCompressionCodec(session));
+            HiveCompressionCodec compressionCodec = getHiveCompressionCodec(IcebergFileFormat.PARQUET, storageProperties)
+                    .orElseGet(() -> toCompressionCodec(getCompressionCodec(session)));
+
             return new IcebergParquetFileWriter(
                     metricsConfig,
                     outputFile,
@@ -188,8 +193,8 @@ private IcebergFileWriter createParquetWriter(
                     makeTypeMap(fileColumnTypes, fileColumnNames),
                     parquetWriterOptions,
                     IntStream.range(0, fileColumnNames.size()).toArray(),
-                    hiveCompressionCodec.getParquetCompressionCodec()
-                            .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(hiveCompressionCodec))),
+                    compressionCodec.getParquetCompressionCodec()
+                            .orElseThrow(() -> new TrinoException(NOT_SUPPORTED, "Compression codec %s not supported for Parquet".formatted(compressionCodec))),
                     nodeVersion.toString());
         }
         catch (IOException e) {
@@ -233,6 +238,9 @@ private IcebergFileWriter createOrcWriter(
                 });
            }

+            HiveCompressionCodec compressionCodec = getHiveCompressionCodec(IcebergFileFormat.ORC, storageProperties)
+                    .orElseGet(() -> toCompressionCodec(getCompressionCodec(session)));
+
             return new IcebergOrcFileWriter(
                     metricsConfig,
                     icebergSchema,
@@ -241,7 +249,7 @@ private IcebergFileWriter createOrcWriter(
                     fileColumnNames,
                     fileColumnTypes,
                     toOrcType(icebergSchema),
-                    toCompressionCodec(getCompressionCodec(session)).getOrcCompressionKind(),
+                    compressionCodec.getOrcCompressionKind(),
                     withBloomFilterOptions(orcWriterOptions, storageProperties)
                             .withStripeMinSize(getOrcWriterMinStripeSize(session))
                             .withStripeMaxSize(getOrcWriterMaxStripeSize(session))
@@ -286,19 +294,25 @@ private IcebergFileWriter createAvroWriter(
             TrinoFileSystem fileSystem,
             Location outputPath,
             Schema icebergSchema,
-            ConnectorSession session)
+            ConnectorSession session,
+            Map<String, String> storageProperties)
     {
         Closeable rollbackAction = () -> fileSystem.deleteFile(outputPath);

         List<Type> columnTypes = icebergSchema.columns().stream()
                 .map(column -> toTrinoType(column.type(), typeManager))
                 .collect(toImmutableList());

+        HiveCompressionCodec compressionCodec = getHiveCompressionCodec(IcebergFileFormat.AVRO, storageProperties)
+                .orElseGet(() -> toCompressionCodec(getCompressionCodec(session)));
+        OptionalInt compressionLevel = getCompressionLevel(IcebergFileFormat.AVRO, storageProperties);
+
         return new IcebergAvroFileWriter(
                 new ForwardingOutputFile(fileSystem, outputPath),
                 rollbackAction,
                 icebergSchema,
                 columnTypes,
-                toCompressionCodec(getCompressionCodec(session)));
+                compressionCodec,
+                compressionLevel);
     }
 }
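
The recurring pattern in this file is the precedence rule the commit introduces for all three formats: a codec recorded in the table's storage properties wins, and the session-level compression codec remains the fallback. A rough standalone sketch of that fallback follows; the property key "write.parquet.compression-codec" and the simplified codec enum are assumptions for illustration (the real code goes through IcebergUtil.getHiveCompressionCodec and HiveCompressionCodec, which are not shown in this diff).

import java.util.Locale;
import java.util.Map;
import java.util.Optional;

class CompressionPrecedenceSketch
{
    // Simplified stand-in for io.trino.plugin.hive.HiveCompressionCodec
    enum Codec { NONE, SNAPPY, ZSTD, GZIP }

    // Assumed Iceberg property key for the Parquet codec; ORC and Avro use their own keys
    private static final String PARQUET_COMPRESSION = "write.parquet.compression-codec";

    static Codec effectiveCodec(Map<String, String> storageProperties, Codec sessionDefault)
    {
        return Optional.ofNullable(storageProperties.get(PARQUET_COMPRESSION))
                .map(name -> Codec.valueOf(name.toUpperCase(Locale.ROOT))) // table property wins when present
                .orElse(sessionDefault);                                   // otherwise fall back to the session codec
    }

    public static void main(String[] args)
    {
        System.out.println(effectiveCodec(Map.of(), Codec.SNAPPY));                                          // SNAPPY
        System.out.println(effectiveCodec(Map.of("write.parquet.compression-codec", "zstd"), Codec.SNAPPY)); // ZSTD
    }
}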

plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java

Lines changed: 47 additions & 2 deletions
@@ -42,6 +42,7 @@
 import io.trino.plugin.base.filter.UtcConstraintExtractor;
 import io.trino.plugin.base.projection.ApplyProjectionUtil;
 import io.trino.plugin.base.projection.ApplyProjectionUtil.ProjectedColumnRepresentation;
+import io.trino.plugin.hive.HiveCompressionCodec;
 import io.trino.plugin.hive.HiveStorageFormat;
 import io.trino.plugin.hive.HiveWrittenPartitions;
 import io.trino.plugin.iceberg.aggregation.DataSketchStateSerializer;
@@ -209,6 +210,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.Callable;
@@ -292,6 +294,7 @@
 import static io.trino.plugin.iceberg.IcebergTableName.isIcebergTableName;
 import static io.trino.plugin.iceberg.IcebergTableName.isMaterializedViewStorage;
 import static io.trino.plugin.iceberg.IcebergTableName.tableNameFrom;
+import static io.trino.plugin.iceberg.IcebergTableProperties.COMPRESSION_LEVEL;
 import static io.trino.plugin.iceberg.IcebergTableProperties.DATA_LOCATION_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.EXTRA_PROPERTIES_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
@@ -302,8 +305,10 @@
 import static io.trino.plugin.iceberg.IcebergTableProperties.PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.PARTITIONING_PROPERTY;
 import static io.trino.plugin.iceberg.IcebergTableProperties.SORTED_BY_PROPERTY;
+import static io.trino.plugin.iceberg.IcebergTableProperties.WRITE_COMPRESSION;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getPartitioning;
 import static io.trino.plugin.iceberg.IcebergTableProperties.getTableLocation;
+import static io.trino.plugin.iceberg.IcebergTableProperties.validateCompression;
 import static io.trino.plugin.iceberg.IcebergUtil.buildPath;
 import static io.trino.plugin.iceberg.IcebergUtil.canEnforceColumnConstraintInSpecs;
 import static io.trino.plugin.iceberg.IcebergUtil.checkFormatForProperty;
@@ -315,7 +320,11 @@
 import static io.trino.plugin.iceberg.IcebergUtil.firstSnapshotAfter;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnHandle;
 import static io.trino.plugin.iceberg.IcebergUtil.getColumnMetadatas;
+import static io.trino.plugin.iceberg.IcebergUtil.getCompressionLevel;
+import static io.trino.plugin.iceberg.IcebergUtil.getCompressionLevelName;
+import static io.trino.plugin.iceberg.IcebergUtil.getCompressionPropertyName;
 import static io.trino.plugin.iceberg.IcebergUtil.getFileFormat;
+import static io.trino.plugin.iceberg.IcebergUtil.getHiveCompressionCodec;
 import static io.trino.plugin.iceberg.IcebergUtil.getIcebergTableProperties;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionKeys;
 import static io.trino.plugin.iceberg.IcebergUtil.getPartitionValues;
@@ -421,6 +430,8 @@ public class IcebergMetadata
             .add(EXTRA_PROPERTIES_PROPERTY)
             .add(FILE_FORMAT_PROPERTY)
             .add(FORMAT_VERSION_PROPERTY)
+            .add(WRITE_COMPRESSION)
+            .add(COMPRESSION_LEVEL)
             .add(MAX_COMMIT_RETRY)
             .add(OBJECT_STORE_LAYOUT_ENABLED_PROPERTY)
             .add(DATA_LOCATION_PROPERTY)
@@ -2463,10 +2474,13 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
             }
         }

+        IcebergFileFormat oldFileFormat = getFileFormat(icebergTable.properties());
+        IcebergFileFormat newFileFormat = getFileFormat(icebergTable.properties());
+
         if (properties.containsKey(FILE_FORMAT_PROPERTY)) {
-            IcebergFileFormat fileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
+            newFileFormat = (IcebergFileFormat) properties.get(FILE_FORMAT_PROPERTY)
                     .orElseThrow(() -> new IllegalArgumentException("The format property cannot be empty"));
-            updateProperties.defaultFormat(fileFormat.toIceberg());
+            updateProperties.defaultFormat(newFileFormat.toIceberg());
         }

         if (properties.containsKey(FORMAT_VERSION_PROPERTY)) {
@@ -2476,6 +2490,14 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
             updateProperties.set(FORMAT_VERSION, Integer.toString(formatVersion));
         }

+        Map<String, String> propertiesForCompression = calculateTableCompressionProperties(oldFileFormat, newFileFormat, icebergTable.properties(), properties.entrySet().stream()
+                .filter(e -> e.getValue().isPresent())
+                .collect(toImmutableMap(
+                        Map.Entry::getKey,
+                        e -> e.getValue().get())));
+
+        propertiesForCompression.forEach(updateProperties::set);
+
         if (properties.containsKey(MAX_COMMIT_RETRY)) {
             int formatVersion = (int) properties.get(MAX_COMMIT_RETRY)
                     .orElseThrow(() -> new IllegalArgumentException("The max_commit_retry property cannot be empty"));
@@ -2531,6 +2553,29 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
         commitTransaction(transaction, "set table properties");
     }

+    public static Map<String, String> calculateTableCompressionProperties(IcebergFileFormat oldFileFormat, IcebergFileFormat newFileFormat, Map<String, String> existingProperties, Map<String, Object> inputProperties)
+    {
+        ImmutableMap.Builder<String, String> newCompressionProperties = ImmutableMap.builder();
+
+        Optional<HiveCompressionCodec> oldCompressionCodec = getHiveCompressionCodec(oldFileFormat, existingProperties);
+        Optional<HiveCompressionCodec> newCompressionCodec = IcebergTableProperties.getHiveCompressionCodec(inputProperties);
+
+        Optional<HiveCompressionCodec> compressionCodec = newCompressionCodec.or(() -> oldCompressionCodec);
+
+        OptionalInt oldCompressionLevel = getCompressionLevel(oldFileFormat, existingProperties);
+        OptionalInt newCompressionLevel = IcebergTableProperties.getCompressionLevel(inputProperties);
+
+        OptionalInt compressionLevel = newCompressionLevel.isPresent() ? newCompressionLevel : oldCompressionLevel;
+
+        validateCompression(newFileFormat, compressionCodec, compressionLevel);
+
+        compressionCodec.ifPresent(hiveCompressionCodec -> newCompressionProperties.put(getCompressionPropertyName(newFileFormat), hiveCompressionCodec.name()));
+
+        compressionLevel.ifPresent(level -> newCompressionProperties.put(getCompressionLevelName(newFileFormat), String.valueOf(level)));
+
+        return newCompressionProperties.buildOrThrow();
+    }
+
     private static void updatePartitioning(Table icebergTable, Transaction transaction, List<String> partitionColumns)
     {
         UpdatePartitionSpec updatePartitionSpec = transaction.updateSpec();