From 67267745092ff3e87bf3644937adb8830d656311 Mon Sep 17 00:00:00 2001 From: "Mads R. B. Kristensen" Date: Wed, 4 May 2022 09:14:22 +0200 Subject: [PATCH] Make column serialize more general --- python/cudf/cudf/core/column/column.py | 63 ++++++++++++++++++++++---- python/cudf/cudf/core/column/string.py | 52 --------------------- python/cudf/cudf/core/column/struct.py | 33 -------------- 3 files changed, 54 insertions(+), 94 deletions(-) diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 74aa653e8f5..4d651b4f071 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -1037,32 +1037,65 @@ def unique(self) -> ColumnBase: return drop_duplicates([self], keep="first")[0] def serialize(self) -> Tuple[dict, list]: - header: Dict[Any, Any] = {} + """Serialize column + + The following attributes are serialized: + - "size" + - "data" + - "mask" + - "children" + """ + header = {} frames = [] + # "type-serialized" is used in deserialize_columns() to dispatch + # the deserialization to the correct class. It must be compatible + # with Dask: see . header["type-serialized"] = pickle.dumps(type(self)) - if hasattr(self.dtype, "str"): - # Notice, "dtype" must be availabe for deserialization thus - # if the dtype doesn't support `str` or if it is insufficient - # for deserialization, please overwrite the serialize and/or - # deserialize methods. - header["dtype"] = self.dtype.str + # If `self.dtype` provides a `serialize` method we use it. If not, + # we pickle `self.dtype` itself. + if hasattr(self.dtype, "serialize"): + # Use "dtype-type-serialized" for dispatching the deserialization + # of the dtype. 
+ header["dtype-type-serialized"] = pickle.dumps(type(self.dtype)) + header["dtype-serialized"] = self.dtype.serialize() + else: + header["dtype-serialized"] = pickle.dumps(self.dtype) + header["size"] = self.size if self.data is not None: data_header, data_frames = self.data.serialize() + assert len(data_frames) == 1 header["data"] = data_header frames.extend(data_frames) if self.mask is not None: mask_header, mask_frames = self.mask.serialize() + assert len(mask_frames) == 1 header["mask"] = mask_header frames.extend(mask_frames) + if self.children: + header["children-frame-offset"] = len(frames) + children_headers = [] + for child in self.children: + child_header, child_frames = child.serialize() + assert len(child_frames) == 1 + children_headers.append(child_header) + frames.extend(child_frames) + header["children-headers"] = children_headers + + # "frame_count" are used in deserialize_columns() header["frame_count"] = len(frames) return header, frames @classmethod def deserialize(cls, header: dict, frames: list) -> ColumnBase: - dtype = header["dtype"] + if "dtype-type-serialized" in header: + dtype_type = pickle.loads(header["dtype-type-serialized"]) + dtype = dtype_type.deserialize(*header["dtype-serialized"]) + else: + dtype = pickle.loads(header["dtype-serialized"]) + data = None offset = 0 if "data" in header: @@ -1071,12 +1104,24 @@ def deserialize(cls, header: dict, frames: list) -> ColumnBase: mask = None if "mask" in header: mask = Buffer.deserialize(header["mask"], [frames[offset]]) + + children = [] + if "children-headers" in header: + for child_header, child_frame in zip( + header["children-headers"], + frames[header["children-frame-offset"] :], + ): + column_type = pickle.loads(child_header["type-serialized"]) + children.append( + column_type.deserialize(child_header, [child_frame]) + ) + return build_column( data=data, dtype=dtype, mask=mask, size=header.get("size", None), - children=header.get("children", ()), + children=tuple(children), ) def 
unary_operator(self, unaryop: str): diff --git a/python/cudf/cudf/core/column/string.py b/python/cudf/cudf/core/column/string.py index 0db7e7d9a27..70097f15372 100644 --- a/python/cudf/cudf/core/column/string.py +++ b/python/cudf/cudf/core/column/string.py @@ -2,14 +2,12 @@ from __future__ import annotations -import pickle import re import warnings from functools import cached_property from typing import ( TYPE_CHECKING, Any, - Dict, Optional, Sequence, Tuple, @@ -5336,56 +5334,6 @@ def to_pandas( pd_series.index = index return pd_series - def serialize(self) -> Tuple[dict, list]: - header: Dict[Any, Any] = {"null_count": self.null_count} - header["type-serialized"] = pickle.dumps(type(self)) - header["size"] = self.size - - frames = [] - sub_headers = [] - - for item in self.children: - sheader, sframes = item.serialize() - sub_headers.append(sheader) - frames.extend(sframes) - - if self.null_count > 0: - frames.append(self.mask) - - header["subheaders"] = sub_headers - header["frame_count"] = len(frames) - return header, frames - - @classmethod - def deserialize(cls, header: dict, frames: list) -> StringColumn: - size = header["size"] - if not isinstance(size, int): - size = pickle.loads(size) - - # Deserialize the mask, value, and offset frames - buffers = [Buffer(each_frame) for each_frame in frames] - - nbuf = None - if header["null_count"] > 0: - nbuf = buffers[2] - - children = [] - for h, b in zip(header["subheaders"], buffers[:2]): - column_type = pickle.loads(h["type-serialized"]) - children.append(column_type.deserialize(h, [b])) - - col = cast( - StringColumn, - column.build_column( - data=None, - dtype="str", - mask=nbuf, - children=tuple(children), - size=size, - ), - ) - return col - def can_cast_safely(self, to_dtype: Dtype) -> bool: to_dtype = cudf.dtype(to_dtype) diff --git a/python/cudf/cudf/core/column/struct.py b/python/cudf/cudf/core/column/struct.py index 4204fa2a259..979d0f46321 100644 --- a/python/cudf/cudf/core/column/struct.py +++ 
b/python/cudf/cudf/core/column/struct.py @@ -1,9 +1,6 @@ # Copyright (c) 2020-2022, NVIDIA CORPORATION. from __future__ import annotations -import pickle -from typing import Tuple, cast - import pandas as pd import pyarrow as pa @@ -154,36 +151,6 @@ def _with_type_metadata(self: StructColumn, dtype: Dtype) -> StructColumn: return self - def serialize(self) -> Tuple[dict, list]: - header, frames = super().serialize() - header["dtype"] = self.dtype.serialize() - header["size"] = self.size - - header["sub-frame-offset"] = len(frames) - sub_headers = [] - for item in self.children: - sheader, sframes = item.serialize() - sub_headers.append(sheader) - frames.extend(sframes) - - header["sub-headers"] = sub_headers - header["frame_count"] = len(frames) - return header, frames - - @classmethod - def deserialize(cls, header: dict, frames: list) -> StructColumn: - header["dtype"] = StructDtype.deserialize(*header["dtype"]) - sub_frame_offset = header["sub-frame-offset"] - children = [] - for h, b in zip(header["sub-headers"], frames[sub_frame_offset:]): - column_type = pickle.loads(h["type-serialized"]) - children.append(column_type.deserialize(h, [b])) - header["children"] = tuple(children) - return cast( - StructColumn, - super().deserialize(header, frames[:sub_frame_offset]), - ) - class StructMethods(ColumnMethods): """