Skip to content

Commit

Permalink
Issue #513 add auto_add_save_result option to download/create_job/execute_batch
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Sep 26, 2024
1 parent 58f0205 commit cfd9f1a
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 51 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- `MultiBackendJobManager`: add `cancel_running_job_after` option to automatically cancel jobs that are running for too long ([#590](https://github.com/Open-EO/openeo-python-client/issues/590))
- Added `openeo.api.process.Parameter` helper to easily create a "spatial_extent" UDP parameter
- Wrap OIDC token request failure in more descriptive `OidcException` (related to [#624](https://github.com/Open-EO/openeo-python-client/issues/624))
- Added `auto_add_save_result` option (on by default) to disable automatic addition of `save_result` node on `download`/`create_job`/`execute_batch` ([#513](https://github.com/Open-EO/openeo-python-client/issues/513))

### Changed

Expand Down
11 changes: 8 additions & 3 deletions openeo/rest/_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from openeo.rest.vectorcube import VectorCube


class OpeneoTestingException(Exception):
    """Exception raised by the openEO testing helpers themselves (e.g. :py:class:`DummyBackend`)."""


class DummyBackend:
"""
Dummy backend that handles sync/batch execution requests
Expand Down Expand Up @@ -75,7 +79,7 @@ def _handle_post_jobs(self, request, context):
def _get_job_id(self, request) -> str:
    """
    Extract the job id from a request path like ``/jobs/job-123/results``.

    :param request: request object with a ``path`` attribute.
    :return: job id string (e.g. ``"job-123"``).
    :raises OpeneoTestingException: when no job id can be extracted,
        or the extracted id is not a known batch job.
    """
    match = re.match(r"^/jobs/(job-\d+)(/|$)", request.path)
    if not match:
        raise OpeneoTestingException(f"Failed to extract job_id from {request.path}")
    job_id = match.group(1)
    # Raise a testing-specific exception instead of a bare `assert`:
    # asserts are stripped under `python -O`, and this keeps the failure mode
    # consistent with the other OpeneoTestingException raises in this helper.
    if job_id not in self.batch_jobs:
        raise OpeneoTestingException(f"Unknown job_id {job_id!r}")
    return job_id
Expand Down Expand Up @@ -132,13 +136,14 @@ def get_pg(self, process_id: Optional[str] = None) -> dict:
:return: process graph (flat graph representation) or process graph node
"""
pgs = self.sync_requests + [b["pg"] for b in self.batch_jobs.values()]
assert len(pgs) == 1
if len(pgs) != 1:
raise OpeneoTestingException(f"Expected single process graph, but collected {len(pgs)}")
pg = pgs[0]
if process_id:
# Just return single node (by process_id)
found = [node for node in pg.values() if node.get("process_id") == process_id]
if len(found) != 1:
raise RuntimeError(
raise OpeneoTestingException(
f"Expected single process graph node with process_id {process_id!r}, but found {len(found)}: {found}"
)
return found[0]
Expand Down
70 changes: 46 additions & 24 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -2103,6 +2103,7 @@ def download(
options: Optional[dict] = None,
*,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
) -> Union[None, bytes]:
"""
Execute synchronously and download the raster data cube, e.g. as GeoTIFF.
Expand All @@ -2115,17 +2116,24 @@ def download(
:param options: Optional, file format options
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:return: None if the result is stored to disk, or a bytes object returned by the backend.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
"""
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=format,
options=options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.download()",
)
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=format,
options=options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.download()",
)
return self._connection.download(cube.flat_graph(), outputfile, validate=validate)

def validate(self) -> List[dict]:
Expand Down Expand Up @@ -2228,6 +2236,7 @@ def execute_batch(
connection_retry_interval: float = 30,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
# TODO: deprecate `format_options` as keyword arguments
**format_options,
) -> BatchJob:
Expand All @@ -2242,22 +2251,28 @@ def execute_batch(
:param job_options:
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
"""
# TODO: start showing deprecation warnings about these inconsistent argument names
if "format" in format_options and not out_format:
out_format = format_options["format"] # align with 'download' call arg name

# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.execute_batch()",
)
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=out_format,
options=format_options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.execute_batch()",
)

job = cube.create_job(job_options=job_options, validate=validate)
job = cube.create_job(job_options=job_options, validate=validate, auto_add_save_result=False)
return job.run_synchronous(
outputfile=outputfile,
print=print, max_poll_interval=max_poll_interval, connection_retry_interval=connection_retry_interval
Expand All @@ -2273,6 +2288,7 @@ def create_job(
budget: Optional[float] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
# TODO: avoid `format_options` as keyword arguments
**format_options,
) -> BatchJob:
Expand All @@ -2293,19 +2309,25 @@ def create_job(
:param job_options: custom job options.
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:return: Created job.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
"""
# TODO: add option to also automatically start the job?
# TODO: avoid using all kwargs as format_options
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options or None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.create_job()",
)
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=out_format,
options=format_options or None,
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.create_job()",
)
return self._connection.create_job(
process_graph=cube.flat_graph(),
title=title,
Expand Down
68 changes: 44 additions & 24 deletions openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ def download(
options: Optional[dict] = None,
*,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
) -> Union[None, bytes]:
"""
Execute synchronously and download the vector cube.
Expand All @@ -221,20 +222,25 @@ def download(
:param options: (optional) additional output format options.
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
.. versionchanged:: 0.21.0
When not specified explicitly, output format is guessed from output file extension.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
"""
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=format,
options=options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.download()",
)
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=format,
options=options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.download()",
)
return self._connection.download(cube.flat_graph(), outputfile=outputfile, validate=validate)

def execute_batch(
Expand All @@ -247,6 +253,7 @@ def execute_batch(
connection_retry_interval: float = 30,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
# TODO: avoid using kwargs as format options
**format_options,
) -> BatchJob:
Expand All @@ -262,19 +269,25 @@ def execute_batch(
:param format_options: (optional) additional output format options
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
.. versionchanged:: 0.21.0
When not specified explicitly, output format is guessed from output file extension.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
"""
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.execute_batch()",
)
job = cube.create_job(job_options=job_options, validate=validate)
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=out_format,
options=format_options,
weak_format=guess_format(outputfile) if outputfile else None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.execute_batch()",
)
job = cube.create_job(job_options=job_options, validate=validate, auto_add_save_result=False)
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
Expand All @@ -291,6 +304,7 @@ def create_job(
budget: Optional[float] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
**format_options,
) -> BatchJob:
"""
Expand All @@ -306,18 +320,24 @@ def create_job(
:param format_options: String Parameters for the job result format
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:return: Created job.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
"""
# TODO: avoid using all kwargs as format_options
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = _ensure_save_result(
cube=self,
format=out_format,
options=format_options or None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.create_job()",
)
cube = self
if auto_add_save_result:
cube = _ensure_save_result(
cube=cube,
format=out_format,
options=format_options or None,
default_format=self._DEFAULT_VECTOR_FORMAT,
method="VectorCube.create_job()",
)
return self._connection.create_job(
process_graph=cube.flat_graph(),
title=title,
Expand Down
22 changes: 22 additions & 0 deletions tests/rest/datacube/test_datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,17 @@ def test_save_result_and_create_job_both_with_format(self, s2cube, save_result_f
):
cube.create_job(out_format=execute_format)

@pytest.mark.parametrize(
    ["auto_add_save_result", "process_ids"],
    [
        (True, {"load_collection", "save_result"}),
        (False, {"load_collection"}),
    ],
)
def test_create_job_auto_add_save_result(self, s2cube, dummy_backend, auto_add_save_result, process_ids):
    """`auto_add_save_result` toggles automatic `save_result` addition on `create_job`."""
    s2cube.create_job(auto_add_save_result=auto_add_save_result)
    pg = dummy_backend.get_pg()
    assert {node["process_id"] for node in pg.values()} == process_ids

def test_execute_batch_defaults(self, s2cube, get_create_job_pg, recwarn, caplog):
s2cube.execute_batch()
pg = get_create_job_pg()
Expand Down Expand Up @@ -942,6 +953,17 @@ def test_save_result_format_options_vs_execute_batch(elf, s2cube, get_create_job
"result": True,
}

@pytest.mark.parametrize(
    ["auto_add_save_result", "process_ids"],
    [
        (True, {"load_collection", "save_result"}),
        (False, {"load_collection"}),
    ],
)
def test_execute_batch_auto_add_save_result(self, s2cube, dummy_backend, auto_add_save_result, process_ids):
    """`auto_add_save_result` toggles automatic `save_result` addition on `execute_batch`."""
    s2cube.execute_batch(auto_add_save_result=auto_add_save_result)
    pg = dummy_backend.get_pg()
    assert {node["process_id"] for node in pg.values()} == process_ids


class TestDataCubeValidation:
"""
Expand Down
13 changes: 13 additions & 0 deletions tests/rest/datacube/test_datacube100.py
Original file line number Diff line number Diff line change
Expand Up @@ -3337,6 +3337,19 @@ def post_result(request, context):
assert post_result_mock.call_count == 1


@pytest.mark.parametrize(
    ["auto_add_save_result", "process_ids"],
    [
        (True, {"load_collection", "save_result"}),
        (False, {"load_collection"}),
    ],
)
def test_download_auto_add_save_result(s2cube, dummy_backend, tmp_path, auto_add_save_result, process_ids):
    """`auto_add_save_result` toggles automatic `save_result` addition on `download`."""
    output_path = tmp_path / "result.tiff"
    s2cube.download(output_path, auto_add_save_result=auto_add_save_result)
    actual_process_ids = {node["process_id"] for node in dummy_backend.get_pg().values()}
    assert actual_process_ids == process_ids


class TestBatchJob:
_EXPECTED_SIMPLE_S2_JOB = {"process": {"process_graph": {
"loadcollection1": {
Expand Down
Loading

0 comments on commit cfd9f1a

Please sign in to comment.