Skip to content

Commit

Permalink
Issue #571/#595 make JobDatabaseInterface an abstract base class
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Jul 23, 2024
1 parent cdc6661 commit 1ce55e5
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 19 deletions.
22 changes: 15 additions & 7 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import abc
import contextlib
import datetime
import json
Expand Down Expand Up @@ -32,19 +33,27 @@ class _Backend(NamedTuple):
MAX_RETRIES = 5


class JobDatabaseInterface:
class JobDatabaseInterface(metaclass=abc.ABCMeta):
"""
Interface for a database of job metadata to use with the :py:class:`MultiBackendJobManager`,
allowing to regularly persist the job metadata while polling the job statuses
and resume/restart the job tracking after it was interrupted.
"""

@abc.abstractmethod
def exists(self) -> bool:
"""Is there existing data in the database?"""
return False
"""Does the job database already exist, to read job data from?"""
...

@abc.abstractmethod
def read(self) -> pd.DataFrame:
"""Read job data from the database ad pandas DataFrame."""
raise NotImplementedError()
"""Read job data from the database as pandas DataFrame."""
...

@abc.abstractmethod
def persist(self, df: pd.DataFrame):
"""Store job data to the database."""
raise NotImplementedError()
...


class MultiBackendJobManager:
Expand Down Expand Up @@ -512,7 +521,6 @@ def ignore_connection_errors(context: Optional[str] = None):
time.sleep(5)



class CsvJobDatabase(JobDatabaseInterface):
def __init__(self, path: Union[str, Path]):
self.path = Path(path)
Expand Down
32 changes: 20 additions & 12 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,26 +481,34 @@ def test_read_non_wkt(self, tmp_path):
df = CsvJobDatabase(path).read()
assert isinstance(df.geometry[0], str)

def test_persist(self, tmp_path):
df = pd.DataFrame(
def test_persist_and_read(self, tmp_path):
orig = pd.DataFrame(
{
"some_number": [3, 2, 1],
"numbers": [3, 2, 1],
"names": ["apple", "banana", "coconut"],
}
)

path = tmp_path / "jobs.csv"
CsvJobDatabase(path).persist(df)
assert CsvJobDatabase(path).read().equals(df)
CsvJobDatabase(path).persist(orig)
assert path.exists()

loaded = CsvJobDatabase(path).read()
assert list(loaded.dtypes) == list(orig.dtypes)
assert loaded.equals(orig)


class TestParquetJobDatabase:
def test_read_persist(self, tmp_path):
df = pd.DataFrame(
def test_persist_and_read(self, tmp_path):
orig = pd.DataFrame(
{
"some_number": [3, 2, 1],
"numbers": [3, 2, 1],
"names": ["apple", "banana", "coconut"],
}
)

path = tmp_path / "jobs.parquet"
ParquetJobDatabase(path).persist(df)
assert ParquetJobDatabase(path).read().equals(df)
ParquetJobDatabase(path).persist(orig)
assert path.exists()

loaded = ParquetJobDatabase(path).read()
assert list(loaded.dtypes) == list(orig.dtypes)
assert loaded.equals(orig)

0 comments on commit 1ce55e5

Please sign in to comment.