Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#4507: Do not call 'ray.get()' inside of the kernel executing call queues #6633

Merged
merged 5 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,3 +286,111 @@ def deserialize(obj): # pragma: no cover
return dict(zip(obj.keys(), RayWrapper.materialize(list(obj.values()))))
else:
return obj


def deconstruct_call_queue(call_queue):
    """
    Flatten a call queue into a 1D sequence plus the metadata needed to rebuild it.

    The flat form allows the queue to be passed to a Ray kernel as
    variable-length positional arguments (``kernel(*queue)``), so the Ray
    engine automatically materializes all the futures the queue might contain.

    Parameters
    ----------
    call_queue : list[list[func, args, kwargs], ...]
        Each entry holds the function, its positional arguments and its
        keyword arguments (a dict), in that order.

    Returns
    -------
    num_funcs : int
        The number of functions in the call queue.
    arg_lengths : list of ints
        The number of positional arguments for each function in the call queue.
    kw_key_lengths : list of ints
        The number of keyword arguments for each function in the call queue.
    kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool}
        Description of keyword arguments for each function. ``kw_value_lengths[i][j]``
        describes the j-th keyword argument of the i-th function: how many flat
        items its value occupies and whether the value was a list/tuple at all
        (for example, ``{"len": 1, "was_iterable": False}`` describes a non-list value).
    *unfolded_queue
        The items of the flattened queue. They are unpacked directly into the
        returned tuple after the four metadata entries (they are NOT returned
        as a single list), so the whole result can be splatted into a kernel
        call. Use ``reconstruct_call_queue`` to restore the original queue.

    Notes
    -----
    Only top-level ``list``/``tuple`` keyword values are unfolded; any other
    value is stored as a single item.
    """
    arg_lengths = []
    kw_key_lengths = []
    kw_value_lengths = []
    flat = []

    for call in call_queue:
        func, args, kwargs = call[0], call[1], call[2]

        # Per-function layout: [func, *args, *kwarg keys, *kwarg values].
        flat.append(func)
        flat.extend(args)
        arg_lengths.append(len(args))

        flat.extend(kwargs.keys())
        kw_key_lengths.append(len(kwargs))

        descriptors = []
        for value in kwargs.values():
            is_sequence = isinstance(value, (list, tuple))
            # Scalars are wrapped so that every value is unfolded the same way.
            items = value if is_sequence else (value,)
            flat.extend(items)
            # A dict (rather than a bare tuple) is used on purpose: readability
            # is preferred here over a negligible packing speed-up.
            descriptors.append({"len": len(items), "was_iterable": is_sequence})
        kw_value_lengths.append(descriptors)

    return len(call_queue), arg_lengths, kw_key_lengths, kw_value_lengths, *flat


def reconstruct_call_queue(
    num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue
):
    """
    Rebuild a call queue from the pieces produced by ``deconstruct_call_queue()``.

    Parameters
    ----------
    num_funcs : int
        The number of functions in the call queue.
    arg_lengths : list of ints
        The number of positional arguments for each function in the call queue.
    kw_key_lengths : list of ints
        The number of keyword arguments for each function in the call queue.
    kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool}
        Description of keyword arguments for each function. ``kw_value_lengths[i][j]``
        describes the j-th keyword argument of the i-th function: how many flat
        items its value occupies and whether the value was a list/tuple at all
        (for example, ``{"len": 1, "was_iterable": False}`` describes a non-list value).
    unfolded_queue : list
        A 1D call queue that is the result of the ``deconstruct_call_queue()`` function.

    Returns
    -------
    list[tuple[func, args, kwargs], ...]
        The restored call queue. Positional arguments and keyword values that
        were recorded as iterable are returned as slices of ``unfolded_queue``,
        so their container type follows the type of ``unfolded_queue``.
    """
    rebuilt = []
    cursor = 0  # index of the first item of ``unfolded_queue`` not yet consumed

    for idx in range(num_funcs):
        # Per-function layout: [func, *args, *kwarg keys, *kwarg values].
        func = unfolded_queue[cursor : cursor + 1][0]
        cursor += 1

        args_end = cursor + arg_lengths[idx]
        args = unfolded_queue[cursor:args_end]
        cursor = args_end

        keys_end = cursor + kw_key_lengths[idx]
        keys = unfolded_queue[cursor:keys_end]
        cursor = keys_end

        kwargs = {}
        for key, spec in zip(keys, kw_value_lengths[idx]):
            n_items = spec["len"]
            chunk = unfolded_queue[cursor : cursor + n_items]
            cursor += n_items
            if n_items == 1 and not spec["was_iterable"]:
                # The value was wrapped during deconstruction; unwrap it back.
                kwargs[key] = chunk[0]
            else:
                kwargs[key] = chunk

        rebuilt.append((func, args, kwargs))

    return rebuilt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.ray.common import RayWrapper
from modin.core.execution.ray.common.utils import ObjectIDType, deserialize
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
from modin.core.execution.ray.common.utils import (
ObjectIDType,
deconstruct_call_queue,
reconstruct_call_queue,
)
from modin.logging import get_logger
from modin.pandas.indexing import compute_sliced_len

Expand Down Expand Up @@ -97,7 +101,9 @@ def apply(self, func, *args, **kwargs):
self._is_debug(log) and log.debug(
f"SUBMIT::_apply_list_of_funcs::{self._identity}"
)
result, length, width, ip = _apply_list_of_funcs.remote(call_queue, data)
result, length, width, ip = _apply_list_of_funcs.remote(
data, *deconstruct_call_queue(call_queue)
)
else:
# We handle `len(call_queue) == 1` in a different way because
# this dramatically improves performance.
Expand Down Expand Up @@ -128,7 +134,7 @@ def drain_call_queue(self):
new_length,
new_width,
self._ip_cache,
) = _apply_list_of_funcs.remote(call_queue, data)
) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue))
else:
# We handle `len(call_queue) == 1` in a different way because
# this dramatically improves performance.
Expand Down Expand Up @@ -391,16 +397,29 @@ def _apply_func(partition, func, *args, **kwargs): # pragma: no cover


@ray.remote(num_returns=4)
def _apply_list_of_funcs(call_queue, partition): # pragma: no cover
def _apply_list_of_funcs(
partition, num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *futures
): # pragma: no cover
"""
Execute all operations stored in the call queue on the partition in a worker process.

Parameters
----------
call_queue : list
A call queue that needs to be executed on the partition.
partition : pandas.DataFrame
A pandas DataFrame the call queue needs to be executed on.
num_funcs : int
The number of functions in the call queue.
arg_lengths : list of ints
The number of positional arguments for each function in the call queue.
kw_key_lengths : list of ints
The number of key-word arguments for each function in the call queue.
kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool}
Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]`
describes the j-th keyword argument of the i-th function in the call queue.
The description consists of the length of the argument and whether it's a list at all
(for example, {"len": 1, "was_iterable": False} describes a non-list argument).
*futures : list
A 1D call queue that is result of the ``deconstruct_call_queue()`` function.

Returns
-------
Expand All @@ -413,10 +432,10 @@ def _apply_list_of_funcs(call_queue, partition): # pragma: no cover
str
The node IP address of the worker process.
"""
for func, f_args, f_kwargs in call_queue:
func = deserialize(func)
args = deserialize(f_args)
kwargs = deserialize(f_kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we are passing anything at all through this parameter that could lead to a hang? I believe objects created explicitly using ray.put should work correctly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we are passing anything at all through this parameter that could lead to a hang?

I think we're indeed not passing anything like that via func or f_kwargs, but I thought it would be a good idea to unify the approach across f_args, func and f_kwargs to avoid such problems in the future.

call_queue = reconstruct_call_queue(
dchigarev marked this conversation as resolved.
Show resolved Hide resolved
num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, futures
)
for func, args, kwargs in call_queue:
try:
partition = func(partition, *args, **kwargs)
# Sometimes Arrow forces us to make a copy of an object before we operate on it. We
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
PandasDataframeAxisPartition,
)
from modin.core.execution.ray.common import RayWrapper
from modin.core.execution.ray.common.utils import deserialize
from modin.utils import _inherit_docstrings

from .partition import PandasOnRayDataframePartition
Expand Down Expand Up @@ -116,12 +115,13 @@ def deploy_splitting_func(
else num_splits,
).remote(
cls._get_deploy_split_func(),
axis,
func,
f_args,
f_kwargs,
*f_args,
num_splits,
*partitions,
axis=axis,
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
extract_metadata=extract_metadata,
)

Expand Down Expand Up @@ -177,13 +177,14 @@ def deploy_axis_func(
**({"max_retries": max_retries} if max_retries is not None else {}),
).remote(
cls._get_deploy_axis_func(),
axis,
func,
f_args,
f_kwargs,
*f_args,
num_splits,
maintain_partitioning,
*partitions,
axis=axis,
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
manual_partition=manual_partition,
lengths=lengths,
)
Expand Down Expand Up @@ -232,14 +233,15 @@ def deploy_func_between_two_axis_partitions(
num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN)
).remote(
PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions,
axis,
func,
f_args,
f_kwargs,
*f_args,
num_splits,
len_of_left,
other_shape,
*partitions,
axis=axis,
f_to_deploy=func,
f_len_args=len(f_args),
f_kwargs=f_kwargs,
)

def wait(self):
Expand All @@ -262,11 +264,11 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition):
@ray.remote
def _deploy_ray_func(
deployer,
*positional_args,
axis,
f_to_deploy,
f_args,
f_len_args,
f_kwargs,
*args,
extract_metadata=True,
**kwargs,
): # pragma: no cover
Expand All @@ -275,29 +277,32 @@ def _deploy_ray_func(

This is ALWAYS called on either ``PandasDataframeAxisPartition.deploy_axis_func``
or ``PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions``, which both
serve to deploy another dataframe function on a Ray worker process. The provided ``f_args``
is thus are deserialized here (on the Ray worker) before the function is called (``f_kwargs``
will never contain more Ray objects, and thus does not require deserialization).
serve to deploy another dataframe function on a Ray worker process. The provided `positional_args`
contains positional arguments for both: `deployer` and for `f_to_deploy`, the parameters can be separated
using the `f_len_args` value. The parameters are combined so they will be deserialized by Ray before the
kernel is executed (`f_kwargs` will never contain more Ray objects, and thus does not require deserialization).

Parameters
----------
deployer : callable
A `PandasDataFrameAxisPartition.deploy_*` method that will call ``f_to_deploy``.
*positional_args : list
The first `f_len_args` elements in this list represent positional arguments
to pass to the `f_to_deploy`. The rest are positional arguments that will be
passed to `deployer`.
axis : {0, 1}
The axis to perform the function along.
The axis to perform the function along. This argument is keyword only.
f_to_deploy : callable or RayObjectID
The function to deploy.
f_args : list or tuple
Positional arguments to pass to ``f_to_deploy``.
The function to deploy. This argument is keyword only.
f_len_args : int
Number of positional arguments to pass to ``f_to_deploy``. This argument is keyword only.
f_kwargs : dict
Keyword arguments to pass to ``f_to_deploy``.
*args : list
Positional arguments to pass to ``deployer``.
Keyword arguments to pass to ``f_to_deploy``. This argument is keyword only.
extract_metadata : bool, default: True
Whether to return metadata (length, width, ip) of the result. Passing `False` may relax
the load on object storage as the remote function would return 4 times fewer futures.
Passing `False` makes sense for temporary results where you know for sure that the
metadata will never be requested.
metadata will never be requested. This argument is keyword only.
**kwargs : dict
Keyword arguments to pass to ``deployer``.

Expand All @@ -310,8 +315,9 @@ def _deploy_ray_func(
-----
Ray functions are not detected by codecov (thus pragma: no cover).
"""
f_args = deserialize(f_args)
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
f_args = positional_args[:f_len_args]
deploy_args = positional_args[f_len_args:]
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs)
if not extract_metadata:
return result
ip = get_node_ip_address()
Expand Down
Loading