diff --git a/CHANGELOG.md b/CHANGELOG.md index 480ced93b..2ce502b7f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - `MultiBackendJobManager`: Avoid `SettingWithCopyWarning` ([#641](https://github.com/Open-EO/openeo-python-client/issues/641)) - Avoid creating empty file if asset download request failed. - `MultiBackendJobManager`: avoid dtype loading mistakes in `CsvJobDatabase` on empty columns ([#656](https://github.com/Open-EO/openeo-python-client/issues/656)) +- `MultiBackendJobManager`: restore logging of job status histogram during `run_jobs` ([#655](https://github.com/Open-EO/openeo-python-client/issues/655)) ## [0.34.0] - 2024-10-31 diff --git a/openeo/extra/job_management.py b/openeo/extra/job_management.py index a84d8dadc..4b56a9f25 100644 --- a/openeo/extra/job_management.py +++ b/openeo/extra/job_management.py @@ -10,7 +10,17 @@ import warnings from pathlib import Path from threading import Thread -from typing import Any, Callable, Dict, List, Mapping, NamedTuple, Optional, Union +from typing import ( + Any, + Callable, + Dict, + Iterable, + List, + Mapping, + NamedTuple, + Optional, + Union, +) import numpy import pandas as pd @@ -80,10 +90,12 @@ def persist(self, df: pd.DataFrame): ... @abc.abstractmethod - def count_by_status(self, statuses: List[str]) -> dict: + def count_by_status(self, statuses: Iterable[str] = ()) -> dict: """ Retrieve the number of jobs per status. + :param statuses: List/set of statuses to include. If empty, all statuses are included. + :return: dictionary with status as key and the count as value. """ ... @@ -355,12 +367,18 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas self._stop_thread = False def run_loop(): + + # TODO: support user-provided `stats` + stats = collections.defaultdict(int) + while ( sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0 and not self._stop_thread ): self._job_update_loop(job_db=job_db, start_job=start_job) + stats["run_jobs loop"] += 1 + _log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}") # Do sequence of micro-sleeps to allow for quick thread exit for _ in range(int(max(1, self.poll_sleep))): time.sleep(1) @@ -479,11 +497,15 @@ def run_jobs( # TODO: start showing deprecation warnings for this usage pattern? job_db.initialize_from_df(df) + # TODO: support user-provided `stats` stats = collections.defaultdict(int) + while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0: self._job_update_loop(job_db=job_db, start_job=start_job, stats=stats) stats["run_jobs loop"] += 1 + # Show current stats and sleep + _log.info(f"Job status histogram: {job_db.count_by_status()}. Run stats: {dict(stats)}") time.sleep(self.poll_sleep) stats["sleep"] += 1 @@ -791,9 +813,12 @@ def df(self) -> pd.DataFrame: self._df = self.read() return self._df - def count_by_status(self, statuses: List[str]) -> dict: + def count_by_status(self, statuses: Iterable[str] = ()) -> dict: status_histogram = self.df.groupby("status").size().to_dict() - return {k:v for k,v in status_histogram.items() if k in statuses} + statuses = set(statuses) + if statuses: + status_histogram = {k: v for k, v in status_histogram.items() if k in statuses} + return status_histogram def get_by_status(self, statuses, max=None) -> pd.DataFrame: """ diff --git a/tests/extra/test_job_management.py b/tests/extra/test_job_management.py index e484c27d3..4c04f5512 100644 --- a/tests/extra/test_job_management.py +++ b/tests/extra/test_job_management.py @@ -1,5 +1,6 @@ import copy import json +import logging import re import threading from pathlib import Path @@ -616,7 +617,17 @@ def test_empty_csv_handling(self, tmp_path, sleep_mock, recwarn, job_manager): assert [(w.category, w.message, str(w)) for w in recwarn.list] == [] + def test_status_logging(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock, caplog): + caplog.set_level(logging.INFO) + df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]}) + job_db_path = tmp_path / "jobs.csv" + job_db = CsvJobDatabase(job_db_path).initialize_from_df(df) + + run_stats = job_manager.run_jobs(job_db=job_db, start_job=self._create_year_job) + assert run_stats == dirty_equals.IsPartialDict({"start_job call": 5, "job finished": 5}) + needle = re.compile(r"Job status histogram:.*'queued': 4.*Run stats:.*'start_job call': 4") + assert needle.search(caplog.text) JOB_DB_DF_BASICS = pd.DataFrame( @@ -681,7 +692,7 @@ def test_initialize_from_df_on_exists_error(self, tmp_path, db_class): @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase]) def test_initialize_from_df_on_exists_skip(self, tmp_path, db_class): - path = tmp_path / "jobs.csv" + path = tmp_path / "jobs.db" db = db_class(path).initialize_from_df( pd.DataFrame({"some_number": [3, 2, 1]}), @@ -695,6 +706,42 @@ def test_initialize_from_df_on_exists_skip(self, tmp_path, db_class): ) assert set(db.read()["some_number"]) == {1, 2, 3} + @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase]) + def test_count_by_status(self, tmp_path, db_class): + path = tmp_path / "jobs.db" + + db = db_class(path).initialize_from_df( + pd.DataFrame( + { + "status": [ + "not_started", + "created", + "queued", + "queued", + "queued", + "running", + "running", + "finished", + "finished", + "error", + ] + } + ) + ) + assert db.count_by_status(statuses=["not_started"]) == {"not_started": 1} + assert db.count_by_status(statuses=("not_started", "running")) == {"not_started": 1, "running": 2} + assert db.count_by_status(statuses={"finished", "error"}) == {"error": 1, "finished": 2} + + # All statuses by default + assert db.count_by_status() == { + "created": 1, + "error": 1, + "finished": 2, + "not_started": 1, + "queued": 3, + "running": 2, + } + class TestCsvJobDatabase: