Skip to content

Commit

Permalink
Close race condition in procrastinate_fetch_job
Browse files Browse the repository at this point in the history
This closes a race condition in the procrastinate_fetch_job plpgsql function,
where jobs sharing the same lock can be run out of order.

With this commit jobs with the same lock are **always** executed in order,
whatever their ETAs and queues.

In effect:
- if job A in queue 1 (id 1) and job B in queue 2 (id 2) have the same lock,
  and no workers process queue 1, then job B won't be executed, because
  job A must be executed first
- if job A is deferred with an ETA one year away, no other job with the same
  lock will be executed for a year

The lock name may change from "lock" to "serial lock" in the future.
  • Loading branch information
Éric Lemoine committed Jun 5, 2020
1 parent 5e47d99 commit 66b3070
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 36 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- The functions redefined later in this migration no longer read or write
-- procrastinate_job_locks, so the table is removed. IF EXISTS keeps this step
-- safe on databases where the table is already gone.
DROP TABLE IF EXISTS procrastinate_job_locks;

-- At most one job per lock value may be in the "doing" state.
-- (If the lock column is nullable, NULL locks never conflict with each other,
-- because a unique index treats NULLs as distinct.)
-- IF NOT EXISTS makes the migration re-runnable, in line with the idempotent
-- DROP TABLE IF EXISTS / CREATE OR REPLACE FUNCTION statements around it.
CREATE UNIQUE INDEX IF NOT EXISTS procrastinate_jobs_lock_idx
    ON procrastinate_jobs (lock)
    WHERE status = 'doing';

-- Claim the lowest-id runnable job and flip it from "todo" to "doing".
--
-- target_queue_names: queues to fetch from; NULL means every queue.
-- Returns the updated procrastinate_jobs row, or an all-NULL row when no job
-- could be claimed.
CREATE OR REPLACE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS procrastinate_jobs
    LANGUAGE plpgsql
AS $$
DECLARE
    claimed_job procrastinate_jobs;
BEGIN
    WITH next_job AS (
        -- only the id is needed to drive the UPDATE below
        SELECT job.id
        FROM procrastinate_jobs AS job
        WHERE job.status = 'todo'
            AND (target_queue_names IS NULL OR job.queue_name = ANY (target_queue_names))
            AND (job.scheduled_at IS NULL OR job.scheduled_at <= now())
            -- a job must wait for every earlier "todo"/"doing" job that shares its lock
            AND NOT EXISTS (
                SELECT 1
                FROM procrastinate_jobs AS blocker
                WHERE blocker.lock = job.lock
                    AND blocker.status IN ('todo', 'doing')
                    AND blocker.id < job.id
            )
        ORDER BY job.id ASC
        LIMIT 1
        -- rows already locked by another transaction are skipped instead of waited on
        FOR UPDATE OF job SKIP LOCKED
    )
    UPDATE procrastinate_jobs AS picked
    SET status = 'doing'
    FROM next_job
    WHERE picked.id = next_job.id
    RETURNING picked.* INTO claimed_job;

    RETURN claimed_job;
END;
$$;

-- Store the outcome of a job run on its procrastinate_jobs row.
--
-- job_id: id of the row to update.
-- end_status: status to write to the row.
-- next_scheduled_at: new scheduled_at value; NULL leaves the current one untouched.
-- The attempts counter is incremented by one on every call.
-- A job_id that matches no row updates nothing and raises no error.
CREATE OR REPLACE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinate_job_status, next_scheduled_at timestamp with time zone) RETURNS void
    LANGUAGE plpgsql
AS $$
BEGIN
    -- parameters are qualified with the function name to keep them visually
    -- distinct from the columns of the aliased table
    UPDATE procrastinate_jobs AS job
    SET
        status = procrastinate_finish_job.end_status,
        attempts = job.attempts + 1,
        scheduled_at = COALESCE(procrastinate_finish_job.next_scheduled_at, job.scheduled_at)
    WHERE job.id = procrastinate_finish_job.job_id;
END;
$$;
66 changes: 30 additions & 36 deletions procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -35,45 +35,44 @@ CREATE TABLE procrastinate_jobs (
-- this prevents having several jobs with the same queueing lock in the "todo" state
CREATE UNIQUE INDEX procrastinate_jobs_queueing_lock_idx ON procrastinate_jobs (queueing_lock) WHERE status = 'todo';

-- this prevents having several jobs with the same lock in the "doing" state
CREATE UNIQUE INDEX procrastinate_jobs_lock_idx ON procrastinate_jobs (lock) WHERE status = 'doing';

CREATE INDEX ON procrastinate_jobs(queue_name);

-- Events attached to jobs: each row references one procrastinate_jobs row.
CREATE TABLE procrastinate_events (
id BIGSERIAL PRIMARY KEY,
-- referenced job; event rows are deleted together with their job (ON DELETE CASCADE)
job_id integer NOT NULL REFERENCES procrastinate_jobs ON DELETE CASCADE,
-- event kind; nullable, since no NOT NULL constraint is declared
type procrastinate_job_event_type,
-- event timestamp; defaults to NOW() when omitted, an explicit NULL is accepted
at timestamp with time zone DEFAULT NOW() NULL
);

-- One row per lock value; the primary key on "object" rejects duplicate lock values.
CREATE TABLE procrastinate_job_locks (
object text PRIMARY KEY
);

CREATE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
WITH potential_job AS (
SELECT procrastinate_jobs.*
FROM procrastinate_jobs
LEFT JOIN procrastinate_job_locks ON procrastinate_job_locks.object = procrastinate_jobs.lock
WHERE (target_queue_names IS NULL OR queue_name = ANY( target_queue_names ))
AND procrastinate_job_locks.object IS NULL
AND status = 'todo'
AND (scheduled_at IS NULL OR scheduled_at <= now())
ORDER BY id ASC
FOR UPDATE OF procrastinate_jobs SKIP LOCKED LIMIT 1
), lock_object AS (
INSERT INTO procrastinate_job_locks
SELECT lock FROM potential_job
ON CONFLICT DO NOTHING
RETURNING object
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM potential_job, lock_object
WHERE lock_object.object IS NOT NULL
AND procrastinate_jobs.id = potential_job.id
RETURNING procrastinate_jobs.* INTO found_jobs;
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
LEFT JOIN procrastinate_jobs AS earlier_jobs
ON (earlier_jobs.lock = jobs.lock
AND earlier_jobs.status IN ('todo', 'doing')
AND earlier_jobs.id < jobs.id)
WHERE
earlier_jobs.id IS NULL -- reject jobs that have earlier jobs with the same lock
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;

RETURN found_jobs;
END;
Expand All @@ -83,14 +82,11 @@ CREATE FUNCTION procrastinate_finish_job(job_id integer, end_status procrastinat
LANGUAGE plpgsql
AS $$
BEGIN
WITH finished_job AS (
UPDATE procrastinate_jobs
SET status = end_status,
attempts = attempts + 1,
scheduled_at = COALESCE(next_scheduled_at, scheduled_at)
WHERE id = job_id RETURNING lock
)
DELETE FROM procrastinate_job_locks WHERE object = (SELECT lock FROM finished_job);
UPDATE procrastinate_jobs
SET status = end_status,
attempts = attempts + 1,
scheduled_at = COALESCE(next_scheduled_at, scheduled_at)
WHERE id = job_id;
END;
$$;

Expand Down Expand Up @@ -160,8 +156,6 @@ BEGIN
END;
$$;

CREATE INDEX ON procrastinate_jobs(queue_name);

CREATE TRIGGER procrastinate_jobs_notify_queue
AFTER INSERT ON procrastinate_jobs
FOR EACH ROW WHEN ((new.status = 'todo'::procrastinate_job_status))
Expand Down

0 comments on commit 66b3070

Please sign in to comment.