call exit callback even if AsyncProcess is reaped elsewhere #6684


graingert
Member

@graingert graingert commented Jul 7, 2022

Might help with #6683?

  • Tests added / passed
  • Passes pre-commit run --all-files

@github-actions
Contributor

github-actions bot commented Jul 7, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files (±0), 15 suites (±0), duration 6h 53m 32s (+21m 33s)
2 989 tests (+58): 2 894 passed (+49), 89 skipped (+4), 6 failed (+5)
22 165 runs (+454): 21 113 passed (+394), 1 046 skipped (+55), 6 failed (+5)

For more details on these failures, see this check.

Results for commit 9dbf154. ± Comparison against base commit ade4266.

♻️ This comment has been updated with latest results.

@graingert graingert force-pushed the call-exit-callback-if-process-reaped-elsewhere branch from 91114f7 to 86e5a00 Compare July 7, 2022 14:57
Comment on lines +237 to +239
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
exitcode = 255
Member Author

inspired by:

        try:
            _, status = os.waitpid(pid, 0)
        except ChildProcessError:
            # The child process is already reaped
            # (may happen if waitpid() is called elsewhere).
            returncode = 255
            logger.warning(
                "child process pid %d exit status already read: "
                " will report returncode 255",
                pid)

https://github.com/python/cpython/blob/c5819c1f6c67f04e08f058ac7f24dec86e0e4554/Lib/asyncio/unix_events.py#L944-L952
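To make the fallback concrete, here is a minimal, POSIX-only sketch of the same pattern. It is not the code in this PR or in CPython; `reap_exitcode` and `demo_double_reap` are hypothetical names used only for illustration, and it assumes Python 3.9+ for `os.waitstatus_to_exitcode`.

```python
import os


def reap_exitcode(pid: int) -> int:
    """Wait for a child and return its exit code, or 255 if it was already reaped."""
    try:
        _, status = os.waitpid(pid, 0)
    except ChildProcessError:
        # Someone else already called waitpid() for this pid, so the real
        # status is gone. Report 255, mirroring the asyncio snippet above.
        return 255
    return os.waitstatus_to_exitcode(status)


def demo_double_reap():
    """Fork a child that exits with code 3, then try to reap it twice."""
    pid = os.fork()
    if pid == 0:
        # Child: exit immediately without running any cleanup handlers.
        os._exit(3)
    first = reap_exitcode(pid)   # reads the real exit status
    second = reap_exitcode(pid)  # status already consumed by the first call
    return first, second
```

The second call stands in for whichever waiter loses the race: it can no longer learn the real exit code, so it reports the placeholder value instead of raising.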


# FIXME: this breaks if changed to async def...
Member Author

@graingert graingert Jul 7, 2022

This breaks because async functions are not supported: set_exit_callback calls synchronous callbacks on the event loop thread, and this is intentional. See https://github.com/dask/distributed/pull/6526/files
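A rough sketch of what "synchronous callbacks on the event loop thread" can look like in general. This is not distributed's implementation: `make_exit_notifier` and `demo_exit_callback` are hypothetical helpers, and rejecting `async def` callbacks up front is an assumption of this sketch rather than documented behaviour of `set_exit_callback`.

```python
import asyncio
import inspect
import threading


def make_exit_notifier(loop, callback):
    """Return a thread-safe function that runs ``callback(exitcode)`` on ``loop``."""
    if inspect.iscoroutinefunction(callback):
        # An 'async def' callback would only create a coroutine object that
        # nobody awaits, so this sketch refuses it explicitly.
        raise TypeError("exit callback must be a plain function, not 'async def'")

    def notify(exitcode):
        # Safe to call from a watcher thread: the callback itself is
        # scheduled onto the event loop thread.
        loop.call_soon_threadsafe(callback, exitcode)

    return notify


async def demo_exit_callback():
    loop = asyncio.get_running_loop()
    loop_thread = threading.get_ident()
    done = asyncio.Event()
    seen = []

    def on_exit(exitcode):
        # Record the exit code and whether we are on the loop thread.
        seen.append((exitcode, threading.get_ident() == loop_thread))
        done.set()

    notify = make_exit_notifier(loop, on_exit)
    watcher = threading.Thread(target=notify, args=(255,))
    watcher.start()
    await asyncio.wait_for(done.wait(), timeout=5)
    watcher.join()
    return seen
```

A callback that needs to do async work can stay synchronous and schedule a task itself, for example with `asyncio.ensure_future(...)` from inside the callback.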

@graingert graingert marked this pull request as ready for review July 7, 2022 15:29
@graingert graingert requested a review from gjoseph92 July 7, 2022 15:29
Comment on lines 326 to 329
# this needs to be run in a process pool because reaping a process
# outside multiprocessing causes it to remain in
# multiprocessing.active_children() forever - which blocks cleanup
# see https://github.com/python/cpython/issues/94661

@graingert
Member Author

graingert commented Jul 8, 2022

Making a non-flaky test for this is a bit horrible, because the test's call to os.waitpid races with the AsyncProcess._watch_process call to process.join(None), which also calls os.waitpid:

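import asyncio
import os
import socket
import threading

from tornado.ioloop import IOLoop

from distributed.process import AsyncProcess


def _run_and_close_tornado(async_fn, /, *args, **kwargs):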
    tornado_loop = None

    async def inner_fn():
        nonlocal tornado_loop
        tornado_loop = IOLoop.current()
        return await async_fn(*args, **kwargs)

    try:
        return asyncio.run(inner_fn())
    finally:
        tornado_loop.close(all_fds=True)


def _write_byte_wait_closed(sock):
    with sock:
        sock.send(b"\x00")
        sock.recv(1)


async def _check_process_reaped_elsewhere():
    import time

    loop = asyncio.get_running_loop()
    a, b = socket.socketpair()
    event = asyncio.Event()
    lock = threading.Lock()
    lock.acquire()

    real_waitpid = os.waitpid

    def _fake_waitpid(*args, **kwargs):
        # Block every other waitpid caller (e.g. AsyncProcess) until the
        # thread below has reaped the child and released the lock.
        with lock:
            return real_waitpid(*args, **kwargs)

    os.waitpid = _fake_waitpid

    def wait_pid_in_thread(pid):
        t_start = time.monotonic()
        try:
            real_waitpid(pid, 0)
        finally:
            lock.release()
            print(f"waited pid {t_start=} {time.monotonic()=}")
            loop.call_soon_threadsafe(event.set)

    try:
        with a:
            with b:
                proc = AsyncProcess(
                    target=_write_byte_wait_closed, args=(b,), loop=IOLoop.current()
                )
                await proc.start()

            t = threading.Thread(target=wait_pid_in_thread, args=(proc.pid,))
            t.start()
            a.setblocking(False)
            assert await loop.sock_recv(a, 1) == b"\x00"
            print(f"sock recv {time.monotonic()=}")
            await loop.sock_sendall(a, b"\x00")
            print(f"sock sendall {time.monotonic()=}")
            await proc.join()
            print(f"await proc.join() {time.monotonic()=}")
            await event.wait()
            print(f"event wait {time.monotonic()=}")
            t.join()
            print(f"t join {time.monotonic()=}")
            return proc.exitcode
    finally:
        # Undo the monkeypatch so later code sees the real os.waitpid.
        os.waitpid = real_waitpid

so I think it's better to leave it untested?

@graingert graingert force-pushed the call-exit-callback-if-process-reaped-elsewhere branch from f566a6c to 9dbf154 Compare July 8, 2022 09:52
Collaborator

@gjoseph92 gjoseph92 left a comment

Yeah that test patching os.waitpid is a bit mad. Since we won't be filtering the warning anymore, seems ok to leave untested.

@fjetter
Member

fjetter commented Aug 4, 2022

What's the status on this, @gjoseph92 @graingert?

@graingert
Member Author

I think we give this another CI run and see, then land it as is?

@graingert graingert closed this Aug 4, 2022
@graingert graingert reopened this Aug 4, 2022
@graingert
Member Author

@gjoseph92 thanks for going through the CI failures for me here! I think this is ready to merge
