Column refactoring 2 #8130

Merged
60 changes: 47 additions & 13 deletions python/cudf/cudf/core/column/categorical.py
@@ -3,6 +3,7 @@
from __future__ import annotations

import pickle
from collections.abc import MutableSequence
from typing import (
TYPE_CHECKING,
Any,
@@ -819,7 +820,7 @@ def __contains__(self, item: ScalarLike) -> bool:
return self._encode(item) in self.as_numerical

def serialize(self) -> Tuple[dict, list]:
header = {} # type: Dict[Any, Any]
header: Dict[Any, Any] = {}
frames = []
header["type-serialized"] = pickle.dumps(type(self))
header["dtype"], dtype_frames = self.dtype.serialize()
@@ -1343,21 +1344,11 @@ def find_last_value(self, value: ScalarLike, closest: bool = False) -> int:

@property
def is_monotonic_increasing(self) -> bool:
if not hasattr(self, "_is_monotonic_increasing"):
self._is_monotonic_increasing = (
bool(self.ordered)
and self.as_numerical.is_monotonic_increasing
)
return self._is_monotonic_increasing
return bool(self.ordered) and self.as_numerical.is_monotonic_increasing

@property
def is_monotonic_decreasing(self) -> bool:
if not hasattr(self, "_is_monotonic_decreasing"):
self._is_monotonic_decreasing = (
bool(self.ordered)
and self.as_numerical.is_monotonic_decreasing
)
return self._is_monotonic_decreasing
return bool(self.ordered) and self.as_numerical.is_monotonic_decreasing

def as_categorical_column(
self, dtype: Dtype, **kwargs
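As a quick sketch of the behavior the simplified monotonicity properties above encode, exercised through the public cudf Series API (values and category ordering below are illustrative, not from the diff):

import cudf

# An ordered categorical whose codes are sorted reports monotonic increasing.
dtype = cudf.CategoricalDtype(categories=["low", "mid", "high"], ordered=True)
s = cudf.Series(["low", "mid", "high"], dtype=dtype)
assert s.is_monotonic_increasing

# An unordered categorical is never monotonic: bool(self.ordered) is False.
s2 = cudf.Series(["low", "mid", "high"], dtype="category")
assert not s2.is_monotonic_increasing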
@@ -1472,6 +1463,49 @@ def view(self, dtype: Dtype) -> ColumnBase:
"Categorical column views are not currently supported"
)

@staticmethod
def _concat(objs: MutableSequence[CategoricalColumn]) -> CategoricalColumn:
# TODO: This function currently assumes it is being called from
# column._concat_columns, at least to the extent that all the
# preprocessing in that function has already been done. That should be
# improved as the concatenation API is solidified.

# Find the first non-null column:
head = next((obj for obj in objs if obj.valid_count), objs[0])

# Combine and de-dupe the categories
cats = (
cudf.concat([o.cat().categories for o in objs])
.drop_duplicates()
._column
)
objs = [
o.cat()._set_categories(o.cat().categories, cats, is_unique=True)
for o in objs
]
codes = [o.codes for o in objs]

newsize = sum(map(len, codes))
if newsize > libcudf.MAX_COLUMN_SIZE:
raise MemoryError(
f"Result of concat cannot have "
f"size > {libcudf.MAX_COLUMN_SIZE_STR}"
)
elif newsize == 0:
codes_col = column.column_empty(0, head.codes.dtype, masked=True)
else:
# Filter out inputs that have 0 length, then concatenate.
codes = [o for o in codes if len(o)]
codes_col = libcudf.concat.concat_columns(codes)

return column.build_categorical_column(
categories=column.as_column(cats),
codes=column.as_column(codes_col.base_data, dtype=codes_col.dtype),
mask=codes_col.base_mask,
size=codes_col.size,
offset=codes_col.offset,
)
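For context, a brief sketch (not part of the diff) of the public code path that reaches this method — cudf.concat on categorical Series dispatches down to CategoricalColumn._concat, and the result's categories are the de-duplicated union of the inputs' categories (ordering of the result is illustrative):

import cudf

a = cudf.Series(["a", "b"], dtype="category")
b = cudf.Series(["b", "c"], dtype="category")

# Concatenation unions and de-dupes the input categories.
out = cudf.concat([a, b], ignore_index=True)
print(out.cat.categories)  # expected: ['a', 'b', 'c']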


def _create_empty_categorical_column(
categorical_column: CategoricalColumn, dtype: "CategoricalDtype"
200 changes: 72 additions & 128 deletions python/cudf/cudf/core/column/column.py
@@ -5,13 +5,13 @@
import builtins
import pickle
import warnings
from collections.abc import MutableSequence
from types import SimpleNamespace
from typing import (
Any,
Callable,
Dict,
List,
MutableSequence,
Optional,
Sequence,
Tuple,
@@ -189,114 +189,6 @@ def __sizeof__(self) -> int:
n += bitmask_allocation_size_bytes(self.size)
return n

def cat(
self, parent=None
) -> "cudf.core.column.categorical.CategoricalAccessor":
raise NotImplementedError()

def str(self, parent=None) -> "cudf.core.column.string.StringMethods":
raise NotImplementedError()

@classmethod
def _concat(
cls, objs: "MutableSequence[ColumnBase]", dtype: Dtype = None
) -> ColumnBase:
if len(objs) == 0:
dtype = pd.api.types.pandas_dtype(dtype)
if is_categorical_dtype(dtype):
dtype = CategoricalDtype()
return column_empty(0, dtype=dtype, masked=True)

# If all columns are `NumericalColumn` with different dtypes,
# we cast them to a common dtype.
# Note that we can always cast pure-null columns.
not_null_cols = list(filter(lambda o: o.valid_count > 0, objs))
if len(not_null_cols) > 0 and (
len(
[
o
for o in not_null_cols
if not is_numerical_dtype(o.dtype)
or np.issubdtype(o.dtype, np.datetime64)
]
)
== 0
):
col_dtypes = [o.dtype for o in not_null_cols]
# Use NumPy to find a common dtype
common_dtype = np.find_common_type(col_dtypes, [])
# Cast all columns to the common dtype
for i in range(len(objs)):
objs[i] = objs[i].astype(common_dtype)

# Find the first non-null column:
head = objs[0]
for i, obj in enumerate(objs):
if obj.valid_count > 0:
head = obj
break

for i, obj in enumerate(objs):
# Check that all columns are the same type:
if not pd.api.types.is_dtype_equal(obj.dtype, head.dtype):
# if all null, cast to appropriate dtype
if obj.valid_count == 0:
objs[i] = column_empty_like(
head, dtype=head.dtype, masked=True, newsize=len(obj)
)
else:
raise ValueError("All columns must be the same type")

cats = None
is_categorical = all(is_categorical_dtype(o.dtype) for o in objs)

# Combine CategoricalColumn categories
if is_categorical:
# Combine and de-dupe the categories
cats = (
cudf.concat([o.cat().categories for o in objs])
.to_series()
.drop_duplicates(ignore_index=True)
._column
)
objs = [
o.cat()._set_categories(
o.cat().categories, cats, is_unique=True
)
for o in objs
]
# Map `objs` into a list of the codes until we port Categorical to
# use the libcudf++ Category data type.
objs = [o.cat().codes._column for o in objs]
head = head.cat().codes._column

newsize = sum(map(len, objs))
if newsize > libcudf.MAX_COLUMN_SIZE:
raise MemoryError(
f"Result of concat cannot have "
f"size > {libcudf.MAX_COLUMN_SIZE_STR}"
)

# Filter out inputs that have 0 length
objs = [o for o in objs if len(o) > 0]

# Perform the actual concatenation
if newsize > 0:
col = libcudf.concat.concat_columns(objs)
else:
col = column_empty(0, head.dtype, masked=True)

if is_categorical:
col = build_categorical_column(
categories=as_column(cats),
codes=as_column(col.base_data, dtype=col.dtype),
mask=col.base_mask,
size=col.size,
offset=col.offset,
)

return col

def dropna(self, drop_nan: bool = False) -> ColumnBase:
if drop_nan:
col = self.nans_to_nulls()
@@ -796,7 +688,7 @@ def find_last_value(self, value: ScalarLike, closest: bool = False) -> int:
return indices[-1]

def append(self, other: ColumnBase) -> ColumnBase:
return self.__class__._concat([self, as_column(other)])
return _concat_columns([self, as_column(other)])

def quantile(
self,
@@ -934,25 +826,15 @@ def is_monotonic(self) -> bool:

@property
def is_monotonic_increasing(self) -> bool:
if not hasattr(self, "_is_monotonic_increasing"):
if self.has_nulls:
self._is_monotonic_increasing = False
else:
self._is_monotonic_increasing = self.as_frame()._is_sorted(
ascending=None, null_position=None
)
return self._is_monotonic_increasing
return not self.has_nulls and self.as_frame()._is_sorted(
ascending=None, null_position=None
)

@property
def is_monotonic_decreasing(self) -> bool:
if not hasattr(self, "_is_monotonic_decreasing"):
if self.has_nulls:
self._is_monotonic_decreasing = False
else:
self._is_monotonic_decreasing = self.as_frame()._is_sorted(
ascending=[False], null_position=None
)
return self._is_monotonic_decreasing
return not self.has_nulls and self.as_frame()._is_sorted(
ascending=[False], null_position=None
)

def get_slice_bound(
self, label: ScalarLike, side: builtins.str, kind: builtins.str
@@ -1211,7 +1093,7 @@ def unique(self) -> ColumnBase:
)

def serialize(self) -> Tuple[dict, list]:
header = {} # type: Dict[Any, Any]
header: Dict[Any, Any] = {}
frames = []
header["type-serialized"] = pickle.dumps(type(self))
header["dtype"] = self.dtype.str
@@ -2226,7 +2108,7 @@ def serialize_columns(columns) -> Tuple[List[dict], List]:
frames : list
list of frames
"""
headers = [] # type List[Dict[Any, Any], ...]
headers: List[Dict[Any, Any]] = []
frames = []

if len(columns) > 0:
@@ -2346,3 +2228,65 @@ def full(size: int, fill_value: ScalarLike, dtype: Dtype = None) -> ColumnBase:
dtype: int8
"""
return ColumnBase.from_scalar(cudf.Scalar(fill_value, dtype), size)


def _concat_columns(objs: "MutableSequence[ColumnBase]") -> ColumnBase:
"""Concatenate a sequence of columns."""
if len(objs) == 0:
dtype = pd.api.types.pandas_dtype(None)
return column_empty(0, dtype=dtype, masked=True)

# If all columns are `NumericalColumn` with different dtypes,
# we cast them to a common dtype.
# Note that we can always cast pure-null columns.
not_null_col_dtypes = [o.dtype for o in objs if o.valid_count]
if len(not_null_col_dtypes) and all(
is_numerical_dtype(dtyp) and not np.issubdtype(dtyp, np.datetime64)
for dtyp in not_null_col_dtypes
):
# Use NumPy to find a common dtype
common_dtype = np.find_common_type(not_null_col_dtypes, [])
# Cast all columns to the common dtype
objs = [obj.astype(common_dtype) for obj in objs]

# Find the first non-null column:
head = next((obj for obj in objs if obj.valid_count), objs[0])

for i, obj in enumerate(objs):
# Check that all columns are the same type:
if not pd.api.types.is_dtype_equal(obj.dtype, head.dtype):
# if all null, cast to appropriate dtype
if obj.valid_count == 0:
objs[i] = column_empty_like(
head, dtype=head.dtype, masked=True, newsize=len(obj)
)
else:
raise ValueError("All columns must be the same type")

# TODO: This logic should be generalized to a dispatch to
# ColumnBase._concat so that all subclasses can override necessary
# behavior. However, at the moment it's not clear what that API should look
# like, so CategoricalColumn simply implements a minimal working API.
if all(is_categorical_dtype(o.dtype) for o in objs):
return cudf.core.column.categorical.CategoricalColumn._concat(
cast(
MutableSequence[
cudf.core.column.categorical.CategoricalColumn
],
objs,
)
)

newsize = sum(map(len, objs))
if newsize > libcudf.MAX_COLUMN_SIZE:
raise MemoryError(
f"Result of concat cannot have "
f"size > {libcudf.MAX_COLUMN_SIZE_STR}"
)
elif newsize == 0:
col = column_empty(0, head.dtype, masked=True)
else:
# Filter out inputs that have 0 length, then concatenate.
objs = [o for o in objs if len(o)]
col = libcudf.concat.concat_columns(objs)
return col
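For illustration, a hedged sketch (not part of the diff) of the numeric common-dtype behavior the function above implements, exercised through the public concat API; the result dtype follows np.find_common_type:

import cudf

i32 = cudf.Series([1, 2], dtype="int32")
f64 = cudf.Series([0.5], dtype="float64")

# Non-null numeric inputs are cast to a NumPy common dtype before concat.
out = cudf.concat([i32, f64], ignore_index=True)
print(out.dtype)  # expected: float64

# Mixing genuinely incompatible dtypes (e.g. strings with ints) raises
# ValueError("All columns must be the same type").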
22 changes: 5 additions & 17 deletions python/cudf/cudf/core/column/datetime.py
@@ -11,7 +11,6 @@

import numpy as np
import pandas as pd
from nvtx import annotate

import cudf
from cudf import _lib as libcudf
@@ -307,7 +306,7 @@ def binary_operator(
) -> ColumnBase:
if isinstance(rhs, cudf.DateOffset):
return rhs._datetime_binop(self, op, reflect=reflect)
lhs, rhs = self, rhs
lhs: Union[ScalarLike, ColumnBase] = self
if op in ("eq", "ne", "lt", "gt", "le", "ge", "NULL_EQUALS"):
out_dtype = np.dtype(np.bool_) # type: Dtype
elif op == "add" and pd.api.types.is_timedelta64_dtype(rhs.dtype):
@@ -332,7 +331,10 @@
f"Series of dtype {self.dtype} cannot perform "
f" the operation {op}"
)
return binop(lhs, rhs, op=op, out_dtype=out_dtype, reflect=reflect)

if reflect:
lhs, rhs = rhs, lhs
return libcudf.binaryop.binaryop(lhs, rhs, op, out_dtype)
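
The reflect handling inlined above (replacing the removed binop helper) can be read as the following standalone sketch in plain Python (illustrative only): for a reflected operator such as __rsub__, the operands are swapped before the binary op is dispatched to libcudf.

import operator

def binop_with_reflect(lhs, rhs, op, reflect):
    # For reflected operators, swap operands before dispatching the op.
    if reflect:
        lhs, rhs = rhs, lhs
    return op(lhs, rhs)

print(binop_with_reflect(2, 10, operator.sub, reflect=False))  # 2 - 10 = -8
print(binop_with_reflect(2, 10, operator.sub, reflect=True))   # 10 - 2 = 8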

def fillna(
self, fill_value: Any = None, method: str = None, dtype: Dtype = None
@@ -422,20 +424,6 @@ def _make_copy_with_na_as_null(self):
return out_col


@annotate("BINARY_OP", color="orange", domain="cudf_python")
def binop(
lhs: Union[ColumnBase, ScalarLike],
rhs: Union[ColumnBase, ScalarLike],
op: str,
out_dtype: Dtype,
reflect: bool,
) -> ColumnBase:
if reflect:
lhs, rhs = rhs, lhs
out = libcudf.binaryop.binaryop(lhs, rhs, op, out_dtype)
return out


def binop_offset(lhs, rhs, op):
if rhs._is_no_op:
return lhs