Add config to reject Iceberg queries without partition filter #17263

Merged 1 commit on Oct 13, 2023
docs/src/main/sphinx/connector/iceberg.md (4 additions, 0 deletions)
@@ -157,6 +157,10 @@ implementation is used:
* - ``iceberg.register-table-procedure.enabled``
- Enable to allow user to call ``register_table`` procedure.
- ``false``
* - ``iceberg.query-partition-filter-required``
* - Set to ``true`` to require that queries on partitioned tables include a filter on at least one partition column.
You can use the ``query_partition_filter_required`` catalog session property for temporary, catalog-specific use.
- ``false``
```

## Type mapping
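For context, here is a usage sketch of the new property from a client's point of view. It is illustrative only: it assumes a coordinator at `localhost:8080`, a catalog named `iceberg`, a hypothetical `analytics.events` table partitioned on `ds`, and the Trino JDBC driver on the classpath.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class PartitionFilterRequiredExample
{
    public static void main(String[] args)
            throws SQLException
    {
        // Hypothetical coordinator, catalog, and schema; adjust to your environment.
        String url = "jdbc:trino://localhost:8080/iceberg/analytics";
        try (Connection connection = DriverManager.getConnection(url, "admin", null);
                Statement statement = connection.createStatement()) {
            // Enable the check for this session only. Setting
            // iceberg.query-partition-filter-required=true in the catalog properties file
            // enables it for every session.
            statement.execute("SET SESSION iceberg.query_partition_filter_required = true");

            // "events" is partitioned on "ds". Without a predicate on "ds" the query is now
            // rejected with QUERY_REJECTED ("Filter required for ... on at least one of the
            // partition columns: ds"); with the predicate below it runs normally.
            try (ResultSet results = statement.executeQuery(
                    "SELECT count(*) FROM events WHERE ds = '2023-10-13'")) {
                while (results.next()) {
                    System.out.println(results.getLong(1));
                }
            }
        }
    }
}
```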
IcebergConfig.java
@@ -73,6 +73,7 @@ public class IcebergConfig
private double minimumAssignedSplitWeight = 0.05;
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;

public CatalogType getCatalogType()
{
@@ -367,4 +368,17 @@ public IcebergConfig setSortedWritingEnabled(boolean sortedWritingEnabled)
this.sortedWritingEnabled = sortedWritingEnabled;
return this;
}

@Config("iceberg.query-partition-filter-required")
@ConfigDescription("Require a filter on at least one partition column")
public IcebergConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
{
this.queryPartitionFilterRequired = queryPartitionFilterRequired;
return this;
}

public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}
}
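The new property follows the standard Airlift config pattern, so it binds the same way as the connector's other settings. Below is a minimal, self-contained sketch of that mapping; `ExampleConfig` and the `main` method are made up for illustration (real coverage would belong in the connector's existing config test), while the `ConfigAssertions` helpers are the ones Airlift provides for Trino's config tests.

```java
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

import java.util.Map;

import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;

public class PartitionFilterConfigSketch
{
    // Stand-in config class with just the one new attribute
    public static class ExampleConfig
    {
        private boolean queryPartitionFilterRequired;

        public boolean isQueryPartitionFilterRequired()
        {
            return queryPartitionFilterRequired;
        }

        @Config("iceberg.query-partition-filter-required")
        @ConfigDescription("Require a filter on at least one partition column")
        public ExampleConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
        {
            this.queryPartitionFilterRequired = queryPartitionFilterRequired;
            return this;
        }
    }

    public static void main(String[] args)
    {
        // The attribute defaults to false, so the check is off unless explicitly enabled
        assertRecordedDefaults(recordDefaults(ExampleConfig.class)
                .setQueryPartitionFilterRequired(false));

        // The property key binds to the setter annotated with @Config
        assertFullMapping(
                Map.of("iceberg.query-partition-filter-required", "true"),
                new ExampleConfig().setQueryPartitionFilterRequired(true));
    }
}
```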
IcebergMetadata.java
@@ -160,6 +160,7 @@
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
@@ -217,6 +218,7 @@
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isMergeManifestsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isStatisticsEnabled;
import static io.trino.plugin.iceberg.IcebergTableProperties.FILE_FORMAT_PROPERTY;
import static io.trino.plugin.iceberg.IcebergTableProperties.FORMAT_VERSION_PROPERTY;
@@ -258,6 +260,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_ARGUMENTS;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.FRESH;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.STALE;
import static io.trino.spi.connector.MaterializedViewFreshness.Freshness.UNKNOWN;
@@ -429,6 +432,8 @@ public ConnectorTableHandle getTableHandle(
table.location(),
table.properties(),
false,
Optional.empty(),
ImmutableSet.of(),
Optional.empty());
}

@@ -657,6 +662,41 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
.build();
}

@Override
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
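// Apply the check only when the session/config property is enabled, when the connector has not
// already enforced a predicate (the enforced predicate is ALL), and when this scan is not part of
// an ANALYZE, which populates analyzeColumns.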
if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && table.getAnalyzeColumns().isEmpty()) {
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
.map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
if (partitionSpec.isEmpty() || partitionSpec.get().isUnpartitioned()) {
return;
}
Set<Integer> columnsWithPredicates = new HashSet<>();
table.getConstraintColumns().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add);
table.getUnenforcedPredicate().getDomains().ifPresent(domain -> domain.keySet().stream()
.map(IcebergColumnHandle::getId)
.forEach(columnsWithPredicates::add));
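// Source column ids of the active partition fields; fields removed from the spec (left behind as
// void transforms in format v1 tables) are ignored.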
Set<Integer> partitionColumns = partitionSpec.get().fields().stream()
.filter(field -> !field.transform().isVoid())
.map(PartitionField::sourceId)
.collect(toImmutableSet());
if (Collections.disjoint(columnsWithPredicates, partitionColumns)) {
String partitionColumnNames = partitionSpec.get().fields().stream()
.filter(field -> !field.transform().isVoid())
.map(PartitionField::sourceId)
.map(id -> schema.idToName().get(id))
.collect(joining(", "));
throw new TrinoException(
QUERY_REJECTED,
format("Filter required for %s on at least one of the partition columns: %s", table.getSchemaTableName(), partitionColumnNames));
}
}
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
@@ -2001,7 +2041,7 @@ public ConnectorAnalyzeMetadata getStatisticsCollectionMetadata(ConnectorSession
});

return new ConnectorAnalyzeMetadata(
tableHandle,
handle.withAnalyzeColumns(analyzeColumnNames.or(() -> Optional.of(ImmutableSet.of()))),
getStatisticsCollectionMetadata(
tableMetadata,
analyzeColumnNames,
@@ -2384,7 +2424,9 @@ public Optional<LimitApplicationResult<ConnectorTableHandle>> applyLimit(Connect
table.getTableLocation(),
table.getStorageProperties(),
table.isRecordScannedFiles(),
table.getMaxScannedFileSize());
table.getMaxScannedFileSize(),
table.getConstraintColumns(),
table.getAnalyzeColumns());

return Optional.of(new LimitApplicationResult<>(table, false, false));
}
@@ -2395,7 +2437,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
IcebergTableHandle table = (IcebergTableHandle) handle;
ConstraintExtractor.ExtractionResult extractionResult = extractTupleDomain(constraint);
TupleDomain<IcebergColumnHandle> predicate = extractionResult.tupleDomain();
if (predicate.isAll()) {
if (predicate.isAll() && constraint.getPredicateColumns().isEmpty()) {
return Optional.empty();
}
if (table.getLimit().isPresent()) {
@@ -2455,8 +2497,16 @@ else if (isMetadataColumnId(columnHandle.getId())) {
remainingConstraint = TupleDomain.withColumnDomains(newUnenforced).intersect(TupleDomain.withColumnDomains(unsupported));
}

Set<IcebergColumnHandle> newConstraintColumns = constraint.getPredicateColumns()
.map(columnHandles -> columnHandles.stream()
.map(columnHandle -> (IcebergColumnHandle) columnHandle)
.collect(toImmutableSet()))
.orElse(ImmutableSet.of());

if (newEnforcedConstraint.equals(table.getEnforcedPredicate())
&& newUnenforcedConstraint.equals(table.getUnenforcedPredicate())) {
&& newUnenforcedConstraint.equals(table.getUnenforcedPredicate())
&& newConstraintColumns.equals(table.getConstraintColumns())
&& constraint.getPredicateColumns().isEmpty()) {
return Optional.empty();
}

@@ -2478,7 +2528,9 @@ else if (isMetadataColumnId(columnHandle.getId())) {
table.getTableLocation(),
table.getStorageProperties(),
table.isRecordScannedFiles(),
table.getMaxScannedFileSize()),
table.getMaxScannedFileSize(),
Sets.union(table.getConstraintColumns(), newConstraintColumns),
table.getAnalyzeColumns()),
remainingConstraint.transformKeys(ColumnHandle.class::cast),
extractionResult.remainingExpression(),
false));
@@ -2626,7 +2678,9 @@ public TableStatistics getTableStatistics(ConnectorSession session, ConnectorTab
originalHandle.getTableLocation(),
originalHandle.getStorageProperties(),
originalHandle.isRecordScannedFiles(),
originalHandle.getMaxScannedFileSize()),
originalHandle.getMaxScannedFileSize(),
originalHandle.getConstraintColumns(),
originalHandle.getAnalyzeColumns()),
handle -> {
Table icebergTable = catalog.loadTable(session, handle.getSchemaTableName());
return TableStatisticsReader.getTableStatistics(typeManager, session, handle, icebergTable);
IcebergSessionProperties.java
@@ -88,6 +88,7 @@ public final class IcebergSessionProperties
public static final String REMOVE_ORPHAN_FILES_MIN_RETENTION = "remove_orphan_files_min_retention";
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -299,6 +300,11 @@ public IcebergSessionProperties(
"Enable sorted writing to tables with a specified sort order",
icebergConfig.isSortedWritingEnabled(),
false))
.add(booleanProperty(
QUERY_PARTITION_FILTER_REQUIRED,
"Require filter on partition column",
icebergConfig.isQueryPartitionFilterRequired(),
false))
.build();
}

@@ -489,4 +495,9 @@ public static boolean isSortedWritingEnabled(ConnectorSession session)
{
return session.getProperty(SORTED_WRITING_ENABLED, Boolean.class);
}

public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}
}
IcebergTableHandle.java
@@ -55,6 +55,9 @@ public class IcebergTableHandle
// Filter guaranteed to be enforced by Iceberg connector
private final TupleDomain<IcebergColumnHandle> enforcedPredicate;

// Columns that appear in {@link Constraint#predicate()} applied to the table scan
private final Set<IcebergColumnHandle> constraintColumns;

// semantically limit is applied after enforcedPredicate
private final OptionalLong limit;

@@ -65,6 +68,9 @@ public class IcebergTableHandle
private final boolean recordScannedFiles;
private final Optional<DataSize> maxScannedFileSize;

// ANALYZE only
private final Optional<Set<String>> analyzeColumns;

@JsonCreator
public static IcebergTableHandle fromJsonForDeserializationOnly(
@JsonProperty("catalog") CatalogHandle catalog,
@@ -100,6 +106,8 @@ public static IcebergTableHandle fromJsonForDeserializationOnly(
tableLocation,
storageProperties,
false,
Optional.empty(),
ImmutableSet.of(),
Optional.empty());
}

@@ -120,7 +128,9 @@ public IcebergTableHandle(
String tableLocation,
Map<String, String> storageProperties,
boolean recordScannedFiles,
Optional<DataSize> maxScannedFileSize)
Optional<DataSize> maxScannedFileSize,
Set<IcebergColumnHandle> constraintColumns,
Optional<Set<String>> analyzeColumns)
{
this.catalog = requireNonNull(catalog, "catalog is null");
this.schemaName = requireNonNull(schemaName, "schemaName is null");
@@ -139,6 +149,8 @@ public IcebergTableHandle(
this.storageProperties = ImmutableMap.copyOf(requireNonNull(storageProperties, "storageProperties is null"));
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
this.analyzeColumns = requireNonNull(analyzeColumns, "analyzeColumns is null");
}

@JsonProperty
@@ -244,6 +256,18 @@ public Optional<DataSize> getMaxScannedFileSize()
return maxScannedFileSize;
}

@JsonIgnore
public Set<IcebergColumnHandle> getConstraintColumns()
{
return constraintColumns;
}

@JsonIgnore
public Optional<Set<String>> getAnalyzeColumns()
{
return analyzeColumns;
}

public SchemaTableName getSchemaTableName()
{
return new SchemaTableName(schemaName, tableName);
@@ -273,7 +297,33 @@ public IcebergTableHandle withProjectedColumns(Set<IcebergColumnHandle> projecte
tableLocation,
storageProperties,
recordScannedFiles,
maxScannedFileSize);
maxScannedFileSize,
constraintColumns,
analyzeColumns);
}

public IcebergTableHandle withAnalyzeColumns(Optional<Set<String>> analyzeColumns)
{
return new IcebergTableHandle(
catalog,
schemaName,
tableName,
tableType,
snapshotId,
tableSchemaJson,
partitionSpecJson,
formatVersion,
unenforcedPredicate,
enforcedPredicate,
limit,
projectedColumns,
nameMappingJson,
tableLocation,
storageProperties,
recordScannedFiles,
maxScannedFileSize,
constraintColumns,
analyzeColumns);
}

public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxScannedFileSize)
@@ -295,7 +345,9 @@ public IcebergTableHandle forOptimize(boolean recordScannedFiles, DataSize maxSc
tableLocation,
storageProperties,
recordScannedFiles,
Optional.of(maxScannedFileSize));
Optional.of(maxScannedFileSize),
constraintColumns,
analyzeColumns);
}

@Override
@@ -325,7 +377,9 @@ public boolean equals(Object o)
Objects.equals(nameMappingJson, that.nameMappingJson) &&
Objects.equals(tableLocation, that.tableLocation) &&
Objects.equals(storageProperties, that.storageProperties) &&
Objects.equals(maxScannedFileSize, that.maxScannedFileSize);
Objects.equals(maxScannedFileSize, that.maxScannedFileSize) &&
Objects.equals(constraintColumns, that.constraintColumns) &&
Objects.equals(analyzeColumns, that.analyzeColumns);
}

@Override
@@ -348,7 +402,9 @@ public int hashCode()
tableLocation,
storageProperties,
recordScannedFiles,
maxScannedFileSize);
maxScannedFileSize,
constraintColumns,
analyzeColumns);
}

@Override
@@ -637,6 +637,24 @@ public void testMetadataTables()
}
}

@Test
public void testPartitionFilterRequired()
{
String tableName = "test_partition_" + randomNameSuffix();

Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", "query_partition_filter_required", "true")
.build();

assertUpdate(session, "CREATE TABLE " + tableName + " (id integer, a varchar, b varchar, ds varchar) WITH (partitioning = ARRAY['ds'])");
assertUpdate(session, "INSERT INTO " + tableName + " (id, a, ds) VALUES (1, 'a', 'a')", 1);
String query = "SELECT id FROM " + tableName + " WHERE a = 'a'";
String failureMessage = "Filter required for tpch.*\\." + tableName + " on at least one of the partition columns: ds";
assertQueryFails(session, query, failureMessage);
assertQueryFails(session, "EXPLAIN " + query, failureMessage);
assertUpdate(session, "DROP TABLE " + tableName);
}

protected abstract boolean isFileSorted(Location path, String sortColumnName);

@Test