Skip to content

Commit

Permalink
Refactor array function (#10364)
Browse files Browse the repository at this point in the history
This PR cleans up the implementation of `__array_function__` for `Series` and `DataFrame` to bring them further into alignment. It also inlines a number of functions defined in `utils/utils.py` that were previously used only in `Series.__array_ufunc__`, building on the improvements in #10217, #10287, and #10346 to clear out methods related to the old `__array_ufunc__` dispatch that are now only used by this `__array_function__` implementation. Inlining these methods also allows significant simplification since they were handling cases that are no longer relevant or possible. Unlike those previous PRs, this one does not actually enable any new features. Although it should marginally accelerate array functions by simplifying the dispatch logic, the fact that this API makes few promises about the nature of the function being applied and our desire to have it "just work" as much as possible mean that we must simply adopt an EAFP ("easier to ask forgiveness than permission") approach and return `NotImplemented` if any part of the process fails.

Authors:
  - Vyas Ramasubramani (https://github.com/vyasr)

Approvers:
  - Bradley Dice (https://github.com/bdice)

URL: #10364
  • Loading branch information
vyasr authored Mar 2, 2022
1 parent 7120694 commit b4d262d
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 185 deletions.
58 changes: 24 additions & 34 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -1337,42 +1337,32 @@ def memory_usage(self, index=True, deep=False):

# NOTE(review): rendered diff hunk, not runnable code. Leading indentation was
# lost and the old and new bodies of DataFrame.__array_function__ are listed
# back to back without +/- markers; the section labels below are inferred from
# the hunk's addition/deletion counts -- confirm against the real diff.
@annotate("DATAFRAME_ARRAY_FUNCTION", color="blue", domain="cudf_python")
def __array_function__(self, func, types, args, kwargs):
# ---- old body (presumably the lines this commit deletes) ----
# Followed the dotted components of func.__module__ (after the top-level
# package) as attributes of DataFrame, required every entry of `types` to be
# in handled_types, then called the attribute named like `func` if present.

cudf_df_module = DataFrame
cudf_series_module = Series

for submodule in func.__module__.split(".")[1:]:
# point cudf to the correct submodule
if hasattr(cudf_df_module, submodule):
cudf_df_module = getattr(cudf_df_module, submodule)
else:
return NotImplemented

fname = func.__name__

handled_types = [cudf_df_module, cudf_series_module]

for t in types:
if t not in handled_types:
return NotImplemented

if hasattr(cudf_df_module, fname):
cudf_func = getattr(cudf_df_module, fname)
# Handle case if cudf_func is same as numpy function
if cudf_func is func:
return NotImplemented
# numpy returns an array from the dot product of two dataframes
elif (
func is np.dot
and isinstance(args[0], (DataFrame, pd.DataFrame))
and isinstance(args[1], (DataFrame, pd.DataFrame))
):
return cudf_func(*args, **kwargs).values
else:
return cudf_func(*args, **kwargs)
else:
# ---- new body ----
# Decline when an `out` keyword is supplied or when any participating type is
# not a Series/DataFrame subclass. NOTE(review): the single
# `return NotImplemented` after this condition is presumably also the line the
# old trailing `else:` above paired with -- confirm.
if "out" in kwargs or not all(
issubclass(t, (Series, DataFrame)) for t in types
):
return NotImplemented

try:
# Use the attribute of this object's class that shares the numpy function's
# name; if there is none (or it is falsy) control reaches the final
# `return NotImplemented`.
if cudf_func := getattr(self.__class__, func.__name__, None):
out = cudf_func(*args, **kwargs)
# The dot product of two DataFrames returns an array in pandas.
if (
func is np.dot
and isinstance(args[0], (DataFrame, pd.DataFrame))
and isinstance(args[1], (DataFrame, pd.DataFrame))
):
return out.values
return out
except Exception:
# The rare instance where a "silent" failure is preferable. Except
# in the (highly unlikely) case that some other library
# interoperates with cudf objects, the result will be that numpy
# raises a TypeError indicating that the operation is not
# implemented, which is much friendlier than an arbitrary internal
# cudf error.
pass
return NotImplemented

# The _get_numeric_data method is necessary for dask compatibility.
@annotate("DATAFRAME_GET_NUMERIC_DATA", color="blue", domain="cudf_python")
def _get_numeric_data(self):
Expand Down
72 changes: 52 additions & 20 deletions python/cudf/cudf/core/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,7 @@
is_mixed_with_object_dtype,
min_scalar_type,
)
from cudf.utils.utils import (
get_appropriate_dispatched_func,
get_relevant_submodule,
to_cudf_compatible_scalar,
)
from cudf.utils.utils import to_cudf_compatible_scalar


def _append_new_row_inplace(col: ColumnLike, value: ScalarLike):
Expand Down Expand Up @@ -960,23 +956,59 @@ def memory_usage(self, index=True, deep=False):
return sum(super().memory_usage(index, deep).values())

# NOTE(review): rendered diff hunk, not runnable code. Leading indentation was
# lost and the old and new bodies of Series.__array_function__ are interleaved
# without +/- markers; the section labels below are inferred from the hunk's
# addition/deletion counts -- confirm against the real diff.
def __array_function__(self, func, types, args, kwargs):
# ---- old body, part 1 (presumably deleted by this commit) ----
# Required every entry of `types` to be exactly cudf.Series.
handled_types = [cudf.Series]
for t in types:
if t not in handled_types:
# ---- new body, part 1 ----
# Decline when an `out` keyword is supplied or when any participating type is
# not a Series subclass.
if "out" in kwargs or not all(issubclass(t, Series) for t in types):
return NotImplemented

try:
# Apply a Series method if one exists.
if cudf_func := getattr(Series, func.__name__, None):
return cudf_func(*args, **kwargs)

# Assume that cupy subpackages match numpy and search the
# corresponding cupy submodule based on the func's __module__.
numpy_submodule = func.__module__.split(".")[1:]
cupy_func = cupy
for name in (*numpy_submodule, func.__name__):
cupy_func = getattr(cupy_func, name, None)

# Handle case if cupy does not implement the function or just
# aliases the numpy function.
if not cupy_func or cupy_func is func:
return NotImplemented

# ---- old body, part 2 (presumably deleted by this commit) ----
# Resolved the matching cudf, cudf.Series and cupy submodules and delegated
# everything to get_appropriate_dispatched_func.
cudf_submodule = get_relevant_submodule(func, cudf)
cudf_ser_submodule = get_relevant_submodule(func, cudf.Series)
cupy_submodule = get_relevant_submodule(func, cupy)

return get_appropriate_dispatched_func(
cudf_submodule,
cudf_ser_submodule,
cupy_submodule,
func,
args,
kwargs,
)
# ---- new body, part 2 ----
# For now just fail on cases with mismatched indices. There is
# almost certainly no general solution for all array functions.
index = args[0].index
if not all(s.index.equals(index) for s in args):
return NotImplemented
# Every positional argument is passed to cupy as its `.values`.
out = cupy_func(*(s.values for s in args), **kwargs)

# Return (host) scalar values immediately.
if not isinstance(out, cupy.ndarray):
return out

# 0D array (scalar)
if out.ndim == 0:
return to_cudf_compatible_scalar(out)
# 1D array
elif (
# Only allow 1D arrays
((out.ndim == 1) or (out.ndim == 2 and out.shape[1] == 1))
# If we have an index, it must be the same length as the
# output for cupy dispatching to be well-defined.
and len(index) == len(out)
):
return Series(out, index=index)
# Any other output shape falls through to the final `return NotImplemented`.
except Exception:
# The rare instance where a "silent" failure is preferable. Except
# in the (highly unlikely) case that some other library
# interoperates with cudf objects, the result will be that numpy
# raises a TypeError indicating that the operation is not
# implemented, which is much friendlier than an arbitrary internal
# cudf error.
pass

return NotImplemented

def map(self, arg, na_action=None) -> "Series":
"""
Expand Down
131 changes: 0 additions & 131 deletions python/cudf/cudf/utils/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import functools
import os
import traceback
from collections.abc import Sequence
from typing import FrozenSet, Set, Union

import cupy as cp
Expand Down Expand Up @@ -317,136 +316,6 @@ def search_range(start, stop, x, step=1, side="left"):
return max(min(length, i), 0)


_UFUNC_ALIASES = {
"power": "pow",
"equal": "eq",
"not_equal": "ne",
"less": "lt",
"less_equal": "le",
"greater": "gt",
"greater_equal": "ge",
"absolute": "abs",
}
# For op(., cudf.Series) -> cudf.Series.__r{op}__
_REVERSED_NAMES = {
"lt": "__gt__",
"le": "__ge__",
"gt": "__lt__",
"ge": "__le__",
"eq": "__eq__",
"ne": "__ne__",
}


# todo: can probably be used to remove cudf/core/ops.py
def _get_cudf_series_ufunc(fname, args, kwargs, cudf_ser_submodule):
    """Call the Series-level implementation of an aliased ufunc.

    When the first positional argument is a ``cudf.Series`` the attribute
    ``fname`` of ``cudf_ser_submodule`` is called with the arguments as
    given. When exactly two arguments are passed and only the second is a
    ``cudf.Series``, the reflected attribute (taken from ``_REVERSED_NAMES``,
    defaulting to ``__r{fname}__``) is called with the operands swapped.
    Any other combination yields ``NotImplemented``.
    """
    first = args[0]
    if isinstance(first, cudf.Series):
        return getattr(cudf_ser_submodule, fname)(*args, **kwargs)

    if len(args) == 2 and isinstance(args[1], cudf.Series):
        reflected = _REVERSED_NAMES.get(fname, f"__r{fname}__")
        # Swap the operands so the Series is the receiver of the call.
        return getattr(cudf_ser_submodule, reflected)(
            args[1], args[0], **kwargs
        )

    return NotImplemented


# Utils for using appropriate dispatch for array functions
def get_appropriate_dispatched_func(
    cudf_submodule, cudf_ser_submodule, cupy_submodule, func, args, kwargs
):
    """Dispatch a numpy array function to cudf, cudf.Series or cupy.

    The candidates are tried in that order, using ``func.__name__`` after
    translation through ``_UFUNC_ALIASES``. ``NotImplemented`` is returned
    when an ``out`` argument other than ``None`` is supplied, when no
    candidate exposes the name, when the cupy attribute is the numpy function
    itself, or when the arguments cannot be converted for cupy.
    """
    # A caller-supplied output buffer is not handled by any dispatch path.
    if kwargs.get("out") is not None:
        return NotImplemented

    numpy_name = func.__name__
    # Dispatch these functions to the appropriate alias from _UFUNC_ALIASES.
    aliased = numpy_name in _UFUNC_ALIASES
    target = _UFUNC_ALIASES.get(numpy_name, numpy_name)

    if hasattr(cudf_submodule, target):
        return getattr(cudf_submodule, target)(*args, **kwargs)

    if hasattr(cudf_ser_submodule, target):
        if aliased:
            return _get_cudf_series_ufunc(
                target, args, kwargs, cudf_ser_submodule
            )
        return getattr(cudf_ser_submodule, target)(*args, **kwargs)

    if hasattr(cupy_submodule, target):
        cupy_func = getattr(cupy_submodule, target)
        # cupy may simply re-export the numpy function; calling it would
        # dispatch straight back here.
        if cupy_func is func:
            return NotImplemented

        converted, index = _get_cupy_compatible_args_index(args)
        if converted:
            result = cupy_func(*converted, **kwargs)
            if isinstance(result, cp.ndarray):
                return _cast_to_appropriate_cudf_type(result, index)
            return result

    return NotImplemented


def _cast_to_appropriate_cudf_type(val, index=None):
# Handle scalar
if val.ndim == 0:
return to_cudf_compatible_scalar(val)
# 1D array
elif (val.ndim == 1) or (val.ndim == 2 and val.shape[1] == 1):
# if index is not None and is of a different length
# than the index, cupy dispatching behaviour is undefined
# so we don't implement it
if (index is None) or (len(index) == len(val)):
return cudf.Series(val, index=index)

return NotImplemented


def _get_cupy_compatible_args_index(args, ser_index=None):
"""
This function returns cupy compatible arguments and output index
if conversion is not possible it returns None
"""

casted_ls = []
for arg in args:
if isinstance(arg, cp.ndarray):
casted_ls.append(arg)
elif isinstance(arg, cudf.Series):
# check if indexes can be aligned
if (ser_index is None) or (ser_index.equals(arg.index)):
ser_index = arg.index
casted_ls.append(arg.values)
else:
# this throws a value-error if indexes are not aligned
# following pandas behavior for ufunc numpy dispatching
raise ValueError(
"Can only compare identically-labeled Series objects"
)
elif isinstance(arg, Sequence):
# we dont handle list of inputs for functions as
# these form inputs for functions like
# np.concatenate, vstack have ambiguity around index alignment
return None, ser_index
else:
casted_ls.append(arg)
return casted_ls, ser_index


def get_relevant_submodule(func, module):
    """Find the part of ``module`` that mirrors where ``func`` is defined.

    The dotted components of ``func.__module__`` after the top-level package
    are followed one by one as attributes starting from ``module``. The
    object reached at the end is returned, or ``None`` as soon as one
    component is missing.
    """
    current = module
    for part in func.__module__.split(".")[1:]:
        if not hasattr(current, part):
            return None
        current = getattr(current, part)
    return current


def _categorical_scalar_broadcast_to(cat_scalar, size):
if isinstance(cat_scalar, (cudf.Series, pd.Series)):
cats = cat_scalar.cat.categories
Expand Down

0 comments on commit b4d262d

Please sign in to comment.