From 960275283563bba2e96f47f6c50c54054aedb612 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Thu, 5 Oct 2023 10:09:43 +0000 Subject: [PATCH 1/5] FIX-#4507: Do not call 'ray.get()' inside of the kernel executing call queues Signed-off-by: Dmitry Chigarev --- .../pandas_on_ray/partitioning/partition.py | 176 +++++++++++++++++- .../storage_formats/pandas/test_internals.py | 121 ++++++++++++ 2 files changed, 287 insertions(+), 10 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index 380095f7fbc..06e8789c543 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -18,7 +18,7 @@ from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition from modin.core.execution.ray.common import RayWrapper -from modin.core.execution.ray.common.utils import ObjectIDType, deserialize +from modin.core.execution.ray.common.utils import ObjectIDType from modin.logging import get_logger from modin.pandas.indexing import compute_sliced_len @@ -66,6 +66,43 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None): ) ) + @staticmethod + def _apply_call_queue(call_queue, data): + """ + Execute call queue over the given `data`. + + Parameters + ---------- + call_queue : list[list[func, args, kwargs], ...] + data : ray.ObjectRef + + Returns + ------- + ray.ObjectRef of pandas.DataFrame + The resulting pandas DataFrame. + ray.ObjectRef of int + The number of rows of the resulting pandas DataFrame. + ray.ObjectRef of int + The number of columns of the resulting pandas DataFrame. + ray.ObjectRef of str + The node IP address of the worker process. + """ + ( + num_funcs, + arg_lengths, + kw_key_lengths, + kw_value_lengths, + unfolded_queue, + ) = deconstruct_call_queue(call_queue) + return _apply_list_of_funcs.remote( + data, + num_funcs, + arg_lengths, + kw_key_lengths, + kw_value_lengths, + *unfolded_queue, + ) + def apply(self, func, *args, **kwargs): """ Apply a function to the object wrapped by this partition. @@ -97,7 +134,7 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - result, length, width, ip = _apply_list_of_funcs.remote(call_queue, data) + result, length, width, ip = self._apply_call_queue(call_queue, data) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -128,7 +165,7 @@ def drain_call_queue(self): new_length, new_width, self._ip_cache, - ) = _apply_list_of_funcs.remote(call_queue, data) + ) = self._apply_call_queue(call_queue, data) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -343,6 +380,112 @@ def _get_index_and_columns(df): # pragma: no cover return len(df.index), len(df.columns) +def deconstruct_call_queue(call_queue): + """ + Deconstruct the passed call queue into a 1D list. + + This is required, so the call queue can be then passed to a Ray's kernel + as a variable-length argument ``kernel(*queue)`` so the Ray engine + automatically materialize all the futures that the queue might have contained. + + Parameters + ---------- + call_queue : list[list[func, args, kwargs], ...] + + Returns + ------- + num_funcs : int + The number of functions in the call queue. 
+ arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that can be reconstructed using ``reconstruct_call_queue`` function. + """ + num_funcs = len(call_queue) + arg_lengths = [] + kw_key_lengths = [] + kw_value_lengths = [] + unfolded_queue = [] + for call in call_queue: + unfolded_queue.append(call[0]) + unfolded_queue.extend(call[1]) + unfolded_queue.extend(call[2].keys()) + value_lengths = [] + for value in call[2].values(): + was_iterable = True + if not isinstance(value, (list, tuple)): + was_iterable = False + value = [value] + unfolded_queue.extend(value) + value_lengths.append({"len": len(value), "was_iterable": was_iterable}) + + arg_lengths.append(len(call[1])) + kw_key_lengths.append(len(call[2])) + kw_value_lengths.append(value_lengths) + + return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue + + +def reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue +): + """ + Reconstruct original call queue from the result of the ``deconstruct_call_queue()``. + + Parameters + ---------- + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that is result of the ``deconstruct_call_queue()`` function. + + Returns + ------- + list[list[func, args, kwargs], ...] + Original call queue. 
+ """ + items_took = 0 + + def take_n_items(n): + nonlocal items_took + res = unfolded_queue[items_took : items_took + n] + items_took += n + return res + + call_queue = [] + for i in range(num_funcs): + func = take_n_items(1)[0] + args = take_n_items(arg_lengths[i]) + kw_keys = take_n_items(kw_key_lengths[i]) + kwargs = {} + value_lengths = kw_value_lengths[i] + for j, key in enumerate(kw_keys): + vals = take_n_items(value_lengths[j]["len"]) + if value_lengths[j]["len"] == 1 and not value_lengths[j]["was_iterable"]: + vals = vals[0] + kwargs[key] = vals + + call_queue.append((func, args, kwargs)) + + return call_queue + + @ray.remote(num_returns=4) def _apply_func(partition, func, *args, **kwargs): # pragma: no cover """ @@ -391,16 +534,29 @@ def _apply_func(partition, func, *args, **kwargs): # pragma: no cover @ray.remote(num_returns=4) -def _apply_list_of_funcs(call_queue, partition): # pragma: no cover +def _apply_list_of_funcs( + partition, num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *futures +): # pragma: no cover """ Execute all operations stored in the call queue on the partition in a worker process. Parameters ---------- - call_queue : list - A call queue that needs to be executed on the partition. partition : pandas.DataFrame A pandas DataFrame the call queue needs to be executed on. + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + *futures : list + A 1D call queue that is result of the ``deconstruct_call_queue()`` function. Returns ------- @@ -413,10 +569,10 @@ def _apply_list_of_funcs(call_queue, partition): # pragma: no cover str The node IP address of the worker process. """ - for func, f_args, f_kwargs in call_queue: - func = deserialize(func) - args = deserialize(f_args) - kwargs = deserialize(f_kwargs) + call_queue = reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, futures + ) + for func, args, kwargs in call_queue: try: partition = func(partition, *args, **kwargs) # Sometimes Arrow forces us to make a copy of an object before we operate on it. We diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index ddecde800ac..d7b1242ba75 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1390,3 +1390,124 @@ def test_sort_values_cache(): # check that the initial frame's cache wasn't changed assert mf_initial._column_widths_cache == [32, 32] validate_partitions_cache(mf_initial, axis=1) + + +class DummyFuture: + """ + A dummy object emulating future's behaviour, this class is used in ``test_call_queue_serialization``. + + It stores a random numeric value representing its data and `was_materialized` state. + Initially this object is considered to be serialized, the state can be changed by calling + the ``.materialize()`` method. 
+ """ + + def __init__(self): + self._value = np.random.randint(0, 1_000_000) + self._was_materialized = False + + def materialize(self): + self._was_materialized = True + return self + + def __eq__(self, other): + if isinstance(other, type(self)) and self._value == other._value: + return True + return False + + +@pytest.mark.parametrize( + "call_queue", + [ + # empty call queue + [], + # a single-function call queue (the function has no argument and it's materialized) + [(0, [], {})], + # a single-function call queue (the function has no argument and it's serialized) + [(DummyFuture(), [], {})], + # a multiple-functions call queue, none of the functions have arguments + [(DummyFuture(), [], {}), (DummyFuture(), [], {}), (0, [], {})], + # a single-function call queue (the function has both positional and keyword arguments) + [ + ( + DummyFuture(), + [DummyFuture()], + { + "a": DummyFuture(), + "b": [DummyFuture()], + "c": [DummyFuture, DummyFuture()], + }, + ) + ], + # a multiple-functions call queue with mixed types of functions/arguments + [ + ( + DummyFuture(), + [1, DummyFuture(), DummyFuture(), [4, 5]], + {"a": [DummyFuture(), 2], "b": DummyFuture(), "c": [1]}, + ), + (0, [], {}), + (0, [1], {}), + (0, [DummyFuture(), DummyFuture()], {}), + ], + ], +) +def test_call_queue_serialization(call_queue): + """ + Test that the process of passing a call queue to Ray's kernel works correctly. + + Before passing a call queue to the kernel that actually executes it, the call queue + is unwrapped into a 1D list using the ``deconstruct_call_queue`` function. After that, + the 1D list is passed as a variable length argument to the kernel ``kernel(*queue)``, + this is done so the Ray engine automatically materialize all the futures that the queue + might have contained. In the end, inside of the kernel, the ``reconstruct_call_queue`` function + is called to rebuild the call queue into its original structure. + + This test emulates the described flow and verifies that it works properly. + """ + from modin.core.execution.ray.implementations.pandas_on_ray.partitioning.partition import ( + deconstruct_call_queue, + reconstruct_call_queue, + ) + + def materialize_queue(*values): + """ + Walk over the `values` and materialize all the future types. + + This function emulates how Ray remote functions materialize their positional arguments. 
+ """ + return [ + val.materialize() if isinstance(val, DummyFuture) else val for val in values + ] + + def assert_everything_materialized(queue): + """Walk over the call queue and verify that all entities there are materialized.""" + + def assert_materialized(obj): + assert ( + isinstance(obj, DummyFuture) and obj._was_materialized + ) or not isinstance(obj, DummyFuture) + + for func, args, kwargs in queue: + assert_materialized(func) + for arg in args: + assert_materialized(arg) + for value in kwargs.values(): + if not isinstance(value, (list, tuple)): + value = [value] + for val in value: + assert_materialized(val) + + ( + num_funcs, + arg_lengths, + kw_key_lengths, + kw_value_lengths, + queue, + ) = deconstruct_call_queue(call_queue) + queue = materialize_queue(*queue) + reconstructed_queue = reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, queue + ) + + assert call_queue == reconstructed_queue + assert_everything_materialized(reconstructed_queue) From 67b3c9803f3ecc457597e5aa6b6e15eebf19423f Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 11 Oct 2023 10:28:50 +0000 Subject: [PATCH 2/5] apply suggestions from code review Signed-off-by: Dmitry Chigarev --- modin/core/execution/ray/common/utils.py | 108 ++++++++++++ .../pandas_on_ray/partitioning/partition.py | 155 +----------------- .../partitioning/virtual_partition.py | 29 ++-- 3 files changed, 134 insertions(+), 158 deletions(-) diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index 3b168c6f044..cace16c9f28 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -286,3 +286,111 @@ def deserialize(obj): # pragma: no cover return dict(zip(obj.keys(), RayWrapper.materialize(list(obj.values())))) else: return obj + + +def deconstruct_call_queue(call_queue): + """ + Deconstruct the passed call queue into a 1D list. + + This is required, so the call queue can be then passed to a Ray's kernel + as a variable-length argument ``kernel(*queue)`` so the Ray engine + automatically materialize all the futures that the queue might have contained. + + Parameters + ---------- + call_queue : list[list[func, args, kwargs], ...] + + Returns + ------- + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that can be reconstructed using ``reconstruct_call_queue`` function. 
+ """ + num_funcs = len(call_queue) + arg_lengths = [] + kw_key_lengths = [] + kw_value_lengths = [] + unfolded_queue = [] + for call in call_queue: + unfolded_queue.append(call[0]) + unfolded_queue.extend(call[1]) + arg_lengths.append(len(call[1])) + # unfold keyword dict + ## unfold keys + unfolded_queue.extend(call[2].keys()) + kw_key_lengths.append(len(call[2])) + ## unfold values + value_lengths = [] + for value in call[2].values(): + was_iterable = True + if not isinstance(value, (list, tuple)): + was_iterable = False + value = [value] + unfolded_queue.extend(value) + value_lengths.append({"len": len(value), "was_iterable": was_iterable}) + kw_value_lengths.append(value_lengths) + + return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue + + +def reconstruct_call_queue( + num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue +): + """ + Reconstruct original call queue from the result of the ``deconstruct_call_queue()``. + + Parameters + ---------- + num_funcs : int + The number of functions in the call queue. + arg_lengths : list of ints + The number of positional arguments for each function in the call queue. + kw_key_lengths : list of ints + The number of key-word arguments for each function in the call queue. + kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} + Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` + describes the j-th keyword argument of the i-th function in the call queue. + The describtion contains of the lengths of the argument and whether it's a list at all + (for example, {"len": 1, "was_iterable": False} describes a non-list argument). + unfolded_queue : list + A 1D call queue that is result of the ``deconstruct_call_queue()`` function. + + Returns + ------- + list[list[func, args, kwargs], ...] + Original call queue. + """ + items_took = 0 + + def take_n_items(n): + nonlocal items_took + res = unfolded_queue[items_took : items_took + n] + items_took += n + return res + + call_queue = [] + for i in range(num_funcs): + func = take_n_items(1)[0] + args = take_n_items(arg_lengths[i]) + kw_keys = take_n_items(kw_key_lengths[i]) + kwargs = {} + value_lengths = kw_value_lengths[i] + for j, key in enumerate(kw_keys): + vals = take_n_items(value_lengths[j]["len"]) + if value_lengths[j]["len"] == 1 and not value_lengths[j]["was_iterable"]: + vals = vals[0] + kwargs[key] = vals + + call_queue.append((func, args, kwargs)) + + return call_queue diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index 06e8789c543..339f6bf64fc 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -18,7 +18,11 @@ from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition from modin.core.execution.ray.common import RayWrapper -from modin.core.execution.ray.common.utils import ObjectIDType +from modin.core.execution.ray.common.utils import ( + ObjectIDType, + deconstruct_call_queue, + reconstruct_call_queue, +) from modin.logging import get_logger from modin.pandas.indexing import compute_sliced_len @@ -66,43 +70,6 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None): ) ) - @staticmethod - def _apply_call_queue(call_queue, data): - """ - Execute call queue over the given `data`. 
- - Parameters - ---------- - call_queue : list[list[func, args, kwargs], ...] - data : ray.ObjectRef - - Returns - ------- - ray.ObjectRef of pandas.DataFrame - The resulting pandas DataFrame. - ray.ObjectRef of int - The number of rows of the resulting pandas DataFrame. - ray.ObjectRef of int - The number of columns of the resulting pandas DataFrame. - ray.ObjectRef of str - The node IP address of the worker process. - """ - ( - num_funcs, - arg_lengths, - kw_key_lengths, - kw_value_lengths, - unfolded_queue, - ) = deconstruct_call_queue(call_queue) - return _apply_list_of_funcs.remote( - data, - num_funcs, - arg_lengths, - kw_key_lengths, - kw_value_lengths, - *unfolded_queue, - ) - def apply(self, func, *args, **kwargs): """ Apply a function to the object wrapped by this partition. @@ -134,7 +101,9 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - result, length, width, ip = self._apply_call_queue(call_queue, data) + result, length, width, ip = _apply_list_of_funcs.remote( + data, *deconstruct_call_queue(call_queue) + ) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -165,7 +134,7 @@ def drain_call_queue(self): new_length, new_width, self._ip_cache, - ) = self._apply_call_queue(call_queue, data) + ) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue)) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -380,112 +349,6 @@ def _get_index_and_columns(df): # pragma: no cover return len(df.index), len(df.columns) -def deconstruct_call_queue(call_queue): - """ - Deconstruct the passed call queue into a 1D list. - - This is required, so the call queue can be then passed to a Ray's kernel - as a variable-length argument ``kernel(*queue)`` so the Ray engine - automatically materialize all the futures that the queue might have contained. - - Parameters - ---------- - call_queue : list[list[func, args, kwargs], ...] - - Returns - ------- - num_funcs : int - The number of functions in the call queue. - arg_lengths : list of ints - The number of positional arguments for each function in the call queue. - kw_key_lengths : list of ints - The number of key-word arguments for each function in the call queue. - kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} - Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` - describes the j-th keyword argument of the i-th function in the call queue. - The describtion contains of the lengths of the argument and whether it's a list at all - (for example, {"len": 1, "was_iterable": False} describes a non-list argument). - unfolded_queue : list - A 1D call queue that can be reconstructed using ``reconstruct_call_queue`` function. 
- """ - num_funcs = len(call_queue) - arg_lengths = [] - kw_key_lengths = [] - kw_value_lengths = [] - unfolded_queue = [] - for call in call_queue: - unfolded_queue.append(call[0]) - unfolded_queue.extend(call[1]) - unfolded_queue.extend(call[2].keys()) - value_lengths = [] - for value in call[2].values(): - was_iterable = True - if not isinstance(value, (list, tuple)): - was_iterable = False - value = [value] - unfolded_queue.extend(value) - value_lengths.append({"len": len(value), "was_iterable": was_iterable}) - - arg_lengths.append(len(call[1])) - kw_key_lengths.append(len(call[2])) - kw_value_lengths.append(value_lengths) - - return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue - - -def reconstruct_call_queue( - num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue -): - """ - Reconstruct original call queue from the result of the ``deconstruct_call_queue()``. - - Parameters - ---------- - num_funcs : int - The number of functions in the call queue. - arg_lengths : list of ints - The number of positional arguments for each function in the call queue. - kw_key_lengths : list of ints - The number of key-word arguments for each function in the call queue. - kw_value_lengths : 2D list of dict{"len": int, "was_iterable": bool} - Description of keyword arguments for each function. For example, `kw_value_lengths[i][j]` - describes the j-th keyword argument of the i-th function in the call queue. - The describtion contains of the lengths of the argument and whether it's a list at all - (for example, {"len": 1, "was_iterable": False} describes a non-list argument). - unfolded_queue : list - A 1D call queue that is result of the ``deconstruct_call_queue()`` function. - - Returns - ------- - list[list[func, args, kwargs], ...] - Original call queue. 
- """ - items_took = 0 - - def take_n_items(n): - nonlocal items_took - res = unfolded_queue[items_took : items_took + n] - items_took += n - return res - - call_queue = [] - for i in range(num_funcs): - func = take_n_items(1)[0] - args = take_n_items(arg_lengths[i]) - kw_keys = take_n_items(kw_key_lengths[i]) - kwargs = {} - value_lengths = kw_value_lengths[i] - for j, key in enumerate(kw_keys): - vals = take_n_items(value_lengths[j]["len"]) - if value_lengths[j]["len"] == 1 and not value_lengths[j]["was_iterable"]: - vals = vals[0] - kwargs[key] = vals - - call_queue.append((func, args, kwargs)) - - return call_queue - - @ray.remote(num_returns=4) def _apply_func(partition, func, *args, **kwargs): # pragma: no cover """ diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index c04680a1a36..ab8ce08d195 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -21,7 +21,6 @@ PandasDataframeAxisPartition, ) from modin.core.execution.ray.common import RayWrapper -from modin.core.execution.ray.common.utils import deserialize from modin.utils import _inherit_docstrings from .partition import PandasOnRayDataframePartition @@ -118,9 +117,10 @@ def deploy_splitting_func( cls._get_deploy_split_func(), axis, func, - f_args, + len(f_args), f_kwargs, num_splits, + *f_args, *partitions, extract_metadata=extract_metadata, ) @@ -179,10 +179,11 @@ def deploy_axis_func( cls._get_deploy_axis_func(), axis, func, - f_args, + len(f_args), f_kwargs, num_splits, maintain_partitioning, + *f_args, *partitions, manual_partition=manual_partition, lengths=lengths, @@ -234,11 +235,12 @@ def deploy_func_between_two_axis_partitions( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, axis, func, - f_args, + len(f_args), f_kwargs, num_splits, len_of_left, other_shape, + *f_args, *partitions, ) @@ -264,9 +266,9 @@ def _deploy_ray_func( deployer, axis, f_to_deploy, - f_args, + f_len_args, f_kwargs, - *args, + *futures, extract_metadata=True, **kwargs, ): # pragma: no cover @@ -287,12 +289,14 @@ def _deploy_ray_func( The axis to perform the function along. f_to_deploy : callable or RayObjectID The function to deploy. - f_args : list or tuple - Positional arguments to pass to ``f_to_deploy``. + f_len_args : int + Number of positional arguments to pass to ``f_to_deploy``. f_kwargs : dict Keyword arguments to pass to ``f_to_deploy``. - *args : list - Positional arguments to pass to ``deployer``. + *futures : list + The first `f_len_args` elements in this list represent positional arguments + to pass to the `f_to_deploy`. The rest are partitions that will be passed as + positional arguments to `deployer`. extract_metadata : bool, default: True Whether to return metadata (length, width, ip) of the result. Passing `False` may relax the load on object storage as the remote function would return 4 times fewer futures. @@ -310,8 +314,9 @@ def _deploy_ray_func( ----- Ray functions are not detected by codecov (thus pragma: no cover). 
""" - f_args = deserialize(f_args) - result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs) + f_args = futures[:f_len_args] + partitions = futures[f_len_args:] + result = deployer(axis, f_to_deploy, f_args, f_kwargs, *partitions, **kwargs) if not extract_metadata: return result ip = get_node_ip_address() From 0bb11b494432886e9c29778418b1dea06e5db363 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 11 Oct 2023 11:05:21 +0000 Subject: [PATCH 3/5] fix positional args passing Signed-off-by: Dmitry Chigarev --- .../pandas_on_ray/partitioning/partition.py | 43 +++++++++++++++++-- 1 file changed, 39 insertions(+), 4 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index 339f6bf64fc..c7404bd4dc9 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -70,6 +70,43 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None): ) ) + @staticmethod + def _apply_call_queue(call_queue, data): + """ + Execute call queue over the given `data`. + + Parameters + ---------- + call_queue : list[list[func, args, kwargs], ...] + data : ray.ObjectRef + + Returns + ------- + ray.ObjectRef of pandas.DataFrame + The resulting pandas DataFrame. + ray.ObjectRef of int + The number of rows of the resulting pandas DataFrame. + ray.ObjectRef of int + The number of columns of the resulting pandas DataFrame. + ray.ObjectRef of str + The node IP address of the worker process. + """ + ( + num_funcs, + arg_lengths, + kw_key_lengths, + kw_value_lengths, + unfolded_queue, + ) = deconstruct_call_queue(call_queue) + return _apply_list_of_funcs.remote( + data, + num_funcs, + arg_lengths, + kw_key_lengths, + kw_value_lengths, + *unfolded_queue, + ) + def apply(self, func, *args, **kwargs): """ Apply a function to the object wrapped by this partition. @@ -101,9 +138,7 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - result, length, width, ip = _apply_list_of_funcs.remote( - data, *deconstruct_call_queue(call_queue) - ) + result, length, width, ip = self._apply_call_queue(call_queue, data) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -134,7 +169,7 @@ def drain_call_queue(self): new_length, new_width, self._ip_cache, - ) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue)) + ) = self._apply_call_queue(call_queue, data) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. 
From 746fdbd4cddfc0cf52a636bab7ab33e131d93280 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 11 Oct 2023 11:44:11 +0000 Subject: [PATCH 4/5] fix full-axis functions Signed-off-by: Dmitry Chigarev --- .../partitioning/virtual_partition.py | 63 ++++++++++--------- 1 file changed, 32 insertions(+), 31 deletions(-) diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py index ab8ce08d195..66463ecb2ba 100644 --- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/virtual_partition.py @@ -115,13 +115,13 @@ def deploy_splitting_func( else num_splits, ).remote( cls._get_deploy_split_func(), - axis, - func, - len(f_args), - f_kwargs, - num_splits, *f_args, + num_splits, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, extract_metadata=extract_metadata, ) @@ -177,14 +177,14 @@ def deploy_axis_func( **({"max_retries": max_retries} if max_retries is not None else {}), ).remote( cls._get_deploy_axis_func(), - axis, - func, - len(f_args), - f_kwargs, + *f_args, num_splits, maintain_partitioning, - *f_args, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, manual_partition=manual_partition, lengths=lengths, ) @@ -233,15 +233,15 @@ def deploy_func_between_two_axis_partitions( num_returns=num_splits * (1 + cls._PARTITIONS_METADATA_LEN) ).remote( PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions, - axis, - func, - len(f_args), - f_kwargs, + *f_args, num_splits, len_of_left, other_shape, - *f_args, *partitions, + axis=axis, + f_to_deploy=func, + f_len_args=len(f_args), + f_kwargs=f_kwargs, ) def wait(self): @@ -264,11 +264,11 @@ class PandasOnRayDataframeRowPartition(PandasOnRayDataframeVirtualPartition): @ray.remote def _deploy_ray_func( deployer, + *positional_args, axis, f_to_deploy, f_len_args, f_kwargs, - *futures, extract_metadata=True, **kwargs, ): # pragma: no cover @@ -277,31 +277,32 @@ def _deploy_ray_func( This is ALWAYS called on either ``PandasDataframeAxisPartition.deploy_axis_func`` or ``PandasDataframeAxisPartition.deploy_func_between_two_axis_partitions``, which both - serve to deploy another dataframe function on a Ray worker process. The provided ``f_args`` - is thus are deserialized here (on the Ray worker) before the function is called (``f_kwargs`` - will never contain more Ray objects, and thus does not require deserialization). + serve to deploy another dataframe function on a Ray worker process. The provided `positional_args` + contains positional arguments for both: `deployer` and for `f_to_deploy`, the parameters can be separated + using the `f_len_args` value. The parameters are combined so they will be deserialized by Ray before the + kernel is executed (`f_kwargs` will never contain more Ray objects, and thus does not require deserialization). Parameters ---------- deployer : callable A `PandasDataFrameAxisPartition.deploy_*` method that will call ``f_to_deploy``. + *positional_args : list + The first `f_len_args` elements in this list represent positional arguments + to pass to the `f_to_deploy`. The rest are positional arguments that will be + passed to `deployer`. axis : {0, 1} - The axis to perform the function along. + The axis to perform the function along. This argument is keyword only. 
f_to_deploy : callable or RayObjectID - The function to deploy. + The function to deploy. This argument is keyword only. f_len_args : int - Number of positional arguments to pass to ``f_to_deploy``. + Number of positional arguments to pass to ``f_to_deploy``. This argument is keyword only. f_kwargs : dict - Keyword arguments to pass to ``f_to_deploy``. - *futures : list - The first `f_len_args` elements in this list represent positional arguments - to pass to the `f_to_deploy`. The rest are partitions that will be passed as - positional arguments to `deployer`. + Keyword arguments to pass to ``f_to_deploy``. This argument is keyword only. extract_metadata : bool, default: True Whether to return metadata (length, width, ip) of the result. Passing `False` may relax the load on object storage as the remote function would return 4 times fewer futures. Passing `False` makes sense for temporary results where you know for sure that the - metadata will never be requested. + metadata will never be requested. This argument is keyword only. **kwargs : dict Keyword arguments to pass to ``deployer``. @@ -314,9 +315,9 @@ def _deploy_ray_func( ----- Ray functions are not detected by codecov (thus pragma: no cover). """ - f_args = futures[:f_len_args] - partitions = futures[f_len_args:] - result = deployer(axis, f_to_deploy, f_args, f_kwargs, *partitions, **kwargs) + f_args = positional_args[:f_len_args] + deploy_args = positional_args[f_len_args:] + result = deployer(axis, f_to_deploy, f_args, f_kwargs, *deploy_args, **kwargs) if not extract_metadata: return result ip = get_node_ip_address() From fbfc276f2ec1d321c2c60be1b4e08c3bc37a7f32 Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Wed, 11 Oct 2023 14:57:20 +0000 Subject: [PATCH 5/5] apply suggestions Signed-off-by: Dmitry Chigarev --- modin/core/execution/ray/common/utils.py | 10 ++--- .../pandas_on_ray/partitioning/partition.py | 43 ++----------------- .../storage_formats/pandas/test_internals.py | 2 +- 3 files changed, 10 insertions(+), 45 deletions(-) diff --git a/modin/core/execution/ray/common/utils.py b/modin/core/execution/ray/common/utils.py index cace16c9f28..bca48f97473 100644 --- a/modin/core/execution/ray/common/utils.py +++ b/modin/core/execution/ray/common/utils.py @@ -335,12 +335,12 @@ def deconstruct_call_queue(call_queue): was_iterable = True if not isinstance(value, (list, tuple)): was_iterable = False - value = [value] + value = (value,) unfolded_queue.extend(value) value_lengths.append({"len": len(value), "was_iterable": was_iterable}) kw_value_lengths.append(value_lengths) - return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, unfolded_queue + return num_funcs, arg_lengths, kw_key_lengths, kw_value_lengths, *unfolded_queue def reconstruct_call_queue( @@ -385,9 +385,9 @@ def take_n_items(n): kw_keys = take_n_items(kw_key_lengths[i]) kwargs = {} value_lengths = kw_value_lengths[i] - for j, key in enumerate(kw_keys): - vals = take_n_items(value_lengths[j]["len"]) - if value_lengths[j]["len"] == 1 and not value_lengths[j]["was_iterable"]: + for key, value_length in zip(kw_keys, value_lengths): + vals = take_n_items(value_length["len"]) + if value_length["len"] == 1 and not value_length["was_iterable"]: vals = vals[0] kwargs[key] = vals diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py index c7404bd4dc9..339f6bf64fc 100644 --- 
a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py +++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py @@ -70,43 +70,6 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None): ) ) - @staticmethod - def _apply_call_queue(call_queue, data): - """ - Execute call queue over the given `data`. - - Parameters - ---------- - call_queue : list[list[func, args, kwargs], ...] - data : ray.ObjectRef - - Returns - ------- - ray.ObjectRef of pandas.DataFrame - The resulting pandas DataFrame. - ray.ObjectRef of int - The number of rows of the resulting pandas DataFrame. - ray.ObjectRef of int - The number of columns of the resulting pandas DataFrame. - ray.ObjectRef of str - The node IP address of the worker process. - """ - ( - num_funcs, - arg_lengths, - kw_key_lengths, - kw_value_lengths, - unfolded_queue, - ) = deconstruct_call_queue(call_queue) - return _apply_list_of_funcs.remote( - data, - num_funcs, - arg_lengths, - kw_key_lengths, - kw_value_lengths, - *unfolded_queue, - ) - def apply(self, func, *args, **kwargs): """ Apply a function to the object wrapped by this partition. @@ -138,7 +101,9 @@ def apply(self, func, *args, **kwargs): self._is_debug(log) and log.debug( f"SUBMIT::_apply_list_of_funcs::{self._identity}" ) - result, length, width, ip = self._apply_call_queue(call_queue, data) + result, length, width, ip = _apply_list_of_funcs.remote( + data, *deconstruct_call_queue(call_queue) + ) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. @@ -169,7 +134,7 @@ def drain_call_queue(self): new_length, new_width, self._ip_cache, - ) = self._apply_call_queue(call_queue, data) + ) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue)) else: # We handle `len(call_queue) == 1` in a different way because # this dramatically improves performance. diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index d7b1242ba75..71310f3ee5d 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -1502,7 +1502,7 @@ def assert_materialized(obj): arg_lengths, kw_key_lengths, kw_value_lengths, - queue, + *queue, ) = deconstruct_call_queue(call_queue) queue = materialize_queue(*queue) reconstructed_queue = reconstruct_call_queue(
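

Patches 2 and 4 apply the same packing idea to the full-axis kernels: ``f_args`` and the partitions are concatenated into a single variable-length positional list (so Ray materializes every future in it), and ``f_len_args`` tells ``_deploy_ray_func`` where to split them apart again. The sketch below is a standalone illustration of that convention, not code from the patch: an ordinary function stands in for the Ray remote kernel, plain lists stand in for pandas partitions, and the deployed callable is a toy sorter; all names here are assumptions for the example.

def fake_kernel(deployer, *positional_args, f_to_deploy, f_len_args, f_kwargs):
    # Mirrors how the kernel separates the flattened positional arguments:
    # the first f_len_args items belong to the deployed function, the rest
    # are the partitions handed to the deployer.
    f_args = positional_args[:f_len_args]
    partitions = positional_args[f_len_args:]
    return deployer(f_to_deploy, f_args, f_kwargs, *partitions)


def concat_deployer(func, f_args, f_kwargs, *partitions):
    # Toy "deployer": apply func to the concatenation of all partitions.
    combined = [row for part in partitions for row in part]
    return func(combined, *f_args, **f_kwargs)


partitions = ([3, 1], [2, 5])  # lists stand in for pandas partitions
f_args = (True,)               # e.g. a `reverse` flag for the deployed function
result = fake_kernel(
    concat_deployer,
    *f_args,        # comes first, exactly f_len_args items
    *partitions,    # the rest of the positional list
    f_to_deploy=lambda data, rev: sorted(data, reverse=rev),
    f_len_args=len(f_args),
    f_kwargs={},
)
assert result == [5, 3, 2, 1]

Keeping ``f_to_deploy``, ``f_len_args`` and ``f_kwargs`` keyword-only, as patch 4 does, reserves the entire positional list for values that Ray should materialize before the kernel runs.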