Handle csv reader options in cudf-polars #16211

Merged
2 changes: 1 addition & 1 deletion python/cudf/cudf/_lib/csv.pyx
@@ -450,7 +450,7 @@ def read_csv(
col_name = df._data.names[index]
df._data[col_name] = df._data[col_name].astype(col_dtype)

if names is not None and isinstance(names[0], (int)):
if names is not None and len(names) and isinstance(names[0], (int)):
df.columns = [int(x) for x in df._data]

# Set index if the index_col parameter is passed
4 changes: 2 additions & 2 deletions python/cudf_polars/cudf_polars/dsl/expr.py
@@ -32,7 +32,7 @@
if TYPE_CHECKING:
from collections.abc import Mapping, Sequence

import polars.polars as plrs
import polars as pl
import polars.type_aliases as pl_types

from cudf_polars.containers import DataFrame
@@ -377,7 +377,7 @@ class LiteralColumn(Expr):
value: pa.Array[Any, Any]
children: tuple[()]

def __init__(self, dtype: plc.DataType, value: plrs.PySeries) -> None:
def __init__(self, dtype: plc.DataType, value: pl.Series) -> None:
super().__init__(dtype)
data = value.to_arrow()
self.value = data.cast(dtypes.downcast_arrow_lists(data.type))
104 changes: 93 additions & 11 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -15,9 +15,9 @@

import dataclasses
import itertools
import json
import types
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, ClassVar

import pyarrow as pa
@@ -185,8 +185,10 @@ class Scan(IR):

typ: str
"""What type of file are we reading? Parquet, CSV, etc..."""
options: tuple[Any, ...]
"""Type specific options, as json-encoded strings."""
reader_options: dict[str, Any]
"""Reader-specific options, as dictionary."""
cloud_options: dict[str, Any] | None
"""Cloud-related authentication options, currently ignored."""
paths: list[str]
"""List of paths to read from."""
file_options: Any
@@ -206,24 +208,104 @@ def __post_init__(self) -> None:
if self.file_options.n_rows is not None:
raise NotImplementedError("row limit in scan")
if self.typ not in ("csv", "parquet"):
raise NotImplementedError(f"Unhandled scan type: {self.typ}")
if self.cloud_options is not None and any(
self.cloud_options[k] is not None for k in ("aws", "azure", "gcp")
Contributor:

Confirming that these strings are guaranteed to be in the dict at this point? If this option isn't supported at all, would it be worthwhile to just accept None here?

Contributor Author:

The csv reader has non-None cloud_options, but the values are all None, so I can't just punt when cloud_options is not None.
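
An illustrative aside, not part of the diff: a minimal sketch of the situation described above, assuming the deserialized cloud_options for a local CSV scan is a dict whose provider entries are all None.

# Assumed shape of cloud_options for a local CSV scan (illustrative only).
cloud_options = {"aws": None, "azure": None, "gcp": None}

# A bare `cloud_options is not None` test would reject every such scan,
# so the check below inspects the individual provider entries instead.
needs_cloud_auth = cloud_options is not None and any(
    cloud_options[k] is not None for k in ("aws", "azure", "gcp")
)
assert not needs_cloud_auth  # local read: no cloud credentials configured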

):
raise NotImplementedError(
f"Unhandled scan type: {self.typ}"
) # pragma: no cover; polars raises on the rust side for now
"Read from cloud storage"
) # pragma: no cover; no test yet
if self.typ == "csv":
if self.reader_options["skip_rows_after_header"] != 0:
raise NotImplementedError("Skipping rows after header in CSV reader")
parse_options = self.reader_options["parse_options"]
if (
null_values := parse_options["null_values"]
) is not None and "Named" in null_values:
raise NotImplementedError(
"Per column null value specification not supported for CSV reader"
)
if (
comment := parse_options["comment_prefix"]
) is not None and "Multi" in comment:
raise NotImplementedError(
"Multi-character comment prefix not supported for CSV reader"
)
if not self.reader_options["has_header"]:
# Need to do some file introspection to get the number
# of columns so that column projection works right.
raise NotImplementedError("Reading CSV without header")

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
"""Evaluate and return a dataframe."""
options = self.file_options
with_columns = options.with_columns
row_index = options.row_index
if self.typ == "csv":
opts, cloud_opts = map(json.loads, self.options)
df = DataFrame.from_cudf(
cudf.concat(
[cudf.read_csv(p, usecols=with_columns) for p in self.paths]
dtype_map = {
name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()]
for name, typ in self.schema.items()
}
parse_options = self.reader_options["parse_options"]
sep = chr(parse_options["separator"])
quote = chr(parse_options["quote_char"])
eol = chr(parse_options["eol_char"])
if self.reader_options["schema"] is not None:
# Reader schema provides names
column_names = list(self.reader_options["schema"]["inner"].keys())
else:
# file provides column names
column_names = None
usecols = with_columns
# TODO: support has_header=False
header = 0

# polars defaults to no null recognition
null_values = [""]
if parse_options["null_values"] is not None:
((typ, nulls),) = parse_options["null_values"].items()
if typ == "AllColumnsSingle":
# Single value
null_values.append(nulls)
else:
# List of values
null_values.extend(nulls)
if parse_options["comment_prefix"] is not None:
comment = chr(parse_options["comment_prefix"]["Single"])
else:
comment = None
decimal = "," if parse_options["decimal_comma"] else "."

# polars skips blank lines at the beginning of the file
pieces = []
for p in self.paths:
skiprows = self.reader_options["skip_rows"]
# TODO: read_csv expands globs which we should not do,
# because polars will already have handled them.
path = Path(p)
with path.open() as f:
while f.readline() == "\n":
Contributor:

Is this something that read_csv should eventually implement?

Contributor Author:

At some point (maybe soon?), cudf-polars is not going to use cudf.read_csv, but rather the libcudf layer directly, so we'll probably need to do this here anyway.
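
For context, a standalone sketch of the blank-line adjustment being discussed, assuming a plain local text file; it mirrors the loop in the diff rather than any existing read_csv option.

from pathlib import Path

def leading_blank_lines(path: Path) -> int:
    # Count empty lines before the header, mirroring polars' behaviour of
    # skipping blank lines at the start of the file.
    count = 0
    with path.open() as f:
        while f.readline() == "\n":
            count += 1
    return count

# A file beginning with "\n\n\nfoo,bar\n1,2\n" yields 3, which is added to the
# user-supplied skip_rows before the read.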

skiprows += 1
pieces.append(
cudf.read_csv(
path,
sep=sep,
quotechar=quote,
lineterminator=eol,
names=column_names,
header=header,
usecols=usecols,
na_filter=True,
na_values=null_values,
keep_default_na=False,
skiprows=skiprows,
comment=comment,
decimal=decimal,
dtype=dtype_map,
)
)
)
df = DataFrame.from_cudf(cudf.concat(pieces))
elif self.typ == "parquet":
opts, cloud_opts = map(json.loads, self.options)
cdf = cudf.read_parquet(self.paths, columns=with_columns)
assert isinstance(cdf, cudf.DataFrame)
df = DataFrame.from_cudf(cdf)
12 changes: 10 additions & 2 deletions python/cudf_polars/cudf_polars/dsl/translate.py
@@ -5,13 +5,15 @@

from __future__ import annotations

import json
from contextlib import AbstractContextManager, nullcontext
from functools import singledispatch
from typing import Any

import pyarrow as pa
from typing_extensions import assert_never

import polars as pl
import polars.polars as plrs
from polars.polars import _expr_nodes as pl_expr, _ir_nodes as pl_ir

@@ -88,10 +90,16 @@ def _(
node: pl_ir.Scan, visitor: NodeTraverser, schema: dict[str, plc.DataType]
) -> ir.IR:
typ, *options = node.scan_type
if typ == "ndjson":
(reader_options,) = map(json.loads, options)
cloud_options = None
else:
reader_options, cloud_options = map(json.loads, options)
return ir.Scan(
schema,
typ,
tuple(options),
reader_options,
cloud_options,
node.paths,
node.file_options,
translate_named_expr(visitor, n=node.predicate)
@@ -402,7 +410,7 @@ def _(node: pl_expr.Window, visitor: NodeTraverser, dtype: plc.DataType) -> expr
@_translate_expr.register
def _(node: pl_expr.Literal, visitor: NodeTraverser, dtype: plc.DataType) -> expr.Expr:
if isinstance(node.value, plrs.PySeries):
return expr.LiteralColumn(dtype, node.value)
return expr.LiteralColumn(dtype, pl.Series._from_pyseries(node.value))
value = pa.scalar(node.value, type=plc.interop.to_arrow(dtype))
return expr.Literal(dtype, value)

107 changes: 100 additions & 7 deletions python/cudf_polars/tests/test_scan.py
@@ -22,22 +22,22 @@ def row_index(request):

@pytest.fixture(
params=[
(None, 0),
None,
pytest.param(
(2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan")
2, marks=pytest.mark.xfail(reason="No handling of row limit in scan")
),
pytest.param(
(3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan")
3, marks=pytest.mark.xfail(reason="No handling of row limit in scan")
),
],
ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"],
)
def n_rows_skip_rows(request):
def n_rows(request):
return request.param


@pytest.fixture(params=["csv", "parquet"])
def df(request, tmp_path, row_index, n_rows_skip_rows):
def df(request, tmp_path, row_index, n_rows):
df = pl.DataFrame(
{
"a": [1, 2, 3, None],
@@ -46,14 +46,12 @@ def df(request, tmp_path, row_index, n_rows_skip_rows):
}
)
name, offset = row_index
n_rows, skip_rows = n_rows_skip_rows
if request.param == "csv":
df.write_csv(tmp_path / "file.csv")
return pl.scan_csv(
tmp_path / "file.csv",
row_index_name=name,
row_index_offset=offset,
skip_rows_after_header=skip_rows,
n_rows=n_rows,
)
else:
@@ -97,3 +95,98 @@ def test_scan_unsupported_raises(tmp_path):
df.write_ndjson(tmp_path / "df.json")
q = pl.scan_ndjson(tmp_path / "df.json")
assert_ir_translation_raises(q, NotImplementedError)


def test_scan_row_index_projected_out(tmp_path):
df = pl.DataFrame({"a": [1, 2, 3]})

df.write_parquet(tmp_path / "df.pq")

q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a"))

assert_gpu_result_equal(q)


def test_scan_csv_column_renames_projection_schema(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2\n3,4,5""")

q = pl.scan_csv(
tmp_path / "test.csv",
with_column_names=lambda names: [f"{n}_suffix" for n in names],
schema_overrides={
"foo_suffix": pl.String(),
"bar_suffix": pl.Int8(),
"baz_suffix": pl.UInt16(),
},
)

assert_gpu_result_equal(q)


def test_scan_csv_skip_after_header_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", skip_rows_after_header=1)

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_null_values_per_column_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", null_values={"foo": "1", "baz": "5"})

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_comment_str_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n// 1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="// ")

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_comment_char(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n# 1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="#")

assert_gpu_result_equal(q)


@pytest.mark.parametrize("nulls", [None, "3", ["3", "5"]])
def test_scan_csv_null_values(tmp_path, nulls):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5\n5,,2""")

q = pl.scan_csv(tmp_path / "test.csv", null_values=nulls)

assert_gpu_result_equal(q)


def test_scan_csv_decimal_comma(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo|bar|baz\n1,23|2,34|3,56\n1""")

q = pl.scan_csv(tmp_path / "test.csv", separator="|", decimal_comma=True)

assert_gpu_result_equal(q)


def test_scan_csv_skip_initial_empty_rows(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""\n\n\n\nfoo|bar|baz\n1|2|3\n1""")

q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1, has_header=False)

assert_ir_translation_raises(q, NotImplementedError)

q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1)

assert_gpu_result_equal(q)