Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Frame scans #9021

Merged
merged 13 commits into from
Aug 17, 2021
162 changes: 12 additions & 150 deletions python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5851,7 +5851,7 @@ def _from_arrays(cls, data, index=None, columns=None, nan_as_null=False):
)

if data.ndim == 2:
num_cols = len(data[0])
num_cols = data.shape[1]
else:
# Since we validate ndim to be either 1 or 2 above,
# this case can be assumed to be ndim == 1.
Expand Down Expand Up @@ -6225,7 +6225,7 @@ def _prepare_for_rowwise_op(self, method, skipna):
col.nullable for col in self._columns
):
msg = (
f"Row-wise operations to calculate '{method}' is not "
f"Row-wise operations to calculate '{method}' do not "
f"currently support columns with null values. "
f"Consider removing them with .dropna() "
f"or using .fillna()."
Expand Down Expand Up @@ -6340,154 +6340,15 @@ def _reduce(
elif axis == 1:
return self._apply_support_method_axis_1(op, **kwargs)

def cummin(self, axis=None, skipna=True, *args, **kwargs):
    """
    Compute the cumulative minimum of the DataFrame.

    Parameters
    ----------
    axis : {0, "index", None}, default None
        Only these values are accepted; any other value raises.
    skipna : bool, default True
        Exclude NA/null values. If an entire row/column is NA,
        the result will be NA.

    Returns
    -------
    DataFrame

    Raises
    ------
    NotImplementedError
        If ``axis`` is not one of 0, "index" or None.

    Examples
    --------
    >>> import cudf
    >>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
    >>> df.cummin()
       a  b
    0  1  7
    1  1  7
    2  1  7
    3  1  7
    """
    supported_axes = (0, "index", None)
    if axis in supported_axes:
        return self._apply_support_method(
            "cummin", *args, axis=axis, skipna=skipna, **kwargs
        )
    raise NotImplementedError("Only axis=0 is currently supported.")

def cummax(self, axis=None, skipna=True, *args, **kwargs):
    """
    Compute the cumulative maximum of the DataFrame.

    Parameters
    ----------
    axis : {0, "index", None}, default None
        Only these values are accepted; any other value raises.
    skipna : bool, default True
        Exclude NA/null values. If an entire row/column is NA,
        the result will be NA.

    Returns
    -------
    DataFrame

    Raises
    ------
    NotImplementedError
        If ``axis`` is not one of 0, "index" or None.

    Examples
    --------
    >>> import cudf
    >>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
    >>> df.cummax()
       a   b
    0  1   7
    1  2   8
    2  3   9
    3  4  10
    """
    supported_axes = (0, "index", None)
    if axis in supported_axes:
        return self._apply_support_method(
            "cummax", *args, axis=axis, skipna=skipna, **kwargs
        )
    raise NotImplementedError("Only axis=0 is currently supported.")

def cumsum(self, axis=None, skipna=True, *args, **kwargs):
    """
    Return cumulative sum of the DataFrame.

    Parameters
    ----------
    axis : {0, "index", None}, default None
        Only these values are accepted; any other value raises.
    skipna : bool, default True
        Exclude NA/null values. If an entire row/column is NA,
        the result will be NA.

    Returns
    -------
    DataFrame

    Raises
    ------
    NotImplementedError
        If ``axis`` is not one of 0, "index" or None.

    Notes
    -----
    The `axis` parameter is currently not supported beyond ``axis=0``.

    Examples
    --------
    >>> import cudf
    >>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
    >>> df.cumsum()
        a   b
    0   1   7
    1   3  15
    2   6  24
    3  10  34
    """
    if axis not in (0, "index", None):
        raise NotImplementedError("Only axis=0 is currently supported.")

    # Extra positionals are unpacked before the keywords so the call reads
    # in the order Python actually binds the arguments.
    return self._apply_support_method(
        "cumsum", *args, axis=axis, skipna=skipna, **kwargs
    )

def cumprod(self, axis=None, skipna=True, *args, **kwargs):
"""
Return cumulative product of the DataFrame.

Parameters
----------

skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.

Returns
-------
DataFrame

Notes
-----
Parameters currently not supported is `axis`

Examples
--------
>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df.cumprod()
a b
0 1 7
1 2 56
2 6 504
3 24 5040
"""
if axis not in (0, "index", None):
raise NotImplementedError("Only axis=0 is currently supported.")
def _scan(
self, op, axis=None, *args, **kwargs,
):
axis = self._get_axis_from_axis_arg(axis)

return self._apply_support_method(
"cumprod", axis=axis, skipna=skipna, *args, **kwargs
)
if axis == 0:
return super()._scan(op, axis=axis, *args, **kwargs)
elif axis == 1:
return self._apply_support_method_axis_1(f"cum{op}", **kwargs)

def mode(self, axis=0, numeric_only=False, dropna=True):
"""
Expand Down Expand Up @@ -6715,13 +6576,14 @@ def _apply_support_method_axis_0(self, method, *args, **kwargs):
def _apply_support_method_axis_1(self, method, *args, **kwargs):
# for dask metadata compatibility
skipna = kwargs.pop("skipna", None)
skipna = True if skipna is None else skipna
if method not in _cupy_nan_methods_map and skipna not in (
None,
True,
1,
):
raise NotImplementedError(
f"Row-wise operation to calculate '{method}'"
f"Row-wise operations to calculate '{method}'"
f" currently do not support `skipna=False`."
)

Expand Down
175 changes: 167 additions & 8 deletions python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -3681,14 +3681,7 @@ def _get_axis_from_axis_arg(cls, axis):
try:
return cls._SUPPORT_AXIS_LOOKUP[axis]
except KeyError:
valid_axes = ", ".join(
(
ax
for ax in cls._SUPPORT_AXIS_LOOKUP.keys()
if ax is not None
)
)
raise ValueError(f"Invalid axis, must be one of {valid_axes}.")
raise ValueError(f"No axis named {axis} for object type {cls}")

def _reduce(self, *args, **kwargs):
raise NotImplementedError(
Expand Down Expand Up @@ -4199,6 +4192,166 @@ def median(
**kwargs,
)

# Scans
def _scan(self, op, axis=None, skipna=True, cast_to_int=False):
    """
    Apply a cumulative (scan) operation to every column of the frame.

    Parameters
    ----------
    op : str
        Name of the scan, forwarded unchanged to each column's
        ``_apply_scan_op``.
    axis : optional
        Accepted for signature compatibility with subclasses; it is not
        read here, every column is scanned independently.
    skipna : bool, default True
        ``None`` is treated as ``True``. When true, NaNs are converted to
        nulls before scanning. When false, every row from the first null
        onwards is set to null before scanning.
    cast_to_int : bool, default False
        When true, non-decimal integer and boolean columns are cast to
        int64 before the scan is applied.

    Returns
    -------
    The object built by ``self._from_data`` from the scanned columns and
    ``self._index``.
    """
    skipna = True if skipna is None else skipna

    results = {}
    for name, col in self._data.items():
        if skipna:
            result_col = col.nans_to_nulls()
        else:
            result_col = col.copy()
            if result_col.has_nulls:
                # Workaround: find_first_value doesn't seem to work in
                # case of bools, so search an int8 view of the null mask.
                first_index = int(
                    result_col.isnull().astype("int8").find_first_value(1)
                )
                # With skipna=False everything from the first null on
                # must itself be null in the result.
                result_col[first_index:] = None

        if (
            cast_to_int
            and not is_decimal_dtype(result_col.dtype)
            and (
                np.issubdtype(result_col.dtype, np.integer)
                or np.issubdtype(result_col.dtype, np.bool_)
            )
        ):
            # For scans that accumulate a value (e.g. sum, not max)
            # pandas returns an int64 dtype for all int or bool dtypes.
            result_col = result_col.astype(np.int64)
        results[name] = result_col._apply_scan_op(op)
    # TODO: This will work for Index because it's passing self._index
    # (which is None), but eventually we may want to remove that parameter
    # for Index._from_data and simplify.
    return self._from_data(results, index=self._index)

def cummin(self, axis=None, skipna=True, *args, **kwargs):
    """
    Compute the cumulative minimum of the Series or DataFrame.

    Parameters
    ----------
    axis : {index (0), columns (1)}
        Axis for the function to be applied on.
    skipna : bool, default True
        Exclude NA/null values. If an entire row/column is NA,
        the result will be NA.

    Returns
    -------
    Series or DataFrame

    Examples
    --------
    >>> import cudf
    >>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
    >>> df.cummin()
       a  b
    0  1  7
    1  1  7
    2  1  7
    3  1  7
    """
    scan_op = "min"
    return self._scan(scan_op, *args, axis=axis, skipna=skipna, **kwargs)

def cummax(self, axis=None, skipna=True, *args, **kwargs):
    """
    Compute the cumulative maximum of the Series or DataFrame.

    Parameters
    ----------
    axis : {index (0), columns (1)}
        Axis for the function to be applied on.
    skipna : bool, default True
        Exclude NA/null values. If an entire row/column is NA,
        the result will be NA.

    Returns
    -------
    Series or DataFrame

    Examples
    --------
    >>> import cudf
    >>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
    >>> df.cummax()
       a   b
    0  1   7
    1  2   8
    2  3   9
    3  4  10
    """
    scan_op = "max"
    return self._scan(scan_op, *args, axis=axis, skipna=skipna, **kwargs)

def cumsum(self, axis=None, skipna=True, *args, **kwargs):
    """
    Return cumulative sum of the Series or DataFrame.

    Parameters
    ----------
    axis : {index (0), columns (1)}
        Axis for the function to be applied on.
    skipna : bool, default True
        Exclude NA/null values. If an entire row/column is NA,
        the result will be NA.

    Returns
    -------
    Series or DataFrame

    Examples
    --------
    >>> import cudf
    >>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
    >>> df.cumsum()
        a   b
    0   1   7
    1   3  15
    2   6  24
    3  10  34
    """
    # cast_to_int=True asks _scan to run the scan with the int-cast flag
    # enabled; extra positionals are unpacked before the keywords so the
    # call reads in the order Python binds the arguments.
    return self._scan(
        "sum", *args, axis=axis, skipna=skipna, cast_to_int=True, **kwargs
    )

def cumprod(self, axis=None, skipna=True, *args, **kwargs):
    """
    Return cumulative product of the Series or DataFrame.

    Parameters
    ----------
    axis : {index (0), columns (1)}
        Axis for the function to be applied on.
    skipna : bool, default True
        Exclude NA/null values. If an entire row/column is NA,
        the result will be NA.

    Returns
    -------
    Series or DataFrame

    Examples
    --------
    >>> import cudf
    >>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
    >>> df.cumprod()
        a     b
    0   1     7
    1   2    56
    2   6   504
    3  24  5040
    """
    # cast_to_int=True asks _scan to run the scan with the int-cast flag
    # enabled; extra positionals are unpacked before the keywords so the
    # call reads in the order Python binds the arguments.
    return self._scan(
        "prod", *args, axis=axis, skipna=skipna, cast_to_int=True, **kwargs
    )


class SingleColumnFrame(Frame):
"""A one-dimensional frame.
Expand Down Expand Up @@ -4228,6 +4381,12 @@ def _reduce(
)
return getattr(self._column, op)(**kwargs)

def _scan(self, op, axis=None, *args, **kwargs):
    """
    Run a scan on a one-dimensional frame.

    Only ``axis`` values of ``None`` or ``0`` are accepted; anything else
    raises ``NotImplementedError``. Accepted calls are delegated to the
    parent class's ``_scan`` with the same arguments.
    """
    axis_is_supported = axis in (None, 0)
    if not axis_is_supported:
        raise NotImplementedError("axis parameter is not implemented yet")

    return super()._scan(op, *args, axis=axis, **kwargs)

@classmethod
def _from_data(
cls,
Expand Down
Loading