Skip to content

Commit

Permalink
FIX-#1976: indices matching at reduction functions fixed (#2270)
Browse files Browse the repository at this point in the history
Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev authored Oct 22, 2020
1 parent 6697e05 commit 55e3459
Show file tree
Hide file tree
Showing 11 changed files with 373 additions and 478 deletions.
116 changes: 87 additions & 29 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,15 @@
is_scalar,
)
from pandas.core.base import DataError
from typing import Type, Callable
import warnings


from modin.backends.base.query_compiler import BaseQueryCompiler
from modin.error_message import ErrorMessage
from modin.utils import try_cast_to_pandas, wrap_udf_function
from modin.data_management.functions import (
Function,
FoldFunction,
MapFunction,
MapReduceFunction,
Expand Down Expand Up @@ -150,6 +153,34 @@ def caller(df, *args, **kwargs):
return caller


def _numeric_only_reduce_fn(applier: Type[Function], *funcs) -> Callable:
"""
Build reduce function for statistic operations with `numeric_only` parameter.
Parameters
----------
applier: Callable
Function object to register `funcs`
*funcs: list
List of functions to register in `applier`
Returns
-------
callable
A callable function to be applied in the partitions
"""

def caller(self, *args, **kwargs):
# If `numeric_only` is None then we don't know what columns/indices will
# be dropped at the result of reduction function, and so can't preserve labels
preserve_index = kwargs.get("numeric_only", None) is not None
return applier.register(*funcs, preserve_index=preserve_index)(
self, *args, **kwargs
)

return caller


class PandasQueryCompiler(BaseQueryCompiler):
"""This class implements the logic necessary for operating on partitions
with a Pandas backend. This logic is specific to Pandas."""
Expand Down Expand Up @@ -625,29 +656,54 @@ def is_monotonic_decreasing(self):
is_monotonic = _is_monotonic

count = MapReduceFunction.register(pandas.DataFrame.count, pandas.DataFrame.sum)
max = MapReduceFunction.register(pandas.DataFrame.max, pandas.DataFrame.max)
min = MapReduceFunction.register(pandas.DataFrame.min, pandas.DataFrame.min)
sum = MapReduceFunction.register(pandas.DataFrame.sum, pandas.DataFrame.sum)
prod = MapReduceFunction.register(pandas.DataFrame.prod, pandas.DataFrame.prod)
max = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.max)
min = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.min)
sum = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.sum)
prod = _numeric_only_reduce_fn(MapReduceFunction, pandas.DataFrame.prod)
any = MapReduceFunction.register(pandas.DataFrame.any, pandas.DataFrame.any)
all = MapReduceFunction.register(pandas.DataFrame.all, pandas.DataFrame.all)
memory_usage = MapReduceFunction.register(
pandas.DataFrame.memory_usage,
lambda x, *args, **kwargs: pandas.DataFrame.sum(x),
axis=0,
)
mean = MapReduceFunction.register(
lambda df, **kwargs: df.apply(
lambda x: (x.sum(skipna=kwargs.get("skipna", True)), x.count()),
axis=kwargs.get("axis", 0),
result_type="reduce",
).set_axis(df.axes[kwargs.get("axis", 0) ^ 1], axis=0),
lambda df, **kwargs: df.apply(
lambda x: x.apply(lambda d: d[0]).sum(skipna=kwargs.get("skipna", True))
/ x.apply(lambda d: d[1]).sum(skipna=kwargs.get("skipna", True)),
axis=kwargs.get("axis", 0),
).set_axis(df.axes[kwargs.get("axis", 0) ^ 1], axis=0),
)

def mean(self, axis, **kwargs):

This comment has been minimized.

Copy link
@YarShev

YarShev Oct 27, 2020

Collaborator

What reason is to change this method? What was before your commit if level is not None?

This comment has been minimized.

Copy link
@dchigarev

dchigarev Oct 27, 2020

Author Collaborator

The main reason is to make it human-readable))

But rewriting this method has also fixed the issue with handing level, before this commit level parameter was just ignored in mean

This comment has been minimized.

Copy link
@YarShev

YarShev Oct 28, 2020

Collaborator

Okay, I see.

if kwargs.get("level") is not None:
return self.default_to_pandas(pandas.DataFrame.mean, axis=axis, **kwargs)

skipna = kwargs.get("skipna", True)

def map_apply_fn(ser, **kwargs):
try:
sum_result = ser.sum(skipna=skipna)
count_result = ser.count()
except TypeError:
return None
else:
return (sum_result, count_result)

def reduce_apply_fn(ser, **kwargs):
sum_result = ser.apply(lambda x: x[0]).sum(skipna=skipna)
count_result = ser.apply(lambda x: x[1]).sum(skipna=skipna)
return sum_result / count_result

def reduce_fn(df, **kwargs):
df.dropna(axis=1, inplace=True, how="any")
return build_applyier(reduce_apply_fn, axis=axis)(df)

def build_applyier(func, **applyier_kwargs):
def applyier(df, **kwargs):
result = df.apply(func, **applyier_kwargs)
return result.set_axis(df.axes[axis ^ 1], axis=0)

return applyier

return MapReduceFunction.register(
build_applyier(map_apply_fn, axis=axis, result_type="reduce"),
reduce_fn,
preserve_index=(kwargs.get("numeric_only") is not None),
)(self, axis=axis, **kwargs)

def value_counts(self, **kwargs):
"""
Expand All @@ -664,7 +720,7 @@ def value_counts(self, **kwargs):
return self.__constructor__(new_modin_frame)

def map_func(df, *args, **kwargs):
return df.squeeze(axis=1).value_counts(**kwargs)
return df.squeeze(axis=1).value_counts(**kwargs).to_frame()

This comment has been minimized.

Copy link
@YarShev

YarShev Oct 27, 2020

Collaborator

Why is to_frame needed now?

This comment has been minimized.

Copy link
@dchigarev

dchigarev Oct 27, 2020

Author Collaborator

Previously value_counts was passing None axis to the engine, now it explicitly passes axis=0, that makes engine process applied function correctly (what was wrong before). And in case of axis=0 map reduce function transposes the result if it is a Series link to code, which is undesired since we want to be able to access the whole 0 axis at the reduce stage. So now we manually create Frame from series, to avoid undesired transposing.

This comment has been minimized.

Copy link
@YarShev

YarShev Oct 28, 2020

Collaborator

As I see, the main reason for the change is to pass either axis=0 or axis=1, not None. Am I right?

This comment has been minimized.

Copy link
@dchigarev

dchigarev Oct 28, 2020

Author Collaborator

Yeap, since we fixed that we're not passing None axis at engine level anymore, and so map_reduce method started to work correctly, then we have to do this change


def reduce_func(df, *args, **kwargs):
normalize = kwargs.get("normalize", False)
Expand Down Expand Up @@ -735,28 +791,30 @@ def sort_index_for_equal_values(result, ascending):
else:
new_index[j] = result.index[j]
i += 1
return pandas.DataFrame(result, index=new_index)
return pandas.DataFrame(
result, index=new_index, columns=["__reduced__"]
)

return sort_index_for_equal_values(result, ascending)

return MapReduceFunction.register(map_func, reduce_func, preserve_index=False)(
self, **kwargs
)
return MapReduceFunction.register(
map_func, reduce_func, axis=0, preserve_index=False
)(self, **kwargs)

# END MapReduce operations

# Reduction operations
idxmax = ReductionFunction.register(pandas.DataFrame.idxmax)
idxmin = ReductionFunction.register(pandas.DataFrame.idxmin)
median = ReductionFunction.register(pandas.DataFrame.median)
median = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.median)
nunique = ReductionFunction.register(pandas.DataFrame.nunique)
skew = ReductionFunction.register(pandas.DataFrame.skew)
kurt = ReductionFunction.register(pandas.DataFrame.kurt)
sem = ReductionFunction.register(pandas.DataFrame.sem)
std = ReductionFunction.register(pandas.DataFrame.std)
var = ReductionFunction.register(pandas.DataFrame.var)
sum_min_count = ReductionFunction.register(pandas.DataFrame.sum)
prod_min_count = ReductionFunction.register(pandas.DataFrame.prod)
skew = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.skew)
kurt = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.kurt)
sem = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sem)
std = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.std)
var = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.var)
sum_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.sum)
prod_min_count = _numeric_only_reduce_fn(ReductionFunction, pandas.DataFrame.prod)
quantile_for_single_value = ReductionFunction.register(pandas.DataFrame.quantile)
mad = ReductionFunction.register(pandas.DataFrame.mad)
to_datetime = ReductionFunction.register(
Expand Down
2 changes: 2 additions & 0 deletions modin/data_management/functions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from .function import Function
from .mapfunction import MapFunction
from .mapreducefunction import MapReduceFunction
from .reductionfunction import ReductionFunction
Expand All @@ -19,6 +20,7 @@
from .groupby_function import GroupbyReduceFunction

__all__ = [
"Function",
"MapFunction",
"MapReduceFunction",
"ReductionFunction",
Expand Down
5 changes: 2 additions & 3 deletions modin/data_management/functions/foldfunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ class FoldFunction(Function):
@classmethod
def call(cls, fold_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
axis = call_kwds.get("axis", kwargs.get("axis"))
return query_compiler.__constructor__(
query_compiler._modin_frame._fold(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
cls.validate_axis(axis),
lambda x: fold_function(x, *args, **kwargs),
)
)
Expand Down
6 changes: 6 additions & 0 deletions modin/data_management/functions/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

from typing import Optional


class Function(object):
def __init__(self):
Expand All @@ -27,3 +29,7 @@ def call(cls, func, **call_kwds):
@classmethod
def register(cls, func, **kwargs):
return cls.call(func, **kwargs)

@classmethod
def validate_axis(cls, axis: Optional[int]) -> int:

This comment has been minimized.

Copy link
@YarShev

YarShev Oct 27, 2020

Collaborator

I think we need to validate axis in API layer. We have corresponding function _get_axis_number for this.

This comment has been minimized.

Copy link
@dchigarev

dchigarev Oct 27, 2020

Author Collaborator

We already doing this, but sometimes it happens that we're not passing the applying axis nor via .register not via method itself (like it was in value_counts), so the applying axis considered to be None and in that case we will get strange behavior in engine part.

If axis is None, then:

bool(axis) == False
bool(axis == 0) == False
bool(axis == 1) == False

And if we will meet two if-else statements that compares axis value differently [1] [2] then with axis == None we consider that for some part of executing function axis is 0, but for other it is 1, which can give undesired effect.

So it's better to validate axis, before passing it into the engine part.

This comment has been minimized.

Copy link
@YarShev

YarShev Oct 28, 2020

Collaborator

I see, for now we can keep so. My point was to move all validation of axes into API layer in order to pass axis=0 or axis=1 into QC layer.

This comment has been minimized.

Copy link
@dchigarev

dchigarev Oct 28, 2020

Author Collaborator

But the thing is, that in case of value_counts we doesn't have any axis parameter on the API layer, so there is no value to validate, and when we're not specifying axis explicitly in .register method at QC level, axis considered to be None (0 after this fix).

For all cases, where we have axis parameter at the API level, we validate it at the API level

return 0 if axis is None else axis
9 changes: 5 additions & 4 deletions modin/data_management/functions/mapreducefunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ class MapReduceFunction(Function):
def call(cls, map_function, reduce_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
preserve_index = call_kwds.pop("preserve_index", True)
axis = call_kwds.get("axis", kwargs.get("axis"))
return query_compiler.__constructor__(
query_compiler._modin_frame._map_reduce(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
cls.validate_axis(axis),
lambda x: map_function(x, *args, **kwargs),
lambda y: reduce_function(y, *args, **kwargs),
preserve_index=preserve_index,
Expand All @@ -33,5 +32,7 @@ def caller(query_compiler, *args, **kwargs):
return caller

@classmethod
def register(cls, map_function, reduce_function, **kwargs):
def register(cls, map_function, reduce_function=None, **kwargs):
if reduce_function is None:
reduce_function = map_function
return cls.call(map_function, reduce_function, **kwargs)
7 changes: 4 additions & 3 deletions modin/data_management/functions/reductionfunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,13 @@ class ReductionFunction(Function):
@classmethod
def call(cls, reduction_function, **call_kwds):
def caller(query_compiler, *args, **kwargs):
preserve_index = call_kwds.pop("preserve_index", True)
axis = call_kwds.get("axis", kwargs.get("axis"))
return query_compiler.__constructor__(
query_compiler._modin_frame._fold_reduce(
call_kwds.get("axis")
if "axis" in call_kwds
else kwargs.get("axis"),
cls.validate_axis(axis),
lambda x: reduction_function(x, *args, **kwargs),
preserve_index=preserve_index,
)
)

Expand Down
Loading

0 comments on commit 55e3459

Please sign in to comment.