Skip to content

Commit

Permalink
Add query-partition-filter-required-schemas config to iceberg
Browse files Browse the repository at this point in the history
  • Loading branch information
okayhooni committed Jul 24, 2024
1 parent 0002afa commit c22cefa
Show file tree
Hide file tree
Showing 6 changed files with 114 additions and 1 deletion.
9 changes: 9 additions & 0 deletions docs/src/main/sphinx/connector/iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,15 @@ implementation is used:
`query_partition_filter_required` catalog session property for temporary,
catalog specific use.
- `false`
* - `iceberg.query-partition-filter-required-schemas`
- Allow specifying the list of schemas for which Trino will enforce that
queries use a filter on partition keys for source tables. The list can be
specified using the `iceberg.query-partition-filter-required-schemas`
configuration property or the `query_partition_filter_required_schemas`
session property. The list
is taken into consideration only if the `iceberg.query-partition-filter-required`
configuration property or the `query_partition_filter_required` session
property is set to `true`.
- `[]`
* - `iceberg.incremental-refresh-enabled`
- Set to `false` to force the materialized view refresh operation to always
perform a full refresh. You can use the `incremental_refresh_enabled`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableSet;
import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
Expand All @@ -28,12 +29,15 @@
import jakarta.validation.constraints.NotNull;

import java.util.Optional;
import java.util.Set;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD;
import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE;
import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET;
import static java.util.Locale.ENGLISH;
import static java.util.concurrent.TimeUnit.DAYS;
import static java.util.concurrent.TimeUnit.SECONDS;

Expand Down Expand Up @@ -78,6 +82,7 @@ public class IcebergConfig
private Optional<String> materializedViewsStorageSchema = Optional.empty();
private boolean sortedWritingEnabled = true;
private boolean queryPartitionFilterRequired;
private Set<String> queryPartitionFilterRequiredSchemas = ImmutableSet.of();
private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2;
private boolean incrementalRefreshEnabled = true;

Expand Down Expand Up @@ -417,6 +422,21 @@ public boolean isQueryPartitionFilterRequired()
return queryPartitionFilterRequired;
}

/**
 * Returns the schemas for which a partition filter is enforced on source tables.
 * An empty set means the requirement (when enabled) applies to every schema.
 */
public Set<String> getQueryPartitionFilterRequiredSchemas()
{
    return queryPartitionFilterRequiredSchemas;
}

@Config("iceberg.query-partition-filter-required-schemas")
@ConfigDescription("List of schemas for which filter on partition column is enforced")
public IcebergConfig setQueryPartitionFilterRequiredSchemas(Set<String> queryPartitionFilterRequiredSchemas)
{
    // Schema names are case-insensitive in Trino, so normalize to lower case up front
    // to allow simple contains() checks at query time.
    ImmutableSet.Builder<String> normalized = ImmutableSet.builder();
    for (String schema : queryPartitionFilterRequiredSchemas) {
        normalized.add(schema.toLowerCase(ENGLISH));
    }
    this.queryPartitionFilterRequiredSchemas = normalized.build();
    return this;
}

@Min(0)
public int getSplitManagerThreads()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@
import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getQueryPartitionFilterRequiredSchemas;
import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite;
import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled;
Expand Down Expand Up @@ -742,7 +743,7 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
IcebergTableHandle table = (IcebergTableHandle) handle;
if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) {
if (isQueryPartitionFilterRequiredForTable(session, table) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) {
Schema schema = SchemaParser.fromJson(table.getTableSchemaJson());
Optional<PartitionSpec> partitionSpec = table.getPartitionSpecJson()
.map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson));
Expand Down Expand Up @@ -773,6 +774,14 @@ public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
}
}

/**
 * Decides whether the partition-filter requirement applies to the given table:
 * the feature must be enabled for the session, and the table's schema must be
 * covered by the configured schema list.
 */
private static boolean isQueryPartitionFilterRequiredForTable(ConnectorSession session, IcebergTableHandle table)
{
    if (!isQueryPartitionFilterRequired(session)) {
        return false;
    }
    Set<String> requiredSchemas = getQueryPartitionFilterRequiredSchemas(session);
    // An empty schema list means the partition filter is enforced for all schemas
    return requiredSchemas.isEmpty() || requiredSchemas.contains(table.getSchemaName());
}

@Override
public Map<SchemaTableName, List<ColumnMetadata>> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,17 @@
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;

import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty;
import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty;
import static io.trino.plugin.base.session.PropertyMetadataUtil.validateMaxDataSize;
Expand All @@ -51,7 +56,10 @@
import static io.trino.spi.session.PropertyMetadata.enumProperty;
import static io.trino.spi.session.PropertyMetadata.integerProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.lang.String.format;
import static java.util.Locale.ENGLISH;
import static java.util.Objects.requireNonNull;

public final class IcebergSessionProperties
implements SessionPropertiesProvider
Expand Down Expand Up @@ -98,6 +106,7 @@ public final class IcebergSessionProperties
private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write";
private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";
private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas";
private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled";

private final List<PropertyMetadata<?>> sessionProperties;
Expand Down Expand Up @@ -355,6 +364,23 @@ public IcebergSessionProperties(
"Require filter on partition column",
icebergConfig.isQueryPartitionFilterRequired(),
false))
.add(new PropertyMetadata<>(
QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS,
"List of schemas for which filter on partition column is enforced.",
new ArrayType(VARCHAR),
Set.class,
icebergConfig.getQueryPartitionFilterRequiredSchemas(),
false,
object -> ((Collection<?>) object).stream()
.map(String.class::cast)
.peek(property -> {
if (isNullOrEmpty(property)) {
throw new TrinoException(INVALID_SESSION_PROPERTY, format("Invalid null or empty value in %s property", QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS));
}
})
.map(schema -> schema.toLowerCase(ENGLISH))
.collect(toImmutableSet()),
value -> value))
.add(booleanProperty(
INCREMENTAL_REFRESH_ENABLED,
"Enable Incremental refresh for MVs backed by Iceberg tables, when possible.",
Expand Down Expand Up @@ -586,6 +612,14 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}

/**
 * Reads the {@code query_partition_filter_required_schemas} session property.
 * The property is declared with Java type {@code Set.class}, so the stored value
 * must be cast back to {@code Set<String>}.
 *
 * @return the lower-cased schema names for which a partition filter is required; never null
 */
@SuppressWarnings("unchecked") // "unchecked" is the javac lint key; "unchecked cast" is not recognized and suppresses nothing
public static Set<String> getQueryPartitionFilterRequiredSchemas(ConnectorSession session)
{
    Set<String> queryPartitionFilterRequiredSchemas = (Set<String>) session.getProperty(QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS, Set.class);
    return requireNonNull(queryPartitionFilterRequiredSchemas, "queryPartitionFilterRequiredSchemas is null");
}

public static boolean isIncrementalRefreshEnabled(ConnectorSession session)
{
return session.getProperty(INCREMENTAL_REFRESH_ENABLED, Boolean.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7969,6 +7969,43 @@ public void testNonSelectStatementsWithPartitionFilterRequired()
assertUpdate(session, "DROP TABLE " + tableName2);
}

@Test
public void testPartitionFilterRequiredSchemas()
{
    // Enforce the partition filter only for the "tpch" schema; tables in any other schema are unaffected.
    Session session = Session.builder(withPartitionFilterRequired(getSession()))
            .setCatalogSessionProperty("iceberg", "query_partition_filter_required_schemas", "[\"tpch\"]")
            .build();

    String schemaName = "test_unenforced_schema_" + randomNameSuffix();
    String tableName = "test_partition_" + randomNameSuffix();

    assertUpdate(session, "CREATE SCHEMA " + schemaName);
    assertUpdate(session, format("CREATE TABLE %s.%s (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", schemaName, tableName), 1);
    assertUpdate(session, format("CREATE TABLE %s (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", tableName), 1);

    // Table in the listed schema: querying without a partition filter must fail
    assertQueryFails(
            session,
            format("SELECT id FROM tpch.%s WHERE a = '1'", tableName),
            "Filter required for tpch\\." + tableName + " on at least one of the partition columns: ds");
    // Table in the unlisted schema: the same filter-less query succeeds
    assertQuerySucceeds(session, format("SELECT id FROM %s.%s WHERE a = '1'", schemaName, tableName));

    assertUpdate(session, "DROP TABLE " + tableName);
    assertUpdate(session, "DROP SCHEMA " + schemaName + " CASCADE");
}

@Test
public void testIgnorePartitionFilterRequiredSchemas()
{
    // query_partition_filter_required is NOT enabled here, so the schema list alone
    // must not cause partition filters to be enforced.
    Session session = Session.builder(getSession())
            .setCatalogSessionProperty("iceberg", "query_partition_filter_required_schemas", "[\"tpch\"]")
            .build();

    String tableName = "test_partition_" + randomNameSuffix();
    assertUpdate(session, "CREATE TABLE " + tableName + " (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", 1);
    assertQuerySucceeds(session, "SELECT id FROM " + tableName + " WHERE a = '1'");
    assertUpdate(session, "DROP TABLE " + tableName);
}

private static Session withPartitionFilterRequired(Session session)
{
return Session.builder(session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.trino.plugin.hive.HiveCompressionCodec;
Expand Down Expand Up @@ -67,6 +68,7 @@ public void testDefaults()
.setRegisterTableProcedureEnabled(false)
.setSortedWritingEnabled(true)
.setQueryPartitionFilterRequired(false)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of())
.setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2)
.setIncrementalRefreshEnabled(true));
}
Expand Down Expand Up @@ -99,6 +101,7 @@ public void testExplicitPropertyMappings()
.put("iceberg.register-table-procedure.enabled", "true")
.put("iceberg.sorted-writing-enabled", "false")
.put("iceberg.query-partition-filter-required", "true")
.put("iceberg.query-partition-filter-required-schemas", "bronze,silver")
.put("iceberg.split-manager-threads", "42")
.put("iceberg.incremental-refresh-enabled", "false")
.buildOrThrow();
Expand Down Expand Up @@ -128,6 +131,7 @@ public void testExplicitPropertyMappings()
.setRegisterTableProcedureEnabled(true)
.setSortedWritingEnabled(false)
.setQueryPartitionFilterRequired(true)
.setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver"))
.setSplitManagerThreads(42)
.setIncrementalRefreshEnabled(false);

Expand Down

0 comments on commit c22cefa

Please sign in to comment.