Skip to content

Commit

Permalink
Performance improvements for the Delta Lake history system table
Browse files Browse the repository at this point in the history
The $history table does not need to lead the entire transaction
log when a version filter is supplied.
  • Loading branch information
alexjo2144 authored and findepi committed Aug 8, 2023
1 parent 5f3b085 commit 9d2e3e3
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
Expand All @@ -25,50 +29,56 @@
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorTableMetadata;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.EmptyPageSource;
import io.trino.spi.connector.FixedPageSource;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.Range;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TypeManager;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.IntStream;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.MoreCollectors.onlyElement;
import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
import static io.trino.spi.type.BigintType.BIGINT;
import static io.trino.spi.type.BooleanType.BOOLEAN;
import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
import static io.trino.spi.type.TypeSignature.mapType;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Comparator.comparingLong;
import static java.util.Objects.requireNonNull;

public class DeltaLakeHistoryTable
implements SystemTable
{
private final SchemaTableName tableName;
private final String tableLocation;
private final List<CommitInfoEntry> commitInfoEntries;
private final TrinoFileSystemFactory fileSystemFactory;
private final TransactionLogAccess transactionLogAccess;
private final ConnectorTableMetadata tableMetadata;

public DeltaLakeHistoryTable(
SchemaTableName tableName,
String tableLocation,
List<CommitInfoEntry> commitInfoEntries,
TrinoFileSystemFactory fileSystemFactory,
TransactionLogAccess transactionLogAccess,
TypeManager typeManager)
{
requireNonNull(typeManager, "typeManager is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.commitInfoEntries = ImmutableList.copyOf(requireNonNull(commitInfoEntries, "commitInfoEntries is null")).stream()
.sorted(comparingLong(CommitInfoEntry::getVersion).reversed())
.collect(toImmutableList());
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");

tableMetadata = new ConnectorTableMetadata(
this.tableMetadata = new ConnectorTableMetadata(
requireNonNull(tableName, "tableName is null"),
ImmutableList.<ColumnMetadata>builder()
.add(new ColumnMetadata("version", BIGINT))
Expand Down Expand Up @@ -110,13 +120,59 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
}

if (commitInfoEntries.isEmpty()) {
return new FixedPageSource(ImmutableList.of());
int versionColumnIndex = IntStream.range(0, tableMetadata.getColumns().size())
.filter(i -> tableMetadata.getColumns().get(i).getName().equals("version"))
.boxed()
.collect(onlyElement());

Optional<Long> startVersionExclusive = Optional.empty();
Optional<Long> endVersionInclusive = Optional.empty();

if (constraint.getDomains().isPresent()) {
Map<Integer, Domain> domains = constraint.getDomains().get();
if (domains.containsKey(versionColumnIndex)) {
Domain versionDomain = domains.get(versionColumnIndex); // The zero value here relies on the column ordering defined in the constructor
Range range = versionDomain.getValues().getRanges().getSpan();
if (range.isSingleValue()) {
long value = (long) range.getSingleValue();
startVersionExclusive = Optional.of(value - 1);
endVersionInclusive = Optional.of(value);
}
else {
Optional<Long> lowValue = range.getLowValue().map(Long.class::cast);
if (lowValue.isPresent()) {
startVersionExclusive = Optional.of(lowValue.get() - (range.isLowInclusive() ? 1 : 0));
}

Optional<Long> highValue = range.getHighValue().map(Long.class::cast);
if (highValue.isPresent()) {
endVersionInclusive = Optional.of(highValue.get() - (range.isHighInclusive() ? 0 : 1));
}
}
}
}

if (startVersionExclusive.isPresent() && endVersionInclusive.isPresent() && startVersionExclusive.get() >= endVersionInclusive.get()) {
return new EmptyPageSource();
}

TrinoFileSystem fileSystem = fileSystemFactory.create(session);
try {
List<CommitInfoEntry> commitInfoEntries = TransactionLogTail.loadNewTail(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive).getFileEntries().stream()
.map(DeltaLakeTransactionLogEntry::getCommitInfo)
.filter(Objects::nonNull)
.collect(toImmutableList());
return new FixedPageSource(buildPages(session, commitInfoEntries));
}
catch (TrinoException e) {
throw e;
}
catch (IOException | RuntimeException e) {
throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting commit info entries from " + tableLocation, e);
}
return new FixedPageSource(buildPages(session));
}

private List<Page> buildPages(ConnectorSession session)
private List<Page> buildPages(ConnectorSession session, List<CommitInfoEntry> commitInfoEntries)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
TimeZoneKey timeZoneKey = session.getTimeZoneKey();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3125,11 +3125,10 @@ private static boolean isFileCreatedByQuery(Location file, String queryId)
@Override
public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
{
return getRawSystemTable(session, tableName)
.map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
return getRawSystemTable(tableName).map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
}

private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName systemTableName)
private Optional<SystemTable> getRawSystemTable(SchemaTableName systemTableName)
{
Optional<DeltaLakeTableType> tableType = DeltaLakeTableName.tableTypeFrom(systemTableName.getTableName());
if (tableType.isEmpty() || tableType.get() == DeltaLakeTableType.DATA) {
Expand All @@ -3155,7 +3154,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
case HISTORY -> Optional.of(new DeltaLakeHistoryTable(
systemTableName,
tableLocation,
getCommitInfoEntries(tableLocation, session),
fileSystemFactory,
transactionLogAccess,
typeManager));
case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(systemTableName, tableLocation, transactionLogAccess));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1949,6 +1949,26 @@ public void testOptimizeUsingForcedPartitioning()
assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles));
}

@Test
public void testHistoryTable()
{
String tableName = "test_history_table_" + randomNameSuffix();
try (TestTable table = new TestTable(getQueryRunner()::execute, tableName, "(int_col INTEGER)")) {
assertUpdate("INSERT INTO " + table.getName() + " VALUES 1, 2, 3", 3);
assertUpdate("INSERT INTO " + table.getName() + " VALUES 4, 5, 6", 3);
assertUpdate("DELETE FROM " + table.getName() + " WHERE int_col = 1", 1);
assertUpdate("UPDATE " + table.getName() + " SET int_col = int_col * 2 WHERE int_col = 6", 1);

assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\"",
"VALUES (0, 'CREATE TABLE'), (1, 'WRITE'), (2, 'WRITE'), (3, 'MERGE'), (4, 'MERGE')");
assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version = 3", "VALUES (3, 'MERGE')");
assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version > 3", "VALUES (4, 'MERGE')");
assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version >= 3 OR version = 1", "VALUES (1, 'WRITE'), (3, 'MERGE'), (4, 'MERGE')");
assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version >= 1 AND version < 3", "VALUES (1, 'WRITE'), (2, 'WRITE')");
assertThat(query("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version > 1 AND version < 2")).returnsEmptyResult();
}
}

/**
* @see BaseDeltaLakeRegisterTableProcedureTest for more detailed tests
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,67 +166,50 @@ public void testHistorySystemTable()

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\"",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 23)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM))
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 2)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version = 3",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version > 3",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM))
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 2)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version >= 3 OR version = 1",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM))
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 2)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version >= 1 AND version < 3",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version > 1 AND version < 2",
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM))
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());
}
Expand Down

0 comments on commit 9d2e3e3

Please sign in to comment.