diff --git a/modin/core/dataframe/base/partitioning/axis_partition.py b/modin/core/dataframe/base/partitioning/axis_partition.py
index db6c7b883e7..fbb214ad028 100644
--- a/modin/core/dataframe/base/partitioning/axis_partition.py
+++ b/modin/core/dataframe/base/partitioning/axis_partition.py
@@ -22,6 +22,11 @@ class BaseDataframeAxisPartition(ABC):  # pragma: no cover
     An abstract class that represents the parent class for any axis partition class.
 
     This class is intended to simplify the way that operations are performed.
+
+    Attributes
+    ----------
+    _PARTITIONS_METADATA_LEN : int
+        The number of metadata values that an object of `partition_type` consumes.
     """
 
     @property
@@ -87,8 +92,11 @@ def apply(
     # Child classes must have these in order to correctly subclass.
     instance_type = None
     partition_type = None
+    _PARTITIONS_METADATA_LEN = 0
 
-    def _wrap_partitions(self, partitions: list) -> list:
+    def _wrap_partitions(
+        self, partitions: list, extract_metadata: Optional[bool] = None
+    ) -> list:
         """
         Wrap remote partition objects with `BaseDataframePartition` class.
 
@@ -96,6 +104,9 @@ def _wrap_partitions(self, partitions: list) -> list:
         ----------
         partitions : list
             List of remote partition objects to be wrapped with `BaseDataframePartition` class.
+        extract_metadata : bool, optional
+            Whether the `partitions` list also contains the partitions' metadata.
+            If `None` is passed, the value is inferred from `cls._PARTITIONS_METADATA_LEN`.
 
         Returns
         -------
@@ -105,7 +116,23 @@ def _wrap_partitions(self, partitions: list) -> list:
         assert self.partition_type is not None
         assert self.instance_type is not None  # type: ignore
 
-        return [self.partition_type(obj) for obj in partitions]
+        if extract_metadata is None:
+            # If `_PARTITIONS_METADATA_LEN == 0`, the execution doesn't support metadata
+            # and we should never try extracting it; otherwise, assume that the common
+            # approach of always passing the metadata is used.
+            extract_metadata = bool(self._PARTITIONS_METADATA_LEN)
+
+        if extract_metadata:
+            return [
+                self.partition_type(*init_args)
+                for init_args in zip(
+                    # `partition_type` consumes `(object_id, *metadata)`, hence the `+ 1`
+                    *[iter(partitions)]
+                    * (self._PARTITIONS_METADATA_LEN + 1)
+                )
+            ]
+        else:
+            return [self.partition_type(object_id) for object_id in partitions]
 
     def force_materialization(
         self, get_ip: bool = False
diff --git a/modin/core/dataframe/pandas/partitioning/axis_partition.py b/modin/core/dataframe/pandas/partitioning/axis_partition.py
index 55a36b28c15..d1dade379ef 100644
--- a/modin/core/dataframe/pandas/partitioning/axis_partition.py
+++ b/modin/core/dataframe/pandas/partitioning/axis_partition.py
@@ -134,6 +134,88 @@ def apply(
             )
         )
 
+    def split(
+        self, split_func, num_splits, f_args=None, f_kwargs=None, extract_metadata=False
+    ):
+        """
+        Split the axis partition into multiple partitions using `split_func`.
+
+        Parameters
+        ----------
+        split_func : callable(pandas.DataFrame, *args, **kwargs) -> list[pandas.DataFrame]
+            A function that takes the partition's content and splits it into multiple chunks.
+        num_splits : int
+            The number of splits that `split_func` returns.
+        f_args : iterable, optional
+            Positional arguments to pass to `split_func`.
+        f_kwargs : dict, optional
+            Keyword arguments to pass to `split_func`.
+        extract_metadata : bool, default: False
+            Whether to return metadata (length, width, ip) of the result. Passing `False` may reduce
+            the load on object storage as the remote function would return X times fewer futures
+            (where X is the number of metadata values plus one). Passing `False` makes sense for
+            temporary results where you know for sure that the metadata will never be requested.
+
+        Returns
+        -------
+        list
+            List of wrapped partitions.
+        """
+        f_args = tuple() if f_args is None else f_args
+        f_kwargs = {} if f_kwargs is None else f_kwargs
+        return self._wrap_partitions(
+            self.deploy_splitting_func(
+                self.axis,
+                split_func,
+                f_args,
+                f_kwargs,
+                num_splits,
+                *self.list_of_blocks,
+                extract_metadata=extract_metadata,
+            ),
+            extract_metadata=extract_metadata,
+        )
+
+    @classmethod
+    def deploy_splitting_func(
+        cls,
+        axis,
+        split_func,
+        f_args,
+        f_kwargs,
+        num_splits,
+        *partitions,
+        extract_metadata=False,
+    ):
+        """
+        Deploy a splitting function along a full axis.
+
+        Parameters
+        ----------
+        axis : {0, 1}
+            The axis to perform the function along.
+        split_func : callable(pandas.DataFrame, *args, **kwargs) -> list[pandas.DataFrame]
+            The splitting function to apply.
+        f_args : list or tuple
+            Positional arguments to pass to ``split_func``.
+        f_kwargs : dict
+            Keyword arguments to pass to ``split_func``.
+        num_splits : int
+            The number of splits that `split_func` returns.
+        *partitions : iterable
+            All partitions that make up the full axis (row or column).
+        extract_metadata : bool, default: False
+            Whether to return metadata (length, width, ip) of the result. Note that `True`
+            is not supported in the ``PandasDataframeAxisPartition`` class.
+
+        Returns
+        -------
+        list
+            A list of pandas DataFrames.
+        """
+        dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
+        return split_func(dataframe, *f_args, **f_kwargs)
+
     @classmethod
     def deploy_axis_func(
         cls,
diff --git a/modin/core/dataframe/pandas/partitioning/partition_manager.py b/modin/core/dataframe/pandas/partitioning/partition_manager.py
index e0715da4b5b..bcbfdc962f7 100644
--- a/modin/core/dataframe/pandas/partitioning/partition_manager.py
+++ b/modin/core/dataframe/pandas/partitioning/partition_manager.py
@@ -1584,15 +1584,17 @@ def shuffle_partitions(
         # Convert our list of block partitions to row partitions. We need to create full-axis
         # row partitions since we need to send the whole partition to the split step as otherwise
         # we wouldn't know how to split the block partitions that don't contain the shuffling key.
-        row_partitions = [
-            partition.force_materialization().list_of_block_partitions[0]
-            for partition in cls.row_partitions(partitions)
-        ]
+        row_partitions = cls.row_partitions(partitions)
         # Gather together all of the sub-partitions
         split_row_partitions = np.array(
             [
                 partition.split(
-                    shuffle_functions.split_function, len(pivots) + 1, pivots
+                    shuffle_functions.split_function,
+                    num_splits=len(pivots) + 1,
+                    f_args=(pivots,),
+                    # The partition's metadata will never be accessed for the split partitions,
+                    # so there is no need to compute it.
+                    extract_metadata=False,
                 )
                 for partition in row_partitions
             ]
diff --git a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
index 66ba925ac4e..0ec97b7a457 100644
--- a/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
+++ b/modin/core/execution/dask/implementations/pandas_on_dask/partitioning/virtual_partition.py
@@ -50,6 +50,7 @@ class PandasOnDaskDataframeVirtualPartition(PandasDataframeAxisPartition):
     """
 
     axis = None
+    _PARTITIONS_METADATA_LEN = 3  # (length, width, ip)
     partition_type = PandasOnDaskDataframePartition
     instance_type = Future
 
@@ -145,6 +146,34 @@ def list_of_ips(self):
             result[idx] = partition._ip_cache
         return result
 
+    @classmethod
+    @_inherit_docstrings(PandasDataframeAxisPartition.deploy_splitting_func)
+    def deploy_splitting_func(
+        cls,
+        axis,
+        func,
+        f_args,
+        f_kwargs,
+        num_splits,
+        *partitions,
+        extract_metadata=False,
+    ):
+        return DaskWrapper.deploy(
+            func=_deploy_dask_func,
+            f_args=(
+                PandasDataframeAxisPartition.deploy_splitting_func,
+                axis,
+                func,
+                f_args,
+                f_kwargs,
+                num_splits,
+                *partitions,
+            ),
+            f_kwargs={"extract_metadata": extract_metadata},
+            num_returns=num_splits * 4 if extract_metadata else num_splits,
+            pure=False,
+        )
+
     @classmethod
     def deploy_axis_func(
         cls,
@@ -266,25 +295,6 @@ def deploy_func_between_two_axis_partitions(
             pure=False,
         )
 
-    def _wrap_partitions(self, partitions):
-        """
-        Wrap partitions passed as a list of distributed.Future with ``PandasOnDaskDataframePartition`` class.
-
-        Parameters
-        ----------
-        partitions : list
-            List of distributed.Future.
-
-        Returns
-        -------
-        list
-            List of ``PandasOnDaskDataframePartition`` objects.
-        """
-        return [
-            self.partition_type(future, length, width, ip)
-            for (future, length, width, ip) in zip(*[iter(partitions)] * 4)
-        ]
-
     def apply(
         self,
         func,
@@ -505,7 +515,16 @@ class PandasOnDaskDataframeRowPartition(PandasOnDaskDataframeVirtualPartition):
     axis = 1
 
 
-def _deploy_dask_func(deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs):
+def _deploy_dask_func(
+    deployer,
+    axis,
+    f_to_deploy,
+    f_args,
+    f_kwargs,
+    *args,
+    extract_metadata=True,
+    **kwargs,
+):
     """
     Execute a function on an axis partition in a worker process.
 
@@ -527,6 +546,11 @@ def _deploy_dask_func(deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kw
         Keyword arguments to pass to ``f_to_deploy``.
     *args : list
         Positional arguments to pass to ``func``.
+    extract_metadata : bool, default: True
+        Whether to return metadata (length, width, ip) of the result. Passing `False` may reduce
+        the load on object storage as the remote function would return 4 times fewer futures.
+        Passing `False` makes sense for temporary results where you know for sure that the
+        metadata will never be requested.
     **kwargs : dict
         Keyword arguments to pass to ``func``.
 
@@ -536,6 +560,8 @@ def _deploy_dask_func(deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kw
         The result of the function ``func`` and metadata for it.
""" result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) + if not extract_metadata: + return result ip = get_ip() if isinstance(result, pandas.DataFrame): return result, len(result), len(result.columns), ip diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index e3aab745169..ca538f511ea 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -29,6 +29,7 @@ # If Ray has not been initialized yet by Modin, # it will be initialized when calling `RayWrapper.put`. _DEPLOY_AXIS_FUNC = RayWrapper.put(PandasDataframeAxisPartition.deploy_axis_func) +_DEPLOY_SPLIT_FUNC = RayWrapper.put(PandasDataframeAxisPartition.deploy_splitting_func) _DRAIN = RayWrapper.put(PandasDataframeAxisPartition.drain) @@ -54,6 +55,7 @@ class PandasOnRayDataframeVirtualPartition(PandasDataframeAxisPartition): Width, or reference to width, of wrapped ``pandas.DataFrame``. """ + _PARTITIONS_METADATA_LEN = 3 # (length, width, ip) partition_type = PandasOnRayDataframePartition instance_type = ray.ObjectRef axis = None @@ -150,6 +152,31 @@ def list_of_ips(self): result[idx] = partition._ip_cache return result + @classmethod + @_inherit_docstrings(PandasDataframeAxisPartition.deploy_splitting_func) + def deploy_splitting_func( + cls, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=False, + ): + return _deploy_ray_func.options( + num_returns=num_splits * 4 if extract_metadata else num_splits, + ).remote( + _DEPLOY_SPLIT_FUNC, + axis, + func, + f_args, + f_kwargs, + num_splits, + *partitions, + extract_metadata=extract_metadata, + ) + @classmethod def deploy_axis_func( cls, @@ -264,25 +291,6 @@ def deploy_func_between_two_axis_partitions( *partitions, ) - def _wrap_partitions(self, partitions): - """ - Wrap partitions passed as a list of ``ray.ObjectRef`` with ``PandasOnRayDataframePartition`` class. - - Parameters - ---------- - partitions : list - List of ``ray.ObjectRef``. - - Returns - ------- - list - List of ``PandasOnRayDataframePartition`` objects. - """ - return [ - self.partition_type(object_id, length, width, ip) - for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4) - ] - def apply( self, func, @@ -522,7 +530,14 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): @ray.remote def _deploy_ray_func( - deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs + deployer, + axis, + f_to_deploy, + f_args, + f_kwargs, + *args, + extract_metadata=True, + **kwargs, ): # pragma: no cover """ Execute a function on an axis partition in a worker process. @@ -545,6 +560,11 @@ def _deploy_ray_func( Positional arguments to pass to ``f_to_deploy``. f_kwargs : dict Keyword arguments to pass to ``f_to_deploy``. + extract_metadata : bool, default: True + Whether to return metadata (length, width, ip) of the result. Passing `False` may relax + the load on plasma storage as the remote function would return 4 times fewer futures. + Passing `False` makes sense for temporary results where you know for sure that the + metadata will never be requested. *args : list Positional arguments to pass to ``deployer``. 
     **kwargs : dict
         Keyword arguments to pass to ``deployer``.
 
@@ -561,6 +581,8 @@
     """
     f_args = deserialize(f_args)
     result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
+    if not extract_metadata:
+        return result
     ip = get_node_ip_address()
     if isinstance(result, pandas.DataFrame):
         return result, len(result), len(result.columns), ip
diff --git a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py
index c11a2b2638c..7defa7a3744 100644
--- a/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py
+++ b/modin/core/execution/unidist/implementations/pandas_on_unidist/partitioning/virtual_partition.py
@@ -30,6 +30,9 @@
 # which is called inside of `UnidistWrapper.put`.
 _DEPLOY_AXIS_FUNC = UnidistWrapper.put(PandasDataframeAxisPartition.deploy_axis_func)
 _DRAIN = UnidistWrapper.put(PandasDataframeAxisPartition.drain)
+_DEPLOY_SPLIT_FUNC = UnidistWrapper.put(
+    PandasDataframeAxisPartition.deploy_splitting_func
+)
 
 
 class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition):
@@ -54,6 +57,7 @@ class PandasOnUnidistDataframeVirtualPartition(PandasDataframeAxisPartition):
         Width, or reference to width, of wrapped ``pandas.DataFrame``.
     """
 
+    _PARTITIONS_METADATA_LEN = 3  # (length, width, ip)
     partition_type = PandasOnUnidistDataframePartition
     instance_type = unidist.core.base.object_ref.ObjectRef
     axis = None
@@ -149,6 +153,31 @@ def list_of_ips(self):
             result[idx] = partition._ip_cache
         return result
 
+    @classmethod
+    @_inherit_docstrings(PandasDataframeAxisPartition.deploy_splitting_func)
+    def deploy_splitting_func(
+        cls,
+        axis,
+        func,
+        f_args,
+        f_kwargs,
+        num_splits,
+        *partitions,
+        extract_metadata=False,
+    ):
+        return _deploy_unidist_func.options(
+            num_returns=num_splits * 4 if extract_metadata else num_splits,
+        ).remote(
+            _DEPLOY_SPLIT_FUNC,
+            axis,
+            func,
+            f_args,
+            f_kwargs,
+            num_splits,
+            *partitions,
+            extract_metadata=extract_metadata,
+        )
+
     @classmethod
     def deploy_axis_func(
         cls,
@@ -263,25 +292,6 @@ def deploy_func_between_two_axis_partitions(
             *partitions,
         )
 
-    def _wrap_partitions(self, partitions):
-        """
-        Wrap partitions passed as a list of ``unidist.ObjectRef`` with ``PandasOnUnidistDataframePartition`` class.
-
-        Parameters
-        ----------
-        partitions : list
-            List of ``unidist.ObjectRef``.
-
-        Returns
-        -------
-        list
-            List of ``PandasOnUnidistDataframePartition`` objects.
-        """
-        return [
-            self.partition_type(object_id, length, width, ip)
-            for (object_id, length, width, ip) in zip(*[iter(partitions)] * 4)
-        ]
-
     def apply(
         self,
         func,
@@ -510,7 +520,14 @@ class PandasOnUnidistDataframeRowPartition(PandasOnUnidistDataframeVirtualPartit
 
 @unidist.remote
 def _deploy_unidist_func(
-    deployer, axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs
+    deployer,
+    axis,
+    f_to_deploy,
+    f_args,
+    f_kwargs,
+    *args,
+    extract_metadata=True,
+    **kwargs,
 ):  # pragma: no cover
     """
     Execute a function on an axis partition in a worker process.
 
@@ -535,6 +552,11 @@ def _deploy_unidist_func(
         Keyword arguments to pass to ``f_to_deploy``.
     *args : list
         Positional arguments to pass to ``deployer``.
+    extract_metadata : bool, default: True
+        Whether to return metadata (length, width, ip) of the result. Passing `False` may reduce
+        the load on object storage as the remote function would return 4 times fewer futures.
+        Passing `False` makes sense for temporary results where you know for sure that the
+        metadata will never be requested.
     **kwargs : dict
         Keyword arguments to pass to ``deployer``.
 
@@ -549,6 +571,8 @@
     """
    f_args = deserialize(f_args)
     result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
+    if not extract_metadata:
+        return result
     ip = unidist.get_ip()
     if isinstance(result, pandas.DataFrame):
         return result, len(result), len(result.columns), ip
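
Note: the core trick in the new `_wrap_partitions` above is `zip(*[iter(partitions)] * (self._PARTITIONS_METADATA_LEN + 1))`, which groups a flat list of futures `[object_id, length, width, ip, object_id, ...]` into `(object_id, *metadata)` tuples. A minimal, self-contained sketch of that grouping under the same assumptions (the `FakePartition` class, `wrap` helper, and sample values are illustrative stand-ins, not part of this patch):

    from typing import Any, List

    # Hypothetical stand-in for a `partition_type` whose constructor
    # consumes `(object_id, *metadata)`, like the engine partition classes.
    class FakePartition:
        def __init__(self, object_id: Any, length: int, width: int, ip: str):
            self.object_id = object_id
            self.length = length
            self.width = width
            self.ip = ip

    METADATA_LEN = 3  # mirrors `_PARTITIONS_METADATA_LEN = 3` (length, width, ip)

    def wrap(flat: List[Any]) -> List[FakePartition]:
        # `[iter(flat)] * n` repeats the SAME iterator n times, so `zip`
        # draws n consecutive items per tuple: a flat list
        # [id0, len0, w0, ip0, id1, len1, w1, ip1] becomes
        # [(id0, len0, w0, ip0), (id1, len1, w1, ip1)].
        return [
            FakePartition(*group)
            for group in zip(*[iter(flat)] * (METADATA_LEN + 1))
        ]

    # With `extract_metadata=True` a remote splitting call returns a flat list
    # of `num_splits * (METADATA_LEN + 1)` values; with `False`, just `num_splits`.
    flat_results = ["obj-0", 10, 4, "10.0.0.1", "obj-1", 12, 4, "10.0.0.2"]
    parts = wrap(flat_results)
    assert [(p.object_id, p.length) for p in parts] == [("obj-0", 10), ("obj-1", 12)]

This is why `num_returns` in the engine-specific `deploy_splitting_func` implementations is `num_splits * 4` when metadata is extracted and plain `num_splits` otherwise: the worker either returns the four values per split or only the split itself.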