Optional check for query partition filter for Hudi
krvikash authored and Praveen2112 committed Dec 22, 2023
1 parent 261322f commit 1afaa52
Showing 7 changed files with 326 additions and 5 deletions.
9 changes: 9 additions & 0 deletions docs/src/main/sphinx/connector/hudi.md
@@ -82,6 +82,15 @@ Additionally, following configuration properties can be set depending on the use
- Maximum number of metastore data objects per transaction in the Hive
metastore cache.
- `2000`
* - `hudi.query-partition-filter-required`
- Set to `true` to force a query to use a partition column in the filter condition.
The equivalent catalog session property is `query_partition_filter_required`.
Enabling this property causes query failures if the partition column used
in the filter condition doesn't effectively reduce the number of data files read.
Example: Complex filter expressions such as `id = 1 OR part_key = '100'`
or `CAST(part_key AS INTEGER) % 2 = 0` are not recognized as partition filters,
and queries using such expressions fail if the property is set to `true`.
- `false`

:::
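
The property can also be toggled per session. A short, hedged example (the catalog name `hudi`, schema `example`, table `orders`, and partition column `part_key` are assumptions for illustration):

```sql
-- require a partition filter for queries against the hudi catalog in this session
SET SESSION hudi.query_partition_filter_required = true;

-- fails: the filter does not reference a partition column
SELECT * FROM hudi.example.orders WHERE id = 1;

-- succeeds: the equality predicate on part_key is recognized as a partition filter
SELECT * FROM hudi.example.orders WHERE part_key = '100';
```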

HudiConfig.java
@@ -50,6 +50,7 @@ public class HudiConfig
private int splitLoaderParallelism = 4;
private int splitGeneratorParallelism = 4;
private long perTransactionMetastoreCacheMaximumSize = 2000;
private boolean queryPartitionFilterRequired;

public List<String> getColumnsToHide()
{
@@ -193,4 +194,17 @@ public HudiConfig setPerTransactionMetastoreCacheMaximumSize(long perTransaction
this.perTransactionMetastoreCacheMaximumSize = perTransactionMetastoreCacheMaximumSize;
return this;
}

@Config("hudi.query-partition-filter-required")
@ConfigDescription("Require a filter on at least one partition column")
public HudiConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
{
this.queryPartitionFilterRequired = queryPartitionFilterRequired;
return this;
}

public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}
}
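
The `@Config` annotation above is what maps the `hudi.query-partition-filter-required` property name onto the new setter. A minimal sketch of the wiring, assuming Airlift's config binder as used throughout Trino plugins (the module class name here is hypothetical; the real binding lives in the Hudi plugin's existing Guice module):

```java
import com.google.inject.Binder;
import com.google.inject.Module;

import static io.airlift.configuration.ConfigBinder.configBinder;

// Hypothetical module for illustration only
public class ExampleHudiConfigModule
        implements Module
{
    @Override
    public void configure(Binder binder)
    {
        // Airlift scans @Config-annotated setters on HudiConfig, so a
        // "hudi.query-partition-filter-required=true" entry in the catalog
        // properties file flows into setQueryPartitionFilterRequired(boolean)
        configBinder(binder).bindConfig(HudiConfig.class);
    }
}
```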
HudiMetadata.java
@@ -45,23 +45,29 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Stream;

import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.hive.HiveMetadata.TABLE_COMMENT;
import static io.trino.plugin.hive.HiveTimestampPrecision.NANOSECONDS;
import static io.trino.plugin.hive.util.HiveUtil.columnMetadataGetter;
import static io.trino.plugin.hive.util.HiveUtil.getPartitionKeyColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.hiveColumnHandles;
import static io.trino.plugin.hive.util.HiveUtil.isHiveSystemSchema;
import static io.trino.plugin.hive.util.HiveUtil.isHudiTable;
import static io.trino.plugin.hudi.HudiErrorCode.HUDI_BAD_DATA;
import static io.trino.plugin.hudi.HudiSessionProperties.getColumnsToHide;
import static io.trino.plugin.hudi.HudiSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.hudi.HudiTableProperties.LOCATION_PROPERTY;
import static io.trino.plugin.hudi.HudiTableProperties.PARTITIONED_BY_PROPERTY;
import static io.trino.plugin.hudi.HudiUtil.hudiMetadataExists;
import static io.trino.plugin.hudi.model.HudiTableType.COPY_ON_WRITE;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
import static io.trino.spi.StandardErrorCode.UNSUPPORTED_TABLE_TYPE;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.lang.String.format;
@@ -114,6 +120,7 @@ public HudiTableHandle getTableHandle(ConnectorSession session, SchemaTableName
tableName.getTableName(),
table.get().getStorage().getLocation(),
COPY_ON_WRITE,
getPartitionKeyColumnHandles(table.get(), typeManager),
TupleDomain.all(),
TupleDomain.all());
}
@@ -162,12 +169,30 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
{
HudiTableHandle handle = (HudiTableHandle) tableHandle;
HudiPredicates predicates = HudiPredicates.from(constraint.getSummary());
TupleDomain<HiveColumnHandle> regularColumnPredicates = predicates.getRegularColumnPredicates();
TupleDomain<HiveColumnHandle> partitionColumnPredicates = predicates.getPartitionColumnPredicates();

// TODO: constraint#predicate isn't utilized during split generation, so
// constraint#predicateColumns are not added to newConstraintColumns.
Set<HiveColumnHandle> newConstraintColumns = Stream.concat(
Stream.concat(
regularColumnPredicates.getDomains().stream()
.map(Map::keySet)
.flatMap(Collection::stream),
partitionColumnPredicates.getDomains().stream()
.map(Map::keySet)
.flatMap(Collection::stream)),
handle.getConstraintColumns().stream())
.collect(toImmutableSet());

HudiTableHandle newHudiTableHandle = handle.applyPredicates(
-        predicates.getPartitionColumnPredicates(),
-        predicates.getRegularColumnPredicates());
+        newConstraintColumns,
+        partitionColumnPredicates,
+        regularColumnPredicates);

if (handle.getPartitionPredicates().equals(newHudiTableHandle.getPartitionPredicates())
-        && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates())) {
+        && handle.getRegularPredicates().equals(newHudiTableHandle.getRegularPredicates())
+        && handle.getConstraintColumns().equals(newHudiTableHandle.getConstraintColumns())) {
return Optional.empty();
}

@@ -224,6 +249,27 @@ public Iterator<TableColumnsMetadata> streamTableColumns(ConnectorSession sessio
.iterator();
}

@Override
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
HudiTableHandle hudiTableHandle = (HudiTableHandle) handle;
if (isQueryPartitionFilterRequired(session)) {
if (!hudiTableHandle.getPartitionColumns().isEmpty()) {
Set<String> partitionColumns = hudiTableHandle.getPartitionColumns().stream()
.map(HiveColumnHandle::getName)
.collect(toImmutableSet());
Set<String> constraintColumns = hudiTableHandle.getConstraintColumns().stream()
.map(HiveColumnHandle::getBaseColumnName)
.collect(toImmutableSet());
if (Collections.disjoint(constraintColumns, partitionColumns)) {
throw new TrinoException(
QUERY_REJECTED,
format("Filter required on %s for at least one of the partition columns: %s", hudiTableHandle.getSchemaTableName(), String.join(", ", partitionColumns)));
}
}
}
}
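
The rejection above hinges on `Collections.disjoint` between the base names of the pushed-down constraint columns and the table's partition columns. A minimal, self-contained sketch of that decision (table and column names are made up; note how the `OR` example from the docs produces no pushed-down constraint columns and is therefore rejected):

```java
import java.util.Collections;
import java.util.Set;

public class PartitionFilterCheckDemo
{
    public static void main(String[] args)
    {
        // hypothetical table partitioned on part_key
        Set<String> partitionColumns = Set.of("part_key");

        // WHERE id = 1 -> constraint columns {id}: disjoint, query rejected
        System.out.println(Collections.disjoint(Set.of("id"), partitionColumns)); // true

        // WHERE part_key = '100' -> constraint columns {part_key}: not disjoint, allowed
        System.out.println(Collections.disjoint(Set.of("part_key"), partitionColumns)); // false

        // WHERE id = 1 OR part_key = '100' -> the OR is not representable as a
        // TupleDomain, so no constraint columns are recorded: rejected
        System.out.println(Collections.disjoint(Set.of(), partitionColumns)); // true
    }
}
```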

HiveMetastore getMetastore()
{
return metastore;
HudiSessionProperties.java
@@ -50,6 +50,7 @@ public class HudiSessionProperties
private static final String MAX_SPLITS_PER_SECOND = "max_splits_per_second";
private static final String MAX_OUTSTANDING_SPLITS = "max_outstanding_splits";
private static final String SPLIT_GENERATOR_PARALLELISM = "split_generator_parallelism";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -113,6 +114,11 @@ public HudiSessionProperties(HudiConfig hudiConfig, ParquetReaderConfig parquetR
SPLIT_GENERATOR_PARALLELISM,
"Number of threads to generate splits from partitions",
hudiConfig.getSplitGeneratorParallelism(),
false),
booleanProperty(
QUERY_PARTITION_FILTER_REQUIRED,
"Require a filter on at least one partition column",
hudiConfig.isQueryPartitionFilterRequired(),
false));
}

@@ -167,4 +173,9 @@ public static int getSplitGeneratorParallelism(ConnectorSession session)
{
return session.getProperty(SPLIT_GENERATOR_PARALLELISM, Integer.class);
}

public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}
}
HudiTableHandle.java
@@ -14,13 +14,18 @@
package io.trino.plugin.hudi;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.hive.HiveColumnHandle;
import io.trino.plugin.hudi.model.HudiTableType;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.predicate.TupleDomain;

import java.util.List;
import java.util.Set;

import static io.trino.spi.connector.SchemaTableName.schemaTableName;
import static java.util.Objects.requireNonNull;

@@ -31,6 +36,9 @@ public class HudiTableHandle
private final String tableName;
private final String basePath;
private final HudiTableType tableType;
private final List<HiveColumnHandle> partitionColumns;
// Used only for validation when config property hudi.query-partition-filter-required is enabled
private final Set<HiveColumnHandle> constraintColumns;
private final TupleDomain<HiveColumnHandle> partitionPredicates;
private final TupleDomain<HiveColumnHandle> regularPredicates;

@@ -40,13 +48,29 @@ public HudiTableHandle(
@JsonProperty("tableName") String tableName,
@JsonProperty("basePath") String basePath,
@JsonProperty("tableType") HudiTableType tableType,
@JsonProperty("partitionColumns") List<HiveColumnHandle> partitionColumns,
@JsonProperty("partitionPredicates") TupleDomain<HiveColumnHandle> partitionPredicates,
@JsonProperty("regularPredicates") TupleDomain<HiveColumnHandle> regularPredicates)
{
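// A handle deserialized from JSON (for example on a worker) starts with an
// empty constraint column set; the constraint columns only matter for the
// coordinator-side validation in HudiMetadata#validateScan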
this(schemaName, tableName, basePath, tableType, partitionColumns, ImmutableSet.of(), partitionPredicates, regularPredicates);
}

public HudiTableHandle(
String schemaName,
String tableName,
String basePath,
HudiTableType tableType,
List<HiveColumnHandle> partitionColumns,
Set<HiveColumnHandle> constraintColumns,
TupleDomain<HiveColumnHandle> partitionPredicates,
TupleDomain<HiveColumnHandle> regularPredicates)
{
this.schemaName = requireNonNull(schemaName, "schemaName is null");
this.tableName = requireNonNull(tableName, "tableName is null");
this.basePath = requireNonNull(basePath, "basePath is null");
this.tableType = requireNonNull(tableType, "tableType is null");
this.partitionColumns = requireNonNull(partitionColumns, "partitionColumns is null");
this.constraintColumns = requireNonNull(constraintColumns, "constraintColumns is null");
this.partitionPredicates = requireNonNull(partitionPredicates, "partitionPredicates is null");
this.regularPredicates = requireNonNull(regularPredicates, "regularPredicates is null");
}
@@ -81,6 +105,19 @@ public TupleDomain<HiveColumnHandle> getPartitionPredicates()
return partitionPredicates;
}

@JsonProperty
public List<HiveColumnHandle> getPartitionColumns()
{
return partitionColumns;
}

// do not serialize constraint columns as they are not needed on workers
@JsonIgnore
public Set<HiveColumnHandle> getConstraintColumns()
{
return constraintColumns;
}

@JsonProperty
public TupleDomain<HiveColumnHandle> getRegularPredicates()
{
@@ -93,6 +130,7 @@ public SchemaTableName getSchemaTableName()
}

HudiTableHandle applyPredicates(
Set<HiveColumnHandle> constraintColumns,
TupleDomain<HiveColumnHandle> partitionTupleDomain,
TupleDomain<HiveColumnHandle> regularTupleDomain)
{
@@ -101,6 +139,8 @@ HudiTableHandle applyPredicates(
tableName,
basePath,
tableType,
partitionColumns,
constraintColumns,
partitionPredicates.intersect(partitionTupleDomain),
regularPredicates.intersect(regularTupleDomain));
}
TestHudiConfig.java
@@ -39,7 +39,8 @@ public void testDefaults()
.setMaxOutstandingSplits(1000)
.setSplitLoaderParallelism(4)
.setSplitGeneratorParallelism(4)
-.setPerTransactionMetastoreCacheMaximumSize(2000));
+.setPerTransactionMetastoreCacheMaximumSize(2000)
+.setQueryPartitionFilterRequired(false));
}

@Test
@@ -56,6 +57,7 @@ public void testExplicitPropertyMappings()
.put("hudi.split-loader-parallelism", "16")
.put("hudi.split-generator-parallelism", "32")
.put("hudi.per-transaction-metastore-cache-maximum-size", "1000")
.put("hudi.query-partition-filter-required", "true")
.buildOrThrow();

HudiConfig expected = new HudiConfig()
@@ -68,7 +70,8 @@
.setMaxOutstandingSplits(100)
.setSplitLoaderParallelism(16)
.setSplitGeneratorParallelism(32)
-.setPerTransactionMetastoreCacheMaximumSize(1000);
+.setPerTransactionMetastoreCacheMaximumSize(1000)
+.setQueryPartitionFilterRequired(true);

assertFullMapping(properties, expected);
}