Skip to content

Commit

Permalink
Issue #404 Code review, more robust error handling during validation
Browse files Browse the repository at this point in the history
  • Loading branch information
JohanKJSchreurs committed Oct 4, 2023
1 parent 41dd4ec commit f8d523c
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 32 deletions.
46 changes: 22 additions & 24 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1052,15 +1052,8 @@ def validate_process_graph(self, process_graph: dict) -> List[dict]:
:param process_graph: (flat) dict representing process graph
:return: list of errors (dictionaries with "code" and "message" fields)
"""
# TODO: sometimes process_graph is already in the graph. Should we really *always* add it?
# Was getting errors in some new unit tests because of the double process_graph but
# perhaps the error is really not here but somewhere else that adds process_graph
# when it should not? Still needs to be confirmed.
if "process_graph" not in process_graph:
request = {"process_graph": process_graph}
else:
request = process_graph
return self.post(path="/validation", json=request, expected_status=200).json()["errors"]
graph = self._build_request_with_process_graph(process_graph)["process"]
return self.post(path="/validation", json=graph, expected_status=200).json()["errors"]

@property
def _api_version(self) -> ComparableVersion:
Expand Down Expand Up @@ -1482,26 +1475,31 @@ def _build_request_with_process_graph(self, process_graph: Union[dict, FlatGraph
return result

def _warn_if_process_graph_invalid(self, process_graph: Union[dict, FlatGraphableMixin, str, Path]):
if not self.capabilities().supports_endpoint("/validation", "POST"):
return

graph = as_flat_graph(process_graph)
if "process_graph" not in graph:
graph = {"process_graph": graph}

validation_errors = self.validate_process_graph(process_graph=graph)
if validation_errors:
_log.warning(
"Process graph is not valid. Validation errors:\n" + "\n".join(e["message"] for e in validation_errors)
)
# At present, the intention is that a failed validation does not block
# the job from running, it is only reported as a warning.
# Therefore we also want to continue when something *else* goes wrong
# *during* the validation.
try:
if not self.capabilities().supports_endpoint("/validation", "POST"):
return

graph = self._build_request_with_process_graph(process_graph)["process"]
validation_errors = self.validate_process_graph(process_graph=graph)
if validation_errors:
_log.warning(
"Process graph is not valid. Validation errors:\n"
+ "\n".join(e["message"] for e in validation_errors)
)
except Exception:
_log.warning("Could not validate the process graph", exc_info=True)

# TODO: unify `download` and `execute` better: e.g. `download` always writes to disk, `execute` returns result (raw or as JSON decoded dict)
def download(
self,
graph: Union[dict, FlatGraphableMixin, str, Path],
outputfile: Union[Path, str, None] = None,
timeout: Optional[int] = None,
validate: Optional[bool] = True,
validate: bool = True,
) -> Union[None, bytes]:
"""
Downloads the result of a process graph synchronously,
Expand Down Expand Up @@ -1536,7 +1534,7 @@ def execute(
self,
process_graph: Union[dict, str, Path],
timeout: Optional[int] = None,
validate: Optional[bool] = True,
validate: bool = True,
):
"""
Execute a process graph synchronously and return the result (assumed to be JSON).
Expand Down Expand Up @@ -1565,7 +1563,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
additional: Optional[dict] = None,
validate: Optional[bool] = True,
validate: bool = True,
) -> BatchJob:
"""
Create a new job from given process graph on the back-end.
Expand Down
8 changes: 4 additions & 4 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -1945,7 +1945,7 @@ def download(
outputfile: Optional[Union[str, pathlib.Path]] = None,
format: Optional[str] = None,
options: Optional[dict] = None,
validate: Optional[bool] = True,
validate: bool = True,
) -> Union[None, bytes]:
"""
Execute synchronously and download the raster data cube, e.g. as GeoTIFF.
Expand Down Expand Up @@ -2063,7 +2063,7 @@ def execute_batch(
max_poll_interval: float = 60,
connection_retry_interval: float = 30,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
validate: bool = True,
# TODO: avoid `format_options` as keyword arguments
**format_options,
) -> BatchJob:
Expand Down Expand Up @@ -2098,7 +2098,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
validate: bool = True,
# TODO: avoid `format_options` as keyword arguments
**format_options,
) -> BatchJob:
Expand Down Expand Up @@ -2164,7 +2164,7 @@ def save_user_defined_process(
returns=returns, categories=categories, examples=examples, links=links,
)

def execute(self, validate: Optional[bool] = True) -> dict:
def execute(self, validate: bool = True) -> dict:
"""Executes the process graph of the imagery. """
return self._connection.execute(self.flat_graph(), validate=validate)

Expand Down
8 changes: 4 additions & 4 deletions openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ def _ensure_save_result(
cube = self.save_result(format=format or "GeoJSON", options=options)
return cube

def execute(self, validate: Optional[bool] = True) -> dict:
def execute(self, validate: bool = True) -> dict:
"""Executes the process graph of the imagery."""
return self._connection.execute(self.flat_graph(), validate=validate)

Expand All @@ -235,7 +235,7 @@ def download(
outputfile: Optional[Union[str, pathlib.Path]] = None,
format: Optional[str] = None,
options: Optional[dict] = None,
validate: Optional[bool] = True,
validate: bool = True,
) -> Union[None, bytes]:
"""
Execute synchronously and download the vector cube.
Expand Down Expand Up @@ -267,7 +267,7 @@ def execute_batch(
max_poll_interval: float = 60,
connection_retry_interval: float = 30,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
validate: bool = True,
# TODO: avoid using kwargs as format options
**format_options,
) -> BatchJob:
Expand Down Expand Up @@ -305,7 +305,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = True,
validate: bool = True,
**format_options,
) -> BatchJob:
"""
Expand Down
22 changes: 22 additions & 0 deletions tests/rest/test_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3255,6 +3255,28 @@ def validation(request, context):
assert caplog.messages == ["Process graph is not valid. Validation errors:\nInvalid process graph"]
assert m.call_count == 1

@pytest.mark.parametrize("pg_json", [PG_INVALID_INNER, PG_INVALID_OUTER])
def test_download_pg_json_handles_other_exception_during_validation_gracefully(
    self, requests_mock, connection_with_pgvalidation, tmp_path, pg_json: str, caplog
):
    """A failure of the validation request itself (not a validation error in the
    process graph) must not block the download: the exception is logged as a
    warning and the download proceeds normally.
    """
    caplog.set_level(logging.WARNING)
    # Mock a successful synchronous execution result, independent of validation.
    requests_mock.post(API_URL + "result", content=self._post_result_handler_tiff_invalid_pg)

    exception_message = "Testing for errors that are not due to invalid graphs."

    # Simulate the /validation endpoint itself blowing up (e.g. server/transport
    # error), as opposed to returning a list of validation errors.
    def validation(request, context):
        raise Exception(exception_message)

    m = requests_mock.post(API_URL + "validation", json=validation)

    output = tmp_path / "result.tiff"
    connection_with_pgvalidation.download(pg_json, outputfile=output, validate=True)

    # Download must still have succeeded despite the failed validation call.
    assert output.read_bytes() == b"TIFF data"
    # The validation failure is only reported as a warning, with the original
    # exception message included via exc_info.
    assert caplog.messages[0].startswith("Could not validate the process graph")
    assert caplog.text.endswith(exception_message + "\n")
    # The validation endpoint was attempted exactly once.
    assert m.call_count == 1

@pytest.mark.parametrize("pg_json", [PG_JSON_1, PG_JSON_2])
def test_execute_pg_json(self, requests_mock, pg_json: str):
requests_mock.get(API_URL, json={"api_version": "1.0.0"})
Expand Down

0 comments on commit f8d523c

Please sign in to comment.