Issue #635: cleaner job manager API for initializing the job database
soxofaan committed Oct 7, 2024
1 parent f7f035f commit de9c427
Showing 2 changed files with 82 additions and 19 deletions.
29 changes: 25 additions & 4 deletions openeo/extra/job_management.py
@@ -258,8 +258,11 @@ def _make_resilient(connection):
connection.session.mount("https://", HTTPAdapter(max_retries=retries))
connection.session.mount("http://", HTTPAdapter(max_retries=retries))

def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
"""Ensure we have the required columns and the expected type for the geometry column.
@staticmethod
def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
"""
Normalize given pandas dataframe (creating a new one):
ensure we have the required columns.
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
@@ -456,8 +459,7 @@ def run_jobs(
_log.info(f"Resuming `run_jobs` from existing {job_db}")
elif df is not None:
# TODO: start showing deprecation warnings for this usage pattern?
df = self._normalize_df(df)
job_db.persist(df)
job_db.initialize_from_df(df)

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
self._job_update_loop(job_db=job_db, start_job=start_job)
@@ -697,6 +699,25 @@ def __init__(self):
super().__init__()
self._df = None

def initialize_from_df(self, df: pd.DataFrame):
"""
Initialize the job database from a given dataframe,
which will first be normalized to be compatible
with :py:class:`MultiBackendJobManager` usage.
:param df: dataframe with job data to be normalized and persisted.
:return: initialized job database.
.. versionadded:: 0.33.0
"""
# TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
if self.exists():
raise RuntimeError(f"Job database {self!r} already exists.")
df = MultiBackendJobManager._normalize_df(df)
self.persist(df)
# Return self to allow chaining with constructor.
return self

@property
def df(self) -> pd.DataFrame:
if self._df is None:
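For context, a minimal usage sketch of the new `initialize_from_df` API introduced in this commit (not part of the diff). It assumes `CsvJobDatabase` and `MultiBackendJobManager` are importable from `openeo.extra.job_management`; the backend URL, collection id and file name are placeholders.

```python
# Sketch only: illustrates the new initialization API from this commit.
# The "SENTINEL2_L2A" collection id, "openeo.example.com" URL and "jobs.csv"
# filename are placeholders, not taken from the diff.
import pandas as pd

import openeo
from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager


def start_job(row, connection, **kwargs):
    # Build and return a batch job for one dataframe row (adapt to your workflow).
    cube = connection.load_collection("SENTINEL2_L2A")
    return cube.create_job(title=f"job for year {row['year']}")


df = pd.DataFrame({"year": [2021, 2022, 2023]})

# Old, cumbersome pattern (relied on the private _normalize_df helper):
#     job_db = CsvJobDatabase("jobs.csv")
#     job_db.persist(manager._normalize_df(df))
# New pattern: normalize and persist in one chained call.
job_db = CsvJobDatabase("jobs.csv").initialize_from_df(df)

manager = MultiBackendJobManager()
manager.add_backend("example", connection=openeo.connect("openeo.example.com"))
manager.run_jobs(job_db=job_db, start_job=start_job)
```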
72 changes: 57 additions & 15 deletions tests/extra/test_job_management.py
@@ -131,9 +131,7 @@ def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

job_db = CsvJobDatabase(output_file)
# TODO #636 avoid this cumbersome pattern using private _normalize_df API
job_db.persist(manager._normalize_df(df))
job_db = CsvJobDatabase(output_file).initialize_from_df(df)

manager.run_jobs(job_db=job_db, start_job=start_job)
assert sleep_mock.call_count > 10
@@ -164,9 +162,7 @@ def start_job(row, connection, **kwargs):
year = int(row["year"])
return BatchJob(job_id=f"job-{year}", connection=connection)

job_db = CsvJobDatabase(output_file)
# TODO #636 avoid this cumbersome pattern using private _normalize_df API
job_db.persist(manager._normalize_df(df))
job_db = CsvJobDatabase(output_file).initialize_from_df(df)

manager.start_job_thread(start_job=start_job, job_db=job_db)
# Trigger context switch to job thread
@@ -244,14 +240,8 @@ def mock_job_status(job_id, queued=1, running=2):
return manager

def test_normalize_df(self):
df = pd.DataFrame(
{
"some_number": [3, 2, 1],
}
)

df_normalized = MultiBackendJobManager()._normalize_df(df)

df = pd.DataFrame({"some_number": [3, 2, 1]})
df_normalized = MultiBackendJobManager._normalize_df(df)
assert set(df_normalized.columns) == set(
[
"some_number",
@@ -695,7 +685,7 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
],
)
def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
path = tmp_path / "jobs.parquet"
path = tmp_path / "jobs.csv"

required_with_default = [
("status", "not_started"),
@@ -726,6 +716,34 @@ def test_partial_read_write(self, tmp_path, orig: pandas.DataFrame):
assert all.loc[2,"status"] == "not_started"
print(loaded.index)

def test_initialize_from_df(self, tmp_path):
orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
path = tmp_path / "jobs.csv"

# Initialize the CSV from the dataframe
_ = CsvJobDatabase(path).initialize_from_df(orig_df)

# Check persisted CSV
assert path.exists()
expected_columns = {
"some_number",
"status",
"id",
"start_time",
"running_start_time",
"cpu",
"memory",
"duration",
"backend_name",
}

# Raw file content check
raw_columns = set(path.read_text().split("\n")[0].split(","))
# Higher level read
read_columns = set(CsvJobDatabase(path).read().columns)

assert raw_columns == expected_columns
assert read_columns == expected_columns


class TestParquetJobDatabase:
@@ -753,3 +771,27 @@ def test_persist_and_read(self, tmp_path, orig: pandas.DataFrame):
assert loaded.dtypes.to_dict() == orig.dtypes.to_dict()
assert loaded.equals(orig)
assert type(orig) is type(loaded)

def test_initialize_from_df(self, tmp_path):
orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
path = tmp_path / "jobs.parquet"

# Initialize the Parquet file from the dataframe
_ = ParquetJobDatabase(path).initialize_from_df(orig_df)

# Check persisted Parquet file
assert path.exists()
expected_columns = {
"some_number",
"status",
"id",
"start_time",
"running_start_time",
"cpu",
"memory",
"duration",
"backend_name",
}

df_from_disk = ParquetJobDatabase(path).read()
assert set(df_from_disk.columns) == expected_columns
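Also worth noting, as a small sketch under the same import assumption as above: `initialize_from_df` refuses to overwrite an existing job database, via the `self.exists()` guard added in this commit.

```python
import pandas as pd

from openeo.extra.job_management import CsvJobDatabase

df = pd.DataFrame({"some_number": [3, 2, 1]})

# First initialization creates and persists the (normalized) job database.
job_db = CsvJobDatabase("jobs.csv").initialize_from_df(df)

# A second initialization of the same, now existing, database is refused.
try:
    CsvJobDatabase("jobs.csv").initialize_from_df(df)
except RuntimeError as exc:
    print(exc)  # e.g. "Job database CsvJobDatabase(...) already exists."
```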
