-
Notifications
You must be signed in to change notification settings - Fork 915
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix consumption of CPU-backed interchange protocol dataframes #11392
Changes from 36 commits
ca971da
a39281b
f87f232
fc1647a
b7b3d76
41d0381
1b92423
7095878
01bd01e
28d12db
47eea3c
8d53832
a558088
5fae833
988443d
d8d545e
817bcf1
ff61af7
092d20c
287756a
f95fd4b
c05790c
f92d742
c3a83c2
835d439
cb9c059
9602fba
1290afc
022991d
4fd41e2
98c8035
9ccd440
f1cb5cb
2a5859d
1ec350d
ecd91d2
48c7e7d
e4d1d4f
2525790
e0f3275
99e7c65
0837055
f48422f
138b4d2
e90fe68
0ac28ed
862f587
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,8 @@ | |
import numpy as np | ||
from numba.cuda import as_cuda_array | ||
|
||
import rmm | ||
|
||
import cudf | ||
from cudf.core.buffer import Buffer, as_buffer | ||
from cudf.core.column import as_column, build_categorical_column, build_column | ||
|
@@ -46,6 +48,14 @@ class _Device(enum.IntEnum): | |
ROCM = 10 | ||
|
||
|
||
class _MaskKind(enum.IntEnum): | ||
NON_NULLABLE = (0,) | ||
NAN = (1,) | ||
SENTINEL = (2,) | ||
BITMASK = (3,) | ||
BYTEMASK = 4 | ||
|
||
|
||
_SUPPORTED_KINDS = { | ||
_DtypeKind.INT, | ||
_DtypeKind.UINT, | ||
|
@@ -110,7 +120,6 @@ def __repr__(self) -> str: | |
{ | ||
"bufsize": self.bufsize, | ||
"ptr": self.ptr, | ||
"dlpack": self.__dlpack__(), | ||
"device": self.__dlpack_device__()[0].name, | ||
} | ||
) | ||
|
@@ -311,11 +320,11 @@ def describe_null(self) -> Tuple[int, Any]: | |
kind = self.dtype[0] | ||
if self.null_count == 0: | ||
# there is no validity mask so it is non-nullable | ||
return 0, None | ||
return _MaskKind.NON_NULLABLE, None | ||
|
||
elif kind in _SUPPORTED_KINDS: | ||
# bit mask is universally used in cudf for missing | ||
return 3, 0 | ||
# currently, we return a bit mask | ||
return _MaskKind.BITMASK, 0 | ||
|
||
else: | ||
raise NotImplementedError( | ||
|
@@ -399,32 +408,22 @@ def _get_validity_buffer( | |
|
||
Raises RuntimeError if null representation is not a bit or byte mask. | ||
""" | ||
|
||
shwina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
null, invalid = self.describe_null | ||
if null == 3: | ||
if self.dtype[0] == _DtypeKind.CATEGORICAL: | ||
valid_mask = cast( | ||
cudf.core.column.CategoricalColumn, self._col | ||
).codes._get_mask_as_column() | ||
else: | ||
valid_mask = self._col._get_mask_as_column() | ||
|
||
assert (valid_mask is not None) and ( | ||
valid_mask.data is not None | ||
), "valid_mask(.data) should not be None when " | ||
"_CuDFColumn.describe_null[0] = 3" | ||
if null == _MaskKind.BITMASK: | ||
assert self._col.mask is not None | ||
buffer = _CuDFBuffer( | ||
valid_mask.data, cp.uint8, allow_copy=self._allow_copy | ||
self._col.mask, cp.uint8, allow_copy=self._allow_copy | ||
) | ||
dtype = (_DtypeKind.UINT, 8, "C", "=") | ||
return buffer, dtype | ||
|
||
elif null == 1: | ||
elif null == _MaskKind.NAN: | ||
raise RuntimeError( | ||
"This column uses NaN as null " | ||
"so does not have a separate mask" | ||
) | ||
elif null == 0: | ||
elif null == _MaskKind.NON_NULLABLE: | ||
raise RuntimeError( | ||
"This column is non-nullable so does not have a mask" | ||
) | ||
|
@@ -645,7 +644,13 @@ def __dataframe__( | |
_INTS = {8: cp.int8, 16: cp.int16, 32: cp.int32, 64: cp.int64} | ||
_UINTS = {8: cp.uint8, 16: cp.uint16, 32: cp.uint32, 64: cp.uint64} | ||
_FLOATS = {32: cp.float32, 64: cp.float64} | ||
_CP_DTYPES = {0: _INTS, 1: _UINTS, 2: _FLOATS, 20: {8: bool}} | ||
_CP_DTYPES = { | ||
0: _INTS, | ||
1: _UINTS, | ||
2: _FLOATS, | ||
20: {8: bool}, | ||
21: {8: cp.uint8}, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Strings columns in the protocol are composed of a As a clarifying example, look at the buffers returned by a Pandas dataframe containing a single string column: In [17]: pd.DataFrame({'a': ['x', 'y', 'z']}).__dataframe__().get_column(0).get_buffers()
Out[17]:
{'data': (PandasBuffer({'bufsize': 3, 'ptr': 140001666253824, 'device': 'CPU'}),
(<DtypeKind.STRING: 21>, 8, 'u', '=')),
'validity': (PandasBuffer({'bufsize': 3, 'ptr': 94467671186912, 'device': 'CPU'}),
(<DtypeKind.BOOL: 20>, 8, 'b', '=')),
'offsets': (PandasBuffer({'bufsize': 32, 'ptr': 94467669930400, 'device': 'CPU'}),
(<DtypeKind.INT: 0>, 64, 'l', '='))} |
||
} | ||
|
||
|
||
def from_dataframe( | ||
|
@@ -660,13 +665,8 @@ def from_dataframe( | |
if not hasattr(df, "__dataframe__"): | ||
raise ValueError("`df` does not support __dataframe__") | ||
|
||
return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) | ||
df = df.__dataframe__(allow_copy=allow_copy) | ||
shwina marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame: | ||
""" | ||
Create a cudf DataFrame object from DataFrameObject. | ||
""" | ||
# Check number of chunks, if there's more than one we need to iterate | ||
if df.num_chunks() > 1: | ||
raise NotImplementedError("More than one chunk not handled yet") | ||
|
@@ -683,13 +683,19 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame: | |
_DtypeKind.FLOAT, | ||
_DtypeKind.BOOL, | ||
): | ||
columns[name], _buf = _protocol_to_cudf_column_numeric(col) | ||
columns[name], _buf = _protocol_to_cudf_column_numeric( | ||
col, allow_copy | ||
) | ||
|
||
elif col.dtype[0] == _DtypeKind.CATEGORICAL: | ||
columns[name], _buf = _protocol_to_cudf_column_categorical(col) | ||
columns[name], _buf = _protocol_to_cudf_column_categorical( | ||
col, allow_copy | ||
) | ||
|
||
elif col.dtype[0] == _DtypeKind.STRING: | ||
columns[name], _buf = _protocol_to_cudf_column_string(col) | ||
columns[name], _buf = _protocol_to_cudf_column_string( | ||
col, allow_copy | ||
) | ||
|
||
else: | ||
raise NotImplementedError( | ||
|
@@ -704,7 +710,7 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame: | |
|
||
|
||
def _protocol_to_cudf_column_numeric( | ||
col: _CuDFColumn, | ||
col, allow_copy: bool | ||
) -> Tuple[ | ||
cudf.core.column.ColumnBase, | ||
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], | ||
|
@@ -719,49 +725,56 @@ def _protocol_to_cudf_column_numeric( | |
buffers = col.get_buffers() | ||
assert buffers["data"] is not None, "data buffer should not be None" | ||
_dbuffer, _ddtype = buffers["data"] | ||
_check_buffer_is_on_gpu(_dbuffer) | ||
_dbuffer = _ensure_gpu_buffer(_dbuffer, _ddtype, allow_copy) | ||
cudfcol_num = build_column( | ||
as_buffer( | ||
data=_dbuffer.ptr, size=_dbuffer.bufsize, owner=None, exposed=True | ||
), | ||
_dbuffer._buf, | ||
protocol_dtype_to_cupy_dtype(_ddtype), | ||
) | ||
return _set_missing_values(col, cudfcol_num), buffers | ||
|
||
return _set_missing_values(col, cudfcol_num, allow_copy), buffers | ||
|
||
def _check_buffer_is_on_gpu(buffer: _CuDFBuffer) -> None: | ||
if ( | ||
buffer.__dlpack_device__()[0] != _Device.CUDA | ||
and not buffer._allow_copy | ||
): | ||
raise TypeError( | ||
"This operation must copy data from CPU to GPU. " | ||
"Set `allow_copy=True` to allow it." | ||
) | ||
|
||
elif buffer.__dlpack_device__()[0] != _Device.CUDA and buffer._allow_copy: | ||
raise NotImplementedError( | ||
"Only cuDF/GPU dataframes are supported for now. " | ||
"CPU (like `Pandas`) dataframes will be supported shortly." | ||
) | ||
def _ensure_gpu_buffer(buf, data_type, allow_copy: bool) -> _CuDFBuffer: | ||
# if `buf` is a (protocol) buffer that lives on the GPU already, | ||
# return it as is. Otherwise, copy it to the device and return | ||
# the resulting buffer. | ||
if buf.__dlpack_device__()[0] != _Device.CUDA: | ||
if not allow_copy: | ||
raise TypeError( | ||
"This operation must copy data from CPU to GPU. " | ||
"Set `allow_copy=True` to allow it." | ||
) | ||
else: | ||
dbuf = rmm.DeviceBuffer(ptr=buf.ptr, size=buf.bufsize) | ||
return _CuDFBuffer( | ||
as_buffer(dbuf, exposed=True), | ||
protocol_dtype_to_cupy_dtype(data_type), | ||
allow_copy, | ||
) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Style, I (at least) find double-negated conditions hard to parse. Perhaps have the truthy case in the if, and the exceptional case in the else branch:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return buf | ||
|
||
|
||
def _set_missing_values( | ||
protocol_col: _CuDFColumn, cudf_col: cudf.core.column.ColumnBase | ||
protocol_col, | ||
cudf_col: cudf.core.column.ColumnBase, | ||
allow_copy: bool, | ||
) -> cudf.core.column.ColumnBase: | ||
|
||
valid_mask = protocol_col.get_buffers()["validity"] | ||
if valid_mask is not None: | ||
bitmask = cp.asarray( | ||
as_buffer( | ||
data=valid_mask[0].ptr, | ||
size=valid_mask[0].bufsize, | ||
exposed=True, | ||
), | ||
cp.bool8, | ||
) | ||
cudf_col[~bitmask] = None | ||
|
||
null, invalid = protocol_col.describe_null | ||
if null == _MaskKind.BYTEMASK: | ||
valid_mask = _ensure_gpu_buffer( | ||
valid_mask[0], valid_mask[1], allow_copy | ||
) | ||
boolmask = as_column(valid_mask._buf, dtype="bool") | ||
bitmask = cudf._lib.transform.bools_to_mask(boolmask) | ||
return cudf_col.set_mask(bitmask) | ||
elif null == _MaskKind.BITMASK: | ||
valid_mask = _ensure_gpu_buffer( | ||
valid_mask[0], valid_mask[1], allow_copy | ||
) | ||
bitmask = valid_mask._buf | ||
return cudf_col.set_mask(bitmask) | ||
return cudf_col | ||
|
||
|
||
|
@@ -775,7 +788,7 @@ def protocol_dtype_to_cupy_dtype(_dtype: ProtoDtype) -> cp.dtype: | |
|
||
|
||
def _protocol_to_cudf_column_categorical( | ||
col: _CuDFColumn, | ||
col, allow_copy: bool | ||
) -> Tuple[ | ||
cudf.core.column.ColumnBase, | ||
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], | ||
|
@@ -793,12 +806,10 @@ def _protocol_to_cudf_column_categorical( | |
buffers = col.get_buffers() | ||
assert buffers["data"] is not None, "data buffer should not be None" | ||
codes_buffer, codes_dtype = buffers["data"] | ||
_check_buffer_is_on_gpu(codes_buffer) | ||
codes_buffer = _ensure_gpu_buffer(codes_buffer, codes_dtype, allow_copy) | ||
cdtype = protocol_dtype_to_cupy_dtype(codes_dtype) | ||
codes = build_column( | ||
as_buffer( | ||
data=codes_buffer.ptr, size=codes_buffer.bufsize, exposed=True | ||
), | ||
codes_buffer._buf, | ||
cdtype, | ||
) | ||
|
||
|
@@ -810,11 +821,11 @@ def _protocol_to_cudf_column_categorical( | |
ordered=ordered, | ||
) | ||
|
||
return _set_missing_values(col, cudfcol), buffers | ||
return _set_missing_values(col, cudfcol, allow_copy), buffers | ||
|
||
|
||
def _protocol_to_cudf_column_string( | ||
col: _CuDFColumn, | ||
col, allow_copy: bool | ||
) -> Tuple[ | ||
cudf.core.column.ColumnBase, | ||
Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], | ||
|
@@ -828,27 +839,32 @@ def _protocol_to_cudf_column_string( | |
# Retrieve the data buffer containing the UTF-8 code units | ||
assert buffers["data"] is not None, "data buffer should never be None" | ||
data_buffer, data_dtype = buffers["data"] | ||
_check_buffer_is_on_gpu(data_buffer) | ||
data_buffer = _ensure_gpu_buffer(data_buffer, data_dtype, allow_copy) | ||
encoded_string = build_column( | ||
as_buffer( | ||
data=data_buffer.ptr, size=data_buffer.bufsize, exposed=True | ||
), | ||
data_buffer._buf, | ||
protocol_dtype_to_cupy_dtype(data_dtype), | ||
) | ||
|
||
# Retrieve the offsets buffer containing the index offsets demarcating | ||
# the beginning and end of each string | ||
assert buffers["offsets"] is not None, "not possible for string column" | ||
offset_buffer, offset_dtype = buffers["offsets"] | ||
_check_buffer_is_on_gpu(offset_buffer) | ||
offset_buffer = _ensure_gpu_buffer(offset_buffer, offset_dtype, allow_copy) | ||
offsets = build_column( | ||
as_buffer( | ||
data=offset_buffer.ptr, size=offset_buffer.bufsize, exposed=True | ||
), | ||
offset_buffer._buf, | ||
protocol_dtype_to_cupy_dtype(offset_dtype), | ||
) | ||
|
||
offsets = offsets.astype("int32") | ||
cudfcol_str = build_column( | ||
None, dtype=cp.dtype("O"), children=(offsets, encoded_string) | ||
) | ||
return _set_missing_values(col, cudfcol_str), buffers | ||
return _set_missing_values(col, cudfcol_str, allow_copy), buffers | ||
|
||
|
||
def _protocol_buffer_to_cudf_buffer(protocol_buffer): | ||
return as_buffer( | ||
rmm.DeviceBuffer( | ||
ptr=protocol_buffer.ptr, size=protocol_buffer.bufsize | ||
), | ||
exposed=True, | ||
wence- marked this conversation as resolved.
Show resolved
Hide resolved
|
||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prior to this PR, we used raw ints to represent this information.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it deliberate that
BYTEMASK
is an int, but the other mask kinds are tuples? (They both turn into ints on construction, but the tuple instantiation looks kind of weird.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed to just use ints here.