
Commit

Merge pull request #1181 from procrastinate-org/v3-update
ewjoachim authored Sep 5, 2024

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
2 parents 24cd54b + 657941e commit ed4d625
Showing 17 changed files with 288 additions and 238 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -61,6 +61,7 @@ jobs:
with:
name: coverage-${{ matrix.python-version }}
path: .coverage.${{ matrix.python-version }}
include-hidden-files: true

static-typing:
name: Run Pyright
10 changes: 5 additions & 5 deletions .pre-commit-config.yaml
@@ -29,7 +29,7 @@ repos:
- id: trailing-whitespace
- id: mixed-line-ending
- repo: https://github.com/RobertCraigie/pyright-python
rev: v1.1.376
rev: v1.1.378
hooks:
- id: pyright
additional_dependencies:
@@ -40,16 +40,16 @@ repos:
- contextlib2==21.6.0
- croniter==3.0.3
- django-stubs==5.0.4
- django==4.2.15
- importlib-resources==6.4.2
- django==4.2.16
- importlib-resources==6.4.4
- psycopg2-binary==2.9.9
- psycopg[pool]==3.2.1
- python-dateutil==2.9.0.post0
- sphinx==7.1.2
- sqlalchemy==2.0.32
- sqlalchemy==2.0.34
- typing-extensions==4.12.2
- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.1
rev: v0.6.4
hooks:
- id: ruff
args: [--fix, --unsafe-fixes]
45 changes: 22 additions & 23 deletions CONTRIBUTING.md
@@ -13,13 +13,13 @@ Of course, feel free to read the script before launching it.
This script is intended to be a one-liner that sets up everything you need. It makes
the following assumptions:

- You're using `MacOS` or `Linux`, and `bash` or `zsh`.
- You already have `python3` available
- You have `poetry` [installed](https://python-poetry.org/docs/#installation)
- Either you've already setup a PostgreSQL database and environment variables (`PG*`)
are set or you have `docker-compose` available and port 5432 is free.
- Either `psql` and other `libpq` executables are available in the `PATH` or they
are located in `usr/local/opt/libpq/bin` (`Homebrew`).
- You're using `MacOS` or `Linux`, and `bash` or `zsh`.
- You already have `python3` available
- You have `poetry` [installed](https://python-poetry.org/docs/#installation)
- Either you've already set up a PostgreSQL database and environment variables (`PG*`)
are set or you have `docker compose` available and port 5432 is free.
- Either `psql` and other `libpq` executables are available in the `PATH` or they
are located in `usr/local/opt/libpq/bin` (`Homebrew`).

The `dev-env` script will add the `scripts` folder to your `$PATH` for the current
shell, so in the following documentation, if you see `scripts/foo`, you're welcome
@@ -46,7 +46,7 @@ The PostgreSQL database we used is a fresh standard out-of-the-box database
on the latest stable version.

```console
$ docker-compose up -d postgres
$ docker compose up -d postgres
```

If you want to try out the project locally, it's useful to have `postgresql-client`
@@ -129,7 +129,6 @@ In addition, an [editorconfig] file will help your favorite editor to respect
procrastinate coding style. It is automatically used by most popular IDEs, such as
PyCharm and VS Code.


### Write the documentation

The documentation is written in `Markdown` and built with `Sphinx` and `MyST`.
@@ -301,23 +300,23 @@ Python environment on the host system. Alternatively, they can be installed in a
image, and Procrastinate and all the development tools can be run in Docker containers.
Docker is useful when you can't, or don't want to, install system requirements.

This section shows, through `docker-compose` command examples, how to test and run
This section shows, through `docker compose` command examples, how to test and run
Procrastinate in Docker.

Build the `procrastinate` Docker image:

```console
$ export UID GID
$ docker-compose build procrastinate
$ docker compose build procrastinate
```

Run the automated tests:

```console
$ docker-compose run --rm procrastinate pytest
$ docker compose run --rm procrastinate pytest
```

Docker Compose is configured (in `docker-compose.yml`) to mount the local directory on
the host system onto `/src` in the container. This means that local
changes made to the Procrastinate code are visible in Procrastinate containers.

@@ -326,63 +325,63 @@ container to be run with the current user id and group id. If not set or exporte
Procrastinate container will run as root, and files owned by root may be created in the
developer's working directory.

In the definition of the `procrastinate` service in `docker-compose.yml` the
`PROCRASTINATE_APP` variable is set to `procrastinate_demo.app.app` (the
Procrastinate demo application). So `procrastinate` commands run in Procrastinate
containers are always run as if they were passed `--app procrastinate_demo.app.app`.

Run the `procrastinate` command:

```console
$ docker-compose run --rm procrastinate procrastinate -h
$ docker compose run --rm procrastinate procrastinate -h
```

Apply the Procrastinate database schema:

```console
$ docker-compose run --rm procrastinate procrastinate schema --apply
$ docker compose run --rm procrastinate procrastinate schema --apply
```

Run the Procrastinate healthchecks:

```console
$ docker-compose run --rm procrastinate procrastinate healthchecks
$ docker compose run --rm procrastinate procrastinate healthchecks
```

Start a Procrastinate worker (`-d` starts the container in detached mode):

```console
$ docker-compose up -d procrastinate
$ docker compose up -d procrastinate
```

Run a command (`bash` here) in the Procrastinate worker container you just started:

```console
$ docker-compose exec procrastinate bash
$ docker compose exec procrastinate bash
```

Watch the Procrastinate worker logs:

```console
$ docker-compose logs -ft procrastinate
$ docker compose logs -ft procrastinate
```

Use the `procrastinate defer` command to create a job:

```console
$ docker-compose run --rm procrastinate procrastinate defer procrastinate_demo.tasks.sum '{"a":3, "b": 5}'
$ docker compose run --rm procrastinate procrastinate defer procrastinate_demo.tasks.sum '{"a":3, "b": 5}'
```

Or run the demo main file:

```console
$ docker-compose run --rm procrastinate python -m procrastinate_demo
$ docker compose run --rm procrastinate python -m procrastinate_demo
```

Stop and remove all the containers (including the `postgres` container):

```console
$ docker-compose down
$ docker compose down
```

## Wait, there are `async` and `await` keywords everywhere!?
2 changes: 1 addition & 1 deletion dev-env
@@ -26,7 +26,7 @@ export GID=$(id -g)
if ! pg_isready ; then
echo "Starting database"
export PGDATABASE=procrastinate PGHOST=127.0.0.1 PGUSER=postgres PGPASSWORD=password
docker-compose up -d postgres || return
docker compose up -d postgres || return
sleep 3
fi

18 changes: 0 additions & 18 deletions docs/howto/advanced/sync_defer.md
@@ -55,24 +55,6 @@ app = App(connector=SQLAlchemyPsycopg2Connector())
app.open(engine)
```

## Having multiple apps

If you need to have multiple connectors interact with the tasks, you can
create multiple synchronized apps with {py:meth}`App.with_connector`:

```
import procrastinate

app = procrastinate.App(
    connector=procrastinate.PsycopgConnector(...),
)
sync_app = app.with_connector(
    connector=procrastinate.SyncPsycopgConnector(...),
)
```

## Procrastinate's automatic connector selection

Async connectors are able to summon their synchronous counterpart when needed
33 changes: 28 additions & 5 deletions docs/howto/basics/defer.md
@@ -4,7 +4,7 @@ There are several ways to do this.

In the following examples, the task will be:

```
```python
@app.task(queue="some_queue")
def my_task(a: int, b: int):
    pass
@@ -14,39 +14,62 @@ Task name is `my_module.my_task`.

## The direct way

```
By using the sync method:

```python
my_task.defer(a=1, b=2)
```

## With parameters
Or the async method:

```python
await my_task.defer_async(a=1, b=2)
```

## With parameters

Using the sync defer method:

```python
my_task.configure(
    lock="the name of my lock",
    schedule_in={"hours": 1},
    queue="not_the_default_queue",
).defer(a=1, b=2)

# or
await my_task.configure(
    lock="the name of my lock",
    schedule_in={"hours": 1},
    queue="not_the_default_queue",
).defer_async(a=1, b=2)
```

See details in {py:meth}`Task.configure`

## Create a job pattern, launch multiple jobs

```
```python
pattern = my_task.configure(task_kwargs={"a": 1})

pattern.defer(b=2)
pattern.defer(b=3)
pattern.defer(b=4)
# or
await pattern.defer_async(b=2)
await pattern.defer_async(b=3)
await pattern.defer_async(b=4)
```
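The job-pattern idiom above works like partial application: `configure(task_kwargs=...)` fixes some arguments once and lets each `defer` supply the rest. As an analogy only (this stand-in computes a value instead of queueing a job), it maps onto `functools.partial`:

```python
from functools import partial


def defer(a: int, b: int) -> int:
    # Stand-in for my_task.defer: compute instead of enqueueing a job.
    return a + b


# Like my_task.configure(task_kwargs={"a": 1}): "a" is fixed once...
pattern = partial(defer, a=1)

# ...and each call only needs to supply "b".
results = [pattern(b=2), pattern(b=3), pattern(b=4)]
```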

## Defer a job if you can't access the task

This is useful if the code that defers jobs is not in the same code base as the code
that runs the jobs. You can defer a job with just the name of its task.

```
```python
app.configure_task(name="my_module.my_task", queue="some_queue").defer(a=1, b=2)
# or
await app.configure_task(name="my_module.my_task", queue="some_queue").defer_async(a=1, b=2)
```

Any parameter you would use for {py:meth}`Task.configure` can be used in
27 changes: 24 additions & 3 deletions docs/howto/basics/tasks.md
@@ -1,22 +1,43 @@
# Define a task

You can specify a task with:
Specify a sync task with:

```
```python
@app.task(...)
def mytask(argument, other_argument):
    ...
```

:::{note}
Each sync task runs in its own thread (independently of the worker thread).
:::

Or an async task with:

```python
@app.task(...)
async def mytask(argument, other_argument):
    ...
```

:::{note}
All async tasks run in the same event loop.
:::

See {py:meth}`App.task` for the exact parameters. In particular, you can define values for
`queue`, `lock` and `queueing_lock` that will be used as default values when
calling {py:meth}`Task.configure` or {py:meth}`Task.defer` on this task.

If you're OK with all the default parameters, you can omit parentheses after
`task`:

```
```python
@app.task
def mytask(argument, other_argument):
    ...

# or
@app.task
async def mytask(argument, other_argument):
    ...
```
4 changes: 2 additions & 2 deletions docs/howto/django/scripts.md
@@ -21,8 +21,8 @@ def main():
django.setup()
# By default, the app uses the Django database connection, which is unsuitable
# for the worker.
app = app.with_connector(app.connector.get_worker_connector())
app.run_worker()
with app.replace_connector(app.connector.get_worker_connector()):
    app.run_worker()

if __name__ == "__main__":
    main()
22 changes: 11 additions & 11 deletions docs/howto/django/tests.md
@@ -24,8 +24,8 @@ def app():
# Replace the connector in the current app
# Note that this fixture gives you the app back for convenience, but it's
# the same instance as you'd get with `procrastinate.contrib.django.app`.
with procrastinate_app.current_app.replace_connector(in_memory) as app_with_connector:
yield app_with_connector
with procrastinate_app.current_app.replace_connector(in_memory) as app:
    yield app

def test_my_task(app):
# Run the task
@@ -126,8 +126,8 @@ class TestingTaskClass(TransactionTestCase):
my_task.defer(a=1, b=2)

# Start worker
app = app.with_connector(app.connector.get_worker_connector())
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)
with app.replace_connector(app.connector.get_worker_connector()):
    app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)

# Check task has been executed
assert ProcrastinateJob.objects.filter(task_name="my_task").status == "succeeded"
@@ -144,20 +144,20 @@ def test_task():
my_task.defer(a=1, b=2)

# Start worker
app = app.with_connector(app.connector.get_worker_connector())
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)
with app.replace_connector(app.connector.get_worker_connector()):
    app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)

# Check task has been executed
assert ProcrastinateJob.objects.filter(task_name="my_task").status == "succeeded"

# Or with a fixture
@pytest.fixture
def worker(transactional_db):
def _():
app = app.with_connector(app.connector.get_worker_connector())
app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)
return app
return _
with app.replace_connector(app.connector.get_worker_connector()):
    def f():
        app.run_worker(wait=False, install_signal_handlers=False, listen_notify=False)
        return app
    yield f

def test_task(worker):
# Run tasks
6 changes: 3 additions & 3 deletions docs/howto/production/delete_finished_jobs.md
@@ -13,9 +13,9 @@ app.run_worker(delete_jobs="always")

With `always`, every finished job will be deleted on completion. Other options are:

- `successful` to only delete successful jobs and keep failed jobs in the database
until explicit deletion.
- `never` to keep every job in the database, this is the default.
- `successful` to only delete successful jobs and keep failed jobs in the database
  until explicit deletion.
- `never` to keep every job in the database (the default).
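For illustration, the three policies map onto a small predicate like the following (a hypothetical helper, not part of the library's internals):

```python
def should_delete(delete_jobs: str, job_status: str) -> bool:
    """Decide whether a finished job should be removed from the database."""
    # "always": drop every finished job, whatever its outcome.
    if delete_jobs == "always":
        return job_status in ("succeeded", "failed")
    # "successful": keep failed jobs around for inspection.
    if delete_jobs == "successful":
        return job_status == "succeeded"
    # "never" (the default): keep everything until explicit deletion.
    return False
```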

You can also do this from the CLI:

4 changes: 2 additions & 2 deletions docs/howto/production/testing.md
@@ -17,8 +17,8 @@ def app():
# Replace the connector in the current app
# Note that this fixture gives you the app back for convenience,
# but it's the same instance as `my_app`.
with my_app.replace_connector(in_memory) as app_with_connector:
yield app_with_connector
with my_app.replace_connector(in_memory) as app:
    yield app


def test_my_task(app):
272 changes: 142 additions & 130 deletions poetry.lock

Large diffs are not rendered by default.

34 changes: 23 additions & 11 deletions procrastinate/app.py
@@ -115,12 +115,18 @@ def __init__(

self._register_builtin_tasks()

def with_connector(self, connector: connector_module.BaseConnector) -> App:
def with_connector(
    self,
    connector: connector_module.BaseConnector,
) -> App:
"""
Create another app instance sychronized with this one, with a different
connector. For all things regarding periodic tasks, the original app
(and its original connector) will be used, even when the new app's
methods are used.
connector.
.. deprecated:: 2.14.0
    Use `replace_connector` instead. Because this method creates a new
    app that references the same tasks, and the tasks have a link
    back to the app, using this method can lead to unexpected behavior.
Parameters
----------
@@ -130,7 +136,7 @@ def with_connector(self, connector: connector_module.BaseConnector) -> App:
Returns
-------
:
A new compatible app.
A new app with the same tasks.
"""
app = App(
connector=connector,
@@ -147,6 +153,12 @@ def replace_connector(
) -> Iterator[App]:
"""
Replace the connector of the app while in the context block, then restore it.
The context variable is the same app as this method is called on.
>>> with app.replace_connector(new_connector) as app2:
... ...
... # app and app2 are the same object
Parameters
----------
@@ -155,8 +167,8 @@ def replace_connector(
Yields
-------
`App`
A new compatible app.
:
A context manager that yields the same app with the new connector.
"""
old_connector = self.connector
self.connector = connector
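The save-and-restore behavior shown above can be sketched standalone with a simplified stand-in `App` (not the library's actual class), to make the restore-on-exit guarantee explicit:

```python
import contextlib


class App:
    """Minimal stand-in illustrating connector swapping."""

    def __init__(self, connector):
        self.connector = connector

    @contextlib.contextmanager
    def replace_connector(self, connector):
        # Swap in the new connector, yield this same app object,
        # and restore the old connector on exit -- even on error.
        old_connector = self.connector
        self.connector = connector
        try:
            yield self
        finally:
            self.connector = old_connector
```

Note that the yielded value is the same object the method was called on, which matches the `replace_connector` docstring above.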
@@ -258,22 +270,22 @@ async def run_worker_async(self, **kwargs: Unpack[WorkerOptions]) -> None:
Name of the worker. Will be passed in the `JobContext` and used in the
logs (defaults to ``None`` which will result in the worker named
``worker``).
polling_interval : ``float``
polling_interval: ``float``
Indicates the maximum duration (in seconds) the worker waits between
each database job poll. Raising this parameter can lower the rate at which
the worker makes queries to the database for requesting jobs.
(defaults to 5.0)
shutdown_timeout : ``float``
shutdown_timeout: ``float``
Indicates the maximum duration (in seconds) the worker waits for jobs to
complete when requested stop. Jobs that have not been completed by that time
are aborted. A value of None corresponds to no timeout.
(defaults to None)
listen_notify : ``bool``
listen_notify: ``bool``
If ``True``, the worker will dedicate a connection from the pool to
listening to database events, notifying of newly available jobs.
If ``False``, the worker will just poll the database periodically
(see ``polling_interval``). (defaults to ``True``)
delete_jobs : ``str``
delete_jobs: ``str``
If ``always``, the worker will automatically delete all jobs on completion.
If ``successful`` the worker will only delete successful jobs.
If ``never``, the worker will keep the jobs in the database.
@@ -1,6 +1,7 @@
from __future__ import annotations

import asyncio
import contextlib

from django.core.management.base import BaseCommand

@@ -24,6 +25,11 @@ def add_arguments(self, parser):

def handle(self, *args, **kwargs):
kwargs = {k: v for k, v in kwargs.items() if k not in self._django_options}
context = contextlib.nullcontext()

if isinstance(app.connector, django_connector.DjangoConnector):
kwargs["app"] = app.with_connector(app.connector.get_worker_connector())
asyncio.run(cli.execute_command(kwargs))
kwargs["app"] = app
context = app.replace_connector(app.connector.get_worker_connector())

with context:
asyncio.run(cli.execute_command(kwargs))
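The `contextlib.nullcontext()` pattern used above, defaulting to a do-nothing context manager and conditionally swapping in a real one, can be demonstrated in isolation (the names below are illustrative, not the library's):

```python
import contextlib


def handle(needs_replacement: bool) -> list[str]:
    """Trace which context-manager branch actually ran."""
    events: list[str] = []

    @contextlib.contextmanager
    def replace_connector():
        events.append("enter")
        yield
        events.append("exit")

    # Default: a context manager that does nothing on enter/exit,
    # so the `with` block below needs no special-casing.
    context = contextlib.nullcontext()
    if needs_replacement:
        # Only swap the connector when the condition requires it.
        context = replace_connector()

    with context:
        events.append("run")
    return events
```

This keeps the `with context:` block unconditional, avoiding a duplicated `asyncio.run(...)` call in each branch.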
20 changes: 5 additions & 15 deletions procrastinate/job_context.py
@@ -38,27 +38,17 @@ def as_dict(self):
class JobContext:
"""
Execution context of a running job.
Attributes
----------
app : `App`
Procrastinate `App` running this job
worker_name : ``str``
Name of the worker (may be useful for logging)
worker_queues : ``Optional[Iterable[str]]``
Queues listened by this worker
job : `Job`
Current `Job` instance
task : `Task`
Current `Task` instance. This can be None when a task cannot be found for a given job.
Any task function called with a job context is guaranteed to have its task instance set.
"""

#: Procrastinate `App` running this job
app: app_module.App
#: Name of the worker (may be useful for logging)
worker_name: str | None = None
#: Queues listened by this worker
worker_queues: Iterable[str] | None = None
#: Corresponding :py:class:`~jobs.Job`
job: jobs.Job
#: Corresponding :py:class:`~tasks.Task`
task: tasks.Task | None = None
job_result: JobResult = attr.ib(factory=JobResult)
additional_context: dict = attr.ib(factory=dict)
14 changes: 8 additions & 6 deletions tests/integration/contrib/django/test_models.py
@@ -113,12 +113,14 @@ def my_task(timestamp):
pass

django_app = procrastinate.contrib.django.app
app = django_app.with_connector(django_app.connector.get_worker_connector())
async with app.open_async():
try:
await asyncio.wait_for(app.run_worker_async(), timeout=0.1)
except asyncio.TimeoutError:
pass
with django_app.replace_connector(
    django_app.connector.get_worker_connector()
) as app:
    async with app.open_async():
        try:
            await asyncio.wait_for(app.run_worker_async(), timeout=0.1)
        except asyncio.TimeoutError:
            pass

periodic_defers = []
async for element in models.ProcrastinatePeriodicDefer.objects.values().all():
4 changes: 3 additions & 1 deletion tests/unit/test_worker.py
@@ -207,9 +207,11 @@ async def test_worker_run_respects_polling(worker, app):
await start_worker(worker)

connector = cast(InMemoryConnector, app.connector)
await asyncio.sleep(0.01)

assert len([query for query in connector.queries if query[0] == "fetch_job"]) == 1

await asyncio.sleep(0.05)
await asyncio.sleep(0.07)

assert len([query for query in connector.queries if query[0] == "fetch_job"]) == 2
