Skip to content

Commit

Permalink
Delay fetching metadata when reading Delta Lake system tables
Browse files Browse the repository at this point in the history
  • Loading branch information
alexjo2144 committed Aug 7, 2023
1 parent a315932 commit a1aaf0c
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

import com.google.common.collect.ImmutableList;
import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -29,6 +32,7 @@
import io.trino.spi.type.TimeZoneKey;
import io.trino.spi.type.TypeManager;

import java.io.IOException;
import java.util.List;

import static com.google.common.collect.ImmutableList.toImmutableList;
Expand All @@ -43,15 +47,26 @@
public class DeltaLakeHistoryTable
implements SystemTable
{
private final ConnectorTableMetadata tableMetadata;
private final SchemaTableName tableName;
private final String tableLocation;
private final List<CommitInfoEntry> commitInfoEntries;
private final TransactionLogAccess transactionLogAccess;
private final ConnectorTableMetadata tableMetadata;

public DeltaLakeHistoryTable(SchemaTableName tableName, List<CommitInfoEntry> commitInfoEntries, TypeManager typeManager)
public DeltaLakeHistoryTable(
SchemaTableName tableName,
String tableLocation,
List<CommitInfoEntry> commitInfoEntries,
TransactionLogAccess transactionLogAccess,
TypeManager typeManager)
{
requireNonNull(typeManager, "typeManager is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.commitInfoEntries = ImmutableList.copyOf(requireNonNull(commitInfoEntries, "commitInfoEntries is null")).stream()
.sorted(comparingLong(CommitInfoEntry::getVersion).reversed())
.collect(toImmutableList());
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");

tableMetadata = new ConnectorTableMetadata(
requireNonNull(tableName, "tableName is null"),
Expand Down Expand Up @@ -85,6 +100,16 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
try {
// Verify the transaction log is readable
SchemaTableName baseTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameFrom(tableName.getTableName()));
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(baseTableName, tableLocation, session);
transactionLogAccess.getMetadataEntry(tableSnapshot, session);
}
catch (IOException e) {
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
}

if (commitInfoEntries.isEmpty()) {
return new FixedPageSource(ImmutableList.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3138,28 +3138,16 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
}

String tableLocation = table.get().location();
TableSnapshot tableSnapshot = getSnapshot(new SchemaTableName(systemTableName.getSchemaName(), tableName), tableLocation, session);
MetadataEntry metadataEntry;
try {
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
}
catch (TrinoException e) {
if (e.getErrorCode().equals(DELTA_LAKE_INVALID_SCHEMA.toErrorCode())) {
return Optional.empty();
}
throw e;
}

return switch (tableType.get()) {
case DATA -> throw new VerifyException("Unexpected DATA table type"); // Handled above.
case HISTORY -> Optional.of(new DeltaLakeHistoryTable(
systemTableName,
tableLocation,
getCommitInfoEntries(tableLocation, session),
transactionLogAccess,
typeManager));
case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(
systemTableName,
metadataEntry,
transactionLogAccess.getProtocolEntry(session, tableSnapshot)));
case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(systemTableName, tableLocation, transactionLogAccess));
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.util.PageListBuilder;
import io.trino.spi.Page;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ColumnMetadata;
import io.trino.spi.connector.ConnectorPageSource;
import io.trino.spi.connector.ConnectorSession;
Expand All @@ -29,6 +32,7 @@
import io.trino.spi.connector.SystemTable;
import io.trino.spi.predicate.TupleDomain;

import java.io.IOException;
import java.util.List;

import static io.trino.spi.type.VarcharType.VARCHAR;
Expand All @@ -46,14 +50,16 @@ public class DeltaLakePropertiesTable
.add(new ColumnMetadata("value", VARCHAR))
.build();

private final SchemaTableName tableName;
private final String tableLocation;
private final TransactionLogAccess transactionLogAccess;
private final ConnectorTableMetadata tableMetadata;
private final MetadataEntry metadataEntry;
private final ProtocolEntry protocolEntry;

public DeltaLakePropertiesTable(SchemaTableName tableName, MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
public DeltaLakePropertiesTable(SchemaTableName tableName, String tableLocation, TransactionLogAccess transactionLogAccess)
{
this.metadataEntry = requireNonNull(metadataEntry, "metadataEntry is null");
this.protocolEntry = requireNonNull(protocolEntry, "protocolEntry is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
this.tableMetadata = new ConnectorTableMetadata(requireNonNull(tableName, "tableName is null"), COLUMNS);
}

Expand All @@ -72,10 +78,23 @@ public ConnectorTableMetadata getTableMetadata()
@Override
public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
{
return new FixedPageSource(buildPages());
MetadataEntry metadataEntry;
ProtocolEntry protocolEntry;

try {
SchemaTableName baseTableName = new SchemaTableName(tableName.getSchemaName(), DeltaLakeTableName.tableNameFrom(tableName.getTableName()));
TableSnapshot tableSnapshot = transactionLogAccess.loadSnapshot(baseTableName, tableLocation, session);
metadataEntry = transactionLogAccess.getMetadataEntry(tableSnapshot, session);
protocolEntry = transactionLogAccess.getProtocolEntry(session, tableSnapshot);
}
catch (IOException e) {
throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
}

return new FixedPageSource(buildPages(metadataEntry, protocolEntry));
}

private List<Page> buildPages()
private List<Page> buildPages(MetadataEntry metadataEntry, ProtocolEntry protocolEntry)
{
PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,8 @@ private void testCorruptedTableLocation(String tableName, Path tableLocation, bo

// Assert queries fail cleanly
assertQueryFails("TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM \"" + tableName + "$history\"", ".* Table '.*\\$history' does not exist");
assertQueryFails("SELECT * FROM \"" + tableName + "$properties\"", ".* Table '.*\\$properties' does not exist");
assertQueryFails("SELECT * FROM \"" + tableName + "$history\"", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM \"" + tableName + "$properties\"", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT * FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SELECT 1 FROM " + tableName + " WHERE false", "Metadata not found in transaction log for tpch." + tableName);
assertQueryFails("SHOW CREATE TABLE " + tableName, "Metadata not found in transaction log for tpch." + tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,8 @@ public void testHistorySystemTable()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 44)
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 22)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 23)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version = 3",
Expand All @@ -182,8 +182,8 @@ public void testHistorySystemTable()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 48)
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version > 3",
Expand All @@ -193,8 +193,8 @@ public void testHistorySystemTable()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 48)
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version >= 3 OR version = 1",
Expand All @@ -204,8 +204,8 @@ public void testHistorySystemTable()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 48)
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version >= 1 AND version < 3",
Expand All @@ -215,8 +215,8 @@ public void testHistorySystemTable()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 48)
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());

assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version > 1 AND version < 2",
Expand All @@ -226,8 +226,8 @@ public void testHistorySystemTable()
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 48)
.addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 24)
.addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
.add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
.build());
}

Expand Down

0 comments on commit a1aaf0c

Please sign in to comment.