Implement chunked parquet reader in cudf-python #15728

Merged · 20 commits · Jun 6, 2024

Changes from 17 commits
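For readers skimming the diff, here is a minimal sketch of how the new internal chunked reader is meant to be driven from Python, mirroring the test added at the bottom of this PR. The sample data and limit values are illustrative only, and `ParquetReader` is an internal `cudf._lib` API added here, not a public one:

    from io import BytesIO

    import pandas as pd

    import cudf
    from cudf._lib.parquet import ParquetReader  # internal API added in this PR

    # Illustrative input: a small parquet buffer written with pandas.
    buffer = BytesIO()
    pd.DataFrame({"a": range(10_000)}).to_parquet(buffer)

    # chunk_read_limit and pass_read_limit values are illustrative; the new
    # __cinit__ defaults are 0 and 1024000000 respectively.
    reader = ParquetReader(
        [buffer],
        chunk_read_limit=240,
        pass_read_limit=1_024_000_000,
    )

    expected = cudf.read_parquet(buffer)
    actual = reader.read()  # reads all chunks and concatenates them
    assert actual.equals(expected)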
242 changes: 181 additions & 61 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -26,6 +26,7 @@ from libc.stdint cimport uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport make_unique, unique_ptr
from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.unordered_map cimport unordered_map
from libcpp.utility cimport move
@@ -44,6 +45,7 @@ from cudf._lib.io.utils cimport (
)
from cudf._lib.pylibcudf.libcudf.expressions cimport expression
from cudf._lib.pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_reader as cpp_chunked_parquet_reader,
chunked_parquet_writer_options,
merge_row_group_metadata as parquet_merge_metadata,
parquet_chunked_writer as cpp_parquet_chunked_writer,
@@ -60,6 +62,7 @@ from cudf._lib.pylibcudf.libcudf.io.parquet_metadata cimport (
from cudf._lib.pylibcudf.libcudf.io.types cimport (
column_in_metadata,
table_input_metadata,
table_metadata,
)
from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view
from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
@@ -126,50 +129,22 @@ def _parse_metadata(meta):
return file_is_range_index, file_index_cols, file_column_dtype


cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
Expression filters=None):
"""
Cython function to call into libcudf API, see `read_parquet`.

filters, if not None, should be an Expression that evaluates to a
boolean predicate as a function of columns being read.

See Also
--------
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""

# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
# pyarrow for metadata processing
# (See: https://github.com/rapidsai/cudf/issues/9599)
pa_buffers = []
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
pa_buffers.append(datasource)
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options(
cudf_io_types.source_info source,
vector[vector[size_type]] row_groups,
bool use_pandas_metadata,
Expression filters,
object columns):

cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

cdef bool cpp_use_pandas_metadata = use_pandas_metadata

cdef vector[vector[size_type]] cpp_row_groups
cdef parquet_reader_options args
cdef parquet_reader_options_builder builder
cdef data_type cpp_timestamp_type = cudf_types.data_type(
cudf_types.type_id.EMPTY
)
if row_groups is not None:
cpp_row_groups = row_groups

# Setup parquet reader arguments
cdef parquet_reader_options args
cdef parquet_reader_options_builder builder
builder = (
parquet_reader_options.builder(source)
.row_groups(cpp_row_groups)
.use_pandas_metadata(cpp_use_pandas_metadata)
.row_groups(row_groups)
.use_pandas_metadata(use_pandas_metadata)
.use_arrow_schema(True)
.timestamp_type(cpp_timestamp_type)
)
@@ -185,28 +160,28 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
for col in columns:
cpp_columns.push_back(str(col).encode())
args.set_columns(cpp_columns)
# Filters don't handle the range index correctly
allow_range_index &= filters is None

# Read Parquet
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(parquet_reader(args))

names = [info.name.decode() for info in c_result.metadata.schema_info]

# Access the Parquet per_file_user_data to find the index
return pair[parquet_reader_options, bool](args, allow_range_index)

cdef object _process_metadata(object df,
table_metadata table_meta,
list names,
object row_groups,
object filepaths_or_buffers,
list pa_buffers,
bool allow_range_index,
bool use_pandas_metadata):
update_struct_field_names(df, table_meta.schema_info)
index_col = None
cdef vector[unordered_map[string, string]] per_file_user_data = \
c_result.metadata.per_file_user_data

is_range_index = True
column_index_type = None
index_col_names = None
is_range_index = True
meta = None
cdef vector[unordered_map[string, string]] per_file_user_data = \
table_meta.per_file_user_data
for single_file in per_file_user_data:
json_str = single_file[b'pandas'].decode('utf-8')
meta = None
if json_str != "":
meta = json.loads(json_str)
file_is_range_index, index_col, column_index_type = _parse_metadata(meta)
@@ -220,13 +195,6 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
if c['field_name'] == idx_col:
index_col_names[idx_col] = c['name']

df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=names
))

update_struct_field_names(df, c_result.metadata.schema_info)

if meta is not None:
# Book keep each column metadata as the order
# of `meta["columns"]` and `column_names` are not
@@ -319,9 +287,65 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
if use_pandas_metadata:
df.index.names = index_col

# Set column dtype for empty types.
if len(df._data.names) == 0 and column_index_type is not None:
df._data.label_dtype = cudf.dtype(column_index_type)

return df


cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
Expression filters=None):
"""
Cython function to call into libcudf API, see `read_parquet`.

filters, if not None, should be an Expression that evaluates to a
boolean predicate as a function of columns being read.

See Also
--------
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""

# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
# pyarrow for metadata processing
# (See: https://github.com/rapidsai/cudf/issues/9599)
pa_buffers = []
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
pa_buffers.append(datasource)
filepaths_or_buffers[i] = NativeFileDatasource(datasource)

cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

cdef vector[vector[size_type]] cpp_row_groups
if row_groups is not None:
cpp_row_groups = row_groups

# Setup parquet reader arguments
cdef parquet_reader_options args
cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options(
source, cpp_row_groups, use_pandas_metadata, filters, columns)
args, allow_range_index = c_res.first, c_res.second

# Read Parquet
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(parquet_reader(args))

names = [info.name.decode() for info in c_result.metadata.schema_info]

df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=names
))
df = _process_metadata(df, c_result.metadata, names, row_groups,
filepaths_or_buffers, pa_buffers,
allow_range_index, use_pandas_metadata)
return df

cpdef read_parquet_metadata(filepaths_or_buffers):
@@ -767,6 +791,102 @@ cdef class ParquetWriter:
self.initialized = True


cdef class ParquetReader:
cdef bool initialized
cdef unique_ptr[cpp_chunked_parquet_reader] reader
cdef size_t chunk_read_limit
cdef size_t pass_read_limit
cdef size_t row_group_size_bytes
cdef table_metadata result_meta
cdef vector[unordered_map[string, string]] per_file_user_data
cdef object pandas_meta
cdef list pa_buffers
cdef bool allow_range_index
cdef object row_groups
cdef object filepaths_or_buffers
cdef object names
cdef object column_index_type
cdef object index_col_names
cdef bool is_range_index
cdef object index_col
cdef bool cpp_use_pandas_metadata

def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
size_t chunk_read_limit=0,
size_t pass_read_limit=1024000000):

# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
# pyarrow for metadata processing
# (See: https://github.com/rapidsai/cudf/issues/9599)

pa_buffers = []
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
pa_buffers.append(datasource)
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
self.pa_buffers = pa_buffers
cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

self.cpp_use_pandas_metadata = use_pandas_metadata

cdef vector[vector[size_type]] cpp_row_groups
if row_groups is not None:
cpp_row_groups = row_groups
cdef parquet_reader_options args
cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options(
source, cpp_row_groups, use_pandas_metadata, None, columns)
args, self.allow_range_index = c_res.first, c_res.second

with nogil:
self.reader.reset(
new cpp_chunked_parquet_reader(
chunk_read_limit,
pass_read_limit,
args
)
)
self.initialized = False
self.row_groups = row_groups
self.filepaths_or_buffers = filepaths_or_buffers

def _has_next(self):
cdef bool res
with nogil:
res = self.reader.get()[0].has_next()
return res

def _read_chunk(self):
# Read Parquet
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(self.reader.get()[0].read_chunk())

if not self.initialized:
self.names = [info.name.decode() for info in c_result.metadata.schema_info]
self.result_meta = c_result.metadata

df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=self.names,
))

self.initialized = True
return df

def read(self):
dfs = []
while self._has_next():
dfs.append(self._read_chunk())
df = cudf.concat(dfs)
df = _process_metadata(df, self.result_meta, self.names, self.row_groups,
self.filepaths_or_buffers, self.pa_buffers,
self.allow_range_index, self.cpp_use_pandas_metadata)
return df

cpdef merge_filemetadata(object filemetadata_list):
"""
Cython function to call into libcudf API, see `merge_row_group_metadata`.
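As a rough sketch of what `ParquetReader.read()` above does internally, and of how a caller could consume chunks incrementally instead of concatenating them, the loop below uses only the `_has_next()`/`_read_chunk()` methods added in this file; the `process` callback is hypothetical. Note that, per the diff, `_process_metadata()` is only applied in `read()` after concatenation, so individual chunks come back with plain column names and a default RangeIndex.

    # Sketch only: stream chunks out of a cudf._lib.parquet.ParquetReader
    # without materializing the whole file at once. `process` is a
    # hypothetical per-chunk callback, not part of this PR.
    def stream_chunks(reader, process):
        n_chunks = 0
        while reader._has_next():          # wraps chunked_parquet_reader::has_next()
            chunk = reader._read_chunk()   # one cudf.DataFrame per chunk
            process(chunk)
            n_chunks += 1
        return n_chunks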
12 changes: 12 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd
@@ -283,6 +283,18 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
vector[string] column_chunks_file_paths,
) except +

cdef cppclass chunked_parquet_reader:
chunked_parquet_reader() except +
chunked_parquet_reader(
size_t chunk_read_limit,
const parquet_reader_options& options) except +
chunked_parquet_reader(
size_t chunk_read_limit,
size_t pass_read_limit,
const parquet_reader_options& options) except +
bool has_next() except +
cudf_io_types.table_with_metadata read_chunk() except +

cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata(
const vector[unique_ptr[vector[uint8_t]]]& metadata_list
) except +
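As I understand the libcudf API (my reading, not something this PR changes), `chunk_read_limit` is a soft cap in bytes on the size of each table returned by `read_chunk()`, `pass_read_limit` caps the temporary device memory used per reading pass, and 0 means no limit for either. A minimal Cython sketch of driving these declarations, assuming the cimports from parquet.pyx above, two `size_t` limits, and an already-built `parquet_reader_options` named `args`:

    # Sketch only: construct and drain the chunked reader.
    cdef unique_ptr[cpp_chunked_parquet_reader] reader
    cdef cudf_io_types.table_with_metadata chunk
    cdef bool more

    with nogil:
        reader.reset(
            new cpp_chunked_parquet_reader(chunk_read_limit, pass_read_limit, args)
        )
    while True:
        with nogil:
            more = reader.get()[0].has_next()
        if not more:
            break
        with nogil:
            chunk = move(reader.get()[0].read_chunk())
        # ... convert chunk.tbl to a cudf.DataFrame, as _read_chunk() does ...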
25 changes: 25 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -22,6 +22,7 @@
from pyarrow import fs as pa_fs, parquet as pq

import cudf
from cudf._lib.parquet import ParquetReader
from cudf.io.parquet import (
ParquetDatasetWriter,
ParquetWriter,
@@ -3407,3 +3408,27 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema():

# Check results
assert_eq(expected, got)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("use_pandas_metadata", [True, False])
@pytest.mark.parametrize("row_groups", [[[0]], None, [[0, 1]]])
def test_parquet_chunked_reader(
chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups
):
df = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000})
buffer = BytesIO()
df.to_parquet(buffer)
reader = ParquetReader(
[buffer],
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit,
use_pandas_metadata=use_pandas_metadata,
row_groups=row_groups,
)
expected = cudf.read_parquet(
buffer, use_pandas_metadata=use_pandas_metadata, row_groups=row_groups
)
actual = reader.read()
assert_eq(expected, actual)
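A possible follow-up assertion, not part of this PR, that the reader really does split its output when `chunk_read_limit` is small; it reuses the imports already present in this test module and drives the chunk-level methods directly:

    def test_parquet_chunked_reader_yields_multiple_chunks():
        # Hypothetical extra test: with a tiny chunk_read_limit the reader
        # should produce more than one chunk for a multi-megabyte column.
        df = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000})
        buffer = BytesIO()
        df.to_parquet(buffer)
        reader = ParquetReader([buffer], chunk_read_limit=240)
        chunks = []
        while reader._has_next():
            chunks.append(reader._read_chunk())
        assert len(chunks) > 1
        assert_eq(cudf.read_parquet(buffer), cudf.concat(chunks, ignore_index=True))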