Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate cudf::io::merge_row_group_metadata to pylibcudf #17491

Merged
merged 6 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 5 additions & 17 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ except ImportError:

import numpy as np

from cython.operator cimport dereference

from cudf.api.types import is_list_like

from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io
Expand All @@ -25,7 +23,7 @@ from cudf._lib.utils import _index_level_name, generate_pandas_metadata
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport make_unique, unique_ptr
from libcpp.memory cimport unique_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
Expand All @@ -35,7 +33,6 @@ from pylibcudf.io.parquet cimport ChunkedParquetReader
from pylibcudf.libcudf.io.data_sink cimport data_sink
from pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_writer_options,
merge_row_group_metadata as parquet_merge_metadata,
parquet_chunked_writer as cpp_parquet_chunked_writer,
parquet_writer_options,
write_parquet as parquet_writer,
Expand Down Expand Up @@ -64,6 +61,7 @@ import pylibcudf as plc
from pylibcudf cimport Table

from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT
from cython.operator cimport dereference


cdef class BufferArrayFromVector:
Expand Down Expand Up @@ -808,19 +806,9 @@ cpdef merge_filemetadata(object filemetadata_list):
--------
cudf.io.parquet.merge_row_group_metadata
"""
cdef vector[unique_ptr[vector[uint8_t]]] list_c
cdef vector[uint8_t] blob_c
cdef unique_ptr[vector[uint8_t]] output_c

for blob_py in filemetadata_list:
blob_c = blob_py
list_c.push_back(move(make_unique[vector[uint8_t]](blob_c)))

with nogil:
output_c = move(parquet_merge_metadata(list_c))

out_metadata_py = BufferArrayFromVector.from_unique_ptr(move(output_c))
return np.asarray(out_metadata_py)
return np.asarray(
plc.io.parquet.merge_row_group_metadata(filemetadata_list).obj
)


cdef statistics_freq _get_stat_freq(str statistics):
Expand Down
2 changes: 2 additions & 0 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,5 @@ cdef class ParquetWriterOptionsBuilder:
cpdef ParquetWriterOptions build(self)

cpdef memoryview write_parquet(ParquetWriterOptions options)

cpdef memoryview merge_row_group_metadata(list metdata_list)
1 change: 1 addition & 0 deletions python/pylibcudf/pylibcudf/io/parquet.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,4 @@ class ParquetWriterOptionsBuilder:
def build(self) -> ParquetWriterOptions: ...

def write_parquet(options: ParquetWriterOptions) -> memoryview: ...
def merge_row_group_metadata(metdata_list: list) -> memoryview: ...
36 changes: 33 additions & 3 deletions python/pylibcudf/pylibcudf/io/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from cython.operator cimport dereference
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.memory cimport unique_ptr, make_unique
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector
Expand All @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.parquet cimport (
read_parquet as cpp_read_parquet,
write_parquet as cpp_write_parquet,
parquet_writer_options,
merge_row_group_metadata as cpp_merge_row_group_metadata,
)
from pylibcudf.libcudf.io.types cimport (
compression_type,
Expand All @@ -38,10 +39,10 @@ __all__ = [
"ParquetWriterOptions",
"ParquetWriterOptionsBuilder",
"read_parquet",
"write_parquet"
"write_parquet",
"merge_row_group_metadata",
]


cdef parquet_reader_options _setup_parquet_reader_options(
SourceInfo source_info,
list columns = None,
Expand Down Expand Up @@ -577,3 +578,32 @@ cpdef memoryview write_parquet(ParquetWriterOptions options):
c_result = cpp_write_parquet(c_options)

return memoryview(HostBuffer.from_unique_ptr(move(c_result)))


cpdef memoryview merge_row_group_metadata(list metdata_list):
    """
    Merge multiple raw Parquet footer metadata blobs, as previously
    produced by ``write_parquet``, into a single metadata blob.

    For details, see :cpp:func:`merge_row_group_metadata`.

    NOTE(review): the parameter name ``metdata_list`` is a typo for
    ``metadata_list``; it is kept as-is because the matching .pxd and
    .pyi declarations use the same spelling.

    Parameters
    ----------
    metdata_list : list
        List of input file metadata blobs (each convertible to a
        ``vector[uint8_t]``, e.g. bytes-like objects).

    Returns
    -------
    memoryview
        A parquet-compatible blob that contains the data for all row groups in the list
    """
    # C++-side inputs/outputs: a vector of owned byte vectors going in,
    # one owned byte vector coming back out.
    cdef vector[unique_ptr[vector[uint8_t]]] list_c
    cdef unique_ptr[vector[uint8_t]] output_c

    # Copy each Python blob into a heap-allocated C++ vector; the cast
    # triggers Cython's buffer -> vector[uint8_t] coercion (a byte copy).
    for blob in metdata_list:
        list_c.push_back(move(make_unique[vector[uint8_t]](<vector[uint8_t]> blob)))

    # All inputs are now pure C++, so release the GIL for the merge.
    with nogil:
        output_c = move(cpp_merge_row_group_metadata(list_c))

    # HostBuffer takes ownership of the result vector; presumably this
    # avoids a further copy when exposing it as a memoryview — confirm
    # against HostBuffer.from_unique_ptr.
    return memoryview(HostBuffer.from_unique_ptr(move(output_c)))
Loading