FIX-#6552: avoid FutureWarnings in groupby unless necessary (#6595)
Signed-off-by: Anatoly Myachev <[email protected]>
anmyachev authored Sep 26, 2023
1 parent ea8088a commit 405c8c3
Showing 17 changed files with 376 additions and 65 deletions.
9 changes: 7 additions & 2 deletions modin/core/dataframe/algebra/default2pandas/groupby.py
@@ -13,6 +13,7 @@

"""Module houses default GroupBy functions builder class."""

import warnings
from typing import Any

import pandas
@@ -59,7 +60,9 @@ def is_transformation_kernel(agg_func: Any) -> bool:
@classmethod
def _call_groupby(cls, df, *args, **kwargs): # noqa: PR01
"""Call .groupby() on passed `df`."""
return df.groupby(*args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
return df.groupby(*args, **kwargs)

@classmethod
def validate_by(cls, by):
@@ -563,7 +566,9 @@ def _call_groupby(cls, df, *args, **kwargs): # noqa: PR01
# In second case surrounding logic will supplement grouping columns,
# so we need to drop them after grouping is over; our originally
# selected column is always the first, so use it
return df.groupby(*args, **kwargs)[df.columns[0]]
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
return df.groupby(*args, **kwargs)[df.columns[0]]


class GroupByDefault(DefaultMethod):
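
The same narrow suppression pattern recurs throughout the commit. A minimal standalone sketch, with a toy DataFrame that is purely illustrative, of how the context manager scopes the filter to a single pandas call so that user code outside the block still sees its own FutureWarnings:

import warnings

import pandas

df = pandas.DataFrame({"a": [1, 1, 2], "b": [3, 4, 5]})

with warnings.catch_warnings():
    # Filter changes made here are reverted when the block exits.
    warnings.filterwarnings("ignore", category=FutureWarning)
    result = df.groupby("a").sum()

print(result)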
Another changed file (path not shown)
@@ -127,7 +127,7 @@ def dtype(self) -> Tuple[DTypeKind, int, str, str]:
if self._dtype_cache is not None:
return self._dtype_cache

dtype = self._col.dtypes[0]
dtype = self._col.dtypes.iloc[0]

if isinstance(dtype, pandas.CategoricalDtype):
pandas_series = self._col.to_pandas().squeeze(axis=1)
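
Switching from dtypes[0] to dtypes.iloc[0] avoids a separate deprecation: recent pandas emits a FutureWarning when Series.__getitem__ falls back to treating an integer key as a position. A small sketch of the warning-free form (the frame is illustrative):

import pandas

dtypes = pandas.DataFrame({"x": [1], "y": [2.0]}).dtypes  # Series indexed by column labels

# dtypes[0] would rely on the deprecated positional fallback of Series.__getitem__;
# .iloc[0] makes the positional intent explicit and emits no warning.
first_dtype = dtypes.iloc[0]
print(first_dtype)  # int64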
10 changes: 8 additions & 2 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
@@ -13,6 +13,8 @@

"""The module defines base interface for an axis partition of a Modin DataFrame."""

import warnings

import numpy as np
import pandas

@@ -418,7 +420,9 @@ def deploy_axis_func(
A list of pandas DataFrames.
"""
dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
result = func(dataframe, *f_args, **f_kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(dataframe, *f_args, **f_kwargs)

if num_splits == 1:
# If we're not going to split the result, we don't need to specify
@@ -497,7 +501,9 @@ def deploy_func_between_two_axis_partitions(
for i in range(1, len(other_shape))
]
rt_frame = pandas.concat(combined_axis, axis=axis ^ 1, copy=False)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
return split_result_of_axis_func_pandas(axis, num_splits, result)

@classmethod
15 changes: 14 additions & 1 deletion modin/core/execution/dask/common/utils.py
@@ -23,6 +23,7 @@
Memory,
NPartitions,
)
from modin.core.execution.utils import set_env
from modin.error_message import ErrorMessage


@@ -32,6 +33,14 @@ def initialize_dask():

try:
client = default_client()

def _disable_warnings():
import warnings

warnings.simplefilter("ignore", category=FutureWarning)

client.run(_disable_warnings)

except ValueError:
from distributed import Client

@@ -47,7 +56,11 @@ def initialize_dask():
num_cpus = CpuCount.get()
memory_limit = Memory.get()
worker_memory_limit = memory_limit // num_cpus if memory_limit else "auto"
client = Client(n_workers=num_cpus, memory_limit=worker_memory_limit)

# when the client is initialized, environment variables are inherited
with set_env(PYTHONWARNINGS="ignore::FutureWarning"):
client = Client(n_workers=num_cpus, memory_limit=worker_memory_limit)

if GithubCI.get():
# set these keys to run tests that write to the mock s3 service. this seems
# to be the way to pass environment variables to the workers:
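
Two paths are handled for Dask: an already-running client gets the filter pushed to its workers via Client.run, while a newly created client relies on workers inheriting PYTHONWARNINGS from the parent process environment. PYTHONWARNINGS uses Python's standard warning-filter syntax (action:message:category:module:lineno), so ignore::FutureWarning silences only that category. A hedged sketch of both calls (requires the distributed package; the worker count is illustrative):

import warnings

from distributed import Client

from modin.core.execution.utils import set_env


def _disable_warnings():
    warnings.simplefilter("ignore", category=FutureWarning)


# Fresh client: worker processes inherit PYTHONWARNINGS from the parent environment.
with set_env(PYTHONWARNINGS="ignore::FutureWarning"):
    client = Client(n_workers=2)

# Pre-existing client (the default_client() branch above): push the filter
# into every worker process that is already running.
client.run(_disable_warnings)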
Another changed file (path not shown)
@@ -13,6 +13,8 @@

"""The module defines interface for a partition with pandas storage format and Python engine."""

import warnings

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.python.common import PythonWrapper

@@ -116,7 +118,9 @@ def call_queue_closure(data, call_queue):

self._data = call_queue_closure(self._data, self.call_queue)
self.call_queue = []
return self.__constructor__(func(self._data.copy(), *args, **kwargs))
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
return self.__constructor__(func(self._data.copy(), *args, **kwargs))

def drain_call_queue(self):
"""Execute all operations stored in the call queue on the object wrapped by this partition."""
18 changes: 8 additions & 10 deletions modin/core/execution/ray/common/utils.py
@@ -36,6 +36,7 @@
StorageFormat,
ValueSource,
)
from modin.core.execution.utils import set_env
from modin.error_message import ErrorMessage

from .engine_wrapper import RayWrapper
@@ -82,7 +83,10 @@ def initialize_ray(
# the `pandas` module has been fully imported inside of each process before
# any execution begins:
# https://github.com/modin-project/modin/pull/4603
env_vars = {"__MODIN_AUTOIMPORT_PANDAS__": "1"}
env_vars = {
"__MODIN_AUTOIMPORT_PANDAS__": "1",
"PYTHONWARNINGS": "ignore::FutureWarning",
}
if GithubCI.get():
# need these to write parquet to the moto service mocking s3.
env_vars.update(
@@ -143,9 +147,8 @@ def initialize_ray(
# time and doesn't enforce us with any overhead that Ray's native `runtime_env`
# is usually causing. You can visit this gh-issue for more info:
# https://github.com/modin-project/modin/issues/5157#issuecomment-1500225150
for key, value in env_vars.items():
os.environ[key] = value
ray.init(**ray_init_kwargs)
with set_env(**env_vars):
ray.init(**ray_init_kwargs)

if StorageFormat.get() == "Cudf":
from modin.core.execution.ray.implementations.cudf_on_ray.partitioning import (
@@ -163,12 +166,7 @@
runtime_env_vars = ray.get_runtime_context().runtime_env.get("env_vars", {})
for varname, varvalue in env_vars.items():
if str(runtime_env_vars.get(varname, "")) != str(varvalue):
if is_cluster or (
# Here we relax our requirements for a non-cluster case allowing for the `env_vars`
# to be set at least as a process environment variable
not is_cluster
and os.environ.get(varname, "") != str(varvalue)
):
if is_cluster:
ErrorMessage.single_warning(
"When using a pre-initialized Ray cluster, please ensure that the runtime env "
+ f"sets environment variable {varname} to {varvalue}"
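
For Ray, permanent os.environ mutation is replaced by the temporary set_env context around ray.init, relying on locally started workers inheriting the driver's environment at init time (see the gh-issue linked in the comment above). A hedged sketch of the idea, assuming a local, non-pre-initialized cluster; the resource count and task name are illustrative:

import os

import ray

from modin.core.execution.utils import set_env

with set_env(PYTHONWARNINGS="ignore::FutureWarning"):
    ray.init(num_cpus=2)


@ray.remote
def worker_pythonwarnings():
    # Expected to reflect the value set around ray.init().
    return os.environ.get("PYTHONWARNINGS")


print(ray.get(worker_pythonwarnings.remote()))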
3 changes: 2 additions & 1 deletion modin/core/execution/unidist/common/utils.py
@@ -45,7 +45,8 @@ def initialize_unidist():
unidist.init()
""",
)

# TODO: allow unidist to inherit env variables on initialization
# with set_env(PYTHONWARNINGS="ignore::FutureWarning"):
unidist.init()

num_cpus = sum(v["CPU"] for v in unidist.cluster_resources().values())
Another changed file (path not shown)
@@ -13,6 +13,8 @@

"""Module houses class that wraps data (block partition) and its metadata."""

import warnings

import unidist

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
@@ -351,12 +353,16 @@ def _apply_func(partition, func, *args, **kwargs): # pragma: no cover
destructuring it causes a performance penalty.
"""
try:
result = func(partition, *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(partition, *args, **kwargs)
# Sometimes Arrow forces us to make a copy of an object before we operate on it. We
# don't want the error to propagate to the user, and we want to avoid copying unless
# we absolutely have to.
except ValueError:
result = func(partition.copy(), *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(partition.copy(), *args, **kwargs)
return (
result,
len(result) if hasattr(result, "__len__") else 0,
@@ -393,12 +399,16 @@ def _apply_list_of_funcs(call_queue, partition): # pragma: no cover
args = deserialize(f_args)
kwargs = deserialize(f_kwargs)
try:
partition = func(partition, *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
partition = func(partition, *args, **kwargs)
# Sometimes Arrow forces us to make a copy of an object before we operate on it. We
# don't want the error to propagate to the user, and we want to avoid copying unless
# we absolutely have to.
except ValueError:
partition = func(partition.copy(), *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
partition = func(partition.copy(), *args, **kwargs)

return (
partition,
Another changed file (path not shown)
@@ -13,6 +13,8 @@

"""Module houses classes responsible for storing a virtual partition and applying a function to it."""

import warnings

import pandas
import unidist

@@ -310,7 +312,9 @@ def _deploy_unidist_func(
Unidist functions are not detected by codecov (thus pragma: no cover).
"""
f_args = deserialize(f_args)
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
if not extract_metadata:
return result
ip = unidist.get_ip()
31 changes: 31 additions & 0 deletions modin/core/execution/utils.py
@@ -0,0 +1,31 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""General utils for execution module."""

import contextlib
import os


@contextlib.contextmanager
def set_env(**environ):
"""
Temporarily set the process environment variables.
"""
old_environ = os.environ.copy()
os.environ.update(environ)
try:
yield
finally:
os.environ.clear()
os.environ.update(old_environ)
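
A short usage sketch of the new helper (the MY_FLAG variable is purely hypothetical): values apply for the duration of the block, are visible to subprocesses started inside it, and the previous environment is restored afterwards even if the body raises.

import os

from modin.core.execution.utils import set_env

with set_env(PYTHONWARNINGS="ignore::FutureWarning", MY_FLAG="1"):
    # Subprocesses spawned here inherit these values.
    print(os.environ["MY_FLAG"])  # 1

print(os.environ.get("MY_FLAG"))  # None again (assuming it was unset before)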
5 changes: 4 additions & 1 deletion modin/core/storage_formats/base/query_compiler.py
@@ -18,6 +18,7 @@
"""

import abc
import warnings
from typing import Hashable, List, Optional

import numpy as np
@@ -164,7 +165,9 @@ def default_to_pandas(self, pandas_op, *args, **kwargs):
args = try_cast_to_pandas(args)
kwargs = try_cast_to_pandas(kwargs)

result = pandas_op(try_cast_to_pandas(self), *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = pandas_op(try_cast_to_pandas(self), *args, **kwargs)
if isinstance(result, (tuple, list)):
return [self.__wrap_in_qc(obj) for obj in result]
return self.__wrap_in_qc(result)
2 changes: 1 addition & 1 deletion modin/core/storage_formats/pandas/query_compiler.py
@@ -2721,7 +2721,7 @@ def getitem_array(self, key):
# here we check for a subset of bool indexers only to simplify the code;
# there could (potentially) be more of those, but we assume the most frequent
# ones are just of bool dtype
if len(key.dtypes) == 1 and is_bool_dtype(key.dtypes[0]):
if len(key.dtypes) == 1 and is_bool_dtype(key.dtypes.iloc[0]):
self.__validate_bool_indexer(key.index)
return self.__getitem_bool(key, broadcast=True, dtypes="copy")

Another changed file (path not shown)
@@ -2779,8 +2779,8 @@ def test_dict(self):
at = mdf._query_compiler._modin_frame._partitions[0][0].get()
assert len(at.column(0).chunks) == nchunks

mdt = mdf.dtypes[0]
pdt = pdf.dtypes[0]
mdt = mdf.dtypes.iloc[0]
pdt = pdf.dtypes.iloc[0]
assert mdt == "category"
assert isinstance(mdt, pandas.CategoricalDtype)
assert str(mdt) == str(pdt)
34 changes: 18 additions & 16 deletions modin/pandas/base.py
@@ -503,23 +503,25 @@ def _default_to_pandas(self, op, *args, **kwargs):
args = try_cast_to_pandas(args)
kwargs = try_cast_to_pandas(kwargs)
pandas_obj = self._to_pandas()
if callable(op):
result = op(pandas_obj, *args, **kwargs)
elif isinstance(op, str):
# The inner `getattr` is ensuring that we are treating this object (whether
# it is a DataFrame, Series, etc.) as a pandas object. The outer `getattr`
# will get the operation (`op`) from the pandas version of the class and run
# it on the object after we have converted it to pandas.
attr = getattr(self._pandas_class, op)
if isinstance(attr, property):
result = getattr(pandas_obj, op)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
if callable(op):
result = op(pandas_obj, *args, **kwargs)
elif isinstance(op, str):
# The inner `getattr` is ensuring that we are treating this object (whether
# it is a DataFrame, Series, etc.) as a pandas object. The outer `getattr`
# will get the operation (`op`) from the pandas version of the class and run
# it on the object after we have converted it to pandas.
attr = getattr(self._pandas_class, op)
if isinstance(attr, property):
result = getattr(pandas_obj, op)
else:
result = attr(pandas_obj, *args, **kwargs)
else:
result = attr(pandas_obj, *args, **kwargs)
else:
ErrorMessage.catch_bugs_and_request_email(
failure_condition=True,
extra_log="{} is an unsupported operation".format(op),
)
ErrorMessage.catch_bugs_and_request_email(
failure_condition=True,
extra_log="{} is an unsupported operation".format(op),
)
# SparseDataFrames cannot be serialized by arrow and cause problems for Modin.
# For now we will use pandas.
if isinstance(result, type(self)) and not isinstance(