diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index ab8ce08d195..66463ecb2ba 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -115,13 +115,13 @@ def deploy_splitting_func( else num_splits, ).remote( cls._get_deploy_split_func(), - axis, - func, - len(f_args), - f_kwargs, - num_splits, *f_args, + num_splits, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, extract_metadata=extract_metadata, ) @@ -177,14 +177,14 @@ def deploy_axis_func( **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( cls._get_deploy_axis_func(), - axis, - func, - len(f_args), - f_kwargs, + *f_args, num_splits, maintain_partitioning, - *f_args, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, manual_partition=manual_partition, lengths=lengths, ) @@ -233,15 +233,15 @@ def deploy_func_between_two_axis_partitions( num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, - axis, - func, - len(f_args), - f_kwargs, + *f_args, num_splits, len_of_left, other_shape, - *f_args, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, ) def wait(self): @@ -264,11 +264,11 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): @ray.remote def _deploy_ray_func( deployer, + *positional_args, axis, f_to_deploy, f_len_args, f_kwargs, - *futures, extract_metadata=True, **kwargs, ): # pragma: no cover @@ -277,31 +277,32 @@ def _deploy_ray_func( This is ALWAYS called on either ``PandasDataframeAxisPartition.deploy_axis_func`` or ``PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions``, which both - serve to deploy another dataframe function on a Ray worker process. The provided ``f_args`` - is thus are deserialized here (on the Ray worker) before the function is called (``f_kwargs`` - will never contain more Ray objects, and thus does not require deserialization). + serve to deploy another dataframe function on a Ray worker process. The provided `positional_args` + contains positional arguments for both: `deployer` and for `f_to_deploy`, the parameters can be separated + using the `f_len_args` value. The parameters are combined so they will be deserialized by Ray before the + kernel is executed (`f_kwargs` will never contain more Ray objects, and thus does not require deserialization). Parameters ---------- deployer : callable A `PandasDataFrameAxisPartition.deploy_*` method that will call ``f_to_deploy``. + *positional_args : list + The first `f_len_args` elements in this list represent positional arguments + to pass to the `f_to_deploy`. The rest are positional arguments that will be + passed to `deployer`. axis : {0, 1} - The axis to perform the function along. + The axis to perform the function along. This argument is keyword only. f_to_deploy : callable or RayObjectID - The function to deploy. + The function to deploy. This argument is keyword only. f_len_args : int - Number of positional arguments to pass to ``f_to_deploy``. + Number of positional arguments to pass to ``f_to_deploy``. This argument is keyword only. f_kwargs : dict - Keyword arguments to pass to ``f_to_deploy``. - *futures : list - The first `f_len_args` elements in this list represent positional arguments - to pass to the `f_to_deploy`. The rest are partitions that will be passed as - positional arguments to `deployer`. + Keyword arguments to pass to ``f_to_deploy``. This argument is keyword only. extract_metadata : bool, default: True Whether to return metadata (length, width, ip) of the result. Passing `False` may relax the load on object storage as the remote function would return 4 times fewer futures. Passing `False` makes sense for temporary results where you know for sure that the - metadata will never be requested. + metadata will never be requested. This argument is keyword only. **kwargs : dict Keyword arguments to pass to ``deployer``. @@ -314,9 +315,9 @@ def _deploy_ray_func( ----- Ray functions are not detected by codecov (thus pragma: no cover). """ - f_args = futures[:f_len_args] - partitions = futures[f_len_args:] - result = deployer(axis, f_to_deploy, f_args, f_kwargs, *partitions, **kwargs) + f_args = positional_args[:f_len_args] + deploy_args = positional_args[f_len_args:] + result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs) if not extract_metadata: return result ip = get_node_ip_address()