Skip to content

Commit

Permalink
Merge pull request #402 from procrastinate-org/periodic-lock-400
Browse files Browse the repository at this point in the history
  • Loading branch information
Joachim Jablon authored Oct 3, 2021
2 parents 92e2ba5 + 895663d commit cfb5fbc
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 5 deletions.
3 changes: 1 addition & 2 deletions procrastinate/manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import asyncio
import datetime
import logging
import uuid
from typing import Any, Dict, Iterable, Optional

from procrastinate import connector, exceptions, jobs, sql, utils
Expand Down Expand Up @@ -62,7 +61,7 @@ def _defer_job_query_kwargs(self, job: jobs.Job) -> Dict[str, Any]:
"query": sql.queries["defer_job"],
"task_name": job.task_name,
"queue": job.queue,
"lock": job.lock or str(uuid.uuid4()),
"lock": job.lock,
"queueing_lock": job.queueing_lock,
"args": job.task_kwargs,
"scheduled_at": job.scheduled_at,
Expand Down
36 changes: 36 additions & 0 deletions procrastinate/sql/migrations/00.19.00_02_null_locks_excluded.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
DROP FUNCTION IF EXISTS procrastinate_fetch_job(target_queue_names character varying[]);

CREATE FUNCTION procrastinate_fetch_job(target_queue_names character varying[]) RETURNS procrastinate_jobs
LANGUAGE plpgsql
AS $$
DECLARE
found_jobs procrastinate_jobs;
BEGIN
WITH candidate AS (
SELECT jobs.*
FROM procrastinate_jobs AS jobs
WHERE
-- reject the job if its lock has earlier jobs
NOT EXISTS (
SELECT 1
FROM procrastinate_jobs AS earlier_jobs
WHERE
jobs.lock IS NOT NULL
AND earlier_jobs.lock = jobs.lock
AND earlier_jobs.status IN ('todo', 'doing')
AND earlier_jobs.id < jobs.id)
AND jobs.status = 'todo'
AND (target_queue_names IS NULL OR jobs.queue_name = ANY( target_queue_names ))
AND (jobs.scheduled_at IS NULL OR jobs.scheduled_at <= now())
ORDER BY jobs.id ASC LIMIT 1
FOR UPDATE OF jobs SKIP LOCKED
)
UPDATE procrastinate_jobs
SET status = 'doing'
FROM candidate
WHERE procrastinate_jobs.id = candidate.id
RETURNING procrastinate_jobs.* INTO found_jobs;

RETURN found_jobs;
END;
$$;
4 changes: 3 additions & 1 deletion procrastinate/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ BEGIN
NOT EXISTS (
SELECT 1
FROM procrastinate_jobs AS earlier_jobs
WHERE earlier_jobs.lock = jobs.lock
WHERE
jobs.lock IS NOT NULL
AND earlier_jobs.lock = jobs.lock
AND earlier_jobs.status IN ('todo', 'doing')
AND earlier_jobs.id < jobs.id)
AND jobs.status = 'todo'
Expand Down
4 changes: 3 additions & 1 deletion procrastinate/testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,9 @@ def defer_periodic_job_one(

@property
def current_locks(self) -> Iterable[str]:
return {job["lock"] for job in self.jobs.values() if job["status"] == "doing"}
return {
job["lock"] for job in self.jobs.values() if job["status"] == "doing"
} - {None}

@property
def finished_jobs(self) -> List[JobRow]:
Expand Down
11 changes: 10 additions & 1 deletion tests/integration/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,15 @@ async def test_fetch_job_not_fetching_locked_job(
assert await pg_job_manager.fetch_job(queues=None) is None


async def test_fetch_job_spacial_case_none_lock(
pg_job_manager, deferred_job_factory, fetched_job_factory
):
await fetched_job_factory(lock=None)
job = await deferred_job_factory(lock=None)

assert (await pg_job_manager.fetch_job(queues=None)).id == job.id


@pytest.mark.parametrize(
"job_kwargs, fetch_queues",
[
Expand All @@ -89,7 +98,7 @@ async def test_fetch_job_not_fetching_locked_job(
({"scheduled_at": conftest.aware_datetime(2100, 1, 1)}, None),
],
)
async def test_get_job_no_result(
async def test_fetch_job_no_result(
pg_job_manager, deferred_job_factory, job_kwargs, fetch_queues
):
await deferred_job_factory(**job_kwargs)
Expand Down
23 changes: 23 additions & 0 deletions tests/unit/test_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,29 @@ def test_fetch_job_one(connector):
assert connector.fetch_job_one(queues=["marsupilami"])["id"] == 5


def test_fetch_job_one_none_lock(connector):
"""Testing that 2 jobs with locks "None" don't block one another"""
connector.defer_job_one(
task_name="mytask",
args={},
queue="default",
scheduled_at=None,
lock=None,
queueing_lock=None,
)
connector.defer_job_one(
task_name="mytask",
args={},
queue="default",
scheduled_at=None,
lock=None,
queueing_lock=None,
)

assert connector.fetch_job_one(queues=None)["id"] == 1
assert connector.fetch_job_one(queues=None)["id"] == 2


def test_finish_job_run(connector):
connector.defer_job_one(
task_name="mytask",
Expand Down

0 comments on commit cfb5fbc

Please sign in to comment.