Merge pull request #173 from peopledoc/pool-148-another-bleeping-time

Use a pool instead of a single connection, and reorganize the worker around that

Joachim Jablon authored Mar 25, 2020
2 parents 6a43c5e + c5ed9e1 commit 64c2e2e

Showing 33 changed files with 951 additions and 563 deletions.
5 changes: 2 additions & 3 deletions docs/howto/testing.rst
@@ -9,9 +9,8 @@ To use it, you can do::

     app = procrastinate.App(connector=procrastinate.testing.InMemoryConnector())

-    # Run the jobs your tests created, then stop
-    # the worker:
-    app.run_worker(only_once=True)
+    # Run the jobs your tests created, then stop the worker
+    app.run_worker(wait=False)

     # See the jobs created:
     print(app.connector.jobs)
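For orientation, here is what the documented pattern looks like end to end after this change. A minimal sketch: the `greet` task is hypothetical, and the final assertion assumes in-memory job rows are dicts carrying a `status` field; `InMemoryConnector`, `run_worker(wait=False)`, and `connector.jobs` come from the doc above.

```python
# Sketch of the testing pattern documented above. The `greet` task is
# hypothetical; the "status" field on in-memory job rows is an assumption.
import procrastinate
import procrastinate.testing

app = procrastinate.App(connector=procrastinate.testing.InMemoryConnector())


@app.task
def greet(name: str):
    print(f"Hello, {name}!")


def test_greet_job_completes():
    greet.defer(name="world")   # enqueue a job into the in-memory connector
    app.run_worker(wait=False)  # run the jobs the test created, then stop
    assert all(
        job["status"] == "succeeded" for job in app.connector.jobs.values()
    )
```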
6 changes: 5 additions & 1 deletion docs/reference.rst
@@ -17,10 +17,14 @@ are accessible through :py:attr:`App.builtin_tasks`:

     app.builtin_tasks["remove_old_jobs"].defer(max_hours=72)

-Job stores
+Connectors
 ----------

+.. This does not indicate that create_with_pool* is an async classmethod because of
+   https://github.com/sphinx-doc/sphinx/issues/7189
+
 .. autoclass:: procrastinate.PostgresConnector
+    :members: create_with_pool, create_with_pool_async, close, close_async

 .. autoclass:: procrastinate.testing.InMemoryConnector
     :members: reset
255 changes: 162 additions & 93 deletions procrastinate/aiopg_connector.py
@@ -1,61 +1,49 @@
 import asyncio
 import logging
 import warnings
-from typing import Any, Callable, Dict, List, Optional
+from typing import Any, Callable, Dict, Iterable, List, NoReturn, Optional

 import aiopg
 import psycopg2.sql
 from psycopg2.extras import Json, RealDictCursor

-from procrastinate import connector
+from procrastinate import connector, sql, utils

 logger = logging.getLogger(__name__)

+LISTEN_TIMEOUT = 30.0


+@utils.add_sync_api
 class PostgresConnector(connector.BaseConnector):
     def __init__(
         self,
-        dsn="",
-        socket_timeout: float = connector.SOCKET_TIMEOUT,
+        pool: aiopg.Pool,
         json_dumps: Optional[Callable] = None,
         json_loads: Optional[Callable] = None,
-        **kwargs: Any,
     ):
         """
-        All parameters except ``socket_timeout``, ``json_dumps`` and ``json_loads``
-        are passed to :py:func:`aiopg.connect` (see the documentation__)
+        The pool connections are expected to have jsonb adapters.

-        .. __: https://aiopg.readthedocs.io/en/stable/core.html#connection
         .. _psycopg2 doc: https://www.psycopg.org/docs/extras.html#json-adaptation

+        See parameter details in :py:func:`PostgresConnector.create_with_pool`.
+
         Parameters
         ----------
-        socket_timeout:
-            This parameter should generally not be changed.
-            It indicates the maximum duration (in seconds) procrastinate workers wait
-            between each database job pull. Job activity will be pushed from the db to
-            the worker, but in case the push mechanism fails somehow, workers will not
-            stay idle longer than the number of seconds indicated by this parameters.
         json_dumps:
             The JSON dumps function to use for serializing job arguments. Defaults to
             the function used by psycopg2. See the `psycopg2 doc`_.
         json_loads:
             The JSON loads function to use for deserializing job arguments. Defaults
             to the function used by psycopg2. See the `psycopg2 doc`_.
+        pool:
+            An aiopg pool, either externally configured or passed by
+            :py:func:`PostgresConnector.create_with_pool`.
         """
-        kwargs["dsn"] = dsn
-        self._connection_parameters = kwargs
-        self._connection: Optional[aiopg.Connection] = None
-        self._lock = asyncio.Lock()
-        self.socket_timeout = socket_timeout
+        self._pool = pool
         self.json_dumps = json_dumps
         self.json_loads = json_loads

-    async def close_connection(self) -> None:
-        if not self._connection or self._connection.closed:
-            return
-        await self._connection.close()
+    async def close_async(self) -> None:
+        """
+        Closes the pool and awaits all connections to be released.
+        """
+        self._pool.close()
+        await self._pool.wait_closed()
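The new constructor makes pool ownership explicit: you can hand the connector a pool you manage yourself and release it with `close_async`. A minimal sketch, with an illustrative DSN; note the docstring's caveat that pool connections are expected to have jsonb adapters, which `create_with_pool_async` below sets up for you:

```python
# Sketch: PostgresConnector over an externally managed aiopg pool.
# The DSN is an assumption; point it at your own database.
import asyncio

import aiopg
import procrastinate


async def main() -> None:
    pool = await aiopg.create_pool("postgresql://localhost/procrastinate")
    connector = procrastinate.PostgresConnector(pool=pool)
    try:
        # e.g. hand it to procrastinate.App(connector=connector) and run a worker
        ...
    finally:
        await connector.close_async()  # closes the pool, waits for release


asyncio.run(main())
```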

     def _wrap_json(self, arguments: Dict[str, Any]):
         return {
@@ -65,52 +53,112 @@ def _wrap_json(self, arguments: Dict[str, Any]):
             for key, value in arguments.items()
         }

-    async def _get_connection(self) -> aiopg.Connection:
-        # fast path
-        if self._connection and not self._connection.closed:
-            return self._connection
-        async with self._lock:
-            if self._connection and not self._connection.closed:
-                return self._connection
-            # tell aiopg not to register adapters for hstore & json by default, as
-            # those are registered at the module level and could overwrite previously
-            # defined adapters
-            kwargs = self._connection_parameters
-            kwargs.setdefault("enable_json", False)
-            kwargs.setdefault("enable_hstore", False)
-            kwargs.setdefault("enable_uuid", False)
-            conn = await aiopg.connect(**kwargs)
-            if self.json_loads:
-                psycopg2.extras.register_default_jsonb(conn.raw, loads=self.json_loads)
-            self._connection = conn
-            return conn
+    @classmethod
+    async def create_with_pool_async(
+        cls,
+        json_dumps: Optional[Callable] = None,
+        json_loads: Optional[Callable] = None,
+        **kwargs,
+    ) -> "PostgresConnector":
+        """
+        Creates a connector, and its connection pool, using the provided parameters.
+        All additional parameters will be used to create a
+        :py:func:`aiopg.Pool` (see the documentation__), sometimes with a different
+        default value.
+
+        When using this method, you explicitly take the responsibility for opening
+        the pool. It's your responsibility to call
+        :py:func:`procrastinate.PostgresConnector.close` or
+        :py:func:`procrastinate.PostgresConnector.close_async` to close connections
+        when your process ends.
+
+        .. __: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.create_pool
+        .. _psycopg2 doc: https://www.psycopg.org/docs/extras.html#json-adaptation
+
+        json_dumps:
+            The JSON dumps function to use for serializing job arguments. Defaults to
+            the function used by psycopg2. See the `psycopg2 doc`_.
+        json_loads:
+            The JSON loads function to use for deserializing job arguments. Defaults
+            to the function used by psycopg2. See the `psycopg2 doc`_. Unused if pool
+            is passed.
+        dsn (Optional[str]):
+            Passed to aiopg. Default is "" instead of None, which means if no
+            argument is passed, it will connect to localhost:5432 instead of a
+            Unix-domain local socket file.
+        enable_json (bool):
+            Passed to aiopg. Default is False instead of True to avoid messing with
+            the global state.
+        enable_hstore (bool):
+            Passed to aiopg. Default is False instead of True to avoid messing with
+            the global state.
+        enable_uuid (bool):
+            Passed to aiopg. Default is False instead of True to avoid messing with
+            the global state.
+        cursor_factory (psycopg2.extensions.cursor):
+            Passed to aiopg. Default is :py:class:`psycopg2.extras.RealDictCursor`
+            instead of standard cursor. There is no identified use case for changing
+            this.
+        maxsize (int):
+            Passed to aiopg. Cannot be lower than 2, otherwise the worker won't
+            function normally (one connection for listen/notify, one for executing
+            tasks).
+        """
+        base_on_connect = kwargs.pop("on_connect", None)
+
+        async def on_connect(connection):
+            if base_on_connect:
+                await base_on_connect(connection)
+            if json_loads:
+                psycopg2.extras.register_default_jsonb(connection.raw, loads=json_loads)
+
+        defaults = {
+            "dsn": "",
+            "enable_json": False,
+            "enable_hstore": False,
+            "enable_uuid": False,
+            "on_connect": on_connect,
+            "cursor_factory": RealDictCursor,
+        }
+        if "maxsize" in kwargs:
+            kwargs["maxsize"] = max(2, kwargs["maxsize"])
+
+        defaults.update(kwargs)
+
+        pool = await aiopg.create_pool(**defaults)
+
+        return cls(pool=pool, json_dumps=json_dumps, json_loads=json_loads)
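In practice this classmethod is the main entry point: it wires the jsonb adapter, the `RealDictCursor` factory, and the safer defaults, then hands the pool to `__init__`. A sketch of the intended use, with illustrative DSN and `maxsize` values:

```python
# Sketch: the connector creating and owning its pool. The DSN and
# maxsize values are illustrative.
import procrastinate


async def setup() -> procrastinate.PostgresConnector:
    connector = await procrastinate.PostgresConnector.create_with_pool_async(
        dsn="postgresql://localhost/procrastinate",
        maxsize=1,  # silently raised to 2: one LISTEN/NOTIFY + one worker connection
    )
    # You opened the pool, so you close it when done:
    #   await connector.close_async()
    return connector
```

The `@utils.add_sync_api` decorator on the class is what derives the synchronous `create_with_pool` and `close` variants listed in the `docs/reference.rst` change above.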

+    # Pools and single connections do not exactly share their cursor API:
+    # - connection.cursor() is an async context manager (async with)
+    # - pool.cursor() is a coroutine returning a sync context manager (with await)
+    # Because of this, it's easier to have 2 distinct methods for executing from
+    # a pool or from a connection
     async def execute_query(self, query: str, **arguments: Any) -> None:
-        connection = await self._get_connection()
-        async with self._lock:
-            # aiopg can work with psycopg2's cursor class
-            async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
-                await cursor.execute(query, self._wrap_json(arguments))
+        with await self._pool.cursor() as cursor:
+            await cursor.execute(query, self._wrap_json(arguments))

+    async def _execute_query_connection(
+        self, query: str, connection: aiopg.Connection, **arguments: Any,
+    ) -> None:
+        async with connection.cursor() as cursor:
+            await cursor.execute(query, self._wrap_json(arguments))

     async def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
-        connection = await self._get_connection()
-        async with self._lock:
-            # aiopg can work with psycopg2's cursor class
-            async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
-                await cursor.execute(query, self._wrap_json(arguments))
-
-                return await cursor.fetchone()
+        with await self._pool.cursor() as cursor:
+            await cursor.execute(query, self._wrap_json(arguments))
+
+            return await cursor.fetchone()

     async def execute_query_all(
         self, query: str, **arguments: Any
     ) -> List[Dict[str, Any]]:
-        connection = await self._get_connection()
-        async with self._lock:
-            # aiopg can work with psycopg2's cursor class
-            async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
-                await cursor.execute(query, self._wrap_json(arguments))
-
-                return await cursor.fetchall()
+        with await self._pool.cursor() as cursor:
+            await cursor.execute(query, self._wrap_json(arguments))
+
+            return await cursor.fetchall()
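The comment above `execute_query` is the crux of why there are two query paths. A small sketch of the aiopg asymmetry it describes, assuming an existing pool:

```python
# Sketch of the aiopg cursor asymmetry the comment above describes.
import aiopg


async def demo(pool: aiopg.Pool, query: str) -> None:
    # From the pool: cursor() is a coroutine returning a sync context manager.
    with await pool.cursor() as cursor:
        await cursor.execute(query)

    # From a connection: cursor() is an async context manager.
    async with pool.acquire() as connection:
        async with connection.cursor() as cursor:
            await cursor.execute(query)
```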

     def make_dynamic_query(self, query: str, **identifiers: str) -> str:
         return psycopg2.sql.SQL(query).format(
@@ -120,30 +168,51 @@ def make_dynamic_query(self, query: str, **identifiers: str) -> str:
             }
         )

-    async def wait_for_activity(self):
-        connection = await self._get_connection()
-        try:
-            await asyncio.wait_for(
-                connection.notifies.get(), timeout=self.socket_timeout
-            )
-        except asyncio.TimeoutError:
-            pass
-
-    # This can be called from a signal handler, better not do async stuff
-    def interrupt_wait(self):
-        if not self._connection or self._connection.closed:
-            return
-        asyncio.get_event_loop().call_soon_threadsafe(
-            self._connection.notifies.put_nowait, "s"
-        )
-
-
-class PostgresJobStore(PostgresConnector):
-    def __init__(self, *args, **kwargs):
-        message = (
-            "Use procrastinate.PostgresConnector(...) "
-            "instead of procrastinate.PostgresJobStore(...), with the same arguments"
-        )
-        logger.warn(f"Deprecation Warning: {message}")
-        warnings.warn(DeprecationWarning(message))
-        super().__init__(*args, **kwargs)
+    async def listen_notify(
+        self, event: asyncio.Event, channels: Iterable[str]
+    ) -> NoReturn:
+        # We need to acquire a dedicated connection, and use the listen
+        # query
+        while True:
+            async with self._pool.acquire() as connection:
+                for channel_name in channels:
+                    await self._execute_query_connection(
+                        connection=connection,
+                        query=self.make_dynamic_query(
+                            query=sql.queries["listen_queue"], channel_name=channel_name
+                        ),
+                    )
+                # Initial set() lets caller know that we're ready to listen
+                event.set()
+                await self._loop_notify(event=event, connection=connection)

+    async def _loop_notify(
+        self,
+        event: asyncio.Event,
+        connection: aiopg.Connection,
+        timeout: float = LISTEN_TIMEOUT,
+    ) -> None:
+        # We'll leave this loop with a CancelledError, when we get cancelled
+        while True:
+            # Because of https://github.com/aio-libs/aiopg/issues/249,
+            # we could get stuck in here forever if the connection closes.
+            # That's why we need the timeout, and why we reopen a new
+            # connection if this one is closed.
+            if connection.closed:
+                return
+            try:
+                await asyncio.wait_for(connection.notifies.get(), timeout)
+            except asyncio.TimeoutError:
+                continue
+
+            event.set()
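Based on the handshake above (`event.set()` once the LISTEN statements are in place, then again on every NOTIFY), a caller would drive `listen_notify` roughly like this. A hedged sketch: the channel name is hypothetical, and cancellation is how the loop is meant to exit:

```python
# Sketch of consuming listen_notify; the channel name is hypothetical.
import asyncio


async def wait_for_jobs(connector) -> None:
    event = asyncio.Event()
    listener = asyncio.ensure_future(
        connector.listen_notify(event=event, channels=["some_channel"])
    )
    try:
        await event.wait()  # first set(): LISTEN statements are in place
        while True:
            event.clear()
            await event.wait()  # set again when a NOTIFY arrives
            print("something may have changed in the queue")
    finally:
        listener.cancel()  # _loop_notify exits through CancelledError
```

Note that `LISTEN_TIMEOUT` never sets the event; it only bounds how long `_loop_notify` blocks before re-checking whether the connection died (the aiopg issue referenced in the comment).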


+def PostgresJobStore(*args, **kwargs):
+    message = (
+        "Use procrastinate.PostgresConnector(...) "
+        "instead of procrastinate.PostgresJobStore(...), with the same arguments"
+    )
+    logger.warn(f"Deprecation Warning: {message}")
+    warnings.warn(DeprecationWarning(message))
+    return PostgresConnector.create_with_pool(*args, **kwargs)
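So the deprecated name keeps working, but now warns and returns a pool-backed connector via the synchronous `create_with_pool` generated by `add_sync_api`. A quick sketch of what callers observe, assuming a reachable database and an illustrative DSN:

```python
# Sketch: the shim emits a DeprecationWarning and returns a connector.
import warnings

import procrastinate


def legacy_setup():
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        connector = procrastinate.PostgresJobStore(
            dsn="postgresql://localhost/procrastinate"
        )
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)
    return connector
```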
