From 8fa4ab5925ea3641ffd3c96ec9f49c8e9938767e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89ric=20Lemoine?= Date: Fri, 5 Jun 2020 11:12:59 +0200 Subject: [PATCH 1/3] Close race condition in procrastinate_fetch_job This closes a race condition in the procrastinate_fetch_job plpgsql function, where jobs sharing the same lock can be run out of order. With this commit jobs with the same lock are **always** executed in order, whatever their ETAs and queues. In effect: - if job A in queue 1 (id 1) and job B in queue 2 (id 2) have the same lock, and no workers process queue 1, then job B won't be executed, because job A must be executed first - if job A is deferred with ETA 1 year, no other jobs with the same lock will be executed for 1 year The lock name may change from "lock" to "serial lock" in the future. --- ...9.0_001_close_fetch_job_race_condition.sql | 46 +++++++++++++ procrastinate/sql/schema.sql | 66 +++++++++---------- 2 files changed, 76 insertions(+), 36 deletions(-) create mode 100644 procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql diff --git a/procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql b/procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql new file mode 100644 index 000000000..62a1ddbe3 --- /dev/null +++ b/procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql @@ -0,0 +1,46 @@ +DROP TABLE IF EXISTS procrastinate_job_locks; + +CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; + +CREATE OR REPLACE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS procrastinate_jobs + LANGUAGE plpgsql + AS $$ +DECLARE + found_jobs procrastinate_jobs; +BEGIN + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + LEFT JOIN procrastinate_jobs AS earlier_jobs + ON (earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.id 
< jobs.id) + WHERE + earlier_jobs.id IS NULL -- reject jobs that have earlier jobs with the same lock + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; + + RETURN found_jobs; +END; +$$; + +CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone) RETURNS void + LANGUAGE plpgsql + AS $$ +BEGIN + UPDATE procrastinate_jobs + SET status = end_status, + attempts = attempts + 1, + scheduled_at = COALESCE(next_scheduled_at, scheduled_at) + WHERE id = job_id; +END; +$$; diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index f2c5e4fa0..7eb44a69e 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -35,6 +35,11 @@ CREATE TABLE procrastinate_jobs ( -- this prevents from having several jobs with the same queueing lock in the "todo" state CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo'; +-- this prevents from having several jobs with the same lock in the "doing" state +CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing'; + +CREATE INDEX ON procrastinate_jobs(queue_name); + CREATE TABLE procrastinate_events ( id BIGSERIAL PRIMARY KEY, job_id integer NOT NULL REFERENCES procrastinate_jobs ON DELETE CASCADE, @@ -42,38 +47,32 @@ CREATE TABLE procrastinate_events ( at timestamp with time zone DEFAULT NOW() NULL ); -CREATE TABLE procrastinate_job_locks ( - object text PRIMARY KEY -); - CREATE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS 
procrastinate_jobs LANGUAGE plpgsql AS $$ DECLARE found_jobs procrastinate_jobs; BEGIN - WITH potential_job AS ( - SELECT procrastinate_jobs.* - FROM procrastinate_jobs - LEFT JOIN procrastinate_job_locks ON procrastinate_job_locks.object = procrastinate_jobs.lock - WHERE (target_queue_names IS NULL OR queue_name = ANY( target_queue_names )) - AND procrastinate_job_locks.object IS NULL - AND status = 'todo' - AND (scheduled_at IS NULL OR scheduled_at <= now()) - ORDER BY id ASC - FOR UPDATE OF procrastinate_jobs SKIP LOCKED LIMIT 1 - ), lock_object AS ( - INSERT INTO procrastinate_job_locks - SELECT lock FROM potential_job - ON CONFLICT DO NOTHING - RETURNING object - ) - UPDATE procrastinate_jobs - SET status = 'doing' - FROM potential_job, lock_object - WHERE lock_object.object IS NOT NULL - AND procrastinate_jobs.id = potential_job.id - RETURNING procrastinate_jobs.* INTO found_jobs; + WITH candidate AS ( + SELECT jobs.* + FROM procrastinate_jobs AS jobs + LEFT JOIN procrastinate_jobs AS earlier_jobs + ON (earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.id < jobs.id) + WHERE + earlier_jobs.id IS NULL -- reject jobs that have earlier jobs with the same lock + AND jobs.status = 'todo' + AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) + AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) + ORDER BY jobs.id ASC LIMIT 1 + FOR UPDATE OF jobs SKIP LOCKED + ) + UPDATE procrastinate_jobs + SET status = 'doing' + FROM candidate + WHERE procrastinate_jobs.id = candidate.id + RETURNING procrastinate_jobs.* INTO found_jobs; RETURN found_jobs; END; @@ -83,14 +82,11 @@ CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinat LANGUAGE plpgsql AS $$ BEGIN - WITH finished_job AS ( - UPDATE procrastinate_jobs - SET status = end_status, - attempts = attempts + 1, - scheduled_at = COALESCE(next_scheduled_at, scheduled_at) - WHERE id = job_id RETURNING lock - ) - DELETE 
FROM procrastinate_job_locks WHERE object = (SELECT lock FROM finished_job); + UPDATE procrastinate_jobs + SET status = end_status, + attempts = attempts + 1, + scheduled_at = COALESCE(next_scheduled_at, scheduled_at) + WHERE id = job_id; END; $$; @@ -160,8 +156,6 @@ BEGIN END; $$; -CREATE INDEX ON procrastinate_jobs(queue_name); - CREATE TRIGGER procrastinate_jobs_notify_queue AFTER INSERT ON procrastinate_jobs FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status)) From 6ba2ecdb063e138cd71b3579745967a3881885fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89ric=20Lemoine?= Date: Fri, 5 Jun 2020 15:19:12 +0200 Subject: [PATCH 2/3] Remove outdated material from the discussions section --- docs/discussions.rst | 51 +++++++++++++++++--------------------------- 1 file changed, 20 insertions(+), 31 deletions(-) diff --git a/docs/discussions.rst b/docs/discussions.rst index f6893d3cc..407e2360f 100644 --- a/docs/discussions.rst +++ b/docs/discussions.rst @@ -80,10 +80,10 @@ About locks ----------- Let's say we have a :term:`task` that writes a character at the end of a file after -waiting for a random amount of time. This represents a real world problem where tasks +waiting for a random amount of time. This represents a real world problem where jobs take an unforeseeable amount of time and share resources like a database. -We launch 4 tasks respectively writing ``a``, ``b``, ``c`` and ``d``. We would expect +We launch 4 jobs respectively writing ``a``, ``b``, ``c`` and ``d``. We would expect the file to contain ``abcd``, but it's not the case, for example maybe it's ``badc``. The jobs were taken from the queue in order, but because we have several workers, the jobs were launched in parallel and because their duration is random, the final result @@ -91,33 +91,37 @@ pretty much is too. We can solve this problem by using locks. Procrastinate gives us two guarantees: -- Tasks are consumed in creation order. 
When a worker requests a task, it will always - receive the oldest available task. Unavailable tasks, either locked, scheduled for the - future or in a queue that the worker doesn't listen to, will be ignored. -- If a group of tasks share the same lock, then only one can be executed at a time. +- Jobs are consumed in creation order. When a worker requests a job, it can receive + a job with a lock, or a job without a lock. If there is a lock, then the received + job will be the oldest one with that lock. If the oldest job awaiting execution is + not available for this worker (either it's on a queue that this worker doesn't + listen to, or it's scheduled in the future), then jobs with this lock will not be + considered. +- If a group of jobs share the same lock, then only one can be executed at a time. -These two facts allow us to draw the following conclusion for our 4 letter tasks from -above. If our 4 tasks share the same lock (for example, the name of the file we're +These two facts allow us to draw the following conclusion for our 4 letter jobs from +above. If our 4 jobs share the same lock (for example, the name of the file we're writing to): -- The 4 tasks will be started in order; -- A task will not start before the previous one is finished. +- The 4 jobs will be started in order; +- A job will not start before the previous one is finished. This says we can safely expect the file to contain ``abcd``. Note that Procrastinate will use PostgreSQL to search the jobs table for suitable jobs. -Even if the database contains a high proportion of locked tasks, this will barely affect -Procrastinates's capacity to quickly find the free tasks. +Even if the database contains a high proportion of locked jobs, this will barely affect +Procrastinate's capacity to quickly find the free jobs. A good string identifier for the lock is a string identifier of the shared resource, UUIDs are well suited for this.
If multiple resources are implicated, a combination of their identifiers could be used (there's no hard limit on the length of a lock string, but stay reasonable). -A task can only take a single lock so there's no dead-lock scenario possible where two -running tasks are waiting one another. That being said, if a worker dies with a lock, it -will be up to you to free it. If the task fails but the worker survives though, the -lock will be freed. +A job can only take a single lock so there's no dead-lock scenario possible where two +running jobs are waiting for one another. If a worker is killed without ending its job, +following jobs with the same lock will not run until the interrupted job is +manually set to either "failed" or "succeeded". If a job simply fails, following jobs with the +same lock may run. For a more practical approach, see `howto/locks`. @@ -229,21 +233,6 @@ thing directly with queries. This is so that the database is solely responsible consistency, and would allow us to have the same behavior if someone were to write a procrastinate compatible client, in Python or in another language altogether. -The ``procrastinate_job_locks`` table -^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - -We could have used PostgreSQL's `advisory locks`_, and we choose to kinda "reimplement -the wheel" with a dedicated table. This is because we made the choice that if a worker -dies holding a lock, we'd rather have a human examine the situation and manually free -the lock than having the lock been automatically freed, and fail our locks consistency -guarantee. - -.. _`advisory locks`: https://www.postgresql.org/docs/10/explicit-locking.html#ADVISORY-LOCKS - -So far, Procrastinate implements async job :term:`deferring `, and async job -executing but not in parallel, meaning it can run jobs written as a coroutine, but it -will only execute one job at a time. - Why is Procrastinate asynchronous at core?
------------------------------------------ From fac4636a782ffb5e1327b9dce334c4811c5c7c18 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89ric=20Lemoine?= Date: Fri, 5 Jun 2020 17:08:29 +0200 Subject: [PATCH 3/3] Rewrite fetch_job query with NOT EXISTS The result is the same, but it makes the query more easily readable. --- ...elta_0.9.0_001_close_fetch_job_race_condition.sql | 12 +++++++----- procrastinate/sql/schema.sql | 12 +++++++----- 2 files changed, 14 insertions(+), 10 deletions(-) diff --git a/procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql b/procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql index 62a1ddbe3..26849a0a2 100644 --- a/procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql +++ b/procrastinate/sql/migrations/delta_0.9.0_001_close_fetch_job_race_condition.sql @@ -11,12 +11,14 @@ BEGIN WITH candidate AS ( SELECT jobs.* FROM procrastinate_jobs AS jobs - LEFT JOIN procrastinate_jobs AS earlier_jobs - ON (earlier_jobs.lock = jobs.lock - AND earlier_jobs.status IN ('todo', 'doing') - AND earlier_jobs.id < jobs.id) WHERE - earlier_jobs.id IS NULL -- reject jobs that have earlier jobs with the same lock + -- reject the job if its lock has earlier jobs + NOT EXISTS ( + SELECT 1 + FROM procrastinate_jobs AS earlier_jobs + WHERE earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.id < jobs.id) AND jobs.status = 'todo' AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now()) diff --git a/procrastinate/sql/schema.sql b/procrastinate/sql/schema.sql index 7eb44a69e..82e3e9aab 100644 --- a/procrastinate/sql/schema.sql +++ b/procrastinate/sql/schema.sql @@ -56,12 +56,14 @@ BEGIN WITH candidate AS ( SELECT jobs.* FROM procrastinate_jobs AS jobs - LEFT JOIN procrastinate_jobs AS earlier_jobs - ON (earlier_jobs.lock = jobs.lock - AND earlier_jobs.status IN 
('todo', 'doing') - AND earlier_jobs.id < jobs.id) WHERE - earlier_jobs.id IS NULL -- reject jobs that have earlier jobs with the same lock + -- reject the job if its lock has earlier jobs + NOT EXISTS ( + SELECT 1 + FROM procrastinate_jobs AS earlier_jobs + WHERE earlier_jobs.lock = jobs.lock + AND earlier_jobs.status IN ('todo', 'doing') + AND earlier_jobs.id < jobs.id) AND jobs.status = 'todo' AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names )) AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())