Implement chunked parquet reader in cudf-python (#15728)
Partially Addresses: #14966 

This PR implements chunked parquet reader bindings in Python.
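
A minimal usage sketch of the new internal binding, mirroring the test added below (the sample data and byte limits are illustrative; `ParquetReader` lives in the private `cudf._lib.parquet` module):

```python
from io import BytesIO

import pandas as pd

import cudf
from cudf._lib.parquet import ParquetReader

# Build a small parquet file in memory.
buffer = BytesIO()
pd.DataFrame(
    {"a": [1, 2, 3, 4] * 1000, "b": ["av", "qw", "hi", "xyz"] * 1000}
).to_parquet(buffer)

# Read it back in chunks. chunk_read_limit caps the size of each returned
# chunk and pass_read_limit caps the temporary memory used per pass
# (both in bytes; 0 disables the limit).
reader = ParquetReader(
    [buffer],
    chunk_read_limit=240,
    pass_read_limit=1024000000,
)
got = reader.read()  # loops over _has_next()/_read_chunk() and concatenates

# The chunked result matches a regular eager read.
expected = cudf.read_parquet(buffer)
assert got.equals(expected)
```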

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Thomas Li (https://github.com/lithomas1)

URL: #15728
galipremsagar authored Jun 6, 2024
1 parent d1e511e commit 66895af
Showing 3 changed files with 220 additions and 61 deletions.
242 changes: 181 additions & 61 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -26,6 +26,7 @@ from libc.stdint cimport uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport make_unique, unique_ptr
from libcpp.pair cimport pair
from libcpp.string cimport string
from libcpp.unordered_map cimport unordered_map
from libcpp.utility cimport move
@@ -44,6 +45,7 @@ from cudf._lib.io.utils cimport (
)
from cudf._lib.pylibcudf.libcudf.expressions cimport expression
from cudf._lib.pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_reader as cpp_chunked_parquet_reader,
chunked_parquet_writer_options,
merge_row_group_metadata as parquet_merge_metadata,
parquet_chunked_writer as cpp_parquet_chunked_writer,
@@ -60,6 +62,7 @@ from cudf._lib.pylibcudf.libcudf.io.parquet_metadata cimport (
from cudf._lib.pylibcudf.libcudf.io.types cimport (
column_in_metadata,
table_input_metadata,
table_metadata,
)
from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view
from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
@@ -126,50 +129,22 @@ def _parse_metadata(meta):
return file_is_range_index, file_index_cols, file_column_dtype


cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
Expression filters=None):
"""
Cython function to call into libcudf API, see `read_parquet`.
filters, if not None, should be an Expression that evaluates to a
boolean predicate as a function of columns being read.
See Also
--------
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""

# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
# pyarrow for metadata processing
# (See: https://github.com/rapidsai/cudf/issues/9599)
pa_buffers = []
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
pa_buffers.append(datasource)
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
cdef pair[parquet_reader_options, bool] _setup_parquet_reader_options(
cudf_io_types.source_info source,
vector[vector[size_type]] row_groups,
bool use_pandas_metadata,
Expression filters,
object columns):

cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

cdef bool cpp_use_pandas_metadata = use_pandas_metadata

cdef vector[vector[size_type]] cpp_row_groups
cdef parquet_reader_options args
cdef parquet_reader_options_builder builder
cdef data_type cpp_timestamp_type = cudf_types.data_type(
cudf_types.type_id.EMPTY
)
if row_groups is not None:
cpp_row_groups = row_groups

# Setup parquet reader arguments
cdef parquet_reader_options args
cdef parquet_reader_options_builder builder
builder = (
parquet_reader_options.builder(source)
.row_groups(cpp_row_groups)
.use_pandas_metadata(cpp_use_pandas_metadata)
.row_groups(row_groups)
.use_pandas_metadata(use_pandas_metadata)
.use_arrow_schema(True)
.timestamp_type(cpp_timestamp_type)
)
@@ -185,28 +160,28 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
for col in columns:
cpp_columns.push_back(str(col).encode())
args.set_columns(cpp_columns)
# Filters don't handle the range index correctly
allow_range_index &= filters is None

# Read Parquet
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(parquet_reader(args))

names = [info.name.decode() for info in c_result.metadata.schema_info]

# Access the Parquet per_file_user_data to find the index
return pair[parquet_reader_options, bool](args, allow_range_index)

cdef object _process_metadata(object df,
table_metadata table_meta,
list names,
object row_groups,
object filepaths_or_buffers,
list pa_buffers,
bool allow_range_index,
bool use_pandas_metadata):
update_struct_field_names(df, table_meta.schema_info)
index_col = None
cdef vector[unordered_map[string, string]] per_file_user_data = \
c_result.metadata.per_file_user_data

is_range_index = True
column_index_type = None
index_col_names = None
is_range_index = True
meta = None
cdef vector[unordered_map[string, string]] per_file_user_data = \
table_meta.per_file_user_data
for single_file in per_file_user_data:
json_str = single_file[b'pandas'].decode('utf-8')
meta = None
if json_str != "":
meta = json.loads(json_str)
file_is_range_index, index_col, column_index_type = _parse_metadata(meta)
@@ -220,13 +195,6 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
if c['field_name'] == idx_col:
index_col_names[idx_col] = c['name']

df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=names
))

update_struct_field_names(df, c_result.metadata.schema_info)

if meta is not None:
# Book keep each column metadata as the order
# of `meta["columns"]` and `column_names` are not
@@ -319,9 +287,65 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
if use_pandas_metadata:
df.index.names = index_col

# Set column dtype for empty types.
if len(df._data.names) == 0 and column_index_type is not None:
df._data.label_dtype = cudf.dtype(column_index_type)

return df


cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
Expression filters=None):
"""
Cython function to call into libcudf API, see `read_parquet`.
filters, if not None, should be an Expression that evaluates to a
boolean predicate as a function of columns being read.
See Also
--------
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""

# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
# pyarrow for metadata processing
# (See: https://github.com/rapidsai/cudf/issues/9599)
pa_buffers = []
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
pa_buffers.append(datasource)
filepaths_or_buffers[i] = NativeFileDatasource(datasource)

cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

cdef vector[vector[size_type]] cpp_row_groups
if row_groups is not None:
cpp_row_groups = row_groups

# Setup parquet reader arguments
cdef parquet_reader_options args
cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options(
source, cpp_row_groups, use_pandas_metadata, filters, columns)
args, allow_range_index = c_res.first, c_res.second

# Read Parquet
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(parquet_reader(args))

names = [info.name.decode() for info in c_result.metadata.schema_info]

df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=names
))
df = _process_metadata(df, c_result.metadata, names, row_groups,
filepaths_or_buffers, pa_buffers,
allow_range_index, use_pandas_metadata)
return df

cpdef read_parquet_metadata(filepaths_or_buffers):
Expand Down Expand Up @@ -767,6 +791,102 @@ cdef class ParquetWriter:
self.initialized = True


cdef class ParquetReader:
cdef bool initialized
cdef unique_ptr[cpp_chunked_parquet_reader] reader
cdef size_t chunk_read_limit
cdef size_t pass_read_limit
cdef size_t row_group_size_bytes
cdef table_metadata result_meta
cdef vector[unordered_map[string, string]] per_file_user_data
cdef object pandas_meta
cdef list pa_buffers
cdef bool allow_range_index
cdef object row_groups
cdef object filepaths_or_buffers
cdef object names
cdef object column_index_type
cdef object index_col_names
cdef bool is_range_index
cdef object index_col
cdef bool cpp_use_pandas_metadata

def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
size_t chunk_read_limit=0,
size_t pass_read_limit=1024000000):

# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
# pyarrow for metadata processing
# (See: https://github.com/rapidsai/cudf/issues/9599)

pa_buffers = []
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
pa_buffers.append(datasource)
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
self.pa_buffers = pa_buffers
cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

self.cpp_use_pandas_metadata = use_pandas_metadata

cdef vector[vector[size_type]] cpp_row_groups
if row_groups is not None:
cpp_row_groups = row_groups
cdef parquet_reader_options args
cdef pair[parquet_reader_options, bool] c_res = _setup_parquet_reader_options(
source, cpp_row_groups, use_pandas_metadata, None, columns)
args, self.allow_range_index = c_res.first, c_res.second

with nogil:
self.reader.reset(
new cpp_chunked_parquet_reader(
chunk_read_limit,
pass_read_limit,
args
)
)
self.initialized = False
self.row_groups = row_groups
self.filepaths_or_buffers = filepaths_or_buffers

def _has_next(self):
cdef bool res
with nogil:
res = self.reader.get()[0].has_next()
return res

def _read_chunk(self):
# Read Parquet
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(self.reader.get()[0].read_chunk())

if not self.initialized:
self.names = [info.name.decode() for info in c_result.metadata.schema_info]
self.result_meta = c_result.metadata

df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=self.names,
))

self.initialized = True
return df

def read(self):
dfs = []
while self._has_next():
dfs.append(self._read_chunk())
df = cudf.concat(dfs)
df = _process_metadata(df, self.result_meta, self.names, self.row_groups,
self.filepaths_or_buffers, self.pa_buffers,
self.allow_range_index, self.cpp_use_pandas_metadata)
return df

cpdef merge_filemetadata(object filemetadata_list):
"""
Cython function to call into libcudf API, see `merge_row_group_metadata`.
12 changes: 12 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd
@@ -283,6 +283,18 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
vector[string] column_chunks_file_paths,
) except +

cdef cppclass chunked_parquet_reader:
chunked_parquet_reader() except +
chunked_parquet_reader(
size_t chunk_read_limit,
const parquet_reader_options& options) except +
chunked_parquet_reader(
size_t chunk_read_limit,
size_t pass_read_limit,
const parquet_reader_options& options) except +
bool has_next() except +
cudf_io_types.table_with_metadata read_chunk() except +

cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata(
const vector[unique_ptr[vector[uint8_t]]]& metadata_list
) except +
27 changes: 27 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -22,6 +22,7 @@
from pyarrow import fs as pa_fs, parquet as pq

import cudf
from cudf._lib.parquet import ParquetReader
from cudf.io.parquet import (
ParquetDatasetWriter,
ParquetWriter,
@@ -3407,3 +3408,29 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema():

# Check results
assert_eq(expected, got)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("use_pandas_metadata", [True, False])
@pytest.mark.parametrize("row_groups", [[[0]], None, [[0, 1]]])
def test_parquet_chunked_reader(
chunk_read_limit, pass_read_limit, use_pandas_metadata, row_groups
):
df = pd.DataFrame(
{"a": [1, 2, 3, 4] * 1000000, "b": ["av", "qw", "hi", "xyz"] * 1000000}
)
buffer = BytesIO()
df.to_parquet(buffer)
reader = ParquetReader(
[buffer],
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit,
use_pandas_metadata=use_pandas_metadata,
row_groups=row_groups,
)
expected = cudf.read_parquet(
buffer, use_pandas_metadata=use_pandas_metadata, row_groups=row_groups
)
actual = reader.read()
assert_eq(expected, actual)
