Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a mixin for scans #10358

Merged
merged 6 commits into from
Mar 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions python/cudf/cudf/core/column/numerical_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@
from cudf import _lib as libcudf
from cudf._typing import ScalarLike
from cudf.core.column import ColumnBase
from cudf.core.mixins import Scannable


class NumericalBaseColumn(ColumnBase):
class NumericalBaseColumn(ColumnBase, Scannable):
"""A column composed of numerical data.

This class encodes a standard interface for different types of columns
Expand All @@ -32,6 +33,13 @@ class NumericalBaseColumn(ColumnBase):
"std",
}

_VALID_SCANS = {
"cumsum",
"cumprod",
"cummin",
"cummax",
}

def _can_return_nan(self, skipna: bool = None) -> bool:
return not skipna and self.has_nulls()

Expand Down Expand Up @@ -174,7 +182,7 @@ def round(
"""Round the values in the Column to the given number of decimals."""
return libcudf.round.round(self, decimal_places=decimals, how=how)

def _apply_scan_op(self, op: str) -> ColumnBase:
return libcudf.reduce.scan(op, self, True)._with_type_metadata(
self.dtype
)
def _scan(self, op: str) -> ColumnBase:
return libcudf.reduce.scan(
op.replace("cum", ""), self, True
)._with_type_metadata(self.dtype)
2 changes: 1 addition & 1 deletion python/cudf/cudf/core/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -5433,7 +5433,7 @@ def _scan(
if axis == 0:
return super()._scan(op, axis=axis, *args, **kwargs)
elif axis == 1:
return self._apply_cupy_method_axis_1(f"cum{op}", **kwargs)
return self._apply_cupy_method_axis_1(op, **kwargs)

@annotate("DATAFRAME_MODE", color="green", domain="cudf_python")
def mode(self, axis=0, numeric_only=False, dropna=True):
Expand Down
240 changes: 58 additions & 182 deletions python/cudf/cudf/core/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
)
from cudf.core.column_accessor import ColumnAccessor
from cudf.core.join import Merge, MergeSemi
from cudf.core.mixins import BinaryOperand
from cudf.core.mixins import BinaryOperand, Scannable
from cudf.core.window import Rolling
from cudf.utils import ioutils
from cudf.utils.docutils import copy_docstring
Expand Down Expand Up @@ -98,7 +98,7 @@
}


class Frame(BinaryOperand):
class Frame(BinaryOperand, Scannable):
"""A collection of Column objects with an optional index.

Parameters
Expand All @@ -117,6 +117,21 @@ class Frame(BinaryOperand):

_VALID_BINARY_OPERATIONS = BinaryOperand._SUPPORTED_BINARY_OPERATIONS

_VALID_SCANS = {
"cumsum",
"cumprod",
"cummin",
"cummax",
}

# Necessary because the function names don't directly map to the docs.
_SCAN_DOCSTRINGS = {
"cumsum": {"op_name": "cumulative sum"},
"cumprod": {"op_name": "cumulative product"},
"cummin": {"op_name": "cumulative min"},
"cummax": {"op_name": "cumulative max"},
}

def __init__(self, data=None, index=None):
if data is None:
data = {}
Expand Down Expand Up @@ -4566,142 +4581,14 @@ def median(

# Scans
@annotate("FRAME_SCAN", color="green", domain="cudf_python")
def _scan(self, op, axis=None, skipna=True, cast_to_int=False):
skipna = True if skipna is None else skipna

results = {}
for name, col in self._data.items():
if skipna:
try:
result_col = col.nans_to_nulls()
except AttributeError:
result_col = col
else:
if col.has_nulls(include_nan=True):
# Workaround as find_first_value doesn't seem to work
# in case of bools.
first_index = int(
col.isnull().astype("int8").find_first_value(1)
)
result_col = col.copy()
result_col[first_index:] = None
else:
result_col = col

if (
cast_to_int
and not is_decimal_dtype(result_col.dtype)
and (
np.issubdtype(result_col.dtype, np.integer)
or np.issubdtype(result_col.dtype, np.bool_)
)
):
# For reductions that accumulate a value (e.g. sum, not max)
# pandas returns an int64 dtype for all int or bool dtypes.
result_col = result_col.astype(np.int64)
results[name] = result_col._apply_scan_op(op)
# TODO: This will work for Index because it's passing self._index
# (which is None), but eventually we may want to remove that parameter
# for Index._from_data and simplify.
return self._from_data(results, index=self._index)

@annotate("FRAME_CUMMIN", color="green", domain="cudf_python")
def cummin(self, axis=None, skipna=True, *args, **kwargs):
"""
Return cumulative minimum of the Series or DataFrame.

Parameters
----------

axis: {index (0), columns(1)}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.

Returns
-------
Series or DataFrame

Examples
--------
**Series**

>>> import cudf
>>> ser = cudf.Series([1, 5, 2, 4, 3])
>>> ser.cummin()
0 1
1 1
2 1
3 1
4 1

**DataFrame**

>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df.cummin()
a b
0 1 7
1 1 7
2 1 7
3 1 7
"""
return self._scan("min", axis=axis, skipna=skipna, *args, **kwargs)

@annotate("FRAME_CUMMAX", color="green", domain="cudf_python")
def cummax(self, axis=None, skipna=True, *args, **kwargs):
"""
Return cumulative maximum of the Series or DataFrame.

Parameters
----------

axis: {index (0), columns(1)}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.

Returns
-------
Series or DataFrame

Examples
--------
**Series**

>>> import cudf
>>> ser = cudf.Series([1, 5, 2, 4, 3])
>>> ser.cummax()
0 1
1 5
2 5
3 5
4 5

**DataFrame**

>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df.cummax()
a b
0 1 7
1 2 8
2 3 9
3 4 10
"""
return self._scan("max", axis=axis, skipna=skipna, *args, **kwargs)

@annotate("FRAME_CUMSUM", color="green", domain="cudf_python")
def cumsum(self, axis=None, skipna=True, *args, **kwargs):
def _scan(self, op, axis=None, skipna=True):
"""
Return cumulative sum of the Series or DataFrame.
Return {op_name} of the {cls}.

Parameters
----------

axis: {index (0), columns(1)}
axis: {{index (0), columns(1)}}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
Expand All @@ -4710,7 +4597,7 @@ def cumsum(self, axis=None, skipna=True, *args, **kwargs):

Returns
-------
Series or DataFrame
{cls}

Examples
--------
Expand All @@ -4728,63 +4615,52 @@ def cumsum(self, axis=None, skipna=True, *args, **kwargs):
**DataFrame**

>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df = cudf.DataFrame({{'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]}})
>>> df.cumsum()
a b
0 1 7
1 3 15
2 6 24
3 10 34
"""
return self._scan(
"sum", axis=axis, skipna=skipna, cast_to_int=True, *args, **kwargs
)

@annotate("FRAME_CUMPROD", color="green", domain="cudf_python")
def cumprod(self, axis=None, skipna=True, *args, **kwargs):
"""
Return cumulative product of the Series or DataFrame.

Parameters
----------

axis: {index (0), columns(1)}
Axis for the function to be applied on.
skipna: bool, default True
Exclude NA/null values. If an entire row/column is NA,
the result will be NA.

Returns
-------
Series or DataFrame

Examples
--------
**Series**

>>> import cudf
>>> ser = cudf.Series([1, 5, 2, 4, 3])
>>> ser.cumprod()
0 1
1 5
2 10
3 40
4 120
cast_to_int = op in ("cumsum", "cumprod")
skipna = True if skipna is None else skipna

**DataFrame**
results = {}
for name, col in self._data.items():
if skipna:
try:
result_col = col.nans_to_nulls()
except AttributeError:
result_col = col
else:
if col.has_nulls(include_nan=True):
# Workaround as find_first_value doesn't seem to work
# in case of bools.
first_index = int(
col.isnull().astype("int8").find_first_value(1)
)
result_col = col.copy()
result_col[first_index:] = None
else:
result_col = col

>>> import cudf
>>> df = cudf.DataFrame({'a': [1, 2, 3, 4], 'b': [7, 8, 9, 10]})
>>> df.cumprod()
a b
0 1 7
1 2 56
2 6 504
3 24 5040
"""
return self._scan(
"prod", axis=axis, skipna=skipna, cast_to_int=True, *args, **kwargs
)
if (
cast_to_int
and not is_decimal_dtype(result_col.dtype)
and (
np.issubdtype(result_col.dtype, np.integer)
or np.issubdtype(result_col.dtype, np.bool_)
)
):
# For reductions that accumulate a value (e.g. sum, not max)
# pandas returns an int64 dtype for all int or bool dtypes.
result_col = result_col.astype(np.int64)
results[name] = getattr(result_col, op)()
# TODO: This will work for Index because it's passing self._index
# (which is None), but eventually we may want to remove that parameter
# for Index._from_data and simplify.
return self._from_data(results, index=self._index)

@annotate("FRAME_TO_JSON", color="green", domain="cudf_python")
@ioutils.doc_to_json()
Expand Down
Loading