added output_file as an argument of JobTrackerStorage class #571
VincentVerelst committed Jul 3, 2024
1 parent 305ac2a commit fd5f071
Showing 2 changed files with 62 additions and 61 deletions.
70 changes: 35 additions & 35 deletions openeo/extra/job_management.py
@@ -175,7 +175,27 @@ def _make_resilient(self, connection):
         connection.session.mount("https://", HTTPAdapter(max_retries=retries))
         connection.session.mount("http://", HTTPAdapter(max_retries=retries))

+    def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Ensure we have the required columns and the expected type for the geometry column.
+        :param df: The dataframe to normalize.
+        :return: a new dataframe that is normalized.
+        """
+
+        # check for some required columns.
+        required_with_default = [
+            ("status", "not_started"),
+            ("id", None),
+            ("start_time", None),
+            ("cpu", None),
+            ("memory", None),
+            ("duration", None),
+            ("backend_name", None),
+        ]
+        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
+        df = df.assign(**new_columns)
+
+        return df
+
     def run_jobs(
         self,
@@ -220,9 +240,9 @@ def run_jobs(
         """
         # TODO: Defining start_jobs as a Protocol might make its usage more clear, and avoid complicated docstrings,
         # but Protocols are only supported in Python 3.8 and higher.
-        job_tracker_storage = JobTrackerStorage()
-        df = job_tracker_storage.resume_df(df, output_file)
-        df = job_tracker_storage.normalize_df(df)
+        job_tracker_storage = JobTrackerStorage(output_file)
+        df = job_tracker_storage.resume_df(df)
+        df = self._normalize_df(df)

         while (
             df[
@@ -237,7 +257,7 @@ def run_jobs(
             self._update_statuses(df)
             status_histogram = df.groupby("status").size().to_dict()
             _log.info(f"Status histogram: {status_histogram}")
-            job_tracker_storage.persists(df, output_file)
+            job_tracker_storage.persists(df)

             if len(df[df.status == "not_started"]) > 0:
                 # Check number of jobs running at each backend
@@ -257,7 +277,7 @@ def run_jobs(
                         to_launch = df[df.status == "not_started"].iloc[0:to_add]
                         for i in to_launch.index:
                             self._launch_job(start_job, df, i, backend_name)
-            job_tracker_storage.persists(df, output_file)
+            job_tracker_storage.persists(df)

             time.sleep(self.poll_sleep)

@@ -431,51 +451,31 @@ class JobTrackerStorage:
     Helper to manage the storage of batch job metadata.
     """

-    def __init__(self):
-        pass
+    def __init__(self, output_file: Union[str, Path]):
+        self.output_file = output_file

-    def normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
-        """Ensure we have the required columns and the expected type for the geometry column.
-        :param df: The dataframe to normalize.
-        :return: a new dataframe that is normalized.
-        """
-
-        # check for some required columns.
-        required_with_default = [
-            ("status", "not_started"),
-            ("id", None),
-            ("start_time", None),
-            ("cpu", None),
-            ("memory", None),
-            ("duration", None),
-            ("backend_name", None),
-        ]
-        new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
-        df = df.assign(**new_columns)
-
-        return df
-
-    def resume_df(self, df: pd.DataFrame, output_file: Union[str, Path]) -> pd.DataFrame:
+    def resume_df(self, df: pd.DataFrame) -> pd.DataFrame:
         """Resume job tracker from an existing CSV file if it exists.
         :param df: User input job dataframe. To be overwritten if output_file exists.
-        :param output_file: Path to the output CSV file. If it exists, the job tracker will be resumed from it.
         :return: The resumed dataframe.
         """
-        output_file = Path(output_file)
-        if output_file.exists() and output_file.is_file():
+        output_path = Path(self.output_file)
+        if output_path.exists() and output_path.is_file():
             # Resume from existing CSV
-            _log.info(f"Resuming `run_jobs` from {output_file.absolute()}")
-            df = pd.read_csv(output_file)
+            _log.info(f"Resuming `run_jobs` from {output_path.absolute()}")
+            df = pd.read_csv(output_path)
             status_histogram = df.groupby("status").size().to_dict()
             _log.info(f"Status histogram: {status_histogram}")
         return df

-    def persists(self, df, output_file):
+    def persists(self, df):
         """Persist the dataframe to the output_file.
         :param df: The job tracker dataframe.
-        :param output_file: Path to the output CSV file.
         """
-        df.to_csv(output_file, index=False)
-        _log.info(f"Wrote job metadata to {output_file}")
+        df.to_csv(self.output_file, index=False)
+        _log.info(f"Wrote job metadata to {self.output_file}")
53 changes: 27 additions & 26 deletions tests/extra/test_job_management.py
@@ -116,6 +116,28 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()

+    def test_normalize_df(self):
+        df = pd.DataFrame(
+            {
+                "some_number": [3, 2, 1],
+            }
+        )
+
+        df_normalized = MultiBackendJobManager()._normalize_df(df)
+
+        assert set(df_normalized.columns) == set(
+            [
+                "some_number",
+                "status",
+                "id",
+                "start_time",
+                "cpu",
+                "memory",
+                "duration",
+                "backend_name",
+            ]
+        )
+
     def test_manager_must_exit_when_all_jobs_done(self, tmp_path, requests_mock, sleep_mock):
         """Make sure the MultiBackendJobManager does not hang after all processes finish.
@@ -434,27 +456,6 @@ def start_job(row, connection_provider, connection, **kwargs):


 class TestJobTrackerStorage:
-    def test_normalize_df(self):
-        df = pd.DataFrame(
-            {
-                "some_number": [3, 2, 1],
-            }
-        )
-
-        df_normalized = JobTrackerStorage().normalize_df(df)
-
-        assert set(df_normalized.columns) == set(
-            [
-                "some_number",
-                "status",
-                "id",
-                "start_time",
-                "cpu",
-                "memory",
-                "duration",
-                "backend_name",
-            ]
-        )

     def test_resume_df_from_existing_path(self, tmp_path):
         existing_df = pd.DataFrame(
@@ -471,7 +472,7 @@ def test_resume_df_from_existing_path(self, tmp_path):
         )
         dir = tmp_path / "job_tracker.csv"
         existing_df.to_csv(dir, index=False)
-        df_resumed = JobTrackerStorage().resume_df(new_df, dir)
+        df_resumed = JobTrackerStorage(dir).resume_df(new_df)
         pd.testing.assert_frame_equal(existing_df, df_resumed)

     def test_resume_df_from_non_existing_path(self, tmp_path):
@@ -482,7 +483,7 @@ def test_resume_df_from_non_existing_path(self, tmp_path):
             }
         )
         dir = tmp_path / "non_existing_job_tracker.csv"
-        df_resumed = JobTrackerStorage().resume_df(new_df, dir)
+        df_resumed = JobTrackerStorage(dir).resume_df(new_df)
         pd.testing.assert_frame_equal(new_df, df_resumed)

     def test_persists(self, tmp_path):
@@ -491,6 +492,6 @@ def test_persists(self, tmp_path):
                 "some_number": [3, 2, 1],
             }
         )
-
-        JobTrackerStorage().persists(df, tmp_path / "job_tracker.csv")
-        assert pd.read_csv(tmp_path / "job_tracker.csv").equals(df)
+        dir = tmp_path / "job_tracker.csv"
+        JobTrackerStorage(dir).persists(df)
+        assert pd.read_csv(dir).equals(df)
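
The relocated `_normalize_df` logic is small enough to show standalone. A sketch of the column-defaulting behavior it implements, using the defaults from the method body above (the input dataframe is illustrative):

```python
import pandas as pd

# Required tracker columns and their defaults, as listed in _normalize_df.
required_with_default = [
    ("status", "not_started"),
    ("id", None),
    ("start_time", None),
    ("cpu", None),
    ("memory", None),
    ("duration", None),
    ("backend_name", None),
]

df = pd.DataFrame({"some_number": [3, 2, 1]})

# Add every required column the input is missing; columns the
# caller already provided are left untouched.
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)

print(df["status"].tolist())  # ['not_started', 'not_started', 'not_started']
```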
