From 49d1cc28648fe676dbddcf27c01939f87122ed8e Mon Sep 17 00:00:00 2001
From: Ashwin Srinath <3190405+shwina@users.noreply.github.com>
Date: Fri, 12 Nov 2021 22:06:21 -0500
Subject: [PATCH] Grouping by frequency and resampling (#9178)

Closes #6255, #8416

This PR implements two related features:

1. Grouping by a frequency via the `freq=` argument to `cudf.Grouper`
2. Time-series resampling via the `.resample()` API

Either operation results in a `_Resampler` object that represents the data
resampled into "bins" of a particular frequency. The following operations are
supported on resampled data:

1. Aggregations such as `min()` and `max()`, performed bin-wise
2. `ffill()` and `bfill()` methods: forward and backward filling in the case of upsampling data
3. `asfreq()`: returns the resampled data as a Series or DataFrame

These are all best understood by example:

First, we create a time series with 1 minute intervals:

```python
>>> index = cudf.date_range(start="2001-01-01", periods=10, freq="1T")
>>> sr = cudf.Series(range(10), index=index)
>>> sr
2001-01-01 00:00:00    0
2001-01-01 00:01:00    1
2001-01-01 00:02:00    2
2001-01-01 00:03:00    3
2001-01-01 00:04:00    4
2001-01-01 00:05:00    5
2001-01-01 00:06:00    6
2001-01-01 00:07:00    7
2001-01-01 00:08:00    8
2001-01-01 00:09:00    9
dtype: int64
```

Downsampling to 3 minute intervals, followed by a "sum" aggregation:

```python
>>> sr.resample("3T").sum() # equivalently, sr.groupby(cudf.Grouper(freq="3T")).sum()
2001-01-01 00:00:00     3
2001-01-01 00:03:00    12
2001-01-01 00:06:00    21
2001-01-01 00:09:00     9
dtype: int64
```

Upsampling to 30 second intervals:

```python
>>> sr.resample("30s").asfreq()
2001-01-01 00:00:00    0.0
2001-01-01 00:00:30    NaN
2001-01-01 00:01:00    1.0
2001-01-01 00:01:30    NaN
2001-01-01 00:02:00    2.0
2001-01-01 00:02:30    NaN
2001-01-01 00:03:00    3.0
2001-01-01 00:03:30    NaN
2001-01-01 00:04:00    4.0
2001-01-01 00:04:30    NaN
2001-01-01 00:05:00    5.0
2001-01-01 00:05:30    NaN
2001-01-01 00:06:00    6.0
2001-01-01 00:06:30    NaN
2001-01-01 00:07:00    7.0
2001-01-01 00:07:30    NaN
2001-01-01 00:08:00    8.0
2001-01-01 00:08:30    NaN
2001-01-01 00:09:00    9.0
Freq: 30S, dtype: float64
```

Upsampling to 30 second intervals, followed by a forward fill:

```python
>>> sr.resample("30s").ffill()
2001-01-01 00:00:00    0
2001-01-01 00:00:30    0
2001-01-01 00:01:00    1
2001-01-01 00:01:30    1
2001-01-01 00:02:00    2
2001-01-01 00:02:30    2
2001-01-01 00:03:00    3
2001-01-01 00:03:30    3
2001-01-01 00:04:00    4
2001-01-01 00:04:30    4
2001-01-01 00:05:00    5
2001-01-01 00:05:30    5
2001-01-01 00:06:00    6
2001-01-01 00:06:30    6
2001-01-01 00:07:00    7
2001-01-01 00:07:30    7
2001-01-01 00:08:00    8
2001-01-01 00:08:30    8
2001-01-01 00:09:00    9
Freq: 30S, dtype: int64
```

Authors:
  - Ashwin Srinath (https://github.com/shwina)
  - Michael Wang (https://github.com/isVoid)

Approvers:
  - https://github.com/brandon-b-miller
  - Vyas Ramasubramani (https://github.com/vyasr)
  - Benjamin Zaitlen (https://github.com/quasiben)

URL: https://github.com/rapidsai/cudf/pull/9178
---
 python/cudf/cudf/_lib/copying.pyx         |   5 +-
 python/cudf/cudf/core/column/column.py    |   6 +
 python/cudf/cudf/core/column_accessor.py  |  29 ++
 python/cudf/cudf/core/cut.py              |  29 +-
 python/cudf/cudf/core/dataframe.py        |  20 +-
 python/cudf/cudf/core/frame.py            |  11 +-
 python/cudf/cudf/core/groupby/groupby.py  |  61 ++--
 python/cudf/cudf/core/index.py            |   1 +
 python/cudf/cudf/core/indexed_frame.py    | 220 +++++++++++++-
 python/cudf/cudf/core/join/join.py        |  37 ++-
 python/cudf/cudf/core/resample.py         | 343 ++++++++++++++++++++++
python/cudf/cudf/core/series.py | 44 +-- python/cudf/cudf/core/tools/datetimes.py | 72 +++-- python/cudf/cudf/core/window/rolling.py | 6 +- python/cudf/cudf/tests/test_datetime.py | 22 +- python/cudf/cudf/tests/test_groupby.py | 111 +++++++ python/cudf/cudf/tests/test_joining.py | 27 ++ python/cudf/cudf/tests/test_resampling.py | 156 ++++++++++ 18 files changed, 1068 insertions(+), 132 deletions(-) create mode 100644 python/cudf/cudf/core/resample.py create mode 100644 python/cudf/cudf/tests/test_resampling.py diff --git a/python/cudf/cudf/_lib/copying.pyx b/python/cudf/cudf/_lib/copying.pyx index c892c100bf6..26ef428f21f 100644 --- a/python/cudf/cudf/_lib/copying.pyx +++ b/python/cudf/cudf/_lib/copying.pyx @@ -147,12 +147,13 @@ def gather( source_table, Column gather_map, bool keep_index=True, - bool nullify=False + bool nullify=False, + bool check_bounds=True ): if not pd.api.types.is_integer_dtype(gather_map.dtype): raise ValueError("Gather map is not integer dtype.") - if len(gather_map) > 0 and not nullify: + if check_bounds and len(gather_map) > 0 and not nullify: gm_min, gm_max = minmax(gather_map) if gm_min < -len(source_table) or gm_max >= len(source_table): raise IndexError(f"Gather map index with min {gm_min}," diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index 0a3688248bd..5f9104263b1 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -1152,6 +1152,12 @@ def normalize_binop_value( ) -> Union[ColumnBase, ScalarLike]: raise NotImplementedError + def _minmax(self, skipna: bool = None): + result_col = self._process_for_reduction(skipna=skipna) + if isinstance(result_col, ColumnBase): + return libcudf.reduce.minmax(result_col) + return result_col + def min(self, skipna: bool = None, dtype: Dtype = None): result_col = self._process_for_reduction(skipna=skipna) if isinstance(result_col, ColumnBase): diff --git a/python/cudf/cudf/core/column_accessor.py b/python/cudf/cudf/core/column_accessor.py index 2411b2a9211..c2ea9d756f7 100644 --- a/python/cudf/cudf/core/column_accessor.py +++ b/python/cudf/cudf/core/column_accessor.py @@ -537,6 +537,24 @@ def rename_column(x): return self.__class__(ca) + def droplevel(self, level): + # drop the nth level + if level < 0: + level += self.nlevels + + self._data = { + _remove_key_level(key, level): value + for key, value in self._data.items() + } + self._level_names = ( + self._level_names[:level] + self._level_names[level + 1 :] + ) + + if ( + len(self._level_names) == 1 + ): # can't use nlevels, as it depends on multiindex + self.multiindex = False + def _compare_keys(target: Any, key: Any) -> bool: """ @@ -554,3 +572,14 @@ def _compare_keys(target: Any, key: Any) -> bool: if k1 != k2: return False return True + + +def _remove_key_level(key: Any, level: int) -> Any: + """ + Remove a level from key. If detupleize is True, and if only a + single level remains, convert the tuple to a scalar. 
+ """ + result = key[:level] + key[level + 1 :] + if len(result) == 1: + return result[0] + return result diff --git a/python/cudf/cudf/core/cut.py b/python/cudf/cudf/core/cut.py index 039dba8a715..7c585602c23 100644 --- a/python/cudf/cudf/core/cut.py +++ b/python/cudf/cudf/core/cut.py @@ -115,7 +115,7 @@ def cut( "Bin labels must either be False, None or passed in as a " "list-like argument" ) - elif ordered and labels is not None: + if ordered and labels is not None: if len(set(labels)) != len(labels): raise ValueError( "labels must be unique if ordered=True;" @@ -207,22 +207,23 @@ def cut( ) if ordered and len(set(labels)) != len(labels): raise ValueError( - "labels must be unique if ordered=True; pass ordered=False for" + "labels must be unique if ordered=True; " + "pass ordered=False for" "duplicate labels" ) + + if len(labels) != len(bins) - 1: + raise ValueError( + "Bin labels must be one fewer than the number of bin edges" + ) + if not ordered and len(set(labels)) != len(labels): + interval_labels = cudf.CategoricalIndex( + labels, categories=None, ordered=False + ) else: - if len(labels) != len(bins) - 1: - raise ValueError( - "Bin labels must be one fewer than the number of bin edges" - ) - if not ordered and len(set(labels)) != len(labels): - interval_labels = cudf.CategoricalIndex( - labels, categories=None, ordered=False - ) - else: - interval_labels = ( - labels if len(set(labels)) == len(labels) else None - ) + interval_labels = ( + labels if len(set(labels)) == len(labels) else None + ) if isinstance(bins, pd.IntervalIndex): # get the left and right edges of the bins as columns diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index 8c2e3c8cc7f..b2e6588edb2 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -59,6 +59,7 @@ _get_label_range_or_mask, _indices_from_labels, ) +from cudf.core.resample import DataFrameResampler from cudf.core.series import Series from cudf.utils import applyutils, docutils, ioutils, queryutils, utils from cudf.utils.docutils import copy_docstring @@ -3811,13 +3812,17 @@ def groupby( "groupby() requires either by or level to be specified." 
) - return DataFrameGroupBy( - self, - by=by, - level=level, - as_index=as_index, - dropna=dropna, - sort=sort, + return ( + DataFrameResampler(self, by=by) + if isinstance(by, cudf.Grouper) and by.freq + else DataFrameGroupBy( + self, + by=by, + level=level, + as_index=as_index, + dropna=dropna, + sort=sort, + ) ) def query(self, expr, local_dict=None): @@ -4721,7 +4726,6 @@ def to_pandas(self, nullable=False, **kwargs): b object dtype: object """ - out_data = {} out_index = self.index.to_pandas() diff --git a/python/cudf/cudf/core/frame.py b/python/cudf/cudf/core/frame.py index 99da216d392..e9c9878c66d 100644 --- a/python/cudf/cudf/core/frame.py +++ b/python/cudf/cudf/core/frame.py @@ -517,7 +517,9 @@ def _get_columns_by_index(self, indices): data, columns=data.to_pandas_index(), index=self.index ) - def _gather(self, gather_map, keep_index=True, nullify=False): + def _gather( + self, gather_map, keep_index=True, nullify=False, check_bounds=True + ): if not is_integer_dtype(gather_map.dtype): gather_map = gather_map.astype("int32") result = self.__class__._from_data( @@ -526,6 +528,7 @@ def _gather(self, gather_map, keep_index=True, nullify=False): as_column(gather_map), keep_index=keep_index, nullify=nullify, + check_bounds=check_bounds, ) ) @@ -1354,6 +1357,12 @@ def fillna( return self._mimic_inplace(result, inplace=inplace) + def ffill(self): + return self.fillna(method="ffill") + + def bfill(self): + return self.fillna(method="bfill") + def _drop_na_rows( self, how="any", subset=None, thresh=None, drop_nan=False ): diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 6ffba8da069..ba69e42674a 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -4,6 +4,7 @@ import pickle import warnings +import numpy as np import pandas as pd from nvtx import annotate @@ -31,8 +32,6 @@ def _quantile_75(x): return x.quantile(0.75) -# Note that all valid aggregation methods (e.g. GroupBy.min) are bound to the -# class after its definition (see below). class GroupBy(Serializable): _MAX_GROUPS_BEFORE_WARN = 100 @@ -240,22 +239,20 @@ def agg(self, func): result = result.sort_index() if not _is_multi_agg(func): - if result.columns.nlevels == 1: + if result._data.nlevels <= 1: # 0 or 1 levels # make sure it's a flat index: - result.columns = result.columns.get_level_values(0) - - if result.columns.nlevels > 1: - try: - # drop the last level - result.columns = result.columns.droplevel(-1) - except IndexError: - # Pandas raises an IndexError if we are left - # with an all-nan MultiIndex when dropping - # the last level - if result.shape[1] == 1: - result.columns = [None] - else: - raise + result._data.multiindex = False + + if result._data.nlevels > 1: + result._data.droplevel(-1) + + # if, after dropping the last level, the only + # remaining key is `NaN`, we need to convert to `None` + # for Pandas compat: + if result._data.names == (np.nan,): + result._data = result._data.rename_levels( + {np.nan: None}, level=0 + ) if libgroupby._is_all_scan_aggregate(normalized_aggs): # Scan aggregations return rows in original index order @@ -1303,14 +1300,20 @@ def apply(self, func): return result +# TODO: should we define this as a dataclass instead? 
class Grouper(object): - def __init__(self, key=None, level=None): + def __init__( + self, key=None, level=None, freq=None, closed=None, label=None + ): if key is not None and level is not None: raise ValueError("Grouper cannot specify both key and level") - if key is None and level is None: + if (key, level) == (None, None) and not freq: raise ValueError("Grouper must specify either key or level") self.key = key self.level = level + self.freq = freq + self.closed = closed + self.label = label class _Grouping(Serializable): @@ -1318,14 +1321,6 @@ def __init__(self, obj, by=None, level=None): self._obj = obj self._key_columns = [] self.names = [] - # For transform operations, we want to filter out only the value - # columns that will be used. When part of the key columns are composed - # from columns in `obj`, these columns are not included in the value - # columns. This only happens when `by` is specified with - # column labels or `Grouper` object with `key` param. To avoid that - # external objects overlaps in names to `obj`, these column - # names are recorded separately in this list. - self._key_column_names_from_obj = [] # Need to keep track of named key columns # to support `as_index=False` correctly @@ -1390,9 +1385,7 @@ def values(self): """ # If the key columns are in `obj`, filter them out value_column_names = [ - x - for x in self._obj._data.names - if x not in self._key_column_names_from_obj + x for x in self._obj._data.names if x not in self._named_columns ] value_columns = self._obj._data.select_by_label(value_column_names) return self._obj.__class__._from_data(value_columns) @@ -1418,14 +1411,18 @@ def _handle_label(self, by): self._key_columns.append(self._obj._data[by]) self.names.append(by) self._named_columns.append(by) - self._key_column_names_from_obj.append(by) def _handle_grouper(self, by): - if by.key: + if by.freq: + self._handle_frequency_grouper(by) + elif by.key: self._handle_label(by.key) else: self._handle_level(by.level) + def _handle_frequency_grouper(self, by): + raise NotImplementedError() + def _handle_level(self, by): level_values = self._obj.index.get_level_values(by) self._key_columns.append(level_values._values) diff --git a/python/cudf/cudf/core/index.py b/python/cudf/cudf/core/index.py index 071344084c2..35b80715cca 100644 --- a/python/cudf/cudf/core/index.py +++ b/python/cudf/cudf/core/index.py @@ -2331,6 +2331,7 @@ def __init__( data = column.as_column(data) data.dtype.closed = closed + self.closed = closed super().__init__(data, **kwargs) def from_breaks(breaks, closed="right", name=None, copy=False, dtype=None): diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index bcaf37e659a..cf12907d96a 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -5,13 +5,16 @@ import warnings from typing import Type, TypeVar +from uuid import uuid4 import cupy as cp import pandas as pd from nvtx import annotate import cudf +from cudf._typing import ColumnLike from cudf.api.types import is_categorical_dtype, is_list_like +from cudf.core.column import arange from cudf.core.frame import Frame from cudf.core.index import Index from cudf.core.multiindex import MultiIndex @@ -38,8 +41,8 @@ def _indices_from_labels(obj, labels): # join is not guaranteed to maintain the index ordering # so we will sort it with its initial ordering which is stored # in column "__" - lhs = cudf.DataFrame({"__": column.arange(len(labels))}, index=labels) - rhs = cudf.DataFrame({"_": 
column.arange(len(obj))}, index=obj.index) + lhs = cudf.DataFrame({"__": arange(len(labels))}, index=labels) + rhs = cudf.DataFrame({"_": arange(len(obj))}, index=obj.index) return lhs.join(rhs).sort_values("__")["_"] @@ -77,6 +80,8 @@ def __init__(self, frame): _LocIndexerClass = TypeVar("_LocIndexerClass", bound="_FrameIndexer") _IlocIndexerClass = TypeVar("_IlocIndexerClass", bound="_FrameIndexer") +T = TypeVar("T", bound="IndexedFrame") + class IndexedFrame(Frame): """A frame containing an index. @@ -542,3 +547,214 @@ def _n_largest_or_smallest(self, largest, n, columns, keep): return self.take(indices, keep_index=True) else: raise ValueError('keep must be either "first", "last"') + + def _align_to_index( + self: T, + index: ColumnLike, + how: str = "outer", + sort: bool = True, + allow_non_unique: bool = False, + ) -> T: + index = cudf.core.index.as_index(index) + + if self.index.equals(index): + return self + if not allow_non_unique: + if not self.index.is_unique or not index.is_unique: + raise ValueError("Cannot align indices with non-unique values") + + lhs = cudf.DataFrame._from_data(self._data, index=self.index) + rhs = cudf.DataFrame._from_data({}, index=index) + + # create a temporary column that we will later sort by + # to recover ordering after index alignment. + sort_col_id = str(uuid4()) + if how == "left": + lhs[sort_col_id] = arange(len(lhs)) + elif how == "right": + rhs[sort_col_id] = arange(len(rhs)) + + result = lhs.join(rhs, how=how, sort=sort) + if how in ("left", "right"): + result = result.sort_values(sort_col_id) + del result[sort_col_id] + + result = self.__class__._from_data(result._data, index=result.index) + result._data.multiindex = self._data.multiindex + result._data._level_names = self._data._level_names + result.index.names = self.index.names + + return result + + def resample( + self, + rule, + axis=0, + closed=None, + label=None, + convention="start", + kind=None, + loffset=None, + base=None, + on=None, + level=None, + origin="start_day", + offset=None, + ): + """ + Convert the frequency of ("resample") the given time series data. + + Parameters + ---------- + rule: str + The offset string representing the frequency to use. + Note that DateOffset objects are not yet supported. + closed: {"right", "left"}, default None + Which side of bin interval is closed. The default is + "left" for all frequency offsets except for "M" and "W", + which have a default of "right". + label: {"right", "left"}, default None + Which bin edge label to label bucket with. The default is + "left" for all frequency offsets except for "M" and "W", + which have a default of "right". + on: str, optional + For a DataFrame, column to use instead of the index for + resampling. Column must be a datetime-like. + level: str or int, optional + For a MultiIndex, level to use instead of the index for + resampling. The level must be a datetime-like. 
+ + Returns + ------- + A Resampler object + + Examples + -------- + First, we create a time series with 1 minute intervals: + + >>> index = cudf.date_range(start="2001-01-01", periods=10, freq="1T") + >>> sr = cudf.Series(range(10), index=index) + >>> sr + 2001-01-01 00:00:00 0 + 2001-01-01 00:01:00 1 + 2001-01-01 00:02:00 2 + 2001-01-01 00:03:00 3 + 2001-01-01 00:04:00 4 + 2001-01-01 00:05:00 5 + 2001-01-01 00:06:00 6 + 2001-01-01 00:07:00 7 + 2001-01-01 00:08:00 8 + 2001-01-01 00:09:00 9 + dtype: int64 + + Downsampling to 3 minute intervals, followed by a "sum" aggregation: + + >>> sr.resample("3T").sum() + 2001-01-01 00:00:00 3 + 2001-01-01 00:03:00 12 + 2001-01-01 00:06:00 21 + 2001-01-01 00:09:00 9 + dtype: int64 + + Use the right side of each interval to label the bins: + + >>> sr.resample("3T", label="right").sum() + 2001-01-01 00:03:00 3 + 2001-01-01 00:06:00 12 + 2001-01-01 00:09:00 21 + 2001-01-01 00:12:00 9 + dtype: int64 + + Close the right side of the interval instead of the left: + + >>> sr.resample("3T", closed="right").sum() + 2000-12-31 23:57:00 0 + 2001-01-01 00:00:00 6 + 2001-01-01 00:03:00 15 + 2001-01-01 00:06:00 24 + dtype: int64 + + Upsampling to 30 second intervals: + + >>> sr.resample("30s").asfreq()[:5] # show the first 5 rows + 2001-01-01 00:00:00 0 + 2001-01-01 00:00:30 + 2001-01-01 00:01:00 1 + 2001-01-01 00:01:30 + 2001-01-01 00:02:00 2 + dtype: int64 + + Upsample and fill nulls using the "bfill" method: + + >>> sr.resample("30s").bfill()[:5] + 2001-01-01 00:00:00 0 + 2001-01-01 00:00:30 1 + 2001-01-01 00:01:00 1 + 2001-01-01 00:01:30 2 + 2001-01-01 00:02:00 2 + dtype: int64 + + Resampling by a specified column of a Dataframe: + + >>> df = cudf.DataFrame({ + ... "price": [10, 11, 9, 13, 14, 18, 17, 19], + ... "volume": [50, 60, 40, 100, 50, 100, 40, 50], + ... "week_starting": cudf.date_range( + ... "2018-01-01", periods=8, freq="7D" + ... ) + ... }) + >>> df + price volume week_starting + 0 10 50 2018-01-01 + 1 11 60 2018-01-08 + 2 9 40 2018-01-15 + 3 13 100 2018-01-22 + 4 14 50 2018-01-29 + 5 18 100 2018-02-05 + 6 17 40 2018-02-12 + 7 19 50 2018-02-19 + >>> df.resample("M", on="week_starting").mean() + price volume + week_starting + 2018-01-31 11.4 60.000000 + 2018-02-28 18.0 63.333333 + + + Notes + ----- + Note that the dtype of the index (or the 'on' column if using + 'on=') in the result will be of a frequency closest to the + resampled frequency. For example, if resampling from + nanoseconds to milliseconds, the index will be of dtype + 'datetime64[ms]'. 
+ """ + import cudf.core.resample + + if (axis, convention, kind, loffset, base, origin, offset) != ( + 0, + "start", + None, + None, + None, + "start_day", + None, + ): + raise NotImplementedError( + "The following arguments are not " + "currently supported by resample:\n\n" + "- axis\n" + "- convention\n" + "- kind\n" + "- loffset\n" + "- base\n" + "- origin\n" + "- offset" + ) + by = cudf.Grouper( + key=on, freq=rule, closed=closed, label=label, level=level + ) + return ( + cudf.core.resample.SeriesResampler(self, by=by) + if isinstance(self, cudf.Series) + else cudf.core.resample.DataFrameResampler(self, by=by) + ) diff --git a/python/cudf/cudf/core/join/join.py b/python/cudf/cudf/core/join/join.py index 55540d362ac..28b2d5d8167 100644 --- a/python/cudf/cudf/core/join/join.py +++ b/python/cudf/cudf/core/join/join.py @@ -178,11 +178,17 @@ def perform_merge(self) -> Frame: gather_index = self.left_index or self.right_index if left_rows is not None: left_result = lhs._gather( - left_rows, nullify=True, keep_index=gather_index + left_rows, + nullify=True, + keep_index=gather_index, + check_bounds=False, ) if right_rows is not None: right_result = rhs._gather( - right_rows, nullify=True, keep_index=gather_index + right_rows, + nullify=True, + keep_index=gather_index, + check_bounds=False, ) result = self._merge_results(left_result, right_result) @@ -310,8 +316,25 @@ def _merge_results(self, left_result: Frame, right_result: Frame) -> Frame: else: del right_names[name] - # Assemble the data columns of the result: - data = left_result._data.__class__() + # determine if the result has multiindex columns. The result + # of a join has a MultiIndex as its columns if: + # - both the `lhs` and `rhs` have a MultiIndex columns + # OR + # - either one of `lhs` or `rhs` have a MultiIndex columns, + # and the other is empty (i.e., no columns) + if self.lhs._data and self.rhs._data: + multiindex_columns = ( + self.lhs._data.multiindex and self.rhs._data.multiindex + ) + elif self.lhs._data: + multiindex_columns = self.lhs._data.multiindex + elif self.rhs._data: + multiindex_columns = self.rhs._data.multiindex + else: + multiindex_columns = False + + # Assemble the data columns of the result + data = left_result._data.__class__(multiindex=multiindex_columns) for lcol in left_names: data.set_by_label( @@ -354,7 +377,9 @@ def _sort_result(self, result: Frame) -> Frame: sort_order = result._get_sorted_inds( list(_coerce_to_tuple(self.on)) ) - return result._gather(sort_order, keep_index=False) + return result._gather( + sort_order, keep_index=False, check_bounds=False + ) by = [] if self.left_index and self.right_index: if result._index is not None: @@ -370,7 +395,7 @@ def _sort_result(self, result: Frame) -> Frame: if by: to_sort = cudf.DataFrame._from_columns(by) sort_order = to_sort.argsort() - result = result._gather(sort_order) + result = result._gather(sort_order, check_bounds=False) return result @staticmethod diff --git a/python/cudf/cudf/core/resample.py b/python/cudf/cudf/core/resample.py new file mode 100644 index 00000000000..55a92a9d226 --- /dev/null +++ b/python/cudf/cudf/core/resample.py @@ -0,0 +1,343 @@ +# Copyright (c) 2021, NVIDIA CORPORATION. 
+ +import numpy as np +import pandas as pd + +import cudf +import cudf._lib.labeling +from cudf._typing import DataFrameOrSeries +from cudf.core.groupby.groupby import ( + DataFrameGroupBy, + GroupBy, + SeriesGroupBy, + _Grouping, +) +from cudf.core.tools.datetimes import _offset_alias_to_code, _unit_dtype_map + + +class _Resampler(GroupBy): + + grouping: "_ResampleGrouping" + + def __init__(self, obj, by, axis=None, kind=None): + by = _ResampleGrouping(obj, by) + super().__init__(obj, by=by) + + def agg(self, func): + result = super().agg(func) + if len(self.grouping.bin_labels) != len(result): + index = cudf.Index( + self.grouping.bin_labels, name=self.grouping.names[0] + ) + return result._align_to_index( + index, how="right", sort=False, allow_non_unique=True + ) + else: + return result.sort_index() + + def asfreq(self): + return self.obj._align_to_index( + self.grouping.bin_labels, + how="right", + sort=False, + allow_non_unique=True, + ) + + def _scan_fill(self, method: str, limit: int) -> DataFrameOrSeries: + # TODO: can this be more efficient? + + # first, compute the outer join between `self.obj` and the `bin_labels` + # to get the sampling "gaps": + upsampled = self.obj._align_to_index( + self.grouping.bin_labels, + how="outer", + sort=True, + allow_non_unique=True, + ) + + # fill the gaps: + filled = upsampled.fillna(method=method) + + # filter the result to only include the values corresponding + # to the bin labels: + return filled._align_to_index( + self.grouping.bin_labels, + how="right", + sort=False, + allow_non_unique=True, + ) + + +class DataFrameResampler(_Resampler, DataFrameGroupBy): + pass + + +class SeriesResampler(_Resampler, SeriesGroupBy): + pass + + +class _ResampleGrouping(_Grouping): + + bin_labels: cudf.Index + + def _handle_frequency_grouper(self, by): + # if `by` is a time frequency grouper, we bin the key column + # using bin intervals specified by `by.freq`, then use *that* + # as the groupby key + + freq = by.freq + label = by.label + closed = by.closed + + if isinstance(freq, (cudf.DateOffset, pd.DateOffset)): + raise NotImplementedError( + "Resampling by DateOffset objects is not yet supported." 
+            )
+        if not isinstance(freq, str):
+            raise TypeError(
+                f"Unsupported type for freq: {type(freq).__name__}"
+            )
+        # convert freq to a pd.DateOffset:
+        offset = pd.tseries.frequencies.to_offset(freq)
+
+        if offset.freqstr == "M" or offset.freqstr.startswith("W-"):
+            label = "right" if label is None else label
+            closed = "right" if closed is None else closed
+        else:
+            label = "left" if label is None else label
+            closed = "left" if closed is None else closed
+
+        # determine the key column
+        if by.key is None and by.level is None:
+            # then assume that the key is the index of `self._obj`:
+            self._handle_index(self._obj.index)
+        elif by.key:
+            self._handle_label(by.key)
+        elif by.level:
+            self._handle_level(by.level)
+
+        if not len(self._key_columns) == 1:
+            raise ValueError("Must resample on exactly one column")
+
+        key_column = self._key_columns[0]
+
+        if not isinstance(key_column, cudf.core.column.DatetimeColumn):
+            raise TypeError(
+                f"Can only resample on a DatetimeIndex or datetime column, "
+                f"got column of type {key_column.dtype}"
+            )
+
+        # get the start and end values that will be used to generate
+        # the bin labels
+        min_date, max_date = key_column._minmax()
+        start, end = _get_timestamp_range_edges(
+            pd.Timestamp(min_date.value),
+            pd.Timestamp(max_date.value),
+            offset,
+            closed=closed,
+        )
+
+        # in some cases, an extra timestamp is required in order to
+        # bin all the values. It's OK if we generate more labels than
+        # we need, as we remove any unused labels below
+        end += offset
+
+        # generate the labels for binning the key column:
+        bin_labels = cudf.date_range(start=start, end=end, freq=freq,)
+
+        # We want the (resampled) column of timestamps in the result
+        # to have a resolution closest to the resampling
+        # frequency. For example, if resampling from '1T' to '1s', we
+        # want the resulting timestamp column to be of dtype
+        # 'datetime64[s]'. libcudf requires the bin labels and key
+        # column to have the same dtype, so we compute a `result_type`
+        # and cast them both to that type.
+        try:
+            result_type = np.dtype(
+                _unit_dtype_map[_offset_alias_to_code[offset.name]]
+            )
+        except KeyError:
+            # unsupported resolution (we don't support resolutions >s)
+            # fall back to using datetime64[s]
+            result_type = np.dtype("datetime64[s]")
+
+        # TODO: Ideally, we can avoid one cast by having `date_range`
+        # generate timestamps of a given dtype. Currently, it can
+        # only generate timestamps with 'ns' precision
+        key_column = key_column.astype(result_type)
+        bin_labels = bin_labels.astype(result_type)
+
+        # bin the key column:
+        bin_numbers = cudf._lib.labeling.label_bins(
+            key_column,
+            left_edges=bin_labels[:-1]._column,
+            left_inclusive=(closed == "left"),
+            right_edges=bin_labels[1:]._column,
+            right_inclusive=(closed == "right"),
+        )
+
+        if label == "right":
+            bin_labels = bin_labels[1:]
+        else:
+            bin_labels = bin_labels[:-1]
+
+        # if we have more labels than bins, remove the extra labels:
+        nbins = bin_numbers.max() + 1
+        if len(bin_labels) > nbins:
+            bin_labels = bin_labels[:nbins]
+
+        bin_labels.name = self.names[0]
+        self.bin_labels = bin_labels
+
+        # replace self._key_columns with the binned key column:
+        self._key_columns = [
+            bin_labels._gather(bin_numbers, check_bounds=False)._column.astype(
+                result_type
+            )
+        ]
+
+
+# NOTE: this function is vendored from Pandas
+def _get_timestamp_range_edges(
+    first, last, freq, closed="left", origin="start_day", offset=None
+):
+    """
+    Adjust the `first` Timestamp to the preceding Timestamp that resides on
+    the provided offset.
Adjust the `last` Timestamp to the following + Timestamp that resides on the provided offset. Input Timestamps that + already reside on the offset will be adjusted depending on the type of + offset and the `closed` parameter. + + Parameters + ---------- + first : pd.Timestamp + The beginning Timestamp of the range to be adjusted. + last : pd.Timestamp + The ending Timestamp of the range to be adjusted. + freq : pd.DateOffset + The dateoffset to which the Timestamps will be adjusted. + closed : {'right', 'left'}, default None + Which side of bin interval is closed. + origin : {'epoch', 'start', 'start_day'} or Timestamp, default 'start_day' + The timestamp on which to adjust the grouping. The timezone of origin + must match the timezone of the index. If a timestamp is not used, + these values are also supported: + + - 'epoch': `origin` is 1970-01-01 + - 'start': `origin` is the first value of the timeseries + - 'start_day': `origin` is the first day at midnight of the timeseries + offset : pd.Timedelta, default is None + An offset timedelta added to the origin. + + Returns + ------- + A tuple of length 2, containing the adjusted pd.Timestamp objects. + """ + from pandas.tseries.offsets import Day, Tick + + if isinstance(freq, Tick): + index_tz = first.tz + if isinstance(origin, pd.Timestamp) and (origin.tz is None) != ( + index_tz is None + ): + raise ValueError( + "The origin must have the same timezone as the index." + ) + elif origin == "epoch": + # set the epoch based on the timezone to have similar bins results + # when resampling on the same kind of indexes on different + # timezones + origin = pd.Timestamp("1970-01-01", tz=index_tz) + + if isinstance(freq, Day): + # _adjust_dates_anchored assumes 'D' means 24H, but first/last + # might contain a DST transition (23H, 24H, or 25H). + # So "pretend" the dates are naive when adjusting the endpoints + first = first.tz_localize(None) + last = last.tz_localize(None) + if isinstance(origin, pd.Timestamp): + origin = origin.tz_localize(None) + + first, last = _adjust_dates_anchored( + first, last, freq, closed=closed, origin=origin, offset=offset + ) + if isinstance(freq, Day): + first = first.tz_localize(index_tz) + last = last.tz_localize(index_tz) + else: + first = first.normalize() + last = last.normalize() + + if closed == "left": + first = pd.Timestamp(freq.rollback(first)) + else: + first = pd.Timestamp(first - freq) + + last = pd.Timestamp(last + freq) + + return first, last + + +# NOTE: this function is vendored from Pandas +def _adjust_dates_anchored( + first, last, freq, closed="right", origin="start_day", offset=None +): + # First and last offsets should be calculated from the start day to fix an + # error cause by resampling across multiple days when a one day period is + # not a multiple of the frequency. See GH 8683 + # To handle frequencies that are not multiple or divisible by a day we let + # the possibility to define a fixed origin timestamp. See GH 31809 + origin_nanos = 0 # origin == "epoch" + if origin == "start_day": + origin_nanos = first.normalize().value + elif origin == "start": + origin_nanos = first.value + elif isinstance(origin, pd.Timestamp): + origin_nanos = origin.value + origin_nanos += offset.value if offset else 0 + + # GH 10117 & GH 19375. If first and last contain timezone information, + # Perform the calculation in UTC in order to avoid localizing on an + # Ambiguous or Nonexistent time. 
+ first_tzinfo = first.tzinfo + last_tzinfo = last.tzinfo + if first_tzinfo is not None: + first = first.tz_convert("UTC") + if last_tzinfo is not None: + last = last.tz_convert("UTC") + + foffset = (first.value - origin_nanos) % freq.nanos + loffset = (last.value - origin_nanos) % freq.nanos + + if closed == "right": + if foffset > 0: + # roll back + fresult = first.value - foffset + else: + fresult = first.value - freq.nanos + + if loffset > 0: + # roll forward + lresult = last.value + (freq.nanos - loffset) + else: + # already the end of the road + lresult = last.value + else: # closed == 'left' + if foffset > 0: + fresult = first.value - foffset + else: + # start of the road + fresult = first.value + + if loffset > 0: + # roll forward + lresult = last.value + (freq.nanos - loffset) + else: + lresult = last.value + freq.nanos + fresult = pd.Timestamp(fresult) + lresult = pd.Timestamp(lresult) + if first_tzinfo is not None: + fresult = fresult.tz_localize("UTC").tz_convert(first_tzinfo) + if last_tzinfo is not None: + lresult = lresult.tz_localize("UTC").tz_convert(last_tzinfo) + return fresult, lresult diff --git a/python/cudf/cudf/core/series.py b/python/cudf/cudf/core/series.py index 961476309c7..00a8ebabe34 100644 --- a/python/cudf/cudf/core/series.py +++ b/python/cudf/cudf/core/series.py @@ -11,7 +11,6 @@ from numbers import Number from shutil import get_terminal_size from typing import Any, MutableMapping, Optional, Set, Union -from uuid import uuid4 import cupy import numpy as np @@ -3534,6 +3533,8 @@ def groupby( observed=False, dropna=True, ): + import cudf.core.resample + if axis not in (0, "index"): raise NotImplementedError("axis parameter is not yet implemented") @@ -3557,8 +3558,12 @@ def groupby( "groupby() requires either by or level to be specified." ) - return SeriesGroupBy( - self, by=by, level=level, dropna=dropna, sort=sort + return ( + cudf.core.resample.SeriesResampler(self, by=by) + if isinstance(by, cudf.Grouper) and by.freq + else SeriesGroupBy( + self, by=by, level=level, dropna=dropna, sort=sort + ) ) def rename(self, index=None, copy=True): @@ -3610,39 +3615,6 @@ def rename(self, index=None, copy=True): return out.copy(deep=copy) - def _align_to_index( - self, index, how="outer", sort=True, allow_non_unique=False - ): - """ - Align to the given Index. See _align_indices below. 
- """ - - index = as_index(index) - if self.index.equals(index): - return self - if not allow_non_unique: - if len(self) != len(self.index.unique()) or len(index) != len( - index.unique() - ): - raise ValueError("Cannot align indices with non-unique values") - lhs = self.to_frame(0) - rhs = cudf.DataFrame(index=as_index(index)) - if how == "left": - tmp_col_id = str(uuid4()) - lhs[tmp_col_id] = column.arange(len(lhs)) - elif how == "right": - tmp_col_id = str(uuid4()) - rhs[tmp_col_id] = column.arange(len(rhs)) - result = lhs.join(rhs, how=how, sort=sort) - if how == "left" or how == "right": - result = result.sort_values(tmp_col_id)[0] - else: - result = result[0] - - result.name = self.name - result.index.names = index.names - return result - def merge( self, other, diff --git a/python/cudf/cudf/core/tools/datetimes.py b/python/cudf/cudf/core/tools/datetimes.py index 31fb7c7023c..34d62ffc048 100644 --- a/python/cudf/cudf/core/tools/datetimes.py +++ b/python/cudf/cudf/core/tools/datetimes.py @@ -3,7 +3,7 @@ import math import re import warnings -from typing import Sequence, Union +from typing import Sequence, Type, TypeVar, Union import cupy as cp import numpy as np @@ -30,6 +30,7 @@ } _offset_alias_to_code = { + "W": "W", "D": "D", "H": "h", "h": "h", @@ -365,6 +366,9 @@ def get_units(value): return value +_T = TypeVar("_T", bound="DateOffset") + + class DateOffset: """ An object used for binary ops where calendrical arithmetic @@ -440,7 +444,21 @@ class DateOffset: "years": "Y", } - _CODES_TO_UNITS = {v: k for k, v in _UNITS_TO_CODES.items()} + _CODES_TO_UNITS = { + "ns": "nanoseconds", + "us": "microseconds", + "ms": "milliseconds", + "L": "milliseconds", + "s": "seconds", + "m": "minutes", + "h": "hours", + "D": "days", + "W": "weeks", + "M": "months", + "Y": "years", + } + + _FREQSTR_REGEX = re.compile("([0-9]*)([a-zA-Z]+)") def __init__(self, n=1, normalize=False, **kwds): if normalize: @@ -554,7 +572,9 @@ def _combine_kwargs_to_seconds(self, **kwargs): kwargs["seconds"] = seconds return kwargs - def _datetime_binop(self, datetime_col, op, reflect=False): + def _datetime_binop( + self, datetime_col, op, reflect=False + ) -> column.DatetimeColumn: if reflect and op == "sub": raise TypeError( f"Can not subtract a {type(datetime_col).__name__}" @@ -588,7 +608,7 @@ def _generate_months_column(self, size, op): return col @property - def _is_no_op(self): + def _is_no_op(self) -> bool: # some logic could be implemented here for more complex cases # such as +1 year, -12 months return all([i == 0 for i in self._kwds.values()]) @@ -609,24 +629,22 @@ def __repr__(self): return repr_str @classmethod - def _from_freqstr(cls, freqstr): + def _from_freqstr(cls: Type[_T], freqstr: str) -> _T: """ Parse a string and return a DateOffset object expects strings of the form 3D, 25W, 10ms, 42ns, etc. 
""" - numeric_part = "" - freq_part = "" + match = cls._FREQSTR_REGEX.match(freqstr) - for x in freqstr: - if x.isdigit(): - numeric_part += x - else: - freq_part += x + if match is None: + raise ValueError(f"Invalid frequency string: {freqstr}") - if ( - freq_part not in cls._CODES_TO_UNITS - or not numeric_part + freq_part == freqstr - ): + numeric_part = match.group(1) + if numeric_part == "": + numeric_part = "1" + freq_part = match.group(2) + + if freq_part not in cls._CODES_TO_UNITS: raise ValueError(f"Cannot interpret frequency str: {freqstr}") return cls(**{cls._CODES_TO_UNITS[freq_part]: int(numeric_part)}) @@ -859,15 +877,16 @@ def date_range( # end == start, return exactly 1 timestamp (start) periods = 1 - # The estimated upper bound of `end` is enforced to be computed to make - # sure overflow components are raised before actually computing the - # sequence. + # We compute `end_estim` (the estimated upper bound of the date + # range) below, but don't always use it. We do this to ensure + # that the appropriate OverflowError is raised by Pandas in case + # of overflow. # FIXME: when `end_estim` is out of bound, but the actual `end` is not, # we shouldn't raise but compute the sequence as is. The trailing overflow # part should get trimmed at the end. end_estim = ( pd.Timestamp(start.value) - + (periods - 1) * offset._maybe_as_fast_pandas_offset() + + periods * offset._maybe_as_fast_pandas_offset() ).to_datetime64() if "months" in offset.kwds or "years" in offset.kwds: @@ -881,13 +900,12 @@ def date_range( (res <= end) if _is_increment_sequence else (res <= start) ] else: - # If `offset` is fixed frequency, we treat both timestamps as integers - # and evenly divide the given integer range. - arr = cp.linspace( - start=start.value.astype("int64"), - stop=end_estim.astype("int64"), - num=periods, - ) + # If `offset` is fixed frequency, we generate a range of + # treating `start`, `stop` and `step` as ints: + stop = end_estim.astype("int64") + start = start.value.astype("int64") + step = int(_offset_to_nanoseconds_lower_bound(offset)) + arr = cp.arange(start=start, stop=stop, step=step) res = cudf.core.column.as_column(arr).astype("datetime64[ns]") return cudf.DatetimeIndex._from_data({name: res}) diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index 680cfca19eb..617dbdeaea5 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -428,9 +428,9 @@ def __init__(self, groupby, window, min_periods=None, center=False): # of `groupby.grouping.keys` and `groupby.obj`. # As an optimization, avoid gathering those twice. self._group_keys = groupby.grouping.keys.take(sort_order) - obj = groupby.obj.drop( - columns=groupby.grouping._key_column_names_from_obj - ).take(sort_order) + obj = groupby.obj.drop(columns=groupby.grouping._named_columns).take( + sort_order + ) gb_size = groupby.size().sort_index() self._group_starts = ( diff --git a/python/cudf/cudf/tests/test_datetime.py b/python/cudf/cudf/tests/test_datetime.py index deb10855802..d666dfc0ec1 100644 --- a/python/cudf/cudf/tests/test_datetime.py +++ b/python/cudf/cudf/tests/test_datetime.py @@ -1452,7 +1452,14 @@ def test_is_month_start(data, dtype): date_range_test_periods = [1, 10, 100] date_range_test_freq = [ {"months": 3, "years": 1}, - {"hours": 10, "days": 57, "nanoseconds": 3}, + pytest.param( + {"hours": 10, "days": 57, "nanoseconds": 3}, + marks=pytest.mark.xfail( + True, + reason="Pandas ignoring nanoseconds component. 
" + "https://github.com/pandas-dev/pandas/issues/44393", + ), + ), "83D", "17h", "-680T", @@ -1547,6 +1554,19 @@ def test_date_range_end_freq_periods(end, freq, periods): ) +def test_date_range_freq_does_not_divide_range(): + expect = pd.date_range( + "2001-01-01 00:00:00.000000", "2001-01-01 00:00:00.000010", freq="3us" + ) + got = cudf.date_range( + "2001-01-01 00:00:00.000000", "2001-01-01 00:00:00.000010", freq="3us" + ) + np.testing.assert_allclose( + expect.to_numpy().astype("int64"), + got.to_pandas().to_numpy().astype("int64"), + ) + + def test_date_range_raise_overflow(): # Fixed offset start = np.datetime64(np.iinfo("int64").max, "ns") diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index 77d0b766e97..1feaddf74e2 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -2205,6 +2205,114 @@ def foo(x): assert_groupby_results_equal(expect, got) +@pytest.mark.parametrize("label", [None, "left", "right"]) +@pytest.mark.parametrize("closed", [None, "left", "right"]) +def test_groupby_freq_week(label, closed): + pdf = pd.DataFrame( + { + "Publish date": [ + pd.Timestamp("2000-01-03"), + pd.Timestamp("2000-01-01"), + pd.Timestamp("2000-01-09"), + pd.Timestamp("2000-01-02"), + pd.Timestamp("2000-01-07"), + pd.Timestamp("2000-01-16"), + ], + "ID": [0, 1, 2, 3, 4, 5], + "Price": [10, 20, 30, 40, 50, 60], + } + ) + gdf = cudf.from_pandas(pdf) + expect = pdf.groupby( + pd.Grouper(key="Publish date", freq="1W", label=label, closed=closed) + ).mean() + got = gdf.groupby( + cudf.Grouper(key="Publish date", freq="1W", label=label, closed=closed) + ).mean() + assert_eq(expect, got, check_like=True, check_dtype=False) + + +@pytest.mark.parametrize("label", [None, "left", "right"]) +@pytest.mark.parametrize("closed", [None, "left", "right"]) +def test_groupby_freq_day(label, closed): + pdf = pd.DataFrame( + { + "Publish date": [ + pd.Timestamp("2000-01-03"), + pd.Timestamp("2000-01-01"), + pd.Timestamp("2000-01-09"), + pd.Timestamp("2000-01-02"), + pd.Timestamp("2000-01-07"), + pd.Timestamp("2000-01-16"), + ], + "ID": [0, 1, 2, 3, 4, 5], + "Price": [10, 20, 30, 40, 50, 60], + } + ) + gdf = cudf.from_pandas(pdf) + expect = pdf.groupby( + pd.Grouper(key="Publish date", freq="3D", label=label, closed=closed) + ).mean() + got = gdf.groupby( + cudf.Grouper(key="Publish date", freq="3D", label=label, closed=closed) + ).mean() + assert_eq(expect, got, check_like=True, check_dtype=False) + + +@pytest.mark.parametrize("label", [None, "left", "right"]) +@pytest.mark.parametrize("closed", [None, "left", "right"]) +def test_groupby_freq_min(label, closed): + pdf = pd.DataFrame( + { + "Publish date": [ + pd.Timestamp("2000-01-01 12:01:00"), + pd.Timestamp("2000-01-01 12:05:00"), + pd.Timestamp("2000-01-01 15:30:00"), + pd.Timestamp("2000-01-02 00:00:00"), + pd.Timestamp("2000-01-01 23:47:00"), + pd.Timestamp("2000-01-02 00:05:00"), + ], + "ID": [0, 1, 2, 3, 4, 5], + "Price": [10, 20, 30, 40, 50, 60], + } + ) + gdf = cudf.from_pandas(pdf) + expect = pdf.groupby( + pd.Grouper(key="Publish date", freq="1h", label=label, closed=closed) + ).mean() + got = gdf.groupby( + cudf.Grouper(key="Publish date", freq="1h", label=label, closed=closed) + ).mean() + assert_eq(expect, got, check_like=True, check_dtype=False) + + +@pytest.mark.parametrize("label", [None, "left", "right"]) +@pytest.mark.parametrize("closed", [None, "left", "right"]) +def test_groupby_freq_s(label, closed): + pdf = pd.DataFrame( + { + "Publish date": [ + 
pd.Timestamp("2000-01-01 00:00:02"), + pd.Timestamp("2000-01-01 00:00:07"), + pd.Timestamp("2000-01-01 00:00:02"), + pd.Timestamp("2000-01-02 00:00:15"), + pd.Timestamp("2000-01-01 00:00:05"), + pd.Timestamp("2000-01-02 00:00:09"), + ], + "ID": [0, 1, 2, 3, 4, 5], + "Price": [10, 20, 30, 40, 50, 60], + } + ) + gdf = cudf.from_pandas(pdf) + expect = pdf.groupby( + pd.Grouper(key="Publish date", freq="3s", label=label, closed=closed) + ).mean() + got = gdf.groupby( + cudf.Grouper(key="Publish date", freq="3s", label=label, closed=closed) + ).mean() + assert_eq(expect, got, check_like=True, check_dtype=False) + + @pytest.mark.parametrize( "pdf, group, name, obj", [ @@ -2252,3 +2360,6 @@ def test_groupby_get_group(pdf, group, name, obj): actual = gdf.groupby(group).get_group(name=name, obj=gobj) assert_groupby_results_equal(expected, actual) + + +# TODO: Add a test including datetime64[ms] column in input data diff --git a/python/cudf/cudf/tests/test_joining.py b/python/cudf/cudf/tests/test_joining.py index 88e822c27c4..e9f55c9e51a 100644 --- a/python/cudf/cudf/tests/test_joining.py +++ b/python/cudf/cudf/tests/test_joining.py @@ -2103,6 +2103,33 @@ def test_string_join_values_nulls(): assert_join_results_equal(expect, got, how="left") +def test_merge_multiindex_columns(): + lhs = pd.DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]}) + lhs.columns = pd.MultiIndex.from_tuples([("a", "x"), ("a", "y")]) + rhs = pd.DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]}) + rhs.columns = pd.MultiIndex.from_tuples([("a", "x"), ("a", "z")]) + expect = lhs.merge(rhs, on=[("a", "x")], how="inner") + + lhs = cudf.from_pandas(lhs) + rhs = cudf.from_pandas(rhs) + got = lhs.merge(rhs, on=[("a", "x")], how="inner") + + assert_join_results_equal(expect, got, how="inner") + + +def test_join_multiindex_empty(): + lhs = pd.DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]}, index=["a", "b", "c"]) + lhs.columns = pd.MultiIndex.from_tuples([("a", "x"), ("a", "y")]) + rhs = pd.DataFrame(index=["a", "c", "d"]) + expect = lhs.join(rhs, how="inner") + + lhs = cudf.from_pandas(lhs) + rhs = cudf.from_pandas(rhs) + got = lhs.join(rhs, how="inner") + + assert_join_results_equal(expect, got, how="inner") + + def test_join_on_index_with_duplicate_names(): # although index levels with duplicate names are poorly supported # overall, we *should* be able to join on them: diff --git a/python/cudf/cudf/tests/test_resampling.py b/python/cudf/cudf/tests/test_resampling.py new file mode 100644 index 00000000000..3b8e807c3b6 --- /dev/null +++ b/python/cudf/cudf/tests/test_resampling.py @@ -0,0 +1,156 @@ +import numpy as np +import pandas as pd +import pytest + +import cudf +from cudf.testing._utils import assert_eq + + +def assert_resample_results_equal(lhs, rhs, **kwargs): + assert_eq( + lhs.sort_index(), + rhs.sort_index(), + check_dtype=False, + check_freq=False, + **kwargs, + ) + + +@pytest.mark.parametrize("ts_resolution", ["ns", "s", "ms"]) +def test_series_downsample_simple(ts_resolution): + # Series with and index of 5min intervals: + + index = pd.date_range(start="2001-01-01", periods=10, freq="1T") + psr = pd.Series(range(10), index=index) + gsr = cudf.from_pandas(psr) + gsr.index = gsr.index.astype(f"datetime64[{ts_resolution}]") + assert_resample_results_equal( + psr.resample("3T").sum(), gsr.resample("3T").sum(), + ) + + +def test_series_upsample_simple(): + # Series with and index of 5min intervals: + + index = pd.date_range(start="2001-01-01", periods=10, freq="1T") + psr = pd.Series(range(10), index=index) + gsr = cudf.from_pandas(psr) + 
assert_resample_results_equal( + psr.resample("3T").sum(), gsr.resample("3T").sum(), + ) + + +@pytest.mark.parametrize("rule", ["2S", "10S"]) +def test_series_resample_ffill(rule): + rng = pd.date_range("1/1/2012", periods=10, freq="5S") + ts = pd.Series(np.random.randint(0, 500, len(rng)), index=rng) + gts = cudf.from_pandas(ts) + assert_resample_results_equal( + ts.resample(rule).ffill(), gts.resample(rule).ffill() + ) + + +@pytest.mark.parametrize("rule", ["2S", "10S"]) +def test_series_resample_bfill(rule): + rng = pd.date_range("1/1/2012", periods=10, freq="5S") + ts = pd.Series(np.random.randint(0, 500, len(rng)), index=rng) + gts = cudf.from_pandas(ts) + assert_resample_results_equal( + ts.resample(rule).bfill(), gts.resample(rule).bfill() + ) + + +@pytest.mark.parametrize("rule", ["2S", "10S"]) +def test_series_resample_asfreq(rule): + rng = pd.date_range("1/1/2012", periods=100, freq="5S") + ts = pd.Series(np.random.randint(0, 500, len(rng)), index=rng) + gts = cudf.from_pandas(ts) + assert_resample_results_equal( + ts.resample(rule).asfreq(), gts.resample(rule).asfreq() + ) + + +def test_dataframe_resample_aggregation_simple(): + pdf = pd.DataFrame( + np.random.randn(1000, 3), + index=pd.date_range("1/1/2012", freq="S", periods=1000), + columns=["A", "B", "C"], + ) + gdf = cudf.from_pandas(pdf) + assert_resample_results_equal( + pdf.resample("3T").mean(), gdf.resample("3T").mean() + ) + + +def test_dataframe_resample_multiagg(): + pdf = pd.DataFrame( + np.random.randn(1000, 3), + index=pd.date_range("1/1/2012", freq="S", periods=1000), + columns=["A", "B", "C"], + ) + gdf = cudf.from_pandas(pdf) + assert_resample_results_equal( + pdf.resample("3T").agg(["sum", "mean", "std"]), + gdf.resample("3T").agg(["sum", "mean", "std"]), + ) + + +def test_dataframe_resample_on(): + # test resampling on a specified column + pdf = pd.DataFrame( + { + "x": np.random.randn(1000), + "y": pd.date_range("1/1/2012", freq="S", periods=1000), + } + ) + gdf = cudf.from_pandas(pdf) + assert_resample_results_equal( + pdf.resample("3T", on="y").mean(), gdf.resample("3T", on="y").mean() + ) + + +def test_dataframe_resample_level(): + # test resampling on a specific level of a MultIndex + pdf = pd.DataFrame( + { + "x": np.random.randn(1000), + "y": pd.date_range("1/1/2012", freq="S", periods=1000), + } + ) + pdi = pd.MultiIndex.from_frame(pdf) + pdf = pd.DataFrame({"a": np.random.randn(1000)}, index=pdi) + gdf = cudf.from_pandas(pdf) + assert_resample_results_equal( + pdf.resample("3T", level="y").mean(), + gdf.resample("3T", level="y").mean(), + ) + + +@pytest.mark.parametrize( + "in_freq, sampling_freq, out_freq", + [ + ("1ns", "1us", "us"), + ("1us", "10us", "us"), + ("ms", "100us", "us"), + ("ms", "1s", "s"), + ("s", "1T", "s"), + ("1T", "30s", "s"), + ("1D", "10D", "s"), + ("10D", "1D", "s"), + ], +) +def test_resampling_frequency_conversion(in_freq, sampling_freq, out_freq): + # test that we cast to the appropriate frequency + # when resampling: + pdf = pd.DataFrame( + { + "x": np.random.randn(100), + "y": pd.date_range("1/1/2012", freq=in_freq, periods=100), + } + ) + gdf = cudf.from_pandas(pdf) + expect = pdf.resample(sampling_freq, on="y").mean() + got = gdf.resample(sampling_freq, on="y").mean() + assert_resample_results_equal(expect, got) + + assert got.index.dtype == np.dtype(f"datetime64[{out_freq}]")
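
To complement the examples above, the following minimal sketch (not part of the patch) shows that the two entry points added here, `DataFrame.resample(..., on=...)` and `DataFrame.groupby(cudf.Grouper(key=..., freq=...))`, are two spellings of the same binning operation. The column names `ts` and `value` are illustrative only, the snippet assumes a cudf build that includes this change, and `assert_eq` is the same helper used by the new tests.

```python
# Illustrative only: "ts" and "value" are hypothetical column names.
import cudf
from cudf.testing._utils import assert_eq

df = cudf.DataFrame(
    {
        "ts": cudf.date_range(start="2001-01-01", periods=10, freq="1T"),
        "value": list(range(10)),
    }
)

# Downsample to 3-minute bins on the "ts" column, summing each bin:
resampled = df.resample("3T", on="ts").sum()

# The same operation expressed as a frequency groupby:
grouped = df.groupby(cudf.Grouper(key="ts", freq="3T")).sum()

# Both calls construct a DataFrameResampler under the hood, so the
# binned results should match:
assert_eq(resampled, grouped)
```

With the default `closed="left"` and `label="left"` behavior for minute frequencies, either form should produce four bins labeled 00:00, 00:03, 00:06 and 00:09, mirroring the Series example in the description above.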