From 5031bad5f2b7dd33c8a0264856a333de7c24b9c6 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 8 Nov 2022 19:37:35 -0800 Subject: [PATCH 1/2] expose engine='cudf_experimental' to dask_cudf.read_json --- python/dask_cudf/dask_cudf/backends.py | 9 +++++++-- python/dask_cudf/dask_cudf/io/json.py | 15 ++++++++++++++- .../dask_cudf/dask_cudf/io/tests/test_json.py | 19 +++++++++++++++++++ 3 files changed, 40 insertions(+), 3 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index f02c75eb3e8..0d651bad0e3 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -2,6 +2,7 @@ import warnings from collections.abc import Iterator +from functools import partial import cupy as cp import numpy as np @@ -493,11 +494,15 @@ def read_parquet(*args, engine=None, **kwargs): ) @staticmethod - def read_json(*args, engine=None, **kwargs): + def read_json(*args, engine="cudf", **kwargs): return _default_backend( dd.read_json, *args, - engine=cudf.read_json, + engine=( + partial(cudf.read_json, engine=engine) + if isinstance(engine, str) + else engine + ), **kwargs, ) diff --git a/python/dask_cudf/dask_cudf/io/json.py b/python/dask_cudf/dask_cudf/io/json.py index 6c3c95d1a2e..8432a76ff0c 100644 --- a/python/dask_cudf/dask_cudf/io/json.py +++ b/python/dask_cudf/dask_cudf/io/json.py @@ -6,4 +6,17 @@ import cudf -read_json = partial(dask.dataframe.read_json, engine=cudf.read_json) + +def read_json(path, engine="auto", **kwargs): + # Wrap `dd.read_json` with special engine handling + + # TODO: Add optimized code path to leverage the + # `byte_range` argument in `cudf.read_json` for + # local storage (see `dask_cudf.read_csv`) + if isinstance(engine, str): + # Pass `str` engine argument to `cudf.read_json`` + engine = partial(cudf.read_json, engine=engine) + elif not callable(engine): + raise ValueError(f"Unsupported engine option: {engine}") + # Pass `callable` engine argument to `dd.read_json` 
+ return dask.dataframe.read_json(path, engine=engine, **kwargs) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_json.py b/python/dask_cudf/dask_cudf/io/tests/test_json.py index d19f7736e8e..416ee320c03 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_json.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_json.py @@ -71,3 +71,22 @@ def test_read_json_lines(lines): actual = dask_cudf.read_json(f, orient="records", lines=lines) actual_pd = pd.read_json(f, orient="records", lines=lines) dd.assert_eq(actual, actual_pd) + + +@pytest.mark.filterwarnings("ignore:Using CPU") +def test_read_json_nested_experimental(): + # Check that `engine="cudf_experimental"` can + # be used to support nested data + df = pd.DataFrame( + { + "a": [{"y": 2}, {"y": 4}, {"y": 6}, {"y": 8}], + "b": [[1, 2, 3], [4, 5], [6], [7]], + "c": [1, 3, 5, 7], + } + ) + lines = dict(orient="records", lines=True) + with tmpfile("json") as f: + df.to_json(f, **lines) + actual = dask_cudf.read_json(f, engine="cudf_experimental", **lines) + actual_pd = pd.read_json(f, **lines) + dd.assert_eq(actual, actual_pd) From 7bf96b51bb6a7171c03226c695bead59b0581ecf Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 9 Nov 2022 07:13:34 -0800 Subject: [PATCH 2/2] add docstring and simplify code a bit. 
Improve test --- python/dask_cudf/dask_cudf/backends.py | 16 ++--- python/dask_cudf/dask_cudf/io/json.py | 67 ++++++++++++++++--- .../dask_cudf/dask_cudf/io/tests/test_json.py | 13 ++-- 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 0d651bad0e3..58f3d807f51 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -2,7 +2,6 @@ import warnings from collections.abc import Iterator -from functools import partial import cupy as cp import numpy as np @@ -494,17 +493,10 @@ def read_parquet(*args, engine=None, **kwargs): ) @staticmethod - def read_json(*args, engine="cudf", **kwargs): - return _default_backend( - dd.read_json, - *args, - engine=( - partial(cudf.read_json, engine=engine) - if isinstance(engine, str) - else engine - ), - **kwargs, - ) + def read_json(*args, **kwargs): + from dask_cudf.io.json import read_json + + return read_json(*args, **kwargs) @staticmethod def read_orc(*args, **kwargs): diff --git a/python/dask_cudf/dask_cudf/io/json.py b/python/dask_cudf/dask_cudf/io/json.py index 8432a76ff0c..6ab2ba415a5 100644 --- a/python/dask_cudf/dask_cudf/io/json.py +++ b/python/dask_cudf/dask_cudf/io/json.py @@ -6,17 +6,66 @@ import cudf +from dask_cudf.backends import _default_backend -def read_json(path, engine="auto", **kwargs): - # Wrap `dd.read_json` with special engine handling + +def read_json(url_path, engine="auto", **kwargs): + """Create a dask_cudf DataFrame collection from JSON data + + This function wraps ``dask.dataframe.read_json``, and passes + ``engine=partial(cudf.read_json, engine="auto")`` by default. + + Parameters + ---------- + url_path: str, list of str + Location to read from. If a string, can include a glob character to + find a set of file names. + Supports protocol specifications such as ``"s3://"``. 
+ engine : str or Callable, default "auto" + If str, this value will be used as the ``engine`` argument when + ``cudf.read_json`` is used to create each partition. If Callable, + this value will be used as the underlying function used to create + each partition from JSON data. The default value is "auto", so + that ``engine=partial(cudf.read_json, engine="auto")`` will be + passed to ``dask.dataframe.read_json`` by default. + **kwargs : + Keyword arguments to pass through to ``dask.dataframe.read_json``. + + Returns + ------- + dask_cudf.DataFrame + + Examples + -------- + Load single file + + >>> from dask_cudf import read_json + >>> read_json('myfile.json') # doctest: +SKIP + + Load large line-delimited JSON files using partitions of approx + 256MB size + + >>> read_json('data/file*.json', blocksize=2**28) # doctest: +SKIP + + Load nested JSON data + + >>> read_json('myfile.json', engine='cudf_experimental') # doctest: +SKIP + + See Also + -------- + dask.dataframe.io.json.read_json + """ # TODO: Add optimized code path to leverage the # `byte_range` argument in `cudf.read_json` for # local storage (see `dask_cudf.read_csv`) - if isinstance(engine, str): - # Pass `str` engine argument to `cudf.read_json`` - engine = partial(cudf.read_json, engine=engine) - elif not callable(engine): - raise ValueError(f"Unsupported engine option: {engine}") - # Pass `callable` engine argument to `dd.read_json` - return dask.dataframe.read_json(path, engine=engine, **kwargs) + return _default_backend( + dask.dataframe.read_json, + url_path, + engine=( + partial(cudf.read_json, engine=engine) + if isinstance(engine, str) + else engine + ), + **kwargs, + ) diff --git a/python/dask_cudf/dask_cudf/io/tests/test_json.py b/python/dask_cudf/dask_cudf/io/tests/test_json.py index 416ee320c03..9d26bf06545 100644 --- a/python/dask_cudf/dask_cudf/io/tests/test_json.py +++ b/python/dask_cudf/dask_cudf/io/tests/test_json.py @@ -73,8 +73,7 @@ def test_read_json_lines(lines): dd.assert_eq(actual,
actual_pd) -@pytest.mark.filterwarnings("ignore:Using CPU") -def test_read_json_nested_experimental(): +def test_read_json_nested_experimental(tmp_path): # Check that `engine="cudf_experimental"` can # be used to support nested data df = pd.DataFrame( @@ -84,9 +83,9 @@ def test_read_json_nested_experimental(): "c": [1, 3, 5, 7], } ) - lines = dict(orient="records", lines=True) - with tmpfile("json") as f: - df.to_json(f, **lines) - actual = dask_cudf.read_json(f, engine="cudf_experimental", **lines) - actual_pd = pd.read_json(f, **lines) + kwargs = dict(orient="records", lines=True) + with tmp_path / "data.json" as f: + df.to_json(f, **kwargs) + actual = dask_cudf.read_json(f, engine="cudf_experimental", **kwargs) + actual_pd = pd.read_json(f, **kwargs) dd.assert_eq(actual, actual_pd)