Use non partitioned dynamic filter limits for FTE #17831
Conversation
core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java
core/trino-main/src/main/java/io/trino/sql/planner/LocalExecutionPlanner.java
@@ -41,7 +41,7 @@
     private boolean enableCoordinatorDynamicFiltersDistribution = true;
     private boolean enableLargeDynamicFilters;

-    private int smallMaxDistinctValuesPerDriver = 1_000;
+    private int smallMaxDistinctValuesPerDriver = 10_000;
Could you add benchmark results for streaming mode at the 1k scale factor?
In general, rather than increasing the config by 10x because it didn't significantly regress some TPC workloads, I would prefer that we increase this only as much as is necessary for good TPC results. I already 5x'ed this number very recently.
fyi @sopel39
This only increases the limits for BROADCAST joins, where the build side is usually small. Do you think it may still be significant?
I would prefer that we increase this only as much as is necessary for good TPC results
There are 5000+ partitions in the partitioned tables of a TPC-DS sf10000 schema. There are also plenty of joins in TPC-DS based on the date dimension. This limit increase ensures the filters are always available.
@@ -51,7 +51,7 @@
     private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
     private DataSize smallMaxSizePerFilter = DataSize.of(5, MEGABYTE);

-    private int largeMaxDistinctValuesPerDriver = 10_000;
+    private int largeMaxDistinctValuesPerDriver = 100_000;
Given that this is a substantial increase, is this increase benchmarked separately from the "small" one? IIUC we're only benchmarking "small" DF limits in our usual perf benchmarks.
Could you please elaborate on why we see a performance regression when increasing the limits? The values are collected with a hash set, and its performance shouldn't vary dramatically with an increase in its size. The complexity is still O(1) assuming the right fill ratio; maybe only cache misses increase (though the total memory footprint is limited?).
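The hash-set discussion above can be made concrete with a small sketch. This is an illustrative, hypothetical implementation of per-driver bounded distinct-value collection; the class and method names are invented and do not mirror Trino's actual code:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: collect distinct build-side values for a dynamic
// filter, abandoning collection once a configured limit is exceeded.
class BoundedDistinctCollector {
    private final int maxDistinctValues;
    private final Set<Long> values = new HashSet<>();
    private boolean overflowed;

    BoundedDistinctCollector(int maxDistinctValues) {
        this.maxDistinctValues = maxDistinctValues;
    }

    // Returns false once the limit has been exceeded; at that point the
    // partial state is dropped and the filter degrades to "accept all".
    boolean add(long value) {
        if (overflowed) {
            return false;
        }
        values.add(value);
        if (values.size() > maxDistinctValues) {
            values.clear();
            overflowed = true;
            return false;
        }
        return true;
    }

    boolean isOverflowed() {
        return overflowed;
    }

    int size() {
        return values.size();
    }
}
```

In a structure like this, inserts stay amortized O(1) regardless of the cap; what a larger limit changes is the working-set size of the hash set, which is where the cache misses mentioned above could come from.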
Replace the `*.small-broadcast.*` and `*.large-broadcast.*` configuration parameters with simply `*.small.*` and `*.large.*`. This commit also adjusts the documentation accordingly. "partitioned" limits no longer map one-to-one to "partitioned" joins, but merely indicate limits applied to the collection of dynamic filters over "partitioned" data (such as build sides of partitioned joins in non-FTE mode).
@raunaqmorarka I may need to take a closer look into the limits and run additional benchmarks. I decided to drop the last commit (increasing the limits) for now.
@arhimondr I think a lot of this proposed release note should be added to the docs as an explanation for these config properties...
I updated the docs (https://github.com/trinodb/trino/pull/17831/files#diff-f7929648d722198d4cb41322270ed5016db39e42312b14f60853cf92396f84a1R214). Do you think it is insufficient and should be expanded?
I somehow missed the docs changes for whatever reason. Yeah, that should be good.
Description
This PR does several things:
Drops the word "broadcast" from the dynamic filter limit properties. Instead there will be two types of limits: `standard` and `partitioned`. Standard limits are applied when collection happens over a raw data stream (broadcast joins for streaming and any join for FTE). Partitioned limits are applied when collection happens over a pre-partitioned data source. Partitioned limits are expected to be lower, as each operator processes only a subset of the keys.
Additionally, this PR changes FTE to use "standard" limits instead of "partitioned", because dynamic filter collection in FTE happens over the raw data source (each operator sees the entire set of keys, similar to collecting dynamic filters for a broadcast join in streaming).
It also further increases the "standard" limits. The increase doesn't produce any visible CPU regression (tested with a standard TPC-DS benchmark over a partitioned sf10000 schema).
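For illustration, the rename described above would change a deployment's configuration roughly as follows. The property names here are assumptions inferred from the commit message, not verified against the final merged docs; check the Trino dynamic filtering documentation for the authoritative names:

```properties
# Old names (pre-rename, assumed):
#   dynamic-filtering.small-broadcast.max-distinct-values-per-driver=10000
#   dynamic-filtering.large-broadcast.max-distinct-values-per-driver=100000

# New names (post-rename, assumed):
dynamic-filtering.small.max-distinct-values-per-driver=10000
dynamic-filtering.large.max-distinct-values-per-driver=100000
```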
Additional context and related issues
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text: