Skip to content

Commit

Permalink
[Datasets] Streaming executor fixes #5 (ray-project#32951)
Browse files Browse the repository at this point in the history
  • Loading branch information
jianoaix authored and peytondmurray committed Mar 22, 2023
1 parent 24e3b2c commit b052fc3
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 15 deletions.
19 changes: 9 additions & 10 deletions python/ray/data/_internal/lazy_block_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@
import uuid
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple, Union

import numpy as np

import ray
from ray.data._internal.block_list import BlockList
from ray.data._internal.progress_bar import ProgressBar
from ray.data._internal.remote_fn import cached_remote_fn
from ray.data._internal.memory_tracing import trace_allocation
from ray.data._internal.stats import DatasetStats, _get_or_create_stats_actor
from ray.data._internal.util import _split_list
from ray.data.block import (
Block,
BlockAccessor,
Expand Down Expand Up @@ -162,22 +161,22 @@ def _check_if_cleared(self):
# Note: does not force execution prior to splitting.
def split(self, split_size: int) -> List["LazyBlockList"]:
num_splits = math.ceil(len(self._tasks) / split_size)
tasks = np.array_split(self._tasks, num_splits)
block_partition_refs = np.array_split(self._block_partition_refs, num_splits)
block_partition_meta_refs = np.array_split(
tasks = _split_list(self._tasks, num_splits)
block_partition_refs = _split_list(self._block_partition_refs, num_splits)
block_partition_meta_refs = _split_list(
self._block_partition_meta_refs, num_splits
)
cached_metadata = np.array_split(self._cached_metadata, num_splits)
cached_metadata = _split_list(self._cached_metadata, num_splits)
output = []
for t, b, m, c in zip(
tasks, block_partition_refs, block_partition_meta_refs, cached_metadata
):
output.append(
LazyBlockList(
t.tolist(),
b.tolist(),
m.tolist(),
c.tolist(),
t,
b,
m,
c,
owned_by_consumer=self._owned_by_consumer,
)
)
Expand Down
19 changes: 18 additions & 1 deletion python/ray/data/_internal/util.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import importlib
import logging
import os
from typing import List, Union, Optional, TYPE_CHECKING
from typing import Any, List, Union, Optional, TYPE_CHECKING
from types import ModuleType
import sys

Expand Down Expand Up @@ -380,3 +380,20 @@ def ConsumptionAPI(*args, **kwargs):
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
return _consumption_api()(args[0])
return _consumption_api(*args, **kwargs)


def _split_list(arr: List[Any], num_splits: int) -> List[List[Any]]:
"""Split the list into `num_splits` lists.
The splits will be even if the `num_splits` divides the length of list, otherwise
the remainder (suppose it's R) will be allocated to the first R splits (one for
each).
This is the same as numpy.array_split(). The reason we make this a separate
implementation is to allow the heterogeneity in the elements in the list.
"""
assert num_splits > 0
q, r = divmod(len(arr), num_splits)
splits = [
arr[i * q + min(i, r) : (i + 1) * q + min(i + 1, r)] for i in range(num_splits)
]
return splits
16 changes: 13 additions & 3 deletions python/ray/data/tests/test_dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -4581,9 +4581,19 @@ def test_warning_execute_with_no_cpu(ray_start_cluster):
ds = ray.data.range(10)
ds = ds.map_batches(lambda x: x)
ds.take()
except LoggerWarningCalled:
logger_args, logger_kwargs = mock_logger.call_args
assert "Warning: The Ray cluster currently does not have " in logger_args[0]
except Exception as e:
if ray.data.context.DatasetContext.get_current().use_streaming_executor:
assert isinstance(e, ValueError)
assert "exceeds the execution limits ExecutionResources(cpu=0.0" in str(
e
)
else:
assert isinstance(e, LoggerWarningCalled)
logger_args, logger_kwargs = mock_logger.call_args
assert (
"Warning: The Ray cluster currently does not have "
in logger_args[0]
)


def test_nowarning_execute_with_cpu(ray_start_cluster_init):
Expand Down
17 changes: 16 additions & 1 deletion python/ray/data/tests/test_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import ray
import numpy as np

from ray.data._internal.util import _check_pyarrow_version
from ray.data._internal.util import _check_pyarrow_version, _split_list
from ray.data._internal.memory_tracing import (
trace_allocation,
trace_deallocation,
Expand Down Expand Up @@ -72,6 +72,21 @@ def test_memory_tracing(enabled):
assert "test5" not in report, report


def test_list_splits():
with pytest.raises(AssertionError):
_split_list(list(range(5)), 0)

with pytest.raises(AssertionError):
_split_list(list(range(5)), -1)

assert _split_list(list(range(5)), 7) == [[0], [1], [2], [3], [4], [], []]
assert _split_list(list(range(5)), 2) == [[0, 1, 2], [3, 4]]
assert _split_list(list(range(6)), 2) == [[0, 1, 2], [3, 4, 5]]
assert _split_list(list(range(5)), 1) == [[0, 1, 2, 3, 4]]
assert _split_list(["foo", 1, [0], None], 2) == [["foo", 1], [[0], None]]
assert _split_list(["foo", 1, [0], None], 3) == [["foo", 1], [[0]], [None]]


if __name__ == "__main__":
import sys

Expand Down

0 comments on commit b052fc3

Please sign in to comment.