Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FEAT-#6301: Simplify usage of algebra operators to define custom functions #6302

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
57 changes: 40 additions & 17 deletions docs/flow/modin/core/dataframe/algebra.rst
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ Uniformly apply a function argument to each partition in parallel.
.. figure:: /img/map_evaluation.svg
:align: center

.. autoclass:: modin.core.dataframe.algebra.map.Map
:members: register, apply

Reduce operator
---------------
Applies an argument function that reduces each column or row on the specified axis into a scalar, but requires knowledge about the whole axis.
Expand All @@ -43,13 +46,19 @@ that the reduce function returns a one dimensional frame.
.. figure:: /img/reduce_evaluation.svg
:align: center

.. autoclass:: modin.core.dataframe.algebra.reduce.Reduce
:members: register, apply

TreeReduce operator
-------------------
Applies an argument function that reduces specified axis into a scalar. First applies map function to each partition
in parallel, then concatenates resulted partitions along the specified axis and applies reduce
function. In contrast with `Map function` template, here you're allowed to change partition shape
in the map phase. Note that the execution engine expects that the reduce function returns a one dimensional frame.

.. autoclass:: modin.core.dataframe.algebra.tree_reduce.TreeReduce
:members: register, apply

Binary operator
---------------
Applies an argument function, that takes exactly two operands (first is always `QueryCompiler`).
Expand All @@ -65,23 +74,32 @@ the right operand to the left.
it automatically but note that this requires repartitioning, which is a much
more expensive operation than the binary function itself.

.. autoclass:: modin.core.dataframe.algebra.binary.Binary
:members: register, apply

Fold operator
-------------
Applies an argument function that requires knowledge of the whole axis. Be aware that providing this knowledge may be
expensive because the execution engine has to concatenate partitions along the specified axis.

.. autoclass:: modin.core.dataframe.algebra.fold.Fold
:members: register, apply

GroupBy operator
----------------
Evaluates GroupBy aggregation for that type of functions that can be executed via TreeReduce approach.
To be able to form groups engine broadcasts ``by`` partitions to each partition of the source frame.

.. autoclass:: modin.core.dataframe.algebra.groupby.GroupByReduce
:members: register, apply

Default-to-pandas operator
--------------------------
Do :doc:`fallback to pandas </supported_apis/defaulting_to_pandas>` for passed function.


How to register your own function
'''''''''''''''''''''''''''''''''
How to use UDFs with these operators
''''''''''''''''''''''''''''''''''''
Let's examine an example of how to use the algebra module to create your own
new functions.

Expand All @@ -95,34 +113,39 @@ Let's implement a function that counts non-NA values for each column or row
TreeReduce approach would be great: in a map phase, we'll count non-NA cells in each
partition in parallel and then just sum its results in the reduce phase.

To define the TreeReduce function that does `count` + `sum` we just need to register the
appropriate functions and then assign the result to the picked `QueryCompiler`
(`PandasQueryCompiler` in our case):
To execute a TreeReduce function that does `count` + `sum` you can simply use the operator's ``.apply(...)``
method that takes and outputs a :py:class:`~modin.pandas.dataframe.DataFrame`:

.. code-block:: python

from modin.core.storage_formats import PandasQueryCompiler
from modin.core.dataframe.algebra import TreeReduce

PandasQueryCompiler.custom_count = TreeReduce.register(pandas.DataFrame.count, pandas.DataFrame.sum)
res_df = TreeReduce.apply(
df,
map_func=lambda df: df.count(),
reduce_function=lambda df: df.sum()
)

Then, we want to handle it from the :py:class:`~modin.pandas.dataframe.DataFrame`, so we need to create a way to do that:
If you're going to use your custom-defined function quite often you may want
to wrap it into a separate function or assign it as a DataFrame's method:

.. code-block:: python

import modin.pandas as pd

def count_func(self, **kwargs):
# The constructor allows you to pass in a query compiler as a keyword argument
return self.__constructor__(query_compiler=self._query_compiler.custom_count(**kwargs))
def count_func(self):
return TreeReduce.apply(
self,
map_func=lambda df: df.count(),
reduce_function=lambda df: df.sum()
)

pd.DataFrame.count_custom = count_func

And then you can use it like you usually would:
# you can then use the function as is
res = count_func(df)

.. code-block:: python

df.count_custom(axis=1)
# or assign it to the DataFrame's class and use it as a method
pd.DataFrame.count_custom = count_func
res = df.count_custom()

Many of the `pandas` API functions can be easily implemented this way, so if you find
out that one of your favorite function is still defaulted to pandas and decide to
Expand Down
42 changes: 41 additions & 1 deletion modin/core/dataframe/algebra/binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,7 @@ def register(
"""

def caller(
query_compiler, other, broadcast=False, *args, dtypes=None, **kwargs
query_compiler, other, *args, broadcast=False, dtypes=None, **kwargs
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed the order so the function's positional arguments won't conflict with the keyword broadcast arg.

):
"""
Apply binary `func` to passed operands.
Expand Down Expand Up @@ -414,3 +414,43 @@ def caller(
)

return caller

@classmethod
def apply(
cls, left, right, func, axis=0, func_args=None, func_kwargs=None, **kwargs
):
r"""
Apply a binary function row-wise or column-wise.

Parameters
----------
left : modin.pandas.DataFrame or modin.pandas.Series
Left operand.
right : modin.pandas.DataFrame or modin.pandas.Series
Right operand.
Comment on lines +427 to +430
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This layer depends on the upper layer in every apply.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The lower layer still takes the object(s) from the API layer, namely, modin.pandas.DataFrame/Series. Then, there is a func, which takes pandas.DataFrame/Series. Also, there is a kwargs argument that needs to be passed to the cls.register(). What is cls.register for the user? Doesn't this look a little complicated for the user? So many things to understand. I am also thinking that we are trying to simplify the things not for the user, but for us ourselves, when we are re-writing a customer workload to get better performance.

func : callable(pandas.DataFrame, pandas.DataFrame, \*args, axis, \*\*kwargs) -> pandas.DataFrame
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the signature is wrong here, as the implementation explicitly passes Query Compilers as arguments...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nope, func here is a kernel that will be applied to deserialized partitions (pandas dataframes) so the signature is correct

follow the track of the func:

  1. register(func)
  2. register(func) -> modin_dataframe.apply_full_axis(func, ...)

A binary function to apply `left` and `right`.
axis : int, default: 0
Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to the funcs.
func_kwargs : dict, optional
Keyword arguments to pass to the funcs.
**kwargs : dict
Additional arguments to pass to the ``cls.register()``.

Returns
-------
The same type as `df`.
"""
operator = cls.register(func, **kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder what is the purpose of .register for a one-off thing

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, that's the only way of how we can get the caller function


qc_result = operator(
left._query_compiler,
right._query_compiler,
broadcast=right.ndim == 1,
*(func_args or ()),
axis=axis,
**(func_kwargs or {}),
)
return left.__constructor__(query_compiler=qc_result)
26 changes: 26 additions & 0 deletions modin/core/dataframe/algebra/fold.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,29 @@ def caller(query_compiler, fold_axis=None, *args, **kwargs):
)

return caller

@classmethod
def apply(cls, df, func, fold_axis=0, func_args=None, func_kwargs=None):
r"""
Apply a Fold (full-axis) function to the dataframe.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
func : callable(pandas.DataFrame[NxM], \*args, \*\*kwargs) -> pandas.DataFrame[NxM]
A function to apply to every partition. Note that the function shouldn't change
the shape of the dataframe.
fold_axis : int, default: 0
Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to the funcs.
func_kwargs : dict, optional
Keyword arguments to pass to the funcs.

Returns
-------
the same type as `df`
"""
func_args = (fold_axis,) + (func_args or ())
return super().apply(df, func, func_args, func_kwargs or {})
48 changes: 48 additions & 0 deletions modin/core/dataframe/algebra/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -736,3 +736,51 @@ def wrapper(df):
return result

return _map, _reduce

@classmethod
def apply(
cls,
df,
map_func,
reduce_func,
by,
groupby_kwargs=None,
agg_args=None,
agg_kwargs=None,
):
"""
Apply groupby aggregation function using map-reduce pattern.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
A source DataFrame to group.
map_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame
A map function to apply to a groupby object in every partition.
reduce_func : callable(pandas.core.groupby.DataFrameGroupBy) -> pandas.DataFrame
A reduction function to apply to the results of the map functions.
by : label or list of labels
Columns of the `df` to group on.
groupby_kwargs : dict, optional
Keyword arguments matching the signature of ``pandas.DataFrame.groupby``.
agg_args : tuple, optional
Positional arguments to pass to the funcs.
agg_kwargs : dict, optional
Keyword arguments to pass to the funcs.

Returns
-------
The same type as `df`.
"""
operator = cls.register(map_func, reduce_func)
qc_result = operator(
df._query_compiler,
df[by]._query_compiler,
axis=0,
groupby_kwargs=groupby_kwargs or {},
agg_args=agg_args or (),
agg_kwargs=agg_kwargs or {},
drop=True,
)

return df.__constructor__(query_compiler=qc_result)
28 changes: 28 additions & 0 deletions modin/core/dataframe/algebra/operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,31 @@ def validate_axis(cls, axis: Optional[int]) -> int:
Integer representation of given axis.
"""
return 0 if axis is None else axis

@classmethod
def apply(cls, df, func, func_args=None, func_kwargs=None, **kwargs):
r"""
Apply a function to a Modin DataFrame using the operators scheme.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame
A function to apply.
func_args : tuple, optional
Positional arguments to pass to the `func`.
func_kwargs : dict, optional
Keyword arguments to pass to the `func`.
**kwargs : dict
Aditional arguments to pass to the ``cls.register()``.

Returns
-------
return_type
"""
operator = cls.register(func, **kwargs)
qc_result = operator(
df._query_compiler, *(func_args or ()), **(func_kwargs or {})
)
return df.__constructor__(query_compiler=qc_result)
25 changes: 25 additions & 0 deletions modin/core/dataframe/algebra/reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,28 @@ def caller(query_compiler, *args, **kwargs):
)

return caller

@classmethod
def apply(cls, df, func, axis=0, func_args=None, func_kwargs=None):
r"""
Apply a reduction function to each row/column partition of the dataframe.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
func : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]]
A function to apply.
axis : int, default: 0
Whether to apply the function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to the `func`.
func_kwargs : dict, optional
Keyword arguments to pass to the `func`.

Returns
-------
modin.pandas.Series
"""
result = super().apply(df, func, func_args, func_kwargs, axis=axis)
return result if result.ndim == 1 else result.squeeze(axis)
36 changes: 36 additions & 0 deletions modin/core/dataframe/algebra/tree_reduce.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,39 @@ def caller(query_compiler, *args, **kwargs):
)

return caller

@classmethod
def apply(
cls, df, map_function, reduce_function, axis=0, func_args=None, func_kwargs=None
):
r"""
Apply a map-reduce function to the dataframe.

Parameters
----------
df : modin.pandas.DataFrame or modin.pandas.Series
DataFrame object to apply the operator against.
map_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> pandas.DataFrame
A map function to apply to every partition.
reduce_function : callable(pandas.DataFrame, \*args, \*\*kwargs) -> Union[pandas.Series, pandas.DataFrame[1xN]]
A reduction function to apply to the results of the map functions.
axis : int, default: 0
Whether to apply the reduce function across rows (``axis=0``) or across columns (``axis=1``).
func_args : tuple, optional
Positional arguments to pass to the funcs.
func_kwargs : dict, optional
Keyword arguments to pass to the funcs.

Returns
-------
modin.pandas.Series
"""
result = super().apply(
df,
map_function,
func_args,
func_kwargs,
reduce_function=reduce_function,
axis=axis,
)
return result if result.ndim == 1 else result.squeeze(axis)
Loading