diff --git a/docs/discussions.rst b/docs/discussions.rst index bf6df4490..28dbcc54f 100644 --- a/docs/discussions.rst +++ b/docs/discussions.rst @@ -172,15 +172,13 @@ Procrastinate supports two ways of doing synchronous I/O: - "classic" synchronous I/O (using synchronous database drivers such as ``Psycopg2``). This mode is necessary in multi-threaded cases. - "mixed" I/O (synchronously launching an event loop, and have asynchronous coroutine - run under the hood). - This mode will use less connections when used within another job. + run under the hood). This mode is adapted for synchronously defering jobs from other + job, from within the workers. -If you use an `AiopgConnector`, then, by default, you will use the "mixed" mode. You can -request the ``AiopgConnector`` to use the "classic" mode by passing -``real_sync_defer=True`` when creating the ``AiopgConnector``. You can also have the +If you use an `AiopgConnector`, then you will use the "mixed" mode. You can have the classic mode by using a `Psycopg2Connector` as your App's connector. In that case, you will be restricted to a few operations, including deferring tasks and applying the -schema. This is recommended for synchronous multi-threaded apps that defer jobs. +schema. This is recommended for synchronous multi-threaded apps only that defer jobs. See `howto/sync_defer`. diff --git a/docs/howto/sync_defer.rst b/docs/howto/sync_defer.rst index 6b3e77b54..152e7c78b 100644 --- a/docs/howto/sync_defer.rst +++ b/docs/howto/sync_defer.rst @@ -3,48 +3,42 @@ Control the way synchronous calls to defer are handled In some cases, usually linked to multi-threading (see `discussion-sync-defer`), you may want to defer tasks purely synchronously (what is called "classic" synchronous I/O). -There are two ways to achieve this: -``real_sync_defer=True`` ------------------------- +``Psycopg2Connector`` +--------------------- -By building your `AiopgConnector` with ``real_sync_defer=True``, defer operations will -automatically be handled by a synchronous connector, based on ``psycopg2``:: +By setting your `App`'s connector to an instance of `Psycopg2Connector`, you will +get "classic" synchronous I/O. Note that in this case, some ``App`` features will be +unavailable, such as the ``admin`` and ``worker`` sub-commands:: import procrastinate app = procrastinate.App( - connector=procrastinate.AiopgConnector( + connector=procrastinate.Psycopg2Connector( host="somehost", - real_sync_defer=True ), ) -``Psycopg2Connector`` ---------------------- - -By setting your `App`'s connector to an instance of `Psycopg2Connector`, you will also -get "classic" synchronous I/O. Note that in this case, some ``App`` features will be -unavailable, such as the ``admin`` and ``worker`` sub-commands. - It is perfectly fine to give the App either kind of connectors depending on the -situation. +situation:: -How does it work? ------------------ + import sys, procrastinate -In both cases, the synchronous connector will use a -``psycopg2.pool.ThreadedConnectionPool`` (see psycopg2 documentation__), which should -fit most workflows. + # This is an example condition, you'll need to check that it works in your case + if sys.argv[0:2] == ["procrastinate", "worker"]: + connector_class = procrastinate.AiopgConnector + else: + connector_class = procrastinate.Psycopg2Connector -.. __: https://www.psycopg.org/docs/pool.html#psycopg2.pool.ThreadedConnectionPool + app = procrastinate.App( + connector=connector_class(host="somehost"), + ) -.. note:: +How does it work? +----------------- - If you use ``real_sync_defer``, the ``ThreadedConnectionPool`` will be instantiated - mostly with the parameters passed to the `AiopgConnector`. That being said, there - are small differences here and there between the two parameter sets. If you find a - parameter that is not well supported, feel free to warn us through an issue__. +The synchronous connector will use a ``psycopg2.pool.ThreadedConnectionPool`` (see +psycopg2 documentation__), which should fit most workflows. -.. __: https://github.com/peopledoc/procrastinate/issues +.. __: https://www.psycopg.org/docs/pool.html#psycopg2.pool.ThreadedConnectionPool diff --git a/procrastinate/aiopg_connector.py b/procrastinate/aiopg_connector.py index e322f0120..482cdb807 100644 --- a/procrastinate/aiopg_connector.py +++ b/procrastinate/aiopg_connector.py @@ -45,7 +45,6 @@ def __init__( *, json_dumps: Optional[Callable] = None, json_loads: Optional[Callable] = None, - real_sync_defer: bool = False, **kwargs: Any, ): """ @@ -70,13 +69,6 @@ def __init__( to the function used by psycopg2. See the `psycopg2 doc`_. Unused if the pool is externally created and set into the connector through the `AiopgConnector.set_pool` method. - real_sync_defer : - If a synchronous call to a defer operation is issued, whether to call a - really synchronous psycopg2 implementation (``True``) which will use its own - connection pool, or a synchronous wrapper around this asynchronous - connector, which may not play as nicely with multi-threaded programs but - will use connections from this connector's pool (``False``)(see - `discussion-sync-defer`). dsn : ``Optional[str]`` Passed to aiopg. Default is "" instead of None, which means if no argument is passed, it will connect to localhost:5432 instead of a Unix-domain @@ -107,8 +99,6 @@ def __init__( self.json_loads = json_loads self._pool_args = self._adapt_pool_args(kwargs, json_loads) self._lock: Optional[asyncio.Lock] = None - self._sync_connector: Optional[psycopg2_connector.Psycopg2Connector] = None - self.real_sync_defer = real_sync_defer @staticmethod def _adapt_pool_args( @@ -149,9 +139,6 @@ async def close_async(self) -> None: self._pool.close() await self._pool.wait_closed() self._pool = None - if self._sync_connector: - # This is not an async call but hopefully at this point, it's ok. - self._sync_connector.close() def _wrap_json(self, arguments: Dict[str, Any]): return { @@ -272,24 +259,3 @@ async def _loop_notify( continue event.set() - - def get_sync_connector(self) -> connector.BaseConnector: - # Warning: this can do synchronous I/O work, - # only call in synchronous contexts. - if not self.real_sync_defer: - return super().get_sync_connector() - - if self._sync_connector: - return self._sync_connector - - pool_args = self._pool_args.copy() - pool_args["minconn"] = pool_args.pop("minsize", 1) - pool_args["maxconn"] = pool_args.pop("maxsize", 10) - pool_args.pop("enable_json", None) - pool_args.pop("enable_hstore", None) - pool_args.pop("enable_uuid", None) - pool_args.pop("on_connect", None) - self._sync_connector = psycopg2_connector.Psycopg2Connector( - json_dumps=self.json_dumps, json_loads=self.json_loads, **pool_args - ) - return self._sync_connector diff --git a/procrastinate/connector.py b/procrastinate/connector.py index 74d0c3216..fb5806e89 100644 --- a/procrastinate/connector.py +++ b/procrastinate/connector.py @@ -22,9 +22,6 @@ def execute_query_one(self, query: str, **arguments: Any) -> Dict[str, Any]: def execute_query_all(self, query: str, **arguments: Any) -> List[Dict[str, Any]]: raise NotImplementedError - def get_sync_connector(self) -> "BaseConnector": - return self - async def close_async(self) -> None: raise exceptions.SyncConnectorConfigurationError diff --git a/procrastinate/store.py b/procrastinate/store.py index 54a60b997..b7e2d10f3 100644 --- a/procrastinate/store.py +++ b/procrastinate/store.py @@ -31,10 +31,8 @@ async def defer_job_async(self, job: jobs.Job) -> int: return result["id"] def defer_job(self, job: jobs.Job) -> int: - connector = self.connector.get_sync_connector() - try: - result = connector.execute_query_one( + result = self.connector.execute_query_one( **self._defer_job_query_kwargs(job=job) ) except exceptions.UniqueViolation as exc: diff --git a/procrastinate_demo/app.py b/procrastinate_demo/app.py index e5395b982..3fbb559e9 100644 --- a/procrastinate_demo/app.py +++ b/procrastinate_demo/app.py @@ -6,11 +6,6 @@ connector=procrastinate.AiopgConnector(), import_paths=import_paths ) -real_sync_app = procrastinate.App( - connector=procrastinate.AiopgConnector(real_sync_defer=True), - import_paths=import_paths, -) - sync_app = procrastinate.App( connector=procrastinate.Psycopg2Connector(), import_paths=import_paths ) diff --git a/tests/acceptance/app.py b/tests/acceptance/app.py index 79d49d3dc..c474396c8 100644 --- a/tests/acceptance/app.py +++ b/tests/acceptance/app.py @@ -28,13 +28,7 @@ def decode(dct): connector=procrastinate.AiopgConnector(json_dumps=json_dumps, json_loads=json_loads) ) -real_sync_app = procrastinate.App( - connector=procrastinate.AiopgConnector( - json_dumps=json_dumps, json_loads=json_loads, real_sync_defer=True - ) -) - -psycopg2_app = procrastinate.App( +sync_app = procrastinate.App( connector=procrastinate.Psycopg2Connector( json_dumps=json_dumps, json_loads=json_loads ) diff --git a/tests/acceptance/test_nominal.py b/tests/acceptance/test_nominal.py index 9064efc55..6907b1a50 100644 --- a/tests/acceptance/test_nominal.py +++ b/tests/acceptance/test_nominal.py @@ -137,10 +137,7 @@ def test_queueing_lock(defer, running_worker): assert excinfo.value.returncode == 1 with pytest.raises(subprocess.CalledProcessError) as excinfo: - defer("sometask", ["--queueing-lock", "a"], app="real_sync_app") - - with pytest.raises(subprocess.CalledProcessError) as excinfo: - defer("sometask", ["--queueing-lock", "a"], app="psycopg2_app") + defer("sometask", ["--queueing-lock", "a"], app="sync_app") # This one doesn't raise defer("sometask", ["--queueing-lock", "a", "--ignore-already-enqueued"])