Skip to content

Commit

Permalink
Use the jobs scheduled date when retrieving them
Browse files Browse the repository at this point in the history
  • Loading branch information
Sébastien Eustace committed May 6, 2019
1 parent ccc7419 commit 7c77675
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 47 deletions.
8 changes: 5 additions & 3 deletions cabbage/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from cabbage import exceptions, jobs, store, types

insert_jobs_sql = """
INSERT INTO cabbage_jobs (queue_id, task_name, lock, args)
SELECT id, %(task_name)s, %(lock)s, %(args)s
INSERT INTO cabbage_jobs (queue_id, task_name, lock, args, scheduled_at)
SELECT id, %(task_name)s, %(lock)s, %(args)s, %(scheduled_at)s
FROM cabbage_queues WHERE queue_name=%(queue)s
RETURNING id;
"""

select_jobs_sql = """
SELECT id, task_name, lock, args FROM cabbage_fetch_job(%(queue)s);
SELECT id, task_name, lock, args, scheduled_at FROM cabbage_fetch_job(%(queue)s);
"""
finish_job_sql = """
SELECT cabbage_finish_job(%(job_id)s, %(status)s);
Expand Down Expand Up @@ -49,6 +49,7 @@ def launch_job(connection: psycopg2._psycopg.connection, job: jobs.Job) -> int:
"task_name": job.task_name,
"lock": job.lock,
"args": job.task_kwargs,
"scheduled_at": job.scheduled_at,
"queue": job.queue,
},
)
Expand Down Expand Up @@ -80,6 +81,7 @@ def get_jobs(
"lock": row["lock"],
"task_name": row["task_name"],
"task_kwargs": row["args"],
"scheduled_at": row["scheduled_at"],
"queue": queue,
}

Expand Down
20 changes: 9 additions & 11 deletions cabbage/testing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from itertools import count
from typing import Dict, Iterator, List, Optional

import attr
import pendulum

from cabbage import jobs, store


Expand All @@ -19,21 +22,16 @@ def register_queue(self, queue: str) -> Optional[int]:

def launch_job(self, job: jobs.Job) -> int:
id = next(self.job_counter)
self.jobs[job.queue].append(
jobs.Job(
id=id,
task_name=job.task_name,
lock=job.lock,
task_kwargs=job.task_kwargs,
queue=job.queue,
job_store=self,
)
)
self.jobs[job.queue].append(attr.evolve(job, id=id))

return id

def get_jobs(self, queue: str) -> Iterator[jobs.Job]:
# Creating a copy of the iterable so that we can modify it while we iterate

for job in list(self.jobs[queue]):
yield job
if not job.scheduled_at or job.scheduled_at <= pendulum.now("UTC"):
yield job

def finish_job(self, job: jobs.Job, status: jobs.Status) -> None:
j = self.jobs[job.queue].pop(0)
Expand Down
8 changes: 6 additions & 2 deletions init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ CREATE TABLE public.cabbage_jobs (
task_name character varying(32) NOT NULL,
lock text,
args jsonb DEFAULT '{}' NOT NULL,
status public.cabbage_job_status DEFAULT 'todo'::public.cabbage_job_status NOT NULL
status public.cabbage_job_status DEFAULT 'todo'::public.cabbage_job_status NOT NULL,
scheduled_at timestamp with time zone NULL
);


Expand All @@ -66,7 +67,10 @@ BEGIN
SELECT cabbage_jobs.*
FROM cabbage_jobs
LEFT JOIN cabbage_job_locks ON cabbage_job_locks.object = cabbage_jobs.lock
WHERE queue_id = target_queue_id AND cabbage_job_locks.object IS NULL AND status = 'todo'
WHERE queue_id = target_queue_id
AND cabbage_job_locks.object IS NULL
AND status = 'todo'
AND (scheduled_at IS NULL OR scheduled_at <= now())
FOR UPDATE OF cabbage_jobs SKIP LOCKED LIMIT 1
), lock_object AS (
INSERT INTO cabbage_job_locks
Expand Down
35 changes: 34 additions & 1 deletion tests/integration/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time

import pendulum
import psycopg2
import pytest

Expand Down Expand Up @@ -123,10 +124,33 @@ def test_get_jobs(job_store):
job_store=job_store,
)
)
job_store.launch_job(
jobs.Job(
id=0,
queue="queue_a",
task_name="task_5",
lock="lock_5",
task_kwargs={"i": "j"},
scheduled_at=pendulum.datetime(2000, 1, 1),
job_store=job_store,
)
)
# We won't see this one because of the scheduled date
job_store.launch_job(
jobs.Job(
id=0,
queue="queue_a",
task_name="task_6",
lock="lock_6",
task_kwargs={"k": "l"},
scheduled_at=pendulum.datetime(2050, 1, 1),
job_store=job_store,
)
)

result = list(job_store.get_jobs("queue_a"))

t1, t2 = result
t1, t2, t3 = result
assert result == [
jobs.Job(
id=t1.id,
Expand All @@ -144,6 +168,15 @@ def test_get_jobs(job_store):
queue="queue_a",
job_store=job_store,
),
jobs.Job(
id=t3.id,
queue="queue_a",
task_name="task_5",
lock="lock_5",
task_kwargs={"i": "j"},
scheduled_at=pendulum.datetime(2000, 1, 1),
job_store=job_store,
),
]


Expand Down
19 changes: 19 additions & 0 deletions tests/unit/test_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import pendulum


def test_get_jobs_scheduled_jobs(job_store, job_factory):
job_store.register_queue("foo")

job_store.launch_job(
job=job_factory(queue="foo", scheduled_at=pendulum.datetime(2000, 1, 1))
)
job_store.launch_job(
job=job_factory(queue="foo", scheduled_at=pendulum.now().subtract(minutes=1))
)
job_store.launch_job(
job=job_factory(queue="foo", scheduled_at=pendulum.datetime(2050, 1, 1))
)

jobs = list(job_store.get_jobs(queue="foo"))

assert {job.id for job in jobs} == {0, 1}
31 changes: 1 addition & 30 deletions tests/unit/test_worker.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,6 @@
import pytest

from cabbage import exceptions, jobs, tasks, testing, worker


@pytest.fixture
def job_store():
return testing.InMemoryJobStore()


@pytest.fixture
def task_manager(job_store):
return tasks.TaskManager(job_store=job_store)


@pytest.fixture
def job_factory(job_store):
defaults = {
"id": 42,
"task_name": "bla",
"task_kwargs": {},
"lock": None,
"queue": "queue",
"job_store": job_store,
}

def factory(**kwargs):
final_kwargs = defaults.copy()
final_kwargs.update(kwargs)
return jobs.Job(**final_kwargs)

return factory
from cabbage import exceptions, jobs, tasks, worker


def test_run(task_manager, mocker):
Expand Down

0 comments on commit 7c77675

Please sign in to comment.