diff --git a/CHANGES.rst b/CHANGES.rst index 68305a3f7a4..11811001f1e 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -45,7 +45,8 @@ CHANGES - Ensure TestClient HTTP methods return a context manager #1318 -- +- Raise `ClientDisconnectedError` to `FlowControlStreamReader` read function + if `ClientSession` object is closed by client when reading data. #1323 - diff --git a/CONTRIBUTORS.txt b/CONTRIBUTORS.txt index ded366e6164..db3fdd66a0e 100644 --- a/CONTRIBUTORS.txt +++ b/CONTRIBUTORS.txt @@ -134,6 +134,7 @@ Vladimir Rutsky Vladimir Shulyak Vladimir Zakharov Willem de Groot +Wilson Ong W. Trevor King Yannick Koechlin Young-Ho Cha diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index c4ff16ed488..6f7dc5d62fd 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -659,6 +659,7 @@ def close(self): self._connection.close() self._connection = None self._cleanup_writer() + self._notify_content() @asyncio.coroutine def release(self): @@ -682,6 +683,7 @@ def release(self): self._reader.unset_parser() self._connection = None self._cleanup_writer() + self._notify_content() def raise_for_status(self): if 400 <= self.status: @@ -694,6 +696,12 @@ def _cleanup_writer(self): self._writer.cancel() self._writer = None + def _notify_content(self): + content = self.content + if content and content.exception() is None and not content.is_eof(): + content.set_exception( + aiohttp.ClientDisconnectedError('Connection closed')) + @asyncio.coroutine def wait_for_close(self): if self._writer is not None: diff --git a/docs/streams.rst b/docs/streams.rst index dd1c60445e9..b82f89fbc30 100644 --- a/docs/streams.rst +++ b/docs/streams.rst @@ -71,6 +71,9 @@ Reading Methods If the EOF was received and the internal buffer is empty, return an empty bytes object. + Raise an :exc:`aiohttp.ClientDisconnectedError` if :class:`ClientSession` + object is closed when reading data. + :return bytes: the given line Asynchronous Iteration Support diff --git a/tests/test_client_functional.py b/tests/test_client_functional.py index ee8ca55fb82..5061c5b761d 100644 --- a/tests/test_client_functional.py +++ b/tests/test_client_functional.py @@ -554,6 +554,95 @@ def handler(request): yield from resp.read() +@asyncio.coroutine +def test_readline_error_on_conn_close(loop, test_client): + + @asyncio.coroutine + def handler(request): + resp_ = web.StreamResponse() + yield from resp_.prepare(request) + + # make sure connection is closed by client. + with pytest.raises(aiohttp.ServerDisconnectedError): + for _ in range(10): + resp_.write(b'data\n') + yield from resp_.drain() + yield from asyncio.sleep(0.5, loop=loop) + return resp_ + + app = web.Application(loop=loop) + app.router.add_route('GET', '/', handler) + server = yield from test_client(app) + + with aiohttp.ClientSession(loop=loop) as session: + timer_started = False + url, headers = server.make_url('/'), {'Connection': 'Keep-alive'} + resp = yield from session.get(url, headers=headers) + with pytest.raises(aiohttp.ClientDisconnectedError): + while True: + data = yield from resp.content.readline() + data = data.strip() + if not data: + break + assert data == b'data' + if not timer_started: + def do_release(): + loop.create_task(resp.release()) + loop.call_later(1.0, do_release) + timer_started = True + + +@asyncio.coroutine +def test_no_error_on_conn_close_if_eof(loop, test_client): + + @asyncio.coroutine + def handler(request): + resp_ = web.StreamResponse() + yield from resp_.prepare(request) + resp_.write(b'data\n') + yield from resp_.drain() + yield from asyncio.sleep(0.5, loop=loop) + return resp_ + + app = web.Application(loop=loop) + app.router.add_route('GET', '/', handler) + server = yield from test_client(app) + + with aiohttp.ClientSession(loop=loop) as session: + url, headers = server.make_url('/'), {'Connection': 'Keep-alive'} + resp = yield from session.get(url, headers=headers) + while True: + data = yield from resp.content.readline() + data = data.strip() + if not data: + break + assert data == b'data' + yield from resp.release() + assert resp.content.exception() is None + + +@asyncio.coroutine +def test_error_not_overwrote_on_conn_close(loop, test_client): + + @asyncio.coroutine + def handler(request): + resp_ = web.StreamResponse() + yield from resp_.prepare(request) + return resp_ + + app = web.Application(loop=loop) + app.router.add_route('GET', '/', handler) + server = yield from test_client(app) + + with aiohttp.ClientSession(loop=loop) as session: + url, headers = server.make_url('/'), {'Connection': 'Keep-alive'} + resp = yield from session.get(url, headers=headers) + resp.content.set_exception(aiohttp.ClientRequestError()) + + yield from resp.release() + assert isinstance(resp.content.exception(), aiohttp.ClientRequestError) + + @asyncio.coroutine def test_HTTP_200_OK_METHOD(loop, test_client): @asyncio.coroutine