From 1afaa520693832b9d762f87d679b64d34f68de2f Mon Sep 17 00:00:00 2001 From: Vikash Kumar Date: Tue, 21 Nov 2023 18:20:08 +0530 Subject: [PATCH] Optional check for query partition filter for Hudi --- docs/src/main/sphinx/connector/hudi.md | 9 + .../java/io/trino/plugin/hudi/HudiConfig.java | 14 ++ .../io/trino/plugin/hudi/HudiMetadata.java | 52 ++++- .../plugin/hudi/HudiSessionProperties.java | 11 + .../io/trino/plugin/hudi/HudiTableHandle.java | 40 ++++ .../io/trino/plugin/hudi/TestHudiConfig.java | 7 +- .../trino/plugin/hudi/TestHudiSmokeTest.java | 198 ++++++++++++++++++ 7 files changed, 326 insertions(+), 5 deletions(-) diff --git a/docs/src/main/sphinx/connector/hudi.md b/docs/src/main/sphinx/connector/hudi.md index 8289e6fb6eb1..2f8be98a3f3d 100644 --- a/docs/src/main/sphinx/connector/hudi.md +++ b/docs/src/main/sphinx/connector/hudi.md @@ -82,6 +82,15 @@ Additionally, following configuration properties can be set depending on the use - Maximum number of metastore data objects per transaction in the Hive metastore cache. - `2000` +* - `hudi.query-partition-filter-required` + - Set to `true` to force a query to use a partition column in the filter condition. + The equivalent catalog session property is `query_partition_filter_required`. + Enabling this property causes query failures if the partition column used + in the filter condition doesn't effectively reduce the number of data files read. + Example: Complex filter expressions such as `id = 1 OR part_key = '100'` + or `CAST(part_key AS INTEGER) % 2 = 0` are not recognized as partition filters, + and queries using such expressions fail if the property is set to `true`. + - `false` ::: diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java index c7fd2f84e4bb..306287f9ff9f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiConfig.java @@ -50,6 +50,7 @@ public class HudiConfig private int splitLoaderParallelism = 4; private int splitGeneratorParallelism = 4; private long perTransactionMetastoreCacheMaximumSize = 2000; + private boolean queryPartitionFilterRequired; public List getColumnsToHide() { @@ -193,4 +194,17 @@ public HudiConfig setPerTransactionMetastoreCacheMaximumSize(long perTransaction this.perTransactionMetastoreCacheMaximumSize = perTransactionMetastoreCacheMaximumSize; return this; } + + @Config("hudi.query-partition-filter-required") + @ConfigDescription("Require a filter on at least one partition column") + public HudiConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired) + { + this.queryPartitionFilterRequired = queryPartitionFilterRequired; + return this; + } + + public boolean isQueryPartitionFilterRequired() + { + return queryPartitionFilterRequired; + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java index 152ca336bd8b..e732474cae63 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java @@ -45,23 +45,29 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.Set; import java.util.function.Function; +import java.util.stream.Stream; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT; import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS; import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter; +import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles; import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles; import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema; import static io.trino.plugin.hive.util.HiveUtil.isHudiTable; import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA; import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide; +import static io.trino.plugin.hudi.HudiSessionProperties.isQueryPartitionFilterRequired; import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY; import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY; import static io.trino.plugin.hudi.HudiUtil.hudiMetadataExists; import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE; +import static io.trino.spi.StandardErrorCode.QUERY_REJECTED; import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE; import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.lang.String.format; @@ -114,6 +120,7 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName.getTableName(), table.get().getStorage().getLocation(), COPY_ON_WRITE, + getPartitionKeyColumnHandles(table.get(), typeManager), TupleDomain.all(), TupleDomain.all()); } @@ -162,12 +169,30 @@ public Optional> applyFilter(C { HudiTableHandle handle = (HudiTableHandle) tableHandle; HudiPredicates predicates = HudiPredicates.from(constraint.getSummary()); + TupleDomain regularColumnPredicates = predicates.getRegularColumnPredicates(); + TupleDomain partitionColumnPredicates = predicates.getPartitionColumnPredicates(); + + // TODO Since the constraint#predicate isn't utilized during split generation. So, + // Let's not add constraint#predicateColumns to newConstraintColumns. + Set newConstraintColumns = Stream.concat( + Stream.concat( + regularColumnPredicates.getDomains().stream() + .map(Map::keySet) + .flatMap(Collection::stream), + partitionColumnPredicates.getDomains().stream() + .map(Map::keySet) + .flatMap(Collection::stream)), + handle.getConstraintColumns().stream()) + .collect(toImmutableSet()); + HudiTableHandle newHudiTableHandle = handle.applyPredicates( - predicates.getPartitionColumnPredicates(), - predicates.getRegularColumnPredicates()); + newConstraintColumns, + partitionColumnPredicates, + regularColumnPredicates); if (handle.getPartitionPredicates().equals(newHudiTableHandle.getPartitionPredicates()) - && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates())) { + && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates()) + && handle.getConstraintColumns().equals(newHudiTableHandle.getConstraintColumns())) { return Optional.empty(); } @@ -224,6 +249,27 @@ public Iterator streamTableColumns(ConnectorSession sessio .iterator(); } + @Override + public void validateScan(ConnectorSession session, ConnectorTableHandle handle) + { + HudiTableHandle hudiTableHandle = (HudiTableHandle) handle; + if (isQueryPartitionFilterRequired(session)) { + if (!hudiTableHandle.getPartitionColumns().isEmpty()) { + Set partitionColumns = hudiTableHandle.getPartitionColumns().stream() + .map(HiveColumnHandle::getName) + .collect(toImmutableSet()); + Set constraintColumns = hudiTableHandle.getConstraintColumns().stream() + .map(HiveColumnHandle::getBaseColumnName) + .collect(toImmutableSet()); + if (Collections.disjoint(constraintColumns, partitionColumns)) { + throw new TrinoException( + QUERY_REJECTED, + format("Filter required on %s for at least one of the partition columns: %s", hudiTableHandle.getSchemaTableName(), String.join(", ", partitionColumns))); + } + } + } + } + HiveMetastore getMetastore() { return metastore; diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java index ede43ec3386b..f7946ff3ef9f 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiSessionProperties.java @@ -50,6 +50,7 @@ public class HudiSessionProperties private static final String MAX_SPLITS_PER_SECOND = "max_splits_per_second"; private static final String MAX_OUTSTANDING_SPLITS = "max_outstanding_splits"; private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism"; + private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; private final List> sessionProperties; @@ -113,6 +114,11 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR SPLIT_GENERATOR_PARALLELISM, "Number of threads to generate splits from partitions", hudiConfig.getSplitGeneratorParallelism(), + false), + booleanProperty( + QUERY_PARTITION_FILTER_REQUIRED, + "Require a filter on at least one partition column", + hudiConfig.isQueryPartitionFilterRequired(), false)); } @@ -167,4 +173,9 @@ public static int getSplitGeneratorParallelism(ConnectorSession session) { return session.getProperty(SPLIT_GENERATOR_PARALLELISM, Integer.class); } + + public static boolean isQueryPartitionFilterRequired(ConnectorSession session) + { + return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); + } } diff --git a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java index 0da9f2d897a7..9101deb4de0b 100644 --- a/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java +++ b/plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java @@ -14,13 +14,18 @@ package io.trino.plugin.hudi; import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableSet; import io.trino.plugin.hive.HiveColumnHandle; import io.trino.plugin.hudi.model.HudiTableType; import io.trino.spi.connector.ConnectorTableHandle; import io.trino.spi.connector.SchemaTableName; import io.trino.spi.predicate.TupleDomain; +import java.util.List; +import java.util.Set; + import static io.trino.spi.connector.SchemaTableName.schemaTableName; import static java.util.Objects.requireNonNull; @@ -31,6 +36,9 @@ public class HudiTableHandle private final String tableName; private final String basePath; private final HudiTableType tableType; + private final List partitionColumns; + // Used only for validation when config property hudi.query-partition-filter-required is enabled + private final Set constraintColumns; private final TupleDomain partitionPredicates; private final TupleDomain regularPredicates; @@ -40,13 +48,29 @@ public HudiTableHandle( @JsonProperty("tableName") String tableName, @JsonProperty("basePath") String basePath, @JsonProperty("tableType") HudiTableType tableType, + @JsonProperty("partitionColumns") List partitionColumns, @JsonProperty("partitionPredicates") TupleDomain partitionPredicates, @JsonProperty("regularPredicates") TupleDomain regularPredicates) + { + this(schemaName, tableName, basePath, tableType, partitionColumns, ImmutableSet.of(), partitionPredicates, regularPredicates); + } + + public HudiTableHandle( + String schemaName, + String tableName, + String basePath, + HudiTableType tableType, + List partitionColumns, + Set constraintColumns, + TupleDomain partitionPredicates, + TupleDomain regularPredicates) { this.schemaName = requireNonNull(schemaName, "schemaName is null"); this.tableName = requireNonNull(tableName, "tableName is null"); this.basePath = requireNonNull(basePath, "basePath is null"); this.tableType = requireNonNull(tableType, "tableType is null"); + this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null"); + this.constraintColumns = requireNonNull(constraintColumns, "constraintColumns is null"); this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null"); this.regularPredicates = requireNonNull(regularPredicates, "regularPredicates is null"); } @@ -81,6 +105,19 @@ public TupleDomain getPartitionPredicates() return partitionPredicates; } + @JsonProperty + public List getPartitionColumns() + { + return partitionColumns; + } + + // do not serialize constraint columns as they are not needed on workers + @JsonIgnore + public Set getConstraintColumns() + { + return constraintColumns; + } + @JsonProperty public TupleDomain getRegularPredicates() { @@ -93,6 +130,7 @@ public SchemaTableName getSchemaTableName() } HudiTableHandle applyPredicates( + Set constraintColumns, TupleDomain partitionTupleDomain, TupleDomain regularTupleDomain) { @@ -101,6 +139,8 @@ HudiTableHandle applyPredicates( tableName, basePath, tableType, + partitionColumns, + constraintColumns, partitionPredicates.intersect(partitionTupleDomain), regularPredicates.intersect(regularTupleDomain)); } diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java index 2aaed93bcad8..719ef64bce1a 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiConfig.java @@ -39,7 +39,8 @@ public void testDefaults() .setMaxOutstandingSplits(1000) .setSplitLoaderParallelism(4) .setSplitGeneratorParallelism(4) - .setPerTransactionMetastoreCacheMaximumSize(2000)); + .setPerTransactionMetastoreCacheMaximumSize(2000) + .setQueryPartitionFilterRequired(false)); } @Test @@ -56,6 +57,7 @@ public void testExplicitPropertyMappings() .put("hudi.split-loader-parallelism", "16") .put("hudi.split-generator-parallelism", "32") .put("hudi.per-transaction-metastore-cache-maximum-size", "1000") + .put("hudi.query-partition-filter-required", "true") .buildOrThrow(); HudiConfig expected = new HudiConfig() @@ -68,7 +70,8 @@ public void testExplicitPropertyMappings() .setMaxOutstandingSplits(100) .setSplitLoaderParallelism(16) .setSplitGeneratorParallelism(32) - .setPerTransactionMetastoreCacheMaximumSize(1000); + .setPerTransactionMetastoreCacheMaximumSize(1000) + .setQueryPartitionFilterRequired(true); assertFullMapping(properties, expected); } diff --git a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java index 565cf9d1fed6..6313e921cdf2 100644 --- a/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java +++ b/plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java @@ -14,6 +14,7 @@ package io.trino.plugin.hudi; import com.google.common.collect.ImmutableMap; +import io.trino.Session; import io.trino.filesystem.Location; import io.trino.filesystem.TrinoFileSystemFactory; import io.trino.filesystem.TrinoInputFile; @@ -21,6 +22,7 @@ import io.trino.spi.security.ConnectorIdentity; import io.trino.testing.AbstractTestQueryFramework; import io.trino.testing.QueryRunner; +import org.intellij.lang.annotations.Language; import org.junit.jupiter.api.Test; import java.time.ZonedDateTime; @@ -168,6 +170,202 @@ public void testPartitionColumn() assertQueryFails("SELECT \"$partition\" FROM " + HUDI_NON_PART_COW, ".* Column '\\$partition' cannot be resolved"); } + @Test + public void testPartitionFilterRequired() + { + Session session = withPartitionFilterRequired(getSession()); + + assertQueryFails( + session, + "SELECT * FROM " + HUDI_COW_PT_TBL, + "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"); + } + + @Test + public void testPartitionFilterRequiredPredicateOnNonPartitionColumn() + { + Session session = withPartitionFilterRequired(getSession()); + + assertQueryFails( + session, + "SELECT * FROM " + HUDI_COW_PT_TBL + " WHERE id = 1", + "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"); + } + + @Test + public void testPartitionFilterRequiredNestedQueryWithInnerPartitionPredicate() + { + Session session = withPartitionFilterRequired(getSession()); + + assertQuery(session, "SELECT name FROM (SELECT * FROM " + HUDI_COW_PT_TBL + " WHERE dt = '2021-12-09') WHERE id = 1", "VALUES 'a1'"); + } + + @Test + public void testPartitionFilterRequiredNestedQueryWithOuterPartitionPredicate() + { + Session session = withPartitionFilterRequired(getSession()); + + assertQuery(session, "SELECT name FROM (SELECT * FROM " + HUDI_COW_PT_TBL + " WHERE id = 1) WHERE dt = '2021-12-09'", "VALUES 'a1'"); + } + + @Test + public void testPartitionFilterRequiredNestedWithIsNotNullFilter() + { + Session session = withPartitionFilterRequired(getSession()); + + assertQuery(session, "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE dt IS NOT null", "VALUES 'a1', 'a2'"); + } + + @Test + public void testPartitionFilterRequiredFilterRemovedByPlanner() + { + Session session = withPartitionFilterRequired(getSession()); + + assertQueryFails( + session, + "SELECT id FROM " + HUDI_COW_PT_TBL + " WHERE dt IS NOT null OR true", + "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"); + } + + @Test + public void testPartitionFilterRequiredOnJoin() + { + Session session = withPartitionFilterRequired(getSession()); + @Language("RegExp") String errorMessage = "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"; + + // ON with partition column + assertQueryFails( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.dt = t2.dt)", + errorMessage); + // ON with partition column and WHERE with same left table's partition column + assertQuery( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.dt = t2.dt) WHERE t1.dt = '2021-12-09'", + "VALUES ('a1', 'a1'), ('a2', 'a2'), ('a1', 'a2'), ('a2', 'a1')"); + // ON with partition column and WHERE with same right table's regular column + assertQuery( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.dt = t2.dt) WHERE t2.dt = '2021-12-09'", + "VALUES ('a1', 'a1'), ('a2', 'a2'), ('a1', 'a2'), ('a2', 'a1')"); + // ON with partition column and WHERE with different left table's partition column + assertQuery( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.dt = t2.dt) WHERE t1.hh = '10'", + "VALUES ('a1', 'a1'), ('a1', 'a2')"); + // ON with partition column and WHERE with different regular column + assertQueryFails( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.dt = t2.dt) WHERE t2.hh = '10'", + errorMessage); + // ON with partition column and WHERE with regular column + assertQueryFails( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.dt = t2.dt) WHERE t1.id = 1", + errorMessage); + + // ON with regular column + assertQueryFails( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.id = t2.id)", + errorMessage); + // ON with regular column and WHERE with left table's partition column + assertQuery( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.id = t2.id) WHERE t1.dt = '2021-12-09'", + "VALUES ('a1', 'a1'), ('a2', 'a2')"); + // ON with partition column and WHERE with right table's regular column + assertQueryFails( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_NON_PART_COW + " t2 ON (t1.dt = t2.dt) WHERE t2.id = 1", + errorMessage); + } + + @Test + public void testPartitionFilterRequiredOnJoinBothTablePartitioned() + { + Session session = withPartitionFilterRequired(getSession()); + + // ON with partition column + assertQueryFails( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.dt = t2.dt)", + "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"); + // ON with partition column and WHERE with same left table's partition column + assertQuery( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.dt = t2.dt) WHERE t1.dt = '2021-12-09'", + "VALUES ('a1', 'a1'), ('a2', 'a2'), ('a1', 'a2'), ('a2', 'a1')"); + // ON with partition column and WHERE with same right table's partition column + assertQuery( + session, + "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.dt = t2.dt) WHERE t2.dt = '2021-12-09'", + "VALUES ('a1', 'a1'), ('a2', 'a2'), ('a1', 'a2'), ('a2', 'a1')"); + + @Language("RegExp") String errorMessage = "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"; + // ON with partition column and WHERE with different left table's partition column + assertQueryFails(session, "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.dt = t2.dt) WHERE t1.hh = '10'", errorMessage); + // ON with partition column and WHERE with different right table's partition column + assertQueryFails(session, "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.dt = t2.dt) WHERE t2.hh = '10'", errorMessage); + // ON with partition column and WHERE with regular column + assertQueryFails(session, "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.dt = t2.dt) WHERE t2.id = 1", errorMessage); + + // ON with regular column + assertQueryFails(session, "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.id = t2.id)", errorMessage); + // ON with regular column and WHERE with regular column + assertQueryFails(session, "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.id = t2.id) WHERE t1.id = 1", errorMessage); + // ON with regular column and WHERE with left table's partition column + assertQueryFails(session, "SELECT t1.name, t2.name FROM " + HUDI_COW_PT_TBL + " t1 JOIN " + HUDI_COW_PT_TBL + " t2 ON (t1.id = t2.id) WHERE t1.dt = '2021-12-09'", errorMessage); + } + + @Test + public void testPartitionFilterRequiredWithLike() + { + Session session = withPartitionFilterRequired(getSession()); + assertQueryFails( + session, + "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE name LIKE '%1'", + "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"); + } + + @Test + public void testPartitionFilterRequiredFilterIncluded() + { + Session session = withPartitionFilterRequired(getSession()); + assertQuery(session, "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE hh = '10'", "VALUES 'a1'"); + assertQuery(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE hh < '12'", "VALUES 2"); + assertQuery(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE Hh < '11'", "VALUES 1"); + assertQuery(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE HH < '10'", "VALUES 0"); + assertQuery(session, "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE CAST(hh AS INTEGER) % 2 = 1 and hh IS NOT NULL", "VALUES 'a2'"); + assertQuery(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE hh IS NULL", "VALUES 0"); + assertQuery(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE hh IS NOT NULL", "VALUES 2"); + assertQuery(session, "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE hh LIKE '10'", "VALUES 'a1'"); + assertQuery(session, "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE hh LIKE '1%'", "VALUES 'a1', 'a2'"); + assertQuery(session, "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE id = 1 AND dt = '2021-12-09'", "VALUES 'a1'"); + assertQuery(session, "SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE hh = '11' AND dt = '2021-12-09'", "VALUES 'a2'"); + assertQuery(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE hh = '12' AND dt = '2021-12-19'", "VALUES 0"); + + // Predicate which could not be translated into tuple domain + @Language("RegExp") String errorMessage = "Filter required on tests." + HUDI_COW_PT_TBL.getTableName() + " for at least one of the partition columns: dt, hh"; + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE CAST(hh AS INTEGER) % 2 = 0", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE CAST(hh AS INTEGER) - 11 = 0", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE CAST(hh AS INTEGER) * 2 = 20", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE CAST(hh AS INTEGER) % 2 > 0", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE name LIKE '%1' OR hh LIKE '%1'", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE name LIKE '%1' AND hh LIKE '%0'", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE id = 1 OR dt = '2021-12-09'", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE hh = '11' OR dt = '2021-12-09'", errorMessage); + assertQueryFails(session, "SELECT count(*) FROM " + HUDI_COW_PT_TBL + " WHERE hh = '12' OR dt = '2021-12-19'", errorMessage); + assertQueryFails(session, "SELECT count(*) AS COUNT FROM " + HUDI_COW_PT_TBL + " WHERE CAST(hh AS INTEGER) > 2 GROUP BY name ", errorMessage); + } + + private static Session withPartitionFilterRequired(Session session) + { + return Session.builder(session) + .setCatalogSessionProperty(session.getCatalog().orElseThrow(), "query_partition_filter_required", "true") + .build(); + } + private TrinoInputFile toInputFile(String path) { return ((HudiConnector) getDistributedQueryRunner().getCoordinator().getConnector("hudi")).getInjector()