Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dask>=2024.11.2 in Dask cuDF #17439

Merged
merged 13 commits into from
Nov 27, 2024
Merged
1 change: 1 addition & 0 deletions docs/cudf/source/user_guide/api_docs/series.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Attributes
Series.values
Series.data
Series.dtype
Series.dtypes
Series.shape
Series.ndim
Series.nullable
Expand Down
9 changes: 9 additions & 0 deletions python/cudf/cudf/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1598,6 +1598,15 @@ def dtype(self):
"""The dtype of the Series."""
return self._column.dtype

@property # type: ignore
@_performance_tracking
def dtypes(self):
"""The dtype of the Series.

This is an alias for `Series.dtype`.
"""
return self.dtype
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Turns out that Pandas uses Series.dtypes as an alias for Series.dtype (and some recent dask-expr code takes advantage of the Sereies<->DataFrame portability of the dtypes attribute).


@classmethod
@_performance_tracking
def _concat(cls, objs, axis=0, index: bool = True):
Expand Down
6 changes: 6 additions & 0 deletions python/cudf/cudf/tests/test_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2934,3 +2934,9 @@ def test_empty_astype_always_castable(type1, type2, as_dtype, copy):
assert ser._column is result._column
else:
assert ser._column is not result._column


def test_dtype_dtypes_equal():
ser = cudf.Series([0])
assert ser.dtype is ser.dtypes
assert ser.dtypes is ser.to_pandas().dtypes
6 changes: 1 addition & 5 deletions python/dask_cudf/dask_cudf/_expr/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,16 +156,12 @@ def to_orc(self, *args, **kwargs):
from dask_cudf._legacy.io import to_orc

return to_orc(self, *args, **kwargs)
# return self.to_legacy_dataframe().to_orc(*args, **kwargs)

@staticmethod
def read_text(*args, **kwargs):
from dask_expr import from_legacy_dataframe

from dask_cudf._legacy.io.text import read_text as legacy_read_text

ddf = legacy_read_text(*args, **kwargs)
return from_legacy_dataframe(ddf)
return legacy_read_text(*args, **kwargs)


class Series(DXSeries, CudfFrameBase):
Expand Down
32 changes: 14 additions & 18 deletions python/dask_cudf/dask_cudf/_legacy/io/orc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
from pyarrow import orc as orc

from dask import dataframe as dd
from dask.base import tokenize
from dask.dataframe.io.utils import _get_pyarrow_dtypes

import cudf


def _read_orc_stripe(fs, path, stripe, columns, kwargs=None):
def _read_orc_stripe(source, fs, columns=None, kwargs=None):
"""Pull out specific columns from specific stripe"""
path, stripe = source
if kwargs is None:
kwargs = {}
with fs.open(path, "rb") as f:
Expand Down Expand Up @@ -67,7 +67,7 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs):
"""

storage_options = storage_options or {}
fs, fs_token, paths = get_fs_token_paths(
fs, _, paths = get_fs_token_paths(
path, mode="rb", storage_options=storage_options
)
schema = None
Expand Down Expand Up @@ -100,27 +100,23 @@ def read_orc(path, columns=None, filters=None, storage_options=None, **kwargs):
**kwargs,
)

name = "read-orc-" + tokenize(fs_token, path, columns, filters, **kwargs)
dsk = {}
N = 0
sources = []
for path, n in zip(paths, nstripes_per_file):
for stripe in (
range(n)
if filters is None
else cudf.io.orc._filter_stripes(filters, path)
):
dsk[(name, N)] = (
_read_orc_stripe,
fs,
path,
stripe,
columns,
kwargs,
)
N += 1

divisions = [None] * (len(dsk) + 1)
return dd.core.new_dd_object(dsk, name, meta, divisions)
sources.append((path, stripe))

return dd.from_map(
_read_orc_stripe,
sources,
args=[fs],
columns=columns,
kwargs=kwargs,
meta=meta,
)


def write_orc_partition(df, path, fs, filename, compression="snappy"):
Expand Down
40 changes: 21 additions & 19 deletions python/dask_cudf/dask_cudf/_legacy/io/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,18 @@
from glob import glob

import dask.dataframe as dd
from dask.base import tokenize
from dask.utils import apply, parse_bytes
from dask.utils import parse_bytes

import cudf


def read_text(path, chunksize="256 MiB", **kwargs):
def _read_text(source, **kwargs):
# Wrapper for cudf.read_text operation
fn, byte_range = source
return cudf.read_text(fn, byte_range=byte_range, **kwargs)


def read_text(path, chunksize="256 MiB", byte_range=None, **kwargs):
if isinstance(chunksize, str):
chunksize = parse_bytes(chunksize)

Expand All @@ -27,28 +32,25 @@ def read_text(path, chunksize="256 MiB", **kwargs):
msg = f"A file in: {filenames} does not exist."
raise FileNotFoundError(msg)

name = "read-text-" + tokenize(path, tokenize, **kwargs)
if chunksize and byte_range:
raise ValueError("Cannot specify both chunksize and byte_range.")

if chunksize:
dsk = {}
i = 0
sources = []
for fn in filenames:
size = os.path.getsize(fn)
for start in range(0, size, chunksize):
kwargs1 = kwargs.copy()
kwargs1["byte_range"] = (
byte_range = (
start,
chunksize,
) # specify which chunk of the file we care about

dsk[(name, i)] = (apply, cudf.read_text, [fn], kwargs1)
i += 1
sources.append((fn, byte_range))
else:
dsk = {
(name, i): (apply, cudf.read_text, [fn], kwargs)
for i, fn in enumerate(filenames)
}

meta = cudf.Series([], dtype="O")
divisions = [None] * (len(dsk) + 1)
return dd.core.new_dd_object(dsk, name, meta, divisions)
sources = [(fn, byte_range) for fn in filenames]

return dd.from_map(
_read_text,
sources,
meta=cudf.Series([], dtype="O"),
**kwargs,
)
5 changes: 1 addition & 4 deletions python/dask_cudf/dask_cudf/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,6 @@ def read_json(*args, **kwargs):

@staticmethod
def read_orc(*args, **kwargs):
from dask_expr import from_legacy_dataframe

from dask_cudf._legacy.io.orc import read_orc as legacy_read_orc

ddf = legacy_read_orc(*args, **kwargs)
return from_legacy_dataframe(ddf)
return legacy_read_orc(*args, **kwargs)
11 changes: 10 additions & 1 deletion python/dask_cudf/dask_cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@
from dask.tokenize import tokenize
from dask.utils import parse_bytes

try:
# TODO: Remove try/except when dask>2024.11.2
from dask._task_spec import List as TaskList
except ImportError:

def TaskList(*x):
return list(x)


import cudf

from dask_cudf import QUERY_PLANNING_ON, _deprecated_api
Expand Down Expand Up @@ -447,7 +456,7 @@ def _task(self, name, index: int):
return Task(
name,
cudf.concat,
[expr._filtered_task(name, i) for i in bucket],
TaskList(*(expr._filtered_task(name, i) for i in bucket)),
)

pieces = []
Expand Down
Loading