Skip to content

Commit

Permalink
fix full-axis functions
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Oct 11, 2023
1 parent 0bb11b4 commit 746fdbd
Showing 1 changed file with 32 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,13 +115,13 @@ def deploy_splitting_func(
else num_splits,
).remote(
cls._get_deploy_split_func(),
axis,
func,
len(f_args),
f_kwargs,
num_splits,
*f_args,
num_splits,
*partitions,
axis=axis,
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
extract_metadata=extract_metadata,
)

Expand Down Expand Up @@ -177,14 +177,14 @@ def deploy_axis_func(
**({"max_retries": max_retries} if max_retries is not None else {}),
).remote(
cls._get_deploy_axis_func(),
axis,
func,
len(f_args),
f_kwargs,
*f_args,
num_splits,
maintain_partitioning,
*f_args,
*partitions,
axis=axis,
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
manual_partition=manual_partition,
lengths=lengths,
)
Expand Down Expand Up @@ -233,15 +233,15 @@ def deploy_func_between_two_axis_partitions(
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN)
).remote(
PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions,
axis,
func,
len(f_args),
f_kwargs,
*f_args,
num_splits,
len_of_left,
other_shape,
*f_args,
*partitions,
axis=axis,
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
)

def wait(self):
Expand All @@ -264,11 +264,11 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition):
@ray.remote
def _deploy_ray_func(
deployer,
*positional_args,
axis,
f_to_deploy,
f_len_args,
f_kwargs,
*futures,
extract_metadata=True,
**kwargs,
): # pragma: no cover
Expand All @@ -277,31 +277,32 @@ def _deploy_ray_func(
This is ALWAYS called on either ``PandasDataframeAxisPartition.deploy_axis_func``
or ``PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions``, which both
serve to deploy another dataframe function on a Ray worker process. The provided ``f_args``
are thus deserialized here (on the Ray worker) before the function is called (``f_kwargs``
will never contain more Ray objects, and thus does not require deserialization).
serve to deploy another dataframe function on a Ray worker process. The provided `positional_args`
contains positional arguments for both `deployer` and `f_to_deploy`; the two groups can be separated
using the `f_len_args` value. The arguments are passed together so that Ray deserializes them before the
kernel is executed (`f_kwargs` will never contain more Ray objects, and thus does not require deserialization).
Parameters
----------
deployer : callable
A `PandasDataFrameAxisPartition.deploy_*` method that will call ``f_to_deploy``.
*positional_args : list
The first `f_len_args` elements in this list represent positional arguments
to pass to the `f_to_deploy`. The rest are positional arguments that will be
passed to `deployer`.
axis : {0, 1}
The axis to perform the function along.
The axis to perform the function along. This argument is keyword only.
f_to_deploy : callable or RayObjectID
The function to deploy.
The function to deploy. This argument is keyword only.
f_len_args : int
Number of positional arguments to pass to ``f_to_deploy``.
Number of positional arguments to pass to ``f_to_deploy``. This argument is keyword only.
f_kwargs : dict
Keyword arguments to pass to ``f_to_deploy``.
*futures : list
The first `f_len_args` elements in this list represent positional arguments
to pass to the `f_to_deploy`. The rest are partitions that will be passed as
positional arguments to `deployer`.
Keyword arguments to pass to ``f_to_deploy``. This argument is keyword only.
extract_metadata : bool, default: True
Whether to return metadata (length, width, ip) of the result. Passing `False` may relax
the load on object storage as the remote function would return 4 times fewer futures.
Passing `False` makes sense for temporary results where you know for sure that the
metadata will never be requested.
metadata will never be requested. This argument is keyword only.
**kwargs : dict
Keyword arguments to pass to ``deployer``.
Expand All @@ -314,9 +315,9 @@ def _deploy_ray_func(
-----
Ray functions are not detected by codecov (thus pragma: no cover).
"""
f_args = futures[:f_len_args]
partitions = futures[f_len_args:]
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *partitions, **kwargs)
f_args = positional_args[:f_len_args]
deploy_args = positional_args[f_len_args:]
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs)
if not extract_metadata:
return result
ip = get_node_ip_address()
Expand Down

0 comments on commit 746fdbd

Please sign in to comment.