Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#4580: Fix access by row label in query and eval #6488

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2343,25 +2343,6 @@ def fillna(df, value, **kwargs):

return DataFrameDefault.register(fillna)(self, **kwargs)

@doc_utils.add_refer_to("DataFrame.query")
def query(self, expr, **kwargs):
"""
Query columns of the QueryCompiler with a boolean expression.

Parameters
----------
expr : str
**kwargs : dict

Returns
-------
BaseQueryCompiler
New QueryCompiler containing the rows where the boolean expression is satisfied.
"""
return DataFrameDefault.register(pandas.DataFrame.query)(
self, expr=expr, **kwargs
)

@doc_utils.add_refer_to("DataFrame.rank")
def rank(self, **kwargs): # noqa: PR02
"""
Expand Down
6 changes: 0 additions & 6 deletions modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2477,12 +2477,6 @@ def quantile_builder(df, **kwargs):
result = self.__constructor__(new_modin_frame)
return result.transpose() if axis == 1 else result

def query(self, expr, **kwargs):
def query_builder(df, **modin_internal_kwargs):
return df.query(expr, inplace=False, **kwargs, **modin_internal_kwargs)

return self.__constructor__(self._modin_frame.filter(1, query_builder))
Comment on lines -2480 to -2484
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After this PR we see about ~1.6x perf degradation for one of our internal workloads that uses simple queries a lot.

@mvashishtha can you please check #6499?


def rank(self, **kwargs):
axis = kwargs.get("axis", 0)
numeric_only = True if axis else kwargs.get("numeric_only", False)
Expand Down
197 changes: 0 additions & 197 deletions modin/experimental/core/storage_formats/pyarrow/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,9 @@
"""

import pandas
import pyarrow as pa

from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler
from modin.utils import _inherit_docstrings
from pandas.core.computation.expr import Expr
from pandas.core.computation.scope import Scope
from pandas.core.computation.ops import UnaryOp, BinOp, Term, MathCall, Constant


class FakeSeries:
Expand Down Expand Up @@ -59,199 +55,6 @@ class PyarrowQueryCompiler(PandasQueryCompiler):
Shape hint for frames known to be a column or a row, otherwise None.
"""

def query(self, expr, **kwargs):
def gen_table_expr(table, expr):
"""
Build pandas expression for the specified query.

Parameters
----------
table : pyarrow.Table
Table to evaluate expression on.
expr : str
Query string to evaluate on the `table` columns.

Returns
-------
pandas.core.computation.expr.Expr
"""
resolver = {
name: FakeSeries(dtype.to_pandas_dtype())
for name, dtype in zip(table.schema.names, table.schema.types)
}
scope = Scope(level=0, resolvers=(resolver,))
return Expr(expr=expr, env=scope)

unary_ops = {"~": "not"}
math_calls = {"log": "log", "exp": "exp", "log10": "log10", "cbrt": "cbrt"}
bin_ops = {
"+": "add",
"-": "subtract",
"*": "multiply",
"/": "divide",
"**": "power",
}
cmp_ops = {
"==": "equal",
"!=": "not_equal",
"<": "less_than",
"<=": "less_than_or_equal_to",
">": "greater_than",
">=": "greater_than_or_equal_to",
"like": "like",
}

def build_node(table, terms, builder):
"""
Build expression Node in Gandiva notation for the specified pandas expression.

Parameters
----------
table : pyarrow.Table
Table to evaluate expression on.
terms : pandas.core.computation.expr.Term
Pandas expression to evaluate.
builder : pyarrow.gandiva.TreeExprBuilder
Pyarrow node builder.

Returns
-------
pyarrow.gandiva.Node
"""
if isinstance(terms, Constant):
return builder.make_literal(
terms.value, (pa.from_numpy_dtype(terms.return_type))
)

if isinstance(terms, Term):
return builder.make_field(table.schema.field_by_name(terms.name))

if isinstance(terms, BinOp):
lnode = build_node(table, terms.lhs, builder)
rnode = build_node(table, terms.rhs, builder)
return_type = pa.from_numpy_dtype(terms.return_type)

if terms.op == "&":
return builder.make_and([lnode, rnode])
if terms.op == "|":
return builder.make_or([lnode, rnode])
if terms.op in cmp_ops:
assert return_type == pa.bool_()
return builder.make_function(
cmp_ops[terms.op], [lnode, rnode], return_type
)
if terms.op in bin_ops:
return builder.make_function(
bin_ops[terms.op], [lnode, rnode], return_type
)

if isinstance(terms, UnaryOp):
return_type = pa.from_numpy_dtype(terms.return_type)
return builder.make_function(
unary_ops[terms.op],
[build_node(table, terms.operand, builder)],
return_type,
)

if isinstance(terms, MathCall):
return_type = pa.from_numpy_dtype(terms.return_type)
childern = [
build_node(table, child, builder) for child in terms.operands
]
return builder.make_function(
math_calls[terms.op], childern, return_type
)

raise TypeError("Unsupported term type: %s" % terms)

def can_be_condition(expr):
"""
Check whether the passed expression is a conditional operation.

Parameters
----------
expr : pandas.core.computation.expr.Expr

Returns
-------
bool
"""
if isinstance(expr.terms, BinOp):
if expr.terms.op in cmp_ops or expr.terms.op in ("&", "|"):
return True
elif isinstance(expr.terms, UnaryOp):
if expr.terms.op == "~":
return True
return False

def filter_with_selection_vector(table, s):
"""
Filter passed pyarrow table with the specified filter.

Parameters
----------
table : pyarrow.Table
s : pyarrow.gandiva.SelectionVector

Returns
-------
pyarrow.Table
"""
record_batch = table.to_batches()[0]
indices = s.to_array() # .to_numpy()
new_columns = [
pa.array(c.to_numpy()[indices]) for c in record_batch.columns
]
return pa.Table.from_arrays(new_columns, record_batch.schema.names)

def gandiva_query(table, query):
"""
Evaluate string query on the passed table.

Parameters
----------
table : pyarrow.Table
Table to evaluate query on.
query : str
Query string to evaluate on the `table` columns.

Returns
-------
pyarrow.Table
"""
expr = gen_table_expr(table, query)
if not can_be_condition(expr):
raise ValueError("Root operation should be a filter.")

# We use this import here because of https://github.com/modin-project/modin/issues/3849,
# after the issue is fixed we should put the import at the top of this file
import pyarrow.gandiva as gandiva

builder = gandiva.TreeExprBuilder()
root = build_node(table, expr.terms, builder)
cond = builder.make_condition(root)
filt = gandiva.make_filter(table.schema, cond)
sel_vec = filt.evaluate(table.to_batches()[0], pa.default_memory_pool())
result = filter_with_selection_vector(table, sel_vec)
return result

def query_builder(arrow_table, **kwargs):
"""Evaluate string query on the passed pyarrow table."""
return gandiva_query(arrow_table, kwargs.get("expr", ""))

kwargs["expr"] = expr
# FIXME: `PandasQueryCompiler._prepare_method` was removed in #721,
# it is no longer needed to wrap function to apply.
func = self._prepare_method(query_builder, **kwargs)
# FIXME: `PandasQueryCompiler._map_across_full_axis` was removed in #721.
# This method call should be replaced to its equivalent from `operators.function`
new_data = self._map_across_full_axis(1, func)
# Query removes rows, so we need to update the index
new_index = self._compute_index(0, new_data, False)
return self.__constructor__(
new_data, new_index, self.columns, self._dtype_cache
)

def _compute_index(self, axis, data_object, compute_diff=True):
"""
Compute index labels of the passed Modin Frame along specified axis.
Expand Down
37 changes: 20 additions & 17 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from __future__ import annotations
import pandas
from pandas.core.common import apply_if_callable, get_cython_func
from pandas.core.computation.eval import _check_engine
from pandas.core.dtypes.common import (
infer_dtype_from_object,
is_dict_like,
Expand Down Expand Up @@ -812,23 +813,18 @@ def eval(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200
"""
Evaluate a string describing operations on ``DataFrame`` columns.
"""
self._validate_eval_query(expr, **kwargs)
inplace = validate_bool_kwarg(inplace, "inplace")
self._update_var_dicts_in_kwargs(expr, kwargs)
new_query_compiler = self._query_compiler.eval(expr, **kwargs)
return_type = type(
pandas.DataFrame(columns=self.columns)
.astype(self.dtypes)
.eval(expr, **kwargs)
).__name__
if return_type == type(self).__name__:
return self._create_or_update_from_compiler(new_query_compiler, inplace)
else:
if inplace:
raise ValueError("Cannot operate inplace if there is no assignment")
return getattr(sys.modules[self.__module__], return_type)(
query_compiler=new_query_compiler
if _check_engine(kwargs.get("engine", None)) == "numexpr":
# on numexpr engine, pandas.eval returns np.array if input is not of pandas
# type, so we can't use pandas eval [1]. Even if we could, pandas eval seems
# to convert all the data to numpy and then do the numexpr add, which is
# slow for modin. The user would not really be getting the benefit of
# numexpr.
# [1] https://github.com/pandas-dev/pandas/blob/934eebb532cf50e872f40638a788000be6e4dda4/pandas/core/computation/align.py#L78
return self._default_to_pandas(
pandas.DataFrame.eval, expr, inplace=inplace, **kwargs
)
return pandas.DataFrame.eval(self, expr, inplace=inplace, **kwargs)

def fillna(
self,
Expand Down Expand Up @@ -1554,14 +1550,21 @@ def quantile(
method=method,
)

# methods and fields we need to use pandas.DataFrame.query
_AXIS_ORDERS = ["index", "columns"]
_get_index_resolvers = pandas.DataFrame._get_index_resolvers
_get_axis_resolvers = pandas.DataFrame._get_axis_resolvers
_get_cleaned_column_resolvers = pandas.DataFrame._get_cleaned_column_resolvers

def query(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200
"""
Query the columns of a ``DataFrame`` with a boolean expression.
"""
self._update_var_dicts_in_kwargs(expr, kwargs)
self._validate_eval_query(expr, **kwargs)
inplace = validate_bool_kwarg(inplace, "inplace")
new_query_compiler = self._query_compiler.query(expr, **kwargs)
new_query_compiler = pandas.DataFrame.query(
self, expr, inplace=False, **kwargs
)._query_compiler
return self._create_or_update_from_compiler(new_query_compiler, inplace)

def rename(
Expand Down
32 changes: 23 additions & 9 deletions modin/pandas/test/dataframe/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,11 +344,15 @@ def test_eval_df_arithmetic_subexpression():
@pytest.mark.parametrize("method", ["query", "eval"])
@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize("local_var", [2])
def test_eval_and_query_with_local_and_global_var(method, data, local_var):
@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_eval_and_query_with_local_and_global_var(method, data, engine, local_var):
modin_df, pandas_df = pd.DataFrame(data), pandas.DataFrame(data)
op = "+" if method == "eval" else "<"
for expr in (f"col1 {op} @local_var", f"col1 {op} @TEST_VAR"):
df_equals(getattr(modin_df, method)(expr), getattr(pandas_df, method)(expr))
df_equals(
getattr(modin_df, method)(expr, engine=engine),
getattr(pandas_df, method)(expr, engine=engine),
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down Expand Up @@ -409,7 +413,8 @@ def f(x, arg2=0, arg3=0):

@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize("funcs", query_func_values, ids=query_func_keys)
def test_query(data, funcs):
@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_query(data, funcs, engine):
if get_current_execution() == "BaseOnPython" and funcs != "col3 > col4":
pytest.xfail(
reason="In this case, we are faced with the problem of handling empty data frames - #4934"
Expand All @@ -418,12 +423,12 @@ def test_query(data, funcs):
pandas_df = pandas.DataFrame(data)

try:
pandas_result = pandas_df.query(funcs)
pandas_result = pandas_df.query(funcs, engine=engine)
except Exception as err:
with pytest.raises(type(err)):
modin_df.query(funcs)
modin_df.query(funcs, engine=engine)
else:
modin_result = modin_df.query(funcs)
modin_result = modin_df.query(funcs, engine=engine)
# `dtypes` must be evaluated after `query` so we need to check cache
assert modin_result._query_compiler._modin_frame.has_dtypes_cache
df_equals(modin_result, pandas_result)
Expand All @@ -437,23 +442,32 @@ def test_empty_query():
modin_df.query("")


def test_query_after_insert():
@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_query_after_insert(engine):
modin_df = pd.DataFrame({"x": [-1, 0, 1, None], "y": [1, 2, None, 3]})
modin_df["z"] = modin_df.eval("x / y")
modin_df = modin_df.query("z >= 0")
modin_df = modin_df.query("z >= 0", engine=engine)
modin_result = modin_df.reset_index(drop=True)
modin_result.columns = ["a", "b", "c"]

pandas_df = pd.DataFrame({"x": [-1, 0, 1, None], "y": [1, 2, None, 3]})
pandas_df["z"] = pandas_df.eval("x / y")
pandas_df = pandas_df.query("z >= 0")
pandas_df = pandas_df.query("z >= 0", engine=engine)
pandas_result = pandas_df.reset_index(drop=True)
pandas_result.columns = ["a", "b", "c"]

df_equals(modin_result, pandas_result)
df_equals(modin_df, pandas_df)


@pytest.mark.parametrize("engine", ["python", "numexpr"])
def test_query_with_element_access_issue_4580(engine):
pdf = pandas.DataFrame({"a": [0, 1, 2]})
# get two row partitions by concatenating
df = pd.concat([pd.DataFrame(pdf[:1]), pd.DataFrame(pdf[1:])])
eval_general(df, pdf, lambda df: df.query("a == a[0]", engine=engine))


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize(
"func",
Expand Down
Loading