Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extend job database interface to allow more granular querying on status #607

Merged
merged 14 commits into from
Sep 13, 2024
90 changes: 52 additions & 38 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,32 @@ def read(self) -> pd.DataFrame:
def persist(self, df: pd.DataFrame) -> None:
    """
    Store job data to the database.

    The provided dataframe may contain partial information
    (e.g. only the rows/columns that changed),
    which is merged into the larger database by the implementation.

    :param df: job data to store.
    """
    ...

@abc.abstractmethod
def has_unfinished_jobs(self) -> bool:
    """
    Check if there are still unfinished jobs in the database.

    A job is considered "finished" once it reached a terminal state
    (e.g. finished, error, skipped); anything else counts as unfinished.
    NOTE(review): the exact set of terminal statuses is defined by the
    implementation — confirm it matches the manager's run loop.

    :return: True if there are unfinished jobs, False otherwise.
    """
    ...

@abc.abstractmethod
def get_by_status(self, include, max=None) -> pd.DataFrame:
    """
    Return a dataframe with jobs, filtered by status.

    :param include: list of job statuses to include.
    :param max: maximum number of jobs to return, or `None` for no limit.
        (Name shadows the `max` builtin, but is kept for interface
        compatibility with existing callers/implementations.)

    :return: DataFrame with jobs filtered by status.
    """
    ...

class MultiBackendJobManager:
"""
Expand Down Expand Up @@ -325,33 +346,28 @@ def run_jobs(
# Resume from existing db
_log.info(f"Resuming `run_jobs` from existing {job_db}")
df = job_db.read()
jdries marked this conversation as resolved.
Show resolved Hide resolved
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")

df = self._normalize_df(df)
#status_histogram = df.groupby("status").size().to_dict()
#_log.info(f"Status histogram: {status_histogram}")
jdries marked this conversation as resolved.
Show resolved Hide resolved
else:
df = self._normalize_df(df)
job_db.persist(df)
jdries marked this conversation as resolved.
Show resolved Hide resolved

while (
df[
(df.status != "finished")
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
].size
> 0
job_db.has_unfinished_jobs()

):
with ignore_connection_errors(context="get statuses"):
self._update_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)
self._update_statuses(job_db)
#TODO do we still want to support this? Would require a 'count_by_status'?
#status_histogram = df.groupby("status").size().to_dict()
#_log.info(f"Status histogram: {status_histogram}")
jdries marked this conversation as resolved.
Show resolved Hide resolved


if len(df[df.status == "not_started"]) > 0:
not_started = job_db.get_by_status(include=["not_started"],max=200)

if len(not_started) > 0:
# Check number of jobs running at each backend
running = df[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]
running = job_db.get_by_status(include=["created","queued","running"])
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
Expand All @@ -360,10 +376,10 @@ def run_jobs(
to_add = (
self.backends[backend_name].parallel_jobs - backend_load
)
to_launch = df[df.status == "not_started"].iloc[0:to_add]
to_launch = not_started.iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
job_db.persist(df)
self._launch_job(start_job, not_started, i, backend_name)
job_db.persist(to_launch)

time.sleep(self.poll_sleep)

Expand Down Expand Up @@ -481,16 +497,14 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
if not job_dir.exists():
job_dir.mkdir(parents=True)

def _update_statuses(self, df: pd.DataFrame):
def _update_statuses(self, job_db: JobDatabaseInterface):
"""Update status (and stats) of running jobs (in place)."""
active = df.loc[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]

active = self.job_db.get_by_status(include=["created", "queued", "running"])
jdries marked this conversation as resolved.
Show resolved Hide resolved

for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
job_id = active.loc[i, "id"]
backend_name = active.loc[i, "backend_name"]

try:
con = self._get_connection(backend_name)
Expand All @@ -500,19 +514,19 @@ def _update_statuses(self, df: pd.DataFrame):
f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}"
)
if job_metadata["status"] == "finished":
self.on_job_done(the_job, df.loc[i])
if df.loc[i, "status"] != "error" and job_metadata["status"] == "error":
self.on_job_error(the_job, df.loc[i])
self.on_job_done(the_job, active.loc[i])
if active.loc[i, "status"] != "error" and job_metadata["status"] == "error":
self.on_job_error(the_job, active.loc[i])

df.loc[i, "status"] = job_metadata["status"]
active.loc[i, "status"] = job_metadata["status"]
# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
df.loc[i, key] = _format_usage_stat(job_metadata, key)
active.loc[i, key] = _format_usage_stat(job_metadata, key)

except OpenEoApiError as e:
print(f"error for job {job_id!r} on backend {backend_name}")
print(e)

job_db.persist(active)

def _format_usage_stat(job_metadata: dict, field: str) -> str:
value = deep_get(job_metadata, "usage", field, "value", default=0)
Expand Down
Loading