Skip to content

Commit

Permalink
REFACTOR-modin-project#7105: Deprecate 'cfg.RangePartitioningGroupby'
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev committed Apr 9, 2024
1 parent 5a518ca commit 2744476
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 65 deletions.
2 changes: 1 addition & 1 deletion .github/actions/run-core-tests/group_3/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ runs:
shell: bash -l {0}
- run: |
echo "::group::Running range-partitioning tests (group 3)..."
MODIN_RANGE_PARTITIONING_GROUPBY=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/tests/pandas/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/tests/pandas/test_groupby.py
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/tests/pandas/test_series.py -k "test_unique or test_nunique or drop_duplicates or test_resample"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/tests/pandas/test_general.py -k "test_unique"
MODIN_RANGE_PARTITIONING=1 ${{ inputs.runner }} ${{ inputs.parallel }} modin/tests/pandas/dataframe/test_map_metadata.py -k "drop_duplicates"
Expand Down
12 changes: 6 additions & 6 deletions docs/flow/modin/experimental/range_partitioning_groupby.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ Range-partitioning GroupBy
The range-partitioning GroupBy implementation utilizes Modin's reshuffling mechanism that gives an
ability to build range partitioning over a Modin DataFrame.

In order to enable/disable the range-partitiong implementation you have to specify ``cfg.RangePartitioningGroupby``
In order to enable/disable the range-partitioning implementation you have to specify ``cfg.RangePartitioning``
:doc:`configuration variable: </flow/modin/config>`

.. code-block:: ipython
In [4]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [4]: import modin.config as cfg; cfg.RangePartitioning.put(True)
In [5]: # past this point, Modin will always use the range-partitioning groupby implementation
In [6]: cfg.RangePartitioningGroupby.put(False)
In [6]: cfg.RangePartitioning.put(False)
In [7]: # past this point, Modin won't use the range-partitioning groupby implementation anymore
Expand All @@ -32,7 +32,7 @@ The range-partitiong implementation appears to be quite efficient when compared
In [6]: %timeit df.groupby("col0").nunique() # full-axis implementation
Out[6]: # 2.73 s ± 28.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [7]: import modin.config as cfg; cfg.RangePartitioning.put(True)
In [8]: %timeit df.groupby("col0").nunique() # range-partitioning implementation
Out[8]: # 595 ms ± 61.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Expand All @@ -51,7 +51,7 @@ have too few unique values (and thus fewer units of parallelization):
In [6]: %timeit df.groupby("col0").sum() # TreeReduce implementation
Out[6]: # 155 ms ± 5.02 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
In [7]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [7]: import modin.config as cfg; cfg.RangePartitioning.put(True)
In [8]: %timeit df.groupby("col0").sum() # range-partitioning implementation
Out[8]: # 230 ms ± 22.2 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
Expand All @@ -65,7 +65,7 @@ implementation with the respective warning if it meets an unsupported case:

.. code-block:: python
In [14]: import modin.config as cfg; cfg.RangePartitioningGroupby.put(True)
In [14]: import modin.config as cfg; cfg.RangePartitioning.put(True)
In [15]: df.groupby(level=0).sum()
Out[15]: # UserWarning: Can't use range-partitiong groupby implementation because of:
Expand Down
43 changes: 26 additions & 17 deletions modin/config/envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -733,14 +733,28 @@ def _sibling(cls) -> type[EnvWithSibilings]:
)


class RangePartitioning(EnvWithSibilings, type=bool):
"""
Set to true to use Modin's range-partitioning implementation where possible.
Please refer to the documentation for cases where enabling this option would be beneficial:
https://modin.readthedocs.io/en/stable/flow/modin/experimental/range_partitioning_groupby.html
"""

varname = "MODIN_RANGE_PARTITIONING"
default = False

@classmethod
def _sibling(cls) -> type[EnvWithSibilings]:
"""Get a parameter sibling."""
return RangePartitioningGroupby


class RangePartitioningGroupby(EnvWithSibilings, type=bool):
"""
Set to true to use Modin's range-partitioning group by implementation.
Experimental groupby is implemented using a range-partitioning technique,
note that it may not always work better than the original Modin's TreeReduce
and FullAxis implementations. For more information visit the according section
of Modin's documentation: TODO: add a link to the section once it's written.
This parameter is deprecated. Use ``RangePartitioning`` instead.
"""

varname = "MODIN_RANGE_PARTITIONING_GROUPBY"
Expand All @@ -752,11 +766,18 @@ def _sibling(cls) -> type[EnvWithSibilings]:
return ExperimentalGroupbyImpl


# Let the parameter's handling logic know that this variable is deprecated and that
# we should raise respective warnings
RangePartitioningGroupby._deprecation_descriptor = DeprecationDescriptor(
RangePartitioningGroupby, RangePartitioning
)


class ExperimentalGroupbyImpl(EnvWithSibilings, type=bool):
"""
Set to true to use Modin's range-partitioning group by implementation.
This parameter is deprecated. Use ``RangePartitioningGroupby`` instead.
This parameter is deprecated. Use ``RangePartitioning`` instead.
"""

varname = "MODIN_EXPERIMENTAL_GROUPBY"
Expand All @@ -775,18 +796,6 @@ def _sibling(cls) -> type[EnvWithSibilings]:
)


class RangePartitioning(EnvironmentVariable, type=bool):
"""
Set to true to use Modin's range-partitioning implementation where possible.
Please refer to documentation for cases where enabling this options would be beneficial:
https://modin.readthedocs.io/en/stable/flow/modin/experimental/range_partitioning_groupby.html
"""

varname = "MODIN_RANGE_PARTITIONING"
default = False


class CIAWSSecretAccessKey(EnvironmentVariable, type=str):
"""Set to AWS_SECRET_ACCESS_KEY when running mock S3 tests for Modin in GitHub CI."""

Expand Down
10 changes: 8 additions & 2 deletions modin/core/storage_formats/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@

"""Contains implementations for GroupbyReduce functions."""

import warnings

import numpy as np
import pandas

from modin.config import RangePartitioningGroupby
from modin.config import RangePartitioning, RangePartitioningGroupby
from modin.core.dataframe.algebra import GroupByReduce
from modin.error_message import ErrorMessage
from modin.utils import hashable
Expand Down Expand Up @@ -93,7 +95,11 @@ def build_qc_method(cls, agg_name, finalizer_fn=None):
)

def method(query_compiler, *args, **kwargs):
if RangePartitioningGroupby.get():
with warnings.catch_warnings():
# filter out the deprecation warning; it was already shown when the user set the variable
warnings.filterwarnings("ignore", category=FutureWarning)
old_range_part_var = RangePartitioningGroupby.get()
if RangePartitioning.get() or old_range_part_var:
try:
if finalizer_fn is not None:
raise NotImplementedError(
Expand Down
22 changes: 17 additions & 5 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3537,7 +3537,11 @@ def groupby_nth(
return result

def groupby_mean(self, by, axis, groupby_kwargs, agg_args, agg_kwargs, drop=False):
if RangePartitioningGroupby.get():
with warnings.catch_warnings():
# filter out the deprecation warning; it was already shown when the user set the variable
warnings.filterwarnings("ignore", category=FutureWarning)
old_range_part_var = RangePartitioningGroupby.get()
if RangePartitioning.get() or old_range_part_var:
try:
return self._groupby_shuffle(
by=by,
Expand Down Expand Up @@ -3608,7 +3612,11 @@ def groupby_size(
agg_kwargs,
drop=False,
):
if RangePartitioningGroupby.get():
with warnings.catch_warnings():
# filter out the deprecation warning; it was already shown when the user set the variable
warnings.filterwarnings("ignore", category=FutureWarning)
old_range_part_var = RangePartitioningGroupby.get()
if RangePartitioning.get() or old_range_part_var:
try:
return self._groupby_shuffle(
by=by,
Expand Down Expand Up @@ -4027,9 +4035,13 @@ def groupby_agg(
)

# 'group_wise' means 'groupby.apply()'. We're certain that range-partitioning groupby
# always works better for '.apply()', so we're using it regardless of the 'RangePartitioningGroupby'
# always works better for '.apply()', so we're using it regardless of the 'RangePartitioning'
# value
if how == "group_wise" or RangePartitioningGroupby.get():
with warnings.catch_warnings():
# filter out the deprecation warning; it was already shown when the user set the variable
warnings.filterwarnings("ignore", category=FutureWarning)
old_range_part_var = RangePartitioningGroupby.get()
if how == "group_wise" or RangePartitioning.get() or old_range_part_var:
try:
return self._groupby_shuffle(
by=by,
Expand All @@ -4050,7 +4062,7 @@ def groupby_agg(
+ "\nFalling back to a full-axis implementation."
)
get_logger().info(message)
if RangePartitioningGroupby.get():
if RangePartitioning.get() or old_range_part_var:
ErrorMessage.warn(message)

if isinstance(agg_func, dict) and GroupbyReduceImpl.has_impl_for(agg_func):
Expand Down
3 changes: 2 additions & 1 deletion modin/tests/config/test_envvars.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,9 @@ def test_hdk_envvar():
@pytest.mark.parametrize(
"deprecated_var, new_var",
[
(cfg.ExperimentalGroupbyImpl, cfg.RangePartitioningGroupby),
(cfg.ExperimentalGroupbyImpl, cfg.RangePartitioning),
(cfg.ExperimentalNumPyAPI, cfg.ModinNumpy),
(cfg.RangePartitioningGroupby, cfg.RangePartitioning),
],
)
def test_deprecated_bool_vars_warnings(deprecated_var, new_var):
Expand Down
14 changes: 6 additions & 8 deletions modin/tests/core/storage_formats/pandas/test_internals.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import pytest

import modin.pandas as pd
from modin.config import Engine, MinPartitionSize, NPartitions, RangePartitioningGroupby
from modin.config import Engine, MinPartitionSize, NPartitions, RangePartitioning
from modin.core.dataframe.pandas.dataframe.dataframe import PandasDataframe
from modin.core.dataframe.pandas.dataframe.utils import ColumnInfo, ShuffleSortFunctions
from modin.core.dataframe.pandas.metadata import (
Expand Down Expand Up @@ -1174,9 +1174,7 @@ def test_setitem_unhashable_preserve_dtypes():
assert df._query_compiler._modin_frame.has_materialized_dtypes


@pytest.mark.parametrize(
"modify_config", [{RangePartitioningGroupby: True}], indirect=True
)
@pytest.mark.parametrize("modify_config", [{RangePartitioning: True}], indirect=True)
def test_groupby_size_shuffling(modify_config):
# verifies that 'groupby.size()' works with reshuffling implementation
# https://github.com/modin-project/modin/issues/6367
Expand Down Expand Up @@ -2447,15 +2445,15 @@ def test_groupby_index_dtype(self):
assert res_dtypes._known_dtypes["a"] == np.dtype("int64")

# case 2: ExperimentalImpl impl, Series as an output of groupby
RangePartitioningGroupby.put(True)
RangePartitioning.put(True)
try:
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
res = df.groupby("a").size().reset_index(name="new_name")
res_dtypes = res._query_compiler._modin_frame._dtypes._value
assert "a" in res_dtypes._known_dtypes
assert res_dtypes._known_dtypes["a"] == np.dtype("int64")
finally:
RangePartitioningGroupby.put(False)
RangePartitioning.put(False)

# case 3: MapReduce impl, DataFrame as an output of groupby
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
Expand All @@ -2465,15 +2463,15 @@ def test_groupby_index_dtype(self):
assert res_dtypes._known_dtypes["a"] == np.dtype("int64")

# case 4: ExperimentalImpl impl, DataFrame as an output of groupby
RangePartitioningGroupby.put(True)
RangePartitioning.put(True)
try:
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
res = df.groupby("a").sum().reset_index()
res_dtypes = res._query_compiler._modin_frame._dtypes._value
assert "a" in res_dtypes._known_dtypes
assert res_dtypes._known_dtypes["a"] == np.dtype("int64")
finally:
RangePartitioningGroupby.put(False)
RangePartitioning.put(False)

# case 5: FullAxis impl, DataFrame as an output of groupby
df = pd.DataFrame({"a": [1, 2, 2], "b": [3, 4, 5]})
Expand Down
34 changes: 11 additions & 23 deletions modin/tests/pandas/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@
import pytest

import modin.pandas as pd
from modin.config import (
IsRayCluster,
NPartitions,
RangePartitioningGroupby,
StorageFormat,
)
from modin.config import IsRayCluster, NPartitions, RangePartitioning, StorageFormat
from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy
from modin.core.dataframe.pandas.partitioning.axis_partition import (
PandasDataframeAxisPartition,
Expand Down Expand Up @@ -290,7 +285,7 @@ def test_mixed_dtypes_groupby(as_index):
# This test though produces so much NaN values in the result, so it's impossible to sort,
# using manual comparison of set of rows instead
assert_set_of_rows_identical
if RangePartitioningGroupby.get()
if RangePartitioning.get()
else None
),
)
Expand Down Expand Up @@ -362,7 +357,7 @@ def test_mixed_dtypes_groupby(as_index):
eval_max(modin_groupby, pandas_groupby)
eval_len(modin_groupby, pandas_groupby)
eval_sum(modin_groupby, pandas_groupby)
if not RangePartitioningGroupby.get():
if not RangePartitioning.get():
# `.ngroup` fails with experimental groupby
# https://github.com/modin-project/modin/issues/6083
eval_ngroup(modin_groupby, pandas_groupby)
Expand Down Expand Up @@ -1356,7 +1351,7 @@ def sort_if_experimental_groupby(*dfs):
https://github.com/modin-project/modin/issues/5924
"""
result = dfs
if RangePartitioningGroupby.get():
if RangePartitioning.get():
dfs = try_cast_to_pandas(dfs)
result = []
for df in dfs:
Expand Down Expand Up @@ -1634,7 +1629,7 @@ def test(grp):
def eval_groups(modin_groupby, pandas_groupby):
for k, v in modin_groupby.groups.items():
assert v.equals(pandas_groupby.groups[k])
if RangePartitioningGroupby.get():
if RangePartitioning.get():
# `.get_group()` doesn't work correctly with experimental groupby:
# https://github.com/modin-project/modin/issues/6093
return
Expand Down Expand Up @@ -1946,8 +1941,7 @@ def test_agg_func_None_rename(by_and_agg_dict, as_index):
pytest.param(
False,
marks=pytest.mark.skipif(
get_current_execution() == "BaseOnPython"
or RangePartitioningGroupby.get(),
get_current_execution() == "BaseOnPython" or RangePartitioning.get(),
reason="See Pandas issue #39103",
),
),
Expand Down Expand Up @@ -3142,8 +3136,8 @@ def perform(lib):
@pytest.mark.parametrize(
"modify_config",
[
{RangePartitioningGroupby: True, IsRayCluster: True},
{RangePartitioningGroupby: True, IsRayCluster: False},
{RangePartitioning: True, IsRayCluster: True},
{RangePartitioning: True, IsRayCluster: False},
],
indirect=True,
)
Expand Down Expand Up @@ -3205,9 +3199,7 @@ def func3(group):
)


@pytest.mark.parametrize(
"modify_config", [{RangePartitioningGroupby: True}], indirect=True
)
@pytest.mark.parametrize("modify_config", [{RangePartitioning: True}], indirect=True)
def test_reshuffling_groupby_on_strings(modify_config):
# reproducer from https://github.com/modin-project/modin/issues/6509
modin_df, pandas_df = create_test_dfs(
Expand All @@ -3226,9 +3218,7 @@ def test_reshuffling_groupby_on_strings(modify_config):
eval_general(md_grp, pd_grp, lambda grp: grp.tail(10))


@pytest.mark.parametrize(
"modify_config", [{RangePartitioningGroupby: True}], indirect=True
)
@pytest.mark.parametrize("modify_config", [{RangePartitioning: True}], indirect=True)
def test_groupby_apply_series_result(modify_config):
# reproducer from the issue:
# https://github.com/modin-project/modin/issues/6632
Expand Down Expand Up @@ -3467,9 +3457,7 @@ def test_groupby_agg_provided_callable_warning():
pandas_groupby.agg(func)


@pytest.mark.parametrize(
"modify_config", [{RangePartitioningGroupby: True}], indirect=True
)
@pytest.mark.parametrize("modify_config", [{RangePartitioning: True}], indirect=True)
@pytest.mark.parametrize("observed", [False])
@pytest.mark.parametrize("as_index", [True])
@pytest.mark.parametrize(
Expand Down
3 changes: 1 addition & 2 deletions modin/tests/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
MinPartitionSize,
NPartitions,
RangePartitioning,
RangePartitioningGroupby,
TestDatasetSize,
TrackFileLeaks,
)
Expand Down Expand Up @@ -700,7 +699,7 @@ def sort_if_range_partitioning(df1, df2, comparator=None):
if comparator is None:
comparator = df_equals

if RangePartitioning.get() or RangePartitioningGroupby.get():
if RangePartitioning.get() or RangePartitioning.get():
df1, df2 = sort_data(df1), sort_data(df2)

comparator(df1, df2)
Expand Down

0 comments on commit 2744476

Please sign in to comment.