Generic serialization of all column types #10784

Merged
8 changes: 6 additions & 2 deletions python/cudf/cudf/core/buffer.py
@@ -1,4 +1,4 @@
# Copyright (c) 2020, NVIDIA CORPORATION.
# Copyright (c) 2020-2022, NVIDIA CORPORATION.
from __future__ import annotations

import functools
@@ -123,16 +123,20 @@ def serialize(self) -> Tuple[dict, list]:
header["constructor-kwargs"] = {}
header["desc"] = self.__cuda_array_interface__.copy()
header["desc"]["strides"] = (1,)
header["frame_count"] = 1
frames = [self]
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> Buffer:
assert (
header["frame_count"] == 1
), "Only expecting to deserialize Buffer with a single frame."
buf = cls(frames[0], **header["constructor-kwargs"])

if header["desc"]["shape"] != buf.__cuda_array_interface__["shape"]:
raise ValueError(
f"Recieved a `Buffer` with the wrong size."
f"Received a `Buffer` with the wrong size."
f" Expected {header['desc']['shape']}, "
f"but got {buf.__cuda_array_interface__['shape']}"
)
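
A minimal sketch (not part of the diff) of the Buffer-level contract this change establishes: a Buffer serializes to exactly one frame, and deserialize asserts that. The cupy array is only an illustrative source of device memory.

import cupy as cp
from cudf.core.buffer import Buffer

# Any object exposing __cuda_array_interface__ can back a Buffer.
buf = Buffer(cp.arange(4, dtype="u1"))
header, frames = buf.serialize()
# header["desc"]        -> copy of buf.__cuda_array_interface__ (shape, typestr, ...)
# header["frame_count"] -> 1; a Buffer always contributes exactly one frame
# frames                -> [buf]; a transport layer (e.g. Dask) moves these bytes
buf2 = Buffer.deserialize(header, frames)  # asserts frame_count == 1, checks shape
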
59 changes: 1 addition & 58 deletions python/cudf/cudf/core/column/categorical.py
@@ -2,19 +2,9 @@

from __future__ import annotations

import pickle
from collections import abc
from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Dict,
Mapping,
Optional,
Sequence,
Tuple,
cast,
)
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Tuple, cast

import numpy as np
import pandas as pd
@@ -685,53 +675,6 @@ def __contains__(self, item: ScalarLike) -> bool:
return False
return self._encode(item) in self.as_numerical

def serialize(self) -> Tuple[dict, list]:
header: Dict[Any, Any] = {}
frames = []
header["type-serialized"] = pickle.dumps(type(self))
header["dtype"], dtype_frames = self.dtype.serialize()
header["dtype_frames_count"] = len(dtype_frames)
frames.extend(dtype_frames)
header["data"], data_frames = self.codes.serialize()
header["data_frames_count"] = len(data_frames)
frames.extend(data_frames)
if self.mask is not None:
mask_header, mask_frames = self.mask.serialize()
header["mask"] = mask_header
frames.extend(mask_frames)
header["frame_count"] = len(frames)
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> CategoricalColumn:
n_dtype_frames = header["dtype_frames_count"]
dtype = CategoricalDtype.deserialize(
header["dtype"], frames[:n_dtype_frames]
)
n_data_frames = header["data_frames_count"]

column_type = pickle.loads(header["data"]["type-serialized"])
data = column_type.deserialize(
header["data"],
frames[n_dtype_frames : n_dtype_frames + n_data_frames],
)
mask = None
if "mask" in header:
mask = Buffer.deserialize(
header["mask"], [frames[n_dtype_frames + n_data_frames]]
)
return cast(
CategoricalColumn,
column.build_column(
data=None,
dtype=dtype,
mask=mask,
children=(
column.build_column(data.base_data, dtype=data.dtype),
),
),
)

def set_base_data(self, value):
if value is not None:
raise RuntimeError(
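
Illustrative sketch (not from this PR) of what the generic path now does in place of the override removed above: a categorical column's CategoricalDtype serializes itself, and the codes column travels as a child under "subheaders" instead of the bespoke "data"/"dtype_frames_count" bookkeeping.

import cudf

col = cudf.Series(["a", "b", "a"], dtype="category")._column  # _column is internal API
header, frames = col.serialize()
# header["dtype-is-cudf-serialized"] -> True (CategoricalDtype has serialize())
# header["subheaders"]               -> [header of the codes child column]
# header["frame_count"]              -> len(frames)
col2 = type(col).deserialize(header, frames)
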
67 changes: 60 additions & 7 deletions python/cudf/cudf/core/column/column.py
@@ -5,6 +5,7 @@
import pickle
import warnings
from functools import cached_property
from itertools import chain
from types import SimpleNamespace
from typing import (
Any,
@@ -1037,10 +1038,29 @@ def unique(self) -> ColumnBase:
return drop_duplicates([self], keep="first")[0]

def serialize(self) -> Tuple[dict, list]:
# Data model:

# Serialization produces a nested metadata "header" and a flattened list
# of memoryviews/buffers that reference data (frames). Each header
# advertises a "frame_count" slot indicating how many frames
# deserialization will consume. The class used to construct an object is
# stored under the key "type-serialized" to match Dask's serialization
# protocol (see distributed.protocol.serialize). Since column dtypes may
# be either cudf-native or foreign, some special-casing is required here
# for serialization.

header: Dict[Any, Any] = {}
frames = []
header["type-serialized"] = pickle.dumps(type(self))
header["dtype"] = self.dtype.str
try:
dtype, dtype_frames = self.dtype.serialize()
header["dtype"] = dtype
frames.extend(dtype_frames)
header["dtype-is-cudf-serialized"] = True
except AttributeError:
header["dtype"] = pickle.dumps(self.dtype)
header["dtype-is-cudf-serialized"] = False

if self.data is not None:
data_header, data_frames = self.data.serialize()
@@ -1051,19 +1071,52 @@ def serialize(self) -> Tuple[dict, list]:
mask_header, mask_frames = self.mask.serialize()
header["mask"] = mask_header
frames.extend(mask_frames)

if self.children:
child_headers, child_frames = zip(
*(c.serialize() for c in self.children)
)
header["subheaders"] = list(child_headers)
frames.extend(chain(*child_frames))
header["size"] = self.size
header["frame_count"] = len(frames)
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> ColumnBase:
dtype = header["dtype"]
data = Buffer.deserialize(header["data"], [frames[0]])
mask = None
def unpack(header, frames) -> Tuple[Any, list]:
count = header["frame_count"]
klass = pickle.loads(header["type-serialized"])
obj = klass.deserialize(header, frames[:count])
return obj, frames[count:]

assert header["frame_count"] == len(frames), (
f"Deserialization expected {header['frame_count']} frames, "
f"but received {len(frames)}"
)
if header["dtype-is-cudf-serialized"]:
dtype, frames = unpack(header["dtype"], frames)
else:
dtype = pickle.loads(header["dtype"])
if "data" in header:
data, frames = unpack(header["data"], frames)
else:
data = None
if "mask" in header:
mask = Buffer.deserialize(header["mask"], [frames[1]])
mask, frames = unpack(header["mask"], frames)
else:
mask = None
children = []
if "subheaders" in header:
for h in header["subheaders"]:
child, frames = unpack(h, frames)
children.append(child)
assert len(frames) == 0, "Deserialization did not consume all frames"
return build_column(
data=data, dtype=dtype, mask=mask, size=header.get("size", None)
data=data,
dtype=dtype,
mask=mask,
size=header.get("size", None),
children=tuple(children),
)

def unary_operator(self, unaryop: str):
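
A hedged sketch of the structure the generic serialize()/deserialize() above produce for a plain numeric column with nulls; key names follow the code, the sample data is arbitrary.

import cudf

col = cudf.Series([1, None, 3])._column  # int64 column with a null mask
header, frames = col.serialize()
# header["type-serialized"]          -> pickled column class
# header["dtype"]                    -> pickled numpy dtype (np.dtype has no
#                                       .serialize(), so the except branch runs)
# header["dtype-is-cudf-serialized"] -> False
# header["data"], header["mask"]     -> nested Buffer headers, each carrying its
#                                       own frame_count
# header["frame_count"]              -> len(frames); frames are flattened in the
#                                       order dtype -> data -> mask -> children
col2 = type(col).deserialize(header, frames)  # unpack() consumes frames in order
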
14 changes: 1 addition & 13 deletions python/cudf/cudf/core/column/decimal.py
@@ -2,7 +2,7 @@

import warnings
from decimal import Decimal
from typing import Any, Sequence, Tuple, Union, cast
from typing import Any, Sequence, Union, cast

import cupy as cp
import numpy as np
@@ -321,18 +321,6 @@ def to_arrow(self):
buffers=[mask_buf, data_buf],
)

def serialize(self) -> Tuple[dict, list]:
header, frames = super().serialize()
header["dtype"] = self.dtype.serialize()
header["size"] = self.size
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> ColumnBase:
dtype = cudf.Decimal64Dtype.deserialize(*header["dtype"])
header["dtype"] = dtype
return super().deserialize(header, frames)

@property
def __cuda_array_interface__(self):
raise NotImplementedError(
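
Illustrative only: the override removed above is unnecessary because Decimal64Dtype serializes itself, so the generic ColumnBase.serialize() takes the try-branch and records dtype-is-cudf-serialized = True. A minimal dtype round trip:

import cudf

dt = cudf.Decimal64Dtype(precision=9, scale=2)
dtype_header, dtype_frames = dt.serialize()       # cudf-native dtype path
dt2 = cudf.Decimal64Dtype.deserialize(dtype_header, dtype_frames)
assert (dt2.precision, dt2.scale) == (9, 2)
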
60 changes: 0 additions & 60 deletions python/cudf/cudf/core/column/lists.py
@@ -1,6 +1,5 @@
# Copyright (c) 2020-2022, NVIDIA CORPORATION.

import pickle
from functools import cached_property
from typing import List, Optional, Sequence, Union

@@ -28,7 +27,6 @@
is_list_dtype,
is_scalar,
)
from cudf.core.buffer import Buffer
from cudf.core.column import ColumnBase, as_column, column
from cudf.core.column.methods import ColumnMethods, ParentType
from cudf.core.dtypes import ListDtype
@@ -166,64 +164,6 @@ def set_base_data(self, value):
else:
super().set_base_data(value)

def serialize(self):
header = {}
frames = []
header["type-serialized"] = pickle.dumps(type(self))
header["null_count"] = self.null_count
header["size"] = self.size
header["dtype"], dtype_frames = self.dtype.serialize()
header["dtype_frames_count"] = len(dtype_frames)
frames.extend(dtype_frames)

sub_headers = []

for item in self.children:
sheader, sframes = item.serialize()
sub_headers.append(sheader)
frames.extend(sframes)

if self.null_count > 0:
frames.append(self.mask)

header["subheaders"] = sub_headers
header["frame_count"] = len(frames)

return header, frames

@classmethod
def deserialize(cls, header, frames):

# Get null mask
if header["null_count"] > 0:
mask = Buffer(frames[-1])
else:
mask = None

# Deserialize dtype
dtype = pickle.loads(header["dtype"]["type-serialized"]).deserialize(
header["dtype"], frames[: header["dtype_frames_count"]]
)

# Deserialize child columns
children = []
f = header["dtype_frames_count"]
for h in header["subheaders"]:
fcount = h["frame_count"]
child_frames = frames[f : f + fcount]
column_type = pickle.loads(h["type-serialized"])
children.append(column_type.deserialize(h, child_frames))
f += fcount

# Materialize list column
return column.build_column(
data=None,
dtype=dtype,
mask=mask,
children=tuple(children),
size=header["size"],
)

@property
def __cuda_array_interface__(self):
raise NotImplementedError(
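
Sketch (illustrative, not part of the diff) of the generic path that replaces the removed ListColumn override: ListDtype serializes itself and the offsets/elements children ride along under "subheaders".

import cudf

col = cudf.Series([[1, 2], [3]])._column          # ListColumn; _column is internal API
header, frames = col.serialize()
# header["dtype-is-cudf-serialized"] -> True (ListDtype has serialize())
# header["subheaders"]               -> one header per child (offsets, elements)
# header["frame_count"]              -> len(frames), child frames included
col2 = type(col).deserialize(header, frames)
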
52 changes: 0 additions & 52 deletions python/cudf/cudf/core/column/string.py
@@ -2,14 +2,12 @@

from __future__ import annotations

import pickle
import re
import warnings
from functools import cached_property
from typing import (
TYPE_CHECKING,
Any,
Dict,
Optional,
Sequence,
Tuple,
@@ -5336,56 +5334,6 @@ def to_pandas(
pd_series.index = index
return pd_series

def serialize(self) -> Tuple[dict, list]:
header: Dict[Any, Any] = {"null_count": self.null_count}
header["type-serialized"] = pickle.dumps(type(self))
header["size"] = self.size

frames = []
sub_headers = []

for item in self.children:
sheader, sframes = item.serialize()
sub_headers.append(sheader)
frames.extend(sframes)

if self.null_count > 0:
frames.append(self.mask)

header["subheaders"] = sub_headers
header["frame_count"] = len(frames)
return header, frames

@classmethod
def deserialize(cls, header: dict, frames: list) -> StringColumn:
size = header["size"]
if not isinstance(size, int):
size = pickle.loads(size)

# Deserialize the mask, value, and offset frames
buffers = [Buffer(each_frame) for each_frame in frames]

nbuf = None
if header["null_count"] > 0:
nbuf = buffers[2]

children = []
for h, b in zip(header["subheaders"], buffers[:2]):
column_type = pickle.loads(h["type-serialized"])
children.append(column_type.deserialize(h, [b]))

col = cast(
StringColumn,
column.build_column(
data=None,
dtype="str",
mask=nbuf,
children=tuple(children),
size=size,
),
)
return col

def can_cast_safely(self, to_dtype: Dtype) -> bool:
to_dtype = cudf.dtype(to_dtype)

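
With every per-class override above removed, a single hedged helper (illustrative, not part of the PR; roundtrip is a hypothetical name) round-trips any column type through the generic protocol:

import pickle

import cudf

def roundtrip(col):
    header, frames = col.serialize()
    assert header["frame_count"] == len(frames)
    cls = pickle.loads(header["type-serialized"])
    return cls.deserialize(header, frames)

for s in (cudf.Series([1.5, None]),
          cudf.Series(["a", None, "bc"]),
          cudf.Series([[1, 2], [3]]),
          cudf.Series(["x", "y"], dtype="category")):
    out = roundtrip(s._column)                    # _column is internal API
    assert out.to_arrow().equals(s._column.to_arrow())
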