From 7c776750db6cfbb69e0547382c9c051961e913bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Se=CC=81bastien=20Eustace?= Date: Fri, 3 May 2019 18:36:25 +0200 Subject: [PATCH] Use the jobs scheduled date when retrieving them --- cabbage/postgres.py | 8 ++++--- cabbage/testing.py | 20 ++++++++--------- init.sql | 8 +++++-- tests/integration/test_postgres.py | 35 +++++++++++++++++++++++++++++- tests/unit/test_testing.py | 19 ++++++++++++++++ tests/unit/test_worker.py | 31 +------------------------- 6 files changed, 74 insertions(+), 47 deletions(-) diff --git a/cabbage/postgres.py b/cabbage/postgres.py index 505be74db..926eb67c4 100644 --- a/cabbage/postgres.py +++ b/cabbage/postgres.py @@ -8,14 +8,14 @@ from cabbage import exceptions, jobs, store, types insert_jobs_sql = """ -INSERT INTO cabbage_jobs (queue_id, task_name, lock, args) -SELECT id, %(task_name)s, %(lock)s, %(args)s +INSERT INTO cabbage_jobs (queue_id, task_name, lock, args, scheduled_at) +SELECT id, %(task_name)s, %(lock)s, %(args)s, %(scheduled_at)s FROM cabbage_queues WHERE queue_name=%(queue)s RETURNING id; """ select_jobs_sql = """ -SELECT id, task_name, lock, args FROM cabbage_fetch_job(%(queue)s); +SELECT id, task_name, lock, args, scheduled_at FROM cabbage_fetch_job(%(queue)s); """ finish_job_sql = """ SELECT cabbage_finish_job(%(job_id)s, %(status)s); @@ -49,6 +49,7 @@ def launch_job(connection: psycopg2._psycopg.connection, job: jobs.Job) -> int: "task_name": job.task_name, "lock": job.lock, "args": job.task_kwargs, + "scheduled_at": job.scheduled_at, "queue": job.queue, }, ) @@ -80,6 +81,7 @@ def get_jobs( "lock": row["lock"], "task_name": row["task_name"], "task_kwargs": row["args"], + "scheduled_at": row["scheduled_at"], "queue": queue, } diff --git a/cabbage/testing.py b/cabbage/testing.py index c96fc5fb4..8f49045f6 100644 --- a/cabbage/testing.py +++ b/cabbage/testing.py @@ -1,6 +1,9 @@ from itertools import count from typing import Dict, Iterator, List, Optional +import attr +import pendulum + from cabbage import jobs, store @@ -19,21 +22,16 @@ def register_queue(self, queue: str) -> Optional[int]: def launch_job(self, job: jobs.Job) -> int: id = next(self.job_counter) - self.jobs[job.queue].append( - jobs.Job( - id=id, - task_name=job.task_name, - lock=job.lock, - task_kwargs=job.task_kwargs, - queue=job.queue, - job_store=self, - ) - ) + self.jobs[job.queue].append(attr.evolve(job, id=id)) + return id def get_jobs(self, queue: str) -> Iterator[jobs.Job]: + # Creating a copy of the iterable so that we can modify it while we iterate + for job in list(self.jobs[queue]): - yield job + if not job.scheduled_at or job.scheduled_at <= pendulum.now("UTC"): + yield job def finish_job(self, job: jobs.Job, status: jobs.Status) -> None: j = self.jobs[job.queue].pop(0) diff --git a/init.sql b/init.sql index eef765cfd..8e77bc5b4 100644 --- a/init.sql +++ b/init.sql @@ -43,7 +43,8 @@ CREATE TABLE public.cabbage_jobs ( task_name character varying(32) NOT NULL, lock text, args jsonb DEFAULT '{}' NOT NULL, - status public.cabbage_job_status DEFAULT 'todo'::public.cabbage_job_status NOT NULL + status public.cabbage_job_status DEFAULT 'todo'::public.cabbage_job_status NOT NULL, + scheduled_at timestamp with time zone NULL ); @@ -66,7 +67,10 @@ BEGIN SELECT cabbage_jobs.* FROM cabbage_jobs LEFT JOIN cabbage_job_locks ON cabbage_job_locks.object = cabbage_jobs.lock - WHERE queue_id = target_queue_id AND cabbage_job_locks.object IS NULL AND status = 'todo' + WHERE queue_id = target_queue_id + AND cabbage_job_locks.object IS NULL + AND status = 'todo' + AND (scheduled_at IS NULL OR scheduled_at <= now()) FOR UPDATE OF cabbage_jobs SKIP LOCKED LIMIT 1 ), lock_object AS ( INSERT INTO cabbage_job_locks diff --git a/tests/integration/test_postgres.py b/tests/integration/test_postgres.py index 8a97af7e5..0f1db7089 100644 --- a/tests/integration/test_postgres.py +++ b/tests/integration/test_postgres.py @@ -3,6 +3,7 @@ import threading import time +import pendulum import psycopg2 import pytest @@ -123,10 +124,33 @@ def test_get_jobs(job_store): job_store=job_store, ) ) + job_store.launch_job( + jobs.Job( + id=0, + queue="queue_a", + task_name="task_5", + lock="lock_5", + task_kwargs={"i": "j"}, + scheduled_at=pendulum.datetime(2000, 1, 1), + job_store=job_store, + ) + ) + # We won't see this one because of the scheduled date + job_store.launch_job( + jobs.Job( + id=0, + queue="queue_a", + task_name="task_6", + lock="lock_6", + task_kwargs={"k": "l"}, + scheduled_at=pendulum.datetime(2050, 1, 1), + job_store=job_store, + ) + ) result = list(job_store.get_jobs("queue_a")) - t1, t2 = result + t1, t2, t3 = result assert result == [ jobs.Job( id=t1.id, @@ -144,6 +168,15 @@ def test_get_jobs(job_store): queue="queue_a", job_store=job_store, ), + jobs.Job( + id=t3.id, + queue="queue_a", + task_name="task_5", + lock="lock_5", + task_kwargs={"i": "j"}, + scheduled_at=pendulum.datetime(2000, 1, 1), + job_store=job_store, + ), ] diff --git a/tests/unit/test_testing.py b/tests/unit/test_testing.py index e69de29bb..ebdc6d2ef 100644 --- a/tests/unit/test_testing.py +++ b/tests/unit/test_testing.py @@ -0,0 +1,19 @@ +import pendulum + + +def test_get_jobs_scheduled_jobs(job_store, job_factory): + job_store.register_queue("foo") + + job_store.launch_job( + job=job_factory(queue="foo", scheduled_at=pendulum.datetime(2000, 1, 1)) + ) + job_store.launch_job( + job=job_factory(queue="foo", scheduled_at=pendulum.now().subtract(minutes=1)) + ) + job_store.launch_job( + job=job_factory(queue="foo", scheduled_at=pendulum.datetime(2050, 1, 1)) + ) + + jobs = list(job_store.get_jobs(queue="foo")) + + assert {job.id for job in jobs} == {0, 1} diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index f41ca02c2..73a92c65a 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -1,35 +1,6 @@ import pytest -from cabbage import exceptions, jobs, tasks, testing, worker - - -@pytest.fixture -def job_store(): - return testing.InMemoryJobStore() - - -@pytest.fixture -def task_manager(job_store): - return tasks.TaskManager(job_store=job_store) - - -@pytest.fixture -def job_factory(job_store): - defaults = { - "id": 42, - "task_name": "bla", - "task_kwargs": {}, - "lock": None, - "queue": "queue", - "job_store": job_store, - } - - def factory(**kwargs): - final_kwargs = defaults.copy() - final_kwargs.update(kwargs) - return jobs.Job(**final_kwargs) - - return factory +from cabbage import exceptions, jobs, tasks, worker def test_run(task_manager, mocker):