Skip to content

Commit

Permalink
FIX-modin-project#4478: Support level param in query and eval.
Browse files Browse the repository at this point in the history
Signed-off-by: mvashishtha <[email protected]>
  • Loading branch information
mvashishtha committed Jun 6, 2022
1 parent c1d5dbd commit 13f25d6
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 13 deletions.
61 changes: 51 additions & 10 deletions modin/pandas/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,25 +795,40 @@ def equals(self, other): # noqa: PR01, RT01, D200
and self.eq(other).all().all()
)

def _update_var_dicts_in_kwargs(self, expr, kwargs):
def _update_var_dicts_in_kwargs(self, expr, level: int, kwargs):
"""
Copy variables with "@" prefix in `local_dict` and `global_dict` keys of kwargs.
This function exists because we neeed to infer local and variables
ourselves here in the main node instead of having the remote functions
infer them on their own. The remote functions take place in a different
thread with their own stacks that normally do not match the stack that
leads to the Modin eval or query call.
Parameters
----------
expr : str
The expression string to search variables with "@" prefix.
level : int
The number of prior stack frames back to look for local variables.
level=0 means look just one frame back.
kwargs : dict
See the documentation for eval() for complete details on the keyword arguments accepted by query().
"""
if "@" not in expr:
return
frame = sys._getframe()
try:
f_locals = frame.f_back.f_back.f_back.f_back.f_locals
f_globals = frame.f_back.f_back.f_back.f_back.f_globals
finally:
del frame

# Bump level up one because this function is wrapped in the Modin
# logging function wrapper. The alternative is for the wrapper to
# give the Modin functions that deal with `level` the special
# treatment of bumping `level` up by one, but that's not so nice
# either.
level += 1

frame = sys._getframe(level + 1)
f_locals = frame.f_locals
f_globals = frame.f_globals
del frame
local_names = set(re.findall(r"@([\w]+)", expr))
local_dict = {}
global_dict = {}
Expand All @@ -836,14 +851,27 @@ def eval(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200
"""
Evaluate a string describing operations on ``DataFrame`` columns.
"""
# Remove `level` from the kwargs if it's already there. "level" doesn't
# make sense within the remote execution context, where the remote
# functions have a stack which is different from the stack on the
# driver node that ends in the modin eval call.
level = kwargs.pop("level", 0)

# Bump level up one because this function is wrapped in the Modin
# logging function wrapper. The alternative is for the wrapper to
# give the Modin functions that deal with `level` the special
# treatment of bumping `level` up by one, but that's not so nice
# either.
level += 1
self._validate_eval_query(expr, **kwargs)
inplace = validate_bool_kwarg(inplace, "inplace")
self._update_var_dicts_in_kwargs(expr, kwargs)
self._update_var_dicts_in_kwargs(expr, level + 1, kwargs)
# Make sure to not pass level to query compiler
new_query_compiler = self._query_compiler.eval(expr, **kwargs)
return_type = type(
pandas.DataFrame(columns=self.columns)
.astype(self.dtypes)
.eval(expr, **kwargs)
.eval(expr, level=level + 1, **kwargs)
).__name__
if return_type == type(self).__name__:
return self._create_or_update_from_compiler(new_query_compiler, inplace)
Expand Down Expand Up @@ -1692,9 +1720,22 @@ def query(self, expr, inplace=False, **kwargs): # noqa: PR01, RT01, D200
"""
Query the columns of a ``DataFrame`` with a boolean expression.
"""
self._update_var_dicts_in_kwargs(expr, kwargs)
# Remove `level` from the kwargs if it's already there. "level" doesn't
# make sense within the remote execution context, where the remote
# functions have a stack which is different from the stack on the
# driver node that ends in the modin eval call.
level = kwargs.pop("level", 0)

# Bump level up one because this function is wrapped in the Modin
# logging function wrapper. The alternative is for the wrapper to
# give the Modin functions that deal with `level` the special
# treatment of bumping `level` up by one, but that's not so nice
# eitiher.
level += 1
self._update_var_dicts_in_kwargs(expr, level + 1, kwargs)
self._validate_eval_query(expr, **kwargs)
inplace = validate_bool_kwarg(inplace, "inplace")
# Make sure to not pass level to query compiler.
new_query_compiler = self._query_compiler.query(expr, **kwargs)
return self._create_or_update_from_compiler(new_query_compiler, inplace)

Expand Down
23 changes: 20 additions & 3 deletions modin/pandas/test/dataframe/test_udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,12 +285,29 @@ def test_eval_df_arithmetic_subexpression():

@pytest.mark.parametrize("method", ["query", "eval"])
@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
@pytest.mark.parametrize("local_var", [2])
def test_eval_and_query_with_local_and_global_var(method, data, local_var):
@pytest.mark.parametrize("level", [0, 1])
def test_eval_and_query_with_local_and_global_var(method, data, level):
# NOTE: if this test is failing because pandas or modin can't find
# the local variable and you are investigating why, it's worth
# noting that Modin's `query` and `eval` functions bump scope `level`
# up by 1 to reflect the Modin logging function wrapper.
local_var = 1 # noqa: F841 eval and query can reach back in the call
# stack to access this variable

def df_method_with_local_var(df, method: str, *args, **kwargs):
# Give this level of the stack a different value of local_var, so that
# passing `level=0` uses value 0, and `level=1` uses value 1.
local_var = 0 # noqa: F841 eval and query can reach back in the call
# stack to access this variable
return getattr(df, method)(*args, **kwargs)

modin_df, pandas_df = pd.DataFrame(data), pandas.DataFrame(data)
op = "+" if method == "eval" else "<"
for expr in (f"col1 {op} @local_var", f"col1 {op} @TEST_VAR"):
df_equals(getattr(modin_df, method)(expr), getattr(pandas_df, method)(expr))
df_equals(
df_method_with_local_var(modin_df, method, expr, level=level),
df_method_with_local_var(pandas_df, method, expr, level=level),
)


@pytest.mark.parametrize("data", test_data_values, ids=test_data_keys)
Expand Down

0 comments on commit 13f25d6

Please sign in to comment.