diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index e5691cba7dd..7f62dff4389 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -204,10 +204,14 @@ class Scan(IR):

     def __post_init__(self) -> None:
         """Validate preconditions."""
-        if self.typ not in ("csv", "parquet"):
+        if self.typ not in ("csv", "parquet", "ndjson"):  # pragma: no cover
+            # This line is unhittable ATM since IPC/Anonymous scans raise
+            # on the polars side
             raise NotImplementedError(f"Unhandled scan type: {self.typ}")
+        if self.typ == "ndjson" and self.file_options.n_rows is not None:
+            raise NotImplementedError("row limit in scan")
         if self.cloud_options is not None and any(
-            self.cloud_options[k] is not None for k in ("aws", "azure", "gcp")
+            self.cloud_options.get(k) is not None for k in ("aws", "azure", "gcp")
         ):
             raise NotImplementedError(
                 "Read from cloud storage"
@@ -232,6 +236,13 @@ def __post_init__(self) -> None:
                 # Need to do some file introspection to get the number
                 # of columns so that column projection works right.
                 raise NotImplementedError("Reading CSV without header")
+        elif self.typ == "ndjson":
+            # TODO: consider handling the low memory option here
+            # (maybe use chunked JSON reader)
+            if self.reader_options["ignore_errors"]:
+                raise NotImplementedError(
+                    "ignore_errors is not supported in the JSON reader"
+                )

     def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         """Evaluate and return a dataframe."""
@@ -317,6 +328,28 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
                 # TODO: consider nested column names?
                 tbl_w_meta.column_names(include_children=False),
             )
+        elif self.typ == "ndjson":
+            json_schema: list[tuple[str, plc.DataType, list]] = [
+                (name, typ, []) for name, typ in self.schema.items()
+            ]
+            plc_tbl_w_meta = plc.io.json.read_json(
+                plc.io.SourceInfo(self.paths),
+                lines=True,
+                dtypes=json_schema,
+                prune_columns=True,
+            )
+            # TODO: I don't think cudf-polars supports nested types in general right now
+            # (but when it does, we should pass child column names from nested columns in)
+            df = DataFrame.from_table(
+                plc_tbl_w_meta.tbl, plc_tbl_w_meta.column_names(include_children=False)
+            )
+            col_order = list(self.schema.keys())
+            # TODO: remove condition when dropping support for polars 1.0
+            # https://github.com/pola-rs/polars/pull/17363
+            if row_index is not None and row_index[0] in self.schema:
+                col_order.remove(row_index[0])
+            if col_order is not None:
+                df = df.select(col_order)
         else:
             raise NotImplementedError(
                 f"Unhandled scan type: {self.typ}"
diff --git a/python/cudf_polars/cudf_polars/testing/asserts.py b/python/cudf_polars/cudf_polars/testing/asserts.py
index a9a4ae5f0a6..d37c96a15de 100644
--- a/python/cudf_polars/cudf_polars/testing/asserts.py
+++ b/python/cudf_polars/cudf_polars/testing/asserts.py
@@ -14,8 +14,6 @@
 from cudf_polars.dsl.translate import translate_ir

 if TYPE_CHECKING:
-    from collections.abc import Mapping
-
     import polars as pl

     from cudf_polars.typing import OptimizationArgs
@@ -26,7 +24,9 @@
 def assert_gpu_result_equal(
     lazydf: pl.LazyFrame,
     *,
-    collect_kwargs: Mapping[OptimizationArgs, bool] | None = None,
+    collect_kwargs: dict[OptimizationArgs, bool] | None = None,
+    polars_collect_kwargs: dict[OptimizationArgs, bool] | None = None,
+    cudf_collect_kwargs: dict[OptimizationArgs, bool] | None = None,
     check_row_order: bool = True,
     check_column_order: bool = True,
     check_dtypes: bool = True,
@@ -43,8 +43,17 @@ def assert_gpu_result_equal(
     lazydf
         frame to collect.
     collect_kwargs
-        Keyword arguments to pass to collect. Useful for controlling
-        optimization settings.
+        Common keyword arguments to pass to collect for both polars CPU and
+        cudf-polars.
+        Useful for controlling optimization settings.
+    polars_collect_kwargs
+        Keyword arguments to pass to collect for execution on polars CPU.
+        Overrides kwargs in collect_kwargs.
+        Useful for controlling optimization settings.
+    cudf_collect_kwargs
+        Keyword arguments to pass to collect for execution on cudf-polars.
+        Overrides kwargs in collect_kwargs.
+        Useful for controlling optimization settings.
     check_row_order
         Expect rows to be in same order
     check_column_order
@@ -68,10 +77,19 @@ def assert_gpu_result_equal(
     NotImplementedError
         If GPU collection failed in some way.
     """
-    collect_kwargs = {} if collect_kwargs is None else collect_kwargs
-    expect = lazydf.collect(**collect_kwargs)
+    if collect_kwargs is None:
+        collect_kwargs = {}
+    final_polars_collect_kwargs = collect_kwargs.copy()
+    final_cudf_collect_kwargs = collect_kwargs.copy()
+    if polars_collect_kwargs is not None:
+        final_polars_collect_kwargs.update(polars_collect_kwargs)
+    if cudf_collect_kwargs is not None:  # pragma: no cover
+        # exclude from coverage since not used ATM
+        # but this is probably still useful
+        final_cudf_collect_kwargs.update(cudf_collect_kwargs)
+    expect = lazydf.collect(**final_polars_collect_kwargs)
     got = lazydf.collect(
-        **collect_kwargs,
+        **final_cudf_collect_kwargs,
         post_opt_callback=partial(execute_with_cudf, raise_on_fail=True),
     )
     assert_frame_equal(
diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py
index 642b6ae8a37..64acbb076ed 100644
--- a/python/cudf_polars/tests/test_scan.py
+++ b/python/cudf_polars/tests/test_scan.py
@@ -31,33 +31,16 @@ def n_rows(request):
     return request.param


-@pytest.fixture(params=["csv", "parquet"])
-def df(request, tmp_path, row_index, n_rows):
-    df = pl.DataFrame(
+@pytest.fixture(scope="module")
+def df():
+    # TODO: more dtypes
+    return pl.DataFrame(
         {
-            "a": [1, 2, 3, None],
-            "b": ["ẅ", "x", "y", "z"],
-            "c": [None, None, 4, 5],
+            "a": [1, 2, 3, None, 4, 5],
+            "b": ["ẅ", "x", "y", "z", "123", "abcd"],
+            "c": [None, None, 4, 5, -1, 0],
         }
     )
-    name, offset = row_index
-    if request.param == "csv":
-        df.write_csv(tmp_path / "file.csv")
-        return pl.scan_csv(
-            tmp_path / "file.csv",
-            row_index_name=name,
-            row_index_offset=offset,
-            n_rows=n_rows,
-        )
-    else:
-        df.write_parquet(tmp_path / "file.pq")
-        # parquet doesn't have skip_rows argument
-        return pl.scan_parquet(
-            tmp_path / "file.pq",
-            row_index_name=name,
-            row_index_offset=offset,
-            n_rows=n_rows,
-        )


 @pytest.fixture(params=[None, ["a"], ["b", "a"]], ids=["all", "subset", "reordered"])
@@ -75,20 +58,72 @@
 def mask(request):
     return request.param


-def test_scan(df, columns, mask):
-    q = df
+def make_source(df, path, format):
+    """
+    Write the passed polars df to a file of
+    the desired format.
+    """
+    if format == "csv":
+        df.write_csv(path)
+    elif format == "ndjson":
+        df.write_ndjson(path)
+    else:
+        df.write_parquet(path)
+
+
+@pytest.mark.parametrize(
+    "format, scan_fn",
+    [
+        ("csv", pl.scan_csv),
+        ("ndjson", pl.scan_ndjson),
+        ("parquet", pl.scan_parquet),
+    ],
+)
+def test_scan(tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, request):
+    name, offset = row_index
+    make_source(df, tmp_path / "file", format)
+    request.applymarker(
+        pytest.mark.xfail(
+            condition=(n_rows is not None and scan_fn is pl.scan_ndjson),
reason="libcudf does not support n_rows", + ) + ) + q = scan_fn( + tmp_path / "file", + row_index_name=name, + row_index_offset=offset, + n_rows=n_rows, + ) if mask is not None: q = q.filter(mask) if columns is not None: - q = df.select(*columns) - assert_gpu_result_equal(q) + q = q.select(*columns) + polars_collect_kwargs = {} + if versions.POLARS_VERSION_LT_12: + # https://github.com/pola-rs/polars/issues/17553 + polars_collect_kwargs = {"projection_pushdown": False} + assert_gpu_result_equal( + q, + polars_collect_kwargs=polars_collect_kwargs, + # This doesn't work in polars < 1.2 since the row-index + # is in the wrong order in previous polars releases + check_column_order=versions.POLARS_VERSION_LT_12, + ) def test_scan_unsupported_raises(tmp_path): df = pl.DataFrame({"a": [1, 2, 3]}) - df.write_ndjson(tmp_path / "df.json") - q = pl.scan_ndjson(tmp_path / "df.json") + df.write_ipc(tmp_path / "df.ipc") + q = pl.scan_ipc(tmp_path / "df.ipc") + assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_ndjson_nrows_notimplemented(tmp_path, df): + df = pl.DataFrame({"a": [1, 2, 3]}) + + df.write_ndjson(tmp_path / "df.jsonl") + q = pl.scan_ndjson(tmp_path / "df.jsonl", n_rows=1) assert_ir_translation_raises(q, NotImplementedError) @@ -225,3 +260,23 @@ def test_scan_csv_skip_initial_empty_rows(tmp_path): q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1) assert_gpu_result_equal(q) + + +@pytest.mark.parametrize( + "schema", + [ + # List of colnames (basicaly like names param in CSV) + {"b": pl.String, "a": pl.Float32}, + {"a": pl.UInt64}, + ], +) +def test_scan_ndjson_schema(df, tmp_path, schema): + make_source(df, tmp_path / "file", "ndjson") + q = pl.scan_ndjson(tmp_path / "file", schema=schema) + assert_gpu_result_equal(q) + + +def test_scan_ndjson_unsupported(df, tmp_path): + make_source(df, tmp_path / "file", "ndjson") + q = pl.scan_ndjson(tmp_path / "file", ignore_errors=True) + assert_ir_translation_raises(q, NotImplementedError)