
[Datasets] Delineate between ref and raw APIs for the Pandas/Arrow integrations. #18992

Merged
8 changes: 4 additions & 4 deletions doc/source/data/dataset-tensor-support.rst
@@ -55,7 +55,7 @@ If you already have a Parquet dataset with columns containing serialized tensors

# Write the dataset to Parquet. The tensor column will be written as an
# array of opaque byte blobs.
ds = ray.data.from_pandas([ray.put(df)])
ds = ray.data.from_pandas([df])
Contributor:

How about we take *args for from_pandas()? I feel the common case is passing just a single df if not passing distributed refs, so accepting a list is a bit weird.

Contributor Author:

@ericl Agreed, I'm primarily trying to not step on this PR's toes!
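
For context, a minimal sketch of the two calling conventions under discussion (illustrative only; the *args form is the reviewer's suggestion, not what this diff implements):

import ray
import pandas as pd

df = pd.DataFrame({"one": [1, 2, 3]})

# This PR: raw DataFrames are passed in a list; Ray puts them into the
# object store internally.
ds = ray.data.from_pandas([df])

# Reviewer's suggestion (hypothetical here): a *args signature so the
# common single-DataFrame case would read as:
# ds = ray.data.from_pandas(df)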

ds.write_parquet(path)

# Read the Parquet files into a new Dataset, with the serialized tensors
@@ -85,7 +85,7 @@ If your serialized tensors don't fit the above constraints (e.g. they're stored

# Write the dataset to Parquet. The tensor column will be written as an
# array of opaque byte blobs.
ds = ray.data.from_pandas([ray.put(df)])
ds = ray.data.from_pandas([df])
ds.write_parquet(path)

# Manually deserialize the tensor pickle bytes and cast to our tensor
@@ -212,7 +212,7 @@ If working with in-memory Pandas DataFrames that you want to analyze, manipulate

# In addition to doing Pandas operations on the tensor column,
# you can now put the DataFrame directly into a Dataset.
ds = ray.data.from_pandas([ray.put(df)])
ds = ray.data.from_pandas([df])
# Internally, this column is represented with the corresponding
# Arrow tensor extension type.
print(ds.schema())
@@ -227,7 +227,7 @@ If working with in-memory Pandas DataFrames that you want to analyze, manipulate
# -> one: int64
# two: extension<arrow.py_extension_type<ArrowTensorType>>

read_df = ray.get(read_ds.to_pandas())[0]
read_df = read_ds.to_pandas()
print(read_df.dtypes)
# -> one int64
# two TensorDtype
2 changes: 1 addition & 1 deletion doc/source/data/dataset.rst
@@ -202,7 +202,7 @@ Finally, you can create a ``Dataset`` from existing data in the Ray object store

# Create a Dataset from a list of Pandas DataFrame objects.
pdf = pd.DataFrame({"one": [1, 2, 3], "two": ["a", "b", "c"]})
ds = ray.data.from_pandas([ray.put(pdf)])
ds = ray.data.from_pandas([pdf])

# Create a Dataset from a Dask-on-Ray DataFrame.
dask_df = dd.from_pandas(pdf, npartitions=10)
2 changes: 2 additions & 0 deletions doc/source/data/package-ref.rst
@@ -15,11 +15,13 @@ Creating a Dataset
.. autofunction:: ray.data.read_datasource
.. autofunction:: ray.data.from_items
.. autofunction:: ray.data.from_arrow
.. autofunction:: ray.data.from_arrow_refs
.. autofunction:: ray.data.from_spark
.. autofunction:: ray.data.from_dask
.. autofunction:: ray.data.from_modin
.. autofunction:: ray.data.from_mars
.. autofunction:: ray.data.from_pandas
.. autofunction:: ray.data.from_pandas_refs
.. autofunction:: ray.data.from_numpy

Dataset API
7 changes: 5 additions & 2 deletions python/ray/data/__init__.py
@@ -1,7 +1,8 @@
from ray.data.read_api import from_items, range, range_arrow, \
range_tensor, read_parquet, read_json, read_csv, read_binary_files, \
from_dask, from_modin, from_mars, from_pandas, from_numpy, from_arrow, \
from_spark, read_datasource, read_numpy, read_text
from_dask, from_modin, from_mars, from_pandas, from_pandas_refs, \
from_numpy, from_arrow, from_arrow_refs, from_spark, read_datasource, \
read_numpy, read_text
from ray.data.datasource import Datasource, ReadTask
from ray.data.dataset import Dataset
from ray.data.impl.progress_bar import set_progress_bars
@@ -18,10 +19,12 @@
"from_dask",
"from_items",
"from_arrow",
"from_arrow_refs",
"from_mars",
"from_modin",
"from_numpy",
"from_pandas",
"from_pandas_refs",
"from_spark",
"range",
"range_arrow",
74 changes: 60 additions & 14 deletions python/ray/data/dataset.py
@@ -681,8 +681,8 @@ def zip(self, other: "Dataset[U]") -> "Dataset[(T, U)]":
comes from the first dataset and v comes from the second.
"""

blocks1 = self.get_blocks()
blocks2 = other.get_blocks()
blocks1 = self.get_internal_block_refs()
blocks2 = other.get_internal_block_refs()

if len(blocks1) != len(blocks2):
# TODO(ekl) consider supporting if num_rows are equal.
@@ -1321,14 +1321,15 @@ def to_modin(self) -> "modin.DataFrame":
"""Convert this dataset into a Modin dataframe.

This works by first converting this dataset into a distributed set of
Pandas dataframes (using ``.to_pandas()``). Please see caveats there.
Then the individual dataframes are used to create the modin DataFrame
using
Pandas dataframes (using ``.to_pandas_refs()``). Please see caveats
there. Then the individual dataframes are used to create the modin
DataFrame using
``modin.distributed.dataframe.pandas.partitions.from_partitions()``.

This is only supported for datasets convertible to Arrow records.
This function induces a copy of the data. For zero-copy access to the
underlying data, consider using ``.to_arrow()`` or ``.get_blocks()``.
underlying data, consider using ``.to_arrow()`` or
Contributor:

Thx for updating internal comments.

``.get_internal_block_refs()``.

Time complexity: O(dataset size / parallelism)

@@ -1338,7 +1339,7 @@ def to_modin(self) -> "modin.DataFrame":

from modin.distributed.dataframe.pandas.partitions import (
from_partitions)
pd_objs = self.to_pandas()
pd_objs = self.to_pandas_refs()
return from_partitions(pd_objs, axis=0)
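
A minimal usage sketch of the conversion path described above (illustrative; assumes Modin is installed and a local Ray runtime):

import ray
import pandas as pd

ds = ray.data.from_pandas([pd.DataFrame({"one": [1, 2, 3]})])
mdf = ds.to_modin()  # Modin DataFrame built from the per-block Pandas partitions
print(mdf.dtypes)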

def to_spark(self,
@@ -1354,17 +1355,45 @@ def to_spark(self,
core_worker = ray.worker.global_worker.core_worker
locations = [
core_worker.get_owner_address(block)
for block in self.get_blocks()
for block in self.get_internal_block_refs()
]
return raydp.spark.ray_dataset_to_spark_dataframe(
spark, self.schema(), self.get_blocks(), locations)
spark, self.schema(), self.get_internal_block_refs(), locations)

def to_pandas(self) -> List[ObjectRef["pandas.DataFrame"]]:
def to_pandas(self, limit: int = 1000) -> "pandas.DataFrame":
"""Convert this dataset into a single Pandas DataFrame.

This is only supported for datasets convertible to Arrow records. This
limits the number of records returned to the provided limit.

Time complexity: O(limit)

Args:
limit: The maximum number of records to return.

Returns:
A Pandas DataFrame created from this dataset, containing a limited
number of records.
"""

if self.count() > limit:
logger.warning(f"Only returning the first {limit} records from "
"to_pandas()")
limited_ds = self.limit(limit)
blocks = limited_ds.get_internal_block_refs()
output = DelegatingArrowBlockBuilder()
for block in ray.get(blocks):
output.add_block(block)
return output.build().to_pandas()
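
A usage sketch of the new raw vs. ref split for Pandas output (illustrative; assumes the default limit of 1000 shown above):

import ray

ds = ray.data.range_arrow(2000)     # rows of {"value": i}
df = ds.to_pandas()                 # warns, returns only the first 1000 rows
df_all = ds.to_pandas(limit=2000)   # raise the cap explicitly
refs = ds.to_pandas_refs()          # DeveloperAPI: per-block DataFrame ObjectRefs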

@DeveloperAPI
def to_pandas_refs(self) -> List[ObjectRef["pandas.DataFrame"]]:
"""Convert this dataset into a distributed set of Pandas dataframes.

This is only supported for datasets convertible to Arrow records.
This function induces a copy of the data. For zero-copy access to the
underlying data, consider using ``.to_arrow()`` or ``.get_blocks()``.
underlying data, consider using ``.to_arrow()`` or
``.get_internal_block_refs()``.

Time complexity: O(dataset size / parallelism)

@@ -1381,7 +1410,8 @@ def to_numpy(self, *,

This is only supported for datasets convertible to NumPy ndarrays.
This function induces a copy of the data. For zero-copy access to the
underlying data, consider using ``.to_arrow()`` or ``.get_blocks()``.
underlying data, consider using ``.to_arrow()`` or
``.get_internal_block_refs()``.

Time complexity: O(dataset size / parallelism)

@@ -1399,7 +1429,23 @@ def to_numpy(self, *,
for block in self._blocks
]

def to_arrow(self) -> List[ObjectRef["pyarrow.Table"]]:
def to_arrow(self) -> List["pyarrow.Table"]:
"""Convert this dataset into a list of Arrow tables.

This is only supported for datasets convertible to Arrow records.
This function is zero-copy if the existing data is already in Arrow
format. Otherwise, the data will be converted to Arrow format.

Time complexity: O(1) unless conversion is required.

Returns:
A list of Arrow tables created from this dataset.
"""

return ray.get(self.to_arrow_refs())
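
A minimal sketch contrasting the raw and ref Arrow accessors (illustrative; assumes a local Ray runtime):

import pyarrow as pa
import ray

ds = ray.data.from_arrow([pa.table({"x": [1, 2, 3]})])
tables = ds.to_arrow()            # local list of pyarrow.Table objects
table_refs = ds.to_arrow_refs()   # DeveloperAPI: ObjectRefs, no local copy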

@DeveloperAPI
def to_arrow_refs(self) -> List[ObjectRef["pyarrow.Table"]]:
"""Convert this dataset into a distributed set of Arrow tables.

This is only supported for datasets convertible to Arrow records.
@@ -1546,7 +1592,7 @@ def __iter__(self):
return DatasetPipeline(it, length=len(it._splits))

@DeveloperAPI
def get_blocks(self) -> List[ObjectRef[Block]]:
def get_internal_block_refs(self) -> List[ObjectRef[Block]]:
"""Get a list of references to the underlying blocks of this dataset.

This function can be used for zero-copy access to the data.
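
A minimal sketch of the renamed DeveloperAPI (illustrative):

import ray
import pandas as pd

ds = ray.data.from_pandas([pd.DataFrame({"one": [1, 2, 3]})])
block_refs = ds.get_internal_block_refs()  # List[ObjectRef[Block]]
print(len(block_refs))                     # number of underlying blocks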
4 changes: 2 additions & 2 deletions python/ray/data/extensions/tensor_extension.py
@@ -140,7 +140,7 @@ class TensorDtype(pd.api.extensions.ExtensionDtype):
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>

>>> read_df = ray.get(read_ds.to_pandas())[0]
>>> read_df = ray.get(read_ds.to_pandas_refs())[0]
>>> read_df.dtypes
one int64
two TensorDtype
@@ -422,7 +422,7 @@ class TensorArray(pd.api.extensions.ExtensionArray, TensorOpsMixin):
one: int64
two: extension<arrow.py_extension_type<ArrowTensorType>>

>>> read_df = ray.get(read_ds.to_pandas())[0]
>>> read_df = ray.get(read_ds.to_pandas_refs())[0]
>>> read_df.dtypes
one int64
two TensorDtype
42 changes: 36 additions & 6 deletions python/ray/data/read_api.py
@@ -14,7 +14,7 @@

import ray
from ray.types import ObjectRef
from ray.util.annotations import PublicAPI
from ray.util.annotations import PublicAPI, DeveloperAPI
from ray.data.block import Block, BlockAccessor, BlockMetadata
from ray.data.dataset import Dataset
from ray.data.datasource import Datasource, RangeDatasource, \
@@ -509,12 +509,27 @@ def from_modin(df: "modin.DataFrame") -> Dataset[ArrowRow]:
from modin.distributed.dataframe.pandas.partitions import unwrap_partitions

parts = unwrap_partitions(df, axis=0)
return from_pandas(parts)
return from_pandas_refs(parts)


@PublicAPI(stability="beta")
def from_pandas(dfs: List[ObjectRef["pandas.DataFrame"]]) -> Dataset[ArrowRow]:
"""Create a dataset from a set of Pandas dataframes.
def from_pandas(dfs: List["pandas.DataFrame"]) -> Dataset[ArrowRow]:
Contributor:

Suggested change:
def from_pandas(dfs: List["pandas.DataFrame"]) -> Dataset[ArrowRow]:
def from_pandas(*dfs: List["pandas.DataFrame"]) -> Dataset[ArrowRow]:

"""Create a dataset from a list of Pandas dataframes.

Args:
dfs: A list of Pandas dataframes.

Returns:
Dataset holding Arrow records read from the dataframes.
"""
return from_pandas_refs([ray.put(df) for df in dfs])
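
A minimal sketch of the split this hunk introduces (illustrative; both calls produce an equivalent Dataset):

import ray
import pandas as pd

df = pd.DataFrame({"one": [1, 2, 3]})

ds1 = ray.data.from_pandas([df])                # raw API: Ray puts the data for you
ds2 = ray.data.from_pandas_refs([ray.put(df)])  # ref API: you manage the ObjectRefs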


@DeveloperAPI
def from_pandas_refs(
dfs: List[ObjectRef["pandas.DataFrame"]]) -> Dataset[ArrowRow]:
"""Create a dataset from a list of Ray object references to Pandas
dataframes.

Args:
dfs: A list of Ray object references to pandas dataframes.
@@ -546,8 +561,23 @@ def from_numpy(ndarrays: List[ObjectRef[np.ndarray]]) -> Dataset[ArrowRow]:


@PublicAPI(stability="beta")
def from_arrow(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]]
) -> Dataset[ArrowRow]:
def from_arrow(
tables: List[Union["pyarrow.Table", bytes]]) -> Dataset[ArrowRow]:
"""Create a dataset from a list of Arrow tables.

Args:
tables: A list of Arrow tables, or their streaming format in bytes.

Returns:
Dataset holding Arrow records from the tables.
"""
return from_arrow_refs([ray.put(t) for t in tables])
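
And the analogous sketch for the Arrow side (illustrative):

import pyarrow as pa
import ray

table = pa.table({"x": [1, 2, 3]})

ds1 = ray.data.from_arrow([table])                # raw tables
ds2 = ray.data.from_arrow_refs([ray.put(table)])  # pre-existing ObjectRefs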


@DeveloperAPI
def from_arrow_refs(tables: List[ObjectRef[Union["pyarrow.Table", bytes]]]
) -> Dataset[ArrowRow]:
"""Create a dataset from a set of Arrow tables.

Args: