Implement support for scan_ndjson in cudf-polars (#16263)
Implement support for scan_ndjson in cudf-polars.
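
For orientation, a minimal usage sketch of what this commit enables: scanning an NDJSON file and running the query on the GPU via the cudf-polars callback used in the tests below. This sketch is not part of the diff; the file name is hypothetical and the import path for execute_with_cudf is an assumption.

    import polars as pl

    from cudf_polars.callback import execute_with_cudf  # import path assumed

    # Build a lazy query over line-delimited JSON and execute it on the GPU.
    q = pl.scan_ndjson("data.jsonl").filter(pl.col("c").is_not_null())
    result = q.collect(post_opt_callback=execute_with_cudf)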

Authors:
  - Thomas Li (https://github.com/lithomas1)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #16263
lithomas1 authored Jul 25, 2024
1 parent d953676 commit f756e01
Showing 3 changed files with 146 additions and 40 deletions.
37 changes: 35 additions & 2 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -204,10 +204,14 @@ class Scan(IR):

     def __post_init__(self) -> None:
         """Validate preconditions."""
-        if self.typ not in ("csv", "parquet"):
+        if self.typ not in ("csv", "parquet", "ndjson"):  # pragma: no cover
+            # This line is unhittable ATM since IPC/Anonymous scan raise
+            # on the polars side
             raise NotImplementedError(f"Unhandled scan type: {self.typ}")
+        if self.typ == "ndjson" and self.file_options.n_rows is not None:
+            raise NotImplementedError("row limit in scan")
         if self.cloud_options is not None and any(
-            self.cloud_options[k] is not None for k in ("aws", "azure", "gcp")
+            self.cloud_options.get(k) is not None for k in ("aws", "azure", "gcp")
         ):
             raise NotImplementedError(
                 "Read from cloud storage"
@@ -232,6 +236,13 @@ def __post_init__(self) -> None:
                 # Need to do some file introspection to get the number
                 # of columns so that column projection works right.
                 raise NotImplementedError("Reading CSV without header")
+        elif self.typ == "ndjson":
+            # TODO: consider handling the low memory option here
+            # (maybe use chunked JSON reader)
+            if self.reader_options["ignore_errors"]:
+                raise NotImplementedError(
+                    "ignore_errors is not supported in the JSON reader"
+                )

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
@@ -317,6 +328,28 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 # TODO: consider nested column names?
                 tbl_w_meta.column_names(include_children=False),
             )
+        elif self.typ == "ndjson":
+            json_schema: list[tuple[str, str, list]] = [
+                (name, typ, []) for name, typ in self.schema.items()
+            ]
+            plc_tbl_w_meta = plc.io.json.read_json(
+                plc.io.SourceInfo(self.paths),
+                lines=True,
+                dtypes=json_schema,
+                prune_columns=True,
+            )
+            # TODO: I don't think cudf-polars supports nested types in general right now
+            # (but when it does, we should pass child column names from nested columns in)
+            df = DataFrame.from_table(
+                plc_tbl_w_meta.tbl, plc_tbl_w_meta.column_names(include_children=False)
+            )
+            col_order = list(self.schema.keys())
+            # TODO: remove condition when dropping support for polars 1.0
+            # https://github.com/pola-rs/polars/pull/17363
+            if row_index is not None and row_index[0] in self.schema:
+                col_order.remove(row_index[0])
+            if col_order is not None:
+                df = df.select(col_order)
         else:
             raise NotImplementedError(
                 f"Unhandled scan type: {self.typ}"
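For context, a standalone sketch mirroring the pylibcudf call made by the new ndjson branch above. The module alias, file name, column name, and DataType construction are illustrative assumptions; only the read_json keyword arguments and the metadata accessors are taken from the diff.

    import pylibcudf as plc  # alias assumed; ir.py refers to the module as `plc`

    # (name, dtype, child dtypes) triples, as built from Scan.schema above.
    json_schema = [("a", plc.DataType(plc.TypeId.INT64), [])]
    tbl_w_meta = plc.io.json.read_json(
        plc.io.SourceInfo(["data.jsonl"]),
        lines=True,
        dtypes=json_schema,
        prune_columns=True,
    )
    # Column names come back on the metadata, as used by DataFrame.from_table.
    print(tbl_w_meta.column_names(include_children=False))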
34 changes: 26 additions & 8 deletions python/cudf_polars/cudf_polars/testing/asserts.py
@@ -14,8 +14,6 @@
 from cudf_polars.dsl.translate import translate_ir

 if TYPE_CHECKING:
-    from collections.abc import Mapping
-
     import polars as pl

     from cudf_polars.typing import OptimizationArgs
@@ -26,7 +24,9 @@
 def assert_gpu_result_equal(
     lazydf: pl.LazyFrame,
     *,
-    collect_kwargs: Mapping[OptimizationArgs, bool] | None = None,
+    collect_kwargs: dict[OptimizationArgs, bool] | None = None,
+    polars_collect_kwargs: dict[OptimizationArgs, bool] | None = None,
+    cudf_collect_kwargs: dict[OptimizationArgs, bool] | None = None,
     check_row_order: bool = True,
     check_column_order: bool = True,
     check_dtypes: bool = True,
@@ -43,8 +43,17 @@ def assert_gpu_result_equal(
     lazydf
         frame to collect.
     collect_kwargs
-        Keyword arguments to pass to collect. Useful for controlling
-        optimization settings.
+        Common keyword arguments to pass to collect for both polars CPU and
+        cudf-polars.
+        Useful for controlling optimization settings.
+    polars_collect_kwargs
+        Keyword arguments to pass to collect for execution on polars CPU.
+        Overrides kwargs in collect_kwargs.
+        Useful for controlling optimization settings.
+    cudf_collect_kwargs
+        Keyword arguments to pass to collect for execution on cudf-polars.
+        Overrides kwargs in collect_kwargs.
+        Useful for controlling optimization settings.
     check_row_order
         Expect rows to be in same order
     check_column_order
@@ -68,10 +77,19 @@
     NotImplementedError
         If GPU collection failed in some way.
     """
-    collect_kwargs = {} if collect_kwargs is None else collect_kwargs
-    expect = lazydf.collect(**collect_kwargs)
+    if collect_kwargs is None:
+        collect_kwargs = {}
+    final_polars_collect_kwargs = collect_kwargs.copy()
+    final_cudf_collect_kwargs = collect_kwargs.copy()
+    if polars_collect_kwargs is not None:
+        final_polars_collect_kwargs.update(polars_collect_kwargs)
+    if cudf_collect_kwargs is not None:  # pragma: no cover
+        # exclude from coverage since not used ATM
+        # but this is probably still useful
+        final_cudf_collect_kwargs.update(cudf_collect_kwargs)
+    expect = lazydf.collect(**final_polars_collect_kwargs)
     got = lazydf.collect(
-        **collect_kwargs,
+        **final_cudf_collect_kwargs,
         post_opt_callback=partial(execute_with_cudf, raise_on_fail=True),
     )
     assert_frame_equal(
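To make the override semantics concrete, a small hypothetical call (q and the option values are placeholders): collect_kwargs applies to both engines, and the per-engine dictionaries win on conflict, so here the polars CPU run disables projection pushdown while the cudf-polars run keeps it enabled.

    assert_gpu_result_equal(
        q,
        collect_kwargs={"projection_pushdown": True},
        # CPU-only override; the GPU run still uses projection_pushdown=True.
        polars_collect_kwargs={"projection_pushdown": False},
    )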
115 changes: 85 additions & 30 deletions python/cudf_polars/tests/test_scan.py
@@ -31,33 +31,16 @@ def n_rows(request):
     return request.param


-@pytest.fixture(params=["csv", "parquet"])
-def df(request, tmp_path, row_index, n_rows):
-    df = pl.DataFrame(
+@pytest.fixture(scope="module")
+def df():
+    # TODO: more dtypes
+    return pl.DataFrame(
         {
-            "a": [1, 2, 3, None],
-            "b": ["ẅ", "x", "y", "z"],
-            "c": [None, None, 4, 5],
+            "a": [1, 2, 3, None, 4, 5],
+            "b": ["ẅ", "x", "y", "z", "123", "abcd"],
+            "c": [None, None, 4, 5, -1, 0],
         }
     )
-    name, offset = row_index
-    if request.param == "csv":
-        df.write_csv(tmp_path / "file.csv")
-        return pl.scan_csv(
-            tmp_path / "file.csv",
-            row_index_name=name,
-            row_index_offset=offset,
-            n_rows=n_rows,
-        )
-    else:
-        df.write_parquet(tmp_path / "file.pq")
-        # parquet doesn't have skip_rows argument
-        return pl.scan_parquet(
-            tmp_path / "file.pq",
-            row_index_name=name,
-            row_index_offset=offset,
-            n_rows=n_rows,
-        )


 @pytest.fixture(params=[None, ["a"], ["b", "a"]], ids=["all", "subset", "reordered"])
@@ -75,20 +58,72 @@ def mask(request):
     return request.param


-def test_scan(df, columns, mask):
-    q = df
+def make_source(df, path, format):
+    """
+    Writes the passed polars df to a file of
+    the desired format
+    """
+    if format == "csv":
+        df.write_csv(path)
+    elif format == "ndjson":
+        df.write_ndjson(path)
+    else:
+        df.write_parquet(path)
+
+
+@pytest.mark.parametrize(
+    "format, scan_fn",
+    [
+        ("csv", pl.scan_csv),
+        ("ndjson", pl.scan_ndjson),
+        ("parquet", pl.scan_parquet),
+    ],
+)
+def test_scan(tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, request):
+    name, offset = row_index
+    make_source(df, tmp_path / "file", format)
+    request.applymarker(
+        pytest.mark.xfail(
+            condition=(n_rows is not None and scan_fn is pl.scan_ndjson),
+            reason="libcudf does not support n_rows",
+        )
+    )
+    q = scan_fn(
+        tmp_path / "file",
+        row_index_name=name,
+        row_index_offset=offset,
+        n_rows=n_rows,
+    )
     if mask is not None:
         q = q.filter(mask)
     if columns is not None:
-        q = df.select(*columns)
-    assert_gpu_result_equal(q)
+        q = q.select(*columns)
+    polars_collect_kwargs = {}
+    if versions.POLARS_VERSION_LT_12:
+        # https://github.com/pola-rs/polars/issues/17553
+        polars_collect_kwargs = {"projection_pushdown": False}
+    assert_gpu_result_equal(
+        q,
+        polars_collect_kwargs=polars_collect_kwargs,
+        # This doesn't work in polars < 1.2 since the row-index
+        # is in the wrong order in previous polars releases
+        check_column_order=versions.POLARS_VERSION_LT_12,
+    )


 def test_scan_unsupported_raises(tmp_path):
     df = pl.DataFrame({"a": [1, 2, 3]})

-    df.write_ndjson(tmp_path / "df.json")
-    q = pl.scan_ndjson(tmp_path / "df.json")
+    df.write_ipc(tmp_path / "df.ipc")
+    q = pl.scan_ipc(tmp_path / "df.ipc")
     assert_ir_translation_raises(q, NotImplementedError)


+def test_scan_ndjson_nrows_notimplemented(tmp_path, df):
+    df = pl.DataFrame({"a": [1, 2, 3]})
+
+    df.write_ndjson(tmp_path / "df.jsonl")
+    q = pl.scan_ndjson(tmp_path / "df.jsonl", n_rows=1)
+    assert_ir_translation_raises(q, NotImplementedError)
+
+
@@ -225,3 +260,23 @@ def test_scan_csv_skip_initial_empty_rows(tmp_path):
     q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1)

     assert_gpu_result_equal(q)
+
+
+@pytest.mark.parametrize(
+    "schema",
+    [
+        # List of colnames (basically like names param in CSV)
+        {"b": pl.String, "a": pl.Float32},
+        {"a": pl.UInt64},
+    ],
+)
+def test_scan_ndjson_schema(df, tmp_path, schema):
+    make_source(df, tmp_path / "file", "ndjson")
+    q = pl.scan_ndjson(tmp_path / "file", schema=schema)
+    assert_gpu_result_equal(q)
+
+
+def test_scan_ndjson_unsupported(df, tmp_path):
+    make_source(df, tmp_path / "file", "ndjson")
+    q = pl.scan_ndjson(tmp_path / "file", ignore_errors=True)
+    assert_ir_translation_raises(q, NotImplementedError)
