From 44073c4ad854e0f24a560ebfa324548cfd4b3864 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89ric=20Lemoine?= Date: Wed, 17 Jun 2020 19:21:41 +0200 Subject: [PATCH] Use a separate decorator for handling "closed connection" errors --- procrastinate/aiopg_connector.py | 55 ++++++++++++++++++++++-------- tests/unit/test_aiopg_connector.py | 12 +++---- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/procrastinate/aiopg_connector.py b/procrastinate/aiopg_connector.py index 19360a866..05c19a78b 100644 --- a/procrastinate/aiopg_connector.py +++ b/procrastinate/aiopg_connector.py @@ -20,15 +20,37 @@ def wrap_exceptions(coro: CoroutineFunction) -> CoroutineFunction: """ - Wrap psycopg2 and aiopg errors as connector exceptions. In case we get a - psycopg2.OperationalError with a "server closed the connection - unexpectedly" message we retry a number of times. + Wrap psycopg2 and aiopg errors as connector exceptions. This decorator is expected to be used on coroutine functions only. """ @functools.wraps(coro) async def wrapped(*args, **kwargs): + try: + return await coro(*args, **kwargs) + except psycopg2.errors.UniqueViolation as exc: + raise exceptions.UniqueViolation(constraint_name=exc.diag.constraint_name) + except psycopg2.Error as exc: + raise exceptions.ConnectorException from exc + + # Attaching a custom attribute to ease testability and make the + # decorator more introspectable + wrapped._exceptions_wrapped = True # type: ignore + return wrapped + + +def wrap_query_exceptions(coro: CoroutineFunction) -> CoroutineFunction: + """ + Detect ConnectorException's caused by psycopg2 OperationalError with a "server + closed the connection unexpectedly" message and retry a number of times. + + This decorator is for decorating functions already decorated with wrap_exceptions. + """ + + @functools.wraps(coro) + async def wrapped(*args, **kwargs): + final_exc = None cnt = 0 max_tries = ( args[0]._pool.maxsize + 1 if args and getattr(args[0], "_pool", None) else 1 @@ -36,18 +58,17 @@ async def wrapped(*args, **kwargs): while cnt < max_tries: try: return await coro(*args, **kwargs) - except psycopg2.errors.UniqueViolation as exc: - raise exceptions.UniqueViolation( - constraint_name=exc.diag.constraint_name - ) - except psycopg2.Error as exc: - if isinstance( - exc, psycopg2.errors.OperationalError - ) and "server closed the connection unexpectedly" in str(exc): - final_exc = exc - cnt += 1 - continue - raise exceptions.ConnectorException from exc + except exceptions.ConnectorException as exc: + exc_cause = exc.__cause__ + if ( + not exc_cause + or not isinstance(exc_cause, psycopg2.errors.OperationalError) + or "server closed the connection unexpectedly" not in str(exc_cause) + ): + raise exc + final_exc = exc_cause + cnt += 1 + continue raise exceptions.ConnectorException( "Could not get a valid connection after {} tries".format(max_tries) ) from final_exc @@ -196,12 +217,14 @@ async def _get_pool(self) -> aiopg.Pool: # Because of this, it's easier to have 2 distinct methods for executing from # a pool or from a connection + @wrap_query_exceptions @wrap_exceptions async def execute_query_async(self, query: str, **arguments: Any) -> None: pool = await self._get_pool() with await pool.cursor() as cursor: await cursor.execute(query, self._wrap_json(arguments)) + @wrap_query_exceptions @wrap_exceptions async def _execute_query_connection( self, query: str, connection: aiopg.Connection, **arguments: Any, @@ -209,6 +232,7 @@ async def _execute_query_connection( async with connection.cursor() as cursor: await cursor.execute(query, self._wrap_json(arguments)) + @wrap_query_exceptions @wrap_exceptions async def execute_query_one_async( self, query: str, **arguments: Any @@ -219,6 +243,7 @@ async def execute_query_one_async( return await cursor.fetchone() + @wrap_query_exceptions @wrap_exceptions async def execute_query_all_async( self, query: str, **arguments: Any diff --git a/tests/unit/test_aiopg_connector.py b/tests/unit/test_aiopg_connector.py index 2df866201..db53bbfb4 100644 --- a/tests/unit/test_aiopg_connector.py +++ b/tests/unit/test_aiopg_connector.py @@ -53,10 +53,10 @@ async def corofunc(a, b): @pytest.mark.asyncio -async def test_wrap_exceptions_after_retry(mocker): - @aiopg_connector.wrap_exceptions +async def test_wrap_query_exceptions(mocker): + @aiopg_connector.wrap_query_exceptions async def corofunc(connector): - raise psycopg2.errors.OperationalError( + raise exceptions.ConnectorException from psycopg2.errors.OperationalError( "server closed the connection unexpectedly" ) @@ -70,15 +70,15 @@ async def corofunc(connector): @pytest.mark.asyncio -async def test_wrap_exceptions_success_after_retry(mocker): +async def test_wrap_query_exceptions_success(mocker): cnt = 0 - @aiopg_connector.wrap_exceptions + @aiopg_connector.wrap_query_exceptions async def corofunc(connector, a, b): nonlocal cnt if cnt < 2: cnt += 1 - raise psycopg2.errors.OperationalError( + raise exceptions.ConnectorException from psycopg2.errors.OperationalError( "server closed the connection unexpectedly" ) return a, b