
Issue #635 add basic "on_exists" modes to initialize_from_df
soxofaan committed Oct 7, 2024 · 1 parent e77de9e · commit 8784c5a
Showing 2 changed files with 115 additions and 5 deletions.
18 changes: 13 additions & 5 deletions openeo/extra/job_management.py
@@ -267,8 +267,6 @@ def _normalize_df(df: pd.DataFrame) -> pd.DataFrame:
         :param df: The dataframe to normalize.
         :return: a new dataframe that is normalized.
         """
-        # TODO: this was originally an internal helper, but we need a clean public API for the user
-
         # check for some required columns.
         required_with_default = [
             ("status", "not_started"),
@@ -699,20 +697,30 @@ def __init__(self):
         super().__init__()
         self._df = None

-    def initialize_from_df(self, df: pd.DataFrame):
+    def initialize_from_df(self, df: pd.DataFrame, on_exists: str = "error"):
         """
         Initialize the job database from a given dataframe,
         which will be first normalized to be compatible
         with :py:class:`MultiBackendJobManager` usage.

-        :param df: data frame with some columns that
+        :param df: dataframe with some columns your ``start_job`` callable expects
+        :param on_exists: what to do when the job database already exists (persisted on disk):
+
+            - "error": (default) raise an exception
+            - "skip": work with existing database, ignore given dataframe and skip any initialization
+
         :return: initialized job database.
+
+        .. versionadded:: 0.33.0
         """
         # TODO: option to provide custom MultiBackendJobManager subclass with custom normalize?
         if self.exists():
-            raise RuntimeError(f"Job database {self!r} already exists.")
+            if on_exists == "skip":
+                return self
+            elif on_exists == "error":
+                raise FileExistsError(f"Job database {self!r} already exists.")
+            else:
+                # TODO handle other on_exists modes: e.g. overwrite, merge, ...
+                raise ValueError(f"Invalid on_exists={on_exists!r}")
+
         df = MultiBackendJobManager._normalize_df(df)
         self.persist(df)
         # Return self to allow chaining with constructor.
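
The upshot of this change is that initialize_from_df can now back restartable scripts: with on_exists="skip", rerunning the same script reuses the previously persisted job database instead of failing. A minimal usage sketch, with assumed placeholder names (the backend URL, collection id, file name and dataframe contents below are illustrative, not part of this commit):

    import openeo
    import pandas as pd
    from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager

    def start_job(row, connection, **kwargs):
        # Build a batch job from one dataframe row (collection id is illustrative).
        cube = connection.load_collection("SENTINEL2_L2A")
        return cube.create_job(title=f"job-{row['year']}")

    df = pd.DataFrame({"year": [2020, 2021, 2022]})

    # on_exists="skip" makes initialization idempotent: the first run persists
    # the normalized dataframe; a rerun reuses the existing jobs.csv as-is.
    job_db = CsvJobDatabase("jobs.csv").initialize_from_df(df, on_exists="skip")

    manager = MultiBackendJobManager()
    manager.add_backend("foo", connection=openeo.connect("https://openeo.example"))
    manager.run_jobs(job_db=job_db, start_job=start_job)
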
102 changes: 102 additions & 0 deletions tests/extra/test_job_management.py
@@ -146,6 +146,29 @@ def start_job(row, connection, **kwargs):
         metadata_path = manager.get_job_metadata_path(job_id="job-2022")
         assert metadata_path.exists()

+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_db_class(self, tmp_path, requests_mock, sleep_mock, db_class):
+        """
+        Basic run parameterized on database class
+        """
+        manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
+
+        def start_job(row, connection, **kwargs):
+            year = int(row["year"])
+            return BatchJob(job_id=f"job-{year}", connection=connection)
+
+        df = pd.DataFrame({"year": [2018, 2019, 2020, 2021, 2022]})
+        output_file = tmp_path / "jobs.db"
+        job_db = db_class(output_file).initialize_from_df(df)
+
+        manager.run_jobs(job_db=job_db, start_job=start_job)
+        assert sleep_mock.call_count > 10
+
+        result = job_db.read()
+        assert len(result) == 5
+        assert set(result.status) == {"finished"}
+        assert set(result.backend_name) == {"foo", "bar"}
+
     def test_basic_threading(self, tmp_path, requests_mock, sleep_mock):
         manager = self._create_basic_mocked_manager(requests_mock, tmp_path)
@@ -626,6 +649,63 @@ def test_automatic_cancel_of_too_long_running_jobs(
     )


+class TestFullDataFrameJobDatabase:
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_initialize_from_df(self, tmp_path, db_class):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.db"
+
+        db = db_class(path)
+        assert not path.exists()
+        db.initialize_from_df(orig_df)
+        assert path.exists()
+
+        # Check persisted CSV
+        assert path.exists()
+        expected_columns = {
+            "some_number",
+            "status",
+            "id",
+            "start_time",
+            "running_start_time",
+            "cpu",
+            "memory",
+            "duration",
+            "backend_name",
+        }
+
+        actual_columns = set(db_class(path).read().columns)
+        assert actual_columns == expected_columns
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_initialize_from_df_on_exists_error(self, tmp_path, db_class):
+        df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.csv"
+        _ = db_class(path).initialize_from_df(df, on_exists="error")
+        assert path.exists()
+
+        with pytest.raises(FileExistsError, match="Job database.* already exists"):
+            _ = db_class(path).initialize_from_df(df, on_exists="error")
+
+        assert set(db_class(path).read()["some_number"]) == {1, 2, 3}
+
+    @pytest.mark.parametrize("db_class", [CsvJobDatabase, ParquetJobDatabase])
+    def test_initialize_from_df_on_exists_skip(self, tmp_path, db_class):
+        path = tmp_path / "jobs.csv"
+
+        db = db_class(path).initialize_from_df(
+            pd.DataFrame({"some_number": [3, 2, 1]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
+
+        db = db_class(path).initialize_from_df(
+            pd.DataFrame({"some_number": [444, 555, 666]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
+
+
 class TestCsvJobDatabase:

     def test_repr(self, tmp_path):
@@ -745,6 +825,28 @@ def test_initialize_from_df(self, tmp_path):
         assert raw_columns == expected_columns
         assert read_columns == expected_columns

+    def test_initialize_from_df_on_exists_error(self, tmp_path):
+        orig_df = pd.DataFrame({"some_number": [3, 2, 1]})
+        path = tmp_path / "jobs.csv"
+        _ = CsvJobDatabase(path).initialize_from_df(orig_df, on_exists="error")
+        with pytest.raises(FileExistsError, match="Job database.* already exists"):
+            _ = CsvJobDatabase(path).initialize_from_df(orig_df, on_exists="error")
+
+    def test_initialize_from_df_on_exists_skip(self, tmp_path):
+        path = tmp_path / "jobs.csv"
+
+        db = CsvJobDatabase(path).initialize_from_df(
+            pd.DataFrame({"some_number": [3, 2, 1]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
+
+        db = CsvJobDatabase(path).initialize_from_df(
+            pd.DataFrame({"some_number": [444, 555, 666]}),
+            on_exists="skip",
+        )
+        assert set(db.read()["some_number"]) == {1, 2, 3}
+
+
 class TestParquetJobDatabase:
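
For completeness, a sketch of how the on_exists dispatch behaves from the caller's side (file name and values are illustrative; "overwrite" stands in for any mode the TODO in the diff above leaves unimplemented):

    import pandas as pd
    from openeo.extra.job_management import CsvJobDatabase

    df = pd.DataFrame({"some_number": [1, 2, 3]})
    CsvJobDatabase("jobs.csv").initialize_from_df(df)  # first call persists to disk

    try:
        # Default mode "error": refuse to touch an existing database.
        CsvJobDatabase("jobs.csv").initialize_from_df(df)
    except FileExistsError as exc:
        print(exc)  # e.g. "Job database CsvJobDatabase(...) already exists."

    try:
        # Unsupported modes are rejected, but only once the database exists,
        # since the whole dispatch sits behind the self.exists() check.
        CsvJobDatabase("jobs.csv").initialize_from_df(df, on_exists="overwrite")
    except ValueError as exc:
        print(exc)  # Invalid on_exists='overwrite'
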
