Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up __cuda_array_interface__ handling in as_column #15477

Merged
merged 9 commits into from
Apr 18, 2024
98 changes: 24 additions & 74 deletions python/cudf/cudf/core/column/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -1679,27 +1679,6 @@ def build_categorical_column(
return cast("cudf.core.column.CategoricalColumn", result)


def _make_copy_replacing_NaT_with_null(column):
"""Return a copy with NaT values replaced with nulls."""
if np.issubdtype(column.dtype, np.timedelta64):
na_value = np.timedelta64("NaT", column.time_unit)
elif np.issubdtype(column.dtype, np.datetime64):
na_value = np.datetime64("NaT", column.time_unit)
else:
raise ValueError("This type does not support replacing NaT with null.")

null = column_empty_like(column, masked=True, newsize=1)
out_col = cudf._lib.replace.replace(
column,
build_column(
as_buffer(np.array([na_value], dtype=column.dtype).view("|u1")),
dtype=column.dtype,
),
null,
)
return out_col


def check_invalid_array(shape: tuple, dtype):
"""Invalid ndarrays properties that are not supported"""
if len(shape) > 1:
Expand Down Expand Up @@ -1782,50 +1761,30 @@ def as_column(
return arbitrary
elif hasattr(arbitrary, "__cuda_array_interface__"):
desc = arbitrary.__cuda_array_interface__
shape = desc["shape"]
current_dtype = np.dtype(desc["typestr"])

check_invalid_array(shape, current_dtype)

arb_dtype = cudf.dtype(current_dtype)
check_invalid_array(desc["shape"], np.dtype(desc["typestr"]))

if desc.get("mask", None) is not None:
# Extract and remove the mask from arbitrary before
# passing to cupy.asarray
mask = _mask_from_cuda_array_interface_desc(arbitrary)
arbitrary = SimpleNamespace(__cuda_array_interface__=desc.copy())
arbitrary.__cuda_array_interface__["mask"] = None
desc = arbitrary.__cuda_array_interface__
cai_copy = desc.copy()
mask = _mask_from_cuda_array_interface_desc(
arbitrary, cai_copy.pop("mask")
)
arbitrary = SimpleNamespace(__cuda_array_interface__=cai_copy)
else:
mask = None

arbitrary = cupy.asarray(arbitrary)

if arb_dtype != current_dtype:
arbitrary = arbitrary.astype(arb_dtype)
current_dtype = arb_dtype

if (
desc["strides"] is not None
and not (arbitrary.itemsize,) == arbitrary.strides
):
arbitrary = cupy.ascontiguousarray(arbitrary)
arbitrary = cupy.ascontiguousarray(arbitrary)

data = as_buffer(arbitrary, exposed=cudf.get_option("copy_on_write"))
col = build_column(data, dtype=current_dtype, mask=mask)

col = build_column(data, dtype=arbitrary.dtype, mask=mask)
if (
nan_as_null or (mask is None and nan_as_null is None)
) and col.dtype.kind == "f":
col = col.nans_to_nulls()
if dtype is not None:
col = col.astype(dtype)

if isinstance(col, cudf.core.column.CategoricalColumn):
return col
elif np.issubdtype(col.dtype, np.floating):
if nan_as_null or (mask is None and nan_as_null is None):
mask = libcudf.transform.nans_to_nulls(col.fillna(np.nan))
col = col.set_mask(mask)
elif np.issubdtype(col.dtype, np.datetime64):
if nan_as_null or (mask is None and nan_as_null is None):
col = _make_copy_replacing_NaT_with_null(col)
return col

elif isinstance(arbitrary, (pa.Array, pa.ChunkedArray)):
Expand Down Expand Up @@ -2222,27 +2181,18 @@ def as_column(
return as_column(arbitrary, nan_as_null=nan_as_null, dtype=dtype)


def _mask_from_cuda_array_interface_desc(obj) -> Union[Buffer, None]:
desc = obj.__cuda_array_interface__
mask = desc.get("mask", None)

if mask is not None:
desc = mask.__cuda_array_interface__
ptr = desc["data"][0]
nelem = desc["shape"][0]
typestr = desc["typestr"]
typecode = typestr[1]
if typecode == "t":
mask_size = bitmask_allocation_size_bytes(nelem)
mask = as_buffer(data=ptr, size=mask_size, owner=obj)
elif typecode == "b":
col = as_column(mask)
mask = bools_to_mask(col)
else:
raise NotImplementedError(
f"Cannot infer mask from typestr {typestr}"
)
return mask
def _mask_from_cuda_array_interface_desc(obj, cai_mask) -> Buffer:
desc = cai_mask.__cuda_array_interface__
typestr = desc["typestr"]
typecode = typestr[1]
if typecode == "t":
mask_size = bitmask_allocation_size_bytes(desc["shape"][0])
return as_buffer(data=desc["data"][0], size=mask_size, owner=obj)
elif typecode == "b":
col = as_column(cai_mask)
return bools_to_mask(col)
else:
raise NotImplementedError(f"Cannot infer mask from typestr {typestr}")


def serialize_columns(columns) -> Tuple[List[dict], List]:
Expand Down
Loading