Skip to content

Commit

Permalink
:)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Apr 2, 2024
1 parent 5bec801 commit b70b22d
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 22 deletions.
34 changes: 24 additions & 10 deletions modin/core/dataframe/pandas/dataframe/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,15 +166,15 @@ def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
key = self.kwargs.get("key", None)
samples = pandas.concat(samples, axis=0, copy=False)
# breakpoint()
if self.level is not None:
index_names = frozenset(samples.columns[lvl] if isinstance(lvl, int) else lvl for lvl in self.level)
else:
index_names = frozenset()
# if self.level is not None:
# index_names = frozenset(samples.columns[lvl] if isinstance(lvl, int) else lvl for lvl in self.level)
# else:
# index_names = frozenset()

columns_info: "list[ColumnInfo]" = []
number_of_groups = 1
cols = []
for col in samples.columns:
for i, col in enumerate(samples.columns):
num_pivots = int(self.ideal_num_new_partitions / number_of_groups)
if num_pivots < 2 and len(columns_info):
break
Expand All @@ -187,8 +187,8 @@ def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
pivots = self.pick_pivots_from_samples_for_sort(
column_val, num_pivots, method, key
)
is_index_level = col in index_names
columns_info.append(ColumnInfo(col, pivots, is_numeric, is_index_level))
is_index_level = self.level is not None
columns_info.append(ColumnInfo(self.level[i] if col is None else col, pivots, is_numeric, is_index_level))
number_of_groups *= len(pivots) + 1
self.columns_info = columns_info
return number_of_groups
Expand Down Expand Up @@ -371,6 +371,7 @@ def split_partitions_using_pivots_for_sort(
na_indices = []

if index_levels:
# breakpoint()
index_data = _index_to_df_zero_copy(df, index_levels)
na_indices.append(index_data.isna())

Expand Down Expand Up @@ -586,12 +587,25 @@ def split_partitions_using_pivots_for_sort(
closed_on_right: bool=True,
**kwargs: dict,
) -> "tuple[pandas.DataFrame, ...]":
def add_attr(df, timestamp):
if "dummy_timestamp" in df.attrs:
df.attrs["dummy_timestamp"] = (*df.attrs["dummy_timestamp"], timestamp)
else:
df.attrs["dummy_timestamp"] = (timestamp,)
return df

result = ShuffleSortFunctions.split_partitions_using_pivots_for_sort(df, columns_info, ascending, closed_on_right, **kwargs)
# breakpoint()
lvl = columns_info[0].name
if isinstance(lvl, int):
to_drop = [i for i in range(result[0].index.nlevels) if i != lvl]
else:
to_drop = [i for i in result[0].index.names if i != lvl]
for rs in result:
rs.index = rs.index.droplevel(to_drop)
for i, pivot in enumerate(columns_info[0].pivots):
result[i].loc[pivot - pandas.Timedelta(1, unit="ns")] = [np.NaN] * result[i].shape[1]
add_attr(result[i], pivot - pandas.Timedelta(1, unit="ns"))
if i + 1 <= len(result):
result[i + 1].loc[pivot + pandas.Timedelta(1, unit="ns")] = [np.NaN] * result[i + 1].shape[1]
add_attr(result[i + 1], pivot + pandas.Timedelta(1, unit="ns"))
return result


Expand Down
29 changes: 17 additions & 12 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1053,15 +1053,19 @@ def _resample_func(

def map_func(df, resample_kwargs=resample_kwargs): # pragma: no cover
"""Resample time-series data of the passed frame and apply aggregation function on it."""
# HACK: REMOVE ME
df = df[~df.index.duplicated()]
if len(df) == 0:
if resample_kwargs["on"] is not None:
df = df.set_index(resample_kwargs["on"])
return df
# breakpoint()
if "dummy_timestamp" in df.attrs:
timestamps = df.attrs["dummy_timestamp"]
for ts in timestamps:
df.loc[ts] = np.NaN
if df_op is not None:
df = df_op(df)
if use_range_impl:
resample_kwargs = resample_kwargs.copy()
resample_kwargs["level"] = None
resampled_val = df.resample(**resample_kwargs)
op = getattr(pandas.core.resample.Resampler, func_name)
if callable(op):
Expand All @@ -1081,7 +1085,7 @@ def map_func(df, resample_kwargs=resample_kwargs): # pragma: no cover
return val

if resample_kwargs["on"] is None:
level = [0]
level = [0 if resample_kwargs["level"] is None else resample_kwargs["level"]]
key_columns = []
else:
level = None
Expand Down Expand Up @@ -1134,24 +1138,24 @@ def resample_agg_df(self, resample_kwargs, func, *args, **kwargs):

def resample_transform(self, resample_kwargs, arg, *args, **kwargs):
return self._resample_func(
resample_kwargs, "transform", arg=arg, *args, **kwargs
resample_kwargs, "transform", arg=arg, use_range_impl=False, *args, **kwargs
)

def resample_pipe(self, resample_kwargs, func, *args, **kwargs):
return self._resample_func(resample_kwargs, "pipe", func=func, *args, **kwargs)

def resample_ffill(self, resample_kwargs, limit):
return self._resample_func(resample_kwargs, "ffill", limit=limit)
return self._resample_func(resample_kwargs, "ffill", limit=limit, use_range_impl=False)

def resample_bfill(self, resample_kwargs, limit):
return self._resample_func(resample_kwargs, "bfill", limit=limit)
return self._resample_func(resample_kwargs, "bfill", limit=limit, use_range_impl=False)

def resample_nearest(self, resample_kwargs, limit):
return self._resample_func(resample_kwargs, "nearest", limit=limit)
return self._resample_func(resample_kwargs, "nearest", limit=limit, use_range_impl=False)

def resample_fillna(self, resample_kwargs, method, limit):
return self._resample_func(
resample_kwargs, "fillna", method=method, limit=limit
resample_kwargs, "fillna", method=method, limit=limit, use_range_impl=method is None,
)

def resample_asfreq(self, resample_kwargs, fill_value):
Expand All @@ -1178,6 +1182,7 @@ def resample_interpolate(
limit_direction=limit_direction,
limit_area=limit_area,
downcast=downcast,
use_range_impl=False,
**kwargs,
)

Expand All @@ -1188,10 +1193,10 @@ def resample_nunique(self, resample_kwargs, *args, **kwargs):
return self._resample_func(resample_kwargs, "nunique", *args, **kwargs)

def resample_first(self, resample_kwargs, *args, **kwargs):
return self._resample_func(resample_kwargs, "first", *args, **kwargs)
return self._resample_func(resample_kwargs, "first", use_range_impl=False, *args, **kwargs)

def resample_last(self, resample_kwargs, *args, **kwargs):
return self._resample_func(resample_kwargs, "last", *args, **kwargs)
return self._resample_func(resample_kwargs, "last", use_range_impl=False, *args, **kwargs)

def resample_max(self, resample_kwargs, *args, **kwargs):
return self._resample_func(resample_kwargs, "max", *args, **kwargs)
Expand Down Expand Up @@ -1228,7 +1233,7 @@ def resample_prod(self, resample_kwargs, min_count, *args, **kwargs):

def resample_size(self, resample_kwargs):
return self._resample_func(
resample_kwargs, "size", new_columns=[MODIN_UNNAMED_SERIES_LABEL]
resample_kwargs, "size", new_columns=[MODIN_UNNAMED_SERIES_LABEL], use_range_impl=False,
)

def resample_sem(self, resample_kwargs, *args, **kwargs):
Expand Down

0 comments on commit b70b22d

Please sign in to comment.