Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use threading to run jobmanager loop #614

Closed
wants to merge 15 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Wrap OIDC token request failure in more descriptive `OidcException` (related to [#624](https://github.com/Open-EO/openeo-python-client/issues/624))
- Added `auto_add_save_result` option (on by default) to disable automatic addition of `save_result` node on `download`/`create_job`/`execute_batch` ([#513](https://github.com/Open-EO/openeo-python-client/issues/513))
- Add support for `apply_vectorcube` UDF signature in `run_udf_code` ([Open-EO/openeo-geopyspark-driver#881](https://github.com/Open-EO/openeo-geopyspark-driver/issues/811))
- `MultiBackendJobManager`: add API to run the update loop in a separate thread, allowing controlled interruption.

### Changed

Expand Down
155 changes: 126 additions & 29 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import time
import warnings
from pathlib import Path
from typing import Callable, Dict, NamedTuple, Optional, Union, List
from threading import Thread
from typing import Callable, Dict, List, NamedTuple, Optional, Union

import pandas as pd
import requests
Expand All @@ -31,6 +32,9 @@ class _Backend(NamedTuple):

MAX_RETRIES = 5

# Sentinel value to indicate that a parameter was not set
_UNSET = object()


class JobDatabaseInterface(metaclass=abc.ABCMeta):
"""
Expand Down Expand Up @@ -161,6 +165,7 @@ def __init__(
.. versionchanged:: 0.32.0
Added `cancel_running_job_after` parameter.
"""
self._stop_thread = None
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
self._connections: Dict[str, _Backend] = {}
Expand All @@ -171,6 +176,7 @@ def __init__(
self._cancel_running_job_after = (
datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None
)
self._thread = None

def add_backend(
self,
Expand Down Expand Up @@ -253,6 +259,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
"""
# TODO: this was originally an internal helper, but we need a clean public API for the user

# check for some required columns.
required_with_default = [
Expand All @@ -263,6 +270,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are through assumptions about job "usage" metadata in `_track_statuses`.
# => proposed solution: allow to configure usage columns when adding a backend
("cpu", None),
("memory", None),
("duration", None),
Expand All @@ -273,6 +281,89 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:

return df

def start_job_thread(self, start_job: Callable[..., BatchJob], job_db: JobDatabaseInterface):
    """
    Start running the jobs in a separate thread and return immediately.

    The thread keeps calling the job update loop until there are no more jobs
    with status "not_started", "created", "queued" or "running" in ``job_db``,
    or until :py:meth:`stop_job_thread` is called.

    :param start_job:
        A callback which will be invoked with, amongst others,
        the row of the dataframe for which a job should be created and/or started.
        This callable should return a :py:class:`openeo.rest.job.BatchJob` object.

        The following parameters will be passed to ``start_job``:

            ``row`` (:py:class:`pandas.Series`):
                The row in the pandas dataframe that stores the jobs state and other tracked data.

            ``connection_provider``:
                A getter to get a connection by backend name.
                Typically, you would need either the parameter ``connection_provider``,
                or the parameter ``connection``, but likely you will not need both.

            ``connection`` (:py:class:`Connection`):
                The :py:class:`Connection` itself, that has already been created.
                Typically, you would need either the parameter ``connection_provider``,
                or the parameter ``connection``, but likely you will not need both.

            ``provider`` (``str``):
                The name of the backend that will run the job.

        You do not have to define all the parameters described above, but if you leave
        any of them out, then remember to include the ``*args`` and ``**kwargs`` parameters.
        Otherwise you will have an exception because :py:meth:`run_jobs` passes unknown parameters to ``start_job``.
    :param job_db:
        Job database object, following the :py:class:`JobDatabaseInterface` interface,
        to load/store existing job status data and other metadata from/to.
        It is used directly (``read()`` and ``count_by_status()`` are called on it),
        so it must be a database object that already contains the jobs to run, not a file path.

        .. note::
            Support for Parquet files depends on the ``pyarrow`` package
            as :ref:`optional dependency <installation-optional-dependencies>`.

    :raises RuntimeError: if a previously started job thread is still alive.

    .. versionadded:: 0.32.0
    """
    # Two polling loops on the same manager would race on the job database
    # and the handle of the first thread would be lost, so refuse a second start.
    if self._thread is not None and self._thread.is_alive():
        raise RuntimeError("Job thread is already running; call `stop_job_thread` first")

    # Resume from existing db
    _log.info(f"Resuming `run_jobs` from existing {job_db}")
    df = job_db.read()

    self._stop_thread = False

    def run_loop():
        # Check the stop flag first: no need to query the database when we are asked to stop.
        while (
            not self._stop_thread
            and sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
        ):
            self._job_update_loop(df, job_db, start_job)

            # Do sequence of micro-sleeps to allow for quick thread exit
            for _ in range(int(max(1, self.poll_sleep))):
                time.sleep(1)
                if self._stop_thread:
                    break

    self._thread = Thread(target=run_loop)
    self._thread.start()

def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET):
    """
    Signal the job polling thread to stop and wait for it to finish.

    :param timeout_seconds: Maximum time (in seconds) to wait for the thread to end.
        When not specified, the wait is limited to twice the ``poll_sleep`` time.
        Pass ``None`` to wait without time limit.

    If no thread was started, an error is logged and nothing else happens.
    If the thread is still alive after the wait, a warning is logged.

    .. versionadded:: 0.32.0
    """
    worker = self._thread
    if worker is None:
        _log.error("No job thread to stop")
        return

    # Raise the flag that the polling loop checks between its micro-sleeps.
    self._stop_thread = True
    join_timeout = 2 * self.poll_sleep if timeout_seconds is _UNSET else timeout_seconds
    worker.join(join_timeout)
    if worker.is_alive():
        _log.warning("Job thread did not stop after timeout")

def run_jobs(
self,
df: Optional[pd.DataFrame],
Expand Down Expand Up @@ -362,32 +453,34 @@ def run_jobs(
df = self._normalize_df(df)
job_db.persist(df)

while (
sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0

):

with ignore_connection_errors(context="get statuses"):
self._track_statuses(job_db)

not_started = job_db.get_by_status(statuses=["not_started"],max=200)

if len(not_started) > 0:
# Check number of jobs running at each backend
running = job_db.get_by_status(statuses=["created","queued","running"])
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = not_started.iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, not_started, i, backend_name)
job_db.persist(to_launch)

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._job_update_loop(df, job_db, start_job)
time.sleep(self.poll_sleep)

def _job_update_loop(self, df, job_db, start_job):
    """
    Inner loop logic of job management:
    go through the necessary jobs to check for status updates,
    trigger status events, start new jobs when there is room for them, etc.

    :param df: not used by this method itself; kept so existing callers keep working.
    :param job_db: job database to read job statuses from and to persist launched jobs to.
    :param start_job: callback that is handed to ``_launch_job`` to create/start a job.
    """
    with ignore_connection_errors(context="get statuses"):
        self._track_statuses(job_db)

    not_started = job_db.get_by_status(statuses=["not_started"], max=200)
    if len(not_started) == 0:
        return

    # Check number of jobs running at each backend
    running = job_db.get_by_status(statuses=["created", "queued", "running"])
    per_backend = running.groupby("backend_name").size().to_dict()
    _log.info(f"Running per backend: {per_backend}")

    # Number of rows of `not_started` already handed to a backend in this pass.
    # Each backend must take the *next* rows: always slicing from position 0
    # would launch the same jobs again on every backend with free capacity.
    total_added = 0
    for backend_name in self.backends:
        backend_load = per_backend.get(backend_name, 0)
        to_add = self.backends[backend_name].parallel_jobs - backend_load
        if to_add <= 0:
            continue
        batch = not_started.index[total_added : total_added + to_add]
        if len(batch) == 0:
            # All pending rows have been distributed already.
            break
        for i in batch:
            self._launch_job(start_job, not_started, i, backend_name)
            # Persist a selection taken *after* the launch, so that what is stored
            # does not depend on whether an earlier slice was a view or a copy.
            job_db.persist(not_started.loc[[i]])
        total_added += len(batch)

def _launch_job(self, start_job, df, i, backend_name):
"""Helper method for launching jobs

Expand Down Expand Up @@ -566,7 +659,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface):

# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
active.loc[i, key] = _format_usage_stat(job_metadata, key)
if key in active.columns:
active.loc[i, key] = _format_usage_stat(job_metadata, key)

except OpenEoApiError as e:
print(f"error for job {job_id!r} on backend {backend_name}")
Expand All @@ -593,7 +687,6 @@ def ignore_connection_errors(context: Optional[str] = None, sleep: int = 5):

class FullDataFrameJobDatabase(JobDatabaseInterface):


def __init__(self):
super().__init__()
self._df = None
Expand All @@ -608,8 +701,6 @@ def count_by_status(self, statuses: List[str]) -> dict:
status_histogram = self.df.groupby("status").size().to_dict()
return {k:v for k,v in status_histogram.items() if k in statuses}



def get_by_status(self, statuses, max=None) -> pd.DataFrame:
"""
Returns a dataframe with jobs, filtered by status.
Expand Down Expand Up @@ -647,6 +738,9 @@ def __init__(self, path: Union[str, Path]):
super().__init__()
self.path = Path(path)

def __repr__(self):
    """Return a debugging representation: the class name with the database path as quoted string."""
    class_name = self.__class__.__name__
    location = str(self.path)
    return "{}({!r})".format(class_name, location)

def exists(self) -> bool:
return self.path.exists()

Expand Down Expand Up @@ -696,6 +790,9 @@ def __init__(self, path: Union[str, Path]):
super().__init__()
self.path = Path(path)

def __repr__(self):
    """Return a debugging representation: the class name with the database path as quoted string."""
    class_name = self.__class__.__name__
    location = str(self.path)
    return "{}({!r})".format(class_name, location)

def exists(self) -> bool:
return self.path.exists()

Expand Down
Loading