From f8d523cf2594ca601158b6275cdbe186993c8ddf Mon Sep 17 00:00:00 2001 From: Johan Schreurs Date: Wed, 4 Oct 2023 17:42:24 +0200 Subject: [PATCH] Issue #404 Code review, more robust error handling during validation --- openeo/rest/connection.py | 46 +++++++++++++++++------------------ openeo/rest/datacube.py | 8 +++--- openeo/rest/vectorcube.py | 8 +++--- tests/rest/test_connection.py | 22 +++++++++++++++++ 4 files changed, 52 insertions(+), 32 deletions(-) diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index f9569ad4f..f9be08936 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -1052,15 +1052,8 @@ def validate_process_graph(self, process_graph: dict) -> List[dict]: :param process_graph: (flat) dict representing process graph :return: list of errors (dictionaries with "code" and "message" fields) """ - # TODO: sometimes process_graph is already in the graph. Should we really *always* add it? - # Was getting errors in some new unit tests because of the double process_graph but - # perhaps the error is really not here but somewhere else that adds process_graph - # when it should not? Still needs to be confirmed. 
- if "process_graph" not in process_graph: - request = {"process_graph": process_graph} - else: - request = process_graph - return self.post(path="/validation", json=request, expected_status=200).json()["errors"] + graph = self._build_request_with_process_graph(process_graph)["process"] + return self.post(path="/validation", json=graph, expected_status=200).json()["errors"] @property def _api_version(self) -> ComparableVersion: @@ -1482,18 +1475,23 @@ def _build_request_with_process_graph(self, process_graph: Union[dict, FlatGraph return result def _warn_if_process_graph_invalid(self, process_graph: Union[dict, FlatGraphableMixin, str, Path]): - if not self.capabilities().supports_endpoint("/validation", "POST"): - return - - graph = as_flat_graph(process_graph) - if "process_graph" not in graph: - graph = {"process_graph": graph} - - validation_errors = self.validate_process_graph(process_graph=graph) - if validation_errors: - _log.warning( - "Process graph is not valid. Validation errors:\n" + "\n".join(e["message"] for e in validation_errors) - ) + # At present, the intention is that a failed validation does not block + # the job from running, it is only reported as a warning. + # Therefore we also want to continue when something *else* goes wrong + # *during* the validation. + try: + if not self.capabilities().supports_endpoint("/validation", "POST"): + return + + graph = self._build_request_with_process_graph(process_graph)["process"] + validation_errors = self.validate_process_graph(process_graph=graph) + if validation_errors: + _log.warning( + "Process graph is not valid. Validation errors:\n" + + "\n".join(e["message"] for e in validation_errors) + ) + except Exception: + _log.warning("Could not validate the process graph", exc_info=True) # TODO: unify `download` and `execute` better: e.g. 
`download` always writes to disk, `execute` returns result (raw or as JSON decoded dict) def download( @@ -1501,7 +1499,7 @@ def download( graph: Union[dict, FlatGraphableMixin, str, Path], outputfile: Union[Path, str, None] = None, timeout: Optional[int] = None, - validate: Optional[bool] = True, + validate: bool = True, ) -> Union[None, bytes]: """ Downloads the result of a process graph synchronously, @@ -1536,7 +1534,7 @@ def execute( self, process_graph: Union[dict, str, Path], timeout: Optional[int] = None, - validate: Optional[bool] = True, + validate: bool = True, ): """ Execute a process graph synchronously and return the result (assumed to be JSON). @@ -1565,7 +1563,7 @@ def create_job( plan: Optional[str] = None, budget: Optional[float] = None, additional: Optional[dict] = None, - validate: Optional[bool] = True, + validate: bool = True, ) -> BatchJob: """ Create a new job from given process graph on the back-end. diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index 5231e523c..8ce692459 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -1945,7 +1945,7 @@ def download( outputfile: Optional[Union[str, pathlib.Path]] = None, format: Optional[str] = None, options: Optional[dict] = None, - validate: Optional[bool] = True, + validate: bool = True, ) -> Union[None, bytes]: """ Execute synchronously and download the raster data cube, e.g. as GeoTIFF. 
@@ -2063,7 +2063,7 @@ def execute_batch( max_poll_interval: float = 60, connection_retry_interval: float = 30, job_options: Optional[dict] = None, - validate: Optional[bool] = True, + validate: bool = True, # TODO: avoid `format_options` as keyword arguments **format_options, ) -> BatchJob: @@ -2098,7 +2098,7 @@ def create_job( plan: Optional[str] = None, budget: Optional[float] = None, job_options: Optional[dict] = None, - validate: Optional[bool] = True, + validate: bool = True, # TODO: avoid `format_options` as keyword arguments **format_options, ) -> BatchJob: @@ -2164,7 +2164,7 @@ def save_user_defined_process( returns=returns, categories=categories, examples=examples, links=links, ) - def execute(self, validate: Optional[bool] = True) -> dict: + def execute(self, validate: bool = True) -> dict: """Executes the process graph of the imagery. """ return self._connection.execute(self.flat_graph(), validate=validate) diff --git a/openeo/rest/vectorcube.py b/openeo/rest/vectorcube.py index 8d2271c72..633f562d4 100644 --- a/openeo/rest/vectorcube.py +++ b/openeo/rest/vectorcube.py @@ -226,7 +226,7 @@ def _ensure_save_result( cube = self.save_result(format=format or "GeoJSON", options=options) return cube - def execute(self, validate: Optional[bool] = True) -> dict: + def execute(self, validate: bool = True) -> dict: """Executes the process graph of the imagery.""" return self._connection.execute(self.flat_graph(), validate=validate) @@ -235,7 +235,7 @@ def download( outputfile: Optional[Union[str, pathlib.Path]] = None, format: Optional[str] = None, options: Optional[dict] = None, - validate: Optional[bool] = True, + validate: bool = True, ) -> Union[None, bytes]: """ Execute synchronously and download the vector cube. 
@@ -267,7 +267,7 @@ def execute_batch( max_poll_interval: float = 60, connection_retry_interval: float = 30, job_options: Optional[dict] = None, - validate: Optional[bool] = True, + validate: bool = True, # TODO: avoid using kwargs as format options **format_options, ) -> BatchJob: @@ -305,7 +305,7 @@ def create_job( plan: Optional[str] = None, budget: Optional[float] = None, job_options: Optional[dict] = None, - validate: Optional[bool] = True, + validate: bool = True, **format_options, ) -> BatchJob: """ diff --git a/tests/rest/test_connection.py b/tests/rest/test_connection.py index 0f2f8c7ac..ef7cca40a 100644 --- a/tests/rest/test_connection.py +++ b/tests/rest/test_connection.py @@ -3255,6 +3255,28 @@ def validation(request, context): assert caplog.messages == ["Process graph is not valid. Validation errors:\nInvalid process graph"] assert m.call_count == 1 + @pytest.mark.parametrize("pg_json", [PG_INVALID_INNER, PG_INVALID_OUTER]) + def test_download_pg_json_handles_other_exception_during_validation_gracefully( + self, requests_mock, connection_with_pgvalidation, tmp_path, pg_json: str, caplog + ): + caplog.set_level(logging.WARNING) + requests_mock.post(API_URL + "result", content=self._post_result_handler_tiff_invalid_pg) + + exception_message = "Testing for errors that are not due to invalid graphs." + + def validation(request, context): + raise Exception(exception_message) + + m = requests_mock.post(API_URL + "validation", json=validation) + + output = tmp_path / "result.tiff" + connection_with_pgvalidation.download(pg_json, outputfile=output, validate=True) + + assert output.read_bytes() == b"TIFF data" + assert caplog.messages[0].startswith("Could not validate the process graph") + assert caplog.text.endswith(exception_message + "\n") + assert m.call_count == 1 + @pytest.mark.parametrize("pg_json", [PG_JSON_1, PG_JSON_2]) def test_execute_pg_json(self, requests_mock, pg_json: str): requests_mock.get(API_URL, json={"api_version": "1.0.0"})