diff --git a/CHANGES.txt b/CHANGES.txt index 2e5507be..c45f64d5 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,7 +1,8 @@ Changes ------- -0.9.1a0 (2018-XX-XX) -^^^^^^^^^^^^^^^^^^^^ +0.9.1 (2018-05-04) +^^^^^^^^^^^^^^^^^^ +* fix timeout bug introduced in last release 0.9.0 (2018-06-01) ^^^^^^^^^^^^^^^^^^ diff --git a/aiobotocore/__init__.py b/aiobotocore/__init__.py index 6c639343..9f954a68 100644 --- a/aiobotocore/__init__.py +++ b/aiobotocore/__init__.py @@ -1,4 +1,4 @@ from .session import get_session, AioSession __all__ = ['get_session', 'AioSession'] -__version__ = '0.9.1a0' +__version__ = '0.9.1' diff --git a/aiobotocore/endpoint.py b/aiobotocore/endpoint.py index 18ae89bd..4de81a71 100644 --- a/aiobotocore/endpoint.py +++ b/aiobotocore/endpoint.py @@ -245,7 +245,7 @@ async def _request(self, method, url, headers, data, verify, stream): url = URL(url, encoded=True) resp = await self._aio_session.request( method, url=url, headers=headers_, data=data, proxy=proxy, - verify_ssl=verify, timeout=None) + verify_ssl=verify) # If we're not streaming, read the content so we can retry any timeout # errors, see: diff --git a/requirements-dev.txt b/requirements-dev.txt index be58fe15..a371d238 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -1,11 +1,12 @@ -e . coverage==4.5.1 flake8==3.5.0 + # we specify flask directly and don't use moto[server] as we want to fix the flask version Flask==0.12.2 -# until https://github.com/spulec/moto/pull/1611 is released -git+git://github.com/thehesiod/moto.git@fix-copy-source-query#egg=moto +# until release with https://github.com/spulec/moto/pull/1611 is available +git+git://github.com/spulec/moto.git@80929292584ee78affc07643d16fae6bb31b4014#egg=moto pytest-cov==2.5.1 pytest==3.4.1 diff --git a/tests/mock_server.py b/tests/mock_server.py index b4565ff0..2da98734 100644 --- a/tests/mock_server.py +++ b/tests/mock_server.py @@ -1,4 +1,5 @@ import asyncio +import aiohttp import aiohttp.web from aiohttp.web import StreamResponse import pytest @@ -8,9 +9,8 @@ import subprocess as sp import sys import time -import threading import socket -from unittest import mock +import multiprocessing _proxy_bypass = { @@ -18,7 +18,6 @@ "https": None, } - host = "localhost" @@ -30,33 +29,46 @@ def get_free_tcp_port(): return port -class AIOServer(threading.Thread): +# This runs in a subprocess for a variety of reasons +# 1) early versions of python 3.5 did not correctly set one thread per run loop +# 2) aiohttp uses get_event_loop instead of using the passed in run loop +# 3) aiohttp shutdown can be hairy +class AIOServer(multiprocessing.Process): + """ + This is a mock AWS service which will 5 seconds before returning + a response to test socket timeouts. + """ def __init__(self): super().__init__(target=self._run) self._loop = None self._port = get_free_tcp_port() - self.start() self.endpoint_url = 'http://{}:{}'.format(host, self._port) - self._shutdown_evt = threading.Event() + self.daemon = True # die when parent dies def _run(self): - self._loop = asyncio.new_event_loop() - app = aiohttp.web.Application(loop=self._loop) + asyncio.set_event_loop(asyncio.new_event_loop()) + app = aiohttp.web.Application() app.router.add_route('*', '/ok', self.ok) app.router.add_route('*', '/{anything:.*}', self.stream_handler) try: - # We need to mock `.get_event_loop` function and return - # `self._loop` explicitly because from `aiohttp>=3.0.0` we can't - # pass `loop` as a kwargs into `run_app`. - with mock.patch('asyncio.get_event_loop', return_value=self._loop): - aiohttp.web.run_app(app, host=host, port=self._port, - handle_signals=False) + aiohttp.web.run_app(app, host=host, port=self._port, + handle_signals=False) except BaseException: pytest.fail('unable to start and connect to aiohttp server') raise - finally: - self._shutdown_evt.set() + + async def __aenter__(self): + self.start() + await self._wait_until_up() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + try: + self.terminate() + except BaseException: + pytest.fail("Unable to shut down server") + raise async def ok(self, request): return aiohttp.web.Response() @@ -73,31 +85,24 @@ async def stream_handler(self, request): await resp.drain() return resp - def wait_until_up(self): - connected = False - for i in range(0, 30): - try: - # we need to bypass the proxies due to monkey patches - requests.get(self.endpoint_url + '/ok', timeout=0.5, - proxies=_proxy_bypass) - connected = True - break - except (requests.exceptions.ConnectionError, - requests.exceptions.ReadTimeout): - time.sleep(0.5) - except BaseException: - pytest.fail('unable to start and connect to aiohttp server') - raise - - if not connected: - pytest.fail('unable to start and connect to aiohttp server') - - async def stop(self): - if self._loop: - self._loop.stop() - - if not self._shutdown_evt.wait(20): - pytest.fail("Unable to shut down server") + async def _wait_until_up(self): + async with aiohttp.ClientSession() as session: + for i in range(0, 30): + if self.exitcode is not None: + pytest.fail('unable to start/connect to aiohttp server') + return + + try: + # we need to bypass the proxies due to monkey patches + await session.get(self.endpoint_url + '/ok', timeout=0.5) + return + except (aiohttp.ClientConnectionError, asyncio.TimeoutError): + await asyncio.sleep(0.5) + except BaseException: + pytest.fail('unable to start/connect to aiohttp server') + raise + + pytest.fail('unable to start and connect to aiohttp server') def start_service(service_name, host, port): diff --git a/tests/test_config.py b/tests/test_config.py index 29988138..52dd7915 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -47,34 +47,52 @@ def test_connector_args(): @pytest.mark.moto @pytest.mark.asyncio async def test_connector_timeout(event_loop): - server = AIOServer() session = AioSession(loop=event_loop) config = AioConfig(max_pool_connections=1, connect_timeout=1, retries={'max_attempts': 0}) - async with session.create_client('s3', config=config, - endpoint_url=server.endpoint_url, - aws_secret_access_key='xxx', - aws_access_key_id='xxx') as s3_client: + async with AIOServer() as server, \ + session.create_client('s3', config=config, + endpoint_url=server.endpoint_url, + aws_secret_access_key='xxx', + aws_access_key_id='xxx') as s3_client: - try: - server.wait_until_up() - - async def get_and_wait(): - await s3_client.get_object(Bucket='foo', Key='bar') - await asyncio.sleep(100) + async def get_and_wait(): + await s3_client.get_object(Bucket='foo', Key='bar') + await asyncio.sleep(100) - task1 = asyncio.Task(get_and_wait(), loop=event_loop) - task2 = asyncio.Task(get_and_wait(), loop=event_loop) + task1 = asyncio.Task(get_and_wait(), loop=event_loop) + task2 = asyncio.Task(get_and_wait(), loop=event_loop) - try: - done, pending = await asyncio.wait([task1, task2], - timeout=3, loop=event_loop) + try: + done, pending = await asyncio.wait([task1, task2], + timeout=3, loop=event_loop) - # second request should not timeout just because there isn't a - # connector available - assert len(pending) == 2 - finally: - task1.cancel() - task2.cancel() + # second request should not timeout just because there isn't a + # connector available + assert len(pending) == 2 finally: - await server.stop() + task1.cancel() + task2.cancel() + + +# Enable this once https://github.com/aio-libs/aiohttp/issues/3053 is fixed +# @pytest.mark.moto +# @pytest.mark.asyncio +# async def test_connector_timeout2(event_loop): +# session = AioSession(loop=event_loop) +# config = AioConfig(max_pool_connections=1, connect_timeout=1, +# read_timeout=1, retries={'max_attempts': 0}) +# async with AIOServer() as server, \ +# session.create_client('s3', config=config, +# endpoint_url=server.endpoint_url, +# aws_secret_access_key='xxx', +# aws_access_key_id='xxx') as s3_client: +# +# with pytest.raises(TimeoutError): +# try: +# resp = await s3_client.get_object(Bucket='foo', Key='bar') +# print() +# await resp["Body"].read() +# print() +# except BaseException as e: +# raise