Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Parquet Reader options classes to pylibcudf #17464

Merged
merged 14 commits into from
Dec 6, 2024
1 change: 1 addition & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ class parquet_reader_options_builder {
*
* @param val Boolean value whether to read matching projected and filter columns from mismatched
* Parquet sources.
*
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the reviewer: I couldn't find this function in the docs, so I thought it may not be rendering correctly without this new line. I confirmed that you can see the function in the docs with it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, thanks for catching this! We usually don’t include a blank line between @param and @return, but it might be time to change that.

* @return this for chaining.
*/
parquet_reader_options_builder& allow_mismatched_pq_schemas(bool val)
Expand Down
58 changes: 38 additions & 20 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ cdef object _process_metadata(object df,
else:
start = range_index_meta["start"] + skip_rows
stop = range_index_meta["stop"]
if nrows != -1:
if nrows > -1:
stop = start + nrows
idx = cudf.RangeIndex(
start=start,
Expand Down Expand Up @@ -270,16 +270,27 @@ def read_parquet_chunked(
# (see read_parquet)
allow_range_index = columns is not None and len(columns) != 0

options = (
plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(filepaths_or_buffers)
)
.use_pandas_metadata(use_pandas_metadata)
.allow_mismatched_pq_schemas(allow_mismatched_pq_schemas)
.build()
)
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
if row_groups is not None:
options.set_row_groups(row_groups)
if nrows > -1:
options.set_num_rows(nrows)
if skip_rows != 0:
options.set_skip_rows(skip_rows)
if columns is not None:
options.set_columns(columns)

reader = ChunkedParquetReader(
plc.io.SourceInfo(filepaths_or_buffers),
columns,
row_groups,
use_pandas_metadata=use_pandas_metadata,
options,
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit,
skip_rows=skip_rows,
nrows=nrows,
allow_mismatched_pq_schemas=allow_mismatched_pq_schemas,
)

tbl_w_meta = reader.read_chunk()
Expand Down Expand Up @@ -339,19 +350,26 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
if columns is not None and len(columns) == 0 or filters:
allow_range_index = False

# Read Parquet

tbl_w_meta = plc.io.parquet.read_parquet(
plc.io.SourceInfo(filepaths_or_buffers),
columns,
row_groups,
filters,
convert_strings_to_categories = False,
use_pandas_metadata = use_pandas_metadata,
skip_rows = skip_rows,
nrows = nrows,
allow_mismatched_pq_schemas=allow_mismatched_pq_schemas,
options = (
plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(filepaths_or_buffers)
)
.use_pandas_metadata(use_pandas_metadata)
.allow_mismatched_pq_schemas(allow_mismatched_pq_schemas)
.build()
)
if row_groups is not None:
options.set_row_groups(row_groups)
if nrows > -1:
options.set_num_rows(nrows)
if skip_rows != 0:
options.set_skip_rows(skip_rows)
if columns is not None:
options.set_columns(columns)
if filters is not None:
options.set_filter(filters)

tbl_w_meta = plc.io.parquet.read_parquet(options)

df = cudf.DataFrame._from_data(
*data_from_pylibcudf_io(tbl_w_meta)
Expand Down
44 changes: 27 additions & 17 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,17 +517,22 @@ def do_evaluate(
elif typ == "parquet":
parquet_options = config_options.get("parquet_options", {})
if parquet_options.get("chunked", True):
options = plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(paths)
).build()
# We handle skip_rows != 0 by reading up to
# n_rows + skip_rows rows and slicing off the
# first skip_rows entries.
# TODO: Remove this workaround once
# https://github.com/rapidsai/cudf/issues/16186
# is fixed
nrows = n_rows + skip_rows
if nrows > -1:
options.set_num_rows(nrows)
if with_columns is not None:
options.set_columns(with_columns)
reader = plc.io.parquet.ChunkedParquetReader(
plc.io.SourceInfo(paths),
columns=with_columns,
# We handle skip_rows != 0 by reading up to
# n_rows + skip_rows rows and slicing off the
# first skip_rows entries.
# TODO: Remove this workaround once
# https://github.com/rapidsai/cudf/issues/16186
# is fixed
nrows=n_rows + skip_rows,
skip_rows=0,
options,
chunk_read_limit=parquet_options.get(
"chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
),
Expand Down Expand Up @@ -573,13 +578,18 @@ def slice_skip(tbl: plc.Table):
if predicate is not None and row_index is None:
# Can't apply filters during read if we have a row index.
filters = to_parquet_filter(predicate.value)
tbl_w_meta = plc.io.parquet.read_parquet(
plc.io.SourceInfo(paths),
columns=with_columns,
filters=filters,
nrows=n_rows,
skip_rows=skip_rows,
)
options = plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(paths)
).build()
if n_rows != -1:
options.set_num_rows(n_rows)
if skip_rows != 0:
options.set_skip_rows(skip_rows)
if with_columns is not None:
options.set_columns(with_columns)
if filters is not None:
options.set_filter(filters)
tbl_w_meta = plc.io.parquet.read_parquet(options)
df = DataFrame.from_table(
tbl_w_meta.tbl,
# TODO: consider nested column names?
Expand Down
36 changes: 22 additions & 14 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,41 @@ from pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_reader as cpp_chunked_parquet_reader,
parquet_writer_options,
parquet_writer_options_builder,
parquet_reader_options,
parquet_reader_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.table cimport Table
from pylibcudf.types cimport DataType


cdef class ParquetReaderOptions:
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
cdef parquet_reader_options c_obj
cdef SourceInfo source
cpdef void set_row_groups(self, list row_groups)
cpdef void set_num_rows(self, size_type nrows)
cpdef void set_skip_rows(self, int64_t skip_rows)
cpdef void set_columns(self, list col_names)
cpdef void set_filter(self, Expression filter)

# Cython declaration of the builder wrapper for Parquet reader options.
# Every option method below is declared to return ParquetReaderOptionsBuilder,
# which is what lets callers chain option calls before calling build().
cdef class ParquetReaderOptionsBuilder:
# Underlying libcudf parquet_reader_options_builder object.
cdef parquet_reader_options_builder c_obj
# SourceInfo stored next to the C++ builder -- presumably to keep the source
# alive while c_obj refers to it; confirm against parquet.pyx.
cdef SourceInfo source
cpdef ParquetReaderOptionsBuilder convert_strings_to_categories(self, bool val)
cpdef ParquetReaderOptionsBuilder use_pandas_metadata(self, bool val)
cpdef ParquetReaderOptionsBuilder allow_mismatched_pq_schemas(self, bool val)
cpdef ParquetReaderOptionsBuilder use_arrow_schema(self, bool val)
# Return type is not declared here; the parquet.pyi stub annotates it as
# ParquetReaderOptions.
cpdef build(self)


# Cython declaration of the chunked Parquet reader wrapper.
cdef class ChunkedParquetReader:
# unique_ptr to the libcudf chunked_parquet_reader
# (cimported above as cpp_chunked_parquet_reader).
cdef unique_ptr[cpp_chunked_parquet_reader] reader

# Presumably reports whether another chunk remains to be read -- confirm
# against the implementation in parquet.pyx.
cpdef bool has_next(self)
# Declared to return a TableWithMetadata.
cpdef TableWithMetadata read_chunk(self)


cpdef read_parquet(
SourceInfo source_info,
list columns = *,
list row_groups = *,
Expression filters = *,
bool convert_strings_to_categories = *,
bool use_pandas_metadata = *,
int64_t skip_rows = *,
size_type nrows = *,
bool allow_mismatched_pq_schemas = *,
# disabled see comment in parquet.pyx for more
# ReaderColumnSchema reader_column_schema = *,
# DataType timestamp_type = *
)
cpdef read_parquet(ParquetReaderOptions options)

cdef class ParquetWriterOptions:
cdef parquet_writer_options c_obj
Expand Down
21 changes: 20 additions & 1 deletion python/pylibcudf/pylibcudf/io/parquet.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from collections.abc import Mapping
from typing import Self

from typing_extensions import Self

from pylibcudf.expressions import Expression
from pylibcudf.io.types import (
Expand All @@ -16,6 +17,24 @@ from pylibcudf.io.types import (
)
from pylibcudf.table import Table

# Type stub for the Parquet reader options object.
# Start from the static builder(source) method, which returns a
# ParquetReaderOptionsBuilder; the set_* methods below adjust individual
# options on an existing options object.
class ParquetReaderOptions:
def __init__(self): ...
# row_groups is a list of lists of ints -- presumably one inner list per
# source; confirm against the libcudf documentation.
def set_row_groups(self, row_groups: list[list[int]]): ...
def set_num_rows(self, nrows: int): ...
def set_skip_rows(self, skip_rows: int): ...
def set_columns(self, col_names: list[str]): ...
def set_filter(self, filter: Expression): ...
# Static entry point: takes the SourceInfo to read from and returns a builder.
@staticmethod
def builder(source: SourceInfo) -> ParquetReaderOptionsBuilder: ...

# Type stub for the Parquet reader options builder.
# Each boolean option method is annotated as returning Self so calls can be
# chained; build() is annotated as returning the ParquetReaderOptions.
class ParquetReaderOptionsBuilder:
def __init__(self): ...
def convert_strings_to_categories(self, val: bool) -> Self: ...
def use_pandas_metadata(self, val: bool) -> Self: ...
def allow_mismatched_pq_schemas(self, val: bool) -> Self: ...
def use_arrow_schema(self, val: bool) -> Self: ...
def build(self) -> ParquetReaderOptions: ...

class ChunkedParquetReader:
def __init__(
self,
Expand Down
Loading
Loading