[Core] Fix segfault when cancel and generator is used with a high load #40083

Closed
wants to merge 14 commits into from

rkooo567
Contributor

@rkooo567 rkooo567 commented Oct 4, 2023

Why are these changes needed?

Right now, future.result() returns and raises an exception when future.cancel() is called, even though the coroutine itself has not actually been cancelled. I verified this with a local example.

This means future.result() can return and the task state can be cleaned up (since the task is considered cancelled) while the actual coroutine and C++ code are still running in a different thread. This causes random SIGABRT and SIGSEGV crashes when many cancel and generator tasks are running.
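
A minimal sketch of the behaviour described above, using only the standard library rather than Ray code. The names `demo_early_cancel` and `blocking_work` are hypothetical, and the 0.3 s sleep merely stands in for C++ work running in another thread. It relies on the fact that the future returned by asyncio.run_coroutine_threadsafe is a concurrent.futures.Future, whose cancel() marks it cancelled right away while the underlying task is only asked to cancel later on the loop thread.

```python
import asyncio
import concurrent.futures
import threading
import time


def demo_early_cancel():
    """Return (result_raised_cancelled, work_finished_at_that_moment)."""
    loop = asyncio.new_event_loop()
    loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
    loop_thread.start()
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)

    started = threading.Event()
    finished = threading.Event()

    def blocking_work():
        # Stand-in (assumption) for native code running in a separate thread.
        started.set()
        time.sleep(0.3)
        finished.set()

    async def coro():
        await loop.run_in_executor(pool, blocking_work)

    future = asyncio.run_coroutine_threadsafe(coro(), loop)
    started.wait(timeout=5)

    # Cancelling marks the concurrent future as cancelled immediately.
    future.cancel()
    try:
        future.result(timeout=5)
        raised = False
    except concurrent.futures.CancelledError:
        raised = True
    # Check whether the background work had finished when result() returned.
    work_finished = finished.is_set()

    # Let the worker thread finish before tearing down the loop.
    pool.shutdown(wait=True)
    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join(timeout=5)
    loop.close()
    return raised, work_finished
```

Calling `demo_early_cancel()` is expected to show that result() raised a cancellation error while `blocking_work` was still running, which is the window in which task state could be cleaned up too early.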

For a generator, we use a custom await between generator runs for performance reasons:

done = await loop.run_in_executor(

The C++ function running in a separate thread kept running while the task was cleaned up, which causes the segfault.

The same thing can technically happen for a regular async task (if it uses a thread pool). If it doesn't use a thread pool, it doesn't happen because the future.cancel is not scheduled until the coroutine is awaited (whereas a generator always has an await between each run because of the thread pool above).

I fixed the issue by waiting until the coroutine is finished. Note that this is currently a blocking call, but the wait should mostly be short because the coroutine is already cancelled by the time the cancellation error is raised.

Related issue number

Closes #39703

Checks

  • I've signed off every commit(by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Collaborator

@jjyao jjyao left a comment

Right now, future.result() returns and raises an exception when future.cancel() is called, even though the coroutine itself has not actually been cancelled. I verified this with a local example.

Is this behavior documented by Python?

SangBin Cho added 3 commits October 4, 2023 19:14
@rkooo567
Contributor Author

rkooo567 commented Oct 4, 2023

It is not documented, but there is a relevant issue: python/cpython#105836

@@ -0,0 +1,33 @@
import requests
Contributor

remove?

Contributor

@edoakes edoakes left a comment

This looks extremely sketchy and blocking the loop makes me uncomfortable.

We are already using an event to signal the completion of the coroutine inside of run_async_func_or_coro_in_event_loop:

future.add_done_callback(lambda _: event.Notify())

I assume the current issue is that the callback added with add_done_callback is running prior to the coroutine actually completing. If that's the case, then how about we move event.Notify() into a finally block in the async_func just above?

I believe that'll accomplish the same thing without requiring the blocking wait() call.
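
The ordering difference behind this suggestion can be sketched with the standard library alone. This is an illustration under assumptions, not Ray's implementation: `notification_order`, `user_coro` and the threading.Event named `notified` are hypothetical stand-ins, and the short sleep in the `finally` clause simulates cleanup that is still running after cancellation has been requested.

```python
import asyncio
import threading


def notification_order(notify_in_finally):
    """Return the order of 'notified' and 'coroutine exited' after a cancel."""
    loop = asyncio.new_event_loop()
    loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
    loop_thread.start()

    order = []
    started = threading.Event()
    exited = threading.Event()
    notified = threading.Event()  # stand-in for the event the caller waits on

    def notify():
        order.append("notified")
        notified.set()

    async def user_coro():
        started.set()
        try:
            await asyncio.sleep(10)
        finally:
            # Simulated cleanup that runs after cancellation was requested.
            await asyncio.sleep(0.05)
            order.append("coroutine exited")
            exited.set()

    async def async_func():
        try:
            return await user_coro()
        finally:
            # Variant 2: notify only once the coroutine has really exited.
            if notify_in_finally:
                notify()

    future = asyncio.run_coroutine_threadsafe(async_func(), loop)
    if not notify_in_finally:
        # Variant 1: notify from the future's done callback.
        future.add_done_callback(lambda _: notify())

    started.wait(timeout=5)
    future.cancel()
    notified.wait(timeout=5)
    exited.wait(timeout=5)

    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join(timeout=5)
    loop.close()
    return order
```

With the done callback, the notification fires as soon as cancel() is called, before the coroutine has exited. With the `finally` block, it fires only after the coroutine has unwound, without any blocking wait on the caller's side.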

@@ -1196,36 +1196,43 @@ async def execute_streaming_generator_async(

Args:
context: The context to execute streaming generator.
coroutine_complete_event: The asyncio.Event to notify the
Contributor

this and the type hint say asyncio.Event but a threading.Event is passed in
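
For context on why the distinction matters, here is a small standard-library sketch (the function name `signal_from_thread` is hypothetical). A threading.Event can be set directly from any thread, whereas an asyncio.Event is not thread-safe and has to be set on its loop, for example via loop.call_soon_threadsafe.

```python
import asyncio
import threading


def signal_from_thread():
    """Set both kinds of event from a worker thread; return their final states."""

    async def main():
        loop = asyncio.get_running_loop()
        async_event = asyncio.Event()
        thread_event = threading.Event()

        def worker():
            # threading.Event is thread-safe and can be set directly.
            thread_event.set()
            # asyncio.Event is not thread-safe; hand the call to the loop thread.
            loop.call_soon_threadsafe(async_event.set)

        thread = threading.Thread(target=worker)
        thread.start()
        # Awaiting asyncio.Event.wait() yields to the loop instead of blocking it.
        await asyncio.wait_for(async_event.wait(), timeout=5)
        thread.join()
        return thread_event.is_set(), async_event.is_set()

    return asyncio.run(main())
```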

Contributor Author

Ah, I tried a non-blocking approach, but it didn't work, so I changed to the blocking way. Let's just go with #40122, as it makes more sense to me.

@@ -2235,7 +2260,7 @@ cdef void cancel_async_task(
function_descriptor, name_of_concurrency_group_to_execute)
future = worker.core_worker.get_queued_future(task_id)
if future is not None:
-future.cancel()
+eventloop.call_soon_threadsafe(future.cancel)
Contributor

why's this change needed btw?

Contributor Author

This is to reduce the impact of the blocking call (so that by the time things are cancelled, the coroutine has already context-switched). I think with the new fix it is not necessary.

@edoakes
Contributor

edoakes commented Oct 4, 2023

@rkooo567 I just verified that my suggested change fixes the test case you've added. Without the following diff, I see the segfaults. With the following diff, the test passes consistently (without any of your other changes):

diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx
index af1e777680..5449563478 100644
--- a/python/ray/_raylet.pyx
+++ b/python/ray/_raylet.pyx
@@ -4280,22 +4280,24 @@ cdef class CoreWorker:
             function_descriptor, specified_cgname)
 
         async def async_func():
-            if task_id:
-                async_task_id.set(task_id)
+            try:
+                if task_id:
+                    async_task_id.set(task_id)
 
-            if inspect.isawaitable(func_or_coro):
-                coroutine = func_or_coro
-            else:
-                coroutine = func_or_coro(*args, **kwargs)
+                if inspect.isawaitable(func_or_coro):
+                    coroutine = func_or_coro
+                else:
+                    coroutine = func_or_coro(*args, **kwargs)
 
-            return await coroutine
+                return await coroutine
+            finally:
+                event.Notify()
 
         future = asyncio.run_coroutine_threadsafe(async_func(), eventloop)
         if task_id:
             with self._task_id_to_future_lock:
                 self._task_id_to_future[task_id] = future
 
-        future.add_done_callback(lambda _: event.Notify())
         with nogil:
             (CCoreWorkerProcess.GetCoreWorker()
                 .YieldCurrentFiber(event))

@edoakes
Contributor

edoakes commented Oct 4, 2023

Made a PR to run CI against it: #40122

@rkooo567
Contributor Author

rkooo567 commented Oct 4, 2023

I assume the current issue is that the callback added with add_done_callback is running prior to the coroutine actually completing. If that's the case, then how about we move event.Notify() into a finally block in the async_func just above?

@edoakes this fix makes sense

@rkooo567
Contributor Author

rkooo567 commented Oct 4, 2023

There's a better/safer approach here: #40122

@rkooo567 rkooo567 closed this Oct 4, 2023
edoakes added a commit that referenced this pull request Oct 4, 2023
rkooo567 pushed a commit to rkooo567/ray that referenced this pull request Oct 4, 2023
GeneDer pushed a commit that referenced this pull request Oct 5, 2023
…#40083 (#40122) (#40126)

Signed-off-by: Edward Oakes <[email protected]>
Co-authored-by: Edward Oakes <[email protected]>
Co-authored-by: SangBin Cho <[email protected]>
Zandew pushed a commit to Zandew/ray that referenced this pull request Oct 10, 2023
vymao pushed a commit to vymao/ray that referenced this pull request Oct 11, 2023
Development

Successfully merging this pull request may close these issues.

[Serve][Streaming] Actor crash when requests get canceled.
4 participants