Implement read_csv in cudf-polars using pylibcudf #16307

Merged 9 commits on Jul 19, 2024
Changes from 7 commits
51 changes: 27 additions & 24 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -242,10 +242,6 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:
         with_columns = options.with_columns
         row_index = options.row_index
         if self.typ == "csv":
-            dtype_map = {
-                name: cudf._lib.types.PYLIBCUDF_TO_SUPPORTED_NUMPY_TYPES[typ.id()]
-                for name, typ in self.schema.items()
-            }
             parse_options = self.reader_options["parse_options"]
             sep = chr(parse_options["separator"])
             quote = chr(parse_options["quote_char"])
@@ -278,33 +274,40 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame:

             # polars skips blank lines at the beginning of the file
             pieces = []
+            colnames = None
             for p in self.paths:
                 skiprows = self.reader_options["skip_rows"]
                 # TODO: read_csv expands globs which we should not do,
                 # because polars will already have handled them.
Contributor: Can you add a test for this case?

Contributor Author: Added.

Contributor: I don't see it. What we want to check is that (with test1.csv, test2.csv, test*.csv)

    pl.scan_csv("test*.csv", glob=False)

reads the single test*.csv file correctly.

Contributor Author: Oh, I completely misunderstood there. I added a glob=False test case in the latest commit.
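For reference, such a test might look like the following (a sketch only; the actual test landed in a later commit that is not part of this diff, and the file names and contents here are illustrative, following the existing tests in test_scan.py):

    def test_scan_csv_no_glob(tmp_path):
        # A literal file whose name contains a glob character, plus a
        # distractor that glob expansion of "test*.csv" would also match.
        with (tmp_path / "test*.csv").open("w") as f:
            f.write("foo,bar,baz\n1,2,3\n")
        with (tmp_path / "test1.csv").open("w") as f:
            f.write("foo,bar,baz\n4,5,6\n")
        q = pl.scan_csv(tmp_path / "test*.csv", glob=False)
        # Should read exactly the one literal file, not the glob expansion.
        assert_gpu_result_equal(q)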

                 path = Path(p)
                 with path.open() as f:
                     while f.readline() == "\n":
                         skiprows += 1
-                pieces.append(
-                    cudf.read_csv(
-                        path,
-                        sep=sep,
-                        quotechar=quote,
-                        lineterminator=eol,
-                        names=column_names,
-                        header=header,
-                        usecols=usecols,
-                        na_filter=True,
-                        na_values=null_values,
-                        keep_default_na=False,
-                        skiprows=skiprows,
-                        comment=comment,
-                        decimal=decimal,
-                        dtype=dtype_map,
-                    )
-                )
+                tbl_w_meta = plc.io.csv.read_csv(
+                    plc.io.SourceInfo([path]),
+                    delimiter=sep,
+                    quotechar=quote,
+                    lineterminator=eol,
+                    col_names=column_names,
+                    header=header,
+                    usecols=usecols,
+                    na_filter=True,
+                    na_values=null_values,
+                    keep_default_na=False,
+                    skiprows=skiprows,
+                    comment=comment,
+                    decimal=decimal,
+                    dtypes=self.schema,
+                )
+                pieces.append(tbl_w_meta)
-            df = DataFrame.from_cudf(cudf.concat(pieces))
+            tbls, colnames = zip(
+                *(
+                    (piece.tbl, piece.column_names(include_children=False))
+                    for piece in pieces
+                )
+            )
+            df = DataFrame.from_table(
+                plc.concatenate.concatenate(list(tbls)),
+                colnames[0],
+            )
         elif self.typ == "parquet":
             cdf = cudf.read_parquet(self.paths, columns=with_columns)
             assert isinstance(cdf, cudf.DataFrame)
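Distilled from the diff above, the new read path per file follows this pattern (a minimal sketch under assumptions: the plc alias matches this module's pylibcudf import at the time of the PR, and "example.csv" is a stand-in path):

    import cudf._lib.pylibcudf as plc  # assumed import path for pylibcudf here

    # read_csv returns a TableWithMetadata rather than a cudf DataFrame.
    tbl_w_meta = plc.io.csv.read_csv(plc.io.SourceInfo(["example.csv"]))

    # Unwrap the device table and its column names, as the IR does per path.
    tbl = tbl_w_meta.tbl
    names = tbl_w_meta.column_names(include_children=False)

    # Multiple files become a list of tables concatenated on device; all
    # tables must share a schema for the concatenation to be valid.
    combined = plc.concatenate.concatenate([tbl])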
27 changes: 27 additions & 0 deletions python/cudf_polars/tests/test_scan.py
@@ -2,10 +2,13 @@
 # SPDX-License-Identifier: Apache-2.0
 from __future__ import annotations

+import os
+
 import pytest

 import polars as pl

+from cudf_polars.callback import execute_with_cudf
 from cudf_polars.testing.asserts import (
     assert_gpu_result_equal,
     assert_ir_translation_raises,
@@ -129,6 +132,30 @@ def test_scan_csv_column_renames_projection_schema(tmp_path):
     assert_gpu_result_equal(q)


+@pytest.mark.parametrize("filename", [["test1.csv", "test2.csv"], "test*.csv"])
+def test_scan_csv_multi(tmp_path, filename):
+    with (tmp_path / "test1.csv").open("w") as f:
+        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+    with (tmp_path / "test2.csv").open("w") as f:
+        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+    os.chdir(tmp_path)
+    q = pl.scan_csv(filename)
+
+    assert_gpu_result_equal(q)
+
+
+def test_scan_csv_multi_differing_colnames(tmp_path):
+    with (tmp_path / "test1.csv").open("w") as f:
+        f.write("""foo,bar,baz\n1,2\n3,4,5""")
+    with (tmp_path / "test2.csv").open("w") as f:
+        f.write("""abc,def,ghi\n1,2\n3,4,5""")
+    q = pl.scan_csv(
+        [tmp_path / "test1.csv", tmp_path / "test2.csv"],
+    )
+    with pytest.raises(pl.exceptions.ComputeError):
+        q.collect(post_opt_callback=execute_with_cudf)
Contributor: What is this testing?

Contributor Author: This is testing cases where the column names are different between CSV files in the multiple-CSV-file case.

It looks like polars checks this for us (but it's good to make sure that's the case with a test).

Contributor: Ah, we should get this just by "explain"ing the plan; the only way this can raise with the execute_with_cudf callback on the polars side is if it happens during plan conversion:

Suggested change:
-    with pytest.raises(pl.exceptions.ComputeError):
-        q.collect(post_opt_callback=execute_with_cudf)
+    with pytest.raises(pl.exceptions.ComputeError):
+        q.explain()
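Standalone, the reviewer's point can be checked like this (hedged sketch; file names and contents are illustrative, and per the suggestion above the mismatch surfaces during plan/schema resolution rather than during execution):

    import polars as pl
    import pytest

    def check_schema_mismatch(tmp_path):
        # Two CSVs whose headers disagree.
        (tmp_path / "a.csv").write_text("foo,bar\n1,2\n")
        (tmp_path / "b.csv").write_text("x,y\n1,2\n")
        q = pl.scan_csv([tmp_path / "a.csv", tmp_path / "b.csv"])
        # Rendering the plan already resolves the schema, so this raises
        # without executing anything on either GPU or CPU.
        with pytest.raises(pl.exceptions.ComputeError):
            q.explain()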



 def test_scan_csv_skip_after_header_not_implemented(tmp_path):
     with (tmp_path / "test.csv").open("w") as f:
         f.write("""foo,bar,baz\n1,2,3\n3,4,5""")