Grouping by frequency and resampling (#9178)

Closes #6255, #8416 

This PR implements two related features:

1. Grouping by a frequency via the `freq=` argument to `cudf.Grouper`
2. Time-series resampling via the `.resample()` API

Either operation results in a `_Resampler` object that represents the data resampled into "bins" of a particular frequency. The following operations are supported on resampled data:

1. Aggregations such as `min()` and `max()`, performed bin-wise
2. `ffill()` and `bfill()` methods: forward and backward filling when upsampling data
3. `asfreq()`: returns the resampled data as a Series or DataFrame

These are all best understood by example:

First, we create a time series with 1-minute intervals:

```python
>>> index = cudf.date_range(start="2001-01-01", periods=10, freq="1T")
>>> sr = cudf.Series(range(10), index=index)
>>> sr
2001-01-01 00:00:00    0
2001-01-01 00:01:00    1
2001-01-01 00:02:00    2
2001-01-01 00:03:00    3
2001-01-01 00:04:00    4
2001-01-01 00:05:00    5
2001-01-01 00:06:00    6
2001-01-01 00:07:00    7
2001-01-01 00:08:00    8
2001-01-01 00:09:00    9
dtype: int64
```
Downsampling to 3-minute intervals, followed by a "sum" aggregation:

```python
>>> sr.resample("3T").sum()  # equivalently, sr.groupby(cudf.Grouper(freq="3T")).sum()
2001-01-01 00:00:00     3
2001-01-01 00:03:00    12
2001-01-01 00:06:00    21
2001-01-01 00:09:00     9
dtype: int64
```
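
The same frequency grouping works on a DataFrame. A quick sketch using the same index (the DataFrame and its column name `x` are made up for illustration; output formatting follows pandas conventions):

```python
>>> df = cudf.DataFrame({"x": range(10)}, index=index)
>>> df.groupby(cudf.Grouper(freq="3T")).sum()
                      x
2001-01-01 00:00:00   3
2001-01-01 00:03:00  12
2001-01-01 00:06:00  21
2001-01-01 00:09:00   9
```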

Upsampling to 30-second intervals (note that the result is float64: upsampling introduces gaps, which become `NaN`):

```python
>>> sr.resample("30s").asfreq()
2001-01-01 00:00:00    0.0
2001-01-01 00:00:30    NaN
2001-01-01 00:01:00    1.0
2001-01-01 00:01:30    NaN
2001-01-01 00:02:00    2.0
2001-01-01 00:02:30    NaN
2001-01-01 00:03:00    3.0
2001-01-01 00:03:30    NaN
2001-01-01 00:04:00    4.0
2001-01-01 00:04:30    NaN
2001-01-01 00:05:00    5.0
2001-01-01 00:05:30    NaN
2001-01-01 00:06:00    6.0
2001-01-01 00:06:30    NaN
2001-01-01 00:07:00    7.0
2001-01-01 00:07:30    NaN
2001-01-01 00:08:00    8.0
2001-01-01 00:08:30    NaN
2001-01-01 00:09:00    9.0
Freq: 30S, dtype: float64
```

Upsampling to 30-second intervals, followed by a forward fill:

```python
>>> sr.resample("30s").ffill()
2001-01-01 00:00:00    0
2001-01-01 00:00:30    0
2001-01-01 00:01:00    1
2001-01-01 00:01:30    1
2001-01-01 00:02:00    2
2001-01-01 00:02:30    2
2001-01-01 00:03:00    3
2001-01-01 00:03:30    3
2001-01-01 00:04:00    4
2001-01-01 00:04:30    4
2001-01-01 00:05:00    5
2001-01-01 00:05:30    5
2001-01-01 00:06:00    6
2001-01-01 00:06:30    6
2001-01-01 00:07:00    7
2001-01-01 00:07:30    7
2001-01-01 00:08:00    8
2001-01-01 00:08:30    8
2001-01-01 00:09:00    9
Freq: 30S, dtype: int64
```
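
`bfill()` mirrors this in the other direction, filling each gap from the next valid observation. The expected (pandas-matching) output for the same input would be:

```python
>>> sr.resample("30s").bfill()
2001-01-01 00:00:00    0
2001-01-01 00:00:30    1
2001-01-01 00:01:00    1
2001-01-01 00:01:30    2
2001-01-01 00:02:00    2
2001-01-01 00:02:30    3
2001-01-01 00:03:00    3
2001-01-01 00:03:30    4
2001-01-01 00:04:00    4
2001-01-01 00:04:30    5
2001-01-01 00:05:00    5
2001-01-01 00:05:30    6
2001-01-01 00:06:00    6
2001-01-01 00:06:30    7
2001-01-01 00:07:00    7
2001-01-01 00:07:30    8
2001-01-01 00:08:00    8
2001-01-01 00:08:30    9
2001-01-01 00:09:00    9
Freq: 30S, dtype: int64
```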

Authors:
  - Ashwin Srinath (https://github.com/shwina)
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - https://github.com/brandon-b-miller
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Benjamin Zaitlen (https://github.com/quasiben)

URL: #9178
shwina authored Nov 13, 2021
1 parent 47af69a commit 49d1cc2
Showing 18 changed files with 1,068 additions and 132 deletions.
5 changes: 3 additions & 2 deletions python/cudf/cudf/_lib/copying.pyx
```diff
@@ -147,12 +147,13 @@ def gather(
     source_table,
     Column gather_map,
     bool keep_index=True,
-    bool nullify=False
+    bool nullify=False,
+    bool check_bounds=True
 ):
     if not pd.api.types.is_integer_dtype(gather_map.dtype):
         raise ValueError("Gather map is not integer dtype.")
 
-    if len(gather_map) > 0 and not nullify:
+    if check_bounds and len(gather_map) > 0 and not nullify:
         gm_min, gm_max = minmax(gather_map)
         if gm_min < -len(source_table) or gm_max >= len(source_table):
             raise IndexError(f"Gather map index with min {gm_min},"
```
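
The `check_bounds` flag lets trusted internal callers skip the `minmax` scan of the gather map. My reading (an assumption, based on `Frame._gather` below) is that resampling gathers with maps produced by libcudf itself, which are already in range, so the validation pass is pure overhead there. A sketch of the calling pattern:

```python
# Sketch of the intended usage (internal API; `frame` and the two map
# variables are hypothetical). A map produced by libcudf is known to be
# in bounds, so the validation scan can be skipped:
binned = frame._gather(libcudf_produced_map, check_bounds=False)

# User-facing paths keep the default and still raise IndexError on
# out-of-range indices:
taken = frame._gather(user_supplied_map)  # check_bounds=True
```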
6 changes: 6 additions & 0 deletions python/cudf/cudf/core/column/column.py
```diff
@@ -1152,6 +1152,12 @@ def normalize_binop_value(
     ) -> Union[ColumnBase, ScalarLike]:
         raise NotImplementedError
 
+    def _minmax(self, skipna: bool = None):
+        result_col = self._process_for_reduction(skipna=skipna)
+        if isinstance(result_col, ColumnBase):
+            return libcudf.reduce.minmax(result_col)
+        return result_col
+
     def min(self, skipna: bool = None, dtype: Dtype = None):
         result_col = self._process_for_reduction(skipna=skipna)
         if isinstance(result_col, ColumnBase):
```
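
`_minmax` computes both extremes of a column in a single libcudf reduction rather than two separate passes; presumably (my assumption) this is used when resampling needs the span of the time index to construct bins. Assumed semantics:

```python
# Assumed semantics of the new internal helper:
lo, hi = column._minmax()            # one reduction pass over the data
# equivalent result to, but cheaper than:
lo, hi = column.min(), column.max()  # two passes
```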
29 changes: 29 additions & 0 deletions python/cudf/cudf/core/column_accessor.py
```diff
@@ -537,6 +537,24 @@ def rename_column(x):
 
         return self.__class__(ca)
 
+    def droplevel(self, level):
+        # drop the nth level
+        if level < 0:
+            level += self.nlevels
+
+        self._data = {
+            _remove_key_level(key, level): value
+            for key, value in self._data.items()
+        }
+        self._level_names = (
+            self._level_names[:level] + self._level_names[level + 1 :]
+        )
+
+        if (
+            len(self._level_names) == 1
+        ):  # can't use nlevels, as it depends on multiindex
+            self.multiindex = False
+
 
 def _compare_keys(target: Any, key: Any) -> bool:
     """
@@ -554,3 +572,14 @@ def _compare_keys(target: Any, key: Any) -> bool:
         if k1 != k2:
             return False
     return True
+
+
+def _remove_key_level(key: Any, level: int) -> Any:
+    """
+    Remove a level from key. If detupleize is True, and if only a
+    single level remains, convert the tuple to a scalar.
+    """
+    result = key[:level] + key[level + 1 :]
+    if len(result) == 1:
+        return result[0]
+    return result
```
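
A worked example of the helper, following directly from the code above (pure Python):

```python
# _remove_key_level on a two-level column key, dropping level -1
# (which droplevel normalizes to level 1):
key = ("b", "sum")
result = key[:1] + key[2:]   # -> ("b",)
# only a single level remains, so the tuple collapses to a scalar:
result = result[0]           # -> "b"
```

This is what lets `GroupBy.agg` (changed below) flatten single-aggregation column keys like `("b", "sum")` down to plain `"b"`.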
29 changes: 15 additions & 14 deletions python/cudf/cudf/core/cut.py
```diff
@@ -115,7 +115,7 @@ def cut(
                 "Bin labels must either be False, None or passed in as a "
                 "list-like argument"
             )
-        elif ordered and labels is not None:
+        if ordered and labels is not None:
             if len(set(labels)) != len(labels):
                 raise ValueError(
                     "labels must be unique if ordered=True;"
@@ -207,22 +207,23 @@ def cut(
             )
             if ordered and len(set(labels)) != len(labels):
                 raise ValueError(
-                    "labels must be unique if ordered=True; pass ordered=False for"
+                    "labels must be unique if ordered=True; "
+                    "pass ordered=False for"
                     "duplicate labels"
                 )
 
-            if len(labels) != len(bins) - 1:
-                raise ValueError(
-                    "Bin labels must be one fewer than the number of bin edges"
-                )
-            if not ordered and len(set(labels)) != len(labels):
-                interval_labels = cudf.CategoricalIndex(
-                    labels, categories=None, ordered=False
-                )
-            else:
+            if len(labels) != len(bins) - 1:
+                raise ValueError(
+                    "Bin labels must be one fewer than the number of bin edges"
+                )
+            if not ordered and len(set(labels)) != len(labels):
+                interval_labels = cudf.CategoricalIndex(
+                    labels, categories=None, ordered=False
+                )
+            else:
-                interval_labels = (
-                    labels if len(set(labels)) == len(labels) else None
-                )
+                interval_labels = (
+                    labels if len(set(labels)) == len(labels) else None
+                )
```
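
The user-visible validation is unchanged; a sketch of the error path (the input values here are made up for illustration):

```python
# Duplicate labels with ordered=True still raise a ValueError:
cudf.cut([1, 2, 3, 4], bins=[0, 2, 4], labels=["a", "a"], ordered=True)
# ValueError: labels must be unique if ordered=True; ...
```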
20 changes: 12 additions & 8 deletions python/cudf/cudf/core/dataframe.py
```diff
@@ -59,6 +59,7 @@
     _get_label_range_or_mask,
     _indices_from_labels,
 )
+from cudf.core.resample import DataFrameResampler
 from cudf.core.series import Series
 from cudf.utils import applyutils, docutils, ioutils, queryutils, utils
 from cudf.utils.docutils import copy_docstring
```
```diff
@@ -3811,13 +3812,17 @@ def groupby(
                 "groupby() requires either by or level to be specified."
             )
 
-        return DataFrameGroupBy(
-            self,
-            by=by,
-            level=level,
-            as_index=as_index,
-            dropna=dropna,
-            sort=sort,
+        return (
+            DataFrameResampler(self, by=by)
+            if isinstance(by, cudf.Grouper) and by.freq
+            else DataFrameGroupBy(
+                self,
+                by=by,
+                level=level,
+                as_index=as_index,
+                dropna=dropna,
+                sort=sort,
+            )
         )
 
     def query(self, expr, local_dict=None):
```
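
With this dispatch in place, a frequency-based `Grouper` transparently routes `DataFrame.groupby` to the resampler; a brief sketch:

```python
# Sketch: one entry point, two code paths.
import cudf

index = cudf.date_range(start="2001-01-01", periods=10, freq="1T")
df = cudf.DataFrame({"x": range(10)}, index=index)

gb = df.groupby("x")                      # DataFrameGroupBy, as before
rs = df.groupby(cudf.Grouper(freq="3T"))  # DataFrameResampler
```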
```diff
@@ -4721,7 +4726,6 @@ def to_pandas(self, nullable=False, **kwargs):
         b object
         dtype: object
         """
-
         out_data = {}
         out_index = self.index.to_pandas()
 
```
11 changes: 10 additions & 1 deletion python/cudf/cudf/core/frame.py
```diff
@@ -517,7 +517,9 @@ def _get_columns_by_index(self, indices):
             data, columns=data.to_pandas_index(), index=self.index
         )
 
-    def _gather(self, gather_map, keep_index=True, nullify=False):
+    def _gather(
+        self, gather_map, keep_index=True, nullify=False, check_bounds=True
+    ):
         if not is_integer_dtype(gather_map.dtype):
             gather_map = gather_map.astype("int32")
         result = self.__class__._from_data(
@@ -526,6 +528,7 @@ def _gather(self, gather_map, keep_index=True, nullify=False):
                 as_column(gather_map),
                 keep_index=keep_index,
                 nullify=nullify,
+                check_bounds=check_bounds,
             )
         )
 
@@ -1354,6 +1357,12 @@ def fillna(
 
         return self._mimic_inplace(result, inplace=inplace)
 
+    def ffill(self):
+        return self.fillna(method="ffill")
+
+    def bfill(self):
+        return self.fillna(method="bfill")
+
     def _drop_na_rows(
         self, how="any", subset=None, thresh=None, drop_nan=False
     ):
```
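
The new `ffill`/`bfill` aliases defer entirely to `fillna`, so they become available on every `Frame` subclass; for example:

```python
# ffill/bfill are thin aliases for fillna(method=...):
import cudf

sr = cudf.Series([1, None, None, 4])
sr.ffill()  # same as sr.fillna(method="ffill") -> 1, 1, 1, 4
sr.bfill()  # same as sr.fillna(method="bfill") -> 1, 4, 4, 4
```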
61 changes: 29 additions & 32 deletions python/cudf/cudf/core/groupby/groupby.py
```diff
@@ -4,6 +4,7 @@
 import pickle
 import warnings
 
+import numpy as np
 import pandas as pd
 from nvtx import annotate
 
@@ -31,8 +32,6 @@ def _quantile_75(x):
     return x.quantile(0.75)
 
 
-# Note that all valid aggregation methods (e.g. GroupBy.min) are bound to the
-# class after its definition (see below).
 class GroupBy(Serializable):
 
     _MAX_GROUPS_BEFORE_WARN = 100
```
```diff
@@ -240,22 +239,20 @@ def agg(self, func):
             result = result.sort_index()
 
         if not _is_multi_agg(func):
-            if result.columns.nlevels == 1:
+            if result._data.nlevels <= 1:  # 0 or 1 levels
                 # make sure it's a flat index:
-                result.columns = result.columns.get_level_values(0)
-
-            if result.columns.nlevels > 1:
-                try:
-                    # drop the last level
-                    result.columns = result.columns.droplevel(-1)
-                except IndexError:
-                    # Pandas raises an IndexError if we are left
-                    # with an all-nan MultiIndex when dropping
-                    # the last level
-                    if result.shape[1] == 1:
-                        result.columns = [None]
-                    else:
-                        raise
+                result._data.multiindex = False
+
+            if result._data.nlevels > 1:
+                result._data.droplevel(-1)
+
+                # if, after dropping the last level, the only
+                # remaining key is `NaN`, we need to convert to `None`
+                # for Pandas compat:
+                if result._data.names == (np.nan,):
+                    result._data = result._data.rename_levels(
+                        {np.nan: None}, level=0
+                    )
 
         if libgroupby._is_all_scan_aggregate(normalized_aggs):
             # Scan aggregations return rows in original index order
```
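
In user terms, the rewritten branch keeps the pandas behavior: a single aggregation per column yields flat column labels, while multiple aggregations keep the MultiIndex. A hedged illustration:

```python
# Column flattening after agg (sketch):
import cudf

df = cudf.DataFrame({"a": [1, 1, 2], "b": [10, 20, 30]})

df.groupby("a").agg({"b": "max"})
# single agg: the trailing "max" level is dropped -> flat column "b"

df.groupby("a").agg({"b": ["min", "max"]})
# multi-agg: columns stay as the MultiIndex ("b", "min"), ("b", "max")
```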
```diff
@@ -1303,29 +1300,27 @@ def apply(self, func):
         return result
 
 
+# TODO: should we define this as a dataclass instead?
 class Grouper(object):
-    def __init__(self, key=None, level=None):
+    def __init__(
+        self, key=None, level=None, freq=None, closed=None, label=None
+    ):
         if key is not None and level is not None:
             raise ValueError("Grouper cannot specify both key and level")
-        if key is None and level is None:
+        if (key, level) == (None, None) and not freq:
             raise ValueError("Grouper must specify either key or level")
         self.key = key
         self.level = level
+        self.freq = freq
+        self.closed = closed
+        self.label = label
 
 
 class _Grouping(Serializable):
     def __init__(self, obj, by=None, level=None):
         self._obj = obj
         self._key_columns = []
         self.names = []
-        # For transform operations, we want to filter out only the value
-        # columns that will be used. When part of the key columns are composed
-        # from columns in `obj`, these columns are not included in the value
-        # columns. This only happens when `by` is specified with
-        # column labels or `Grouper` object with `key` param. To avoid that
-        # external objects overlaps in names to `obj`, these column
-        # names are recorded separately in this list.
-        self._key_column_names_from_obj = []
 
         # Need to keep track of named key columns
         # to support `as_index=False` correctly
```
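
`Grouper` now carries the resampling parameters. A sketch of how it is meant to be used (`closed` and `label` are stored here for the resampling machinery; I am assuming they follow their pandas meaning, i.e. which side of each bin is closed and which bin edge labels the result):

```python
# Frequency-based Grouper (df is any DataFrame with a DatetimeIndex):
by = cudf.Grouper(freq="3T", closed="right", label="right")
result = df.groupby(by).sum()  # routed to the resampler, per the
                               # DataFrame.groupby dispatch above
```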
```diff
@@ -1390,9 +1385,7 @@ def values(self):
         """
         # If the key columns are in `obj`, filter them out
         value_column_names = [
-            x
-            for x in self._obj._data.names
-            if x not in self._key_column_names_from_obj
+            x for x in self._obj._data.names if x not in self._named_columns
         ]
         value_columns = self._obj._data.select_by_label(value_column_names)
         return self._obj.__class__._from_data(value_columns)
```
```diff
@@ -1418,14 +1411,18 @@ def _handle_label(self, by):
         self._key_columns.append(self._obj._data[by])
         self.names.append(by)
         self._named_columns.append(by)
-        self._key_column_names_from_obj.append(by)
 
     def _handle_grouper(self, by):
-        if by.key:
+        if by.freq:
+            self._handle_frequency_grouper(by)
+        elif by.key:
             self._handle_label(by.key)
         else:
             self._handle_level(by.level)
 
+    def _handle_frequency_grouper(self, by):
+        raise NotImplementedError()
+
     def _handle_level(self, by):
         level_values = self._obj.index.get_level_values(by)
         self._key_columns.append(level_values._values)
```
1 change: 1 addition & 0 deletions python/cudf/cudf/core/index.py
```diff
@@ -2331,6 +2331,7 @@ def __init__(
         data = column.as_column(data)
         data.dtype.closed = closed
 
+        self.closed = closed
         super().__init__(data, **kwargs)
 
     def from_breaks(breaks, closed="right", name=None, copy=False, dtype=None):
```
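
With `closed` stored on the index itself, the attribute round-trips (assumed behavior, mirroring pandas):

```python
>>> cudf.IntervalIndex.from_breaks([0, 1, 2], closed="left").closed
'left'
```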

