Skip to content

Commit 5f4b492

Browse files
committed
Move computing of decompressor retained size
1 parent 3d0743d commit 5f4b492

File tree

2 files changed

+18
-5
lines changed

2 files changed

+18
-5
lines changed

core/trino-main/src/main/java/io/trino/execution/buffer/CompressingDecryptingPageDeserializer.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515

1616
import com.google.common.base.VerifyException;
1717
import io.airlift.compress.v3.Decompressor;
18-
import io.airlift.compress.v3.lz4.Lz4Decompressor;
1918
import io.airlift.slice.Slice;
2019
import io.airlift.slice.SliceInput;
2120
import io.airlift.slice.Slices;
@@ -63,6 +62,7 @@ public class CompressingDecryptingPageDeserializer
6362
public CompressingDecryptingPageDeserializer(
6463
BlockEncodingSerde blockEncodingSerde,
6564
Optional<Decompressor> decompressor,
65+
int decompressorRetainedSize,
6666
Optional<SecretKey> encryptionKey,
6767
int blockSizeInBytes,
6868
OptionalInt maxCompressedBlockSizeInBytes)
@@ -72,6 +72,7 @@ public CompressingDecryptingPageDeserializer(
7272
encryptionKey.ifPresent(secretKey -> checkArgument(is256BitSecretKeySpec(secretKey), "encryptionKey is expected to be an instance of SecretKeySpec containing a 256bit key"));
7373
input = new SerializedPageInput(
7474
requireNonNull(decompressor, "decompressor is null"),
75+
decompressorRetainedSize,
7576
encryptionKey,
7677
blockSizeInBytes,
7778
maxCompressedBlockSizeInBytes);
@@ -96,19 +97,19 @@ private static class SerializedPageInput
9697
extends SliceInput
9798
{
9899
private static final int INSTANCE_SIZE = instanceSize(SerializedPageInput.class);
99-
// TODO: implement getRetainedSizeInBytes in Lz4Decompressor
100-
private static final int DECOMPRESSOR_RETAINED_SIZE = instanceSize(Lz4Decompressor.class);
101100
private static final int ENCRYPTION_KEY_RETAINED_SIZE = toIntExact(instanceSize(SecretKeySpec.class) + sizeOfByteArray(256 / 8));
102101

103102
private final Optional<Decompressor> decompressor;
103+
private final int decompressorRetainedSize;
104104
private final Optional<SecretKey> encryptionKey;
105105
private final Optional<Cipher> cipher;
106106

107107
private final ReadBuffer[] buffers;
108108

109-
private SerializedPageInput(Optional<Decompressor> decompressor, Optional<SecretKey> encryptionKey, int blockSizeInBytes, OptionalInt maxCompressedBlockSizeInBytes)
109+
private SerializedPageInput(Optional<Decompressor> decompressor, int decompressorRetainedSize, Optional<SecretKey> encryptionKey, int blockSizeInBytes, OptionalInt maxCompressedBlockSizeInBytes)
110110
{
111111
this.decompressor = requireNonNull(decompressor, "decompressor is null");
112+
this.decompressorRetainedSize = decompressorRetainedSize;
112113
this.encryptionKey = requireNonNull(encryptionKey, "encryptionKey is null");
113114

114115
buffers = new ReadBuffer[
@@ -502,7 +503,7 @@ public int skipBytes(int length)
502503
public long getRetainedSize()
503504
{
504505
long size = INSTANCE_SIZE;
505-
size += sizeOf(decompressor, compressor -> DECOMPRESSOR_RETAINED_SIZE);
506+
size += sizeOf(decompressor, compressor -> decompressorRetainedSize);
506507
size += sizeOf(encryptionKey, encryptionKey -> ENCRYPTION_KEY_RETAINED_SIZE);
507508
size += sizeOf(cipher, cipher -> ESTIMATED_AES_CIPHER_RETAINED_SIZE);
508509
for (ReadBuffer input : buffers) {

core/trino-main/src/main/java/io/trino/execution/buffer/PagesSerdeFactory.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.util.Optional;
2828
import java.util.OptionalInt;
2929

30+
import static io.airlift.slice.SizeOf.instanceSize;
3031
import static io.trino.execution.buffer.CompressionCodec.LZ4;
3132
import static io.trino.execution.buffer.CompressionCodec.ZSTD;
3233
import static java.util.Objects.requireNonNull;
@@ -66,6 +67,7 @@ public PageDeserializer createDeserializer(Optional<SecretKey> encryptionKey)
6667
return new CompressingDecryptingPageDeserializer(
6768
blockEncodingSerde,
6869
createDecompressor(compressionCodec),
70+
decompressorRetainedSize(compressionCodec),
6971
encryptionKey,
7072
blockSizeInBytes,
7173
maxCompressedSize(blockSizeInBytes, compressionCodec));
@@ -97,4 +99,14 @@ private static OptionalInt maxCompressedSize(int uncompressedSize, CompressionCo
9799
case ZSTD -> ZSTD.maxCompressedLength(uncompressedSize);
98100
};
99101
}
102+
103+
private static int decompressorRetainedSize(CompressionCodec compressionCodec)
104+
{
105+
// TODO: implement getRetainedSizeInBytes in Lz4Decompressor and ZstdDecompressor
106+
return switch (compressionCodec) {
107+
case NONE -> 0;
108+
case LZ4 -> instanceSize(Lz4Decompressor.class);
109+
case ZSTD -> instanceSize(ZstdDecompressor.class);
110+
};
111+
}
100112
}

0 commit comments

Comments
 (0)