Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't use f_locals for foreign async generators #3112

Merged
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions newsfragments/3112.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Rework foreign async generator finalization to track async generator
ids rather than mutating ``ag_frame.f_locals``. This fixes an issue
with the previous implementation: locals' lifetimes will no longer be
extended by materialization in the ``ag_frame.f_locals`` dictionary that
the previous finalization dispatcher logic needed to access to do its work.
65 changes: 39 additions & 26 deletions src/trio/_core/_asyncgens.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,32 @@
_ASYNC_GEN_SET = set


@_core.disable_ki_protection
def _finalize_without_ki_protection(
agen_name: str,
agen: AsyncGeneratorType[object, NoReturn],
) -> None:
# Host has no finalizer. Reimplement the default
# Python behavior with no hooks installed: throw in
# GeneratorExit, step once, raise RuntimeError if
# it doesn't exit.
closer = agen.aclose()
try:
# If the next thing is a yield, this will raise RuntimeError
# which we allow to propagate
closer.send(None)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually what do we want to happen here if there's a KI? We want long running finalizers to be interruptible, but then the KeyboardInterrupt exception just gets lost so you need to hit Ctrl+C again to terminate trio

Copy link
Contributor

@A5rocks A5rocks Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where does the KI get lost?

If it's in this closer after we call .send(None), doesn't it get raised here and will propagate up? I assume propagation here is fine because we already have a RuntimeError being raised here.

(I'm basing my assumptions off how generators behave, I don't know anything about async generator cleanup...)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The finalizer gets called by tp_finalize which is called by the GC which catches the exception

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see... could we catch the KeyboardInterrupt and basically tell trio that it got another Ctrl+C? It seems to be raised normally:

>>> trio.run(main)
Exception ignored in: <async_generator object awaits_after_yield at 0x000001EF4C0F1F00>
Traceback (most recent call last):
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_asyncgens.py", line 120, in finalizer
    closer.send(None)
  File "<stdin>", line 6, in awaits_after_yield
KeyboardInterrupt:

(I also got this great error message in the REPL while playing around with this...)

Funny error message I ran into in the REPL

Exception ignored in: <function Nursery.__del__ at 0x000001EF4BCC54E0>
Traceback (most recent call last):
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1372, in __del__
    assert not self._children
           ^^^^^^^^^^^^^^^^^^
AssertionError:
Exception ignored in: <coroutine object Runner.init at 0x000001EF4C0239A0>
Traceback (most recent call last):
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 2030, in init
    async with open_nursery() as run_sync_soon_nursery:
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1030, in __aexit__
    new_exc = await self._nursery._nested_child_finished(exc)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1192, in _nested_child_finished
    self._add_exc(nested_child_exc)
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1170, in _add_exc
    self.cancel_scope.cancel()
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_ki.py", line 183, in wrapper
    return fn(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 870, in cancel
    self._cancel_status.recalculate()
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 459, in recalculate
    task._attempt_delivery_of_any_pending_cancel()
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1545, in _attempt_delivery_of_any_pending_cancel
    self._attempt_abort(raise_cancel)
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1527, in _attempt_abort
    success = self._abort_func(raise_cancel)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_io_windows.py", line 740, in abort_fn
    self._refresh_afd(base_handle)
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_io_windows.py", line 653, in _refresh_afd
    _check(
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_io_windows.py", line 301, in _check
    raise_winerror()
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_windows_cffi.py", line 520, in raise_winerror
    raise OSError(0, msg, filename, winerror, filename2)
OSError: [WinError 6] The handle is invalid
Exception ignored in: <function Nursery.__del__ at 0x000001EF4BCC54E0>
Traceback (most recent call last):
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1372, in __del__
    assert not self._children
           ^^^^^^^^^^^^^^^^^^
AssertionError:
Exception ignored in: <function Nursery.__del__ at 0x000001EF4BCC54E0>
Traceback (most recent call last):
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1372, in __del__
    assert not self._children
           ^^^^^^^^^^^^^^^^^^
AssertionError:
Exception ignored in: <async_generator object awaits_after_yield at 0x000001EF4C0F0040>
Traceback (most recent call last):
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_asyncgens.py", line 88, in finalizer
    runner.entry_queue.run_sync_soon(
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_entry_queue.py", line 145, in run_sync_soon
    self.wakeup.wakeup_thread_and_signal_safe()
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_wakeup_socketpair.py", line 39, in wakeup_thread_and_signal_safe
    self.write_sock.send(b"\x00")
OSError: [WinError 10038] An operation was attempted on something that is not a socket
Exception ignored in: <coroutine object NurseryManager.__aexit__ at 0x000001EF4C01B120>
Traceback (most recent call last):
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1030, in __aexit__
    new_exc = await self._nursery._nested_child_finished(exc)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\A5rocks\Documents\trio\src\trio\_core\_run.py", line 1220, in _nested_child_finished
    assert popped is self
           ^^^^^^^^^^^^^^
AssertionError:

Not the most clear :^)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems to be raised normally

But the message says the exception is ignored?

Copy link
Contributor

@A5rocks A5rocks Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could call signal.raise_signal(signal.SIGINT) in the except KeyboardInterrupt: and call it a day? Not a great solution IMO. (though actually that ... would probably raise in that except statement. ouch? nevermind)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. The except KeyboardInterrupt would have to be outside the _finalize_without_ki_protection

Copy link
Contributor

@A5rocks A5rocks Nov 1, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this awful hack?:

except KeyboardInterrupt:
  @enable_ki_protection
  def killer():
    signal.raise_signal(signal.SIGINT)
  try:
    killer()
  except KeyboardInterrupt:
    # turns out the trio KI handler isn't installed. Nothing good to do
    raise KeyboardInterrupt from None

edit: sorry, your comments weren't showing.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what I'm going to do here is put the KI protection on all the critical and guaranteed fast trio code and unprotect any of the host loop stuff. Then we can survey someone who uses guest mode and KI and async generators without aclose and find out what they want

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this awful hack?:

except KeyboardInterrupt:
  @enable_ki_protection
  def killer():
    signal.raise_signal(signal.SIGINT)
  try:
    killer()
  except KeyboardInterrupt:
    # turns out the trio KI handler isn't installed. Nothing good to do
    raise KeyboardInterrupt from None

Another KI could arrive while constructing or decorating or load_fast-ing the killer function

except StopIteration:
pass
else:
# If the next thing is an await, we get here. Give a nicer
# error than the default "async generator ignored GeneratorExit"
raise RuntimeError(
f"Non-Trio async generator {agen_name!r} awaited something "
"during finalization; install a finalization hook to "
"support this, or wrap it in 'async with aclosing(...):'",
)


@attrs.define(eq=False)
class AsyncGenerators:
# Async generators are added to this set when first iterated. Any
Expand All @@ -35,6 +61,11 @@ class AsyncGenerators:
# regular set so we don't have to deal with GC firing at
# unexpected times.
alive: _WEAK_ASYNC_GEN_SET | _ASYNC_GEN_SET = attrs.Factory(_WEAK_ASYNC_GEN_SET)
# The ids of foreign async generators are added to this set when first
# iterated. Usually it is not safe to refer to ids like this, but because
# we're using a finalizer we can ensure ids in this set do not outlive
# their async generator.
foreign: set[int] = attrs.Factory(set)

# This collects async generators that get garbage collected during
# the one-tick window between the system nursery closing and the
Expand All @@ -51,10 +82,7 @@ def firstiter(agen: AsyncGeneratorType[object, NoReturn]) -> None:
# An async generator first iterated outside of a Trio
# task doesn't belong to Trio. Probably we're in guest
# mode and the async generator belongs to our host.
# The locals dictionary is the only good place to
# remember this fact, at least until
# https://bugs.python.org/issue40916 is implemented.
agen.ag_frame.f_locals["@trio_foreign_asyncgen"] = True
self.foreign.add(id(agen))
if self.prev_hooks.firstiter is not None:
self.prev_hooks.firstiter(agen)

Expand All @@ -76,13 +104,16 @@ def finalize_in_trio_context(
# have hit it.
self.trailing_needs_finalize.add(agen)

@_core.enable_ki_protection
def finalizer(agen: AsyncGeneratorType[object, NoReturn]) -> None:
graingert marked this conversation as resolved.
Show resolved Hide resolved
agen_name = name_asyncgen(agen)
try:
is_ours = not agen.ag_frame.f_locals.get("@trio_foreign_asyncgen")
except AttributeError: # pragma: no cover
self.foreign.remove(id(agen))
except KeyError:
is_ours = True
else:
is_ours = False

agen_name = name_asyncgen(agen)
graingert marked this conversation as resolved.
Show resolved Hide resolved
if is_ours:
runner.entry_queue.run_sync_soon(
finalize_in_trio_context,
Expand All @@ -108,25 +139,7 @@ def finalizer(agen: AsyncGeneratorType[object, NoReturn]) -> None:
if self.prev_hooks.finalizer is not None:
self.prev_hooks.finalizer(agen)
else:
# Host has no finalizer. Reimplement the default
# Python behavior with no hooks installed: throw in
# GeneratorExit, step once, raise RuntimeError if
# it doesn't exit.
closer = agen.aclose()
try:
# If the next thing is a yield, this will raise RuntimeError
# which we allow to propagate
closer.send(None)
except StopIteration:
pass
else:
# If the next thing is an await, we get here. Give a nicer
# error than the default "async generator ignored GeneratorExit"
raise RuntimeError(
f"Non-Trio async generator {agen_name!r} awaited something "
"during finalization; install a finalization hook to "
"support this, or wrap it in 'async with aclosing(...):'",
)
_finalize_without_ki_protection(agen_name, agen)

self.prev_hooks = sys.get_asyncgen_hooks()
sys.set_asyncgen_hooks(firstiter=firstiter, finalizer=finalizer) # type: ignore[arg-type] # Finalizer doesn't use AsyncGeneratorType
Expand Down
64 changes: 53 additions & 11 deletions src/trio/_core/_tests/test_guest_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import asyncio
import contextlib
import contextvars
import queue
import signal
import socket
Expand All @@ -11,6 +10,7 @@
import time
import traceback
import warnings
import weakref
from collections.abc import AsyncGenerator, Awaitable, Callable
from functools import partial
from math import inf
Expand All @@ -22,6 +22,7 @@
)

import pytest
import sniffio
from outcome import Outcome

import trio
Expand Down Expand Up @@ -221,7 +222,8 @@ async def trio_main(in_host: InHost) -> str:


def test_guest_mode_sniffio_integration() -> None:
from sniffio import current_async_library, thread_local as sniffio_library
current_async_library = sniffio.current_async_library
sniffio_library = sniffio.thread_local

async def trio_main(in_host: InHost) -> str:
async def synchronize() -> None:
Expand Down Expand Up @@ -439,9 +441,9 @@ def aiotrio_run(
loop = asyncio.new_event_loop()

async def aio_main() -> T:
trio_done_fut = loop.create_future()
trio_done_fut: asyncio.Future[Outcome[T]] = loop.create_future()

def trio_done_callback(main_outcome: Outcome[object]) -> None:
def trio_done_callback(main_outcome: Outcome[T]) -> None:
print(f"trio_fn finished: {main_outcome!r}")
trio_done_fut.set_result(main_outcome)

Expand All @@ -455,9 +457,11 @@ def trio_done_callback(main_outcome: Outcome[object]) -> None:
**start_guest_run_kwargs,
)

return (await trio_done_fut).unwrap() # type: ignore[no-any-return]
return (await trio_done_fut).unwrap()

try:
# can't use asyncio.run because that fails on Windows (3.8, x64, with
# Komodia LSP) and segfaults on Windows (3.9, x64, with Komodia LSP)
return loop.run_until_complete(aio_main())
finally:
loop.close()
Expand Down Expand Up @@ -628,8 +632,6 @@ async def trio_main(in_host: InHost) -> None:

@restore_unraisablehook()
def test_guest_mode_asyncgens() -> None:
import sniffio

record = set()

async def agen(label: str) -> AsyncGenerator[int, None]:
Expand All @@ -656,9 +658,49 @@ async def trio_main() -> None:

gc_collect_harder()

# Ensure we don't pollute the thread-level context if run under
# an asyncio without contextvars support (3.6)
context = contextvars.copy_context()
context.run(aiotrio_run, trio_main, host_uses_signal_set_wakeup_fd=True)
aiotrio_run(trio_main, host_uses_signal_set_wakeup_fd=True)

assert record == {("asyncio", "asyncio"), ("trio", "trio")}


@restore_unraisablehook()
def test_guest_mode_asyncgens_garbage_collection() -> None:
A5rocks marked this conversation as resolved.
Show resolved Hide resolved
record: set[tuple[str, str, bool]] = set()

async def agen(label: str) -> AsyncGenerator[int, None]:
class A:
pass

a = A()
a_wr = weakref.ref(a)
assert sniffio.current_async_library() == label
try:
yield 1
finally:
library = sniffio.current_async_library()
with contextlib.suppress(trio.Cancelled):
await sys.modules[library].sleep(0)

del a
if sys.implementation.name == "pypy":
gc_collect_harder()
A5rocks marked this conversation as resolved.
Show resolved Hide resolved

record.add((label, library, a_wr() is None))

async def iterate_in_aio() -> None:
await agen("asyncio").asend(None)

async def trio_main() -> None:
task = asyncio.ensure_future(iterate_in_aio())
done_evt = trio.Event()
task.add_done_callback(lambda _: done_evt.set())
with trio.fail_after(1):
await done_evt.wait()

await agen("trio").asend(None)

gc_collect_harder()

aiotrio_run(trio_main, host_uses_signal_set_wakeup_fd=True)

assert record == {("asyncio", "asyncio", True), ("trio", "trio", True)}
Loading