Skip to content

Commit

Permalink
Changed our mind: removed "real_sync_defer"
Browse files Browse the repository at this point in the history
  • Loading branch information
Joachim Jablon authored and Éric Lemoine committed Jun 16, 2020
1 parent bcab78b commit d812171
Show file tree
Hide file tree
Showing 9 changed files with 30 additions and 144 deletions.
10 changes: 4 additions & 6 deletions docs/discussions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,13 @@ Procrastinate supports two ways of doing synchronous I/O:
- "classic" synchronous I/O (using synchronous database drivers such as ``Psycopg2``).
This mode is necessary in multi-threaded cases.
- "mixed" I/O (synchronously launching an event loop, and have asynchronous coroutine
run under the hood).
This mode will use less connections when used within another job.
run under the hood). This mode is adapted for synchronously defering jobs from other
job, from within the workers.

If you use an `AiopgConnector`, then, by default, you will use the "mixed" mode. You can
request the ``AiopgConnector`` to use the "classic" mode by passing
``real_sync_defer=True`` when creating the ``AiopgConnector``. You can also have the
If you use an `AiopgConnector`, then you will use the "mixed" mode. You can have the
classic mode by using a `Psycopg2Connector` as your App's connector. In that case, you
will be restricted to a few operations, including deferring tasks and applying the
schema. This is recommended for synchronous multi-threaded apps that defer jobs.
schema. This is recommended for synchronous multi-threaded apps only that defer jobs.

See `howto/sync_defer`.

Expand Down
48 changes: 21 additions & 27 deletions docs/howto/sync_defer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,42 @@ Control the way synchronous calls to defer are handled

In some cases, usually linked to multi-threading (see `discussion-sync-defer`), you may
want to defer tasks purely synchronously (what is called "classic" synchronous I/O).
There are two ways to achieve this:

``real_sync_defer=True``
------------------------
``Psycopg2Connector``
---------------------

By building your `AiopgConnector` with ``real_sync_defer=True``, defer operations will
automatically be handled by a synchronous connector, based on ``psycopg2``::
By setting your `App`'s connector to an instance of `Psycopg2Connector`, you will
get "classic" synchronous I/O. Note that in this case, some ``App`` features will be
unavailable, such as the ``admin`` and ``worker`` sub-commands::

import procrastinate

app = procrastinate.App(
connector=procrastinate.AiopgConnector(
connector=procrastinate.Psycopg2Connector(
host="somehost",
real_sync_defer=True
),
)

``Psycopg2Connector``
---------------------

By setting your `App`'s connector to an instance of `Psycopg2Connector`, you will also
get "classic" synchronous I/O. Note that in this case, some ``App`` features will be
unavailable, such as the ``admin`` and ``worker`` sub-commands.

It is perfectly fine to give the App either kind of connectors depending on the
situation.
situation::

How does it work?
-----------------
import sys, procrastinate

In both cases, the synchronous connector will use a
``psycopg2.pool.ThreadedConnectionPool`` (see psycopg2 documentation__), which should
fit most workflows.
# This is an example condition, you'll need to check that it works in your case
if sys.argv[0:2] == ["procrastinate", "worker"]:
connector_class = procrastinate.AiopgConnector
else:
connector_class = procrastinate.Psycopg2Connector

.. __: https://www.psycopg.org/docs/pool.html#psycopg2.pool.ThreadedConnectionPool
app = procrastinate.App(
connector=connector_class(host="somehost"),
)


.. note::
How does it work?
-----------------

If you use ``real_sync_defer``, the ``ThreadedConnectionPool`` will be instantiated
mostly with the parameters passed to the `AiopgConnector`. That being said, there
are small differences here and there between the two parameter sets. If you find a
parameter that is not well supported, feel free to warn us through an issue__.
The synchronous connector will use a ``psycopg2.pool.ThreadedConnectionPool`` (see
psycopg2 documentation__), which should fit most workflows.

.. __: https://github.com/peopledoc/procrastinate/issues
.. __: https://www.psycopg.org/docs/pool.html#psycopg2.pool.ThreadedConnectionPool
36 changes: 1 addition & 35 deletions procrastinate/aiopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import psycopg2.sql
from psycopg2.extras import Json, RealDictCursor

from procrastinate import connector, exceptions, psycopg2_connector, sql
from procrastinate import connector, exceptions, sql

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -45,7 +45,6 @@ def __init__(
*,
json_dumps: Optional[Callable] = None,
json_loads: Optional[Callable] = None,
real_sync_defer: bool = False,
**kwargs: Any,
):
"""
Expand All @@ -70,13 +69,6 @@ def __init__(
to the function used by psycopg2. See the `psycopg2 doc`_. Unused if the
pool is externally created and set into the connector through the
`AiopgConnector.set_pool` method.
real_sync_defer :
If a synchronous call to a defer operation is issued, whether to call a
really synchronous psycopg2 implementation (``True``) which will use its own
connection pool, or a synchronous wrapper around this asynchronous
connector, which may not play as nicely with multi-threaded programs but
will use connections from this connector's pool (``False``)(see
`discussion-sync-defer`).
dsn : ``Optional[str]``
Passed to aiopg. Default is "" instead of None, which means if no argument
is passed, it will connect to localhost:5432 instead of a Unix-domain
Expand Down Expand Up @@ -107,8 +99,6 @@ def __init__(
self.json_loads = json_loads
self._pool_args = self._adapt_pool_args(kwargs, json_loads)
self._lock: Optional[asyncio.Lock] = None
self._sync_connector: Optional[psycopg2_connector.Psycopg2Connector] = None
self.real_sync_defer = real_sync_defer

@staticmethod
def _adapt_pool_args(
Expand Down Expand Up @@ -149,9 +139,6 @@ async def close_async(self) -> None:
self._pool.close()
await self._pool.wait_closed()
self._pool = None
if self._sync_connector:
# This is not an async call but hopefully at this point, it's ok.
self._sync_connector.close()

def _wrap_json(self, arguments: Dict[str, Any]):
return {
Expand Down Expand Up @@ -272,24 +259,3 @@ async def _loop_notify(
continue

event.set()

def get_sync_connector(self) -> connector.BaseConnector:
# Warning: this can do synchronous I/O work,
# only call in synchronous contexts.
if not self.real_sync_defer:
return super().get_sync_connector()

if self._sync_connector:
return self._sync_connector

pool_args = self._pool_args.copy()
pool_args["minconn"] = pool_args.pop("minsize", 1)
pool_args["maxconn"] = pool_args.pop("maxsize", 10)
pool_args.pop("enable_json", None)
pool_args.pop("enable_hstore", None)
pool_args.pop("enable_uuid", None)
pool_args.pop("on_connect", None)
self._sync_connector = psycopg2_connector.Psycopg2Connector(
json_dumps=self.json_dumps, json_loads=self.json_loads, **pool_args
)
return self._sync_connector
3 changes: 0 additions & 3 deletions procrastinate/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]:
def execute_query_all(self, query: str, **arguments: Any) -> List[Dict[str, Any]]:
raise NotImplementedError

def get_sync_connector(self) -> "BaseConnector":
return self

async def close_async(self) -> None:
raise exceptions.SyncConnectorConfigurationError

Expand Down
4 changes: 1 addition & 3 deletions procrastinate/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ async def defer_job_async(self, job: jobs.Job) -> int:
return result["id"]

def defer_job(self, job: jobs.Job) -> int:
connector = self.connector.get_sync_connector()

try:
result = connector.execute_query_one(
result = self.connector.execute_query_one(
**self._defer_job_query_kwargs(job=job)
)
except exceptions.UniqueViolation as exc:
Expand Down
5 changes: 0 additions & 5 deletions procrastinate_demo/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,6 @@
connector=procrastinate.AiopgConnector(), import_paths=import_paths
)

real_sync_app = procrastinate.App(
connector=procrastinate.AiopgConnector(real_sync_defer=True),
import_paths=import_paths,
)

sync_app = procrastinate.App(
connector=procrastinate.Psycopg2Connector(), import_paths=import_paths
)
8 changes: 1 addition & 7 deletions tests/acceptance/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,7 @@ def decode(dct):
connector=procrastinate.AiopgConnector(json_dumps=json_dumps, json_loads=json_loads)
)

real_sync_app = procrastinate.App(
connector=procrastinate.AiopgConnector(
json_dumps=json_dumps, json_loads=json_loads, real_sync_defer=True
)
)

psycopg2_app = procrastinate.App(
sync_app = procrastinate.App(
connector=procrastinate.Psycopg2Connector(
json_dumps=json_dumps, json_loads=json_loads
)
Expand Down
5 changes: 1 addition & 4 deletions tests/acceptance/test_nominal.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,10 +137,7 @@ def test_queueing_lock(defer, running_worker):
assert excinfo.value.returncode == 1

with pytest.raises(subprocess.CalledProcessError) as excinfo:
defer("sometask", ["--queueing-lock", "a"], app="real_sync_app")

with pytest.raises(subprocess.CalledProcessError) as excinfo:
defer("sometask", ["--queueing-lock", "a"], app="psycopg2_app")
defer("sometask", ["--queueing-lock", "a"], app="sync_app")

# This one doesn't raise
defer("sometask", ["--queueing-lock", "a", "--ignore-already-enqueued"])
55 changes: 1 addition & 54 deletions tests/integration/test_aiopg_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import attr
import pytest

from procrastinate import aiopg_connector, psycopg2_connector
from procrastinate import aiopg_connector

pytestmark = pytest.mark.asyncio

Expand Down Expand Up @@ -208,56 +208,3 @@ async def test_loop_notify_timeout(aiopg_connector):
pytest.fail("Failed to detect that connection was closed and stop")

assert not event.is_set()


async def test_get_sync_connector_mixed(aiopg_connector_factory):
connector = aiopg_connector_factory(real_sync_defer=False)

assert connector.get_sync_connector() is connector


async def test_get_sync_connector_classic(aiopg_connector_factory):
def json_loads():
pass

def json_dumps():
pass

connector = aiopg_connector_factory(
real_sync_defer=True,
minsize=2,
maxsize=4,
enable_json=False,
enable_hstore=False,
enable_uuid=False,
on_connect="something",
json_loads=json_loads,
json_dumps=json_dumps,
)

sync_connector = connector.get_sync_connector()

assert isinstance(sync_connector, psycopg2_connector.Psycopg2Connector)
assert sync_connector.json_loads is json_loads
assert sync_connector.json_dumps is json_dumps
assert sync_connector._pool.minconn == 2
assert sync_connector._pool.maxconn == 4


async def test_get_sync_connector_classic_twice(aiopg_connector_factory):
connector = aiopg_connector_factory(real_sync_defer=True)

sync_connector1 = connector.get_sync_connector()
sync_connector2 = connector.get_sync_connector()

assert sync_connector1 is sync_connector2


async def test_get_sync_connector_classic_close(aiopg_connector_factory):
connector = aiopg_connector_factory(real_sync_defer=True)
sync_connector = connector.get_sync_connector()
assert not sync_connector._pool.closed

await connector.close_async()

assert sync_connector._pool.closed

0 comments on commit d812171

Please sign in to comment.