Implement chunked parquet reader in cudf-python #15728

Merged 20 commits on Jun 6, 2024
Changes from 12 commits
240 changes: 240 additions & 0 deletions python/cudf/cudf/_lib/parquet.pyx
@@ -44,6 +44,7 @@ from cudf._lib.io.utils cimport (
)
from cudf._lib.pylibcudf.libcudf.expressions cimport expression
from cudf._lib.pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_reader as cpp_chunked_parquet_reader,
chunked_parquet_writer_options,
merge_row_group_metadata as parquet_merge_metadata,
parquet_chunked_writer as cpp_parquet_chunked_writer,
@@ -60,6 +61,7 @@ from cudf._lib.pylibcudf.libcudf.io.parquet_metadata cimport (
from cudf._lib.pylibcudf.libcudf.io.types cimport (
column_in_metadata,
table_input_metadata,
table_metadata,
)
from cudf._lib.pylibcudf.libcudf.table.table_view cimport table_view
from cudf._lib.pylibcudf.libcudf.types cimport data_type, size_type
@@ -767,6 +769,244 @@ cdef class ParquetWriter:
self.initialized = True


cdef class ParquetReader:
cdef bool initialized
cdef unique_ptr[cpp_chunked_parquet_reader] reader
cdef size_t chunk_read_limit
cdef size_t pass_read_limit
cdef size_t row_group_size_bytes
cdef table_metadata result_meta
cdef vector[unordered_map[string, string]] per_file_user_data
cdef object pandas_meta
cdef list pa_buffers
cdef bool allow_range_index
cdef object row_groups
cdef object filepaths_or_buffers
cdef object names
cdef object column_index_type
cdef object index_col_names
cdef bool is_range_index
cdef object index_col
cdef bool cpp_use_pandas_metadata

def __cinit__(self, filepaths_or_buffers, columns=None, row_groups=None,
use_pandas_metadata=True,
Expression filters=None,
size_t chunk_read_limit=0,
size_t pass_read_limit=1024000000):

# Convert NativeFile buffers to NativeFileDatasource,
# but save original buffers in case we need to use
# pyarrow for metadata processing
# (See: https://github.com/rapidsai/cudf/issues/9599)

pa_buffers = []
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
pa_buffers.append(datasource)
filepaths_or_buffers[i] = NativeFileDatasource(datasource)
self.pa_buffers = pa_buffers
cdef cudf_io_types.source_info source = make_source_info(
filepaths_or_buffers)

self.cpp_use_pandas_metadata = use_pandas_metadata

cdef vector[vector[size_type]] cpp_row_groups
cdef data_type cpp_timestamp_type = cudf_types.data_type(
cudf_types.type_id.EMPTY
)
if row_groups is not None:
cpp_row_groups = row_groups

# Setup parquet reader arguments
cdef parquet_reader_options args
cdef parquet_reader_options_builder builder
builder = (
parquet_reader_options.builder(source)
.row_groups(cpp_row_groups)
.use_pandas_metadata(self.cpp_use_pandas_metadata)
.timestamp_type(cpp_timestamp_type)
)
if filters is not None:
builder = builder.filter(<expression &>dereference(filters.c_obj.get()))

args = move(builder.build())
cdef vector[string] cpp_columns
self.allow_range_index = True
if columns is not None:
cpp_columns.reserve(len(columns))
self.allow_range_index = len(columns) > 0
for col in columns:
cpp_columns.push_back(str(col).encode())
args.set_columns(cpp_columns)
# Filters don't handle the range index correctly
self.allow_range_index &= filters is None

self.chunk_read_limit = chunk_read_limit
self.pass_read_limit = pass_read_limit

with nogil:
self.reader.reset(
new cpp_chunked_parquet_reader(
chunk_read_limit,
pass_read_limit,
args
)
)
self.initialized = False
self.row_groups = row_groups
self.filepaths_or_buffers = filepaths_or_buffers

def _has_next(self):
cdef bool res
with nogil:
res = self.reader.get()[0].has_next()
return res

def _read_chunk(self):
        # Read the next chunk of the Parquet source
cdef cudf_io_types.table_with_metadata c_result

with nogil:
c_result = move(self.reader.get()[0].read_chunk())

if not self.initialized:
self.names = [info.name.decode() for info in c_result.metadata.schema_info]
self.result_meta = c_result.metadata
self.per_file_user_data = c_result.metadata.per_file_user_data

# Access the Parquet per_file_user_data to find the index

self.column_index_type = None
self.index_col_names = None
self.is_range_index = True
for single_file in self.per_file_user_data:
json_str = single_file[b'pandas'].decode('utf-8')
if json_str != "":
self.pandas_meta = json.loads(json_str)
file_is_range_index, self.index_col, self.column_index_type = \
_parse_metadata(self.pandas_meta)
self.is_range_index &= file_is_range_index

if not file_is_range_index and self.index_col is not None \
and self.index_col_names is None:
self.index_col_names = {}
for idx_col in self.index_col:
for c in self.pandas_meta['columns']:
if c['field_name'] == idx_col:
self.index_col_names[idx_col] = c['name']

df = cudf.DataFrame._from_data(*data_from_unique_ptr(
move(c_result.tbl),
column_names=self.names,
))

self.initialized = True
return df

def read(self):
dfs = []
while self._has_next():
dfs.append(self._read_chunk())
df = cudf.concat(dfs)
update_struct_field_names(df, self.result_meta.schema_info)
if self.pandas_meta is not None:
            # Track each column's metadata, since the order of
            # `meta["columns"]` and `column_names` is not guaranteed
            # to be deterministic or identical.
meta_data_per_column = {
col_meta['name']: col_meta for col_meta in self.pandas_meta["columns"]
}

# update the decimal precision of each column
for col in self.names:
if isinstance(df._data[col].dtype, cudf.core.dtypes.DecimalDtype):
df._data[col].dtype.precision = (
meta_data_per_column[col]["metadata"]["precision"]
)

# Set the index column
if self.index_col is not None and len(self.index_col) > 0:
if self.is_range_index:
if not self.allow_range_index:
return df

if len(self.per_file_user_data) > 1:
range_index_meta = {
"kind": "range",
"name": None,
"start": 0,
"stop": len(df),
"step": 1
}
else:
range_index_meta = self.index_col[0]

if self.row_groups is not None:
per_file_metadata = [
pa.parquet.read_metadata(
# Pyarrow cannot read directly from bytes
io.BytesIO(s) if isinstance(s, bytes) else s
) for s in (
self.pa_buffers or self.filepaths_or_buffers
)
]

filtered_idx = []
for i, file_meta in enumerate(per_file_metadata):
row_groups_i = []
start = 0
for row_group in range(file_meta.num_row_groups):
stop = start + file_meta.row_group(row_group).num_rows
row_groups_i.append((start, stop))
start = stop

for rg in self.row_groups[i]:
filtered_idx.append(
cudf.RangeIndex(
start=row_groups_i[rg][0],
stop=row_groups_i[rg][1],
step=range_index_meta['step']
)
)

if len(filtered_idx) > 0:
idx = cudf.concat(filtered_idx)
else:
idx = cudf.Index(cudf.core.column.column_empty(0))
else:
idx = cudf.RangeIndex(
start=range_index_meta['start'],
stop=range_index_meta['stop'],
step=range_index_meta['step'],
name=range_index_meta['name']
)

df._index = idx
elif set(self.index_col).issubset(self.names):
index_data = df[self.index_col]
actual_index_names = list(self.index_col_names.values())
if len(index_data._data) == 1:
idx = cudf.Index(
index_data._data.columns[0],
name=actual_index_names[0]
)
else:
idx = cudf.MultiIndex.from_frame(
index_data,
names=actual_index_names
)
df.drop(columns=self.index_col, inplace=True)
df._index = idx
else:
if self.cpp_use_pandas_metadata:
df.index.names = self.index_col

# Set column dtype for empty types.
if len(df._data.names) == 0 and self.column_index_type is not None:
df._data.label_dtype = cudf.dtype(self.column_index_type)
return df

cpdef merge_filemetadata(object filemetadata_list):
"""
Cython function to call into libcudf API, see `merge_row_group_metadata`.
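
For reference, a minimal sketch of how the new `ParquetReader` might be driven explicitly chunk by chunk instead of through `read()`. The in-memory buffer and the small `chunk_read_limit` are illustrative assumptions, not part of this PR; note that `read()` additionally applies the pandas-metadata handling (index reconstruction, decimal precision), which this low-level loop skips.

```python
from io import BytesIO

import pandas as pd

import cudf
from cudf._lib.parquet import ParquetReader

# Assumed sample data written to an in-memory buffer for illustration.
buffer = BytesIO()
pd.DataFrame({"a": range(100_000)}).to_parquet(buffer)

# A small chunk_read_limit chosen purely to force multiple chunks.
reader = ParquetReader([buffer], chunk_read_limit=1_000_000)

chunks = []
while reader._has_next():                 # more chunks remain
    chunks.append(reader._read_chunk())   # each chunk is a cudf.DataFrame

result = cudf.concat(chunks)
```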
12 changes: 12 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/libcudf/io/parquet.pxd
@@ -283,6 +283,18 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil:
vector[string] column_chunks_file_paths,
) except +

cdef cppclass chunked_parquet_reader:
chunked_parquet_reader() except +
chunked_parquet_reader(
size_t chunk_read_limit,
const parquet_reader_options& options) except +
chunked_parquet_reader(
size_t chunk_read_limit,
size_t pass_read_limit,
const parquet_reader_options& options) except +
bool has_next() except +
cudf_io_types.table_with_metadata read_chunk() except +

cdef unique_ptr[vector[uint8_t]] merge_row_group_metadata(
const vector[unique_ptr[vector[uint8_t]]]& metadata_list
) except +
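
As context for the declarations above: judging from the libcudf parameter names, `chunk_read_limit` caps the approximate size of each table returned by `read_chunk()`, while `pass_read_limit` bounds the temporary memory used for a decompression pass, with 0 treated as "no limit" in both cases. A hedged sketch under that assumption:

```python
from cudf._lib.parquet import ParquetReader

# "large.parquet" is a placeholder path; the byte limits below are
# illustrative and assume 0 means "unlimited", per the libcudf naming.
reader = ParquetReader(
    ["large.parquet"],
    chunk_read_limit=256_000_000,    # target ~256 MB per returned chunk
    pass_read_limit=1_024_000_000,   # ~1 GB scratch per decompression pass
)
df = reader.read()  # concatenates all chunks into a single DataFrame
```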
16 changes: 16 additions & 0 deletions python/cudf/cudf/tests/test_parquet.py
@@ -22,6 +22,7 @@
from pyarrow import fs as pa_fs, parquet as pq

import cudf
from cudf._lib.parquet import ParquetReader
from cudf.io.parquet import (
ParquetDatasetWriter,
ParquetWriter,
@@ -3407,3 +3408,18 @@ def test_parquet_reader_roundtrip_structs_with_arrow_schema():

# Check results
assert_eq(expected, got)


@pytest.mark.parametrize("chunk_read_limit", [0, 240, 1024000000])
@pytest.mark.parametrize("pass_read_limit", [0, 240, 1024000000])
def test_parquet_chunked_reader(chunk_read_limit, pass_read_limit):
expected = pd.DataFrame({"a": [1, 2, 3, 4] * 1000000})
buffer = BytesIO()
expected.to_parquet(buffer)
reader = ParquetReader(
[buffer],
chunk_read_limit=chunk_read_limit,
pass_read_limit=pass_read_limit,
)
actual = reader.read()
assert_eq(expected, actual)
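
A further sketch (not part of this PR's tests) exercising the `columns` and `row_groups` arguments accepted by `ParquetReader.__cinit__`; it assumes the buffer was written with multiple row groups (e.g. via `expected.to_parquet(buffer, row_group_size=10_000)`):

```python
subset_reader = ParquetReader(
    [buffer],
    columns=["a"],        # read only column "a"
    row_groups=[[0, 2]],  # per-source list of row-group indices
    chunk_read_limit=0,   # 0 => no per-chunk limit
)
subset = subset_reader.read()
```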