Skip to content

Commit

Permalink
Add Parquet Reader options classes to pylibcudf (#17464)
Browse files Browse the repository at this point in the history
Follow up of #17263, this PR adds the parquet reader options classes to pylibcudf and plumbs the changes through cudf python.

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Matthew Roeschke (https://github.com/mroeschke)
  - Yunsong Wang (https://github.com/PointKernel)
  - Nghia Truong (https://github.com/ttnghia)
  - MithunR (https://github.com/mythrocks)

URL: #17464
  • Loading branch information
Matt711 authored Dec 6, 2024
1 parent b6f7e6e commit cbeefd8
Show file tree
Hide file tree
Showing 7 changed files with 333 additions and 194 deletions.
1 change: 1 addition & 0 deletions cpp/include/cudf/io/parquet.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ class parquet_reader_options_builder {
*
* @param val Boolean value whether to read matching projected and filter columns from mismatched
* Parquet sources.
*
* @return this for chaining.
*/
parquet_reader_options_builder& allow_mismatched_pq_schemas(bool val)
Expand Down
58 changes: 38 additions & 20 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ cdef object _process_metadata(object df,
else:
start = range_index_meta["start"] + skip_rows
stop = range_index_meta["stop"]
if nrows != -1:
if nrows > -1:
stop = start + nrows
idx = cudf.RangeIndex(
start=start,
Expand Down Expand Up @@ -256,16 +256,27 @@ def read_parquet_chunked(
# (see read_parquet)
allow_range_index = columns is not None and len(columns) != 0

options = (
plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(filepaths_or_buffers)
)
.use_pandas_metadata(use_pandas_metadata)
.allow_mismatched_pq_schemas(allow_mismatched_pq_schemas)
.build()
)
if row_groups is not None:
options.set_row_groups(row_groups)
if nrows > -1:
options.set_num_rows(nrows)
if skip_rows != 0:
options.set_skip_rows(skip_rows)
if columns is not None:
options.set_columns(columns)

reader = ChunkedParquetReader(
plc.io.SourceInfo(filepaths_or_buffers),
columns,
row_groups,
use_pandas_metadata=use_pandas_metadata,
options,
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit,
skip_rows=skip_rows,
nrows=nrows,
allow_mismatched_pq_schemas=allow_mismatched_pq_schemas,
)

tbl_w_meta = reader.read_chunk()
Expand Down Expand Up @@ -325,19 +336,26 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
if columns is not None and len(columns) == 0 or filters:
allow_range_index = False

# Read Parquet

tbl_w_meta = plc.io.parquet.read_parquet(
plc.io.SourceInfo(filepaths_or_buffers),
columns,
row_groups,
filters,
convert_strings_to_categories = False,
use_pandas_metadata = use_pandas_metadata,
skip_rows = skip_rows,
nrows = nrows,
allow_mismatched_pq_schemas=allow_mismatched_pq_schemas,
options = (
plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(filepaths_or_buffers)
)
.use_pandas_metadata(use_pandas_metadata)
.allow_mismatched_pq_schemas(allow_mismatched_pq_schemas)
.build()
)
if row_groups is not None:
options.set_row_groups(row_groups)
if nrows > -1:
options.set_num_rows(nrows)
if skip_rows != 0:
options.set_skip_rows(skip_rows)
if columns is not None:
options.set_columns(columns)
if filters is not None:
options.set_filter(filters)

tbl_w_meta = plc.io.parquet.read_parquet(options)

df = cudf.DataFrame._from_data(
*data_from_pylibcudf_io(tbl_w_meta)
Expand Down
44 changes: 27 additions & 17 deletions python/cudf_polars/cudf_polars/dsl/ir.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,17 +517,22 @@ def do_evaluate(
elif typ == "parquet":
parquet_options = config_options.get("parquet_options", {})
if parquet_options.get("chunked", True):
options = plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(paths)
).build()
# We handle skip_rows != 0 by reading from the
# up to n_rows + skip_rows and slicing off the
# first skip_rows entries.
# TODO: Remove this workaround once
# https://github.com/rapidsai/cudf/issues/16186
# is fixed
nrows = n_rows + skip_rows
if nrows > -1:
options.set_num_rows(nrows)
if with_columns is not None:
options.set_columns(with_columns)
reader = plc.io.parquet.ChunkedParquetReader(
plc.io.SourceInfo(paths),
columns=with_columns,
# We handle skip_rows != 0 by reading from the
# up to n_rows + skip_rows and slicing off the
# first skip_rows entries.
# TODO: Remove this workaround once
# https://github.com/rapidsai/cudf/issues/16186
# is fixed
nrows=n_rows + skip_rows,
skip_rows=0,
options,
chunk_read_limit=parquet_options.get(
"chunk_read_limit", cls.PARQUET_DEFAULT_CHUNK_SIZE
),
Expand Down Expand Up @@ -573,13 +578,18 @@ def slice_skip(tbl: plc.Table):
if predicate is not None and row_index is None:
# Can't apply filters during read if we have a row index.
filters = to_parquet_filter(predicate.value)
tbl_w_meta = plc.io.parquet.read_parquet(
plc.io.SourceInfo(paths),
columns=with_columns,
filters=filters,
nrows=n_rows,
skip_rows=skip_rows,
)
options = plc.io.parquet.ParquetReaderOptions.builder(
plc.io.SourceInfo(paths)
).build()
if n_rows != -1:
options.set_num_rows(n_rows)
if skip_rows != 0:
options.set_skip_rows(skip_rows)
if with_columns is not None:
options.set_columns(with_columns)
if filters is not None:
options.set_filter(filters)
tbl_w_meta = plc.io.parquet.read_parquet(options)
df = DataFrame.from_table(
tbl_w_meta.tbl,
# TODO: consider nested column names?
Expand Down
36 changes: 22 additions & 14 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ from pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_reader as cpp_chunked_parquet_reader,
parquet_writer_options,
parquet_writer_options_builder,
parquet_reader_options,
parquet_reader_options_builder,
chunked_parquet_writer_options,
chunked_parquet_writer_options_builder,
)
Expand All @@ -27,27 +29,33 @@ from pylibcudf.table cimport Table
from pylibcudf.types cimport DataType


cdef class ParquetReaderOptions:
cdef parquet_reader_options c_obj
cdef SourceInfo source
cpdef void set_row_groups(self, list row_groups)
cpdef void set_num_rows(self, size_type nrows)
cpdef void set_skip_rows(self, int64_t skip_rows)
cpdef void set_columns(self, list col_names)
cpdef void set_filter(self, Expression filter)

cdef class ParquetReaderOptionsBuilder:
cdef parquet_reader_options_builder c_obj
cdef SourceInfo source
cpdef ParquetReaderOptionsBuilder convert_strings_to_categories(self, bool val)
cpdef ParquetReaderOptionsBuilder use_pandas_metadata(self, bool val)
cpdef ParquetReaderOptionsBuilder allow_mismatched_pq_schemas(self, bool val)
cpdef ParquetReaderOptionsBuilder use_arrow_schema(self, bool val)
cpdef build(self)


cdef class ChunkedParquetReader:
cdef unique_ptr[cpp_chunked_parquet_reader] reader

cpdef bool has_next(self)
cpdef TableWithMetadata read_chunk(self)


cpdef read_parquet(
SourceInfo source_info,
list columns = *,
list row_groups = *,
Expression filters = *,
bool convert_strings_to_categories = *,
bool use_pandas_metadata = *,
int64_t skip_rows = *,
size_type nrows = *,
bool allow_mismatched_pq_schemas = *,
# disabled see comment in parquet.pyx for more
# ReaderColumnSchema reader_column_schema = *,
# DataType timestamp_type = *
)
cpdef read_parquet(ParquetReaderOptions options)


cdef class ParquetChunkedWriter:
Expand Down
21 changes: 20 additions & 1 deletion python/pylibcudf/pylibcudf/io/parquet.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from collections.abc import Mapping
from typing import Self

from typing_extensions import Self

from pylibcudf.expressions import Expression
from pylibcudf.io.types import (
Expand All @@ -16,6 +17,24 @@ from pylibcudf.io.types import (
)
from pylibcudf.table import Table

class ParquetReaderOptions:
def __init__(self): ...
def set_row_groups(self, row_groups: list[list[int]]): ...
def set_num_rows(self, nrows: int): ...
def set_skip_rows(self, skip_rows: int): ...
def set_columns(self, col_names: list[str]): ...
def set_filter(self, filter: Expression): ...
@staticmethod
def builder(source: SourceInfo) -> ParquetReaderOptionsBuilder: ...

class ParquetReaderOptionsBuilder:
def __init__(self): ...
def convert_strings_to_categories(self, val: bool) -> Self: ...
def use_pandas_metadata(self, val: bool) -> Self: ...
def allow_mismatched_pq_schemas(self, val: bool) -> Self: ...
def use_arrow_schema(self, val: bool) -> Self: ...
def build(self) -> ParquetReaderOptions: ...

class ChunkedParquetReader:
def __init__(
self,
Expand Down
Loading

0 comments on commit cbeefd8

Please sign in to comment.