diff --git a/modin/pandas/base.py b/modin/pandas/base.py index 5522410792a..5fde31165be 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -32,6 +32,7 @@ import pickle as pkl from modin.error_message import ErrorMessage +from modin.pandas.utils import try_cast_to_pandas # Similar to pandas, sentinel value to use as kwarg in place of None when None has # special meaning and needs to be distinguished from a user explicitly passing None. @@ -241,11 +242,9 @@ def _default_to_pandas(self, op, *args, **kwargs): empty_self_str, ) ) - args = (a._to_pandas() if hasattr(a, "_to_pandas") else a for a in args) - kwargs = { - k: v._to_pandas() if hasattr(v, "_to_pandas") else v - for k, v in kwargs.items() - } + + args = try_cast_to_pandas(args) + kwargs = try_cast_to_pandas(kwargs) pandas_obj = self._to_pandas() if callable(op): result = op(pandas_obj, *args, **kwargs) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index cc9ba04d280..a18c9e30589 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -346,7 +346,9 @@ def aggregate(self, func=None, *args, **kwargs): if func is None or is_list_like(func): return self._default_to_pandas( - lambda df: df.aggregate(func, *args, **kwargs) + lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs), + *args, + **kwargs, ) if isinstance(func, str): @@ -355,7 +357,10 @@ def aggregate(self, func=None, *args, **kwargs): return agg_func(*args, **kwargs) return self._apply_agg_function( - lambda df: df.aggregate(func, *args, **kwargs), drop=self._as_index + lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs), + drop=self._as_index, + *args, + **kwargs, ) agg = aggregate @@ -622,7 +627,7 @@ def _wrap_aggregation( return result.squeeze() return result - def _apply_agg_function(self, f, drop=True, **kwargs): + def _apply_agg_function(self, f, drop=True, *args, **kwargs): """Perform aggregation and combine stages based on a given function. 
Args: @@ -634,7 +639,7 @@ def _apply_agg_function(self, f, drop=True, **kwargs): assert callable(f), "'{0}' object is not callable".format(type(f)) if self._is_multi_by: - return self._default_to_pandas(f, **kwargs) + return self._default_to_pandas(f, *args, **kwargs) if isinstance(self._by, type(self._query_compiler)): by = self._by.to_pandas().squeeze() @@ -658,7 +663,7 @@ def _apply_agg_function(self, f, drop=True, **kwargs): return result.squeeze() return result - def _default_to_pandas(self, f, **kwargs): + def _default_to_pandas(self, f, *args, **kwargs): """Defailts the execution of this function to pandas. Args: @@ -677,10 +682,12 @@ def _default_to_pandas(self, f, **kwargs): else: by = self._by - def groupby_on_multiple_columns(df): - return f(df.groupby(by=by, axis=self._axis, **self._kwargs), **kwargs) + def groupby_on_multiple_columns(df, *args, **kwargs): + return f( + df.groupby(by=by, axis=self._axis, **self._kwargs), *args, **kwargs + ) - return self._df._default_to_pandas(groupby_on_multiple_columns) + return self._df._default_to_pandas(groupby_on_multiple_columns, *args, **kwargs) class SeriesGroupBy(DataFrameGroupBy): diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index fd8bf474db2..5dbe30baa19 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -44,13 +44,17 @@ def modin_groupby_equals_pandas(modin_groupby, pandas_groupby): df_equals(g1[1], g2[1]) -def eval_aggregation(md_df, pd_df, operation, by=None, *args, **kwargs): +def eval_aggregation(md_df, pd_df, operation=None, by=None, *args, **kwargs): if by is None: by = md_df.columns[0] + if operation is None: + operation = {} return eval_general( md_df, pd_df, - operation=lambda df: df.groupby(by=by).agg(operation), + operation=lambda df, *args, **kwargs: df.groupby(by=by).agg( + operation, *args, **kwargs + ), *args, **kwargs, ) @@ -1104,3 +1108,29 @@ def test_agg_exceptions(operation): data = {**data1, **data2} 
eval_aggregation(*create_test_dfs(data), operation=operation) + + +@pytest.mark.parametrize( + "kwargs", + [ + { + "Max": ("cnt", np.max), + "Sum": ("cnt", np.sum), + "Num": ("c", pd.Series.nunique), + "Num1": ("c", pandas.Series.nunique), + }, + { + "func": { + "Max": ("cnt", np.max), + "Sum": ("cnt", np.sum), + "Num": ("c", pd.Series.nunique), + "Num1": ("c", pandas.Series.nunique), + } + }, + ], +) +def test_to_pandas_conversion(kwargs): + data = {"a": [1, 2], "b": [3, 4], "c": [5, 6]} + by = ["a", "b"] + + eval_aggregation(*create_test_dfs(data), by=by, **kwargs) diff --git a/modin/pandas/utils.py b/modin/pandas/utils.py index cc3b39eadd4..8ed2e60d5cd 100644 --- a/modin/pandas/utils.py +++ b/modin/pandas/utils.py @@ -11,6 +11,8 @@ # ANY KIND, either express or implied. See the License for the specific language # governing permissions and limitations under the License. +import pandas + def from_non_pandas(df, index, columns, dtype): from modin.data_management.dispatcher import EngineDispatcher @@ -82,3 +84,35 @@ def decorator(cls): return cls return decorator + + +def try_cast_to_pandas(obj): + """ + Convert obj and all nested objects from Modin to pandas where possible, + otherwise return obj unchanged + + Parameters + ---------- + obj : object + The object to convert from Modin to pandas + + Returns + ------- + Converted object + """ + if hasattr(obj, "_to_pandas"): + return obj._to_pandas() + if isinstance(obj, (list, tuple)): + return type(obj)([try_cast_to_pandas(o) for o in obj]) + if isinstance(obj, dict): + return {k: try_cast_to_pandas(v) for k, v in obj.items()} + if callable(obj): + module_hierarchy = getattr(obj, "__module__", "").split(".") + fn_name = getattr(obj, "__name__", None) + if fn_name and module_hierarchy[0] == "modin": + return ( + getattr(pandas.DataFrame, fn_name, obj) + if module_hierarchy[-1] == "dataframe" + else getattr(pandas.Series, fn_name, obj) + ) + return obj