Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Boefjes combined schedulers integration #4015

Open
wants to merge 6 commits into
base: poc/mula/combined-schedulers
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 16 additions & 36 deletions boefjes/boefjes/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,42 +81,28 @@ def run(self, queue_type: WorkerManager.Queue) -> None:
raise

def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> None:
if task_queue.qsize() > self.settings.pool_size:
time.sleep(self.settings.worker_heartbeat)
return
logger.debug("Popping from queue %s", queue_type.value)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add this back in

try:
queues = self.scheduler_client.get_queues()
except HTTPError:
# Scheduler is having issues, so make note of it and try again
logger.exception("Getting the queues from the scheduler failed")
time.sleep(self.settings.poll_interval) # But not immediately
response = self.scheduler_client.pop_item(queue_type.value)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
return

# We do not target a specific queue since we start one runtime for all organisations
# and queue ids contain the organisation_id
queues = [q for q in queues if q.id.startswith(queue_type.value) and q.size > 0]

logger.debug("Found queues: %s", [queue.id for queue in queues])

all_queues_empty = True

for queue in queues:
logger.debug("Popping from queue %s", queue.id)

try:
p_item = self.scheduler_client.pop_item(queue.id)
except (HTTPError, ValidationError):
logger.exception("Popping task from scheduler failed, sleeping 10 seconds")
time.sleep(10)
continue

if not p_item:
logger.debug("Queue %s empty", queue.id)
continue
# TODO: check
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check what kind of response the scheduler gives when the queue is empty and handle this.

if not response:
logger.debug("Queue %s empty", queue_type.value)
time.sleep(10)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be time.sleep(self.settings.poll_interval) ?

return

all_queues_empty = False
# TODO: check
if response.count == 0:
logger.debug("Queue %s empty", queue_type.value)
time.sleep(10)
return

for p_item in response.results:
logger.info("Handling task[%s]", p_item.data.id)

try:
Expand All @@ -134,12 +120,6 @@ def _fill_queue(self, task_queue: Queue, queue_type: WorkerManager.Queue) -> Non
except HTTPError:
logger.exception("Could not patch scheduler task to %s", TaskStatus.FAILED.value)

raise

if all_queues_empty:
logger.debug("All queues empty, sleeping %f seconds", self.settings.poll_interval)
time.sleep(self.settings.poll_interval)

def _check_workers(self) -> None:
new_workers = []

Expand Down
30 changes: 24 additions & 6 deletions boefjes/boefjes/clients/scheduler_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import datetime
import uuid
from enum import Enum
from typing import Any

from httpx import Client, HTTPTransport, Response
from pydantic import BaseModel, TypeAdapter
Expand Down Expand Up @@ -29,7 +30,8 @@ class TaskStatus(Enum):
class Task(BaseModel):
id: uuid.UUID
scheduler_id: str
schedule_id: str | None
schedule_id: uuid.UUID | None = None
organisation: str
priority: int
status: TaskStatus
type: str
Expand All @@ -39,11 +41,21 @@ class Task(BaseModel):
modified_at: datetime.datetime


class PaginatedTasksResponse(BaseModel):
    """One page of ``Task`` objects as returned by the scheduler's pop endpoints.

    Mirrors a standard paginated API envelope: a total count, optional
    next/previous page links, and the tasks on this page.
    """

    count: int  # total number of matching tasks -- presumably across all pages; TODO confirm against scheduler API
    next: str | None = None  # link to the next page, if any -- NOTE(review): assumed to be a URL; verify
    previous: str | None = None  # link to the previous page, if any
    results: list[Task]  # the tasks contained in this page


class SchedulerClientInterface:
def get_queues(self) -> list[Queue]:
raise NotImplementedError()

def pop_item(self, queue_id: str) -> Task | None:
def pop_item(self, scheduler_id: str) -> PaginatedTasksResponse | None:
raise NotImplementedError()

def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
raise NotImplementedError()

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand Down Expand Up @@ -72,14 +84,20 @@ def get_queues(self) -> list[Queue]:

return TypeAdapter(list[Queue]).validate_json(response.content)

def pop_item(self, queue_id: str) -> Task | None:
response = self._session.post(f"/queues/{queue_id}/pop")
def pop_item(self, scheduler_id: str) -> PaginatedTasksResponse | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop?limit=1")
self._verify_response(response)

return TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)

def pop_items(self, scheduler_id: str, filters: dict[str, Any]) -> PaginatedTasksResponse | None:
response = self._session.post(f"/schedulers/{scheduler_id}/pop", json=filters)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Important to note is that the pop endpoint changed to return multiple Tasks; additionally, it allows for more complex filtering options. I've, for now, limited the response to 1 Task to resemble how it currently works, but this can be updated according to what is desired.

self._verify_response(response)

return TypeAdapter(Task | None).validate_json(response.content)
return TypeAdapter(PaginatedTasksResponse | None).validate_json(response.content)

def push_item(self, p_item: Task) -> None:
response = self._session.post(f"/queues/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
response = self._session.post(f"/schedulers/{p_item.scheduler_id}/push", content=p_item.model_dump_json())
self._verify_response(response)

def patch_task(self, task_id: uuid.UUID, status: TaskStatus) -> None:
Expand Down
Loading