Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a function to notify StreamReader when connection from ClientSession is closed. #1323

Merged
merged 16 commits into from
Oct 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ CHANGES

- Ensure TestClient HTTP methods return a context manager #1318

-
- Raise `ClientDisconnectedError` to `FlowControlStreamReader` read function
if `ClientSession` object is closed by client when reading data. #1323

-

Expand Down
1 change: 1 addition & 0 deletions CONTRIBUTORS.txt
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ Vladimir Rutsky
Vladimir Shulyak
Vladimir Zakharov
Willem de Groot
Wilson Ong
W. Trevor King
Yannick Koechlin
Young-Ho Cha
Expand Down
8 changes: 8 additions & 0 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -659,6 +659,7 @@ def close(self):
self._connection.close()
self._connection = None
self._cleanup_writer()
self._notify_content()

@asyncio.coroutine
def release(self):
Expand All @@ -682,6 +683,7 @@ def release(self):
self._reader.unset_parser()
self._connection = None
self._cleanup_writer()
self._notify_content()

def raise_for_status(self):
if 400 <= self.status:
Expand All @@ -694,6 +696,12 @@ def _cleanup_writer(self):
self._writer.cancel()
self._writer = None

def _notify_content(self):
content = self.content
if content and content.exception() is None and not content.is_eof():
content.set_exception(
aiohttp.ClientDisconnectedError('Connection closed'))

@asyncio.coroutine
def wait_for_close(self):
if self._writer is not None:
Expand Down
3 changes: 3 additions & 0 deletions docs/streams.rst
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ Reading Methods
If the EOF was received and the internal buffer is empty, return an
empty bytes object.

Raise an :exc:`aiohttp.ClientDisconnectedError` if :class:`ClientSession`
object is closed when reading data.

:return bytes: the given line

Asynchronous Iteration Support
Expand Down
89 changes: 89 additions & 0 deletions tests/test_client_functional.py
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,95 @@ def handler(request):
yield from resp.read()


@asyncio.coroutine
def test_readline_error_on_conn_close(loop, test_client):

@asyncio.coroutine
def handler(request):
resp_ = web.StreamResponse()
yield from resp_.prepare(request)

# make sure connection is closed by client.
with pytest.raises(aiohttp.ServerDisconnectedError):
for _ in range(10):
resp_.write(b'data\n')
yield from resp_.drain()
yield from asyncio.sleep(0.5, loop=loop)
return resp_

app = web.Application(loop=loop)
app.router.add_route('GET', '/', handler)
server = yield from test_client(app)

with aiohttp.ClientSession(loop=loop) as session:
timer_started = False
url, headers = server.make_url('/'), {'Connection': 'Keep-alive'}
resp = yield from session.get(url, headers=headers)
with pytest.raises(aiohttp.ClientDisconnectedError):
while True:
data = yield from resp.content.readline()
data = data.strip()
if not data:
break
assert data == b'data'
if not timer_started:
def do_release():
loop.create_task(resp.release())
loop.call_later(1.0, do_release)
timer_started = True


@asyncio.coroutine
def test_no_error_on_conn_close_if_eof(loop, test_client):

@asyncio.coroutine
def handler(request):
resp_ = web.StreamResponse()
yield from resp_.prepare(request)
resp_.write(b'data\n')
yield from resp_.drain()
yield from asyncio.sleep(0.5, loop=loop)
return resp_

app = web.Application(loop=loop)
app.router.add_route('GET', '/', handler)
server = yield from test_client(app)

with aiohttp.ClientSession(loop=loop) as session:
url, headers = server.make_url('/'), {'Connection': 'Keep-alive'}
resp = yield from session.get(url, headers=headers)
while True:
data = yield from resp.content.readline()
data = data.strip()
if not data:
break
assert data == b'data'
yield from resp.release()
assert resp.content.exception() is None


@asyncio.coroutine
def test_error_not_overwrote_on_conn_close(loop, test_client):

@asyncio.coroutine
def handler(request):
resp_ = web.StreamResponse()
yield from resp_.prepare(request)
return resp_

app = web.Application(loop=loop)
app.router.add_route('GET', '/', handler)
server = yield from test_client(app)

with aiohttp.ClientSession(loop=loop) as session:
url, headers = server.make_url('/'), {'Connection': 'Keep-alive'}
resp = yield from session.get(url, headers=headers)
resp.content.set_exception(aiohttp.ClientRequestError())

yield from resp.release()
assert isinstance(resp.content.exception(), aiohttp.ClientRequestError)


@asyncio.coroutine
def test_HTTP_200_OK_METHOD(loop, test_client):
@asyncio.coroutine
Expand Down