From 33fb62fec0ae0ffc104545d4918c248c70cfae3d Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 20 Feb 2023 16:11:29 +0100 Subject: [PATCH 1/4] Use cursor-based implementation for the Delta `$history` metadata table --- .../deltalake/DeltaLakeHistoryTable.java | 88 ++++++++++--------- 1 file changed, 46 insertions(+), 42 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java index fa68cfec7717..a408038adc2f 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java @@ -14,26 +14,30 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; -import io.trino.plugin.deltalake.util.PageListBuilder; -import io.trino.spi.Page; +import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ColumnMetadata; -import io.trino.spi.connector.ConnectorPageSource; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.connector.ConnectorTableMetadata; import io.trino.spi.connector.ConnectorTransactionHandle; -import io.trino.spi.connector.FixedPageSource; +import io.trino.spi.connector.InMemoryRecordSet; +import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SystemTable; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TimeZoneKey; +import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; +import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; import static io.trino.spi.type.TypeSignature.mapType; import static io.trino.spi.type.VarcharType.VARCHAR; @@ -44,7 +48,9 @@ public class DeltaLakeHistoryTable implements SystemTable { private final ConnectorTableMetadata tableMetadata; - private final List commitInfoEntries; + private final List types; + private final Iterable commitInfoEntries; + private final Type varcharToVarcharMapType; public DeltaLakeHistoryTable(SchemaTableName tableName, List commitInfoEntries, TypeManager typeManager) { @@ -68,6 +74,10 @@ public DeltaLakeHistoryTable(SchemaTableName tableName, List co .add(new ColumnMetadata("is_blind_append", BOOLEAN)) //TODO add support for operationMetrics, userMetadata, engineInfo .build()); + types = tableMetadata.getColumns().stream() + .map(ColumnMetadata::getType) + .collect(toImmutableList()); + varcharToVarcharMapType = typeManager.getType(mapType(VARCHAR.getTypeSignature(), VARCHAR.getTypeSignature())); } @Override @@ -83,51 +93,45 @@ public ConnectorTableMetadata getTableMetadata() } @Override - public ConnectorPageSource pageSource(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) + public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) { - if (commitInfoEntries.isEmpty()) { - return new FixedPageSource(ImmutableList.of()); - } - return new 
FixedPageSource(buildPages(session)); - } - - private List buildPages(ConnectorSession session) - { - PageListBuilder pagesBuilder = PageListBuilder.forTable(tableMetadata); TimeZoneKey timeZoneKey = session.getTimeZoneKey(); + Iterable> records = Iterables.transform(commitInfoEntries, commitInfoEntry -> getRecord(commitInfoEntry, timeZoneKey)); - commitInfoEntries.forEach(commitInfoEntry -> { - pagesBuilder.beginRow(); - - pagesBuilder.appendBigint(commitInfoEntry.getVersion()); - pagesBuilder.appendTimestampTzMillis(commitInfoEntry.getTimestamp(), timeZoneKey); - write(commitInfoEntry.getUserId(), pagesBuilder); - write(commitInfoEntry.getUserName(), pagesBuilder); - write(commitInfoEntry.getOperation(), pagesBuilder); - if (commitInfoEntry.getOperationParameters() == null) { - pagesBuilder.appendNull(); - } - else { - pagesBuilder.appendVarcharVarcharMap(commitInfoEntry.getOperationParameters()); - } - write(commitInfoEntry.getClusterId(), pagesBuilder); - pagesBuilder.appendBigint(commitInfoEntry.getReadVersion()); - write(commitInfoEntry.getIsolationLevel(), pagesBuilder); - commitInfoEntry.isBlindAppend().ifPresentOrElse(pagesBuilder::appendBoolean, pagesBuilder::appendNull); - - pagesBuilder.endRow(); - }); - - return pagesBuilder.build(); + return new InMemoryRecordSet(types, records).cursor(); } - private static void write(String value, PageListBuilder pagesBuilder) + private List getRecord(CommitInfoEntry commitInfoEntry, TimeZoneKey timeZoneKey) { - if (value == null) { - pagesBuilder.appendNull(); + List columns = new ArrayList<>(); + columns.add(commitInfoEntry.getVersion()); + columns.add(packDateTimeWithZone(commitInfoEntry.getTimestamp(), timeZoneKey)); + columns.add(commitInfoEntry.getUserId()); + columns.add(commitInfoEntry.getUserName()); + columns.add(commitInfoEntry.getOperation()); + if (commitInfoEntry.getOperationParameters() == null) { + columns.add(null); } else { - pagesBuilder.appendVarchar(value); + columns.add(toVarcharVarcharMapBlock(commitInfoEntry.getOperationParameters())); } + columns.add(commitInfoEntry.getClusterId()); + columns.add(commitInfoEntry.getReadVersion()); + columns.add(commitInfoEntry.getIsolationLevel()); + columns.add(commitInfoEntry.isBlindAppend().orElse(null)); + + return columns; + } + + private Object toVarcharVarcharMapBlock(Map values) + { + BlockBuilder blockBuilder = varcharToVarcharMapType.createBlockBuilder(null, 1); + BlockBuilder singleMapBlockBuilder = blockBuilder.beginBlockEntry(); + values.forEach((key, value) -> { + VARCHAR.writeString(singleMapBlockBuilder, key); + VARCHAR.writeString(singleMapBlockBuilder, value); + }); + blockBuilder.closeEntry(); + return varcharToVarcharMapType.getObject(blockBuilder, 0); } } From 6d63ff6330ab62c5f3b45eaa8bb49de9ea3c8e55 Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Tue, 21 Feb 2023 13:32:25 +0100 Subject: [PATCH 2/4] Avoid materializing at once all records in Delta `$history` metadata table --- .../deltalake/DeltaLakeHistoryTable.java | 47 +++++-- .../plugin/deltalake/DeltaLakeMetadata.java | 26 +--- .../DeltaLakeTransactionLogIterator.java | 115 ++++++++++++++++++ .../deltalake/TestDeltaLakeSystemTables.java | 41 ++++++- .../TestDeltaLakeTransactionLogIterator.java | 110 +++++++++++++++++ .../_delta_log/00000000000000000003.json | 2 + .../00000000000000000004.checkpoint.parquet | Bin 0 -> 15850 bytes .../_delta_log/00000000000000000004.json | 3 + .../_delta_log/00000000000000000005.json | 2 + .../_delta_log/_last_checkpoint | 1 + ...4616-937f-14a6559b3646-c000.snappy.parquet | 
Bin 0 -> 577 bytes 11 files changed, 311 insertions(+), 36 deletions(-) create mode 100644 plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogIterator.java create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeTransactionLogIterator.java create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000003.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000004.checkpoint.parquet create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000004.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000005.json create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/_last_checkpoint create mode 100644 plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/part-00000-765d5da0-2c61-4616-937f-14a6559b3646-c000.snappy.parquet diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java index a408038adc2f..ea48ee0f0775 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java @@ -14,8 +14,12 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.TrinoFileSystemFactory; +import io.trino.plugin.deltalake.metastore.DeltaLakeMetastore; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; +import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; +import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogIterator; import io.trino.spi.block.BlockBuilder; import io.trino.spi.connector.ColumnMetadata; import io.trino.spi.connector.ConnectorSession; @@ -29,38 +33,52 @@ import io.trino.spi.type.TimeZoneKey; import io.trino.spi.type.Type; import io.trino.spi.type.TypeManager; +import org.apache.hadoop.fs.Path; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Streams.stream; import static io.trino.spi.type.BigintType.BIGINT; import static io.trino.spi.type.BooleanType.BOOLEAN; import static io.trino.spi.type.DateTimeEncoding.packDateTimeWithZone; import static io.trino.spi.type.TimestampWithTimeZoneType.TIMESTAMP_TZ_MILLIS; import static io.trino.spi.type.TypeSignature.mapType; import static io.trino.spi.type.VarcharType.VARCHAR; -import static java.util.Comparator.comparingLong; import static java.util.Objects.requireNonNull; public class DeltaLakeHistoryTable implements SystemTable { + private final SchemaTableName tableName; private final ConnectorTableMetadata tableMetadata; private final List types; - private final Iterable commitInfoEntries; + private final TrinoFileSystemFactory fileSystemFactory; + private final DeltaLakeMetastore metastore; + private final Type varcharToVarcharMapType; - public 
DeltaLakeHistoryTable(SchemaTableName tableName, List commitInfoEntries, TypeManager typeManager) + public DeltaLakeHistoryTable( + SchemaTableName tableName, + TrinoFileSystemFactory fileSystemFactory, + DeltaLakeMetastore metastore, + TypeManager typeManager) { + this.tableName = requireNonNull(tableName, "tableName is null"); requireNonNull(typeManager, "typeManager is null"); - this.commitInfoEntries = ImmutableList.copyOf(requireNonNull(commitInfoEntries, "commitInfoEntries is null")).stream() - .sorted(comparingLong(CommitInfoEntry::getVersion).reversed()) - .collect(toImmutableList()); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); + this.metastore = requireNonNull(metastore, "metastore is null"); + SchemaTableName systemTableName = new SchemaTableName( + tableName.getSchemaName(), + new DeltaLakeTableName(tableName.getTableName(), DeltaLakeTableType.HISTORY).getTableNameWithType()); tableMetadata = new ConnectorTableMetadata( - requireNonNull(tableName, "tableName is null"), + systemTableName, ImmutableList.builder() .add(new ColumnMetadata("version", BIGINT)) .add(new ColumnMetadata("timestamp", TIMESTAMP_TZ_MILLIS)) @@ -96,7 +114,18 @@ public ConnectorTableMetadata getTableMetadata() public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) { TimeZoneKey timeZoneKey = session.getTimeZoneKey(); - Iterable> records = Iterables.transform(commitInfoEntries, commitInfoEntry -> getRecord(commitInfoEntry, timeZoneKey)); + TrinoFileSystem fileSystem = fileSystemFactory.create(session); + Iterable> records = () -> + stream(new DeltaLakeTransactionLogIterator( + fileSystem, + new Path(metastore.getTableLocation(tableName, session)), + Optional.empty(), + Optional.empty())) + .flatMap(Collection::stream) + .map(DeltaLakeTransactionLogEntry::getCommitInfo) + .filter(Objects::nonNull) + .map(commitInfoEntry -> getRecord(commitInfoEntry, timeZoneKey)) + .iterator(); return new InMemoryRecordSet(types, records).cursor(); } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index 07fcdfc81aff..1018b087e0a1 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -43,14 +43,12 @@ import io.trino.plugin.deltalake.transactionlog.AddFileEntry; import io.trino.plugin.deltalake.transactionlog.CdfFileEntry; import io.trino.plugin.deltalake.transactionlog.CommitInfoEntry; -import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.plugin.deltalake.transactionlog.MetadataEntry.Format; import io.trino.plugin.deltalake.transactionlog.ProtocolEntry; import io.trino.plugin.deltalake.transactionlog.RemoveFileEntry; import io.trino.plugin.deltalake.transactionlog.TableSnapshot; import io.trino.plugin.deltalake.transactionlog.checkpoint.CheckpointWriterManager; -import io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail; import io.trino.plugin.deltalake.transactionlog.statistics.DeltaLakeFileStatistics; import io.trino.plugin.deltalake.transactionlog.writer.TransactionConflictException; import io.trino.plugin.deltalake.transactionlog.writer.TransactionLogWriter; @@ -140,7 +138,6 @@ import java.util.Locale; import 
java.util.Map; import java.util.Map.Entry; -import java.util.Objects; import java.util.Optional; import java.util.OptionalInt; import java.util.OptionalLong; @@ -2480,12 +2477,12 @@ private Optional getRawSystemTable(ConnectorSession session, Schema return Optional.empty(); } DeltaLakeTableName deltaLakeTableName = new DeltaLakeTableName(name, tableType.get()); - SchemaTableName systemTableName = new SchemaTableName(tableName.getSchemaName(), deltaLakeTableName.getTableNameWithType()); return switch (deltaLakeTableName.getTableType()) { case DATA -> Optional.empty(); // Handled above case HISTORY -> Optional.of(new DeltaLakeHistoryTable( - systemTableName, - getCommitInfoEntries(tableHandle.getSchemaTableName(), session), + tableHandle.getSchemaTableName(), + fileSystemFactory, + metastore, typeManager)); }; } @@ -2566,23 +2563,6 @@ public DeltaLakeMetastore getMetastore() return metastore; } - private List getCommitInfoEntries(SchemaTableName table, ConnectorSession session) - { - TrinoFileSystem fileSystem = fileSystemFactory.create(session); - try { - return TransactionLogTail.loadNewTail(fileSystem, new Path(metastore.getTableLocation(table, session)), Optional.empty()).getFileEntries().stream() - .map(DeltaLakeTransactionLogEntry::getCommitInfo) - .filter(Objects::nonNull) - .collect(toImmutableList()); - } - catch (TrinoException e) { - throw e; - } - catch (IOException | RuntimeException e) { - throw new TrinoException(DELTA_LAKE_INVALID_SCHEMA, "Error getting commit info entries for " + table, e); - } - } - private static ColumnMetadata getColumnMetadata(DeltaLakeColumnHandle column, @Nullable String comment, boolean nullability) { return ColumnMetadata.builder() diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogIterator.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogIterator.java new file mode 100644 index 000000000000..e8e03f52531a --- /dev/null +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/transactionlog/DeltaLakeTransactionLogIterator.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.plugin.deltalake.transactionlog; + +import io.trino.filesystem.TrinoFileSystem; +import io.trino.plugin.deltalake.transactionlog.checkpoint.LastCheckpoint; +import io.trino.spi.TrinoException; +import org.apache.hadoop.fs.Path; + +import java.io.IOException; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.Optional; + +import static com.google.common.base.Verify.verify; +import static io.trino.plugin.deltalake.DeltaLakeErrorCode.DELTA_LAKE_FILESYSTEM_ERROR; +import static io.trino.plugin.deltalake.transactionlog.TransactionLogParser.readLastCheckpoint; +import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogDir; +import static io.trino.plugin.deltalake.transactionlog.TransactionLogUtil.getTransactionLogJsonEntryPath; +import static io.trino.plugin.deltalake.transactionlog.checkpoint.TransactionLogTail.getEntriesFromJson; +import static java.util.Objects.requireNonNull; + +public class DeltaLakeTransactionLogIterator + implements Iterator> +{ + private final TrinoFileSystem fileSystem; + private final Path transactionLogDir; + private final Optional endVersion; + private final Optional lastCheckpointVersion; + + private long version; + private Optional> entries = Optional.empty(); + private boolean stopped; + + public DeltaLakeTransactionLogIterator( + TrinoFileSystem fileSystem, + Path tableLocation, + Optional startVersion, + Optional endVersion) + { + verify(startVersion.orElse(0L) <= endVersion.orElse(startVersion.orElse(0L)), "startVersion must be less or equal than endVersion"); + version = startVersion.orElse(0L) - 1; + this.endVersion = endVersion; + this.fileSystem = requireNonNull(fileSystem, "fileSystem is null"); + transactionLogDir = getTransactionLogDir(requireNonNull(tableLocation, "tableLocation is null")); + lastCheckpointVersion = readLastCheckpoint(fileSystem, tableLocation).map(LastCheckpoint::getVersion); + } + + @Override + public boolean hasNext() + { + if (entries.isPresent()) { + return true; + } + entries = retrieveNextEntries(); + return entries.isPresent(); + } + + @Override + public List next() + { + if (entries.isPresent()) { + List next = entries.get(); + entries = Optional.empty(); + return next; + } + + return retrieveNextEntries().orElseThrow(NoSuchElementException::new); + } + + private Optional> retrieveNextEntries() + { + if (stopped) { + return Optional.empty(); + } + if (endVersion.isPresent() && version == endVersion.get()) { + stopped = true; + return Optional.empty(); + } + while (true) { + version++; + Optional> nextEntries; + try { + nextEntries = getEntriesFromJson(version, transactionLogDir, fileSystem); + } + catch (IOException e) { + throw new TrinoException(DELTA_LAKE_FILESYSTEM_ERROR, "Failed accessing transaction log " + getTransactionLogJsonEntryPath(transactionLogDir, version), e); + } + if (nextEntries.isPresent()) { + return nextEntries; + } + // The outdated transaction log file may have been removed after adding a subsequent checkpoint + if (lastCheckpointVersion.isPresent() && version < lastCheckpointVersion.get()) { + if (endVersion.isPresent() && version == endVersion.get()) { + return Optional.empty(); + } + continue; + } + stopped = true; + return Optional.empty(); + } + } +} diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java index 49e71475e63b..f3e7db1602f9 
100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java @@ -14,25 +14,38 @@ package io.trino.plugin.deltalake; import com.google.common.collect.ImmutableMap; +import io.trino.plugin.hive.containers.HiveMinioDataLake; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.DistributedQueryRunner; import org.testng.annotations.Test; import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.DELTA_CATALOG; -import static io.trino.plugin.deltalake.DeltaLakeQueryRunner.createDeltaLakeQueryRunner; +import static java.lang.String.format; import static org.assertj.core.api.Assertions.assertThat; public class TestDeltaLakeSystemTables extends AbstractTestQueryFramework { + private static final String SCHEMA = "test_schema"; + private static final String BUCKET_NAME = "deltalake-system-tables"; + private HiveMinioDataLake hiveMinioDataLake; + @Override protected DistributedQueryRunner createQueryRunner() throws Exception { - return createDeltaLakeQueryRunner( + hiveMinioDataLake = closeAfterClass(new HiveMinioDataLake(BUCKET_NAME)); + hiveMinioDataLake.start(); + DistributedQueryRunner queryRunner = DeltaLakeQueryRunner.createS3DeltaLakeQueryRunner( DELTA_CATALOG, - ImmutableMap.of(), - ImmutableMap.of("delta.enable-non-concurrent-writes", "true")); + SCHEMA, + ImmutableMap.of( + "delta.enable-non-concurrent-writes", "true", + "delta.register-table-procedure.enabled", "true"), + hiveMinioDataLake.getMinio().getMinioAddress(), + hiveMinioDataLake.getHiveHadoop()); + queryRunner.execute("CREATE SCHEMA " + SCHEMA + " WITH (location = 's3://" + BUCKET_NAME + "/" + SCHEMA + "')"); + return queryRunner; } @Test @@ -105,4 +118,24 @@ public void testHistoryTable() assertUpdate("DROP TABLE IF EXISTS test_checkpoint_table"); } } + + @Test + public void testHistoryTableWithLogEntriesRemoved() + { + String tableName = "delta_log_retention"; + hiveMinioDataLake.copyResources("databricks/delta_log_retention", tableName); + getQueryRunner().execute(format( + "CALL system.register_table('%s', '%s', '%s')", + SCHEMA, + tableName, + format("s3://%s/%s", BUCKET_NAME, tableName))); + + assertThat(query("SELECT version, operation, read_version, isolation_level, is_blind_append FROM \"" + tableName + "$history\"")) + .matches(""" + VALUES + (BIGINT '3', VARCHAR 'WRITE', BIGINT '2', VARCHAR 'WriteSerializable', true), + (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false), + (BIGINT '5', VARCHAR 'SET TBLPROPERTIES', BIGINT '4', VARCHAR 'WriteSerializable', true) + """); + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeTransactionLogIterator.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeTransactionLogIterator.java new file mode 100644 index 000000000000..90a41ee07c85 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/transactionlog/TestDeltaLakeTransactionLogIterator.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake.transactionlog; + +import io.trino.filesystem.TrinoFileSystem; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; +import io.trino.plugin.deltalake.DeltaTestingConnectorSession; +import org.apache.hadoop.fs.Path; +import org.testng.annotations.Test; + +import java.net.URI; +import java.util.Collection; +import java.util.List; +import java.util.Objects; +import java.util.Optional; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.Streams.stream; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static java.util.stream.LongStream.rangeClosed; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.testng.Assert.assertEquals; + +public class TestDeltaLakeTransactionLogIterator +{ + @Test + public void testIterator() + throws Exception + { + TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT).create(DeltaTestingConnectorSession.SESSION); + URI resource = getClass().getClassLoader().getResource("deltalake/person").toURI(); + Path tablePath = new Path(resource); + + assertEquals( + stream(new DeltaLakeTransactionLogIterator(fileSystem, tablePath, Optional.empty(), Optional.empty())) + .collect(toImmutableList()).size(), + 14); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.empty(), Optional.empty())) + .hasSameElementsAs(rangeClosed(0L, 13L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(0L), Optional.of(0L))) + .hasSameElementsAs(rangeClosed(0L, 0L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(10L), Optional.empty())) + .hasSameElementsAs(rangeClosed(10L, 13L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(10L), Optional.of(12L))) + .hasSameElementsAs(rangeClosed(10L, 12L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.empty(), Optional.of(5L))) + .hasSameElementsAs(rangeClosed(0L, 5L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(5L), Optional.of(5L))) + .hasSameElementsAs(rangeClosed(5L, 5L).boxed().toList()); + + assertThat(getCommitVersions(fileSystem, tablePath, Optional.empty(), Optional.of(14L))) + .hasSameElementsAs(rangeClosed(0L, 13L).boxed().toList()); + + assertThatThrownBy(() -> getCommitVersions(fileSystem, tablePath, Optional.of(6L), Optional.of(5L))) + .hasMessage("startVersion must be less or equal than endVersion"); + } + + @Test + public void testIteratorOnTableWithTransactionLogsRemoved() + throws Exception + { + TrinoFileSystem fileSystem = new HdfsFileSystemFactory(HDFS_ENVIRONMENT).create(DeltaTestingConnectorSession.SESSION); + URI resource = getClass().getClassLoader().getResource("databricks/delta_log_retention").toURI(); + Path tablePath = new Path(resource); + + assertEquals( + stream(new DeltaLakeTransactionLogIterator(fileSystem, tablePath, Optional.empty(), Optional.empty())) + 
.collect(toImmutableList()).size(), + 3); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.empty(), Optional.empty())) + .hasSameElementsAs(rangeClosed(3L, 5L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(1L), Optional.empty())) + .hasSameElementsAs(rangeClosed(3L, 5L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(1L), Optional.of(3L))) + .hasSameElementsAs(rangeClosed(3L, 3L).boxed().toList()); + // commit corresponding to the checkpoint + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(4L), Optional.of(4L))) + .hasSameElementsAs(rangeClosed(4L, 4L).boxed().toList()); + // commit following the checkpoint + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(5L), Optional.of(5L))) + .hasSameElementsAs(rangeClosed(5L, 5L).boxed().toList()); + assertThat(getCommitVersions(fileSystem, tablePath, Optional.of(1L), Optional.of(2L))) + .isEmpty(); + + assertThat(getCommitVersions(fileSystem, tablePath, Optional.empty(), Optional.of(6L))) + .hasSameElementsAs(rangeClosed(3L, 5L).boxed().toList()); + } + + private static List getCommitVersions(TrinoFileSystem fileSystem, Path tablePath, Optional startVersion, Optional endVersion) + { + return stream(new DeltaLakeTransactionLogIterator(fileSystem, tablePath, startVersion, endVersion)) + .flatMap(Collection::stream) + .map(DeltaLakeTransactionLogEntry::getCommitInfo) + .filter(Objects::nonNull) + .map(CommitInfoEntry::getVersion) + .collect(toImmutableList()); + } +} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000003.json b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000003.json new file mode 100644 index 000000000000..892c9c174326 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000003.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1676977535691,"userId":"615774135840106","userName":"marius.grama@starburstdata.com","operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"notebook":{"notebookId":"3813892257906498"},"clusterId":"0110-055035-89ctt73p","readVersion":2,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"1","numOutputBytes":"577"},"engineInfo":"Databricks-Runtime/11.3.x-scala2.12","txnId":"bb6dbe77-8325-4991-a3ad-791b70e79f4c"}} +{"add":{"path":"part-00000-765d5da0-2c61-4616-937f-14a6559b3646-c000.snappy.parquet","partitionValues":{},"size":577,"modificationTime":1676977536000,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"col\":3},\"maxValues\":{\"col\":3},\"nullCount\":{\"col\":0}}","tags":{"INSERTION_TIME":"1676977536000000","MIN_INSERTION_TIME":"1676977536000000","MAX_INSERTION_TIME":"1676977536000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000004.checkpoint.parquet b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000004.checkpoint.parquet new file mode 100644 index 0000000000000000000000000000000000000000..9796aca7342be8c1745f076caf558e62c9a44aa8 GIT binary patch literal 15850 zcmds84{%e*nSW1~Wnml;gnf~SLRH0RoSfyz`iIOZV6aK-9B~-OV1rwKQQ;dXsS&LfbjU$E2yRv+xK?g)01pVcF3i52wtte{e6G-+wa@`cGb2V8*UcdhOJN1l< zO%QGhSg0NvFQ9T051D@Q^=~gTP%R6qR8~rBMWxpeFZqXa|8ngnMg~S%9@AhpAvyeL 
zbZ5SI_-{F!kxuy9@%-Z=rE`!~DD{o(U&KidgpzCL24j^uiu8Vf8TNepM1NCBH3G@V` zv2Z97jzmEUFY%F=FFgKDD+*;*g<8?63ZXQfzY2g+APa#SQwz4ppJqa*FSpy}8T zz>j(}V!Yp9eo?kUKNptG?Veu zmo7FMkQ{M_q;W*F?^wto3px4N-z5x`Ul&zZ_gK1sRCo1AKn8~%iuz-bp|C$JD}(;Q zaC?V88Ho(aF+~oAgP}^UzW!ceDml~<6Tvzaydy%8_U!5T0e3 zSYaAhk-y%5aeR54SkMdX?wyCZo311hIq+96zC@=GryJaPiHA<1$0F2$ia7k>&(2Qr zRR$EABLjs-@QwF=`z_=2Z~{cYb^4S2PrbvJmU*3eEI@_p^!Z<&c(I9D5~xbn>9|J# zheUB6`{{50*T%ImEYA6~v62%a`5O7*yAwaL$B7-Su|2t{MJA9I(n`i3fB8Ql)=6x^ ztlEketdBL+4`=Taj&_72NGH2!HSc zMrtJi^6vg$zXH?6nMng0vj$`)^cI^%VAuI&Aje4m!Ows5GAw4kBD6^F5I+Uqe~Ao| zmyi78ca1^vuG7@}>_RiB6H5*S!+{RjFAqVA2(KH8_+wFd$e#=rCyg@IQT?+> zE{Npc{{FvTvM@MUT*~`N%>p}s^P}Fvo>9fS-kUGv#u5c@t9K}+q%oX+0ivrsf?r`E zZDn++OhFk|a?pYCR31zN1q@Xn%qj&rDHmi2uY33I#K?+b9(elTagnh%YXkWL$o@to zMUi~tz4K3k-!(x8O;z`*smh9}DlV)BS(8e-Ah)Hn!~IG@$rMuAO!rt$7VUzQtm#fX zq$D05&4PHnAYpE|oCcr`q?3$4KaHV*<9K2F2AKibE8pHY~8Rz^IFa7%?brXWJOC3u_v_wxY|rob=cH-MC`t4;oqfF^n?$(}K1nrT(;# zoeA|Pn>xjIwi^LySretzzSdoWb-?-&MMg3AVnUCb#UVAc>K#lQz#WWbWwV1J-P}^u z^ew2+Tum0Nl8pJm+%f>u1})|3t*VE@!dP6+qHAkW5QDP_3yL|_ELEP4T+Y>%6Bs5k zhKC--vJv=U(Adus50J2?;cF|dYgohh%h-)62D)jcsV2(0B|GVqaFAQ-;jMilZ=T0p zg+_3wj%?MM({-dzYo4@{ZCbNf&zl$P$!4wh85pj-Yn={QE;?=ck6WryaKkI4A!;*Y>R~|ffL?(?w z#Nw?P0RVh=SE=zs3hrFIhiC5 zb*yz%E~tGN%(jh3yg02{G7mprWO*#(=oF$K;qz*unBK@Vd~1)<-v z9u&lyPyL0WO5qsECR0PHgpB9|sS)TFfT9uirCkrnnPCNGvlisx{9?FLIr0U$kk_?p zqMvtxZh0jM=Ui%|M!i6RjF<2N1%b1z1thD3&9(ZC{0Os#=^I|Rz%i&V8)M5UBiY?b zB`Z_96%cO}W(bgOWVGBG)I6({M_@ypRFe0k(#rkXHXfzAF*Efs%m6?vW7=(35`}CI zJeKRP$I^Ez<%sNgIqtn#=A^C6$~`(U7G>7VW7`>Y9GAkV|ETb$AC~hF#h8*hmKX*tW*ib!bhe@&$A_ zRG-}4)MAzrI`q;+PQkcA7c5#BY*EO<%D`oWGvt7E)33;2 zo$V~7y3sEH3kdH4#Ic-Gi&D$JO0jAkDK)y>u8;gaIN>$9B7)8CYQey% z225g+8l||@>9fb9a2#{PX%AEq1Zm$!vP#c# zt&L-iQJ9b4Cz=3W+DIhn90fQJ09QQO7O51!Vt#SQeqqo5P{J5j{ z8V&J8CvZ4(2iYL_icrusawyBpoLjLTxUqw~-a>BWy+&?Fk=uuq+eP3eur&nw7!p3^ zvq9SIy9j(l0sa6o{cXg01+h-wNxXvZ8C%H=(#=BIv^k{TiP9S)5XYnTmWpKqb9KSU z?#)i{n9FyPLBaPLjT>uiR@Grl?D(8Iq|>5wMg(%-3e{jM#&5`B zG~P_psreJKP4GPnrF7$(uctc~YNr`dx+nrc+}Xd#@xG7? 
z5~lht6?dAH?>*9eFea{+h}6G#^i z=h?fE&zrzUGhzjO_zs%$>67|l48|SpM{l7W%X^LbT|&O+?m{~r!OF50ceL%eotpSKpk!zd44T}NlhDROLf;PL=)uL%qetw|7uv_{| z-%*F}SIgacMNMx>wiaGz(|aofq~d0<;?W>laT<8D=&kXt6u_q)EP&g}1Nb3?yAs3z zetbn4T*VU()dn-e@f83p!XFTx{x%xv1q2=sA@EfI)^fWhW|isv>IzdTK8K*+4qS8Bx!G5|8K#gGF!dHrVR>(bDSV&WJcLtt z3@$OiG?>NMI>1AAS>A*IWRb3mK<^u0<%We(W$G-;XODM*suN+%px32&cyqH!(lt@K zBEksYb%NY0B~ey+t)TD(GQA$gBsz5qGNs%7n5MDmL~1;VQdh%cz|<|KDLaXflM(cU z3Fmx}rb8@8op72^&m-y#MV$gvEdG|4V|9X~%$QLo z5#=R{a>C7ph;id;)eK(6ph*4FcHaqioEQ6f1FXd^0C=BK=V|0N86)dq2XPy6!}HYV z&CFPRq%6oZN_eI!-e$7Ihlskb11&LG3n~vED(<8i_uv-5Jx*~Cui>`Q9j~PRLJjkg zC>1Tf!)xNs9gDrpnEe9&MqG}p!=T*5A(Ag?q_8cGFla5)i64f zc+j5km>lf~=n(JzDLUG<3b3Wcvoh~(lFo|KDG`R^(Q=MUquKA8j7nZYCCB6N!~(Xf z-f|^plZJbJla!V+FchT`gZllHk;x2dT)dkE1mA(%c{taXfU&|y?S^HB1GmEloLyz< zE%2=hy+&@udw|=?yK&z!1>CfJ)WXY#W#vN3vRmo_ayVPy*-8BySQ$PUSsX$Z@86AQ zi(}2RuqY`#au&y$fdyW-sedC2{gaW!3&`Tg24pbx)iTHaFZRkIHaq)JE@*+6r*# zvLl;OoDEwFiMu{G&?PwCWkhLZSWfKe?HlNb!h5md@TjF^!Xk^welxanm#%}I$O(d!`)hK z5If=d>zwbjbP=KSz_F{b2;Qgg0W}+h-#w_8UL1&X9+5|(6KbS*k)ys(Q!P%5oE1B} zvYCRM%H;dtVVe3Vt$OXT{>g>#);2rlJI96M=$Di_!eBO=R%GF!j zluJ>wa~N+@ISSWhj+cu1Z7UyE>s;~|U5M1A?f$5*2<*H2`p zId7KIJ;L}}hf7Woktv&8BZ>iyh&w_W;C9i|ZIe#J+*9~fT zUBqRnD!tY)Y3#)kXTZ%o#}3W>s)-Nw)*KN5s-zsT2&mHcQyhpZbOZgSv^wz#u%35@ zckUIOpD8OosPzkY1E}D}4NDk1V7=iTt$!rv(GL5b_O|w5TfmQ7v%y>{@o*mBP_6T{ z434GJNskhX$cc_bXfQmKh{0`c$6zwr9%v7QlJE~NW5Hk|HWUjhO85?VTLy1#KMj9` NJK>e?HuzWG{|D8|QS|@- literal 0 HcmV?d00001 diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000004.json b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000004.json new file mode 100644 index 000000000000..37a58d920320 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000004.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1676977543781,"userId":"615774135840106","userName":"marius.grama@starburstdata.com","operation":"DELETE","operationParameters":{"predicate":"[\"(spark_catalog.default.delta_log_retention.col <= 2)\"]"},"notebook":{"notebookId":"3813892257906498"},"clusterId":"0110-055035-89ctt73p","readVersion":3,"isolationLevel":"WriteSerializable","isBlindAppend":false,"operationMetrics":{"numRemovedFiles":"2","numCopiedRows":"0","numAddedChangeFiles":"0","executionTimeMs":"3826","numDeletedRows":"2","scanTimeMs":"3007","numAddedFiles":"0","rewriteTimeMs":"812"},"engineInfo":"Databricks-Runtime/11.3.x-scala2.12","txnId":"8e427e9c-b480-4614-a5de-8e0d3b602703"}} +{"remove":{"path":"part-00000-50a59bf6-85f4-4aeb-b419-d55ba8ea3423-c000.snappy.parquet","deletionTimestamp":1676977543773,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":577,"tags":{"INSERTION_TIME":"1676977523000000","MIN_INSERTION_TIME":"1676977523000000","MAX_INSERTION_TIME":"1676977523000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} +{"remove":{"path":"part-00000-df905038-3bec-483a-bf62-95eba5650446-c000.snappy.parquet","deletionTimestamp":1676977543773,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":577,"tags":{"INSERTION_TIME":"1676977518000000","MIN_INSERTION_TIME":"1676977518000000","MAX_INSERTION_TIME":"1676977518000000","OPTIMIZE_TARGET_SIZE":"268435456"}}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000005.json b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000005.json new file mode 
100644 index 000000000000..3a06cc0ce251 --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/00000000000000000005.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1676980634723,"userId":"615774135840106","userName":"marius.grama@starburstdata.com","operation":"SET TBLPROPERTIES","operationParameters":{"properties":"{\"comment\":\"table comment\"}"},"notebook":{"notebookId":"3813892257906498"},"clusterId":"0110-055035-89ctt73p","readVersion":4,"isolationLevel":"WriteSerializable","isBlindAppend":true,"operationMetrics":{},"engineInfo":"Databricks-Runtime/11.3.x-scala2.12","txnId":"2fe49e20-b13b-4604-9faf-13e72d6353c5"}} +{"metaData":{"id":"8df2409a-afa0-47f5-86af-d2e835eb2520","description":"table comment","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"col\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.logRetentionDuration":"1 minute","delta.checkpointInterval":"2"},"createdTime":1676977484427}} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/_last_checkpoint b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/_last_checkpoint new file mode 100644 index 000000000000..268ae232ed1c --- /dev/null +++ b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/_delta_log/_last_checkpoint @@ -0,0 +1 @@ +{"version":4,"size":5,"sizeInBytes":15850,"numOfAddFiles":1,"checkpointSchema":{"type":"struct","fields":[{"name":"txn","type":{"type":"struct","fields":[{"name":"appId","type":"string","nullable":true,"metadata":{}},{"name":"version","type":"long","nullable":true,"metadata":{}},{"name":"lastUpdated","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"add","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"modificationTime","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"stats","type":"string","nullable":true,"metadata":{}},{"name":"stats_parsed","type":{"type":"struct","fields":[{"name":"numRecords","type":"long","nullable":true,"metadata":{}},{"name":"minValues","type":{"type":"struct","fields":[{"name":"col","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"maxValues","type":{"type":"struct","fields":[{"name":"col","type":"integer","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"nullCount","type":{"type":"struct","fields":[{"name":"col","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"remove","type":{"type":"struct","fields":[{"name":"path","type":"string","nullable":true,"metadata":{}},{"name":"deletionTimestamp","type":"long","nullable":true,"metadata":{}},{"name":"dataChange","type":"boolean","nullable":true,"metadata":{}},{"name":"extendedFileMetadata","type":"boolean","nullable":true,"metadata":{}},{"name":"partitionValues","type":{"type":"map","keyType":"string","v
alueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"size","type":"long","nullable":true,"metadata":{}},{"name":"tags","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"deletionVector","type":{"type":"struct","fields":[{"name":"storageType","type":"string","nullable":true,"metadata":{}},{"name":"pathOrInlineDv","type":"string","nullable":true,"metadata":{}},{"name":"offset","type":"integer","nullable":true,"metadata":{}},{"name":"sizeInBytes","type":"integer","nullable":true,"metadata":{}},{"name":"cardinality","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"metaData","type":{"type":"struct","fields":[{"name":"id","type":"string","nullable":true,"metadata":{}},{"name":"name","type":"string","nullable":true,"metadata":{}},{"name":"description","type":"string","nullable":true,"metadata":{}},{"name":"format","type":{"type":"struct","fields":[{"name":"provider","type":"string","nullable":true,"metadata":{}},{"name":"options","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"schemaString","type":"string","nullable":true,"metadata":{}},{"name":"partitionColumns","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"configuration","type":{"type":"map","keyType":"string","valueType":"string","valueContainsNull":true},"nullable":true,"metadata":{}},{"name":"createdTime","type":"long","nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}},{"name":"protocol","type":{"type":"struct","fields":[{"name":"minReaderVersion","type":"integer","nullable":true,"metadata":{}},{"name":"minWriterVersion","type":"integer","nullable":true,"metadata":{}},{"name":"readerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}},{"name":"writerFeatures","type":{"type":"array","elementType":"string","containsNull":true},"nullable":true,"metadata":{}}]},"nullable":true,"metadata":{}}]},"checksum":"99eaea32e46802bb0ed64901115d3c80"} diff --git a/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/part-00000-765d5da0-2c61-4616-937f-14a6559b3646-c000.snappy.parquet b/plugin/trino-delta-lake/src/test/resources/databricks/delta_log_retention/part-00000-765d5da0-2c61-4616-937f-14a6559b3646-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f7a4543df328036f4e7629f7a93cd2db1b91ecde GIT binary patch literal 577 zcmZWn%Z}496uos}h!7j3+H#~wR2Eb-QfrdFNGmqHD%gxhsB8)dIgVSR$)l6Fpi$|! 
zAohF_f59Jc+mux|*S^Q+-g}Pis~7JB3baKz{r%_XuZBknrf1Xvxc`^{K(o=r=sqMa zzjN5t3neZ?*kcZz;RZ18zI{-wReAlT(qB!cC_(TE;Z0qnz=-R)UdO@q?g-8t_W)DE z2z$w99p`!7wT=wV52NY~hoBpI+HydMR zS$vn7u_{(p@x@ly+qDr>QQ2~%t>}tnV$!%0(~nz`t89OS7lZhbm*m#0Oo>3gNmCW2 z$iPz575|%x>2|mKyaWE|L`QAqwX$Z8&i8H>M_IkBY#)8J0~z$C-`fj}%0ypRJ>U1A zwa=qXlE!Tlj+CD0K{Q Date: Tue, 21 Feb 2023 22:21:47 +0100 Subject: [PATCH 3/4] Provide injection mechanism for the file system factory --- .../trino/plugin/deltalake/DeltaLakeConnectorFactory.java | 4 ++-- .../deltalake/InternalDeltaLakeConnectorFactory.java | 6 +++++- .../BaseDeltaLakeRegisterTableProcedureTest.java | 2 +- .../deltalake/BaseDeltaLakeSharedMetastoreViewsTest.java | 2 +- .../deltalake/TestDeltaLakeCreateSchemaInternalRetry.java | 2 +- .../TestDeltaLakePerTransactionMetastoreCache.java | 1 + .../io/trino/plugin/deltalake/TestingDeltaLakePlugin.java | 8 ++++++-- .../metastore/TestDeltaLakeMetastoreAccessOperations.java | 2 +- .../TestDeltaLakeConcurrentModificationGlueMetastore.java | 2 +- .../metastore/glue/TestDeltaLakeViewsGlueMetastore.java | 2 +- 10 files changed, 20 insertions(+), 11 deletions(-) diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java index 16df93c674ad..2adb6d98fd14 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConnectorFactory.java @@ -54,8 +54,8 @@ public Connector create(String catalogName, Map config, Connecto Class moduleClass = classLoader.loadClass(Module.class.getName()); Object moduleInstance = classLoader.loadClass(module.getName()).getConstructor().newInstance(); return (Connector) classLoader.loadClass(InternalDeltaLakeConnectorFactory.class.getName()) - .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class, moduleClass) - .invoke(null, catalogName, config, context, Optional.empty(), moduleInstance); + .getMethod("createConnector", String.class, Map.class, ConnectorContext.class, Optional.class, Optional.class, moduleClass) + .invoke(null, catalogName, config, context, Optional.empty(), Optional.empty(), moduleInstance); } catch (InvocationTargetException e) { Throwable targetException = e.getTargetException(); diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java index efd32fe32890..ae1708827841 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/InternalDeltaLakeConnectorFactory.java @@ -22,6 +22,7 @@ import io.airlift.bootstrap.LifeCycleManager; import io.airlift.event.client.EventModule; import io.airlift.json.JsonModule; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.hdfs.HdfsFileSystemModule; import io.trino.hdfs.HdfsModule; import io.trino.hdfs.authentication.HdfsAuthenticationModule; @@ -73,6 +74,7 @@ public static Connector createConnector( Map config, ConnectorContext context, Optional metastoreModule, + Optional fileSystemFactory, Module module) { ClassLoader classLoader = InternalDeltaLakeConnectorFactory.class.getClassLoader(); @@ -91,7 +93,6 @@ public static Connector 
createConnector( new HiveGcsModule(), new DeltaLakeGcsModule(), new HdfsAuthenticationModule(), - new HdfsFileSystemModule(), new CatalogNameModule(catalogName), metastoreModule.orElse(new DeltaLakeMetastoreModule()), new DeltaLakeModule(), @@ -103,6 +104,9 @@ public static Connector createConnector( binder.bind(PageIndexerFactory.class).toInstance(context.getPageIndexerFactory()); binder.bind(CatalogName.class).toInstance(new CatalogName(catalogName)); newSetBinder(binder, EventListener.class); + fileSystemFactory.ifPresentOrElse( + factory -> binder.bind(TrinoFileSystemFactory.class).toInstance(factory), + () -> binder.install(new HdfsFileSystemModule())); }, module); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java index 55b69834cda0..3fdb3f18df0e 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeRegisterTableProcedureTest.java @@ -68,7 +68,7 @@ protected QueryRunner createQueryRunner() this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString(); this.metastore = createTestMetastore(dataDirectory); - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE)); + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.empty(), EMPTY_MODULE)); Map connectorProperties = ImmutableMap.builder() .put("delta.unique-table-location", "true") diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreViewsTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreViewsTest.java index 772f3c11e9a9..8705fc06dedc 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreViewsTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeSharedMetastoreViewsTest.java @@ -61,7 +61,7 @@ protected QueryRunner createQueryRunner() this.dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data").toString(); this.metastore = createTestMetastore(dataDirectory); - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE)); + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.empty(), EMPTY_MODULE)); queryRunner.createCatalog(DELTA_CATALOG_NAME, "delta_lake"); queryRunner.installPlugin(new TestingHivePlugin(metastore)); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java index 558f99cf025c..bcb14e7853b5 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeCreateSchemaInternalRetry.java @@ -88,7 +88,7 @@ public synchronized void createDatabase(Database database) } }; - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new 
TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE)); + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.empty(), EMPTY_MODULE)); queryRunner.createCatalog(CATALOG_NAME, CONNECTOR_NAME, Map.of()); return queryRunner; } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePerTransactionMetastoreCache.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePerTransactionMetastoreCache.java index 54a1aed72b10..b6601637897b 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePerTransactionMetastoreCache.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakePerTransactionMetastoreCache.java @@ -100,6 +100,7 @@ private DistributedQueryRunner createQueryRunner(boolean enablePerTransactionHiv executorService = newCachedThreadPool(threadsNamed("hive-thrift-statistics-write-%s")); queryRunner.installPlugin(new TestingDeltaLakePlugin( + Optional.empty(), Optional.empty(), new AbstractConfigurationAwareModule() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java index e26d6076592e..3fc363f27b55 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestingDeltaLakePlugin.java @@ -16,6 +16,7 @@ import com.google.inject.Binder; import com.google.inject.Module; import io.airlift.configuration.AbstractConfigurationAwareModule; +import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.spi.connector.Connector; import io.trino.spi.connector.ConnectorContext; import io.trino.spi.connector.ConnectorFactory; @@ -31,16 +32,18 @@ public class TestingDeltaLakePlugin extends DeltaLakePlugin { private final Optional metastoreModule; + private final Optional fileSystemFactory; private final Module additionalModule; public TestingDeltaLakePlugin() { - this(Optional.empty(), EMPTY_MODULE); + this(Optional.empty(), Optional.empty(), EMPTY_MODULE); } - public TestingDeltaLakePlugin(Optional metastoreModule, Module additionalModule) + public TestingDeltaLakePlugin(Optional metastoreModule, Optional fileSystemFactory, Module additionalModule) { this.metastoreModule = requireNonNull(metastoreModule, "metastoreModule is null"); + this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null"); this.additionalModule = requireNonNull(additionalModule, "additionalModule is null"); } @@ -63,6 +66,7 @@ public Connector create(String catalogName, Map config, Connecto config, context, metastoreModule, + fileSystemFactory, new AbstractConfigurationAwareModule() { @Override diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java index 9b7b957770ff..a3a21380f502 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/TestDeltaLakeMetastoreAccessOperations.java @@ -67,7 +67,7 @@ protected DistributedQueryRunner createQueryRunner() File baseDir = 
queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake").toFile(); metastore = new CountingAccessHiveMetastore(createTestingFileHiveMetastore(baseDir)); - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.empty(), new CountingAccessMetastoreModule(metastore))); + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.empty(), Optional.empty(), new CountingAccessMetastoreModule(metastore))); ImmutableMap.Builder deltaLakeProperties = ImmutableMap.builder(); deltaLakeProperties.put("hive.metastore", "test"); // use test value so we do not get clash with default bindings) queryRunner.createCatalog("delta_lake", "delta_lake", deltaLakeProperties.buildOrThrow()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java index 794070f2de66..ebb73093b711 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeConcurrentModificationGlueMetastore.java @@ -100,7 +100,7 @@ protected QueryRunner createQueryRunner() stats, table -> true); - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE)); + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.empty(), EMPTY_MODULE)); queryRunner.createCatalog(CATALOG_NAME, "delta_lake"); queryRunner.execute("CREATE SCHEMA " + SCHEMA); return queryRunner; diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeViewsGlueMetastore.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeViewsGlueMetastore.java index 178dd6549b40..341ea2376929 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeViewsGlueMetastore.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/metastore/glue/TestDeltaLakeViewsGlueMetastore.java @@ -64,7 +64,7 @@ protected QueryRunner createQueryRunner() dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("data_delta_lake_views").toString(); metastore = createTestMetastore(dataDirectory); - queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), EMPTY_MODULE)); + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.empty(), EMPTY_MODULE)); queryRunner.createCatalog(CATALOG_NAME, "delta_lake"); queryRunner.execute("CREATE SCHEMA " + SCHEMA); From 0760302dbb028ece7af919185dcd24f87af433ff Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 20 Feb 2023 21:46:06 +0100 Subject: [PATCH 4/4] Push down version filter in the Delta `$history` metadata table --- .../deltalake/DeltaLakeHistoryTable.java | 60 ++++- .../AccessTrackingFileSystemFactory.java | 22 +- .../deltalake/TestDeltaLakeSystemTables.java | 10 +- ...ltaLakeSystemTablesWithAccessTracking.java | 240 ++++++++++++++++++ 4 files changed, 319 insertions(+), 13 deletions(-) create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java diff --git 
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java
index ea48ee0f0775..e9a3dda6c1ed 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java
@@ -29,6 +29,8 @@
 import io.trino.spi.connector.RecordCursor;
 import io.trino.spi.connector.SchemaTableName;
 import io.trino.spi.connector.SystemTable;
+import io.trino.spi.predicate.Domain;
+import io.trino.spi.predicate.Range;
 import io.trino.spi.predicate.TupleDomain;
 import io.trino.spi.type.TimeZoneKey;
 import io.trino.spi.type.Type;
@@ -42,6 +44,7 @@
 import java.util.Objects;
 import java.util.Optional;

+import static com.google.common.base.Verify.verify;
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.Streams.stream;
 import static io.trino.spi.type.BigintType.BIGINT;
@@ -55,6 +58,7 @@
 public class DeltaLakeHistoryTable
         implements SystemTable
 {
+    private static final int VERSION_COLUMN_INDEX = 0;
     private final SchemaTableName tableName;
     private final ConnectorTableMetadata tableMetadata;
     private final List<Type> types;
@@ -113,14 +117,15 @@ public ConnectorTableMetadata getTableMetadata()
     @Override
     public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain<Integer> constraint)
     {
+        VersionRange versionRange = extractVersionRange(constraint);
         TimeZoneKey timeZoneKey = session.getTimeZoneKey();
         TrinoFileSystem fileSystem = fileSystemFactory.create(session);
         Iterable<List<Object>> records = () -> stream(new DeltaLakeTransactionLogIterator(
                 fileSystem,
                 new Path(metastore.getTableLocation(tableName, session)),
-                Optional.empty(),
-                Optional.empty()))
+                versionRange.startVersion,
+                versionRange.endVersion))
                 .flatMap(Collection::stream)
                 .map(DeltaLakeTransactionLogEntry::getCommitInfo)
                 .filter(Objects::nonNull)
                 .map(commitInfoEntry -> getRecord(commitInfoEntry, timeZoneKey))
                 .iterator();
@@ -163,4 +168,55 @@ private Object toVarcharVarcharMapBlock(Map<String, String> values)
         blockBuilder.closeEntry();
         return varcharToVarcharMapType.getObject(blockBuilder, 0);
     }
+
+    private static VersionRange extractVersionRange(TupleDomain<Integer> constraint)
+    {
+        Domain versionDomain = constraint.getDomains()
+                .map(map -> map.get(VERSION_COLUMN_INDEX))
+                .orElse(Domain.all(BIGINT));
+        Optional<Long> startVersion = Optional.empty();
+        Optional<Long> endVersion = Optional.empty();
+        if (versionDomain.isAll() || versionDomain.isNone()) {
+            return new VersionRange(startVersion, endVersion);
+        }
+        List<Range> orderedRanges = versionDomain.getValues().getRanges().getOrderedRanges();
+        if (orderedRanges.size() == 1) {
+            // Opt for a rather pragmatic choice of extracting the version range
+            // only when dealing with a single range
+            Range range = orderedRanges.get(0);
+            if (range.isSingleValue()) {
+                long version = (long) range.getLowBoundedValue();
+                startVersion = Optional.of(version);
+                endVersion = Optional.of(version);
+            }
+            else {
+                if (!range.isLowUnbounded()) {
+                    long version = (long) range.getLowBoundedValue();
+                    if (!range.isLowInclusive()) {
+                        version++;
+                    }
+                    startVersion = Optional.of(version);
+                }
+                if (!range.isHighUnbounded()) {
+                    long version = (long) range.getHighBoundedValue();
+                    if (!range.isHighInclusive()) {
+                        version--;
+                    }
+                    endVersion = Optional.of(version);
+                }
+            }
+        }
+        return new VersionRange(startVersion, endVersion);
+    }
+
+    private record VersionRange(Optional<Long> startVersion, Optional<Long> endVersion)
+    {
@SuppressWarnings("UnusedVariable") // TODO: Remove once https://github.com/google/error-prone/issues/2713 is fixed + private VersionRange + { + requireNonNull(startVersion, "startVersion is null"); + requireNonNull(endVersion, "endVersion is null"); + verify(startVersion.orElse(0L) <= endVersion.orElse(startVersion.orElse(0L)), "startVersion is greater than endVersion"); + } + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java index f3c8e682b766..2b7da272cf39 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java @@ -57,6 +57,11 @@ public Map getOpenCount() return map.buildOrThrow(); } + public void resetOpenCount() + { + openedFiles.clear(); + } + private void incrementOpenCount(String path) { openedFiles.add(path.substring(path.lastIndexOf('/') + 1)); @@ -91,37 +96,42 @@ public TrinoInputFile newInputFile(String location, long length) @Override public TrinoOutputFile newOutputFile(String location) { - throw new UnsupportedOperationException(); + return delegate.newOutputFile(location); } @Override public void deleteFile(String location) + throws IOException { - throw new UnsupportedOperationException(); + delegate.deleteFile(location); } @Override public void deleteFiles(Collection locations) + throws IOException { - throw new UnsupportedOperationException(); + delegate.deleteFiles(locations); } @Override public void deleteDirectory(String location) + throws IOException { - throw new UnsupportedOperationException(); + delegate.deleteDirectory(location); } @Override public void renameFile(String source, String target) + throws IOException { - throw new UnsupportedOperationException(); + delegate.renameFile(source, target); } @Override public FileIterator listFiles(String location) + throws IOException { - throw new UnsupportedOperationException(); + return delegate.listFiles(location); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java index f3e7db1602f9..5d4ca245f96a 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java @@ -132,10 +132,10 @@ public void testHistoryTableWithLogEntriesRemoved() assertThat(query("SELECT version, operation, read_version, isolation_level, is_blind_append FROM \"" + tableName + "$history\"")) .matches(""" - VALUES - (BIGINT '3', VARCHAR 'WRITE', BIGINT '2', VARCHAR 'WriteSerializable', true), - (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false), - (BIGINT '5', VARCHAR 'SET TBLPROPERTIES', BIGINT '4', VARCHAR 'WriteSerializable', true) - """); + VALUES + (BIGINT '3', VARCHAR 'WRITE', BIGINT '2', VARCHAR 'WriteSerializable', true), + (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false), + (BIGINT '5', VARCHAR 'SET TBLPROPERTIES', BIGINT '4', VARCHAR 'WriteSerializable', true) + """); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java new file mode 100644 index 000000000000..7bb1c587e1cc --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java @@ -0,0 +1,240 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; +import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Optional; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; + +@Test(singleThreaded = true) +public class TestDeltaLakeSystemTablesWithAccessTracking + extends AbstractTestQueryFramework +{ + protected static final String SCHEMA = "test_delta_lake_system_tables_tracking_" + randomNameSuffix(); + + private static final Session TEST_SESSION = testSessionBuilder() + .setCatalog("delta_lake") + .setSchema(SCHEMA) + .build(); + + private File dataDirectory; + private HiveMetastore metastore; + private AccessTrackingFileSystemFactory fileSystemFactory; + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(TEST_SESSION).build(); + + dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake").toFile(); + metastore = createTestingFileHiveMetastore(dataDirectory); + fileSystemFactory = new AccessTrackingFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT)); + + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.of(fileSystemFactory), EMPTY_MODULE)); + queryRunner.createCatalog( + "delta_lake", + "delta_lake", + ImmutableMap.of("delta.enable-non-concurrent-writes", "true")); + + queryRunner.execute("CREATE SCHEMA " + SCHEMA); + return queryRunner; + } + + @AfterClass(alwaysRun = true) + public void cleanup() + throws IOException + { + if (metastore != null) { + metastore.dropDatabase(SCHEMA, false); + deleteRecursively(dataDirectory.toPath(), ALLOW_INSECURE); + } + } + + @Test 
+ public void testHistoryTableVersionFilterPushdown() + { + try { + assertUpdate("CREATE TABLE test_simple_table_pushdown_version_filter (_bigint BIGINT)"); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 1", 1); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 2", 1); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 3", 1); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 4", 1); + assertQuerySucceeds("ALTER TABLE test_simple_table_pushdown_version_filter EXECUTE OPTIMIZE"); + + fileSystemFactory.resetOpenCount(); + // equal + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version = 0 AND operation = 'CREATE TABLE'")) + .matches("VALUES (BIGINT '0', VARCHAR 'CREATE TABLE')"); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000005.checkpoint.parquet", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // less than + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version < 1")) + .matches("VALUES (BIGINT '0', VARCHAR 'CREATE TABLE')"); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // less than value greater than the number of transactions + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version < 10")) + .matches(""" + VALUES + (BIGINT '0', VARCHAR 'CREATE TABLE'), + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE'), + (BIGINT '3', VARCHAR 'WRITE'), + (BIGINT '4', VARCHAR 'WRITE'), + (BIGINT '5', VARCHAR 'OPTIMIZE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // less than or equal + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version <= 1")) + .matches(""" + VALUES + (BIGINT '0', VARCHAR 'CREATE TABLE'), + (BIGINT '1', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // greater than + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version > 2")) + .matches(""" + VALUES + (BIGINT '3', VARCHAR 'WRITE'), + (BIGINT '4', VARCHAR 'WRITE'), + (BIGINT '5', VARCHAR 'OPTIMIZE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // greater than or equal + 
assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version >= 5")) + .matches("VALUES (BIGINT '5', VARCHAR 'OPTIMIZE')"); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // between + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version BETWEEN 1 AND 2")) + .matches(""" + VALUES + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version >= 1 AND version <= 2")) + .matches(""" + VALUES + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // multiple ranges + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version = 1 OR version = 2 OR version = 4")) + .matches(""" + VALUES + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE'), + (BIGINT '4', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // none domain + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version IS NULL")) + .returnsEmptyResult(); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + } + finally { + assertUpdate("DROP TABLE IF EXISTS test_simple_table_pushdown_version_filter"); + } + } +}