FEAT-#7718: Add range-partitioning impl for 'df.resample()'
Signed-off-by: Dmitry Chigarev <[email protected]>
dchigarev committed Apr 3, 2024
1 parent 9afc049 commit 2d60fa3
Showing 5 changed files with 284 additions and 31 deletions.
7 changes: 6 additions & 1 deletion modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -2438,6 +2438,7 @@ def combine_and_apply(
dtypes=new_dtypes,
)

@lazy_metadata_decorator(apply_axis="both")
def _apply_func_to_range_partitioning(
self,
key_columns,
@@ -2447,6 +2448,7 @@ def _apply_func_to_range_partitioning(
data=None,
data_key_columns=None,
level=None,
shuffle_func_cls=ShuffleSortFunctions,
**kwargs,
):
"""
@@ -2470,6 +2472,9 @@
Additional key columns from `data`. Will be combined with `key_columns`.
level : list of ints or labels, optional
Index level(s) to build the range partitioning for. Can't be specified along with `key_columns`.
shuffle_func_cls : type, default: ShuffleSortFunctions
A class implementing ``modin.core.dataframe.pandas.utils.ShuffleFunctions`` to be used
as a shuffle function.
**kwargs : dict
Additional arguments to forward to the range builder function.
@@ -2573,7 +2578,7 @@ def _apply_func_to_range_partitioning(
else:
new_partitions = grouper._partitions

shuffling_functions = ShuffleSortFunctions(
shuffling_functions = shuffle_func_cls(
grouper,
key_columns,
ascending[0] if is_list_like(ascending) else ascending,
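The key change in this file is the new `shuffle_func_cls` hook: the shuffle implementation becomes an injectable dependency instead of a hard-coded `ShuffleSortFunctions`. A minimal sketch of the pattern, with hypothetical stand-in classes rather than modin's real `ShuffleFunctions` API:

```python
# Hypothetical stand-ins illustrating the `shuffle_func_cls` injection point;
# modin's actual ShuffleFunctions interface is richer than this.
class DefaultShuffle:
    def __init__(self, frame, key_columns, ascending, **kwargs):
        self.frame, self.key_columns = frame, key_columns

    def split(self):
        return f"range-partition {self.frame!r} by {self.key_columns}"


class ResampleShuffle(DefaultShuffle):
    def split(self):
        return f"resample-aligned split of {self.frame!r} by {self.key_columns}"


def apply_func_to_range_partitioning(frame, key_columns, shuffle_func_cls=DefaultShuffle):
    # The caller picks the shuffle strategy; the partitioning code stays generic.
    return shuffle_func_cls(frame, key_columns, ascending=True).split()


print(apply_func_to_range_partitioning("df", ["ts"]))
print(apply_func_to_range_partitioning("df", ["ts"], shuffle_func_cls=ResampleShuffle))
```

This is what lets the resample implementation below reuse the whole range-partitioning pipeline while swapping only the pivot and split logic.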
189 changes: 178 additions & 11 deletions modin/core/dataframe/pandas/dataframe/utils.py
@@ -19,7 +19,9 @@

import numpy as np
import pandas
from pandas._libs.tslibs import to_offset
from pandas.core.dtypes.common import is_list_like, is_numeric_dtype
from pandas.core.resample import _get_timestamp_range_edges

from modin.error_message import ErrorMessage
from modin.utils import _inherit_docstrings
@@ -122,6 +124,7 @@ class ShuffleSortFunctions(ShuffleFunctions):
The ideal number of new partitions.
level : list of strings or ints, or None
Index level(s) to use as a key. Can't be specified along with `columns`.
closed_on_right : bool, default: True
Whether rows whose key value equals a pivot are placed in the partition to the right of that boundary.
**kwargs : dict
Additional keyword arguments.
"""
@@ -133,6 +136,7 @@ def __init__(
ascending: Union[list, bool],
ideal_num_new_partitions: int,
level: Optional[list[Union[str, int]]] = None,
closed_on_right: bool = True,
**kwargs: dict,
):
self.frame_len = len(modin_frame)
@@ -142,6 +146,7 @@
self.kwargs = kwargs.copy()
self.level = level
self.columns_info = None
self.closed_on_right = closed_on_right

def sample_fn(self, partition: pandas.DataFrame) -> pandas.DataFrame:
if self.level is not None:
@@ -159,11 +164,11 @@ def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
columns_info: "list[ColumnInfo]" = []
number_of_groups = 1
cols = []
for col in samples.columns:
for i, col in enumerate(samples.columns):
num_pivots = int(self.ideal_num_new_partitions / number_of_groups)
if num_pivots < 2 and len(columns_info):
break
column_val = samples[col].to_numpy()
column_val = samples[col]
cols.append(col)
is_numeric = is_numeric_dtype(column_val.dtype)

Expand All @@ -172,7 +177,13 @@ def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
pivots = self.pick_pivots_from_samples_for_sort(
column_val, num_pivots, method, key
)
columns_info.append(ColumnInfo(col, pivots, is_numeric))
columns_info.append(
ColumnInfo(
self.level[i] if col is None and self.level is not None else col,
pivots,
is_numeric,
)
)
number_of_groups *= len(pivots) + 1
self.columns_info = columns_info
return number_of_groups
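A note on the `ColumnInfo` change above: when partitioning by unnamed index levels, the sampled frame can carry `None` column labels, so the level identifier from `self.level` is recorded instead. A toy illustration with made-up labels:

```python
# Made-up labels demonstrating the fallback from a None column label to the
# corresponding entry of `self.level`.
level = [0, "date"]
sample_columns = [None, "date"]
recorded = [
    level[i] if col is None and level is not None else col
    for i, col in enumerate(sample_columns)
]
print(recorded)  # [0, 'date']
```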
@@ -190,6 +201,7 @@ def split_fn(
self.columns_info,
self.ascending,
keys_are_index_levels=self.level is not None,
closed_on_right=self.closed_on_right,
**self.kwargs,
)

@@ -270,10 +282,9 @@ def pick_samples_for_quantiles(
probability = (1 / m) * np.log(num_partitions * length)
return df.sample(frac=probability)

@classmethod
def pick_pivots_from_samples_for_sort(
cls,
samples: np.ndarray,
self,
samples: pandas.Series,
ideal_num_new_partitions: int,
method: str = "linear",
key: Optional[Callable] = None,
@@ -288,7 +299,7 @@ def pick_pivots_from_samples_for_sort(
Parameters
----------
samples : np.ndarray
samples : pandas.Series
The samples computed by ``get_partition_quantiles_for_sort``.
ideal_num_new_partitions : int
The ideal number of new partitions.
@@ -302,6 +313,7 @@
np.ndarray
A list of overall quantiles.
"""
samples = samples.to_numpy()
# We don't call `np.unique` on the samples, since if a quantile shows up in multiple
# partition's samples, this is probably an indicator of skew in the dataset, and we
# want our final partitions to take this into account.
@@ -313,7 +325,7 @@
# If we only desire 1 partition, we need to ensure that we're not trying to find quantiles
# from an empty list of pivots.
if len(quantiles) > 0:
return cls._find_quantiles(samples, quantiles, method)
return self._find_quantiles(samples, quantiles, method)
return np.array([])
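For context on the `cls` → `self` change above: pivot picking amounts to taking `ideal_num_new_partitions - 1` evenly spaced quantiles of the sampled key values. A rough, self-contained approximation (the real interpolation lives in `_find_quantiles`):

```python
import numpy as np

# Approximate the pivot-picking step: k - 1 evenly spaced quantiles of the
# samples become the partition boundaries for k partitions.
samples = np.array([3, 1, 4, 1, 5, 9, 2, 6])
k = 4  # ideal_num_new_partitions
quantiles = np.linspace(0, 1, k, endpoint=False)[1:]  # [0.25, 0.5, 0.75]
print(np.quantile(samples, quantiles, method="linear"))
```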

@staticmethod
@@ -322,8 +334,9 @@ def split_partitions_using_pivots_for_sort(
columns_info: "list[ColumnInfo]",
ascending: bool,
keys_are_index_levels: bool = False,
closed_on_right: bool = True,
**kwargs: dict,
) -> "tuple[pandas.DataFrame, ...]":
) -> "tuple[pandas.DataFrame, ...]": # noqa
"""
Split the given dataframe into the partitions specified by `pivots` in `columns_info`.
@@ -402,15 +415,21 @@ def get_group(grp, key, df):
cols_to_digitize = cols_to_digitize.squeeze()

if col_info.is_numeric:
groupby_col = np.digitize(cols_to_digitize, pivots)
groupby_col = np.digitize(
cols_to_digitize, pivots, right=not closed_on_right
)
# `np.digitize` returns results based off of the sort order of the pivots it is passed.
# When we only have one unique value in our pivots, `np.digitize` assumes that the pivots
# are sorted in ascending order, and gives us results based off of that assumption - so if
# we actually want to sort in descending order, we need to swap the new indices.
if not ascending and len(np.unique(pivots)) == 1:
groupby_col = len(pivots) - groupby_col
else:
groupby_col = np.searchsorted(pivots, cols_to_digitize, side="right")
groupby_col = np.searchsorted(
pivots,
cols_to_digitize,
side="right" if closed_on_right else "left",
)
# Since np.searchsorted requires the pivots to be in ascending order, if we want to sort
# in descending order, we need to swap the new indices.
if not ascending:
@@ -479,6 +498,154 @@ def _index_to_df_zero_copy(
return index_data


class ShuffleResample(ShuffleSortFunctions):
"""Shuffle functions that align range partitions with resample bin edges."""
def __init__(
self,
modin_frame: "PandasDataframe",
columns: Union[str, list],
ascending: Union[list, bool],
ideal_num_new_partitions: int,
resample_kwargs: dict,
**kwargs: dict,
):
super().__init__(
modin_frame,
columns,
ascending,
ideal_num_new_partitions,
closed_on_right=False,
**kwargs,
)

resample_kwargs = resample_kwargs.copy()
rule = resample_kwargs.pop("rule")

if resample_kwargs["closed"] is None:
if rule in ("ME", "YE", "QE", "BME", "BA", "BQE", "W"):
resample_kwargs["closed"] = "right"
else:
resample_kwargs["closed"] = "left"

self.closed_on_right = resample_kwargs["closed"] != "right"

resample_kwargs["freq"] = to_offset(rule)
self.resample_kwargs = resample_kwargs
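The `closed` defaulting above mirrors pandas: end-anchored rules such as "ME" or "W" resample with right-closed bins by default, everything else with left-closed bins. A quick check (pandas ≥ 2.2 frequency aliases assumed):

```python
import pandas as pd

s = pd.Series(range(3), index=pd.date_range("2024-01-01", periods=3, freq="D"))
print(s.resample("ME").sum())  # end-anchored rule: right-closed bins
print(s.resample("2D").sum())  # start-anchored rule: left-closed bins
```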

@staticmethod
def pick_samples_for_quantiles(
df: pandas.DataFrame,
num_partitions: int,
length: int,
) -> pandas.DataFrame: # noqa
return pandas.concat([df.min().to_frame().T, df.max().to_frame().T])
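Unlike the parent's probabilistic sampling, resample bins are fully determined by the overall time range and the frequency, so sampling only the column-wise min and max is enough. A self-contained illustration with invented timestamps:

```python
import pandas as pd

df = pd.DataFrame({"ts": pd.to_datetime(["2024-01-03", "2024-01-01", "2024-01-02"])})
sample = pd.concat([df.min().to_frame().T, df.max().to_frame().T])
print(sample)  # two rows: the earliest and latest timestamp per column
```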

def pick_pivots_from_samples_for_sort(
self,
samples: pandas.Series,
ideal_num_new_partitions: int,
method: str = "linear",
key: Optional[Callable] = None,
) -> np.ndarray: # noqa
if key is not None:
raise NotImplementedError(key)

max_value = samples.max()

first, last = _get_timestamp_range_edges(
samples.min(),
max_value,
self.resample_kwargs["freq"],
unit=samples.dt.unit,
closed=self.resample_kwargs["closed"],
origin=self.resample_kwargs["origin"],
offset=self.resample_kwargs["offset"],
)

all_bins = pandas.date_range(
start=first,
end=last,
freq=self.resample_kwargs["freq"],
ambiguous=True,
nonexistent="shift_forward",
unit=samples.dt.unit,
)

all_bins = self._adjust_bin_edges(
all_bins,
max_value,
freq=self.resample_kwargs["freq"],
closed=self.resample_kwargs["closed"],
)

step = 1 / ideal_num_new_partitions
bins = [
all_bins[int(len(all_bins) * i * step)]
for i in range(1, ideal_num_new_partitions)
]
return bins
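The pivot selection above strides through all resample bin edges and keeps `ideal_num_new_partitions - 1` evenly spaced edges, so partition boundaries always coincide with bin boundaries. A made-up example of the striding:

```python
import pandas as pd

all_bins = pd.date_range("2024-01-01", "2024-01-02", freq="h")  # 25 edges
k = 4  # ideal_num_new_partitions
pivots = [all_bins[int(len(all_bins) * i / k)] for i in range(1, k)]
print(pivots)  # edges 6, 12 and 18 -> four roughly equal time ranges
```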

def _adjust_bin_edges(
self,
binner: pandas.DatetimeIndex,
end_timestamp,
freq,
closed,
) -> pandas.DatetimeIndex:  # noqa
# Some hacks for > daily data, see #1471, #1458, #1483

if freq.name not in ("BME", "ME", "W") and freq.name.split("-")[0] not in (
"BQE",
"BYE",
"QE",
"YE",
"W",
):
return binner

# If the right end-point is on the last day of the month, roll forwards
# until the last moment of that day. Note that we only do this for offsets
# which correspond to the end of a super-daily period - "month start", for
# example, is excluded.
if closed == "right":
# GH 21459, GH 9119: Adjust the bins relative to the wall time
edges_dti = binner.tz_localize(None)
edges_dti = (
edges_dti
+ pandas.Timedelta(days=1, unit=edges_dti.unit).as_unit(edges_dti.unit)
- pandas.Timedelta(1, unit=edges_dti.unit).as_unit(edges_dti.unit)
)
binner = edges_dti.tz_localize(binner.tz)

# intraday values on last day
if binner[-2] > end_timestamp:
binner = binner[:-1]
return binner

@staticmethod
def split_partitions_using_pivots_for_sort(
df: pandas.DataFrame,
columns_info: "list[ColumnInfo]",
ascending: bool,
closed_on_right: bool = True,
**kwargs: dict,
) -> "tuple[pandas.DataFrame, ...]": # noqa
def add_attr(df, timestamp):
if "bin_bounds" in df.attrs:
df.attrs["bin_bounds"] = (*df.attrs["bin_bounds"], timestamp)
else:
df.attrs["bin_bounds"] = (timestamp,)
return df

result = ShuffleSortFunctions.split_partitions_using_pivots_for_sort(
df, columns_info, ascending, **kwargs
)
for i, pivot in enumerate(columns_info[0].pivots):
add_attr(result[i], pivot - pandas.Timedelta(1, unit="ns"))
if i + 1 < len(result):
add_attr(result[i + 1], pivot + pandas.Timedelta(1, unit="ns"))
return result
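After splitting, each output frame is tagged through `DataFrame.attrs` with the bin boundaries it touches, shifted one nanosecond into each neighboring range, so downstream resample kernels can tell which edge bins are shared with a neighbor. A minimal illustration of the tagging pattern with invented data:

```python
import pandas as pd

pivot = pd.Timestamp("2024-01-01")
left, right = pd.DataFrame({"v": [1]}), pd.DataFrame({"v": [2]})
left.attrs["bin_bounds"] = (pivot - pd.Timedelta(1, unit="ns"),)
right.attrs["bin_bounds"] = (pivot + pd.Timedelta(1, unit="ns"),)
print(left.attrs, right.attrs)
```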


def lazy_metadata_decorator(apply_axis=None, axis_arg=-1, transpose=False):
"""
Lazily propagate metadata for the ``PandasDataframe``.