Skip to content

Commit

Permalink
FIX-#2311: fixed performance bottleneck at reduction operations (#2314)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev authored Oct 28, 2020
1 parent 5c5a8e0 commit 5cabeb9
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 27 deletions.
5 changes: 3 additions & 2 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@ def _numeric_only_reduce_fn(applier: Type[Function], *funcs) -> Callable:
"""

def caller(self, *args, **kwargs):
# If `numeric_only` is None then we don't know what columns/indices will
# be dropped at the result of reduction function, and so can't preserve labels
# If `numeric_only` is None and the frame contains non-numeric columns,
# then we don't know which columns/indices will be dropped from the result
# of the reduction function, and so can't preserve labels
preserve_index = kwargs.get("numeric_only", None) is not None
return applier.register(*funcs, preserve_index=preserve_index)(
self, *args, **kwargs
Expand Down
50 changes: 29 additions & 21 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -330,12 +330,14 @@ def _validate_axis_equality(self, axis: int, force: bool = False):
def _validate_internal_indices(self, mode=None, **kwargs):
"""
Validates and optionally updates internal and external indices
of modin_frame in specified mode. There is 3 modes supported:
1. "reduced" - force validates on that axes
where external indices is ["__reduced__"]
2. "all" - validates indices at all axes, optionally force
of modin_frame in specified mode. There are 4 modes supported:
1. "reduced" - validates, without forcing, the axis whose external
indices are ["__reduced__"]
2. "reduced+other" - validates, without forcing, the axis whose external
indices are ["__reduced__"], and force validates the other axis
3. "all" - validates indices at all axes, optionally force
if `force` parameter specified in kwargs
3. "custom" - validation follows arguments specified in kwargs.
4. "custom" - validation follows arguments specified in kwargs.
Parameters
----------
Expand All @@ -353,26 +355,38 @@ def _validate_internal_indices(self, mode=None, **kwargs):
is_force = kwargs.get("force", False)

reduced_sample = pandas.Index(["__reduced__"])

is_axis_reduced = [self.axes[i].equals(reduced_sample) for i in [0, 1]]

args_dict = {
"custom": kwargs,
"reduced": {
"validate_index": self.index.equals(reduced_sample),
"validate_columns": self.columns.equals(reduced_sample),
"force": False,
"validate_index": is_axis_reduced[0],
"validate_columns": is_axis_reduced[1],
"force": [False, False],
},
"reduced+other": {
"validate_index": True,
"validate_columns": True,
"force": [not is_axis_reduced[0], not is_axis_reduced[1]],
},
"all": {
"validate_index": True,
"validate_columns": True,
"force": is_force,
"force": [is_force, is_force],
},
}

args = args_dict.get(mode, args_dict["custom"])

def force_getter(axis):
force = args.get("force", False)
return force[axis] if isinstance(force, list) else force

if args.get("validate_index", True):
self._validate_axis_equality(axis=0, force=args.get("force"))
self._validate_axis_equality(axis=0, force=force_getter(0))
if args.get("validate_columns", True):
self._validate_axis_equality(axis=1, force=args.get("force"))
self._validate_axis_equality(axis=1, force=force_getter(1))

def _apply_index_objs(self, axis=None):
"""Lazily applies the index object (Index or Columns) to the partitions.
Expand Down Expand Up @@ -1026,18 +1040,12 @@ def _compute_map_reduce_metadata(self, axis, new_parts, preserve_index=True):
new_axes, new_axes_lengths = [0, 0], [0, 0]

new_axes[axis] = ["__reduced__"]
new_axes[axis ^ 1] = (
self.axes[axis ^ 1]
if preserve_index
else self._compute_axis_labels(axis ^ 1, new_parts)
)
new_axes[axis ^ 1] = self.axes[axis ^ 1]

new_axes_lengths[axis] = [1]
new_axes_lengths[axis ^ 1] = (
self._axes_lengths[axis ^ 1] if preserve_index else None
)
new_axes_lengths[axis ^ 1] = self._axes_lengths[axis ^ 1]

if (axis == 0 or self._dtypes is None) and preserve_index:
if axis == 0 or self._dtypes is None:
new_dtypes = self._dtypes
elif preserve_index:
new_dtypes = pandas.Series(
Expand All @@ -1051,7 +1059,7 @@ def _compute_map_reduce_metadata(self, axis, new_parts, preserve_index=True):
*new_axes,
*new_axes_lengths,
new_dtypes,
validate_axes="reduced",
validate_axes=("reduced" if preserve_index else "reduced+other"),
)

def _fold_reduce(self, axis, func, preserve_index=True):
Expand Down
14 changes: 12 additions & 2 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1449,8 +1449,18 @@ def _stat_operation(
"""
axis = self._get_axis_number(axis)
if numeric_only is not None and not numeric_only:
self._validate_dtypes(numeric_only=True)
# If `numeric_only` is None, then we can precheck whether or not the
# frame contains non-numeric columns. If it doesn't, we can pass
# `numeric_only=False` to the backend and make its work easier, rather than
# having it operate under the more complicated `numeric_only=None` parameter
if not numeric_only:
try:
self._validate_dtypes(numeric_only=True)
except TypeError:
if numeric_only is not None:
raise
else:
numeric_only = False

data = self._get_numeric_data(axis) if numeric_only else self

Expand Down
7 changes: 5 additions & 2 deletions modin/pandas/test/dataframe/test_reduction.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,14 @@ def test_sum_single_column(data):
@pytest.mark.parametrize(
"fn", ["max", "min", "median", "mean", "skew", "kurt", "sem", "std", "var"]
)
@pytest.mark.parametrize("axis", [0, 1])
@pytest.mark.parametrize(
"numeric_only", bool_arg_values, ids=arg_keys("numeric_only", bool_arg_keys)
)
def test_reduction_specific(fn, numeric_only):
def test_reduction_specific(fn, numeric_only, axis):
if fn == "mean" and axis == 1:
pytest.skip("See issue #2313 for details")
eval_general(
*create_test_dfs(test_data_diff_dtype),
lambda df: getattr(df, fn)(numeric_only=numeric_only),
lambda df: getattr(df, fn)(numeric_only=numeric_only, axis=axis),
)

0 comments on commit 5cabeb9

Please sign in to comment.