Fuse more aggressively if parquet files are tiny #1029
Conversation
```diff
@@ -674,7 +676,7 @@ class GroupByReduction(Reduction, GroupByBase):
     _chunk_cls = GroupByChunk

     def _tune_down(self):
-        if len(self.by) > 1 and self.operand("split_out") is None:
+        if self.operand("split_out") is None:
```
Wouldn't we always shuffle now?
Yep
Except if `split_out=1` is set explicitly.
We just had a conversation about this and agreed to go with this automatic behavior. It means that some groupby operations will perform a bit worse, since we are forcing a shuffle that is not strictly necessary.
For large output results the shuffle is a necessity, and for tiny output results the additional shuffle step adds only a marginal performance penalty in our testing, since it operates on the already-reduced data.
It is the safer choice, and most users will not want to (or be able to) dig in deep enough to set this parameter themselves, so this is a good default.
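A minimal sketch of the user-facing behavior under discussion, assuming a standard dask / dask-expr installation (the data and column names are illustrative, not from the PR):

```python
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"a": [0, 1, 0, 1], "b": [0, 0, 1, 1], "x": range(4)})
ddf = dd.from_pandas(pdf, npartitions=2)

# split_out left at its default (None): with this change, the optimizer
# is free to lower the multi-column groupby to a shuffle-based groupby.
auto = ddf.groupby(["a", "b"]).x.sum()

# split_out=1 pinned explicitly: opts out of the automatic shuffle and
# keeps the single-output tree reduction.
pinned = ddf.groupby(["a", "b"]).x.sum(split_out=1)
```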
dask_expr/io/parquet.py (outdated)
```python
for col in approx_stats["columns"]:
    total_uncompressed += col["total_uncompressed_size"]
    if col["path_in_schema"] in col_op:
        after_projection += col["total_uncompressed_size"]

total_uncompressed = max(total_uncompressed, 75_000_000)
```
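For context, a hedged standalone sketch of the statistic this diff computes: the total uncompressed size of a parquet file and the share that survives a column projection. It uses pyarrow's parquet metadata API directly; the function name and arguments are illustrative, only the accumulation logic and the 75 MB floor come from the diff above.

```python
import pyarrow.parquet as pq

def projected_fraction(path, columns):
    """Share of a file's uncompressed bytes kept by a column projection."""
    meta = pq.ParquetFile(path).metadata
    total_uncompressed = 0
    after_projection = 0
    for rg in range(meta.num_row_groups):
        row_group = meta.row_group(rg)
        for ci in range(row_group.num_columns):
            col = row_group.column(ci)
            total_uncompressed += col.total_uncompressed_size
            if col.path_in_schema in columns:
                after_projection += col.total_uncompressed_size
    # Clamp to the same floor the diff applies (75 MB by default).
    total_uncompressed = max(total_uncompressed, 75_000_000)
    return after_projection / total_uncompressed
```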
I suggest exposing this as a config value.
I'll write docs for this tomorrow, or later today after I finish the blog post (we need release notes for the trivial shuffles anyway).
```diff
@@ -821,6 +821,8 @@ def sample_statistics(self, n=3):
     ixs = []
     for i in range(0, nfrags, stepsize):
         sort_ix = finfo_argsort[i]
+        # TODO: This is crude but the most conservative estimate
+        sort_ix = sort_ix if sort_ix < nfrags else 0
```
see #1032
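A standalone sketch of the sampling loop being patched, under stated assumptions: the `stepsize` formula and the surrounding function are guesses for illustration; only the loop body and the clamp come from the diff.

```python
import numpy as np

def sample_indices(sizes, n=3):
    """Pick ~n positions evenly spaced through the size-sorted fragment order."""
    nfrags = len(sizes)
    finfo_argsort = np.argsort(sizes)
    stepsize = max(nfrags // n, 1)  # assumption; not from the PR
    ixs = []
    for i in range(0, nfrags, stepsize):
        sort_ix = finfo_argsort[i]
        # Clamp out-of-range indices to 0: crude, but the most
        # conservative fallback (see #1032).
        sort_ix = sort_ix if sort_ix < nfrags else 0
        ixs.append(sort_ix)
    return ixs
```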
dask_expr/io/parquet.py (outdated)
```python
min_size = (
    dask.config.get("dataframe.parquet.minimum-partition-size") or 75_000_000
)
```
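A short usage sketch of the config knob from the diff, assuming the key is registered as shown above (the override value is illustrative):

```python
import dask

# Override the knob for a block of code; the `or 75_000_000` fallback in
# the diff covers the case where the key resolves to None.
with dask.config.set({"dataframe.parquet.minimum-partition-size": 128_000_000}):
    min_size = (
        dask.config.get("dataframe.parquet.minimum-partition-size") or 75_000_000
    )
    assert min_size == 128_000_000
```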
I just noticed that we already have a parameter that more or less matches this functionality: blocksize, which is used in the legacy parquet reader to control how row groups are concatenated. It's not a perfect match, but a very close one. I'm fine with keeping things as they are for now, but wanted to document this for posterity.
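For reference, a hedged illustration of that related knob, assuming a dask version whose `read_parquet` accepts `blocksize` (the path and size are placeholders):

```python
import dask.dataframe as dd

# blocksize bounds how much parquet data is aggregated into one partition
# in the legacy reader; a rough analogue of the new minimum-partition-size.
ddf = dd.read_parquet("s3://bucket/dataset/", blocksize="128MiB")
```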
This PR does a few things:
Here is an A/B test for this (the fuse tag):
https://github.com/coiled/benchmarks/actions/runs/8709180542