Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#6552: avoid FutureWarnings in groupby unless necessary #6595

Merged
merged 13 commits into from
Sep 26, 2023
9 changes: 7 additions & 2 deletions modin/core/dataframe/algebra/default2pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

"""Module houses default GroupBy functions builder class."""

import warnings
from typing import Any

import pandas
Expand Down Expand Up @@ -59,7 +60,9 @@ def is_transformation_kernel(agg_func: Any) -> bool:
@classmethod
def _call_groupby(cls, df, *args, **kwargs): # noqa: PR01
"""Call .groupby() on passed `df`."""
return df.groupby(*args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
return df.groupby(*args, **kwargs)

@classmethod
def validate_by(cls, by):
Expand Down Expand Up @@ -563,7 +566,9 @@ def _call_groupby(cls, df, *args, **kwargs): # noqa: PR01
# In second case surrounding logic will supplement grouping columns,
# so we need to drop them after grouping is over; our originally
# selected column is always the first, so use it
return df.groupby(*args, **kwargs)[df.columns[0]]
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
return df.groupby(*args, **kwargs)[df.columns[0]]


class GroupByDefault(DefaultMethod):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def dtype(self) -> Tuple[DTypeKind, int, str, str]:
if self._dtype_cache is not None:
return self._dtype_cache

dtype = self._col.dtypes[0]
dtype = self._col.dtypes.iloc[0]
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid:

FutureWarning: Series.__getitem__ treating keys as positions is deprecated. In a future version, integer keys will always be treated as labels (consistent with DataFrame behavior). To access a value by position, use `ser.iloc[pos]`


if isinstance(dtype, pandas.CategoricalDtype):
pandas_series = self._col.to_pandas().squeeze(axis=1)
Expand Down
10 changes: 8 additions & 2 deletions modin/core/dataframe/pandas/partitioning/axis_partition.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""The module defines base interface for an axis partition of a Modin DataFrame."""

import warnings

import numpy as np
import pandas

Expand Down Expand Up @@ -418,7 +420,9 @@ def deploy_axis_func(
A list of pandas DataFrames.
"""
dataframe = pandas.concat(list(partitions), axis=axis, copy=False)
result = func(dataframe, *f_args, **f_kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(dataframe, *f_args, **f_kwargs)

if num_splits == 1:
# If we're not going to split the result, we don't need to specify
Expand Down Expand Up @@ -497,7 +501,9 @@ def deploy_func_between_two_axis_partitions(
for i in range(1, len(other_shape))
]
rt_frame = pandas.concat(combined_axis, axis=axis ^ 1, copy=False)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(lt_frame, rt_frame, *f_args, **f_kwargs)
return split_result_of_axis_func_pandas(axis, num_splits, result)

@classmethod
Expand Down
15 changes: 14 additions & 1 deletion modin/core/execution/dask/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
Memory,
NPartitions,
)
from modin.core.execution.utils import set_env
from modin.error_message import ErrorMessage


Expand All @@ -32,6 +33,14 @@ def initialize_dask():

try:
client = default_client()

def _disable_warnings():
import warnings

warnings.simplefilter("ignore", category=FutureWarning)

client.run(_disable_warnings)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to disable the warnings here, whereas we use the context manager below to enable the warnings?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this section we connect to the already created Dask cluster. Using an environment variable will not help here, since the processes are already running.

The context manager below does not enable warnings, but rather disables a certain category of warnings using an environment variable.


except ValueError:
from distributed import Client

Expand All @@ -47,7 +56,11 @@ def initialize_dask():
num_cpus = CpuCount.get()
memory_limit = Memory.get()
worker_memory_limit = memory_limit // num_cpus if memory_limit else "auto"
client = Client(n_workers=num_cpus, memory_limit=worker_memory_limit)

# when the client is initialized, environment variables are inherited
with set_env(PYTHONWARNINGS="ignore::FutureWarning"):
client = Client(n_workers=num_cpus, memory_limit=worker_memory_limit)

if GithubCI.get():
# set these keys to run tests that write to the mock s3 service. this seems
# to be the way to pass environment variables to the workers:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""The module defines interface for a partition with pandas storage format and Python engine."""

import warnings

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
from modin.core.execution.python.common import PythonWrapper

Expand Down Expand Up @@ -116,7 +118,9 @@ def call_queue_closure(data, call_queue):

self._data = call_queue_closure(self._data, self.call_queue)
self.call_queue = []
return self.__constructor__(func(self._data.copy(), *args, **kwargs))
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
return self.__constructor__(func(self._data.copy(), *args, **kwargs))

def drain_call_queue(self):
"""Execute all operations stored in the call queue on the object wrapped by this partition."""
Expand Down
18 changes: 8 additions & 10 deletions modin/core/execution/ray/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
StorageFormat,
ValueSource,
)
from modin.core.execution.utils import set_env
from modin.error_message import ErrorMessage

from .engine_wrapper import RayWrapper
Expand Down Expand Up @@ -82,7 +83,10 @@ def initialize_ray(
# the `pandas` module has been fully imported inside of each process before
# any execution begins:
# https://github.com/modin-project/modin/pull/4603
env_vars = {"__MODIN_AUTOIMPORT_PANDAS__": "1"}
env_vars = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if we connect to an already running Ray cluster? Will we see FutureWarnings?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unfortunately, yes, I don’t know of a way to change the state of an already running cluster on Ray.

"__MODIN_AUTOIMPORT_PANDAS__": "1",
"PYTHONWARNINGS": "ignore::FutureWarning",
Copy link
Collaborator

@dchigarev dchigarev Sep 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering why with catch_warnings(): inside of kernels doesn't work. Why wouldn't we just add the warning-catcher to the lowest level possible (inside the kernel that runs all other kernels), for example here:

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of distributed engines, the environment variable approach is simpler and more reliable (we do not need to keep track of all the call points of pandas functions).

Therefore, the question here is why we should not use this approach where it is possible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to do this for unidist, since setting environment variables there does not work without mpiexec (at least I don't know how to do it). cc @YarShev

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
if GithubCI.get():
# need these to write parquet to the moto service mocking s3.
env_vars.update(
Expand Down Expand Up @@ -143,9 +147,8 @@ def initialize_ray(
# time and doesn't enforce us with any overhead that Ray's native `runtime_env`
# is usually causing. You can visit this gh-issue for more info:
# https://github.com/modin-project/modin/issues/5157#issuecomment-1500225150
for key, value in env_vars.items():
os.environ[key] = value
ray.init(**ray_init_kwargs)
with set_env(**env_vars):
ray.init(**ray_init_kwargs)

if StorageFormat.get() == "Cudf":
from modin.core.execution.ray.implementations.cudf_on_ray.partitioning import (
Expand All @@ -163,12 +166,7 @@ def initialize_ray(
runtime_env_vars = ray.get_runtime_context().runtime_env.get("env_vars", {})
for varname, varvalue in env_vars.items():
if str(runtime_env_vars.get(varname, "")) != str(varvalue):
if is_cluster or (
# Here we relax our requirements for a non-cluster case allowing for the `env_vars`
# to be set at least as a process environment variable
not is_cluster
and os.environ.get(varname, "") != str(varvalue)
Comment on lines -167 to -170
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dchigarev Now all environment variables are set temporarily before Ray is initialized (in case of a local cluster), so this check will not work. This seems to be enough for Ray's processes to start using these variables.

I don't see any particular need to use these environment variables for the main process either. What do you think?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree

):
if is_cluster:
ErrorMessage.single_warning(
"When using a pre-initialized Ray cluster, please ensure that the runtime env "
+ f"sets environment variable {varname} to {varvalue}"
Expand Down
3 changes: 2 additions & 1 deletion modin/core/execution/unidist/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ def initialize_unidist():
unidist.init()
""",
)

# TODO: allow unidist to inherit env variables on initialization
# with set_env(PYTHONWARNINGS="ignore::FutureWarning"):
Comment on lines +48 to +49
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@YarShev is it possible to make that change?

I don't want to force users to set environment variables manually. In addition, in the case of warnings, this environment variable is only needed for worker processes, and not for main one.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should work when running unidist on MPI with mpiexec -n <N> python foo.py. However, this won't have effect when using a dynamic spawn, i.e., mpiexec -n 1 python foo.py. Can you create a feature request in unidist repo to add an ability for passing configuration settings in to MPI workers?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change should work when running unidist on MPI with mpiexec -n <N> python foo.py. However, this won't have effect when using a dynamic spawn, i.e., mpiexec -n 1 python foo.py. Can you create a feature request in unidist repo to add an ability for passing configuration settings in to MPI workers?

modin-project/unidist#341

unidist.init()

num_cpus = sum(v["CPU"] for v in unidist.cluster_resources().values())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""Module houses class that wraps data (block partition) and its metadata."""

import warnings

import unidist

from modin.core.dataframe.pandas.partitioning.partition import PandasDataframePartition
Expand Down Expand Up @@ -351,12 +353,16 @@ def _apply_func(partition, func, *args, **kwargs): # pragma: no cover
destructuring it causes a performance penalty.
"""
try:
result = func(partition, *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(partition, *args, **kwargs)
# Sometimes Arrow forces us to make a copy of an object before we operate on it. We
# don't want the error to propagate to the user, and we want to avoid copying unless
# we absolutely have to.
except ValueError:
result = func(partition.copy(), *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = func(partition.copy(), *args, **kwargs)
return (
result,
len(result) if hasattr(result, "__len__") else 0,
Expand Down Expand Up @@ -393,12 +399,16 @@ def _apply_list_of_funcs(call_queue, partition): # pragma: no cover
args = deserialize(f_args)
kwargs = deserialize(f_kwargs)
try:
partition = func(partition, *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
partition = func(partition, *args, **kwargs)
# Sometimes Arrow forces us to make a copy of an object before we operate on it. We
# don't want the error to propagate to the user, and we want to avoid copying unless
# we absolutely have to.
except ValueError:
partition = func(partition.copy(), *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
partition = func(partition.copy(), *args, **kwargs)

return (
partition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

"""Module houses classes responsible for storing a virtual partition and applying a function to it."""

import warnings

import pandas
import unidist

Expand Down Expand Up @@ -310,7 +312,9 @@ def _deploy_unidist_func(
Unidist functions are not detected by codecov (thus pragma: no cover).
"""
f_args = deserialize(f_args)
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = deployer(axis, f_to_deploy, f_args, f_kwargs, *args, **kwargs)
if not extract_metadata:
return result
ip = unidist.get_ip()
Expand Down
31 changes: 31 additions & 0 deletions modin/core/execution/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""General utils for execution module."""

import contextlib
import os


@contextlib.contextmanager
def set_env(**environ):
    """
    Temporarily set the process environment variables.

    Only the variables named in `environ` are overridden on entry and restored
    on exit. Any other changes made to ``os.environ`` while the context is active
    are left untouched, and the environment is never emptied as a whole.

    Parameters
    ----------
    **environ : dict
        Environment variables to set for the duration of the ``with`` block,
        passed as ``NAME="value"`` keyword arguments.

    Yields
    ------
    None
    """
    # Remember the prior state of only the variables being overridden;
    # ``None`` marks a variable that was not set before entering the context.
    previous = {name: os.environ.get(name) for name in environ}
    try:
        # Inside ``try`` so that a failure part-way through the update
        # (e.g. a non-string value) still rolls back what was already applied.
        os.environ.update(environ)
        yield
    finally:
        for name, value in previous.items():
            if value is None:
                os.environ.pop(name, None)
            else:
                os.environ[name] = value
5 changes: 4 additions & 1 deletion modin/core/storage_formats/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
"""

import abc
import warnings
from typing import Hashable, List, Optional

import numpy as np
Expand Down Expand Up @@ -164,7 +165,9 @@ def default_to_pandas(self, pandas_op, *args, **kwargs):
args = try_cast_to_pandas(args)
kwargs = try_cast_to_pandas(kwargs)

result = pandas_op(try_cast_to_pandas(self), *args, **kwargs)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
result = pandas_op(try_cast_to_pandas(self), *args, **kwargs)
if isinstance(result, (tuple, list)):
return [self.__wrap_in_qc(obj) for obj in result]
return self.__wrap_in_qc(result)
Expand Down
2 changes: 1 addition & 1 deletion modin/core/storage_formats/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2721,7 +2721,7 @@ def getitem_array(self, key):
# here we check for a subset of bool indexers only to simplify the code;
# there could (potentially) be more of those, but we assume the most frequent
# ones are just of bool dtype
if len(key.dtypes) == 1 and is_bool_dtype(key.dtypes[0]):
if len(key.dtypes) == 1 and is_bool_dtype(key.dtypes.iloc[0]):
self.__validate_bool_indexer(key.index)
return self.__getitem_bool(key, broadcast=True, dtypes="copy")

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2779,8 +2779,8 @@ def test_dict(self):
at = mdf._query_compiler._modin_frame._partitions[0][0].get()
assert len(at.column(0).chunks) == nchunks

mdt = mdf.dtypes[0]
pdt = pdf.dtypes[0]
mdt = mdf.dtypes.iloc[0]
pdt = pdf.dtypes.iloc[0]
assert mdt == "category"
assert isinstance(mdt, pandas.CategoricalDtype)
assert str(mdt) == str(pdt)
Expand Down
34 changes: 18 additions & 16 deletions modin/pandas/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -503,23 +503,25 @@ def _default_to_pandas(self, op, *args, **kwargs):
args = try_cast_to_pandas(args)
kwargs = try_cast_to_pandas(kwargs)
pandas_obj = self._to_pandas()
if callable(op):
result = op(pandas_obj, *args, **kwargs)
elif isinstance(op, str):
# The inner `getattr` is ensuring that we are treating this object (whether
# it is a DataFrame, Series, etc.) as a pandas object. The outer `getattr`
# will get the operation (`op`) from the pandas version of the class and run
# it on the object after we have converted it to pandas.
attr = getattr(self._pandas_class, op)
if isinstance(attr, property):
result = getattr(pandas_obj, op)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=FutureWarning)
if callable(op):
result = op(pandas_obj, *args, **kwargs)
elif isinstance(op, str):
# The inner `getattr` is ensuring that we are treating this object (whether
# it is a DataFrame, Series, etc.) as a pandas object. The outer `getattr`
# will get the operation (`op`) from the pandas version of the class and run
# it on the object after we have converted it to pandas.
attr = getattr(self._pandas_class, op)
if isinstance(attr, property):
result = getattr(pandas_obj, op)
else:
result = attr(pandas_obj, *args, **kwargs)
else:
result = attr(pandas_obj, *args, **kwargs)
else:
ErrorMessage.catch_bugs_and_request_email(
failure_condition=True,
extra_log="{} is an unsupported operation".format(op),
)
ErrorMessage.catch_bugs_and_request_email(
failure_condition=True,
extra_log="{} is an unsupported operation".format(op),
)
# SparseDataFrames cannot be serialized by arrow and cause problems for Modin.
# For now we will use pandas.
if isinstance(result, type(self)) and not isinstance(
Expand Down
Loading