
FIX-#4507: Do not call 'ray.get()' inside of the kernel executing call queues #6633

Merged (5 commits, Oct 11, 2023)

Conversation

@dchigarev (Collaborator) commented Oct 5, 2023

What do these changes do?

This PR eliminates ray.get() calls from inside the Ray kernel that executes a partition's call queue, which sometimes triggered a deadlock. Before this PR, materialization of all the futures was explicitly triggered inside the kernel using ray.get(); now all the futures are passed to the kernel as a variable-length parameter, allowing Ray to materialize them implicitly beforehand.
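
As a rough illustration, here is a minimal sketch (not the actual Modin kernel; the function names apply_queue_blocking and apply_queue_nonblocking are made up for this example) of the difference between a task that materializes ObjectRefs itself and one that receives them through a variable-length parameter so Ray resolves them before the task starts:

import ray

ray.init(ignore_reinit_error=True)

# Before (simplified): the kernel receives a list of ObjectRefs and
# materializes them itself with ray.get(), holding a worker slot while
# it waits; under load this is where the deadlock could occur.
@ray.remote
def apply_queue_blocking(refs):
    values = [ray.get(ref) for ref in refs]
    return sum(values)

# After (simplified): the refs are unpacked into a variable-length
# parameter, so Ray materializes them before the kernel starts and no
# ray.get() is needed inside the task.
@ray.remote
def apply_queue_nonblocking(*values):
    return sum(values)

refs = [ray.put(i) for i in range(3)]
print(ray.get(apply_queue_blocking.remote(refs)))     # 3
print(ray.get(apply_queue_nonblocking.remote(*refs))) # 3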

Performance testing shows no major difference for the other cases:
(performance comparison screenshot)

case 1 (getitem + transpose); 29 elements in the queue
import numpy as np
import modin.pandas as pd
from asv_bench.benchmarks.utils.common import execute

from timeit import default_timer as timer

import modin.config as cfg
pd.DataFrame([np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())]).to_numpy()

NROWS, NCOLS = (5_000_000, 64)
df = pd.DataFrame({f"col{i}": np.random.randint(0, 1_000_000, NROWS) for i in range(NCOLS)})

cols = df.columns.tolist()

for i in range(1, 20):
    df = df[cols[:-i]]

for i in range(10):
    df = df.T

print(
    f"call queue len: {len(df._query_compiler._modin_frame._partitions[-1, -1].call_queue)}"
) # 29
t1 = timer()
res = df + 10
execute(res)
print(timer() - t1)
case 2 (various funcs); 4 elements in the queue
import numpy as np
import modin.pandas as pd
from asv_bench.benchmarks.utils.common import execute

from timeit import default_timer as timer

import modin.config as cfg
pd.DataFrame([np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())]).to_numpy()

NROWS, NCOLS = (5_000_000, 64)
df = pd.DataFrame({f"col{i}": np.random.randint(0, 1_000_000, NROWS) for i in range(NCOLS)})

def lazy_apply(df, func, func_args):
    mf = df._query_compiler._modin_frame
    new_parts = mf._partition_mgr_cls.lazy_map_partitions(mf._partitions, func, func_args)
    new_mf = mf.__constructor__(
        new_parts,
        mf.copy_index_cache(copy_lengths=True),
        mf.copy_columns_cache(copy_lengths=True),
        mf._row_lengths_cache,
        mf._column_widths_cache,
        dtypes=mf.copy_dtypes_cache(),
    )
    return type(df)(query_compiler=type(df._query_compiler)(new_mf))


df = lazy_apply(df, lambda df: df + 100, ())
df = lazy_apply(df, lambda df, arg: df * arg, [np.arange(32)])
# df = lazy_apply(df, lambda df, arg: df - arg, [df._query_compiler._modin_frame._partitions[0, 0]._data])
df = lazy_apply(df, lambda df, arg: df.fillna(arg), [10])
df = lazy_apply(df, lambda df, arg: df.astype(arg), ["float"])
# df = lazy_apply(df, lambda df, arg: df * arg, [df._query_compiler._modin_frame._partitions[0, 0]._data])

print("call queue len", len(df._query_compiler._modin_frame._partitions[0, 0].call_queue))
t1 = timer()
execute(df)
print(timer() - t1)
case 3 (various funcs, including those that take a Modin df as a parameter); 6 elements in the queue
import numpy as np
import modin.pandas as pd
from asv_bench.benchmarks.utils.common import execute

from timeit import default_timer as timer

import modin.config as cfg
pd.DataFrame([np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())]).to_numpy()

NROWS, NCOLS = (5_000_000, 64)
df = pd.DataFrame({f"col{i}": np.random.randint(0, 1_000_000, NROWS) for i in range(NCOLS)})

def lazy_apply(df, func, func_args):
    mf = df._query_compiler._modin_frame
    new_parts = mf._partition_mgr_cls.lazy_map_partitions(mf._partitions, func, func_args)
    new_mf = mf.__constructor__(
        new_parts,
        mf.copy_index_cache(copy_lengths=True),
        mf.copy_columns_cache(copy_lengths=True),
        mf._row_lengths_cache,
        mf._column_widths_cache,
        dtypes=mf.copy_dtypes_cache(),
    )
    return type(df)(query_compiler=type(df._query_compiler)(new_mf))


df = lazy_apply(df, lambda df: df + 100, ())
df = lazy_apply(df, lambda df, arg: df * arg, [np.arange(32)])
df = lazy_apply(df, lambda df, arg: df - arg, [df._query_compiler._modin_frame._partitions[0, 0]._data])
df = lazy_apply(df, lambda df, arg: df.fillna(arg), [10])
df = lazy_apply(df, lambda df, arg: df.astype(arg), ["float"])
df = lazy_apply(df, lambda df, arg: df * arg, [df._query_compiler._modin_frame._partitions[0, 0]._data])

print("call queue len", len(df._query_compiler._modin_frame._partitions[0, 0].call_queue))
t1 = timer()
execute(df)
print(timer() - t1)
case 4 (various funcs, including those that take a Modin df as a parameter); 60 elements in the queue
import numpy as np
import modin.pandas as pd
from asv_bench.benchmarks.utils.common import execute

from timeit import default_timer as timer

import modin.config as cfg
pd.DataFrame([np.arange(cfg.NPartitions.get() * cfg.MinPartitionSize.get())]).to_numpy()

NROWS, NCOLS = (5_000_000, 64)
df = pd.DataFrame({f"col{i}": np.random.randint(0, 1_000_000, NROWS) for i in range(NCOLS)})

def lazy_apply(df, func, func_args):
    mf = df._query_compiler._modin_frame
    new_parts = mf._partition_mgr_cls.lazy_map_partitions(mf._partitions, func, func_args)
    new_mf = mf.__constructor__(
        new_parts,
        mf.copy_index_cache(copy_lengths=True),
        mf.copy_columns_cache(copy_lengths=True),
        mf._row_lengths_cache,
        mf._column_widths_cache,
        dtypes=mf.copy_dtypes_cache(),
    )
    return type(df)(query_compiler=type(df._query_compiler)(new_mf))


for i in range(10):
    df = lazy_apply(df, lambda df: df + 100, ())
    df = lazy_apply(df, lambda df, arg: df * arg, [np.arange(32)])
    df = lazy_apply(df, lambda df, arg: df - arg, [df._query_compiler._modin_frame._partitions[0, 0]._data])
    df = lazy_apply(df, lambda df, arg: df.fillna(arg), [10])
    df = lazy_apply(df, lambda df, arg: df.astype(arg), ["float"])
    df = lazy_apply(df, lambda df, arg: df * arg, [df._query_compiler._modin_frame._partitions[0, 0]._data])

print("call queue len", len(df._query_compiler._modin_frame._partitions[0, 0].call_queue))
t1 = timer()
execute(df)
print(timer() - t1)
  • first commit message and PR title follow format outlined here

    NOTE: If you edit the PR title to match this format, you need to add another commit (even if it's empty) or amend your last commit for the CI job that checks the PR title to pick up the new PR title.

  • passes flake8 modin/ asv_bench/benchmarks scripts/doc_checker.py
  • passes black --check modin/ asv_bench/benchmarks scripts/doc_checker.py
  • signed commit with git commit -s
  • Resolves BUG: Ray remote tasks internally blocking on object IDs of their dependencies can lead to deadlock #4507
  • tests added and passing
  • module layout described at docs/development/architecture.rst is up-to-date

@@ -66,6 +66,43 @@ def __init__(self, data, length=None, width=None, ip=None, call_queue=None):
)
)

@staticmethod
Collaborator:

I want us to keep the code for the Ray engine in sync with unidist. If we enable unidist on Ray, then we will have an issue similar to the one this PR resolves. So please add the respective changes to the unidist engine. Perf measurements would also be useful.

Collaborator Author:

Well, I can't test this with unidist because of modin-project/unidist#354.

Collaborator:

The issue was fixed on the unidist side and the fix will be available in unidist 0.5.0. I suppose you could test the changes without the .wait method.

Collaborator:

We could also release unidist 0.4.2 with that fix if it is really necessary.

Collaborator Author:

We could also release unidist 0.4.2 with that fix if it is really necessary.

I think I'll just test with the master branch of unidist, so there's no urgency for the release.

Collaborator:

Do you think it makes sense to port them to the unidist backend even if the problem is not reproducible with unidist?

I think it makes sense to port the changes to the unidist backend because, as I said, if we enable unidist on Ray in the future, then we will have an issue similar to the one this PR resolves.

I've just looked at the code of unidist partitioning, and it looks 100% identical to Ray's partition code. Maybe it's better to come up with some unified classes for both Ray and unidist instead of copying the code?

We already had a similar discussion when integrating unidist into Modin. We ported changes from the Ray engine to unidist and just replaced ray.* calls with unidist.* calls. If we came up with some unified classes for both Ray and unidist instead of copying the code, I assume we would have criss-crossed dependencies between Ray and unidist, which we don't want. Since unidist has a Ray-like API, it looks like code duplication. If unidist had a different API from Ray, we wouldn't see code duplication.

Collaborator Author:

I tried to port the changes to unidist, but it doesn't work very well:
(performance screenshot)

Collaborator:

Hm... that is strange. OK, let's then file an issue for porting the changes to the unidist engine and also describe the perf problem there. For this PR we will leave the change for Ray only. Thanks for the measurements.

Collaborator Author:

I also tried to compare these two approaches with pure unidist and was able to reproduce the deadlock caused by calling unidist.get() inside a kernel... Strange that it didn't happen with Modin.

import unidist
import pandas as pd
from timeit import default_timer as timer
import warnings

unidist.init()

def gen_df():
    return pd.DataFrame(range(10))

# refs passed as top-level variable-length arguments: unidist
# materializes them before the kernel runs
@unidist.remote
def func1(*args):
    return args

# refs passed inside a list: the kernel has to call unidist.get()
# on each of them itself
@unidist.remote
def func2(args):
    res = []
    for a in args:
        res.append(unidist.get(a))
    return res

remote_objs = [unidist.put(gen_df()) for _ in range(3)]

warnings.warn("running func1...")
t1 = timer()
unidist.wait(func1.remote(*remote_objs)) # works fine
print(timer() - t1)

remote_objs = [unidist.put(gen_df()) for _ in range(3)]

warnings.warn("running func2...")
t1 = timer() 
unidist.wait(func2.remote(remote_objs)) # deadlock
print(timer() - t1)

Collaborator:

Strange that it didn't happen with modin

unidist on MPI internally deconstructs all nested object refs and passes them into a kernel, so the data is materialized in the kernel. However, your example looks like a problem in unidist anyway. Could you file an issue in unidist? We will take a look at it.

@YarShev (Collaborator) commented Oct 10, 2023

@modin-project/modin-core, please take a look

for func, f_args, f_kwargs in call_queue:
    func = deserialize(func)
    args = deserialize(f_args)
    kwargs = deserialize(f_kwargs)
Collaborator:

I wonder if we are passing anything at all through this parameter that could lead to a hang? I believe objects created explicitly using ray.put should work correctly.

Collaborator Author:

I wonder if we are passing anything at all through this parameter that could lead to a hang?

I think that we're indeed not passing anything like that via func or f_kwargs, but I thought it might be a good idea to unify the approach across f_args, func, and f_kwargs to avoid such problems in the future.

Signed-off-by: Dmitry Chigarev <[email protected]>
Signed-off-by: Dmitry Chigarev <[email protected]>
Signed-off-by: Dmitry Chigarev <[email protected]>
modin/core/execution/ray/common/utils.py: two outdated review threads (resolved)
was_iterable = False
value = [value]
unfolded_queue.extend(value)
value_lengths.append({"len": len(value), "was_iterable": was_iterable})
Collaborator:

Maybe using a tuple (len(value), was_iterable) here would be faster. Consider this only if unboxing/packing performance turns out to be insufficient.

Collaborator Author:

Well, it may be faster by a couple of nanoseconds, but I would prefer code readability to saving such an insignificant amount of time here.
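
For context, here is a minimal sketch of the flatten-and-refold bookkeeping being discussed; the unfold/refold helper names and the exact dict layout are illustrative assumptions, not the actual Modin code. Each argument value is flattened into one sequence that can be splatted into the kernel, and the recorded lengths let the kernel rebuild the original structure:

def unfold(values):
    # Flatten possibly-iterable argument values into one list, recording
    # for each original value how many items it contributed and whether
    # it was an iterable to begin with.
    unfolded_queue, value_lengths = [], []
    for value in values:
        if isinstance(value, (list, tuple)):
            was_iterable = True
        else:
            was_iterable = False
            value = [value]
        unfolded_queue.extend(value)
        value_lengths.append({"len": len(value), "was_iterable": was_iterable})
    return unfolded_queue, value_lengths


def refold(flat_values, value_lengths):
    # Inverse of unfold: rebuild the original per-argument structure
    # from the flat sequence the kernel received.
    result, pos = [], 0
    for meta in value_lengths:
        chunk = list(flat_values[pos : pos + meta["len"]])
        pos += meta["len"]
        result.append(chunk if meta["was_iterable"] else chunk[0])
    return result


# Example: nested values are splatted into the kernel as *flat and then
# refolded back into their original shape inside the kernel.
flat, lengths = unfold([1, [2, 3], "x"])
assert refold(flat, lengths) == [1, [2, 3], "x"]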

Signed-off-by: Dmitry Chigarev <[email protected]>
@anmyachev (Collaborator) left a comment:

LGTM!

@anmyachev anmyachev merged commit b0ef411 into modin-project:master Oct 11, 2023
anmyachev pushed a commit to anmyachev/modin that referenced this pull request Oct 30, 2023
Successfully merging this pull request may close these issues:

  • BUG: Ray remote tasks internally blocking on object IDs of their dependencies can lead to deadlock