From 184979c993df4042b5f7805a75d1d9570bddd1d5 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Fri, 5 Jul 2024 17:33:05 +0000 Subject: [PATCH] Handling csv reader options Possibly some pieces are missing, but this is much closer to complete. --- python/cudf_polars/cudf_polars/dsl/ir.py | 104 ++++++++++++++++-- .../cudf_polars/cudf_polars/dsl/translate.py | 9 +- python/cudf_polars/tests/test_scan.py | 97 ++++++++++++++-- 3 files changed, 191 insertions(+), 19 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 6b552642e88..b32fa9c273e 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -15,9 +15,9 @@ import dataclasses import itertools -import json import types from functools import cache +from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, ClassVar import pyarrow as pa @@ -185,8 +185,10 @@ class Scan(IR): typ: str """What type of file are we reading? Parquet, CSV, etc...""" - options: tuple[Any, ...] - """Type specific options, as json-encoded strings.""" + reader_options: dict[str, Any] + """Reader-specific options, as dictionary.""" + cloud_options: dict[str, Any] | None + """Cloud-related authentication options, currently ignored.""" paths: list[str] """List of paths to read from.""" file_options: Any @@ -206,9 +208,33 @@ def __post_init__(self) -> None: if self.file_options.n_rows is not None: raise NotImplementedError("row limit in scan") if self.typ not in ("csv", "parquet"): + raise NotImplementedError(f"Unhandled scan type: {self.typ}") + if self.cloud_options is not None and any( + self.cloud_options[k] is not None for k in ("aws", "azure", "gcp") + ): raise NotImplementedError( - f"Unhandled scan type: {self.typ}" - ) # pragma: no cover; polars raises on the rust side for now + "Read from cloud storage" + ) # pragma: no cover; no test yet + if self.typ == "csv": + if self.reader_options["skip_rows_after_header"] != 0: + raise NotImplementedError("Skipping rows after header in CSV reader") + parse_options = self.reader_options["parse_options"] + if ( + null_values := parse_options["null_values"] + ) is not None and "Named" in null_values: + raise NotImplementedError( + "Per column null value specification not supported for CSV reader" + ) + if ( + comment := parse_options["comment_prefix"] + ) is not None and "Multi" in comment: + raise NotImplementedError( + "Multi-character comment prefix not supported for CSV reader" + ) + if not self.reader_options["has_header"]: + # Need to do some file introspection to get the number + # of columns so that column projection works right. + raise NotImplementedError("Reading CSV without header") def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" @@ -216,14 +242,70 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: with_columns = options.with_columns row_index = options.row_index if self.typ == "csv": - opts, cloud_opts = map(json.loads, self.options) - df = DataFrame.from_cudf( - cudf.concat( - [cudf.read_csv(p, usecols=with_columns) for p in self.paths] + dtype_map = { + name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()] + for name, typ in self.schema.items() + } + parse_options = self.reader_options["parse_options"] + sep = chr(parse_options["separator"]) + quote = chr(parse_options["quote_char"]) + eol = chr(parse_options["eol_char"]) + if self.reader_options["schema"] is not None: + # Reader schema provides names + column_names = list(self.reader_options["schema"]["inner"].keys()) + else: + # file provides column names + column_names = None + usecols = with_columns + # TODO: support has_header=False + header = 0 + + # polars defaults to no null recognition + null_values = [""] + if parse_options["null_values"] is not None: + ((typ, nulls),) = parse_options["null_values"].items() + if typ == "AllColumnsSingle": + # Single value + null_values.append(nulls) + else: + # List of values + null_values.extend(nulls) + if parse_options["comment_prefix"] is not None: + comment = chr(parse_options["comment_prefix"]["Single"]) + else: + comment = None + decimal = "," if parse_options["decimal_comma"] else "." + + # polars skips blank lines at the beginning of the file + pieces = [] + for p in self.paths: + skiprows = self.reader_options["skip_rows"] + # TODO: read_csv expands globs which we should not do, + # because polars will already have handled them. + path = Path(p) + with path.open() as f: + while f.readline() == "\n": + skiprows += 1 + pieces.append( + cudf.read_csv( + path, + sep=sep, + quotechar=quote, + lineterminator=eol, + names=column_names, + header=header, + usecols=usecols, + na_filter=True, + na_values=null_values, + keep_default_na=False, + skiprows=skiprows, + comment=comment, + decimal=decimal, + dtype=dtype_map, + ) ) - ) + df = DataFrame.from_cudf(cudf.concat(pieces)) elif self.typ == "parquet": - opts, cloud_opts = map(json.loads, self.options) cdf = cudf.read_parquet(self.paths, columns=with_columns) assert isinstance(cdf, cudf.DataFrame) df = DataFrame.from_cudf(cdf) diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index 9bd58c7f466..dec45679c75 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -5,6 +5,7 @@ from __future__ import annotations +import json from contextlib import AbstractContextManager, nullcontext from functools import singledispatch from typing import Any @@ -89,10 +90,16 @@ def _( node: pl_ir.Scan, visitor: NodeTraverser, schema: dict[str, plc.DataType] ) -> ir.IR: typ, *options = node.scan_type + if typ == "ndjson": + (reader_options,) = map(json.loads, options) + cloud_options = None + else: + reader_options, cloud_options = map(json.loads, options) return ir.Scan( schema, typ, - tuple(options), + reader_options, + cloud_options, node.paths, node.file_options, translate_named_expr(visitor, n=node.predicate) diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index c00d6a254d3..c41a94da14b 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -22,22 +22,22 @@ def row_index(request): @pytest.fixture( params=[ - (None, 0), + None, pytest.param( - (2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + 2, marks=pytest.mark.xfail(reason="No handling of row limit in scan") ), pytest.param( - (3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + 3, marks=pytest.mark.xfail(reason="No handling of row limit in scan") ), ], ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"], ) -def n_rows_skip_rows(request): +def n_rows(request): return request.param @pytest.fixture(params=["csv", "parquet"]) -def df(request, tmp_path, row_index, n_rows_skip_rows): +def df(request, tmp_path, row_index, n_rows): df = pl.DataFrame( { "a": [1, 2, 3, None], @@ -46,14 +46,12 @@ def df(request, tmp_path, row_index, n_rows_skip_rows): } ) name, offset = row_index - n_rows, skip_rows = n_rows_skip_rows if request.param == "csv": df.write_csv(tmp_path / "file.csv") return pl.scan_csv( tmp_path / "file.csv", row_index_name=name, row_index_offset=offset, - skip_rows_after_header=skip_rows, n_rows=n_rows, ) else: @@ -107,3 +105,88 @@ def test_scan_row_index_projected_out(tmp_path): q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a")) assert_gpu_result_equal(q) + + +def test_scan_csv_column_renames_projection_schema(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2\n3,4,5""") + + q = pl.scan_csv( + tmp_path / "test.csv", + with_column_names=lambda names: [f"{n}_suffix" for n in names], + schema_overrides={ + "foo_suffix": pl.String(), + "bar_suffix": pl.Int8(), + "baz_suffix": pl.UInt16(), + }, + ) + + assert_gpu_result_equal(q) + + +def test_scan_csv_skip_after_header_not_implemented(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", skip_rows_after_header=1) + + assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_csv_null_values_per_column_not_implemented(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", null_values={"foo": "1", "baz": "5"}) + + assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_csv_comment_str_not_implemented(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n// 1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="// ") + + assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_csv_comment_char(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n# 1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="#") + + assert_gpu_result_equal(q) + + +@pytest.mark.parametrize("nulls", [None, "3", ["3", "5"]]) +def test_scan_csv_null_values(tmp_path, nulls): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2,3\n3,4,5\n5,,2""") + + q = pl.scan_csv(tmp_path / "test.csv", null_values=nulls) + + assert_gpu_result_equal(q) + + +def test_scan_csv_decimal_comma(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo|bar|baz\n1,23|2,34|3,56\n1""") + + q = pl.scan_csv(tmp_path / "test.csv", separator="|", decimal_comma=True) + + assert_gpu_result_equal(q) + + +def test_scan_csv_skip_initial_empty_rows(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""\n\n\n\nfoo|bar|baz\n1|2|3\n1""") + + q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1, has_header=False) + + assert_ir_translation_raises(q, NotImplementedError) + + q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1) + + assert_gpu_result_equal(q)