Skip to content

Commit

Permalink
FIX-#4580: Fix access by row label in query and eval (#6488)
Browse files Browse the repository at this point in the history
Signed-off-by: mvashishtha <[email protected]>
  • Loading branch information
mvashishtha authored Aug 21, 2023
1 parent cfee6b4 commit 29d9da0
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 259 deletions.
19 changes: 0 additions & 19 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2343,25 +2343,6 @@ def fillna(df, value, **kwargs):

return DataFrameDefault.register(fillna)(self, **kwargs)

@doc_utils.add_refer_to("DataFrame.query")
def query(self, expr, **kwargs):
"""
Query columns of the QueryCompiler with a boolean expression.
Parameters
----------
expr : str
**kwargs : dict
Returns
-------
BaseQueryCompiler
New QueryCompiler containing the rows where the boolean expression is satisfied.
"""
return DataFrameDefault.register(pandas.DataFrame.query)(
self, expr=expr, **kwargs
)

@doc_utils.add_refer_to("DataFrame.rank")
def rank(self, **kwargs): # noqa: PR02
"""
Expand Down
6 changes: 0 additions & 6 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2477,12 +2477,6 @@ def quantile_builder(df, **kwargs):
result = self.__constructor__(new_modin_frame)
return result.transpose() if axis == 1 else result

def query(self, expr, **kwargs):
def query_builder(df, **modin_internal_kwargs):
return df.query(expr, inplace=False, **kwargs, **modin_internal_kwargs)

return self.__constructor__(self._modin_frame.filter(1, query_builder))

def rank(self, **kwargs):
axis = kwargs.get("axis", 0)
numeric_only = True if axis else kwargs.get("numeric_only", False)
Expand Down
197 changes: 0 additions & 197 deletions modin/experimental/core/storage_formats/pyarrow/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@
"""

import pandas
import pyarrow as pa

from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler
from modin.utils import _inherit_docstrings
from pandas.core.computation.expr import Expr
from pandas.core.computation.scope import Scope
from pandas.core.computation.ops import UnaryOp, BinOp, Term, MathCall, Constant


class FakeSeries:
Expand Down Expand Up @@ -59,199 +55,6 @@ class PyarrowQueryCompiler(PandasQueryCompiler):
Shape hint for frames known to be a column or a row, otherwise None.
"""

def query(self, expr, **kwargs):
def gen_table_expr(table, expr):
"""
Build pandas expression for the specified query.
Parameters
----------
table : pyarrow.Table
Table to evaluate expression on.
expr : str
Query string to evaluate on the `table` columns.
Returns
-------
pandas.core.computation.expr.Expr
"""
resolver = {
name: FakeSeries(dtype.to_pandas_dtype())
for name, dtype in zip(table.schema.names, table.schema.types)
}
scope = Scope(level=0, resolvers=(resolver,))
return Expr(expr=expr, env=scope)

unary_ops = {"~": "not"}
math_calls = {"log": "log", "exp": "exp", "log10": "log10", "cbrt": "cbrt"}
bin_ops = {
"+": "add",
"-": "subtract",
"*": "multiply",
"/": "divide",
"**": "power",
}
cmp_ops = {
"==": "equal",
"!=": "not_equal",
"<": "less_than",
"<=": "less_than_or_equal_to",
">": "greater_than",
">=": "greater_than_or_equal_to",
"like": "like",
}

def build_node(table, terms, builder):
"""
Build expression Node in Gandiva notation for the specified pandas expression.
Parameters
----------
table : pyarrow.Table
Table to evaluate expression on.
terms : pandas.core.computation.expr.Term
Pandas expression to evaluate.
builder : pyarrow.gandiva.TreeExprBuilder
Pyarrow node builder.
Returns
-------
pyarrow.gandiva.Node
"""
if isinstance(terms, Constant):
return builder.make_literal(
terms.value, (pa.from_numpy_dtype(terms.return_type))
)

if isinstance(terms, Term):
return builder.make_field(table.schema.field_by_name(terms.name))

if isinstance(terms, BinOp):
lnode = build_node(table, terms.lhs, builder)
rnode = build_node(table, terms.rhs, builder)
return_type = pa.from_numpy_dtype(terms.return_type)

if terms.op == "&":
return builder.make_and([lnode, rnode])
if terms.op == "|":
return builder.make_or([lnode, rnode])
if terms.op in cmp_ops:
assert return_type == pa.bool_()
return builder.make_function(
cmp_ops[terms.op], [lnode, rnode], return_type
)
if terms.op in bin_ops:
return builder.make_function(
bin_ops[terms.op], [lnode, rnode], return_type
)

if isinstance(terms, UnaryOp):
return_type = pa.from_numpy_dtype(terms.return_type)
return builder.make_function(
unary_ops[terms.op],
[build_node(table, terms.operand, builder)],
return_type,
)

if isinstance(terms, MathCall):
return_type = pa.from_numpy_dtype(terms.return_type)
childern = [
build_node(table, child, builder) for child in terms.operands
]
return builder.make_function(
math_calls[terms.op], childern, return_type
)

raise TypeError("Unsupported term type: %s" % terms)

def can_be_condition(expr):
"""
Check whether the passed expression is a conditional operation.
Parameters
----------
expr : pandas.core.computation.expr.Expr
Returns
-------
bool
"""
if isinstance(expr.terms, BinOp):
if expr.terms.op in cmp_ops or expr.terms.op in ("&", "|"):
return True
elif isinstance(expr.terms, UnaryOp):
if expr.terms.op == "~":
return True
return False

def filter_with_selection_vector(table, s):
"""
Filter passed pyarrow table with the specified filter.
Parameters
----------
table : pyarrow.Table
s : pyarrow.gandiva.SelectionVector
Returns
-------
pyarrow.Table
"""
record_batch = table.to_batches()[0]
indices = s.to_array() # .to_numpy()
new_columns = [
pa.array(c.to_numpy()[indices]) for c in record_batch.columns
]
return pa.Table.from_arrays(new_columns, record_batch.schema.names)

def gandiva_query(table, query):
"""
Evaluate string query on the passed table.
Parameters
----------
table : pyarrow.Table
Table to evaluate query on.
query : str
Query string to evaluate on the `table` columns.
Returns
-------
pyarrow.Table
"""
expr = gen_table_expr(table, query)
if not can_be_condition(expr):
raise ValueError("Root operation should be a filter.")

# We use this import here because of https://github.com/modin-project/modin/issues/3849,
# after the issue is fixed we should put the import at the top of this file
import pyarrow.gandiva as gandiva

builder = gandiva.TreeExprBuilder()
root = build_node(table, expr.terms, builder)
cond = builder.make_condition(root)
filt = gandiva.make_filter(table.schema, cond)
sel_vec = filt.evaluate(table.to_batches()[0], pa.default_memory_pool())
result = filter_with_selection_vector(table, sel_vec)
return result

def query_builder(arrow_table, **kwargs):
"""Evaluate string query on the passed pyarrow table."""
return gandiva_query(arrow_table, kwargs.get("expr", ""))

kwargs["expr"] = expr
# FIXME: `PandasQueryCompiler._prepare_method` was removed in #721,
# it is no longer needed to wrap function to apply.
func = self._prepare_method(query_builder, **kwargs)
# FIXME: `PandasQueryCompiler._map_across_full_axis` was removed in #721.
# This method call should be replaced to its equivalent from `operators.function`
new_data = self._map_across_full_axis(1, func)
# Query removes rows, so we need to update the index
new_index = self._compute_index(0, new_data, False)
return self.__constructor__(
new_data, new_index, self.columns, self._dtype_cache
)

def _compute_index(self, axis, data_object, compute_diff=True):
"""
Compute index labels of the passed Modin Frame along specified axis.
Expand Down
37 changes: 20 additions & 17 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations
import pandas
from pandas.core.common import apply_if_callable, get_cython_func
from pandas.core.computation.eval import _check_engine
from pandas.core.dtypes.common import (
infer_dtype_from_object,
is_dict_like,
Expand Down Expand Up @@ -812,23 +813,18 @@ def eval(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200
"""
Evaluate a string describing operations on ``DataFrame`` columns.
"""
self._validate_eval_query(expr, **kwargs)
inplace = validate_bool_kwarg(inplace, "inplace")
self._update_var_dicts_in_kwargs(expr, kwargs)
new_query_compiler = self._query_compiler.eval(expr, **kwargs)
return_type = type(
pandas.DataFrame(columns=self.columns)
.astype(self.dtypes)
.eval(expr, **kwargs)
).__name__
if return_type == type(self).__name__:
return self._create_or_update_from_compiler(new_query_compiler, inplace)
else:
if inplace:
raise ValueError("Cannot operate inplace if there is no assignment")
return getattr(sys.modules[self.__module__], return_type)(
query_compiler=new_query_compiler
if _check_engine(kwargs.get("engine", None)) == "numexpr":
# on numexpr engine, pandas.eval returns np.array if input is not of pandas
# type, so we can't use pandas eval [1]. Even if we could, pandas eval seems
# to convert all the data to numpy and then do the numexpr add, which is
# slow for modin. The user would not really be getting the benefit of
# numexpr.
# [1] https://github.com/pandas-dev/pandas/blob/934eebb532cf50e872f40638a788000be6e4dda4/pandas/core/computation/align.py#L78
return self._default_to_pandas(
pandas.DataFrame.eval, expr, inplace=inplace, **kwargs
)
return pandas.DataFrame.eval(self, expr, inplace=inplace, **kwargs)

def fillna(
self,
Expand Down Expand Up @@ -1554,14 +1550,21 @@ def quantile(
method=method,
)

# methods and fields we need to use pandas.DataFrame.query
_AXIS_ORDERS = ["index", "columns"]
_get_index_resolvers = pandas.DataFrame._get_index_resolvers
_get_axis_resolvers = pandas.DataFrame._get_axis_resolvers
_get_cleaned_column_resolvers = pandas.DataFrame._get_cleaned_column_resolvers

def query(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200
"""
Query the columns of a ``DataFrame`` with a boolean expression.
"""
self._update_var_dicts_in_kwargs(expr, kwargs)
self._validate_eval_query(expr, **kwargs)
inplace = validate_bool_kwarg(inplace, "inplace")
new_query_compiler = self._query_compiler.query(expr, **kwargs)
new_query_compiler = pandas.DataFrame.query(
self, expr, inplace=False, **kwargs
)._query_compiler
return self._create_or_update_from_compiler(new_query_compiler, inplace)

def rename(
Expand Down
32 changes: 23 additions & 9 deletions modin/pandas/test/dataframe/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,15 @@ def test_eval_df_arithmetic_subexpression():
@pytest.mark.parametrize("method", ["query", "eval"])
@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize("local_var", [2])
def test_eval_and_query_with_local_and_global_var(method, data, local_var):
@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_eval_and_query_with_local_and_global_var(method, data, engine, local_var):
modin_df, pandas_df = pd.DataFrame(data), pandas.DataFrame(data)
op = "+" if method == "eval" else "<"
for expr in (f"col1 {op} @local_var", f"col1 {op} @TEST_VAR"):
df_equals(getattr(modin_df, method)(expr), getattr(pandas_df, method)(expr))
df_equals(
getattr(modin_df, method)(expr, engine=engine),
getattr(pandas_df, method)(expr, engine=engine),
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down Expand Up @@ -409,7 +413,8 @@ def f(x, arg2=0, arg3=0):

@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize("funcs", query_func_values, ids=query_func_keys)
def test_query(data, funcs):
@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_query(data, funcs, engine):
if get_current_execution() == "BaseOnPython" and funcs != "col3 > col4":
pytest.xfail(
reason="In this case, we are faced with the problem of handling empty data frames - #4934"
Expand All @@ -418,12 +423,12 @@ def test_query(data, funcs):
pandas_df = pandas.DataFrame(data)

try:
pandas_result = pandas_df.query(funcs)
pandas_result = pandas_df.query(funcs, engine=engine)
except Exception as err:
with pytest.raises(type(err)):
modin_df.query(funcs)
modin_df.query(funcs, engine=engine)
else:
modin_result = modin_df.query(funcs)
modin_result = modin_df.query(funcs, engine=engine)
# `dtypes` must be evaluated after `query` so we need to check cache
assert modin_result._query_compiler._modin_frame.has_dtypes_cache
df_equals(modin_result, pandas_result)
Expand All @@ -437,23 +442,32 @@ def test_empty_query():
modin_df.query("")


def test_query_after_insert():
@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_query_after_insert(engine):
modin_df = pd.DataFrame({"x": [-1, 0, 1, None], "y": [1, 2, None, 3]})
modin_df["z"] = modin_df.eval("x / y")
modin_df = modin_df.query("z >= 0")
modin_df = modin_df.query("z >= 0", engine=engine)
modin_result = modin_df.reset_index(drop=True)
modin_result.columns = ["a", "b", "c"]

pandas_df = pd.DataFrame({"x": [-1, 0, 1, None], "y": [1, 2, None, 3]})
pandas_df["z"] = pandas_df.eval("x / y")
pandas_df = pandas_df.query("z >= 0")
pandas_df = pandas_df.query("z >= 0", engine=engine)
pandas_result = pandas_df.reset_index(drop=True)
pandas_result.columns = ["a", "b", "c"]

df_equals(modin_result, pandas_result)
df_equals(modin_df, pandas_df)


@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_query_with_element_access_issue_4580(engine):
pdf = pandas.DataFrame({"a": [0, 1, 2]})
# get two row partitions by concatenating
df = pd.concat([pd.DataFrame(pdf[:1]), pd.DataFrame(pdf[1:])])
eval_general(df, pdf, lambda df: df.query("a == a[0]", engine=engine))


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize(
"func",
Expand Down
Loading

0 comments on commit 29d9da0

Please sign in to comment.