Implement workaround for rapidsai#16186
Rather than falling back to CPU for chunked reads with skip_rows, just
read the chunks and skip the rows manually after the fact.

Simplify the parquet scan tests a bit and add better coverage of both
chunked and unchunked behaviour.
wence- committed Nov 14, 2024
1 parent c23afd9 commit df341ea
Showing 2 changed files with 113 additions and 59 deletions.
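In outline, the workaround described in the commit message is to trim leading rows chunk by chunk until the requested skip count is exhausted. A minimal, library-agnostic sketch of that idea (the helper name and the chunks iterable are hypothetical, not part of this commit):

def skip_after_the_fact(chunks, skip_rows):
    # Drop leading rows from successive chunks until skip_rows rows have
    # been discarded, then yield the remaining chunks unchanged.
    remaining = skip_rows
    for chunk in chunks:
        if remaining > 0:
            drop = min(remaining, len(chunk))
            chunk = chunk[drop:]
            remaining -= drop
        if len(chunk) > 0:
            yield chunk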
32 changes: 25 additions & 7 deletions python/cudf_polars/cudf_polars/dsl/ir.py
@@ -349,10 +349,10 @@ def __init__(
# TODO: polars has this implemented for parquet,
# maybe we can do this too?
raise NotImplementedError("slice pushdown for negative slices")
if self.typ in {"csv", "parquet"} and self.skip_rows != 0: # pragma: no cover
if self.typ in {"csv"} and self.skip_rows != 0: # pragma: no cover
# This comes from slice pushdown, but that
# optimization doesn't happen right now
raise NotImplementedError("skipping rows in CSV or Parquet reader")
raise NotImplementedError("skipping rows in CSV reader")
if self.cloud_options is not None and any(
self.cloud_options.get(k) is not None for k in ("aws", "azure", "gcp")
):
@@ -514,8 +514,14 @@ def do_evaluate(
reader = plc.io.parquet.ChunkedParquetReader(
plc.io.SourceInfo(paths),
columns=with_columns,
nrows=n_rows,
skip_rows=skip_rows,
# We handle skip_rows != 0 by reading
# up to n_rows + skip_rows and slicing off the
# first skip_rows entries.
# TODO: Remove this workaround once
# https://github.com/rapidsai/cudf/issues/16186
# is fixed
nrows=n_rows + skip_rows,
skip_rows=0,
chunk_read_limit=parquet_options.get(
"chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
),
@@ -524,11 +530,23 @@
),
)
chk = reader.read_chunk()
tbl = chk.tbl
names = chk.column_names()
rows_left_to_skip = skip_rows

def slice_skip(tbl: plc.Table):
nonlocal rows_left_to_skip
if rows_left_to_skip > 0:
table_rows = tbl.num_rows()
chunk_skip = min(rows_left_to_skip, table_rows)
(tbl,) = plc.copying.slice(tbl, [chunk_skip, table_rows])
rows_left_to_skip -= chunk_skip
return tbl

tbl = slice_skip(chk.tbl)
# TODO: Nested column names
names = chk.column_names(include_children=False)
concatenated_columns = tbl.columns()
while reader.has_next():
tbl = reader.read_chunk().tbl
tbl = slice_skip(reader.read_chunk().tbl)

for i in range(tbl.num_columns()):
concatenated_columns[i] = plc.concatenate.concatenate(
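Pieced together, the new reader path in do_evaluate amounts to the loop below. This is a sketch that uses only the pylibcudf calls visible in the diff; the surrounding variables (paths, with_columns, n_rows, skip_rows and the two read limits) are assumed to be in scope as in the real method, and the truncated concatenation call is completed here in the obvious pairwise way.

import pylibcudf as plc


def read_parquet_chunked_with_skip(
    paths, with_columns, n_rows, skip_rows, chunk_read_limit, pass_read_limit
):
    # Workaround for rapidsai/cudf#16186: the chunked reader does not honour
    # skip_rows, so request n_rows + skip_rows and trim per chunk below.
    reader = plc.io.parquet.ChunkedParquetReader(
        plc.io.SourceInfo(paths),
        columns=with_columns,
        nrows=n_rows + skip_rows,
        skip_rows=0,
        chunk_read_limit=chunk_read_limit,
        pass_read_limit=pass_read_limit,
    )
    rows_left_to_skip = skip_rows

    def slice_skip(tbl: plc.Table) -> plc.Table:
        # Drop up to rows_left_to_skip leading rows from this chunk.
        nonlocal rows_left_to_skip
        if rows_left_to_skip > 0:
            table_rows = tbl.num_rows()
            chunk_skip = min(rows_left_to_skip, table_rows)
            (tbl,) = plc.copying.slice(tbl, [chunk_skip, table_rows])
            rows_left_to_skip -= chunk_skip
        return tbl

    chk = reader.read_chunk()
    tbl = slice_skip(chk.tbl)
    names = chk.column_names(include_children=False)
    concatenated_columns = tbl.columns()
    while reader.has_next():
        tbl = slice_skip(reader.read_chunk().tbl)
        for i in range(tbl.num_columns()):
            # Assumed completion of the truncated hunk: grow each output
            # column by appending this chunk's corresponding column.
            concatenated_columns[i] = plc.concatenate.concatenate(
                [concatenated_columns[i], tbl.columns()[i]]
            )
    return names, concatenated_columns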
140 changes: 88 additions & 52 deletions python/cudf_polars/tests/test_scan.py
@@ -16,22 +16,17 @@
NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False})


@pytest.fixture(params=[True, False], ids=["chunked", "no-chunked"])
def chunked(request):
return request.param


@pytest.fixture(
params=[(None, None), ("row-index", 0), ("index", 10)],
ids=["no-row-index", "zero-offset-row-index", "offset-row-index"],
ids=["no_row_index", "zero_offset_row_index", "offset_row_index"],
)
def row_index(request):
return request.param


@pytest.fixture(
params=[None, 2, 3],
ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"],
params=[None, 3],
ids=["all_rows", "some_rows"],
)
def n_rows(request):
return request.param
@@ -58,21 +53,15 @@ def columns(request, row_index):


@pytest.fixture(
params=[None, pl.col("c").is_not_null()], ids=["no-mask", "c-is-not-null"]
params=[None, pl.col("c").is_not_null()], ids=["no_mask", "c_is_not_null"]
)
def mask(request):
return request.param


@pytest.fixture(
params=[
None,
(1, 1),
],
ids=[
"no-slice",
"slice-second",
],
params=[None, (1, 1)],
ids=["no_slice", "slice_second"],
)
def slice(request):
# For use in testing that we handle
@@ -99,22 +88,16 @@ def make_source(df, path, format):
("csv", pl.scan_csv),
("ndjson", pl.scan_ndjson),
("parquet", pl.scan_parquet),
("chunked_parquet", pl.scan_parquet),
],
)
def test_scan(
tmp_path,
df,
format,
scan_fn,
row_index,
n_rows,
columns,
mask,
slice,
chunked,
request,
tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, slice, request
):
name, offset = row_index
is_chunked = format == "chunked_parquet"
if is_chunked:
format = "parquet"
make_source(df, tmp_path / "file", format)
request.applymarker(
pytest.mark.xfail(
@@ -128,25 +111,40 @@ def test_scan(
row_index_offset=offset,
n_rows=n_rows,
)
engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked})
if (
is_chunked
and (columns is None or columns[0] != "a")
and (
# When we mask with the slice, it happens to remove the
# bad row
(mask is None and slice is not None)
# When we both slice and read a subset of rows it also
# removes the bad row
or (slice is None and n_rows is not None)
)
):
# slice read produces wrong result for string column
request.applymarker(
pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
)
# if (
# (mask is None and n_rows is not None)
# or (n_rows is not None and mask is not None and slice is None)
# or (n_rows is None and mask is None and slice is not None)
# ):
# request.applymarker(
# pytest.mark.xfail(
# reason="https://github.com/rapidsai/cudf/issues/17311"
# )
# )
if slice is not None:
q = q.slice(*slice)
if scan_fn is pl.scan_parquet and slice[0] != 0:
# slicing a scan optimizes to a skip_rows which
# the chunked reader does not yet support
assert_ir_translation_raises(q, NotImplementedError)
else:
assert_gpu_result_equal(q)
else:
if mask is not None:
q = q.filter(mask)
if columns is not None:
q = q.select(*columns)
assert_gpu_result_equal(
q,
engine=pl.GPUEngine(
raise_on_fail=True, parquet_options={"chunked": chunked}
),
)
if mask is not None:
q = q.filter(mask)
if columns is not None:
q = q.select(*columns)
assert_gpu_result_equal(q, engine=engine)


def test_negative_slice_pushdown_raises(tmp_path):
@@ -352,14 +350,52 @@ def test_scan_parquet_only_row_index_raises(df, tmp_path):
assert_ir_translation_raises(q, NotImplementedError)


@pytest.mark.parametrize("chunk_read_limit", [0, 1, 2, 4, 8, 16])
@pytest.mark.parametrize("pass_read_limit", [0, 1, 2, 4, 8, 16])
def test_scan_parquet_chunked(df, tmp_path, chunk_read_limit, pass_read_limit):
df = pl.concat([df] * 1000) # makes something large enough to meaningfully chunk
make_source(df, tmp_path / "file", "parquet")
q = pl.scan_parquet(tmp_path / "file")
@pytest.fixture(
scope="module", params=["no_slice", "skip_to_end", "skip_partial", "partial"]
)
def chunked_slice(request):
return request.param


@pytest.fixture(scope="module")
def large_df(df, tmpdir_factory, chunked_slice):
# Something big enough that we get more than a single chunk,
# empirically determined
df = pl.concat([df] * 1000)
df = pl.concat([df] * 10)
df = pl.concat([df] * 10)
path = str(tmpdir_factory.mktemp("data") / "large.pq")
make_source(df, path, "parquet")
n_rows = len(df)
q = pl.scan_parquet(path)
if chunked_slice == "no_slice":
return q
elif chunked_slice == "skip_to_end":
return q.slice(int(n_rows * 0.6), n_rows)
elif chunked_slice == "skip_partial":
return q.slice(int(n_rows * 0.6), int(n_rows * 0.2))
else:
return q.slice(0, int(n_rows * 0.6))


@pytest.mark.parametrize(
"chunk_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"chunk_{x}"
)
@pytest.mark.parametrize(
"pass_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"pass_{x}"
)
def test_scan_parquet_chunked(
request, chunked_slice, large_df, chunk_read_limit, pass_read_limit
):
if chunked_slice in {"skip_partial", "partial"} and (
chunk_read_limit == 0 and pass_read_limit != 0
):
request.applymarker(
pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
)

assert_gpu_result_equal(
q,
large_df,
engine=pl.GPUEngine(
raise_on_fail=True,
parquet_options={
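For reference, the reader limits this test sweeps over correspond to the user-facing engine configuration roughly as follows (a sketch: the parquet_options keys are assumed from the reader arguments in ir.py above, and the path and limit values are illustrative):

import polars as pl

q = pl.scan_parquet("large.pq").slice(100, 1000)
result = q.collect(
    engine=pl.GPUEngine(
        raise_on_fail=True,
        parquet_options={
            "chunked": True,  # use the chunked parquet reader path
            "chunk_read_limit": 1024,  # assumed: per-chunk output limit, 0 = unlimited
            "pass_read_limit": 1024,  # assumed: per-pass read limit, 0 = unlimited
        },
    )
)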
