Handle CSV reader options in cudf-polars (#16211)
Previously we just relied on the default cudf read_csv options, which do not do the right thing if the user has configured the reader.

Now that polars passes this information through to us, we can handle the options properly and raise for unsupported cases.

While here, update to the new polars release and adapt the tests to bug fixes that have been made upstream.
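As an illustration (a sketch only; the file name and option values are hypothetical, not taken from this change), a scan such as

import polars as pl

q = pl.scan_csv(
    "data.csv",
    separator="|",
    comment_prefix="#",
    null_values=["NA"],
    decimal_comma=True,
)

now has its separator, comment prefix, null values, and decimal-comma settings forwarded to cudf's CSV reader, while options we do not yet support (for example skip_rows_after_header, per-column null_values, or a multi-character comment prefix) raise NotImplementedError at translation time.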

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - James Lamb (https://github.com/jameslamb)
  - Matthew Roeschke (https://github.com/mroeschke)

URL: #16211
wence- authored Jul 9, 2024
1 parent 65e4e99 commit b693e79
Showing 5 changed files with 206 additions and 23 deletions.
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/csv.pyx
@@ -450,7 +450,7 @@ def read_csv(
col_name = df._data.names[index]
df._data[col_name] = df._data[col_name].astype(col_dtype)

if names is not None and isinstance(names[0], (int)):
if names is not None and len(names) and isinstance(names[0], (int)):
df.columns = [int(x) for x in df._data]

# Set index if the index_col parameter is passed
4 changes: 2 additions & 2 deletions python/cudf_polars/cudf_polars/dsl/expr.py
@@ -32,7 +32,7 @@
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence

import polars.polars as plrs
import polars as pl
import polars.type_aliases as pl_types

from cudf_polars.containers import DataFrame
@@ -377,7 +377,7 @@ class LiteralColumn(Expr):
value: pa.Array[Any, Any]
children: tuple[()]

def __init__(self, dtype: plc.DataType, value: plrs.PySeries) -> None:
def __init__(self, dtype: plc.DataType, value: pl.Series) -> None:
super().__init__(dtype)
data = value.to_arrow()
self.value = data.cast(dtypes.downcast_arrow_lists(data.type))
104 changes: 93 additions & 11 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -15,9 +15,9 @@

import dataclasses
import itertools
import json
import types
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, ClassVar

import pyarrow as pa
@@ -185,8 +185,10 @@ class Scan(IR):

typ: str
"""What type of file are we reading? Parquet, CSV, etc..."""
options: tuple[Any, ...]
"""Type specific options, as json-encoded strings."""
reader_options: dict[str, Any]
"""Reader-specific options, as dictionary."""
cloud_options: dict[str, Any] | None
"""Cloud-related authentication options, currently ignored."""
paths: list[str]
"""List of paths to read from."""
file_options: Any
@@ -206,24 +208,104 @@ def __post_init__(self) -> None:
if self.file_options.n_rows is not None:
raise NotImplementedError("row limit in scan")
if self.typ not in ("csv", "parquet"):
raise NotImplementedError(
f"Unhandled scan type: {self.typ}"
) # pragma: no cover; polars raises on the rust side for now
raise NotImplementedError(f"Unhandled scan type: {self.typ}")
if self.cloud_options is not None and any(
self.cloud_options[k] is not None for k in ("aws", "azure", "gcp")
):
raise NotImplementedError(
"Read from cloud storage"
) # pragma: no cover; no test yet
if self.typ == "csv":
if self.reader_options["skip_rows_after_header"] != 0:
raise NotImplementedError("Skipping rows after header in CSV reader")
parse_options = self.reader_options["parse_options"]
if (
null_values := parse_options["null_values"]
) is not None and "Named" in null_values:
raise NotImplementedError(
"Per column null value specification not supported for CSV reader"
)
if (
comment := parse_options["comment_prefix"]
) is not None and "Multi" in comment:
raise NotImplementedError(
"Multi-character comment prefix not supported for CSV reader"
)
if not self.reader_options["has_header"]:
# Need to do some file introspection to get the number
# of columns so that column projection works right.
raise NotImplementedError("Reading CSV without header")

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
"""Evaluate and return a dataframe."""
options = self.file_options
with_columns = options.with_columns
row_index = options.row_index
if self.typ == "csv":
opts, cloud_opts = map(json.loads, self.options)
df = DataFrame.from_cudf(
cudf.concat(
[cudf.read_csv(p, usecols=with_columns) for p in self.paths]
dtype_map = {
name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()]
for name, typ in self.schema.items()
}
parse_options = self.reader_options["parse_options"]
sep = chr(parse_options["separator"])
quote = chr(parse_options["quote_char"])
eol = chr(parse_options["eol_char"])
if self.reader_options["schema"] is not None:
# Reader schema provides names
column_names = list(self.reader_options["schema"]["inner"].keys())
else:
# file provides column names
column_names = None
usecols = with_columns
# TODO: support has_header=False
header = 0

# polars defaults to no null recognition
null_values = [""]
if parse_options["null_values"] is not None:
((typ, nulls),) = parse_options["null_values"].items()
if typ == "AllColumnsSingle":
# Single value
null_values.append(nulls)
else:
# List of values
null_values.extend(nulls)
if parse_options["comment_prefix"] is not None:
comment = chr(parse_options["comment_prefix"]["Single"])
else:
comment = None
decimal = "," if parse_options["decimal_comma"] else "."

# polars skips blank lines at the beginning of the file
pieces = []
for p in self.paths:
skiprows = self.reader_options["skip_rows"]
# TODO: read_csv expands globs which we should not do,
# because polars will already have handled them.
path = Path(p)
with path.open() as f:
while f.readline() == "\n":
skiprows += 1
pieces.append(
cudf.read_csv(
path,
sep=sep,
quotechar=quote,
lineterminator=eol,
names=column_names,
header=header,
usecols=usecols,
na_filter=True,
na_values=null_values,
keep_default_na=False,
skiprows=skiprows,
comment=comment,
decimal=decimal,
dtype=dtype_map,
)
)
)
df = DataFrame.from_cudf(cudf.concat(pieces))
elif self.typ == "parquet":
opts, cloud_opts = map(json.loads, self.options)
cdf = cudf.read_parquet(self.paths, columns=with_columns)
assert isinstance(cdf, cudf.DataFrame)
df = DataFrame.from_cudf(cdf)
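To make the CSV option handling above concrete, the cudf.read_csv call that the new evaluate path constructs looks roughly like this (a sketch only: the path, values, and column name are hypothetical; the keyword arguments are the ones passed in the diff above):

import cudf

# Hypothetical path and values; each keyword mirrors a polars reader option
# translated in Scan.evaluate above.
df = cudf.read_csv(
    "data.csv",
    sep="|",                # parse_options["separator"]
    quotechar='"',          # parse_options["quote_char"]
    lineterminator="\n",    # parse_options["eol_char"]
    names=None,             # taken from reader_options["schema"] when provided
    header=0,               # has_header=False is not supported yet
    usecols=None,           # polars' column projection (with_columns)
    na_filter=True,
    na_values=[""],         # polars recognises no nulls by default
    keep_default_na=False,
    skiprows=0,             # skip_rows plus any leading blank lines
    comment="#",            # single-character comment prefix only
    decimal=".",            # "," when decimal_comma is set
    dtype={"a": "int64"},   # derived from the polars schema
)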
12 changes: 10 additions & 2 deletions python/cudf_polars/cudf_polars/dsl/translate.py
@@ -5,13 +5,15 @@

from __future__ import annotations

import json
from contextlib import AbstractContextManager, nullcontext
from functools import singledispatch
from typing import Any

import pyarrow as pa
from typing_extensions import assert_never

import polars as pl
import polars.polars as plrs
from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir

@@ -88,10 +90,16 @@ def _(
node: pl_ir.Scan, visitor: NodeTraverser, schema: dict[str, plc.DataType]
) -> ir.IR:
typ, *options = node.scan_type
if typ == "ndjson":
(reader_options,) = map(json.loads, options)
cloud_options = None
else:
reader_options, cloud_options = map(json.loads, options)
return ir.Scan(
schema,
typ,
tuple(options),
reader_options,
cloud_options,
node.paths,
node.file_options,
translate_named_expr(visitor, n=node.predicate)
@@ -402,7 +410,7 @@ def _(node: pl_expr.Window, visitor: NodeTraverser, dtype: plc.DataType) -> expr
@_translate_expr.register
def _(node: pl_expr.Literal, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr:
if isinstance(node.value, plrs.PySeries):
return expr.LiteralColumn(dtype, node.value)
return expr.LiteralColumn(dtype, pl.Series._from_pyseries(node.value))
value = pa.scalar(node.value, type=plc.interop.to_arrow(dtype))
return expr.Literal(dtype, value)

107 changes: 100 additions & 7 deletions python/cudf_polars/tests/test_scan.py
@@ -22,22 +22,22 @@ def row_index(request):

@pytest.fixture(
params=[
(None, 0),
None,
pytest.param(
(2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan")
2, marks=pytest.mark.xfail(reason="No handling of row limit in scan")
),
pytest.param(
(3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan")
3, marks=pytest.mark.xfail(reason="No handling of row limit in scan")
),
],
ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"],
)
def n_rows_skip_rows(request):
def n_rows(request):
return request.param


@pytest.fixture(params=["csv", "parquet"])
def df(request, tmp_path, row_index, n_rows_skip_rows):
def df(request, tmp_path, row_index, n_rows):
df = pl.DataFrame(
{
"a": [1, 2, 3, None],
@@ -46,14 +46,12 @@ def df(request, tmp_path, row_index, n_rows_skip_rows):
}
)
name, offset = row_index
n_rows, skip_rows = n_rows_skip_rows
if request.param == "csv":
df.write_csv(tmp_path / "file.csv")
return pl.scan_csv(
tmp_path / "file.csv",
row_index_name=name,
row_index_offset=offset,
skip_rows_after_header=skip_rows,
n_rows=n_rows,
)
else:
@@ -97,3 +95,98 @@ def test_scan_unsupported_raises(tmp_path):
df.write_ndjson(tmp_path / "df.json")
q = pl.scan_ndjson(tmp_path / "df.json")
assert_ir_translation_raises(q, NotImplementedError)


def test_scan_row_index_projected_out(tmp_path):
df = pl.DataFrame({"a": [1, 2, 3]})

df.write_parquet(tmp_path / "df.pq")

q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a"))

assert_gpu_result_equal(q)


def test_scan_csv_column_renames_projection_schema(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2\n3,4,5""")

q = pl.scan_csv(
tmp_path / "test.csv",
with_column_names=lambda names: [f"{n}_suffix" for n in names],
schema_overrides={
"foo_suffix": pl.String(),
"bar_suffix": pl.Int8(),
"baz_suffix": pl.UInt16(),
},
)

assert_gpu_result_equal(q)


def test_scan_csv_skip_after_header_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", skip_rows_after_header=1)

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_null_values_per_column_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", null_values={"foo": "1", "baz": "5"})

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_comment_str_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n// 1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="// ")

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_comment_char(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n# 1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="#")

assert_gpu_result_equal(q)


@pytest.mark.parametrize("nulls", [None, "3", ["3", "5"]])
def test_scan_csv_null_values(tmp_path, nulls):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5\n5,,2""")

q = pl.scan_csv(tmp_path / "test.csv", null_values=nulls)

assert_gpu_result_equal(q)


def test_scan_csv_decimal_comma(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo|bar|baz\n1,23|2,34|3,56\n1""")

q = pl.scan_csv(tmp_path / "test.csv", separator="|", decimal_comma=True)

assert_gpu_result_equal(q)


def test_scan_csv_skip_initial_empty_rows(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""\n\n\n\nfoo|bar|baz\n1|2|3\n1""")

q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1, has_header=False)

assert_ir_translation_raises(q, NotImplementedError)

q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1)

assert_gpu_result_equal(q)
