Skip to content

Commit

Permalink
Merge pull request tornadoweb#2545 from bdarnell/future-exception-cancel
Browse files Browse the repository at this point in the history
concurrent: Add future_set_exception_unless_cancelled
  • Loading branch information
bdarnell authored Dec 3, 2018
2 parents 9868443 + 566441e commit c350dc9
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 20 deletions.
45 changes: 35 additions & 10 deletions tornado/concurrent.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import sys
import types

from tornado.log import app_log

import typing
from typing import Any, Callable, Optional, Tuple, Union

Expand Down Expand Up @@ -185,6 +187,28 @@ def future_set_result_unless_cancelled(
future.set_result(value)


def future_set_exception_unless_cancelled(
future: Union["futures.Future[_T]", "Future[_T]"], exc: BaseException
) -> None:
"""Set the given ``exc`` as the `Future`'s exception.
If the Future is already canceled, logs the exception instead. If
this logging is not desired, the caller should explicitly check
the state of the Future and call Future.set_exception instead of
this wrapper.
Avoids asyncio.InvalidStateError when calling set_exception() on
a cancelled `asyncio.Future`.
.. versionadded:: 6.0
"""
if not future.cancelled():
future.set_exception(exc)
else:
app_log.error("Exception after Future was cancelled", exc_info=exc)


def future_set_exc_info(
future: Union["futures.Future[_T]", "Future[_T]"],
exc_info: Tuple[
Expand All @@ -193,19 +217,20 @@ def future_set_exc_info(
) -> None:
"""Set the given ``exc_info`` as the `Future`'s exception.
Understands both `asyncio.Future` and Tornado's extensions to
enable better tracebacks on Python 2.
Understands both `asyncio.Future` and the extensions in older
versions of Tornado to enable better tracebacks on Python 2.
.. versionadded:: 5.0
.. versionchanged:: 6.0
If the future is already cancelled, this function is a no-op.
(previously asyncio.InvalidStateError would be raised)
"""
if hasattr(future, "set_exc_info"):
# Tornado's Future
future.set_exc_info(exc_info) # type: ignore
else:
# asyncio.Future
if exc_info[1] is None:
raise Exception("future_set_exc_info called with no exception")
future.set_exception(exc_info[1])
if exc_info[1] is None:
raise Exception("future_set_exc_info called with no exception")
future_set_exception_unless_cancelled(future, exc_info[1])


@typing.overload
Expand Down
8 changes: 6 additions & 2 deletions tornado/httpclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@
import time
import weakref

from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.concurrent import (
Future,
future_set_result_unless_cancelled,
future_set_exception_unless_cancelled,
)
from tornado.escape import utf8, native_str
from tornado import gen, httputil
from tornado.ioloop import IOLoop
Expand Down Expand Up @@ -295,7 +299,7 @@ def fetch(
def handle_response(response: "HTTPResponse") -> None:
if response.error:
if raise_error or not response._error_is_response_code:
future.set_exception(response.error)
future_set_exception_unless_cancelled(future, response.error)
return
future_set_result_unless_cancelled(future, response)

Expand Down
12 changes: 7 additions & 5 deletions tornado/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,15 +627,17 @@ def _signal_closed(self) -> None:
futures.append(self._connect_future)
self._connect_future = None
for future in futures:
future.set_exception(StreamClosedError(real_error=self.error))
if not future.done():
future.set_exception(StreamClosedError(real_error=self.error))
future.exception()
if self._ssl_connect_future is not None:
# _ssl_connect_future expects to see the real exception (typically
# an ssl.SSLError), not just StreamClosedError.
if self.error is not None:
self._ssl_connect_future.set_exception(self.error)
else:
self._ssl_connect_future.set_exception(StreamClosedError())
if not self._ssl_connect_future.done():
if self.error is not None:
self._ssl_connect_future.set_exception(self.error)
else:
self._ssl_connect_future.set_exception(StreamClosedError())
self._ssl_connect_future.exception()
self._ssl_connect_future = None
if self._close_callback is not None:
Expand Down
10 changes: 8 additions & 2 deletions tornado/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@

from binascii import hexlify

from tornado.concurrent import Future, future_set_result_unless_cancelled
from tornado.concurrent import (
Future,
future_set_result_unless_cancelled,
future_set_exception_unless_cancelled,
)
from tornado import ioloop
from tornado.iostream import PipeIOStream
from tornado.log import gen_log
Expand Down Expand Up @@ -296,7 +300,9 @@ def wait_for_exit(self, raise_error: bool = True) -> "Future[int]":
def callback(ret: int) -> None:
if ret != 0 and raise_error:
# Unfortunately we don't have the original args any more.
future.set_exception(CalledProcessError(ret, "unknown"))
future_set_exception_unless_cancelled(
future, CalledProcessError(ret, "unknown")
)
else:
future_set_result_unless_cancelled(future, ret)

Expand Down
16 changes: 15 additions & 1 deletion tornado/test/httpclient_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.iostream import IOStream
from tornado.log import gen_log
from tornado.log import gen_log, app_log
from tornado import netutil
from tornado.testing import AsyncHTTPTestCase, bind_unused_port, gen_test, ExpectLog
from tornado.test.util import skipOnTravis
Expand Down Expand Up @@ -572,6 +572,20 @@ def test_response_times(self):
for k, v in response.time_info.items():
self.assertTrue(0 <= v < 1.0, "time_info[%s] out of bounds: %s" % (k, v))

@gen_test
def test_error_after_cancel(self):
fut = self.http_client.fetch(self.get_url("/404"))
self.assertTrue(fut.cancel())
with ExpectLog(app_log, "Exception after Future was cancelled") as el:
# We can't wait on the cancelled Future any more, so just
# let the IOLoop run until the exception gets logged (or
# not, in which case we exit the loop and ExpectLog will
# raise).
for i in range(100):
yield gen.sleep(0.01)
if el.logged_stack:
break


class RequestProxyTest(unittest.TestCase):
def test_request_set(self):
Expand Down

0 comments on commit c350dc9

Please sign in to comment.