Skip to content

Commit

Permalink
Handling csv reader options
Browse files Browse the repository at this point in the history
Possibly some pieces are missing, but this is much closer to complete.
  • Loading branch information
wence- committed Jul 8, 2024
1 parent 7b69b47 commit a13836a
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 19 deletions.
104 changes: 93 additions & 11 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@

import dataclasses
import itertools
import json
import types
from functools import cache
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, ClassVar

import pyarrow as pa
Expand Down Expand Up @@ -185,8 +185,10 @@ class Scan(IR):

typ: str
"""What type of file are we reading? Parquet, CSV, etc..."""
options: tuple[Any, ...]
"""Type specific options, as json-encoded strings."""
reader_options: dict[str, Any]
"""Reader-specific options, as dictionary."""
cloud_options: dict[str, Any] | None
"""Cloud-related authentication options, currently ignored."""
paths: list[str]
"""List of paths to read from."""
file_options: Any
Expand All @@ -206,24 +208,104 @@ def __post_init__(self) -> None:
if self.file_options.n_rows is not None:
raise NotImplementedError("row limit in scan")
if self.typ not in ("csv", "parquet"):
raise NotImplementedError(f"Unhandled scan type: {self.typ}")
if self.cloud_options is not None and any(
self.cloud_options[k] is not None for k in ("aws", "azure", "gcp")
):
raise NotImplementedError(
f"Unhandled scan type: {self.typ}"
) # pragma: no cover; polars raises on the rust side for now
"Read from cloud storage"
) # pragma: no cover; no test yet
if self.typ == "csv":
if self.reader_options["skip_rows_after_header"] != 0:
raise NotImplementedError("Skipping rows after header in CSV reader")
parse_options = self.reader_options["parse_options"]
if (
null_values := parse_options["null_values"]
) is not None and "Named" in null_values:
raise NotImplementedError(
"Per column null value specification not supported for CSV reader"
)
if (
comment := parse_options["comment_prefix"]
) is not None and "Multi" in comment:
raise NotImplementedError(
"Multi-character comment prefix not supported for CSV reader"
)
if not self.reader_options["has_header"]:
# Need to do some file introspection to get the number
# of columns so that column projection works right.
raise NotImplementedError("Reading CSV without header")

def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
"""Evaluate and return a dataframe."""
options = self.file_options
with_columns = options.with_columns
row_index = options.row_index
if self.typ == "csv":
opts, cloud_opts = map(json.loads, self.options)
df = DataFrame.from_cudf(
cudf.concat(
[cudf.read_csv(p, usecols=with_columns) for p in self.paths]
dtype_map = {
name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()]
for name, typ in self.schema.items()
}
parse_options = self.reader_options["parse_options"]
sep = chr(parse_options["separator"])
quote = chr(parse_options["quote_char"])
eol = chr(parse_options["eol_char"])
if self.reader_options["schema"] is not None:
# Reader schema provides names
column_names = list(self.reader_options["schema"]["inner"].keys())
else:
# file provides column names
column_names = None
usecols = with_columns
# TODO: support has_header=False
header = 0

# polars defaults to no null recognition
null_values = [""]
if parse_options["null_values"] is not None:
((typ, nulls),) = parse_options["null_values"].items()
if typ == "AllColumnsSingle":
# Single value
null_values.append(nulls)
else:
# List of values
null_values.extend(nulls)
if parse_options["comment_prefix"] is not None:
comment = chr(parse_options["comment_prefix"]["Single"])
else:
comment = None
decimal = "," if parse_options["decimal_comma"] else "."

# polars skips blank lines at the beginning of the file
pieces = []
for p in self.paths:
skiprows = self.reader_options["skip_rows"]
# TODO: read_csv expands globs which we should not do,
# because polars will already have handled them.
path = Path(p)
with path.open() as f:
while f.readline() == "\n":
skiprows += 1
pieces.append(
cudf.read_csv(
path,
sep=sep,
quotechar=quote,
lineterminator=eol,
names=column_names,
header=header,
usecols=usecols,
na_filter=True,
na_values=null_values,
keep_default_na=False,
skiprows=skiprows,
comment=comment,
decimal=decimal,
dtype=dtype_map,
)
)
)
df = DataFrame.from_cudf(cudf.concat(pieces))
elif self.typ == "parquet":
opts, cloud_opts = map(json.loads, self.options)
cdf = cudf.read_parquet(self.paths, columns=with_columns)
assert isinstance(cdf, cudf.DataFrame)
df = DataFrame.from_cudf(cdf)
Expand Down
9 changes: 8 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/translate.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from __future__ import annotations

import json
from contextlib import AbstractContextManager, nullcontext
from functools import singledispatch
from typing import Any
Expand Down Expand Up @@ -88,10 +89,16 @@ def _(
node: pl_ir.Scan, visitor: NodeTraverser, schema: dict[str, plc.DataType]
) -> ir.IR:
typ, *options = node.scan_type
if typ == "ndjson":
(reader_options,) = map(json.loads, options)
cloud_options = None
else:
reader_options, cloud_options = map(json.loads, options)
return ir.Scan(
schema,
typ,
tuple(options),
reader_options,
cloud_options,
node.paths,
node.file_options,
translate_named_expr(visitor, n=node.predicate)
Expand Down
97 changes: 90 additions & 7 deletions python/cudf_polars/tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@ def row_index(request):

@pytest.fixture(
params=[
(None, 0),
None,
pytest.param(
(2, 1), marks=pytest.mark.xfail(reason="No handling of row limit in scan")
2, marks=pytest.mark.xfail(reason="No handling of row limit in scan")
),
pytest.param(
(3, 0), marks=pytest.mark.xfail(reason="No handling of row limit in scan")
3, marks=pytest.mark.xfail(reason="No handling of row limit in scan")
),
],
ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"],
)
def n_rows_skip_rows(request):
def n_rows(request):
return request.param


@pytest.fixture(params=["csv", "parquet"])
def df(request, tmp_path, row_index, n_rows_skip_rows):
def df(request, tmp_path, row_index, n_rows):
df = pl.DataFrame(
{
"a": [1, 2, 3, None],
Expand All @@ -46,14 +46,12 @@ def df(request, tmp_path, row_index, n_rows_skip_rows):
}
)
name, offset = row_index
n_rows, skip_rows = n_rows_skip_rows
if request.param == "csv":
df.write_csv(tmp_path / "file.csv")
return pl.scan_csv(
tmp_path / "file.csv",
row_index_name=name,
row_index_offset=offset,
skip_rows_after_header=skip_rows,
n_rows=n_rows,
)
else:
Expand Down Expand Up @@ -107,3 +105,88 @@ def test_scan_row_index_projected_out(tmp_path):
q = pl.scan_parquet(tmp_path / "df.pq").with_row_index().select(pl.col("a"))

assert_gpu_result_equal(q)


def test_scan_csv_column_renames_projection_schema(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2\n3,4,5""")

q = pl.scan_csv(
tmp_path / "test.csv",
with_column_names=lambda names: [f"{n}_suffix" for n in names],
schema_overrides={
"foo_suffix": pl.String(),
"bar_suffix": pl.Int8(),
"baz_suffix": pl.UInt16(),
},
)

assert_gpu_result_equal(q)


def test_scan_csv_skip_after_header_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", skip_rows_after_header=1)

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_null_values_per_column_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", null_values={"foo": "1", "baz": "5"})

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_comment_str_not_implemented(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n// 1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="// ")

assert_ir_translation_raises(q, NotImplementedError)


def test_scan_csv_comment_char(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n# 1,2,3\n3,4,5""")

q = pl.scan_csv(tmp_path / "test.csv", comment_prefix="#")

assert_gpu_result_equal(q)


@pytest.mark.parametrize("nulls", [None, "3", ["3", "5"]])
def test_scan_csv_null_values(tmp_path, nulls):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo,bar,baz\n1,2,3\n3,4,5\n5,,2""")

q = pl.scan_csv(tmp_path / "test.csv", null_values=nulls)

assert_gpu_result_equal(q)


def test_scan_csv_decimal_comma(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""foo|bar|baz\n1,23|2,34|3,56\n1""")

q = pl.scan_csv(tmp_path / "test.csv", separator="|", decimal_comma=True)

assert_gpu_result_equal(q)


def test_scan_csv_skip_initial_empty_rows(tmp_path):
with (tmp_path / "test.csv").open("w") as f:
f.write("""\n\n\n\nfoo|bar|baz\n1|2|3\n1""")

q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1, has_header=False)

assert_ir_translation_raises(q, NotImplementedError)

q = pl.scan_csv(tmp_path / "test.csv", separator="|", skip_rows=1)

assert_gpu_result_equal(q)

0 comments on commit a13836a

Please sign in to comment.