From 0bb11b494432886e9c29778418b1dea06e5db363 Mon Sep 17 00:00:00 2001
From: Dmitry Chigarev
Date: Wed, 11 Oct 2023 11:05:21 +0000
Subject: [PATCH] fix positional args passing

Signed-off-by: Dmitry Chigarev
---
 .../pandas_on_ray/partitioning/partition.py | 43 +++++++++++++++++--
 1 file changed, 39 insertions(+), 4 deletions(-)

diff --git a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
index 339f6bf64fc..c7404bd4dc9 100644
--- a/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
+++ b/modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py
@@ -70,6 +70,43 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None):
             )
         )
 
+    @staticmethod
+    def _apply_call_queue(call_queue, data):
+        """
+        Execute call queue over the given `data`.
+
+        Parameters
+        ----------
+        call_queue : list[list[func, args, kwargs], ...]
+        data : ray.ObjectRef
+
+        Returns
+        -------
+        ray.ObjectRef of pandas.DataFrame
+            The resulting pandas DataFrame.
+        ray.ObjectRef of int
+            The number of rows of the resulting pandas DataFrame.
+        ray.ObjectRef of int
+            The number of columns of the resulting pandas DataFrame.
+        ray.ObjectRef of str
+            The node IP address of the worker process.
+        """
+        (
+            num_funcs,
+            arg_lengths,
+            kw_key_lengths,
+            kw_value_lengths,
+            unfolded_queue,
+        ) = deconstruct_call_queue(call_queue)
+        return _apply_list_of_funcs.remote(
+            data,
+            num_funcs,
+            arg_lengths,
+            kw_key_lengths,
+            kw_value_lengths,
+            *unfolded_queue,
+        )
+
     def apply(self, func, *args, **kwargs):
         """
         Apply a function to the object wrapped by this partition.
@@ -101,9 +138,7 @@ def apply(self, func, *args, **kwargs):
             self._is_debug(log) and log.debug(
                 f"SUBMIT::_apply_list_of_funcs::{self._identity}"
             )
-            result, length, width, ip = _apply_list_of_funcs.remote(
-                data, *deconstruct_call_queue(call_queue)
-            )
+            result, length, width, ip = self._apply_call_queue(call_queue, data)
         else:
             # We handle `len(call_queue) == 1` in a different way because
             # this dramatically improves performance.
@@ -134,7 +169,7 @@ def drain_call_queue(self):
                 new_length,
                 new_width,
                 self._ip_cache,
-            ) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue))
+            ) = self._apply_call_queue(call_queue, data)
         else:
             # We handle `len(call_queue) == 1` in a different way because
             # this dramatically improves performance.