Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Abort job polling interval #1192

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions docs/discussions.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,26 +199,30 @@ Having sub-workers wait for an available connection in the pool is suboptimal. Y
resources will be better used with fewer sub-workers or a larger pool, but there are
many factors to take into account when [sizing your pool](https://wiki.postgresql.org/wiki/Number_Of_Database_Connections).

### How the `polling_interval` works
### How polling works

#### `fetch_job_polling_interval`

Even when the database doesn't notify workers regarding newly deferred jobs, each worker still poll the database every now and then, just in case.
There could be previously locked jobs that are now free, or scheduled jobs that have
reached the ETA. `polling_interval` is the {py:meth}`App.run_worker` parameter (or the
reached the ETA. `fetch_job_polling_interval` is the {py:meth}`App.run_worker` parameter (or the
equivalent CLI flag) that sizes this "every now and then".

A worker will keep fetching new jobs as long as they have capacity to process them.
The polling interval starts from the moment the last attempt to fetch a new job yields no result.

The `polling_interval` also defines how often the worker will poll the database for jobs to abort.
When `listen_notify=True`, the worker will likely be notified "instantly" of each abort request prior to polling the database.
:::{note}
The polling interval was previously called `timeout` in pre-v3 versions of Procrastinate. It was renamed to `fetch_job_polling_interval` for clarity.
:::

However, in the event `listen_notify=False` or if the abort notification was missed, `polling_interval` will represent the maximum delay before the worker reacts to an abort request.
#### `abort_job_polling_interval`

Note that the worker will not poll the database for jobs to be aborted if it is idle (i.e. it has no running job).
Another polling interval is the `abort_job_polling_interval`. It defines how often the worker will poll the database for jobs to abort.
When `listen_notify=True`, the worker will likely be notified "instantly" of each abort request prior to polling the database.

:::{note}
The polling interval was previously called `timeout` in pre-v3 versions of Procrastinate. It was renamed to `polling_interval` for clarity.
:::
However, when `listen_notify=False` or the abort notification was missed, `abort_job_polling_interval` will represent the maximum delay before the worker reacts to an abort request.

Note that the worker will only poll the database for abort requests when at least one job is running.

## Procrastinate's usage of PostgreSQL functions and procedures

Expand Down
2 changes: 1 addition & 1 deletion docs/howto/advanced/cancellation.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ await app.job_manager.cancel_job_by_id_async(33, abort=True)

Behind the scenes, the worker receives a Postgres notification every time a job is requested to abort, (unless `listen_notify=False`).

The worker also polls (respecting `polling_interval`) the database for abortion requests, as long as the worker is running at least one job (in the absence of running job, there is nothing to abort).
The worker also polls (respecting `fetch_job_polling_interval`) the database for abortion requests, as long as the worker is running at least one job (in the absence of running job, there is nothing to abort).

:::{note}
When a job is requested to abort and that job fails, it will not be retried (regardless of the retry strategy).
Expand Down
19 changes: 13 additions & 6 deletions procrastinate/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ class WorkerOptions(TypedDict):
name: NotRequired[str]
concurrency: NotRequired[int]
wait: NotRequired[bool]
polling_interval: NotRequired[float]
fetch_job_polling_interval: NotRequired[float]
abort_job_polling_interval: NotRequired[float]
shutdown_timeout: NotRequired[float]
listen_notify: NotRequired[bool]
delete_jobs: NotRequired[str | jobs.DeleteJobCondition]
Expand Down Expand Up @@ -270,13 +271,19 @@ async def run_worker_async(self, **kwargs: Unpack[WorkerOptions]) -> None:
Name of the worker. Will be passed in the `JobContext` and used in the
logs (defaults to ``None`` which will result in the worker named
``worker``).
polling_interval : ``float``
fetch_job_polling_interval : ``float``
Maximum time (in seconds) between database job polls.

Controls the frequency of database queries for:
- Checking for new jobs to start
- Fetching updates for running jobs
- Checking for abort requests
Controls the frequency of database queries for new jobs to start.

When `listen_notify` is True, the polling interval acts as a fallback
mechanism and can reasonably be set to a higher value.

(defaults to 5.0)
abort_job_polling_interval : ``float``
Maximum time (in seconds) between database abort requet polls.

Controls the frequency of database queries for abort requests

When `listen_notify` is True, the polling interval acts as a fallback
mechanism and can reasonably be set to a higher value.
Expand Down
12 changes: 10 additions & 2 deletions procrastinate/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,18 @@ def configure_worker_parser(subparsers: argparse._SubParsersAction):
add_argument(
worker_parser,
"-p",
"--polling-interval",
"--fetch-job-polling-interval",
type=float,
help="How long to wait for database event push before polling",
envvar="WORKER_POLLING_INTERVAL",
envvar="WORKER_FETCH_JOB_POLLING_INTERVAL",
)
add_argument(
worker_parser,
"-a",
"--abort-job-polling-interval",
type=float,
help="How often to polling for abort requests",
envvar="WORKER_ABORT_JOB_POLLING_INTERVAL",
)
add_argument(
worker_parser,
Expand Down
15 changes: 9 additions & 6 deletions procrastinate/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@

WORKER_NAME = "worker"
WORKER_CONCURRENCY = 1 # maximum number of parallel jobs
POLLING_INTERVAL = 5.0 # seconds
FETCH_JOB_POLLING_INTERVAL = 5.0 # seconds
ABORT_JOB_POLLING_INTERVAL = 5.0 # seconds


class Worker:
Expand All @@ -36,7 +37,8 @@ def __init__(
name: str | None = WORKER_NAME,
concurrency: int = WORKER_CONCURRENCY,
wait: bool = True,
polling_interval: float = POLLING_INTERVAL,
fetch_job_polling_interval: float = FETCH_JOB_POLLING_INTERVAL,
abort_job_polling_interval: float = ABORT_JOB_POLLING_INTERVAL,
shutdown_timeout: float | None = None,
listen_notify: bool = True,
delete_jobs: str | jobs.DeleteJobCondition | None = None,
Expand All @@ -48,7 +50,8 @@ def __init__(
self.worker_name = name
self.concurrency = concurrency
self.wait = wait
self.polling_interval = polling_interval
self.fetch_job_polling_interval = fetch_job_polling_interval
self.abort_job_polling_interval = abort_job_polling_interval
self.listen_notify = listen_notify
self.delete_jobs = (
jobs.DeleteJobCondition(delete_jobs)
Expand Down Expand Up @@ -361,9 +364,9 @@ async def _handle_notification(
async def _poll_jobs_to_abort(self):
while True:
logger.debug(
f"waiting for {self.polling_interval}s before querying jobs to abort"
f"waiting for {self.abort_job_polling_interval}s before querying jobs to abort"
)
await asyncio.sleep(self.polling_interval)
await asyncio.sleep(self.abort_job_polling_interval)
if not self._running_jobs:
logger.debug("Not querying jobs to abort because no job is running")
continue
Expand Down Expand Up @@ -485,7 +488,7 @@ async def _run_loop(self):
# wait for a new job notification, a stop even or the next polling interval
await utils.wait_any(
self._new_job_event.wait(),
asyncio.sleep(self.polling_interval),
asyncio.sleep(self.fetch_job_polling_interval),
self._stop_event.wait(),
)
await self._fetch_and_process_jobs()
Expand Down
21 changes: 12 additions & 9 deletions tests/acceptance/test_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ async def task1():

job_id = await task1.defer_async()

polling_interval = 0.1
abort_job_polling_interval = 0.1

worker_task = asyncio.create_task(
async_app.run_worker_async(
queues=["default"],
wait=False,
polling_interval=polling_interval,
abort_job_polling_interval=abort_job_polling_interval,
listen_notify=True if mode == "listen" else False,
)
)
Expand All @@ -132,9 +132,9 @@ async def task1():
assert result is True

# when listening for notifications, job should cancel within ms
# if notifications are disabled, job will only cancel after polling_interval
# if notifications are disabled, job will only cancel after abort_job_polling_interval
await asyncio.wait_for(
worker_task, timeout=0.1 if mode == "listen" else polling_interval * 2
worker_task, timeout=0.1 if mode == "listen" else abort_job_polling_interval * 2
)

status = await async_app.job_manager.get_job_status_async(job_id)
Expand All @@ -152,13 +152,13 @@ def task1(context):

job_id = await task1.defer_async()

polling_interval = 0.1
abort_job_polling_interval = 0.1

worker_task = asyncio.create_task(
async_app.run_worker_async(
queues=["default"],
wait=False,
polling_interval=polling_interval,
abort_job_polling_interval=abort_job_polling_interval,
listen_notify=True if mode == "listen" else False,
)
)
Expand All @@ -168,9 +168,9 @@ def task1(context):
assert result is True

# when listening for notifications, job should cancel within ms
# if notifications are disabled, job will only cancel after polling_interval
# if notifications are disabled, job will only cancel after abort_job_polling_interval
await asyncio.wait_for(
worker_task, timeout=0.1 if mode == "listen" else polling_interval * 2
worker_task, timeout=0.1 if mode == "listen" else abort_job_polling_interval * 2
)

status = await async_app.job_manager.get_job_status_async(job_id)
Expand Down Expand Up @@ -218,7 +218,10 @@ async def sum(a: int, b: int):
# rely on polling to fetch new jobs
worker_task = asyncio.create_task(
async_app.run_worker_async(
concurrency=1, wait=True, listen_notify=False, polling_interval=0.3
concurrency=1,
wait=True,
listen_notify=False,
fetch_job_polling_interval=0.3,
)
)

Expand Down
5 changes: 3 additions & 2 deletions tests/integration/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ async def test_worker(entrypoint, cli_app, mocker):
cli_app.run_worker_async = mocker.AsyncMock()
result = await entrypoint(
"worker "
"--queues a,b --name=w1 --polling-interval=8.3 "
"--queues a,b --name=w1 --fetch-job-polling-interval=8.3 --abort-job-polling-interval=20 "
"--one-shot --concurrency=10 --no-listen-notify --delete-jobs=always"
)

Expand All @@ -79,7 +79,8 @@ async def test_worker(entrypoint, cli_app, mocker):
concurrency=10,
name="w1",
queues=["a", "b"],
polling_interval=8.3,
fetch_job_polling_interval=8.3,
abort_job_polling_interval=20,
wait=False,
listen_notify=False,
delete_jobs=jobs.DeleteJobCondition.ALWAYS,
Expand Down
8 changes: 4 additions & 4 deletions tests/integration/test_wait_stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ async def test_wait_for_activity_cancelled(psycopg_connector):
Testing that the work can be cancelled
"""
pg_app = app.App(connector=psycopg_connector)
worker = worker_module.Worker(app=pg_app, polling_interval=2)
worker = worker_module.Worker(app=pg_app, fetch_job_polling_interval=2)
task = asyncio.ensure_future(worker.run())
await asyncio.sleep(0.2) # should be enough so that we're waiting

Expand All @@ -31,7 +31,7 @@ async def test_wait_for_activity_timeout(psycopg_connector):
Testing that we timeout if nothing happens
"""
pg_app = app.App(connector=psycopg_connector)
worker = worker_module.Worker(app=pg_app, polling_interval=2)
worker = worker_module.Worker(app=pg_app, fetch_job_polling_interval=2)
task = asyncio.ensure_future(worker.run())
await asyncio.sleep(0.2) # should be enough so that we're waiting
with pytest.raises(asyncio.TimeoutError):
Expand All @@ -43,7 +43,7 @@ async def test_wait_for_activity_stop_from_signal(psycopg_connector, kill_own_pi
Testing than ctrl+c interrupts the wait
"""
pg_app = app.App(connector=psycopg_connector)
worker = worker_module.Worker(app=pg_app, polling_interval=2)
worker = worker_module.Worker(app=pg_app, fetch_job_polling_interval=2)
task = asyncio.ensure_future(worker.run())
await asyncio.sleep(0.2) # should be enough so that we're waiting

Expand All @@ -60,7 +60,7 @@ async def test_wait_for_activity_stop(psycopg_connector):
Testing than calling worker.stop() interrupts the wait
"""
pg_app = app.App(connector=psycopg_connector)
worker = worker_module.Worker(app=pg_app, polling_interval=2)
worker = worker_module.Worker(app=pg_app, fetch_job_polling_interval=2)
task = asyncio.ensure_future(worker.run())
await asyncio.sleep(0.2) # should be enough so that we're waiting

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/test_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ def test_app_register(app: app_module.App):
def test_app_worker(app: app_module.App, mocker):
Worker = mocker.patch("procrastinate.worker.Worker")

app.worker_defaults["polling_interval"] = 12
app.worker_defaults["fetch_job_polling_interval"] = 12
app._worker(queues=["yay"], name="w1", wait=False)

Worker.assert_called_once_with(
queues=["yay"], app=app, name="w1", polling_interval=12, wait=False
queues=["yay"], app=app, name="w1", fetch_job_polling_interval=12, wait=False
)


Expand Down
6 changes: 3 additions & 3 deletions tests/unit/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ async def perform_job():

@pytest.mark.parametrize(
"worker",
[({"polling_interval": 0.05})],
[({"fetch_job_polling_interval": 0.05})],
indirect=["worker"],
)
async def test_worker_run_respects_polling(worker, app):
Expand Down Expand Up @@ -601,8 +601,8 @@ async def task_func():
@pytest.mark.parametrize(
"worker",
[
({"listen_notify": False, "polling_interval": 0.05}),
({"listen_notify": True, "polling_interval": 1}),
({"listen_notify": False, "abort_job_polling_interval": 0.05}),
({"listen_notify": True, "abort_job_polling_interval": 1}),
],
indirect=["worker"],
)
Expand Down
Loading