Skip to content

Commit

Permalink
asyncio: Improve support Python 3.8 on Windows
Browse files Browse the repository at this point in the history
This commit removes the need for applications to work around the
backwards-incompatible change to the default event loop. Instead,
Tornado will detect the use of the windows proactor event loop and
start a selector event loop in a separate thread.

Closes tornadoweb#2804
  • Loading branch information
bdarnell authored and jeyrce committed Aug 25, 2021
1 parent 08c800a commit 701704f
Show file tree
Hide file tree
Showing 3 changed files with 214 additions and 24 deletions.
222 changes: 206 additions & 16 deletions tornado/platform/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@
Windows. Use the `~asyncio.SelectorEventLoop` instead.
"""

import asyncio
import atexit
import concurrent.futures
import functools
import itertools
import sys

from threading import get_ident
import threading
import typing
from tornado.gen import convert_yielded
from tornado.ioloop import IOLoop, _Selectable

import asyncio

import typing
from typing import Any, TypeVar, Awaitable, Callable, Union, Optional

if typing.TYPE_CHECKING:
Expand All @@ -38,11 +38,44 @@
_T = TypeVar("_T")


class _HasFileno(typing.Protocol):
def fileno(self) -> int:
pass


_FileDescriptorLike = Union[int, _HasFileno]


_seq_gen = itertools.count()

_atexit_run = False


def _atexit_callback() -> None:
global _atexit_run
_atexit_run = True


atexit.register(_atexit_callback)


class BaseAsyncIOLoop(IOLoop):
def initialize( # type: ignore
self, asyncio_loop: asyncio.AbstractEventLoop, **kwargs: Any
) -> None:
# asyncio_loop is always the real underlying IOLoop. This is used in
# ioloop.py to maintain the asyncio-to-ioloop mappings.
self.asyncio_loop = asyncio_loop
# selector_loop is an event loop that implements the add_reader family of
# methods. Usually the same as asyncio_loop but differs on platforms such
# as windows where the default event loop does not implement these methods.
self.selector_loop = asyncio_loop
if hasattr(asyncio, "ProactorEventLoop") and isinstance(
asyncio_loop, asyncio.ProactorEventLoop
):
# Ignore this line for mypy because the abstract method checker
# doesn't understand dynamic proxies.
self.selector_loop = AddThreadSelectorEventLoop(asyncio_loop) # type: ignore
# Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
self.handlers = {} # type: Dict[int, Tuple[Union[int, _Selectable], Callable]]
# Set of fds listening for reads/writes
Expand Down Expand Up @@ -70,7 +103,7 @@ def initialize( # type: ignore
super(BaseAsyncIOLoop, self).initialize(**kwargs)

def assign_thread_identity() -> None:
self._thread_identity = get_ident()
self._thread_identity = threading.get_ident()

self.add_callback(assign_thread_identity)

Expand All @@ -87,6 +120,8 @@ def close(self, all_fds: bool = False) -> None:
# assume it was closed from the asyncio side, and do this
# cleanup for us, leading to a KeyError.
del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
if self.selector_loop is not self.asyncio_loop:
self.selector_loop.close()
self.asyncio_loop.close()

def add_handler(
Expand All @@ -97,40 +132,40 @@ def add_handler(
raise ValueError("fd %s added twice" % fd)
self.handlers[fd] = (fileobj, handler)
if events & IOLoop.READ:
self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
self.readers.add(fd)
if events & IOLoop.WRITE:
self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)

def update_handler(self, fd: Union[int, _Selectable], events: int) -> None:
fd, fileobj = self.split_fd(fd)
if events & IOLoop.READ:
if fd not in self.readers:
self.asyncio_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
self.selector_loop.add_reader(fd, self._handle_events, fd, IOLoop.READ)
self.readers.add(fd)
else:
if fd in self.readers:
self.asyncio_loop.remove_reader(fd)
self.selector_loop.remove_reader(fd)
self.readers.remove(fd)
if events & IOLoop.WRITE:
if fd not in self.writers:
self.asyncio_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
self.selector_loop.add_writer(fd, self._handle_events, fd, IOLoop.WRITE)
self.writers.add(fd)
else:
if fd in self.writers:
self.asyncio_loop.remove_writer(fd)
self.selector_loop.remove_writer(fd)
self.writers.remove(fd)

def remove_handler(self, fd: Union[int, _Selectable]) -> None:
fd, fileobj = self.split_fd(fd)
if fd not in self.handlers:
return
if fd in self.readers:
self.asyncio_loop.remove_reader(fd)
self.selector_loop.remove_reader(fd)
self.readers.remove(fd)
if fd in self.writers:
self.asyncio_loop.remove_writer(fd)
self.selector_loop.remove_writer(fd)
self.writers.remove(fd)
del self.handlers[fd]

Expand Down Expand Up @@ -169,7 +204,7 @@ def remove_timeout(self, timeout: object) -> None:
timeout.cancel() # type: ignore

def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
if get_ident() == self._thread_identity:
if threading.get_ident() == self._thread_identity:
call_soon = self.asyncio_loop.call_soon
else:
call_soon = self.asyncio_loop.call_soon_threadsafe
Expand All @@ -182,6 +217,11 @@ def add_callback(self, callback: Callable, *args: Any, **kwargs: Any) -> None:
# add_callback that completes without error will
# eventually execute).
pass
except AttributeError:
# ProactorEventLoop may raise this instead of RuntimeError
# if call_soon_threadsafe races with a call to close().
# Swallow it too for consistency.
pass

def add_callback_from_signal(
self, callback: Callable, *args: Any, **kwargs: Any
Expand All @@ -197,7 +237,7 @@ def run_in_executor(
self,
executor: Optional[concurrent.futures.Executor],
func: Callable[..., _T],
*args: Any
*args: Any,
) -> Awaitable[_T]:
return self.asyncio_loop.run_in_executor(executor, func, *args)

Expand Down Expand Up @@ -344,3 +384,153 @@ def get_event_loop(self) -> asyncio.AbstractEventLoop:
loop = self.new_event_loop()
self.set_event_loop(loop)
return loop


class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
Instances of this class start a second thread to run a selector-based event loop.
This thread is completely hidden from the user; all callbacks are run on the
wrapped event loop's thread.
This class is used automatically by Tornado; applications should not need
to refer to it directly.
It is safe to wrap any event loop with this class, although it only makes sense
for event loops that do not implement the ``add_reader`` family of methods
themselves (i.e. ``WindowsProactorEventLoop``)
Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop.
"""

# This class is a __getattribute__-based proxy. All attributes other than those
# in this set are proxied through to the underlying loop.
MY_ATTRIBUTES = {
"add_reader",
"add_writer",
"remove_reader",
"remove_writer",
"close",
"_real_loop",
"_selector_loop",
"_selector_thread",
"_run_on_selector",
"_handle_event_from_selector",
"_reader_seq",
"_writer_seq",
}

def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
self._real_loop = real_loop

# Sequence numbers allow us to detect races between the selector thread
# and the main thread, such as when a handler for a file descriptor has
# been removed and re-added. These maps go from file descriptor to a
# sequence number.
self._reader_seq = {} # type: Dict[_FileDescriptorLike, int]
self._writer_seq = {} # type: Dict[_FileDescriptorLike, int]

fut = (
concurrent.futures.Future()
) # type: concurrent.futures.Future[asyncio.AbstractEventLoop]

def f() -> None:
loop = asyncio.SelectorEventLoop()
fut.set_result(loop)
loop.run_forever()
loop.close()

self._selector_thread = threading.Thread(target=f)
# Must be a daemon in case this event loop is not explicitly closed
# (often the case for the main loop).
self._selector_thread.daemon = True
self._selector_thread.start()
self._selector_loop = fut.result()

def close(self) -> None:
self._selector_loop.call_soon_threadsafe(self._selector_loop.stop)
if not _atexit_run:
# Shutdown is tricky: Our thread must be set as a daemon so that it
# doesn't prevent shutdown in the common case of an unclosed main
# loop. But daemon threads are halted relatively early in the
# interpreter shutdown process; once this happens attempts to join
# them will block forever.
#
# I can't find formal documentation of this, but as of cpython 3.8
# the shutdown order is
# 1. atexit functions
# 2. daemon threads halt
# 3. global destructors run
#
# If we're running after atexit functions, we're probably in a
# global destructor. But in any case, we know that the process is
# about to exit and it's no longer necessary to join our daemon
# thread. (Is it ever necessary to join it? Probably not but it
# feels dirty not to)
self._selector_thread.join()
self._real_loop.close()

def __getattribute__(self, name: str) -> Any:
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
return super().__getattribute__(name)
return getattr(self._real_loop, name)

def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T:
"""Synchronously run the given method on the selector thread.
"""
fut = concurrent.futures.Future() # type: concurrent.futures.Future[_T]

def wrapper() -> None:
try:
result = method(*args)
except Exception as e:
fut.set_exception(e)
else:
fut.set_result(result)

self._selector_loop.call_soon_threadsafe(wrapper)
return fut.result()

def add_reader(
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
) -> None:
seq = next(_seq_gen)
self._reader_seq[fd] = seq

def wrapper() -> None:
if self._reader_seq.get(fd, None) != seq:
return
callback(*args)

return self._run_on_selector(
self._selector_loop.add_reader,
fd,
self._real_loop.call_soon_threadsafe,
wrapper,
)

def add_writer(
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
) -> None:
seq = next(_seq_gen)
self._writer_seq[fd] = seq

def wrapper() -> None:
if self._writer_seq.get(fd, None) != seq:
return
callback(*args)

return self._run_on_selector(
self._selector_loop.add_writer,
fd,
self._real_loop.call_soon_threadsafe,
wrapper,
)

def remove_reader(self, fd: _FileDescriptorLike) -> None:
del self._reader_seq[fd]
return self._run_on_selector(self._selector_loop.remove_reader, fd)

def remove_writer(self, fd: _FileDescriptorLike) -> None:
del self._writer_seq[fd]
return self._run_on_selector(self._selector_loop.remove_writer, fd)
8 changes: 0 additions & 8 deletions tornado/test/__init__.py

This file was deleted.

8 changes: 8 additions & 0 deletions tornado/test/twisted_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ def restore_signal_handlers(saved):
class CompatibilityTests(unittest.TestCase):
def setUp(self):
self.saved_signals = save_signal_handlers()
self.saved_policy = asyncio.get_event_loop_policy()
if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
# Twisted requires a selector event loop, even if Tornado is
# doing its own tricks in AsyncIOLoop to support proactors.
# Setting an AddThreadSelectorEventLoop exposes various edge
# cases so just use a regular selector.
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # type: ignore
self.io_loop = IOLoop()
self.io_loop.make_current()
self.reactor = AsyncioSelectorReactor()
Expand All @@ -83,6 +90,7 @@ def tearDown(self):
self.reactor.disconnectAll()
self.io_loop.clear_current()
self.io_loop.close(all_fds=True)
asyncio.set_event_loop_policy(self.saved_policy)
restore_signal_handlers(self.saved_signals)

def start_twisted_server(self):
Expand Down

0 comments on commit 701704f

Please sign in to comment.