Skip to content

Commit

Permalink
fix positional args passing
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Oct 11, 2023
1 parent 67b3c98 commit 0bb11b4
Showing 1 changed file with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,43 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None):
)
)

@staticmethod
def _apply_call_queue(call_queue, data):
"""
Execute call queue over the given `data`.
Parameters
----------
call_queue : list[list[func, args, kwargs], ...]
data : ray.ObjectRef
Returns
-------
ray.ObjectRef of pandas.DataFrame
The resulting pandas DataFrame.
ray.ObjectRef of int
The number of rows of the resulting pandas DataFrame.
ray.ObjectRef of int
The number of columns of the resulting pandas DataFrame.
ray.ObjectRef of str
The node IP address of the worker process.
"""
(
num_funcs,
arg_lengths,
kw_key_lengths,
kw_value_lengths,
unfolded_queue,
) = deconstruct_call_queue(call_queue)
return _apply_list_of_funcs.remote(
data,
num_funcs,
arg_lengths,
kw_key_lengths,
kw_value_lengths,
*unfolded_queue,
)

def apply(self, func, *args, **kwargs):
"""
Apply a function to the object wrapped by this partition.
Expand Down Expand Up @@ -101,9 +138,7 @@ def apply(self, func, *args, **kwargs):
self._is_debug(log) and log.debug(
f"SUBMIT::_apply_list_of_funcs::{self._identity}"
)
result, length, width, ip = _apply_list_of_funcs.remote(
data, *deconstruct_call_queue(call_queue)
)
result, length, width, ip = self._apply_call_queue(call_queue, data)
else:
# We handle `len(call_queue) == 1` in a different way because
# this dramatically improves performance.
Expand Down Expand Up @@ -134,7 +169,7 @@ def drain_call_queue(self):
new_length,
new_width,
self._ip_cache,
) = _apply_list_of_funcs.remote(data, *deconstruct_call_queue(call_queue))
) = self._apply_call_queue(call_queue, data)
else:
# We handle `len(call_queue) == 1` in a different way because
# this dramatically improves performance.
Expand Down

0 comments on commit 0bb11b4

Please sign in to comment.