diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java
index 8dee777492d1..e5f2b529e95b 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java
@@ -14,9 +14,13 @@
 package io.trino.plugin.deltalake;
 
 import com.google.common.collect.ImmutableList;
+import io.trino.filesystem.TrinoFileSystem;
+import io.trino.filesystem.TrinoFileSystemFactory;
 import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry;
+import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
 import io.trino.plugin.deltalake.transactionlog.TableSnapshot;
 import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
+import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail;
 import io.trino.plugin.deltalake.util.PageListBuilder;
 import io.trino.spi.Page;
 import io.trino.spi.TrinoException;
@@ -25,23 +29,31 @@
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorTableMetadata;
 import io.trino.spi.connector.ConnectorTransactionHandle;
+import io.trino.spi.connector.EmptyPageSource;
 import io.trino.spi.connector.FixedPageSource;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SystemTable;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.Range;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.TimeZoneKey;
 import io.trino.spi.type.TypeManager;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.IntStream;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.MoreCollectors.onlyElement;
+import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA;
 import static io.trino.spi.type.BigintType.BIGINT;
 import static io.trino.spi.type.BooleanType.BOOLEAN;
 import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS;
 import static io.trino.spi.type.TypeSignature.mapType;
 import static io.trino.spi.type.VarcharType.VARCHAR;
-import static java.util.Comparator.comparingLong;
 import static java.util.Objects.requireNonNull;
 
 public class DeltaLakeHistoryTable
@@ -49,26 +61,24 @@ public class DeltaLakeHistoryTable
 {
     private final SchemaTableName tableName;
     private final String tableLocation;
-    private final List<CommitInfoEntry> commitInfoEntries;
+    private final TrinoFileSystemFactory fileSystemFactory;
    private final TransactionLogAccess transactionLogAccess;
     private final ConnectorTableMetadata tableMetadata;
 
     public DeltaLakeHistoryTable(
             SchemaTableName tableName,
             String tableLocation,
-            List<CommitInfoEntry> commitInfoEntries,
+            TrinoFileSystemFactory fileSystemFactory,
             TransactionLogAccess transactionLogAccess,
             TypeManager typeManager)
     {
         requireNonNull(typeManager, "typeManager is null");
         this.tableName = requireNonNull(tableName, "tableName is null");
         this.tableLocation = requireNonNull(tableLocation, "tableLocation is null");
-        this.commitInfoEntries = ImmutableList.copyOf(requireNonNull(commitInfoEntries, "commitInfoEntries is null")).stream()
-                .sorted(comparingLong(CommitInfoEntry::getVersion).reversed())
-                .collect(toImmutableList());
+        this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
         this.transactionLogAccess = requireNonNull(transactionLogAccess, "transactionLogAccess is null");
 
-        tableMetadata = new ConnectorTableMetadata(
+        this.tableMetadata = new ConnectorTableMetadata(
                 requireNonNull(tableName, "tableName is null"),
                 ImmutableList.<ColumnMetadata>builder()
                         .add(new ColumnMetadata("version", BIGINT))
@@ -110,13 +120,59 @@ public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHand
             throw new TrinoException(DeltaLakeErrorCode.DELTA_LAKE_INVALID_SCHEMA, "Unable to load table metadata from location: " + tableLocation, e);
         }
 
-        if (commitInfoEntries.isEmpty()) {
-            return new FixedPageSource(ImmutableList.of());
+        int versionColumnIndex = IntStream.range(0, tableMetadata.getColumns().size())
+                .filter(i -> tableMetadata.getColumns().get(i).getName().equals("version"))
+                .boxed()
+                .collect(onlyElement());
+
+        Optional<Long> startVersionExclusive = Optional.empty();
+        Optional<Long> endVersionInclusive = Optional.empty();
+
+        if (constraint.getDomains().isPresent()) {
+            Map<Integer, Domain> domains = constraint.getDomains().get();
+            if (domains.containsKey(versionColumnIndex)) {
+                Domain versionDomain = domains.get(versionColumnIndex); // The zero value here relies on the column ordering defined in the constructor
+                Range range = versionDomain.getValues().getRanges().getSpan();
+                if (range.isSingleValue()) {
+                    long value = (long) range.getSingleValue();
+                    startVersionExclusive = Optional.of(value - 1);
+                    endVersionInclusive = Optional.of(value);
+                }
+                else {
+                    Optional<Long> lowValue = range.getLowValue().map(Long.class::cast);
+                    if (lowValue.isPresent()) {
+                        startVersionExclusive = Optional.of(lowValue.get() - (range.isLowInclusive() ? 1 : 0));
+                    }
+
+                    Optional<Long> highValue = range.getHighValue().map(Long.class::cast);
+                    if (highValue.isPresent()) {
+                        endVersionInclusive = Optional.of(highValue.get() - (range.isHighInclusive() ? 0 : 1));
+                    }
+                }
+            }
+        }
+
+        if (startVersionExclusive.isPresent() && endVersionInclusive.isPresent() && startVersionExclusive.get() >= endVersionInclusive.get()) {
+            return new EmptyPageSource();
+        }
+
+        TrinoFileSystem fileSystem = fileSystemFactory.create(session);
+        try {
+            List<CommitInfoEntry> commitInfoEntries = TransactionLogTail.loadNewTail(fileSystem, tableLocation, startVersionExclusive, endVersionInclusive).getFileEntries().stream()
+                    .map(DeltaLakeTransactionLogEntry::getCommitInfo)
+                    .filter(Objects::nonNull)
+                    .collect(toImmutableList());
+            return new FixedPageSource(buildPages(session, commitInfoEntries));
+        }
+        catch (TrinoException e) {
+            throw e;
+        }
+        catch (IOException | RuntimeException e) {
+            throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting commit info entries from " + tableLocation, e);
         }
-        return new FixedPageSource(buildPages(session));
     }
 
-    private List<Page> buildPages(ConnectorSession session)
+    private List<Page> buildPages(ConnectorSession session, List<CommitInfoEntry> commitInfoEntries)
     {
         PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata);
         TimeZoneKey timeZoneKey = session.getTimeZoneKey();
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
index 856dfeca5cfd..b8cddd681908 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java
@@ -3125,11 +3125,10 @@ private static boolean isFileCreatedByQuery(Location file, String queryId)
     @Override
     public Optional<SystemTable> getSystemTable(ConnectorSession session, SchemaTableName tableName)
     {
-        return getRawSystemTable(session, tableName)
-                .map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
+        return getRawSystemTable(tableName).map(systemTable -> new ClassLoaderSafeSystemTable(systemTable, getClass().getClassLoader()));
     }
 
-    private Optional<SystemTable> getRawSystemTable(ConnectorSession session, SchemaTableName systemTableName)
+    private Optional<SystemTable> getRawSystemTable(SchemaTableName systemTableName)
     {
         Optional<DeltaLakeTableType> tableType = DeltaLakeTableName.tableTypeFrom(systemTableName.getTableName());
         if (tableType.isEmpty() || tableType.get() == DeltaLakeTableType.DATA) {
@@ -3155,7 +3154,7 @@ private Optional<SystemTable> getRawSystemTable(ConnectorSession session, Schema
             case HISTORY -> Optional.of(new DeltaLakeHistoryTable(
                     systemTableName,
                     tableLocation,
-                    getCommitInfoEntries(tableLocation, session),
+                    fileSystemFactory,
                     transactionLogAccess,
                     typeManager));
             case PROPERTIES -> Optional.of(new DeltaLakePropertiesTable(systemTableName, tableLocation, transactionLogAccess));
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
index 18974d218837..d6178f20ab3f 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java
@@ -1949,6 +1949,26 @@ public void testOptimizeUsingForcedPartitioning()
         assertThat(getAllDataFilesFromTableDirectory(tableName)).isEqualTo(union(initialFiles, updatedFiles));
     }
 
+    @Test
+    public void testHistoryTable()
+    {
+        String tableName = "test_history_table_" + randomNameSuffix();
+        try (TestTable table = new TestTable(getQueryRunner()::execute, tableName, "(int_col INTEGER)")) {
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 1, 2, 3", 3);
+            assertUpdate("INSERT INTO " + table.getName() + " VALUES 4, 5, 6", 3);
+            assertUpdate("DELETE FROM " + table.getName() + " WHERE int_col = 1", 1);
+            assertUpdate("UPDATE " + table.getName() + " SET int_col = int_col * 2 WHERE int_col = 6", 1);
+
+            assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\"",
+                    "VALUES (0, 'CREATE TABLE'), (1, 'WRITE'), (2, 'WRITE'), (3, 'MERGE'), (4, 'MERGE')");
+            assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version = 3", "VALUES (3, 'MERGE')");
+            assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version > 3", "VALUES (4, 'MERGE')");
+            assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version >= 3 OR version = 1", "VALUES (1, 'WRITE'), (3, 'MERGE'), (4, 'MERGE')");
+            assertQuery("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version >= 1 AND version < 3", "VALUES (1, 'WRITE'), (2, 'WRITE')");
+            assertThat(query("SELECT version, operation FROM \"" + table.getName() + "$history\" WHERE version > 1 AND version < 2")).returnsEmptyResult();
+        }
+    }
+
     /**
      * @see BaseDeltaLakeRegisterTableProcedureTest for more detailed tests
      */
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
index 65376c4a2f82..dc36a6c60203 100644
--- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeFileOperations.java
@@ -166,67 +166,50 @@ public void testHistorySystemTable()
 
         assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\"",
                 ImmutableMultiset.<FileOperation>builder()
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 22)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 22)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 22)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 22)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 22)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 23)
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM))
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 2)
                         .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
                         .build());
 
         assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version = 3",
                 ImmutableMultiset.<FileOperation>builder()
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24)
"00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM)) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM)) .build()); assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version > 3", ImmutableMultiset.builder() - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM)) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 2) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM)) .build()); assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version >= 3 OR version = 1", ImmutableMultiset.builder() - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24) - .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM)) + .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM)) + .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 2) .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM)) .build()); assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version >= 1 AND version < 3", ImmutableMultiset.builder() - 
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM))
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM))
                         .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
                         .build());
 
         assertFileSystemAccesses("SELECT * FROM \"test_history_system_table$history\" WHERE version > 1 AND version < 2",
                 ImmutableMultiset.<FileOperation>builder()
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 24)
-                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 25)
+                        .add(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM))
                         .add(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM))
                         .build());
     }