Skip to content

Commit

Permalink
FEAT-#6990: Implement lazy execution for the Ray virtual partitions.
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreyPavlenko committed Mar 1, 2024
1 parent a966395 commit ea540cc
Show file tree
Hide file tree
Showing 4 changed files with 452 additions and 298 deletions.
16 changes: 13 additions & 3 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ def deploy_axis_func(
*partitions,
lengths=None,
manual_partition=False,
split_func=split_result_of_axis_func_pandas,
):
"""
Deploy a function along a full axis.
Expand All @@ -413,13 +414,19 @@ def deploy_axis_func(
The list of lengths to shuffle the object.
manual_partition : bool, default: False
If True, partition the result with `lengths`.
split_func : callable, optional
Split the result with the specified function.
Returns
-------
list
A list of pandas DataFrames.
"""
dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
dataframe = (
partitions[0]
if len(partitions) == 1
else pandas.concat(partitions, axis=axis, copy=False)
)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
try:
Expand Down Expand Up @@ -451,7 +458,7 @@ def deploy_axis_func(
lengths = [len(part.columns) for part in partitions]
if sum(lengths) != len(result.columns):
lengths = None
return split_result_of_axis_func_pandas(axis, num_splits, result, lengths)
return split_func(axis, num_splits, result, lengths)

@classmethod
def deploy_func_between_two_axis_partitions(
Expand All @@ -464,6 +471,7 @@ def deploy_func_between_two_axis_partitions(
len_of_left,
other_shape,
*partitions,
split_func=split_result_of_axis_func_pandas,
):
"""
Deploy a function along a full axis between two data sets.
Expand All @@ -487,6 +495,8 @@ def deploy_func_between_two_axis_partitions(
(other_shape[i-1], other_shape[i]) will indicate slice to restore i-1 axis partition.
*partitions : iterable
All partitions that make up the full axis (row or column) for both data sets.
split_func : callable, optional
Split the result with the specified function.
Returns
-------
Expand All @@ -510,7 +520,7 @@ def deploy_func_between_two_axis_partitions(
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
return split_result_of_axis_func_pandas(axis, num_splits, result)
return split_func(axis, num_splits, result)

Check warning on line 523 in modin/core/dataframe/pandas/partitioning/axis_partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/dataframe/pandas/partitioning/axis_partition.py#L523

Added line #L523 was not covered by tests

@classmethod
def drain(cls, df: pandas.DataFrame, call_queue: list):
Expand Down
83 changes: 47 additions & 36 deletions modin/core/execution/ray/common/deferred_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,14 @@ class DeferredExecution:
The execution input.
func : callable or ObjectRefType
A function to be executed.
args : list or tuple
args : list or tuple, optional
Additional positional arguments to be passed in `func`.
kwargs : dict
kwargs : dict, optional
Additional keyword arguments to be passed in `func`.
num_returns : int
num_returns : int, default: 1
The number of the return values.
flat_data : bool
True means that the data is neither DeferredExecution nor list.
flat_args : bool
True means that there are no lists or DeferredExecution objects in `args`.
In this case, no arguments processing is performed and `args` is passed
Expand All @@ -88,26 +90,29 @@ class DeferredExecution:

def __init__(
self,
data: Union[
ObjectRefType,
"DeferredExecution",
List[Union[ObjectRefType, "DeferredExecution"]],
],
data: Any,
func: Union[Callable, ObjectRefType],
args: Union[List[Any], Tuple[Any]],
kwargs: Dict[str, Any],
args: Union[List[Any], Tuple[Any]] = None,
kwargs: Dict[str, Any] = None,
num_returns=1,
):
if isinstance(data, DeferredExecution):
data.subscribe()
self.flat_data = self._flat_args((data,))

Check warning on line 99 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L99

Added line #L99 was not covered by tests
self.data = data
self.func = func
self.args = args
self.kwargs = kwargs
self.num_returns = num_returns
self.flat_args = self._flat_args(args)
self.flat_kwargs = self._flat_args(kwargs.values())
self.subscribers = 0
if args is not None:
self.args = args
self.flat_args = self._flat_args(args)

Check warning on line 106 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L104-L106

Added lines #L104 - L106 were not covered by tests
else:
self.args = ()
self.flat_args = True
if kwargs is not None:
self.kwargs = kwargs
self.flat_kwargs = self._flat_args(kwargs.values())

Check warning on line 112 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L108-L112

Added lines #L108 - L112 were not covered by tests
else:
self.kwargs = {}
self.flat_kwargs = True

Check warning on line 115 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L114-L115

Added lines #L114 - L115 were not covered by tests

@classmethod
def _flat_args(cls, args: Iterable):
Expand All @@ -134,7 +139,7 @@ def _flat_args(cls, args: Iterable):

def exec(
self,
) -> Tuple[ObjectRefOrListType, Union["MetaList", List], Union[int, List[int]]]:
) -> Tuple[ObjectRefOrListType, "MetaList", Union[int, List[int]]]:
"""
Execute this task, if required.
Expand All @@ -150,7 +155,7 @@ def exec(
return self.data, self.meta, self.meta_offset

if (
not isinstance(self.data, DeferredExecution)
self.flat_data
and self.flat_args
and self.flat_kwargs
and self.num_returns == 1
Expand All @@ -166,14 +171,16 @@ def exec(
# it back. After the execution, the result is saved and the counter has no effect.
self.subscribers += 2
consumers, output = self._deconstruct()
assert not any(isinstance(o, ListOrTuple) for o in output)
# The last result is the MetaList, so adding +1 here.
num_returns = sum(c.num_returns for c in consumers) + 1
results = self._remote_exec_chain(num_returns, *output)
meta = MetaList(results.pop())
meta_offset = 0
results = iter(results)
for de in consumers:
if de.num_returns == 1:
num_returns = de.num_returns
if num_returns == 1:

Check warning on line 183 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L182-L183

Added lines #L182 - L183 were not covered by tests
de._set_result(next(results), meta, meta_offset)
meta_offset += 2
else:
Expand Down Expand Up @@ -318,6 +325,7 @@ def _deconstruct_chain(
break
elif not isinstance(data := de.data, DeferredExecution):
if isinstance(data, ListOrTuple):
out_append(_Tag.LIST)

Check warning on line 328 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L328

Added line #L328 was not covered by tests
yield cls._deconstruct_list(
data, output, stack, result_consumers, out_append
)
Expand Down Expand Up @@ -394,7 +402,13 @@ def _deconstruct_list(
if out_pos := getattr(obj, "out_pos", None):
obj.unsubscribe()
if obj.has_result:
out_append(obj.data)
if isinstance(obj.data, ListOrTuple):
out_append(_Tag.LIST)
yield cls._deconstruct_list(

Check warning on line 407 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L405-L407

Added lines #L405 - L407 were not covered by tests
obj.data, output, stack, result_consumers, out_append
)
else:
out_append(obj.data)

Check warning on line 411 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L411

Added line #L411 was not covered by tests
else:
out_append(_Tag.REF)
out_append(out_pos)
Expand Down Expand Up @@ -432,13 +446,13 @@ def _remote_exec_chain(num_returns: int, *args: Tuple) -> List[Any]:
list
The execution results. The last element of this list is the ``MetaList``.
"""
# Prefer _remote_exec_single_chain(). It has fewer arguments and
# does not require the num_returns to be specified in options.
# Prefer _remote_exec_single_chain(). It does not require the num_returns
# to be specified in options.
if num_returns == 2:
return _remote_exec_single_chain.remote(*args)
else:
return _remote_exec_multi_chain.options(num_returns=num_returns).remote(
num_returns, *args
*args
)

def _set_result(
Expand All @@ -456,7 +470,7 @@ def _set_result(
meta : MetaList
meta_offset : int or list of int
"""
del self.func, self.args, self.kwargs, self.flat_args, self.flat_kwargs
del self.func, self.args, self.kwargs

Check warning on line 473 in modin/core/execution/ray/common/deferred_execution.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/common/deferred_execution.py#L473

Added line #L473 was not covered by tests
self.data = result
self.meta = meta
self.meta_offset = meta_offset
Expand Down Expand Up @@ -564,7 +578,7 @@ def exec_func(fn: Callable, obj: Any, args: Tuple, kwargs: Dict) -> Any:
raise err

@classmethod
def construct(cls, num_returns: int, args: Tuple): # pragma: no cover
def construct(cls, args: Tuple): # pragma: no cover
"""
Construct and execute the specified chain.
Expand All @@ -574,7 +588,6 @@ def construct(cls, num_returns: int, args: Tuple): # pragma: no cover
Parameters
----------
num_returns : int
args : tuple
Yields
Expand Down Expand Up @@ -646,7 +659,7 @@ def construct_chain(

while chain:
fn = pop()
if fn == tg_e:
if fn is tg_e:
lst.append(obj)
break

Expand Down Expand Up @@ -676,10 +689,10 @@ def construct_chain(

itr = iter([obj] if num_returns == 1 else obj)
for _ in range(num_returns):
obj = next(itr)
meta.append(len(obj) if hasattr(obj, "__len__") else 0)
meta.append(len(obj.columns) if hasattr(obj, "columns") else 0)
yield obj
o = next(itr)
meta.append(len(o) if hasattr(o, "__len__") else 0)
meta.append(len(o.columns) if hasattr(o, "columns") else 0)
yield o

@classmethod
def construct_list(
Expand Down Expand Up @@ -793,20 +806,18 @@ def _remote_exec_single_chain(
-------
Generator
"""
return remote_executor.construct(num_returns=2, args=args)
return remote_executor.construct(args=args)


@ray.remote
def _remote_exec_multi_chain(
num_returns: int, *args: Tuple, remote_executor=_REMOTE_EXEC
*args: Tuple, remote_executor=_REMOTE_EXEC
) -> Generator: # pragma: no cover
"""
Execute the deconstructed chain with a multiple return values in a worker process.
Parameters
----------
num_returns : int
The number of return values.
*args : tuple
A deconstructed chain to be executed.
remote_executor : _RemoteExecutor, default: _REMOTE_EXEC
Expand All @@ -816,4 +827,4 @@ def _remote_exec_multi_chain(
-------
Generator
"""
return remote_executor.construct(num_returns, args)
return remote_executor.construct(args)
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ def add_to_apply_calls(
def drain_call_queue(self):
data = self._data_ref
if not isinstance(data, DeferredExecution):
return data
return

Check warning on line 151 in modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py

View check run for this annotation

Codecov / codecov/patch

modin/core/execution/ray/implementations/pandas_on_ray/partitioning/partition.py#L151

Added line #L151 was not covered by tests

log = get_logger()
self._is_debug(log) and log.debug(
Expand Down
Loading

0 comments on commit ea540cc

Please sign in to comment.