Optional check for query partition filter for Delta #18345

Merged
5 changes: 5 additions & 0 deletions docs/src/main/sphinx/connector/delta-lake.rst
@@ -1136,3 +1136,8 @@ connector.
* - ``delta.projection-pushdown-enabled``
- Read only projected fields from row columns while performing ``SELECT`` queries
- ``true``
* - ``delta.query-partition-filter-required``
- Set to ``true`` to force a query to use a partition filter. You can use
the ``query_partition_filter_required`` catalog session property for
temporary, catalog-specific use.
- ``false``
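For example, instead of setting delta.query-partition-filter-required=true in the catalog properties file, a user can enable the check for a single session. A usage sketch, assuming a catalog named delta and a hypothetical orders table partitioned on order_date:

SET SESSION delta.query_partition_filter_required = true;

-- Rejected: no predicate on the partition column
SELECT * FROM delta.default.orders WHERE custkey = 42;

-- Accepted: includes a filter on the partition column
SELECT * FROM delta.default.orders WHERE order_date = DATE '2023-07-01';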
DeltaLakeConfig.java
@@ -77,6 +77,7 @@ public class DeltaLakeConfig
private boolean legacyCreateTableWithExistingLocationEnabled;
private boolean registerTableProcedureEnabled;
private boolean projectionPushdownEnabled = true;
private boolean queryPartitionFilterRequired;

public Duration getMetadataCacheTtl()
{
@@ -488,4 +489,17 @@ public DeltaLakeConfig setProjectionPushdownEnabled(boolean projectionPushdownEn
this.projectionPushdownEnabled = projectionPushdownEnabled;
return this;
}

public boolean isQueryPartitionFilterRequired()
{
return queryPartitionFilterRequired;
}

@Config("delta.query-partition-filter-required")
@ConfigDescription("Require filter on at least one partition column")
public DeltaLakeConfig setQueryPartitionFilterRequired(boolean queryPartitionFilterRequired)
{
this.queryPartitionFilterRequired = queryPartitionFilterRequired;
return this;
}
}
DeltaLakeMetadata.java
@@ -195,6 +195,7 @@
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isExtendedStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isLegacyCreateTableWithExistingLocationEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isProjectionPushdownEnabled;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isQueryPartitionFilterRequired;
import static io.trino.plugin.deltalake.DeltaLakeSessionProperties.isTableStatisticsEnabled;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHANGE_DATA_FEED_ENABLED_PROPERTY;
import static io.trino.plugin.deltalake.DeltaLakeTableProperties.CHECKPOINT_INTERVAL_PROPERTY;
@@ -256,6 +257,7 @@
import static io.trino.spi.StandardErrorCode.INVALID_ANALYZE_PROPERTY;
import static io.trino.spi.StandardErrorCode.INVALID_SCHEMA_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.StandardErrorCode.QUERY_REJECTED;
import static io.trino.spi.connector.RetryMode.NO_RETRIES;
import static io.trino.spi.connector.RowChangeParadigm.DELETE_ROW_AND_INSERT_ROW;
import static io.trino.spi.connector.SchemaTableName.schemaTableName;
@@ -2514,6 +2516,13 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C

ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> enforceableDomains = ImmutableMap.builder();
ImmutableMap.Builder<DeltaLakeColumnHandle, Domain> unenforceableDomains = ImmutableMap.builder();
ImmutableSet.Builder<DeltaLakeColumnHandle> constraintColumns = ImmutableSet.builder();
// We need an additional field to track the partition columns used in a query, because enforceableDomains
// does not catch cases where a partition column is used within a complex filter such as 'partitionColumn % 2 = 0'
constraint.getPredicateColumns().stream()
.flatMap(Collection::stream)
.map(DeltaLakeColumnHandle.class::cast)
.forEach(constraintColumns::add);
for (Entry<ColumnHandle, Domain> domainEntry : constraintDomains.entrySet()) {
DeltaLakeColumnHandle column = (DeltaLakeColumnHandle) domainEntry.getKey();
if (!partitionColumns.contains(column)) {
@@ -2522,6 +2531,7 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
else {
enforceableDomains.put(column, domainEntry.getValue());
}
constraintColumns.add(column);
}

TupleDomain<DeltaLakeColumnHandle> newEnforcedConstraint = TupleDomain.withColumnDomains(enforceableDomains.buildOrThrow());
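The tracking above matters for predicates that reference a partition column without producing an extractable domain. A hypothetical illustration, where part is a partition column:

-- The modulo expression cannot be summarized as a TupleDomain, so nothing
-- reaches enforceableDomains; only Constraint.getPredicateColumns() records
-- that the partition column was referenced
SELECT * FROM example_table WHERE part % 2 = 0;

Without constraintColumns, the validateScan check below would reject this query even though it does filter on a partition column.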
@@ -2539,15 +2549,19 @@ public Optional<ConstraintApplicationResult<ConnectorTableHandle>> applyFilter(C
tableHandle.getNonPartitionConstraint()
.intersect(newUnenforcedConstraint)
.simplify(domainCompactionThreshold),
Sets.union(tableHandle.getConstraintColumns(), constraintColumns.build()),
tableHandle.getWriteType(),
tableHandle.getProjectedColumns(),
tableHandle.getUpdatedColumns(),
tableHandle.getUpdateRowIdColumns(),
Optional.empty(),
false,
Optional.empty(),
tableHandle.getReadVersion());

if (tableHandle.getEnforcedPartitionConstraint().equals(newHandle.getEnforcedPartitionConstraint()) &&
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint())) {
tableHandle.getNonPartitionConstraint().equals(newHandle.getNonPartitionConstraint()) &&
tableHandle.getConstraintColumns().equals(newHandle.getConstraintColumns())) {
return Optional.empty();
}

@@ -2733,6 +2747,32 @@ private static Optional<String> find(Map<String, ColumnHandle> assignments, Proj
return Optional.empty();
}

@Override
public void validateScan(ConnectorSession session, ConnectorTableHandle handle)
{
DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) handle;

if (isQueryPartitionFilterRequired(session)) {
List<String> partitionColumns = deltaLakeTableHandle.getMetadataEntry().getCanonicalPartitionColumns();
if (!partitionColumns.isEmpty()) {
if (deltaLakeTableHandle.getAnalyzeHandle().isPresent()) {
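// Delta ANALYZE currently does not support specifying partition predicates (unlike e.g. Hive's),
// so when a partition filter is required on a partitioned table, ANALYZE can never satisfy it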
throw new TrinoException(
QUERY_REJECTED,
"ANALYZE statement can not be performed on partitioned tables because filtering is required on at least one partition. However, the partition filtering check can be disabled with the catalog session property 'query_partition_filter_required'.");
Member review comment: Add a code comment that Delta ANALYZE currently does not support specifying partitioning predicates (unlike e.g. Hive's).
}
Set<String> referencedColumns =
deltaLakeTableHandle.getConstraintColumns().stream()
.map(DeltaLakeColumnHandle::getBaseColumnName)
.collect(toImmutableSet());
if (Collections.disjoint(referencedColumns, partitionColumns)) {
throw new TrinoException(
QUERY_REJECTED,
format("Filter required on %s for at least one partition column: %s", deltaLakeTableHandle.getSchemaTableName(), String.join(", ", partitionColumns)));
}
}
}
}
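A consequence of the branch above, shown as a hypothetical session (table name invented): with the check enabled, ANALYZE on a partitioned Delta table is rejected outright, because Delta ANALYZE has no syntax for narrowing the scan to specific partitions.

SET SESSION delta.query_partition_filter_required = true;

-- Fails with QUERY_REJECTED on any partitioned table
ANALYZE example_partitioned_table;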

@Override
public Optional<TableScanRedirectApplicationResult> applyTableScanRedirect(ConnectorSession session, ConnectorTableHandle tableHandle)
{
DeltaLakeSessionProperties.java
@@ -69,6 +69,7 @@ public final class DeltaLakeSessionProperties
public static final String EXTENDED_STATISTICS_COLLECT_ON_WRITE = "extended_statistics_collect_on_write";
public static final String LEGACY_CREATE_TABLE_WITH_EXISTING_LOCATION_ENABLED = "legacy_create_table_with_existing_location_enabled";
private static final String PROJECTION_PUSHDOWN_ENABLED = "projection_pushdown_enabled";
private static final String QUERY_PARTITION_FILTER_REQUIRED = "query_partition_filter_required";

private final List<PropertyMetadata<?>> sessionProperties;

@@ -200,6 +201,11 @@ public DeltaLakeSessionProperties(
PROJECTION_PUSHDOWN_ENABLED,
"Read only required fields from a row type",
deltaLakeConfig.isProjectionPushdownEnabled(),
false),
booleanProperty(
QUERY_PARTITION_FILTER_REQUIRED,
"Require filter on partition column",
deltaLakeConfig.isQueryPartitionFilterRequired(),
false));
}

@@ -304,4 +310,9 @@ public static boolean isProjectionPushdownEnabled(ConnectorSession session)
{
return session.getProperty(PROJECTION_PUSHDOWN_ENABLED, Boolean.class);
}

public static boolean isQueryPartitionFilterRequired(ConnectorSession session)
{
return session.getProperty(QUERY_PARTITION_FILTER_REQUIRED, Boolean.class);
}
}
DeltaLakeTableHandle.java
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableSet;
import io.airlift.units.DataSize;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.spi.connector.SchemaTableName;
@@ -62,6 +63,8 @@ public enum WriteType
// OPTIMIZE only. Coordinator-only
private final boolean recordScannedFiles;
private final Optional<DataSize> maxScannedFileSize;
// Used only for validation when the delta.query-partition-filter-required config property is enabled.
// Not serialized to workers: the getter is @JsonIgnore and the @JsonCreator constructor omits it.

@JsonCreator
public DeltaLakeTableHandle(
@@ -87,6 +90,7 @@ public DeltaLakeTableHandle(
metadataEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
ImmutableSet.of(),
writeType,
projectedColumns,
updatedColumns,
@@ -105,6 +109,7 @@ public DeltaLakeTableHandle(
MetadataEntry metadataEntry,
TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint,
TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint,
Set<DeltaLakeColumnHandle> constraintColumns,
Optional<WriteType> writeType,
Optional<Set<DeltaLakeColumnHandle>> projectedColumns,
Optional<List<DeltaLakeColumnHandle>> updatedColumns,
@@ -131,6 +136,7 @@ public DeltaLakeTableHandle(
this.recordScannedFiles = recordScannedFiles;
this.maxScannedFileSize = requireNonNull(maxScannedFileSize, "maxScannedFileSize is null");
this.readVersion = readVersion;
this.constraintColumns = ImmutableSet.copyOf(requireNonNull(constraintColumns, "constraintColumns is null"));
}

public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> projectedColumns)
@@ -143,11 +149,14 @@ public DeltaLakeTableHandle withProjectedColumns(Set<DeltaLakeColumnHandle> proj
metadataEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
writeType,
Optional.of(projectedColumns),
updatedColumns,
updateRowIdColumns,
analyzeHandle,
recordScannedFiles,
maxScannedFileSize,
readVersion);
}

@@ -161,6 +170,7 @@ public DeltaLakeTableHandle forOptimize(boolean recordScannedFiles, DataSize max
metadataEntry,
enforcedPartitionConstraint,
nonPartitionConstraint,
constraintColumns,
writeType,
projectedColumns,
updatedColumns,
@@ -279,6 +289,12 @@ public Optional<DataSize> getMaxScannedFileSize()
return maxScannedFileSize;
}

@JsonIgnore
public Set<DeltaLakeColumnHandle> getConstraintColumns()
{
return constraintColumns;
}

@JsonProperty
public long getReadVersion()
{
@@ -39,6 +39,7 @@
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.minio.MinioClient;
import io.trino.testing.sql.TestTable;
import io.trino.tpch.TpchTable;
import org.intellij.lang.annotations.Language;
import org.testng.SkipException;
@@ -2133,6 +2134,23 @@ public void testProjectionPushdownMultipleRows()
assertUpdate("DROP TABLE " + tableName);
}

@Test
public void testPartitionFilterIncluded()
{
Session session = Session.builder(getSession())
.setCatalogSessionProperty(getSession().getCatalog().orElseThrow(), "query_partition_filter_required", "true")
.build();

try (TestTable table = new TestTable(
getQueryRunner()::execute,
"test_no_partition_filter",
"(x varchar, part varchar) WITH (PARTITIONED_BY = ARRAY['part'])",
ImmutableList.of("'a', 'part_a'", "'b', 'part_b'"))) {
assertQueryFails(session, "SELECT * FROM %s WHERE x='a'".formatted(table.getName()), "Filter required on .*" + table.getName() + " for at least one partition column:.*");
assertQuery(session, "SELECT * FROM %s WHERE part='part_a'".formatted(table.getName()), "VALUES ('a', 'part_a')");
}
}

private Set<String> getActiveFiles(String tableName)
{
return getActiveFiles(tableName, getQueryRunner().getDefaultSession());
TestDeltaLakeConfig.java
@@ -68,7 +68,8 @@ public void testDefaults()
.setUniqueTableLocation(true)
.setLegacyCreateTableWithExistingLocationEnabled(false)
.setRegisterTableProcedureEnabled(false)
.setProjectionPushdownEnabled(true));
.setProjectionPushdownEnabled(true)
.setQueryPartitionFilterRequired(false));
}

@Test
@@ -105,6 +106,7 @@ public void testExplicitPropertyMappings()
.put("delta.legacy-create-table-with-existing-location.enabled", "true")
.put("delta.register-table-procedure.enabled", "true")
.put("delta.projection-pushdown-enabled", "false")
.put("delta.query-partition-filter-required", "true")
.buildOrThrow();

DeltaLakeConfig expected = new DeltaLakeConfig()
@@ -137,7 +139,8 @@ public void testExplicitPropertyMappings()
.setUniqueTableLocation(false)
.setLegacyCreateTableWithExistingLocationEnabled(true)
.setRegisterTableProcedureEnabled(true)
.setProjectionPushdownEnabled(false);
.setProjectionPushdownEnabled(false)
.setQueryPartitionFilterRequired(true);

assertFullMapping(properties, expected);
}