Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix timeout issue #593

Merged
merged 4 commits into from
Jun 4, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
Changes
-------
0.9.1a0 (2018-XX-XX)
^^^^^^^^^^^^^^^^^^^^
0.9.1 (2018-05-04)
^^^^^^^^^^^^^^^^^^
* fix timeout bug introduced in last release

0.9.0 (2018-06-01)
^^^^^^^^^^^^^^^^^^
Expand Down
2 changes: 1 addition & 1 deletion aiobotocore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .session import get_session, AioSession

__all__ = ['get_session', 'AioSession']
__version__ = '0.9.1a0'
__version__ = '0.9.1'
2 changes: 1 addition & 1 deletion aiobotocore/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ async def _request(self, method, url, headers, data, verify, stream):
url = URL(url, encoded=True)
resp = await self._aio_session.request(
method, url=url, headers=headers_, data=data, proxy=proxy,
verify_ssl=verify, timeout=None)
verify_ssl=verify)

# If we're not streaming, read the content so we can retry any timeout
# errors, see:
Expand Down
5 changes: 3 additions & 2 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
-e .
coverage==4.5.1
flake8==3.5.0

# we specify flask directly and don't use moto[server] as we want to fix the flask version
Flask==0.12.2

# until https://github.com/spulec/moto/pull/1611 is released
git+git://github.com/thehesiod/moto.git@fix-copy-source-query#egg=moto
# until release with https://github.com/spulec/moto/pull/1611 is available
git+git://github.com/spulec/moto.git@80929292584ee78affc07643d16fae6bb31b4014#egg=moto

pytest-cov==2.5.1
pytest==3.4.1
Expand Down
87 changes: 46 additions & 41 deletions tests/mock_server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import aiohttp
import aiohttp.web
from aiohttp.web import StreamResponse
import pytest
Expand All @@ -8,17 +9,15 @@
import subprocess as sp
import sys
import time
import threading
import socket
from unittest import mock
import multiprocessing


_proxy_bypass = {
"http": None,
"https": None,
}


host = "localhost"


Expand All @@ -30,33 +29,46 @@ def get_free_tcp_port():
return port


class AIOServer(threading.Thread):
# This runs in a subprocess for a variety of reasons
# 1) early versions of python 3.5 did not correctly set one thread per run loop
# 2) aiohttp uses get_event_loop instead of using the passed in run loop
# 3) aiohttp shutdown can be hairy
class AIOServer(multiprocessing.Process):
"""
This is a mock AWS service which will 5 seconds before returning
a response to test socket timeouts.
"""
def __init__(self):
super().__init__(target=self._run)
self._loop = None
self._port = get_free_tcp_port()
self.start()
self.endpoint_url = 'http://{}:{}'.format(host, self._port)
self._shutdown_evt = threading.Event()
self.daemon = True # die when parent dies

def _run(self):
self._loop = asyncio.new_event_loop()
app = aiohttp.web.Application(loop=self._loop)
asyncio.set_event_loop(asyncio.new_event_loop())
app = aiohttp.web.Application()
app.router.add_route('*', '/ok', self.ok)
app.router.add_route('*', '/{anything:.*}', self.stream_handler)

try:
# We need to mock `.get_event_loop` function and return
# `self._loop` explicitly because from `aiohttp>=3.0.0` we can't
# pass `loop` as a kwargs into `run_app`.
with mock.patch('asyncio.get_event_loop', return_value=self._loop):
aiohttp.web.run_app(app, host=host, port=self._port,
handle_signals=False)
aiohttp.web.run_app(app, host=host, port=self._port,
handle_signals=False)
except BaseException:
pytest.fail('unable to start and connect to aiohttp server')
raise
finally:
self._shutdown_evt.set()

async def __aenter__(self):
self.start()
await self._wait_until_up()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
try:
self.terminate()
except BaseException:
pytest.fail("Unable to shut down server")
raise

async def ok(self, request):
return aiohttp.web.Response()
Expand All @@ -73,31 +85,24 @@ async def stream_handler(self, request):
await resp.drain()
return resp

def wait_until_up(self):
connected = False
for i in range(0, 30):
try:
# we need to bypass the proxies due to monkey patches
requests.get(self.endpoint_url + '/ok', timeout=0.5,
proxies=_proxy_bypass)
connected = True
break
except (requests.exceptions.ConnectionError,
requests.exceptions.ReadTimeout):
time.sleep(0.5)
except BaseException:
pytest.fail('unable to start and connect to aiohttp server')
raise

if not connected:
pytest.fail('unable to start and connect to aiohttp server')

async def stop(self):
if self._loop:
self._loop.stop()

if not self._shutdown_evt.wait(20):
pytest.fail("Unable to shut down server")
async def _wait_until_up(self):
async with aiohttp.ClientSession() as session:
for i in range(0, 30):
if self.exitcode is not None:
pytest.fail('unable to start/connect to aiohttp server')
return

try:
# we need to bypass the proxies due to monkey patches
await session.get(self.endpoint_url + '/ok', timeout=0.5)
return
except (aiohttp.ClientConnectionError, asyncio.TimeoutError):
await asyncio.sleep(0.5)
except BaseException:
pytest.fail('unable to start/connect to aiohttp server')
raise

pytest.fail('unable to start and connect to aiohttp server')


def start_service(service_name, host, port):
Expand Down
64 changes: 41 additions & 23 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,34 +47,52 @@ def test_connector_args():
@pytest.mark.moto
@pytest.mark.asyncio
async def test_connector_timeout(event_loop):
server = AIOServer()
session = AioSession(loop=event_loop)
config = AioConfig(max_pool_connections=1, connect_timeout=1,
retries={'max_attempts': 0})
async with session.create_client('s3', config=config,
endpoint_url=server.endpoint_url,
aws_secret_access_key='xxx',
aws_access_key_id='xxx') as s3_client:
async with AIOServer() as server, \
session.create_client('s3', config=config,
endpoint_url=server.endpoint_url,
aws_secret_access_key='xxx',
aws_access_key_id='xxx') as s3_client:

try:
server.wait_until_up()

async def get_and_wait():
await s3_client.get_object(Bucket='foo', Key='bar')
await asyncio.sleep(100)
async def get_and_wait():
await s3_client.get_object(Bucket='foo', Key='bar')
await asyncio.sleep(100)

task1 = asyncio.Task(get_and_wait(), loop=event_loop)
task2 = asyncio.Task(get_and_wait(), loop=event_loop)
task1 = asyncio.Task(get_and_wait(), loop=event_loop)
task2 = asyncio.Task(get_and_wait(), loop=event_loop)

try:
done, pending = await asyncio.wait([task1, task2],
timeout=3, loop=event_loop)
try:
done, pending = await asyncio.wait([task1, task2],
timeout=3, loop=event_loop)

# second request should not timeout just because there isn't a
# connector available
assert len(pending) == 2
finally:
task1.cancel()
task2.cancel()
# second request should not timeout just because there isn't a
# connector available
assert len(pending) == 2
finally:
await server.stop()
task1.cancel()
task2.cancel()


# Enable this once https://github.com/aio-libs/aiohttp/issues/3053 is fixed
# @pytest.mark.moto
# @pytest.mark.asyncio
# async def test_connector_timeout2(event_loop):
# session = AioSession(loop=event_loop)
# config = AioConfig(max_pool_connections=1, connect_timeout=1,
# read_timeout=1, retries={'max_attempts': 0})
# async with AIOServer() as server, \
# session.create_client('s3', config=config,
# endpoint_url=server.endpoint_url,
# aws_secret_access_key='xxx',
# aws_access_key_id='xxx') as s3_client:
#
# with pytest.raises(TimeoutError):
# try:
# resp = await s3_client.get_object(Bucket='foo', Key='bar')
# print()
# await resp["Body"].read()
# print()
# except BaseException as e:
# raise