From b8d0059f86f5a1cb7f38ac6e01fb8199c2e2aa51 Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Thu, 5 Sep 2024 20:59:07 +0200 Subject: [PATCH 01/12] use asyncio to run jobmanager loop --- openeo/extra/job_management.py | 75 +++++++++++++++++++++++++--------- 1 file changed, 56 insertions(+), 19 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index c49af6165..edf83e913 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,4 +1,5 @@ import abc +import asyncio import contextlib import datetime import json @@ -150,6 +151,7 @@ def __init__( self._cancel_running_job_after = ( datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None ) + self._loop_task = None def add_backend( self, @@ -252,6 +254,39 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: return df + def start_job_thread(self,start_job: Callable[[], BatchJob], + job_db: JobDatabaseInterface ): + """ + Start running the jobs in a separate thread, returns afterwards. + """ + + # Resume from existing db + _log.info(f"Resuming `run_jobs` from existing {job_db}") + df = job_db.read() + + async def run_loop(): + + while ( + df[ + # TODO: risk on infinite loop if a backend reports a (non-standard) terminal status that is not covered here + (df.status != "finished") + & (df.status != "skipped") + & (df.status != "start_failed") + & (df.status != "error") + & (df.status != "canceled") + ].size + > 0 + ): + + await self._job_update_loop(df, job_db, start_job) + await asyncio.sleep(self.poll_sleep) + + self.loop_task = asyncio.create_task(run_loop()) + + def stop_job_thread(self): + if(self._loop_task is not None): + self._loop_task.cancel() + def run_jobs( self, df: pd.DataFrame, @@ -355,28 +390,30 @@ def run_jobs( > 0 ): - with ignore_connection_errors(context="get statuses"): - self._track_statuses(df) - status_histogram = df.groupby("status").size().to_dict() - _log.info(f"Status histogram: {status_histogram}") - job_db.persist(df) - - if len(df[df.status == "not_started"]) > 0: - # Check number of jobs running at each backend - running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")] - per_backend = running.groupby("backend_name").size().to_dict() - _log.info(f"Running per backend: {per_backend}") - for backend_name in self.backends: - backend_load = per_backend.get(backend_name, 0) - if backend_load < self.backends[backend_name].parallel_jobs: - to_add = self.backends[backend_name].parallel_jobs - backend_load - to_launch = df[df.status == "not_started"].iloc[0:to_add] - for i in to_launch.index: - self._launch_job(start_job, df, i, backend_name) - job_db.persist(df) + asyncio.run(self._job_update_loop(df, job_db, start_job)) time.sleep(self.poll_sleep) + async def _job_update_loop(self, df, job_db, start_job): + with ignore_connection_errors(context="get statuses"): + self._track_statuses(df) + status_histogram = df.groupby("status").size().to_dict() + _log.info(f"Status histogram: {status_histogram}") + job_db.persist(df) + if len(df[df.status == "not_started"]) > 0: + # Check number of jobs running at each backend + running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")] + per_backend = running.groupby("backend_name").size().to_dict() + _log.info(f"Running per backend: {per_backend}") + for backend_name in self.backends: + backend_load = per_backend.get(backend_name, 0) + if backend_load < self.backends[backend_name].parallel_jobs: + to_add = self.backends[backend_name].parallel_jobs - backend_load + to_launch = df[df.status == "not_started"].iloc[0:to_add] + for i in to_launch.index: + self._launch_job(start_job, df, i, backend_name) + job_db.persist(df) + def _launch_job(self, start_job, df, i, backend_name): """Helper method for launching jobs From 9b624edd6554bff4aba8cb290bdf845d2a756666 Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Thu, 5 Sep 2024 21:09:32 +0200 Subject: [PATCH 02/12] add clean cancelling --- openeo/extra/job_management.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index edf83e913..6595a166e 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -141,6 +141,7 @@ def __init__( .. versionchanged:: 0.32.0 Added `cancel_running_job_after` parameter. """ + self._stop = True self.backends: Dict[str, _Backend] = {} self.poll_sleep = poll_sleep self._connections: Dict[str, _Backend] = {} @@ -264,6 +265,8 @@ def start_job_thread(self,start_job: Callable[[], BatchJob], _log.info(f"Resuming `run_jobs` from existing {job_db}") df = job_db.read() + self._stop = False + async def run_loop(): while ( @@ -275,7 +278,7 @@ async def run_loop(): & (df.status != "error") & (df.status != "canceled") ].size - > 0 + > 0 and not self._stop ): await self._job_update_loop(df, job_db, start_job) @@ -283,8 +286,10 @@ async def run_loop(): self.loop_task = asyncio.create_task(run_loop()) - def stop_job_thread(self): + def stop_job_thread(self, force_timeout_seconds = 30): if(self._loop_task is not None): + self._stop = True + asyncio.sleep(force_timeout_seconds) self._loop_task.cancel() def run_jobs( From 2d44e7e4eea04fce97663e94906b00049270c60a Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Fri, 13 Sep 2024 17:57:11 +0200 Subject: [PATCH 03/12] preliminary conversion to use python threading instead of asyncio --- openeo/extra/job_management.py | 45 +++++++-------- tests/extra/test_job_management.py | 93 +++++++++++++++++++++--------- 2 files changed, 85 insertions(+), 53 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 3dffb47de..1ff52c607 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -7,6 +7,7 @@ import time import warnings from pathlib import Path +from threading import Thread from typing import Callable, Dict, NamedTuple, Optional, Union, List import pandas as pd @@ -173,7 +174,7 @@ def __init__( self._cancel_running_job_after = ( datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None ) - self._loop_task = None + self._timer = None def add_backend( self, @@ -288,30 +289,23 @@ def start_job_thread(self,start_job: Callable[[], BatchJob], self._stop = False - async def run_loop(): - - while ( - df[ - # TODO: risk on infinite loop if a backend reports a (non-standard) terminal status that is not covered here - (df.status != "finished") - & (df.status != "skipped") - & (df.status != "start_failed") - & (df.status != "error") - & (df.status != "canceled") - ].size - > 0 and not self._stop + + + def run_loop(): + while (sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0 and not self._stop ): + self._job_update_loop(df, job_db, start_job) + - await self._job_update_loop(df, job_db, start_job) - await asyncio.sleep(self.poll_sleep) - self.loop_task = asyncio.create_task(run_loop()) + self._timer = Thread(target = run_loop) + self._timer.start() def stop_job_thread(self, force_timeout_seconds = 30): - if(self._loop_task is not None): - self._stop = True - asyncio.sleep(force_timeout_seconds) - self._loop_task.cancel() + self._stop = True + if(self._timer is not None): + self._timer.join(force_timeout_seconds) + def run_jobs( self, @@ -404,17 +398,16 @@ def run_jobs( while ( sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0 - ): - - asyncio.run(self._job_update_loop(df, job_db, start_job)) - + self._job_update_loop(df, job_db, start_job) time.sleep(self.poll_sleep) - async def _job_update_loop(self, df, job_db, start_job): + def _job_update_loop(self, df, job_db, start_job): with ignore_connection_errors(context="get statuses"): self._track_statuses(job_db) + + not_started = job_db.get_by_status(statuses=["not_started"],max=200) if len(not_started) > 0: # Check number of jobs running at each backend @@ -430,6 +423,8 @@ async def _job_update_loop(self, df, job_db, start_job): self._launch_job(start_job, not_started, i, backend_name) job_db.persist(to_launch) + + def _launch_job(self, start_job, df, i, backend_name): """Helper method for launching jobs diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 0eaaff0ac..04e7c821d 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -5,6 +5,7 @@ import textwrap import threading import time +from time import sleep from typing import Callable, Union from unittest import mock @@ -85,6 +86,69 @@ def sleep_mock(self): yield sleep def test_basic(self, tmp_path, requests_mock, sleep_mock): + manager = self.create_basic_mocked_manager(requests_mock, tmp_path) + + df = pd.DataFrame( + { + "year": [2018, 2019, 2020, 2021, 2022], + # Use simple points in WKT format to test conversion to the geometry dtype + "geometry": ["POINT (1 2)"] * 5, + } + ) + output_file = tmp_path / "jobs.csv" + + def start_job(row, connection, **kwargs): + year = int(row["year"]) + return BatchJob(job_id=f"job-{year}", connection=connection) + + manager.run_jobs(df=df, start_job=start_job, output_file=output_file) + assert sleep_mock.call_count > 10 + + result = pd.read_csv(output_file) + assert len(result) == 5 + assert set(result.status) == {"finished"} + assert set(result.backend_name) == {"foo", "bar"} + + # We expect that the job metadata was saved, so verify that it exists. + # Checking for one of the jobs is enough. + metadata_path = manager.get_job_metadata_path(job_id="job-2022") + assert metadata_path.exists() + + def test_basic_threading(self, tmp_path, requests_mock, sleep_mock): + manager = self.create_basic_mocked_manager(requests_mock, tmp_path) + + df = pd.DataFrame( + { + "year": [2018, 2019, 2020, 2021, 2022], + # Use simple points in WKT format to test conversion to the geometry dtype + "geometry": ["POINT (1 2)"] * 5, + } + ) + output_file = tmp_path / "jobs.csv" + + def start_job(row, connection, **kwargs): + year = int(row["year"]) + return BatchJob(job_id=f"job-{year}", connection=connection) + + df = manager._normalize_df(df) + job_db = CsvJobDatabase(output_file) + job_db.persist(df) + manager.start_job_thread( start_job=start_job,job_db=job_db) + sleep(20) + manager.stop_job_thread(10) + #assert sleep_mock.call_count > 10 + + result = pd.read_csv(output_file) + assert len(result) == 5 + assert set(result.status) == {"finished"} + assert set(result.backend_name) == {"foo", "bar"} + + # We expect that the job metadata was saved, so verify that it exists. + # Checking for one of the jobs is enough. + metadata_path = manager.get_job_metadata_path(job_id="job-2022") + assert metadata_path.exists() + + def create_basic_mocked_manager(self, requests_mock, tmp_path): requests_mock.get("http://foo.test/", json={"api_version": "1.1.0"}) requests_mock.get("http://bar.test/", json={"api_version": "1.1.0"}) @@ -136,38 +200,11 @@ def mock_job_status(job_id, queued=1, running=2): mock_job_status("job-2020", queued=3, running=4) mock_job_status("job-2021", queued=3, running=5) mock_job_status("job-2022", queued=5, running=6) - root_dir = tmp_path / "job_mgr_root" manager = MultiBackendJobManager(root_dir=root_dir) - manager.add_backend("foo", connection=openeo.connect("http://foo.test")) manager.add_backend("bar", connection=openeo.connect("http://bar.test")) - - df = pd.DataFrame( - { - "year": [2018, 2019, 2020, 2021, 2022], - # Use simple points in WKT format to test conversion to the geometry dtype - "geometry": ["POINT (1 2)"] * 5, - } - ) - output_file = tmp_path / "jobs.csv" - - def start_job(row, connection, **kwargs): - year = int(row["year"]) - return BatchJob(job_id=f"job-{year}", connection=connection) - - manager.run_jobs(df=df, start_job=start_job, output_file=output_file) - assert sleep_mock.call_count > 10 - - result = pd.read_csv(output_file) - assert len(result) == 5 - assert set(result.status) == {"finished"} - assert set(result.backend_name) == {"foo", "bar"} - - # We expect that the job metadata was saved, so verify that it exists. - # Checking for one of the jobs is enough. - metadata_path = manager.get_job_metadata_path(job_id="job-2022") - assert metadata_path.exists() + return manager def test_normalize_df(self): df = pd.DataFrame( From 995e530d643262f92198b0d4345c8138cd9dd6ae Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Sun, 15 Sep 2024 14:05:02 +0200 Subject: [PATCH 04/12] job manager: changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 18d77e31b..920c651ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `load_stac`/`metadata_from_stac`: add support for extracting actual temporal dimension metadata ([#567](https://github.com/Open-EO/openeo-python-client/issues/567)) - `MultiBackendJobManager`: add `cancel_running_job_after` option to automatically cancel jobs that are running for too long ([#590](https://github.com/Open-EO/openeo-python-client/issues/590)) +- `MultiBackendJobManager`: add API to the update loop in a separate thread, allowing controlled interruption. ### Changed From c7e05446e6f88c9ad7ab4960829b36911854885e Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Thu, 19 Sep 2024 18:22:02 +0200 Subject: [PATCH 05/12] PR improvements --- openeo/extra/job_management.py | 51 ++++++++++++++++++++++++++++++---- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 1ff52c607..77b0bfbff 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -281,6 +281,41 @@ def start_job_thread(self,start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface ): """ Start running the jobs in a separate thread, returns afterwards. + + :param start_job: + A callback which will be invoked with, amongst others, + the row of the dataframe for which a job should be created and/or started. + This callable should return a :py:class:`openeo.rest.job.BatchJob` object. + + The following parameters will be passed to ``start_job``: + + ``row`` (:py:class:`pandas.Series`): + The row in the pandas dataframe that stores the jobs state and other tracked data. + + ``connection_provider``: + A getter to get a connection by backend name. + Typically, you would need either the parameter ``connection_provider``, + or the parameter ``connection``, but likely you will not need both. + + ``connection`` (:py:class:`Connection`): + The :py:class:`Connection` itself, that has already been created. + Typically, you would need either the parameter ``connection_provider``, + or the parameter ``connection``, but likely you will not need both. + + ``provider`` (``str``): + The name of the backend that will run the job. + + You do not have to define all the parameters described below, but if you leave + any of them out, then remember to include the ``*args`` and ``**kwargs`` parameters. + Otherwise you will have an exception because :py:meth:`run_jobs` passes unknown parameters to ``start_job``. + :param job_db: + Job database to load/store existing job status data and other metadata from/to. + Can be specified as a path to CSV or Parquet file, + or as a custom database object following the :py:class:`JobDatabaseInterface` interface. + + .. note:: + Support for Parquet files depends on the ``pyarrow`` package + as :ref:`optional dependency `. """ # Resume from existing db @@ -289,22 +324,28 @@ def start_job_thread(self,start_job: Callable[[], BatchJob], self._stop = False - - def run_loop(): while (sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0 and not self._stop ): self._job_update_loop(df, job_db, start_job) - + time.sleep(self.poll_sleep) self._timer = Thread(target = run_loop) self._timer.start() - def stop_job_thread(self, force_timeout_seconds = 30): + def stop_job_thread(self, timeout_seconds = None): + """ + Try to stop the thread, if timeout_seconds is set, the method will return after the timeout + + :param timeout_seconds: The time to wait for the thread to stop, set to None (default) to wait indefinitely + + """ self._stop = True if(self._timer is not None): - self._timer.join(force_timeout_seconds) + self._timer.join(timeout_seconds) + if(self._timer.is_alive()): + _log.warning("Job thread did not stop after timeout") def run_jobs( From bfd99e347ffdf715b56a2039520e478138cd5b00 Mon Sep 17 00:00:00 2001 From: Jeroen Dries Date: Fri, 20 Sep 2024 08:26:06 +0200 Subject: [PATCH 06/12] add API for clean initialization of job db --- openeo/extra/job_management.py | 35 +++++++++++++++++++++++++++--- tests/extra/test_job_management.py | 8 +++---- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 77b0bfbff..671893a4c 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -267,6 +267,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: # TODO: columns "cpu", "memory", "duration" are not referenced directly # within MultiBackendJobManager making it confusing to claim they are required. # However, they are through assumptions about job "usage" metadata in `_track_statuses`. + # => proposed solution: allow to configure usage columns when adding a backend ("cpu", None), ("memory", None), ("duration", None), @@ -277,6 +278,27 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: return df + def initialize_job_db(self,job_db:JobDatabaseInterface, dataframe:pd.DataFrame): + """ + Initialize a job database to be compatible with this job manager. + The provided dataframe should contain all the columns that you need to create your jobs. + + This job manager uses the following reserved column names: status, id, start_time, running_start_time, + cpu, memory, duration, backend_name. + + If the column name already exists in the provided dataframe, it will be assumed that it can be used as is by + this job manager. + + + """ + + if( not job_db.exists()): + dataframe = self._normalize_df(dataframe) + job_db.persist(dataframe) + else: + raise ValueError(f"Job database {job_db} already exists, cannot initialize.") + + def start_job_thread(self,start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface ): """ @@ -312,6 +334,7 @@ def start_job_thread(self,start_job: Callable[[], BatchJob], Job database to load/store existing job status data and other metadata from/to. Can be specified as a path to CSV or Parquet file, or as a custom database object following the :py:class:`JobDatabaseInterface` interface. + The job database should be initialized with :py:meth:`initialize_job_db` before calling this method. .. note:: Support for Parquet files depends on the ``pyarrow`` package @@ -434,8 +457,7 @@ def run_jobs( # Resume from existing db _log.info(f"Resuming `run_jobs` from existing {job_db}") elif df is not None: - df = self._normalize_df(df) - job_db.persist(df) + self.initialize_job_db(job_db,df) while ( sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0 @@ -644,7 +666,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface): # TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df` for key in job_metadata.get("usage", {}).keys(): - active.loc[i, key] = _format_usage_stat(job_metadata, key) + if key in active.columns: + active.loc[i, key] = _format_usage_stat(job_metadata, key) except OpenEoApiError as e: print(f"error for job {job_id!r} on backend {backend_name}") @@ -725,6 +748,9 @@ def __init__(self, path: Union[str, Path]): super().__init__() self.path = Path(path) + def __str__(self): + return str(self.path) + def exists(self) -> bool: return self.path.exists() @@ -774,6 +800,9 @@ def __init__(self, path: Union[str, Path]): super().__init__() self.path = Path(path) + def __str__(self): + return str(self.path) + def exists(self) -> bool: return self.path.exists() diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 04e7c821d..b98ecc74f 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -130,13 +130,13 @@ def start_job(row, connection, **kwargs): year = int(row["year"]) return BatchJob(job_id=f"job-{year}", connection=connection) - df = manager._normalize_df(df) job_db = CsvJobDatabase(output_file) - job_db.persist(df) + manager.initialize_job_db(job_db,df) + manager.start_job_thread( start_job=start_job,job_db=job_db) - sleep(20) + sleep(5) manager.stop_job_thread(10) - #assert sleep_mock.call_count > 10 + assert sleep_mock.call_count > 10 result = pd.read_csv(output_file) assert len(result) == 5 From 1e126828d76abc6642a0791df2e511b63e7779d4 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 23 Sep 2024 11:19:35 +0200 Subject: [PATCH 07/12] PR #614 code style cleanup (using darker and isort) --- openeo/extra/job_management.py | 39 ++++++++++++------------------ tests/extra/test_job_management.py | 10 ++------ 2 files changed, 17 insertions(+), 32 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index e7028da3f..60f67efd7 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -1,5 +1,4 @@ import abc -import asyncio import contextlib import datetime import json @@ -8,7 +7,7 @@ import warnings from pathlib import Path from threading import Thread -from typing import Callable, Dict, NamedTuple, Optional, Union, List +from typing import Callable, Dict, List, NamedTuple, Optional, Union import pandas as pd import requests @@ -278,7 +277,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: return df - def initialize_job_db(self,job_db:JobDatabaseInterface, dataframe:pd.DataFrame): + def initialize_job_db(self, job_db: JobDatabaseInterface, dataframe: pd.DataFrame): """ Initialize a job database to be compatible with this job manager. The provided dataframe should contain all the columns that you need to create your jobs. @@ -292,15 +291,13 @@ def initialize_job_db(self,job_db:JobDatabaseInterface, dataframe:pd.DataFrame): """ - if( not job_db.exists()): + if not job_db.exists(): dataframe = self._normalize_df(dataframe) job_db.persist(dataframe) else: raise ValueError(f"Job database {job_db} already exists, cannot initialize.") - - def start_job_thread(self,start_job: Callable[[], BatchJob], - job_db: JobDatabaseInterface ): + def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface): """ Start running the jobs in a separate thread, returns afterwards. @@ -348,16 +345,17 @@ def start_job_thread(self,start_job: Callable[[], BatchJob], self._stop = False def run_loop(): - while (sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0 and not self._stop + while ( + sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0 + and not self._stop ): self._job_update_loop(df, job_db, start_job) time.sleep(self.poll_sleep) - - self._timer = Thread(target = run_loop) + self._timer = Thread(target=run_loop) self._timer.start() - def stop_job_thread(self, timeout_seconds = None): + def stop_job_thread(self, timeout_seconds=None): """ Try to stop the thread, if timeout_seconds is set, the method will return after the timeout @@ -365,12 +363,11 @@ def stop_job_thread(self, timeout_seconds = None): """ self._stop = True - if(self._timer is not None): + if self._timer is not None: self._timer.join(timeout_seconds) - if(self._timer.is_alive()): + if self._timer.is_alive(): _log.warning("Job thread did not stop after timeout") - def run_jobs( self, df: Optional[pd.DataFrame], @@ -457,11 +454,9 @@ def run_jobs( # Resume from existing db _log.info(f"Resuming `run_jobs` from existing {job_db}") elif df is not None: - self.initialize_job_db(job_db,df) + self.initialize_job_db(job_db, df) - while ( - sum(job_db.count_by_status(statuses=["not_started","created","queued","running"]).values()) > 0 - ): + while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0: self._job_update_loop(df, job_db, start_job) time.sleep(self.poll_sleep) @@ -469,12 +464,10 @@ def _job_update_loop(self, df, job_db, start_job): with ignore_connection_errors(context="get statuses"): self._track_statuses(job_db) - - - not_started = job_db.get_by_status(statuses=["not_started"],max=200) + not_started = job_db.get_by_status(statuses=["not_started"], max=200) if len(not_started) > 0: # Check number of jobs running at each backend - running = job_db.get_by_status(statuses=["created","queued","running"]) + running = job_db.get_by_status(statuses=["created", "queued", "running"]) per_backend = running.groupby("backend_name").size().to_dict() _log.info(f"Running per backend: {per_backend}") for backend_name in self.backends: @@ -486,8 +479,6 @@ def _job_update_loop(self, df, job_db, start_job): self._launch_job(start_job, not_started, i, backend_name) job_db.persist(to_launch) - - def _launch_job(self, start_job, df, i, backend_name): """Helper method for launching jobs diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index b98ecc74f..7f4647217 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,10 +1,6 @@ -import datetime import json import re -import sys -import textwrap import threading -import time from time import sleep from typing import Callable, Union from unittest import mock @@ -24,7 +20,6 @@ import pytest import requests import shapely.geometry -import time_machine import openeo import openeo.extra.job_management @@ -35,7 +30,6 @@ MultiBackendJobManager, ParquetJobDatabase, ) -from openeo.rest import OpenEoApiError from openeo.util import rfc3339 @@ -131,9 +125,9 @@ def start_job(row, connection, **kwargs): return BatchJob(job_id=f"job-{year}", connection=connection) job_db = CsvJobDatabase(output_file) - manager.initialize_job_db(job_db,df) + manager.initialize_job_db(job_db, df) - manager.start_job_thread( start_job=start_job,job_db=job_db) + manager.start_job_thread(start_job=start_job, job_db=job_db) sleep(5) manager.stop_job_thread(10) assert sleep_mock.call_count > 10 From 1e80314da3c7b6623c1385ff76d37a05bdeeedc2 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 23 Sep 2024 11:44:38 +0200 Subject: [PATCH 08/12] PR #614 some minor clarifications --- openeo/extra/job_management.py | 30 +++++++++++++----------------- tests/extra/test_job_management.py | 3 ++- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 60f67efd7..376653a20 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -162,7 +162,7 @@ def __init__( .. versionchanged:: 0.32.0 Added `cancel_running_job_after` parameter. """ - self._stop = True + self._stop_thread = None self.backends: Dict[str, _Backend] = {} self.poll_sleep = poll_sleep self._connections: Dict[str, _Backend] = {} @@ -173,7 +173,7 @@ def __init__( self._cancel_running_job_after = ( datetime.timedelta(seconds=cancel_running_job_after) if cancel_running_job_after is not None else None ) - self._timer = None + self._thread = None def add_backend( self, @@ -287,10 +287,7 @@ def initialize_job_db(self, job_db: JobDatabaseInterface, dataframe: pd.DataFram If the column name already exists in the provided dataframe, it will be assumed that it can be used as is by this job manager. - - """ - if not job_db.exists(): dataframe = self._normalize_df(dataframe) job_db.persist(dataframe) @@ -336,24 +333,25 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas .. note:: Support for Parquet files depends on the ``pyarrow`` package as :ref:`optional dependency `. + + .. versionadded:: 0.32.0 """ # Resume from existing db _log.info(f"Resuming `run_jobs` from existing {job_db}") df = job_db.read() - self._stop = False - + self._stop_thread = False def run_loop(): while ( sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0 - and not self._stop + and not self._stop_thread ): self._job_update_loop(df, job_db, start_job) time.sleep(self.poll_sleep) - self._timer = Thread(target=run_loop) - self._timer.start() + self._thread = Thread(target=run_loop) + self._thread.start() def stop_job_thread(self, timeout_seconds=None): """ @@ -361,11 +359,12 @@ def stop_job_thread(self, timeout_seconds=None): :param timeout_seconds: The time to wait for the thread to stop, set to None (default) to wait indefinitely + .. versionadded:: 0.32.0 """ - self._stop = True - if self._timer is not None: - self._timer.join(timeout_seconds) - if self._timer.is_alive(): + self._stop_thread = True + if self._thread is not None: + self._thread.join(timeout_seconds) + if self._thread.is_alive(): _log.warning("Job thread did not stop after timeout") def run_jobs( @@ -685,7 +684,6 @@ def ignore_connection_errors(context: Optional[str] = None, sleep: int = 5): class FullDataFrameJobDatabase(JobDatabaseInterface): - def __init__(self): super().__init__() self._df = None @@ -700,8 +698,6 @@ def count_by_status(self, statuses: List[str]) -> dict: status_histogram = self.df.groupby("status").size().to_dict() return {k:v for k,v in status_histogram.items() if k in statuses} - - def get_by_status(self, statuses, max=None) -> pd.DataFrame: """ Returns a dataframe with jobs, filtered by status. diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 7f4647217..7de045b24 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -128,7 +128,8 @@ def start_job(row, connection, **kwargs): manager.initialize_job_db(job_db, df) manager.start_job_thread(start_job=start_job, job_db=job_db) - sleep(5) + # Trigger context switch to job thread + sleep(1) manager.stop_job_thread(10) assert sleep_mock.call_count > 10 From 95a4ec7051b6b1ab5da7e6d12efd783543065a36 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Mon, 23 Sep 2024 12:08:34 +0200 Subject: [PATCH 09/12] PR #614 job thread: do micro-sleeps for quick exit and avoid infinite wait (by default) --- openeo/extra/job_management.py | 24 +++++++++++++++++++----- tests/extra/test_job_management.py | 2 +- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 376653a20..004b90af4 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -32,6 +32,9 @@ class _Backend(NamedTuple): MAX_RETRIES = 5 +# Sentinel value to indicate that a parameter was not set +_UNSET = object() + class JobDatabaseInterface(metaclass=abc.ABCMeta): """ @@ -348,24 +351,35 @@ def run_loop(): and not self._stop_thread ): self._job_update_loop(df, job_db, start_job) - time.sleep(self.poll_sleep) + + # Do sequence of micro-sleeps to allow for quick thread exit + for _ in range(int(max(1, self.poll_sleep))): + time.sleep(1) + if self._stop_thread: + break self._thread = Thread(target=run_loop) self._thread.start() - def stop_job_thread(self, timeout_seconds=None): + def stop_job_thread(self, timeout_seconds: Optional[float] = _UNSET): """ - Try to stop the thread, if timeout_seconds is set, the method will return after the timeout + Stop the job polling thread. - :param timeout_seconds: The time to wait for the thread to stop, set to None (default) to wait indefinitely + :param timeout_seconds: The time to wait for the thread to stop. + By default, it will wait for 2 times the poll_sleep time. + Set to None to wait indefinitely. .. versionadded:: 0.32.0 """ - self._stop_thread = True if self._thread is not None: + self._stop_thread = True + if timeout_seconds is _UNSET: + timeout_seconds = 2 * self.poll_sleep self._thread.join(timeout_seconds) if self._thread.is_alive(): _log.warning("Job thread did not stop after timeout") + else: + _log.error("No job thread to stop") def run_jobs( self, diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 7de045b24..041b9851b 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -130,7 +130,7 @@ def start_job(row, connection, **kwargs): manager.start_job_thread(start_job=start_job, job_db=job_db) # Trigger context switch to job thread sleep(1) - manager.stop_job_thread(10) + manager.stop_job_thread() assert sleep_mock.call_count > 10 result = pd.read_csv(output_file) From 219f972aedf4a78b43fecb71cb0d2342a5c084ea Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 27 Sep 2024 12:25:30 +0200 Subject: [PATCH 10/12] PR #614 drop `initialize_job_db` from scope of this PR db initialization API needs some more thought, which is for another PR --- openeo/extra/job_management.py | 30 +++++++----------------------- tests/extra/test_job_management.py | 17 ++++++++++++++++- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index 004b90af4..ebe6e96d2 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -259,6 +259,7 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: :param df: The dataframe to normalize. :return: a new dataframe that is normalized. """ + # TODO: this was originally an internal helper, but we need a clean public API for the user # check for some required columns. required_with_default = [ @@ -280,23 +281,6 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame: return df - def initialize_job_db(self, job_db: JobDatabaseInterface, dataframe: pd.DataFrame): - """ - Initialize a job database to be compatible with this job manager. - The provided dataframe should contain all the columns that you need to create your jobs. - - This job manager uses the following reserved column names: status, id, start_time, running_start_time, - cpu, memory, duration, backend_name. - - If the column name already exists in the provided dataframe, it will be assumed that it can be used as is by - this job manager. - """ - if not job_db.exists(): - dataframe = self._normalize_df(dataframe) - job_db.persist(dataframe) - else: - raise ValueError(f"Job database {job_db} already exists, cannot initialize.") - def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabaseInterface): """ Start running the jobs in a separate thread, returns afterwards. @@ -331,7 +315,6 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas Job database to load/store existing job status data and other metadata from/to. Can be specified as a path to CSV or Parquet file, or as a custom database object following the :py:class:`JobDatabaseInterface` interface. - The job database should be initialized with :py:meth:`initialize_job_db` before calling this method. .. note:: Support for Parquet files depends on the ``pyarrow`` package @@ -467,7 +450,8 @@ def run_jobs( # Resume from existing db _log.info(f"Resuming `run_jobs` from existing {job_db}") elif df is not None: - self.initialize_job_db(job_db, df) + df = self._normalize_df(df) + job_db.persist(df) while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0: self._job_update_loop(df, job_db, start_job) @@ -749,8 +733,8 @@ def __init__(self, path: Union[str, Path]): super().__init__() self.path = Path(path) - def __str__(self): - return str(self.path) + def __repr__(self): + return f"{self.__class__.__name__}({str(self.path)!r})" def exists(self) -> bool: return self.path.exists() @@ -801,8 +785,8 @@ def __init__(self, path: Union[str, Path]): super().__init__() self.path = Path(path) - def __str__(self): - return str(self.path) + def __repr__(self): + return f"{self.__class__.__name__}({str(self.path)!r})" def exists(self) -> bool: return self.path.exists() diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 041b9851b..9ed6463d5 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -125,7 +125,8 @@ def start_job(row, connection, **kwargs): return BatchJob(job_id=f"job-{year}", connection=connection) job_db = CsvJobDatabase(output_file) - manager.initialize_job_db(job_db, df) + # TODO: avoid private _normalize_df API + job_db.persist(manager._normalize_df(df)) manager.start_job_thread(start_job=start_job, job_db=job_db) # Trigger context switch to job thread @@ -595,6 +596,13 @@ def test_automatic_cancel_of_too_long_running_jobs( class TestCsvJobDatabase: + + def test_repr(self, tmp_path): + path = tmp_path / "db.csv" + db = CsvJobDatabase(path) + assert repr(db) == f"CsvJobDatabase('{path!s}')" + assert str(db) == f"CsvJobDatabase('{path!s}')" + def test_read_wkt(self, tmp_path): wkt_df = pd.DataFrame( { @@ -680,6 +688,13 @@ def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame): class TestParquetJobDatabase: + + def test_repr(self, tmp_path): + path = tmp_path / "db.csv" + db = ParquetJobDatabase(path) + assert repr(db) == f"ParquetJobDatabase('{path!s}')" + assert str(db) == f"ParquetJobDatabase('{path!s}')" + @pytest.mark.parametrize( ["orig"], [ From 062846cb1248d557b0250e4052d52d7875ee8217 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 27 Sep 2024 12:50:44 +0200 Subject: [PATCH 11/12] fixup! PR #614 drop `initialize_job_db` from scope of this PR --- tests/extra/test_job_management.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index 9ed6463d5..42908508a 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -600,8 +600,8 @@ class TestCsvJobDatabase: def test_repr(self, tmp_path): path = tmp_path / "db.csv" db = CsvJobDatabase(path) - assert repr(db) == f"CsvJobDatabase('{path!s}')" - assert str(db) == f"CsvJobDatabase('{path!s}')" + assert re.match(r"CsvJobDatabase\('[^']+\.csv'\)", repr(db)) + assert re.match(r"CsvJobDatabase\('[^']+\.csv'\)", str(db)) def test_read_wkt(self, tmp_path): wkt_df = pd.DataFrame( @@ -690,10 +690,10 @@ def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame): class TestParquetJobDatabase: def test_repr(self, tmp_path): - path = tmp_path / "db.csv" + path = tmp_path / "db.pq" db = ParquetJobDatabase(path) - assert repr(db) == f"ParquetJobDatabase('{path!s}')" - assert str(db) == f"ParquetJobDatabase('{path!s}')" + assert re.match(r"ParquetJobDatabase\('[^']+\.pq'\)", repr(db)) + assert re.match(r"ParquetJobDatabase\('[^']+\.pq'\)", str(db)) @pytest.mark.parametrize( ["orig"], From a60e101a58ba356c598a6f17b8e633752b4a45c9 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Fri, 27 Sep 2024 12:39:32 +0200 Subject: [PATCH 12/12] fixup! PR #614 drop `initialize_job_db` from scope of this PR --- openeo/extra/job_management.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index ebe6e96d2..84a347e6b 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -458,6 +458,11 @@ def run_jobs( time.sleep(self.poll_sleep) def _job_update_loop(self, df, job_db, start_job): + """ + Inner loop logic of job management: + go through the necessary jobs to check for status updates, + trigger status events, start new jobs when there is room for them, etc. + """ with ignore_connection_errors(context="get statuses"): self._track_statuses(job_db)