Skip to content

Commit

Permalink
Fix Dataframe.__setitem__ slow-downs (#17222)
Browse files Browse the repository at this point in the history
Fixes: #17140 

This PR fixes slow-downs in `DataFrame.__setitem__` by properly passing in CPU objects where needed, instead of passing a GPU object, failing, and then performing a GPU -> CPU transfer.

The first argument of `DataFrame.__setitem__` can be a column index (`pd.Index`). In our fast path this is converted to a `cudf.Index`, which causes a failure on the cudf side, after which the GPU -> CPU transfer and the slow path execute; this is the primary cause of the slowdown. This PR maintains a dict mapping of such special functions whose arguments should not be converted to fast-path objects.

Authors:
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - Matthew Murray (https://github.com/Matt711)

URL: #17222
  • Loading branch information
galipremsagar authored Nov 12, 2024
1 parent 5cbdcd0 commit 84743c3
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
49 changes: 48 additions & 1 deletion python/cudf/cudf/pandas/fast_slow_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ def call_operator(fn, args, kwargs):
"EXECUTE_SLOW": 0x0571B0,
}

# This is a dict of functions that are known to have arguments that
# need to be transformed from fast to slow only. i.e., Some cudf functions
# error on passing a device object but don't error on passing a host object.
# For example: DataFrame.__setitem__(arg, value) errors on passing a
# cudf.Index object but doesn't error on passing a pd.Index object.
# Hence we need to transform the arg from fast to slow only. So, we use
# a dictionary like:
# {"DataFrame.__setitem__": {0}}
# where the keys are the function names (the slow object's ``__qualname__``,
# as returned by ``_MethodProxy._customqualname``) and the values are the
# indices (0-based) of the arguments that need to be transformed.

_SPECIAL_FUNCTIONS_ARGS_MAP: dict[str, set[int]] = {
    "DataFrame.__setitem__": {0},
}

_WRAPPER_ASSIGNMENTS = tuple(
attr
Expand Down Expand Up @@ -875,6 +889,10 @@ def __name__(self, value):
pass
setattr(self._fsproxy_slow, "__name__", value)

@property
def _customqualname(self):
return self._fsproxy_slow.__qualname__


def _assert_fast_slow_eq(left, right):
if _is_final_type(type(left)) or type(left) in NUMPY_TYPES:
Expand Down Expand Up @@ -1011,7 +1029,36 @@ def _transform_arg(
# use __reduce_ex__ instead...
if type(arg) is tuple:
# Must come first to avoid infinite recursion
return tuple(_transform_arg(a, attribute_name, seen) for a in arg)
if (
len(arg) > 0
and isinstance(arg[0], _MethodProxy)
and arg[0]._customqualname in _SPECIAL_FUNCTIONS_ARGS_MAP
):
indices_map = _SPECIAL_FUNCTIONS_ARGS_MAP[
arg[0]._customqualname
]
method_proxy, original_args, original_kwargs = arg

original_args = tuple(
_transform_arg(a, "_fsproxy_slow", seen)
if i - 1 in indices_map
else _transform_arg(a, attribute_name, seen)
for i, a in enumerate(original_args)
)
original_kwargs = _transform_arg(
original_kwargs, attribute_name, seen
)
return tuple(
(
_transform_arg(method_proxy, attribute_name, seen),
original_args,
original_kwargs,
)
)
else:
return tuple(
_transform_arg(a, attribute_name, seen) for a in arg
)
elif hasattr(arg, "__getnewargs_ex__"):
# Partial implementation of to reconstruct with
# transformed pieces
Expand Down
23 changes: 23 additions & 0 deletions python/cudf/cudf_pandas_tests/test_cudf_pandas.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import pickle
import subprocess
import tempfile
import time
import types
from io import BytesIO, StringIO

Expand Down Expand Up @@ -1795,3 +1796,25 @@ def test_iter_doesnot_raise(monkeypatch):
monkeycontext.setenv("CUDF_PANDAS_FAIL_ON_FALLBACK", "True")
for _ in s:
pass


def test_dataframe_setitem_slowdown():
    # We are explicitly testing the slowdown of the setitem operation:
    # assigning with a column Index used to take the fallback slow path
    # (fast-path failure followed by a GPU -> CPU transfer), which made
    # this assignment take far longer than the 5-second budget below.
    df = xpd.DataFrame(
        {"a": [1, 2, 3] * 100000, "b": [1, 2, 3] * 100000}
    ).astype("float64")
    df = xpd.DataFrame({"a": df["a"].repeat(1000), "b": df["b"].repeat(1000)})
    new_df = df + 1
    # perf_counter is monotonic and high-resolution; time.time() can jump
    # with wall-clock adjustments. Keep the delta as a float: truncating
    # with int() let runtimes of up to 5.99s slip under a "> 5" check.
    start_time = time.perf_counter()
    df[df.columns] = new_df
    delta = time.perf_counter() - start_time
    if delta > 5:
        pytest.fail(f"Test took too long to run, runtime: {delta:.2f}s")


def test_dataframe_setitem():
    """Assigning all columns via ``df[df.columns] = ...`` updates the frame."""
    original = xpd.DataFrame({"a": [1, 2, 3], "b": [1, 2, 3]}).astype("float64")
    expected = original + 1
    original[original.columns] = expected
    tm.assert_equal(original, expected)

0 comments on commit 84743c3

Please sign in to comment.