Skip to content

Commit

Permalink
Use a separate decorator for handling "closed connection" errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Éric Lemoine committed Jun 17, 2020
1 parent e360db4 commit 44073c4
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 21 deletions.
55 changes: 40 additions & 15 deletions procrastinate/aiopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,34 +20,55 @@

def wrap_exceptions(coro: CoroutineFunction) -> CoroutineFunction:
"""
Wrap psycopg2 and aiopg errors as connector exceptions. In case we get a
psycopg2.OperationalError with a "server closed the connection
unexpectedly" message we retry a number of times.
Wrap psycopg2 and aiopg errors as connector exceptions.
This decorator is expected to be used on coroutine functions only.
"""

@functools.wraps(coro)
async def wrapped(*args, **kwargs):
try:
return await coro(*args, **kwargs)
except psycopg2.errors.UniqueViolation as exc:
raise exceptions.UniqueViolation(constraint_name=exc.diag.constraint_name)
except psycopg2.Error as exc:
raise exceptions.ConnectorException from exc

# Attaching a custom attribute to ease testability and make the
# decorator more introspectable
wrapped._exceptions_wrapped = True # type: ignore
return wrapped


def wrap_query_exceptions(coro: CoroutineFunction) -> CoroutineFunction:
"""
Detect ConnectorException's caused by psycopg2 OperationalError with a "server
closed the connection unexpectedly" message and retry a number of times.
This decorator is for decorating functions already decorated with wrap_exceptions.
"""

@functools.wraps(coro)
async def wrapped(*args, **kwargs):
final_exc = None
cnt = 0
max_tries = (
args[0]._pool.maxsize + 1 if args and getattr(args[0], "_pool", None) else 1
)
while cnt < max_tries:
try:
return await coro(*args, **kwargs)
except psycopg2.errors.UniqueViolation as exc:
raise exceptions.UniqueViolation(
constraint_name=exc.diag.constraint_name
)
except psycopg2.Error as exc:
if isinstance(
exc, psycopg2.errors.OperationalError
) and "server closed the connection unexpectedly" in str(exc):
final_exc = exc
cnt += 1
continue
raise exceptions.ConnectorException from exc
except exceptions.ConnectorException as exc:
exc_cause = exc.__cause__
if (
not exc_cause
or not isinstance(exc_cause, psycopg2.errors.OperationalError)
or "server closed the connection unexpectedly" not in str(exc_cause)
):
raise exc
final_exc = exc_cause
cnt += 1
continue
raise exceptions.ConnectorException(
"Could not get a valid connection after {} tries".format(max_tries)
) from final_exc
Expand Down Expand Up @@ -196,19 +217,22 @@ async def _get_pool(self) -> aiopg.Pool:
# Because of this, it's easier to have 2 distinct methods for executing from
# a pool or from a connection

@wrap_query_exceptions
@wrap_exceptions
async def execute_query_async(self, query: str, **arguments: Any) -> None:
pool = await self._get_pool()
with await pool.cursor() as cursor:
await cursor.execute(query, self._wrap_json(arguments))

@wrap_query_exceptions
@wrap_exceptions
async def _execute_query_connection(
self, query: str, connection: aiopg.Connection, **arguments: Any,
) -> None:
async with connection.cursor() as cursor:
await cursor.execute(query, self._wrap_json(arguments))

@wrap_query_exceptions
@wrap_exceptions
async def execute_query_one_async(
self, query: str, **arguments: Any
Expand All @@ -219,6 +243,7 @@ async def execute_query_one_async(

return await cursor.fetchone()

@wrap_query_exceptions
@wrap_exceptions
async def execute_query_all_async(
self, query: str, **arguments: Any
Expand Down
12 changes: 6 additions & 6 deletions tests/unit/test_aiopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ async def corofunc(a, b):


@pytest.mark.asyncio
async def test_wrap_exceptions_after_retry(mocker):
@aiopg_connector.wrap_exceptions
async def test_wrap_query_exceptions(mocker):
@aiopg_connector.wrap_query_exceptions
async def corofunc(connector):
raise psycopg2.errors.OperationalError(
raise exceptions.ConnectorException from psycopg2.errors.OperationalError(
"server closed the connection unexpectedly"
)

Expand All @@ -70,15 +70,15 @@ async def corofunc(connector):


@pytest.mark.asyncio
async def test_wrap_exceptions_success_after_retry(mocker):
async def test_wrap_query_exceptions_success(mocker):
cnt = 0

@aiopg_connector.wrap_exceptions
@aiopg_connector.wrap_query_exceptions
async def corofunc(connector, a, b):
nonlocal cnt
if cnt < 2:
cnt += 1
raise psycopg2.errors.OperationalError(
raise exceptions.ConnectorException from psycopg2.errors.OperationalError(
"server closed the connection unexpectedly"
)
return a, b
Expand Down

0 comments on commit 44073c4

Please sign in to comment.