From c22cefa3210a5a65690aec5e326040ede928ec55 Mon Sep 17 00:00:00 2001 From: okayhooni Date: Fri, 28 Jun 2024 21:40:32 +0900 Subject: [PATCH] Add query-partition-filter-required-schemas config to iceberg --- docs/src/main/sphinx/connector/iceberg.md | 9 +++++ .../trino/plugin/iceberg/IcebergConfig.java | 20 ++++++++++ .../trino/plugin/iceberg/IcebergMetadata.java | 11 +++++- .../iceberg/IcebergSessionProperties.java | 34 +++++++++++++++++ .../iceberg/BaseIcebergConnectorTest.java | 37 +++++++++++++++++++ .../plugin/iceberg/TestIcebergConfig.java | 4 ++ 6 files changed, 114 insertions(+), 1 deletion(-) diff --git a/docs/src/main/sphinx/connector/iceberg.md b/docs/src/main/sphinx/connector/iceberg.md index c87f56b89273..4e38c2b0cd2d 100644 --- a/docs/src/main/sphinx/connector/iceberg.md +++ b/docs/src/main/sphinx/connector/iceberg.md @@ -166,6 +166,15 @@ implementation is used: `query_partition_filter_required` catalog session property for temporary, catalog specific use. - `false` +* - `iceberg.query-partition-filter-required-schemas` + - Allow specifying the list of schemas for which Trino will enforce that + queries use a filter on partition keys for source tables. The list can be + specified using the `iceberg.query-partition-filter-required-schemas` configuration + property or the `query_partition_filter_required_schemas` catalog session property. + The list is taken into consideration only if the `iceberg.query-partition-filter-required` + configuration property or the `query_partition_filter_required` catalog session + property is set to `true`. An empty list enforces the filter for all schemas. + - `[]` * - `iceberg.incremental-refresh-enabled` - Set to `false` to force the materialized view refresh operation to always perform a full refresh. 
You can use the `incremental_refresh_enabled` diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java index af6ba7237592..366f4a3712c9 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergConfig.java @@ -13,6 +13,7 @@ */ package io.trino.plugin.iceberg; +import com.google.common.collect.ImmutableSet; import io.airlift.configuration.Config; import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.DefunctConfig; @@ -28,12 +29,15 @@ import jakarta.validation.constraints.NotNull; import java.util.Optional; +import java.util.Set; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; import static io.trino.plugin.hive.HiveCompressionCodec.ZSTD; import static io.trino.plugin.iceberg.CatalogType.HIVE_METASTORE; import static io.trino.plugin.iceberg.IcebergFileFormat.PARQUET; +import static java.util.Locale.ENGLISH; import static java.util.concurrent.TimeUnit.DAYS; import static java.util.concurrent.TimeUnit.SECONDS; @@ -78,6 +82,7 @@ public class IcebergConfig private Optional materializedViewsStorageSchema = Optional.empty(); private boolean sortedWritingEnabled = true; private boolean queryPartitionFilterRequired; + private Set queryPartitionFilterRequiredSchemas = ImmutableSet.of(); private int splitManagerThreads = Runtime.getRuntime().availableProcessors() * 2; private boolean incrementalRefreshEnabled = true; @@ -417,6 +422,21 @@ public boolean isQueryPartitionFilterRequired() return queryPartitionFilterRequired; } + public Set getQueryPartitionFilterRequiredSchemas() + { + return queryPartitionFilterRequiredSchemas; + } + + @Config("iceberg.query-partition-filter-required-schemas") + 
@ConfigDescription("List of schemas for which filter on partition column is enforced") + public IcebergConfig setQueryPartitionFilterRequiredSchemas(Set queryPartitionFilterRequiredSchemas) + { + this.queryPartitionFilterRequiredSchemas = queryPartitionFilterRequiredSchemas.stream() + .map(value -> value.toLowerCase(ENGLISH)) + .collect(toImmutableSet()); + return this; + } + @Min(0) public int getSplitManagerThreads() { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java index f2393bbedbe1..e2fd24c0cc5e 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergMetadata.java @@ -229,6 +229,7 @@ import static io.trino.plugin.iceberg.IcebergMetadataColumn.isMetadataColumnId; import static io.trino.plugin.iceberg.IcebergSessionProperties.getExpireSnapshotMinRetention; import static io.trino.plugin.iceberg.IcebergSessionProperties.getHiveCatalogName; +import static io.trino.plugin.iceberg.IcebergSessionProperties.getQueryPartitionFilterRequiredSchemas; import static io.trino.plugin.iceberg.IcebergSessionProperties.getRemoveOrphanFilesMinRetention; import static io.trino.plugin.iceberg.IcebergSessionProperties.isCollectExtendedStatisticsOnWrite; import static io.trino.plugin.iceberg.IcebergSessionProperties.isExtendedStatisticsEnabled; @@ -742,7 +743,7 @@ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTable public void validateScan(ConnectorSession session, ConnectorTableHandle handle) { IcebergTableHandle table = (IcebergTableHandle) handle; - if (isQueryPartitionFilterRequired(session) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) { + if (isQueryPartitionFilterRequiredForTable(session, table) && table.getEnforcedPredicate().isAll() && !table.getForAnalyze().orElseThrow()) { Schema 
schema = SchemaParser.fromJson(table.getTableSchemaJson()); Optional partitionSpec = table.getPartitionSpecJson() .map(partitionSpecJson -> PartitionSpecParser.fromJson(schema, partitionSpecJson)); @@ -773,6 +774,14 @@ public void validateScan(ConnectorSession session, ConnectorTableHandle handle) } } + private static boolean isQueryPartitionFilterRequiredForTable(ConnectorSession session, IcebergTableHandle table) + { + Set requiredSchemas = getQueryPartitionFilterRequiredSchemas(session); + // If query_partition_filter_required_schemas is empty then we would apply partition filter for all tables. + return isQueryPartitionFilterRequired(session) && + (requiredSchemas.isEmpty() || requiredSchemas.contains(table.getSchemaName())); + } + @Override public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) { diff --git a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java index d0151c7c9076..32c2945fa2a8 100644 --- a/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java +++ b/plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergSessionProperties.java @@ -27,12 +27,17 @@ import io.trino.spi.TrinoException; import io.trino.spi.connector.ConnectorSession; import io.trino.spi.session.PropertyMetadata; +import io.trino.spi.type.ArrayType; +import java.util.Collection; import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ThreadLocalRandom; import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Strings.isNullOrEmpty; +import static com.google.common.collect.ImmutableSet.toImmutableSet; import static io.trino.plugin.base.session.PropertyMetadataUtil.dataSizeProperty; import static io.trino.plugin.base.session.PropertyMetadataUtil.durationProperty; import static 
io.trino.plugin.base.session.PropertyMetadataUtil.validateMaxDataSize; @@ -51,7 +56,10 @@ import static io.trino.spi.session.PropertyMetadata.enumProperty; import static io.trino.spi.session.PropertyMetadata.integerProperty; import static io.trino.spi.session.PropertyMetadata.stringProperty; +import static io.trino.spi.type.VarcharType.VARCHAR; import static java.lang.String.format; +import static java.util.Locale.ENGLISH; +import static java.util.Objects.requireNonNull; public final class IcebergSessionProperties implements SessionPropertiesProvider @@ -98,6 +106,7 @@ public final class IcebergSessionProperties private static final String MERGE_MANIFESTS_ON_WRITE = "merge_manifests_on_write"; private static final String SORTED_WRITING_ENABLED = "sorted_writing_enabled"; private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required"; + private static final String QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS = "query_partition_filter_required_schemas"; private static final String INCREMENTAL_REFRESH_ENABLED = "incremental_refresh_enabled"; private final List> sessionProperties; @@ -355,6 +364,23 @@ public IcebergSessionProperties( "Require filter on partition column", icebergConfig.isQueryPartitionFilterRequired(), false)) + .add(new PropertyMetadata<>( + QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS, + "List of schemas for which filter on partition column is enforced.", + new ArrayType(VARCHAR), + Set.class, + icebergConfig.getQueryPartitionFilterRequiredSchemas(), + false, + object -> ((Collection) object).stream() + .map(String.class::cast) + .peek(property -> { + if (isNullOrEmpty(property)) { + throw new TrinoException(INVALID_SESSION_PROPERTY, format("Invalid null or empty value in %s property", QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS)); + } + }) + .map(schema -> schema.toLowerCase(ENGLISH)) + .collect(toImmutableSet()), + value -> value)) .add(booleanProperty( INCREMENTAL_REFRESH_ENABLED, "Enable Incremental refresh for MVs backed by 
Iceberg tables, when possible.", @@ -586,6 +612,14 @@ public static boolean isQueryPartitionFilterRequired(ConnectorSession session) return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class); } + @SuppressWarnings("unchecked cast") + public static Set getQueryPartitionFilterRequiredSchemas(ConnectorSession session) + { + Set queryPartitionFilterRequiredSchemas = (Set) session.getProperty(QUERY_PARTITION_FILTER_REQUIRED_SCHEMAS, Set.class); + requireNonNull(queryPartitionFilterRequiredSchemas, "queryPartitionFilterRequiredSchemas is null"); + return queryPartitionFilterRequiredSchemas; + } + public static boolean isIncrementalRefreshEnabled(ConnectorSession session) { return session.getProperty(INCREMENTAL_REFRESH_ENABLED, Boolean.class); diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java index 18b0e8318b94..728de653a232 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java @@ -7969,6 +7969,43 @@ public void testNonSelectStatementsWithPartitionFilterRequired() assertUpdate(session, "DROP TABLE " + tableName2); } + @Test + public void testPartitionFilterRequiredSchemas() + { + String schemaName = "test_unenforced_schema_" + randomNameSuffix(); + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = Session.builder(withPartitionFilterRequired(getSession())) + .setCatalogSessionProperty("iceberg", "query_partition_filter_required_schemas", "[\"tpch\"]") + .build(); + + assertUpdate(session, "CREATE SCHEMA " + schemaName); + assertUpdate(session, format("CREATE TABLE %s.%s (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", schemaName, tableName), 1); + assertUpdate(session, "CREATE TABLE " + tableName + " (id, a, ds) WITH 
(partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", 1); + + String enforcedQuery = "SELECT id FROM tpch." + tableName + " WHERE a = '1'"; + assertQueryFails(session, enforcedQuery, "Filter required for tpch\\." + tableName + " on at least one of the partition columns: ds"); + + String unenforcedQuery = format("SELECT id FROM %s.%s WHERE a = '1'", schemaName, tableName); + assertQuerySucceeds(session, unenforcedQuery); + + assertUpdate(session, "DROP TABLE " + tableName); + assertUpdate(session, "DROP SCHEMA " + schemaName + " CASCADE"); + } + + @Test + public void testIgnorePartitionFilterRequiredSchemas() + { + String tableName = "test_partition_" + randomNameSuffix(); + + Session session = Session.builder(getSession()) + .setCatalogSessionProperty("iceberg", "query_partition_filter_required_schemas", "[\"tpch\"]") + .build(); + assertUpdate(session, "CREATE TABLE " + tableName + " (id, a, ds) WITH (partitioning = ARRAY['ds']) AS SELECT 1, '1', '1'", 1); + assertQuerySucceeds(session, "SELECT id FROM " + tableName + " WHERE a = '1'"); + assertUpdate(session, "DROP TABLE " + tableName); + } + private static Session withPartitionFilterRequired(Session session) { return Session.builder(session) diff --git a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java index bedc15342107..f77617882ae3 100644 --- a/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java +++ b/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/TestIcebergConfig.java @@ -14,6 +14,7 @@ package io.trino.plugin.iceberg; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; import io.airlift.units.DataSize; import io.airlift.units.Duration; import io.trino.plugin.hive.HiveCompressionCodec; @@ -67,6 +68,7 @@ public void testDefaults() .setRegisterTableProcedureEnabled(false) .setSortedWritingEnabled(true) 
.setQueryPartitionFilterRequired(false) + .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of()) .setSplitManagerThreads(Runtime.getRuntime().availableProcessors() * 2) .setIncrementalRefreshEnabled(true)); } @@ -99,6 +101,7 @@ public void testExplicitPropertyMappings() .put("iceberg.register-table-procedure.enabled", "true") .put("iceberg.sorted-writing-enabled", "false") .put("iceberg.query-partition-filter-required", "true") + .put("iceberg.query-partition-filter-required-schemas", "bronze,silver") .put("iceberg.split-manager-threads", "42") .put("iceberg.incremental-refresh-enabled", "false") .buildOrThrow(); @@ -128,6 +131,7 @@ public void testExplicitPropertyMappings() .setRegisterTableProcedureEnabled(true) .setSortedWritingEnabled(false) .setQueryPartitionFilterRequired(true) + .setQueryPartitionFilterRequiredSchemas(ImmutableSet.of("bronze", "silver")) .setSplitManagerThreads(42) .setIncrementalRefreshEnabled(false);