diff --git a/tornado/platform/asyncio.py b/tornado/platform/asyncio.py index 9b44775fa2..1437581248 100644 --- a/tornado/platform/asyncio.py +++ b/tornado/platform/asyncio.py @@ -19,17 +19,17 @@ Windows. Use the `~asyncio.SelectorEventLoop` instead. """ +import asyncio +import atexit import concurrent.futures import functools +import itertools import sys - -from threading import get_ident +import threading +import typing from tornado.gen import convert_yielded from tornado.ioloop import IOLoop, _Selectable -import asyncio - -import typing from typing import Any, TypeVar, Awaitable, Callable, Union, Optional if typing.TYPE_CHECKING: @@ -38,11 +38,44 @@ _T = TypeVar("_T") +class _HasFileno(typing.Protocol): + def fileno(self) -> int: + pass + + +_FileDescriptorLike = Union[int, _HasFileno] + + +_seq_gen = itertools.count() + +_atexit_run = False + + +def _atexit_callback() -> None: + global _atexit_run + _atexit_run = True + + +atexit.register(_atexit_callback) + + class BaseAsyncIOLoop(IOLoop): def initialize( # type: ignore self, asyncio_loop: asyncio.AbstractEventLoop, **kwargs: Any ) -> None: + # asyncio_loop is always the real underlying IOLoop. This is used in + # ioloop.py to maintain the asyncio-to-ioloop mappings. self.asyncio_loop = asyncio_loop + # selector_loop is an event loop that implements the add_reader family of + # methods. Usually the same as asyncio_loop but differs on platforms such + # as windows where the default event loop does not implement these methods. + self.selector_loop = asyncio_loop + if hasattr(asyncio, "ProactorEventLoop") and isinstance( + asyncio_loop, asyncio.ProactorEventLoop + ): + # Ignore this line for mypy because the abstract method checker + # doesn't understand dynamic proxies. + self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler) self.handlers = {} # type: Dict[int, Tuple[Union[int, _Selectable], Callable]] # Set of fds listening for reads/writes @@ -70,7 +103,7 @@ def initialize( # type: ignore super(BaseAsyncIOLoop, self).initialize(**kwargs) def assign_thread_identity() -> None: - self._thread_identity = get_ident() + self._thread_identity = threading.get_ident() self.add_callback(assign_thread_identity) @@ -87,6 +120,8 @@ def close(self, all_fds: bool = False) -> None: # assume it was closed from the asyncio side, and do this # cleanup for us, leading to a KeyError. del IOLoop._ioloop_for_asyncio[self.asyncio_loop] + if self.selector_loop is not self.asyncio_loop: + self.selector_loop.close() self.asyncio_loop.close() def add_handler( @@ -97,29 +132,29 @@ def add_handler( raise ValueError("fd %s added twice" % fd) self.handlers[fd] = (fileobj, handler) if events & IOLoop.READ: - self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ) + self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ) self.readers.add(fd) if events & IOLoop.WRITE: - self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE) + self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE) self.writers.add(fd) def update_handler(self, fd: Union[int, _Selectable], events: int) -> None: fd, fileobj = self.split_fd(fd) if events & IOLoop.READ: if fd not in self.readers: - self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ) + self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ) self.readers.add(fd) else: if fd in self.readers: - self.asyncio_loop.remove_reader(fd) + self.selector_loop.remove_reader(fd) self.readers.remove(fd) if events & IOLoop.WRITE: if fd not in self.writers: - self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE) + self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE) self.writers.add(fd) else: if fd in self.writers: - self.asyncio_loop.remove_writer(fd) + self.selector_loop.remove_writer(fd) self.writers.remove(fd) def remove_handler(self, fd: Union[int, _Selectable]) -> None: @@ -127,10 +162,10 @@ def remove_handler(self, fd: Union[int, _Selectable]) -> None: if fd not in self.handlers: return if fd in self.readers: - self.asyncio_loop.remove_reader(fd) + self.selector_loop.remove_reader(fd) self.readers.remove(fd) if fd in self.writers: - self.asyncio_loop.remove_writer(fd) + self.selector_loop.remove_writer(fd) self.writers.remove(fd) del self.handlers[fd] @@ -169,7 +204,7 @@ def remove_timeout(self, timeout: object) -> None: timeout.cancel() # type: ignore def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: - if get_ident() == self._thread_identity: + if threading.get_ident() == self._thread_identity: call_soon = self.asyncio_loop.call_soon else: call_soon = self.asyncio_loop.call_soon_threadsafe @@ -182,6 +217,11 @@ def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None: # add_callback that completes without error will # eventually execute). pass + except AttributeError: + # ProactorEventLoop may raise this instead of RuntimeError + # if call_soon_threadsafe races with a call to close(). + # Swallow it too for consistency. + pass def add_callback_from_signal( self, callback: Callable, *args: Any, **kwargs: Any @@ -197,7 +237,7 @@ def run_in_executor( self, executor: Optional[concurrent.futures.Executor], func: Callable[..., _T], - *args: Any + *args: Any, ) -> Awaitable[_T]: return self.asyncio_loop.run_in_executor(executor, func, *args) @@ -344,3 +384,153 @@ def get_event_loop(self) -> asyncio.AbstractEventLoop: loop = self.new_event_loop() self.set_event_loop(loop) return loop + + +class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop): + """Wrap an event loop to add implementations of the ``add_reader`` method family. + + Instances of this class start a second thread to run a selector-based event loop. + This thread is completely hidden from the user; all callbacks are run on the + wrapped event loop's thread. + + This class is used automatically by Tornado; applications should not need + to refer to it directly. + + It is safe to wrap any event loop with this class, although it only makes sense + for event loops that do not implement the ``add_reader`` family of methods + themselves (i.e. ``WindowsProactorEventLoop``) + + Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop. + """ + + # This class is a __getattribute__-based proxy. All attributes other than those + # in this set are proxied through to the underlying loop. + MY_ATTRIBUTES = { + "add_reader", + "add_writer", + "remove_reader", + "remove_writer", + "close", + "_real_loop", + "_selector_loop", + "_selector_thread", + "_run_on_selector", + "_handle_event_from_selector", + "_reader_seq", + "_writer_seq", + } + + def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None: + self._real_loop = real_loop + + # Sequence numbers allow us to detect races between the selector thread + # and the main thread, such as when a handler for a file descriptor has + # been removed and re-added. These maps go from file descriptor to a + # sequence number. + self._reader_seq = {} # type: Dict[_FileDescriptorLike, int] + self._writer_seq = {} # type: Dict[_FileDescriptorLike, int] + + fut = ( + concurrent.futures.Future() + ) # type: concurrent.futures.Future[asyncio.AbstractEventLoop] + + def f() -> None: + loop = asyncio.SelectorEventLoop() + fut.set_result(loop) + loop.run_forever() + loop.close() + + self._selector_thread = threading.Thread(target=f) + # Must be a daemon in case this event loop is not explicitly closed + # (often the case for the main loop). + self._selector_thread.daemon = True + self._selector_thread.start() + self._selector_loop = fut.result() + + def close(self) -> None: + self._selector_loop.call_soon_threadsafe(self._selector_loop.stop) + if not _atexit_run: + # Shutdown is tricky: Our thread must be set as a daemon so that it + # doesn't prevent shutdown in the common case of an unclosed main + # loop. But daemon threads are halted relatively early in the + # interpreter shutdown process; once this happens attempts to join + # them will block forever. + # + # I can't find formal documentation of this, but as of cpython 3.8 + # the shutdown order is + # 1. atexit functions + # 2. daemon threads halt + # 3. global destructors run + # + # If we're running after atexit functions, we're probably in a + # global destructor. But in any case, we know that the process is + # about to exit and it's no longer necessary to join our daemon + # thread. (Is it ever necessary to join it? Probably not but it + # feels dirty not to) + self._selector_thread.join() + self._real_loop.close() + + def __getattribute__(self, name: str) -> Any: + if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES: + return super().__getattribute__(name) + return getattr(self._real_loop, name) + + def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T: + """Synchronously run the given method on the selector thread. + """ + fut = concurrent.futures.Future() # type: concurrent.futures.Future[_T] + + def wrapper() -> None: + try: + result = method(*args) + except Exception as e: + fut.set_exception(e) + else: + fut.set_result(result) + + self._selector_loop.call_soon_threadsafe(wrapper) + return fut.result() + + def add_reader( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + seq = next(_seq_gen) + self._reader_seq[fd] = seq + + def wrapper() -> None: + if self._reader_seq.get(fd, None) != seq: + return + callback(*args) + + return self._run_on_selector( + self._selector_loop.add_reader, + fd, + self._real_loop.call_soon_threadsafe, + wrapper, + ) + + def add_writer( + self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any + ) -> None: + seq = next(_seq_gen) + self._writer_seq[fd] = seq + + def wrapper() -> None: + if self._writer_seq.get(fd, None) != seq: + return + callback(*args) + + return self._run_on_selector( + self._selector_loop.add_writer, + fd, + self._real_loop.call_soon_threadsafe, + wrapper, + ) + + def remove_reader(self, fd: _FileDescriptorLike) -> None: + del self._reader_seq[fd] + return self._run_on_selector(self._selector_loop.remove_reader, fd) + + def remove_writer(self, fd: _FileDescriptorLike) -> None: + del self._writer_seq[fd] + return self._run_on_selector(self._selector_loop.remove_writer, fd) diff --git a/tornado/test/__init__.py b/tornado/test/__init__.py deleted file mode 100644 index bfc2656d9f..0000000000 --- a/tornado/test/__init__.py +++ /dev/null @@ -1,8 +0,0 @@ -import asyncio -import sys - -# Use the selector event loop on windows. Do this in tornado/test/__init__.py -# instead of runtests.py so it happens no matter how the test is run (such as -# through editor integrations). -if sys.platform == "win32" and hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore diff --git a/tornado/test/twisted_test.py b/tornado/test/twisted_test.py index 2fc3f8e30e..661953d73e 100644 --- a/tornado/test/twisted_test.py +++ b/tornado/test/twisted_test.py @@ -75,6 +75,13 @@ def restore_signal_handlers(saved): class CompatibilityTests(unittest.TestCase): def setUp(self): self.saved_signals = save_signal_handlers() + self.saved_policy = asyncio.get_event_loop_policy() + if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"): + # Twisted requires a selector event loop, even if Tornado is + # doing its own tricks in AsyncIOLoop to support proactors. + # Setting an AddThreadSelectorEventLoop exposes various edge + # cases so just use a regular selector. + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore self.io_loop = IOLoop() self.io_loop.make_current() self.reactor = AsyncioSelectorReactor() @@ -83,6 +90,7 @@ def tearDown(self): self.reactor.disconnectAll() self.io_loop.clear_current() self.io_loop.close(all_fds=True) + asyncio.set_event_loop_policy(self.saved_policy) restore_signal_handlers(self.saved_signals) def start_twisted_server(self):