PERF-#6464: Improve reshuffling for multi-column groupby in low-cardinality cases #6533
Conversation
@@ -2432,17 +2435,9 @@ def _apply_func_to_range_partitioning(
# simply combine all partitions and apply the sorting to the whole dataframe
return self.combine_and_apply(func=func)

if is_numeric_dtype(self.dtypes[key_column]):
this logic was moved to the ShuffleSortFunctions.pivot_fn()
- key_column=columns[0], func=sort_function, ascending=ascending, **kwargs
+ key_columns=columns[0], func=sort_function, ascending=ascending, **kwargs
we still pass only one column in the case of .sort_values(), as its performance with the new functionality has not yet been tested
@@ -2432,56 +2435,53 @@ def _apply_func_to_range_partitioning(
# simply combine all partitions and apply the sorting to the whole dataframe
return self.combine_and_apply(func=func)

if is_numeric_dtype(self.dtypes[key_column]):
    method = "linear"
if ideal_num_new_partitions < len(self._partitions):
this logic was moved under the if sampling_probability >= 1 condition so it only takes place when ideal_num_new_partitions was actually modified
range(
    0,
    len(self._partitions),
    round(len(self._partitions) / ideal_num_new_partitions),
why round and not math.ceil?
Well, I didn't write the code, I just moved it a few lines up. My understanding is that we want ideal_num_new_partitions to be at least 1.5 times smaller than the current number of partitions before the partitioning is actually shrunk; as far as I understand, this is done so we wouldn't shrink too much in case len(parts) is just slightly bigger than ideal_num_parts (here arr is assumed to be an array of 18 elements, e.g. np.arange(18)):
>>> import math
>>> import numpy as np
>>> arr = np.arange(18)  # pretend we have 18 partitions
>>> step = math.ceil(18 / 17)
>>> len(np.split(arr, range(step, 18, step)))  # we wanted 17 parts but got 9, too few
9
>>> step = round(18 / 17)
>>> len(np.split(arr, range(step, 18, step)))  # we wanted 17 parts but got 18, not good not bad
18
But just in case, pinging @RehanSD as the author of this code.
oh, and @dchigarev - awesome PR description with very thorough details and measurements, looks super-cool and informative!
@vnlitvinov thanks for your review! All the comments were answered.
LGTM, awesome work @dchigarev!
Let's give it a day or two for other people to take a look, then we can merge if there are no objections.
What's the problem?
The current implementation of reshuffling groupby takes the first column that was specified in the by argument (for example, if groupby(["col1", "col2"]) was called, then "col1" is taken) and builds range partitioning based on that single column. The problem here is that if "col1" has very low cardinality (too few unique values), there will be too few range partitions in the end and thus poor core utilization.
We can't do much about this problem (at least in terms of the range-partitioning implementation) when grouping on a single column. However, when multiple columns are used for grouping, we can try using all of the by columns to build the range partitioning and potentially increase the number of range-bins being generated.
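To illustrate the low-cardinality problem, here is a small standalone sketch (this is not Modin's reshuffling code; the data, column names, and quantile-based pivot picking are only assumptions for the example). No matter how many pivots we request from a column with 3 unique values, we never get more than 3 non-empty range-bins:

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
df = pd.DataFrame({
    "col1": rng.integers(0, 3, 100_000),    # low cardinality: only 3 unique values
    "col2": rng.integers(0, 100, 100_000),  # higher cardinality
})

# ask for ~16 range-bins by taking quantile pivots of "col1" only
pivots = np.quantile(df["col1"], np.linspace(0, 1, 17)[1:-1])
bins = np.searchsorted(pivots, df["col1"].to_numpy())
print(pd.Series(bins).nunique())  # at most 3 non-empty bins, so at most 3 busy cores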
How was it solved?
The reshuffling implementation now samples all of the by columns instead of a single one in order to compute their quantile ranges. It then goes through the columns one by one and computes the boundaries (pivots) for the range-bins; once the required number of bins has been produced, it stops. Then, at the splitting stage, these per-column range-bins are combined, so in the result we get more bins (although some of them will likely be empty).
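Continuing the sketch above (again a hypothetical illustration, not the actual ShuffleSortFunctions.pivot_fn() implementation; combined_bins is a made-up helper), combining per-column bin indices into a single bin id multiplies the number of possible buckets, even though some of them end up empty:

import numpy as np
import pandas as pd

rng = np.random.default_rng(42)
df = pd.DataFrame({
    "col1": rng.integers(0, 3, 100_000),
    "col2": rng.integers(0, 100, 100_000),
})

def combined_bins(frame, pivots_per_column):
    """Combine per-column range-bins into a single bin id (illustrative only)."""
    bin_ids = np.zeros(len(frame), dtype=np.int64)
    for col, pivots in pivots_per_column.items():
        col_bins = np.searchsorted(pivots, frame[col].to_numpy())
        bin_ids = bin_ids * (len(pivots) + 1) + col_bins
    return bin_ids

# 3 pivots per column -> up to (3 + 1) * (3 + 1) = 16 combined buckets,
# even though "col1" alone can never fill more than 3 of its 4 bins
pivots_per_column = {
    "col1": np.quantile(df["col1"], [0.25, 0.5, 0.75]),
    "col2": np.quantile(df["col2"], [0.25, 0.5, 0.75]),
}
bins = combined_bins(df, pivots_per_column)
print(pd.Series(bins).nunique())  # noticeably more non-empty buckets than with "col1" alone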
Is performance better now?
I went through some multi-column groupby cases from the benchmarks we usually run on our side (I found such cases in nyc-taxi and h2o, and also included our internal customer's workload for which these changes were initially intended). Surprisingly to me, most of the cases had this low-cardinality problem (although it is mostly visible only with MODIN_CPUS=112).
Under the spoilers you may see the detailed report for each of the workloads (including details regarding the original data sizes and data cardinality). Here's just an aggregated table with the time measurements:
The bar-charts under the spoilers show how the data portions are distributed between the range-bins (buckets).
Scenario when NUM_CPUS=112
NYC-taxi cases
In the first query, the data cardinality of the first by column is already pretty good, so the introduced changes don't make any difference.
In the second query, the first column "passenger_count" has very low cardinality (the values are in the range 0-10), so the data distribution between the buckets (bins) is quite unbalanced. After the changes, 14% of the buckets received more or less equal portions of data (quantile 0.86); although there are still a lot of really small buckets (quantile 0.1), performance has improved more than 2x.

H2O cases
Here each "id*" has values in the range of (0-10), so introducing the changes helps a lot in improving the data balance between the buckets:

Actually, the situation is identical for all of the h2o multi-column queries.



Our internal workload
Here we have "col1" having really low cardinality (range from 0-3), so the changes really helps to balance the buckets:




Scenario when NUM_CPUS=16
NYC-taxi cases
Again, the data cardinality of the first by column is really good, so the changes don't make a difference.
The data cardinality of 'passenger_count' is low (0-10); combined with unlucky sampling, this used to result in only 4 non-empty buckets and poor parallelization. The introduced changes generate many more non-empty buckets, helping to balance the load between the cores.

H2O cases
In comparison with the NUM_CPUS=112 case, the cardinality of the "id*" columns is just about right for the 16-core case, so the changes don't make any difference.
Our internal workload
Here we have "col1" having really low cardinality (range from 0-3), so the changes really helps to balance the buckets even for the 16-cores case:
Check-list
- flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py passes
- black --check modin/ asv_bench/benchmarks scripts/doc_checker.py passes
- commits are signed off (git commit -s)
- tests added and are passing (the existing multi-column groupby tests should cover the new functionality)
- docs/development/architecture.rst is up-to-date