From 5ec634fb853c52e8ffaf73c4900b16645b626013 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Sep 2021 19:02:06 -0700 Subject: [PATCH 1/8] wip --- doc/source/data/dataset-tensor-support.rst | 2 +- python/ray/data/block.py | 24 +++--- python/ray/data/dataset.py | 45 ++++++----- python/ray/data/datasource/datasource.py | 16 +++- .../data/datasource/file_based_datasource.py | 3 +- .../ray/data/datasource/numpy_datasource.py | 13 ++- .../ray/data/extensions/tensor_extension.py | 4 + python/ray/data/impl/arrow_block.py | 20 +++-- python/ray/data/impl/simple_block.py | 4 +- python/ray/data/impl/tensor_block.py | 80 ------------------- python/ray/data/read_api.py | 11 ++- python/ray/data/tests/test_dataset.py | 60 +++++--------- 12 files changed, 112 insertions(+), 170 deletions(-) delete mode 100644 python/ray/data/impl/tensor_block.py diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index b8a4ad68eed4e..d11a10dd3c06c 100644 --- a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -150,7 +150,7 @@ Now that the tensor column is properly typed and in a ``Dataset``, we can perfor # Arrow and Pandas is now aware of this tensor column, so we can do the # typical DataFrame operations on this column. - ds = ds.map_batches(lambda x: 2 * (x + 1), format="pandas") + ds = ds.map_batches(lambda x: 2 * (x + 1), batch_format="pandas") # -> Map Progress: 100%|████████████████████| 200/200 [00:00<00:00, 1123.54it/s] print(ds) # -> Dataset( diff --git a/python/ray/data/block.py b/python/ray/data/block.py index 35b99780c5e0d..e7edab74863ad 100644 --- a/python/ray/data/block.py +++ b/python/ray/data/block.py @@ -16,8 +16,8 @@ # Represents a batch of records to be stored in the Ray object store. # # Block data can be accessed in a uniform way via ``BlockAccessors`` such as -# ``SimpleBlockAccessor``, ``ArrowBlockAccessor``, and ``TensorBlockAccessor``. -Block = Union[List[T], np.ndarray, "pyarrow.Table", bytes] +# ``SimpleBlockAccessor`` and ``ArrowBlockAccessor``. +Block = Union[List[T], "pyarrow.Table", bytes] @DeveloperAPI @@ -52,8 +52,8 @@ class BlockAccessor(Generic[T]): as a top-level Ray object, without a wrapping class (issue #17186). There are three types of block accessors: ``SimpleBlockAccessor``, which - operates over a plain Python list, ``ArrowBlockAccessor``, for - ``pyarrow.Table`` type blocks, and ``TensorBlockAccessor``, for tensors. + operates over a plain Python list, and ``ArrowBlockAccessor`` for + ``pyarrow.Table`` type blocks. """ def num_rows(self) -> int: @@ -85,12 +85,16 @@ def to_pandas(self) -> "pandas.DataFrame": """Convert this block into a Pandas dataframe.""" raise NotImplementedError - def to_numpy(self) -> np.ndarray: - """Convert this block into a NumPy ndarray.""" + def to_numpy(self, column: str = None) -> np.ndarray: + """Convert this block (or column of block) into a NumPy ndarray. + + Args: + column: Name of column to convert, or None. 
+ """ raise NotImplementedError - def to_arrow(self) -> Union["pyarrow.Table", "pyarrow.Tensor"]: - """Convert this block into an Arrow table or tensor.""" + def to_arrow(self) -> "pyarrow.Table": + """Convert this block into an Arrow table.""" raise NotImplementedError def size_bytes(self) -> int: @@ -136,10 +140,6 @@ def for_block(block: Block) -> "BlockAccessor[T]": from ray.data.impl.simple_block import \ SimpleBlockAccessor return SimpleBlockAccessor(block) - elif isinstance(block, np.ndarray): - from ray.data.impl.tensor_block import \ - TensorBlockAccessor - return TensorBlockAccessor(block) else: raise TypeError("Not a block type: {}".format(block)) diff --git a/python/ray/data/dataset.py b/python/ray/data/dataset.py index 11d0a13c9cbae..3042a01b24f3a 100644 --- a/python/ray/data/dataset.py +++ b/python/ray/data/dataset.py @@ -51,8 +51,7 @@ class Dataset(Generic[T]): Datasets are implemented as a list of ``ObjectRef[Block]``. The block also determines the unit of parallelism. The default block type is the - ``pyarrow.Table``. Tensor objects are held in ``np.ndarray`` blocks, - and other Arrow-incompatible objects are held in ``list`` blocks. + ``pyarrow.Table``. Arrow-incompatible objects are held in ``list`` blocks. Since Datasets are just lists of Ray object refs, they can be passed between Ray tasks and actors just like any other object. Datasets support @@ -169,7 +168,7 @@ def map_batches(self, tasks, or "actors" to use an autoscaling Ray actor pool. batch_format: Specify "native" to use the native block format, "pandas" to select ``pandas.DataFrame`` as the batch format, - or "pyarrow" to select ``pyarrow.Table/Tensor``. + or "pyarrow" to select ``pyarrow.Table``. ray_remote_args: Additional resource requirements to request from ray (e.g., num_gpus=1 to request GPUs for the map tasks). """ @@ -205,19 +204,15 @@ def transform(block: Block) -> Block: "or 'pyarrow', got: {}".format(batch_format)) applied = fn(view) - if (isinstance(applied, list) or isinstance(applied, pa.Table) - or isinstance(applied, np.ndarray)): + if isinstance(applied, list) or isinstance(applied, pa.Table): applied = applied elif isinstance(applied, pd.core.frame.DataFrame): applied = pa.Table.from_pandas(applied) - elif isinstance(applied, pa.Tensor): - applied = applied.to_numpy() else: raise ValueError("The map batches UDF returned a type " f"{type(applied)}, which is not allowed. " "The return type must be either list, " - "pandas.DataFrame, np.ndarray, " - "pyarrow.Tensor, or pyarrow.Table") + "pandas.DataFrame, or pyarrow.Table") builder.add_block(applied) return builder.build() @@ -947,11 +942,13 @@ def write_numpy( self, path: str, *, + column: str = "value", filesystem: Optional["pyarrow.fs.FileSystem"] = None) -> None: - """Write the dataset to npy files. + """Write a tensor column of the dataset to npy files. - This is only supported for datasets of Tensor records. - To control the number of files, use ``.repartition()``. + This is only supported for datasets convertible to Arrow records that + contain a TensorArray column. To control the number of files, use + ``.repartition()``. The format of the output files will be {self._uuid}_{block_idx}.npy, where ``uuid`` is an unique id for the dataset. @@ -964,12 +961,15 @@ def write_numpy( Args: path: The path to the destination root directory, where npy files will be written to. + column: The name of the table column that contains the tensor to + be written. This defaults to "value". filesystem: The filesystem implementation to write to. 
""" self.write_datasource( NumpyDatasource(), path=path, dataset_uuid=self._uuid, + column=column, filesystem=filesystem) def write_datasource(self, datasource: Datasource[T], @@ -1042,7 +1042,7 @@ def iter_batches(self, batch_format: The format in which to return each batch. Specify "native" to use the current block format, "pandas" to select ``pandas.DataFrame`` or "pyarrow" to select - ``pyarrow.Table/Tensor``. Default is "native". + ``pyarrow.Table``. Default is "native". drop_last: Whether to drop the last batch if it's incomplete. Returns: @@ -1364,7 +1364,8 @@ def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]: block_to_df = cached_remote_fn(_block_to_df) return [block_to_df.remote(block) for block in self._blocks] - def to_numpy(self) -> List[ObjectRef[np.ndarray]]: + def to_numpy(self, *, + column: Optional[str] = None) -> List[ObjectRef[np.ndarray]]: """Convert this dataset into a distributed set of NumPy ndarrays. This is only supported for datasets convertible to NumPy ndarrays. @@ -1373,12 +1374,19 @@ def to_numpy(self) -> List[ObjectRef[np.ndarray]]: Time complexity: O(dataset size / parallelism) + Args: + column: The name of the column to convert to numpy, or None to + specify the entire row. Required for Arrow tables. + Returns: A list of remote NumPy ndarrays created from this dataset. """ block_to_ndarray = cached_remote_fn(_block_to_ndarray) - return [block_to_ndarray.remote(block) for block in self._blocks] + return [ + block_to_ndarray.remote(block, column=column) + for block in self._blocks + ] def to_arrow(self) -> List[ObjectRef["pyarrow.Table"]]: """Convert this dataset into a distributed set of Arrow tables. @@ -1585,9 +1593,6 @@ def __repr__(self) -> str: schema = self.schema() if schema is None: schema_str = "Unknown schema" - elif isinstance(schema, dict): - schema_str = "".format( - schema["shape"], schema["dtype"]) elif isinstance(schema, type): schema_str = str(schema) else: @@ -1640,9 +1645,9 @@ def _block_to_df(block: Block): return block.to_pandas() -def _block_to_ndarray(block: Block): +def _block_to_ndarray(block: Block, column: Optional[str]): block = BlockAccessor.for_block(block) - return block.to_numpy() + return block.to_numpy(column) def _block_to_arrow(block: Block): diff --git a/python/ray/data/datasource/datasource.py b/python/ray/data/datasource/datasource.py index 46b313ab3bfd0..b45b3ab3930b2 100644 --- a/python/ray/data/datasource/datasource.py +++ b/python/ray/data/datasource/datasource.py @@ -130,10 +130,11 @@ def make_block(start: int, count: int) -> Block: return pyarrow.Table.from_arrays( [np.arange(start, start + count)], names=["value"]) elif block_format == "tensor": - return np.ones( - tensor_shape, dtype=np.int64) * np.expand_dims( + tensor = TensorArray( + np.ones(tensor_shape, dtype=np.int64) * np.expand_dims( np.arange(start, start + count), - tuple(range(1, 1 + len(tensor_shape)))) + tuple(range(1, 1 + len(tensor_shape))))) + return pyarrow.Table.from_pydict({"value": tensor}) else: return list(builtins.range(start, start + count)) @@ -145,7 +146,14 @@ def make_block(start: int, count: int) -> Block: import pyarrow schema = pyarrow.Table.from_pydict({"value": [0]}).schema elif block_format == "tensor": - schema = {"dtype": "int64", "shape": (None, ) + tensor_shape} + _check_pyarrow_version() + from ray.data.extensions import TensorArray + import pyarrow + tensor = TensorArray( + np.ones(tensor_shape, dtype=np.int64) * np.expand_dims( + np.arange(0, 10), tuple( + range(1, 1 + len(tensor_shape))))) + schema = 
pyarrow.Table.from_pydict({"value": tensor}).schema elif block_format == "list": schema = int else: diff --git a/python/ray/data/datasource/file_based_datasource.py b/python/ray/data/datasource/file_based_datasource.py index 9a326ebdcf62d..529dd343bc189 100644 --- a/python/ray/data/datasource/file_based_datasource.py +++ b/python/ray/data/datasource/file_based_datasource.py @@ -133,7 +133,8 @@ def write_block(write_path: str, block: Block): if _block_udf is not None: block = _block_udf(block) with fs.open_output_stream(write_path) as f: - _write_block_to_file(f, BlockAccessor.for_block(block)) + _write_block_to_file(f, BlockAccessor.for_block(block), + **write_args) write_block = cached_remote_fn(write_block) diff --git a/python/ray/data/datasource/numpy_datasource.py b/python/ray/data/datasource/numpy_datasource.py index 08bc7f2c0916e..caaaecbab4cc7 100644 --- a/python/ray/data/datasource/numpy_datasource.py +++ b/python/ray/data/datasource/numpy_datasource.py @@ -7,7 +7,8 @@ import pyarrow from ray.data.block import BlockAccessor -from ray.data.datasource.file_based_datasource import (FileBasedDatasource) +from ray.data.datasource.file_based_datasource import FileBasedDatasource +from ray.data.extensions import TensorArray class NumpyDatasource(FileBasedDatasource): @@ -21,17 +22,21 @@ class NumpyDatasource(FileBasedDatasource): """ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args): + import pyarrow as pa # TODO(ekl) Ideally numpy can read directly from the file, but it # seems like it requires the file to be seekable. buf = BytesIO() data = f.readall() buf.write(data) buf.seek(0) - return np.load(buf) + return pa.Table.from_pydict({ + "value": TensorArray(np.load(buf, allow_pickle=True)) + }) def _write_block(self, f: "pyarrow.NativeFile", block: BlockAccessor, - **writer_args): - np.save(f, block.to_arrow()) + column: str, **writer_args): + value = block.to_numpy(column) + np.save(f, value) def _file_format(self): return "npy" diff --git a/python/ray/data/extensions/tensor_extension.py b/python/ray/data/extensions/tensor_extension.py index 3c80fed64242f..579d7ef03cd6b 100644 --- a/python/ray/data/extensions/tensor_extension.py +++ b/python/ray/data/extensions/tensor_extension.py @@ -1155,6 +1155,10 @@ def __arrow_ext_class__(self): """ return ArrowTensorArray + def __str__(self): + return "<ArrowTensorType: shape={}, dtype={}>".format( + self.shape, self.storage_type.value_type) + @PublicAPI(stability="beta") class ArrowTensorArray(pa.ExtensionArray): diff --git a/python/ray/data/impl/arrow_block.py b/python/ray/data/impl/arrow_block.py index 41c5875bb6c16..a9d0634930a49 100644 --- a/python/ray/data/impl/arrow_block.py +++ b/python/ray/data/impl/arrow_block.py @@ -13,7 +13,6 @@ from ray.data.block import Block, BlockAccessor, BlockMetadata from ray.data.impl.block_builder import BlockBuilder from ray.data.impl.simple_block import SimpleBlockBuilder -from ray.data.impl.tensor_block import TensorBlockBuilder if TYPE_CHECKING: import pandas @@ -78,8 +77,6 @@ def add(self, item: Any) -> None: self._builder = ArrowBlockBuilder() except (TypeError, pyarrow.lib.ArrowInvalid): self._builder = SimpleBlockBuilder() - elif isinstance(item, np.ndarray): - self._builder = TensorBlockBuilder() else: self._builder = SimpleBlockBuilder() self._builder.add(item) @@ -188,8 +185,21 @@ def schema(self) -> "pyarrow.lib.Schema": def to_pandas(self) -> "pandas.DataFrame": return self._table.to_pandas() - def to_numpy(self) -> np.ndarray: - return np.array(self._table) + def to_numpy(self, column: str = None) -> np.ndarray:
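+        """Convert one column of this Arrow block into a NumPy ndarray; a column name must be provided for Arrow-backed blocks."""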
+ if not column: + raise ValueError( + "`column` must be specified when calling .to_numpy() " + "on Arrow blocks.") + if column not in self._table.column_names: + raise ValueError( + "Cannot find column {}, available columns: {}".format( + column, self._table.column_names)) + array = self._table[column] + if array.num_chunks > 1: + # TODO(ekl) combine fails since we can't concat ArrowTensorType? + array = array.combine_chunks() + assert array.num_chunks == 1, array + return self._table[column].chunk(0).to_numpy() def to_arrow(self) -> "pyarrow.Table": return self._table diff --git a/python/ray/data/impl/simple_block.py b/python/ray/data/impl/simple_block.py index ba20d1334b06b..f609c65bd28b8 100644 --- a/python/ray/data/impl/simple_block.py +++ b/python/ray/data/impl/simple_block.py @@ -58,7 +58,9 @@ def to_pandas(self) -> "pandas.DataFrame": import pandas return pandas.DataFrame(self._items) - def to_numpy(self) -> np.ndarray: + def to_numpy(self, column: str = None) -> np.ndarray: + if column: + raise ValueError("`column` arg not supported for list block") return np.array(self._items) def to_arrow(self) -> "pyarrow.Table": diff --git a/python/ray/data/impl/tensor_block.py b/python/ray/data/impl/tensor_block.py deleted file mode 100644 index 3ad8d8afad71b..0000000000000 --- a/python/ray/data/impl/tensor_block.py +++ /dev/null @@ -1,80 +0,0 @@ -from typing import Iterator, List, TypeVar, Dict, TYPE_CHECKING - -import numpy as np - -if TYPE_CHECKING: - import pandas - import pyarrow - -from ray.data.block import Block, BlockAccessor -from ray.data.impl.block_builder import BlockBuilder - -T = TypeVar("T") - - -# TODO(ekl) switch to pyarrow.Tensor as the block type; currently there is a -# serialization issue with pyarrow tensors. -class TensorBlockBuilder(BlockBuilder[T]): - def __init__(self): - self._rows = [] - self._tensors: List[np.ndarray] = [] - self._num_rows = 0 - - def add(self, row: np.ndarray) -> None: - self._rows.append(row) - self._num_rows += 1 - - def add_block(self, block: np.ndarray) -> None: - assert isinstance(block, np.ndarray), block - self._tensors.append(block) - self._num_rows += len(block) - - def build(self) -> Block: - tensors = self._tensors.copy() - if self._rows: - tensors.append(np.stack(self._rows, axis=0)) - return np.concatenate(tensors, axis=0) - - def num_rows(self) -> int: - return self._num_rows - - -class TensorBlockAccessor(BlockAccessor): - def __init__(self, tensor: np.ndarray): - self._tensor = tensor - - def iter_rows(self) -> Iterator[np.ndarray]: - return iter(self._tensor) - - def slice(self, start: int, end: int, - copy: bool) -> "TensorBlockAccessor[T]": - view = self._tensor[start:end] - if copy: - view = view.copy() - return view - - def to_pandas(self) -> "pandas.DataFrame": - import pandas - return pandas.DataFrame(self._tensor) - - def to_numpy(self) -> np.ndarray: - return self._tensor - - def to_arrow(self) -> "pyarrow.Tensor": - import pyarrow - return pyarrow.Tensor.from_numpy(self._tensor) - - def schema(self) -> Dict: - shape = self._tensor.shape - shape = (None, ) + shape[1:] - return {"shape": shape, "dtype": self._tensor.dtype.name} - - def num_rows(self) -> int: - return len(self._tensor) - - def size_bytes(self) -> int: - return self._tensor.nbytes - - @staticmethod - def builder() -> TensorBlockBuilder[T]: - return TensorBlockBuilder() diff --git a/python/ray/data/read_api.py b/python/ray/data/read_api.py index 887e08baa1495..aa4e91159f5aa 100644 --- a/python/ray/data/read_api.py +++ b/python/ray/data/read_api.py @@ -392,7 
+392,7 @@ def read_numpy(paths: Union[str, List[str]], *, filesystem: Optional["pyarrow.fs.FileSystem"] = None, parallelism: int = 200, - **numpy_load_args) -> Dataset[np.ndarray]: + **numpy_load_args) -> Dataset[ArrowRow]: """Create an Arrow dataset from numpy files. Examples: @@ -529,7 +529,7 @@ def from_pandas(dfs: List[ObjectRef["pandas.DataFrame"]]) -> Dataset[ArrowRow]: return Dataset(BlockList(blocks, ray.get(list(metadata)))) -def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[np.ndarray]: +def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]: """Create a dataset from a set of NumPy ndarrays. Args: @@ -590,8 +590,11 @@ def _df_to_block(df: "pandas.DataFrame") -> Block[ArrowRow]: def _ndarray_to_block(ndarray: np.ndarray) -> Block[np.ndarray]: - return (ndarray, - BlockAccessor.for_block(ndarray).get_metadata(input_files=None)) + import pyarrow as pa + from ray.data.extensions import TensorArray + table = pa.Table.from_pydict({"value": TensorArray(ndarray)}) + return (table, + BlockAccessor.for_block(table).get_metadata(input_files=None)) def _get_schema(block: Block) -> Any: diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 7562e2c5a7105..1f2ab60a3d1a7 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -195,30 +195,15 @@ def test_batch_tensors(ray_start_regular_shared): def test_tensors(ray_start_regular_shared): # Create directly. ds = ray.data.range_tensor(5, shape=(3, 5)) - assert str(ds) == ("Dataset(num_blocks=5, num_rows=5, " - "schema=<Tensor: shape=(None, 3, 5), dtype=int64>)") - - # Transform. - ds = ds.map_batches(lambda t: np.expand_dims(t, 3)) - assert str(ds) == ("Dataset(num_blocks=5, num_rows=5, " - "schema=<Tensor: shape=(None, 3, 5, 1), dtype=int64>)") + assert str(ds) == ( + "Dataset(num_blocks=5, num_rows=5, " + "schema={value: <ArrowTensorType: shape=(3, 5), dtype=int64>})") # Pandas conversion. res = ray.data.range_tensor(10).map_batches( lambda t: t + 2, batch_format="pandas").take(2) - assert str(res) == "[ArrowRow({'0': 2}), ArrowRow({'0': 3})]", res - - # From other formats.
- ds = ray.data.range(10).map_batches(lambda x: np.array(x)) - assert str(ds) == ("Dataset(num_blocks=10, num_rows=10, " - "schema=)") - ds = ray.data.range(10).map(lambda x: np.array(x)) - assert str(ds) == ("Dataset(num_blocks=10, num_rows=10, " - "schema=)") - ds = ray.data.from_items([np.zeros(shape=(2, 2, 2)) for _ in range(4)]) - assert str(ds) == ( - "Dataset(num_blocks=4, num_rows=4, " - "schema=)"), ds + assert str(res) == \ + "[ArrowRow({'value': array([2])}), ArrowRow({'value': array([3])})]" def test_tensor_array_ops(ray_start_regular_shared): @@ -639,13 +624,11 @@ def test_numpy_roundtrip(ray_start_regular_shared, fs, data_path): ds = ray.data.range_tensor(10, parallelism=2) ds.write_numpy(data_path, filesystem=fs) ds = ray.data.read_numpy(data_path, filesystem=fs) - assert str(ds) == ("Dataset(num_blocks=2, num_rows=?, " - "schema=)") - - assert str( - ds.take()) == ("[array([0]), array([1]), array([2]), " - "array([3]), array([4]), array([5]), array([6]), " - "array([7]), array([8]), array([9])]"), ds.take() + assert str(ds) == ( + "Dataset(num_blocks=2, num_rows=?, " + "schema={value: })") + assert str(ds.take(2)) == \ + "[ArrowRow({'value': array([0])}), ArrowRow({'value': array([1])})]" def test_numpy_read(ray_start_regular_shared, tmp_path): @@ -654,13 +637,11 @@ def test_numpy_read(ray_start_regular_shared, tmp_path): np.save( os.path.join(path, "test.npy"), np.expand_dims(np.arange(0, 10), 1)) ds = ray.data.read_numpy(path) - assert str(ds) == ("Dataset(num_blocks=1, num_rows=?, " - "schema=)") - - assert str( - ds.take()) == ("[array([0]), array([1]), array([2]), " - "array([3]), array([4]), array([5]), array([6]), " - "array([7]), array([8]), array([9])]"), ds.take() + assert str(ds) == ( + "Dataset(num_blocks=1, num_rows=?, " + "schema={value: })") + assert str(ds.take(2)) == \ + "[ArrowRow({'value': array([0])}), ArrowRow({'value': array([1])})]" @pytest.mark.parametrize("fs,data_path,endpoint_url", [ @@ -845,7 +826,10 @@ def test_from_numpy(ray_start_regular_shared): arr2 = np.expand_dims(np.arange(4, 8), 1) ds = ray.data.from_numpy([ray.put(arr1), ray.put(arr2)]) values = np.array(ds.take(8)) - np.testing.assert_equal(np.concatenate((arr1, arr2)), values) + for i in range(4): + assert values[i]["value"] == arr1[i] + for i in range(4, 8): + assert values[i]["value"] == arr2[i - 4] def test_from_arrow(ray_start_regular_shared): @@ -871,13 +855,13 @@ def test_to_pandas(ray_start_regular_shared): def test_to_numpy(ray_start_regular_shared): # Tensor Dataset ds = ray.data.range_tensor(10, parallelism=2) - arr = np.concatenate(ray.get(ds.to_numpy())) + arr = np.concatenate(ray.get(ds.to_numpy(column="value"))) np.testing.assert_equal(arr, np.expand_dims(np.arange(0, 10), 1)) # Table Dataset ds = ray.data.range_arrow(10) - arr = np.concatenate(ray.get(ds.to_numpy())) - np.testing.assert_equal(arr, np.expand_dims(np.arange(0, 10), 1)) + arr = np.concatenate(ray.get(ds.to_numpy(column="value"))) + np.testing.assert_equal(arr, np.arange(0, 10)) # Simple Dataset ds = ray.data.range(10) From e280e03c2aa2a907f180e9bfe05952cba9eca951 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Sep 2021 19:35:17 -0700 Subject: [PATCH 2/8] update --- doc/source/data/dataset-tensor-support.rst | 62 ++++++---------------- doc/source/data/dataset.rst | 2 +- 2 files changed, 16 insertions(+), 48 deletions(-) diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index d11a10dd3c06c..20d751e77eaec 100644 --- 
a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -3,66 +3,34 @@ Dataset Tensor Support ====================== -Tensor-typed values -------------------- +Tables with tensor columns +-------------------------- + +Datasets supports tables with fixed-shape tensor columns, where each element in the column is a tensor (n-dimensional array) with the same shape. As an example, this allows you to use both Pandas and Ray Datasets to read, write, and manipulate a table with a column of e.g. images (2D arrays), with all conversions between Pandas, Arrow, and Parquet, and all application of aggregations/operations to the underlying image ndarrays, being taken care of by Ray Datasets. + +With our Pandas extension type, :class:`TensorDtype `, and extension array, :class:`TensorArray `, you can do familiar aggregations and arithmetic, comparison, and logical operations on a DataFrame containing a tensor column and the operations will be applied to the underlying tensors as expected. With our Arrow extension type, :class:`ArrowTensorType `, and extension array, :class:`ArrowTensorArray `, you'll be able to import that DataFrame into Ray Datasets and read/write the data from/to the Parquet format. + +Automatic conversion between the Pandas and Arrow extension types/arrays keeps the details under-the-hood, so you only have to worry about casting the column to a tensor column using our Pandas extension type when first ingesting the table into a ``Dataset``, whether from storage or in-memory. All table operations downstream from that cast should work automatically. -Datasets support tensor-typed values, which are represented in-memory as Arrow tensors (i.e., np.ndarray format). Tensor datasets can be read from and written to ``.npy`` files. Here are some examples: +Single-column tensor datasets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The most basic case is when a dataset only has a single column, which is of tensor type. This kind of dataset can be created with ``.range_tensor()``, and can be read from and written to ``.npy`` files. Here are some examples: .. code-block:: python # Create a Dataset of tensor-typed values. ds = ray.data.range_tensor(10000, shape=(3, 5)) # -> Dataset(num_blocks=200, num_rows=10000, - # schema=) - - ds.map_batches(lambda t: t + 2).show(2) - # -> [[2 2 2 2 2] - # [2 2 2 2 2] - # [2 2 2 2 2]] - # [[3 3 3 3 3] - # [3 3 3 3 3] - # [3 3 3 3 3]] + # schema={value: }) # Save to storage. - ds.write_numpy("/tmp/tensor_out") + ds.write_numpy("/tmp/tensor_out", column="value") # Read from storage. ray.data.read_numpy("/tmp/tensor_out") # -> Dataset(num_blocks=200, num_rows=?, - # schema=) - -Tensor datasets are also created whenever an array type is returned from a map function: - -.. code-block:: python - - # Create a dataset of Python integers. - ds = ray.data.range(10) - # -> Dataset(num_blocks=10, num_rows=10, schema=) - - # It is now converted into a Tensor dataset. - ds = ds.map_batches(lambda x: np.array(x)) - # -> Dataset(num_blocks=10, num_rows=10, - # schema=) - -Tensor datasets can also be created from NumPy ndarrays that are already stored in the Ray object store: - -.. code-block:: python - - import numpy as np - - # Create a Dataset from a list of NumPy ndarray objects. 
- arr1 = np.arange(0, 10) - arr2 = np.arange(10, 20) - ds = ray.data.from_numpy([ray.put(arr1), ray.put(arr2)]) - -Tables with tensor columns --------------------------- - -In addition to tensor datasets, Datasets also supports tables with fixed-shape tensor columns, where each element in the column is a tensor (n-dimensional array) with the same shape. As an example, this allows you to use both Pandas and Ray Datasets to read, write, and manipulate a table with a column of e.g. images (2D arrays), with all conversions between Pandas, Arrow, and Parquet, and all application of aggregations/operations to the underlying image ndarrays, being taken care of by Ray Datasets. - -With our Pandas extension type, :class:`TensorDtype `, and extension array, :class:`TensorArray `, you can do familiar aggregations and arithmetic, comparison, and logical operations on a DataFrame containing a tensor column and the operations will be applied to the underlying tensors as expected. With our Arrow extension type, :class:`ArrowTensorType `, and extension array, :class:`ArrowTensorArray `, you'll be able to import that DataFrame into Ray Datasets and read/write the data from/to the Parquet format. - -Automatic conversion between the Pandas and Arrow extension types/arrays keeps the details under-the-hood, so you only have to worry about casting the column to a tensor column using our Pandas extension type when first ingesting the table into a ``Dataset``, whether from storage or in-memory. All table operations downstream from that cast should work automatically. + # schema={value: }) Reading existing serialized tensor columns ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ diff --git a/doc/source/data/dataset.rst b/doc/source/data/dataset.rst index 7142691e5df45..0cae089ce9d67 100644 --- a/doc/source/data/dataset.rst +++ b/doc/source/data/dataset.rst @@ -16,7 +16,7 @@ Ray Datasets are the standard way to load and exchange data in Ray libraries and Concepts -------- -Ray Datasets implement `Distributed Arrow `__. A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either an `Arrow table `__, `Arrow tensor `__, or a Python list (for Arrow incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data. +Ray Datasets implement `Distributed Arrow `__. A Dataset consists of a list of Ray object references to *blocks*. Each block holds a set of items in either an `Arrow table `__ or a Python list (for Arrow incompatible objects). Having multiple blocks in a dataset allows for parallel transformation and ingest of the data. 
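For example, a dataset created with ``parallelism=3`` keeps its rows in three separate Arrow table blocks (a sketch only; the exact printed schema string may vary):

.. code-block:: python

    import ray

    # Three blocks of 1000 rows each; every block is a pyarrow.Table.
    ds = ray.data.range_arrow(3000, parallelism=3)
    print(ds)
    # -> Dataset(num_blocks=3, num_rows=3000, schema={value: int64})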
The following figure visualizes a Dataset that has three Arrow table blocks, each block holding 1000 rows each: From 0a9200e16b9bfc6861af0090e73d1ab57bf0d34e Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Sep 2021 19:36:39 -0700 Subject: [PATCH 3/8] wip --- python/ray/data/tests/test_dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 1f2ab60a3d1a7..6ae28ca2c46de 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -663,7 +663,8 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): s3 = S3FileSystem(client_kwargs={"endpoint_url": endpoint_url}) arr1 = np.load(s3.open(file_path1)) arr2 = np.load(s3.open(file_path2)) - np.testing.assert_equal(np.concatenate((arr1, arr2)), ds.take()) + assert ds.count() == 10 + assert str(ds.take(1)) == "[ArrowRow({'value': array([0])})]" def test_read_text(ray_start_regular_shared, tmp_path): From 7eb0f2d4f1c2dfb408e95ddf2f095c75917a7bcc Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Sep 2021 21:22:12 -0700 Subject: [PATCH 4/8] fix --- python/ray/data/tests/test_dataset.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 6ae28ca2c46de..7f4b7ae3bc134 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -655,14 +655,6 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): ds.write_numpy(data_path, filesystem=fs) file_path1 = os.path.join(data_path, "data_000000.npy") file_path2 = os.path.join(data_path, "data_000001.npy") - if endpoint_url is None: - arr1 = np.load(file_path1) - arr2 = np.load(file_path2) - else: - from s3fs.core import S3FileSystem - s3 = S3FileSystem(client_kwargs={"endpoint_url": endpoint_url}) - arr1 = np.load(s3.open(file_path1)) - arr2 = np.load(s3.open(file_path2)) assert ds.count() == 10 assert str(ds.take(1)) == "[ArrowRow({'value': array([0])})]" From 5fb2e813ffa4d531d4ff7ba71ff37ec44cd7a697 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Sep 2021 21:22:59 -0700 Subject: [PATCH 5/8] update --- python/ray/data/tests/test_dataset.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 7f4b7ae3bc134..f2a1565a2ae6a 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -655,7 +655,17 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): ds.write_numpy(data_path, filesystem=fs) file_path1 = os.path.join(data_path, "data_000000.npy") file_path2 = os.path.join(data_path, "data_000001.npy") + if endpoint_url is None: + arr1 = np.load(file_path1) + arr2 = np.load(file_path2) + else: + from s3fs.core import S3FileSystem + s3 = S3FileSystem(client_kwargs={"endpoint_url": endpoint_url}) + arr1 = np.load(s3.open(file_path1)) + arr2 = np.load(s3.open(file_path2)) assert ds.count() == 10 + assert len(arr1) == 5 + assert len(arr2) == 5 assert str(ds.take(1)) == "[ArrowRow({'value': array([0])})]" From befd773501fe2b33fb12836dacb3b38b31c1f545 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Sep 2021 21:24:00 -0700 Subject: [PATCH 6/8] fix --- python/ray/data/tests/test_dataset.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/ray/data/tests/test_dataset.py b/python/ray/data/tests/test_dataset.py index 
f2a1565a2ae6a..c25f31cd16e59 100644 --- a/python/ray/data/tests/test_dataset.py +++ b/python/ray/data/tests/test_dataset.py @@ -666,6 +666,8 @@ def test_numpy_write(ray_start_regular_shared, fs, data_path, endpoint_url): assert ds.count() == 10 assert len(arr1) == 5 assert len(arr2) == 5 + assert arr1.sum() == 10 + assert arr2.sum() == 35 assert str(ds.take(1)) == "[ArrowRow({'value': array([0])})]" From b06e895af8de79bf013a6e753bf5c7d36cd77c20 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Thu, 23 Sep 2021 22:01:16 -0700 Subject: [PATCH 7/8] fix --- python/ray/data/datasource/numpy_datasource.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/ray/data/datasource/numpy_datasource.py b/python/ray/data/datasource/numpy_datasource.py index caaaecbab4cc7..8ba02e9d40cc5 100644 --- a/python/ray/data/datasource/numpy_datasource.py +++ b/python/ray/data/datasource/numpy_datasource.py @@ -8,7 +8,6 @@ from ray.data.block import BlockAccessor from ray.data.datasource.file_based_datasource import FileBasedDatasource -from ray.data.extensions import TensorArray class NumpyDatasource(FileBasedDatasource): @@ -22,6 +21,7 @@ class NumpyDatasource(FileBasedDatasource): """ def _read_file(self, f: "pyarrow.NativeFile", path: str, **reader_args): + from ray.data.extensions import TensorArray import pyarrow as pa # TODO(ekl) Ideally numpy can read directly from the file, but it # seems like it requires the file to be seekable. From 62cfbc86ff0e06ab46b78e8b057db676323350a6 Mon Sep 17 00:00:00 2001 From: Eric Liang Date: Fri, 24 Sep 2021 18:01:37 -0700 Subject: [PATCH 8/8] update --- doc/source/data/dataset-tensor-support.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/source/data/dataset-tensor-support.rst b/doc/source/data/dataset-tensor-support.rst index 20d751e77eaec..c231ec4633de0 100644 --- a/doc/source/data/dataset-tensor-support.rst +++ b/doc/source/data/dataset-tensor-support.rst @@ -6,7 +6,7 @@ Dataset Tensor Support Tables with tensor columns -------------------------- -Datasets supports tables with fixed-shape tensor columns, where each element in the column is a tensor (n-dimensional array) with the same shape. As an example, this allows you to use both Pandas and Ray Datasets to read, write, and manipulate a table with a column of e.g. images (2D arrays), with all conversions between Pandas, Arrow, and Parquet, and all application of aggregations/operations to the underlying image ndarrays, being taken care of by Ray Datasets. +Datasets supports tables with fixed-shape tensor columns, where each element in the column is a tensor (n-dimensional array) with the same shape. As an example, this allows you to use Pandas and Ray Datasets to read, write, and manipulate e.g., images. All conversions between Pandas, Arrow, and Parquet, and all application of aggregations/operations to the underlying image ndarrays are taken care of by Ray Datasets. With our Pandas extension type, :class:`TensorDtype `, and extension array, :class:`TensorArray `, you can do familiar aggregations and arithmetic, comparison, and logical operations on a DataFrame containing a tensor column and the operations will be applied to the underlying tensors as expected. With our Arrow extension type, :class:`ArrowTensorType `, and extension array, :class:`ArrowTensorArray `, you'll be able to import that DataFrame into Ray Datasets and read/write the data from/to the Parquet format.
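Example usage of the reworked tensor-column APIs (a sketch only; the output path and tensor shape are illustrative):

.. code-block:: python

    import numpy as np
    import ray

    # Tensor data is stored as a "value" column of Arrow-backed blocks.
    ds = ray.data.range_tensor(10, shape=(3, 5))

    # write_numpy() takes the column to serialize as .npy files (one per
    # block); read_numpy() restores the single-column tensor layout.
    ds.write_numpy("/tmp/tensor_out", column="value")
    ds2 = ray.data.read_numpy("/tmp/tensor_out")

    # to_numpy() now takes a column name for Arrow blocks and returns one
    # ndarray object ref per block.
    arr = np.concatenate(ray.get(ds2.to_numpy(column="value")))
    assert arr.shape == (10, 3, 5)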