Skip to content

Commit

Permalink
Fix issues with _CPackedColumns.serialize() handling of host and device data (#8759)
Browse files Browse the repository at this point in the history

A few changes in here to resolve some test failures using pack/unpack with DataFrame serialization:

- We now use a `Buffer` in `frames` to represent the device data, so that Dask can correctly perform the necessary DtoD transfers when moving a packed columns object between devices
- With #8697 merged, the results of packing an empty DataFrame will set the pointer to the host data to `NULL`; since Cython cannot make a memoryview from `NULL`, we now check for this condition before making the host data array
- The serialized type is now included in the serialized header

Authors:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
  - https://github.com/brandon-b-miller
  - Michael Wang (https://github.com/isVoid)

URL: #8759
  • Loading branch information
charlesbluca authored Jul 19, 2021
1 parent 2a8d202 commit cfd3d5d
Showing 1 changed file with 21 additions and 11 deletions.
32 changes: 21 additions & 11 deletions python/cudf/cudf/_lib/copying.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ from libcpp.memory cimport make_shared, make_unique, shared_ptr, unique_ptr
from libcpp.utility cimport move
from libcpp.vector cimport vector

from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer
from rmm._lib.device_buffer cimport DeviceBuffer
from cudf.core.buffer import Buffer

from cudf._lib.column cimport Column

Expand Down Expand Up @@ -824,14 +825,18 @@ cdef class _CPackedColumns:
header = {}
frames = []

gpu_data = Buffer(self.gpu_data_ptr, self.gpu_data_size, self)
data_header, data_frames = gpu_data.serialize()
header["data"] = data_header
frames.extend(data_frames)

header["column-names"] = self.column_names
header["index-names"] = self.index_names
header["gpu-data-ptr"] = self.gpu_data_ptr
header["gpu-data-size"] = self.gpu_data_size
header["metadata"] = list(
<uint8_t[:self.c_obj.metadata_.get()[0].size()]>
self.c_obj.metadata_.get()[0].data()
)
if self.c_obj.metadata_.get()[0].data() != NULL:
header["metadata"] = list(
<uint8_t[:self.c_obj.metadata_.get()[0].size()]>
self.c_obj.metadata_.get()[0].data()
)

column_dtypes = {}
for name, dtype in self.column_dtypes.items():
Expand All @@ -849,15 +854,17 @@ cdef class _CPackedColumns:
def deserialize(header, frames):
cdef _CPackedColumns p = _CPackedColumns.__new__(_CPackedColumns)

gpu_data = Buffer.deserialize(header["data"], frames)

dbuf = DeviceBuffer(
ptr=header["gpu-data-ptr"],
size=header["gpu-data-size"]
ptr=gpu_data.ptr,
size=gpu_data.nbytes
)

cdef cpp_copying.packed_columns data
data.metadata_ = move(
make_unique[cpp_copying.metadata](
move(<vector[uint8_t]>header["metadata"])
move(<vector[uint8_t]>header.get("metadata", []))
)
)
data.gpu_data = move(dbuf.c_obj)
Expand Down Expand Up @@ -915,7 +922,10 @@ class PackedColumns(Serializable):
}

def serialize(self):
return self._data.serialize()
header, frames = self._data.serialize()
header["type-serialized"] = pickle.dumps(type(self))

return header, frames

@classmethod
def deserialize(cls, header, frames):
Expand Down

0 comments on commit cfd3d5d

Please sign in to comment.