Skip to content

Commit

Permalink
FEAT-modin-project#2491: renamed some entities
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Dec 17, 2020
1 parent 2262dc4 commit 58cfaa6
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 23 deletions.
10 changes: 5 additions & 5 deletions asv_bench/benchmarks/benchmarks.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,21 +58,21 @@ def execute(df):


class BaseTimeGroupBy:
    """Base class for ASV groupby benchmarks.

    Builds a random integer dataframe and selects the first `ncols`
    columns as the groupby keys.
    """

    def setup(self, data_size, ncols=1):
        """
        Create the benchmark dataframe and the list of groupby columns.

        Parameters
        ----------
        data_size : tuple
            Dataframe dimensions; note the elements are passed to
            `generate_dataframe` in reversed order (data_size[1], data_size[0]).
        ncols : int, default: 1
            Number of leading columns to group by.
        """
        self.df = generate_dataframe(
            ASV_USE_IMPL, "int", data_size[1], data_size[0], RAND_LOW, RAND_HIGH
        )
        self.groupby_columns = self.df.columns[:ncols].tolist()


class TimeMultiColumnGroupby(BaseTimeGroupBy):
    """Benchmark groupby aggregations keyed on multiple (6) columns."""

    param_names = ["data_size", "ncols"]
    params = [UNARY_OP_DATA_SIZE, [6]]

    def time_groupby_agg_quan(self, data_size, ncols):
        # Aggregate the precomputed key columns with the "quantile" reducer.
        execute(self.df.groupby(by=self.groupby_columns).agg("quantile"))

    def time_groupby_agg_mean(self, data_size, ncols):
        # Mean computed via `apply` with a lambda rather than `.agg("mean")`.
        execute(self.df.groupby(by=self.groupby_columns).apply(lambda df: df.mean()))


Expand Down
26 changes: 13 additions & 13 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2476,14 +2476,14 @@ def _groupby_dict_reduce(
map_fns = []
for i, fn in enumerate(col_funcs):
if not isinstance(fn, str) and isinstance(fn, Iterable):
future_col_name, func = fn
new_col_name, func = fn
elif isinstance(fn, str):
future_col_name, func = fn, fn
new_col_name, func = fn, fn
else:
raise TypeError

map_fns.append((future_col_name, groupby_reduce_functions[func][0]))
reduce_dict[(col, future_col_name)] = groupby_reduce_functions[func][1]
map_fns.append((new_col_name, groupby_reduce_functions[func][0]))
reduce_dict[(col, new_col_name)] = groupby_reduce_functions[func][1]
map_dict[col] = map_fns
return GroupbyReduceFunction.register(map_dict, reduce_dict)(
query_compiler=self,
Expand All @@ -2507,23 +2507,23 @@ def groupby_agg(
groupby_kwargs,
drop=False,
):
def is_reduce_fn(fn, deep_level=0):
    """
    Check whether `fn` names (or recursively contains only names of) reduce functions.

    Parameters
    ----------
    fn : str or Container
        Function name, or a container of function names/renamers.
    deep_level : int, default: 0
        The number of nested containers that were met so far:
        - if it's 0, then we're outside of a container; `fn` could be either a
          function name or a container of function names/renamers.
        - if it's 1, then we're inside a container of function names/renamers;
          `fn` must be either a function name or a renamer (a renamer is some
          container whose length == 2, the first element being the new column
          name and the second the function name).

    Returns
    -------
    bool
        True if everything in `fn` resolves to a key of `groupby_reduce_functions`.
    """
    if not isinstance(fn, str) and isinstance(fn, Container):
        assert deep_level == 0 or (
            deep_level > 0 and len(fn) == 2
        ), f"Got the renamer with incorrect length, expected 2 got {len(fn)}."
        return (
            all(is_reduce_fn(f, deep_level + 1) for f in fn)
            if deep_level == 0
            else is_reduce_fn(fn[1], deep_level + 1)
        )
    return isinstance(fn, str) and fn in groupby_reduce_functions

if isinstance(agg_func, dict) and all(
is_reduce_fn(x) for x in agg_func.values()
Expand Down
3 changes: 3 additions & 0 deletions modin/engines/base/frame/partition_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,9 @@ def map_axis_partitions(
The flag to keep partitions for Modin Frame.
lengths : list(int)
The list of lengths to shuffle the object.
enumerate_partitions : bool, optional, default: False
    Whether or not to pass the partition index into `map_func`.
    Note that `map_func` must be able to accept a `partition_idx` keyword argument.
Returns
-------
Expand Down
10 changes: 5 additions & 5 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -377,15 +377,15 @@ def aggregate(self, func=None, *args, **kwargs):
relabeling_required = False
if isinstance(func, dict) or func is None:

def try_get_str_func(fn):
    """
    Convert a callable (or a nested iterable of callables) to its string name.

    A callable is replaced by `fn.__name__` only when that name is an
    attribute of `self` (i.e. a groupby method); strings and anything
    else are returned unchanged. Iterables are converted recursively
    and returned as lists.
    """
    if not isinstance(fn, str) and isinstance(fn, Iterable):
        return [try_get_str_func(f) for f in fn]
    return fn.__name__ if callable(fn) and fn.__name__ in dir(self) else fn

relabeling_required, func_dict, new_columns, order = reconstruct_func(
func, **kwargs
)
func_dict = {k: try_get_str_func(v) for k, v in func_dict.items()}
func_dict = {col: try_get_str_func(fn) for col, fn in func_dict.items()}

if any(i not in self._df.columns for i in func_dict.keys()):
from pandas.core.base import SpecificationError
Expand Down

0 comments on commit 58cfaa6

Please sign in to comment.