Merge pull request #471 from aleksandr-shtaub/arguments_for_periodic_tasks_436
Joachim Jablon authored Dec 19, 2021
2 parents 0b858d4 + fc4a5ef commit 1582d41
Showing 15 changed files with 502 additions and 163 deletions.
75 changes: 44 additions & 31 deletions docs/howto/cron.rst
@@ -1,8 +1,8 @@
Launch a task periodically
==========================

Procrastinate offers a way to schedule periodic deferring of tasks. It uses the
`Unix cron`_ syntax::
Procrastinate offers a way to schedule periodic deferring of tasks, with
`App.periodic`. It uses the `Unix cron`_ syntax::

# scheduled at the 0th minute of each hour
@app.periodic(cron="0 * * * *")
@@ -43,45 +43,58 @@ When using periodic tasks there are a few things to know:
but a task that is more than 10 minutes late will not be deferred. This value is
configurable in the `App`.

Periodic task arguments
-----------------------
Timestamp argument
------------------

Periodic tasks receive a single integer argument, named ``timestamp``. it represents the
`Unix timestamp`__ of the date/time it was scheduled for (which might be arbitrarily far
in the past).
By default, periodic tasks receive a single integer argument, named
``timestamp``. It represents the `Unix timestamp`__ of the date/time it was
scheduled for (which might be arbitrarily far in the past).

.. __: https://en.wikipedia.org/wiki/Unix_time
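Because ``timestamp`` is a plain integer, a task body will often want to turn it
into a date/time object first. The following is a minimal sketch using only the
standard library; the helper name ``scheduled_datetime`` is hypothetical and not
part of Procrastinate.

```python
import datetime


def scheduled_datetime(timestamp: int) -> datetime.datetime:
    """Convert a Unix timestamp into a timezone-aware UTC datetime."""
    return datetime.datetime.fromtimestamp(timestamp, tz=datetime.timezone.utc)


# Example value: the number of seconds since 1970-01-01T00:00:00Z.
scheduled = scheduled_datetime(1639872000)
```

Using an explicit UTC timezone avoids depending on the local timezone of the
worker that happens to run the job.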

Queue, lock, queuing lock
-------------------------
Scheduling a job multiple times with multiple arguments
-------------------------------------------------------

Procrastinate itself takes care of deferring the periodic jobs, which means you don't
have the opportunity to specify a given queue, lock or queueing lock at defer time.
Fortunately, you can define all of those on the task itself, provided that you
plan to have the same value for every job::
You can pass additional arguments to `App.periodic`; they are used to configure
the periodic task. The accepted arguments are the same as for `Task.configure`.

@app.periodic(cron="*/5 * * * *")
@app.task(
queue="healthchecks",
lock="healthchecks",
queueing_lock="healthchecks"
)
def run_healthchecks(timestamp: int):
...
This lets you add multiple periodic schedules for a single task. If you do
that, you will need to pass a ``periodic_id`` argument to `App.periodic`, which
Procrastinate uses to distinguish the different schedules of the same
task.

The value of those parameters is static, but you could put different values on different
workers. If the same task is periodically deferred to different queues, each job will be
independent::
Of course, you can also use arguments on `App.task` which will be common to all
schedules.

@app.periodic(cron="*/5 * * * *")
@app.task(
queue=f"healthchecks_{my_worker_id}",
)
def run_healthchecks(timestamp: int):
::

@app.task(lock="do_something_lock")
def do_something(timestamp: int, value: int):
...

In this setup, each worker (with differing values of ``my_worker_id``) would defer their
own healthcheck jobs, independently from other workers.
app.periodic(
cron="*/5 * * * *",
queue="foo",
task_kwargs={"value": 1},
periodic_id="foo",
)(do_something)

app.periodic(
cron="*/8 * * * *",
queue="bar",
task_kwargs={"value": 2},
periodic_id="bar",
)(do_something)

In the example above, the ``do_something`` task would be deferred every 5
minutes on the queue ``"foo"`` with ``value=1`` **and** every 8 minutes on the
queue ``"bar"`` with ``value=2``. Either way, it would be deferred with the
lock ``"do_something_lock"``.

.. note::

The arguments ``schedule_in`` and ``schedule_at`` of `Task.configure` would be
confusing in this context, so they're ignored.
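To illustrate why each schedule of the same task needs its own ``periodic_id``,
here is a small stand-alone model of a registry keyed by
``(task_name, periodic_id)``. It is only a sketch under that assumption, not
Procrastinate's implementation: ``ScheduleRegistry`` and ``DuplicateSchedule``
are hypothetical names.

```python
from typing import Dict, Tuple


class DuplicateSchedule(Exception):
    """Hypothetical error for a repeated (task_name, periodic_id) pair."""


class ScheduleRegistry:
    """Toy stand-in for a periodic-schedule registry (not Procrastinate's API)."""

    def __init__(self) -> None:
        # {(task_name, periodic_id): cron expression}
        self.schedules: Dict[Tuple[str, str], str] = {}

    def register(self, task_name: str, cron: str, periodic_id: str = "") -> None:
        key = (task_name, periodic_id)
        if key in self.schedules:
            raise DuplicateSchedule(
                f"{task_name!r} already has a schedule with periodic_id {periodic_id!r}"
            )
        self.schedules[key] = cron


registry = ScheduleRegistry()
# Two schedules for the same task are fine as long as their ids differ.
registry.register("do_something", "*/5 * * * *", periodic_id="foo")
registry.register("do_something", "*/8 * * * *", periodic_id="bar")

# Reusing an id for the same task is rejected.
try:
    registry.register("do_something", "0 * * * *", periodic_id="foo")
except DuplicateSchedule:
    duplicate_rejected = True
else:
    duplicate_rejected = False
```

With a key made of both the task name and the id, two different tasks can still
share the same ``periodic_id`` (including the default empty string) without
colliding.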

Using cron
----------
2 changes: 1 addition & 1 deletion docs/reference.rst
@@ -6,7 +6,7 @@ App

.. autoclass:: procrastinate.App
:members: open, open_async, task, run_worker, run_worker_async, configure_task,
from_path, add_tasks_from, add_task_alias, with_connector
from_path, add_tasks_from, add_task_alias, with_connector, periodic

Connectors
----------
14 changes: 9 additions & 5 deletions procrastinate/app.py
@@ -89,9 +89,7 @@ def __init__(
periodic_defaults = periodic_defaults or {}

self.job_manager = manager.JobManager(connector=self.connector)
self.periodic_deferrer = periodic.PeriodicDeferrer(
job_manager=self.job_manager, **periodic_defaults
)
self.periodic_deferrer = periodic.PeriodicDeferrer(**periodic_defaults)

self._register_builtin_tasks()

@@ -116,7 +114,7 @@ def with_connector(self, connector: connector_module.BaseConnector) -> "App":
app.periodic_deferrer = self.periodic_deferrer
return app

def periodic(self, *, cron: str):
def periodic(self, *, cron: str, periodic_id: str = "", **kwargs: Any):
"""
Task decorator, marks task as being scheduled for periodic deferring (see
`howto/cron`).
@@ -125,8 +123,14 @@ def periodic(self, *, cron: str):
----------
cron :
Cron-like string. Optionally add a 6th column for seconds.
periodic_id :
Task name suffix. Used to distinguish periodic tasks with different kwargs.
**kwargs :
Additional parameters are passed to `Task.configure`.
"""
return self.periodic_deferrer.periodic_decorator(cron=cron)
return self.periodic_deferrer.periodic_decorator(
cron=cron, periodic_id=periodic_id, **kwargs
)

def _register_builtin_tasks(self) -> None:
from procrastinate import builtin_tasks
21 changes: 15 additions & 6 deletions procrastinate/manager.py
@@ -79,22 +79,31 @@ def _raise_already_enqueued(
) from exc
raise exc

async def defer_periodic_job(self, task, defer_timestamp) -> Optional[int]:
async def defer_periodic_job(
self,
job: "jobs.Job",
periodic_id: str,
defer_timestamp: int,
) -> Optional[int]:
"""
Defer a periodic job, ensuring that no other worker will defer a job for the
same timestamp.
"""
job.task_kwargs["timestamp"] = defer_timestamp
# schedule_at and schedule_in are meaningless in this context, we ignore them
try:
result = await self.connector.execute_query_one_async(
query=sql.queries["defer_periodic_job"],
task_name=task.name,
queue=task.queue,
lock=task.lock,
queueing_lock=task.queueing_lock,
task_name=job.task_name,
defer_timestamp=defer_timestamp,
periodic_id=periodic_id,
queue=job.queue,
lock=job.lock,
queueing_lock=job.queueing_lock,
args=job.task_kwargs,
)
except exceptions.UniqueViolation as exc:
self._raise_already_enqueued(exc=exc, queueing_lock=task.queueing_lock)
self._raise_already_enqueued(exc=exc, queueing_lock=job.queueing_lock)

return result["id"]

114 changes: 77 additions & 37 deletions procrastinate/periodic.py
@@ -1,13 +1,14 @@
import asyncio
import functools
import logging
import sys
import time
from typing import Dict, Iterable, List, Optional, Tuple
from typing import Any, Dict, Iterable, Optional, Tuple

import attr
import croniter

from procrastinate import exceptions, manager, tasks
from procrastinate import exceptions, tasks

# The maximum delay after which tasks will be considered as
# outdated, and ignored.
@@ -18,36 +19,45 @@

logger = logging.getLogger(__name__)

TaskAtTime = Tuple[tasks.Task, int]
if sys.version_info < (3, 8):
cached_property = property
else:
cached_property = functools.cached_property


@attr.dataclass(frozen=True)
class PeriodicTask:
task: tasks.Task
cron: str
periodic_id: str
configure_kwargs: Dict[str, Any]

@functools.lru_cache(maxsize=1)
@cached_property
def croniter(self) -> croniter.croniter:
return croniter.croniter(self.cron)


TaskAtTime = Tuple[PeriodicTask, int]


class PeriodicDeferrer:
def __init__(self, job_manager: manager.JobManager, max_delay: float = MAX_DELAY):
self.periodic_tasks: List[PeriodicTask] = []
self.job_manager = job_manager
# {task_name: defer_timestamp}
self.last_defers: Dict[str, int] = {}
def __init__(self, max_delay: float = MAX_DELAY):
self.periodic_tasks: Dict[Tuple[str, str], PeriodicTask] = {}
# {(task_name, periodic_id): defer_timestamp}
self.last_defers: Dict[Tuple[str, str], int] = {}
self.max_delay = max_delay

def periodic_decorator(self, cron: str):
def periodic_decorator(self, cron: str, periodic_id: str, **kwargs):
"""
Decorator over a task definition that registers that task for periodic
launch. This decorator should not be used directly, ``@app.periodic()`` is meant
to be used instead.
"""

def wrapper(task: tasks.Task):
self.register_task(task=task, cron=cron)
self.register_task(
task=task, cron=cron, periodic_id=periodic_id, configure_kwargs=kwargs
)
return task

return wrapper
@@ -70,17 +80,39 @@ async def worker(self) -> None:

# Internal methods

def register_task(self, task: tasks.Task, cron: str) -> PeriodicTask:
def register_task(
self,
task: tasks.Task,
cron: str,
periodic_id: str,
configure_kwargs: Dict[str, Any],
) -> PeriodicTask:
key = (task.name, periodic_id)
if key in self.periodic_tasks:
raise exceptions.TaskAlreadyRegistered(
"A periodic task was already registered with the same periodic_id "
f"({periodic_id!r}). Please use a different periodic_id for multiple "
"schedules of the same task."
)

logger.info(
f"Registering task {task.name} to run periodically with cron {cron}",
f"Registering task {task.name} with periodic id {periodic_id!r} to run "
f"periodically with cron {cron}",
extra={
"action": "registering_periodic_task",
"task": task.name,
"cron": cron,
"periodic_id": periodic_id,
"kwargs": str(configure_kwargs),
},
)
periodic_task = PeriodicTask(task=task, cron=cron)
self.periodic_tasks.append(periodic_task)

self.periodic_tasks[key] = periodic_task = PeriodicTask(
task=task,
cron=cron,
periodic_id=periodic_id,
configure_kwargs=configure_kwargs,
)
return periodic_task

def get_next_tick(self, at: float):
@@ -90,8 +122,8 @@ def get_next_tick(self, at: float):
If now is not passed, the current timestamp is used.
"""
next_timestamp = min(
pt.croniter().get_next(ret_type=float, start_time=at) # type: ignore
for pt in self.periodic_tasks
pt.croniter.get_next(ret_type=float, start_time=at) # type: ignore
for pt in self.periodic_tasks.values()
)
return next_timestamp - at

@@ -102,20 +134,19 @@ def get_previous_tasks(self, at: float) -> Iterable[TaskAtTime]:
Tasks that should have been deferred more than self.max_delay seconds ago are
ignored.
"""
for periodic_task in self.periodic_tasks:
task = periodic_task.task
name = task.name

for key, periodic_task in self.periodic_tasks.items():
for timestamp in self.get_timestamps(
periodic_task=periodic_task, since=self.last_defers.get(name), until=at
periodic_task=periodic_task,
since=self.last_defers.get(key),
until=at,
):
self.last_defers[name] = timestamp
yield task, timestamp
self.last_defers[key] = timestamp
yield periodic_task, timestamp

def get_timestamps(
self, periodic_task: PeriodicTask, since: Optional[int], until: float
) -> Iterable[int]:
cron_iterator = periodic_task.croniter()
cron_iterator = periodic_task.croniter
if since:
# For some reason, mypy can't wrap its head around this statement.
# You're welcome to tell us why (or how to fix it).
@@ -150,21 +181,32 @@ async def defer_jobs(self, jobs_to_defer: Iterable[TaskAtTime]) -> None:
Try deferring all tasks that might need deferring. The database will keep us
from deferring the same task for the same scheduled time multiple times.
"""
for task, timestamp in jobs_to_defer:
for periodic_task, timestamp in jobs_to_defer:
task = periodic_task.task
periodic_id = periodic_task.periodic_id
configure_kwargs = periodic_task.configure_kwargs
description = {
"task_name": task.name,
"periodic_id": periodic_id,
"defer_timestamp": timestamp,
"kwargs": configure_kwargs,
}
job_deferrer = task.configure(**configure_kwargs)
try:
job_id = await self.job_manager.defer_periodic_job(
task=task, defer_timestamp=timestamp
job_id = await job_deferrer.job_manager.defer_periodic_job(
job=job_deferrer.job,
periodic_id=periodic_id,
defer_timestamp=timestamp,
)
except exceptions.AlreadyEnqueued:
logger.debug(
f"Periodic job {task.name}(timestamp={timestamp}) "
"cannot be enqueued: there is already a job in the queue "
f"with the queueing lock {task.queueing_lock}",
f"Periodic job {task.name}(timestamp={timestamp}, "
f"periodic_id={periodic_id}) cannot be enqueued: there is already "
f"a job in the queue with the queueing lock {task.queueing_lock}",
extra={
"action": "skip_periodic_task_queueing_lock",
"task_name": task.name,
"defer_timestamp": timestamp,
"queueing_lock": task.queueing_lock,
**description,
},
)
continue
@@ -175,9 +217,8 @@ async def defer_jobs(self, jobs_to_defer: Iterable[TaskAtTime]) -> None:
f"{timestamp} with id {job_id}",
extra={
"action": "periodic_task_deferred",
"task": task.name,
"timestamp": timestamp,
"job_id": job_id,
**description,
},
)
else:
@@ -186,8 +227,7 @@ async def defer_jobs(self, jobs_to_defer: Iterable[TaskAtTime]) -> None:
f"deferred for timestamp {timestamp}",
extra={
"action": "periodic_task_already_deferred",
"task": task.name,
"timestamp": timestamp,
**description,
},
)
