From 32bacfaa0a75fd3fb5fb44b106d8138f83001184 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Isma=C3=ABl=20Kon=C3=A9?=
Date: Thu, 18 Nov 2021 00:24:07 +0100
Subject: [PATCH] Interchange dataframe protocol (#9071)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR is a basic implementation of the [interchange dataframe protocol](https://github.com/data-apis/dataframe-api/blob/main/protocol/dataframe_protocol.py) for cudf.

As is well known, there are many dataframe libraries out there, and one library's weakness is often handled by another. To work across these libraries, we currently rely on `pandas` with methods like `from_pandas` and `to_pandas`. This is a poor design: every library has to maintain an additional dependency on pandas and its peculiarities.

This protocol defines a high-level API that dataframe libraries implement so that they can communicate with one another. Thus, we get rid of the tight coupling with pandas and depend only on the protocol API, while each library keeps the freedom of its own implementation details. To illustrate:

- `df_obj = cudf_dataframe.__dataframe__()`: `df_obj` can be consumed by any library implementing the protocol.
- `df = cudf.from_dataframe(any_supported_dataframe)`: here we create a cudf `DataFrame` from any dataframe object supporting the protocol.

So far, it supports the following:

- Column dtypes: `uint8`, `int`, `float`, `bool` and `categorical`.
- Missing values are handled for all these dtypes.
- `string` support is on the way.

Additionally, we support dataframes from CPU devices (like `pandas`), but this is not testable here, as pandas has not yet adopted the protocol. We've tested it locally against a monkey-patched pandas implementation of the protocol.

Authors:
  - Ismaël Koné (https://github.com/iskode)
  - Bradley Dice (https://github.com/bdice)

Approvers:
  - Ashwin Srinath (https://github.com/shwina)
  - Bradley Dice (https://github.com/bdice)
  - Vyas Ramasubramani (https://github.com/vyasr)

URL: https://github.com/rapidsai/cudf/pull/9071
---
 python/cudf/cudf/__init__.py               |   2 +-
 python/cudf/cudf/core/dataframe.py         |  13 +-
 python/cudf/cudf/core/df_protocol.py       | 829 +++++++++++++++++++++
 python/cudf/cudf/tests/test_df_protocol.py | 219 ++++++
 4 files changed, 1061 insertions(+), 2 deletions(-)
 create mode 100644 python/cudf/cudf/core/df_protocol.py
 create mode 100644 python/cudf/cudf/tests/test_df_protocol.py

diff --git a/python/cudf/cudf/__init__.py b/python/cudf/cudf/__init__.py
index bc35551b5bd..f696a00d1ed 100644
--- a/python/cudf/cudf/__init__.py
+++ b/python/cudf/cudf/__init__.py
@@ -42,7 +42,7 @@
     UInt64Index,
     interval_range,
 )
-from cudf.core.dataframe import DataFrame, from_pandas, merge
+from cudf.core.dataframe import DataFrame, from_pandas, merge, from_dataframe
 from cudf.core.series import Series
 from cudf.core.multiindex import MultiIndex
 from cudf.core.cut import cut
diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py
index a95453a4e62..bfbe8b06c17 100644
--- a/python/cudf/cudf/core/dataframe.py
+++ b/python/cudf/cudf/core/dataframe.py
@@ -40,7 +40,7 @@
     is_string_dtype,
     is_struct_dtype,
 )
-from cudf.core import column, reshape
+from cudf.core import column, df_protocol, reshape
 from cudf.core.abc import Serializable
 from cudf.core.column import (
     as_column,
@@ -6329,6 +6329,17 @@ def explode(self, column, ignore_index=False):
 
         return super()._explode(column, ignore_index)
 
+    def __dataframe__(
+        self, nan_as_null: bool = False, allow_copy: bool = True
+    ):
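+        """
+        Return an object implementing the dataframe interchange
+        protocol for this DataFrame (a descriptive docstring sketch;
+        see the module-level ``df_protocol.__dataframe__`` below for
+        the meaning of ``nan_as_null`` and ``allow_copy``).
+        """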
return df_protocol.__dataframe__( + self, nan_as_null=nan_as_null, allow_copy=allow_copy + ) + + +def from_dataframe(df, allow_copy=False): + return df_protocol.from_dataframe(df, allow_copy=allow_copy) + def make_binop_func(op, postprocess=None): # This function is used to wrap binary operations in Frame with an diff --git a/python/cudf/cudf/core/df_protocol.py b/python/cudf/cudf/core/df_protocol.py new file mode 100644 index 00000000000..8f258ce27b2 --- /dev/null +++ b/python/cudf/cudf/core/df_protocol.py @@ -0,0 +1,829 @@ +import collections +import enum +from typing import ( + Any, + Dict, + Iterable, + Mapping, + Optional, + Sequence, + Tuple, + cast, +) + +import cupy as cp +import numpy as np +from numba.cuda import as_cuda_array + +import cudf +from cudf.core.buffer import Buffer +from cudf.core.column import as_column, build_categorical_column, build_column + +# Implementation of interchange protocol classes +# ---------------------------------------------- + + +class _DtypeKind(enum.IntEnum): + INT = 0 + UINT = 1 + FLOAT = 2 + BOOL = 20 + STRING = 21 # UTF-8 + DATETIME = 22 + CATEGORICAL = 23 + + +class _Device(enum.IntEnum): + CPU = 1 + CUDA = 2 + CPU_PINNED = 3 + OPENCL = 4 + VULKAN = 7 + METAL = 8 + VPI = 9 + ROCM = 10 + + +_SUPPORTED_KINDS = { + _DtypeKind.INT, + _DtypeKind.UINT, + _DtypeKind.FLOAT, + _DtypeKind.CATEGORICAL, + _DtypeKind.BOOL, + _DtypeKind.STRING, +} +ProtoDtype = Tuple[_DtypeKind, int, str, str] + + +class _CuDFBuffer: + """ + Data in the buffer is guaranteed to be contiguous in memory. + """ + + def __init__( + self, + buf: cudf.core.buffer.Buffer, + dtype: np.dtype, + allow_copy: bool = True, + ) -> None: + """ + Use cudf.core.buffer.Buffer object. + """ + # Store the cudf buffer where the data resides as a private + # attribute, so we can use it to retrieve the public attributes + self._buf = buf + self._dtype = dtype + self._allow_copy = allow_copy + + @property + def bufsize(self) -> int: + """ + Buffer size in bytes. + """ + return self._buf.nbytes + + @property + def ptr(self) -> int: + """ + Pointer to start of the buffer as an integer. + """ + return self._buf.ptr + + def __dlpack__(self): + """ + DLPack not implemented in NumPy yet, so leave it out here. + """ + try: + cudarray = as_cuda_array(self._buf).view(self._dtype) + res = cp.asarray(cudarray).toDlpack() + + except ValueError: + raise TypeError(f"dtype {self._dtype} unsupported by `dlpack`") + + return res + + def __dlpack_device__(self) -> Tuple[_Device, int]: + """ + _Device type and _Device ID for where the data in the buffer resides. + """ + return (_Device.CUDA, cp.asarray(self._buf).device.id) + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(" + str( + { + "bufsize": self.bufsize, + "ptr": self.ptr, + "dlpack": self.__dlpack__(), + "device": self.__dlpack_device__()[0].name, + } + ) + +")" + + +class _CuDFColumn: + """ + A column object, with only the methods and properties required by the + interchange protocol defined. + + A column can contain one or more chunks. Each chunk can contain up to three + buffers - a data buffer, a mask buffer (depending on null representation), + and an offsets buffer (if variable-size binary; e.g., variable-length + strings). + + Note: this Column object can only be produced by ``__dataframe__``, so + doesn't need its own version or ``__column__`` protocol. 
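+
+    Buffers are exposed through ``get_buffers()``, whose docstring
+    describes the layout of its "data", "validity" and "offsets"
+    entries.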
+ + """ + + def __init__( + self, + column: cudf.core.column.ColumnBase, + nan_as_null: bool = True, + allow_copy: bool = True, + ) -> None: + """ + Note: doesn't deal with extension arrays yet, just assume a regular + Series/ndarray for now. + """ + if not isinstance(column, cudf.core.column.ColumnBase): + raise TypeError( + "column must be a subtype of df.core.column.ColumnBase," + f"got {type(column)}" + ) + self._col = column + self._nan_as_null = nan_as_null + self._allow_copy = allow_copy + + @property + def size(self) -> int: + """ + Size of the column, in elements. + """ + return self._col.size + + @property + def offset(self) -> int: + """ + Offset of first element. Always zero. + """ + return 0 + + @property + def dtype(self) -> ProtoDtype: + """ + Dtype description as a tuple + ``(kind, bit-width, format string, endianness)`` + + Kind : + + - INT = 0 + - UINT = 1 + - FLOAT = 2 + - BOOL = 20 + - STRING = 21 # UTF-8 + - DATETIME = 22 + - CATEGORICAL = 23 + + Bit-width : the number of bits as an integer + Format string : data type description format string in Apache Arrow C + Data Interface format. + Endianness : current only native endianness (``=``) is supported + + Notes: + + - Kind specifiers are aligned with DLPack where possible + (hence the jump to 20, leave enough room for future extension) + - Masks must be specified as boolean with either bit width 1 + (for bit masks) or 8 (for byte masks). + - Dtype width in bits was preferred over bytes + - Endianness isn't too useful, but included now in case + in the future we need to support non-native endianness + - Went with Apache Arrow format strings over NumPy format strings + because they're more complete from a dataframe perspective + - Format strings are mostly useful for datetime specification, + and for categoricals. + - For categoricals, the format string describes the type of the + categorical in the data buffer. In case of a separate encoding + of the categorical (e.g. an integer to string mapping), + this can be derived from ``self.describe_categorical``. + - Data types not included: complex, Arrow-style null, + binary, decimal, and nested (list, struct, map, union) dtypes. + """ + dtype = self._col.dtype + + # For now, assume that, if the column dtype is 'O' (i.e., `object`), + # then we have an array of strings + if not isinstance(dtype, cudf.CategoricalDtype) and dtype.kind == "O": + return (_DtypeKind.STRING, 8, "u", "=") + + return self._dtype_from_cudfdtype(dtype) + + def _dtype_from_cudfdtype(self, dtype) -> ProtoDtype: + """ + See `self.dtype` for details. + """ + # Note: 'c' (complex) not handled yet (not in array spec v1). + # 'b', 'B' (bytes), 'S', 'a', (old-style string) 'V' (void) + # not handled datetime and timedelta both map to datetime + # (is timedelta handled?) + _np_kinds = { + "i": _DtypeKind.INT, + "u": _DtypeKind.UINT, + "f": _DtypeKind.FLOAT, + "b": _DtypeKind.BOOL, + "U": _DtypeKind.STRING, + "M": _DtypeKind.DATETIME, + "m": _DtypeKind.DATETIME, + } + kind = _np_kinds.get(dtype.kind, None) + if kind is None: + # Not a NumPy/CuPy dtype. Check if it's a categorical maybe + if isinstance(dtype, cudf.CategoricalDtype): + kind = _DtypeKind.CATEGORICAL + # Codes and categories' dtypes are different. + # We use codes' dtype as these are stored in the buffer. 
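+                # Illustrative example: a column with categories
+                # ["a", "b", "c"] backed by int8 codes is described
+                # with the codes' 8-bit width and format string, not
+                # with the dtype of the category values themselves.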
+ codes = cast( + cudf.core.column.CategoricalColumn, self._col + ).codes + dtype = codes.dtype + else: + raise ValueError( + f"Data type {dtype} not supported by exchange protocol" + ) + + if kind not in _SUPPORTED_KINDS: + raise NotImplementedError(f"Data type {dtype} not handled yet") + + bitwidth = dtype.itemsize * 8 + format_str = dtype.str + endianness = dtype.byteorder if kind != _DtypeKind.CATEGORICAL else "=" + return (kind, bitwidth, format_str, endianness) + + @property + def describe_categorical(self) -> Tuple[bool, bool, Dict[int, Any]]: + """ + If the dtype is categorical, there are two options: + + - There are only values in the data buffer. + - There is a separate dictionary-style encoding for categorical values. + + Raises TypeError if the dtype is not categorical + + Content of returned dict: + + - "is_ordered" : bool, whether the ordering of dictionary + indices is semantically meaningful. + - "is_dictionary" : bool, whether a dictionary-style mapping of + categorical values to other objects exists + - "mapping" : dict, Python-level only (e.g. ``{int: str}``). + None if not a dictionary-style categorical. + """ + if not self.dtype[0] == _DtypeKind.CATEGORICAL: + raise TypeError( + "`describe_categorical only works on " + "a column with categorical dtype!" + ) + categ_col = cast(cudf.core.column.CategoricalColumn, self._col) + ordered = bool(categ_col.dtype.ordered) + is_dictionary = True + # NOTE: this shows the children approach is better, transforming + # `categories` to a "mapping" dict is inefficient + categories = categ_col.categories + mapping = {ix: val for ix, val in enumerate(categories.values_host)} + return ordered, is_dictionary, mapping + + @property + def describe_null(self) -> Tuple[int, Any]: + """ + Return the missing value (or "null") representation the column dtype + uses, as a tuple ``(kind, value)``. + + Kind: + + - 0 : non-nullable + - 1 : NaN/NaT + - 2 : sentinel value + - 3 : bit mask + - 4 : byte mask + + Value : if kind is "sentinel value", the actual value. + If kind is a bit mask or a byte mask, the value (0 or 1) + indicating a missing value. + None otherwise. + """ + kind = self.dtype[0] + if self.null_count == 0: + # there is no validity mask so it is non-nullable + return 0, None + + elif kind in _SUPPORTED_KINDS: + # bit mask is universally used in cudf for missing + return 3, 0 + + else: + raise NotImplementedError( + f"Data type {self.dtype} not yet supported" + ) + + @property + def null_count(self) -> int: + """ + Number of null elements. Should always be known. + """ + return self._col.null_count + + @property + def metadata(self) -> Dict[str, Any]: + """ + Store specific metadata of the column. + """ + return {} + + def num_chunks(self) -> int: + """ + Return the number of chunks the column consists of. + """ + return 1 + + def get_chunks( + self, n_chunks: Optional[int] = None + ) -> Iterable["_CuDFColumn"]: + """ + Return an iterable yielding the chunks. + + See `DataFrame.get_chunks` for details on ``n_chunks``. + """ + return (self,) + + def get_buffers( + self, + ) -> Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]]: + """ + Return a dictionary containing the underlying buffers. + + The returned dictionary has the following contents: + + - "data": a two-element tuple whose first element is a buffer + containing the data and whose second element is the data + buffer's associated dtype. 
+ - "validity": a two-element tuple whose first element is a buffer + containing mask values indicating missing data and + whose second element is the mask value buffer's + associated dtype. None if the null representation is + not a bit or byte mask. + - "offsets": a two-element tuple whose first element is a buffer + containing the offset values for variable-size binary + data (e.g., variable-length strings) and whose second + element is the offsets buffer's associated dtype. None + if the data buffer does not have an associated offsets + buffer. + """ + buffers = {} + try: + buffers["validity"] = self._get_validity_buffer() + except RuntimeError: + buffers["validity"] = None + + try: + buffers["offsets"] = self._get_offsets_buffer() + except RuntimeError: + buffers["offsets"] = None + + buffers["data"] = self._get_data_buffer() + + return buffers + + def _get_validity_buffer( + self, + ) -> Optional[Tuple[_CuDFBuffer, ProtoDtype]]: + """ + Return the buffer containing the mask values + indicating missing data and the buffer's associated dtype. + + Raises RuntimeError if null representation is not a bit or byte mask. + """ + + null, invalid = self.describe_null + if null == 3: + if self.dtype[0] == _DtypeKind.CATEGORICAL: + valid_mask = cast( + cudf.core.column.CategoricalColumn, self._col + ).codes._get_mask_as_column() + else: + valid_mask = self._col._get_mask_as_column() + + assert (valid_mask is not None) and ( + valid_mask.data is not None + ), "valid_mask(.data) should not be None when " + "_CuDFColumn.describe_null[0] = 3" + buffer = _CuDFBuffer( + valid_mask.data, cp.uint8, allow_copy=self._allow_copy + ) + dtype = (_DtypeKind.UINT, 8, "C", "=") + return buffer, dtype + + elif null == 1: + raise RuntimeError( + "This column uses NaN as null " + "so does not have a separate mask" + ) + elif null == 0: + raise RuntimeError( + "This column is non-nullable so does not have a mask" + ) + else: + raise NotImplementedError( + f"See {self.__class__.__name__}.describe_null method." + ) + + def _get_offsets_buffer(self,) -> Optional[Tuple[_CuDFBuffer, ProtoDtype]]: + """ + Return the buffer containing the offset values for + variable-size binary data (e.g., variable-length strings) + and the buffer's associated dtype. + + Raises RuntimeError if the data buffer does not have an associated + offsets buffer. + """ + if self.dtype[0] == _DtypeKind.STRING: + offsets = self._col.children[0] + assert (offsets is not None) and (offsets.data is not None), " " + "offsets(.data) should not be None for string column" + + buffer = _CuDFBuffer( + offsets.data, offsets.dtype, allow_copy=self._allow_copy + ) + dtype = self._dtype_from_cudfdtype(offsets.dtype) + else: + raise RuntimeError( + "This column has a fixed-length dtype " + "so does not have an offsets buffer" + ) + + return buffer, dtype + + def _get_data_buffer(self,) -> Tuple[_CuDFBuffer, ProtoDtype]: + """ + Return the buffer containing the data and + the buffer's associated dtype. 
+ """ + if self.dtype[0] in ( + _DtypeKind.INT, + _DtypeKind.UINT, + _DtypeKind.FLOAT, + _DtypeKind.BOOL, + ): + col_data = self._col + dtype = self.dtype + + elif self.dtype[0] == _DtypeKind.CATEGORICAL: + col_data = cast( + cudf.core.column.CategoricalColumn, self._col + ).codes + dtype = self._dtype_from_cudfdtype(col_data.dtype) + + elif self.dtype[0] == _DtypeKind.STRING: + col_data = self._col.children[1] + dtype = self._dtype_from_cudfdtype(col_data.dtype) + + else: + raise NotImplementedError( + f"Data type {self._col.dtype} not handled yet" + ) + assert (col_data is not None) and (col_data.data is not None), " " + f"col_data(.data) should not be None when dtype = {dtype}" + buffer = _CuDFBuffer( + col_data.data, col_data.dtype, allow_copy=self._allow_copy + ) + + return buffer, dtype + + +class _CuDFDataFrame: + """ + A data frame class, with only the methods required by the interchange + protocol defined. + + Instances of this (private) class are returned from + ``cudf.DataFrame.__dataframe__`` as objects with the methods and + attributes defined on this class. + """ + + def __init__( + self, + df: "cudf.core.dataframe.DataFrame", + nan_as_null: bool = True, + allow_copy: bool = True, + ) -> None: + """ + Constructor - an instance of this (private) class is returned from + `cudf.DataFrame.__dataframe__`. + """ + self._df = df + # ``nan_as_null`` is a keyword intended for the consumer to tell the + # producer to overwrite null values in the data with + # ``NaN`` (or ``NaT``). + # This currently has no effect; once support for nullable extension + # dtypes is added, this value should be propagated to columns. + self._nan_as_null = nan_as_null + self._allow_copy = allow_copy + + @property + def metadata(self): + # `index` isn't a regular column, and the protocol doesn't support row + # labels - so we export it as cuDF-specific metadata here. + return {"cudf.index": self._df.index} + + def num_columns(self) -> int: + return len(self._df.columns) + + def num_rows(self) -> int: + return len(self._df) + + def num_chunks(self) -> int: + return 1 + + def column_names(self) -> Iterable[str]: + return self._df.columns.tolist() + + def get_column(self, i: int) -> _CuDFColumn: + return _CuDFColumn( + as_column(self._df.iloc[:, i]), allow_copy=self._allow_copy + ) + + def get_column_by_name(self, name: str) -> _CuDFColumn: + return _CuDFColumn( + as_column(self._df[name]), allow_copy=self._allow_copy + ) + + def get_columns(self) -> Iterable[_CuDFColumn]: + return [ + _CuDFColumn(as_column(self._df[name]), allow_copy=self._allow_copy) + for name in self._df.columns + ] + + def select_columns(self, indices: Sequence[int]) -> "_CuDFDataFrame": + if not isinstance(indices, collections.abc.Sequence): + raise ValueError("`indices` is not a sequence") + + return _CuDFDataFrame(self._df.iloc[:, indices]) + + def select_columns_by_name(self, names: Sequence[str]) -> "_CuDFDataFrame": + if not isinstance(names, collections.Sequence): + raise ValueError("`names` is not a sequence") + + return _CuDFDataFrame( + self._df.loc[:, names], self._nan_as_null, self._allow_copy + ) + + def get_chunks( + self, n_chunks: Optional[int] = None + ) -> Iterable["_CuDFDataFrame"]: + """ + Return an iterator yielding the chunks. + """ + return (self,) + + +def __dataframe__( + self, nan_as_null: bool = False, allow_copy: bool = True +) -> _CuDFDataFrame: + """ + The public method to attach to cudf.DataFrame. 
+ + ``nan_as_null`` is a keyword intended for the consumer to tell the + producer to overwrite null values in the data with ``NaN`` (or ``NaT``). + This currently has no effect; once support for nullable extension + dtypes is added, this value should be propagated to columns. + + ``allow_copy`` is a keyword that defines whether or not the library is + allowed to make a copy of the data. For example, copying data would be + necessary if a library supports strided buffers, given that this protocol + specifies contiguous buffers. + """ + return _CuDFDataFrame(self, nan_as_null=nan_as_null, allow_copy=allow_copy) + + +""" +Implementation of the dataframe exchange protocol. + +Public API +---------- + +from_dataframe : construct a cudf.DataFrame from an input data frame which + implements the exchange protocol + +Notes +----- + +- Interpreting a raw pointer (as in ``Buffer.ptr``) is annoying and unsafe to + do in pure Python. It's more general but definitely less friendly than + having ``to_arrow`` and ``to_numpy`` methods. So for the buffers which lack + ``__dlpack__`` (e.g., because the column dtype isn't supported by DLPack), + this is worth looking at again. + +""" + + +# A typing protocol could be added later to let Mypy validate code using +# `from_dataframe` better. +DataFrameObject = Any +ColumnObject = Any + + +_INTS = {8: cp.int8, 16: cp.int16, 32: cp.int32, 64: cp.int64} +_UINTS = {8: cp.uint8, 16: cp.uint16, 32: cp.uint32, 64: cp.uint64} +_FLOATS = {32: cp.float32, 64: cp.float64} +_CP_DTYPES = {0: _INTS, 1: _UINTS, 2: _FLOATS, 20: {8: bool}} + + +def from_dataframe( + df: DataFrameObject, allow_copy: bool = False +) -> _CuDFDataFrame: + """ + Construct a cudf DataFrame from ``df`` if it supports ``__dataframe__`` + """ + if isinstance(df, cudf.DataFrame): + return df + + if not hasattr(df, "__dataframe__"): + raise ValueError("`df` does not support __dataframe__") + + return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + + +def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame: + """ + Create a cudf DataFrame object from DataFrameObject. + """ + # Check number of chunks, if there's more than one we need to iterate + if df.num_chunks() > 1: + raise NotImplementedError("More than one chunk not handled yet") + + # We need a dict of columns here, with each column being a cudf column. 
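+    # Each column is dispatched on its dtype kind below; the protocol
+    # buffers backing every converted column are retained in
+    # ``_buffers`` so that the memory they reference stays alive.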
+ columns = dict() + _buffers = [] # hold on to buffers, keeps memory alive + for name in df.column_names(): + col = df.get_column_by_name(name) + + if col.dtype[0] in ( + _DtypeKind.INT, + _DtypeKind.UINT, + _DtypeKind.FLOAT, + _DtypeKind.BOOL, + ): + columns[name], _buf = _protocol_to_cudf_column_numeric(col) + + elif col.dtype[0] == _DtypeKind.CATEGORICAL: + columns[name], _buf = _protocol_to_cudf_column_categorical(col) + + elif col.dtype[0] == _DtypeKind.STRING: + columns[name], _buf = _protocol_to_cudf_column_string(col) + + else: + raise NotImplementedError( + f"Data type {col.dtype[0]} not handled yet" + ) + + _buffers.append(_buf) + + df_new = cudf.DataFrame._from_data(columns) + df_new._buffers = _buffers + return df_new + + +def _protocol_to_cudf_column_numeric( + col: _CuDFColumn, +) -> Tuple[ + cudf.core.column.ColumnBase, + Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], +]: + """ + Convert an int, uint, float or bool protocol column + to the corresponding cudf column + """ + if col.offset != 0: + raise NotImplementedError("column.offset > 0 not handled yet") + + buffers = col.get_buffers() + assert buffers["data"] is not None, "data buffer should not be None" + _dbuffer, _ddtype = buffers["data"] + _check_buffer_is_on_gpu(_dbuffer) + cudfcol_num = build_column( + Buffer(_dbuffer.ptr, _dbuffer.bufsize), + protocol_dtype_to_cupy_dtype(_ddtype), + ) + return _set_missing_values(col, cudfcol_num), buffers + + +def _check_buffer_is_on_gpu(buffer: _CuDFBuffer) -> None: + if ( + buffer.__dlpack_device__()[0] != _Device.CUDA + and not buffer._allow_copy + ): + raise TypeError( + "This operation must copy data from CPU to GPU. " + "Set `allow_copy=True` to allow it." + ) + + elif buffer.__dlpack_device__()[0] != _Device.CUDA and buffer._allow_copy: + raise NotImplementedError( + "Only cuDF/GPU dataframes are supported for now. " + "CPU (like `Pandas`) dataframes will be supported shortly." 
+ ) + + +def _set_missing_values( + protocol_col: _CuDFColumn, cudf_col: cudf.core.column.ColumnBase +) -> cudf.core.column.ColumnBase: + + valid_mask = protocol_col.get_buffers()["validity"] + if valid_mask is not None: + bitmask = cp.asarray( + Buffer(valid_mask[0].ptr, valid_mask[0].bufsize), cp.bool8 + ) + cudf_col[~bitmask] = None + + return cudf_col + + +def protocol_dtype_to_cupy_dtype(_dtype: ProtoDtype) -> cp.dtype: + kind = _dtype[0] + bitwidth = _dtype[1] + if _dtype[0] not in _SUPPORTED_KINDS: + raise RuntimeError(f"Data type {_dtype[0]} not handled yet") + + return _CP_DTYPES[kind][bitwidth] + + +def _protocol_to_cudf_column_categorical( + col: _CuDFColumn, +) -> Tuple[ + cudf.core.column.ColumnBase, + Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], +]: + """ + Convert a categorical column to a Series instance + """ + ordered, is_dict, mapping = col.describe_categorical + if not is_dict: + raise NotImplementedError( + "Non-dictionary categoricals not supported yet" + ) + + categories = as_column(mapping.values()) + buffers = col.get_buffers() + assert buffers["data"] is not None, "data buffer should not be None" + codes_buffer, codes_dtype = buffers["data"] + _check_buffer_is_on_gpu(codes_buffer) + cdtype = protocol_dtype_to_cupy_dtype(codes_dtype) + codes = build_column( + Buffer(codes_buffer.ptr, codes_buffer.bufsize), cdtype + ) + + cudfcol = build_categorical_column( + categories=categories, + codes=codes, + mask=codes.base_mask, + size=codes.size, + ordered=ordered, + ) + + return _set_missing_values(col, cudfcol), buffers + + +def _protocol_to_cudf_column_string( + col: _CuDFColumn, +) -> Tuple[ + cudf.core.column.ColumnBase, + Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], +]: + """ + Convert a string ColumnObject to cudf Column object. 
+ """ + # Retrieve the data buffers + buffers = col.get_buffers() + + # Retrieve the data buffer containing the UTF-8 code units + assert buffers["data"] is not None, "data buffer should never be None" + data_buffer, data_dtype = buffers["data"] + _check_buffer_is_on_gpu(data_buffer) + encoded_string = build_column( + Buffer(data_buffer.ptr, data_buffer.bufsize), + protocol_dtype_to_cupy_dtype(data_dtype), + ) + + # Retrieve the offsets buffer containing the index offsets demarcating + # the beginning and end of each string + assert buffers["offsets"] is not None, "not possible for string column" + offset_buffer, offset_dtype = buffers["offsets"] + _check_buffer_is_on_gpu(offset_buffer) + offsets = build_column( + Buffer(offset_buffer.ptr, offset_buffer.bufsize), + protocol_dtype_to_cupy_dtype(offset_dtype), + ) + + cudfcol_str = build_column( + None, dtype=cp.dtype("O"), children=(offsets, encoded_string) + ) + return _set_missing_values(col, cudfcol_str), buffers diff --git a/python/cudf/cudf/tests/test_df_protocol.py b/python/cudf/cudf/tests/test_df_protocol.py new file mode 100644 index 00000000000..d24c8ca2860 --- /dev/null +++ b/python/cudf/cudf/tests/test_df_protocol.py @@ -0,0 +1,219 @@ +from typing import Any, Tuple + +import cupy as cp +import pandas as pd +import pytest + +import cudf +from cudf.core.buffer import Buffer +from cudf.core.column import build_column +from cudf.core.df_protocol import ( + DataFrameObject, + _CuDFBuffer, + _CuDFColumn, + _DtypeKind, + _from_dataframe, + protocol_dtype_to_cupy_dtype, +) +from cudf.testing._utils import assert_eq + + +def assert_buffer_equal(buffer_and_dtype: Tuple[_CuDFBuffer, Any], cudfcol): + buf, dtype = buffer_and_dtype + device_id = cp.asarray(cudfcol.data).device.id + assert buf.__dlpack_device__() == (2, device_id) + col_from_buf = build_column( + Buffer(buf.ptr, buf.bufsize), protocol_dtype_to_cupy_dtype(dtype) + ) + # check that non null values are the equals as nulls are represented + # by sentinel values in the buffer. 
+ non_null_idxs = cudf.Series(cudfcol) != cudf.NA + assert_eq(col_from_buf[non_null_idxs], cudfcol[non_null_idxs]) + + if dtype[0] != _DtypeKind.BOOL: + array_from_dlpack = cp.fromDlpack(buf.__dlpack__()) + col_array = cp.asarray(cudfcol.data_array_view) + assert_eq(array_from_dlpack.flatten(), col_array.flatten()) + else: + pytest.raises(TypeError, buf.__dlpack__) + + +def assert_column_equal(col: _CuDFColumn, cudfcol): + assert col.size == cudfcol.size + assert col.offset == 0 + assert col.null_count == cudfcol.null_count + assert col.num_chunks() == 1 + if col.null_count == 0: + pytest.raises(RuntimeError, col._get_validity_buffer) + assert col.get_buffers()["validity"] is None + else: + assert_buffer_equal( + col.get_buffers()["validity"], + cudfcol._get_mask_as_column().astype(cp.uint8), + ) + + if col.dtype[0] == _DtypeKind.CATEGORICAL: + assert_buffer_equal(col.get_buffers()["data"], cudfcol.codes) + assert col.get_buffers()["offsets"] is None + + elif col.dtype[0] == _DtypeKind.STRING: + assert_buffer_equal(col.get_buffers()["data"], cudfcol.children[1]) + assert_buffer_equal(col.get_buffers()["offsets"], cudfcol.children[0]) + + else: + assert_buffer_equal(col.get_buffers()["data"], cudfcol) + assert col.get_buffers()["offsets"] is None + + if col.null_count == 0: + assert col.describe_null == (0, None) + else: + assert col.describe_null == (3, 0) + + +def assert_dataframe_equal(dfo: DataFrameObject, df: cudf.DataFrame): + assert dfo.num_columns() == len(df.columns) + assert dfo.num_rows() == len(df) + assert dfo.num_chunks() == 1 + assert dfo.column_names() == list(df.columns) + for col in df.columns: + assert_column_equal(dfo.get_column_by_name(col), df[col]._column) + + +def assert_from_dataframe_equals(dfobj): + df2 = _from_dataframe(dfobj) + + assert_dataframe_equal(dfobj, df2) + if isinstance(dfobj._df, cudf.DataFrame): + assert_eq(dfobj._df, df2) + + elif isinstance(dfobj._df, pd.DataFrame): + assert_eq(cudf.DataFrame(dfobj._df), df2) + + else: + raise TypeError(f"{type(dfobj._df)} not supported yet.") + + +def assert_from_dataframe_exception(dfobj): + exception_msg = "This operation must copy data from CPU to GPU." + " Set `allow_copy=True` to allow it." 
+ with pytest.raises(TypeError, match=exception_msg): + _from_dataframe(dfobj) + + +def assert_df_unique_dtype_cols(data): + cdf = cudf.DataFrame(data=data) + assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=False)) + assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=True)) + + +def test_from_dataframe(): + data = dict(a=[1, 2, 3], b=[9, 10, 11]) + df1 = cudf.DataFrame(data=data) + df2 = cudf.from_dataframe(df1) + assert_eq(df1, df2) + + +def test_int_dtype(): + data_int = dict(a=[1, 2, 3], b=[9, 10, 11]) + assert_df_unique_dtype_cols(data_int) + + +def test_float_dtype(): + data_float = dict(a=[1.5, 2.5, 3.5], b=[9.2, 10.5, 11.8]) + assert_df_unique_dtype_cols(data_float) + + +def test_categorical_dtype(): + cdf = cudf.DataFrame({"A": [1, 2, 5, 1]}) + cdf["A"] = cdf["A"].astype("category") + col = cdf.__dataframe__().get_column_by_name("A") + assert col.dtype[0] == _DtypeKind.CATEGORICAL + assert col.describe_categorical == (False, True, {0: 1, 1: 2, 2: 5}) + assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=False)) + assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=True)) + + +def test_bool_dtype(): + data_bool = dict(a=[True, True, False], b=[False, True, False]) + assert_df_unique_dtype_cols(data_bool) + + +def test_string_dtype(): + data_string = dict(a=["a", "b", "cdef", "", "g"]) + assert_df_unique_dtype_cols(data_string) + + +def test_mixed_dtype(): + data_mixed = dict( + int=[1, 2, 3], + float=[1.5, 2.5, 3.5], + bool=[True, False, True], + categorical=[5, 1, 5], + string=["rapidsai-cudf ", "", "df protocol"], + ) + assert_df_unique_dtype_cols(data_mixed) + + +def test_NA_int_dtype(): + data_int = dict( + a=[1, None, 3, None, 5], + b=[9, 10, None, 7, 8], + c=[6, 19, 20, 100, 1000], + ) + assert_df_unique_dtype_cols(data_int) + + +def test_NA_float_dtype(): + data_float = dict( + a=[1.4, None, 3.6, None, 5.2], + b=[9.7, 10.9, None, 7.8, 8.2], + c=[6.1, 19.2, 20.3, 100.4, 1000.5], + ) + assert_df_unique_dtype_cols(data_float) + + +def test_NA_categorical_dtype(): + df = cudf.DataFrame({"A": [1, 2, 5, 1]}) + df["B"] = df["A"].astype("category") + df.at[[1, 3], "B"] = None # Set two items to null + + # Some detailed testing for correctness of dtype and null handling: + col = df.__dataframe__().get_column_by_name("B") + assert col.dtype[0] == _DtypeKind.CATEGORICAL + assert col.null_count == 2 + assert col.describe_null == (3, 0) + assert col.num_chunks() == 1 + assert col.describe_categorical == (False, True, {0: 1, 1: 2, 2: 5}) + assert_from_dataframe_equals(df.__dataframe__(allow_copy=False)) + assert_from_dataframe_equals(df.__dataframe__(allow_copy=True)) + + +def test_NA_bool_dtype(): + data_bool = dict(a=[None, True, False], b=[False, None, None]) + assert_df_unique_dtype_cols(data_bool) + + +def test_NA_string_dtype(): + df = cudf.DataFrame({"A": ["a", "b", "cdef", "", "g"]}) + df["B"] = df["A"].astype("object") + df.at[1, "B"] = cudf.NA # Set one item to null + + # Test for correctness and null handling: + col = df.__dataframe__().get_column_by_name("B") + assert col.dtype[0] == _DtypeKind.STRING + assert col.null_count == 1 + assert col.describe_null == (3, 0) + assert col.num_chunks() == 1 + assert_from_dataframe_equals(df.__dataframe__(allow_copy=False)) + assert_from_dataframe_equals(df.__dataframe__(allow_copy=True)) + + +def test_NA_mixed_dtype(): + data_mixed = dict( + int=[1, None, 2, 3, 1000], + float=[None, 1.5, 2.5, 3.5, None], + bool=[True, None, False, None, None], + categorical=[5, 1, 5, 3, None], + string=[None, None, None, "df 
protocol", None], + ) + assert_df_unique_dtype_cols(data_mixed)