Skip to content

Commit

Permalink
Fix consumption of CPU-backed interchange protocol dataframes (#11392)
Browse files Browse the repository at this point in the history
Closes #11245. This PR fixes a bug in our code that consumes a protocol DataFrame that is backed by CPU memory.

This enables using the `from_dataframe()` function to construct cuDF DataFrames from dataframes of other libraries (such as pandas and vaex) that implement the dataframe interchange protocol:

```python
In [12]: pdf = pd.DataFrame({'x': [1, 2, 3], 'y': [1.0, 2.0, np.nan], 'z': ['a', 'b', 'c']})

In [13]: vdf = vaex.from_dict({'x': [1, 2, 3], 'y': [1.0, 2.0, np.nan]})

In [14]: cudf.from_dataframe(pdf, allow_copy=True)
Out[14]: 
   x    y  z
0  1  1.0  a
1  2  2.0  b
2  3  NaN  c

In [15]: cudf.from_dataframe(vdf, allow_copy=True)
Out[15]: 
   x    y
0  1  1.0
1  2  2.0
2  3  NaN
```

Authors:
  - Ashwin Srinath (https://github.com/shwina)
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Bradley Dice (https://github.com/bdice)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #11392
  • Loading branch information
shwina authored Apr 28, 2023
1 parent fd1afc8 commit ca2ca37
Show file tree
Hide file tree
Showing 3 changed files with 211 additions and 112 deletions.
216 changes: 131 additions & 85 deletions python/cudf/cudf/core/df_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import numpy as np
from numba.cuda import as_cuda_array

import rmm

import cudf
from cudf.core.buffer import Buffer, as_buffer
from cudf.core.column import as_column, build_categorical_column, build_column
Expand Down Expand Up @@ -46,6 +48,14 @@ class _Device(enum.IntEnum):
ROCM = 10


class _MaskKind(enum.IntEnum):
    """
    Null-representation codes returned by a protocol column's
    ``describe_null`` (dataframe interchange protocol).
    """

    NON_NULLABLE = 0  # column has no validity information
    NAN = 1  # NaN/NaT values mark nulls
    SENTINEL = 2  # a sentinel value marks nulls
    BITMASK = 3  # one validity bit per row (cudf's native form)
    BYTEMASK = 4  # one validity byte per row


_SUPPORTED_KINDS = {
_DtypeKind.INT,
_DtypeKind.UINT,
Expand Down Expand Up @@ -110,7 +120,6 @@ def __repr__(self) -> str:
{
"bufsize": self.bufsize,
"ptr": self.ptr,
"dlpack": self.__dlpack__(),
"device": self.__dlpack_device__()[0].name,
}
)
Expand Down Expand Up @@ -151,7 +160,6 @@ def __init__(
self._nan_as_null = nan_as_null
self._allow_copy = allow_copy

@property
def size(self) -> int:
"""
Size of the column, in elements.
Expand Down Expand Up @@ -311,11 +319,11 @@ def describe_null(self) -> Tuple[int, Any]:
kind = self.dtype[0]
if self.null_count == 0:
# there is no validity mask so it is non-nullable
return 0, None
return _MaskKind.NON_NULLABLE, None

elif kind in _SUPPORTED_KINDS:
# bit mask is universally used in cudf for missing
return 3, 0
# currently, we return a bit mask
return _MaskKind.BITMASK, 0

else:
raise NotImplementedError(
Expand Down Expand Up @@ -399,32 +407,22 @@ def _get_validity_buffer(
Raises RuntimeError if null representation is not a bit or byte mask.
"""

null, invalid = self.describe_null
if null == 3:
if self.dtype[0] == _DtypeKind.CATEGORICAL:
valid_mask = cast(
cudf.core.column.CategoricalColumn, self._col
).codes._get_mask_as_column()
else:
valid_mask = self._col._get_mask_as_column()

assert (valid_mask is not None) and (
valid_mask.data is not None
), "valid_mask(.data) should not be None when "
"_CuDFColumn.describe_null[0] = 3"
if null == _MaskKind.BITMASK:
assert self._col.mask is not None
buffer = _CuDFBuffer(
valid_mask.data, cp.uint8, allow_copy=self._allow_copy
self._col.mask, cp.uint8, allow_copy=self._allow_copy
)
dtype = (_DtypeKind.UINT, 8, "C", "=")
return buffer, dtype

elif null == 1:
elif null == _MaskKind.NAN:
raise RuntimeError(
"This column uses NaN as null "
"so does not have a separate mask"
)
elif null == 0:
elif null == _MaskKind.NON_NULLABLE:
raise RuntimeError(
"This column is non-nullable so does not have a mask"
)
Expand Down Expand Up @@ -645,28 +643,62 @@ def __dataframe__(
# Lookup tables mapping an interchange-protocol dtype (kind code and
# bit-width) to the cupy dtype used to interpret the raw buffer.
_INTS = {8: cp.int8, 16: cp.int16, 32: cp.int32, 64: cp.int64}
_UINTS = {8: cp.uint8, 16: cp.uint16, 32: cp.uint32, 64: cp.uint64}
_FLOATS = {32: cp.float32, 64: cp.float64}
_CP_DTYPES = {
    0: _INTS,
    1: _UINTS,
    2: _FLOATS,
    20: {8: bool},
    # presumably kind 21 is STRING, whose data buffer holds UTF-8 code
    # units as uint8 -- confirm against _DtypeKind
    21: {8: cp.uint8},
}


def from_dataframe(
df: DataFrameObject, allow_copy: bool = False
) -> _CuDFDataFrame:
"""
Construct a cudf DataFrame from ``df`` if it supports ``__dataframe__``
Construct a ``DataFrame`` from ``df`` if it supports the
dataframe interchange protocol (``__dataframe__``).
Parameters
----------
df : DataFrameObject
Object supporting dataframe interchange protocol
allow_copy : bool
If ``True``, allow copying of the data. If ``False``, a
``TypeError`` is raised if data copying is required to
construct the ``DataFrame`` (e.g., if ``df`` lives in CPU
memory).
Returns
-------
DataFrame
Examples
--------
>>> import pandas as pd
>>> pdf = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> df = cudf.from_dataframe(pdf, allow_copy=True)
>>> type(df)
cudf.core.dataframe.DataFrame
>>> df
a b
0 1 x
1 2 y
2 3 z
Notes
-----
See https://data-apis.org/dataframe-protocol/latest/index.html
for the dataframe interchange protocol spec and API
"""
if isinstance(df, cudf.DataFrame):
return df

if not hasattr(df, "__dataframe__"):
raise ValueError("`df` does not support __dataframe__")

return _from_dataframe(df.__dataframe__(allow_copy=allow_copy))
df = df.__dataframe__(allow_copy=allow_copy)


def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame:
"""
Create a cudf DataFrame object from DataFrameObject.
"""
# Check number of chunks, if there's more than one we need to iterate
if df.num_chunks() > 1:
raise NotImplementedError("More than one chunk not handled yet")
Expand All @@ -683,13 +715,19 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame:
_DtypeKind.FLOAT,
_DtypeKind.BOOL,
):
columns[name], _buf = _protocol_to_cudf_column_numeric(col)
columns[name], _buf = _protocol_to_cudf_column_numeric(
col, allow_copy
)

elif col.dtype[0] == _DtypeKind.CATEGORICAL:
columns[name], _buf = _protocol_to_cudf_column_categorical(col)
columns[name], _buf = _protocol_to_cudf_column_categorical(
col, allow_copy
)

elif col.dtype[0] == _DtypeKind.STRING:
columns[name], _buf = _protocol_to_cudf_column_string(col)
columns[name], _buf = _protocol_to_cudf_column_string(
col, allow_copy
)

else:
raise NotImplementedError(
Expand All @@ -704,7 +742,7 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame:


def _protocol_to_cudf_column_numeric(
col: _CuDFColumn,
col, allow_copy: bool
) -> Tuple[
cudf.core.column.ColumnBase,
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]],
Expand All @@ -719,49 +757,56 @@ def _protocol_to_cudf_column_numeric(
buffers = col.get_buffers()
assert buffers["data"] is not None, "data buffer should not be None"
_dbuffer, _ddtype = buffers["data"]
_check_buffer_is_on_gpu(_dbuffer)
_dbuffer = _ensure_gpu_buffer(_dbuffer, _ddtype, allow_copy)
cudfcol_num = build_column(
as_buffer(
data=_dbuffer.ptr, size=_dbuffer.bufsize, owner=None, exposed=True
),
_dbuffer._buf,
protocol_dtype_to_cupy_dtype(_ddtype),
)
return _set_missing_values(col, cudfcol_num), buffers


def _check_buffer_is_on_gpu(buffer: _CuDFBuffer) -> None:
if (
buffer.__dlpack_device__()[0] != _Device.CUDA
and not buffer._allow_copy
):
raise TypeError(
"This operation must copy data from CPU to GPU. "
"Set `allow_copy=True` to allow it."
)

elif buffer.__dlpack_device__()[0] != _Device.CUDA and buffer._allow_copy:
raise NotImplementedError(
"Only cuDF/GPU dataframes are supported for now. "
"CPU (like `Pandas`) dataframes will be supported shortly."
)
return _set_missing_values(col, cudfcol_num, allow_copy), buffers


def _ensure_gpu_buffer(buf, data_type, allow_copy: bool) -> _CuDFBuffer:
    """
    Return a device-resident buffer equivalent to ``buf``.

    A buffer that already lives on the GPU is returned unchanged. A
    CPU-backed buffer is copied into device memory when ``allow_copy``
    is True; otherwise a TypeError is raised.
    """
    device_kind = buf.__dlpack_device__()[0]
    if device_kind == _Device.CUDA:
        # Already on the GPU -- nothing to do.
        return buf
    if not allow_copy:
        raise TypeError(
            "This operation must copy data from CPU to GPU. "
            "Set `allow_copy=True` to allow it."
        )
    # Copy the host bytes into a fresh device allocation and wrap it.
    device_copy = rmm.DeviceBuffer(ptr=buf.ptr, size=buf.bufsize)
    return _CuDFBuffer(
        as_buffer(device_copy, exposed=True),
        protocol_dtype_to_cupy_dtype(data_type),
        allow_copy,
    )


def _set_missing_values(
    protocol_col,
    cudf_col: cudf.core.column.ColumnBase,
    allow_copy: bool,
) -> cudf.core.column.ColumnBase:
    """
    Attach the protocol column's validity information to ``cudf_col``.

    Parameters
    ----------
    protocol_col
        Interchange-protocol column providing ``get_buffers`` and
        ``describe_null``.
    cudf_col : cudf.core.column.ColumnBase
        The cudf column to which the null mask is applied.
    allow_copy : bool
        Whether copying a CPU-resident validity buffer to the GPU
        is permitted.

    Returns
    -------
    cudf.core.column.ColumnBase
        ``cudf_col`` with the mask set, or unchanged if the protocol
        column carries no validity buffer.
    """
    valid_mask = protocol_col.get_buffers()["validity"]
    if valid_mask is not None:
        null, invalid = protocol_col.describe_null
        if null == _MaskKind.BYTEMASK:
            valid_mask = _ensure_gpu_buffer(
                valid_mask[0], valid_mask[1], allow_copy
            )
            # Byte mask: one bool per row; convert to cudf's bit mask.
            boolmask = as_column(valid_mask._buf, dtype="bool")
            bitmask = cudf._lib.transform.bools_to_mask(boolmask)
            return cudf_col.set_mask(bitmask)
        elif null == _MaskKind.BITMASK:
            valid_mask = _ensure_gpu_buffer(
                valid_mask[0], valid_mask[1], allow_copy
            )
            # Bit mask matches cudf's native representation directly.
            bitmask = valid_mask._buf
            return cudf_col.set_mask(bitmask)
    return cudf_col


Expand All @@ -775,30 +820,26 @@ def protocol_dtype_to_cupy_dtype(_dtype: ProtoDtype) -> cp.dtype:


def _protocol_to_cudf_column_categorical(
col: _CuDFColumn,
col, allow_copy: bool
) -> Tuple[
cudf.core.column.ColumnBase,
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]],
]:
"""
Convert a categorical column to a Series instance
"""
ordered, is_dict, mapping = col.describe_categorical
ordered, is_dict, categories = col.describe_categorical
if not is_dict:
raise NotImplementedError(
"Non-dictionary categoricals not supported yet"
)

categories = as_column(mapping.values())
buffers = col.get_buffers()
assert buffers["data"] is not None, "data buffer should not be None"
codes_buffer, codes_dtype = buffers["data"]
_check_buffer_is_on_gpu(codes_buffer)
codes_buffer = _ensure_gpu_buffer(codes_buffer, codes_dtype, allow_copy)
cdtype = protocol_dtype_to_cupy_dtype(codes_dtype)
codes = build_column(
as_buffer(
data=codes_buffer.ptr, size=codes_buffer.bufsize, exposed=True
),
codes_buffer._buf,
cdtype,
)

Expand All @@ -810,11 +851,11 @@ def _protocol_to_cudf_column_categorical(
ordered=ordered,
)

return _set_missing_values(col, cudfcol), buffers
return _set_missing_values(col, cudfcol, allow_copy), buffers


def _protocol_to_cudf_column_string(
col: _CuDFColumn,
col, allow_copy: bool
) -> Tuple[
cudf.core.column.ColumnBase,
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]],
Expand All @@ -828,27 +869,32 @@ def _protocol_to_cudf_column_string(
# Retrieve the data buffer containing the UTF-8 code units
assert buffers["data"] is not None, "data buffer should never be None"
data_buffer, data_dtype = buffers["data"]
_check_buffer_is_on_gpu(data_buffer)
data_buffer = _ensure_gpu_buffer(data_buffer, data_dtype, allow_copy)
encoded_string = build_column(
as_buffer(
data=data_buffer.ptr, size=data_buffer.bufsize, exposed=True
),
data_buffer._buf,
protocol_dtype_to_cupy_dtype(data_dtype),
)

# Retrieve the offsets buffer containing the index offsets demarcating
# the beginning and end of each string
assert buffers["offsets"] is not None, "not possible for string column"
offset_buffer, offset_dtype = buffers["offsets"]
_check_buffer_is_on_gpu(offset_buffer)
offset_buffer = _ensure_gpu_buffer(offset_buffer, offset_dtype, allow_copy)
offsets = build_column(
as_buffer(
data=offset_buffer.ptr, size=offset_buffer.bufsize, exposed=True
),
offset_buffer._buf,
protocol_dtype_to_cupy_dtype(offset_dtype),
)

offsets = offsets.astype("int32")
cudfcol_str = build_column(
None, dtype=cp.dtype("O"), children=(offsets, encoded_string)
)
return _set_missing_values(col, cudfcol_str), buffers
return _set_missing_values(col, cudfcol_str, allow_copy), buffers


def _protocol_buffer_to_cudf_buffer(protocol_buffer):
    """Copy a protocol buffer's bytes into a new device-backed cudf Buffer."""
    # Materialize the raw bytes in a fresh rmm allocation, then wrap it
    # as a cudf Buffer.
    device_buf = rmm.DeviceBuffer(
        ptr=protocol_buffer.ptr, size=protocol_buffer.bufsize
    )
    return as_buffer(device_buf, exposed=True)
1 change: 0 additions & 1 deletion python/cudf/cudf/core/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,7 +870,6 @@ def from_arrow(cls, typ):
return IntervalDtype(typ.subtype.to_pandas_dtype(), typ.closed)

def to_arrow(self):

return ArrowIntervalType(
pa.from_numpy_dtype(self.subtype), self.closed
)
Expand Down
Loading

0 comments on commit ca2ca37

Please sign in to comment.