Skip to content

Commit

Permalink
Optional check for query partition filter for Delta
Browse files Browse the repository at this point in the history
  • Loading branch information
marcinsbd authored and Praveen2112 committed Aug 2, 2023
1 parent d0c22e7 commit 1ba82f8
Show file tree
Hide file tree
Showing 9 changed files with 626 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1158,3 +1158,8 @@ connector.
* - ``delta.projection-pushdown-enabled``
- Read only projected fields from row columns while performing ``SELECT`` queries
- ``true``
* - ``delta.query-partition-filter-required``
- Set to ``true`` to require that queries on partitioned tables filter on at
least one partition column. You can use the
``query_partition_filter_required`` catalog session property for temporary,
catalog-specific use.
- ``false``
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ public class DeltaLakeConfig
private boolean legacyCreateTableWithExistingLocationEnabled;
private boolean registerTableProcedureEnabled;
private boolean projectionPushdownEnabled = true;
private boolean queryPartitionFilterRequired;

public Duration getMetadataCacheTtl()
{
Expand Down Expand Up @@ -488,4 +489,17 @@ public DeltaLakeConfig setProjectionPushdownEnabled(boolean projectionPushdownEn
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

/**
 * Returns the current value of the flag backing the
 * {@code delta.query-partition-filter-required} configuration property.
 */
public boolean isQueryPartitionFilterRequired()
{
    return this.queryPartitionFilterRequired;
}

/**
 * Stores the value of the {@code delta.query-partition-filter-required} configuration property.
 *
 * @param queryPartitionFilterRequired new value of the flag
 * @return this config object, so that setters can be chained
 */
@Config("delta.query-partition-filter-required")
@ConfigDescription("Require filter on at least one partition column")
public DeltaLakeConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
{
    this.queryPartitionFilterRequired = queryPartitionFilterRequired;
    return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLegacyCreateTableWithExistingLocationEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
Expand Down Expand Up @@ -256,6 +257,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
Expand Down Expand Up @@ -2516,6 +2518,13 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> enforceableDomains = ImmutableMap.builder();
ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> unenforceableDomains = ImmutableMap.builder();
ImmutableSet.Builder<DeltaLakeColumnHandle> constraintColumns = ImmutableSet.builder();
// We need an additional field to track the partition columns used in a query, because enforceableDomains
// does not capture cases where a partition column is used within a complex filter such as 'partitionColumn % 2 = 0'
constraint.getPredicateColumns().stream()
.flatMap(Collection::stream)
.map(DeltaLakeColumnHandle.class::cast)
.forEach(constraintColumns::add);
for (Entry<ColumnHandle, Domain> domainEntry : constraintDomains.entrySet()) {
DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) domainEntry.getKey();
if (!partitionColumns.contains(column)) {
Expand All @@ -2524,6 +2533,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
else {
enforceableDomains.put(column, domainEntry.getValue());
}
constraintColumns.add(column);
}

TupleDomain<DeltaLakeColumnHandle> newEnforcedConstraint = TupleDomain.withColumnDomains(enforceableDomains.buildOrThrow());
Expand All @@ -2541,15 +2551,19 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableHandle.getNonPartitionConstraint()
.intersect(newUnenforcedConstraint)
.simplify(domainCompactionThreshold),
Sets.union(tableHandle.getConstraintColumns(), constraintColumns.build()),
tableHandle.getWriteType(),
tableHandle.getProjectedColumns(),
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
Optional.empty(),
false,
Optional.empty(),
tableHandle.getReadVersion());

if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) &&
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint())) {
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint()) &&
tableHandle.getConstraintColumns().equals(newHandle.getConstraintColumns())) {
return Optional.empty();
}

Expand Down Expand Up @@ -2735,6 +2749,32 @@ private static Optional<String> find(Map<String, ColumnHandle> assignments, Proj
return Optional.empty();
}

/**
 * Rejects a scan of a partitioned table when the session requires a partition filter and
 * none of the columns recorded on the table handle as constraint columns is a partition column.
 * <p>
 * Tables without partition columns are always accepted. A handle carrying an analyze handle
 * is always rejected for a partitioned table while the requirement is active.
 *
 * @param session session whose {@code query_partition_filter_required} property is consulted
 * @param handle table handle to validate; must be a {@link DeltaLakeTableHandle}
 * @throws TrinoException with {@code QUERY_REJECTED} when the scan violates the requirement
 */
@Override
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
    DeltaLakeTableHandle table = (DeltaLakeTableHandle) handle;

    // Nothing to validate unless the session demands a partition filter
    if (!isQueryPartitionFilterRequired(session)) {
        return;
    }

    List<String> partitionColumnNames = table.getMetadataEntry().getCanonicalPartitionColumns();
    if (partitionColumnNames.isEmpty()) {
        // Table has no partition columns, so there is nothing to filter on
        return;
    }

    if (table.getAnalyzeHandle().isPresent()) {
        throw new TrinoException(
                QUERY_REJECTED,
                "ANALYZE statement can not be performed on partitioned tables because filtering is required on at least one partition. However, the partition filtering check can be disabled with the catalog session property 'query_partition_filter_required'.");
    }

    // NOTE(review): partition names are the canonical ones while the referenced names come from
    // getBaseColumnName(); confirm both use the same casing, otherwise the overlap check can miss.
    Set<String> filteredColumnNames = table.getConstraintColumns().stream()
            .map(DeltaLakeColumnHandle::getBaseColumnName)
            .collect(toImmutableSet());
    boolean noPartitionColumnFiltered = Collections.disjoint(filteredColumnNames, partitionColumnNames);
    if (noPartitionColumnFiltered) {
        throw new TrinoException(
                QUERY_REJECTED,
                format("Filter required on %s for at least one partition column: %s", table.getSchemaTableName(), String.join(", ", partitionColumnNames)));
    }
}

@Override
public Optional<TableScanRedirectApplicationResult> applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle tableHandle)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ public final class DeltaLakeSessionProperties
public static final String EXTENDED_STATISTICS_COLLECT_ON_WRITE = "extended_statistics_collect_on_write";
public static final String LEGACY_CREATE_TABLE_WITH_EXISTING_LOCATION_ENABLED = "legacy_create_table_with_existing_location_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -200,6 +201,11 @@ public DeltaLakeSessionProperties(
PROJECTION_PUSHDOWN_ENABLED,
"Read only required fields from a row type",
deltaLakeConfig.isProjectionPushdownEnabled(),
false),
booleanProperty(
QUERY_PARTITION_FILTER_REQUIRED,
"Require filter on partition column",
deltaLakeConfig.isQueryPartitionFilterRequired(),
false));
}

Expand Down Expand Up @@ -304,4 +310,9 @@ public static boolean isProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
}

/**
 * Reads the {@code query_partition_filter_required} property from the given session.
 *
 * @param session connector session to read the property from
 * @return the property value as a primitive boolean
 */
public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
    return session.getProperty(
            QUERY_PARTITION_FILTER_REQUIRED,
            Boolean.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.spi.connector.SchemaTableName;
Expand Down Expand Up @@ -62,6 +63,8 @@ public enum WriteType
// OPTIMIZE only. Coordinator-only
private final boolean recordScannedFiles;
private final Optional<DataSize> maxScannedFileSize;
// Used only for validation when config property delta.query-partition-filter-required is enabled.
private final Set<DeltaLakeColumnHandle> constraintColumns;

@JsonCreator
public DeltaLakeTableHandle(
Expand All @@ -87,6 +90,7 @@ public DeltaLakeTableHandle(
metadataEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
ImmutableSet.of(),
writeType,
projectedColumns,
updatedColumns,
Expand All @@ -105,6 +109,7 @@ public DeltaLakeTableHandle(
MetadataEntry metadataEntry,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Set<DeltaLakeColumnHandle> constraintColumns,
Optional<WriteType> writeType,
Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
Optional<List<DeltaLakeColumnHandle>> updatedColumns,
Expand All @@ -131,6 +136,7 @@ public DeltaLakeTableHandle(
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.readVersion = readVersion;
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
}

public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColumns)
Expand All @@ -143,6 +149,7 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
metadataEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
writeType,
Optional.of(projectedColumns),
updatedColumns,
Expand All @@ -163,6 +170,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
metadataEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
writeType,
projectedColumns,
updatedColumns,
Expand Down Expand Up @@ -281,6 +289,12 @@ public Optional<DataSize> getMaxScannedFileSize()
return maxScannedFileSize;
}

/**
 * Returns the constraint columns stored on this handle.
 * <p>
 * The property is marked {@code @JsonIgnore}, so it is left out of the handle's JSON form.
 */
@JsonIgnore
public Set<DeltaLakeColumnHandle> getConstraintColumns()
{
    return this.constraintColumns;
}

@JsonProperty
public long getReadVersion()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.minio.MinioClient;
import io.trino.testing.sql.TestTable;
import io.trino.tpch.TpchTable;
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;
Expand Down Expand Up @@ -2133,6 +2134,23 @@ public void testProjectionPushdownMultipleRows()
assertUpdate("DROP TABLE " + tableName);
}

/**
 * With {@code query_partition_filter_required} enabled, a query filtering only on a regular
 * column must fail, while a query filtering on the partition column must return its rows.
 */
@Test
public void testPartitionFilterIncluded()
{
    String catalog = getSession().getCatalog().orElseThrow();
    Session filterRequiredSession = Session.builder(getSession())
            .setCatalogSessionProperty(catalog, "query_partition_filter_required", "true")
            .build();

    try (TestTable table = new TestTable(
            getQueryRunner()::execute,
            "test_no_partition_filter",
            "(x varchar, part varchar) WITH (PARTITIONED_BY = ARRAY['part'])",
            ImmutableList.of("'a', 'part_a'", "'b', 'part_b'"))) {
        String tableName = table.getName();

        // Filter on a non-partition column only: the query has to be rejected
        assertQueryFails(
                filterRequiredSession,
                "SELECT * FROM %s WHERE x='a'".formatted(tableName),
                "Filter required on .*" + tableName + " for at least one partition column:.*");

        // Filter on the partition column: the query is accepted
        assertQuery(
                filterRequiredSession,
                "SELECT * FROM %s WHERE part='part_a'".formatted(tableName),
                "VALUES ('a', 'part_a')");
    }
}

private Set<String> getActiveFiles(String tableName)
{
return getActiveFiles(tableName, getQueryRunner().getDefaultSession());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ public void testDefaults()
.setUniqueTableLocation(true)
.setLegacyCreateTableWithExistingLocationEnabled(false)
.setRegisterTableProcedureEnabled(false)
.setProjectionPushdownEnabled(true));
.setProjectionPushdownEnabled(true)
.setQueryPartitionFilterRequired(false));
}

@Test
Expand Down Expand Up @@ -105,6 +106,7 @@ public void testExplicitPropertyMappings()
.put("delta.legacy-create-table-with-existing-location.enabled", "true")
.put("delta.register-table-procedure.enabled", "true")
.put("delta.projection-pushdown-enabled", "false")
.put("delta.query-partition-filter-required", "true")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
Expand Down Expand Up @@ -137,7 +139,8 @@ public void testExplicitPropertyMappings()
.setUniqueTableLocation(false)
.setLegacyCreateTableWithExistingLocationEnabled(true)
.setRegisterTableProcedureEnabled(true)
.setProjectionPushdownEnabled(false);
.setProjectionPushdownEnabled(false)
.setQueryPartitionFilterRequired(true);

assertFullMapping(properties, expected);
}
Expand Down
Loading

0 comments on commit 1ba82f8

Please sign in to comment.