From c6354801b6e396171f19e32edb2525df6cf179af Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 3 Dec 2024 07:26:36 -0800 Subject: [PATCH 1/4] Migrate parquet merge_row_group_metadata to pylibcudf --- python/cudf/cudf/_lib/parquet.pyx | 62 ++------------------ python/pylibcudf/pylibcudf/io/parquet.pxd | 16 ++++++ python/pylibcudf/pylibcudf/io/parquet.pyx | 70 ++++++++++++++++++++++- 3 files changed, 89 insertions(+), 59 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index d4bd0cd306c..700ceb957b3 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -14,8 +14,6 @@ except ImportError: import numpy as np -from cython.operator cimport dereference - from cudf.api.types import is_list_like from cudf._lib.utils cimport _data_from_columns, data_from_pylibcudf_io @@ -25,17 +23,17 @@ from cudf._lib.utils import _index_level_name, generate_pandas_metadata from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool from libcpp.map cimport map -from libcpp.memory cimport make_unique, unique_ptr +from libcpp.memory cimport unique_ptr from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector from pylibcudf.expressions cimport Expression +from pylibcudf.io.parquet cimport BufferArrayFromVector from pylibcudf.io.parquet cimport ChunkedParquetReader from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.parquet cimport ( chunked_parquet_writer_options, - merge_row_group_metadata as parquet_merge_metadata, parquet_chunked_writer as cpp_parquet_chunked_writer, parquet_writer_options, write_parquet as parquet_writer, @@ -66,46 +64,6 @@ from pylibcudf cimport Table from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT -cdef class BufferArrayFromVector: - cdef Py_ssize_t length - cdef unique_ptr[vector[uint8_t]] in_vec - - # these two things declare part of the buffer interface - cdef Py_ssize_t shape[1] - cdef Py_ssize_t strides[1] - - @staticmethod - cdef BufferArrayFromVector from_unique_ptr( - unique_ptr[vector[uint8_t]] in_vec - ): - cdef BufferArrayFromVector buf = BufferArrayFromVector() - buf.in_vec = move(in_vec) - buf.length = dereference(buf.in_vec).size() - return buf - - def __getbuffer__(self, Py_buffer *buffer, int flags): - cdef Py_ssize_t itemsize = sizeof(uint8_t) - - self.shape[0] = self.length - self.strides[0] = 1 - - buffer.buf = dereference(self.in_vec).data() - - buffer.format = NULL # byte - buffer.internal = NULL - buffer.itemsize = itemsize - buffer.len = self.length * itemsize # product(shape) * itemsize - buffer.ndim = 1 - buffer.obj = self - buffer.readonly = 0 - buffer.shape = self.shape - buffer.strides = self.strides - buffer.suboffsets = NULL - - def __releasebuffer__(self, Py_buffer *buffer): - pass - - def _parse_metadata(meta): file_is_range_index = False file_index_cols = None @@ -808,19 +766,9 @@ cpdef merge_filemetadata(object filemetadata_list): -------- cudf.io.parquet.merge_row_group_metadata """ - cdef vector[unique_ptr[vector[uint8_t]]] list_c - cdef vector[uint8_t] blob_c - cdef unique_ptr[vector[uint8_t]] output_c - - for blob_py in filemetadata_list: - blob_c = blob_py - list_c.push_back(move(make_unique[vector[uint8_t]](blob_c))) - - with nogil: - output_c = move(parquet_merge_metadata(list_c)) - - out_metadata_py = BufferArrayFromVector.from_unique_ptr(move(output_c)) - return np.asarray(out_metadata_py) + return np.asarray( + plc.io.parquet.merge_row_group_metadata(filemetadata_list) + ) cdef statistics_freq _get_stat_freq(str statistics): diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 1a61c20d783..0d5947096f5 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -24,6 +24,20 @@ from pylibcudf.table cimport Table from pylibcudf.types cimport DataType +cdef class BufferArrayFromVector: + cdef Py_ssize_t length + cdef unique_ptr[vector[uint8_t]] in_vec + + # these two things declare part of the buffer interface + cdef Py_ssize_t shape[1] + cdef Py_ssize_t strides[1] + + @staticmethod + cdef BufferArrayFromVector from_unique_ptr( + unique_ptr[vector[uint8_t]] vec + ) + + cdef class ChunkedParquetReader: cdef unique_ptr[cpp_chunked_parquet_reader] reader @@ -91,3 +105,5 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptions build(self) cpdef memoryview write_parquet(ParquetWriterOptions options) + +cpdef BufferArrayFromVector merge_row_group_metadata(list metdata_list) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index b95b1f39de1..2c79e94df74 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -2,7 +2,7 @@ from cython.operator cimport dereference from libc.stdint cimport int64_t, uint8_t from libcpp cimport bool -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport unique_ptr, make_unique from libcpp.string cimport string from libcpp.utility cimport move from libcpp.vector cimport vector @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.parquet cimport ( read_parquet as cpp_read_parquet, write_parquet as cpp_write_parquet, parquet_writer_options, + merge_row_group_metadata as cpp_merge_row_group_metadata, ) from pylibcudf.libcudf.io.types cimport ( compression_type, @@ -38,10 +39,44 @@ __all__ = [ "ParquetWriterOptions", "ParquetWriterOptionsBuilder", "read_parquet", - "write_parquet" + "write_parquet", + "merge_row_group_metadata", ] +cdef class BufferArrayFromVector: + @staticmethod + cdef BufferArrayFromVector from_unique_ptr( + unique_ptr[vector[uint8_t]] in_vec + ): + cdef BufferArrayFromVector buf = BufferArrayFromVector() + buf.in_vec = move(in_vec) + buf.length = dereference(buf.in_vec).size() + return buf + + def __getbuffer__(self, Py_buffer *buffer, int flags): + cdef Py_ssize_t itemsize = sizeof(uint8_t) + + self.shape[0] = self.length + self.strides[0] = 1 + + buffer.buf = dereference(self.in_vec).data() + + buffer.format = NULL # byte + buffer.internal = NULL + buffer.itemsize = itemsize + buffer.len = self.length * itemsize # product(shape) * itemsize + buffer.ndim = 1 + buffer.obj = self + buffer.readonly = 0 + buffer.shape = self.shape + buffer.strides = self.strides + buffer.suboffsets = NULL + + def __releasebuffer__(self, Py_buffer *buffer): + pass + + cdef parquet_reader_options _setup_parquet_reader_options( SourceInfo source_info, list columns = None, @@ -577,3 +612,34 @@ cpdef memoryview write_parquet(ParquetWriterOptions options): c_result = cpp_write_parquet(c_options) return memoryview(HostBuffer.from_unique_ptr(move(c_result))) + + +cpdef BufferArrayFromVector merge_row_group_metadata(list metdata_list): + """ + Merges multiple raw metadata blobs that were previously + created by write_parquet into a single metadata blob. + + For details, see :cpp:func:`merge_row_group_metadata`. + + Parameters + ---------- + metdata_list : list + List of input file metadata + + Returns + ------- + BufferArrayFromVector + A parquet-compatible blob that contains the data for all row groups in the list + """ + cdef vector[unique_ptr[vector[uint8_t]]] list_c + cdef vector[uint8_t] blob_c + cdef unique_ptr[vector[uint8_t]] output_c + + for blob in metdata_list: + blob_c = blob + list_c.push_back(move(make_unique[vector[uint8_t]](blob_c))) + + with nogil: + output_c = move(cpp_merge_row_group_metadata(list_c)) + + return BufferArrayFromVector.from_unique_ptr(move(output_c)) From 3a51a1e8c0034fb8903c3b8516641bb525ca639a Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 3 Dec 2024 22:44:00 -0800 Subject: [PATCH 2/4] address review --- python/cudf/cudf/_lib/parquet.pyx | 2 +- python/pylibcudf/pylibcudf/io/parquet.pxd | 16 +-------- python/pylibcudf/pylibcudf/io/parquet.pyx | 40 ++--------------------- 3 files changed, 5 insertions(+), 53 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 700ceb957b3..18dd9380313 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -767,7 +767,7 @@ cpdef merge_filemetadata(object filemetadata_list): cudf.io.parquet.merge_row_group_metadata """ return np.asarray( - plc.io.parquet.merge_row_group_metadata(filemetadata_list) + plc.io.parquet.merge_row_group_metadata(filemetadata_list).obj ) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 0d5947096f5..79080fa7243 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -24,20 +24,6 @@ from pylibcudf.table cimport Table from pylibcudf.types cimport DataType -cdef class BufferArrayFromVector: - cdef Py_ssize_t length - cdef unique_ptr[vector[uint8_t]] in_vec - - # these two things declare part of the buffer interface - cdef Py_ssize_t shape[1] - cdef Py_ssize_t strides[1] - - @staticmethod - cdef BufferArrayFromVector from_unique_ptr( - unique_ptr[vector[uint8_t]] vec - ) - - cdef class ChunkedParquetReader: cdef unique_ptr[cpp_chunked_parquet_reader] reader @@ -106,4 +92,4 @@ cdef class ParquetWriterOptionsBuilder: cpdef memoryview write_parquet(ParquetWriterOptions options) -cpdef BufferArrayFromVector merge_row_group_metadata(list metdata_list) +cpdef memoryview merge_row_group_metadata(list metdata_list) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 2c79e94df74..a500b35cb43 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -43,40 +43,6 @@ __all__ = [ "merge_row_group_metadata", ] - -cdef class BufferArrayFromVector: - @staticmethod - cdef BufferArrayFromVector from_unique_ptr( - unique_ptr[vector[uint8_t]] in_vec - ): - cdef BufferArrayFromVector buf = BufferArrayFromVector() - buf.in_vec = move(in_vec) - buf.length = dereference(buf.in_vec).size() - return buf - - def __getbuffer__(self, Py_buffer *buffer, int flags): - cdef Py_ssize_t itemsize = sizeof(uint8_t) - - self.shape[0] = self.length - self.strides[0] = 1 - - buffer.buf = dereference(self.in_vec).data() - - buffer.format = NULL # byte - buffer.internal = NULL - buffer.itemsize = itemsize - buffer.len = self.length * itemsize # product(shape) * itemsize - buffer.ndim = 1 - buffer.obj = self - buffer.readonly = 0 - buffer.shape = self.shape - buffer.strides = self.strides - buffer.suboffsets = NULL - - def __releasebuffer__(self, Py_buffer *buffer): - pass - - cdef parquet_reader_options _setup_parquet_reader_options( SourceInfo source_info, list columns = None, @@ -614,7 +580,7 @@ cpdef memoryview write_parquet(ParquetWriterOptions options): return memoryview(HostBuffer.from_unique_ptr(move(c_result))) -cpdef BufferArrayFromVector merge_row_group_metadata(list metdata_list): +cpdef memoryview merge_row_group_metadata(list metdata_list): """ Merges multiple raw metadata blobs that were previously created by write_parquet into a single metadata blob. @@ -628,7 +594,7 @@ cpdef BufferArrayFromVector merge_row_group_metadata(list metdata_list): Returns ------- - BufferArrayFromVector + memoryview A parquet-compatible blob that contains the data for all row groups in the list """ cdef vector[unique_ptr[vector[uint8_t]]] list_c @@ -642,4 +608,4 @@ cpdef BufferArrayFromVector merge_row_group_metadata(list metdata_list): with nogil: output_c = move(cpp_merge_row_group_metadata(list_c)) - return BufferArrayFromVector.from_unique_ptr(move(output_c)) + return memoryview(HostBuffer.from_unique_ptr(move(output_c))) From c6cbf8c39de50f76d143d4bce26c1e658d678cf1 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 4 Dec 2024 05:20:34 -0800 Subject: [PATCH 3/4] add buffer class back --- python/cudf/cudf/_lib/parquet.pyx | 42 ++++++++++++++++++++++- python/pylibcudf/pylibcudf/io/parquet.pyx | 4 +-- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/python/cudf/cudf/_lib/parquet.pyx b/python/cudf/cudf/_lib/parquet.pyx index 18dd9380313..6c80120ad6e 100644 --- a/python/cudf/cudf/_lib/parquet.pyx +++ b/python/cudf/cudf/_lib/parquet.pyx @@ -29,7 +29,6 @@ from libcpp.utility cimport move from libcpp.vector cimport vector from pylibcudf.expressions cimport Expression -from pylibcudf.io.parquet cimport BufferArrayFromVector from pylibcudf.io.parquet cimport ChunkedParquetReader from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.parquet cimport ( @@ -62,6 +61,47 @@ import pylibcudf as plc from pylibcudf cimport Table from cudf.utils.ioutils import _ROW_GROUP_SIZE_BYTES_DEFAULT +from cython.operator cimport dereference + + +cdef class BufferArrayFromVector: + cdef Py_ssize_t length + cdef unique_ptr[vector[uint8_t]] in_vec + + # these two things declare part of the buffer interface + cdef Py_ssize_t shape[1] + cdef Py_ssize_t strides[1] + + @staticmethod + cdef BufferArrayFromVector from_unique_ptr( + unique_ptr[vector[uint8_t]] in_vec + ): + cdef BufferArrayFromVector buf = BufferArrayFromVector() + buf.in_vec = move(in_vec) + buf.length = dereference(buf.in_vec).size() + return buf + + def __getbuffer__(self, Py_buffer *buffer, int flags): + cdef Py_ssize_t itemsize = sizeof(uint8_t) + + self.shape[0] = self.length + self.strides[0] = 1 + + buffer.buf = dereference(self.in_vec).data() + + buffer.format = NULL # byte + buffer.internal = NULL + buffer.itemsize = itemsize + buffer.len = self.length * itemsize # product(shape) * itemsize + buffer.ndim = 1 + buffer.obj = self + buffer.readonly = 0 + buffer.shape = self.shape + buffer.strides = self.strides + buffer.suboffsets = NULL + + def __releasebuffer__(self, Py_buffer *buffer): + pass def _parse_metadata(meta): diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index a500b35cb43..93843c932ad 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -598,12 +598,10 @@ cpdef memoryview merge_row_group_metadata(list metdata_list): A parquet-compatible blob that contains the data for all row groups in the list """ cdef vector[unique_ptr[vector[uint8_t]]] list_c - cdef vector[uint8_t] blob_c cdef unique_ptr[vector[uint8_t]] output_c for blob in metdata_list: - blob_c = blob - list_c.push_back(move(make_unique[vector[uint8_t]](blob_c))) + list_c.push_back(move(make_unique[vector[uint8_t]]( blob))) with nogil: output_c = move(cpp_merge_row_group_metadata(list_c)) From 6c1a1fe248fef553b59d2ba128a6c363d12ddf30 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 4 Dec 2024 05:22:39 -0800 Subject: [PATCH 4/4] update type stub --- python/pylibcudf/pylibcudf/io/parquet.pyi | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index eb2ca68109b..3eb3d7c3a92 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -78,3 +78,4 @@ class ParquetWriterOptionsBuilder: def build(self) -> ParquetWriterOptions: ... def write_parquet(options: ParquetWriterOptions) -> memoryview: ... +def merge_row_group_metadata(metdata_list: list) -> memoryview: ...