From cb4a6a3a3f2ac5778133860cc0213661553aff9c Mon Sep 17 00:00:00 2001 From: Dmitry Chigarev Date: Mon, 11 Dec 2023 13:30:46 +0100 Subject: [PATCH] apply review suggestions Signed-off-by: Dmitry Chigarev --- docs/flow/modin/experimental/index.rst | 4 +- .../range_partitioning_groupby.rst | 74 ++++++++++++++++++ .../experimental/reshuffling_groupby.rst | 78 ++----------------- modin/config/envvars.py | 14 ++-- modin/config/test/test_envvars.py | 4 +- modin/core/storage_formats/pandas/groupby.py | 4 +- .../storage_formats/pandas/query_compiler.py | 12 +-- modin/numpy/arr.py | 2 +- modin/pandas/base.py | 4 +- modin/pandas/series.py | 8 +- 10 files changed, 108 insertions(+), 96 deletions(-) create mode 100644 docs/flow/modin/experimental/range_partitioning_groupby.rst diff --git a/docs/flow/modin/experimental/index.rst b/docs/flow/modin/experimental/index.rst index f11cdb12d7c..49a28f46230 100644 --- a/docs/flow/modin/experimental/index.rst +++ b/docs/flow/modin/experimental/index.rst @@ -15,7 +15,7 @@ and provides a limited set of functionality: * :doc:`xgboost ` * :doc:`sklearn ` * :doc:`batch ` -* :doc:`Reshuffling groupby ` +* :doc:`Range-partitioning group by implementation ` .. toctree:: @@ -24,4 +24,4 @@ and provides a limited set of functionality: sklearn xgboost batch - reshuffling_groupby + range_partitioning_groupby diff --git a/docs/flow/modin/experimental/range_partitioning_groupby.rst b/docs/flow/modin/experimental/range_partitioning_groupby.rst new file mode 100644 index 00000000000..c8902aedebd --- /dev/null +++ b/docs/flow/modin/experimental/range_partitioning_groupby.rst @@ -0,0 +1,74 @@ +Range-partitioning GroupBy +"""""""""""""""""""""""""" + +The range-partitioning GroupBy implementation utilizes Modin's reshuffling mechanism that gives an +ability to build range partitioning over a Modin DataFrame. + +In order to enable/disable this new implementation you have to specify ``cfg.RangePartitioningGroupby`` +:doc:`configuration variable: ` + +.. code-block:: ipython + + In [4]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) + + In [5]: # past this point, Modin will always use the new range-partitiong groupby implementation + + In [6]: cfg.RangePartitioningGroupby.put(False) + + In [7]: # past this point, Modin won't use range-partitiong groupby implementation anymore + +The range-partitiong implementation appears to be quite efficient when compared to old TreeReduce and FullAxis implementations: + +.. note:: + + All of the examples below were run on Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (112 cores), 192gb RAM + +.. code-block:: ipython + + In [4]: import modin.pandas as pd; import numpy as np + + In [5]: df = pd.DataFrame(np.random.randint(0, 1_000_000, size=(1_000_000, 10)), columns=[f"col{i}" for i in range(10)]) + + In [6]: %timeit df.groupby("col0").nunique() # old full-axis implementation + Out[6]: # 2.73 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + + In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) + + In [8]: %timeit df.groupby("col0").nunique() # new range-partitiong implementation + Out[8]: # 595 ms ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +Although it may look like the new implementation always outperforms the old ones, it's not actually true. +There's a decent overhead on building the range partitioning itself, meaning that the old implementations +may act better on smaller data sizes or when the grouping columns (a key column to build range partitioning) +have too few unique values (and thus fewer units of parallelization): + +.. code-block:: ipython + + In [4]: import modin.pandas as pd; import numpy as np + + In [5]: df = pd.DataFrame({"col0": np.tile(list("abcde"), 50_000), "col1": np.arange(250_000)}) + + In [6]: %timeit df.groupby("col0").sum() # old TreeReduce implementation + Out[6]: # 155 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) + + In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) + + In [8]: %timeit df.groupby("col0").sum() # new range-partitiong implementation + Out[8]: # 230 ms ± 22.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) + +We're still looking for a heuristic that would be able to automatically switch to the best implementation +for each groupby case, but for now, we're offering to play with this switch on your own to see which +implementation works best for your particular case. + +The new range-partitioning groupby does not yet support all of the pandas API and falls back to older +implementation with the respective warning if it meets an unsupported case: + +.. code-block:: python + + In [14]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) + + In [15]: df.groupby(level=0).sum() + Out[15]: # UserWarning: Can't use range-partitiong groupby implementation because of: + ... # Range-partitioning groupby is only supported when grouping on a column(s) of the same frame. + ... # https://github.com/modin-project/modin/issues/5926 + ... # Falling back to a TreeReduce implementation. diff --git a/docs/flow/modin/experimental/reshuffling_groupby.rst b/docs/flow/modin/experimental/reshuffling_groupby.rst index 258ac1e37fb..feb930fd26d 100644 --- a/docs/flow/modin/experimental/reshuffling_groupby.rst +++ b/docs/flow/modin/experimental/reshuffling_groupby.rst @@ -1,74 +1,8 @@ -Range-partitioning GroupBy -"""""""""""""""""""""""""" +:orphan: -The range-partitioning GroupBy implementation utilizes Modin's reshuffling mechanism that gives an -ability to build range partitioning over a Modin DataFrame. +.. redirect to the new page +.. raw:: html -In order to enable/disable this new implementation you have to specify ``cfg.RangePartitioningGroupby`` -:doc:`configuration variable: ` - -.. code-block:: ipython - - In [4]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) - - In [5]: # past this point, Modin will always use the new reshuffling groupby implementation - - In [6]: cfg.RangePartitioningGroupby.put(False) - - In [7]: # past this point, Modin won't use reshuffling groupby implementation anymore - -The reshuffling implementation appears to be quite efficient when compared to old TreeReduce and FullAxis implementations: - -.. note:: - - All of the examples below were run on Intel(R) Xeon(R) Gold 6238R CPU @ 2.20GHz (112 cores), 192gb RAM - -.. code-block:: ipython - - In [4]: import modin.pandas as pd; import numpy as np - - In [5]: df = pd.DataFrame(np.random.randint(0, 1_000_000, size=(1_000_000, 10)), columns=[f"col{i}" for i in range(10)]) - - In [6]: %timeit df.groupby("col0").nunique() # old full-axis implementation - Out[6]: # 2.73 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) - - In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) - - In [8]: %timeit df.groupby("col0").nunique() # new reshuffling implementation - Out[8]: # 595 ms ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) - -Although it may look like the new implementation always outperforms the old ones, it's not actually true. -There's a decent overhead on building the range partitioning itself, meaning that the old implementations -may act better on smaller data sizes or when the grouping columns (a key column to build range partitioning) -have too few unique values (and thus fewer units of parallelization): - -.. code-block:: ipython - - In [4]: import modin.pandas as pd; import numpy as np - - In [5]: df = pd.DataFrame({"col0": np.tile(list("abcde"), 50_000), "col1": np.arange(250_000)}) - - In [6]: %timeit df.groupby("col0").sum() # old TreeReduce implementation - Out[6]: # 155 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each) - - In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) - - In [8]: %timeit df.groupby("col0").sum() # new reshuffling implementation - Out[8]: # 230 ms ± 22.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) - -We're still looking for a heuristic that would be able to automatically switch to the best implementation -for each groupby case, but for now, we're offering to play with this switch on your own to see which -implementation works best for your particular case. - -The new range-partitioning groupby does not yet support all of the pandas API and falls back to older -implementation with the respective warning if it meets an unsupported case: - -.. code-block:: python - - In [14]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True) - - In [15]: df.groupby(level=0).sum() - Out[15]: # UserWarning: Can't use reshuffling groupby implementation because of: - ... # Reshuffling groupby is only supported when grouping on a column(s) of the same frame. - ... # https://github.com/modin-project/modin/issues/5926 - ... # Falling back to a TreeReduce implementation. + diff --git a/modin/config/envvars.py b/modin/config/envvars.py index a6060592c6c..5bf3669dd6c 100644 --- a/modin/config/envvars.py +++ b/modin/config/envvars.py @@ -104,10 +104,14 @@ def get(cls) -> Any: try: old_v = cls._sibling()._get_raw_from_config() except KeyError: + # keeping `_UNSET`, will handle default value later + # when process both of the siblings old_v = _UNSET try: new_v = cls._get_raw_from_config() except KeyError: + # keeping `_UNSET`, will handle default value later + # when process both of the siblings new_v = _UNSET if old_v is not _UNSET and new_v is _UNSET: if not _TYPE_PARAMS[cls.type].verify(old_v): @@ -703,10 +707,10 @@ class GithubCI(EnvironmentVariable, type=bool): default = False -class NumpyOnModin(EnvWithSibilings, type=bool): +class ModinNumpy(EnvWithSibilings, type=bool): """Set to true to use Modin's implementation of NumPy API.""" - varname = "NUMPY_ON_MODIN" + varname = "MODIN_NUMPY" default = False @classmethod @@ -719,7 +723,7 @@ class ExperimentalNumPyAPI(EnvWithSibilings, type=bool): """ Set to true to use Modin's implementation of NumPy API. - This parameter is deprecated. Use ``NumpyOnModin`` instead. + This parameter is deprecated. Use ``ModinNumpy`` instead. """ varname = "MODIN_EXPERIMENTAL_NUMPY_API" @@ -728,13 +732,13 @@ class ExperimentalNumPyAPI(EnvWithSibilings, type=bool): @classmethod def _sibling(cls) -> type[EnvWithSibilings]: """Get a parameter sibling.""" - return NumpyOnModin + return ModinNumpy # Let the parameter's handling logic know that this variable is deprecated and that # we should raise respective warnings ExperimentalNumPyAPI._deprecation_descriptor = DeprecationDescriptor( - ExperimentalNumPyAPI, NumpyOnModin + ExperimentalNumPyAPI, ModinNumpy ) diff --git a/modin/config/test/test_envvars.py b/modin/config/test/test_envvars.py index 11f151dc87a..b37f17df49f 100644 --- a/modin/config/test/test_envvars.py +++ b/modin/config/test/test_envvars.py @@ -134,7 +134,7 @@ def test_hdk_envvar(): "deprecated_var, new_var", [ (cfg.ExperimentalGroupbyImpl, cfg.RangePartitioningGroupby), - (cfg.ExperimentalNumPyAPI, cfg.NumpyOnModin), + (cfg.ExperimentalNumPyAPI, cfg.ModinNumpy), ], ) def test_deprecated_bool_vars_warnings(deprecated_var, new_var): @@ -173,7 +173,7 @@ def test_deprecated_bool_vars_warnings(deprecated_var, new_var): "deprecated_var, new_var", [ (cfg.ExperimentalGroupbyImpl, cfg.RangePartitioningGroupby), - (cfg.ExperimentalNumPyAPI, cfg.NumpyOnModin), + (cfg.ExperimentalNumPyAPI, cfg.ModinNumpy), ], ) @pytest.mark.parametrize("get_depr_first", [True, False]) diff --git a/modin/core/storage_formats/pandas/groupby.py b/modin/core/storage_formats/pandas/groupby.py index d117c9fc4b4..596984c743f 100644 --- a/modin/core/storage_formats/pandas/groupby.py +++ b/modin/core/storage_formats/pandas/groupby.py @@ -97,14 +97,14 @@ def method(query_compiler, *args, **kwargs): try: if finalizer_fn is not None: raise NotImplementedError( - "Reshuffling groupby is not implemented yet when a finalizing function is specified." + "Range-partitioning groupby is not implemented yet when a finalizing function is specified." ) return query_compiler._groupby_shuffle( *args, agg_func=agg_name, **kwargs ) except NotImplementedError as e: ErrorMessage.warn( - f"Can't use reshuffling groupby implementation because of: {e}" + f"Can't use range-partitioning groupby implementation because of: {e}" + "\nFalling back to a TreeReduce implementation." ) return map_reduce_method(query_compiler, *args, **kwargs) diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 47b0068a045..50d2b477c71 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3534,7 +3534,7 @@ def groupby_mean(self, by, axis, groupby_kwargs, agg_args, agg_kwargs, drop=Fals ) except NotImplementedError as e: ErrorMessage.warn( - f"Can't use reshuffling groupby implementation because of: {e}" + f"Can't use range-partitioning groupby implementation because of: {e}" + "\nFalling back to a TreeReduce implementation." ) @@ -3605,7 +3605,7 @@ def groupby_size( ) except NotImplementedError as e: ErrorMessage.warn( - f"Can't use reshuffling groupby implementation because of: {e}" + f"Can't use range-partitioning groupby implementation because of: {e}" + "\nFalling back to a TreeReduce implementation." ) @@ -3779,7 +3779,7 @@ def _groupby_shuffle( if not is_all_column_names: raise NotImplementedError( - "Reshuffling groupby is only supported when grouping on a column(s) of the same frame. " + "Range-partitioning groupby is only supported when grouping on a column(s) of the same frame. " + "https://github.com/modin-project/modin/issues/5926" ) @@ -3790,7 +3790,7 @@ def _groupby_shuffle( by_dtypes = self.dtypes[by] if any(isinstance(dtype, pandas.CategoricalDtype) for dtype in by_dtypes): raise NotImplementedError( - "Reshuffling groupby is not yet supported when grouping on a categorical column. " + "Range-partitioning groupby is not yet supported when grouping on a categorical column. " + "https://github.com/modin-project/modin/issues/5925" ) @@ -3799,7 +3799,7 @@ def _groupby_shuffle( if is_transform: # https://github.com/modin-project/modin/issues/5924 ErrorMessage.missmatch_with_pandas( - operation="reshuffling groupby", + operation="range-partitioning groupby", message="the order of rows may be shuffled for the result", ) @@ -3980,7 +3980,7 @@ def groupby_agg( # if a user wants to use range-partitioning groupby explicitly, then we should print a visible # warning to them on a failure, otherwise we're only logging it message = ( - f"Can't use reshuffling groupby implementation because of: {e}" + f"Can't use range-partitioning groupby implementation because of: {e}" + "\nFalling back to a full-axis implementation." ) get_logger().info(message) diff --git a/modin/numpy/arr.py b/modin/numpy/arr.py index d6dfd19e701..8f8880a381b 100644 --- a/modin/numpy/arr.py +++ b/modin/numpy/arr.py @@ -166,7 +166,7 @@ def __init__( ): self._siblings = [] ErrorMessage.single_warning( - "Using Modin's new NumPy API. To convert from a Modin object to a NumPy array, either turn off the NumpyOnModin flag, or use `modin.pandas.io.to_numpy`." + "Using Modin's new NumPy API. To convert from a Modin object to a NumPy array, either turn off the ModinNumpy flag, or use `modin.pandas.io.to_numpy`." ) if isinstance(object, array): _query_compiler = object._query_compiler.copy() diff --git a/modin/pandas/base.py b/modin/pandas/base.py index 8df685c2758..7671612e004 100644 --- a/modin/pandas/base.py +++ b/modin/pandas/base.py @@ -3371,9 +3371,9 @@ def to_numpy( """ Convert the `BasePandasDataset` to a NumPy array or a Modin wrapper for NumPy array. """ - from modin.config import NumpyOnModin + from modin.config import ModinNumpy - if NumpyOnModin.get(): + if ModinNumpy.get(): from ..numpy.arr import array return array(self, copy=copy) diff --git a/modin/pandas/series.py b/modin/pandas/series.py index f09a485b431..2466be99deb 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -496,9 +496,9 @@ def values(self): # noqa: RT01, D200 data = self.to_numpy() if isinstance(self.dtype, pd.CategoricalDtype): - from modin.config import NumpyOnModin + from modin.config import ModinNumpy - if NumpyOnModin.get(): + if ModinNumpy.get(): data = data._to_numpy() data = pd.Categorical(data, dtype=self.dtype) return data @@ -1914,9 +1914,9 @@ def to_numpy( """ Return the NumPy ndarray representing the values in this Series or Index. """ - from modin.config import NumpyOnModin + from modin.config import ModinNumpy - if not NumpyOnModin.get(): + if not ModinNumpy.get(): return ( super(Series, self) .to_numpy(