
Close race condition in procrastinate_fetch_job #231

Merged 3 commits on Jun 15, 2020
51 changes: 20 additions & 31 deletions docs/discussions.rst
@@ -80,44 +80,48 @@ About locks
-----------

Let's say we have a :term:`task` that writes a character at the end of a file after
-waiting for a random amount of time. This represents a real world problem where tasks
+waiting for a random amount of time. This represents a real world problem where jobs
take an unforeseeable amount of time and share resources like a database.

-We launch 4 tasks respectively writing ``a``, ``b``, ``c`` and ``d``. We would expect
+We launch 4 jobs respectively writing ``a``, ``b``, ``c`` and ``d``. We would expect
the file to contain ``abcd``, but it's not the case, for example maybe it's ``badc``.
The jobs were taken from the queue in order, but because we have several workers, the
jobs were launched in parallel and because their duration is random, the final result
pretty much is too.
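The out-of-order outcome described above is easy to reproduce outside Procrastinate. Here is a plain-Python sketch (the letters and delays are arbitrary, and threads stand in for workers):

```python
import random
import threading
import time

result = []  # stands in for the shared file

def job(letter):
    # each job waits a random amount of time before writing its letter
    time.sleep(random.random() / 50)
    result.append(letter)

# four "workers" run the jobs in parallel
threads = [threading.Thread(target=job, args=(letter,)) for letter in "abcd"]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

# every letter is written exactly once, but the order is unpredictable
print("".join(result))
```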

We can solve this problem by using locks. Procrastinate gives us two guarantees:

-- Tasks are consumed in creation order. When a worker requests a task, it will always
-  receive the oldest available task. Unavailable tasks, either locked, scheduled for the
-  future or in a queue that the worker doesn't listen to, will be ignored.
-- If a group of tasks share the same lock, then only one can be executed at a time.
+- Jobs are consumed in creation order. When a worker requests a job, it can receive
+  a job with a lock, or a job without a lock. If there is a lock, then the received
+  job will be the oldest one with that lock. If the oldest job awaiting execution is
+  not available for this worker (either it's on a queue that this worker doesn't
+  listen to, or it's scheduled in the future), then jobs with this lock will not be
+  considered.
+- If a group of jobs share the same lock, then only one can be executed at a time.
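The first guarantee can be modelled as a selection rule over the jobs table. The sketch below is illustrative plain Python, not Procrastinate code: `fetch_candidate` and the dict fields are invented for the example, loosely mirroring `procrastinate_jobs` columns.

```python
def fetch_candidate(jobs, queues=None):
    """Return the next job a worker may take, or None.

    A 'todo' job is skipped if an earlier job with the same lock is
    still unfinished, or if the worker doesn't listen to its queue.
    """
    for job in sorted(jobs, key=lambda j: j["id"]):
        if job["status"] != "todo":
            continue
        # jobs whose lock has an earlier unfinished job are not considered
        blocked = job["lock"] is not None and any(
            other["id"] < job["id"]
            and other["lock"] == job["lock"]
            and other["status"] in ("todo", "doing")
            for other in jobs
        )
        if blocked:
            continue
        if queues is not None and job["queue"] not in queues:
            continue
        return job
    return None

jobs = [
    {"id": 1, "lock": "file", "status": "doing", "queue": "q1"},
    {"id": 2, "lock": "file", "status": "todo", "queue": "q1"},
    {"id": 3, "lock": None, "status": "todo", "queue": "q1"},
]
# job 2 is held back by job 1 (same lock, still running), so job 3 comes first
print(fetch_candidate(jobs)["id"])  # -> 3
```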

-These two facts allow us to draw the following conclusion for our 4 letter tasks from
-above. If our 4 tasks share the same lock (for example, the name of the file we're
+These two facts allow us to draw the following conclusion for our 4 letter jobs from
+above. If our 4 jobs share the same lock (for example, the name of the file we're
writing to):

-- The 4 tasks will be started in order;
-- A task will not start before the previous one is finished.
+- The 4 jobs will be started in order;
+- A job will not start before the previous one is finished.

This says we can safely expect the file to contain ``abcd``.

Note that Procrastinate will use PostgreSQL to search the jobs table for suitable jobs.
-Even if the database contains a high proportion of locked tasks, this will barely affect
-Procrastinate's capacity to quickly find the free tasks.
+Even if the database contains a high proportion of locked jobs, this will barely affect
+Procrastinate's capacity to quickly find the free jobs.

A good string identifier for the lock is an identifier of the shared resource;
UUIDs are well suited for this. If multiple resources are implicated, a combination of
their identifiers could be used (there's no hard limit on the length of a lock string,
but stay reasonable).
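For example (the resource names here are invented for illustration), a combined lock can be built deterministically from the identifiers involved:

```python
import uuid

# a single shared resource: its UUID makes a good lock string
account_id = uuid.uuid5(uuid.NAMESPACE_DNS, "account-42")

def combined_lock(*resource_ids):
    # sort so the same set of resources always yields the same lock string
    return ":".join(sorted(str(r) for r in resource_ids))

print(combined_lock("invoice-7", account_id))
```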

-A task can only take a single lock so there's no dead-lock scenario possible where two
-running tasks are waiting one another. That being said, if a worker dies with a lock, it
-will be up to you to free it. If the task fails but the worker survives though, the
-lock will be freed.
Member commented:

Is this outdated?

@ewjoachim (Member) commented on Jun 5, 2020:

We could rephrase it as:

    If a worker is killed without ending its job, following jobs with the same lock will not run until the interrupted job is either manually set to "failed" or "succeeded". If a job simply fails, following jobs with the same locks may run.

+A job can only take a single lock so there's no dead-lock scenario possible where two
+running jobs are waiting for one another. If a worker is killed without ending its job,
+following jobs with the same lock will not run until the interrupted job is either
+manually set to "failed" or "succeeded". If a job simply fails, following jobs with the
+same locks may run.

For a more practical approach, see `howto/locks`.

@@ -229,21 +233,6 @@ thing directly with queries. This is so that the database is solely responsible
consistency, and would allow us to have the same behavior if someone were to write
a procrastinate compatible client, in Python or in another language altogether.

-The ``procrastinate_job_locks`` table
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
-
-We could have used PostgreSQL's `advisory locks`_, and we choose to kinda "reimplement
-the wheel" with a dedicated table. This is because we made the choice that if a worker
-dies holding a lock, we'd rather have a human examine the situation and manually free
-the lock than having the lock been automatically freed, and fail our locks consistency
-guarantee.
-
-.. _`advisory locks`: https://www.postgresql.org/docs/10/explicit-locking.html#ADVISORY-LOCKS
-
-So far, Procrastinate implements async job :term:`deferring <defer>`, and async job
-executing but not in parallel, meaning it can run jobs written as a coroutine, but it
-will only execute one job at a time.

Why is Procrastinate asynchronous at core?
------------------------------------------

@@ -0,0 +1,48 @@
DROP TABLE IF EXISTS procrastinate_job_locks;
Member commented:

We'll be explicit in the changelog that with those migrations, the workers will need to be stopped when running the migration.
Of course, one can always write better migrations.


CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing';

CREATE OR REPLACE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
WHERE
-- reject the job if its lock has earlier jobs
NOT EXISTS (
SELECT 1
FROM procrastinate_jobs AS earlier_jobs
WHERE earlier_jobs.lock = jobs.lock
AND earlier_jobs.status IN ('todo', 'doing')
AND earlier_jobs.id < jobs.id)
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;

RETURN found_jobs;
END;
$$;

CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone) RETURNS void
LANGUAGE plpgsql
AS $$
BEGIN
UPDATE procrastinate_jobs
SET status = end_status,
attempts = attempts + 1,
scheduled_at = COALESCE(next_scheduled_at, scheduled_at)
WHERE id = job_id;
END;
$$;
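The effect of `procrastinate_fetch_job` together with the partial unique index can be imitated in memory. This plain-Python model (an illustrative sketch, not the real implementation) shows that only one job per lock can be in the "doing" state at a time:

```python
def fetch_job(jobs):
    """Move the first eligible 'todo' job to 'doing' and return it, else None."""
    for job in sorted(jobs, key=lambda j: j["id"]):
        if job["status"] != "todo":
            continue
        # mirror the NOT EXISTS clause: an earlier unfinished job with the
        # same lock makes this one ineligible
        blocked = job["lock"] is not None and any(
            other["id"] < job["id"]
            and other["lock"] == job["lock"]
            and other["status"] in ("todo", "doing")
            for other in jobs
        )
        if not blocked:
            job["status"] = "doing"
            return job
    return None

jobs = [{"id": 1, "lock": "res", "status": "todo"},
        {"id": 2, "lock": "res", "status": "todo"}]
first = fetch_job(jobs)   # job 1, now 'doing'
second = fetch_job(jobs)  # None: job 2 waits on job 1's lock
jobs[0]["status"] = "succeeded"
third = fetch_job(jobs)   # job 2 becomes available
```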
68 changes: 32 additions & 36 deletions procrastinate/sql/schema.sql
@@ -35,45 +35,46 @@ CREATE TABLE procrastinate_jobs (
-- this prevents from having several jobs with the same queueing lock in the "todo" state
CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo';

+-- this prevents from having several jobs with the same lock in the "doing" state
+CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing';

+CREATE INDEX ON procrastinate_jobs(queue_name);

CREATE TABLE procrastinate_events (
id BIGSERIAL PRIMARY KEY,
job_id integer NOT NULL REFERENCES procrastinate_jobs ON DELETE CASCADE,
type procrastinate_job_event_type,
at timestamp with time zone DEFAULT NOW() NULL
);

-CREATE TABLE procrastinate_job_locks (
-    object text PRIMARY KEY
-);

CREATE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
-    WITH potential_job AS (
-        SELECT procrastinate_jobs.*
-            FROM procrastinate_jobs
-            LEFT JOIN procrastinate_job_locks ON procrastinate_job_locks.object = procrastinate_jobs.lock
-            WHERE (target_queue_names IS NULL OR queue_name = ANY( target_queue_names ))
-              AND procrastinate_job_locks.object IS NULL
-              AND status = 'todo'
-              AND (scheduled_at IS NULL OR scheduled_at <= now())
-            ORDER BY id ASC
-            FOR UPDATE OF procrastinate_jobs SKIP LOCKED LIMIT 1
-    ), lock_object AS (
-        INSERT INTO procrastinate_job_locks
-            SELECT lock FROM potential_job
-            ON CONFLICT DO NOTHING
-            RETURNING object
-    )
-    UPDATE procrastinate_jobs
-        SET status = 'doing'
-        FROM potential_job, lock_object
-        WHERE lock_object.object IS NOT NULL
-          AND procrastinate_jobs.id = potential_job.id
-        RETURNING procrastinate_jobs.* INTO found_jobs;
+    WITH candidate AS (
+        SELECT jobs.*
+            FROM procrastinate_jobs AS jobs
+            WHERE
+                -- reject the job if its lock has earlier jobs
+                NOT EXISTS (
+                    SELECT 1
+                        FROM procrastinate_jobs AS earlier_jobs
+                        WHERE earlier_jobs.lock = jobs.lock
+                          AND earlier_jobs.status IN ('todo', 'doing')
+                          AND earlier_jobs.id < jobs.id)
+              AND jobs.status = 'todo'
+              AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
+              AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
+            ORDER BY jobs.id ASC LIMIT 1
+            FOR UPDATE OF jobs SKIP LOCKED
+    )
+    UPDATE procrastinate_jobs
+        SET status = 'doing'
+        FROM candidate
+        WHERE procrastinate_jobs.id = candidate.id
+        RETURNING procrastinate_jobs.* INTO found_jobs;

RETURN found_jobs;
END;
@@ -83,14 +84,11 @@ CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinat
LANGUAGE plpgsql
AS $$
BEGIN
-    WITH finished_job AS (
-        UPDATE procrastinate_jobs
-            SET status = end_status,
-                attempts = attempts + 1,
-                scheduled_at = COALESCE(next_scheduled_at, scheduled_at)
-            WHERE id = job_id RETURNING lock
-    )
-    DELETE FROM procrastinate_job_locks WHERE object = (SELECT lock FROM finished_job);
+    UPDATE procrastinate_jobs
+        SET status = end_status,
+            attempts = attempts + 1,
+            scheduled_at = COALESCE(next_scheduled_at, scheduled_at)
+        WHERE id = job_id;
END;
$$;
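The finishing step is simple enough to restate outside SQL. This is an in-memory Python analogue of the new `procrastinate_finish_job` (illustrative only, with dict fields standing in for table columns):

```python
def finish_job(job, end_status, next_scheduled_at=None):
    # mirrors procrastinate_finish_job: record the outcome, bump the
    # attempt counter, and keep the old schedule unless a new one is given
    job["status"] = end_status
    job["attempts"] += 1
    if next_scheduled_at is not None:
        job["scheduled_at"] = next_scheduled_at
    return job

job = {"id": 1, "status": "doing", "attempts": 0, "scheduled_at": None}
finish_job(job, "failed", next_scheduled_at="2020-06-16T00:00:00Z")
```

Note that since the locks table is gone, finishing a job no longer needs to delete a lock row: a job in a final state simply stops matching the partial unique index and the `NOT EXISTS` filter.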

@@ -160,8 +158,6 @@ BEGIN
END;
$$;

-CREATE INDEX ON procrastinate_jobs(queue_name);

CREATE TRIGGER procrastinate_jobs_notify_queue
AFTER INSERT ON procrastinate_jobs
FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status))