Skip to content

Commit

Permalink
FIX-modin-project#2269: logic of dropping 'by' moved back to API level
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Oct 30, 2020
1 parent addc0b2 commit c5636dd
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 34 deletions.
13 changes: 1 addition & 12 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1401,13 +1401,11 @@ def groupby_agg(
self,
by,
is_multi_by,
idx_name,
axis,
agg_func,
agg_args,
agg_kwargs,
groupby_kwargs,
drop_,
drop=False,
):
if is_multi_by:
Expand All @@ -1420,17 +1418,8 @@ def groupby_agg(
else:
by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by

# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
# actually perform this operation.
new_self = (
self.drop(columns=[idx_name])
if idx_name is not None and drop_ and drop
else self
)

return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)(
self if is_multi_by else new_self,
self,
by=by,
is_multi_by=is_multi_by,
axis=axis,
Expand Down
19 changes: 3 additions & 16 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2588,13 +2588,11 @@ def groupby_agg(
self,
by,
is_multi_by,
idx_name,
axis,
agg_func,
agg_args,
agg_kwargs,
groupby_kwargs,
drop_,
drop=False,
):
agg_func = wrap_udf_function(agg_func)
Expand All @@ -2603,27 +2601,16 @@ def groupby_agg(
return super().groupby_agg(
by=by,
is_multi_by=is_multi_by,
idx_name=idx_name,
axis=axis,
agg_func=agg_func,
agg_args=agg_args,
agg_kwargs=agg_kwargs,
groupby_kwargs=groupby_kwargs,
drop_=drop_,
drop=drop,
)

by = by.to_pandas().squeeze() if isinstance(by, type(self)) else by

# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
# actually perform this operation.
new_self = (
self.drop(columns=[idx_name])
if idx_name is not None and drop_ and drop
else self
)

# since we're going to modify `groupby_kwargs` dict in a `groupby_agg_builder`,
# we want to copy it to not propagate these changes into source dict, in case
# of unsuccessful end of function
Expand Down Expand Up @@ -2655,14 +2642,14 @@ def compute_groupby(df):
except (ValueError, KeyError):
return compute_groupby(df.copy())

new_modin_frame = new_self._modin_frame._apply_full_axis(
new_modin_frame = self._modin_frame._apply_full_axis(
axis, lambda df: groupby_agg_builder(df)
)
result = new_self.__constructor__(new_modin_frame)
result = self.__constructor__(new_modin_frame)

# that means that exception in `compute_groupby` was raised
# in every partition, so we also should raise it
if len(result.columns) == 0 and len(new_self.columns) != 0:
if len(result.columns) == 0 and len(self.columns) != 0:
# determening type of raised exception by applying `aggfunc`
# to empty DataFrame
try:
Expand Down
4 changes: 1 addition & 3 deletions modin/experimental/backends/omnisci/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,14 @@ def groupby_agg(
self,
by,
is_multi_by,
idx_name,
axis,
agg_func,
agg_args,
agg_kwargs,
groupby_kwargs,
drop_,
drop=False,
):
# TODO: handle `is_multi_by`, `idx_name`, `agg_args`, `drop_`, `drop` args
# TODO: handle `is_multi_by`, `agg_args`, `drop` args
new_frame = self._modin_frame.groupby_agg(
by, axis, agg_func, groupby_kwargs, **agg_kwargs
)
Expand Down
12 changes: 9 additions & 3 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -834,16 +834,22 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs):
"""
assert callable(f), "'{0}' object is not callable".format(type(f))

new_manager = self._query_compiler.groupby_agg(
# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
# actually perform this operation.
if not self._is_multi_by and self._idx_name is not None and drop and self._drop:
groupby_qc = self._query_compiler.drop(columns=[self._idx_name])
else:
groupby_qc = self._query_compiler

new_manager = groupby_qc.groupby_agg(
by=self._by,
is_multi_by=self._is_multi_by,
idx_name=self._idx_name,
axis=self._axis,
agg_func=f,
agg_args=args,
agg_kwargs=kwargs,
groupby_kwargs=self._kwargs,
drop_=drop,
drop=self._drop,
)
if self._idx_name is not None and self._as_index:
Expand Down

0 comments on commit c5636dd

Please sign in to comment.