diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py
index 8130b18333c..dd54c9202a4 100644
--- a/python/cudf_polars/cudf_polars/dsl/ir.py
+++ b/python/cudf_polars/cudf_polars/dsl/ir.py
@@ -349,10 +349,10 @@ def __init__(
             # TODO: polars has this implemented for parquet,
             # maybe we can do this too?
             raise NotImplementedError("slice pushdown for negative slices")
-        if self.typ in {"csv", "parquet"} and self.skip_rows != 0:  # pragma: no cover
+        if self.typ in {"csv"} and self.skip_rows != 0:  # pragma: no cover
             # This comes from slice pushdown, but that
             # optimization doesn't happen right now
-            raise NotImplementedError("skipping rows in CSV or Parquet reader")
+            raise NotImplementedError("skipping rows in CSV reader")
         if self.cloud_options is not None and any(
             self.cloud_options.get(k) is not None for k in ("aws", "azure", "gcp")
         ):
@@ -514,8 +514,14 @@ def do_evaluate(
             reader = plc.io.parquet.ChunkedParquetReader(
                 plc.io.SourceInfo(paths),
                 columns=with_columns,
-                nrows=n_rows,
-                skip_rows=skip_rows,
+                # We handle skip_rows != 0 by reading from the
+                # file up to n_rows + skip_rows rows and slicing
+                # off the first skip_rows entries.
+                # TODO: Remove this workaround once
+                # https://github.com/rapidsai/cudf/issues/16186
+                # is fixed
+                nrows=n_rows + skip_rows,
+                skip_rows=0,
                 chunk_read_limit=parquet_options.get(
                     "chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
                 ),
@@ -524,11 +530,23 @@ def do_evaluate(
                 ),
             )
             chk = reader.read_chunk()
-            tbl = chk.tbl
-            names = chk.column_names()
+            rows_left_to_skip = skip_rows
+
+            def slice_skip(tbl: plc.Table) -> plc.Table:
+                nonlocal rows_left_to_skip
+                if rows_left_to_skip > 0:
+                    table_rows = tbl.num_rows()
+                    chunk_skip = min(rows_left_to_skip, table_rows)
+                    (tbl,) = plc.copying.slice(tbl, [chunk_skip, table_rows])
+                    rows_left_to_skip -= chunk_skip
+                return tbl
+
+            tbl = slice_skip(chk.tbl)
+            # TODO: Nested column names
+            names = chk.column_names(include_children=False)
             concatenated_columns = tbl.columns()
             while reader.has_next():
-                tbl = reader.read_chunk().tbl
+                tbl = slice_skip(reader.read_chunk().tbl)
 
                 for i in range(tbl.num_columns()):
                     concatenated_columns[i] = plc.concatenate.concatenate(
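The heart of the workaround is the chunk-wise skip: a closure tracks how many rows remain to be dropped and trims each chunk until that count reaches zero, so a skip that spans chunk boundaries is handled correctly. Below is a minimal sketch of the same logic, with plain Python lists standing in for pylibcudf tables (the data and chunk sizes are invented for illustration):

def read_with_skip(chunks: list[list[int]], skip_rows: int) -> list[int]:
    # Mirrors the slice_skip closure above: the remaining skip count
    # lives in the enclosing scope and is consumed across chunks.
    rows_left_to_skip = skip_rows

    def slice_skip(chunk: list[int]) -> list[int]:
        nonlocal rows_left_to_skip
        if rows_left_to_skip > 0:
            chunk_skip = min(rows_left_to_skip, len(chunk))
            chunk = chunk[chunk_skip:]
            rows_left_to_skip -= chunk_skip
        return chunk

    result: list[int] = []
    for chunk in chunks:
        result.extend(slice_skip(chunk))
    return result

# A skip of 4 spanning a 3-row chunk drops rows from both chunks.
assert read_with_skip([[0, 1, 2], [3, 4, 5]], skip_rows=4) == [4, 5]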
diff --git a/python/cudf_polars/tests/test_scan.py b/python/cudf_polars/tests/test_scan.py
index dbc7a0a0f51..572905102d5 100644
--- a/python/cudf_polars/tests/test_scan.py
+++ b/python/cudf_polars/tests/test_scan.py
@@ -16,22 +16,17 @@
 NO_CHUNK_ENGINE = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": False})
 
 
-@pytest.fixture(params=[True, False], ids=["chunked", "no-chunked"])
-def chunked(request):
-    return request.param
-
-
 @pytest.fixture(
     params=[(None, None), ("row-index", 0), ("index", 10)],
-    ids=["no-row-index", "zero-offset-row-index", "offset-row-index"],
+    ids=["no_row_index", "zero_offset_row_index", "offset_row_index"],
 )
 def row_index(request):
     return request.param
 
 
 @pytest.fixture(
-    params=[None, 2, 3],
-    ids=["all-rows", "n_rows-with-skip", "n_rows-no-skip"],
+    params=[None, 3],
+    ids=["all_rows", "some_rows"],
 )
 def n_rows(request):
     return request.param
@@ -58,21 +53,15 @@ def columns(request, row_index):
 
 
 @pytest.fixture(
-    params=[None, pl.col("c").is_not_null()], ids=["no-mask", "c-is-not-null"]
+    params=[None, pl.col("c").is_not_null()], ids=["no_mask", "c_is_not_null"]
 )
 def mask(request):
     return request.param
 
 
 @pytest.fixture(
-    params=[
-        None,
-        (1, 1),
-    ],
-    ids=[
-        "no-slice",
-        "slice-second",
-    ],
+    params=[None, (1, 1)],
+    ids=["no_slice", "slice_second"],
 )
 def slice(request):
     # For use in testing that we handle
@@ -99,22 +88,16 @@ def make_source(df, path, format):
         ("csv", pl.scan_csv),
         ("ndjson", pl.scan_ndjson),
         ("parquet", pl.scan_parquet),
+        ("chunked_parquet", pl.scan_parquet),
     ],
 )
 def test_scan(
-    tmp_path,
-    df,
-    format,
-    scan_fn,
-    row_index,
-    n_rows,
-    columns,
-    mask,
-    slice,
-    chunked,
-    request,
+    tmp_path, df, format, scan_fn, row_index, n_rows, columns, mask, slice, request
 ):
     name, offset = row_index
+    is_chunked = format == "chunked_parquet"
+    if is_chunked:
+        format = "parquet"
     make_source(df, tmp_path / "file", format)
     request.applymarker(
         pytest.mark.xfail(
@@ -128,25 +111,30 @@ def test_scan(
         row_index_offset=offset,
         n_rows=n_rows,
     )
+    engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked})
+    if (
+        is_chunked
+        and (columns is None or columns[0] != "a")
+        and (
+            # When we also mask, the mask happens to remove the
+            # bad row
+            (mask is None and slice is not None)
+            # When we both slice and read a subset of rows, that
+            # also removes the bad row
+            or (slice is None and n_rows is not None)
+        )
+    ):
+        # slice read produces wrong result for string column
+        request.applymarker(
+            pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
+        )
     if slice is not None:
         q = q.slice(*slice)
-        if scan_fn is pl.scan_parquet and slice[0] != 0:
-            # slicing a scan optimizes to a skip_rows which
-            # the chunked reader does not yet support
-            assert_ir_translation_raises(q, NotImplementedError)
-        else:
-            assert_gpu_result_equal(q)
-    else:
-        if mask is not None:
-            q = q.filter(mask)
-        if columns is not None:
-            q = q.select(*columns)
-        assert_gpu_result_equal(
-            q,
-            engine=pl.GPUEngine(
-                raise_on_fail=True, parquet_options={"chunked": chunked}
-            ),
-        )
+    if mask is not None:
+        q = q.filter(mask)
+    if columns is not None:
+        q = q.select(*columns)
+    assert_gpu_result_equal(q, engine=engine)
 
 
 def test_negative_slice_pushdown_raises(tmp_path):
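For context, the engine constructed in test_scan corresponds to the following user-facing call. This is a sketch, assuming a local file named "file.pq" (a hypothetical path); the two read limits are shown only to illustrate the knobs the reader above consumes, where 0 leaves the corresponding limit unset:

import polars as pl

q = pl.scan_parquet("file.pq").slice(1, 1).filter(pl.col("c").is_not_null())
engine = pl.GPUEngine(
    raise_on_fail=True,  # raise on translation failure instead of falling back to CPU
    parquet_options={
        "chunked": True,  # route the scan through ChunkedParquetReader
        "chunk_read_limit": 0,
        "pass_read_limit": 0,
    },
)
result = q.collect(engine=engine)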
@@ -352,14 +350,52 @@ def test_scan_parquet_only_row_index_raises(df, tmp_path):
     assert_ir_translation_raises(q, NotImplementedError)
 
 
-@pytest.mark.parametrize("chunk_read_limit", [0, 1, 2, 4, 8, 16])
-@pytest.mark.parametrize("pass_read_limit", [0, 1, 2, 4, 8, 16])
-def test_scan_parquet_chunked(df, tmp_path, chunk_read_limit, pass_read_limit):
-    df = pl.concat([df] * 1000)  # makes something large enough to meaningfully chunk
-    make_source(df, tmp_path / "file", "parquet")
-    q = pl.scan_parquet(tmp_path / "file")
+@pytest.fixture(
+    scope="module", params=["no_slice", "skip_to_end", "skip_partial", "partial"]
+)
+def chunked_slice(request):
+    return request.param
+
+
+@pytest.fixture(scope="module")
+def large_df(df, tmpdir_factory, chunked_slice):
+    # Something big enough that we get more than a single chunk,
+    # empirically determined
+    df = pl.concat([df] * 1000)
+    df = pl.concat([df] * 10)
+    df = pl.concat([df] * 10)
+    path = str(tmpdir_factory.mktemp("data") / "large.pq")
+    make_source(df, path, "parquet")
+    n_rows = len(df)
+    q = pl.scan_parquet(path)
+    if chunked_slice == "no_slice":
+        return q
+    elif chunked_slice == "skip_to_end":
+        return q.slice(int(n_rows * 0.6), n_rows)
+    elif chunked_slice == "skip_partial":
+        return q.slice(int(n_rows * 0.6), int(n_rows * 0.2))
+    else:
+        return q.slice(0, int(n_rows * 0.6))
+
+
+@pytest.mark.parametrize(
+    "chunk_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"chunk_{x}"
+)
+@pytest.mark.parametrize(
+    "pass_read_limit", [0, 1, 2, 4, 8, 16], ids=lambda x: f"pass_{x}"
+)
+def test_scan_parquet_chunked(
+    request, chunked_slice, large_df, chunk_read_limit, pass_read_limit
+):
+    if chunked_slice in {"skip_partial", "partial"} and (
+        chunk_read_limit == 0 and pass_read_limit != 0
+    ):
+        request.applymarker(
+            pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
+        )
+
     assert_gpu_result_equal(
-        q,
+        large_df,
         engine=pl.GPUEngine(
             raise_on_fail=True, parquet_options={
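These fixture slices exercise the workaround end to end: polars pushes slice(offset, length) down to the scan as skip_rows=offset and n_rows=length, so the chunked reader requests n_rows + skip_rows rows and drops the first skip_rows of them. A rough sketch of that arithmetic, with an assumed total row count (the real fixture's size depends on df):

TOTAL = 100_000  # hypothetical size of large_df, for illustration

slices = {
    "skip_to_end": (int(TOTAL * 0.6), TOTAL),
    "skip_partial": (int(TOTAL * 0.6), int(TOTAL * 0.2)),
    "partial": (0, int(TOTAL * 0.6)),
}
for name, (skip_rows, n_rows) in slices.items():
    requested = n_rows + skip_rows  # what the reader is asked for
    # The reader cannot yield more rows than the file holds.
    returned = min(requested, TOTAL) - skip_rows
    print(f"{name}: request {requested} rows, yield {returned}")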