42
42
import io .trino .plugin .base .filter .UtcConstraintExtractor ;
43
43
import io .trino .plugin .base .projection .ApplyProjectionUtil ;
44
44
import io .trino .plugin .base .projection .ApplyProjectionUtil .ProjectedColumnRepresentation ;
45
+ import io .trino .plugin .hive .HiveCompressionCodec ;
45
46
import io .trino .plugin .hive .HiveStorageFormat ;
46
47
import io .trino .plugin .hive .HiveWrittenPartitions ;
47
48
import io .trino .plugin .iceberg .aggregation .DataSketchStateSerializer ;
209
210
import java .util .List ;
210
211
import java .util .Map ;
211
212
import java .util .Optional ;
213
+ import java .util .OptionalInt ;
212
214
import java .util .OptionalLong ;
213
215
import java .util .Set ;
214
216
import java .util .concurrent .Callable ;
292
294
import static io .trino .plugin .iceberg .IcebergTableName .isIcebergTableName ;
293
295
import static io .trino .plugin .iceberg .IcebergTableName .isMaterializedViewStorage ;
294
296
import static io .trino .plugin .iceberg .IcebergTableName .tableNameFrom ;
297
+ import static io .trino .plugin .iceberg .IcebergTableProperties .COMPRESSION_LEVEL ;
295
298
import static io .trino .plugin .iceberg .IcebergTableProperties .DATA_LOCATION_PROPERTY ;
296
299
import static io .trino .plugin .iceberg .IcebergTableProperties .EXTRA_PROPERTIES_PROPERTY ;
297
300
import static io .trino .plugin .iceberg .IcebergTableProperties .FILE_FORMAT_PROPERTY ;
302
305
import static io .trino .plugin .iceberg .IcebergTableProperties .PARQUET_BLOOM_FILTER_COLUMNS_PROPERTY ;
303
306
import static io .trino .plugin .iceberg .IcebergTableProperties .PARTITIONING_PROPERTY ;
304
307
import static io .trino .plugin .iceberg .IcebergTableProperties .SORTED_BY_PROPERTY ;
308
+ import static io .trino .plugin .iceberg .IcebergTableProperties .WRITE_COMPRESSION ;
305
309
import static io .trino .plugin .iceberg .IcebergTableProperties .getPartitioning ;
306
310
import static io .trino .plugin .iceberg .IcebergTableProperties .getTableLocation ;
311
+ import static io .trino .plugin .iceberg .IcebergTableProperties .validateCompression ;
307
312
import static io .trino .plugin .iceberg .IcebergUtil .buildPath ;
308
313
import static io .trino .plugin .iceberg .IcebergUtil .canEnforceColumnConstraintInSpecs ;
309
314
import static io .trino .plugin .iceberg .IcebergUtil .checkFormatForProperty ;
315
320
import static io .trino .plugin .iceberg .IcebergUtil .firstSnapshotAfter ;
316
321
import static io .trino .plugin .iceberg .IcebergUtil .getColumnHandle ;
317
322
import static io .trino .plugin .iceberg .IcebergUtil .getColumnMetadatas ;
323
+ import static io .trino .plugin .iceberg .IcebergUtil .getCompressionLevel ;
324
+ import static io .trino .plugin .iceberg .IcebergUtil .getCompressionLevelName ;
325
+ import static io .trino .plugin .iceberg .IcebergUtil .getCompressionPropertyName ;
318
326
import static io .trino .plugin .iceberg .IcebergUtil .getFileFormat ;
327
+ import static io .trino .plugin .iceberg .IcebergUtil .getHiveCompressionCodec ;
319
328
import static io .trino .plugin .iceberg .IcebergUtil .getIcebergTableProperties ;
320
329
import static io .trino .plugin .iceberg .IcebergUtil .getPartitionKeys ;
321
330
import static io .trino .plugin .iceberg .IcebergUtil .getPartitionValues ;
@@ -421,6 +430,8 @@ public class IcebergMetadata
421
430
.add (EXTRA_PROPERTIES_PROPERTY )
422
431
.add (FILE_FORMAT_PROPERTY )
423
432
.add (FORMAT_VERSION_PROPERTY )
433
+ .add (WRITE_COMPRESSION )
434
+ .add (COMPRESSION_LEVEL )
424
435
.add (MAX_COMMIT_RETRY )
425
436
.add (OBJECT_STORE_LAYOUT_ENABLED_PROPERTY )
426
437
.add (DATA_LOCATION_PROPERTY )
@@ -1281,6 +1292,7 @@ public ConnectorOutputTableHandle beginCreateTable(ConnectorSession session, Con
1281
1292
.orElseGet (() -> catalog .defaultTableLocation (session , tableMetadata .getTable ()));
1282
1293
}
1283
1294
transaction = newCreateTableTransaction (catalog , tableMetadata , session , replace , tableLocation , allowedExtraProperties );
1295
+
1284
1296
Location location = Location .of (transaction .table ().location ());
1285
1297
try {
1286
1298
// S3 Tables internally assigns a unique location for each table
@@ -2463,10 +2475,13 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
2463
2475
}
2464
2476
}
2465
2477
2478
+ IcebergFileFormat oldFileFormat = getFileFormat (icebergTable .properties ());
2479
+ IcebergFileFormat newFileFormat = getFileFormat (icebergTable .properties ());
2480
+
2466
2481
if (properties .containsKey (FILE_FORMAT_PROPERTY )) {
2467
- IcebergFileFormat fileFormat = (IcebergFileFormat ) properties .get (FILE_FORMAT_PROPERTY )
2482
+ newFileFormat = (IcebergFileFormat ) properties .get (FILE_FORMAT_PROPERTY )
2468
2483
.orElseThrow (() -> new IllegalArgumentException ("The format property cannot be empty" ));
2469
- updateProperties .defaultFormat (fileFormat .toIceberg ());
2484
+ updateProperties .defaultFormat (newFileFormat .toIceberg ());
2470
2485
}
2471
2486
2472
2487
if (properties .containsKey (FORMAT_VERSION_PROPERTY )) {
@@ -2476,6 +2491,14 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
2476
2491
updateProperties .set (FORMAT_VERSION , Integer .toString (formatVersion ));
2477
2492
}
2478
2493
2494
+ Map <String , String > propertiesForCompression = calculateTableCompressionProperties (oldFileFormat , newFileFormat , icebergTable .properties (), properties .entrySet ().stream ()
2495
+ .filter (e -> e .getValue ().isPresent ())
2496
+ .collect (toImmutableMap (
2497
+ Map .Entry ::getKey ,
2498
+ e -> e .getValue ().get ())));
2499
+
2500
+ propertiesForCompression .forEach (updateProperties ::set );
2501
+
2479
2502
if (properties .containsKey (MAX_COMMIT_RETRY )) {
2480
2503
int formatVersion = (int ) properties .get (MAX_COMMIT_RETRY )
2481
2504
.orElseThrow (() -> new IllegalArgumentException ("The max_commit_retry property cannot be empty" ));
@@ -2531,6 +2554,29 @@ public void setTableProperties(ConnectorSession session, ConnectorTableHandle ta
2531
2554
commitTransaction (transaction , "set table properties" );
2532
2555
}
2533
2556
2557
+ public static Map <String , String > calculateTableCompressionProperties (IcebergFileFormat oldFileFormat , IcebergFileFormat newFileFormat , Map <String , String > existingProperties , Map <String , Object > inputProperties )
2558
+ {
2559
+ ImmutableMap .Builder <String , String > newCompressionProperties = ImmutableMap .builder ();
2560
+
2561
+ Optional <HiveCompressionCodec > oldCompressionCodec = getHiveCompressionCodec (oldFileFormat , existingProperties );
2562
+ Optional <HiveCompressionCodec > newCompressionCodec = IcebergTableProperties .getHiveCompressionCodec (inputProperties );
2563
+
2564
+ Optional <HiveCompressionCodec > compressionCodec = newCompressionCodec .or (() -> oldCompressionCodec );
2565
+
2566
+ OptionalInt oldCompressionLevel = getCompressionLevel (oldFileFormat , existingProperties );
2567
+ OptionalInt newCompressionLevel = IcebergTableProperties .getCompressionLevel (inputProperties );
2568
+
2569
+ OptionalInt compressionLevel = newCompressionLevel .isPresent () ? newCompressionLevel : oldCompressionLevel ;
2570
+
2571
+ validateCompression (newFileFormat , compressionCodec , compressionLevel );
2572
+
2573
+ compressionCodec .ifPresent (hiveCompressionCodec -> newCompressionProperties .put (getCompressionPropertyName (newFileFormat ), hiveCompressionCodec .name ()));
2574
+
2575
+ compressionLevel .ifPresent (level -> newCompressionProperties .put (getCompressionLevelName (newFileFormat ), String .valueOf (level )));
2576
+
2577
+ return newCompressionProperties .buildOrThrow ();
2578
+ }
2579
+
2534
2580
private static void updatePartitioning (Table icebergTable , Transaction transaction , List <String > partitionColumns )
2535
2581
{
2536
2582
UpdatePartitionSpec updatePartitionSpec = transaction .updateSpec ();
0 commit comments