Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for job scheduling #26

Merged
merged 2 commits into from
May 10, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion cabbage/jobs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from enum import Enum
from typing import Optional
Expand All @@ -10,6 +11,13 @@
logger = logging.getLogger(__name__)


def check_aware(
    instance: "Job", attribute: attr.Attribute, value: datetime.datetime
) -> None:
    """Attrs validator that rejects timezone-naive datetimes.

    A falsy ``value`` (such as ``None``) is accepted without further checks.

    Raises:
        ValueError: if ``value`` is set but its UTC offset is ``None``.
    """
    if not value:
        return

    # A datetime without tzinfo reports no UTC offset.
    if value.utcoffset() is None:
        raise ValueError("Timezone aware datetime is required")


class Status(Enum):
TODO = "todo"
DOING = "doing"
Expand All @@ -24,6 +32,9 @@ class Job:
lock: str
task_name: str
task_kwargs: types.JSONDict = attr.ib(factory=dict)
scheduled_at: Optional[datetime.datetime] = attr.ib(
default=None, validator=check_aware
)
job_store: "cabbage.store.JobStore"

def defer(self, **task_kwargs: types.JSONValue) -> int:
Expand All @@ -40,6 +51,11 @@ def defer(self, **task_kwargs: types.JSONValue) -> int:
return id

def get_context(self) -> types.JSONDict:
    """Return the job's attributes as a dict, without ``job_store``.

    When ``scheduled_at`` is set, it is replaced by its ``isoformat()``
    string; otherwise it is left as is.
    """
    context = attr.asdict(
        self, filter=attr.filters.exclude(attr.fields(Job).job_store)
    )

    # Replace the datetime object by a plain string representation.
    if context["scheduled_at"]:
        context["scheduled_at"] = context["scheduled_at"].isoformat()

    return context
8 changes: 5 additions & 3 deletions cabbage/postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@
from cabbage import exceptions, jobs, store, types

# Insert one job into the queue named %(queue)s and return the new row id.
insert_jobs_sql = """
INSERT INTO cabbage_jobs (queue_id, task_name, lock, args, scheduled_at)
SELECT id, %(task_name)s, %(lock)s, %(args)s, %(scheduled_at)s
FROM cabbage_queues WHERE queue_name=%(queue)s
RETURNING id;
"""

# Fetch a job for %(queue)s through the cabbage_fetch_job SQL function.
select_jobs_sql = """
SELECT id, task_name, lock, args, scheduled_at FROM cabbage_fetch_job(%(queue)s);
"""
finish_job_sql = """
SELECT cabbage_finish_job(%(job_id)s, %(status)s);
Expand Down Expand Up @@ -49,6 +49,7 @@ def launch_job(connection: psycopg2._psycopg.connection, job: jobs.Job) -> int:
"task_name": job.task_name,
"lock": job.lock,
"args": job.task_kwargs,
"scheduled_at": job.scheduled_at,
"queue": job.queue,
},
)
Expand Down Expand Up @@ -80,6 +81,7 @@ def get_jobs(
"lock": row["lock"],
"task_name": row["task_name"],
"task_kwargs": row["args"],
"scheduled_at": row["scheduled_at"],
"queue": queue,
}

Expand Down
12 changes: 12 additions & 0 deletions cabbage/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import datetime
import functools
import logging
import uuid
from typing import Any, Callable, Dict, Optional, Set

import pendulum

from cabbage import jobs, postgres, store, types

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -35,7 +38,15 @@ def configure(
*,
lock: Optional[str] = None,
task_kwargs: Optional[types.JSONDict] = None,
schedule_at: Optional[datetime.datetime] = None,
schedule_in: Optional[Dict[str, int]] = None,
) -> jobs.Job:
if schedule_at and schedule_in is not None:
raise ValueError("Cannot set both schedule_at and schedule_in")

if schedule_in is not None:
schedule_at = pendulum.now("UTC").add(**schedule_in)

lock = lock or str(uuid.uuid4())
task_kwargs = task_kwargs or {}
return jobs.Job(
Expand All @@ -44,6 +55,7 @@ def configure(
task_name=self.name,
queue=self.queue,
task_kwargs=task_kwargs,
scheduled_at=schedule_at,
job_store=self.manager.job_store,
)

Expand Down
20 changes: 9 additions & 11 deletions cabbage/testing.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from itertools import count
from typing import Dict, Iterator, List, Optional

import attr
import pendulum

from cabbage import jobs, store


Expand All @@ -19,21 +22,16 @@ def register_queue(self, queue: str) -> Optional[int]:

def launch_job(self, job: jobs.Job) -> int:
    """Store a copy of ``job`` under a newly drawn id and return that id.

    The id comes from ``self.job_counter``; the copy is appended to the
    list kept for ``job.queue``.
    """
    job_id = next(self.job_counter)
    # attr.evolve copies every field of the job (scheduled_at included)
    # and only overrides the id.
    self.jobs[job.queue].append(attr.evolve(job, id=job_id))

    return job_id

def get_jobs(self, queue: str) -> Iterator[jobs.Job]:
    """Yield the jobs of ``queue`` that are due.

    A job is due when it has no ``scheduled_at``, or when its
    ``scheduled_at`` is not later than the current UTC time.
    """
    # Iterate over a copy so that the underlying list can be modified
    # while the generator is being consumed.
    for job in list(self.jobs[queue]):
        if not job.scheduled_at or job.scheduled_at <= pendulum.now("UTC"):
            yield job

def finish_job(self, job: jobs.Job, status: jobs.Status) -> None:
j = self.jobs[job.queue].pop(0)
Expand Down
8 changes: 6 additions & 2 deletions init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ CREATE TABLE public.cabbage_jobs (
task_name character varying(32) NOT NULL,
lock text,
args jsonb DEFAULT '{}' NOT NULL,
status public.cabbage_job_status DEFAULT 'todo'::public.cabbage_job_status NOT NULL
status public.cabbage_job_status DEFAULT 'todo'::public.cabbage_job_status NOT NULL,
scheduled_at timestamp with time zone NULL
);


Expand All @@ -66,7 +67,10 @@ BEGIN
SELECT cabbage_jobs.*
FROM cabbage_jobs
LEFT JOIN cabbage_job_locks ON cabbage_job_locks.object = cabbage_jobs.lock
WHERE queue_id = target_queue_id AND cabbage_job_locks.object IS NULL AND status = 'todo'
WHERE queue_id = target_queue_id
AND cabbage_job_locks.object IS NULL
AND status = 'todo'
AND (scheduled_at IS NULL OR scheduled_at <= now())
FOR UPDATE OF cabbage_jobs SKIP LOCKED LIMIT 1
), lock_object AS (
INSERT INTO cabbage_job_locks
Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ packages = find:
install_requires =
psycopg2
attrs
pendulum
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the setup.cfg file.

And regarding type annotations, there is a PR (python-pendulum/pendulum#320) that I need to merge at some point.


[options.extras_require]
dev =
Expand Down Expand Up @@ -57,7 +58,7 @@ max-line-length = 88
[tool:pytest]
addopts = --cov-report term-missing --cov-branch --cov-report html --cov-report term --cov=cabbage -vv

[mypy-setuptools.*,psycopg2.*]
[mypy-setuptools.*,psycopg2.*,pendulum.*]
ignore_missing_imports = True

[coverage:report]
Expand Down
31 changes: 31 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from cabbage import jobs, tasks, testing


def _execute(cursor, query, *identifiers):
cursor.execute(
Expand Down Expand Up @@ -63,3 +65,32 @@ def f(signal=stdlib_signal.SIGTERM):
os.kill(os.getpid(), signal)

return f


@pytest.fixture
def job_store():
    """Provide a fresh in-memory job store."""
    store = testing.InMemoryJobStore()
    return store


@pytest.fixture
def task_manager(job_store):
    """Provide a ``TaskManager`` bound to the ``job_store`` fixture."""
    manager = tasks.TaskManager(job_store=job_store)
    return manager


@pytest.fixture
def job_factory(job_store):
    """Provide a factory that builds ``jobs.Job`` objects.

    The factory accepts keyword arguments that override the default
    field values below.
    """
    defaults = {
        "id": 42,
        "task_name": "bla",
        "task_kwargs": {},
        "lock": None,
        "queue": "queue",
        "job_store": job_store,
    }

    def factory(**kwargs):
        # Explicit keyword arguments take precedence over the defaults;
        # the defaults dict itself is never mutated.
        return jobs.Job(**{**defaults, **kwargs})

    return factory
35 changes: 34 additions & 1 deletion tests/integration/test_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import threading
import time

import pendulum
import psycopg2
import pytest

Expand Down Expand Up @@ -123,10 +124,33 @@ def test_get_jobs(job_store):
job_store=job_store,
)
)
job_store.launch_job(
jobs.Job(
id=0,
queue="queue_a",
task_name="task_5",
lock="lock_5",
task_kwargs={"i": "j"},
scheduled_at=pendulum.datetime(2000, 1, 1),
job_store=job_store,
)
)
# We won't see this one because of the scheduled date
job_store.launch_job(
jobs.Job(
id=0,
queue="queue_a",
task_name="task_6",
lock="lock_6",
task_kwargs={"k": "l"},
scheduled_at=pendulum.datetime(2050, 1, 1),
job_store=job_store,
)
)

result = list(job_store.get_jobs("queue_a"))

t1, t2 = result
t1, t2, t3 = result
assert result == [
jobs.Job(
id=t1.id,
Expand All @@ -144,6 +168,15 @@ def test_get_jobs(job_store):
queue="queue_a",
job_store=job_store,
),
jobs.Job(
id=t3.id,
queue="queue_a",
task_name="task_5",
lock="lock_5",
task_kwargs={"i": "j"},
scheduled_at=pendulum.datetime(2000, 1, 1),
job_store=job_store,
),
]


Expand Down
42 changes: 37 additions & 5 deletions tests/unit/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
import pendulum
import pytest

from cabbage import jobs, testing
from cabbage import jobs


@pytest.fixture
def job_store():
return testing.InMemoryJobStore()
def test_job_get_context(job_store):
    """get_context() omits job_store and renders scheduled_at as ISO text."""
    job = jobs.Job(
        id=12,
        queue="marsupilami",
        lock="sher",
        task_name="mytask",
        task_kwargs={"a": "b"},
        scheduled_at=pendulum.datetime(2000, 1, 1, tz="Europe/Paris"),
        job_store=job_store,
    )

    expected = {
        "id": 12,
        "queue": "marsupilami",
        "lock": "sher",
        "task_name": "mytask",
        "task_kwargs": {"a": "b"},
        "scheduled_at": "2000-01-01T00:00:00+01:00",
    }

    assert job.get_context() == expected

def test_job_get_context(job_store):

def test_job_get_context_without_scheduled_at(job_store):

job = jobs.Job(
id=12,
Expand All @@ -25,6 +43,7 @@ def test_job_get_context(job_store):
"lock": "sher",
"task_name": "mytask",
"task_kwargs": {"a": "b"},
"scheduled_at": None,
}


Expand Down Expand Up @@ -53,3 +72,16 @@ def test_job_defer(job_store):
job_store=job_store,
)
]


def test_job_scheduled_at_naive(job_store):
    """Building a Job with a naive scheduled_at raises ValueError."""
    naive_date = pendulum.naive(2000, 1, 1)

    with pytest.raises(ValueError):
        jobs.Job(
            id=12,
            queue="marsupilami",
            lock="sher",
            task_name="mytask",
            task_kwargs={"a": "b"},
            scheduled_at=naive_date,
            job_store=job_store,
        )
Loading