Add query-partition-filter-required-schemas config property to Iceberg connector #22540

Merged
docs/src/main/sphinx/connector/iceberg.md (9 additions, 0 deletions)
@@ -166,6 +166,15 @@ implementation is used:
`query_partition_filter_required` catalog session property for temporary,
catalog specific use.
- `false`
* - `iceberg.query-partition-filter-required-schemas`
- List of schemas for which Trino enforces that queries use a filter on
partition keys for source tables. The list can be specified with the
`iceberg.query-partition-filter-required-schemas` configuration property or
the `query_partition_filter_required_schemas` session property. The list is
taken into account only when the `iceberg.query-partition-filter-required`
configuration property or the `query_partition_filter_required` session
property is set to `true`.
- `[]`
* - `iceberg.incremental-refresh-enabled`
- Set to `false` to force the materialized view refresh operation to always
perform a full refresh. You can use the `incremental_refresh_enabled`
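For illustration of the documentation entry above, a catalog configuration that enforces partition filters only for two schemas might look like the following (the schema names and file path are examples, not part of this change):

```properties
# etc/catalog/iceberg.properties
iceberg.query-partition-filter-required=true
# Comma-separated; without this line the requirement applies to every schema.
iceberg.query-partition-filter-required-schemas=sales,finance
```

The same scoping is available per session via `SET SESSION iceberg.query_partition_filter_required_schemas = ARRAY['sales', 'finance']`.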
IcebergConfig.java
@@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
@@ -28,12 +29,15 @@
import jakarta.validation.constraints.NotNull;

import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;

@@ -78,6 +82,7 @@ public class IcebergConfig
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2;
private boolean incrementalRefreshEnabled = true;

@@ -417,6 +422,21 @@ public boolean isQueryPartitionFilterRequired()
return queryPartitionFilterRequired;
}

public Set<String> getQueryPartitionFilterRequiredSchemas()
{
return queryPartitionFilterRequiredSchemas;
}

@Config("iceberg.query-partition-filter-required-schemas")
@ConfigDescription("List of schemas for which filter on partition column is enforced")
public IcebergConfig setQueryPartitionFilterRequiredSchemas(Set<String> queryPartitionFilterRequiredSchemas)
{
this.queryPartitionFilterRequiredSchemas = queryPartitionFilterRequiredSchemas.stream()
.map(value -> value.toLowerCase(ENGLISH))
.collect(toImmutableSet());
return this;
}

@Min(0)
public int getSplitManagerThreads()
{
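As a quick sanity check of the lower-casing in the setter above, a minimal sketch (assuming the `trino-iceberg` module and Guava on the classpath; the class name is illustrative):

```java
import com.google.common.collect.ImmutableSet;
import io.trino.plugin.iceberg.IcebergConfig;

public class ConfigNormalizationCheck
{
    public static void main(String[] args)
    {
        IcebergConfig config = new IcebergConfig()
                .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("Bronze", "SILVER"));
        // The setter normalizes entries to lower case: prints [bronze, silver]
        System.out.println(config.getQueryPartitionFilterRequiredSchemas());
    }
}
```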
IcebergMetadata.java
@@ -229,6 +229,7 @@
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getQueryPartitionFilterRequiredSchemas;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
@@ -742,7 +743,7 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
- if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) {
+ if (isQueryPartitionFilterRequiredForTable(session, table) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) {
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
.map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
@@ -773,6 +774,14 @@ public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
}
}

private static boolean isQueryPartitionFilterRequiredForTable(ConnectorSession session, IcebergTableHandle table)
{
Set<String> requiredSchemas = getQueryPartitionFilterRequiredSchemas(session);
// If query_partition_filter_required_schemas is empty, the partition filter requirement applies to tables in all schemas.
return isQueryPartitionFilterRequired(session) &&
(requiredSchemas.isEmpty() || requiredSchemas.contains(table.getSchemaName()));
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
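The new helper combines the existing global flag with the schema list: filtering is enforced when the flag is on and the list is either empty (all schemas) or contains the table's schema. A standalone sketch of the same truth table (plain Java, not Trino code):

```java
import java.util.Set;

public class PartitionFilterGate
{
    static boolean required(boolean filterRequired, Set<String> schemas, String tableSchema)
    {
        return filterRequired && (schemas.isEmpty() || schemas.contains(tableSchema));
    }

    public static void main(String[] args)
    {
        System.out.println(required(true, Set.of(), "sales"));           // true: empty list covers all schemas
        System.out.println(required(true, Set.of("sales"), "sales"));    // true: schema is listed
        System.out.println(required(true, Set.of("sales"), "archive"));  // false: schema is not listed
        System.out.println(required(false, Set.of("sales"), "sales"));   // false: global flag is off
    }
}
```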
IcebergSessionProperties.java
@@ -27,12 +27,17 @@
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.plugin.base.session.PropertyMetadataUtil.validateMaxDataSize;
@@ -51,7 +56,10 @@
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public final class IcebergSessionProperties
implements SessionPropertiesProvider
@@ -98,6 +106,7 @@ public final class IcebergSessionProperties
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";

private final List<PropertyMetadata<?>> sessionProperties;
@@ -355,6 +364,23 @@ public IcebergSessionProperties(
"Require filter on partition column",
icebergConfig.isQueryPartitionFilterRequired(),
false))
.add(new PropertyMetadata<>(
QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS,
"List of schemas for which filter on partition column is enforced.",
new ArrayType(VARCHAR),
Set.class,
icebergConfig.getQueryPartitionFilterRequiredSchemas(),
false,
object -> ((Collection<?>) object).stream()
.map(String.class::cast)
.peek(property -> {
if (isNullOrEmpty(property)) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("Invalid null or empty value in %s property", QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS));
}
})
.map(schema -> schema.toLowerCase(ENGLISH))
.collect(toImmutableSet()),
value -> value))
.add(booleanProperty(
INCREMENTAL_REFRESH_ENABLED,
"Enable Incremental refresh for MVs backed by Iceberg tables, when possible.",
@@ -586,6 +612,14 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

@SuppressWarnings("unchecked")
public static Set<String> getQueryPartitionFilterRequiredSchemas(ConnectorSession session)
{
Set<String> queryPartitionFilterRequiredSchemas = (Set<String>) session.getProperty(QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS, Set.class);
requireNonNull(queryPartitionFilterRequiredSchemas, "queryPartitionFilterRequiredSchemas is null");
return queryPartitionFilterRequiredSchemas;
}

public static boolean isIncrementalRefreshEnabled(ConnectorSession session)
{
return session.getProperty(INCREMENTAL_REFRESH_ENABLED, Boolean.class);
BaseIcebergConnectorTest.java
@@ -7969,6 +7969,43 @@ public void testNonSelectStatementsWithPartitionFilterRequired()
assertUpdate(session, "DROP TABLE " + tableName2);
}

@Test
public void testPartitionFilterRequiredSchemas()
{
String schemaName = "test_unenforced_schema_" + randomNameSuffix();
String tableName = "test_partition_" + randomNameSuffix();

Session session = Session.builder(withPartitionFilterRequired(getSession()))
.setCatalogSessionProperty("iceberg", "query_partition_filter_required_schemas", "[\"tpch\"]")
.build();

assertUpdate(session, "CREATE SCHEMA " + schemaName);
assertUpdate(session, format("CREATE TABLE %s.%s (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", schemaName, tableName), 1);
assertUpdate(session, "CREATE TABLE " + tableName + " (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", 1);

String enforcedQuery = "SELECT id FROM tpch." + tableName + " WHERE a = '1'";
assertQueryFails(session, enforcedQuery, "Filter required for tpch\\." + tableName + " on at least one of the partition columns: ds");

String unenforcedQuery = format("SELECT id FROM %s.%s WHERE a = '1'", schemaName, tableName);
assertQuerySucceeds(session, unenforcedQuery);

assertUpdate(session, "DROP TABLE " + tableName);
assertUpdate(session, "DROP SCHEMA " + schemaName + " CASCADE");
}

@Test
public void testIgnorePartitionFilterRequiredSchemas()
{
String tableName = "test_partition_" + randomNameSuffix();

Session session = Session.builder(getSession())
.setCatalogSessionProperty("iceberg", "query_partition_filter_required_schemas", "[\"tpch\"]")
.build();
assertUpdate(session, "CREATE TABLE " + tableName + " (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", 1);
assertQuerySucceeds(session, "SELECT id FROM " + tableName + " WHERE a = '1'");
assertUpdate(session, "DROP TABLE " + tableName);
}

private static Session withPartitionFilterRequired(Session session)
{
return Session.builder(session)
TestIcebergConfig.java
@@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveCompressionCodec;
@@ -67,6 +68,7 @@ public void testDefaults()
.setRegisterTableProcedureEnabled(false)
.setSortedWritingEnabled(true)
.setQueryPartitionFilterRequired(false)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
.setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2)
.setIncrementalRefreshEnabled(true));
}
@@ -99,6 +101,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.sorted-writing-enabled", "false")
.put("iceberg.query-partition-filter-required", "true")
.put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
.put("iceberg.split-manager-threads", "42")
.put("iceberg.incremental-refresh-enabled", "false")
.buildOrThrow();
@@ -128,6 +131,7 @@ public void testExplicitPropertyMappings()
.setRegisterTableProcedureEnabled(true)
.setSortedWritingEnabled(false)
.setQueryPartitionFilterRequired(true)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
.setSplitManagerThreads(42)
.setIncrementalRefreshEnabled(false);
