Skip to content

Commit

Permalink
Add schedule configuration option at jobs level
Browse files Browse the repository at this point in the history
  • Loading branch information
Sébastien Eustace committed May 6, 2019
1 parent b1b6103 commit ccc7419
Show file tree
Hide file tree
Showing 7 changed files with 129 additions and 14 deletions.
18 changes: 17 additions & 1 deletion cabbage/jobs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import datetime
import logging
from enum import Enum
from typing import Optional
Expand All @@ -10,6 +11,13 @@
logger = logging.getLogger(__name__)


def check_aware(
instance: "Job", attribute: attr.Attribute, value: datetime.datetime
) -> None:
if value and value.utcoffset() is None:
raise ValueError("Timezone aware datetime is required")


class Status(Enum):
TODO = "todo"
DOING = "doing"
Expand All @@ -24,6 +32,9 @@ class Job:
lock: str
task_name: str
task_kwargs: types.JSONDict = attr.ib(factory=dict)
scheduled_at: Optional[datetime.datetime] = attr.ib(
default=None, validator=check_aware
)
job_store: "cabbage.store.JobStore"

def defer(self, **task_kwargs: types.JSONValue) -> int:
Expand All @@ -40,6 +51,11 @@ def defer(self, **task_kwargs: types.JSONValue) -> int:
return id

def get_context(self) -> types.JSONDict:
return attr.asdict(
context = attr.asdict(
self, filter=attr.filters.exclude(attr.fields(Job).job_store)
)

if context["scheduled_at"]:
context["scheduled_at"] = context["scheduled_at"].isoformat()

return context
12 changes: 12 additions & 0 deletions cabbage/tasks.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import datetime
import functools
import logging
import uuid
from typing import Any, Callable, Dict, Optional, Set

import pendulum

from cabbage import jobs, postgres, store, types

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -35,7 +38,15 @@ def configure(
*,
lock: Optional[str] = None,
task_kwargs: Optional[types.JSONDict] = None,
schedule_at: Optional[datetime.datetime] = None,
schedule_in: Optional[Dict[str, int]] = None,
) -> jobs.Job:
if schedule_at and schedule_in is not None:
raise ValueError("Cannot set both schedule_at and schedule_in")

if schedule_in is not None:
schedule_at = pendulum.now("UTC").add(**schedule_in)

lock = lock or str(uuid.uuid4())
task_kwargs = task_kwargs or {}
return jobs.Job(
Expand All @@ -44,6 +55,7 @@ def configure(
task_name=self.name,
queue=self.queue,
task_kwargs=task_kwargs,
scheduled_at=schedule_at,
job_store=self.manager.job_store,
)

Expand Down
3 changes: 2 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ packages = find:
install_requires =
psycopg2
attrs
pendulum

[options.extras_require]
dev =
Expand Down Expand Up @@ -57,7 +58,7 @@ max-line-length = 88
[tool:pytest]
addopts = --cov-report term-missing --cov-branch --cov-report html --cov-report term --cov=cabbage -vv

[mypy-setuptools.*,psycopg2.*]
[mypy-setuptools.*,psycopg2.*,pendulum.*]
ignore_missing_imports = True

[coverage:report]
Expand Down
31 changes: 31 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from psycopg2 import sql
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

from cabbage import jobs, tasks, testing


def _execute(cursor, query, *identifiers):
cursor.execute(
Expand Down Expand Up @@ -63,3 +65,32 @@ def f(signal=stdlib_signal.SIGTERM):
os.kill(os.getpid(), signal)

return f


@pytest.fixture
def job_store():
return testing.InMemoryJobStore()


@pytest.fixture
def task_manager(job_store):
return tasks.TaskManager(job_store=job_store)


@pytest.fixture
def job_factory(job_store):
defaults = {
"id": 42,
"task_name": "bla",
"task_kwargs": {},
"lock": None,
"queue": "queue",
"job_store": job_store,
}

def factory(**kwargs):
final_kwargs = defaults.copy()
final_kwargs.update(kwargs)
return jobs.Job(**final_kwargs)

return factory
42 changes: 37 additions & 5 deletions tests/unit/test_jobs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,32 @@
import pendulum
import pytest

from cabbage import jobs, testing
from cabbage import jobs


@pytest.fixture
def job_store():
return testing.InMemoryJobStore()
def test_job_get_context(job_store):

job = jobs.Job(
id=12,
queue="marsupilami",
lock="sher",
task_name="mytask",
task_kwargs={"a": "b"},
scheduled_at=pendulum.datetime(2000, 1, 1, tz="Europe/Paris"),
job_store=job_store,
)

assert job.get_context() == {
"id": 12,
"queue": "marsupilami",
"lock": "sher",
"task_name": "mytask",
"task_kwargs": {"a": "b"},
"scheduled_at": "2000-01-01T00:00:00+01:00",
}

def test_job_get_context(job_store):

def test_job_get_context_without_scheduled_at(job_store):

job = jobs.Job(
id=12,
Expand All @@ -25,6 +43,7 @@ def test_job_get_context(job_store):
"lock": "sher",
"task_name": "mytask",
"task_kwargs": {"a": "b"},
"scheduled_at": None,
}


Expand Down Expand Up @@ -53,3 +72,16 @@ def test_job_defer(job_store):
job_store=job_store,
)
]


def test_job_scheduled_at_naive(job_store):
with pytest.raises(ValueError):
jobs.Job(
id=12,
queue="marsupilami",
lock="sher",
task_name="mytask",
task_kwargs={"a": "b"},
scheduled_at=pendulum.naive(2000, 1, 1),
job_store=job_store,
)
37 changes: 30 additions & 7 deletions tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
import uuid

import pendulum
import pytest

from cabbage import jobs, postgres, tasks, testing


@pytest.fixture
def task_manager(mocker):
store = testing.InMemoryJobStore()
return tasks.TaskManager(job_store=store)
from cabbage import jobs, postgres, tasks


def task_func():
Expand Down Expand Up @@ -65,6 +60,34 @@ def test_task_configure_no_lock(task_manager):
assert uuid.UUID(job.lock)


def test_task_configure_schedule_at(task_manager):
task = tasks.Task(task_func, manager=task_manager, queue="queue")

job = task.configure(schedule_at=pendulum.datetime(2000, 1, 1, tz="Europe/Paris"))

assert job.scheduled_at == pendulum.datetime(2000, 1, 1, tz="Europe/Paris")


def test_task_configure_schedule_in(task_manager):
task = tasks.Task(task_func, manager=task_manager, queue="queue")

now = pendulum.datetime(2000, 1, 1, tz="Europe/Paris")
with pendulum.test(now):
job = task.configure(schedule_in={"hours": 2})

assert job.scheduled_at == pendulum.datetime(2000, 1, 1, 2, tz="Europe/Paris")


def test_task_configure_schedule_in_and_schedule_at(task_manager):
task = tasks.Task(task_func, manager=task_manager, queue="queue")

with pytest.raises(ValueError):
task.configure(
schedule_at=pendulum.datetime(2000, 1, 1, tz="Europe/Paris"),
schedule_in={"hours": 2},
)


def test_task_manager_task_explicit(task_manager, mocker):
@task_manager.task(queue="a", name="b")
def wrapped():
Expand Down
Empty file added tests/unit/test_testing.py
Empty file.

0 comments on commit ccc7419

Please sign in to comment.