Optional check for query partition filter for Hudi #19906
Conversation
Force-pushed from cca8320 to 54b391a
- Set to `true` to force a query to use a partition filter. You can use the
  `query_partition_filter_required` catalog session property for temporary,
  catalog specific use.
- `false`
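For context, a minimal sketch of enabling the check for a single session (the catalog name `hudi` and the table are illustrative, taken from the test setup used later in this thread):

```sql
-- Enable the check only for the current session via the catalog session property.
SET SESSION hudi.query_partition_filter_required = true;

-- Queries on partitioned tables now need a filter on a partition column (dt here).
SELECT name
FROM hudi.tests.hudi_cow_pt_tbl
WHERE dt = '2021-12-09';
```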
Thanks for the contribution @krvikash. I have yet to review the full code, but just for my understanding, why should the default be false? Is the plan produced an unoptimized one if the query uses a partition filter? It would be helpful if you could paste the plans for a simple query with a partition filter, with and without this config.
See https://trino.io/docs/current/connector/delta-lake.html where `delta.query-partition-filter-required` is set to `false` by default as well.
Hi @codope, this PR aims to mandate the inclusion of partition filtering in SELECT queries. This prevents accidental execution of `SELECT *` queries on tables containing a substantial amount of data. Notably, this enforcement won't alter the query plan.
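As a hedged sketch of the intended behavior (table and column names are taken from the test plans later in this thread; the exact error message is omitted):

```sql
-- With query_partition_filter_required enabled for the hudi catalog:

-- Rejected: no filter on the partition column dt.
SELECT * FROM hudi.tests.hudi_cow_pt_tbl;

-- Accepted: the partition column dt is constrained, so the check is satisfied.
SELECT * FROM hudi.tests.hudi_cow_pt_tbl WHERE id = 1 AND dt = '2021-12-09';
```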
Thanks for the clarification. I am still not sure why the default is false if it's a good thing?
We prefer not to enforce this unless explicitly instructed to do so, following the approach of Hive, Delta, and Iceberg.
But enabling the `delta.query-partition-filter-required` property by default would fail some queries that do not meet the partition column requirement, which would be unexpected for existing users.
Exactly. This config avoids potentially long-running queries on larger tables, but enabling it by default might also restrict the set of queries that could be executed on Trino against smaller tables.
@codope does this answer your question?
Yes, thanks @krvikash. I think we should document the full behavior, if possible with an example. Users should be aware of the side effects of enabling this config.
@codope Thanks for the review. I have added the behavior to the docs. Please take a look.
The other part is that, depending on the expressions used in the predicate, we may or may not be able to understand that the partitions are actually filtered, leading to false positives too.
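For example (a sketch using the same table as the plans further down in this thread):

```sql
-- The partition column dt appears in the predicate, so the required-filter check passes,
-- but because of the OR with a non-partition column the filter may not be translated
-- into a tuple domain on dt, and partitions may not actually be pruned.
EXPLAIN
SELECT name
FROM hudi.tests.hudi_cow_pt_tbl
WHERE id = 1 OR dt = '2021-12-09';
```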
Force-pushed from 54b391a to e67badc
(some cosmetic change)
Resolved review threads:
- plugin/trino-hudi/src/test/java/io/trino/plugin/hudi/TestHudiSmokeTest.java (outdated)
- plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiMetadata.java (outdated)
- plugin/trino-hudi/src/main/java/io/trino/plugin/hudi/HudiTableHandle.java
Force-pushed from e67badc to 5430982
// When there is some predicate which could not be translated into a tuple domain,
// such as in cases with complex conditions like "id = 1 OR name = 'INDIA'".
constraint.getPredicateColumns().stream()
        .flatMap(Collection::stream)
        .map(HiveColumnHandle.class::cast))
Does Hudi use `Constraint#predicate`? If a query has a complex expression like `id = 1 OR name = 'INDIA'`, the connector could access the expression via `Constraint#predicate`, i.e. the connector could pass in the partition values and check the result of the expression, so that it could either pick a partition or reject it. I don't see any usages of `Constraint#predicate` here. If the predicate is not used, then let's not add those columns to `constraintColumns`, as they are not participating in the partition pushdown. WDYT?
cc: @codope I'm not a Hudi expert. Please correct me if I missed something.
I compared the behavior of `hive` and `hudi` when `query_partition_filter_required` is enabled. The partition column used in the query satisfies the `query_partition_filter_required` requirement even when the column doesn't contribute to the pushdown.

Hive:
id --> non-partition column
ds --> partition column
Query --> "EXPLAIN SELECT * FROM test_required_partition_filter WHERE id = 1 OR ds = 'INDIA'"
MaterializedResult{rows=[[Trino version: testversion
Fragment 0 [SOURCE]
Output layout: [id, a, b, ds]
Output partitioning: SINGLE []
Output[columnNames = [id, a, b, ds]]
│ Layout: [id:integer, a:varchar, b:varchar, ds:varchar]
│ Estimates: {rows: 1 (22B), cpu: 0, memory: 0B, network: 0B}
└─ ScanFilter[table = hive:tpch:test_required_partition_filter, filterPredicate = (("id" = 1) OR ("ds" = VARCHAR 'INDIA'))]
Layout: [id:integer, a:varchar, b:varchar, ds:varchar]
Estimates: {rows: 1 (22B), cpu: 22, memory: 0B, network: 0B}/{rows: 1 (22B), cpu: 22, memory: 0B, network: 0B}
a := a:string:REGULAR
b := b:string:REGULAR
id := id:int:REGULAR
ds := ds:string:PARTITION_KEY
:: [[1]]
]], types=[varchar(768)], setSessionProperties={}, resetSessionProperties=[]}
Hudi:
id --> Non-Part column
dt --> Part column
Query --> "EXPLAIN SELECT name FROM " + HUDI_COW_PT_TBL + " WHERE id = 1 OR dt = '2021-12-09'"
MaterializedResult{rows=[[Trino version: testversion
Fragment 0 [SOURCE]
Output layout: [name]
Output partitioning: SINGLE []
Output[columnNames = [name]]
│ Layout: [name:varchar]
│ Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
└─ ScanFilterProject[table = hudi:tests.hudi_cow_pt_tbl, filterPredicate = (("id" = BIGINT '1') OR ("dt" = VARCHAR '2021-12-09'))]
Layout: [name:varchar]
Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
dt := dt:string:PARTITION_KEY
name := name:string:REGULAR
id := id:bigint:REGULAR
]], types=[varchar(686)], setSessionProperties={}, resetSessionProperties=[]}
@Praveen2112 Yes, you were right. `Constraint#predicate` is not getting utilized during split generation, which means all the data files will be read in cases where the predicate does not get translated to a `TupleDomain`. In this case, `constraint#predicateColumns` does not need to be part of `constraintColumns`.
Force-pushed from 5430982 to b1aad68
Force-pushed from b1aad68 to 7bd0d25
(rebased and resolved conflicts)
The change is to keep the schema in sync with hudi_cow_pt_tbl.
Force-pushed from 7bd0d25 to 0db58b7
// TODO: Since constraint#predicate isn't utilized during split generation,
// let's not add constraint#predicateColumns to newConstraintColumns.
Set<HiveColumnHandle> newConstraintColumns = Stream.concat(
Regarding

> `Constraint#predicate` is not getting utilized during split generation, which means all the data files will be read in cases where the predicate does not get translated to `TupleDomain`.

It could have been a miss in the first implementation, probably due to the lack of a column stats index. What do we lose if we include `Constraint#predicate` in `newConstraintColumns`? I think we should still include it in the metadata layer. Using it in the split manager layer can be fixed in another PR.
> What do we lose if we include Constraint#predicate in newConstraintColumns?

Thanks @codope for the review. Since during split generation we do not filter out the data files based on `Constraint#predicate`, all the data files will be read even if `query-partition-filter-required` is enforced, which is a false alarm. I have added a `TODO` comment in the `HudiMetadata` class. Once Hudi uses `Constraint#predicate`, we can include it in `newConstraintColumns`.
LGTM. @mosabua Can you please look at the docs-related change?
Force-pushed from 88162f4 to 76d000d
Force-pushed from 76d000d to 686939f
Thanks, @codope, @mosabua, @Praveen2112, @marcinsbd for the review. Addressed comments.
Force-pushed from 686939f to fe2ef1b
I approve of the docs changes, but overall I feel like this is a brittle feature. It definitely should be disabled by default and not relied upon too much when enabled.
I agree with you. In the future we could move it to resource groups; this check allows us to restrict long-running queries beforehand.
Description
For Hudi partitioned tables, we should reject the table scan produced by the planner when the query does not have a partition filter.
Add an option to enforce that a filter on a partition key is present in the query. This can be enabled by setting the `hudi.query-partition-filter-required` config property or the `query_partition_filter_required` session property to `true`.
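A minimal usage sketch (the catalog name `hudi` and the table from the connector tests are assumptions for illustration):

```sql
-- Per-session alternative to setting hudi.query-partition-filter-required=true
-- in the catalog properties file.
SET SESSION hudi.query_partition_filter_required = true;

-- Fails the check: no predicate on a partition column.
-- SELECT count(*) FROM hudi.tests.hudi_cow_pt_tbl;

-- Passes the check: dt is a partition column.
SELECT count(*) FROM hudi.tests.hudi_cow_pt_tbl WHERE dt = '2021-12-09';
```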
Additional context and related issues
Implementation for the Delta Lake connector #18345
cc @marcinsbd
Release notes
( ) This is not user-visible or is docs only, and no release notes are required.
( ) Release notes are required. Please propose a release note for me.
(x) Release notes are required, with the following suggested text: