Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor the code to make task functions simple functions #15

Merged
merged 2 commits into from
Apr 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions cabbage/task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def run(self, timeout: int = SOCKET_TIMEOUT) -> None:
break

logger.debug("Waiting")
select.select(rlist=[connection], wlist=[], xlist=[], timeout=timeout)
select.select([connection], [], [], timeout)
sdispater marked this conversation as resolved.
Show resolved Hide resolved

def process_tasks(self) -> None:
for task_row in self._task_manager.get_tasks(self._queue): # pragma: no branch
Expand Down Expand Up @@ -58,13 +58,12 @@ def run_task(self, task_row: postgres.TaskRow) -> None:

pk = task_row.id

task_run = tasks.TaskRun(task=task, id=pk, lock=task_row.targeted_object)
kwargs = task_row.args

description = f"{task.queue}.{task.name}.{pk}({kwargs})"
logger.info(f"Start - {description}")
try:
task_run.run(**kwargs)
task.func(**kwargs)
except Exception as e:
logger.exception(f"Error - {description}")
raise exceptions.TaskError() from e
Expand Down
60 changes: 32 additions & 28 deletions cabbage/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,17 @@

class Task:
def __init__(
self, *, manager: "TaskManager", queue: str, name: Optional[str] = None
self,
func: Callable,
*,
manager: "TaskManager",
queue: str,
name: Optional[str] = None,
):
self.queue = queue
self.manager = manager
self.func: Optional[Callable] = None
self.name = name

def __call__(self, func: Callable) -> "Task":
self.func = func
if not self.name:
self.name = func.__name__
self.manager.register(self)
return self
self.func: Callable = func
self.name = name or self.func.__name__

def defer(self, lock: str = None, **kwargs: types.JSONValue) -> None:
lock = lock or f"{uuid.uuid4()}"
Expand All @@ -47,13 +45,32 @@ def __init__(self, connection: Any = None) -> None:
self.tasks: Dict[str, Task] = {}
self.queues: Set[str] = set()

def task(self, **kwargs) -> Task:
kwargs["manager"] = self
task = Task(**kwargs)
return task
def task(
self,
_func: Optional[Callable] = None,
queue: str = "default",
name: Optional[str] = None,
) -> Callable:
"""
Declare a function as a task.

Can be used as a decorator or a simple method.
"""

def _wrap(func: Callable) -> Callable:
task = Task(func, manager=self, queue=queue, name=name)
self.register(task)

func.defer = task.defer

return func

if _func is None:
return _wrap

return _wrap(_func)

def register(self, task: Task) -> None:
assert task.name, "Task has no name"
self.tasks[task.name] = task
if task.queue not in self.queues:
logger.info(f"Creating queue {task.queue} (if not already existing)")
Expand All @@ -65,16 +82,3 @@ def get_tasks(self, queue: str) -> Iterator[postgres.TaskRow]:

def finish_task(self, task_row: postgres.TaskRow, status: str) -> None:
return postgres.finish_task(self.connection, task_row.id, status)


class TaskRun:
def __init__(
self, task: Task, id: int, lock: str
): # pylint: disable=redefined-builtin
self.task = task
self.id = id
self.lock = lock

def run(self, **kwargs) -> None:
assert self.task.func, "Task has no associated function"
self.task.func(self, **kwargs)
18 changes: 5 additions & 13 deletions cabbage_demo/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,20 @@


@task_manager.task(queue="sums")
def sum(task_run, a, b):
with open(f"{task_run.task.name}-{task_run.id}", "w") as f:
f.write(f"{a + b}")
def sum(a, b):
print(a + b)


@task_manager.task(queue="sums")
def sleep(task_run, i):
def sleep(i):
import time

time.sleep(i)


@task_manager.task(queue="sums")
def sum_plus_one(task_run, a, b):
with open(f"{task_run.task.name}-{task_run.id}", "w") as f:
f.write(f"{a + b + 1}")


@cabbage.Task(manager=task_manager, queue="products")
def product(task_run, a, b):
with open(f"{task_run.task.name}-{task_run.id}", "w") as f:
f.write(f"{a * b}")
def sum_plus_one(a, b):
print(a + b + 1)


def client():
Expand Down
25 changes: 17 additions & 8 deletions tests/acceptance/test_nominal.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import signal
import threading
import time

import cabbage

Expand All @@ -10,38 +12,45 @@ def test_nominal(connection, kill_own_pid):
product_results = []

@task_manager.task(queue="sum_queue")
def sum_task(task_run, a, b): # pytest: disable=unused-argument
def sum_task(a, b): # pytest: disable=unused-argument
sum_results.append(a + b)

@task_manager.task(queue="sum_queue")
def increment_task(task_run, a): # pytest: disable=unused-argument
def increment_task(a): # pytest: disable=unused-argument
sum_results.append(a + 1)

@task_manager.task(queue="sum_queue")
def stop_sum(task_run):
def stop_sum():
kill_own_pid(signal.SIGINT)

@task_manager.task(queue="product_queue")
def product_task(task_run, a, b): # pytest: disable=unused-argument
def product_task(a, b): # pytest: disable=unused-argument
product_results.append(a * b)

@task_manager.task(queue="product_queue")
def stop_product(task_run):
def stop_product():
kill_own_pid(signal.SIGTERM)

sum_task.defer(a=5, b=7)
sum_task.defer(a=3, b=4)
increment_task.defer(a=3)
stop_sum.defer()
product_task.defer(a=5, b=4)
stop_product.defer()

def stop():
time.sleep(1)
sum_task.defer(a=2, b=3)
stop_sum.defer()

thread = threading.Thread(target=stop)
thread.start()

cabbage.Worker(task_manager, "sum_queue").run(timeout=1e-9)

assert sum_results == [12, 7, 4]
assert sum_results == [12, 7, 4, 5]
assert product_results == []

cabbage.Worker(task_manager, "product_queue").run(timeout=1e-9)

assert sum_results == [12, 7, 4]
assert sum_results == [12, 7, 4, 5]
assert product_results == [20]
13 changes: 5 additions & 8 deletions tests/unit/test_task_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,7 @@ def process_tasks(self):
worker.run(timeout=42)

listen_queue.assert_called_with(connection=manager.connection, queue="marsupilami")
select.assert_called_with(
rlist=[manager.connection], wlist=[], xlist=[], timeout=42
)
select.assert_called_with([manager.connection], [], [], 42)


def test_process_tasks(mocker, manager):
Expand Down Expand Up @@ -76,11 +74,10 @@ def side_effect(task_row):
def test_run_task(manager):
result = []

def job(task_run, a, b): # pylint: disable=unused-argument
def job(a, b): # pylint: disable=unused-argument
result.append(a + b)

task = tasks.Task(manager=manager, queue="yay", name="job")
task.func = job
task = tasks.Task(job, manager=manager, queue="yay", name="job")

manager.tasks = {"job": task}

Expand All @@ -94,10 +91,10 @@ def job(task_run, a, b): # pylint: disable=unused-argument


def test_run_task_error(manager):
def job(task_run, a, b): # pylint: disable=unused-argument
def job(a, b): # pylint: disable=unused-argument
raise ValueError("nope")

task = tasks.Task(manager=manager, queue="yay", name="job")
task = tasks.Task(job, manager=manager, queue="yay", name="job")
task.func = job

manager.tasks = {"job": task}
Expand Down
88 changes: 30 additions & 58 deletions tests/unit/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,27 @@ def manager():
return tasks.TaskManager(object())


def test_task_call(manager, mocker):
def job():
pass

register = mocker.patch("cabbage.tasks.TaskManager.register")
task = tasks.Task(manager=manager, queue="queue")

def job():
pass

assert task(job) is task
def test_task_init_with_no_name(manager):
task = tasks.Task(job, manager=manager, queue="queue")

assert task.func is job
assert task.name == "job"
register.assert_called_with(task)


def test_task_call_explicit_name(manager, mocker):

def test_task_init_explicit_name(manager, mocker):
mocker.patch("cabbage.tasks.TaskManager.register")
task = tasks.Task(manager=manager, queue="queue", name="other")

def job():
pass

task(job)
task = tasks.Task(job, manager=manager, queue="queue", name="other")

assert task.name == "other"


def test_task_defer(manager, mocker):
launch_task = mocker.patch("cabbage.postgres.launch_task")
task = tasks.Task(manager=manager, queue="queue", name="job")
task = tasks.Task(job, manager=manager, queue="queue", name="job")

task.defer(lock="sherlock", a="b", c=3)

Expand All @@ -55,7 +45,7 @@ def test_task_defer(manager, mocker):

def test_task_defer_no_lock(manager, mocker):
launch_task = mocker.patch("cabbage.postgres.launch_task")
task = tasks.Task(manager=manager, queue="queue", name="job")
task = tasks.Task(job, manager=manager, queue="queue", name="job")

task.defer(a="b", c=3)

Expand All @@ -64,25 +54,35 @@ def test_task_defer_no_lock(manager, mocker):
assert uuid.UUID(kwargs["lock"])


def test_task_defer_no_name(manager, mocker):
mocker.patch("cabbage.postgres.launch_task")
task = tasks.Task(manager=manager, queue="queue")
def test_task_manager_task_explicit(manager, mocker):
mocker.patch("cabbage.postgres.register_queue")

with pytest.raises(AssertionError):
task.defer(a="b", c=3)
@manager.task(queue="a", name="b")
def wrapped():
return "foo"

assert "foo" == wrapped()
assert "b" == manager.tasks["b"].name
assert "a" == manager.tasks["b"].queue
assert manager.tasks["b"].func is wrapped

def test_task_manager_task(manager):
task = manager.task(queue="a", name="b")

assert task.queue == "a"
assert task.name == "b"
assert task.manager is manager
def test_task_manager_task_implicit(manager, mocker):
mocker.patch("cabbage.postgres.register_queue")

@manager.task
def wrapped():
return "foo"

assert "foo" == wrapped()
assert "wrapped" == manager.tasks["wrapped"].name
assert "default" == manager.tasks["wrapped"].queue
assert manager.tasks["wrapped"].func is wrapped


def test_task_manager_register(manager, mocker):
register_queue = mocker.patch("cabbage.postgres.register_queue")
task = tasks.Task(manager=manager, queue="queue", name="bla")
task = tasks.Task(job, manager=manager, queue="queue", name="bla")

manager.register(task)

Expand All @@ -91,19 +91,10 @@ def test_task_manager_register(manager, mocker):
register_queue.assert_called_with(manager.connection, "queue")


def test_task_manager_register_no_task_name(manager, mocker):
mocker.patch("cabbage.postgres.register_queue")

task = tasks.Task(manager=manager, queue="queue")

with pytest.raises(AssertionError):
manager.register(task)


def test_task_manager_register_queue_already_exists(manager, mocker):
register_queue = mocker.patch("cabbage.postgres.register_queue")
manager.queues.add("queue")
task = tasks.Task(manager=manager, queue="queue", name="bla")
task = tasks.Task(job, manager=manager, queue="queue", name="bla")

manager.register(task)

Expand All @@ -112,25 +103,6 @@ def test_task_manager_register_queue_already_exists(manager, mocker):
register_queue.assert_not_called()


def test_task_run_run(manager, mocker):

mocker.patch("cabbage.postgres.register_queue")
job = mocker.MagicMock()
task = tasks.Task(manager=manager, queue="bla", name="foo")(job)
task_run = tasks.TaskRun(task=task, id=12, lock="bla")
task_run.run(a=1, b=2)

job.assert_called_with(task_run, a=1, b=2)


def test_task_run_run_no_func(manager):
task = tasks.Task(manager=manager, queue="bla", name="foo")
task_run = tasks.TaskRun(task=task, id=12, lock="bla")

with pytest.raises(AssertionError):
task_run.run(a=1, b=2)


def test_task_manager_default_connection(mocker):
get_connection = mocker.patch("cabbage.postgres.get_connection")
manager = tasks.TaskManager()
Expand Down