Skip to content

Commit

Permalink
apply review suggestions
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Dec 11, 2023
1 parent 2836ed3 commit cb4a6a3
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 96 deletions.
4 changes: 2 additions & 2 deletions docs/flow/modin/experimental/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ and provides a limited set of functionality:
* :doc:`xgboost <xgboost>`
* :doc:`sklearn <sklearn>`
* :doc:`batch <batch>`
* :doc:`Reshuffling groupby <reshuffling_groupby>`
* :doc:`Range-partitioning group by implementation <range_partitioning_groupby>`


.. toctree::
Expand All @@ -24,4 +24,4 @@ and provides a limited set of functionality:
sklearn
xgboost
batch
reshuffling_groupby
range_partitioning_groupby
74 changes: 74 additions & 0 deletions docs/flow/modin/experimental/range_partitioning_groupby.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
Range-partitioning GroupBy
""""""""""""""""""""""""""

The range-partitioning GroupBy implementation utilizes Modin's reshuffling mechanism that gives an
ability to build range partitioning over a Modin DataFrame.

In order to enable/disable this new implementation you have to specify ``cfg.RangePartitioningGroupby``
:doc:`configuration variable: </flow/modin/config>`

.. code-block:: ipython
In [4]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [5]: # past this point, Modin will always use the new range-partitiong groupby implementation
In [6]: cfg.RangePartitioningGroupby.put(False)
In [7]: # past this point, Modin won't use range-partitiong groupby implementation anymore
The range-partitiong implementation appears to be quite efficient when compared to old TreeReduce and FullAxis implementations:

.. note::

All of the examples below were run on Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (112 cores), 192gb RAM

.. code-block:: ipython
In [4]: import modin.pandas as pd; import numpy as np
In [5]: df = pd.DataFrame(np.random.randint(0, 1_000_000, size=(1_000_000, 10)), columns=[f"col{i}" for i in range(10)])
In [6]: %timeit df.groupby("col0").nunique() # old full-axis implementation
Out[6]: # 2.73 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [8]: %timeit df.groupby("col0").nunique() # new range-partitiong implementation
Out[8]: # 595 ms ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Although it may look like the new implementation always outperforms the old ones, it's not actually true.
There's a decent overhead on building the range partitioning itself, meaning that the old implementations
may act better on smaller data sizes or when the grouping columns (a key column to build range partitioning)
have too few unique values (and thus fewer units of parallelization):

.. code-block:: ipython
In [4]: import modin.pandas as pd; import numpy as np
In [5]: df = pd.DataFrame({"col0": np.tile(list("abcde"), 50_000), "col1": np.arange(250_000)})
In [6]: %timeit df.groupby("col0").sum() # old TreeReduce implementation
Out[6]: # 155 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [8]: %timeit df.groupby("col0").sum() # new range-partitiong implementation
Out[8]: # 230 ms ± 22.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
We're still looking for a heuristic that would be able to automatically switch to the best implementation
for each groupby case, but for now, we're offering to play with this switch on your own to see which
implementation works best for your particular case.

The new range-partitioning groupby does not yet support all of the pandas API and falls back to older
implementation with the respective warning if it meets an unsupported case:

.. code-block:: python
In [14]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [15]: df.groupby(level=0).sum()
Out[15]: # UserWarning: Can't use range-partitiong groupby implementation because of:
... # Range-partitioning groupby is only supported when grouping on a column(s) of the same frame.
... # https://github.com/modin-project/modin/issues/5926
... # Falling back to a TreeReduce implementation.
78 changes: 6 additions & 72 deletions docs/flow/modin/experimental/reshuffling_groupby.rst
Original file line number Diff line number Diff line change
@@ -1,74 +1,8 @@
Range-partitioning GroupBy
""""""""""""""""""""""""""
:orphan:

The range-partitioning GroupBy implementation utilizes Modin's reshuffling mechanism that gives an
ability to build range partitioning over a Modin DataFrame.
.. redirect to the new page
.. raw:: html

In order to enable/disable this new implementation you have to specify ``cfg.RangePartitioningGroupby``
:doc:`configuration variable: </flow/modin/config>`

.. code-block:: ipython
In [4]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [5]: # past this point, Modin will always use the new reshuffling groupby implementation
In [6]: cfg.RangePartitioningGroupby.put(False)
In [7]: # past this point, Modin won't use reshuffling groupby implementation anymore
The reshuffling implementation appears to be quite efficient when compared to old TreeReduce and FullAxis implementations:

.. note::

All of the examples below were run on Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (112 cores), 192gb RAM

.. code-block:: ipython
In [4]: import modin.pandas as pd; import numpy as np
In [5]: df = pd.DataFrame(np.random.randint(0, 1_000_000, size=(1_000_000, 10)), columns=[f"col{i}" for i in range(10)])
In [6]: %timeit df.groupby("col0").nunique() # old full-axis implementation
Out[6]: # 2.73 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [8]: %timeit df.groupby("col0").nunique() # new reshuffling implementation
Out[8]: # 595 ms ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Although it may look like the new implementation always outperforms the old ones, it's not actually true.
There's a decent overhead on building the range partitioning itself, meaning that the old implementations
may act better on smaller data sizes or when the grouping columns (a key column to build range partitioning)
have too few unique values (and thus fewer units of parallelization):

.. code-block:: ipython
In [4]: import modin.pandas as pd; import numpy as np
In [5]: df = pd.DataFrame({"col0": np.tile(list("abcde"), 50_000), "col1": np.arange(250_000)})
In [6]: %timeit df.groupby("col0").sum() # old TreeReduce implementation
Out[6]: # 155 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [8]: %timeit df.groupby("col0").sum() # new reshuffling implementation
Out[8]: # 230 ms ± 22.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
We're still looking for a heuristic that would be able to automatically switch to the best implementation
for each groupby case, but for now, we're offering to play with this switch on your own to see which
implementation works best for your particular case.

The new range-partitioning groupby does not yet support all of the pandas API and falls back to older
implementation with the respective warning if it meets an unsupported case:

.. code-block:: python
In [14]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [15]: df.groupby(level=0).sum()
Out[15]: # UserWarning: Can't use reshuffling groupby implementation because of:
... # Reshuffling groupby is only supported when grouping on a column(s) of the same frame.
... # https://github.com/modin-project/modin/issues/5926
... # Falling back to a TreeReduce implementation.
<script type="text/javascript">
window.location.href = 'range_partitioning_groupby.html';
</script>
14 changes: 9 additions & 5 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,14 @@ def get(cls) -> Any:
try:
old_v = cls._sibling()._get_raw_from_config()
except KeyError:
# keeping `_UNSET`, will handle default value later
# when process both of the siblings
old_v = _UNSET
try:
new_v = cls._get_raw_from_config()
except KeyError:
# keeping `_UNSET`, will handle default value later
# when process both of the siblings
new_v = _UNSET
if old_v is not _UNSET and new_v is _UNSET:
if not _TYPE_PARAMS[cls.type].verify(old_v):
Expand Down Expand Up @@ -703,10 +707,10 @@ class GithubCI(EnvironmentVariable, type=bool):
default = False


class NumpyOnModin(EnvWithSibilings, type=bool):
class ModinNumpy(EnvWithSibilings, type=bool):
"""Set to true to use Modin's implementation of NumPy API."""

varname = "NUMPY_ON_MODIN"
varname = "MODIN_NUMPY"
default = False

@classmethod
Expand All @@ -719,7 +723,7 @@ class ExperimentalNumPyAPI(EnvWithSibilings, type=bool):
"""
Set to true to use Modin's implementation of NumPy API.
This parameter is deprecated. Use ``NumpyOnModin`` instead.
This parameter is deprecated. Use ``ModinNumpy`` instead.
"""

varname = "MODIN_EXPERIMENTAL_NUMPY_API"
Expand All @@ -728,13 +732,13 @@ class ExperimentalNumPyAPI(EnvWithSibilings, type=bool):
@classmethod
def _sibling(cls) -> type[EnvWithSibilings]:
"""Get a parameter sibling."""
return NumpyOnModin
return ModinNumpy


# Let the parameter's handling logic know that this variable is deprecated and that
# we should raise respective warnings
ExperimentalNumPyAPI._deprecation_descriptor = DeprecationDescriptor(
ExperimentalNumPyAPI, NumpyOnModin
ExperimentalNumPyAPI, ModinNumpy
)


Expand Down
4 changes: 2 additions & 2 deletions modin/config/test/test_envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def test_hdk_envvar():
"deprecated_var, new_var",
[
(cfg.ExperimentalGroupbyImpl, cfg.RangePartitioningGroupby),
(cfg.ExperimentalNumPyAPI, cfg.NumpyOnModin),
(cfg.ExperimentalNumPyAPI, cfg.ModinNumpy),
],
)
def test_deprecated_bool_vars_warnings(deprecated_var, new_var):
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_deprecated_bool_vars_warnings(deprecated_var, new_var):
"deprecated_var, new_var",
[
(cfg.ExperimentalGroupbyImpl, cfg.RangePartitioningGroupby),
(cfg.ExperimentalNumPyAPI, cfg.NumpyOnModin),
(cfg.ExperimentalNumPyAPI, cfg.ModinNumpy),
],
)
@pytest.mark.parametrize("get_depr_first", [True, False])
Expand Down
4 changes: 2 additions & 2 deletions modin/core/storage_formats/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ def method(query_compiler, *args, **kwargs):
try:
if finalizer_fn is not None:
raise NotImplementedError(
"Reshuffling groupby is not implemented yet when a finalizing function is specified."
"Range-partitioning groupby is not implemented yet when a finalizing function is specified."
)
return query_compiler._groupby_shuffle(
*args, agg_func=agg_name, **kwargs
)
except NotImplementedError as e:
ErrorMessage.warn(
f"Can't use reshuffling groupby implementation because of: {e}"
f"Can't use range-partitioning groupby implementation because of: {e}"
+ "\nFalling back to a TreeReduce implementation."
)
return map_reduce_method(query_compiler, *args, **kwargs)
Expand Down
12 changes: 6 additions & 6 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3534,7 +3534,7 @@ def groupby_mean(self, by, axis, groupby_kwargs, agg_args, agg_kwargs, drop=Fals
)
except NotImplementedError as e:
ErrorMessage.warn(
f"Can't use reshuffling groupby implementation because of: {e}"
f"Can't use range-partitioning groupby implementation because of: {e}"
+ "\nFalling back to a TreeReduce implementation."
)

Expand Down Expand Up @@ -3605,7 +3605,7 @@ def groupby_size(
)
except NotImplementedError as e:
ErrorMessage.warn(
f"Can't use reshuffling groupby implementation because of: {e}"
f"Can't use range-partitioning groupby implementation because of: {e}"
+ "\nFalling back to a TreeReduce implementation."
)

Expand Down Expand Up @@ -3779,7 +3779,7 @@ def _groupby_shuffle(

if not is_all_column_names:
raise NotImplementedError(
"Reshuffling groupby is only supported when grouping on a column(s) of the same frame. "
"Range-partitioning groupby is only supported when grouping on a column(s) of the same frame. "
+ "https://github.com/modin-project/modin/issues/5926"
)

Expand All @@ -3790,7 +3790,7 @@ def _groupby_shuffle(
by_dtypes = self.dtypes[by]
if any(isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes):
raise NotImplementedError(
"Reshuffling groupby is not yet supported when grouping on a categorical column. "
"Range-partitioning groupby is not yet supported when grouping on a categorical column. "
+ "https://github.com/modin-project/modin/issues/5925"
)

Expand All @@ -3799,7 +3799,7 @@ def _groupby_shuffle(
if is_transform:
# https://github.com/modin-project/modin/issues/5924
ErrorMessage.missmatch_with_pandas(
operation="reshuffling groupby",
operation="range-partitioning groupby",
message="the order of rows may be shuffled for the result",
)

Expand Down Expand Up @@ -3980,7 +3980,7 @@ def groupby_agg(
# if a user wants to use range-partitioning groupby explicitly, then we should print a visible
# warning to them on a failure, otherwise we're only logging it
message = (
f"Can't use reshuffling groupby implementation because of: {e}"
f"Can't use range-partitioning groupby implementation because of: {e}"
+ "\nFalling back to a full-axis implementation."
)
get_logger().info(message)
Expand Down
2 changes: 1 addition & 1 deletion modin/numpy/arr.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def __init__(
):
self._siblings = []
ErrorMessage.single_warning(
"Using Modin's new NumPy API. To convert from a Modin object to a NumPy array, either turn off the NumpyOnModin flag, or use `modin.pandas.io.to_numpy`."
"Using Modin's new NumPy API. To convert from a Modin object to a NumPy array, either turn off the ModinNumpy flag, or use `modin.pandas.io.to_numpy`."
)
if isinstance(object, array):
_query_compiler = object._query_compiler.copy()
Expand Down
4 changes: 2 additions & 2 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3371,9 +3371,9 @@ def to_numpy(
"""
Convert the `BasePandasDataset` to a NumPy array or a Modin wrapper for NumPy array.
"""
from modin.config import NumpyOnModin
from modin.config import ModinNumpy

if NumpyOnModin.get():
if ModinNumpy.get():
from ..numpy.arr import array

return array(self, copy=copy)
Expand Down
8 changes: 4 additions & 4 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -496,9 +496,9 @@ def values(self): # noqa: RT01, D200

data = self.to_numpy()
if isinstance(self.dtype, pd.CategoricalDtype):
from modin.config import NumpyOnModin
from modin.config import ModinNumpy

if NumpyOnModin.get():
if ModinNumpy.get():
data = data._to_numpy()
data = pd.Categorical(data, dtype=self.dtype)
return data
Expand Down Expand Up @@ -1914,9 +1914,9 @@ def to_numpy(
"""
Return the NumPy ndarray representing the values in this Series or Index.
"""
from modin.config import NumpyOnModin
from modin.config import ModinNumpy

if not NumpyOnModin.get():
if not ModinNumpy.get():
return (
super(Series, self)
.to_numpy(
Expand Down

0 comments on commit cb4a6a3

Please sign in to comment.