Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix consumption of CPU-backed interchange protocol dataframes #11392

Merged
Show file tree
Hide file tree
Changes from 41 commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ca971da
Fix consumption of non-GPU backed protocol dataframes
shwina Jul 28, 2022
a39281b
test pandas 1.5 rc0
galipremsagar Aug 29, 2022
f87f232
temp commit
galipremsagar Aug 29, 2022
fc1647a
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Aug 29, 2022
b7b3d76
initial pass of fixes
galipremsagar Aug 30, 2022
41d0381
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Aug 30, 2022
1b92423
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Aug 31, 2022
7095878
fix
galipremsagar Aug 31, 2022
01bd01e
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Aug 31, 2022
28d12db
more fixes
galipremsagar Aug 31, 2022
47eea3c
more fixes
galipremsagar Aug 31, 2022
8d53832
more fixes
galipremsagar Sep 1, 2022
a558088
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Sep 1, 2022
5fae833
more fixes
galipremsagar Sep 1, 2022
988443d
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Sep 1, 2022
d8d545e
fix
galipremsagar Sep 1, 2022
817bcf1
merge
galipremsagar Sep 2, 2022
ff61af7
update
galipremsagar Sep 2, 2022
092d20c
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Sep 2, 2022
287756a
fix
galipremsagar Sep 2, 2022
f95fd4b
Merge remote-tracking branch 'upstream/branch-22.10' into pandas_1.5.x
galipremsagar Sep 6, 2022
c05790c
Merge branch 'branch-22.10' of https://github.com/rapidsai/cudf into …
shwina Sep 6, 2022
f92d742
Merge remote-tracking branch 'galipremsagar/pandas_1.5.x' into fix-in…
shwina Sep 6, 2022
c3a83c2
Merge branch 'branch-22.10' of https://github.com/rapidsai/cudf into …
shwina Sep 21, 2022
835d439
LT -> GE
shwina Sep 21, 2022
cb9c059
Undo changes introduced by upstream PR
shwina Sep 22, 2022
9602fba
Merge branch 'branch-22.10' of https://github.com/rapidsai/cudf into …
shwina Oct 11, 2022
1290afc
Merge branch 'branch-22.12' of https://github.com/rapidsai/cudf into …
shwina Oct 11, 2022
022991d
Merge branch 'branch-22.12' of github.com:rapidsai/cudf into fix-inte…
shwina Oct 28, 2022
4fd41e2
Add true support for CPU backed DFs
shwina Oct 31, 2022
98c8035
Lots of fixes
shwina Oct 31, 2022
9ccd440
More improvements
shwina Oct 31, 2022
f1cb5cb
Make the Pandas DF nullable
shwina Oct 31, 2022
2a5859d
use enum
shwina Oct 31, 2022
1ec350d
Int -> enum
shwina Oct 31, 2022
ecd91d2
Merge branch 'branch-23.02' into fix-interchange-protocol-error-cross…
shwina Dec 2, 2022
48c7e7d
Merge branch 'branch-23.06' of github.com:rapidsai/cudf into fix-inte…
shwina Apr 5, 2023
e4d1d4f
Add docstring for from_dataframe
shwina Apr 5, 2023
2525790
Use ints to construct IntEnum
shwina Apr 5, 2023
e0f3275
Review suggestions
shwina Apr 5, 2023
99e7c65
Merge branch 'branch-23.06' into fix-interchange-protocol-error-cross…
vyasr Apr 13, 2023
0837055
Merge branch 'branch-23.06' of github.com:rapidsai/cudf into fix-inte…
shwina Apr 14, 2023
f48422f
Fix categorical handling and add tests for nulls
shwina Apr 14, 2023
138b4d2
Merge branch 'fix-interchange-protocol-error-cross-device' of github.…
shwina Apr 14, 2023
e90fe68
Update python/cudf/cudf/core/df_protocol.py
shwina Apr 14, 2023
0ac28ed
Merge branch 'branch-23.06' into fix-interchange-protocol-error-cross…
shwina Apr 28, 2023
862f587
size is a method
shwina Apr 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 129 additions & 81 deletions python/cudf/cudf/core/df_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import numpy as np
from numba.cuda import as_cuda_array

import rmm

import cudf
from cudf.core.buffer import Buffer, as_buffer
from cudf.core.column import as_column, build_categorical_column, build_column
Expand Down Expand Up @@ -46,6 +48,14 @@ class _Device(enum.IntEnum):
ROCM = 10


class _MaskKind(enum.IntEnum):
NON_NULLABLE = 0
NAN = 1
SENTINEL = 2
BITMASK = 3
BYTEMASK = 4


_SUPPORTED_KINDS = {
_DtypeKind.INT,
_DtypeKind.UINT,
Expand Down Expand Up @@ -110,7 +120,6 @@ def __repr__(self) -> str:
{
"bufsize": self.bufsize,
"ptr": self.ptr,
"dlpack": self.__dlpack__(),
"device": self.__dlpack_device__()[0].name,
}
)
Expand Down Expand Up @@ -311,11 +320,11 @@ def describe_null(self) -> Tuple[int, Any]:
kind = self.dtype[0]
if self.null_count == 0:
# there is no validity mask so it is non-nullable
return 0, None
return _MaskKind.NON_NULLABLE, None

elif kind in _SUPPORTED_KINDS:
# bit mask is universally used in cudf for missing
return 3, 0
# currently, we return a bit mask
return _MaskKind.BITMASK, 0

else:
raise NotImplementedError(
Expand Down Expand Up @@ -399,32 +408,22 @@ def _get_validity_buffer(

Raises RuntimeError if null representation is not a bit or byte mask.
"""

shwina marked this conversation as resolved.
Show resolved Hide resolved
null, invalid = self.describe_null
if null == 3:
if self.dtype[0] == _DtypeKind.CATEGORICAL:
valid_mask = cast(
cudf.core.column.CategoricalColumn, self._col
).codes._get_mask_as_column()
else:
valid_mask = self._col._get_mask_as_column()

assert (valid_mask is not None) and (
valid_mask.data is not None
), "valid_mask(.data) should not be None when "
"_CuDFColumn.describe_null[0] = 3"
if null == _MaskKind.BITMASK:
assert self._col.mask is not None
buffer = _CuDFBuffer(
valid_mask.data, cp.uint8, allow_copy=self._allow_copy
self._col.mask, cp.uint8, allow_copy=self._allow_copy
)
dtype = (_DtypeKind.UINT, 8, "C", "=")
return buffer, dtype

elif null == 1:
elif null == _MaskKind.NAN:
raise RuntimeError(
"This column uses NaN as null "
"so does not have a separate mask"
)
elif null == 0:
elif null == _MaskKind.NON_NULLABLE:
raise RuntimeError(
"This column is non-nullable so does not have a mask"
)
Expand Down Expand Up @@ -645,28 +644,61 @@ def __dataframe__(
_INTS = {8: cp.int8, 16: cp.int16, 32: cp.int32, 64: cp.int64}
_UINTS = {8: cp.uint8, 16: cp.uint16, 32: cp.uint32, 64: cp.uint64}
_FLOATS = {32: cp.float32, 64: cp.float64}
_CP_DTYPES = {0: _INTS, 1: _UINTS, 2: _FLOATS, 20: {8: bool}}
_CP_DTYPES = {
0: _INTS,
1: _UINTS,
2: _FLOATS,
20: {8: bool},
21: {8: cp.uint8},
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strings columns in the protocol are composed of a uint8 buffer of data (chars) and an int buffer of offsets. The data column has a distinct type ID (21) that we previously didn't handle.

As a clarifying example, look at the buffers returned by a Pandas dataframe containing a single string column:

In [17]: pd.DataFrame({'a': ['x', 'y', 'z']}).__dataframe__().get_column(0).get_buffers()
Out[17]: 
{'data': (PandasBuffer({'bufsize': 3, 'ptr': 140001666253824, 'device': 'CPU'}),
  (<DtypeKind.STRING: 21>, 8, 'u', '=')),
 'validity': (PandasBuffer({'bufsize': 3, 'ptr': 94467671186912, 'device': 'CPU'}),
  (<DtypeKind.BOOL: 20>, 8, 'b', '=')),
 'offsets': (PandasBuffer({'bufsize': 32, 'ptr': 94467669930400, 'device': 'CPU'}),
  (<DtypeKind.INT: 0>, 64, 'l', '='))}

}


def from_dataframe(
df: DataFrameObject, allow_copy: bool = False
) -> _CuDFDataFrame:
"""
Construct a cudf DataFrame from ``df`` if it supports ``__dataframe__``
Construct a ``DataFrame`` from ``df`` if it supports the
dataframe interchange protocol (``__dataframe__``).

Parameters
----------
df: Object supporting dataframe interchange protocol
allow_copy
If ``True``, allow copying of the data. If ``False``, a
``TypeError`` is raised if data copying is required to
construct the ``DataFrame`` (e.g., if ``df`` lives in CPU
memory).
shwina marked this conversation as resolved.
Show resolved Hide resolved

Returns
-------
DataFrame

Examples
--------
>>> import pandas as pd
>>> pdf = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
>>> df = cudf.from_dataframe(pdf, allow_copy=True)
>>> type(df)
cudf.core.dataframe.DataFrame
>>> df
a b
0 1 x
1 2 y
2 3 z

Notes
-----
See https://data-apis.org/dataframe-protocol/latest/index.html
for the dataframe interchange protocol spec and API
"""
if isinstance(df, cudf.DataFrame):
return df

if not hasattr(df, "__dataframe__"):
raise ValueError("`df` does not support __dataframe__")

return _from_dataframe(df.__dataframe__(allow_copy=allow_copy))
df = df.__dataframe__(allow_copy=allow_copy)
shwina marked this conversation as resolved.
Show resolved Hide resolved


def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame:
"""
Create a cudf DataFrame object from DataFrameObject.
"""
# Check number of chunks, if there's more than one we need to iterate
if df.num_chunks() > 1:
raise NotImplementedError("More than one chunk not handled yet")
Expand All @@ -683,13 +715,19 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame:
_DtypeKind.FLOAT,
_DtypeKind.BOOL,
):
columns[name], _buf = _protocol_to_cudf_column_numeric(col)
columns[name], _buf = _protocol_to_cudf_column_numeric(
col, allow_copy
)

elif col.dtype[0] == _DtypeKind.CATEGORICAL:
columns[name], _buf = _protocol_to_cudf_column_categorical(col)
columns[name], _buf = _protocol_to_cudf_column_categorical(
col, allow_copy
)

elif col.dtype[0] == _DtypeKind.STRING:
columns[name], _buf = _protocol_to_cudf_column_string(col)
columns[name], _buf = _protocol_to_cudf_column_string(
col, allow_copy
)

else:
raise NotImplementedError(
Expand All @@ -704,7 +742,7 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame:


def _protocol_to_cudf_column_numeric(
col: _CuDFColumn,
col, allow_copy: bool
) -> Tuple[
cudf.core.column.ColumnBase,
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]],
Expand All @@ -719,49 +757,56 @@ def _protocol_to_cudf_column_numeric(
buffers = col.get_buffers()
assert buffers["data"] is not None, "data buffer should not be None"
_dbuffer, _ddtype = buffers["data"]
_check_buffer_is_on_gpu(_dbuffer)
_dbuffer = _ensure_gpu_buffer(_dbuffer, _ddtype, allow_copy)
cudfcol_num = build_column(
as_buffer(
data=_dbuffer.ptr, size=_dbuffer.bufsize, owner=None, exposed=True
),
_dbuffer._buf,
protocol_dtype_to_cupy_dtype(_ddtype),
)
return _set_missing_values(col, cudfcol_num), buffers


def _check_buffer_is_on_gpu(buffer: _CuDFBuffer) -> None:
if (
buffer.__dlpack_device__()[0] != _Device.CUDA
and not buffer._allow_copy
):
raise TypeError(
"This operation must copy data from CPU to GPU. "
"Set `allow_copy=True` to allow it."
)

elif buffer.__dlpack_device__()[0] != _Device.CUDA and buffer._allow_copy:
raise NotImplementedError(
"Only cuDF/GPU dataframes are supported for now. "
"CPU (like `Pandas`) dataframes will be supported shortly."
)
return _set_missing_values(col, cudfcol_num, allow_copy), buffers


def _ensure_gpu_buffer(buf, data_type, allow_copy: bool) -> _CuDFBuffer:
# if `buf` is a (protocol) buffer that lives on the GPU already,
# return it as is. Otherwise, copy it to the device and return
# the resulting buffer.
if buf.__dlpack_device__()[0] != _Device.CUDA:
if allow_copy:
dbuf = rmm.DeviceBuffer(ptr=buf.ptr, size=buf.bufsize)
return _CuDFBuffer(
as_buffer(dbuf, exposed=True),
protocol_dtype_to_cupy_dtype(data_type),
allow_copy,
)
else:
raise TypeError(
"This operation must copy data from CPU to GPU. "
"Set `allow_copy=True` to allow it."
)
return buf


def _set_missing_values(
protocol_col: _CuDFColumn, cudf_col: cudf.core.column.ColumnBase
protocol_col,
cudf_col: cudf.core.column.ColumnBase,
allow_copy: bool,
) -> cudf.core.column.ColumnBase:

valid_mask = protocol_col.get_buffers()["validity"]
if valid_mask is not None:
bitmask = cp.asarray(
as_buffer(
data=valid_mask[0].ptr,
size=valid_mask[0].bufsize,
exposed=True,
),
cp.bool8,
)
cudf_col[~bitmask] = None

null, invalid = protocol_col.describe_null
if null == _MaskKind.BYTEMASK:
valid_mask = _ensure_gpu_buffer(
valid_mask[0], valid_mask[1], allow_copy
)
boolmask = as_column(valid_mask._buf, dtype="bool")
bitmask = cudf._lib.transform.bools_to_mask(boolmask)
return cudf_col.set_mask(bitmask)
elif null == _MaskKind.BITMASK:
valid_mask = _ensure_gpu_buffer(
valid_mask[0], valid_mask[1], allow_copy
)
bitmask = valid_mask._buf
return cudf_col.set_mask(bitmask)
return cudf_col


Expand All @@ -775,7 +820,7 @@ def protocol_dtype_to_cupy_dtype(_dtype: ProtoDtype) -> cp.dtype:


def _protocol_to_cudf_column_categorical(
col: _CuDFColumn,
col, allow_copy: bool
) -> Tuple[
cudf.core.column.ColumnBase,
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]],
Expand All @@ -793,12 +838,10 @@ def _protocol_to_cudf_column_categorical(
buffers = col.get_buffers()
assert buffers["data"] is not None, "data buffer should not be None"
codes_buffer, codes_dtype = buffers["data"]
_check_buffer_is_on_gpu(codes_buffer)
codes_buffer = _ensure_gpu_buffer(codes_buffer, codes_dtype, allow_copy)
cdtype = protocol_dtype_to_cupy_dtype(codes_dtype)
codes = build_column(
as_buffer(
data=codes_buffer.ptr, size=codes_buffer.bufsize, exposed=True
),
codes_buffer._buf,
cdtype,
)

Expand All @@ -810,11 +853,11 @@ def _protocol_to_cudf_column_categorical(
ordered=ordered,
)

return _set_missing_values(col, cudfcol), buffers
return _set_missing_values(col, cudfcol, allow_copy), buffers


def _protocol_to_cudf_column_string(
col: _CuDFColumn,
col, allow_copy: bool
) -> Tuple[
cudf.core.column.ColumnBase,
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]],
Expand All @@ -828,27 +871,32 @@ def _protocol_to_cudf_column_string(
# Retrieve the data buffer containing the UTF-8 code units
assert buffers["data"] is not None, "data buffer should never be None"
data_buffer, data_dtype = buffers["data"]
_check_buffer_is_on_gpu(data_buffer)
data_buffer = _ensure_gpu_buffer(data_buffer, data_dtype, allow_copy)
encoded_string = build_column(
as_buffer(
data=data_buffer.ptr, size=data_buffer.bufsize, exposed=True
),
data_buffer._buf,
protocol_dtype_to_cupy_dtype(data_dtype),
)

# Retrieve the offsets buffer containing the index offsets demarcating
# the beginning and end of each string
assert buffers["offsets"] is not None, "not possible for string column"
offset_buffer, offset_dtype = buffers["offsets"]
_check_buffer_is_on_gpu(offset_buffer)
offset_buffer = _ensure_gpu_buffer(offset_buffer, offset_dtype, allow_copy)
offsets = build_column(
as_buffer(
data=offset_buffer.ptr, size=offset_buffer.bufsize, exposed=True
),
offset_buffer._buf,
protocol_dtype_to_cupy_dtype(offset_dtype),
)

offsets = offsets.astype("int32")
cudfcol_str = build_column(
None, dtype=cp.dtype("O"), children=(offsets, encoded_string)
)
return _set_missing_values(col, cudfcol_str), buffers
return _set_missing_values(col, cudfcol_str, allow_copy), buffers


def _protocol_buffer_to_cudf_buffer(protocol_buffer):
return as_buffer(
rmm.DeviceBuffer(
ptr=protocol_buffer.ptr, size=protocol_buffer.bufsize
),
exposed=True,
wence- marked this conversation as resolved.
Show resolved Hide resolved
)
1 change: 0 additions & 1 deletion python/cudf/cudf/core/dtypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,7 +868,6 @@ def from_arrow(cls, typ):
return IntervalDtype(typ.subtype.to_pandas_dtype(), typ.closed)

def to_arrow(self):

return ArrowIntervalType(
pa.from_numpy_dtype(self.subtype), self.closed
)
Expand Down
Loading