Make column serialize more general
madsbk committed May 4, 2022
1 parent 47bc22f commit 6726774
Showing 3 changed files with 54 additions and 94 deletions.
63 changes: 54 additions & 9 deletions python/cudf/cudf/core/column/column.py
@@ -1037,32 +1037,65 @@ def unique(self) -> ColumnBase:
return drop_duplicates([self], keep="first")[0]

def serialize(self) -> Tuple[dict, list]:
header: Dict[Any, Any] = {}
"""Serialize column
The following attributes are serialized:
- "size"
- "data"
- "mask"
- "children"
"""
header = {}
frames = []
# "type-serialized" is used in deserialize_columns() to dispatch
# the deserialization to the correct class. It must be compatible
# with Dask: see <https://github.com/rapidsai/cudf/pull/4149>.
header["type-serialized"] = pickle.dumps(type(self))
if hasattr(self.dtype, "str"):
# Notice, "dtype" must be available for deserialization; thus,
# if the dtype doesn't support `str` or if it is insufficient
# for deserialization, please overwrite the serialize and/or
# deserialize methods.
header["dtype"] = self.dtype.str
# If `self.dtype` provides a `serialize` method, we use it. If not,
# we pickle `self.dtype` itself.
if hasattr(self.dtype, "serialize"):
# Use "dtype-type-serialized" for dispatching the deserialization
# of the dtype.
header["dtype-type-serialized"] = pickle.dumps(type(self.dtype))
header["dtype-serialized"] = self.dtype.serialize()
else:
header["dtype-serialized"] = pickle.dumps(self.dtype)
header["size"] = self.size

if self.data is not None:
data_header, data_frames = self.data.serialize()
assert len(data_frames) == 1
header["data"] = data_header
frames.extend(data_frames)

if self.mask is not None:
mask_header, mask_frames = self.mask.serialize()
assert len(mask_frames) == 1
header["mask"] = mask_header
frames.extend(mask_frames)

if self.children:
header["children-frame-offset"] = len(frames)
children_headers = []
for child in self.children:
child_header, child_frames = child.serialize()
assert len(child_frames) == 1
children_headers.append(child_header)
frames.extend(child_frames)
header["children-headers"] = children_headers

# "frame_count" is used in deserialize_columns() to slice the frame list.
header["frame_count"] = len(frames)
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> ColumnBase:
dtype = header["dtype"]
if "dtype-type-serialized" in header:
dtype_type = pickle.loads(header["dtype-type-serialized"])
dtype = dtype_type.deserialize(*header["dtype-serialized"])
else:
dtype = pickle.loads(header["dtype-serialized"])

data = None
offset = 0
if "data" in header:
@@ -1071,12 +1104,24 @@ def deserialize(cls, header: dict, frames: list) -> ColumnBase:
mask = None
if "mask" in header:
mask = Buffer.deserialize(header["mask"], [frames[offset]])

children = []
if "children-headers" in header:
for child_header, child_frame in zip(
header["children-headers"],
frames[header["children-frame-offset"] :],
):
column_type = pickle.loads(child_header["type-serialized"])
children.append(
column_type.deserialize(child_header, [child_frame])
)

return build_column(
data=data,
dtype=dtype,
mask=mask,
size=header.get("size", None),
children=header.get("children", ()),
children=tuple(children),
)

def unary_operator(self, unaryop: str):
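For context, the generic path above dispatches on just two dtype-side hooks: an optional serialize() returning a (header, frames) tuple and a matching deserialize(header, frames) classmethod. A minimal sketch of that interface, using a hypothetical dtype name that is not part of this diff:

from typing import Tuple


class HypotheticalDtype:
    """Sketch of the dtype interface ColumnBase.serialize dispatches on."""

    def __init__(self, param: int):
        self.param = param

    def serialize(self) -> Tuple[dict, list]:
        # The whole (header, frames) tuple is stored under
        # header["dtype-serialized"], so any frames returned here travel
        # inside the column header rather than in the column's frame list.
        return {"param": self.param}, []

    @classmethod
    def deserialize(cls, header: dict, frames: list) -> "HypotheticalDtype":
        # Called by ColumnBase.deserialize as
        # dtype_type.deserialize(*header["dtype-serialized"]).
        return cls(header["param"])

Dtypes without these hooks fall back to plain pickling, as in the else branch above.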
52 changes: 0 additions & 52 deletions python/cudf/cudf/core/column/string.py
@@ -2,14 +2,12 @@

from __future__ import annotations

import pickle
import re
import warnings
from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
Sequence,
Tuple,
@@ -5336,56 +5334,6 @@ def to_pandas(
pd_series.index = index
return pd_series

def serialize(self) -> Tuple[dict, list]:
header: Dict[Any, Any] = {"null_count": self.null_count}
header["type-serialized"] = pickle.dumps(type(self))
header["size"] = self.size

frames = []
sub_headers = []

for item in self.children:
sheader, sframes = item.serialize()
sub_headers.append(sheader)
frames.extend(sframes)

if self.null_count > 0:
frames.append(self.mask)

header["subheaders"] = sub_headers
header["frame_count"] = len(frames)
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> StringColumn:
size = header["size"]
if not isinstance(size, int):
size = pickle.loads(size)

# Deserialize the mask, value, and offset frames
buffers = [Buffer(each_frame) for each_frame in frames]

nbuf = None
if header["null_count"] > 0:
nbuf = buffers[2]

children = []
for h, b in zip(header["subheaders"], buffers[:2]):
column_type = pickle.loads(h["type-serialized"])
children.append(column_type.deserialize(h, [b]))

col = cast(
StringColumn,
column.build_column(
data=None,
dtype="str",
mask=nbuf,
children=tuple(children),
size=size,
),
)
return col

def can_cast_safely(self, to_dtype: Dtype) -> bool:
to_dtype = cudf.dtype(to_dtype)

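With its bespoke serialize/deserialize removed, StringColumn now round-trips through the base-class implementation. A minimal round-trip sketch, assuming a cudf build that contains this commit:

import pickle

from cudf.core.column import as_column

col = as_column(["a", None, "ccc"])  # StringColumn with a null mask
header, frames = col.serialize()
assert header["frame_count"] == len(frames)

# Mirror how deserialize_columns() dispatches on "type-serialized".
column_type = pickle.loads(header["type-serialized"])
result = column_type.deserialize(header, frames)
assert result.to_arrow().equals(col.to_arrow())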
33 changes: 0 additions & 33 deletions python/cudf/cudf/core/column/struct.py
@@ -1,9 +1,6 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
from __future__ import annotations

import pickle
from typing import Tuple, cast

import pandas as pd
import pyarrow as pa

@@ -154,36 +151,6 @@ def _with_type_metadata(self: StructColumn, dtype: Dtype) -> StructColumn:

return self

def serialize(self) -> Tuple[dict, list]:
header, frames = super().serialize()
header["dtype"] = self.dtype.serialize()
header["size"] = self.size

header["sub-frame-offset"] = len(frames)
sub_headers = []
for item in self.children:
sheader, sframes = item.serialize()
sub_headers.append(sheader)
frames.extend(sframes)

header["sub-headers"] = sub_headers
header["frame_count"] = len(frames)
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> StructColumn:
header["dtype"] = StructDtype.deserialize(*header["dtype"])
sub_frame_offset = header["sub-frame-offset"]
children = []
for h, b in zip(header["sub-headers"], frames[sub_frame_offset:]):
column_type = pickle.loads(h["type-serialized"])
children.append(column_type.deserialize(h, [b]))
header["children"] = tuple(children)
return cast(
StructColumn,
super().deserialize(header, frames[:sub_frame_offset]),
)


class StructMethods(ColumnMethods):
"""
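StructColumn likewise falls back to the generic path, and because StructDtype exposes serialize()/deserialize() the dtype is no longer pickled wholesale. A sketch under the same assumption (a cudf build containing this commit), kept to numeric fields:

import pickle

import cudf

col = cudf.Series([{"a": 1, "b": 2.5}, {"a": 3, "b": 4.0}])._column  # StructColumn
header, frames = col.serialize()

# StructDtype provides serialize()/deserialize(), so the generic code stores it
# under "dtype-type-serialized"/"dtype-serialized" instead of pickling the dtype.
assert "dtype-type-serialized" in header

column_type = pickle.loads(header["type-serialized"])
result = column_type.deserialize(header, frames)
assert result.dtype == col.dtype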
