From 3fee3fcee5afa330d62be14f27df18144fd3a3b6 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 5 Nov 2024 08:59:07 -0800 Subject: [PATCH 01/11] Migrate contiguous split APIs to pylibcudf --- python/cudf/cudf/_lib/copying.pxd | 1 - python/cudf/cudf/_lib/copying.pyx | 14 ++++++++++---- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pxd b/python/cudf/cudf/_lib/copying.pxd index 14c7d2066d8..1c9efe99edc 100644 --- a/python/cudf/cudf/_lib/copying.pxd +++ b/python/cudf/cudf/_lib/copying.pxd @@ -2,7 +2,6 @@ from pylibcudf.libcudf.contiguous_split cimport packed_columns - cdef class _CPackedColumns: cdef packed_columns c_obj cdef object column_names diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 4221e745e65..961b554d8e4 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -34,6 +34,7 @@ from pylibcudf.libcudf.scalar.scalar cimport scalar from pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_table_view +cimport pylibcudf as plc # workaround for https://github.com/cython/cython/issues/3885 ctypedef const scalar constscalar @@ -352,11 +353,10 @@ cdef class _CPackedColumns: or input_table.index.stop != len(input_table) or input_table.index.step != 1 ): - input_table_view = table_view_from_table(input_table) + columns = input_table._index._columns + input_table._columns p.index_names = input_table._index_names else: - input_table_view = table_view_from_table( - input_table, ignore_index=True) + columns = input_table._columns p.column_names = input_table._column_names p.column_dtypes = {} @@ -364,7 +364,13 @@ cdef class _CPackedColumns: if isinstance(col.dtype, cudf.core.dtypes._BaseDtype): p.column_dtypes[name] = col.dtype - p.c_obj = move(cpp_contiguous_split.pack(input_table_view)) + p.c_obj = move(plc.contiguous_split.pack( + plc.Table( + [ + col.to_pylibcudf(mode="read") for col in columns + ] + ) + ).c_obj.get()[0]) return p From 635595a0fb871ffd067d9a6c32cae81aaad744fa Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 6 Nov 2024 20:20:27 -0800 Subject: [PATCH 02/11] delete _CPackedColumns --- python/cudf/cudf/_lib/copying.pxd | 8 - python/cudf/cudf/_lib/copying.pyx | 218 ++++++------------ python/cudf/cudf/tests/test_pack.py | 1 - .../pylibcudf/pylibcudf/contiguous_split.pyx | 91 +++++++- 4 files changed, 165 insertions(+), 153 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pxd b/python/cudf/cudf/_lib/copying.pxd index 1c9efe99edc..441eecf5bcc 100644 --- a/python/cudf/cudf/_lib/copying.pxd +++ b/python/cudf/cudf/_lib/copying.pxd @@ -1,9 +1 @@ # Copyright (c) 2021-2024, NVIDIA CORPORATION. - -from pylibcudf.libcudf.contiguous_split cimport packed_columns - -cdef class _CPackedColumns: - cdef packed_columns c_obj - cdef object column_names - cdef object column_dtypes - cdef object index_names diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 961b554d8e4..39867381bf4 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -2,13 +2,9 @@ import pickle -from libc.stdint cimport uint8_t, uintptr_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.utility cimport move -from libcpp.vector cimport vector - -from rmm.pylibrmm.device_buffer cimport DeviceBuffer import pylibcudf @@ -20,21 +16,20 @@ from cudf._lib.column cimport Column from cudf._lib.scalar import as_device_scalar from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.utils cimport table_view_from_table from cudf._lib.reduce import minmax from cudf.core.abc import Serializable from libcpp.memory cimport make_unique -cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split from pylibcudf.libcudf.column.column cimport column from pylibcudf.libcudf.column.column_view cimport column_view from pylibcudf.libcudf.scalar.scalar cimport scalar from pylibcudf.libcudf.types cimport size_type -from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_table_view -cimport pylibcudf as plc +from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table +import pylibcudf as plc +from pylibcudf.contiguous_split import PackedColumns as PlcPackedColumns # workaround for https://github.com/cython/cython/issues/3885 ctypedef const scalar constscalar @@ -336,143 +331,17 @@ def get_element(Column input_column, size_type index): ) -cdef class _CPackedColumns: - - @staticmethod - def from_py_table(input_table, keep_index=True): - """ - Construct a ``PackedColumns`` object from a ``cudf.DataFrame``. - """ - import cudf.core.dtypes - - cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns) - - if keep_index and ( - not isinstance(input_table.index, cudf.RangeIndex) - or input_table.index.start != 0 - or input_table.index.stop != len(input_table) - or input_table.index.step != 1 - ): - columns = input_table._index._columns + input_table._columns - p.index_names = input_table._index_names - else: - columns = input_table._columns - - p.column_names = input_table._column_names - p.column_dtypes = {} - for name, col in input_table._column_labels_and_values: - if isinstance(col.dtype, cudf.core.dtypes._BaseDtype): - p.column_dtypes[name] = col.dtype - - p.c_obj = move(plc.contiguous_split.pack( - plc.Table( - [ - col.to_pylibcudf(mode="read") for col in columns - ] - ) - ).c_obj.get()[0]) - - return p - - @property - def gpu_data_ptr(self): - return int(self.c_obj.gpu_data.get()[0].data()) - - @property - def gpu_data_size(self): - return int(self.c_obj.gpu_data.get()[0].size()) - - def serialize(self): - header = {} - frames = [] - - gpu_data = as_buffer( - data=self.gpu_data_ptr, - size=self.gpu_data_size, - owner=self, - exposed=True - ) - data_header, data_frames = gpu_data.serialize() - header["data"] = data_header - frames.extend(data_frames) - - header["column-names"] = self.column_names - header["index-names"] = self.index_names - if self.c_obj.metadata.get()[0].data() != NULL: - header["metadata"] = list( - - self.c_obj.metadata.get()[0].data() - ) - - column_dtypes = {} - for name, dtype in self.column_dtypes.items(): - dtype_header, dtype_frames = dtype.serialize() - column_dtypes[name] = ( - dtype_header, - (len(frames), len(frames) + len(dtype_frames)), - ) - frames.extend(dtype_frames) - header["column-dtypes"] = column_dtypes - - return header, frames - - @staticmethod - def deserialize(header, frames): - cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns) - - gpu_data = Buffer.deserialize(header["data"], frames) - - dbuf = DeviceBuffer( - ptr=gpu_data.get_ptr(mode="write"), - size=gpu_data.nbytes - ) - - cdef cpp_contiguous_split.packed_columns data - data.metadata = move( - make_unique[vector[uint8_t]]( - move(header.get("metadata", [])) - ) - ) - data.gpu_data = move(dbuf.c_obj) - - p.c_obj = move(data) - p.column_names = header["column-names"] - p.index_names = header["index-names"] - - column_dtypes = {} - for name, dtype in header["column-dtypes"].items(): - dtype_header, (start, stop) = dtype - column_dtypes[name] = pickle.loads( - dtype_header["type-serialized"] - ).deserialize(dtype_header, frames[start:stop]) - p.column_dtypes = column_dtypes - - return p - - def unpack(self): - output_table = cudf.DataFrame._from_data(*data_from_table_view( - cpp_contiguous_split.unpack(self.c_obj), - self, - self.column_names, - self.index_names - )) - - for name, dtype in self.column_dtypes.items(): - output_table._data[name] = ( - output_table._data[name]._with_type_metadata(dtype) - ) - - return output_table - - class PackedColumns(Serializable): """ A packed representation of a Frame, with all columns residing in a single GPU memory buffer. """ - def __init__(self, data): + def __init__(self, data, column_names=None, index_names=None, column_dtypes=None): self._data = data + self.column_names=column_names + self.index_names=index_names + self.column_dtypes=column_dtypes def __reduce__(self): return self.deserialize, self.serialize() @@ -488,21 +357,84 @@ class PackedColumns(Serializable): } def serialize(self): - header, frames = self._data.serialize() + gpu_data = as_buffer( + data=self._data.gpu_data_ptr, + size=self._data.gpu_data_size, + owner=self, + exposed=True + ) + header, frames = self._data.serialize( + gpu_data, + self.column_names, + self.index_names, + self.column_dtypes, + ) + self.column_dtypes = header["column-dtypes"] header["type-serialized"] = pickle.dumps(type(self)) - return header, frames @classmethod def deserialize(cls, header, frames): - return cls(_CPackedColumns.deserialize(header, frames)) + gpu_data = Buffer.deserialize(header["data"], frames) + p, col_names, index_names, col_dtypes = PlcPackedColumns.deserialize( + gpu_data, header, frames + ) + return cls( + p, + col_names, + index_names, + col_dtypes, + ) @classmethod def from_py_table(cls, input_table, keep_index=True): - return cls(_CPackedColumns.from_py_table(input_table, keep_index)) + if keep_index and ( + not isinstance(input_table.index, cudf.RangeIndex) + or input_table.index.start != 0 + or input_table.index.stop != len(input_table) + or input_table.index.step != 1 + ): + columns = input_table._index._columns + input_table._columns + index_names = input_table._index_names + else: + columns = input_table._columns + index_names = None + + column_names = input_table._column_names + column_dtypes = {} + for name, col in input_table._column_labels_and_values: + if isinstance( + col.dtype, + (cudf.core.dtypes._BaseDtype, cudf.core.dtypes.CategoricalDtype) + ): + column_dtypes[name] = col.dtype + + return cls( + plc.contiguous_split.PackedColumns.from_plc_table( + plc.Table( + [ + col.to_pylibcudf(mode="read") for col in columns + ] + ) + ), + column_names=column_names, + index_names=index_names, + column_dtypes=column_dtypes, + ) def unpack(self): - return self._data.unpack() + output_table = cudf.DataFrame._from_data(*data_from_pylibcudf_table( + plc.contiguous_split.unpack(self._data), + self.column_names, + self.index_names + )) + print("DTYPES", self.column_dtypes) + for name, dtype in self.column_dtypes.items(): + output_table._data[name] = ( + output_table._data[name]._with_type_metadata(dtype) + ) + + return output_table def pack(input_table, keep_index=True): diff --git a/python/cudf/cudf/tests/test_pack.py b/python/cudf/cudf/tests/test_pack.py index b474bbe9bd8..d770e3d0fc2 100644 --- a/python/cudf/cudf/tests/test_pack.py +++ b/python/cudf/cudf/tests/test_pack.py @@ -62,7 +62,6 @@ def assert_packed_frame_equality(df): packed = pack(df) del df unpacked = unpack(packed) - assert_eq(unpacked, pdf) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index ed926a3fcc0..3391cf506a3 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -1,7 +1,7 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from cython.operator cimport dereference -from libc.stdint cimport uint8_t +from libc.stdint cimport uint8_t, uintptr_t from libcpp.memory cimport make_unique, unique_ptr from libcpp.utility cimport move from libcpp.vector cimport vector @@ -18,6 +18,7 @@ from rmm.pylibrmm.device_buffer cimport DeviceBuffer from .gpumemoryview cimport gpumemoryview from .table cimport Table from .utils cimport int_to_void_ptr +import pickle cdef class HostBuffer: @@ -76,6 +77,94 @@ cdef class PackedColumns: out.c_obj = move(data) return out + @staticmethod + def from_plc_table(Table input): + """ + Construct a ``PackedColumns`` object from a ``pylibcudf.Table``. + """ + cdef unique_ptr[packed_columns] c_packed_columns = move( + make_unique[packed_columns]( + move( + cpp_pack( + input.view() + ) + ) + ) + ) + + return PackedColumns.from_libcudf(move(c_packed_columns)) + + def serialize( + self, + object cudf_buffer, + object column_names, + object index_names, + object column_dtypes + ): + header = {} + frames = [] + data_header, data_frames = cudf_buffer.serialize() + header["data"] = data_header + frames.extend(data_frames) + + header["column-names"] = column_names + header["index-names"] = index_names + if self.c_obj.get()[0].metadata.get()[0].data() != NULL: + header["metadata"] = list( + + self.c_obj.get()[0].metadata.get()[0].data() + ) + for name, dtype in column_dtypes.items(): + dtype_header, dtype_frames = dtype.serialize() + column_dtypes[name] = ( + dtype_header, + (len(frames), len(frames) + len(dtype_frames)), + ) + frames.extend(dtype_frames) + header["column-dtypes"] = column_dtypes + + return header, frames + + @staticmethod + def deserialize(object gpu_data, object header, object frames): + """ + Deserialize a PackedColumns instance from header and frames. + """ + cdef PackedColumns p = PackedColumns.__new__(PackedColumns) + dbuf = DeviceBuffer( + ptr=gpu_data.get_ptr(mode="write"), + size=gpu_data.nbytes + ) + + cdef packed_columns data + data.metadata = move( + make_unique[vector[uint8_t]]( + move(header.get("metadata", [])) + ) + ) + data.gpu_data = move(dbuf.c_obj) + + p.c_obj = move(make_unique[packed_columns](move(data))) + + column_dtypes = {} + for name, dtype in header["column-dtypes"].items(): + dtype_header, (start, stop) = dtype + column_dtypes[name] = pickle.loads( + dtype_header["type-serialized"] + ).deserialize(dtype_header, frames[start:stop]) + + return p, header["column-names"], header["index-names"], column_dtypes + + @property + def gpu_data_ptr(self): + if self.c_obj.get() != NULL: + return int(self.c_obj.get()[0].gpu_data.get()[0].data()) + + @property + def gpu_data_size(self): + if self.c_obj.get() != NULL: + return int(self.c_obj.get()[0].gpu_data.get()[0].size()) + def release(self): """Releases and returns the underlying serialized metadata and gpu data. From 25b145753e8fffe43be2912baec7fc09d798b14a Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 12 Nov 2024 17:26:24 -0800 Subject: [PATCH 03/11] remove serialize, deserialize from PlcPackedColumns --- python/cudf/cudf/_lib/copying.pyx | 78 +++++++++++++++---- .../pylibcudf/pylibcudf/contiguous_split.pyx | 63 +-------------- 2 files changed, 62 insertions(+), 79 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 39867381bf4..29bd2c195a2 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -29,7 +29,11 @@ from pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table import pylibcudf as plc -from pylibcudf.contiguous_split import PackedColumns as PlcPackedColumns +from pylibcudf.contiguous_split cimport PackedColumns as PlcPackedColumns +cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split +from libcpp.vector cimport vector +from libc.stdint cimport uint8_t +from rmm.pylibrmm.device_buffer cimport DeviceBuffer # workaround for https://github.com/cython/cython/issues/3885 ctypedef const scalar constscalar @@ -363,27 +367,68 @@ class PackedColumns(Serializable): owner=self, exposed=True ) - header, frames = self._data.serialize( - gpu_data, - self.column_names, - self.index_names, - self.column_dtypes, - ) - self.column_dtypes = header["column-dtypes"] + # header, frames = self._data.serialize( + # gpu_data, + # self.column_names, + # self.index_names, + # self.column_dtypes, + # ) + + header = {} + frames = [] + data_header, data_frames = gpu_data.serialize() + header["data"] = data_header + frames.extend(data_frames) + + header["column-names"] = self.column_names + header["index-names"] = self.index_names + cdef PlcPackedColumns p = self._data + if p.c_obj.get()[0].metadata.get()[0].data() != NULL: + header["metadata"] = list( + + p.c_obj.get()[0].metadata.get()[0].data() + ) + for name, dtype in self.column_dtypes.items(): + dtype_header, dtype_frames = dtype.serialize() + self.column_dtypes[name] = ( + dtype_header, + (len(frames), len(frames) + len(dtype_frames)), + ) + frames.extend(dtype_frames) + header["column-dtypes"] = self.column_dtypes header["type-serialized"] = pickle.dumps(type(self)) return header, frames @classmethod def deserialize(cls, header, frames): gpu_data = Buffer.deserialize(header["data"], frames) - p, col_names, index_names, col_dtypes = PlcPackedColumns.deserialize( - gpu_data, header, frames + cdef PlcPackedColumns p = PlcPackedColumns.__new__(PlcPackedColumns) + dbuf = DeviceBuffer( + ptr=gpu_data.get_ptr(mode="write"), + size=gpu_data.nbytes + ) + cdef cpp_contiguous_split.packed_columns data + data.metadata = move( + make_unique[vector[uint8_t]]( + move(header.get("metadata", [])) + ) ) + data.gpu_data = move(dbuf.c_obj) + + p.c_obj = move(make_unique[cpp_contiguous_split.packed_columns](move(data))) + + column_dtypes = {} + for name, dtype in header["column-dtypes"].items(): + dtype_header, (start, stop) = dtype + column_dtypes[name] = pickle.loads( + dtype_header["type-serialized"] + ).deserialize(dtype_header, frames[start:stop]) + return cls( p, - col_names, - index_names, - col_dtypes, + header["column-names"], + header["index-names"], + column_dtypes, ) @classmethod @@ -417,9 +462,9 @@ class PackedColumns(Serializable): ] ) ), - column_names=column_names, - index_names=index_names, - column_dtypes=column_dtypes, + column_names, + index_names, + column_dtypes, ) def unpack(self): @@ -428,7 +473,6 @@ class PackedColumns(Serializable): self.column_names, self.index_names )) - print("DTYPES", self.column_dtypes) for name, dtype in self.column_dtypes.items(): output_table._data[name] = ( output_table._data[name]._with_type_metadata(dtype) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index c3ea2d2f6cf..648d84e2e07 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -18,7 +18,6 @@ from rmm.pylibrmm.device_buffer cimport DeviceBuffer from .gpumemoryview cimport gpumemoryview from .table cimport Table from .utils cimport int_to_void_ptr -import pickle __all__ = [ @@ -64,6 +63,7 @@ cdef class HostBuffer: def __releasebuffer__(self, Py_buffer *buffer): pass + cdef class PackedColumns: """Column data in a serialized format. @@ -105,67 +105,6 @@ cdef class PackedColumns: return PackedColumns.from_libcudf(move(c_packed_columns)) - def serialize( - self, - object cudf_buffer, - object column_names, - object index_names, - object column_dtypes - ): - header = {} - frames = [] - data_header, data_frames = cudf_buffer.serialize() - header["data"] = data_header - frames.extend(data_frames) - - header["column-names"] = column_names - header["index-names"] = index_names - if self.c_obj.get()[0].metadata.get()[0].data() != NULL: - header["metadata"] = list( - - self.c_obj.get()[0].metadata.get()[0].data() - ) - for name, dtype in column_dtypes.items(): - dtype_header, dtype_frames = dtype.serialize() - column_dtypes[name] = ( - dtype_header, - (len(frames), len(frames) + len(dtype_frames)), - ) - frames.extend(dtype_frames) - header["column-dtypes"] = column_dtypes - - return header, frames - - @staticmethod - def deserialize(object gpu_data, object header, object frames): - """ - Deserialize a PackedColumns instance from header and frames. - """ - cdef PackedColumns p = PackedColumns.__new__(PackedColumns) - dbuf = DeviceBuffer( - ptr=gpu_data.get_ptr(mode="write"), - size=gpu_data.nbytes - ) - - cdef packed_columns data - data.metadata = move( - make_unique[vector[uint8_t]]( - move(header.get("metadata", [])) - ) - ) - data.gpu_data = move(dbuf.c_obj) - - p.c_obj = move(make_unique[packed_columns](move(data))) - - column_dtypes = {} - for name, dtype in header["column-dtypes"].items(): - dtype_header, (start, stop) = dtype - column_dtypes[name] = pickle.loads( - dtype_header["type-serialized"] - ).deserialize(dtype_header, frames[start:stop]) - - return p, header["column-names"], header["index-names"], column_dtypes - @property def gpu_data_ptr(self): if self.c_obj.get() != NULL: From a7b9dc9171b46ff9d27444a845fc05e26c8ded0c Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 12 Nov 2024 17:35:01 -0800 Subject: [PATCH 04/11] remove declaration file --- python/cudf/cudf/_lib/copying.pxd | 1 - python/cudf/cudf/tests/test_pack.py | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 python/cudf/cudf/_lib/copying.pxd diff --git a/python/cudf/cudf/_lib/copying.pxd b/python/cudf/cudf/_lib/copying.pxd deleted file mode 100644 index 441eecf5bcc..00000000000 --- a/python/cudf/cudf/_lib/copying.pxd +++ /dev/null @@ -1 +0,0 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. diff --git a/python/cudf/cudf/tests/test_pack.py b/python/cudf/cudf/tests/test_pack.py index d770e3d0fc2..b474bbe9bd8 100644 --- a/python/cudf/cudf/tests/test_pack.py +++ b/python/cudf/cudf/tests/test_pack.py @@ -62,6 +62,7 @@ def assert_packed_frame_equality(df): packed = pack(df) del df unpacked = unpack(packed) + assert_eq(unpacked, pdf) From a4008c7d06ca8da61af276dc38fd5189fda828ab Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Tue, 12 Nov 2024 17:39:43 -0800 Subject: [PATCH 05/11] remove comment --- python/cudf/cudf/_lib/copying.pyx | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 29bd2c195a2..c07096edc31 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -361,21 +361,14 @@ class PackedColumns(Serializable): } def serialize(self): + header = {} + frames = [] gpu_data = as_buffer( data=self._data.gpu_data_ptr, size=self._data.gpu_data_size, owner=self, exposed=True ) - # header, frames = self._data.serialize( - # gpu_data, - # self.column_names, - # self.index_names, - # self.column_dtypes, - # ) - - header = {} - frames = [] data_header, data_frames = gpu_data.serialize() header["data"] = data_header frames.extend(data_frames) From 1e273034a43910a7054b9383099cf8fc80fbeae0 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 14 Nov 2024 07:12:54 -0800 Subject: [PATCH 06/11] address review --- python/cudf/cudf/_lib/copying.pyx | 46 +++++-------------- .../pylibcudf/pylibcudf/contiguous_split.pxd | 1 + .../pylibcudf/pylibcudf/contiguous_split.pyx | 2 +- 3 files changed, 13 insertions(+), 36 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index c07096edc31..a9848fd25bf 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -9,7 +9,7 @@ from libcpp.utility cimport move import pylibcudf import cudf -from cudf.core.buffer import Buffer, acquire_spill_lock, as_buffer +from cudf.core.buffer import acquire_spill_lock, as_buffer from cudf._lib.column cimport Column @@ -18,7 +18,6 @@ from cudf._lib.scalar import as_device_scalar from cudf._lib.scalar cimport DeviceScalar from cudf._lib.reduce import minmax -from cudf.core.abc import Serializable from libcpp.memory cimport make_unique @@ -29,11 +28,6 @@ from pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table import pylibcudf as plc -from pylibcudf.contiguous_split cimport PackedColumns as PlcPackedColumns -cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split -from libcpp.vector cimport vector -from libc.stdint cimport uint8_t -from rmm.pylibrmm.device_buffer cimport DeviceBuffer # workaround for https://github.com/cython/cython/issues/3885 ctypedef const scalar constscalar @@ -335,7 +329,7 @@ def get_element(Column input_column, size_type index): ) -class PackedColumns(Serializable): +class PackedColumns: """ A packed representation of a Frame, with all columns residing in a single GPU memory buffer. @@ -375,12 +369,6 @@ class PackedColumns(Serializable): header["column-names"] = self.column_names header["index-names"] = self.index_names - cdef PlcPackedColumns p = self._data - if p.c_obj.get()[0].metadata.get()[0].data() != NULL: - header["metadata"] = list( - - p.c_obj.get()[0].metadata.get()[0].data() - ) for name, dtype in self.column_dtypes.items(): dtype_header, dtype_frames = dtype.serialize() self.column_dtypes[name] = ( @@ -390,35 +378,23 @@ class PackedColumns(Serializable): frames.extend(dtype_frames) header["column-dtypes"] = self.column_dtypes header["type-serialized"] = pickle.dumps(type(self)) - return header, frames + return header, (self._data.release(), frames) @classmethod def deserialize(cls, header, frames): - gpu_data = Buffer.deserialize(header["data"], frames) - cdef PlcPackedColumns p = PlcPackedColumns.__new__(PlcPackedColumns) - dbuf = DeviceBuffer( - ptr=gpu_data.get_ptr(mode="write"), - size=gpu_data.nbytes - ) - cdef cpp_contiguous_split.packed_columns data - data.metadata = move( - make_unique[vector[uint8_t]]( - move(header.get("metadata", [])) - ) - ) - data.gpu_data = move(dbuf.c_obj) - - p.c_obj = move(make_unique[cpp_contiguous_split.packed_columns](move(data))) - column_dtypes = {} for name, dtype in header["column-dtypes"].items(): dtype_header, (start, stop) = dtype column_dtypes[name] = pickle.loads( dtype_header["type-serialized"] - ).deserialize(dtype_header, frames[start:stop]) - + ).deserialize(dtype_header, frames[1][start:stop]) + packed_metadata, packed_gpu_data = frames[0] return cls( - p, + plc.contiguous_split.pack( + plc.contiguous_split.unpack_from_memoryviews( + packed_metadata, packed_gpu_data + ) + ), header["column-names"], header["index-names"], column_dtypes, @@ -448,7 +424,7 @@ class PackedColumns(Serializable): column_dtypes[name] = col.dtype return cls( - plc.contiguous_split.PackedColumns.from_plc_table( + plc.contiguous_split.pack( plc.Table( [ col.to_pylibcudf(mode="read") for col in columns diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pxd b/python/pylibcudf/pylibcudf/contiguous_split.pxd index 2a10cb5b3d5..19809e81315 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pxd +++ b/python/pylibcudf/pylibcudf/contiguous_split.pxd @@ -12,6 +12,7 @@ cdef class PackedColumns: @staticmethod cdef PackedColumns from_libcudf(unique_ptr[packed_columns] data) + cpdef release(self) cpdef PackedColumns pack(Table input) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 648d84e2e07..08042e7dfcf 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -115,7 +115,7 @@ cdef class PackedColumns: if self.c_obj.get() != NULL: return int(self.c_obj.get()[0].gpu_data.get()[0].size()) - def release(self): + cpdef release(self): """Releases and returns the underlying serialized metadata and gpu data. The ownership of the memory are transferred to the returned buffers. After From b371b9a904b9742a7e6d4414172f6e026815bce0 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 14 Nov 2024 07:15:24 -0800 Subject: [PATCH 07/11] remove from_plc_table --- python/pylibcudf/pylibcudf/contiguous_split.pyx | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 08042e7dfcf..e044561a800 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -88,23 +88,6 @@ cdef class PackedColumns: out.c_obj = move(data) return out - @staticmethod - def from_plc_table(Table input): - """ - Construct a ``PackedColumns`` object from a ``pylibcudf.Table``. - """ - cdef unique_ptr[packed_columns] c_packed_columns = move( - make_unique[packed_columns]( - move( - cpp_pack( - input.view() - ) - ) - ) - ) - - return PackedColumns.from_libcudf(move(c_packed_columns)) - @property def gpu_data_ptr(self): if self.c_obj.get() != NULL: From 658ba08b408865b47fb00dcd075b0435029dd932 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 14 Nov 2024 07:55:07 -0800 Subject: [PATCH 08/11] address review --- python/cudf/cudf/_lib/copying.pyx | 24 +++++++++---------- .../pylibcudf/pylibcudf/contiguous_split.pyx | 12 +--------- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index a9848fd25bf..2c56f1dd2ca 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -28,6 +28,8 @@ from pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table import pylibcudf as plc +from libc.stdint cimport uintptr_t +from rmm.pylibrmm.device_buffer cimport DeviceBuffer # workaround for https://github.com/cython/cython/issues/3885 ctypedef const scalar constscalar @@ -336,7 +338,7 @@ class PackedColumns: """ def __init__(self, data, column_names=None, index_names=None, column_dtypes=None): - self._data = data + self._metadata, self._gpu_data = data.release() self.column_names=column_names self.index_names=index_names self.column_dtypes=column_dtypes @@ -346,20 +348,15 @@ class PackedColumns: @property def __cuda_array_interface__(self): - return { - "data": (self._data.gpu_data_ptr, False), - "shape": (self._data.gpu_data_size,), - "strides": None, - "typestr": "|u1", - "version": 0 - } + return self._gpu_data.__cuda_array_interface__ def serialize(self): header = {} frames = [] + cdef DeviceBuffer dbuf = self._gpu_data.obj gpu_data = as_buffer( - data=self._data.gpu_data_ptr, - size=self._data.gpu_data_size, + data = int(dbuf.c_obj.get()[0].data()), + size = int(dbuf.c_obj.get()[0].size()), owner=self, exposed=True ) @@ -378,7 +375,7 @@ class PackedColumns: frames.extend(dtype_frames) header["column-dtypes"] = self.column_dtypes header["type-serialized"] = pickle.dumps(type(self)) - return header, (self._data.release(), frames) + return header, ((self._metadata, self._gpu_data), frames) @classmethod def deserialize(cls, header, frames): @@ -438,7 +435,10 @@ class PackedColumns: def unpack(self): output_table = cudf.DataFrame._from_data(*data_from_pylibcudf_table( - plc.contiguous_split.unpack(self._data), + plc.contiguous_split.unpack_from_memoryviews( + self._metadata, + self._gpu_data + ), self.column_names, self.index_names )) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index e044561a800..dfa1957b129 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -1,7 +1,7 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from cython.operator cimport dereference -from libc.stdint cimport uint8_t, uintptr_t +from libc.stdint cimport uint8_t from libcpp.memory cimport make_unique, unique_ptr from libcpp.utility cimport move from libcpp.vector cimport vector @@ -88,16 +88,6 @@ cdef class PackedColumns: out.c_obj = move(data) return out - @property - def gpu_data_ptr(self): - if self.c_obj.get() != NULL: - return int(self.c_obj.get()[0].gpu_data.get()[0].data()) - - @property - def gpu_data_size(self): - if self.c_obj.get() != NULL: - return int(self.c_obj.get()[0].gpu_data.get()[0].size()) - cpdef release(self): """Releases and returns the underlying serialized metadata and gpu data. From 89cb0c3a089a559b17cf595e2a8b1ae7a5bf8fe3 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 14 Nov 2024 11:51:19 -0800 Subject: [PATCH 09/11] add serialization methods to host buffer --- python/cudf/cudf/_lib/copying.pyx | 14 +++++++------- python/pylibcudf/pylibcudf/contiguous_split.pyx | 16 ++++++++++++++++ 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 2c56f1dd2ca..216e57e3923 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -5,12 +5,11 @@ import pickle from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.utility cimport move - import pylibcudf import cudf from cudf.core.buffer import acquire_spill_lock, as_buffer - +from cudf.core.abc import Serializable from cudf._lib.column cimport Column from cudf._lib.scalar import as_device_scalar @@ -331,7 +330,7 @@ def get_element(Column input_column, size_type index): ) -class PackedColumns: +class PackedColumns(Serializable): """ A packed representation of a Frame, with all columns residing in a single GPU memory buffer. @@ -366,6 +365,7 @@ class PackedColumns: header["column-names"] = self.column_names header["index-names"] = self.index_names + header["metadata"] = self._metadata.obj for name, dtype in self.column_dtypes.items(): dtype_header, dtype_frames = dtype.serialize() self.column_dtypes[name] = ( @@ -375,7 +375,7 @@ class PackedColumns: frames.extend(dtype_frames) header["column-dtypes"] = self.column_dtypes header["type-serialized"] = pickle.dumps(type(self)) - return header, ((self._metadata, self._gpu_data), frames) + return header, frames @classmethod def deserialize(cls, header, frames): @@ -384,12 +384,12 @@ class PackedColumns: dtype_header, (start, stop) = dtype column_dtypes[name] = pickle.loads( dtype_header["type-serialized"] - ).deserialize(dtype_header, frames[1][start:stop]) - packed_metadata, packed_gpu_data = frames[0] + ).deserialize(dtype_header, frames[start:stop]) return cls( plc.contiguous_split.pack( plc.contiguous_split.unpack_from_memoryviews( - packed_metadata, packed_gpu_data + memoryview(header["metadata"]), + plc.gpumemoryview(frames[0]), ) ), header["column-names"], diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index dfa1957b129..7326ea66a01 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -63,6 +63,22 @@ cdef class HostBuffer: def __releasebuffer__(self, Py_buffer *buffer): pass + def __getstate__(self): + return memoryview(self)[:].tobytes() + + def __setstate__(self, state): + cdef unique_ptr[vector[uint8_t]] new_c_obj = unique_ptr[vector[uint8_t]]( + new vector[uint8_t]() + ) + new_c_obj.get().reserve(len(state)) + for byte in state: + new_c_obj.get().push_back(byte) + + self.c_obj = move(new_c_obj) + self.nbytes = dereference(self.c_obj).size() + self.shape[0] = self.nbytes + self.strides[0] = 1 + cdef class PackedColumns: """Column data in a serialized format. From 4acfbded0bede6f153bfbe421abbe9232cac1a09 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 14 Nov 2024 15:47:32 -0800 Subject: [PATCH 10/11] clean up --- python/cudf/cudf/_lib/copying.pyx | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 57fe3f6df60..365e46b1043 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -26,8 +26,6 @@ from pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table import pylibcudf as plc -from libc.stdint cimport uintptr_t -from rmm.pylibrmm.device_buffer cimport DeviceBuffer def _gather_map_is_valid( @@ -348,10 +346,9 @@ class PackedColumns(Serializable): def serialize(self): header = {} frames = [] - cdef DeviceBuffer dbuf = self._gpu_data.obj gpu_data = as_buffer( - data = int(dbuf.c_obj.get()[0].data()), - size = int(dbuf.c_obj.get()[0].size()), + data = self._gpu_data.obj.ptr, + size = self._gpu_data.obj.size, owner=self, exposed=True ) From 84adc0c467e38c9dd6a8165232bc2cc028e781c2 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Fri, 15 Nov 2024 10:27:49 -0800 Subject: [PATCH 11/11] address review --- python/cudf/cudf/_lib/copying.pyx | 11 +++++++++-- .../pylibcudf/pylibcudf/contiguous_split.pxd | 2 +- .../pylibcudf/pylibcudf/contiguous_split.pyx | 18 +----------------- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index 365e46b1043..4dfb12d8ab3 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -26,6 +26,7 @@ from pylibcudf.libcudf.types cimport size_type from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table import pylibcudf as plc +from pylibcudf.contiguous_split cimport PackedColumns as PlcPackedColumns def _gather_map_is_valid( @@ -330,7 +331,13 @@ class PackedColumns(Serializable): in a single GPU memory buffer. """ - def __init__(self, data, column_names=None, index_names=None, column_dtypes=None): + def __init__( + self, + PlcPackedColumns data, + object column_names = None, + object index_names = None, + object column_dtypes = None + ): self._metadata, self._gpu_data = data.release() self.column_names=column_names self.index_names=index_names @@ -358,7 +365,7 @@ class PackedColumns(Serializable): header["column-names"] = self.column_names header["index-names"] = self.index_names - header["metadata"] = self._metadata.obj + header["metadata"] = self._metadata.tobytes() for name, dtype in self.column_dtypes.items(): dtype_header, dtype_frames = dtype.serialize() self.column_dtypes[name] = ( diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pxd b/python/pylibcudf/pylibcudf/contiguous_split.pxd index 19809e81315..3745e893c3e 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pxd +++ b/python/pylibcudf/pylibcudf/contiguous_split.pxd @@ -12,7 +12,7 @@ cdef class PackedColumns: @staticmethod cdef PackedColumns from_libcudf(unique_ptr[packed_columns] data) - cpdef release(self) + cpdef tuple release(self) cpdef PackedColumns pack(Table input) diff --git a/python/pylibcudf/pylibcudf/contiguous_split.pyx b/python/pylibcudf/pylibcudf/contiguous_split.pyx index 7326ea66a01..2a40d42e6e9 100644 --- a/python/pylibcudf/pylibcudf/contiguous_split.pyx +++ b/python/pylibcudf/pylibcudf/contiguous_split.pyx @@ -63,22 +63,6 @@ cdef class HostBuffer: def __releasebuffer__(self, Py_buffer *buffer): pass - def __getstate__(self): - return memoryview(self)[:].tobytes() - - def __setstate__(self, state): - cdef unique_ptr[vector[uint8_t]] new_c_obj = unique_ptr[vector[uint8_t]]( - new vector[uint8_t]() - ) - new_c_obj.get().reserve(len(state)) - for byte in state: - new_c_obj.get().push_back(byte) - - self.c_obj = move(new_c_obj) - self.nbytes = dereference(self.c_obj).size() - self.shape[0] = self.nbytes - self.strides[0] = 1 - cdef class PackedColumns: """Column data in a serialized format. @@ -104,7 +88,7 @@ cdef class PackedColumns: out.c_obj = move(data) return out - cpdef release(self): + cpdef tuple release(self): """Releases and returns the underlying serialized metadata and gpu data. The ownership of the memory are transferred to the returned buffers. After