Skip to content

Commit

Permalink
Add a function to notify StreamReader when connection from ClientSess…
Browse files Browse the repository at this point in the history
…ion is closed. (#1323)

* - Added a function to notify StreamReader when connection is closed.

* Fix content is NoneType error.

* Added unit test.

* Modified CHANGES.rst

* moved unit test from 'test_py35/test_resp.py' to 'test_client_functional.py' and fix unhandled exception.

* fix unit test error.

* fix unit test error.

* fix unit test error.

* fix unit test error.

* fix unit test error.

* modified contributor list

* not raising error if eof is read and added unit test for it.

* add changes to documentation.

* modify changes sentence.

* modify changes sentence.

* modify changes sentence.
  • Loading branch information
springmaple authored and asvetlov committed Oct 21, 2016
1 parent 2013dc4 commit b97a400
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 1 deletion.
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ CHANGES

- Ensure TestClient HTTP methods return a context manager #1318

-
- Raise `ClientDisconnectedError` to `FlowControlStreamReader` read function
if `ClientSession` object is closed by client when reading data. #1323

-

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Vladimir Rutsky
Vladimir Shulyak
Vladimir Zakharov
Willem de Groot
Wilson Ong
W. Trevor King
Yannick Koechlin
Young-Ho Cha
Expand Down
8 changes: 8 additions & 0 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ def close(self):
self._connection.close()
self._connection = None
self._cleanup_writer()
self._notify_content()

@asyncio.coroutine
def release(self):
Expand All @@ -682,6 +683,7 @@ def release(self):
self._reader.unset_parser()
self._connection = None
self._cleanup_writer()
self._notify_content()

def raise_for_status(self):
if 400 <= self.status:
Expand All @@ -694,6 +696,12 @@ def _cleanup_writer(self):
self._writer.cancel()
self._writer = None

def _notify_content(self):
content = self.content
if content and content.exception() is None and not content.is_eof():
content.set_exception(
aiohttp.ClientDisconnectedError('Connection closed'))

@asyncio.coroutine
def wait_for_close(self):
if self._writer is not None:
Expand Down
3 changes: 3 additions & 0 deletions docs/streams.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ Reading Methods
If the EOF was received and the internal buffer is empty, return an
empty bytes object.

Raise an :exc:`aiohttp.ClientDisconnectedError` if :class:`ClientSession`
object is closed when reading data.

:return bytes: the given line

Asynchronous Iteration Support
Expand Down
89 changes: 89 additions & 0 deletions tests/test_client_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,95 @@ def handler(request):
yield from resp.read()


@asyncio.coroutine
def test_readline_error_on_conn_close(loop, test_client):

@asyncio.coroutine
def handler(request):
resp_ = web.StreamResponse()
yield from resp_.prepare(request)

# make sure connection is closed by client.
with pytest.raises(aiohttp.ServerDisconnectedError):
for _ in range(10):
resp_.write(b'data\n')
yield from resp_.drain()
yield from asyncio.sleep(0.5, loop=loop)
return resp_

app = web.Application(loop=loop)
app.router.add_route('GET', '/', handler)
server = yield from test_client(app)

with aiohttp.ClientSession(loop=loop) as session:
timer_started = False
url, headers = server.make_url('/'), {'Connection': 'Keep-alive'}
resp = yield from session.get(url, headers=headers)
with pytest.raises(aiohttp.ClientDisconnectedError):
while True:
data = yield from resp.content.readline()
data = data.strip()
if not data:
break
assert data == b'data'
if not timer_started:
def do_release():
loop.create_task(resp.release())
loop.call_later(1.0, do_release)
timer_started = True


@asyncio.coroutine
def test_no_error_on_conn_close_if_eof(loop, test_client):

@asyncio.coroutine
def handler(request):
resp_ = web.StreamResponse()
yield from resp_.prepare(request)
resp_.write(b'data\n')
yield from resp_.drain()
yield from asyncio.sleep(0.5, loop=loop)
return resp_

app = web.Application(loop=loop)
app.router.add_route('GET', '/', handler)
server = yield from test_client(app)

with aiohttp.ClientSession(loop=loop) as session:
url, headers = server.make_url('/'), {'Connection': 'Keep-alive'}
resp = yield from session.get(url, headers=headers)
while True:
data = yield from resp.content.readline()
data = data.strip()
if not data:
break
assert data == b'data'
yield from resp.release()
assert resp.content.exception() is None


@asyncio.coroutine
def test_error_not_overwrote_on_conn_close(loop, test_client):

@asyncio.coroutine
def handler(request):
resp_ = web.StreamResponse()
yield from resp_.prepare(request)
return resp_

app = web.Application(loop=loop)
app.router.add_route('GET', '/', handler)
server = yield from test_client(app)

with aiohttp.ClientSession(loop=loop) as session:
url, headers = server.make_url('/'), {'Connection': 'Keep-alive'}
resp = yield from session.get(url, headers=headers)
resp.content.set_exception(aiohttp.ClientRequestError())

yield from resp.release()
assert isinstance(resp.content.exception(), aiohttp.ClientRequestError)


@asyncio.coroutine
def test_HTTP_200_OK_METHOD(loop, test_client):
@asyncio.coroutine
Expand Down

0 comments on commit b97a400

Please sign in to comment.