From 29d9da056a4e4766314c4ae9ea62b33e0d52e662 Mon Sep 17 00:00:00 2001 From: Mahesh Vashishtha Date: Mon, 21 Aug 2023 15:29:47 -0500 Subject: [PATCH] FIX-#4580: Fix access by row label in query and eval (#6488) Signed-off-by: mvashishtha --- .../storage_formats/base/query_compiler.py | 19 -- .../storage_formats/pandas/query_compiler.py | 6 - .../storage_formats/pyarrow/query_compiler.py | 197 ------------------ modin/pandas/dataframe.py | 37 ++-- modin/pandas/test/dataframe/test_udf.py | 32 ++- modin/pandas/test/test_series.py | 18 +- .../storage_formats/pandas/test_internals.py | 4 +- 7 files changed, 54 insertions(+), 259 deletions(-) diff --git a/modin/core/storage_formats/base/query_compiler.py b/modin/core/storage_formats/base/query_compiler.py index 188960a19a1..21f2d1032ef 100644 --- a/modin/core/storage_formats/base/query_compiler.py +++ b/modin/core/storage_formats/base/query_compiler.py @@ -2343,25 +2343,6 @@ def fillna(df, value, **kwargs): return DataFrameDefault.register(fillna)(self, **kwargs) - @doc_utils.add_refer_to("DataFrame.query") - def query(self, expr, **kwargs): - """ - Query columns of the QueryCompiler with a boolean expression. - - Parameters - ---------- - expr : str - **kwargs : dict - - Returns - ------- - BaseQueryCompiler - New QueryCompiler containing the rows where the boolean expression is satisfied. - """ - return DataFrameDefault.register(pandas.DataFrame.query)( - self, expr=expr, **kwargs - ) - @doc_utils.add_refer_to("DataFrame.rank") def rank(self, **kwargs): # noqa: PR02 """ diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 718b39e76f1..823aff25615 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -2477,12 +2477,6 @@ def quantile_builder(df, **kwargs): result = self.__constructor__(new_modin_frame) return result.transpose() if axis == 1 else result - def query(self, expr, **kwargs): - def query_builder(df, **modin_internal_kwargs): - return df.query(expr, inplace=False, **kwargs, **modin_internal_kwargs) - - return self.__constructor__(self._modin_frame.filter(1, query_builder)) - def rank(self, **kwargs): axis = kwargs.get("axis", 0) numeric_only = True if axis else kwargs.get("numeric_only", False) diff --git a/modin/experimental/core/storage_formats/pyarrow/query_compiler.py b/modin/experimental/core/storage_formats/pyarrow/query_compiler.py index ae888cedbac..75e42e5a6c0 100644 --- a/modin/experimental/core/storage_formats/pyarrow/query_compiler.py +++ b/modin/experimental/core/storage_formats/pyarrow/query_compiler.py @@ -19,13 +19,9 @@ """ import pandas -import pyarrow as pa from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler from modin.utils import _inherit_docstrings -from pandas.core.computation.expr import Expr -from pandas.core.computation.scope import Scope -from pandas.core.computation.ops import UnaryOp, BinOp, Term, MathCall, Constant class FakeSeries: @@ -59,199 +55,6 @@ class PyarrowQueryCompiler(PandasQueryCompiler): Shape hint for frames known to be a column or a row, otherwise None. """ - def query(self, expr, **kwargs): - def gen_table_expr(table, expr): - """ - Build pandas expression for the specified query. - - Parameters - ---------- - table : pyarrow.Table - Table to evaluate expression on. - expr : str - Query string to evaluate on the `table` columns. - - Returns - ------- - pandas.core.computation.expr.Expr - """ - resolver = { - name: FakeSeries(dtype.to_pandas_dtype()) - for name, dtype in zip(table.schema.names, table.schema.types) - } - scope = Scope(level=0, resolvers=(resolver,)) - return Expr(expr=expr, env=scope) - - unary_ops = {"~": "not"} - math_calls = {"log": "log", "exp": "exp", "log10": "log10", "cbrt": "cbrt"} - bin_ops = { - "+": "add", - "-": "subtract", - "*": "multiply", - "/": "divide", - "**": "power", - } - cmp_ops = { - "==": "equal", - "!=": "not_equal", - "<": "less_than", - "<=": "less_than_or_equal_to", - ">": "greater_than", - ">=": "greater_than_or_equal_to", - "like": "like", - } - - def build_node(table, terms, builder): - """ - Build expression Node in Gandiva notation for the specified pandas expression. - - Parameters - ---------- - table : pyarrow.Table - Table to evaluate expression on. - terms : pandas.core.computation.expr.Term - Pandas expression to evaluate. - builder : pyarrow.gandiva.TreeExprBuilder - Pyarrow node builder. - - Returns - ------- - pyarrow.gandiva.Node - """ - if isinstance(terms, Constant): - return builder.make_literal( - terms.value, (pa.from_numpy_dtype(terms.return_type)) - ) - - if isinstance(terms, Term): - return builder.make_field(table.schema.field_by_name(terms.name)) - - if isinstance(terms, BinOp): - lnode = build_node(table, terms.lhs, builder) - rnode = build_node(table, terms.rhs, builder) - return_type = pa.from_numpy_dtype(terms.return_type) - - if terms.op == "&": - return builder.make_and([lnode, rnode]) - if terms.op == "|": - return builder.make_or([lnode, rnode]) - if terms.op in cmp_ops: - assert return_type == pa.bool_() - return builder.make_function( - cmp_ops[terms.op], [lnode, rnode], return_type - ) - if terms.op in bin_ops: - return builder.make_function( - bin_ops[terms.op], [lnode, rnode], return_type - ) - - if isinstance(terms, UnaryOp): - return_type = pa.from_numpy_dtype(terms.return_type) - return builder.make_function( - unary_ops[terms.op], - [build_node(table, terms.operand, builder)], - return_type, - ) - - if isinstance(terms, MathCall): - return_type = pa.from_numpy_dtype(terms.return_type) - childern = [ - build_node(table, child, builder) for child in terms.operands - ] - return builder.make_function( - math_calls[terms.op], childern, return_type - ) - - raise TypeError("Unsupported term type: %s" % terms) - - def can_be_condition(expr): - """ - Check whether the passed expression is a conditional operation. - - Parameters - ---------- - expr : pandas.core.computation.expr.Expr - - Returns - ------- - bool - """ - if isinstance(expr.terms, BinOp): - if expr.terms.op in cmp_ops or expr.terms.op in ("&", "|"): - return True - elif isinstance(expr.terms, UnaryOp): - if expr.terms.op == "~": - return True - return False - - def filter_with_selection_vector(table, s): - """ - Filter passed pyarrow table with the specified filter. - - Parameters - ---------- - table : pyarrow.Table - s : pyarrow.gandiva.SelectionVector - - Returns - ------- - pyarrow.Table - """ - record_batch = table.to_batches()[0] - indices = s.to_array() # .to_numpy() - new_columns = [ - pa.array(c.to_numpy()[indices]) for c in record_batch.columns - ] - return pa.Table.from_arrays(new_columns, record_batch.schema.names) - - def gandiva_query(table, query): - """ - Evaluate string query on the passed table. - - Parameters - ---------- - table : pyarrow.Table - Table to evaluate query on. - query : str - Query string to evaluate on the `table` columns. - - Returns - ------- - pyarrow.Table - """ - expr = gen_table_expr(table, query) - if not can_be_condition(expr): - raise ValueError("Root operation should be a filter.") - - # We use this import here because of https://github.com/modin-project/modin/issues/3849, - # after the issue is fixed we should put the import at the top of this file - import pyarrow.gandiva as gandiva - - builder = gandiva.TreeExprBuilder() - root = build_node(table, expr.terms, builder) - cond = builder.make_condition(root) - filt = gandiva.make_filter(table.schema, cond) - sel_vec = filt.evaluate(table.to_batches()[0], pa.default_memory_pool()) - result = filter_with_selection_vector(table, sel_vec) - return result - - def query_builder(arrow_table, **kwargs): - """Evaluate string query on the passed pyarrow table.""" - return gandiva_query(arrow_table, kwargs.get("expr", "")) - - kwargs["expr"] = expr - # FIXME: `PandasQueryCompiler._prepare_method` was removed in #721, - # it is no longer needed to wrap function to apply. - func = self._prepare_method(query_builder, **kwargs) - # FIXME: `PandasQueryCompiler._map_across_full_axis` was removed in #721. - # This method call should be replaced to its equivalent from `operators.function` - new_data = self._map_across_full_axis(1, func) - # Query removes rows, so we need to update the index - new_index = self._compute_index(0, new_data, False) - return self.__constructor__( - new_data, new_index, self.columns, self._dtype_cache - ) - def _compute_index(self, axis, data_object, compute_diff=True): """ Compute index labels of the passed Modin Frame along specified axis. diff --git a/modin/pandas/dataframe.py b/modin/pandas/dataframe.py index 3e62d1ce789..4f391cc4127 100644 --- a/modin/pandas/dataframe.py +++ b/modin/pandas/dataframe.py @@ -16,6 +16,7 @@ from __future__ import annotations import pandas from pandas.core.common import apply_if_callable, get_cython_func +from pandas.core.computation.eval import _check_engine from pandas.core.dtypes.common import ( infer_dtype_from_object, is_dict_like, @@ -812,23 +813,18 @@ def eval(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200 """ Evaluate a string describing operations on ``DataFrame`` columns. """ - self._validate_eval_query(expr, **kwargs) - inplace = validate_bool_kwarg(inplace, "inplace") self._update_var_dicts_in_kwargs(expr, kwargs) - new_query_compiler = self._query_compiler.eval(expr, **kwargs) - return_type = type( - pandas.DataFrame(columns=self.columns) - .astype(self.dtypes) - .eval(expr, **kwargs) - ).__name__ - if return_type == type(self).__name__: - return self._create_or_update_from_compiler(new_query_compiler, inplace) - else: - if inplace: - raise ValueError("Cannot operate inplace if there is no assignment") - return getattr(sys.modules[self.__module__], return_type)( - query_compiler=new_query_compiler + if _check_engine(kwargs.get("engine", None)) == "numexpr": + # on numexpr engine, pandas.eval returns np.array if input is not of pandas + # type, so we can't use pandas eval [1]. Even if we could, pandas eval seems + # to convert all the data to numpy and then do the numexpr add, which is + # slow for modin. The user would not really be getting the benefit of + # numexpr. + # [1] https://github.com/pandas-dev/pandas/blob/934eebb532cf50e872f40638a788000be6e4dda4/pandas/core/computation/align.py#L78 + return self._default_to_pandas( + pandas.DataFrame.eval, expr, inplace=inplace, **kwargs ) + return pandas.DataFrame.eval(self, expr, inplace=inplace, **kwargs) def fillna( self, @@ -1554,14 +1550,21 @@ def quantile( method=method, ) + # methods and fields we need to use pandas.DataFrame.query + _AXIS_ORDERS = ["index", "columns"] + _get_index_resolvers = pandas.DataFrame._get_index_resolvers + _get_axis_resolvers = pandas.DataFrame._get_axis_resolvers + _get_cleaned_column_resolvers = pandas.DataFrame._get_cleaned_column_resolvers + def query(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200 """ Query the columns of a ``DataFrame`` with a boolean expression. """ self._update_var_dicts_in_kwargs(expr, kwargs) - self._validate_eval_query(expr, **kwargs) inplace = validate_bool_kwarg(inplace, "inplace") - new_query_compiler = self._query_compiler.query(expr, **kwargs) + new_query_compiler = pandas.DataFrame.query( + self, expr, inplace=False, **kwargs + )._query_compiler return self._create_or_update_from_compiler(new_query_compiler, inplace) def rename( diff --git a/modin/pandas/test/dataframe/test_udf.py b/modin/pandas/test/dataframe/test_udf.py index 1f3d6de8ea0..419ba963d2b 100644 --- a/modin/pandas/test/dataframe/test_udf.py +++ b/modin/pandas/test/dataframe/test_udf.py @@ -344,11 +344,15 @@ def test_eval_df_arithmetic_subexpression(): @pytest.mark.parametrize("method", ["query", "eval"]) @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("local_var", [2]) -def test_eval_and_query_with_local_and_global_var(method, data, local_var): +@pytest.mark.parametrize("engine", ["python", "numexpr"]) +def test_eval_and_query_with_local_and_global_var(method, data, engine, local_var): modin_df, pandas_df = pd.DataFrame(data), pandas.DataFrame(data) op = "+" if method == "eval" else "<" for expr in (f"col1 {op} @local_var", f"col1 {op} @TEST_VAR"): - df_equals(getattr(modin_df, method)(expr), getattr(pandas_df, method)(expr)) + df_equals( + getattr(modin_df, method)(expr, engine=engine), + getattr(pandas_df, method)(expr, engine=engine), + ) @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @@ -409,7 +413,8 @@ def f(x, arg2=0, arg3=0): @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize("funcs", query_func_values, ids=query_func_keys) -def test_query(data, funcs): +@pytest.mark.parametrize("engine", ["python", "numexpr"]) +def test_query(data, funcs, engine): if get_current_execution() == "BaseOnPython" and funcs != "col3 > col4": pytest.xfail( reason="In this case, we are faced with the problem of handling empty data frames - #4934" @@ -418,12 +423,12 @@ def test_query(data, funcs): pandas_df = pandas.DataFrame(data) try: - pandas_result = pandas_df.query(funcs) + pandas_result = pandas_df.query(funcs, engine=engine) except Exception as err: with pytest.raises(type(err)): - modin_df.query(funcs) + modin_df.query(funcs, engine=engine) else: - modin_result = modin_df.query(funcs) + modin_result = modin_df.query(funcs, engine=engine) # `dtypes` must be evaluated after `query` so we need to check cache assert modin_result._query_compiler._modin_frame.has_dtypes_cache df_equals(modin_result, pandas_result) @@ -437,16 +442,17 @@ def test_empty_query(): modin_df.query("") -def test_query_after_insert(): +@pytest.mark.parametrize("engine", ["python", "numexpr"]) +def test_query_after_insert(engine): modin_df = pd.DataFrame({"x": [-1, 0, 1, None], "y": [1, 2, None, 3]}) modin_df["z"] = modin_df.eval("x / y") - modin_df = modin_df.query("z >= 0") + modin_df = modin_df.query("z >= 0", engine=engine) modin_result = modin_df.reset_index(drop=True) modin_result.columns = ["a", "b", "c"] pandas_df = pd.DataFrame({"x": [-1, 0, 1, None], "y": [1, 2, None, 3]}) pandas_df["z"] = pandas_df.eval("x / y") - pandas_df = pandas_df.query("z >= 0") + pandas_df = pandas_df.query("z >= 0", engine=engine) pandas_result = pandas_df.reset_index(drop=True) pandas_result.columns = ["a", "b", "c"] @@ -454,6 +460,14 @@ def test_query_after_insert(): df_equals(modin_df, pandas_df) +@pytest.mark.parametrize("engine", ["python", "numexpr"]) +def test_query_with_element_access_issue_4580(engine): + pdf = pandas.DataFrame({"a": [0, 1, 2]}) + # get two row partitions by concatenating + df = pd.concat([pd.DataFrame(pdf[:1]), pd.DataFrame(pdf[1:])]) + eval_general(df, pdf, lambda df: df.query("a == a[0]", engine=engine)) + + @pytest.mark.parametrize("data", test_data_values, ids=test_data_keys) @pytest.mark.parametrize( "func", diff --git a/modin/pandas/test/test_series.py b/modin/pandas/test/test_series.py index bfe25ae4444..0fe8ad944f6 100644 --- a/modin/pandas/test/test_series.py +++ b/modin/pandas/test/test_series.py @@ -1867,21 +1867,21 @@ def test_dt(timezone): def dt_with_empty_partition(lib): # For context, see https://github.com/modin-project/modin/issues/5112 - df_a = lib.DataFrame({"A": [lib.to_datetime("26/10/2020")]}) - df_b = lib.DataFrame({"B": [lib.to_datetime("27/10/2020")]}) - df = lib.concat([df_a, df_b], axis=1) - eval_result = df.eval("B - A", engine="python") + df = ( + pd.concat([pd.DataFrame([None]), pd.DataFrame([pd.TimeDelta(1)])], axis=1) + .dropna(axis=1) + .squeeze(1) + ) # BaseOnPython ahd HDK had a single partition after the concat, and it - # maintains that partition after eval. In other execution modes, - # eval() should re-split the result into two column partitions, - # one of which is empty. + # maintains that partition after dropna and squeeze. In other execution modes, + # the series should have two column partitions, one of which is empty. if ( isinstance(df, pd.DataFrame) and get_current_execution() != "BaseOnPython" and StorageFormat.get() != "Hdk" ): - assert eval_result._query_compiler._modin_frame._partitions.shape == (1, 2) - return eval_result.dt.days + assert df._query_compiler._modin_frame._partitions.shape == (1, 2) + return df.dt.days eval_general(pd, pandas, dt_with_empty_partition) diff --git a/modin/test/storage_formats/pandas/test_internals.py b/modin/test/storage_formats/pandas/test_internals.py index ae1ab48a16c..f2e6d5c0b14 100644 --- a/modin/test/storage_formats/pandas/test_internals.py +++ b/modin/test/storage_formats/pandas/test_internals.py @@ -757,7 +757,7 @@ def test_groupby_with_empty_partition(): pandas_df=pandas.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]}), partitioning_scheme={"row_lengths": [2, 2], "column_widths": [2]}, ) - md_res = md_df.query("a > 1") + md_res = md_df.query("a > 1", engine="python") grp_obj = md_res.groupby("a") # check index error due to partitioning missmatching grp_obj.count() @@ -766,7 +766,7 @@ def test_groupby_with_empty_partition(): pandas_df=pandas.DataFrame({"a": [1, 1, 2, 2], "b": [3, 4, 5, 6]}), partitioning_scheme={"row_lengths": [2, 2], "column_widths": [2]}, ) - md_res = md_df.query("a > 1") + md_res = md_df.query("a > 1", engine="python") grp_obj = md_res.groupby(md_res["a"]) grp_obj.count()