Commit
Synchronous connector
ewjoachim committed Jun 6, 2020
1 parent 47dae8d commit 7db8821
Showing 8 changed files with 230 additions and 33 deletions.
2 changes: 2 additions & 0 deletions procrastinate/__init__.py
@@ -1,5 +1,6 @@
from procrastinate import metadata as _metadata_module
from procrastinate.aiopg_connector import AiopgConnector
from procrastinate.psycopg2_connector import Psycopg2Connector
from procrastinate.app import App
from procrastinate.job_context import JobContext
from procrastinate.retry import BaseRetryStrategy, RetryStrategy
@@ -9,6 +10,7 @@
"JobContext",
"BaseRetryStrategy",
"AiopgConnector",
"Psycopg2Connector",
"RetryStrategy",
]

14 changes: 11 additions & 3 deletions procrastinate/app.py
@@ -54,16 +54,22 @@ def __init__(
self,
*,
connector: connector_module.BaseConnector,
sync_connector: Optional[connector_module.BaseSyncConnector] = None,
import_paths: Optional[Iterable[str]] = None,
worker_defaults: Optional[Dict] = None,
):
"""
Parameters
----------
connector :
Instance of a subclass of :py:class:`procrastinate.connector.BaseConnector`,
typically `AiopgConnector`. It will be responsible for all communications
Typically an `AiopgConnector`. It will be responsible for all communications
with the database. Mandatory.
sync_connector :
Typically a `Psycopg2Connector`. If set, it will be used for synchronous defer
operations. By default, ``.defer()`` starts an event loop that calls
``.defer_async()``, which might cause problems if your program is
multithreaded, depending on your setup. Using a ``sync_connector``
makes the operation purely synchronous.
import_paths :
List of python dotted paths of modules to import, to make sure
that the workers know about all possible tasks.
Expand All @@ -86,7 +92,9 @@ def __init__(
self.import_paths = import_paths or []
self.worker_defaults = worker_defaults or {}

self.job_store = store.JobStore(connector=self.connector)
self.job_store = store.JobStore(
connector=self.connector, sync_connector=sync_connector
)

self._register_builtin_tasks()

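A sketch of the multithreaded scenario the new docstring warns about; the app, connector arguments, and ``my_task`` below are illustrative, modeled on the demo app further down:

import threading

import procrastinate

app = procrastinate.App(
    connector=procrastinate.AiopgConnector(),
    sync_connector=procrastinate.Psycopg2Connector(minconn=1, maxconn=2),
)

@app.task
def my_task(x):
    print(x)

def business_code():
    # With sync_connector set, this is a plain blocking call. Without it,
    # .defer() would spin up an event loop inside this worker thread.
    my_task.defer(x=1)

threading.Thread(target=business_code).start()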
10 changes: 10 additions & 0 deletions procrastinate/connector.py
@@ -29,3 +29,13 @@ async def listen_notify(
self, event: asyncio.Event, channels: Iterable[str]
) -> None:
raise NotImplementedError


class BaseSyncConnector:
json_dumps: Optional[Callable] = None

def close(self) -> None:
pass

def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
raise NotImplementedError
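A third-party synchronous connector only needs to subclass ``BaseSyncConnector`` and implement ``execute_query_one``. A minimal sketch, assuming a hypothetical DB-API style driver (``MySyncConnector`` and ``db`` are illustrative, not part of this commit):

from typing import Any, Dict

from procrastinate import connector

class MySyncConnector(connector.BaseSyncConnector):
    def __init__(self, db):
        # `db` stands for a hypothetical DB-API connection returning dict rows.
        self.db = db

    def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
        with self.db.cursor() as cursor:
            cursor.execute(query, arguments)
            return cursor.fetchone()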
34 changes: 23 additions & 11 deletions procrastinate/jobs.py
@@ -6,7 +6,7 @@

import attr

from procrastinate import types, utils
from procrastinate import types

if TYPE_CHECKING:
from procrastinate import store # noqa
@@ -103,7 +103,6 @@ def call_string(self):
return f"{self.task_name}[{self.id}]({kwargs_string})"


@utils.add_sync_api
class JobDeferrer:
"""
The main purpose of ``JobDeferrer`` is to get a hold of the job_store and the job,
@@ -121,21 +120,34 @@ def make_new_job(self, **task_kwargs: types.JSONValue) -> Job:

return attr.evolve(self.job, task_kwargs=final_kwargs)

async def defer_async(self, **task_kwargs: types.JSONValue) -> int:
"""
See `Task.defer` for details.
"""

job = self.make_new_job(**task_kwargs)

def _log_before_defer_job(self, job: Job) -> None:
logger.debug(
f"About to defer job {job.call_string}",
extra={"action": "about_to_defer_job", "job": job.log_context()},
)
id = await self.job_store.defer_job(job=job)
job = job.evolve(id=id)

def _log_after_defer_job(self, job: Job) -> None:

logger.info(
f"Deferred job {job.call_string}",
extra={"action": "job_defer", "job": job.log_context()},
)

async def defer_async(self, **task_kwargs: types.JSONValue) -> int:
"""
See `Task.defer` for details.
"""
# Make sure this code stays synchronized with .defer()
job = self.make_new_job(**task_kwargs)
self._log_before_defer_job(job=job)
id = await self.job_store.defer_job_async(job=job)
self._log_after_defer_job(job=job.evolve(id=id))
return id

def defer(self, **task_kwargs: types.JSONValue) -> int:
# Make sure this code stays synchronized with .defer_async()
job = self.make_new_job(**task_kwargs)
self._log_before_defer_job(job=job)
id = self.job_store.defer_job(job=job)
self._log_after_defer_job(job=job.evolve(id=id))
return id
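The two paths are intentionally mirrored: job construction and logging are shared, and only the job_store call differs. A usage sketch, reusing the hypothetical ``my_task`` from the earlier example:

# Synchronous: the handwritten sync path if a sync connector is
# configured, otherwise the generated wrapper around defer_async().
job_id = my_task.defer(timestamp=1234)

# Asynchronous: always goes through the async connector.
job_id = await my_task.defer_async(timestamp=1234)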
124 changes: 124 additions & 0 deletions procrastinate/psycopg2_connector.py
@@ -0,0 +1,124 @@
import contextlib
import functools
import logging
from typing import Any, Callable, Dict, Optional

import psycopg2
import psycopg2.errors
import psycopg2.pool
from psycopg2.extras import Json, RealDictCursor

from procrastinate import connector, exceptions

logger = logging.getLogger(__name__)


def wrap_exceptions(func: Callable) -> Callable:
"""
Wrap psycopg2 errors as connector exceptions.
This decorator is expected to be used on synchronous functions only.
"""

@functools.wraps(func)
def wrapped(*args, **kwargs):
try:
return func(*args, **kwargs)
except psycopg2.errors.UniqueViolation as exc:
raise exceptions.UniqueViolation(constraint_name=exc.diag.constraint_name)
except psycopg2.Error as exc:
raise exceptions.ConnectorException from exc

# Attaching a custom attribute to ease testability and make the
# decorator more introspectable
wrapped._exceptions_wrapped = True # type: ignore
return wrapped


class Psycopg2Connector(connector.BaseSyncConnector):
def __init__(
self, *, json_dumps: Optional[Callable] = None, **kwargs: Any,
):
"""
Synchronous connector based on a ``psycopg2.pool.ThreadedConnectionPool``,
which it creates at instantiation time.
Use this connector if you want your ``.defer()`` calls to be purely
synchronous, rather than asynchronous behind a sync wrapper. You may need
this if your program is multithreaded and doesn't handle async loops well
(see `discussion-async-sync-defer`).
All arguments other than ``json_dumps`` are passed to
:py:func:`ThreadedConnectionPool` (see psycopg2 documentation__), with
default values that may differ from those of ``psycopg2`` (see the partial
list of parameters below).
.. _psycopg2 doc: https://www.psycopg.org/docs/extras.html#json-adaptation
.. __: https://www.psycopg.org/docs/pool.html#psycopg2.pool.ThreadedConnectionPool
Parameters
----------
minconn : int
Mandatory, passed to psycopg2.
maxconn : int
Mandatory, passed to psycopg2.
json_dumps :
The JSON dumps function to use for serializing job arguments. Defaults to
the function used by psycopg2. See the `psycopg2 doc`_.
dsn : ``Optional[str]``
Passed to psycopg2. Default is "" instead of None, which means if no
argument is passed, it will connect to localhost:5432 instead of a
Unix-domain local socket file.
cursor_factory : ``psycopg2.extensions.cursor``
Passed to psycopg2. Default is ``psycopg2.extras.RealDictCursor``
instead of the standard cursor. There is no identified use case for changing
this.
"""
self.json_dumps = json_dumps
pool_args = self._adapt_pool_args(kwargs)
self._pool = psycopg2.pool.ThreadedConnectionPool(**pool_args)

@staticmethod
def _adapt_pool_args(pool_args: Dict[str, Any]) -> Dict[str, Any]:
"""
Adapt the pool args for ``psycopg2``, using sensible defaults for Procrastinate.
"""
final_args = {
"dsn": "",
"cursor_factory": RealDictCursor,
}
final_args.update(pool_args)
return final_args

@wrap_exceptions
def close(self) -> None:
"""
Close the pool and wait for all connections to be released.
"""
if self._pool:
self._pool.closeall()

def _wrap_json(self, arguments: Dict[str, Any]):
return {
key: Json(value, dumps=self.json_dumps)
if isinstance(value, dict)
else value
for key, value in arguments.items()
}

@contextlib.contextmanager
def _connection(self) -> psycopg2.extensions.connection:
# Acquire the connection outside the try block, so that putconn() is only
# called on a connection that was actually obtained from the pool.
connection = self._pool.getconn()
try:
with connection:
yield connection
finally:
self._pool.putconn(connection)

@wrap_exceptions
def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
with self._connection() as connection:
with connection.cursor() as cursor:
cursor.execute(query, self._wrap_json(arguments))
return cursor.fetchone()
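A construction and usage sketch (pool bounds and DSN are illustrative): ``minconn`` and ``maxconn`` are mandatory, and all other keyword arguments are forwarded to ``ThreadedConnectionPool``:

import procrastinate

sync_connector = procrastinate.Psycopg2Connector(
    minconn=1,
    maxconn=4,
    dsn="postgresql://localhost/procrastinate",  # illustrative DSN
)
row = sync_connector.execute_query_one(query="SELECT 1 AS value")
assert row["value"] == 1  # RealDictCursor returns dict rows
sync_connector.close()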
72 changes: 55 additions & 17 deletions procrastinate/store.py
@@ -1,9 +1,9 @@
import asyncio
import datetime
import uuid
from typing import Iterable, Optional
from typing import Any, Dict, Iterable, Optional

from procrastinate import connector, exceptions, jobs, sql
from procrastinate import connector, exceptions, jobs, sql, utils


def get_channel_for_queues(queues: Optional[Iterable[str]] = None) -> Iterable[str]:
@@ -13,31 +13,69 @@ def get_channel_for_queues(queues: Optional[Iterable[str]] = None) -> Iterable[str]:
return ["procrastinate_queue#" + queue for queue in queues]


@utils.add_sync_api
class JobStore:
def __init__(self, connector: connector.BaseConnector):
def __init__(
self,
connector: connector.BaseConnector,
sync_connector: Optional[connector.BaseSyncConnector] = None,
):
self.connector = connector
self.sync_connector = sync_connector

async def defer_job(self, job: jobs.Job) -> int:
# This function's automated counterpart will be _defer_job(), not
# defer_job() because that one already exists.
async def defer_job_async(self, job: jobs.Job) -> int:
# Make sure this code stays synchronized with ._defer_job_sync()
try:
result = await self.connector.execute_query_one(
query=sql.queries["defer_job"],
task_name=job.task_name,
lock=job.lock or str(uuid.uuid4()),
queueing_lock=job.queueing_lock,
args=job.task_kwargs,
scheduled_at=job.scheduled_at,
queue=job.queue,
**self._defer_job_query_kwargs(job=job)
)
except exceptions.UniqueViolation as exc:
if exc.constraint_name == connector.QUEUEING_LOCK_CONSTRAINT:
raise exceptions.AlreadyEnqueued(
"Job cannot be enqueued: there is already a job in the queue "
f"with the lock {job.queueing_lock}"
) from exc
raise
self._raise_already_enqueued(exc=exc, job=job)

return result["id"]

def _defer_job_sync(self, job: jobs.Job) -> int:
# Make sure this code stays synchronized with .defer_job_async()
assert self.sync_connector
try:
result = self.sync_connector.execute_query_one(
**self._defer_job_query_kwargs(job=job)
)
except exceptions.UniqueViolation as exc:
self._raise_already_enqueued(exc=exc, job=job)

return result["id"]

def _defer_job_query_kwargs(self, job: jobs.Job) -> Dict[str, Any]:

return {
"query": sql.queries["defer_job"],
"task_name": job.task_name,
"lock": job.lock or str(uuid.uuid4()),
"queueing_lock": job.queueing_lock,
"args": job.task_kwargs,
"scheduled_at": job.scheduled_at,
"queue": job.queue,
}

def _raise_already_enqueued(self, exc: exceptions.UniqueViolation, job: jobs.Job):
if exc.constraint_name == connector.QUEUEING_LOCK_CONSTRAINT:
raise exceptions.AlreadyEnqueued(
"Job cannot be enqueued: there is already a job in the queue "
f"with the lock {job.queueing_lock}"
) from exc
raise exc

def defer_job(self, job: jobs.Job) -> int:
if self.sync_connector:
# The manually written sync version
return self._defer_job_sync(job=job)
else:
# The automated sync wrapper around the async version
return self._defer_job(job=job) # type: ignore

async def fetch_job(self, queues: Optional[Iterable[str]]) -> Optional[jobs.Job]:

row = await self.connector.execute_query_one(
4 changes: 3 additions & 1 deletion procrastinate_demo/app.py
@@ -1,5 +1,7 @@
import procrastinate

app = procrastinate.App(
connector=procrastinate.AiopgConnector(), import_paths=["procrastinate_demo.tasks"],
connector=procrastinate.AiopgConnector(),
sync_connector=procrastinate.Psycopg2Connector(minconn=0, maxconn=2),
import_paths=["procrastinate_demo.tasks"],
)
3 changes: 2 additions & 1 deletion setup.cfg
@@ -23,8 +23,9 @@ packages = find:
install_requires =
aiopg
attrs
pendulum
click
pendulum
psycopg2-binary # This is a dependency of aiopg anyway
typing-extensions
# Backport from Python 3.8
importlib-metadata
