From b693e79b1813276700f70c2cb251d6fef71851a1 Mon Sep 17 00:00:00 2001 From: Lawrence Mitchell Date: Tue, 9 Jul 2024 13:22:35 +0100 Subject: [PATCH] Handler csv reader options in cudf-polars (#16211) Previously we were just relying on the default cudf read_csv options which doesn't do the right thing if the user has configured things. Now that polars passes through the information to us, we can handle things properly, and raise for unsupported cases. While here, update to new polars release and adapt tests to bug fixes that have been made upstream. Authors: - Lawrence Mitchell (https://github.com/wence-) Approvers: - James Lamb (https://github.com/jameslamb) - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/16211 --- python/cudf/cudf/_lib/csv.pyx | 2 +- python/cudf_polars/cudf_polars/dsl/expr.py | 4 +- python/cudf_polars/cudf_polars/dsl/ir.py | 104 +++++++++++++++-- .../cudf_polars/cudf_polars/dsl/translate.py | 12 +- python/cudf_polars/tests/test_scan.py | 107 ++++++++++++++++-- 5 files changed, 206 insertions(+), 23 deletions(-) diff --git a/python/cudf/cudf/_lib/csv.pyx b/python/cudf/cudf/_lib/csv.pyx index c706351a683..9fecff5f5f6 100644 --- a/python/cudf/cudf/_lib/csv.pyx +++ b/python/cudf/cudf/_lib/csv.pyx @@ -450,7 +450,7 @@ def read_csv( col_name = df._data.names[index] df._data[col_name] = df._data[col_name].astype(col_dtype) - if names is not None and isinstance(names[0], (int)): + if names is not None and len(names) and isinstance(names[0], (int)): df.columns = [int(x) for x in df._data] # Set index if the index_col parameter is passed diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index 93cb9db7cbd..f83d9e82d30 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -32,7 +32,7 @@ if TYPE_CHECKING: from collections.abc import Mapping, Sequence - import polars.polars as plrs + import polars as pl import polars.type_aliases as pl_types from cudf_polars.containers import DataFrame @@ -377,7 +377,7 @@ class LiteralColumn(Expr): value: pa.Array[Any, Any] children: tuple[()] - def __init__(self, dtype: plc.DataType, value: plrs.PySeries) -> None: + def __init__(self, dtype: plc.DataType, value: pl.Series) -> None: super().__init__(dtype) data = value.to_arrow() self.value = data.cast(dtypes.downcast_arrow_lists(data.type)) diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 6b552642e88..b32fa9c273e 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -15,9 +15,9 @@ import dataclasses import itertools -import json import types from functools import cache +from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, ClassVar import pyarrow as pa @@ -185,8 +185,10 @@ class Scan(IR): typ: str """What type of file are we reading? Parquet, CSV, etc...""" - options: tuple[Any, ...] - """Type specific options, as json-encoded strings.""" + reader_options: dict[str, Any] + """Reader-specific options, as dictionary.""" + cloud_options: dict[str, Any] | None + """Cloud-related authentication options, currently ignored.""" paths: list[str] """List of paths to read from.""" file_options: Any @@ -206,9 +208,33 @@ def __post_init__(self) -> None: if self.file_options.n_rows is not None: raise NotImplementedError("row limit in scan") if self.typ not in ("csv", "parquet"): + raise NotImplementedError(f"Unhandled scan type: {self.typ}") + if self.cloud_options is not None and any( + self.cloud_options[k] is not None for k in ("aws", "azure", "gcp") + ): raise NotImplementedError( - f"Unhandled scan type: {self.typ}" - ) # pragma: no cover; polars raises on the rust side for now + "Read from cloud storage" + ) # pragma: no cover; no test yet + if self.typ == "csv": + if self.reader_options["skip_rows_after_header"] != 0: + raise NotImplementedError("Skipping rows after header in CSV reader") + parse_options = self.reader_options["parse_options"] + if ( + null_values := parse_options["null_values"] + ) is not None and "Named" in null_values: + raise NotImplementedError( + "Per column null value specification not supported for CSV reader" + ) + if ( + comment := parse_options["comment_prefix"] + ) is not None and "Multi" in comment: + raise NotImplementedError( + "Multi-character comment prefix not supported for CSV reader" + ) + if not self.reader_options["has_header"]: + # Need to do some file introspection to get the number + # of columns so that column projection works right. + raise NotImplementedError("Reading CSV without header") def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" @@ -216,14 +242,70 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: with_columns = options.with_columns row_index = options.row_index if self.typ == "csv": - opts, cloud_opts = map(json.loads, self.options) - df = DataFrame.from_cudf( - cudf.concat( - [cudf.read_csv(p, usecols=with_columns) for p in self.paths] + dtype_map = { + name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()] + for name, typ in self.schema.items() + } + parse_options = self.reader_options["parse_options"] + sep = chr(parse_options["separator"]) + quote = chr(parse_options["quote_char"]) + eol = chr(parse_options["eol_char"]) + if self.reader_options["schema"] is not None: + # Reader schema provides names + column_names = list(self.reader_options["schema"]["inner"].keys()) + else: + # file provides column names + column_names = None + usecols = with_columns + # TODO: support has_header=False + header = 0 + + # polars defaults to no null recognition + null_values = [""] + if parse_options["null_values"] is not None: + ((typ, nulls),) = parse_options["null_values"].items() + if typ == "AllColumnsSingle": + # Single value + null_values.append(nulls) + else: + # List of values + null_values.extend(nulls) + if parse_options["comment_prefix"] is not None: + comment = chr(parse_options["comment_prefix"]["Single"]) + else: + comment = None + decimal = "," if parse_options["decimal_comma"] else "." + + # polars skips blank lines at the beginning of the file + pieces = [] + for p in self.paths: + skiprows = self.reader_options["skip_rows"] + # TODO: read_csv expands globs which we should not do, + # because polars will already have handled them. + path = Path(p) + with path.open() as f: + while f.readline() == "\n": + skiprows += 1 + pieces.append( + cudf.read_csv( + path, + sep=sep, + quotechar=quote, + lineterminator=eol, + names=column_names, + header=header, + usecols=usecols, + na_filter=True, + na_values=null_values, + keep_default_na=False, + skiprows=skiprows, + comment=comment, + decimal=decimal, + dtype=dtype_map, + ) ) - ) + df = DataFrame.from_cudf(cudf.concat(pieces)) elif self.typ == "parquet": - opts, cloud_opts = map(json.loads, self.options) cdf = cudf.read_parquet(self.paths, columns=with_columns) assert isinstance(cdf, cudf.DataFrame) df = DataFrame.from_cudf(cdf) diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index 5a1e682abe7..dec45679c75 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -5,6 +5,7 @@ from __future__ import annotations +import json from contextlib import AbstractContextManager, nullcontext from functools import singledispatch from typing import Any @@ -12,6 +13,7 @@ import pyarrow as pa from typing_extensions import assert_never +import polars as pl import polars.polars as plrs from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir @@ -88,10 +90,16 @@ def _( node: pl_ir.Scan, visitor: NodeTraverser, schema: dict[str, plc.DataType] ) -> ir.IR: typ, *options = node.scan_type + if typ == "ndjson": + (reader_options,) = map(json.loads, options) + cloud_options = None + else: + reader_options, cloud_options = map(json.loads, options) return ir.Scan( schema, typ, - tuple(options), + reader_options, + cloud_options, node.paths, node.file_options, translate_named_expr(visitor, n=node.predicate) @@ -402,7 +410,7 @@ def _(node: pl_expr.Window, visitor: NodeTraverser, dtype: plc.DataType) -> expr @_translate_expr.register def _(node: pl_expr.Literal, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr: if isinstance(node.value, plrs.PySeries): - return expr.LiteralColumn(dtype, node.value) + return expr.LiteralColumn(dtype, pl.Series._from_pyseries(node.value)) value = pa.scalar(node.value, type=plc.interop.to_arrow(dtype)) return expr.Literal(dtype, value) diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py index f129cc7ca32..c41a94da14b 100644 --- a/python/cudf_polars/tests/test_scan.py +++ b/python/cudf_polars/tests/test_scan.py @@ -22,22 +22,22 @@ def row_index(request): @pytest.fixture( params=[ - (None, 0), + None, pytest.param( - (2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + 2, marks=pytest.mark.xfail(reason="No handling of row limit in scan") ), pytest.param( - (3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan") + 3, marks=pytest.mark.xfail(reason="No handling of row limit in scan") ), ], ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"], ) -def n_rows_skip_rows(request): +def n_rows(request): return request.param @pytest.fixture(params=["csv", "parquet"]) -def df(request, tmp_path, row_index, n_rows_skip_rows): +def df(request, tmp_path, row_index, n_rows): df = pl.DataFrame( { "a": [1, 2, 3, None], @@ -46,14 +46,12 @@ def df(request, tmp_path, row_index, n_rows_skip_rows): } ) name, offset = row_index - n_rows, skip_rows = n_rows_skip_rows if request.param == "csv": df.write_csv(tmp_path / "file.csv") return pl.scan_csv( tmp_path / "file.csv", row_index_name=name, row_index_offset=offset, - skip_rows_after_header=skip_rows, n_rows=n_rows, ) else: @@ -97,3 +95,98 @@ def test_scan_unsupported_raises(tmp_path): df.write_ndjson(tmp_path / "df.json") q = pl.scan_ndjson(tmp_path / "df.json") assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_row_index_projected_out(tmp_path): + df = pl.DataFrame({"a": [1, 2, 3]}) + + df.write_parquet(tmp_path / "df.pq") + + q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a")) + + assert_gpu_result_equal(q) + + +def test_scan_csv_column_renames_projection_schema(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2\n3,4,5""") + + q = pl.scan_csv( + tmp_path / "test.csv", + with_column_names=lambda names: [f"{n}_suffix" for n in names], + schema_overrides={ + "foo_suffix": pl.String(), + "bar_suffix": pl.Int8(), + "baz_suffix": pl.UInt16(), + }, + ) + + assert_gpu_result_equal(q) + + +def test_scan_csv_skip_after_header_not_implemented(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", skip_rows_after_header=1) + + assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_csv_null_values_per_column_not_implemented(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", null_values={"foo": "1", "baz": "5"}) + + assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_csv_comment_str_not_implemented(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n// 1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="// ") + + assert_ir_translation_raises(q, NotImplementedError) + + +def test_scan_csv_comment_char(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n# 1,2,3\n3,4,5""") + + q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="#") + + assert_gpu_result_equal(q) + + +@pytest.mark.parametrize("nulls", [None, "3", ["3", "5"]]) +def test_scan_csv_null_values(tmp_path, nulls): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo,bar,baz\n1,2,3\n3,4,5\n5,,2""") + + q = pl.scan_csv(tmp_path / "test.csv", null_values=nulls) + + assert_gpu_result_equal(q) + + +def test_scan_csv_decimal_comma(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""foo|bar|baz\n1,23|2,34|3,56\n1""") + + q = pl.scan_csv(tmp_path / "test.csv", separator="|", decimal_comma=True) + + assert_gpu_result_equal(q) + + +def test_scan_csv_skip_initial_empty_rows(tmp_path): + with (tmp_path / "test.csv").open("w") as f: + f.write("""\n\n\n\nfoo|bar|baz\n1|2|3\n1""") + + q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1, has_header=False) + + assert_ir_translation_raises(q, NotImplementedError) + + q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1) + + assert_gpu_result_equal(q)