Skip to content

Commit

Permalink
Fix reading Parquet string cols when nrows and input_pass_limit >…
Browse files Browse the repository at this point in the history
… 0 (#17321)

This PR fixes reading string columns from Parquet files using the chunked Parquet reader when `nrows` and `input_pass_limit` are > 0.

Closes #17311

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)

Approvers:
  - Vukasin Milovanovic (https://github.com/vuule)
  - Ed Seidl (https://github.com/etseidl)
  - Lawrence Mitchell (https://github.com/wence-)
  - Bradley Dice (https://github.com/bdice)
  - https://github.com/nvdbaranec
  - GALI PREM SAGAR (https://github.com/galipremsagar)

URL: #17321
  • Loading branch information
mhaseeb123 authored Nov 18, 2024
1 parent d514517 commit 43f2f68
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 33 deletions.
19 changes: 15 additions & 4 deletions cpp/src/io/parquet/page_decode.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,21 @@ inline __device__ bool is_bounds_page(page_state_s* const s,
size_t const begin = start_row;
size_t const end = start_row + num_rows;

// for non-nested schemas, rows cannot span pages, so use a more restrictive test
return has_repetition
? ((page_begin <= begin && page_end >= begin) || (page_begin <= end && page_end >= end))
: ((page_begin < begin && page_end > begin) || (page_begin < end && page_end > end));
// Test for list schemas.
auto const is_bounds_page_lists =
((page_begin <= begin and page_end >= begin) or (page_begin <= end and page_end >= end));

// For non-list schemas, rows cannot span pages, so use a more restrictive test. Make sure to
// relax the test for `page_end` if we adjusted the `num_rows` for the last page to compensate
// for list row size estimates in `generate_list_column_row_count_estimates()` when in chunked
// read mode.
auto const test_page_end_nonlists =
s->page.is_num_rows_adjusted ? page_end >= end : page_end > end;

auto const is_bounds_page_nonlists =
(page_begin < begin and page_end > begin) or (page_begin < end and test_page_end_nonlists);

return has_repetition ? is_bounds_page_lists : is_bounds_page_nonlists;
}

/**
Expand Down
1 change: 1 addition & 0 deletions cpp/src/io/parquet/page_hdr.cu
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,7 @@ void __launch_bounds__(128) gpuDecodePageHeaders(ColumnChunkDesc* chunks,
// definition levels
bs->page.chunk_row = 0;
bs->page.num_rows = 0;
bs->page.is_num_rows_adjusted = false;
bs->page.skipped_values = -1;
bs->page.skipped_leaf_values = 0;
bs->page.str_bytes = 0;
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/io/parquet/parquet_gpu.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,10 @@ struct PageInfo {
// - In the case of a nested schema, you have to decode the repetition and definition
// levels to extract actual column values
int32_t num_input_values;
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
int32_t chunk_row; // starting row of this page relative to the start of the chunk
int32_t num_rows; // number of rows in this page
bool is_num_rows_adjusted; // Flag to indicate if the number of rows of this page have been
// adjusted to compensate for the list row size estimates.
// the next four are calculated in gpuComputePageStringSizes
int32_t num_nulls; // number of null values (V2 header), but recalculated for string cols
int32_t num_valids; // number of non-null values, taking into account skip_rows/num_rows
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/io/parquet/reader_impl_preprocess.cu
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,10 @@ struct set_final_row_count {
if (i < pages.size() - 1 && (pages[i + 1].chunk_idx == page.chunk_idx)) { return; }
size_t const page_start_row = chunk.start_row + page.chunk_row;
size_t const chunk_last_row = chunk.start_row + chunk.num_rows;
page.num_rows = chunk_last_row - page_start_row;
// Mark `is_num_rows_adjusted` to signal string decoders that the `num_rows` of this page has
// been adjusted.
page.is_num_rows_adjusted = page.num_rows != (chunk_last_row - page_start_row);
page.num_rows = chunk_last_row - page_start_row;
}
};

Expand Down
106 changes: 104 additions & 2 deletions python/cudf/cudf/tests/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -3771,10 +3771,10 @@ def test_parquet_chunked_reader(
chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups
):
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000}
{"a": [1, 2, 3, None] * 10000, "b": ["av", "qw", None, "xyz"] * 10000}
)
buffer = BytesIO()
df.to_parquet(buffer)
df.to_parquet(buffer, row_group_size=10000)
actual = read_parquet_chunked(
[buffer],
chunk_read_limit=chunk_read_limit,
Expand All @@ -3788,6 +3788,108 @@ def test_parquet_chunked_reader(
assert_eq(expected, actual)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("num_rows", [997, 2997, None])
def test_parquet_chunked_reader_structs(
    chunk_read_limit,
    pass_read_limit,
    num_rows,
):
    """Compare a chunked read of a nested struct column to a plain read.

    A 7000-row table (a 7-row pattern repeated 1000 times) holding a single
    ``struct`` column is written to an in-memory Parquet buffer. The buffer
    is then read back through ``read_parquet_chunked`` with the parametrized
    chunk/pass limits and through ``cudf.read_parquet``; both results must
    compare equal under ``assert_eq``.

    ``num_rows`` caps the number of rows requested from both readers; ``None``
    requests every row of the table.
    """
    # One period of the data: a mix of fully-null rows, null struct members,
    # nested structs and small integer lists.
    pattern = [
        {
            "a": "g",
            "b": {
                "b_a": 10,
                "b_b": {"b_b_b": None, "b_b_a": 2},
            },
            "c": None,
        },
        {"a": None, "b": {"b_a": None, "b_b": None}, "c": [15, 16]},
        {"a": "j", "b": None, "c": [8, 10]},
        {"a": None, "b": {"b_a": None, "b_b": None}, "c": None},
        None,
        {
            "a": None,
            "b": {"b_a": None, "b_b": {"b_b_b": 1}},
            "c": [18, 19],
        },
        {"a": None, "b": None, "c": None},
    ]
    records = pattern * 1000

    arrow_table = pa.Table.from_pydict({"struct": records})
    gdf = cudf.DataFrame.from_arrow(arrow_table)
    parquet_buf = BytesIO()
    gdf.to_parquet(parquet_buf)

    # Row cap shared by both readers; fall back to the full table length.
    row_limit = len(gdf) if num_rows is None else num_rows

    # Chunked read first, then the reference read, both from the same buffer.
    actual = read_parquet_chunked(
        [parquet_buf],
        chunk_read_limit=chunk_read_limit,
        pass_read_limit=pass_read_limit,
        nrows=row_limit,
    )
    expected = cudf.read_parquet(parquet_buf, nrows=row_limit)

    assert_eq(expected, actual)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("num_rows", [4997, 9997, None])
@pytest.mark.parametrize(
    "str_encoding",
    [
        "PLAIN",
        "DELTA_BYTE_ARRAY",
        "DELTA_LENGTH_BYTE_ARRAY",
    ],
)
def test_parquet_chunked_reader_string_decoders(
    chunk_read_limit,
    pass_read_limit,
    num_rows,
    str_encoding,
):
    """Compare chunked and plain reads for each string column encoding.

    A 40000-row pandas frame with a nullable integer column, a string column
    and a list-of-strings column is written to an in-memory Parquet buffer
    with dictionary encoding disabled and the ``str`` column encoded with
    ``str_encoding``. The buffer is read back through ``read_parquet_chunked``
    with the parametrized chunk/pass limits and through ``cudf.read_parquet``;
    both results must compare equal under ``assert_eq``.

    ``num_rows`` caps the number of rows requested from both readers; ``None``
    requests every row of the frame.
    """
    repeats = 10000
    int_values = [1, 2, 3, None] * repeats
    str_values = ["av", "qw", "asd", "xyz"] * repeats
    list_values = [["ad", "cd"], ["asd", "fd"], None, ["asd", None]] * repeats

    pdf = pd.DataFrame(
        {
            "i64": int_values,
            "str": str_values,
            "list": list_values,
        }
    )

    parquet_buf = BytesIO()
    # 40000 rows at 10000 rows per group gives 4 row groups; only the "str"
    # column gets the encoding under test.
    pdf.to_parquet(
        parquet_buf,
        row_group_size=10000,
        use_dictionary=False,
        column_encoding={"str": str_encoding},
    )

    # Row cap shared by both readers; fall back to the full frame length.
    row_limit = len(pdf) if num_rows is None else num_rows

    # Chunked read first, then the reference read, both from the same buffer.
    actual = read_parquet_chunked(
        [parquet_buf],
        chunk_read_limit=chunk_read_limit,
        pass_read_limit=pass_read_limit,
        nrows=row_limit,
    )
    expected = cudf.read_parquet(parquet_buf, nrows=row_limit)

    assert_eq(expected, actual)


@pytest.mark.parametrize(
"nrows,skip_rows",
[
Expand Down
1 change: 0 additions & 1 deletion python/cudf_polars/cudf_polars/testing/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def pytest_configure(config: pytest.Config) -> None:
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read[False]": "Incomplete handling of projected reads with mismatching schemas, cudf#16394",
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_dtype_mismatch[False]": "Different exception raised, but correctly raises an exception",
"tests/unit/io/test_lazy_parquet.py::test_parquet_unaligned_schema_read_missing_cols_from_first[False]": "Different exception raised, but correctly raises an exception",
"tests/unit/io/test_lazy_parquet.py::test_glob_n_rows": "https://github.com/rapidsai/cudf/issues/17311",
"tests/unit/io/test_parquet.py::test_read_parquet_only_loads_selected_columns_15098": "Memory usage won't be correct due to GPU",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection0-False-none]": "Mismatching column read cudf#16394",
"tests/unit/io/test_parquet.py::test_allow_missing_columns[projection1-False-none]": "Mismatching column read cudf#16394",
Expand Down
24 changes: 1 addition & 23 deletions python/cudf_polars/tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,7 @@ def test_scan(
n_rows=n_rows,
)
engine = pl.GPUEngine(raise_on_fail=True, parquet_options={"chunked": is_chunked})
if (
is_chunked
and (columns is None or columns[0] != "a")
and (
# When we mask with the slice, it happens to remove the
# bad row
(mask is None and slice is not None)
# When we both slice and read a subset of rows it also
# removes the bad row
or (slice is None and n_rows is not None)
)
):
# slice read produces wrong result for string column
request.applymarker(
pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
)

if slice is not None:
q = q.slice(*slice)
if mask is not None:
Expand Down Expand Up @@ -377,13 +362,6 @@ def large_df(df, tmpdir_factory, chunked_slice):
def test_scan_parquet_chunked(
request, chunked_slice, large_df, chunk_read_limit, pass_read_limit
):
if chunked_slice in {"skip_partial", "partial"} and (
chunk_read_limit == 0 and pass_read_limit != 0
):
request.applymarker(
pytest.mark.xfail(reason="https://github.com/rapidsai/cudf/issues/17311")
)

assert_gpu_result_equal(
large_df,
engine=pl.GPUEngine(
Expand Down

0 comments on commit 43f2f68

Please sign in to comment.