diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java index 387d1f0d0885..9f4febdcd0bc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/BaseTransactionsTable.java @@ -13,14 +13,12 @@ */ package io.trino.plugin.deltalake; -import com.google.common.collect.ImmutableList; import io.airlift.units.DataSize; import io.trino.filesystem.TrinoFileSystem; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.Transaction; import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess; -import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries; import io.trino.plugin.deltalake.util.PageListBuilder; import io.trino.spi.Page; import io.trino.spi.TrinoException; @@ -45,8 +43,7 @@ import static com.google.common.collect.MoreCollectors.onlyElement; import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA; -import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; -import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; +import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail; import static java.util.Objects.requireNonNull; public abstract class BaseTransactionsTable @@ -144,7 +141,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand TrinoFileSystem fileSystem = fileSystemFactory.create(session); PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); try { - List transactions = loadNewTailBackward(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive.get()).reversed(); + List 
transactions = loadNewTail(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive, DataSize.ofBytes(0)).getTransactions(); return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem)); } catch (TrinoException e) { @@ -155,39 +152,6 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand } } - // Load a section of the Transaction Log JSON entries. Optionally from a given end version (inclusive) through an start version (exclusive) - private static List loadNewTailBackward( - TrinoFileSystem fileSystem, - String tableLocation, - Optional startVersion, - long endVersion) - throws IOException - { - ImmutableList.Builder transactionsBuilder = ImmutableList.builder(); - String transactionLogDir = getTransactionLogDir(tableLocation); - - long version = endVersion; - long entryNumber = version; - boolean endOfHead = false; - - while (!endOfHead) { - Optional results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE)); - if (results.isPresent()) { - transactionsBuilder.add(new Transaction(version, results.get())); - version = entryNumber; - entryNumber--; - } - else { - // When there is a gap in the transaction log version, indicate the end of the current head - endOfHead = true; - } - if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) { - endOfHead = true; - } - } - return transactionsBuilder.build(); - } - protected abstract List buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List transactions, TrinoFileSystem fileSystem) throws IOException; } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java index 6f63a9efa013..65ffb5dd4501 100644 --- 
a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/checkpoint/TransactionLogTail.java @@ -59,17 +59,34 @@ public static TransactionLogTail loadNewTail( DataSize transactionLogMaxCachedFileSize) throws IOException { - ImmutableList.Builder entriesBuilder = ImmutableList.builder(); - if (startVersion.isPresent() && endVersion.isPresent() && startVersion.get().equals(endVersion.get())) { // This is time travel to a specific checkpoint. No need to read transaction log files. - return new TransactionLogTail(entriesBuilder.build(), startVersion.get()); + return new TransactionLogTail(ImmutableList.of(), startVersion.get()); } - long version = startVersion.orElse(0L); - long entryNumber = startVersion.map(start -> start + 1).orElse(0L); - checkArgument(endVersion.isEmpty() || entryNumber <= endVersion.get(), "Invalid start/end versions: %s, %s", startVersion, endVersion); + if (endVersion.isPresent()) { + return loadNewTail(fileSystem, tableLocation, startVersion, endVersion.get(), transactionLogMaxCachedFileSize); + } + if (startVersion.isPresent()) { + return loadNewTail(fileSystem, tableLocation, startVersion.get(), startVersion.get() + 1, endVersion, transactionLogMaxCachedFileSize); + } + + return loadNewTail(fileSystem, tableLocation, 0L, 0L, endVersion, transactionLogMaxCachedFileSize); + } + + public static TransactionLogTail loadNewTail( + TrinoFileSystem fileSystem, + String tableLocation, + long version, + long startVersion, + Optional endVersion, + DataSize transactionLogMaxCachedFileSize) + throws IOException + { + ImmutableList.Builder entriesBuilder = ImmutableList.builder(); + long entryNumber = startVersion; + checkArgument(endVersion.isEmpty() || entryNumber <= endVersion.get(), "Invalid start/end versions: %s, %s", startVersion, endVersion); String transactionLogDir = getTransactionLogDir(tableLocation); 
boolean endOfTail = false; @@ -81,7 +98,7 @@ public static TransactionLogTail loadNewTail( entryNumber++; } else { - if (endVersion.isPresent()) { + if (endVersion.isPresent() && entryNumber > startVersion) { throw new MissingTransactionLogException(getTransactionLogJsonEntryPath(transactionLogDir, entryNumber).toString()); } endOfTail = true; @@ -95,6 +112,40 @@ public static TransactionLogTail loadNewTail( return new TransactionLogTail(entriesBuilder.build(), version); } + // Load a section of the Transaction Log JSON entries, reading backward from the given end version (inclusive) to an optional start version (exclusive) + private static TransactionLogTail loadNewTail( + TrinoFileSystem fileSystem, + String tableLocation, + Optional startVersion, + long endVersion, + DataSize transactionLogMaxCachedFileSize) + throws IOException + { + ImmutableList.Builder transactionsBuilder = ImmutableList.builder(); + String transactionLogDir = getTransactionLogDir(tableLocation); + + long version = endVersion; + long entryNumber = version; + boolean endOfHead = false; + + while (!endOfHead) { + Optional results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize); + if (results.isPresent()) { + transactionsBuilder.add(new Transaction(entryNumber, results.get())); + version = entryNumber; + entryNumber--; + } + else { + // When there is a gap in the transaction log version, indicate the end of the current head + endOfHead = true; + } + if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) { + endOfHead = true; + } + } + return new TransactionLogTail(transactionsBuilder.build().reversed(), endVersion); + } + public Optional getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation, Optional endVersion, DataSize transactionLogMaxCachedFileSize) throws IOException {