
Add eager scheduler + pickers for >1 app|dest
sverhoeven committed Mar 4, 2024
1 parent 76fc0c3 commit ba6e715
Showing 13 changed files with 368 additions and 34 deletions.
20 changes: 20 additions & 0 deletions docs/configuration.md
@@ -166,6 +166,7 @@ schedulers
* **arq**, Scheduler which uses a Redis server as a job queue and
1 or more workers (`bartender perform` command) to run the jobs.
* **dirac**, Scheduler which submits jobs to a grid using [DIRAC](http://diracgrid.org/).
* **eager**, Scheduler which runs the job immediately on submission.

Supported file systems

@@ -393,6 +394,25 @@ destinations:
log_level: DEBUG
```

### Example of running jobs directly on submission

Use this for applications that can run within the request/response cycle time window,
for example to alter the uploaded zip contents so they mimic another application's output.

```yaml
destinations:
  atonce:
    scheduler:
      type: eager
    filesystem:
      type: local
applications:
  runimport:
    command_template: mkdir -p output && mv * output || true
    # `|| true` is there to swallow the error
    # that the output dir itself cannot be moved into itself
```

## Destination picker

If you have multiple applications and job destinations you need some way to
22 changes: 22 additions & 0 deletions src/bartender/check_load.py
@@ -0,0 +1,22 @@
from os import getloadavg, sched_getaffinity

from fastapi import HTTPException
from starlette import status


def check_load(max_load: float = 1.0) -> None:
    """Check if machine load is too high.

    Args:
        max_load: Maximum load allowed.

    Raises:
        HTTPException: When machine load is too high.
    """
    nr_cpus = len(sched_getaffinity(0))
    load_avg_last_minute = getloadavg()[0] / nr_cpus
    if load_avg_last_minute > max_load:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Machine load is too high, please try again later.",
        )
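As a usage sketch (not part of this commit), a FastAPI route could call `check_load` before accepting work; the endpoint name below is made up:

```python
from fastapi import FastAPI

from bartender.check_load import check_load

app = FastAPI()


@app.post("/quick-task")  # hypothetical endpoint, for illustration only
def quick_task() -> dict[str, str]:
    # Reject the request with HTTP 503 when the 1-minute load average
    # per CPU exceeds 0.5, so the client can retry later.
    check_load(max_load=0.5)
    return {"status": "accepted"}
```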
3 changes: 2 additions & 1 deletion src/bartender/destinations.py
@@ -20,7 +20,8 @@ class DestinationConfig(BaseModel):
 
     # TODO validate that some combinations of scheduler and file system
     # are not possible like
-    # * MemoryScheduler + SftpFileSystem
+    # * MemoryScheduler + remote fs
+    # * EagerScheduler + remote fs
     # In future possible combos
     # * AWSBatchScheduler + S3FileSystem
     # * DiracScheduler + SrmFileSystem
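A sketch of how that TODO could be addressed (not part of this commit); the helper name is made up and it assumes filesystem configs carry the `type` discriminator shown in the YAML examples above:

```python
from bartender.schedulers.eager import EagerSchedulerConfig
from bartender.schedulers.memory import MemorySchedulerConfig

# Schedulers that run on the web service host and therefore need local files.
LOCAL_ONLY_SCHEDULERS = (MemorySchedulerConfig, EagerSchedulerConfig)


def check_scheduler_filesystem_combo(scheduler, filesystem) -> None:
    """Reject pairs where a host-local scheduler is combined with a
    filesystem that lives on another machine."""
    if isinstance(scheduler, LOCAL_ONLY_SCHEDULERS) and filesystem.type != "local":
        raise ValueError(
            f"{type(scheduler).__name__} requires a local filesystem, "
            f"got filesystem type {filesystem.type!r}",
        )
```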
49 changes: 49 additions & 0 deletions src/bartender/picker.py
@@ -32,6 +32,55 @@ def pick_first(
    return destination_names[0]


def pick_byname(
    job_dir: Path,
    application_name: str,
    submitter: User,
    context: "Context",
) -> str:
    """Picks destination with same name as application name.

    Args:
        job_dir: Location where job input files are located.
        application_name: Application name that should be run.
        submitter: User that submitted the job.
        context: Context with applications and destinations.

    Returns:
        Destination name.

    Raises:
        KeyError: If application has no destination.
    """
    if application_name in context.destinations:
        return application_name
    raise KeyError(f"Application {application_name} has no destination.")


def pick_byindex(
    job_dir: Path,
    application_name: str,
    submitter: User,
    context: "Context",
) -> str:
    """Picks destination by index.

    For example the 2nd application will be submitted to the 2nd destination.

    Args:
        job_dir: Location where job input files are located.
        application_name: Application name that should be run.
        submitter: User that submitted the job.
        context: Context with applications and destinations.

    Returns:
        Destination name.
    """
    application_index = list(context.applications.keys()).index(application_name)
    destination_names = list(context.destinations.keys())
    return destination_names[application_index]


class PickRound:
    """Builder for round robin destination picker."""
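A quick sketch of how these pickers behave (not part of this commit); the application and destination names are made up, and a `SimpleNamespace` stands in for the real `Context`:

```python
from pathlib import Path
from types import SimpleNamespace

from bartender.picker import pick_byindex, pick_byname

# Stand-in for bartender.context.Context: the pickers only read the
# ordered `applications` and `destinations` mappings, keyed by name.
context = SimpleNamespace(
    applications={"runimport": None, "wordcount": None},
    destinations={"runimport": None, "cluster": None},
)

# pick_byname routes the job to the destination named after the application.
assert pick_byname(Path("job1"), "runimport", None, context) == "runimport"

# pick_byindex routes the 2nd application to the 2nd destination.
assert pick_byindex(Path("job1"), "wordcount", None, context) == "cluster"
```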
7 changes: 7 additions & 0 deletions src/bartender/schedulers/abstract.py
@@ -18,6 +18,10 @@ class JobDescription(BaseModel):
    command: str


class JobSubmissionError(Exception):
    """Error during job submission."""


class AbstractScheduler(ABC):
    """Abstract scheduler."""

@@ -34,6 +38,9 @@ async def submit(self, description: JobDescription) -> str:
        Returns:
            Identifier that can be used later to interact with job.

        Raises:
            JobSubmissionError: If job submission failed.
        """

    @abstractmethod
17 changes: 11 additions & 6 deletions src/bartender/schedulers/build.py
@@ -3,6 +3,7 @@
 from bartender.schedulers.abstract import AbstractScheduler
 from bartender.schedulers.arq import ArqScheduler, ArqSchedulerConfig
 from bartender.schedulers.dirac_config import DiracSchedulerConfig
+from bartender.schedulers.eager import EagerScheduler, EagerSchedulerConfig
 from bartender.schedulers.memory import MemoryScheduler, MemorySchedulerConfig
 from bartender.schedulers.slurm import SlurmScheduler, SlurmSchedulerConfig
 from bartender.shared.dirac_config import DIRAC_INSTALLED
@@ -12,6 +13,7 @@
     SlurmSchedulerConfig,
     ArqSchedulerConfig,
     DiracSchedulerConfig,
+    EagerSchedulerConfig,
 ]


@@ -28,12 +30,15 @@ def build(config: SchedulerConfig) -> AbstractScheduler:
         A scheduler instance.
     """
-    if isinstance(config, MemorySchedulerConfig):
-        return MemoryScheduler(config)
-    if isinstance(config, SlurmSchedulerConfig):
-        return SlurmScheduler(config)
-    if isinstance(config, ArqSchedulerConfig):
-        return ArqScheduler(config)
+    config2scheduler = {
+        MemorySchedulerConfig: MemoryScheduler,
+        SlurmSchedulerConfig: SlurmScheduler,
+        ArqSchedulerConfig: ArqScheduler,
+        EagerSchedulerConfig: EagerScheduler,
+    }
+    for cfgcls, schedulercls in config2scheduler.items():
+        if isinstance(config, cfgcls):
+            return schedulercls(config)
     if isinstance(config, DiracSchedulerConfig):
         if DIRAC_INSTALLED:
             from bartender.schedulers.dirac import ( # noqa: WPS433 is optional import
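A small usage sketch of the new dispatch (not part of this commit):

```python
from bartender.schedulers.build import build
from bartender.schedulers.eager import EagerSchedulerConfig

# The mapping-based dispatch resolves a config object to its scheduler class,
# so an eager config now builds an EagerScheduler like any other entry.
scheduler = build(EagerSchedulerConfig(max_load=2.0, timeout=60))
```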
81 changes: 81 additions & 0 deletions src/bartender/schedulers/eager.py
@@ -0,0 +1,81 @@
from asyncio import create_subprocess_shell, wait_for
from asyncio.subprocess import Process
from pathlib import Path
from typing import Literal
from uuid import uuid1

from pydantic import BaseModel, PositiveInt
from pydantic.types import PositiveFloat

from bartender.check_load import check_load
from bartender.db.models.job_model import State
from bartender.schedulers.abstract import (
    AbstractScheduler,
    JobDescription,
    JobSubmissionError,
)


class EagerSchedulerConfig(BaseModel):
    """Configuration for eager scheduler.

    Args:
        max_load: Maximum load that scheduler will process submissions.
        timeout: Maximum time to wait for job to finish. In seconds.
    """

    type: Literal["eager"] = "eager"
    max_load: PositiveFloat = 1.0
    timeout: PositiveInt = 300


async def _exec(description: JobDescription, timeout: int) -> None:
    stderr_fn = description.job_dir / "stderr.txt"
    stdout_fn = description.job_dir / "stdout.txt"

    with open(stderr_fn, "w") as stderr:
        with open(stdout_fn, "w") as stdout:
            proc = await create_subprocess_shell(
                description.command,
                stdout=stdout,
                stderr=stderr,
                cwd=description.job_dir,
            )
            try:
                await _handle_job_completion(timeout, proc, description.job_dir)
            except TimeoutError:
                raise JobSubmissionError(f"Job took longer than {timeout} seconds")


async def _handle_job_completion(timeout: int, proc: Process, job_dir: Path) -> None:
    returncode = await wait_for(proc.wait(), timeout=timeout)
    (job_dir / "returncode").write_text(str(returncode))
    if returncode != 0:
        raise JobSubmissionError(
            f"Job failed with return code {returncode}",
        )


class EagerScheduler(AbstractScheduler):
    """Scheduler that runs jobs immediately on submission."""

    def __init__(self, config: EagerSchedulerConfig) -> None:
        self.config = config

    async def submit(self, description: JobDescription) -> str:  # noqa: D102
        check_load(self.config.max_load)
        await _exec(description, self.config.timeout)
        return str(uuid1())

    async def state(self, job_id: str) -> State:  # noqa: D102
        return "ok"

    async def states(self, job_ids: list[str]) -> list[State]:  # noqa: D102
        return ["ok" for _ in job_ids]

    async def cancel(self, job_id: str) -> None:  # noqa: D102
        pass  # noqa: WPS420 -- cannot cancel job that is already completed.

    async def close(self) -> None:  # noqa: D102
        pass  # noqa: WPS420 -- nothing to close.
17 changes: 10 additions & 7 deletions src/bartender/web/api/applications/submit.py
@@ -4,7 +4,7 @@
 from bartender.context import Context
 from bartender.db.dao.job_dao import JobDAO
 from bartender.filesystems.abstract import AbstractFileSystem
-from bartender.schedulers.abstract import JobDescription
+from bartender.schedulers.abstract import JobDescription, JobSubmissionError
 from bartender.template_environment import template_environment
 from bartender.web.users import User
 
@@ -71,13 +71,16 @@ async def submit( # noqa: WPS211
         localized_description,
     )
 
-    internal_job_id = await destination.scheduler.submit(localized_description)
+    try:
+        internal_job_id = await destination.scheduler.submit(localized_description)
 
-    await job_dao.update_internal_job_id(
-        external_job_id,
-        internal_job_id,
-        destination_name,
-    )
+        await job_dao.update_internal_job_id(
+            external_job_id,
+            internal_job_id,
+            destination_name,
+        )
+    except JobSubmissionError:
+        await job_dao.update_job_state(external_job_id, "error")


 async def _upload_input_files(
1 change: 1 addition & 0 deletions src/bartender/web/api/job/interactive_apps.py
@@ -37,6 +37,7 @@ async def _shell(job_dir: Path, command: str, timeout: float) -> InteractiveAppR
        job_dir: The path to the job directory.
        command: The shell command to execute.
        timeout: The maximum time to wait for the command to finish.
            In seconds.

    Returns:
        The result of running the shell command.
20 changes: 1 addition & 19 deletions src/bartender/web/api/job/views.py
@@ -1,4 +1,3 @@
-from os import getloadavg, sched_getaffinity
 from pathlib import Path
 from typing import Annotated, Literal, Optional, Tuple, Type, Union
 
@@ -23,6 +22,7 @@
 from starlette import status
 
 from bartender.async_utils import async_wrap
+from bartender.check_load import check_load
 from bartender.config import CurrentConfig, InteractiveApplicationConfiguration
 from bartender.context import CurrentContext, get_job_root_dir
 from bartender.db.dao.job_dao import CurrentJobDAO
@@ -481,24 +481,6 @@ def get_interactive_app(
 ]
 
 
-def check_load(max_load: float = 1.0) -> None:
-    """Check if machine load is too high.
-
-    Args:
-        max_load: Maximum load allowed.
-
-    Raises:
-        HTTPException: When machine load is too high.
-    """
-    nr_cpus = len(sched_getaffinity(0))
-    load_avg_last_minute = getloadavg()[0] / nr_cpus
-    if load_avg_last_minute > max_load:
-        raise HTTPException(
-            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
-            detail="Machine load is too high, please try again later.",
-        )
-
-
 @router.post(
     "/{jobid}/interactive/{application}",
 )
27 changes: 27 additions & 0 deletions tests/schedulers/test_eager.py
@@ -0,0 +1,27 @@
from pathlib import Path

import pytest

from bartender.schedulers.abstract import JobDescription, JobSubmissionError
from bartender.schedulers.eager import EagerScheduler, EagerSchedulerConfig


@pytest.mark.anyio
async def test_ok_running_job(tmp_path: Path) -> None:
    async with EagerScheduler(EagerSchedulerConfig()) as scheduler:
        description = JobDescription(command="echo -n hello", job_dir=tmp_path)

        jid = await scheduler.submit(description)

        assert (await scheduler.state(jid)) == "ok"
        assert (tmp_path / "returncode").read_text() == "0"
        assert (tmp_path / "stdout.txt").read_text() == "hello"


@pytest.mark.anyio
async def test_bad_running_job(tmp_path: Path) -> None:
    async with EagerScheduler(EagerSchedulerConfig()) as scheduler:
        description = JobDescription(command="exit 42", job_dir=tmp_path)

        with pytest.raises(JobSubmissionError, match="Job failed with return code 42"):
            await scheduler.submit(description)
