
Commit

Merge pull request #147 from peopledoc/composition-postgres-142
Composition postgres 142
Joachim Jablon authored Feb 14, 2020
2 parents 61396c7 + 2d3b455 commit acf002d
Showing 37 changed files with 567 additions and 537 deletions.
2 changes: 1 addition & 1 deletion README.rst
@@ -41,7 +41,7 @@ Here's an example
     # mycode.py
     # Make an app in your code
-    app = procrastinate.App(job_store=procrastinate.PostgresJobStore())
+    app = procrastinate.App(connector=procrastinate.PostgresConnector())
     # Then define tasks
     @app.task(queue="sums")
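The rest of the README example is collapsed in this view. For reference, here is a minimal sketch of the renamed API end to end; the task body and the ``defer`` call are illustrative assumptions, and only the ``sums`` queue name actually appears in the diff:

    # Hypothetical end-to-end sketch, not the verbatim README example
    import procrastinate

    # As of this commit, App takes a connector rather than a job store
    app = procrastinate.App(connector=procrastinate.PostgresConnector())

    @app.task(queue="sums")
    def sum(a, b):
        # Illustrative task body
        return a + b

    # Enqueue a job for a worker to pick up later (assumed usage)
    sum.defer(a=3, b=5)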
2 changes: 1 addition & 1 deletion docs/howto/app.rst
@@ -13,6 +13,6 @@ In the first two cases, it's important to specify the location to your tasks using
     import procrastinate

     app = procrastinate.App(
-        job_store=job_store,
+        connector=connector,
         import_paths=["dotted.path.to", "all.the.modules", "that.define.tasks"]
     )
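For context, the ``connector`` variable in this snippet would now be built from the renamed class. A short sketch, with placeholder connection parameters:

    import procrastinate

    # Placeholder credentials; any aiopg/psycopg2 connection arguments work here
    connector = procrastinate.PostgresConnector(host="localhost", user="postgres")

    app = procrastinate.App(
        connector=connector,
        import_paths=["dotted.path.to", "all.the.modules", "that.define.tasks"],
    )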
4 changes: 2 additions & 2 deletions docs/howto/custom_json_encoder_decoder.rst
@@ -16,7 +16,7 @@ Here is an example involving serializing/deserializing ``datetime`` objects::
     import functools
     import json

-    from procrastinate import App, PostgresJobStore
+    from procrastinate import App, PostgresConnector

     # Function used for encoding datetime objects
     def encode(obj):
@@ -34,7 +34,7 @@ Here is an example involving serializing/deserializing ``datetime`` objects::
     json_dumps = functools.partial(json.dumps, default=encode)
     json_loads = functools.partial(json.loads, object_hook=decode)

-    app = App(job_store=PostgresJobStore(json_dumps=json_dumps, json_loads=json_loads))
+    app = App(connector=PostgresConnector(json_dumps=json_dumps, json_loads=json_loads))

In this example the custom JSON dumps and loads functions are based on the standard
``json`` module's ``dumps`` and ``loads`` functions, with specific ``default`` and
``object_hook`` arguments.
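The collapsed hunk hides the bodies of ``encode`` and ``decode``. A plausible self-contained reconstruction of the whole example under the new API; the tagged-dict round-tripping scheme is an assumption, only the function names and the ``functools.partial`` wiring appear in the diff:

    import datetime
    import functools
    import json

    from procrastinate import App, PostgresConnector

    # Encode datetime objects as tagged ISO-8601 strings (assumed scheme)
    def encode(obj):
        if isinstance(obj, datetime.datetime):
            return {"__type__": "datetime", "value": obj.isoformat()}
        raise TypeError(f"Object of type {type(obj)} is not JSON serializable")

    # Turn the tagged dicts produced by encode back into datetime objects
    def decode(dct):
        if dct.get("__type__") == "datetime":
            return datetime.datetime.fromisoformat(dct["value"])
        return dct

    json_dumps = functools.partial(json.dumps, default=encode)
    json_loads = functools.partial(json.loads, object_hook=decode)

    app = App(connector=PostgresConnector(json_dumps=json_dumps, json_loads=json_loads))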
6 changes: 3 additions & 3 deletions docs/howto/job_store_connect.rst
@@ -13,22 +13,22 @@ There are three ways you can specify the connection parameters:
   and then just define::
       import procrastinate
-      procrastinate.PostgresJobStore()
+      procrastinate.PostgresConnector()
.. _`libpq environment variables`: https://www.postgresql.org/docs/current/libpq-envars.html

 - You can use `aiopg dsn`_::

       import procrastinate
-      procrastinate.PostgresJobStore(dsn="postgres://user:password@host:port/dbname")
+      procrastinate.PostgresConnector(dsn="postgres://user:password@host:port/dbname")

.. _`aiopg dsn`: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect

 - You can use other `aiopg connection arguments`_ (which are the same as
   `psycopg2 connection arguments`_)::

       import procrastinate
-      procrastinate.PostgresJobStore(user="user", password="password", host="host")
+      procrastinate.PostgresConnector(user="user", password="password", host="host")

.. _`aiopg connection arguments`: https://aiopg.readthedocs.io/en/stable/core.html#aiopg.connect
.. _`psycopg2 connection arguments`: http://initd.org/psycopg/docs/module.html#psycopg2.connect
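To illustrate the first option above, a small sketch that relies only on libpq environment variables; the values are placeholders, and in practice they would be set by the shell or deployment environment rather than in code:

    import os

    import procrastinate

    # Standard libpq environment variables, read by psycopg2/aiopg when no
    # explicit connection arguments are passed (placeholder values)
    os.environ["PGHOST"] = "localhost"
    os.environ["PGUSER"] = "postgres"
    os.environ["PGPASSWORD"] = "password"

    connector = procrastinate.PostgresConnector()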
8 changes: 4 additions & 4 deletions docs/howto/testing.rst
@@ -1,20 +1,20 @@
 Test your code that uses Procrastinate
 --------------------------------------

-Procrastinate defines an `InMemoryJobStore` that will speed up your tests,
+Procrastinate defines an `InMemoryConnector` that will speed up your tests,
 remove dependency to PostgreSQL and allow you to have tasks run in a
 controlled way.

 To use it, you can do::

-    app = procrastinate.App(job_store=procrastinate.testing.InMemoryJobStore())
+    app = procrastinate.App(connector=procrastinate.testing.InMemoryConnector())

     # Run the jobs your tests created, then stop
     # the worker:
     app.run_worker(only_once=True)

     # See the jobs created:
-    print(app.job_store.jobs)
+    print(app.connector.jobs)

     # Reset the store between tests:
-    app.job_store.reset()
+    app.connector.reset()
8 changes: 4 additions & 4 deletions docs/quickstart.rst
@@ -46,9 +46,9 @@ Create a Procrastinate application object

We'll do this in a single file. Start an empty file named ``tutorial.py``::

-    from procrastinate import App, PostgresJobStore
+    from procrastinate import App, PostgresConnector

-    app = App(job_store=PostgresJobStore(host="localhost", user="postgres"))
+    app = App(connector=PostgresConnector(host="localhost", user="postgres"))

The application will be the entry point for both:

@@ -158,9 +158,9 @@ Your final file
     import random
     import sys

-    from procrastinate import App, PostgresJobStore
+    from procrastinate import App, PostgresConnector

-    app = App(job_store=PostgresJobStore(host="localhost", user="postgres"))
+    app = App(connector=PostgresConnector(host="localhost", user="postgres"))

     @app.task
     def sum(a, b):
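The remainder of the tutorial file is collapsed here. A plausible completion under the new API; the task body and the command-line entry point are assumptions suggested by the ``random`` and ``sys`` imports visible in the diff:

    import random
    import sys

    from procrastinate import App, PostgresConnector

    app = App(connector=PostgresConnector(host="localhost", user="postgres"))

    @app.task
    def sum(a, b):
        # Illustrative body; the tutorial's real task may differ
        print(a + b)

    if __name__ == "__main__":
        # Assumed entry point: defer one job with random arguments
        if sys.argv[1] == "defer":
            sum.defer(a=random.randint(0, 10), b=random.randint(0, 10))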
4 changes: 2 additions & 2 deletions docs/reference.rst
@@ -20,9 +20,9 @@ are accessible through :py:attr:`App.builtin_tasks`:
 Job stores
 ----------

-.. autoclass:: procrastinate.PostgresJobStore
+.. autoclass:: procrastinate.PostgresConnector

-.. autoclass:: procrastinate.testing.InMemoryJobStore
+.. autoclass:: procrastinate.testing.InMemoryConnector
    :members: reset

 Tasks
5 changes: 2 additions & 3 deletions procrastinate/__init__.py
@@ -1,13 +1,12 @@
 from procrastinate import metadata as _metadata_module
-from procrastinate.aiopg_connector import PostgresJobStore
+from procrastinate.aiopg_connector import PostgresConnector, PostgresJobStore
 from procrastinate.app import App
 from procrastinate.retry import BaseRetryStrategy, RetryStrategy
-from procrastinate.store import BaseJobStore

 __all__ = [
     "App",
-    "BaseJobStore",
     "BaseRetryStrategy",
+    "PostgresConnector",
     "PostgresJobStore",
     "RetryStrategy",
 ]
203 changes: 81 additions & 122 deletions procrastinate/aiopg_connector.py
@@ -1,106 +1,25 @@
 import asyncio
 import logging
+import warnings
 from typing import Any, Callable, Dict, List, Optional

 import aiopg
 import psycopg2.sql
 from psycopg2.extras import Json, RealDictCursor

-from procrastinate import store
+from procrastinate import connector

 logger = logging.getLogger(__name__)

-def wrap_json(arguments: Dict[str, Any], json_dumps: Optional[Callable]):
-    return {
-        key: Json(value, dumps=json_dumps) if isinstance(value, dict) else value
-        for key, value in arguments.items()
-    }
-
-
-async def get_connection(
-    dsn="", json_loads: Optional[Callable] = None, **kwargs
-) -> aiopg.Connection:
-    # tell aiopg not to register adapters for hstore & json by default, as
-    # those are registered at the module level and could overwrite previously
-    # defined adapters
-    kwargs.setdefault("enable_json", False)
-    kwargs.setdefault("enable_hstore", False)
-    kwargs.setdefault("enable_uuid", False)
-    conn = await aiopg.connect(dsn=dsn, **kwargs)
-    if json_loads:
-        psycopg2.extras.register_default_jsonb(conn.raw, loads=json_loads)
-    return conn
-
-
-def connection_is_open(connection: aiopg.Connection) -> bool:
-    return not connection.closed
-
-
-async def execute_query(
-    connection: aiopg.Connection,
-    query: str,
-    json_dumps: Optional[Callable],
-    **arguments: Any
-) -> None:
-    # aiopg can work with psycopg2's cursor class
-    async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
-        await cursor.execute(query, wrap_json(arguments, json_dumps))
-
-
-async def execute_query_one(
-    connection: aiopg.Connection,
-    query: str,
-    json_dumps: Optional[Callable],
-    **arguments: Any
-) -> Dict[str, Any]:
-    # aiopg can work with psycopg2's cursor class
-    async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
-        await cursor.execute(query, wrap_json(arguments, json_dumps))
-
-        return await cursor.fetchone()
-
-
-async def execute_query_all(
-    connection: aiopg.Connection,
-    query: str,
-    json_dumps: Optional[Callable],
-    **arguments: Any
-) -> List[Dict[str, Any]]:
-    # aiopg can work with psycopg2's cursor class
-    async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
-        await cursor.execute(query, wrap_json(arguments, json_dumps))
-
-        return await cursor.fetchall()
-
-
-def make_dynamic_query(query: str, **identifiers: str) -> str:
-    return psycopg2.sql.SQL(query).format(
-        **{key: psycopg2.sql.Identifier(value) for key, value in identifiers.items()}
-    )
-
-
-async def wait_for_jobs(connection: aiopg.Connection, socket_timeout: float):
-    try:
-        await asyncio.wait_for(connection.notifies.get(), timeout=socket_timeout)
-    except asyncio.TimeoutError:
-        pass
-
-
-def interrupt_wait(connection: aiopg.Connection):
-    asyncio.get_event_loop().call_soon_threadsafe(connection.notifies.put_nowait, "s")
-
-
-class PostgresJobStore(store.BaseJobStore):
-    """
-    Uses ``aiopg`` to establish an asynchronous
-    connection to a PostgreSQL database.
-    """
-
+class PostgresConnector(connector.BaseConnector):
     def __init__(
         self,
         *,
-        socket_timeout: float = store.SOCKET_TIMEOUT,
+        dsn="",
+        socket_timeout: float = connector.SOCKET_TIMEOUT,
         json_dumps: Optional[Callable] = None,
         json_loads: Optional[Callable] = None,
-        **kwargs: Any
+        **kwargs: Any,
     ):
         """
         All parameters except ``socket_timeout``, ``json_dumps`` and ``json_loads``
@@ -125,60 +44,100 @@ def __init__(
         to the function used by psycopg2. See the `psycopg2 doc`_.
         """

+        kwargs["dsn"] = dsn
         self._connection_parameters = kwargs
         self._connection: Optional[aiopg.Connection] = None
         self.socket_timeout = socket_timeout
         self.json_dumps = json_dumps
         self.json_loads = json_loads

-    async def get_connection(self):
-        if not self._connection or not connection_is_open(self._connection):
-            self._connection = await get_connection(
-                json_loads=self.json_loads, **self._connection_parameters
-            )
-        return self._connection
-
     async def close_connection(self) -> None:
         if not self._connection:
             return

         await self._connection.close()

+    def _wrap_json(self, arguments: Dict[str, Any]):
+        return {
+            key: Json(value, dumps=self.json_dumps)
+            if isinstance(value, dict)
+            else value
+            for key, value in arguments.items()
+        }
+
+    async def _get_connection(self) -> aiopg.Connection:
+        if self._connection and not self._connection.closed:
+            return self._connection
+
+        # tell aiopg not to register adapters for hstore & json by default, as
+        # those are registered at the module level and could overwrite previously
+        # defined adapters
+        kwargs = self._connection_parameters
+        kwargs.setdefault("enable_json", False)
+        kwargs.setdefault("enable_hstore", False)
+        kwargs.setdefault("enable_uuid", False)
+        conn = await aiopg.connect(**kwargs)
+        if self.json_loads:
+            psycopg2.extras.register_default_jsonb(conn.raw, loads=self.json_loads)
+        self._connection = conn
+        return conn
+
     async def execute_query(self, query: str, **arguments: Any) -> None:
-        await execute_query(
-            await self.get_connection(),
-            query=query,
-            json_dumps=self.json_dumps,
-            **arguments
-        )
+        connection = await self._get_connection()
+        # aiopg can work with psycopg2's cursor class
+        async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
+            await cursor.execute(query, self._wrap_json(arguments))

     async def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
-        return await execute_query_one(
-            await self.get_connection(),
-            query=query,
-            json_dumps=self.json_dumps,
-            **arguments
-        )
+        connection = await self._get_connection()
+        # aiopg can work with psycopg2's cursor class
+        async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
+            await cursor.execute(query, self._wrap_json(arguments))
+
+            return await cursor.fetchone()

     async def execute_query_all(
         self, query: str, **arguments: Any
     ) -> List[Dict[str, Any]]:
-        return await execute_query_all(
-            await self.get_connection(),
-            query=query,
-            json_dumps=self.json_dumps,
-            **arguments
-        )
+        connection = await self._get_connection()
+        # aiopg can work with psycopg2's cursor class
+        async with connection.cursor(cursor_factory=RealDictCursor) as cursor:
+            await cursor.execute(query, self._wrap_json(arguments))
+
+            return await cursor.fetchall()

     def make_dynamic_query(self, query: str, **identifiers: str) -> str:
-        return make_dynamic_query(query=query, **identifiers)
+        return psycopg2.sql.SQL(query).format(
+            **{
+                key: psycopg2.sql.Identifier(value)
+                for key, value in identifiers.items()
+            }
+        )
+
+    async def wait_for_activity(self):
+        connection = await self._get_connection()
+        try:
+            await asyncio.wait_for(
+                connection.notifies.get(), timeout=self.socket_timeout
+            )
+        except asyncio.TimeoutError:
+            pass

-    async def wait_for_jobs(self):
-        return await wait_for_jobs(
-            connection=await self.get_connection(), socket_timeout=self.socket_timeout
+    # This can be called from a signal handler, better not do async stuff
+    def interrupt_wait(self):
+        if not self._connection or self._connection.closed:
+            return
+        asyncio.get_event_loop().call_soon_threadsafe(
+            self._connection.notifies.put_nowait, "s"
         )

-    def stop(self):
-        if self._connection:
-            interrupt_wait(connection=self._connection)
+
+class PostgresJobStore(PostgresConnector):
+    def __init__(self, *args, **kwargs):
+        message = (
+            "Use procrastinate.PostgresConnector(...) "
+            "instead of procrastinate.PostgresJobStore(...), with the same arguments"
+        )
+        logger.warn(f"Deprecation Warning: {message}")
+        warnings.warn(DeprecationWarning(message))
+        super().__init__(*args, **kwargs)
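The net effect of this shim is that existing code keeps working while being steered toward the new name. A small sketch of what a caller would observe as of this commit; no connection is opened at construction time, so this runs without a database:

    import warnings

    import procrastinate

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # The old name still works, but routes through the deprecation shim
        store = procrastinate.PostgresJobStore(host="localhost")

    assert isinstance(store, procrastinate.PostgresConnector)
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)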