-
Notifications
You must be signed in to change notification settings - Fork 655
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FEAT-#5816: Implement '.split' method for axis partitions #5856
Conversation
Signed-off-by: Dmitry Chigarev <[email protected]>
Signed-off-by: Dmitry Chigarev <[email protected]>
Signed-off-by: Dmitry Chigarev <[email protected]>
Positional arguments to pass to the `split_func`. | ||
f_kwargs : dict, optional | ||
Keyword arguments to pass to the `split_func`. | ||
extract_metadata : bool, default: False |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original .split
method is already implemented in the manner of not extracting the metadata:
modin/modin/core/dataframe/pandas/partitioning/partition.py
Lines 394 to 398 in cd7611c
outputs = self.execution_wrapper.deploy( | |
split_func, [self._data] + list(args), num_returns=num_splits | |
) | |
self._is_debug(log) and log.debug(f"EXIT::Partition.split::{self._identity}") | |
return [self.__constructor__(output) for output in outputs] |
So the full-axis implementation just follows the initial approach.
We don't want to extract metadata because:
- Partitions generated by this function are temporary, at the reshuffling flow the
split_row_partitions
are immediately replaced bynew_partitions
holding new metadata, meaning that the metadata ofsplit_row_partitions
is never accessed:
modin/modin/core/dataframe/pandas/partitioning/partition_manager.py
Lines 1600 to 1608 in cd7611c
# We need to convert every partition that came from the splits into a full-axis column partition. new_partitions = [ [ cls._column_partitions_class(row_partition, full_axis=False).apply( final_shuffle_func ) ] for row_partition in split_row_partitions ] - The splitting stage generates a lot of partitions (up to
ncores ^ 2
), it's already not an easy task for ray to put into storage that big amount of futures at once, the situation becomes even worse when we ask to store the metadata futures as well (4 * (ncores ^ 2)
amount of futures at once). I've measured the case from [PERF] Slow sort_values in value_counts #5533 with and without the partition's metadata, and received a stable 9% speed-up (~ 0.12s) for the case without metadata.
@@ -1608,5 +1610,5 @@ def shuffle_partitions( | |||
else: | |||
# If there are not pivots we can simply apply the function row-wise | |||
return np.array( | |||
[[row_part.apply(final_shuffle_func)] for row_part in row_partitions] | |||
[row_part.apply(final_shuffle_func) for row_part in row_partitions] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
row_part
is now actually a row partition returning a list, meaning there's no need to wrap this into a list no more
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much @dchigarev! This is a great catch! I've left a few minor comments, but overall PR looks great to me!
@@ -1608,5 +1610,5 @@ def shuffle_partitions( | |||
else: | |||
# If there are not pivots we can simply apply the function row-wise | |||
return np.array( | |||
[[row_part.apply(final_shuffle_func)] for row_part in row_partitions] | |||
[row_part.apply(final_shuffle_func) for row_part in row_partitions] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense!
modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
Outdated
Show resolved
Hide resolved
modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py
Outdated
Show resolved
Hide resolved
...n/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py
Outdated
Show resolved
Hide resolved
Signed-off-by: Dmitry Chigarev <[email protected]>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
@RehanSD are you ok with the current changes? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! Thank you for the awesome work @dchigarev!
What do these changes do?
The PR introduces an implementation of the
.split()
method for axis partitions, making it possible to avoid materialization of the virtual row partitions during reshuffling.ASV results:
flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
git commit -s
docs/development/architecture.rst
is up-to-date