Skip to content

Commit

Permalink
psycopg2: handle AdminShutdown exceptions
Browse files Browse the repository at this point in the history
  • Loading branch information
Éric Lemoine committed Jun 18, 2020
1 parent 617bfc6 commit 5d970b3
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 19 deletions.
34 changes: 21 additions & 13 deletions procrastinate/psycopg2_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,12 @@ def wrapped(*args, **kwargs):

def wrap_query_exceptions(func: Callable) -> Callable:
"""
Detect psycopg2 InterfaceError's with a "connection already closed" message and
retry a number of times.
Detect "admin shutdown" errors and retry a number of times.
This is to handle the case where the database connection (obtained from the pool)
was actually closed by the server. In this case, pyscopg2 raises an InterfaceError
with a "connection already closed" message (and no pgcode) when the connection is
used for issuing a query. What we do is retry when an InterfaceError is raised, and
until max_tries is reached.
was actually closed by the server. In this case, pyscopg2 raises an AdminShutdown
exception when the connection is used for issuing a query. What we do is retry when
an AdminShutdown is raised, and until max_tries is reached.
max_tries is set to the max pool size + 1 (maxconn + 1) to handle the case where
all the connections we have in the pool were closed by the server.
Expand All @@ -58,11 +56,8 @@ def wrapped(*args, **kwargs):
for _ in range(max_tries):
try:
return func(*args, **kwargs)
except psycopg2.errors.InterfaceError as exc:
if "connection already closed" in str(exc):
final_exc = exc
continue
raise exc
except psycopg2.errors.AdminShutdown:
continue
raise exceptions.ConnectorException(
"Could not get a valid connection after {} tries".format(max_tries)
) from final_exc
Expand Down Expand Up @@ -160,10 +155,23 @@ def _wrap_json(self, arguments: Dict[str, Any]):

@contextlib.contextmanager
def _connection(self) -> psycopg2.extensions.connection:
# in case of an admin shutdown (Postgres error code 57P01) we do not
# rollback the connection or put the connection back to the pool as
# this will cause a psycopg2.InterfaceError exception
try:
with self._pool.getconn() as connection:
connection = self._pool.getconn()
try:
yield connection
finally:
except psycopg2.errors.AdminShutdown:
raise
except Exception:
connection.rollback()
raise
else:
connection.commit()
except psycopg2.errors.AdminShutdown:
raise
else:
self._pool.putconn(connection)

@wrap_exceptions
Expand Down
10 changes: 10 additions & 0 deletions tests/integration/test_psycopg2_connector.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import functools
import json

import psycopg2.errors
import pytest

from procrastinate import psycopg2_connector
Expand Down Expand Up @@ -32,6 +33,15 @@ def test_connection(psycopg2_connector_factory, connection_params):
assert connection.dsn == "dbname=" + connection_params["dbname"]


@pytest.mark.parametrize("exception", [Exception, psycopg2.errors.AdminShutdown])
def test_connection_exception(psycopg2_connector_factory, connection_params, exception):

connector = psycopg2_connector_factory()
with pytest.raises(exception):
with connector._connection():
raise exception


@pytest.mark.parametrize(
"method_name, expected",
[
Expand Down
11 changes: 5 additions & 6 deletions tests/unit/test_psycopg2_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def test_wrap_query_exceptions_reached_max_tries(mocker):
@psycopg2_connector.wrap_query_exceptions
def func(connector):
called.append(True)
raise psycopg2.errors.InterfaceError("connection already closed")
raise psycopg2.errors.AdminShutdown()

connector = mocker.Mock(_pool=mocker.Mock(maxconn=5))

Expand All @@ -38,18 +38,17 @@ def func(connector):
assert str(excinfo.value) == "Could not get a valid connection after 6 tries"


@pytest.mark.parametrize("exception_class", [Exception, psycopg2.errors.InterfaceError])
def test_wrap_query_exceptions_unhandled_exception(mocker, exception_class):
def test_wrap_query_exceptions_unhandled_exception(mocker):
called = []

@psycopg2_connector.wrap_query_exceptions
def func(connector):
called.append(True)
raise exception_class("foo")
raise psycopg2.errors.OperationalError()

connector = mocker.Mock(_pool=mocker.Mock(maxconn=5))

with pytest.raises(exception_class):
with pytest.raises(psycopg2.errors.OperationalError):
func(connector)

assert len(called) == 1
Expand All @@ -62,7 +61,7 @@ def test_wrap_query_exceptions_success(mocker):
def func(connector, a, b):
if len(called) < 2:
called.append(True)
raise psycopg2.errors.InterfaceError("connection already closed")
raise psycopg2.errors.AdminShutdown()
return a, b

connector = mocker.Mock(_pool=mocker.Mock(maxconn=5))
Expand Down

0 comments on commit 5d970b3

Please sign in to comment.