Workflow hangs when tasks or sub-flows using ProcessPool with start method fork #9229

Open
4 tasks done
tyong920 opened this issue Apr 16, 2023 · 7 comments
Labels
bug Something isn't working needs:research Blocked by investigation into feasibility and cause


@tyong920

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn't find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

Fork is the default multiprocessing start method on Linux (it isn't available on Windows), while Windows and macOS use spawn by default.

When a user-defined task or sub-flow called from the entry-point flow creates a process pool (ProcessPoolExecutor), Prefect may hang forever.
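To check which start method your interpreter uses by default, the standard library's `multiprocessing` module is enough. This is a minimal sketch, and `default_start_method` is a hypothetical helper name. Note that calling `get_start_method()` also fixes the default context for the process, so a later `set_start_method()` without `force=True` raises `RuntimeError`.

```python
import multiprocessing as mp


def default_start_method() -> str:
    """Return the start method this process will use (hypothetical helper).

    Calling get_start_method() without allow_none=True also fixes the
    default context, so call set_start_method() first if you need to change it.
    """
    return mp.get_start_method()


if __name__ == "__main__":
    # Typically "fork" on Linux (before Python 3.14), "spawn" on macOS/Windows.
    print("default start method:", default_start_method())
    print("available start methods:", mp.get_all_start_methods())
```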

Reproduction

"""
Version 0: hangs on Linux but works on macOS.
"""
from concurrent import futures

from prefect import flow, task


def say_hi(name: str):
    print(f"Hi {name}")


# @flow  # decorating func with @flow (a sub-flow) instead hangs the same way
@task
def func():
    with futures.ProcessPoolExecutor(2) as executor:
        executor.map(say_hi, ("Prefect", "Orion"))


@flow
def this_works():
    func.fn() # Call function directly


@flow
def this_hangs():
    func() # Call task or sub-flow


if __name__ == "__main__":
    this_hangs()  # hangs on Linux (e.g., Ubuntu); completes on macOS (e.g., Ventura)
    # this_works()  # calls func.fn() directly, bypassing the task engine


"""
Version 1: works on both Linux and macOS by setting the start method to spawn.
"""
from concurrent import futures
from multiprocessing import get_context

from prefect import flow, task


def say_hi(name: str):
    print(f"Hi {name}")


# @flow  # decorating func with @flow (a sub-flow) instead hangs the same way
@task
def func():
    context = get_context("spawn")
    with futures.ProcessPoolExecutor(2, mp_context=context) as executor:
        executor.map(say_hi, ("Prefect", "Orion"))


@flow
def this_works():
    func.fn() # Call function directly


@flow
def this_hangs():
    func() # Call task or sub-flow


if __name__ == "__main__":
    this_hangs()  # now completes on Linux (e.g., Ubuntu) as well
    # this_works()  # calls func.fn() directly, bypassing the task engine

Error

12:03:13.761 | INFO    | prefect.engine - Created flow run 'shapeless-yak' for flow 'this-hangs'
12:03:13.821 | INFO    | Flow run 'shapeless-yak' - Created task run 'func-0' for task 'func'
12:03:13.822 | INFO    | Flow run 'shapeless-yak' - Executing 'func-0' immediately...
Hi Prefect
Hi Orion
^C12:03:21.965 | ERROR   | Task run 'func-0' - Crash detected! Execution was cancelled by the runtime environment.
12:03:21.979 | ERROR   | Flow run 'shapeless-yak' - Crash detected! Execution was aborted by an interrupt signal.
12:03:21.992 | ERROR   | prefect._internal.concurrency.services - Service 'APILogWorker' failed.
Traceback (most recent call last):
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/services.py", line 114, in _run
    await self._main_loop()
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/services.py", line 254, in _main_loop
    item = await self._queue.get()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError
Traceback (most recent call last):
  File "/private/tmp/prefect-issue/flow_processpool.py", line 30, in <module>
    this_hangs() # ... ON Linux, e.g., Ubuntu
    ^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/flows.py", line 468, in __call__
    return enter_flow_run_engine_from_flow_call(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 184, in enter_flow_run_engine_from_flow_call
    retval = from_sync.wait_for_call_in_loop_thread(
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 137, in wait_for_call_in_loop_thread
    return call.result()
           ^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 173, in result
    return self.future.result(timeout=timeout)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 449, in result
    return self.__get_result()
           ^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/concurrent/futures/_base.py", line 401, in __get_result
    raise self._exception
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 218, in _run_async
    result = await coro
             ^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/client/utilities.py", line 40, in with_injected_client
    return await fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 251, in create_then_begin_flow_run
    state = await begin_flow_run(
            ^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 388, in begin_flow_run
    terminal_or_paused_state = await orchestrate_flow_run(
                               ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 669, in orchestrate_flow_run
    result = await flow_call.aresult()
             ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 181, in aresult
    return await asyncio.wrap_future(self.future)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/calls.py", line 194, in _run_sync
    result = self.fn(*self.args, **self.kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/flow_processpool.py", line 26, in this_hangs
    func() # Call task or sub-flow
    ^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/tasks.py", line 485, in __call__
    return enter_task_run_engine(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/engine.py", line 972, in enter_task_run_engine
    return from_sync.wait_for_call_in_loop_thread(begin_run)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/api.py", line 136, in wait_for_call_in_loop_thread
    waiter.wait()
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 124, in wait
    self._handle_waiting_callbacks(ctx)
  File "/private/tmp/prefect-issue/.venv/lib/python3.11/site-packages/prefect/_internal/concurrency/waiters.py", line 88, in _handle_waiting_callbacks
    callback: Call = self._queue.get()
                     ^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/queue.py", line 171, in get
    self.not_empty.wait()
  File "/opt/homebrew/Cellar/python@3.11/3.11.3/Frameworks/Python.framework/Versions/3.11/lib/python3.11/threading.py", line 320, in wait
    waiter.acquire()
KeyboardInterrupt

Versions

Version:             2.10.4
API version:         0.8.4
Python version:      3.10.9
Git commit:          b6d0433a
Built:               Thu, Apr 13, 2023 5:34 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.41.2

Additional context

No response

tyong920 added the bug (Something isn't working) and status:triage labels on Apr 16, 2023
tyong920 changed the title from "Workflow hangs when tasks or sub-flows use ProcessPool with start method fork" to "Workflow hangs when tasks or sub-flows using ProcessPool with start method fork" on Apr 16, 2023
@github-actions
Contributor

This issue is stale because it has been open 30 days with no activity. To keep this issue open remove stale label or comment.

zanieb added the status:accepted and needs:research (Blocked by investigation into feasibility and cause) labels and removed the status:stale label on May 16, 2023
@rmorshea
Contributor

rmorshea commented Jun 28, 2023

I have experienced this as well.

sync_compatible seems to be at fault in my case. I can be reasonably certain of this because forcing the offending sync_compatible-decorated function to run asynchronously via asyncio.run works around the issue:

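import asyncio

# `prefect.something` and `offending_function` are placeholders for the
# sync_compatible-decorated function that hangs.
from prefect.something import offending_function


def call_offending_function_sync():
    async def wrapper():
        await offending_function()

    asyncio.run(wrapper())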

It's rather surprising to me that the sync compatibility decorator doesn't take a similar approach with asyncio.run. Perhaps there should be a heuristic that tries asyncio.run first, before submitting the call to the global loop thread.

@rmorshea
Contributor

rmorshea commented Jul 6, 2023

@zanieb here's the shortest example I've been able to come up with:

import multiprocessing as mp
from prefect_aws import S3Bucket

S3Bucket.load("bucket-name")

if __name__ == "__main__":
    proc = mp.get_context("fork").Process(target=lambda: None, daemon=True)
    proc.start()
    proc.join()

@rmorshea
Contributor

rmorshea commented Jul 6, 2023

@zanieb if you comment out this line it does not hang:

emit_instance_method_called_event(block, "load", successful=True)

@rmorshea
Contributor

rmorshea commented Jul 6, 2023

My earlier statement that this is related to sync_compatible seems to be wrong. This still hangs:

import asyncio
import multiprocessing as mp
from prefect_aws import S3Bucket


async def func():
    await S3Bucket.load("global-storage")


asyncio.run(func())


if __name__ == "__main__":
    proc = mp.get_context("fork").Process(target=lambda: None, daemon=True)
    proc.start()
    proc.join()

@rmorshea
Contributor

rmorshea commented Jul 6, 2023

Going one layer deeper, removing usage of this lock also prevents the hang:

This seems related to the problem described in this article: the lock is likely copied into the child process in an acquired state, so the subprocess can never acquire it.
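This failure mode can be reproduced without Prefect. The sketch below assumes a POSIX system where the `fork` start method is available, and `child_can_acquire` and `_try_acquire` are hypothetical names. A background thread holds a `threading.Lock` at the moment the main thread forks. Because `fork()` copies only the calling thread, the child receives the lock in its acquired state with no thread that could ever release it. The child uses `acquire(timeout=...)` so the demo reports the problem instead of hanging.

```python
import multiprocessing as mp
import threading


def _try_acquire(lock, queue) -> None:
    # Runs in the forked child. Only the forking thread exists here, so if the
    # lock was copied while held, nothing will ever release it; time out
    # instead of hanging so the parent can report the result.
    queue.put(lock.acquire(timeout=1.0))


def child_can_acquire(hold_lock_during_fork: bool) -> bool:
    """Fork a child and report whether it can acquire a lock from the parent."""
    lock = threading.Lock()
    held = threading.Event()
    release = threading.Event()

    def background_worker() -> None:
        # Stand-in for a library's background thread (e.g. a log or event
        # worker) that happens to hold a lock while the main thread forks.
        with lock:
            held.set()
            release.wait()

    worker = None
    if hold_lock_during_fork:
        worker = threading.Thread(target=background_worker, daemon=True)
        worker.start()
        held.wait()  # make sure the lock is held before forking

    ctx = mp.get_context("fork")  # not available on Windows
    queue = ctx.Queue()
    # With "fork", Process arguments are inherited rather than pickled, so a
    # threading.Lock can be passed directly.
    proc = ctx.Process(target=_try_acquire, args=(lock, queue))
    proc.start()
    acquired = queue.get(timeout=10)
    proc.join()

    if worker is not None:
        release.set()
        worker.join()
    return acquired


if __name__ == "__main__":
    print("lock free at fork time, child acquired it:", child_can_acquire(False))
    print("lock held at fork time, child acquired it:", child_can_acquire(True))
```

On Python 3.12+, forking a multi-threaded process also emits a `DeprecationWarning` about possible deadlocks in the child, which is exactly the situation the second call creates.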

@rmorshea
Contributor

rmorshea commented Jul 6, 2023

The solution seems to be simply not to use forked subprocesses. According to the article:

  • Starting in Python 3.12, you will get a DeprecationWarning indicating that “fork” will stop being the default in 3.14.
  • In Python 3.14, the default will be changed to either “spawn” or “forkserver” (a mostly safer alternative to “fork”).
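Following that advice does not require changing the default start method globally. As in Version 1 above, a pool can be given an explicit context. This sketch prefers `forkserver` where it exists and falls back to `spawn`, and the helper names `safe_mp_context`, `say_hi` and `run_pool` are illustrative. `spawn` starts a fresh interpreter and `forkserver` forks from a separate single-threaded server process, so neither copies the caller's threads' lock state. Because workers re-import the main module under both methods, the entry point must stay behind an `if __name__ == "__main__":` guard.

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
from multiprocessing.context import BaseContext


def safe_mp_context() -> BaseContext:
    """Prefer "forkserver" (POSIX only), otherwise fall back to "spawn"."""
    available = mp.get_all_start_methods()
    method = "forkserver" if "forkserver" in available else "spawn"
    return mp.get_context(method)


def say_hi(name: str) -> str:
    return f"Hi {name}"


def run_pool(names):
    # An explicit mp_context overrides the platform default, so no worker is
    # created by forking a parent whose background threads may hold locks.
    with ProcessPoolExecutor(max_workers=2, mp_context=safe_mp_context()) as pool:
        return list(pool.map(say_hi, names))


if __name__ == "__main__":
    print(run_pool(["Prefect", "Orion"]))
```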
