diff --git a/python/cudf/cudf/core/df_protocol.py b/python/cudf/cudf/core/df_protocol.py index 2090906380e..6e1c5f6fd00 100644 --- a/python/cudf/cudf/core/df_protocol.py +++ b/python/cudf/cudf/core/df_protocol.py @@ -17,6 +17,8 @@ import numpy as np from numba.cuda import as_cuda_array +import rmm + import cudf from cudf.core.buffer import Buffer, as_buffer from cudf.core.column import as_column, build_categorical_column, build_column @@ -46,6 +48,14 @@ class _Device(enum.IntEnum): ROCM = 10 +class _MaskKind(enum.IntEnum): + NON_NULLABLE = 0 + NAN = 1 + SENTINEL = 2 + BITMASK = 3 + BYTEMASK = 4 + + _SUPPORTED_KINDS = { _DtypeKind.INT, _DtypeKind.UINT, @@ -110,7 +120,6 @@ def __repr__(self) -> str: { "bufsize": self.bufsize, "ptr": self.ptr, - "dlpack": self.__dlpack__(), "device": self.__dlpack_device__()[0].name, } ) @@ -151,7 +160,6 @@ def __init__( self._nan_as_null = nan_as_null self._allow_copy = allow_copy - @property def size(self) -> int: """ Size of the column, in elements. @@ -311,11 +319,11 @@ def describe_null(self) -> Tuple[int, Any]: kind = self.dtype[0] if self.null_count == 0: # there is no validity mask so it is non-nullable - return 0, None + return _MaskKind.NON_NULLABLE, None elif kind in _SUPPORTED_KINDS: - # bit mask is universally used in cudf for missing - return 3, 0 + # currently, we return a bit mask + return _MaskKind.BITMASK, 0 else: raise NotImplementedError( @@ -399,32 +407,22 @@ def _get_validity_buffer( Raises RuntimeError if null representation is not a bit or byte mask. """ - null, invalid = self.describe_null - if null == 3: - if self.dtype[0] == _DtypeKind.CATEGORICAL: - valid_mask = cast( - cudf.core.column.CategoricalColumn, self._col - ).codes._get_mask_as_column() - else: - valid_mask = self._col._get_mask_as_column() - assert (valid_mask is not None) and ( - valid_mask.data is not None - ), "valid_mask(.data) should not be None when " - "_CuDFColumn.describe_null[0] = 3" + if null == _MaskKind.BITMASK: + assert self._col.mask is not None buffer = _CuDFBuffer( - valid_mask.data, cp.uint8, allow_copy=self._allow_copy + self._col.mask, cp.uint8, allow_copy=self._allow_copy ) dtype = (_DtypeKind.UINT, 8, "C", "=") return buffer, dtype - elif null == 1: + elif null == _MaskKind.NAN: raise RuntimeError( "This column uses NaN as null " "so does not have a separate mask" ) - elif null == 0: + elif null == _MaskKind.NON_NULLABLE: raise RuntimeError( "This column is non-nullable so does not have a mask" ) @@ -645,14 +643,53 @@ def __dataframe__( _INTS = {8: cp.int8, 16: cp.int16, 32: cp.int32, 64: cp.int64} _UINTS = {8: cp.uint8, 16: cp.uint16, 32: cp.uint32, 64: cp.uint64} _FLOATS = {32: cp.float32, 64: cp.float64} -_CP_DTYPES = {0: _INTS, 1: _UINTS, 2: _FLOATS, 20: {8: bool}} +_CP_DTYPES = { + 0: _INTS, + 1: _UINTS, + 2: _FLOATS, + 20: {8: bool}, + 21: {8: cp.uint8}, +} def from_dataframe( df: DataFrameObject, allow_copy: bool = False ) -> _CuDFDataFrame: """ - Construct a cudf DataFrame from ``df`` if it supports ``__dataframe__`` + Construct a ``DataFrame`` from ``df`` if it supports the + dataframe interchange protocol (``__dataframe__``). + + Parameters + ---------- + df : DataFrameObject + Object supporting dataframe interchange protocol + allow_copy : bool + If ``True``, allow copying of the data. If ``False``, a + ``TypeError`` is raised if data copying is required to + construct the ``DataFrame`` (e.g., if ``df`` lives in CPU + memory). + + Returns + ------- + DataFrame + + Examples + -------- + >>> import pandas as pd + >>> pdf = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']}) + >>> df = cudf.from_dataframe(pdf, allow_copy=True) + >>> type(df) + cudf.core.dataframe.DataFrame + >>> df + a b + 0 1 x + 1 2 y + 2 3 z + + Notes + ----- + See https://data-apis.org/dataframe-protocol/latest/index.html + for the dataframe interchange protocol spec and API """ if isinstance(df, cudf.DataFrame): return df @@ -660,13 +697,8 @@ def from_dataframe( if not hasattr(df, "__dataframe__"): raise ValueError("`df` does not support __dataframe__") - return _from_dataframe(df.__dataframe__(allow_copy=allow_copy)) + df = df.__dataframe__(allow_copy=allow_copy) - -def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame: - """ - Create a cudf DataFrame object from DataFrameObject. - """ # Check number of chunks, if there's more than one we need to iterate if df.num_chunks() > 1: raise NotImplementedError("More than one chunk not handled yet") @@ -683,13 +715,19 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame: _DtypeKind.FLOAT, _DtypeKind.BOOL, ): - columns[name], _buf = _protocol_to_cudf_column_numeric(col) + columns[name], _buf = _protocol_to_cudf_column_numeric( + col, allow_copy + ) elif col.dtype[0] == _DtypeKind.CATEGORICAL: - columns[name], _buf = _protocol_to_cudf_column_categorical(col) + columns[name], _buf = _protocol_to_cudf_column_categorical( + col, allow_copy + ) elif col.dtype[0] == _DtypeKind.STRING: - columns[name], _buf = _protocol_to_cudf_column_string(col) + columns[name], _buf = _protocol_to_cudf_column_string( + col, allow_copy + ) else: raise NotImplementedError( @@ -704,7 +742,7 @@ def _from_dataframe(df: DataFrameObject) -> _CuDFDataFrame: def _protocol_to_cudf_column_numeric( - col: _CuDFColumn, + col, allow_copy: bool ) -> Tuple[ cudf.core.column.ColumnBase, Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], @@ -719,49 +757,56 @@ def _protocol_to_cudf_column_numeric( buffers = col.get_buffers() assert buffers["data"] is not None, "data buffer should not be None" _dbuffer, _ddtype = buffers["data"] - _check_buffer_is_on_gpu(_dbuffer) + _dbuffer = _ensure_gpu_buffer(_dbuffer, _ddtype, allow_copy) cudfcol_num = build_column( - as_buffer( - data=_dbuffer.ptr, size=_dbuffer.bufsize, owner=None, exposed=True - ), + _dbuffer._buf, protocol_dtype_to_cupy_dtype(_ddtype), ) - return _set_missing_values(col, cudfcol_num), buffers - - -def _check_buffer_is_on_gpu(buffer: _CuDFBuffer) -> None: - if ( - buffer.__dlpack_device__()[0] != _Device.CUDA - and not buffer._allow_copy - ): - raise TypeError( - "This operation must copy data from CPU to GPU. " - "Set `allow_copy=True` to allow it." - ) - - elif buffer.__dlpack_device__()[0] != _Device.CUDA and buffer._allow_copy: - raise NotImplementedError( - "Only cuDF/GPU dataframes are supported for now. " - "CPU (like `Pandas`) dataframes will be supported shortly." - ) + return _set_missing_values(col, cudfcol_num, allow_copy), buffers + + +def _ensure_gpu_buffer(buf, data_type, allow_copy: bool) -> _CuDFBuffer: + # if `buf` is a (protocol) buffer that lives on the GPU already, + # return it as is. Otherwise, copy it to the device and return + # the resulting buffer. + if buf.__dlpack_device__()[0] != _Device.CUDA: + if allow_copy: + dbuf = rmm.DeviceBuffer(ptr=buf.ptr, size=buf.bufsize) + return _CuDFBuffer( + as_buffer(dbuf, exposed=True), + protocol_dtype_to_cupy_dtype(data_type), + allow_copy, + ) + else: + raise TypeError( + "This operation must copy data from CPU to GPU. " + "Set `allow_copy=True` to allow it." + ) + return buf def _set_missing_values( - protocol_col: _CuDFColumn, cudf_col: cudf.core.column.ColumnBase + protocol_col, + cudf_col: cudf.core.column.ColumnBase, + allow_copy: bool, ) -> cudf.core.column.ColumnBase: valid_mask = protocol_col.get_buffers()["validity"] if valid_mask is not None: - bitmask = cp.asarray( - as_buffer( - data=valid_mask[0].ptr, - size=valid_mask[0].bufsize, - exposed=True, - ), - cp.bool8, - ) - cudf_col[~bitmask] = None - + null, invalid = protocol_col.describe_null + if null == _MaskKind.BYTEMASK: + valid_mask = _ensure_gpu_buffer( + valid_mask[0], valid_mask[1], allow_copy + ) + boolmask = as_column(valid_mask._buf, dtype="bool") + bitmask = cudf._lib.transform.bools_to_mask(boolmask) + return cudf_col.set_mask(bitmask) + elif null == _MaskKind.BITMASK: + valid_mask = _ensure_gpu_buffer( + valid_mask[0], valid_mask[1], allow_copy + ) + bitmask = valid_mask._buf + return cudf_col.set_mask(bitmask) return cudf_col @@ -775,7 +820,7 @@ def protocol_dtype_to_cupy_dtype(_dtype: ProtoDtype) -> cp.dtype: def _protocol_to_cudf_column_categorical( - col: _CuDFColumn, + col, allow_copy: bool ) -> Tuple[ cudf.core.column.ColumnBase, Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], @@ -783,22 +828,18 @@ def _protocol_to_cudf_column_categorical( """ Convert a categorical column to a Series instance """ - ordered, is_dict, mapping = col.describe_categorical + ordered, is_dict, categories = col.describe_categorical if not is_dict: raise NotImplementedError( "Non-dictionary categoricals not supported yet" ) - - categories = as_column(mapping.values()) buffers = col.get_buffers() assert buffers["data"] is not None, "data buffer should not be None" codes_buffer, codes_dtype = buffers["data"] - _check_buffer_is_on_gpu(codes_buffer) + codes_buffer = _ensure_gpu_buffer(codes_buffer, codes_dtype, allow_copy) cdtype = protocol_dtype_to_cupy_dtype(codes_dtype) codes = build_column( - as_buffer( - data=codes_buffer.ptr, size=codes_buffer.bufsize, exposed=True - ), + codes_buffer._buf, cdtype, ) @@ -810,11 +851,11 @@ def _protocol_to_cudf_column_categorical( ordered=ordered, ) - return _set_missing_values(col, cudfcol), buffers + return _set_missing_values(col, cudfcol, allow_copy), buffers def _protocol_to_cudf_column_string( - col: _CuDFColumn, + col, allow_copy: bool ) -> Tuple[ cudf.core.column.ColumnBase, Mapping[str, Optional[Tuple[_CuDFBuffer, ProtoDtype]]], @@ -828,11 +869,9 @@ def _protocol_to_cudf_column_string( # Retrieve the data buffer containing the UTF-8 code units assert buffers["data"] is not None, "data buffer should never be None" data_buffer, data_dtype = buffers["data"] - _check_buffer_is_on_gpu(data_buffer) + data_buffer = _ensure_gpu_buffer(data_buffer, data_dtype, allow_copy) encoded_string = build_column( - as_buffer( - data=data_buffer.ptr, size=data_buffer.bufsize, exposed=True - ), + data_buffer._buf, protocol_dtype_to_cupy_dtype(data_dtype), ) @@ -840,15 +879,22 @@ def _protocol_to_cudf_column_string( # the beginning and end of each string assert buffers["offsets"] is not None, "not possible for string column" offset_buffer, offset_dtype = buffers["offsets"] - _check_buffer_is_on_gpu(offset_buffer) + offset_buffer = _ensure_gpu_buffer(offset_buffer, offset_dtype, allow_copy) offsets = build_column( - as_buffer( - data=offset_buffer.ptr, size=offset_buffer.bufsize, exposed=True - ), + offset_buffer._buf, protocol_dtype_to_cupy_dtype(offset_dtype), ) - + offsets = offsets.astype("int32") cudfcol_str = build_column( None, dtype=cp.dtype("O"), children=(offsets, encoded_string) ) - return _set_missing_values(col, cudfcol_str), buffers + return _set_missing_values(col, cudfcol_str, allow_copy), buffers + + +def _protocol_buffer_to_cudf_buffer(protocol_buffer): + return as_buffer( + rmm.DeviceBuffer( + ptr=protocol_buffer.ptr, size=protocol_buffer.bufsize + ), + exposed=True, + ) diff --git a/python/cudf/cudf/core/dtypes.py b/python/cudf/cudf/core/dtypes.py index 406708fd58b..49c931a4218 100644 --- a/python/cudf/cudf/core/dtypes.py +++ b/python/cudf/cudf/core/dtypes.py @@ -870,7 +870,6 @@ def from_arrow(cls, typ): return IntervalDtype(typ.subtype.to_pandas_dtype(), typ.closed) def to_arrow(self): - return ArrowIntervalType( pa.from_numpy_dtype(self.subtype), self.closed ) diff --git a/python/cudf/cudf/tests/test_df_protocol.py b/python/cudf/cudf/tests/test_df_protocol.py index 8a53c77da66..d6134c7bb01 100644 --- a/python/cudf/cudf/tests/test_df_protocol.py +++ b/python/cudf/cudf/tests/test_df_protocol.py @@ -7,25 +7,70 @@ import pytest import cudf +from cudf.core._compat import PANDAS_GE_150 from cudf.core.buffer import as_buffer -from cudf.core.column import build_column +from cudf.core.column import as_column, build_column from cudf.core.df_protocol import ( DataFrameObject, _CuDFBuffer, _CuDFColumn, _DtypeKind, - _from_dataframe, + _MaskKind, + _protocol_buffer_to_cudf_buffer, + from_dataframe, protocol_dtype_to_cupy_dtype, ) from cudf.testing._utils import assert_eq +@pytest.fixture( + params=[ + {"a": [1, 2, 3], "b": ["x", "y", "z"]}, + {"a": [1, 2, None], "b": ["x", "y", "z"]}, + {"a": [1, 2, 3], "b": pd.Categorical(["x", "y", None])}, + ] +) +def pandas_df(request): + data = request.param + return pd.DataFrame(data) + + +def assert_validity_equal(protocol_buffer, cudf_buffer, size, null, valid): + if null == _MaskKind.BYTEMASK: + protocol_mask = _protocol_buffer_to_cudf_buffer(protocol_buffer) + assert_eq( + as_column(protocol_mask, dtype="bool"), + as_column(cudf_buffer, dtype="bool"), + ) + elif null == _MaskKind.BITMASK: + protocol_mask = _protocol_buffer_to_cudf_buffer(protocol_buffer) + cudf_mask = cudf_buffer + assert_eq( + build_column( + as_buffer(cp.zeros(10, dtype="int8")), + "int8", + size=size, + mask=protocol_mask, + children=(), + ), + build_column( + as_buffer(cp.zeros(10, dtype="int8")), + "int8", + size=size, + mask=cudf_mask, + children=(), + ), + ) + else: + raise NotImplementedError() + + def assert_buffer_equal(buffer_and_dtype: Tuple[_CuDFBuffer, Any], cudfcol): buf, dtype = buffer_and_dtype device_id = cp.asarray(cudfcol.data).device.id assert buf.__dlpack_device__() == (2, device_id) col_from_buf = build_column( - as_buffer(data=buf.ptr, size=buf.bufsize), + _protocol_buffer_to_cudf_buffer(buf), protocol_dtype_to_cupy_dtype(dtype), ) # check that non null values are the equals as nulls are represented @@ -38,7 +83,6 @@ def assert_buffer_equal(buffer_and_dtype: Tuple[_CuDFBuffer, Any], cudfcol): col_from_buf.apply_boolean_mask(non_null_idxs), cudfcol.apply_boolean_mask(non_null_idxs), ) - array_from_dlpack = cp.from_dlpack(buf.__dlpack__()).get() col_array = cp.asarray(cudfcol.data_array_view(mode="read")).get() assert_eq( @@ -48,7 +92,7 @@ def assert_buffer_equal(buffer_and_dtype: Tuple[_CuDFBuffer, Any], cudfcol): def assert_column_equal(col: _CuDFColumn, cudfcol): - assert col.size == cudfcol.size + assert col.size() == cudfcol.size assert col.offset == 0 assert col.null_count == cudfcol.null_count assert col.num_chunks() == 1 @@ -56,9 +100,11 @@ def assert_column_equal(col: _CuDFColumn, cudfcol): pytest.raises(RuntimeError, col._get_validity_buffer) assert col.get_buffers()["validity"] is None else: - assert_buffer_equal( - col.get_buffers()["validity"], - cudfcol._get_mask_as_column().astype(cp.uint8), + assert_validity_equal( + col.get_buffers()["validity"][0], + cudfcol.mask, + cudfcol.size, + *col.describe_null, ) if col.dtype[0] == _DtypeKind.CATEGORICAL: @@ -88,31 +134,31 @@ def assert_dataframe_equal(dfo: DataFrameObject, df: cudf.DataFrame): assert_column_equal(dfo.get_column_by_name(col), df[col]._column) -def assert_from_dataframe_equals(dfobj): - df2 = _from_dataframe(dfobj) +def assert_from_dataframe_equals(dfobj, allow_copy): + df2 = from_dataframe(dfobj, allow_copy=allow_copy) - assert_dataframe_equal(dfobj, df2) - if isinstance(dfobj._df, cudf.DataFrame): - assert_eq(dfobj._df, df2) + assert_dataframe_equal(dfobj.__dataframe__(allow_copy), df2) + if isinstance(dfobj, cudf.DataFrame): + assert_eq(dfobj, df2) - elif isinstance(dfobj._df, pd.DataFrame): - assert_eq(cudf.DataFrame(dfobj._df), df2) + elif isinstance(dfobj, pd.DataFrame): + assert_eq(cudf.DataFrame(dfobj), df2) else: - raise TypeError(f"{type(dfobj._df)} not supported yet.") + raise TypeError(f"{type(dfobj)} not supported yet.") -def assert_from_dataframe_exception(dfobj): +def test_from_dataframe_exception(pandas_df): exception_msg = "This operation must copy data from CPU to GPU." " Set `allow_copy=True` to allow it." with pytest.raises(TypeError, match=exception_msg): - _from_dataframe(dfobj) + from_dataframe(pandas_df) def assert_df_unique_dtype_cols(data): cdf = cudf.DataFrame(data=data) - assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=False)) - assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=True)) + assert_from_dataframe_equals(cdf, allow_copy=False) + assert_from_dataframe_equals(cdf, allow_copy=True) def test_from_dataframe(): @@ -141,8 +187,8 @@ def test_categorical_dtype(): col = cdf.__dataframe__().get_column_by_name("A") assert col.dtype[0] == _DtypeKind.CATEGORICAL assert col.describe_categorical == (False, True, {0: 1, 1: 2, 2: 5}) - assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=False)) - assert_from_dataframe_equals(cdf.__dataframe__(allow_copy=True)) + assert_from_dataframe_equals(cdf, allow_copy=False) + assert_from_dataframe_equals(cdf, allow_copy=True) def test_bool_dtype(): @@ -196,8 +242,8 @@ def test_NA_categorical_dtype(): assert col.describe_null == (3, 0) assert col.num_chunks() == 1 assert col.describe_categorical == (False, True, {0: 1, 1: 2, 2: 5}) - assert_from_dataframe_equals(df.__dataframe__(allow_copy=False)) - assert_from_dataframe_equals(df.__dataframe__(allow_copy=True)) + assert_from_dataframe_equals(df, allow_copy=False) + assert_from_dataframe_equals(df, allow_copy=True) def test_NA_bool_dtype(): @@ -216,8 +262,8 @@ def test_NA_string_dtype(): assert col.null_count == 1 assert col.describe_null == (3, 0) assert col.num_chunks() == 1 - assert_from_dataframe_equals(df.__dataframe__(allow_copy=False)) - assert_from_dataframe_equals(df.__dataframe__(allow_copy=True)) + assert_from_dataframe_equals(df, allow_copy=False) + assert_from_dataframe_equals(df, allow_copy=True) def test_NA_mixed_dtype(): @@ -229,3 +275,11 @@ def test_NA_mixed_dtype(): string=[None, None, None, "df protocol", None], ) assert_df_unique_dtype_cols(data_mixed) + + +@pytest.mark.skipif( + not PANDAS_GE_150, + reason="Pandas versions < 1.5.0 do not support interchange protocol", +) +def test_from_cpu_df(pandas_df): + cudf.from_dataframe(pandas_df, allow_copy=True)