From 1ba82f80c06ad56bcab4ae46b15d2b995b152e12 Mon Sep 17 00:00:00 2001 From: Marcin Rusek Date: Tue, 1 Aug 2023 10:02:37 +0200 Subject: [PATCH] Optional check for query partition filter for Delta --- docs/src/main/sphinx/connector/delta-lake.rst | 5 + .../plugin/deltalake/DeltaLakeConfig.java | 14 + .../plugin/deltalake/DeltaLakeMetadata.java | 42 +- .../deltalake/DeltaLakeSessionProperties.java | 11 + .../deltalake/DeltaLakeTableHandle.java | 14 + .../BaseDeltaLakeConnectorSmokeTest.java | 18 + .../plugin/deltalake/TestDeltaLakeConfig.java | 7 +- .../deltalake/TestDeltaLakeConnectorTest.java | 424 ++++++++++++++++++ ...DeltaLakeRequireQueryPartitionsFilter.java | 94 ++++ 9 files changed, 626 insertions(+), 3 deletions(-) create mode 100644 testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeRequireQueryPartitionsFilter.java diff --git a/docs/src/main/sphinx/connector/delta-lake.rst b/docs/src/main/sphinx/connector/delta-lake.rst index 233fff46c8c8..7acfb3de8a16 100644 --- a/docs/src/main/sphinx/connector/delta-lake.rst +++ b/docs/src/main/sphinx/connector/delta-lake.rst @@ -1158,3 +1158,8 @@ connector. * - ``delta.projection-pushdown-enabled`` - Read only projected fields from row columns while performing ``SELECT`` queries - ``true`` + * - ``delta.query-partition-filter-required`` + - Set to ``true`` to force a query to use a partition filter. You can use + the ``query_partition_filter_required`` catalog session property for + temporary, catalog specific use. + - ``false`` diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java index 3ca2af6a8e7a..9dd03f686b18 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeConfig.java @@ -77,6 +77,7 @@ public class DeltaLakeConfig private boolean legacyCreateTableWithExistingLocationEnabled; private boolean registerTableProcedureEnabled; private boolean projectionPushdownEnabled = true; + private boolean queryPartitionFilterRequired; public Duration getMetadataCacheTtl() { @@ -488,4 +489,17 @@ public DeltaLakeConfig setProjectionPushdownEnabled(boolean projectionPushdownEn this.projectionPushdownEnabled = projectionPushdownEnabled; return this; } + + public boolean isQueryPartitionFilterRequired() + { + return queryPartitionFilterRequired; + } + + @Config("delta.query-partition-filter-required") + @ConfigDescription("Require filter on at least one partition column") + public DeltaLakeConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired) + { + this.queryPartitionFilterRequired = queryPartitionFilterRequired; + return this; + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java index c8a5f0a1a2f8..ead8ecbdb032 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeMetadata.java @@ -195,6 +195,7 @@ import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLegacyCreateTableWithExistingLocationEnabled; import static 
io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled; +import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired; import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY; import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY; @@ -256,6 +257,7 @@ import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY; import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; +import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; import static io.trino.spi.connector.RetryMode.NO_RETRIES; import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW; import static io.trino.spi.connector.SchemaTableName.schemaTableName; @@ -2516,6 +2518,13 @@ public Optional> applyFilter(C ImmutableMap.Builder enforceableDomains = ImmutableMap.builder(); ImmutableMap.Builder unenforceableDomains = ImmutableMap.builder(); + ImmutableSet.Builder constraintColumns = ImmutableSet.builder(); + // We need additional field to track partition columns used in queries as enforceDomains seem to be not catching + // cases when partition columns is used within complex filter as 'partitionColumn % 2 = 0' + constraint.getPredicateColumns().stream() + .flatMap(Collection::stream) + .map(DeltaLakeColumnHandle.class::cast) + .forEach(constraintColumns::add); for (Entry domainEntry : constraintDomains.entrySet()) { DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) domainEntry.getKey(); if (!partitionColumns.contains(column)) { @@ -2524,6 +2533,7 @@ public Optional> applyFilter(C else { enforceableDomains.put(column, domainEntry.getValue()); } + constraintColumns.add(column); } TupleDomain newEnforcedConstraint = TupleDomain.withColumnDomains(enforceableDomains.buildOrThrow()); @@ -2541,15 +2551,19 @@ public Optional> applyFilter(C tableHandle.getNonPartitionConstraint() .intersect(newUnenforcedConstraint) .simplify(domainCompactionThreshold), + Sets.union(tableHandle.getConstraintColumns(), constraintColumns.build()), tableHandle.getWriteType(), tableHandle.getProjectedColumns(), tableHandle.getUpdatedColumns(), tableHandle.getUpdateRowIdColumns(), Optional.empty(), + false, + Optional.empty(), tableHandle.getReadVersion()); if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) && - tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint())) { + tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint()) && + tableHandle.getConstraintColumns().equals(newHandle.getConstraintColumns())) { return Optional.empty(); } @@ -2735,6 +2749,32 @@ private static Optional find(Map assignments, Proj return Optional.empty(); } + @Override + public void validateScan(ConnectorSession session, ConnectorTableHandle handle) + { + DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) handle; + + if (isQueryPartitionFilterRequired(session)) { + List partitionColumns = deltaLakeTableHandle.getMetadataEntry().getCanonicalPartitionColumns(); + if (!partitionColumns.isEmpty()) { + if (deltaLakeTableHandle.getAnalyzeHandle().isPresent()) { + throw new TrinoException( + QUERY_REJECTED, + "ANALYZE statement can not be performed on partitioned tables because filtering is required on at least one partition. 
However, the partition filtering check can be disabled with the catalog session property 'query_partition_filter_required'."); + } + Set referencedColumns = + deltaLakeTableHandle.getConstraintColumns().stream() + .map(DeltaLakeColumnHandle::getBaseColumnName) + .collect(toImmutableSet()); + if (Collections.disjoint(referencedColumns, partitionColumns)) { + throw new TrinoException( + QUERY_REJECTED, + format("Filter required on %s for at least one partition column: %s", deltaLakeTableHandle.getSchemaTableName(), String.join(", ", partitionColumns))); + } + } + } + } + @Override public Optional applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle tableHandle) { diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java index 362ca0f5a0d6..d1ecc0e2cbcc 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeSessionProperties.java @@ -69,6 +69,7 @@ public final class DeltaLakeSessionProperties public static final String EXTENDED_STATISTICS_COLLECT_ON_WRITE = "extended_statistics_collect_on_write"; public static final String LEGACY_CREATE_TABLE_WITH_EXISTING_LOCATION_ENABLED = "legacy_create_table_with_existing_location_enabled"; private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled"; + private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; private final List> sessionProperties; @@ -200,6 +201,11 @@ public DeltaLakeSessionProperties( PROJECTION_PUSHDOWN_ENABLED, "Read only required fields from a row type", deltaLakeConfig.isProjectionPushdownEnabled(), + false), + booleanProperty( + QUERY_PARTITION_FILTER_REQUIRED, + "Require filter on partition column", + deltaLakeConfig.isQueryPartitionFilterRequired(), false)); } @@ -304,4 +310,9 @@ public static boolean isProjectionPushdownEnabled(ConnectorSession session) { return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class); } + + public static boolean isQueryPartitionFilterRequired(ConnectorSession session) + { + return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); + } } diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java index fb974c8a3871..5a69dd105107 100644 --- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java +++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeTableHandle.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; import io.trino.plugin.deltalake.transactionlog.MetadataEntry; import io.trino.spi.connector.SchemaTableName; @@ -62,6 +63,8 @@ public enum WriteType // OPTIMIZE only. Coordinator-only private final boolean recordScannedFiles; private final Optional maxScannedFileSize; + // Used only for validation when config property delta.query-partition-filter-required is enabled. 
+ private final Set constraintColumns; @JsonCreator public DeltaLakeTableHandle( @@ -87,6 +90,7 @@ public DeltaLakeTableHandle( metadataEntry, enforcedPartitionConstraint, nonPartitionConstraint, + ImmutableSet.of(), writeType, projectedColumns, updatedColumns, @@ -105,6 +109,7 @@ public DeltaLakeTableHandle( MetadataEntry metadataEntry, TupleDomain enforcedPartitionConstraint, TupleDomain nonPartitionConstraint, + Set constraintColumns, Optional writeType, Optional> projectedColumns, Optional> updatedColumns, @@ -131,6 +136,7 @@ public DeltaLakeTableHandle( this.recordScannedFiles = recordScannedFiles; this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null"); this.readVersion = readVersion; + this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null")); } public DeltaLakeTableHandle withProjectedColumns(Set projectedColumns) @@ -143,6 +149,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set proj metadataEntry, enforcedPartitionConstraint, nonPartitionConstraint, + constraintColumns, writeType, Optional.of(projectedColumns), updatedColumns, @@ -163,6 +170,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max metadataEntry, enforcedPartitionConstraint, nonPartitionConstraint, + constraintColumns, writeType, projectedColumns, updatedColumns, @@ -281,6 +289,12 @@ public Optional getMaxScannedFileSize() return maxScannedFileSize; } + @JsonIgnore + public Set getConstraintColumns() + { + return constraintColumns; + } + @JsonProperty public long getReadVersion() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java index b3dc44f763c2..18974d218837 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/BaseDeltaLakeConnectorSmokeTest.java @@ -39,6 +39,7 @@ import io.trino.testing.QueryRunner; import io.trino.testing.TestingConnectorBehavior; import io.trino.testing.minio.MinioClient; +import io.trino.testing.sql.TestTable; import io.trino.tpch.TpchTable; import org.intellij.lang.annotations.Language; import org.testng.SkipException; @@ -2133,6 +2134,23 @@ public void testProjectionPushdownMultipleRows() assertUpdate("DROP TABLE " + tableName); } + @Test + public void testPartitionFilterIncluded() + { + Session session = Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "query_partition_filter_required", "true") + .build(); + + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_no_partition_filter", + "(x varchar, part varchar) WITH (PARTITIONED_BY = ARRAY['part'])", + ImmutableList.of("'a', 'part_a'", "'b', 'part_b'"))) { + assertQueryFails(session, "SELECT * FROM %s WHERE x='a'".formatted(table.getName()), "Filter required on .*" + table.getName() + " for at least one partition column:.*"); + assertQuery(session, "SELECT * FROM %s WHERE part='part_a'".formatted(table.getName()), "VALUES ('a', 'part_a')"); + } + } + private Set getActiveFiles(String tableName) { return getActiveFiles(tableName, getQueryRunner().getDefaultSession()); diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java 
b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java index 42ca3bd6fcd4..de03744fa7a9 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConfig.java @@ -68,7 +68,8 @@ public void testDefaults() .setUniqueTableLocation(true) .setLegacyCreateTableWithExistingLocationEnabled(false) .setRegisterTableProcedureEnabled(false) - .setProjectionPushdownEnabled(true)); + .setProjectionPushdownEnabled(true) + .setQueryPartitionFilterRequired(false)); } @Test @@ -105,6 +106,7 @@ public void testExplicitPropertyMappings() .put("delta.legacy-create-table-with-existing-location.enabled", "true") .put("delta.register-table-procedure.enabled", "true") .put("delta.projection-pushdown-enabled", "false") + .put("delta.query-partition-filter-required", "true") .buildOrThrow(); DeltaLakeConfig expected = new DeltaLakeConfig() @@ -137,7 +139,8 @@ public void testExplicitPropertyMappings() .setUniqueTableLocation(false) .setLegacyCreateTableWithExistingLocationEnabled(true) .setRegisterTableProcedureEnabled(true) - .setProjectionPushdownEnabled(false); + .setProjectionPushdownEnabled(false) + .setQueryPartitionFilterRequired(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java index 2b18fee5d186..7b4299e81695 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeConnectorTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.deltalake; import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import io.trino.Session; @@ -31,6 +32,7 @@ import io.trino.testing.containers.Minio; import io.trino.testing.minio.MinioClient; import io.trino.testing.sql.TestTable; +import io.trino.testing.sql.TrinoSqlExecutor; import org.intellij.lang.annotations.Language; import org.testng.SkipException; import org.testng.annotations.AfterClass; @@ -40,6 +42,7 @@ import java.time.ZonedDateTime; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.OptionalInt; import java.util.Set; @@ -1853,4 +1856,425 @@ private Set getAllFilesFromCdfDirectory(String tableName) .filter(path -> path.contains("/" + CHANGE_DATA_FOLDER_NAME)) .collect(toImmutableSet()); } + + @Test + public void testPartitionFilterQueryNotDemanded() + { + Map catalogProperties = getSession().getCatalogProperties(getSession().getCatalog().orElseThrow()); + assertThat(catalogProperties).doesNotContainKey("query_partition_filter_required"); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_not_demanded", + "(x varchar, part varchar) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("'a', 'part_a'", "'b', 'part_b'"))) { + assertQuery("SELECT * FROM %s WHERE x='a'".formatted(table.getName()), "VALUES('a', 'part_a')"); + assertQuery("SELECT * FROM %s WHERE part='part_a'".formatted(table.getName()), "VALUES('a', 'part_a')"); + } + } + + @Test + public void testQueryWithoutPartitionOnNonPartitionedTableNotDemanded() + { + Session session = 
sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_no_partition_table_", + "(x varchar, part varchar)", + ImmutableList.of("('a', 'part_a')", "('b', 'part_b')"))) { + assertQuery(session, "SELECT * FROM %s WHERE x='a'".formatted(table.getName()), "VALUES('a', 'part_a')"); + assertQuery(session, "SELECT * FROM %s WHERE part='part_a'".formatted(table.getName()), "VALUES('a', 'part_a')"); + } + } + + @Test + public void testQueryWithoutPartitionFilterNotAllowed() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_no_partition_filter_", + "(x varchar, part varchar) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("('a', 'part_a')", "('b', 'part_b')"))) { + assertQueryFails( + session, + "SELECT * FROM %s WHERE x='a'".formatted(table.getName()), + "Filter required on .*" + table.getName() + " for at least one partition column:.*"); + } + } + + @Test + public void testPartitionFilterRemovedByPlanner() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_removed_", + "(x varchar, part varchar) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("('a', 'part_a')", "('b', 'part_b')"))) { + assertQueryFails( + session, + "SELECT x FROM " + table.getName() + " WHERE part IS NOT NULL OR TRUE", + "Filter required on .*" + table.getName() + " for at least one partition column:.*"); + } + } + + @Test + public void testPartitionFilterIncluded() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_included", + "(x varchar, part integer) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("('a', 1)", "('a', 2)", "('a', 3)", "('a', 4)", "('b', 1)", "('b', 2)", "('b', 3)", "('b', 4)"))) { + assertQuery(session, "SELECT * FROM " + table.getName() + " WHERE part = 1", "VALUES ('a', 1), ('b', 1)"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part < 2", "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE Part < 2", "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE PART < 2", "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE parT < 2", "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part % 2 = 0", "VALUES 4"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part - 2 = 0", "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part * 4 = 4", "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part % 2 > 0", "VALUES 4"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part % 2 = 1 and part IS NOT NULL", "VALUES 4"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part IS NULL", "VALUES 0"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part = 1 OR x = 'a' ", "VALUES 5"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part = 1 AND x = 'a' ", "VALUES 1"); + assertQuery(session, "SELECT count(*) FROM " + table.getName() + " WHERE part IS NOT NULL", "VALUES 8"); + assertQuery(session, "SELECT x, count(*) AS COUNT FROM " + table.getName() + " WHERE 
part > 2 GROUP BY x ", "VALUES ('a', 2), ('b', 2)"); + assertQueryFails(session, "SELECT count(*) FROM " + table.getName() + " WHERE x= 'a'", "Filter required on .*" + table.getName() + " for at least one partition column:.*"); + } + } + + @Test + public void testRequiredPartitionFilterOnJoin() + { + Session session = sessionWithPartitionFilterRequirement(); + + try (TestTable leftTable = new TestTable( + getQueryRunner()::execute, + "test_partition_left_", + "(x varchar, part varchar)", + ImmutableList.of("('a', 'part_a')")); + TestTable rightTable = new TestTable( + new TrinoSqlExecutor(getQueryRunner(), session), + "test_partition_right_", + "(x varchar, part varchar) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("('a', 'part_a')"))) { + assertQueryFails( + session, + "SELECT a.x, b.x from %s a JOIN %s b on (a.x = b.x) where a.x = 'a'".formatted(leftTable.getName(), rightTable.getName()), + "Filter required on .*" + rightTable.getName() + " for at least one partition column:.*"); + assertQuery( + session, + "SELECT a.x, b.x from %s a JOIN %s b on (a.part = b.part) where a.part = 'part_a'".formatted(leftTable.getName(), rightTable.getName()), + "VALUES ('a', 'a')"); + } + } + + @Test + public void testRequiredPartitionFilterOnJoinBothTablePartitioned() + { + Session session = sessionWithPartitionFilterRequirement(); + + try (TestTable leftTable = new TestTable( + getQueryRunner()::execute, + "test_partition_inferred_left_", + "(x varchar, part varchar) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("('a', 'part_a')")); + TestTable rightTable = new TestTable( + new TrinoSqlExecutor(getQueryRunner(), session), + "test_partition_inferred_right_", + "(x varchar, part varchar) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("('a', 'part_a')"))) { + assertQueryFails( + session, + "SELECT a.x, b.x from %s a JOIN %s b on (a.x = b.x) where a.x = 'a'".formatted(leftTable.getName(), rightTable.getName()), + "Filter required on .*" + leftTable.getName() + " for at least one partition column:.*"); + assertQuery( + session, + "SELECT a.x, b.x from %s a JOIN %s b on (a.part = b.part) where a.part = 'part_a'".formatted(leftTable.getName(), rightTable.getName()), + "VALUES ('a', 'a')"); + } + } + + @Test + public void testComplexPartitionPredicateWithCasting() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_predicate", + "(x varchar, part varchar) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("('a', '1')", "('b', '2')"))) { + assertQuery(session, "SELECT * FROM " + table.getName() + " WHERE CAST (part AS integer) = 1", "VALUES ('a', 1)"); + } + } + + @Test + public void testPartitionPredicateInOuterQuery() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_predicate", + "(x integer, part integer) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("(1, 11)", "(2, 22)"))) { + assertQuery(session, "SELECT * FROM (SELECT * FROM " + table.getName() + " WHERE x = 1) WHERE part = 11", "VALUES (1, 11)"); + } + } + + @Test + public void testPartitionPredicateInInnerQuery() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_predicate", + "(x integer, part integer) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("(1, 11)", "(2, 22)"))) { + 
assertQuery(session, "SELECT * FROM (SELECT * FROM " + table.getName() + " WHERE part = 11) WHERE x = 1", "VALUES (1, 11)"); + } + } + + @Test + public void testPartitionPredicateFilterAndAnalyzeOnPartitionedTable() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_predicate_analyze_", + "(x integer, part integer) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("(1, 11)", "(2, 22)"))) { + String expectedMessageRegExp = "ANALYZE statement can not be performed on partitioned tables because filtering is required on at least one partition." + + " However, the partition filtering check can be disabled with the catalog session property 'query_partition_filter_required'."; + assertQueryFails(session, "ANALYZE " + table.getName(), expectedMessageRegExp); + assertQueryFails(session, "EXPLAIN ANALYZE " + table.getName(), expectedMessageRegExp); + } + } + + @Test + public void testPartitionPredicateFilterAndAnalyzeOnNonPartitionedTable() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable nonPartitioned = new TestTable( + getQueryRunner()::execute, + "test_partition_predicate_analyze_nonpartitioned", + "(a integer, b integer) ", + ImmutableList.of("(1, 11)", "(2, 22)"))) { + assertUpdate(session, "ANALYZE " + nonPartitioned.getName()); + computeActual(session, "EXPLAIN ANALYZE " + nonPartitioned.getName()); + } + } + + @Test + public void testPartitionFilterMultiplePartition() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_multiple_partition_", + "(x varchar, part1 integer, part2 integer) WITH (partitioned_by = ARRAY['part1', 'part2'])", + ImmutableList.of("('a', 1, 1)", "('a', 1, 2)", "('a', 2, 1)", "('a', 2, 2)", "('b', 1, 1)", "('b', 1, 2)", "('b', 2, 1)", "('b', 2, 2)"))) { + assertQuery(session, "SELECT count(*) FROM %s WHERE part1 = 1".formatted(table.getName()), "VALUES 4"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part2 = 1".formatted(table.getName()), "VALUES 4"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part1 = 1 AND part2 = 2".formatted(table.getName()), "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part2 IS NOT NULL".formatted(table.getName()), "VALUES 8"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part2 IS NULL".formatted(table.getName()), "VALUES 0"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part2 < 0".formatted(table.getName()), "VALUES 0"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part1 = 1 OR part2 > 1".formatted(table.getName()), "VALUES 6"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part1 = 1 AND part2 > 1".formatted(table.getName()), "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part1 IS NOT NULL OR part2 > 1".formatted(table.getName()), "VALUES 8"); + assertQuery(session, "SELECT count(*) FROM %s WHERE part1 IS NOT NULL AND part2 > 1".formatted(table.getName()), "VALUES 4"); + assertQuery(session, "SELECT count(*) FROM %s WHERE x = 'a' AND part2 = 2".formatted(table.getName()), "VALUES 2"); + assertQuery(session, "SELECT x, PART1 * 10 + PART2 AS Y FROM %s WHERE x = 'a' AND part2 = 2".formatted(table.getName()), "VALUES ('a', 12), ('a', 22)"); + assertQuery(session, "SELECT x, CAST (PART1 AS varchar) || CAST (PART2 AS varchar) FROM %s WHERE x = 'a' AND part2 = 2".formatted(table.getName()), "VALUES ('a', '12'), 
('a', '22')"); + assertQuery(session, "SELECT x, MAX(PART1) FROM %s WHERE part2 = 2 GROUP BY X".formatted(table.getName()), "VALUES ('a', 2), ('b', 2)"); + assertQuery(session, "SELECT x, reduce_agg(part1, 0, (a, b) -> a + b, (a, b) -> a + b) FROM " + table.getName() + " WHERE part2 > 1 GROUP BY X", "VALUES ('a', 3), ('b', 3)"); + String expectedMessageRegExp = "Filter required on .*" + table.getName() + " for at least one partition column:.*"; + assertQueryFails(session, "SELECT X, CAST (PART1 AS varchar) || CAST (PART2 AS varchar) FROM %s WHERE x = 'a'".formatted(table.getName()), expectedMessageRegExp); + assertQueryFails(session, "SELECT count(*) FROM %s WHERE x='a'".formatted(table.getName()), expectedMessageRegExp); + } + } + + @Test + public void testPartitionFilterRequiredAndOptimize() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_optimize", + "(part integer, name varchar(50)) WITH (partitioned_by = ARRAY['part'])", + ImmutableList.of("(1, 'Bob')", "(2, 'Alice')"))) { + assertUpdate(session, "ALTER TABLE " + table.getName() + " ADD COLUMN last_name varchar(50)"); + assertUpdate(session, "INSERT INTO " + table.getName() + " SELECT 3, 'John', 'Doe'", 1); + + assertQuery(session, + "SELECT part, name, last_name FROM " + table.getName() + " WHERE part < 4", + "VALUES (1, 'Bob', NULL), (2, 'Alice', NULL), (3, 'John', 'Doe')"); + + Set beforeActiveFiles = getActiveFiles(table.getName()); + assertQueryFails(session, "ALTER TABLE " + table.getName() + " EXECUTE OPTIMIZE", "Filter required on .*" + table.getName() + " for at least one partition column:.*"); + computeActual(session, "ALTER TABLE " + table.getName() + " EXECUTE OPTIMIZE WHERE part=1"); + + assertThat(beforeActiveFiles).isNotEqualTo(getActiveFiles(table.getName())); + assertQuery(session, + "SELECT part, name, last_name FROM " + table.getName() + " WHERE part < 4", + "VALUES (1, 'Bob', NULL), (2, 'Alice', NULL), (3, 'John', 'Doe')"); + } + } + + @Test + public void testPartitionFilterEnabledAndOptimizeForNonPartitionedTable() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_nonpartitioned_optimize", + "(part integer, name varchar(50))", + ImmutableList.of("(1, 'Bob')", "(2, 'Alice')"))) { + assertUpdate(session, "ALTER TABLE " + table.getName() + " ADD COLUMN last_name varchar(50)"); + assertUpdate(session, "INSERT INTO " + table.getName() + " SELECT 3, 'John', 'Doe'", 1); + + assertQuery(session, + "SELECT part, name, last_name FROM " + table.getName() + " WHERE part < 4", + "VALUES (1, 'Bob', NULL), (2, 'Alice', NULL), (3, 'John', 'Doe')"); + + Set beforeActiveFiles = getActiveFiles(table.getName()); + computeActual(session, "ALTER TABLE " + table.getName() + " EXECUTE OPTIMIZE (file_size_threshold => '10kB')"); + + assertThat(beforeActiveFiles).isNotEqualTo(getActiveFiles(table.getName())); + assertQuery(session, + "SELECT part, name, last_name FROM " + table.getName() + " WHERE part < 4", + "VALUES (1, 'Bob', NULL), (2, 'Alice', NULL), (3, 'John', 'Doe')"); + } + } + + @Test + public void testPartitionFilterRequiredAndWriteOperation() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_table_changes", + "(x integer, part integer) WITH (partitioned_by = ARRAY['part'], change_data_feed_enabled = 
true)", + ImmutableList.of("(1, 11)", "(2, 22)", "(3, 33)"))) { + @Language("RegExp") + String expectedMessageRegExp = "Filter required on test_schema\\." + table.getName() + " for at least one partition column: part"; + + assertQueryFails(session, "UPDATE " + table.getName() + " SET x = 10 WHERE x = 1", expectedMessageRegExp); + assertUpdate(session, "UPDATE " + table.getName() + " SET x = 20 WHERE part = 22", 1); + + assertQueryFails(session, "MERGE INTO " + table.getName() + " t " + + "USING (SELECT * FROM (VALUES (3, 99), (4,44))) AS s(x, part) " + + "ON t.x = s.x " + + "WHEN MATCHED THEN DELETE ", expectedMessageRegExp); + assertUpdate(session, "MERGE INTO " + table.getName() + " t " + + "USING (SELECT * FROM (VALUES (2, 22), (4 , 44))) AS s(x, part) " + + "ON (t.part = s.part) " + + "WHEN MATCHED THEN UPDATE " + + " SET x = t.x + s.x, part = t.part ", 1); + + assertQueryFails(session, "MERGE INTO " + table.getName() + " t " + + "USING (SELECT * FROM (VALUES (4,44))) AS s(x, part) " + + "ON t.x = s.x " + + "WHEN NOT MATCHED THEN INSERT (x, part) VALUES(s.x, s.part) ", expectedMessageRegExp); + assertUpdate(session, "MERGE INTO " + table.getName() + " t " + + "USING (SELECT * FROM (VALUES (4, 44))) AS s(x, part) " + + "ON (t.part = s.part) " + + "WHEN NOT MATCHED THEN INSERT (x, part) VALUES(s.x, s.part) ", 1); + + assertQueryFails(session, "DELETE FROM " + table.getName() + " WHERE x = 3", expectedMessageRegExp); + assertUpdate(session, "DELETE FROM " + table.getName() + " WHERE part = 33 and x = 3", 1); + } + } + + @Test + public void testPartitionFilterRequiredAndTableChanges() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_table_changes", + "(x integer, part integer) WITH (partitioned_by = ARRAY['part'], change_data_feed_enabled = true)", + ImmutableList.of("(1, 11)", "(2, 22)", "(3, 33)"))) { + @Language("RegExp") + String expectedMessageRegExp = "Filter required on test_schema\\." 
+ table.getName() + " for at least one partition column: part"; + + assertQueryFails(session, "UPDATE " + table.getName() + " SET x = 10 WHERE x = 1", expectedMessageRegExp); + assertUpdate(session, "UPDATE " + table.getName() + " SET x = 20 WHERE part = 22", 1); + // TODO (https://github.com/trinodb/trino/issues/18498) Check for partition filter for table_changes when the following issue will be completed https://github.com/trinodb/trino/pull/17928 + assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes('test_schema', '" + table.getName() + "'))", + """ + VALUES + (1, 11, 'insert', BIGINT '1'), + (2, 22, 'insert', BIGINT '2'), + (3, 33, 'insert', BIGINT '3'), + (2, 22, 'update_preimage', BIGINT '4'), + (20, 22, 'update_postimage', BIGINT '4') + """); + + assertQueryFails(session, "DELETE FROM " + table.getName() + " WHERE x = 3", expectedMessageRegExp); + assertUpdate(session, "DELETE FROM " + table.getName() + " WHERE part = 33 and x = 3", 1); + assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes('test_schema', '" + table.getName() + "', 4))", + """ + VALUES + (3, 33, 'delete', BIGINT '5') + """); + + assertTableChangesQuery("SELECT * FROM TABLE(system.table_changes('test_schema', '" + table.getName() + "')) ORDER BY _commit_version, _change_type, part", + """ + VALUES + (1, 11, 'insert', BIGINT '1'), + (2, 22, 'insert', BIGINT '2'), + (3, 33, 'insert', BIGINT '3'), + (2, 22, 'update_preimage', BIGINT '4'), + (20, 22, 'update_postimage', BIGINT '4'), + (3, 33, 'delete', BIGINT '5') + """); + } + } + + @Test + public void testPartitionFilterRequiredAndHistoryTable() + { + Session session = sessionWithPartitionFilterRequirement(); + try (TestTable table = new TestTable( + getQueryRunner()::execute, + "test_partition_filter_table_changes", + "(x integer, part integer) WITH (partitioned_by = ARRAY['part'], change_data_feed_enabled = true)", + ImmutableList.of("(1, 11)", "(2, 22)", "(3, 33)"))) { + @Language("RegExp") + String expectedMessageRegExp = "Filter required on test_schema\\." 
+ table.getName() + " for at least one partition column: part"; + + assertQuery("SELECT version, operation, read_version FROM \"" + table.getName() + "$history\"", + """ + VALUES + (0, 'CREATE TABLE', 0), + (1, 'WRITE', 0), + (2, 'WRITE', 1), + (3, 'WRITE', 2) + """); + + assertQueryFails(session, "UPDATE " + table.getName() + " SET x = 10 WHERE x = 1", expectedMessageRegExp); + assertUpdate(session, "UPDATE " + table.getName() + " SET x = 20 WHERE part = 22", 1); + + assertQuery("SELECT version, operation, read_version FROM \"" + table.getName() + "$history\"", + """ + VALUES + (0, 'CREATE TABLE', 0), + (1, 'WRITE', 0), + (2, 'WRITE', 1), + (3, 'WRITE', 2), + (4, 'MERGE', 3) + """); + } + } + + private Session sessionWithPartitionFilterRequirement() + { + return Session.builder(getSession()) + .setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "query_partition_filter_required", "true") + .build(); + } } diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeRequireQueryPartitionsFilter.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeRequireQueryPartitionsFilter.java new file mode 100644 index 000000000000..673382011edc --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeRequireQueryPartitionsFilter.java @@ -0,0 +1,94 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.deltalake; + +import org.testng.annotations.Test; + +import static io.trino.tempto.assertions.QueryAssert.Row.row; +import static io.trino.tempto.assertions.QueryAssert.assertQueryFailure; +import static io.trino.testing.TestingNames.randomNameSuffix; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_DATABRICKS; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; +import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.dropDeltaTableWithRetry; +import static io.trino.tests.product.deltalake.util.DeltaLakeTestUtils.getColumnNamesOnDelta; +import static io.trino.tests.product.utils.QueryExecutors.onDelta; +import static io.trino.tests.product.utils.QueryExecutors.onTrino; +import static java.lang.String.format; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestDeltaLakeRequireQueryPartitionsFilter + extends BaseTestDeltaLakeS3Storage +{ + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testRequiresQueryPartitionFilter() + { + String tableName = "test_require_partition_filter_" + randomNameSuffix(); + + onDelta().executeQuery("" + + "CREATE TABLE default." 
+ tableName + " " + + "USING DELTA " + + "PARTITIONED BY (part) " + + "LOCATION 's3://" + bucketName + "/databricks-compatibility-test-" + tableName + "' " + + "AS VALUES (1, 11), (2, 22) AS data(x, part)"); + + try { + assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1, 11), row(2, 22)); + assertThat(onDelta().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1, 11), row(2, 22)); + + onTrino().executeQuery("SET SESSION delta.query_partition_filter_required = true"); + + assertQueryFailure(() -> onTrino().executeQuery("SELECT COUNT(*) FROM " + tableName)) + .hasMessageMatching(format("Query failed \\(#\\w+\\): Filter required on default\\.%s for at least one partition column: part", tableName)); + assertThat(onDelta().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1, 11), row(2, 22)); + + assertThat(onTrino().executeQuery(format("SELECT * FROM %s WHERE part = 11", tableName))).containsOnly(row(1, 11)); + assertThat(onDelta().executeQuery(format("SELECT * FROM %s WHERE part = 11", tableName))).containsOnly(row(1, 11)); + } + finally { + dropDeltaTableWithRetry(tableName); + } + } + + @Test(groups = {DELTA_LAKE_DATABRICKS, DELTA_LAKE_OSS, PROFILE_SPECIFIC_TESTS}) + public void testRequiresQueryPartitionFilterWithUppercaseColumnName() + { + String tableName = "test_require_partition_filter_" + randomNameSuffix(); + + onDelta().executeQuery(""" + CREATE TABLE default.%s + (X integer, PART integer) + USING DELTA PARTITIONED BY (PART) + LOCATION 's3://%s/databricks-compatibility-test-%s' + """.formatted(tableName, bucketName, tableName)); + + try { + assertThat(getColumnNamesOnDelta("default", tableName)).containsExactly("X", "PART"); + + onDelta().executeQuery("INSERT INTO default." + tableName + " VALUES (1, 11), (2, 22)"); + + assertThat(onTrino().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1, 11), row(2, 22)); + assertThat(onDelta().executeQuery("SELECT * FROM " + tableName)).containsOnly(row(1, 11), row(2, 22)); + + onTrino().executeQuery("SET SESSION delta.query_partition_filter_required = true"); + + assertThat(onTrino().executeQuery(format("SELECT * FROM %s WHERE \"part\" = 11", tableName))).containsOnly(row(1, 11)); + assertThat(onTrino().executeQuery(format("SELECT * FROM %s WHERE \"PART\" = 11", tableName))).containsOnly(row(1, 11)); + assertThat(onTrino().executeQuery(format("SELECT * FROM %s WHERE \"Part\" = 11", tableName))).containsOnly(row(1, 11)); + } + finally { + dropDeltaTableWithRetry(tableName); + } + } +}
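Usage sketch (not part of the patch): a minimal illustration of how the property added by this change behaves from a Trino client, assuming a Delta Lake catalog named delta and a partitioned table delta.test_schema.orders with partition column part; the catalog, schema, table, and column names other than the session/config property names and the error message format are illustrative only, not taken from the patch.

-- Enable the check for the current session only (catalog session property introduced by this patch)
SET SESSION delta.query_partition_filter_required = true;

-- Rejected: no predicate on the partition column; fails with a message of the form
-- "Filter required on test_schema.orders for at least one partition column: part"
-- (orders and orderkey are hypothetical names used for this sketch)
SELECT * FROM delta.test_schema.orders WHERE orderkey = 1;

-- Accepted: the predicate references the partition column
SELECT * FROM delta.test_schema.orders WHERE part = 'part_a';

-- To enforce the check for the whole catalog instead, set the catalog property in the
-- catalog configuration file: delta.query-partition-filter-required=true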