From 0760302dbb028ece7af919185dcd24f87af433ff Mon Sep 17 00:00:00 2001 From: Marius Grama Date: Mon, 20 Feb 2023 21:46:06 +0100 Subject: [PATCH] Push down version filter in the Delta `$history` metadata table --- .../deltalake/DeltaLakeHistoryTable.java | 60 ++++- .../AccessTrackingFileSystemFactory.java | 22 +- .../deltalake/TestDeltaLakeSystemTables.java | 10 +- ...ltaLakeSystemTablesWithAccessTracking.java | 240 ++++++++++++++++++ 4 files changed, 319 insertions(+), 13 deletions(-) create mode 100644 plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java index ea48ee0f0775..e9a3dda6c1ed 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeHistoryTable.java @@ -29,6 +29,8 @@ import io.trino.spi.connector.RecordCursor; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.connector.SystemTable; +import io.trino.spi.predicate.Domain; +import io.trino.spi.predicate.Range; import io.trino.spi.predicate.TupleDomain; import io.trino.spi.type.TimeZoneKey; import io.trino.spi.type.Type; @@ -42,6 +44,7 @@ import java.util.Objects; import java.util.Optional; +import static com.google.common.base.Verify.verify; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.Streams.stream; import static io.trino.spi.type.BigintType.BIGINT; @@ -55,6 +58,7 @@ public class DeltaLakeHistoryTable implements SystemTable { + private static final int VERSION_COLUMN_INDEX = 0; private final SchemaTableName tableName; private final ConnectorTableMetadata tableMetadata; private final List types; @@ -113,14 +117,15 @@ public ConnectorTableMetadata getTableMetadata() @Override public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, ConnectorSession session, TupleDomain constraint) { + VersionRange versionRange = extractVersionRange(constraint); TimeZoneKey timeZoneKey = session.getTimeZoneKey(); TrinoFileSystem fileSystem = fileSystemFactory.create(session); Iterable> records = () -> stream(new DeltaLakeTransactionLogIterator( fileSystem, new Path(metastore.getTableLocation(tableName, session)), - Optional.empty(), - Optional.empty())) + versionRange.startVersion, + versionRange.endVersion)) .flatMap(Collection::stream) .map(DeltaLakeTransactionLogEntry::getCommitInfo) .filter(Objects::nonNull) @@ -163,4 +168,55 @@ private Object toVarcharVarcharMapBlock(Map values) blockBuilder.closeEntry(); return varcharToVarcharMapType.getObject(blockBuilder, 0); } + + private static VersionRange extractVersionRange(TupleDomain constraint) + { + Domain versionDomain = constraint.getDomains() + .map(map -> map.get(VERSION_COLUMN_INDEX)) + .orElse(Domain.all(BIGINT)); + Optional startVersion = Optional.empty(); + Optional endVersion = Optional.empty(); + if (versionDomain.isAll() || versionDomain.isNone()) { + return new VersionRange(startVersion, endVersion); + } + List orderedRanges = versionDomain.getValues().getRanges().getOrderedRanges(); + if (orderedRanges.size() == 1) { + // Opt for a rather pragmatical choice of extracting the version range + // only when dealing with a single range + Range range = orderedRanges.get(0); + if (range.isSingleValue()) { + long version = (long) range.getLowBoundedValue(); + startVersion = Optional.of(version); + endVersion = Optional.of(version); + } + else { + if (!range.isLowUnbounded()) { + long version = (long) range.getLowBoundedValue(); + if (!range.isLowInclusive()) { + version++; + } + startVersion = Optional.of(version); + } + if (!range.isHighUnbounded()) { + long version = (long) range.getHighBoundedValue(); + if (!range.isHighInclusive()) { + version--; + } + endVersion = Optional.of(version); + } + } + } + return new VersionRange(startVersion, endVersion); + } + + private record VersionRange(Optional startVersion, Optional endVersion) + { + @SuppressWarnings("UnusedVariable") // TODO: Remove once https://github.com/google/error-prone/issues/2713 is fixed + private VersionRange + { + requireNonNull(startVersion, "startVersion is null"); + requireNonNull(endVersion, "endVersion is null"); + verify(startVersion.orElse(0L) <= endVersion.orElse(startVersion.orElse(0L)), "startVersion is greater than endVersion"); + } + } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java index f3c8e682b766..2b7da272cf39 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/AccessTrackingFileSystemFactory.java @@ -57,6 +57,11 @@ public Map getOpenCount() return map.buildOrThrow(); } + public void resetOpenCount() + { + openedFiles.clear(); + } + private void incrementOpenCount(String path) { openedFiles.add(path.substring(path.lastIndexOf('/') + 1)); @@ -91,37 +96,42 @@ public TrinoInputFile newInputFile(String location, long length) @Override public TrinoOutputFile newOutputFile(String location) { - throw new UnsupportedOperationException(); + return delegate.newOutputFile(location); } @Override public void deleteFile(String location) + throws IOException { - throw new UnsupportedOperationException(); + delegate.deleteFile(location); } @Override public void deleteFiles(Collection locations) + throws IOException { - throw new UnsupportedOperationException(); + delegate.deleteFiles(locations); } @Override public void deleteDirectory(String location) + throws IOException { - throw new UnsupportedOperationException(); + delegate.deleteDirectory(location); } @Override public void renameFile(String source, String target) + throws IOException { - throw new UnsupportedOperationException(); + delegate.renameFile(source, target); } @Override public FileIterator listFiles(String location) + throws IOException { - throw new UnsupportedOperationException(); + return delegate.listFiles(location); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java index f3e7db1602f9..5d4ca245f96a 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTables.java @@ -132,10 +132,10 @@ public void testHistoryTableWithLogEntriesRemoved() assertThat(query("SELECT version, operation, read_version, isolation_level, is_blind_append FROM \"" + tableName + "$history\"")) .matches(""" - VALUES - (BIGINT '3', VARCHAR 'WRITE', BIGINT '2', VARCHAR 'WriteSerializable', true), - (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false), - (BIGINT '5', VARCHAR 'SET TBLPROPERTIES', BIGINT '4', VARCHAR 'WriteSerializable', true) - """); + VALUES + (BIGINT '3', VARCHAR 'WRITE', BIGINT '2', VARCHAR 'WriteSerializable', true), + (BIGINT '4', VARCHAR 'DELETE', BIGINT '3', VARCHAR 'WriteSerializable', false), + (BIGINT '5', VARCHAR 'SET TBLPROPERTIES', BIGINT '4', VARCHAR 'WriteSerializable', true) + """); } } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java new file mode 100644 index 000000000000..7bb1c587e1cc --- /dev/null +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeSystemTablesWithAccessTracking.java @@ -0,0 +1,240 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.plugin.deltalake; + +import com.google.common.collect.ImmutableMap; +import io.trino.Session; +import io.trino.filesystem.hdfs.HdfsFileSystemFactory; +import io.trino.plugin.deltalake.metastore.TestingDeltaLakeMetastoreModule; +import io.trino.plugin.hive.metastore.HiveMetastore; +import io.trino.testing.AbstractTestQueryFramework; +import io.trino.testing.DistributedQueryRunner; +import org.testng.annotations.AfterClass; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Optional; + +import static com.google.common.io.MoreFiles.deleteRecursively; +import static com.google.common.io.RecursiveDeleteOption.ALLOW_INSECURE; +import static com.google.inject.util.Modules.EMPTY_MODULE; +import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT; +import static io.trino.plugin.hive.metastore.file.FileHiveMetastore.createTestingFileHiveMetastore; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.testing.TestingSession.testSessionBuilder; +import static org.assertj.core.api.Assertions.assertThat; + +@Test(singleThreaded = true) +public class TestDeltaLakeSystemTablesWithAccessTracking + extends AbstractTestQueryFramework +{ + protected static final String SCHEMA = "test_delta_lake_system_tables_tracking_" + randomNameSuffix(); + + private static final Session TEST_SESSION = testSessionBuilder() + .setCatalog("delta_lake") + .setSchema(SCHEMA) + .build(); + + private File dataDirectory; + private HiveMetastore metastore; + private AccessTrackingFileSystemFactory fileSystemFactory; + + @Override + protected DistributedQueryRunner createQueryRunner() + throws Exception + { + DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(TEST_SESSION).build(); + + dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake").toFile(); + metastore = createTestingFileHiveMetastore(dataDirectory); + fileSystemFactory = new AccessTrackingFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT)); + + queryRunner.installPlugin(new TestingDeltaLakePlugin(Optional.of(new TestingDeltaLakeMetastoreModule(metastore)), Optional.of(fileSystemFactory), EMPTY_MODULE)); + queryRunner.createCatalog( + "delta_lake", + "delta_lake", + ImmutableMap.of("delta.enable-non-concurrent-writes", "true")); + + queryRunner.execute("CREATE SCHEMA " + SCHEMA); + return queryRunner; + } + + @AfterClass(alwaysRun = true) + public void cleanup() + throws IOException + { + if (metastore != null) { + metastore.dropDatabase(SCHEMA, false); + deleteRecursively(dataDirectory.toPath(), ALLOW_INSECURE); + } + } + + @Test + public void testHistoryTableVersionFilterPushdown() + { + try { + assertUpdate("CREATE TABLE test_simple_table_pushdown_version_filter (_bigint BIGINT)"); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 1", 1); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 2", 1); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 3", 1); + assertUpdate("INSERT INTO test_simple_table_pushdown_version_filter VALUES 4", 1); + assertQuerySucceeds("ALTER TABLE test_simple_table_pushdown_version_filter EXECUTE OPTIMIZE"); + + fileSystemFactory.resetOpenCount(); + // equal + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version = 0 AND operation = 'CREATE TABLE'")) + .matches("VALUES (BIGINT '0', VARCHAR 'CREATE TABLE')"); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000005.checkpoint.parquet", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // less than + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version < 1")) + .matches("VALUES (BIGINT '0', VARCHAR 'CREATE TABLE')"); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // less than value greater than the number of transactions + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version < 10")) + .matches(""" + VALUES + (BIGINT '0', VARCHAR 'CREATE TABLE'), + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE'), + (BIGINT '3', VARCHAR 'WRITE'), + (BIGINT '4', VARCHAR 'WRITE'), + (BIGINT '5', VARCHAR 'OPTIMIZE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // less than or equal + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version <= 1")) + .matches(""" + VALUES + (BIGINT '0', VARCHAR 'CREATE TABLE'), + (BIGINT '1', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // greater than + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version > 2")) + .matches(""" + VALUES + (BIGINT '3', VARCHAR 'WRITE'), + (BIGINT '4', VARCHAR 'WRITE'), + (BIGINT '5', VARCHAR 'OPTIMIZE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // greater than or equal + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version >= 5")) + .matches("VALUES (BIGINT '5', VARCHAR 'OPTIMIZE')"); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // between + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version BETWEEN 1 AND 2")) + .matches(""" + VALUES + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version >= 1 AND version <= 2")) + .matches(""" + VALUES + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000006.json", 16, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // multiple ranges + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version = 1 OR version = 2 OR version = 4")) + .matches(""" + VALUES + (BIGINT '1', VARCHAR 'WRITE'), + (BIGINT '2', VARCHAR 'WRITE'), + (BIGINT '4', VARCHAR 'WRITE') + """); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + fileSystemFactory.resetOpenCount(); + // none domain + assertThat(query("SELECT version, operation FROM \"test_simple_table_pushdown_version_filter$history\" WHERE version IS NULL")) + .returnsEmptyResult(); + assertThat(fileSystemFactory.getOpenCount()).containsExactlyInAnyOrderEntriesOf( + ImmutableMap.of( + "00000000000000000000.json", 1, + "00000000000000000001.json", 1, + "00000000000000000002.json", 1, + "00000000000000000003.json", 1, + "00000000000000000004.json", 1, + "00000000000000000005.json", 1, + "00000000000000000006.json", 17, + "_last_checkpoint", 17)); + } + finally { + assertUpdate("DROP TABLE IF EXISTS test_simple_table_pushdown_version_filter"); + } + } +}