Skip to content

Commit

Permalink
Async 9 new approach (#96)
Browse files Browse the repository at this point in the history
Async 9 new approach
  • Loading branch information
Joachim Jablon authored Nov 3, 2019
2 parents f1f25be + cc88982 commit b9c983e
Show file tree
Hide file tree
Showing 40 changed files with 1,203 additions and 1,226 deletions.
6 changes: 6 additions & 0 deletions CONTRIBUTING.rst
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,9 @@ Schedule some tasks with:
.. code-block:: console
(venv) $ python -m procrastinate_demo
Wait, there are ``async`` and ``await`` keywords everywhere!?
-------------------------------------------------------------

Yes, in order to provide both a synchronous **and** asynchronous API, Procrastinate
needs to be asynchronous at core. Find out more in the Discussions section.
21 changes: 20 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Procrastinate: PostgreSQL-based Task Queue for Python

Procrastinate is an open-source Python 3.6+ distributed task processing
library, leveraging PostgreSQL to store task definitions, manage locks and
dispatch tasks.
dispatch tasks. It can be used within both sync and async code.

In other words, from your main code, you call specific functions (tasks) in a
special way and instead of being run on the spot, they're scheduled to
Expand Down Expand Up @@ -74,6 +74,25 @@ Similarly, from the command line:
# Run a worker
procrastinate worker sums
Lastly, you can use Procrastinate asynchronously too:

.. code-block:: python
# Define asynchronous tasks using coroutine functions
@app.task(queue="sums")
async def sum(a, b):
await asyncio.sleep(a + b)
# Launch a job asynchronously
await sum.defer_async(a=3, b=5)
# Somewhere in your program, run a worker asynchronously
worker = procrastinate.Worker(
app=app,
queues=["sums"]
)
await worker.run_async()
There are quite a few interesting features that Procrastinate adds to the mix.
You can head to the Quickstart section for a general tour or
to the How-To sections for specific features. The Discussion
Expand Down
46 changes: 41 additions & 5 deletions docs/discussions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,47 @@ guarantee.

.. _`advisory locks`: https://www.postgresql.org/docs/10/explicit-locking.html#ADVISORY-LOCKS

How stable is Procrastinate ?
-----------------------------

Not quite stable. There a lot of things we would like to do before we start
advertising the project, and so far, it's not used anywhere.
So far, Procrastinate implements async job deferring, and async job executing but
not in parallel, meaning it can run jobs written as a coroutine, but it will
only execute one job at a time.

Why is Procrastinate asynchronous at core?
------------------------------------------

There are several ways to write a program that can be called from both a synchronous
and an asynchronous code:

- Duplicate the codebase. It's not a fantastic idea. There's a high probability that
this will lead to awkward bugs, you'll have twice the work in maintenance etc.
The good thing is that it will force you to extract as much as the logic in a common
module, and have the I/Os very decoupled.

- Have the project be synchronous, and provide top level asynchronous wrappers that
run the synchronous code in a thread. This can be a possibility, but then you enter
a whole new circle of thread safety hell.

- Have the project be asynchronous, and provide top level synchronous wrappers that
will synchronously launch coroutines in the event loop and wait for them to be
completed. This is virtually the best solution we could find, and thus it's what
we decided to do.

We've even cheated a bit: instead of implementing our synchronous wrappers manually,
we've been using a trick that autogenerates a synchronous API based on our asynchronous
API. This way, we have less code to unit-test, and we can guarantee that the 2 APIs
will stay synchronized in the future no matter what. Want to know more about this?
Here are a few resources:

- How we generate our sync API:
https://github.com/peopledoc/procrastinate/blob/master/procrastinate/utils.py
- An interesting talk on the issues that appear when trying to make codebases compatible
with sync **and** async callers: "Just add await" from Andrew Godwin:
https://www.youtube.com/watch?v=oMHrDy62kgE

How stable is Procrastinate?
----------------------------

More and more stable. There are still a few things we would like to do before we start
advertising the project, and it's about to be used in production at our place.

We'd love if you were to try out Procrastinate in a non-production non-critical
project of yours and provide us with feedback.
Expand Down
42 changes: 21 additions & 21 deletions docs/howto/async.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,11 @@ Launch a job and/or execute it asynchronously
First, make sure you understand the implications of using the asynchronous interface
(see :ref:`discussion-async`).

In order to user the asynchronous interface, you'll need to install extra dependencies
using:

.. code-block:: console
$ pip install 'procrastinate[async]'
Then, the :py:class:`procrastinate.App` has to be configured with an
:py:class:`procrastinate.aiopg_connector.AiopgJobStore` ``job_store``::

import procrastinate
from procrastinate.aiopg_connector import AiopgJobStore

app = procrastinate.App(
job_store=AiopgJobStore(dsn="...")
)


Defer jobs asynchronously
^^^^^^^^^^^^^^^^^^^^^^^^^

If your job store is asynchronous, then instead of calling ``defer``, you'll need
to call ``defer_async``::
In order for the defer query to be executed asynchronously, instead of calling
``defer``, you'll need to call ``defer_async``::

@app.task
def my_task(a, b):
Expand All @@ -43,4 +25,22 @@ to call ``defer_async``::
Execute jobs asynchronously
^^^^^^^^^^^^^^^^^^^^^^^^^^^

This has not been implemented yet.
If your job is a coroutine, it will be awaited::

@app.task
async def my_task(a, b):
await asyncio.sleep(3)

# Tasks being async or not can be awaited asynchronously or not
await my_task.defer_async(a=1, b=2)
# or
my_task.defer(a=1, b=2)

As of today, jobs are still executed
sequentially, so if you have 100 asynchronous jobs that each take 1 second doing
asynchronous I/O, you would expect the complete queue to run in little over 1 second,
and instead it will take 100 seconds.

In the future, you will be able to process asynchronous jobs in parallel (see ticket__).

__ https://github.com/peopledoc/procrastinate/issues/106
24 changes: 5 additions & 19 deletions docs/howto/job_store_connect.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,18 @@ There are three ways you can specify the connection parameters:
.. _`libpq environment variables`: https://www.postgresql.org/docs/current/libpq-envars.html

- You can use `psycopg2 dsn`_::
- You can use `aiopg dsn`_::

import procrastinate
procrastinate.PostgresJobStore(dsn="postgres://user:password@host:port/dbname")

.. _`psycopg2 dsn`: http://initd.org/psycopg/docs/module.html#psycopg2.connect
.. _`aiopg dsn`: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect

- You can use other `psycopg2 connection arguments`_::
- You can use other `aiopg connection arguments`_ (which are the same as
`psycopg2 connection arguments`_)::

import procrastinate
procrastinate.PostgresJobStore(user="user", password="password", host="host")

.. _`aiopg connection arguments`: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect
.. _`psycopg2 connection arguments`: http://initd.org/psycopg/docs/module.html#psycopg2.connect


Asynchronous job store
^^^^^^^^^^^^^^^^^^^^^^

If you need to interact with procrastinate through the asynchronous interface
(see :ref:`how-to-async`), you'll need to implement a
:py:class:`procrastinate.aiopg_connector.AiopgJobStore` instead of a
:py:class:`procrastinate.PostgresJobStore`. It takes the same arguments,
and exposes the necessary async functions.

Don't forget to install the `aiopg` extra:

.. code-block:: console
$ pip install 'procrastinate[async]'
2 changes: 0 additions & 2 deletions docs/reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@ Job stores

.. autoclass:: procrastinate.PostgresJobStore

.. autoclass:: procrastinate.aiopg_connector.AiopgJobStore

.. autoclass:: procrastinate.testing.InMemoryJobStore
:members: reset

Expand Down
2 changes: 1 addition & 1 deletion procrastinate/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from procrastinate import metadata as _metadata_module
from procrastinate.aiopg_connector import PostgresJobStore
from procrastinate.app import App
from procrastinate.psycopg2_connector import PostgresJobStore
from procrastinate.retry import BaseRetryStrategy, RetryStrategy
from procrastinate.store import BaseJobStore

Expand Down
89 changes: 77 additions & 12 deletions procrastinate/aiopg_connector.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,70 @@
from typing import Any, Awaitable, Dict, Optional
import asyncio
from typing import Any, Awaitable, Dict, List, Optional

import aiopg
from psycopg2.extras import RealDictCursor
import psycopg2.sql
from psycopg2.extras import Json, RealDictCursor

from procrastinate import psycopg2_connector, store
from procrastinate import store


def get_connection(**kwargs) -> Awaitable[aiopg.Connection]:
return aiopg.connect(**kwargs)
def wrap_json(arguments: Dict[str, Any]):
return {
key: Json(value) if isinstance(value, dict) else value
for key, value in arguments.items()
}


def get_connection(dsn="", **kwargs) -> Awaitable[aiopg.Connection]:
return aiopg.connect(dsn=dsn, **kwargs)


async def execute_query(
connection: aiopg.Connection, query: str, **arguments: Any
) -> None:
# aiopg can work with psycopg2's cursor class
async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
await cursor.execute(query, wrap_json(arguments))


async def execute_query_one(
connection: aiopg.Connection, query: str, **arguments: Any
) -> Dict[str, Any]:
# Strangely, aiopg can work with psycopg2's cursor class.
# aiopg can work with psycopg2's cursor class
async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
await cursor.execute(query, psycopg2_connector.wrap_json(arguments))
await cursor.execute(query, wrap_json(arguments))

return await cursor.fetchone()


class AiopgJobStore(store.AsyncBaseJobStore):
async def execute_query_all(
connection: aiopg.Connection, query: str, **arguments: Any
) -> List[Dict[str, Any]]:
# aiopg can work with psycopg2's cursor class
async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
await cursor.execute(query, wrap_json(arguments))

return await cursor.fetchall()


def make_dynamic_query(query: str, **identifiers: str) -> str:
return psycopg2.sql.SQL(query).format(
**{key: psycopg2.sql.Identifier(value) for key, value in identifiers.items()}
)


async def wait_for_jobs(connection: aiopg.Connection, socket_timeout: float):
try:
await asyncio.wait_for(connection.notifies.get(), timeout=socket_timeout)
except asyncio.futures.TimeoutError:
pass


def interrupt_wait(connection: aiopg.Connection):
asyncio.get_event_loop().call_soon_threadsafe(connection.notifies.put_nowait, "s")


class PostgresJobStore(store.BaseJobStore):
"""
Uses ``aiopg`` to establish an asynchronous
connection to a Postgres database.
Expand Down Expand Up @@ -52,14 +96,35 @@ async def get_connection(self):
self._connection = await get_connection(**self._connection_parameters)
return self._connection

async def close_connection(self) -> None:
if not self._connection:
return

await self._connection.close()

async def execute_query(self, query: str, **arguments: Any) -> None:
await execute_query(await self.get_connection(), query=query, **arguments)

async def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
return await execute_query_one(
await self.get_connection(), query=query, **arguments
)

def get_sync_store(self) -> store.BaseJobStore:
from procrastinate.psycopg2_connector import PostgresJobStore
async def execute_query_all(
self, query: str, **arguments: Any
) -> List[Dict[str, Any]]:
return await execute_query_all(
await self.get_connection(), query=query, **arguments
)

def make_dynamic_query(self, query: str, **identifiers: str) -> str:
return make_dynamic_query(query=query, **identifiers)

return PostgresJobStore(
socket_timeout=self.socket_timeout, **self._connection_parameters
async def wait_for_jobs(self):
return await wait_for_jobs(
connection=await self.get_connection(), socket_timeout=self.socket_timeout
)

def stop(self):
if self._connection:
interrupt_wait(connection=self._connection)
10 changes: 7 additions & 3 deletions procrastinate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
logger = logging.getLogger(__name__)


@utils.add_sync_api
class App:
"""
The App is the main entry point for procrastinate integration.
Expand Down Expand Up @@ -173,7 +174,7 @@ def perform_import_paths(self):
extra={"action": "imported_tasks", "tasks": list(self.tasks)},
)

def run_worker(
async def run_worker_async(
self, queues: Optional[Iterable[str]] = None, only_once: bool = False
) -> None:
"""
Expand All @@ -195,12 +196,15 @@ def run_worker(
worker = self._worker(queues=queues)
if only_once:
try:
worker.process_jobs_once()
await worker.process_jobs_once()
except (exceptions.NoMoreJobs, exceptions.StopRequested):
pass
else:
worker.run()
await worker.run()

@property
def migrator(self) -> migration.Migrator:
return migration.Migrator(job_store=self.job_store)

async def close_connection_async(self):
await self.job_store.close_connection()
4 changes: 2 additions & 2 deletions procrastinate/builtin_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from procrastinate import store


def remove_old_jobs(
async def remove_old_jobs(
job_store: store.BaseJobStore,
max_hours: int,
queue: Optional[str] = None,
Expand All @@ -25,7 +25,7 @@ def remove_old_jobs(
By default only successful jobs will be removed. When this parameter is True
failed jobs will also be deleted.
"""
job_store.delete_old_jobs(
await job_store.delete_old_jobs(
nb_hours=max_hours, queue=queue, include_error=remove_error
)

Expand Down
Loading

0 comments on commit b9c983e

Please sign in to comment.