From c9aa819f27e58caa61fbf3420561fee65f8d1fc8 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 22 Mar 2023 14:02:06 +0000 Subject: [PATCH 1/4] FEAT-#5816: Implement '.split' method for axis partitions Signed-off-by: Dmitry Chigarev --- .../base/partitioning/axis_partition.py | 31 +++++++- .../pandas/partitioning/axis_partition.py | 77 +++++++++++++++++++ .../pandas/partitioning/partition_manager.py | 12 +-- .../partitioning/virtual_partition.py | 66 +++++++++++----- .../partitioning/virtual_partition.py | 62 ++++++++++----- .../partitioning/virtual_partition.py | 64 ++++++++++----- 6 files changed, 245 insertions(+), 67 deletions(-) diff --git a/modin/core/dataframe/base/partitioning/axis_partition.py b/modin/core/dataframe/base/partitioning/axis_partition.py index db6c7b883e7..fbb214ad028 100644 --- a/modin/core/dataframe/base/partitioning/axis_partition.py +++ b/modin/core/dataframe/base/partitioning/axis_partition.py @@ -22,6 +22,11 @@ class BaseDataframeAxisPartition(ABC): # pragma: no cover An abstract class that represents the parent class for any axis partition class. This class is intended to simplify the way that operations are performed. + + Attributes + ---------- + _PARTITIONS_METADATA_LEN : int + The number of metadata values that the object of `partition_type` consumes. """ @property @@ -87,8 +92,11 @@ def apply( # Child classes must have these in order to correctly subclass. instance_type = None partition_type = None + _PARTITIONS_METADATA_LEN = 0 - def _wrap_partitions(self, partitions: list) -> list: + def _wrap_partitions( + self, partitions: list, extract_metadata: Optional[bool] = None + ) -> list: """ Wrap remote partition objects with `BaseDataframePartition` class. @@ -96,6 +104,9 @@ def _wrap_partitions(self, partitions: list) -> list: ---------- partitions : list List of remotes partition objects to be wrapped with `BaseDataframePartition` class. 
+ extract_metadata : bool, optional + Whether the partitions list contains information about partition's metadata. + If `None` was passed will take the argument's value from the value of `cls._PARTITIONS_METADATA_LEN`. Returns ------- @@ -105,7 +116,23 @@ def _wrap_partitions(self, partitions: list) -> list: assert self.partition_type is not None assert self.instance_type is not None # type: ignore - return [self.partition_type(obj) for obj in partitions] + if extract_metadata is None: + # If `_PARTITIONS_METADATA_LEN == 0` then the execution doesn't support metadata + # and thus we should never try extracting it, otherwise assuming that the common + # approach of always passing the metadata is used. + extract_metadata = bool(self._PARTITIONS_METADATA_LEN) + + if extract_metadata: + return [ + self.partition_type(*init_args) + for init_args in zip( + # `partition_type` consumes `(object_id, *metadata)`, thus adding `+1` + *[iter(partitions)] + * (self._PARTITIONS_METADATA_LEN + 1) + ) + ] + else: + return [self.partition_type(object_id) for object_id in partitions] def force_materialization( self, get_ip: bool = False diff --git a/modin/core/dataframe/pandas/partitioning/axis_partition.py b/modin/core/dataframe/pandas/partitioning/axis_partition.py index 55a36b28c15..d1dade379ef 100644 --- a/modin/core/dataframe/pandas/partitioning/axis_partition.py +++ b/modin/core/dataframe/pandas/partitioning/axis_partition.py @@ -134,6 +134,83 @@ def apply( ) ) + def split( + self, split_func, num_splits, f_args=None, f_kwargs=None, extract_metadata=False + ): + """ + Split axis partition into multiple partitions using the `split_func`. + + Parameters + ---------- + split_func : callable(pandas.DataFrame, *args, **kwargs) -> list[pandas.DataFrame] + A function that takes partition's content and split it into multiple chunks. + num_splits : int + The number of splits the `split_func` return. + f_args : iterable, optional + Positional arguments to pass to the `split_func`. 
+ f_kwargs : dict, optional + Keyword arguments to pass to the `split_func`. + extract_metadata : bool, default: False + Whether to return metadata (length, width, ip) of the result. Passing `False` may relax + the load on object storage as the remote function would return X times fewer futures + (where X is the number of metadata values). Passing `False` makes sense for temporary + results where you know for sure that the metadata will never be requested. + """ + f_args = tuple() if f_args is None else f_args + f_kwargs = {} if f_kwargs is None else f_kwargs + return self._wrap_partitions( + self.deploy_splitting_func( + self.axis, + split_func, + f_args, + f_kwargs, + num_splits, + *self.list_of_blocks, + extract_metadata=extract_metadata, + ), + extract_metadata=extract_metadata, + ) + + @classmethod + def deploy_splitting_func( + cls, + axis, + split_func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=False, + ): + """ + Deploy a splitting function along a full axis. + + Parameters + ---------- + axis : {0, 1} + The axis to perform the function along. + split_func : callable(pandas.DataFrame, *args, **kwargs) -> list[pandas.DataFrame] + The function to perform. + f_args : list or tuple + Positional arguments to pass to ``split_func``. + f_kwargs : dict + Keyword arguments to pass to ``split_func``. + num_splits : int + The number of splits the `split_func` return. + *partitions : iterable + All partitions that make up the full axis (row or column). + extract_metadata : bool, default: False + Whether to return metadata (length, width, ip) of the result. Note that `True` value + is not supported in `PandasDataframeAxisPartition` class. + + Returns + ------- + list + A list of pandas DataFrames. 
+ """ + dataframe = pandas.concat(list(partitions), axis=axis, copy=False) + return split_func(dataframe, *f_args, **f_kwargs) + @classmethod def deploy_axis_func( cls, diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index e0715da4b5b..bcbfdc962f7 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -1584,15 +1584,17 @@ def shuffle_partitions( # Convert our list of block partitions to row partitions. We need to create full-axis # row partitions since we need to send the whole partition to the split step as otherwise # we wouldn't know how to split the block partitions that don't contain the shuffling key. - row_partitions = [ - partition.force_materialization().list_of_block_partitions[0] - for partition in cls.row_partitions(partitions) - ] + row_partitions = [partition for partition in cls.row_partitions(partitions)] # Gather together all of the sub-partitions split_row_partitions = np.array( [ partition.split( - shuffle_functions.split_function, len(pivots) + 1, pivots + shuffle_functions.split_function, + num_splits=len(pivots) + 1, + f_args=(pivots,), + # The partition's metadata will never be accessed for the split partitions, + # thus no need to compute it. 
+ extract_metadata=False, ) for partition in row_partitions ] diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py index 66ba925ac4e..0ec97b7a457 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py @@ -50,6 +50,7 @@ class PandasOnDaskDataframeVirtualPartition(PandasDataframeAxisPartition): """ axis = None + _PARTITIONS_METADATA_LEN = 3 # (length, width, ip) partition_type = PandasOnDaskDataframePartition instance_type = Future @@ -145,6 +146,34 @@ def list_of_ips(self): result[idx] = partition._ip_cache return result + @classmethod + @_inherit_docstrings(PandasDataframeAxisPartition.deploy_splitting_func) + def deploy_splitting_func( + cls, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=False, + ): + return DaskWrapper.deploy( + func=_deploy_dask_func, + f_args=( + PandasDataframeAxisPartition.deploy_splitting_func, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + ), + f_kwargs={"extract_metadata": extract_metadata}, + num_returns=num_splits * 4 if extract_metadata else num_splits, + pure=False, + ) + @classmethod def deploy_axis_func( cls, @@ -266,25 +295,6 @@ def deploy_func_between_two_axis_partitions( pure=False, ) - def _wrap_partitions(self, partitions): - """ - Wrap partitions passed as a list of distributed.Future with ``PandasOnDaskDataframePartition`` class. - - Parameters - ---------- - partitions : list - List of distributed.Future. - - Returns - ------- - list - List of ``PandasOnDaskDataframePartition`` objects. 
- """ - return [ - self.partition_type(future, length, width, ip) - for (future, length, width, ip) in zip(*[iter(partitions)] * 4) - ] - def apply( self, func, @@ -505,7 +515,16 @@ class PandasOnDaskDataframeRowPartition(PandasOnDaskDataframeVirtualPartition): axis = 1 -def _deploy_dask_func(deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs): +def _deploy_dask_func( + deployer, + axis, + f_to_deploy, + f_args, + f_kwargs, + *args, + extract_metadata=True, + **kwargs, +): """ Execute a function on an axis partition in a worker process. @@ -527,6 +546,11 @@ def _deploy_dask_func(deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kw Keyword arguments to pass to ``f_to_deploy``. *args : list Positional arguments to pass to ``func``. + extract_metadata : bool, default: True + Whether to return metadata (length, width, ip) of the result. Passing `False` may relax + the load on object storage as the remote function would return 4 times fewer futures. + Passing `False` makes sense for temporary results where you know for sure that the + metadata will never be requested. **kwargs : dict Keyword arguments to pass to ``func``. @@ -536,6 +560,8 @@ def _deploy_dask_func(deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kw The result of the function ``func`` and metadata for it. 
""" result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) + if not extract_metadata: + return result ip = get_ip() if isinstance(result, pandas.DataFrame): return result, len(result), len(result.columns), ip diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index e3aab745169..ca538f511ea 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -29,6 +29,7 @@ # If Ray has not been initialized yet by Modin, # it will be initialized when calling `RayWrapper.put`. _DEPLOY_AXIS_FUNC = RayWrapper.put(PandasDataframeAxisPartition.deploy_axis_func) +_DEPLOY_SPLIT_FUNC = RayWrapper.put(PandasDataframeAxisPartition.deploy_splitting_func) _DRAIN = RayWrapper.put(PandasDataframeAxisPartition.drain) @@ -54,6 +55,7 @@ class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition): Width, or reference to width, of wrapped ``pandas.DataFrame``. 
""" + _PARTITIONS_METADATA_LEN = 3 # (length, width, ip) partition_type = PandasOnRayDataframePartition instance_type = ray.ObjectRef axis = None @@ -150,6 +152,31 @@ def list_of_ips(self): result[idx] = partition._ip_cache return result + @classmethod + @_inherit_docstrings(PandasDataframeAxisPartition.deploy_splitting_func) + def deploy_splitting_func( + cls, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=False, + ): + return _deploy_ray_func.options( + num_returns=num_splits * 4 if extract_metadata else num_splits, + ).remote( + _DEPLOY_SPLIT_FUNC, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=extract_metadata, + ) + @classmethod def deploy_axis_func( cls, @@ -264,25 +291,6 @@ def deploy_func_between_two_axis_partitions( *partitions, ) - def _wrap_partitions(self, partitions): - """ - Wrap partitions passed as a list of ``ray.ObjectRef`` with ``PandasOnRayDataframePartition`` class. - - Parameters - ---------- - partitions : list - List of ``ray.ObjectRef``. - - Returns - ------- - list - List of ``PandasOnRayDataframePartition`` objects. - """ - return [ - self.partition_type(object_id, length, width, ip) - for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4) - ] - def apply( self, func, @@ -522,7 +530,14 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): @ray.remote def _deploy_ray_func( - deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs + deployer, + axis, + f_to_deploy, + f_args, + f_kwargs, + *args, + extract_metadata=True, + **kwargs, ): # pragma: no cover """ Execute a function on an axis partition in a worker process. @@ -545,6 +560,11 @@ def _deploy_ray_func( Positional arguments to pass to ``f_to_deploy``. f_kwargs : dict Keyword arguments to pass to ``f_to_deploy``. + extract_metadata : bool, default: True + Whether to return metadata (length, width, ip) of the result. 
Passing `False` may relax + the load on plasma storage as the remote function would return 4 times fewer futures. + Passing `False` makes sense for temporary results where you know for sure that the + metadata will never be requested. *args : list Positional arguments to pass to ``deployer``. **kwargs : dict @@ -561,6 +581,8 @@ def _deploy_ray_func( """ f_args = deserialize(f_args) result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) + if not extract_metadata: + return result ip = get_node_ip_address() if isinstance(result, pandas.DataFrame): return result, len(result), len(result.columns), ip diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py index c11a2b2638c..7defa7a3744 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py @@ -30,6 +30,9 @@ # which is called inside of `UnidistWrapper.put`. _DEPLOY_AXIS_FUNC = UnidistWrapper.put(PandasDataframeAxisPartition.deploy_axis_func) _DRAIN = UnidistWrapper.put(PandasDataframeAxisPartition.drain) +_DEPLOY_SPLIT_FUNC = UnidistWrapper.put( + PandasDataframeAxisPartition.deploy_splitting_func +) class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition): @@ -54,6 +57,7 @@ class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition): Width, or reference to width, of wrapped ``pandas.DataFrame``. 
""" + _PARTITIONS_METADATA_LEN = 3 # (length, width, ip) partition_type = PandasOnUnidistDataframePartition instance_type = unidist.core.base.object_ref.ObjectRef axis = None @@ -149,6 +153,31 @@ def list_of_ips(self): result[idx] = partition._ip_cache return result + @classmethod + @_inherit_docstrings(PandasDataframeAxisPartition.deploy_splitting_func) + def deploy_splitting_func( + cls, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=False, + ): + return _deploy_unidist_func.options( + num_returns=num_splits * 4 if extract_metadata else num_splits, + ).remote( + _DEPLOY_SPLIT_FUNC, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=extract_metadata, + ) + @classmethod def deploy_axis_func( cls, @@ -263,25 +292,6 @@ def deploy_func_between_two_axis_partitions( *partitions, ) - def _wrap_partitions(self, partitions): - """ - Wrap partitions passed as a list of ``unidist.ObjectRef`` with ``PandasOnUnidistDataframePartition`` class. - - Parameters - ---------- - partitions : list - List of ``unidist.ObjectRef``. - - Returns - ------- - list - List of ``PandasOnUnidistDataframePartition`` objects. - """ - return [ - self.partition_type(object_id, length, width, ip) - for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4) - ] - def apply( self, func, @@ -510,7 +520,14 @@ class PandasOnUnidistDataframeRowPartition(PandasOnUnidistDataframeVirtualPartit @unidist.remote def _deploy_unidist_func( - deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs + deployer, + axis, + f_to_deploy, + f_args, + f_kwargs, + *args, + extract_metadata=True, + **kwargs, ): # pragma: no cover """ Execute a function on an axis partition in a worker process. @@ -535,6 +552,11 @@ def _deploy_unidist_func( Keyword arguments to pass to ``f_to_deploy``. *args : list Positional arguments to pass to ``deployer``. 
+ extract_metadata : bool, default: True + Whether to return metadata (length, width, ip) of the result. Passing `False` may relax + the load on object storage as the remote function would return 4 times fewer futures. + Passing `False` makes sense for temporary results where you know for sure that the + metadata will never be requested. **kwargs : dict Keyword arguments to pass to ``deployer``. @@ -549,6 +571,8 @@ def _deploy_unidist_func( """ f_args = deserialize(f_args) result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) + if not extract_metadata: + return result ip = unidist.get_ip() if isinstance(result, pandas.DataFrame): return result, len(result), len(result.columns), ip From 609ad8875d338f5987dc97e542de732f76cdfd14 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 24 Mar 2023 13:51:08 +0000 Subject: [PATCH 2/4] fix docstyle Signed-off-by: Dmitry Chigarev --- modin/core/dataframe/pandas/partitioning/axis_partition.py | 5 +++++ .../pandas_on_ray/partitioning/virtual_partition.py | 6 +++--- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/modin/core/dataframe/pandas/partitioning/axis_partition.py b/modin/core/dataframe/pandas/partitioning/axis_partition.py index d1dade379ef..49b9f7f9131 100644 --- a/modin/core/dataframe/pandas/partitioning/axis_partition.py +++ b/modin/core/dataframe/pandas/partitioning/axis_partition.py @@ -155,6 +155,11 @@ def split( the load on object storage as the remote function would return X times fewer futures (where X is the number of metadata values). Passing `False` makes sense for temporary results where you know for sure that the metadata will never be requested. + + Returns + ------- + list + List of wrapped remote partition objects. 
""" f_args = tuple() if f_args is None else f_args f_kwargs = {} if f_kwargs is None else f_kwargs diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index ca538f511ea..fd274011ac7 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -560,13 +560,13 @@ def _deploy_ray_func( Positional arguments to pass to ``f_to_deploy``. f_kwargs : dict Keyword arguments to pass to ``f_to_deploy``. + *args : list + Positional arguments to pass to ``deployer``. extract_metadata : bool, default: True Whether to return metadata (length, width, ip) of the result. Passing `False` may relax - the load on plasma storage as the remote function would return 4 times fewer futures. + the load on object storage as the remote function would return 4 times fewer futures. Passing `False` makes sense for temporary results where you know for sure that the metadata will never be requested. - *args : list - Positional arguments to pass to ``deployer``. **kwargs : dict Keyword arguments to pass to ``deployer``. 
From 9316fbcc422feac9b201f5501a90b044e5e88aa5 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Fri, 24 Mar 2023 14:14:36 +0000 Subject: [PATCH 3/4] try fix doc building Signed-off-by: Dmitry Chigarev --- .../core/dataframe/pandas/partitioning/axis_partition.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/modin/core/dataframe/pandas/partitioning/axis_partition.py b/modin/core/dataframe/pandas/partitioning/axis_partition.py index 49b9f7f9131..bc6134379b7 100644 --- a/modin/core/dataframe/pandas/partitioning/axis_partition.py +++ b/modin/core/dataframe/pandas/partitioning/axis_partition.py @@ -142,7 +142,7 @@ def split( Parameters ---------- - split_func : callable(pandas.DataFrame, *args, **kwargs) -> list[pandas.DataFrame] + split_func : callable(pandas.DataFrame) -> list[pandas.DataFrame] A function that takes partition's content and split it into multiple chunks. num_splits : int The number of splits the `split_func` return. @@ -194,12 +194,12 @@ def deploy_splitting_func( ---------- axis : {0, 1} The axis to perform the function along. - split_func : callable(pandas.DataFrame, *args, **kwargs) -> list[pandas.DataFrame] + split_func : callable(pandas.DataFrame) -> list[pandas.DataFrame] The function to perform. f_args : list or tuple - Positional arguments to pass to ``split_func``. + Positional arguments to pass to `split_func`. f_kwargs : dict - Keyword arguments to pass to ``split_func``. + Keyword arguments to pass to `split_func`. num_splits : int The number of splits the `split_func` return. 
*partitions : iterable From d23b4f8ee12f0f3a4e10ca1dca67b6cc51d42474 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Thu, 6 Apr 2023 15:52:48 +0000 Subject: [PATCH 4/4] Apply review suggestions Signed-off-by: Dmitry Chigarev --- .../dataframe/base/partitioning/axis_partition.py | 11 ++++++++++- .../pandas/partitioning/partition_manager.py | 2 +- .../pandas_on_dask/partitioning/virtual_partition.py | 8 +++++--- .../pandas_on_ray/partitioning/virtual_partition.py | 11 ++++++++--- .../partitioning/virtual_partition.py | 11 ++++++++--- 5 files changed, 32 insertions(+), 11 deletions(-) diff --git a/modin/core/dataframe/base/partitioning/axis_partition.py b/modin/core/dataframe/base/partitioning/axis_partition.py index fbb214ad028..06f6c45c547 100644 --- a/modin/core/dataframe/base/partitioning/axis_partition.py +++ b/modin/core/dataframe/base/partitioning/axis_partition.py @@ -123,12 +123,21 @@ def _wrap_partitions( extract_metadata = bool(self._PARTITIONS_METADATA_LEN) if extract_metadata: + # Here we receive a 1D array of futures describing partitions and their metadata as: + # [object_id{partition_idx}, metadata{partition_idx}_{metadata_idx}, ...] + # Here's an example of such array: + # [ + # object_id1, metadata1_1, metadata1_2, ..., metadata1_PARTITIONS_METADATA_LEN, + # object_id2, metadata2_1, ..., metadata2_PARTITIONS_METADATA_LEN, + # ... 
+ # object_idN, metadataN_1, ..., metadataN_PARTITIONS_METADATA_LEN, + # ] return [ self.partition_type(*init_args) for init_args in zip( # `partition_type` consumes `(object_id, *metadata)`, thus adding `+1` *[iter(partitions)] - * (self._PARTITIONS_METADATA_LEN + 1) + * (1 + self._PARTITIONS_METADATA_LEN) ) ] else: diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py index 2780f35ba3a..955726eed55 100644 --- a/modin/core/dataframe/pandas/partitioning/partition_manager.py +++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py @@ -1581,7 +1581,7 @@ def shuffle_partitions( # Convert our list of block partitions to row partitions. We need to create full-axis # row partitions since we need to send the whole partition to the split step as otherwise # we wouldn't know how to split the block partitions that don't contain the shuffling key. - row_partitions = [partition for partition in cls.row_partitions(partitions)] + row_partitions = cls.row_partitions(partitions) if len(pivots): # Gather together all of the sub-partitions split_row_partitions = np.array( diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py index 52852d166ab..f1c8f90ba61 100644 --- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py +++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py @@ -170,7 +170,9 @@ def deploy_splitting_func( *partitions, ), f_kwargs={"extract_metadata": extract_metadata}, - num_returns=num_splits * 4 if extract_metadata else num_splits, + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) + if extract_metadata + else num_splits, pure=False, ) @@ -234,7 +236,7 @@ def deploy_axis_func( "lengths": lengths, "manual_partition": manual_partition, }, - 
num_returns=result_num_splits * 4, + num_returns=result_num_splits * (1 + cls._PARTITIONS_METADATA_LEN), pure=False, ) @@ -291,7 +293,7 @@ def deploy_func_between_two_axis_partitions( other_shape, *partitions, ), - num_returns=num_splits * 4, + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN), pure=False, ) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index 398b32ca0f5..65f2704d054 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -165,7 +165,9 @@ def deploy_splitting_func( extract_metadata=False, ): return _deploy_ray_func.options( - num_returns=num_splits * 4 if extract_metadata else num_splits, + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) + if extract_metadata + else num_splits, ).remote( _DEPLOY_SPLIT_FUNC, axis, @@ -224,7 +226,8 @@ def deploy_axis_func( A list of ``ray.ObjectRef``-s. """ return _deploy_ray_func.options( - num_returns=(num_splits if lengths is None else len(lengths)) * 4, + num_returns=(num_splits if lengths is None else len(lengths)) + * (1 + cls._PARTITIONS_METADATA_LEN), **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( _DEPLOY_AXIS_FUNC, @@ -279,7 +282,9 @@ def deploy_func_between_two_axis_partitions( list A list of ``ray.ObjectRef``-s. 
""" - return _deploy_ray_func.options(num_returns=num_splits * 4).remote( + return _deploy_ray_func.options( + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) + ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, axis, func, diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py index 4d17f3612a0..d95c81241d0 100644 --- a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py +++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py @@ -166,7 +166,9 @@ def deploy_splitting_func( extract_metadata=False, ): return _deploy_unidist_func.options( - num_returns=num_splits * 4 if extract_metadata else num_splits, + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) + if extract_metadata + else num_splits, ).remote( _DEPLOY_SPLIT_FUNC, axis, @@ -225,7 +227,8 @@ def deploy_axis_func( A list of ``unidist.ObjectRef``-s. """ return _deploy_unidist_func.options( - num_returns=(num_splits if lengths is None else len(lengths)) * 4, + num_returns=(num_splits if lengths is None else len(lengths)) + * (1 + cls._PARTITIONS_METADATA_LEN), **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( _DEPLOY_AXIS_FUNC, @@ -280,7 +283,9 @@ def deploy_func_between_two_axis_partitions( list A list of ``unidist.ObjectRef``-s. """ - return _deploy_unidist_func.options(num_returns=num_splits * 4).remote( + return _deploy_unidist_func.options( + num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) + ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, axis, func,