diff --git a/aiohttp/web.py b/aiohttp/web.py index fd37dca23b4..64b33a5ab1d 100644 --- a/aiohttp/web.py +++ b/aiohttp/web.py @@ -171,6 +171,7 @@ def __init__(self, *, logger=web_logger, loop=None, self._on_pre_signal = PreSignal() self._on_post_signal = PostSignal() self._on_response_prepare = Signal(self) + self._on_startup = Signal(self) self._on_shutdown = Signal(self) self._on_cleanup = Signal(self) @@ -190,6 +191,10 @@ def on_pre_signal(self): def on_post_signal(self): return self._on_post_signal + @property + def on_startup(self): + return self._on_startup + @property def on_shutdown(self): return self._on_shutdown @@ -214,6 +219,14 @@ def make_handler(self, **kwargs): return self._handler_factory( self, self.router, loop=self.loop, **kwargs) + @asyncio.coroutine + def startup(self): + """Causes on_startup signal + + Should be called in the event loop along with the request handler. + """ + yield from self.on_startup.send(self) + @asyncio.coroutine def shutdown(self): """Causes on_shutdown signal @@ -267,9 +280,11 @@ def run_app(app, *, host='0.0.0.0', port=None, loop = app.loop handler = app.make_handler() - srv = loop.run_until_complete(loop.create_server(handler, host, port, - ssl=ssl_context, - backlog=backlog)) + server = loop.create_server(handler, host, port, ssl=ssl_context, + backlog=backlog) + srv, startup_res = loop.run_until_complete(asyncio.gather(server, + app.startup(), + loop=loop)) scheme = 'https' if ssl_context else 'http' print("======== Running on {scheme}://{host}:{port}/ ========\n" diff --git a/docs/web.rst b/docs/web.rst index c0c2a84f850..588bde3d3d7 100644 --- a/docs/web.rst +++ b/docs/web.rst @@ -944,7 +944,7 @@ handler:: return ws -Signal handler may looks like:: +Signal handler may look like:: async def on_shutdown(app): for ws in app['websockets']: @@ -986,6 +986,63 @@ finalizing. It's pretty close to :func:`run_app` utility function:: loop.run_until_complete(app.cleanup()) loop.close() +.. _aiohttp-web-background-tasks: + +Background tasks +----------------- + +Sometimes there's a need to perform some asynchronous operations just +after application start-up. + +Even more, in some sophisticated systems there could be a need to run some +background tasks in the event loop along with the application's request +handler. Such as listening to message queue or other network message/event +sources (e.g. ZeroMQ, Redis Pub/Sub, AMQP, etc.) to react to received messages +within the application. + +For example the background task could listen to ZeroMQ on :data:`zmq.SUB` socket, +process and forward retrieved messages to clients connected via WebSocket +that are stored somewhere in the application +(e.g. in the :obj:`application['websockets']` list). + +To run such short and long running background tasks aiohttp provides an +ability to register :attr:`Application.on_startup` signal handler(s) that +will run along with the application's request handler. + +For example there's a need to run one quick task and two long running +tasks that will live till the application is alive. The appropriate +background tasks could be registered as an :attr:`Application.on_startup` +signal handlers as shown in the example below:: + + app = web.Application() + + async def quickly_notify_monitoring(app): + """Send notification to monitoring service about the app process start-up""" + pass + + async def listen_to_zeromq(app): + """Listen to messages on zmq.SUB socket""" + pass + + async def listen_to_redis(app): + """Listen to messages from Redis Pub/Sub""" + pass + + async def run_all_long_running_tasks(app): + return await asyncio.gather(listen_to_zeromq(app), + listen_to_redis(app), + loop=app.loop) + app.on_startup.append(quickly_notify_monitoring) + app.on_startup.append(run_all_long_running_tasks) + web.run_app(app) + + +The :func:`quickly_notify_monitoring` from the example above will complete +and exit but :func:`listen_to_zeromq` and :func:`listen_to_redis` will take +forever. +An :attr:`Application.on_cleanup` signal handler may be used to send a +cancellation to all registered long-running tasks. + CORS support ------------ diff --git a/docs/web_reference.rst b/docs/web_reference.rst index a236023e739..d780683afa1 100644 --- a/docs/web_reference.rst +++ b/docs/web_reference.rst @@ -1042,6 +1042,21 @@ duplicated like one using :meth:`Application.copy`. async def on_prepare(request, response): pass + .. attribute:: on_startup + + A :class:`~aiohttp.signals.Signal` that is fired on application start-up. + + Subscribers may use the signal to run background tasks in the event + loop along with the application's request handler just after the + application start-up. + + Signal handlers should have the following signature:: + + async def on_startup(app): + pass + + .. seealso:: :ref:`aiohttp-web-background-tasks`. + .. attribute:: on_shutdown A :class:`~aiohttp.signals.Signal` that is fired on application shutdown. @@ -1076,7 +1091,6 @@ duplicated like one using :meth:`Application.copy`. .. seealso:: :ref:`aiohttp-web-graceful-shutdown` and :attr:`on_shutdown`. - .. method:: make_handler(**kwargs) Creates HTTP protocol factory for handling requests. @@ -1104,6 +1118,14 @@ duplicated like one using :meth:`Application.copy`. secure_proxy_ssl_header='X-Forwarded-Proto'), '0.0.0.0', 8080) + .. coroutinemethod:: startup() + + A :ref:`coroutine` that will be called along with the + application's request handler. + + The purpose of the method is calling :attr:`on_startup` signal + handlers. + .. coroutinemethod:: shutdown() A :ref:`coroutine` that should be called on diff --git a/tests/test_run_app.py b/tests/test_run_app.py index 4dc7e6a5455..fc781f3e2e6 100644 --- a/tests/test_run_app.py +++ b/tests/test_run_app.py @@ -9,12 +9,14 @@ def test_run_app_http(loop, mocker): loop.call_later(0.02, loop.stop) app = web.Application(loop=loop) + mocker.spy(app, 'startup') web.run_app(app, print=lambda *args: None) assert loop.is_closed() loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8080, ssl=None, backlog=128) + app.startup.assert_called_once_with() def test_run_app_https(loop, mocker): @@ -22,6 +24,7 @@ def test_run_app_https(loop, mocker): loop.call_later(0.02, loop.stop) app = web.Application(loop=loop) + mocker.spy(app, 'startup') ssl_context = ssl.create_default_context() @@ -30,6 +33,7 @@ def test_run_app_https(loop, mocker): assert loop.is_closed() loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8443, ssl=ssl_context, backlog=128) + app.startup.assert_called_once_with() def test_run_app_nondefault_host_port(loop, unused_port, mocker): @@ -40,12 +44,14 @@ def test_run_app_nondefault_host_port(loop, unused_port, mocker): loop.call_later(0.02, loop.stop) app = web.Application(loop=loop) + mocker.spy(app, 'startup') web.run_app(app, host=host, port=port, print=lambda *args: None) assert loop.is_closed() loop.create_server.assert_called_with(mock.ANY, host, port, ssl=None, backlog=128) + app.startup.assert_called_once_with() def test_run_app_custom_backlog(loop, mocker): @@ -53,9 +59,11 @@ def test_run_app_custom_backlog(loop, mocker): loop.call_later(0.02, loop.stop) app = web.Application(loop=loop) + mocker.spy(app, 'startup') web.run_app(app, backlog=10, print=lambda *args: None) assert loop.is_closed() loop.create_server.assert_called_with(mock.ANY, '0.0.0.0', 8080, ssl=None, backlog=10) + app.startup.assert_called_once_with() diff --git a/tests/test_web_application.py b/tests/test_web_application.py index ea404b382fe..78085a8549b 100644 --- a/tests/test_web_application.py +++ b/tests/test_web_application.py @@ -100,3 +100,48 @@ def on_shutdown(app_param): yield from app.shutdown() assert called + + +@pytest.mark.run_loop +def test_on_startup(loop): + app = web.Application(loop=loop) + + blocking_called = False + long_running1_called = False + long_running2_called = False + all_long_running_called = False + + def on_startup_blocking(app_param): + nonlocal blocking_called + assert app is app_param + blocking_called = True + + @asyncio.coroutine + def long_running1(app_param): + nonlocal long_running1_called + assert app is app_param + long_running1_called = True + + @asyncio.coroutine + def long_running2(app_param): + nonlocal long_running2_called + assert app is app_param + long_running2_called = True + + @asyncio.coroutine + def on_startup_all_long_running(app_param): + nonlocal all_long_running_called + assert app is app_param + all_long_running_called = True + return (yield from asyncio.gather(long_running1(app_param), + long_running2(app_param), + loop=app_param.loop)) + + app.on_startup.append(on_startup_blocking) + app.on_startup.append(on_startup_all_long_running) + + yield from app.startup() + assert blocking_called + assert long_running1_called + assert long_running2_called + assert all_long_running_called