Skip to content

Refactor TransactionLogTail#loadNewTail method #25856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,12 @@
*/
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.airlift.units.DataSize;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.Transaction;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogEntries;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
Expand All @@ -45,8 +43,7 @@

import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson;
import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.loadNewTail;
import static java.util.Objects.requireNonNull;

public abstract class BaseTransactionsTable
Expand Down Expand Up @@ -144,7 +141,7 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
TrinoFileSystem fileSystem = fileSystemFactory.create(session);
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
try {
List<Transaction> transactions = loadNewTailBackward(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive.get()).reversed();
List<Transaction> transactions = loadNewTail(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive, DataSize.ofBytes(0)).getTransactions();
return new FixedPageSource(buildPages(session, pagesBuilder, transactions, fileSystem));
}
catch (TrinoException e) {
Expand All @@ -155,39 +152,6 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
}
}

// Load a section of the Transaction Log JSON entries. Optionally from a given end version (inclusive) through an start version (exclusive)
private static List<Transaction> loadNewTailBackward(
TrinoFileSystem fileSystem,
String tableLocation,
Optional<Long> startVersion,
long endVersion)
throws IOException
{
ImmutableList.Builder<Transaction> transactionsBuilder = ImmutableList.builder();
String transactionLogDir = getTransactionLogDir(tableLocation);

long version = endVersion;
long entryNumber = version;
boolean endOfHead = false;

while (!endOfHead) {
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, DataSize.of(0, DataSize.Unit.BYTE));
if (results.isPresent()) {
transactionsBuilder.add(new Transaction(version, results.get()));
version = entryNumber;
entryNumber--;
}
else {
// When there is a gap in the transaction log version, indicate the end of the current head
endOfHead = true;
}
if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) {
endOfHead = true;
}
}
return transactionsBuilder.build();
}

protected abstract List<Page> buildPages(ConnectorSession session, PageListBuilder pagesBuilder, List<Transaction> transactions, TrinoFileSystem fileSystem)
throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,34 @@ public static TransactionLogTail loadNewTail(
DataSize transactionLogMaxCachedFileSize)
throws IOException
{
ImmutableList.Builder<Transaction> entriesBuilder = ImmutableList.builder();

if (startVersion.isPresent() && endVersion.isPresent() && startVersion.get().equals(endVersion.get())) {
// This is time travel to a specific checkpoint. No need to read transaction log files.
return new TransactionLogTail(entriesBuilder.build(), startVersion.get());
return new TransactionLogTail(ImmutableList.of(), startVersion.get());
}

long version = startVersion.orElse(0L);
long entryNumber = startVersion.map(start -> start + 1).orElse(0L);
checkArgument(endVersion.isEmpty() || entryNumber <= endVersion.get(), "Invalid start/end versions: %s, %s", startVersion, endVersion);
if (endVersion.isPresent()) {
return loadNewTail(fileSystem, tableLocation, startVersion, endVersion.get(), transactionLogMaxCachedFileSize);
}

if (startVersion.isPresent()) {
return loadNewTail(fileSystem, tableLocation, startVersion.get(), startVersion.get() + 1, endVersion, transactionLogMaxCachedFileSize);
}

return loadNewTail(fileSystem, tableLocation, 0L, 0L, endVersion, transactionLogMaxCachedFileSize);
}

public static TransactionLogTail loadNewTail(
TrinoFileSystem fileSystem,
String tableLocation,
long version,
long startVersion,
Optional<Long> endVersion,
DataSize transactionLogMaxCachedFileSize)
throws IOException
{
ImmutableList.Builder<Transaction> entriesBuilder = ImmutableList.builder();
long entryNumber = startVersion;
checkArgument(endVersion.isEmpty() || entryNumber <= endVersion.get(), "Invalid start/end versions: %s, %s", startVersion, endVersion);
String transactionLogDir = getTransactionLogDir(tableLocation);

boolean endOfTail = false;
Expand All @@ -81,7 +98,7 @@ public static TransactionLogTail loadNewTail(
entryNumber++;
}
else {
if (endVersion.isPresent()) {
if (endVersion.isPresent() && entryNumber > startVersion) {
throw new MissingTransactionLogException(getTransactionLogJsonEntryPath(transactionLogDir, entryNumber).toString());
}
endOfTail = true;
Expand All @@ -95,6 +112,40 @@ public static TransactionLogTail loadNewTail(
return new TransactionLogTail(entriesBuilder.build(), version);
}

// Load a section of the Transaction Log JSON entries. Optionally from a given end version (inclusive) through an start version (exclusive)
private static TransactionLogTail loadNewTail(
TrinoFileSystem fileSystem,
String tableLocation,
Optional<Long> startVersion,
long endVersion,
DataSize transactionLogMaxCachedFileSize)
throws IOException
{
ImmutableList.Builder<Transaction> transactionsBuilder = ImmutableList.builder();
String transactionLogDir = getTransactionLogDir(tableLocation);

long version = endVersion;
long entryNumber = version;
boolean endOfHead = false;

while (!endOfHead) {
Optional<TransactionLogEntries> results = getEntriesFromJson(entryNumber, transactionLogDir, fileSystem, transactionLogMaxCachedFileSize);
if (results.isPresent()) {
transactionsBuilder.add(new Transaction(entryNumber, results.get()));
version = entryNumber;
entryNumber--;
}
else {
// When there is a gap in the transaction log version, indicate the end of the current head
endOfHead = true;
}
if ((startVersion.isPresent() && version == startVersion.get() + 1) || entryNumber < 0) {
endOfHead = true;
}
}
return new TransactionLogTail(transactionsBuilder.build().reversed(), endVersion);
}

public Optional<TransactionLogTail> getUpdatedTail(TrinoFileSystem fileSystem, String tableLocation, Optional<Long> endVersion, DataSize transactionLogMaxCachedFileSize)
throws IOException
{
Expand Down