diff --git a/pandas/_libs/window.pyx b/pandas/_libs/window.pyx index d6bad0f20d7605..303b4f6f24eac2 100644 --- a/pandas/_libs/window.pyx +++ b/pandas/_libs/window.pyx @@ -96,280 +96,20 @@ def _check_minp(win, minp, N, floor=None) -> int: # Physical description: 366 p. # Series: Prentice-Hall Series in Automatic Computation -# ---------------------------------------------------------------------- -# The indexer objects for rolling -# These define start/end indexers to compute offsets - - -cdef class WindowIndexer: - - cdef: - ndarray start, end - int64_t N, minp, win - bint is_variable - - def get_data(self): - return (self.start, self.end, self.N, - self.win, self.minp, - self.is_variable) - - -cdef class MockFixedWindowIndexer(WindowIndexer): - """ - - We are just checking parameters of the indexer, - and returning a consistent API with fixed/variable - indexers. - - Parameters - ---------- - values: ndarray - values data array - win: int64_t - window size - minp: int64_t - min number of obs in a window to consider non-NaN - index: object - index of the values - floor: optional - unit for flooring - left_closed: bint - left endpoint closedness - right_closed: bint - right endpoint closedness - - """ - def __init__(self, ndarray values, int64_t win, int64_t minp, - bint left_closed, bint right_closed, - object index=None, object floor=None): - - assert index is None - self.is_variable = 0 - self.N = len(values) - self.minp = _check_minp(win, minp, self.N, floor=floor) - self.start = np.empty(0, dtype='int64') - self.end = np.empty(0, dtype='int64') - self.win = win - - -cdef class FixedWindowIndexer(WindowIndexer): - """ - create a fixed length window indexer object - that has start & end, that point to offsets in - the index object; these are defined based on the win - arguments - - Parameters - ---------- - values: ndarray - values data array - win: int64_t - window size - minp: int64_t - min number of obs in a window to consider non-NaN - index: object - index of the values - floor: optional - unit for flooring the unit - left_closed: bint - left endpoint closedness - right_closed: bint - right endpoint closedness - - """ - def __init__(self, ndarray values, int64_t win, int64_t minp, - bint left_closed, bint right_closed, - object index=None, object floor=None): - cdef: - ndarray[int64_t] start_s, start_e, end_s, end_e - - assert index is None - self.is_variable = 0 - self.N = len(values) - self.minp = _check_minp(win, minp, self.N, floor=floor) - - start_s = np.zeros(win, dtype='int64') - start_e = np.arange(win, self.N, dtype='int64') - win + 1 - self.start = np.concatenate([start_s, start_e]) - - end_s = np.arange(win, dtype='int64') + 1 - end_e = start_e + win - self.end = np.concatenate([end_s, end_e]) - self.win = win - - -cdef class VariableWindowIndexer(WindowIndexer): - """ - create a variable length window indexer object - that has start & end, that point to offsets in - the index object; these are defined based on the win - arguments - - Parameters - ---------- - values: ndarray - values data array - win: int64_t - window size - minp: int64_t - min number of obs in a window to consider non-NaN - index: ndarray - index of the values - left_closed: bint - left endpoint closedness - True if the left endpoint is closed, False if open - right_closed: bint - right endpoint closedness - True if the right endpoint is closed, False if open - floor: optional - unit for flooring the unit - """ - def __init__(self, ndarray values, int64_t win, int64_t minp, - bint left_closed, bint right_closed, ndarray index, - object floor=None): - - self.is_variable = 1 - self.N = len(index) - self.minp = _check_minp(win, minp, self.N, floor=floor) - - self.start = np.empty(self.N, dtype='int64') - self.start.fill(-1) - - self.end = np.empty(self.N, dtype='int64') - self.end.fill(-1) - - self.build(index, win, left_closed, right_closed) - - # max window size - self.win = (self.end - self.start).max() - - def build(self, const int64_t[:] index, int64_t win, bint left_closed, - bint right_closed): - - cdef: - ndarray[int64_t] start, end - int64_t start_bound, end_bound, N - Py_ssize_t i, j - - start = self.start - end = self.end - N = self.N - - start[0] = 0 - - # right endpoint is closed - if right_closed: - end[0] = 1 - # right endpoint is open - else: - end[0] = 0 - - with nogil: - - # start is start of slice interval (including) - # end is end of slice interval (not including) - for i in range(1, N): - end_bound = index[i] - start_bound = index[i] - win - - # left endpoint is closed - if left_closed: - start_bound -= 1 - - # advance the start bound until we are - # within the constraint - start[i] = i - for j in range(start[i - 1], i): - if index[j] > start_bound: - start[i] = j - break - - # end bound is previous end - # or current index - if index[end[i - 1]] <= end_bound: - end[i] = i + 1 - else: - end[i] = end[i - 1] - - # right endpoint is open - if not right_closed: - end[i] -= 1 - - -def get_window_indexer(values, win, minp, index, closed, - floor=None, use_mock=True): - """ - Return the correct window indexer for the computation. - - Parameters - ---------- - values: 1d ndarray - win: integer, window size - minp: integer, minimum periods - index: 1d ndarray, optional - index to the values array - closed: string, default None - {'right', 'left', 'both', 'neither'} - window endpoint closedness. Defaults to 'right' in - VariableWindowIndexer and to 'both' in FixedWindowIndexer - floor: optional - unit for flooring the unit - use_mock: boolean, default True - if we are a fixed indexer, return a mock indexer - instead of the FixedWindow Indexer. This is a type - compat Indexer that allows us to use a standard - code path with all of the indexers. - - Returns - ------- - tuple of 1d int64 ndarrays of the offsets & data about the window - - """ - - cdef: - bint left_closed = False - bint right_closed = False - - assert closed is None or closed in ['right', 'left', 'both', 'neither'] - - # if windows is variable, default is 'right', otherwise default is 'both' - if closed is None: - closed = 'right' if index is not None else 'both' - - if closed in ['right', 'both']: - right_closed = True - - if closed in ['left', 'both']: - left_closed = True - - if index is not None: - indexer = VariableWindowIndexer(values, win, minp, left_closed, - right_closed, index, floor) - elif use_mock: - indexer = MockFixedWindowIndexer(values, win, minp, left_closed, - right_closed, index, floor) - else: - indexer = FixedWindowIndexer(values, win, minp, left_closed, - right_closed, index, floor) - return indexer.get_data() - # ---------------------------------------------------------------------- # Rolling count # this is only an impl for index not None, IOW, freq aware -def roll_count(ndarray[float64_t] values, int64_t win, int64_t minp, - object index, object closed): +def roll_count(ndarray[float64_t] values, ndarray[int64_t] start, ndarray[int64_t] end, + int64_t minp): cdef: float64_t val, count_x = 0.0 - int64_t s, e, nobs, N + int64_t s, e, nobs, N = len(values) Py_ssize_t i, j - int64_t[:] start, end ndarray[float64_t] output - start, end, N, win, minp, _ = get_window_indexer(values, win, - minp, index, closed) output = np.empty(N, dtype=float) with nogil: @@ -442,80 +182,75 @@ cdef inline void remove_sum(float64_t val, int64_t *nobs, float64_t *sum_x) nogi sum_x[0] = sum_x[0] - val -def roll_sum(ndarray[float64_t] values, int64_t win, int64_t minp, - object index, object closed): +def roll_sum_variable(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): cdef: - float64_t val, prev_x, sum_x = 0 - int64_t s, e, range_endpoint - int64_t nobs = 0, i, j, N - bint is_variable - int64_t[:] start, end + float64_t sum_x = 0 + int64_t s, e + int64_t nobs = 0, i, j, N = len(values) ndarray[float64_t] output - start, end, N, win, minp, is_variable = get_window_indexer(values, win, - minp, index, - closed, - floor=0) output = np.empty(N, dtype=float) - # for performance we are going to iterate - # fixed windows separately, makes the code more complex as we have 2 paths - # but is faster + with nogil: - if is_variable: + for i in range(0, N): + s = start[i] + e = end[i] - # variable window - with nogil: + if i == 0: - for i in range(0, N): - s = start[i] - e = end[i] + # setup + sum_x = 0.0 + nobs = 0 + for j in range(s, e): + add_sum(values[j], &nobs, &sum_x) - if i == 0: + else: - # setup - sum_x = 0.0 - nobs = 0 - for j in range(s, e): - add_sum(values[j], &nobs, &sum_x) + # calculate deletes + for j in range(start[i - 1], s): + remove_sum(values[j], &nobs, &sum_x) - else: + # calculate adds + for j in range(end[i - 1], e): + add_sum(values[j], &nobs, &sum_x) - # calculate deletes - for j in range(start[i - 1], s): - remove_sum(values[j], &nobs, &sum_x) + output[i] = calc_sum(minp, nobs, sum_x) - # calculate adds - for j in range(end[i - 1], e): - add_sum(values[j], &nobs, &sum_x) + return output - output[i] = calc_sum(minp, nobs, sum_x) - else: +def roll_sum_fixed(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win): + cdef: + float64_t val, prev_x, sum_x = 0 + int64_t range_endpoint + int64_t nobs = 0, i, N = len(values) + ndarray[float64_t] output - # fixed window + output = np.empty(N, dtype=float) - range_endpoint = int_max(minp, 1) - 1 + range_endpoint = int_max(minp, 1) - 1 - with nogil: + with nogil: - for i in range(0, range_endpoint): - add_sum(values[i], &nobs, &sum_x) - output[i] = NaN + for i in range(0, range_endpoint): + add_sum(values[i], &nobs, &sum_x) + output[i] = NaN - for i in range(range_endpoint, N): - val = values[i] - add_sum(val, &nobs, &sum_x) + for i in range(range_endpoint, N): + val = values[i] + add_sum(val, &nobs, &sum_x) - if i > win - 1: - prev_x = values[i - win] - remove_sum(prev_x, &nobs, &sum_x) + if i > win - 1: + prev_x = values[i - win] + remove_sum(prev_x, &nobs, &sum_x) - output[i] = calc_sum(minp, nobs, sum_x) + output[i] = calc_sum(minp, nobs, sum_x) return output - # ---------------------------------------------------------------------- # Rolling mean @@ -563,77 +298,75 @@ cdef inline void remove_mean(float64_t val, Py_ssize_t *nobs, float64_t *sum_x, neg_ct[0] = neg_ct[0] - 1 -def roll_mean(ndarray[float64_t] values, int64_t win, int64_t minp, - object index, object closed): +def roll_mean_fixed(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win): cdef: - float64_t val, prev_x, result, sum_x = 0 - int64_t s, e - bint is_variable - Py_ssize_t nobs = 0, i, j, neg_ct = 0, N - int64_t[:] start, end + float64_t val, prev_x, sum_x = 0 + Py_ssize_t nobs = 0, i, neg_ct = 0, N = len(values) ndarray[float64_t] output - start, end, N, win, minp, is_variable = get_window_indexer(values, win, - minp, index, - closed) output = np.empty(N, dtype=float) - # for performance we are going to iterate - # fixed windows separately, makes the code more complex as we have 2 paths - # but is faster + with nogil: + for i in range(minp - 1): + val = values[i] + add_mean(val, &nobs, &sum_x, &neg_ct) + output[i] = NaN + + for i in range(minp - 1, N): + val = values[i] + add_mean(val, &nobs, &sum_x, &neg_ct) - if is_variable: + if i > win - 1: + prev_x = values[i - win] + remove_mean(prev_x, &nobs, &sum_x, &neg_ct) - with nogil: + output[i] = calc_mean(minp, nobs, neg_ct, sum_x) - for i in range(0, N): - s = start[i] - e = end[i] + return output - if i == 0: - # setup - sum_x = 0.0 - nobs = 0 - for j in range(s, e): - val = values[j] - add_mean(val, &nobs, &sum_x, &neg_ct) +def roll_mean_variable(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): + cdef: + float64_t val, sum_x = 0 + int64_t s, e + Py_ssize_t nobs = 0, i, j, neg_ct = 0, N = len(values) + ndarray[float64_t] output - else: + output = np.empty(N, dtype=float) - # calculate deletes - for j in range(start[i - 1], s): - val = values[j] - remove_mean(val, &nobs, &sum_x, &neg_ct) + with nogil: - # calculate adds - for j in range(end[i - 1], e): - val = values[j] - add_mean(val, &nobs, &sum_x, &neg_ct) + for i in range(0, N): + s = start[i] + e = end[i] - output[i] = calc_mean(minp, nobs, neg_ct, sum_x) + if i == 0: - else: + # setup + sum_x = 0.0 + nobs = 0 + for j in range(s, e): + val = values[j] + add_mean(val, &nobs, &sum_x, &neg_ct) - with nogil: - for i in range(minp - 1): - val = values[i] - add_mean(val, &nobs, &sum_x, &neg_ct) - output[i] = NaN + else: - for i in range(minp - 1, N): - val = values[i] - add_mean(val, &nobs, &sum_x, &neg_ct) + # calculate deletes + for j in range(start[i - 1], s): + val = values[j] + remove_mean(val, &nobs, &sum_x, &neg_ct) - if i > win - 1: - prev_x = values[i - win] - remove_mean(prev_x, &nobs, &sum_x, &neg_ct) + # calculate adds + for j in range(end[i - 1], e): + val = values[j] + add_mean(val, &nobs, &sum_x, &neg_ct) - output[i] = calc_mean(minp, nobs, neg_ct, sum_x) + output[i] = calc_mean(minp, nobs, neg_ct, sum_x) return output - # ---------------------------------------------------------------------- # Rolling variance @@ -696,8 +429,8 @@ cdef inline void remove_var(float64_t val, float64_t *nobs, float64_t *mean_x, ssqdm_x[0] = 0 -def roll_var(ndarray[float64_t] values, int64_t win, int64_t minp, - object index, object closed, int ddof=1): +def roll_var_fixed(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win, int ddof=1): """ Numerically stable implementation using Welford's method. """ @@ -705,98 +438,102 @@ def roll_var(ndarray[float64_t] values, int64_t win, int64_t minp, float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, float64_t val, prev, delta, mean_x_old int64_t s, e - bint is_variable - Py_ssize_t i, j, N - int64_t[:] start, end + Py_ssize_t i, j, N = len(values) ndarray[float64_t] output - start, end, N, win, minp, is_variable = get_window_indexer(values, win, - minp, index, - closed) output = np.empty(N, dtype=float) # Check for windows larger than array, addresses #7297 win = min(win, N) - # for performance we are going to iterate - # fixed windows separately, makes the code more complex as we - # have 2 paths but is faster + with nogil: - if is_variable: + # Over the first window, observations can only be added, never + # removed + for i in range(win): + add_var(values[i], &nobs, &mean_x, &ssqdm_x) + output[i] = calc_var(minp, ddof, nobs, ssqdm_x) - with nogil: + # a part of Welford's method for the online variance-calculation + # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance - for i in range(0, N): + # After the first window, observations can both be added and + # removed + for i in range(win, N): + val = values[i] + prev = values[i - win] - s = start[i] - e = end[i] + if notnan(val): + if prev == prev: - # Over the first window, observations can only be added - # never removed - if i == 0: + # Adding one observation and removing another one + delta = val - prev + mean_x_old = mean_x - for j in range(s, e): - add_var(values[j], &nobs, &mean_x, &ssqdm_x) + mean_x += delta / nobs + ssqdm_x += ((nobs - 1) * val + + (nobs + 1) * prev + - 2 * nobs * mean_x_old) * delta / nobs else: + add_var(val, &nobs, &mean_x, &ssqdm_x) + elif prev == prev: + remove_var(prev, &nobs, &mean_x, &ssqdm_x) - # After the first window, observations can both be added - # and removed + output[i] = calc_var(minp, ddof, nobs, ssqdm_x) + + return output - # calculate adds - for j in range(end[i - 1], e): - add_var(values[j], &nobs, &mean_x, &ssqdm_x) - # calculate deletes - for j in range(start[i - 1], s): - remove_var(values[j], &nobs, &mean_x, &ssqdm_x) +def roll_var_variable(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int ddof=1): + """ + Numerically stable implementation using Welford's method. + """ + cdef: + float64_t mean_x = 0, ssqdm_x = 0, nobs = 0, + float64_t val, prev, delta, mean_x_old + int64_t s, e + Py_ssize_t i, j, N = len(values) + ndarray[float64_t] output - output[i] = calc_var(minp, ddof, nobs, ssqdm_x) + output = np.empty(N, dtype=float) - else: + with nogil: - with nogil: + for i in range(0, N): - # Over the first window, observations can only be added, never - # removed - for i in range(win): - add_var(values[i], &nobs, &mean_x, &ssqdm_x) - output[i] = calc_var(minp, ddof, nobs, ssqdm_x) + s = start[i] + e = end[i] - # a part of Welford's method for the online variance-calculation - # https://en.wikipedia.org/wiki/Algorithms_for_calculating_variance + # Over the first window, observations can only be added + # never removed + if i == 0: - # After the first window, observations can both be added and - # removed - for i in range(win, N): - val = values[i] - prev = values[i - win] + for j in range(s, e): + add_var(values[j], &nobs, &mean_x, &ssqdm_x) - if notnan(val): - if prev == prev: + else: - # Adding one observation and removing another one - delta = val - prev - mean_x_old = mean_x + # After the first window, observations can both be added + # and removed - mean_x += delta / nobs - ssqdm_x += ((nobs - 1) * val - + (nobs + 1) * prev - - 2 * nobs * mean_x_old) * delta / nobs + # calculate adds + for j in range(end[i - 1], e): + add_var(values[j], &nobs, &mean_x, &ssqdm_x) - else: - add_var(val, &nobs, &mean_x, &ssqdm_x) - elif prev == prev: - remove_var(prev, &nobs, &mean_x, &ssqdm_x) + # calculate deletes + for j in range(start[i - 1], s): + remove_var(values[j], &nobs, &mean_x, &ssqdm_x) - output[i] = calc_var(minp, ddof, nobs, ssqdm_x) + output[i] = calc_var(minp, ddof, nobs, ssqdm_x) return output - # ---------------------------------------------------------------------- # Rolling skewness + cdef inline float64_t calc_skew(int64_t minp, int64_t nobs, float64_t x, float64_t xx, float64_t xxx) nogil: @@ -861,76 +598,80 @@ cdef inline void remove_skew(float64_t val, int64_t *nobs, xxx[0] = xxx[0] - val * val * val -def roll_skew(ndarray[float64_t] values, int64_t win, int64_t minp, - object index, object closed): +def roll_skew_fixed(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0 - int64_t nobs = 0, i, j, N + int64_t nobs = 0, i, j, N = len(values) int64_t s, e - bint is_variable - int64_t[:] start, end ndarray[float64_t] output - start, end, N, win, minp, is_variable = get_window_indexer(values, win, - minp, index, - closed) output = np.empty(N, dtype=float) - if is_variable: + with nogil: + for i in range(minp - 1): + val = values[i] + add_skew(val, &nobs, &x, &xx, &xxx) + output[i] = NaN - with nogil: + for i in range(minp - 1, N): + val = values[i] + add_skew(val, &nobs, &x, &xx, &xxx) - for i in range(0, N): + if i > win - 1: + prev = values[i - win] + remove_skew(prev, &nobs, &x, &xx, &xxx) - s = start[i] - e = end[i] + output[i] = calc_skew(minp, nobs, x, xx, xxx) - # Over the first window, observations can only be added - # never removed - if i == 0: + return output - for j in range(s, e): - val = values[j] - add_skew(val, &nobs, &x, &xx, &xxx) - else: +def roll_skew_variable(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): + cdef: + float64_t val, prev + float64_t x = 0, xx = 0, xxx = 0 + int64_t nobs = 0, i, j, N = len(values) + int64_t s, e + ndarray[float64_t] output + + output = np.empty(N, dtype=float) - # After the first window, observations can both be added - # and removed + with nogil: - # calculate adds - for j in range(end[i - 1], e): - val = values[j] - add_skew(val, &nobs, &x, &xx, &xxx) + for i in range(0, N): - # calculate deletes - for j in range(start[i - 1], s): - val = values[j] - remove_skew(val, &nobs, &x, &xx, &xxx) + s = start[i] + e = end[i] - output[i] = calc_skew(minp, nobs, x, xx, xxx) + # Over the first window, observations can only be added + # never removed + if i == 0: - else: + for j in range(s, e): + val = values[j] + add_skew(val, &nobs, &x, &xx, &xxx) - with nogil: - for i in range(minp - 1): - val = values[i] - add_skew(val, &nobs, &x, &xx, &xxx) - output[i] = NaN + else: - for i in range(minp - 1, N): - val = values[i] - add_skew(val, &nobs, &x, &xx, &xxx) + # After the first window, observations can both be added + # and removed - if i > win - 1: - prev = values[i - win] - remove_skew(prev, &nobs, &x, &xx, &xxx) + # calculate adds + for j in range(end[i - 1], e): + val = values[j] + add_skew(val, &nobs, &x, &xx, &xxx) - output[i] = calc_skew(minp, nobs, x, xx, xxx) + # calculate deletes + for j in range(start[i - 1], s): + val = values[j] + remove_skew(val, &nobs, &x, &xx, &xxx) - return output + output[i] = calc_skew(minp, nobs, x, xx, xxx) + return output # ---------------------------------------------------------------------- # Rolling kurtosis @@ -1005,69 +746,73 @@ cdef inline void remove_kurt(float64_t val, int64_t *nobs, xxxx[0] = xxxx[0] - val * val * val * val -def roll_kurt(ndarray[float64_t] values, int64_t win, int64_t minp, - object index, object closed): +def roll_kurt_fixed(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win): cdef: float64_t val, prev float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 - int64_t nobs = 0, i, j, N + int64_t nobs = 0, i, j, N = len(values) int64_t s, e - bint is_variable - int64_t[:] start, end ndarray[float64_t] output - start, end, N, win, minp, is_variable = get_window_indexer(values, win, - minp, index, - closed) output = np.empty(N, dtype=float) - if is_variable: + with nogil: - with nogil: + for i in range(minp - 1): + add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) + output[i] = NaN - for i in range(0, N): + for i in range(minp - 1, N): + add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) - s = start[i] - e = end[i] + if i > win - 1: + prev = values[i - win] + remove_kurt(prev, &nobs, &x, &xx, &xxx, &xxxx) - # Over the first window, observations can only be added - # never removed - if i == 0: + output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx) - for j in range(s, e): - add_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) + return output - else: - # After the first window, observations can both be added - # and removed +def roll_kurt_variable(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): + cdef: + float64_t val, prev + float64_t x = 0, xx = 0, xxx = 0, xxxx = 0 + int64_t nobs = 0, i, j, s, e, N = len(values) + ndarray[float64_t] output - # calculate adds - for j in range(end[i - 1], e): - add_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) + output = np.empty(N, dtype=float) - # calculate deletes - for j in range(start[i - 1], s): - remove_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) + with nogil: - output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx) + for i in range(0, N): - else: + s = start[i] + e = end[i] - with nogil: + # Over the first window, observations can only be added + # never removed + if i == 0: - for i in range(minp - 1): - add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) - output[i] = NaN + for j in range(s, e): + add_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) - for i in range(minp - 1, N): - add_kurt(values[i], &nobs, &x, &xx, &xxx, &xxxx) + else: - if i > win - 1: - prev = values[i - win] - remove_kurt(prev, &nobs, &x, &xx, &xxx, &xxxx) + # After the first window, observations can both be added + # and removed - output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx) + # calculate adds + for j in range(end[i - 1], e): + add_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) + + # calculate deletes + for j in range(start[i - 1], s): + remove_kurt(values[j], &nobs, &x, &xx, &xxx, &xxxx) + + output[i] = calc_kurt(minp, nobs, x, xx, xxx, xxxx) return output @@ -1076,31 +821,26 @@ def roll_kurt(ndarray[float64_t] values, int64_t win, int64_t minp, # Rolling median, min, max -def roll_median_c(ndarray[float64_t] values, int64_t win, int64_t minp, - object index, object closed): +def roll_median_c(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win): cdef: float64_t val, res, prev - bint err = 0, is_variable + bint err = 0 int ret = 0 skiplist_t *sl Py_ssize_t i, j - int64_t nobs = 0, N, s, e + int64_t nobs = 0, N = len(values), s, e int midpoint - int64_t[:] start, end ndarray[float64_t] output # we use the Fixed/Variable Indexer here as the # actual skiplist ops outweigh any window computation costs - start, end, N, win, minp, is_variable = get_window_indexer( - values, win, - minp, index, closed, - use_mock=False) output = np.empty(N, dtype=float) - if win == 0: + if win == 0 or (end - start).max() == 0: output[:] = NaN return output - + win = (end - start).max() sl = skiplist_init(win) if sl == NULL: raise MemoryError("skiplist_init failed") @@ -1209,76 +949,89 @@ cdef inline numeric calc_mm(int64_t minp, Py_ssize_t nobs, return result -def roll_max(ndarray[numeric] values, int64_t win, int64_t minp, - object index, object closed): +def roll_max_fixed(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win): """ Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. Parameters ---------- - values: numpy array - window: int, size of rolling window - minp: if number of observations in window + values : np.ndarray[np.float64] + window : int, size of rolling window + minp : if number of observations in window is below this, output a NaN - index: ndarray, optional + index : ndarray, optional index for window computation - closed: 'right', 'left', 'both', 'neither' + closed : 'right', 'left', 'both', 'neither' make the interval closed on the right, left, both or neither endpoints """ - return _roll_min_max(values, win, minp, index, closed=closed, is_max=1) + return _roll_min_max_fixed(values, start, end, minp, win, is_max=1) -def roll_min(ndarray[numeric] values, int64_t win, int64_t minp, - object index, object closed): +def roll_max_variable(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): """ Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. Parameters ---------- - values: numpy array - window: int, size of rolling window - minp: if number of observations in window + values : np.ndarray[np.float64] + window : int, size of rolling window + minp : if number of observations in window is below this, output a NaN - index: ndarray, optional + index : ndarray, optional index for window computation + closed : 'right', 'left', 'both', 'neither' + make the interval closed on the right, left, + both or neither endpoints """ - return _roll_min_max(values, win, minp, index, is_max=0, closed=closed) + return _roll_min_max_variable(values, start, end, minp, is_max=1) -cdef _roll_min_max(ndarray[numeric] values, int64_t win, int64_t minp, - object index, object closed, bint is_max): +def roll_min_fixed(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win): """ - Moving min/max of 1d array of any numeric type along axis=0 - ignoring NaNs. + Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. + + Parameters + ---------- + values : np.ndarray[np.float64] + window : int, size of rolling window + minp : if number of observations in window + is below this, output a NaN + index : ndarray, optional + index for window computation """ - cdef: - ndarray[int64_t] starti, endi - int64_t N - bint is_variable + return _roll_min_max_fixed(values, start, end, minp, win, is_max=0) - starti, endi, N, win, minp, is_variable = get_window_indexer( - values, win, - minp, index, closed) - if is_variable: - return _roll_min_max_variable(values, starti, endi, N, win, minp, - is_max) - else: - return _roll_min_max_fixed(values, N, win, minp, is_max) +def roll_min_variable(ndarray[float64_t] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp): + """ + Moving max of 1d array of any numeric type along axis=0 ignoring NaNs. + + Parameters + ---------- + values : np.ndarray[np.float64] + window : int, size of rolling window + minp : if number of observations in window + is below this, output a NaN + index : ndarray, optional + index for window computation + """ + return _roll_min_max_variable(values, start, end, minp, is_max=0) cdef _roll_min_max_variable(ndarray[numeric] values, ndarray[int64_t] starti, ndarray[int64_t] endi, - int64_t N, - int64_t win, int64_t minp, bint is_max): cdef: numeric ai int64_t i, close_offset, curr_win_size - Py_ssize_t nobs = 0 + Py_ssize_t nobs = 0, N = len(values) deque Q[int64_t] # min/max always the front deque W[int64_t] # track the whole window for nobs compute ndarray[float64_t, ndim=1] output @@ -1353,15 +1106,16 @@ cdef _roll_min_max_variable(ndarray[numeric] values, cdef _roll_min_max_fixed(ndarray[numeric] values, - int64_t N, - int64_t win, + ndarray[int64_t] starti, + ndarray[int64_t] endi, int64_t minp, + int64_t win, bint is_max): cdef: numeric ai bint should_replace int64_t i, removed, window_i, - Py_ssize_t nobs = 0 + Py_ssize_t nobs = 0, N = len(values) int64_t* death numeric* ring numeric* minvalue @@ -1457,8 +1211,8 @@ interpolation_types = { } -def roll_quantile(ndarray[float64_t, cast=True] values, int64_t win, - int64_t minp, object index, object closed, +def roll_quantile(ndarray[float64_t, cast=True] values, ndarray[int64_t] start, + ndarray[int64_t] end, int64_t minp, int64_t win, float64_t quantile, str interpolation): """ O(N log(window)) implementation using skip list @@ -1466,10 +1220,8 @@ def roll_quantile(ndarray[float64_t, cast=True] values, int64_t win, cdef: float64_t val, prev, midpoint, idx_with_fraction skiplist_t *skiplist - int64_t nobs = 0, i, j, s, e, N + int64_t nobs = 0, i, j, s, e, N = len(values) Py_ssize_t idx - bint is_variable - int64_t[:] start, end ndarray[float64_t] output float64_t vlow, vhigh InterpolationType interpolation_type @@ -1485,16 +1237,12 @@ def roll_quantile(ndarray[float64_t, cast=True] values, int64_t win, # we use the Fixed/Variable Indexer here as the # actual skiplist ops outweigh any window computation costs - start, end, N, win, minp, is_variable = get_window_indexer( - values, win, - minp, index, closed, - use_mock=False) output = np.empty(N, dtype=float) - if win == 0: + if win == 0 or (end - start).max() == 0: output[:] = NaN return output - + win = (end - start).max() skiplist = skiplist_init(win) if skiplist == NULL: raise MemoryError("skiplist_init failed") @@ -1575,18 +1323,17 @@ def roll_quantile(ndarray[float64_t, cast=True] values, int64_t win, return output -def roll_generic(object obj, - int64_t win, int64_t minp, object index, object closed, - int offset, object func, bint raw, - object args, object kwargs): +def roll_generic_fixed(object obj, + ndarray[int64_t] start, ndarray[int64_t] end, + int64_t minp, int64_t win, + int offset, object func, bint raw, + object args, object kwargs): cdef: ndarray[float64_t] output, counts, bufarr ndarray[float64_t, cast=True] arr float64_t *buf float64_t *oldbuf - int64_t nobs = 0, i, j, s, e, N - bint is_variable - int64_t[:] start, end + int64_t nobs = 0, i, j, s, e, N = len(start) n = len(obj) if n == 0: @@ -1599,36 +1346,13 @@ def roll_generic(object obj, if not arr.flags.c_contiguous: arr = arr.copy('C') - counts = roll_sum(np.concatenate([np.isfinite(arr).astype(float), - np.array([0.] * offset)]), - win, minp, index, closed)[offset:] - - start, end, N, win, minp, is_variable = get_window_indexer(arr, win, - minp, index, - closed, - floor=0) + counts = roll_sum_fixed(np.concatenate([np.isfinite(arr).astype(float), + np.array([0.] * offset)]), + start, end, minp, win)[offset:] output = np.empty(N, dtype=float) - if is_variable: - # variable window arr or series - - if offset != 0: - raise ValueError("unable to roll_generic with a non-zero offset") - - for i in range(0, N): - s = start[i] - e = end[i] - - if counts[i] >= minp: - if raw: - output[i] = func(arr[s:e], *args, **kwargs) - else: - output[i] = func(obj.iloc[s:e], *args, **kwargs) - else: - output[i] = NaN - - elif not raw: + if not raw: # series for i in range(N): if counts[i] >= minp: @@ -1672,6 +1396,53 @@ def roll_generic(object obj, return output +def roll_generic_variable(object obj, + ndarray[int64_t] start, ndarray[int64_t] end, + int64_t minp, + int offset, object func, bint raw, + object args, object kwargs): + cdef: + ndarray[float64_t] output, counts, bufarr + ndarray[float64_t, cast=True] arr + float64_t *buf + float64_t *oldbuf + int64_t nobs = 0, i, j, s, e, N = len(start) + + n = len(obj) + if n == 0: + return obj + + arr = np.asarray(obj) + + # ndarray input + if raw: + if not arr.flags.c_contiguous: + arr = arr.copy('C') + + counts = roll_sum_variable(np.concatenate([np.isfinite(arr).astype(float), + np.array([0.] * offset)]), + start, end, minp)[offset:] + + output = np.empty(N, dtype=float) + + if offset != 0: + raise ValueError("unable to roll_generic with a non-zero offset") + + for i in range(0, N): + s = start[i] + e = end[i] + + if counts[i] >= minp: + if raw: + output[i] = func(arr[s:e], *args, **kwargs) + else: + output[i] = func(obj.iloc[s:e], *args, **kwargs) + else: + output[i] = NaN + + return output + + # ---------------------------------------------------------------------- # Rolling sum and mean for weighted window diff --git a/pandas/_libs/window_indexer.pyx b/pandas/_libs/window_indexer.pyx new file mode 100644 index 00000000000000..8f49a8b9462d3e --- /dev/null +++ b/pandas/_libs/window_indexer.pyx @@ -0,0 +1,165 @@ +# cython: boundscheck=False, wraparound=False, cdivision=True + +import numpy as np +from numpy cimport ndarray, int64_t + +# ---------------------------------------------------------------------- +# The indexer objects for rolling +# These define start/end indexers to compute offsets + + +class MockFixedWindowIndexer: + """ + + We are just checking parameters of the indexer, + and returning a consistent API with fixed/variable + indexers. + + Parameters + ---------- + values: ndarray + values data array + win: int64_t + window size + index: object + index of the values + closed: string + closed behavior + """ + def __init__(self, ndarray values, int64_t win, object closed, object index=None): + + self.start = np.empty(0, dtype='int64') + self.end = np.empty(0, dtype='int64') + + def get_window_bounds(self): + return self.start, self.end + + +class FixedWindowIndexer: + """ + create a fixed length window indexer object + that has start & end, that point to offsets in + the index object; these are defined based on the win + arguments + + Parameters + ---------- + values: ndarray + values data array + win: int64_t + window size + index: object + index of the values + closed: string + closed behavior + """ + def __init__(self, ndarray values, int64_t win, object closed, object index=None): + cdef: + ndarray[int64_t, ndim=1] start_s, start_e, end_s, end_e + int64_t N = len(values) + + start_s = np.zeros(win, dtype='int64') + start_e = np.arange(win, N, dtype='int64') - win + 1 + self.start = np.concatenate([start_s, start_e])[:N] + + end_s = np.arange(win, dtype='int64') + 1 + end_e = start_e + win + self.end = np.concatenate([end_s, end_e])[:N] + + def get_window_bounds(self): + return self.start, self.end + + +class VariableWindowIndexer: + """ + create a variable length window indexer object + that has start & end, that point to offsets in + the index object; these are defined based on the win + arguments + + Parameters + ---------- + values: ndarray + values data array + win: int64_t + window size + index: ndarray + index of the values + closed: string + closed behavior + """ + def __init__(self, ndarray values, int64_t win, object closed, ndarray index): + cdef: + bint left_closed = False + bint right_closed = False + int64_t N = len(index) + + # if windows is variable, default is 'right', otherwise default is 'both' + if closed is None: + closed = 'right' if index is not None else 'both' + + if closed in ['right', 'both']: + right_closed = True + + if closed in ['left', 'both']: + left_closed = True + + self.start, self.end = self.build(index, win, left_closed, right_closed, N) + + @staticmethod + def build(const int64_t[:] index, int64_t win, bint left_closed, + bint right_closed, int64_t N): + + cdef: + ndarray[int64_t] start, end + int64_t start_bound, end_bound + Py_ssize_t i, j + + start = np.empty(N, dtype='int64') + start.fill(-1) + end = np.empty(N, dtype='int64') + end.fill(-1) + + start[0] = 0 + + # right endpoint is closed + if right_closed: + end[0] = 1 + # right endpoint is open + else: + end[0] = 0 + + with nogil: + + # start is start of slice interval (including) + # end is end of slice interval (not including) + for i in range(1, N): + end_bound = index[i] + start_bound = index[i] - win + + # left endpoint is closed + if left_closed: + start_bound -= 1 + + # advance the start bound until we are + # within the constraint + start[i] = i + for j in range(start[i - 1], i): + if index[j] > start_bound: + start[i] = j + break + + # end bound is previous end + # or current index + if index[end[i - 1]] <= end_bound: + end[i] = i + 1 + else: + end[i] = end[i - 1] + + # right endpoint is open + if not right_closed: + end[i] -= 1 + return start, end + + def get_window_bounds(self): + return self.start, self.end diff --git a/pandas/core/window/common.py b/pandas/core/window/common.py index 3fd567f97edaec..453fd124955432 100644 --- a/pandas/core/window/common.py +++ b/pandas/core/window/common.py @@ -1,5 +1,6 @@ """Common utility functions for rolling operations""" from collections import defaultdict +from typing import Callable, Optional import warnings import numpy as np @@ -62,12 +63,20 @@ def __init__(self, obj, *args, **kwargs): cov = _dispatch("cov", other=None, pairwise=None) def _apply( - self, func, name=None, window=None, center=None, check_minp=None, **kwargs + self, + func: Callable, + center: bool, + require_min_periods: int = 0, + floor: int = 1, + is_weighted: bool = False, + name: Optional[str] = None, + **kwargs, ): """ Dispatch to apply; we are stripping all of the _apply kwargs and performing the original function call on the grouped object. """ + kwargs.pop("floor", None) # TODO: can we de-duplicate with _dispatch? def f(x, name=name, *args): @@ -267,6 +276,44 @@ def _use_window(minp, window): return minp +def calculate_min_periods( + window: int, + min_periods: Optional[int], + num_values: int, + required_min_periods: int, + floor: int, +) -> int: + """ + Calculates final minimum periods value for rolling aggregations. + + Parameters + ---------- + window : passed window value + min_periods : passed min periods value + num_values : total number of values + required_min_periods : required min periods per aggregation function + floor : required min periods per aggregation function + + Returns + ------- + min_periods : int + """ + if min_periods is None: + min_periods = window + else: + min_periods = max(required_min_periods, min_periods) + if min_periods > window: + raise ValueError( + "min_periods {min_periods} must be <= " + "window {window}".format(min_periods=min_periods, window=window) + ) + elif min_periods > num_values: + min_periods = num_values + 1 + elif min_periods < 0: + raise ValueError("min_periods must be >= 0") + return max(min_periods, floor) + + def _zsqrt(x): with np.errstate(all="ignore"): result = np.sqrt(x) diff --git a/pandas/core/window/rolling.py b/pandas/core/window/rolling.py index bec350f6b7d8bb..fd2e8aa2ad02f6 100644 --- a/pandas/core/window/rolling.py +++ b/pandas/core/window/rolling.py @@ -3,6 +3,7 @@ similar to how we have a Groupby object. """ from datetime import timedelta +from functools import partial from textwrap import dedent from typing import Callable, Dict, List, Optional, Set, Tuple, Union import warnings @@ -10,6 +11,7 @@ import numpy as np import pandas._libs.window as libwindow +import pandas._libs.window_indexer as libwindow_indexer from pandas.compat._optional import import_optional_dependency from pandas.compat.numpy import function as nv from pandas.util._decorators import Appender, Substitution, cache_readonly @@ -43,10 +45,10 @@ _doc_template, _flex_binary_moment, _offset, - _require_min_periods, _shared_docs, _use_window, _zsqrt, + calculate_min_periods, ) @@ -366,39 +368,55 @@ def _center_window(self, result, window) -> np.ndarray: result = np.copy(result[tuple(lead_indexer)]) return result - def _get_roll_func( - self, cfunc: Callable, check_minp: Callable, index: np.ndarray, **kwargs - ) -> Callable: + def _get_roll_func(self, func_name: str) -> Callable: """ Wrap rolling function to check values passed. Parameters ---------- - cfunc : callable + func_name : str Cython function used to calculate rolling statistics - check_minp : callable - function to check minimum period parameter - index : ndarray - used for variable window Returns ------- func : callable """ + window_func = getattr(libwindow, func_name, None) + if window_func is None: + raise ValueError( + "we do not support this function " + "in libwindow.{func_name}".format(func_name=func_name) + ) + return window_func - def func(arg, window, min_periods=None, closed=None): - minp = check_minp(min_periods, window) - return cfunc(arg, window, minp, index, closed, **kwargs) + def _get_cython_func_type(self, func): + """ + Return a variable or fixed cython function type. - return func + Variable algorithms do not use window while fixed do. + """ + if self.is_freq_type: + return self._get_roll_func("{}_variable".format(func)) + return partial( + self._get_roll_func("{}_fixed".format(func)), win=self._get_window() + ) + + def _get_window_indexer(self): + """ + Return an indexer class that will compute the window start and end bounds + """ + if self.is_freq_type: + return libwindow_indexer.VariableWindowIndexer + return libwindow_indexer.FixedWindowIndexer def _apply( self, - func: Union[str, Callable], + func: Callable, + center: bool, + require_min_periods: int = 0, + floor: int = 1, + is_weighted: bool = False, name: Optional[str] = None, - window: Optional[Union[int, str]] = None, - center: Optional[bool] = None, - check_minp: Optional[Callable] = None, **kwargs, ): """ @@ -408,13 +426,13 @@ def _apply( Parameters ---------- - func : str/callable to apply - name : str, optional - name of this function - window : int/str, default to _get_window() - window length or offset - center : bool, default to self.center - check_minp : function, default to _use_window + func : callable function to apply + center : bool + require_min_periods : int + floor: int + is_weighted + name: str, + compatibility with groupby.rolling **kwargs additional arguments for rolling function and window function @@ -422,20 +440,13 @@ def _apply( ------- y : type of input """ - - if center is None: - center = self.center - - if check_minp is None: - check_minp = _use_window - - if window is None: - win_type = self._get_win_type(kwargs) - window = self._get_window(win_type=win_type) + win_type = self._get_win_type(kwargs) + window = self._get_window(win_type=win_type) blocks, obj = self._create_blocks() block_list = list(blocks) index_as_array = self._get_index() + window_indexer = self._get_window_indexer() results = [] exclude = [] # type: List[Scalar] @@ -455,36 +466,27 @@ def _apply( results.append(values.copy()) continue - # if we have a string function name, wrap it - if isinstance(func, str): - cfunc = getattr(libwindow, func, None) - if cfunc is None: - raise ValueError( - "we do not support this function " - "in libwindow.{func}".format(func=func) - ) - - func = self._get_roll_func(cfunc, check_minp, index_as_array, **kwargs) - # calculation function - if center: - offset = _offset(window, center) - additional_nans = np.array([np.NaN] * offset) + offset = _offset(window, center) if center else 0 + additional_nans = np.array([np.nan] * offset) + + if not is_weighted: def calc(x): - return func( - np.concatenate((x, additional_nans)), - window, - min_periods=self.min_periods, - closed=self.closed, + x = np.concatenate((x, additional_nans)) + min_periods = calculate_min_periods( + window, self.min_periods, len(x), require_min_periods, floor ) + start, end = window_indexer( + x, window, self.closed, index_as_array + ).get_window_bounds() + return func(x, start, end, min_periods) else: def calc(x): - return func( - x, window, min_periods=self.min_periods, closed=self.closed - ) + x = np.concatenate((x, additional_nans)) + return func(x, window, self.min_periods) with np.errstate(all="ignore"): if values.ndim > 1: @@ -995,8 +997,8 @@ def _get_window( # GH #15662. `False` makes symmetric window, rather than periodic. return sig.get_window(win_type, window, False).astype(float) - def _get_roll_func( - self, cfunc: Callable, check_minp: Callable, index: np.ndarray, **kwargs + def _get_weighted_roll_func( + self, cfunc: Callable, check_minp: Callable, **kwargs ) -> Callable: def func(arg, window, min_periods=None, closed=None): minp = check_minp(min_periods, len(window)) @@ -1070,25 +1072,38 @@ def aggregate(self, func, *args, **kwargs): @Appender(_shared_docs["sum"]) def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) - return self._apply("roll_weighted_sum", **kwargs) + window_func = self._get_roll_func("roll_weighted_sum") + window_func = self._get_weighted_roll_func(window_func, _use_window) + return self._apply( + window_func, center=self.center, is_weighted=True, name="sum", **kwargs + ) @Substitution(name="window") @Appender(_shared_docs["mean"]) def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) - return self._apply("roll_weighted_mean", **kwargs) + window_func = self._get_roll_func("roll_weighted_mean") + window_func = self._get_weighted_roll_func(window_func, _use_window) + return self._apply( + window_func, center=self.center, is_weighted=True, name="mean", **kwargs + ) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["var"]) def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) - return self._apply("roll_weighted_var", ddof=ddof, **kwargs) + window_func = partial(self._get_roll_func("roll_weighted_var"), ddof=ddof) + window_func = self._get_weighted_roll_func(window_func, _use_window) + kwargs.pop("name", None) + return self._apply( + window_func, center=self.center, is_weighted=True, name="var", **kwargs + ) @Substitution(name="window", versionadded="\n.. versionadded:: 1.0.0\n") @Appender(_shared_docs["std"]) def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) - return _zsqrt(self.var(ddof=ddof, **kwargs)) + return _zsqrt(self.var(ddof=ddof, name="std", **kwargs)) class _Rolling(_Window): @@ -1203,9 +1218,9 @@ def apply(self, func, raw=None, args=(), kwargs={}): from pandas import Series kwargs.pop("_level", None) + kwargs.pop("floor", None) window = self._get_window() offset = _offset(window, self.center) - index_as_array = self._get_index() # TODO: default is for backward compat # change to False in the future @@ -1221,28 +1236,31 @@ def apply(self, func, raw=None, args=(), kwargs={}): ) raw = True - def f(arg, window, min_periods, closed): - minp = _use_window(min_periods, window) + window_func = partial( + self._get_cython_func_type("roll_generic"), + args=args, + kwargs=kwargs, + raw=raw, + offset=offset, + func=func, + ) + + def apply_func(values, begin, end, min_periods, raw=raw): if not raw: - arg = Series(arg, index=self.obj.index) - return libwindow.roll_generic( - arg, - window, - minp, - index_as_array, - closed, - offset, - func, - raw, - args, - kwargs, - ) + values = Series(values, index=self.obj.index) + return window_func(values, begin, end, min_periods) - return self._apply(f, func, args=args, kwargs=kwargs, center=False, raw=raw) + # TODO: Why do we always pass center=False? + # name=func for WindowGroupByMixin._apply + return self._apply(apply_func, center=False, floor=0, name=func) def sum(self, *args, **kwargs): nv.validate_window_func("sum", args, kwargs) - return self._apply("roll_sum", "sum", **kwargs) + window_func = self._get_cython_func_type("roll_sum") + kwargs.pop("floor", None) + return self._apply( + window_func, center=self.center, floor=0, name="sum", **kwargs + ) _shared_docs["max"] = dedent( """ @@ -1257,7 +1275,8 @@ def sum(self, *args, **kwargs): def max(self, *args, **kwargs): nv.validate_window_func("max", args, kwargs) - return self._apply("roll_max", "max", **kwargs) + window_func = self._get_cython_func_type("roll_max") + return self._apply(window_func, center=self.center, name="max", **kwargs) _shared_docs["min"] = dedent( """ @@ -1298,11 +1317,13 @@ def max(self, *args, **kwargs): def min(self, *args, **kwargs): nv.validate_window_func("min", args, kwargs) - return self._apply("roll_min", "min", **kwargs) + window_func = self._get_cython_func_type("roll_min") + return self._apply(window_func, center=self.center, name="min", **kwargs) def mean(self, *args, **kwargs): nv.validate_window_func("mean", args, kwargs) - return self._apply("roll_mean", "mean", **kwargs) + window_func = self._get_cython_func_type("roll_mean") + return self._apply(window_func, center=self.center, name="mean", **kwargs) _shared_docs["median"] = dedent( """ @@ -1342,27 +1363,40 @@ def mean(self, *args, **kwargs): ) def median(self, **kwargs): - return self._apply("roll_median_c", "median", **kwargs) + window_func = self._get_roll_func("roll_median_c") + window_func = partial(window_func, win=self._get_window()) + return self._apply(window_func, center=self.center, name="median", **kwargs) def std(self, ddof=1, *args, **kwargs): nv.validate_window_func("std", args, kwargs) - window = self._get_window() - index_as_array = self._get_index() + kwargs.pop("require_min_periods", None) + window_func = self._get_cython_func_type("roll_var") - def f(arg, *args, **kwargs): - minp = _require_min_periods(1)(self.min_periods, window) - return _zsqrt( - libwindow.roll_var(arg, window, minp, index_as_array, self.closed, ddof) - ) + def zsqrt_func(values, begin, end, min_periods): + return _zsqrt(window_func(values, begin, end, min_periods, ddof=ddof)) + # ddof passed again for compat with groupby.rolling return self._apply( - f, "std", check_minp=_require_min_periods(1), ddof=ddof, **kwargs + zsqrt_func, + center=self.center, + require_min_periods=1, + name="std", + ddof=ddof, + **kwargs, ) def var(self, ddof=1, *args, **kwargs): nv.validate_window_func("var", args, kwargs) + kwargs.pop("require_min_periods", None) + window_func = partial(self._get_cython_func_type("roll_var"), ddof=ddof) + # ddof passed again for compat with groupby.rolling return self._apply( - "roll_var", "var", check_minp=_require_min_periods(1), ddof=ddof, **kwargs + window_func, + center=self.center, + require_min_periods=1, + name="var", + ddof=ddof, + **kwargs, ) _shared_docs[ @@ -1377,8 +1411,14 @@ def var(self, ddof=1, *args, **kwargs): """ def skew(self, **kwargs): + window_func = self._get_cython_func_type("roll_skew") + kwargs.pop("require_min_periods", None) return self._apply( - "roll_skew", "skew", check_minp=_require_min_periods(3), **kwargs + window_func, + center=self.center, + require_min_periods=3, + name="skew", + **kwargs, ) _shared_docs["kurt"] = dedent( @@ -1414,8 +1454,14 @@ def skew(self, **kwargs): ) def kurt(self, **kwargs): + window_func = self._get_cython_func_type("roll_kurt") + kwargs.pop("require_min_periods", None) return self._apply( - "roll_kurt", "kurt", check_minp=_require_min_periods(4), **kwargs + window_func, + center=self.center, + require_min_periods=4, + name="kurt", + **kwargs, ) _shared_docs["quantile"] = dedent( @@ -1475,33 +1521,22 @@ def kurt(self, **kwargs): ) def quantile(self, quantile, interpolation="linear", **kwargs): - window = self._get_window() - index_as_array = self._get_index() - - def f(arg, *args, **kwargs): - minp = _use_window(self.min_periods, window) - if quantile == 1.0: - return libwindow.roll_max( - arg, window, minp, index_as_array, self.closed - ) - elif quantile == 0.0: - return libwindow.roll_min( - arg, window, minp, index_as_array, self.closed - ) - else: - return libwindow.roll_quantile( - arg, - window, - minp, - index_as_array, - self.closed, - quantile, - interpolation, - ) + if quantile == 1.0: + window_func = self._get_cython_func_type("roll_max") + elif quantile == 0.0: + window_func = self._get_cython_func_type("roll_min") + else: + window_func = partial( + self._get_roll_func("roll_quantile"), + win=self._get_window(), + quantile=quantile, + interpolation=interpolation, + ) - return self._apply( - f, "quantile", quantile=quantile, interpolation=interpolation, **kwargs - ) + # Pass through for groupby.rolling + kwargs["quantile"] = quantile + kwargs["interpolation"] = interpolation + return self._apply(window_func, center=self.center, name="quantile", **kwargs) _shared_docs[ "cov" @@ -1856,7 +1891,8 @@ def count(self): # different impl for freq counting if self.is_freq_type: - return self._apply("roll_count", "count") + window_func = self._get_roll_func("roll_count") + return self._apply(window_func, center=self.center, name="count") return super().count() diff --git a/setup.py b/setup.py index 545765ecb114d2..0915b6aba113a1 100755 --- a/setup.py +++ b/setup.py @@ -344,6 +344,7 @@ class CheckSDist(sdist_class): "pandas/_libs/tslibs/resolution.pyx", "pandas/_libs/tslibs/parsing.pyx", "pandas/_libs/tslibs/tzconversion.pyx", + "pandas/_libs/window_indexer.pyx", "pandas/_libs/writers.pyx", "pandas/io/sas/sas.pyx", ] @@ -683,6 +684,7 @@ def srcpath(name=None, suffix=".pyx", subdir="src"): }, "_libs.testing": {"pyxfile": "_libs/testing"}, "_libs.window": {"pyxfile": "_libs/window", "language": "c++", "suffix": ".cpp"}, + "_libs.window_indexer": {"pyxfile": "_libs/window_indexer"}, "_libs.writers": {"pyxfile": "_libs/writers"}, "io.sas._sas": {"pyxfile": "io/sas/sas"}, "io.msgpack._packer": {