Squashed commit of PR #596 (cancel feature in job manager)
refs: #596, #590
HansVRP authored and soxofaan committed Sep 3, 2024
1 parent 29ae888 commit b656fd7
Showing 2 changed files with 164 additions and 56 deletions.
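This commit adds an optional cancel_running_job_after limit (in seconds) to MultiBackendJobManager, records when each job enters the "running" state (running_start_time), and introduces an on_job_cancel hook. A minimal usage sketch of the new option follows; the backend URL, authentication call and parallel_jobs value are illustrative assumptions, not part of this commit:

import openeo
from openeo.extra.job_management import MultiBackendJobManager

# Hypothetical backend and credentials, purely for illustration.
connection = openeo.connect("https://openeo.example.test").authenticate_oidc()

manager = MultiBackendJobManager(
    poll_sleep=60,
    cancel_running_job_after=2 * 60 * 60,  # seconds: stop jobs that run longer than 2 hours
)
manager.add_backend("example-backend", connection=connection, parallel_jobs=2)
# Jobs are started and tracked as before (e.g. via run_jobs()); jobs that stay in
# "running" longer than the limit are stopped and end up with status "canceled".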
openeo/extra/job_management.py: 103 changes (73 additions, 30 deletions)
@@ -16,11 +16,10 @@

from openeo import BatchJob, Connection
from openeo.rest import OpenEoApiError
from openeo.util import deep_get
from openeo.util import deep_get, rfc3339

_log = logging.getLogger(__name__)


class _Backend(NamedTuple):
"""Container for backend info/settings"""

@@ -112,7 +111,10 @@ def start_job(
"""

def __init__(
self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "."
self,
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
cancel_running_job_after: Optional[int] = None,
):
"""Create a MultiBackendJobManager.
@@ -129,6 +131,10 @@ def __init__(
- get_job_dir
- get_error_log_path
- get_job_metadata_path
:param cancel_running_job_after:
Maximum time (in seconds) a job is allowed to stay in the "running" state before it is canceled automatically.
Defaults to None, which disables this feature.
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
@@ -137,6 +143,10 @@ def __init__(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self.cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)

def add_backend(
self,
name: str,
@@ -161,9 +171,7 @@ def add_backend(
c = connection
connection = lambda: c
assert callable(connection)
self.backends[name] = _Backend(
get_connection=connection, parallel_jobs=parallel_jobs
)
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)

def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -226,13 +234,15 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
("status", "not_started"),
("id", None),
("start_time", None),
("running_start_time", None),
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are through assumptions about job "usage" metadata in `_update_statuses`.
# However, they are through assumptions about job "usage" metadata in `_track_statuses`.
("cpu", None),
("memory", None),
("duration", None),
("backend_name", None),
("backend_name", None),
]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
@@ -336,30 +346,26 @@ def run_jobs(
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
& (df.status != "canceled")
].size
> 0
):

with ignore_connection_errors(context="get statuses"):
self._update_statuses(df)
self._track_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)

if len(df[df.status == "not_started"]) > 0:
# Check number of jobs running at each backend
running = df[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = (
self.backends[backend_name].parallel_jobs - backend_load
)
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = df[df.status == "not_started"].iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
@@ -407,7 +413,7 @@ def _launch_job(self, start_job, df, i, backend_name):
_log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True)
df.loc[i, "status"] = "start_failed"
else:
df.loc[i, "start_time"] = datetime.datetime.now().isoformat()
df.loc[i, "start_time"] = rfc3339.utcnow()
if job:
df.loc[i, "id"] = job.job_id
with ignore_connection_errors(context="get status"):
@@ -463,6 +469,33 @@ def on_job_error(self, job: BatchJob, row):
self.ensure_job_dir_exists(job.job_id)
error_log_path.write_text(json.dumps(error_logs, indent=2))

def on_job_cancel(self, job: BatchJob, row):
"""
Handle a job that was canceled. Can be overridden to provide custom behaviour.
Default implementation does not do anything.
:param job: The job that was canceled.
:param row: DataFrame row containing the job's metadata.
"""
pass

def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
job_running_start_time = rfc3339.parse_datetime(row["running_start_time"], with_timezone=True)
current_time = rfc3339.parse_datetime(rfc3339.utcnow(), with_timezone=True)

if current_time > job_running_start_time + self.cancel_running_job_after:
try:
_log.info(
f"Cancelling job {job.job_id} as it has been running for more than {self.cancel_running_job_after}"
)

job.stop()

except OpenEoApiError as e:
_log.error(f"Error Cancelling long-running job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
return self._root_dir / f"job_{job_id}"
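The cutoff check in _cancel_prolonged_job above boils down to an RFC 3339 timestamp comparison: a job is stopped once "now" exceeds its stored running_start_time plus the configured timedelta. A standalone sketch of that comparison, using plain datetime objects instead of the client's rfc3339 helpers and an assumed two-hour limit:

import datetime

running_start_time = datetime.datetime(2024, 9, 3, 10, 0, tzinfo=datetime.timezone.utc)
cancel_running_job_after = datetime.timedelta(seconds=2 * 60 * 60)

now = datetime.datetime.now(datetime.timezone.utc)
if now > running_start_time + cancel_running_job_after:
    # In the manager this is where job.stop() is called.
    print("job exceeded the limit and would be canceled")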
@@ -481,13 +514,9 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
if not job_dir.exists():
job_dir.mkdir(parents=True)

def _update_statuses(self, df: pd.DataFrame):
"""Update status (and stats) of running jobs (in place)."""
active = df.loc[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]
def _track_statuses(self, df: pd.DataFrame):
"""tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long"""
active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
@@ -496,15 +525,29 @@ def _update_statuses(self, df: pd.DataFrame):
con = self._get_connection(backend_name)
the_job = con.job(job_id)
job_metadata = the_job.describe()
_log.info(
f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}"
)
if job_metadata["status"] == "finished":

previous_status = df.loc[i, "status"]
new_status = job_metadata["status"]

_log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {new_status!r}")

if new_status == "finished":
self.on_job_done(the_job, df.loc[i])
if df.loc[i, "status"] != "error" and job_metadata["status"] == "error":

if previous_status != "error" and new_status == "error":
self.on_job_error(the_job, df.loc[i])

df.loc[i, "status"] = job_metadata["status"]
if previous_status in {"created", "queued"} and new_status == "running":
df.loc[i, "running_start_time"] = rfc3339.utcnow()

if new_status == "canceled":
self.on_job_cancel(the_job, df.loc[i])

if self.cancel_running_job_after and new_status == "running":
self._cancel_prolonged_job(the_job, df.loc[i])

df.loc[i, "status"] = new_status

# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
df.loc[i, key] = _format_usage_stat(job_metadata, key)
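Since _track_statuses now calls on_job_cancel whenever a job reaches the "canceled" status, downstream code can react to automatic or manual cancellations by overriding that hook, analogous to on_job_done and on_job_error. A minimal sketch of such a subclass; the class name and the print-based reaction are illustrative only:

from openeo import BatchJob
from openeo.extra.job_management import MultiBackendJobManager

class MyJobManager(MultiBackendJobManager):
    def on_job_cancel(self, job: BatchJob, row):
        # row is the job's row from the tracking DataFrame (see _normalize_df for its columns).
        print(f"Job {job.job_id} on backend {row['backend_name']} was canceled")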
