Skip to content

Commit

Permalink
Use pylibcudf contiguous split APIs in cudf python (#17246)
Browse files Browse the repository at this point in the history
Apart of #15162

Authors:
  - Matthew Murray (https://github.com/Matt711)

Approvers:
  - Lawrence Mitchell (https://github.com/wence-)

URL: #17246
  • Loading branch information
Matt711 authored Nov 16, 2024
1 parent e683647 commit 9cc9071
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 140 deletions.
10 changes: 0 additions & 10 deletions python/cudf/cudf/_lib/copying.pxd

This file was deleted.

213 changes: 84 additions & 129 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,37 +2,31 @@

import pickle

from libc.stdint cimport uint8_t, uintptr_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move
from libcpp.vector cimport vector

from rmm.pylibrmm.device_buffer cimport DeviceBuffer

import pylibcudf

import cudf
from cudf.core.buffer import Buffer, acquire_spill_lock, as_buffer

from cudf.core.buffer import acquire_spill_lock, as_buffer
from cudf.core.abc import Serializable
from cudf._lib.column cimport Column

from cudf._lib.scalar import as_device_scalar

from cudf._lib.scalar cimport DeviceScalar
from cudf._lib.utils cimport table_view_from_table

from cudf._lib.reduce import minmax
from cudf.core.abc import Serializable

from libcpp.memory cimport make_unique

cimport pylibcudf.libcudf.contiguous_split as cpp_contiguous_split
from pylibcudf.libcudf.column.column cimport column
from pylibcudf.libcudf.column.column_view cimport column_view
from pylibcudf.libcudf.types cimport size_type

from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_table_view
from cudf._lib.utils cimport columns_from_pylibcudf_table, data_from_pylibcudf_table
import pylibcudf as plc
from pylibcudf.contiguous_split cimport PackedColumns as PlcPackedColumns


def _gather_map_is_valid(
Expand Down Expand Up @@ -331,54 +325,37 @@ def get_element(Column input_column, size_type index):
)


cdef class _CPackedColumns:

@staticmethod
def from_py_table(input_table, keep_index=True):
"""
Construct a ``PackedColumns`` object from a ``cudf.DataFrame``.
"""
import cudf.core.dtypes

cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

if keep_index and (
not isinstance(input_table.index, cudf.RangeIndex)
or input_table.index.start != 0
or input_table.index.stop != len(input_table)
or input_table.index.step != 1
):
input_table_view = table_view_from_table(input_table)
p.index_names = input_table._index_names
else:
input_table_view = table_view_from_table(
input_table, ignore_index=True)

p.column_names = input_table._column_names
p.column_dtypes = {}
for name, col in input_table._column_labels_and_values:
if isinstance(col.dtype, cudf.core.dtypes._BaseDtype):
p.column_dtypes[name] = col.dtype

p.c_obj = move(cpp_contiguous_split.pack(input_table_view))
class PackedColumns(Serializable):
"""
A packed representation of a Frame, with all columns residing
in a single GPU memory buffer.
"""

return p
def __init__(
self,
PlcPackedColumns data,
object column_names = None,
object index_names = None,
object column_dtypes = None
):
self._metadata, self._gpu_data = data.release()
self.column_names=column_names
self.index_names=index_names
self.column_dtypes=column_dtypes

@property
def gpu_data_ptr(self):
return int(<uintptr_t>self.c_obj.gpu_data.get()[0].data())
def __reduce__(self):
return self.deserialize, self.serialize()

@property
def gpu_data_size(self):
return int(<size_t>self.c_obj.gpu_data.get()[0].size())
def __cuda_array_interface__(self):
return self._gpu_data.__cuda_array_interface__

def serialize(self):
header = {}
frames = []

gpu_data = as_buffer(
data=self.gpu_data_ptr,
size=self.gpu_data_size,
data = self._gpu_data.obj.ptr,
size = self._gpu_data.obj.size,
owner=self,
exposed=True
)
Expand All @@ -388,65 +365,83 @@ cdef class _CPackedColumns:

header["column-names"] = self.column_names
header["index-names"] = self.index_names
if self.c_obj.metadata.get()[0].data() != NULL:
header["metadata"] = list(
<uint8_t[:self.c_obj.metadata.get()[0].size()]>
self.c_obj.metadata.get()[0].data()
)

column_dtypes = {}
header["metadata"] = self._metadata.tobytes()
for name, dtype in self.column_dtypes.items():
dtype_header, dtype_frames = dtype.serialize()
column_dtypes[name] = (
self.column_dtypes[name] = (
dtype_header,
(len(frames), len(frames) + len(dtype_frames)),
)
frames.extend(dtype_frames)
header["column-dtypes"] = column_dtypes

header["column-dtypes"] = self.column_dtypes
header["type-serialized"] = pickle.dumps(type(self))
return header, frames

@staticmethod
def deserialize(header, frames):
cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

gpu_data = Buffer.deserialize(header["data"], frames)

dbuf = DeviceBuffer(
ptr=gpu_data.get_ptr(mode="write"),
size=gpu_data.nbytes
)

cdef cpp_contiguous_split.packed_columns data
data.metadata = move(
make_unique[vector[uint8_t]](
move(<vector[uint8_t]>header.get("metadata", []))
)
)
data.gpu_data = move(dbuf.c_obj)

p.c_obj = move(data)
p.column_names = header["column-names"]
p.index_names = header["index-names"]

@classmethod
def deserialize(cls, header, frames):
column_dtypes = {}
for name, dtype in header["column-dtypes"].items():
dtype_header, (start, stop) = dtype
column_dtypes[name] = pickle.loads(
dtype_header["type-serialized"]
).deserialize(dtype_header, frames[start:stop])
p.column_dtypes = column_dtypes
return cls(
plc.contiguous_split.pack(
plc.contiguous_split.unpack_from_memoryviews(
memoryview(header["metadata"]),
plc.gpumemoryview(frames[0]),
)
),
header["column-names"],
header["index-names"],
column_dtypes,
)

return p
@classmethod
def from_py_table(cls, input_table, keep_index=True):
if keep_index and (
not isinstance(input_table.index, cudf.RangeIndex)
or input_table.index.start != 0
or input_table.index.stop != len(input_table)
or input_table.index.step != 1
):
columns = input_table._index._columns + input_table._columns
index_names = input_table._index_names
else:
columns = input_table._columns
index_names = None

column_names = input_table._column_names
column_dtypes = {}
for name, col in input_table._column_labels_and_values:
if isinstance(
col.dtype,
(cudf.core.dtypes._BaseDtype, cudf.core.dtypes.CategoricalDtype)
):
column_dtypes[name] = col.dtype

return cls(
plc.contiguous_split.pack(
plc.Table(
[
col.to_pylibcudf(mode="read") for col in columns
]
)
),
column_names,
index_names,
column_dtypes,
)

def unpack(self):
output_table = cudf.DataFrame._from_data(*data_from_table_view(
cpp_contiguous_split.unpack(self.c_obj),
self,
output_table = cudf.DataFrame._from_data(*data_from_pylibcudf_table(
plc.contiguous_split.unpack_from_memoryviews(
self._metadata,
self._gpu_data
),
self.column_names,
self.index_names
))

for name, dtype in self.column_dtypes.items():
output_table._data[name] = (
output_table._data[name]._with_type_metadata(dtype)
Expand All @@ -455,46 +450,6 @@ cdef class _CPackedColumns:
return output_table


class PackedColumns(Serializable):
"""
A packed representation of a Frame, with all columns residing
in a single GPU memory buffer.
"""

def __init__(self, data):
self._data = data

def __reduce__(self):
return self.deserialize, self.serialize()

@property
def __cuda_array_interface__(self):
return {
"data": (self._data.gpu_data_ptr, False),
"shape": (self._data.gpu_data_size,),
"strides": None,
"typestr": "|u1",
"version": 0
}

def serialize(self):
header, frames = self._data.serialize()
header["type-serialized"] = pickle.dumps(type(self))

return header, frames

@classmethod
def deserialize(cls, header, frames):
return cls(_CPackedColumns.deserialize(header, frames))

@classmethod
def from_py_table(cls, input_table, keep_index=True):
return cls(_CPackedColumns.from_py_table(input_table, keep_index))

def unpack(self):
return self._data.unpack()


def pack(input_table, keep_index=True):
"""
Pack the columns of a cudf Frame into a single GPU memory buffer.
Expand Down
1 change: 1 addition & 0 deletions python/pylibcudf/pylibcudf/contiguous_split.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ cdef class PackedColumns:

@staticmethod
cdef PackedColumns from_libcudf(unique_ptr[packed_columns] data)
cpdef tuple release(self)

cpdef PackedColumns pack(Table input)

Expand Down
3 changes: 2 additions & 1 deletion python/pylibcudf/pylibcudf/contiguous_split.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ cdef class HostBuffer:
def __releasebuffer__(self, Py_buffer *buffer):
pass


cdef class PackedColumns:
"""Column data in a serialized format.
Expand All @@ -87,7 +88,7 @@ cdef class PackedColumns:
out.c_obj = move(data)
return out

def release(self):
cpdef tuple release(self):
"""Releases and returns the underlying serialized metadata and gpu data.
The ownership of the memory are transferred to the returned buffers. After
Expand Down

0 comments on commit 9cc9071

Please sign in to comment.