From e94a42ccded5239dcd4c8965e85ce72b8816c7b3 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 2 Apr 2020 12:11:31 +0200 Subject: [PATCH 1/2] Issue #129/EP-3352/#125: fix 1.0-style process_graph submit requests --- openeo/rest/connection.py | 63 ++++---- openeo/rest/datacube.py | 2 +- openeo/rest/imagecollectionclient.py | 2 +- tests/data/0.4.0/aggregate_zonal_path.json | 104 +++++++------ tests/data/0.4.0/aggregate_zonal_polygon.json | 144 +++++++++--------- tests/data/1.0.0/aggregate_zonal_path.json | 102 ++++++------- tests/data/1.0.0/aggregate_zonal_polygon.json | 144 +++++++++--------- tests/rest/datacube/test_datacube100.py | 14 +- tests/rest/test_connection.py | 20 +++ tests/rest/test_job.py | 24 ++- 10 files changed, 327 insertions(+), 292 deletions(-) diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index a12eb44d2..06d23af7f 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -365,12 +365,10 @@ def load_collection(self, collection_id: str, **kwargs) -> ImageCollectionClient # Legacy alias. imagecollection = load_collection - - def create_service(self, graph, type, **kwargs): + def create_service(self, graph: dict, type: str, **kwargs): # TODO: type hint for graph: is it a nested or a flat one? - kwargs["process_graph"] = graph - kwargs["type"] = type - response = self.post("/services", json=kwargs, expected_status=201) + req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs) + response = self.post(path="/services", json=req, expected_status=201) return { 'url': response.headers.get('Location'), 'service_id': response.headers.get("OpenEO-Identifier"), @@ -411,43 +409,48 @@ def create_file(self, path): # No endpoint just returns a file object. raise NotImplementedError() + def _build_request_with_process_graph(self, process_graph: dict, **kwargs) -> dict: + """ + Prepare a json payload with a process graph to submit to /result, /services, /jobs, ... + :param process_graph: flat dict representing a process graph + """ + result = kwargs + if self._api_version.at_least("1.0.0"): + result["process"] = {"process_graph": process_graph} + else: + result["process_graph"] = process_graph + return result + # TODO: Maybe rename to execute and merge with execute(). - def download(self, graph, outputfile): + def download(self, graph: dict, outputfile): """ Downloads the result of a process graph synchronously, and save the result to the given file. This method is useful to export binary content such as images. For json content, the execute method is recommended. - :param graph: Dict representing a process graph + :param graph: (flat) dict representing a process graph :param outputfile: output file - :param format_options: formating options - :return: job_id: String """ - request = {"process_graph": graph} - download_url = self.build_url("/result") - r = self.post(download_url, json=request, stream=True, timeout=1000) + request = self._build_request_with_process_graph(process_graph=graph) + r = self.post(path="/result", json=request, stream=True, timeout=1000) with pathlib.Path(outputfile).open(mode="wb") as f: shutil.copyfileobj(r.raw, f) - def execute(self, process_graph, output_format, output_parameters=None, budget=None): + def execute(self, process_graph: dict): """ Execute a process graph synchronously. 
- :param process_graph: Dict representing a process graph - :param output_format: String Output format of the execution - :param output_parameters: Dict of additional output parameters - :param budget: Budget - :return: job_id: String + :param process_graph: (flat) dict representing a process graph """ - # TODO: add output_format to execution - return self.post(path="/result", json=process_graph).json() + req = self._build_request_with_process_graph(process_graph=process_graph) + return self.post(path="/result", json=req).json() - def create_job(self, process_graph: Dict, title: str = None, description: str = None, + def create_job(self, process_graph: dict, title: str = None, description: str = None, plan: str = None, budget=None, additional: Dict = None) -> RESTJob: """ Posts a job to the back end. - :param process_graph: String data of the job (e.g. process graph) + :param process_graph: (flat) dict representing process graph :param title: String title of the job :param description: String description of the job :param plan: billing plan @@ -456,17 +459,15 @@ def create_job(self, process_graph: Dict, title: str = None, description: str = :return: job_id: String Job id of the new created job """ # TODO move all this (RESTJob factory) logic to RESTJob? - process_graph = { - "process_graph": process_graph, - "title": title, - "description": description, - "plan": plan, - "budget": budget - } + req = self._build_request_with_process_graph( + process_graph=process_graph, + title=title, description=description, plan=plan, budget=budget + ) if additional: - process_graph["job_options"] = additional + # TODO: get rid of this non-standard field? + req["job_options"] = additional - response = self.post("/jobs", process_graph) + response = self.post("/jobs", json=req) if "openeo-identifier" in response.headers: job_id = response.headers['openeo-identifier'] diff --git a/openeo/rest/datacube.py b/openeo/rest/datacube.py index ee2f59cc1..70be8bf3b 100644 --- a/openeo/rest/datacube.py +++ b/openeo/rest/datacube.py @@ -876,7 +876,7 @@ def send_job(self, out_format=None, job_options=None, **format_options) -> Job: def execute(self) -> Dict: """Executes the process graph of the imagery. """ - return self._connection.execute({"process_graph": self._pg.flatten()}, "") + return self._connection.execute(self._pg.flatten()) def to_graphviz(self): """ diff --git a/openeo/rest/imagecollectionclient.py b/openeo/rest/imagecollectionclient.py index 9a667e949..86f1178a3 100644 --- a/openeo/rest/imagecollectionclient.py +++ b/openeo/rest/imagecollectionclient.py @@ -1036,7 +1036,7 @@ def execute(self) -> Dict: """Executes the process graph of the imagery. 
""" newbuilder = self.builder.shallow_copy() newbuilder.processes[self.node_id]['result'] = True - return self.session.execute({"process_graph": newbuilder.processes},"") + return self.session.execute(newbuilder.processes) ####### HELPER methods ####### diff --git a/tests/data/0.4.0/aggregate_zonal_path.json b/tests/data/0.4.0/aggregate_zonal_path.json index a60b6cb41..d4f6f2bbf 100644 --- a/tests/data/0.4.0/aggregate_zonal_path.json +++ b/tests/data/0.4.0/aggregate_zonal_path.json @@ -1,61 +1,59 @@ { - "process_graph": { - "filterbbox1": { - "process_id": "filter_bbox", - "arguments": { - "data": { - "from_node": "loadcollection1" - }, - "extent": { - "west": 3, - "east": 6, - "north": 52, - "south": 50, - "crs": "EPSG:4326" - } + "filterbbox1": { + "process_id": "filter_bbox", + "arguments": { + "data": { + "from_node": "loadcollection1" }, - "result": false + "extent": { + "west": 3, + "east": 6, + "north": 52, + "south": 50, + "crs": "EPSG:4326" + } }, - "loadcollection1": { - "process_id": "load_collection", - "arguments": { - "id": "S2", - "spatial_extent": null, - "temporal_extent": null - }, - "result": false + "result": false + }, + "loadcollection1": { + "process_id": "load_collection", + "arguments": { + "id": "S2", + "spatial_extent": null, + "temporal_extent": null }, - "readvector1": { - "arguments": { - "filename": "/some/path/to/GeometryCollection.geojson" - }, - "process_id": "read_vector", - "result": false + "result": false + }, + "readvector1": { + "arguments": { + "filename": "/some/path/to/GeometryCollection.geojson" }, - "aggregatepolygon1": { - "process_id": "aggregate_polygon", - "arguments": { - "data": { - "from_node": "filterbbox1" - }, - "polygons": { - "from_node": "readvector1" - }, - "reducer": { - "callback": { - "unary": { - "arguments": { - "data": { - "from_argument": "data" - } - }, - "process_id": "mean", - "result": true - } + "process_id": "read_vector", + "result": false + }, + "aggregatepolygon1": { + "process_id": "aggregate_polygon", + "arguments": { + "data": { + "from_node": "filterbbox1" + }, + "polygons": { + "from_node": "readvector1" + }, + "reducer": { + "callback": { + "unary": { + "arguments": { + "data": { + "from_argument": "data" + } + }, + "process_id": "mean", + "result": true } } - }, - "result": true - } + } + }, + "result": true } -} \ No newline at end of file +} diff --git a/tests/data/0.4.0/aggregate_zonal_polygon.json b/tests/data/0.4.0/aggregate_zonal_polygon.json index 035ddd646..419f0e9d4 100644 --- a/tests/data/0.4.0/aggregate_zonal_polygon.json +++ b/tests/data/0.4.0/aggregate_zonal_polygon.json @@ -1,84 +1,82 @@ { - "process_graph": { - "filterbbox1": { - "process_id": "filter_bbox", - "arguments": { - "data": { - "from_node": "loadcollection1" - }, - "extent": { - "west": 3, - "east": 6, - "north": 52, - "south": 50, - "crs": "EPSG:4326" - } + "filterbbox1": { + "process_id": "filter_bbox", + "arguments": { + "data": { + "from_node": "loadcollection1" }, - "result": false + "extent": { + "west": 3, + "east": 6, + "north": 52, + "south": 50, + "crs": "EPSG:4326" + } }, - "loadcollection1": { - "process_id": "load_collection", - "arguments": { - "id": "S2", - "spatial_extent": null, - "temporal_extent": null - }, - "result": false + "result": false + }, + "loadcollection1": { + "process_id": "load_collection", + "arguments": { + "id": "S2", + "spatial_extent": null, + "temporal_extent": null }, - "aggregatepolygon1": { - "process_id": "aggregate_polygon", - "arguments": { - "data": { - "from_node": "filterbbox1" - }, - 
"polygons": { - "type": "Polygon", - "coordinates": [ + "result": false + }, + "aggregatepolygon1": { + "process_id": "aggregate_polygon", + "arguments": { + "data": { + "from_node": "filterbbox1" + }, + "polygons": { + "type": "Polygon", + "coordinates": [ + [ + [ + 16.138916, + 48.320647 + ], + [ + 16.524124, + 48.320647 + ], + [ + 16.524124, + 48.1386 + ], [ - [ - 16.138916, - 48.320647 - ], - [ - 16.524124, - 48.320647 - ], - [ - 16.524124, - 48.1386 - ], - [ - 16.138916, - 48.1386 - ], - [ - 16.138916, - 48.320647 - ] + 16.138916, + 48.1386 + ], + [ + 16.138916, + 48.320647 ] - ], - "crs": { - "type": "name", - "properties": { - "name": "EPSG:4326" - } - } - }, - "reducer": { - "callback": { - "unary": { - "arguments": { - "data": { - "from_argument": "data" - } - }, - "process_id": "mean", - "result": true - } + ] + ], + "crs": { + "type": "name", + "properties": { + "name": "EPSG:4326" } } }, - "result": true - } + "reducer": { + "callback": { + "unary": { + "arguments": { + "data": { + "from_argument": "data" + } + }, + "process_id": "mean", + "result": true + } + } + } + }, + "result": true } } \ No newline at end of file diff --git a/tests/data/1.0.0/aggregate_zonal_path.json b/tests/data/1.0.0/aggregate_zonal_path.json index 6cfaf50bc..998a8bd10 100644 --- a/tests/data/1.0.0/aggregate_zonal_path.json +++ b/tests/data/1.0.0/aggregate_zonal_path.json @@ -1,58 +1,56 @@ { - "process_graph": { - "loadcollection1": { - "process_id": "load_collection", - "arguments": { - "id": "S2", - "spatial_extent": null, - "temporal_extent": null + "loadcollection1": { + "process_id": "load_collection", + "arguments": { + "id": "S2", + "spatial_extent": null, + "temporal_extent": null + } + }, + "filterbbox1": { + "process_id": "filter_bbox", + "arguments": { + "data": { + "from_node": "loadcollection1" + }, + "extent": { + "west": 3, + "east": 6, + "north": 52, + "south": 50, + "crs": "EPSG:4326" } - }, - "filterbbox1": { - "process_id": "filter_bbox", - "arguments": { - "data": { - "from_node": "loadcollection1" - }, - "extent": { - "west": 3, - "east": 6, - "north": 52, - "south": 50, - "crs": "EPSG:4326" + } + }, + "readvector1": { + "process_id": "read_vector", + "arguments": { + "filename": "/some/path/to/GeometryCollection.geojson" + } + }, + "aggregatespatial1": { + "process_id": "aggregate_spatial", + "arguments": { + "data": { + "from_node": "filterbbox1" + }, + "geometries": { + "from_node": "readvector1" + }, + "reducer": { + "process_graph": { + "mean1": { + "process_id": "mean", + "arguments": { + "data": { + "from_parameter": "data" + } + }, + "result": true + } } } }, - "readvector1": { - "process_id": "read_vector", - "arguments": { - "filename": "/some/path/to/GeometryCollection.geojson" - } - }, - "aggregatespatial1": { - "process_id": "aggregate_spatial", - "arguments": { - "data": { - "from_node": "filterbbox1" - }, - "geometries": { - "from_node": "readvector1" - }, - "reducer": { - "process_graph": { - "mean1": { - "process_id": "mean", - "arguments": { - "data": { - "from_parameter": "data" - } - }, - "result": true - } - } - } - }, - "result": true - } + "result": true } -} \ No newline at end of file +} diff --git a/tests/data/1.0.0/aggregate_zonal_polygon.json b/tests/data/1.0.0/aggregate_zonal_polygon.json index 966ab35b4..0ba285a4b 100644 --- a/tests/data/1.0.0/aggregate_zonal_polygon.json +++ b/tests/data/1.0.0/aggregate_zonal_polygon.json @@ -1,82 +1,80 @@ { - "process_graph": { - "loadcollection1": { - "process_id": "load_collection", - "arguments": { - "id": "S2", - 
"spatial_extent": null, - "temporal_extent": null - } - }, - "filterbbox1": { - "process_id": "filter_bbox", - "arguments": { - "data": { - "from_node": "loadcollection1" - }, - "extent": { - "west": 3, - "east": 6, - "north": 52, - "south": 50, - "crs": "EPSG:4326" - } + "loadcollection1": { + "process_id": "load_collection", + "arguments": { + "id": "S2", + "spatial_extent": null, + "temporal_extent": null + } + }, + "filterbbox1": { + "process_id": "filter_bbox", + "arguments": { + "data": { + "from_node": "loadcollection1" + }, + "extent": { + "west": 3, + "east": 6, + "north": 52, + "south": 50, + "crs": "EPSG:4326" } - }, - "aggregatespatial1": { - "process_id": "aggregate_spatial", - "arguments": { - "data": { - "from_node": "filterbbox1" - }, - "geometries": { - "type": "Polygon", - "coordinates": [ + } + }, + "aggregatespatial1": { + "process_id": "aggregate_spatial", + "arguments": { + "data": { + "from_node": "filterbbox1" + }, + "geometries": { + "type": "Polygon", + "coordinates": [ + [ + [ + 16.138916, + 48.320647 + ], [ - [ - 16.138916, - 48.320647 - ], - [ - 16.524124, - 48.320647 - ], - [ - 16.524124, - 48.1386 - ], - [ - 16.138916, - 48.1386 - ], - [ - 16.138916, - 48.320647 - ] + 16.524124, + 48.320647 + ], + [ + 16.524124, + 48.1386 + ], + [ + 16.138916, + 48.1386 + ], + [ + 16.138916, + 48.320647 ] - ], - "crs": { - "type": "name", - "properties": { - "name": "EPSG:4326" - } - } - }, - "reducer": { - "process_graph": { - "mean1": { - "process_id": "mean", - "arguments": { - "data": { - "from_parameter": "data" - } - }, - "result": true - } + ] + ], + "crs": { + "type": "name", + "properties": { + "name": "EPSG:4326" } } }, - "result": true - } + "reducer": { + "process_graph": { + "mean1": { + "process_id": "mean", + "arguments": { + "data": { + "from_parameter": "data" + } + }, + "result": true + } + } + } + }, + "result": true } } \ No newline at end of file diff --git a/tests/rest/datacube/test_datacube100.py b/tests/rest/datacube/test_datacube100.py index db01b3b49..c861a756a 100644 --- a/tests/rest/datacube/test_datacube100.py +++ b/tests/rest/datacube/test_datacube100.py @@ -101,12 +101,14 @@ def check_request(request): assert request.json() == { 'custom_param': 45, 'description': 'Nice!', - 'process_graph': { - 'loadcollection1': { - 'arguments': {'id': 'S2', 'spatial_extent': None, 'temporal_extent': None}, - 'process_id': 'load_collection', - 'result': True, - } + 'process': { + 'process_graph': { + 'loadcollection1': { + 'arguments': {'id': 'S2', 'spatial_extent': None, 'temporal_extent': None}, + 'process_id': 'load_collection', + 'result': True, + } + }, }, 'title': 'S2 Foo', 'type': 'WMTS', diff --git a/tests/rest/test_connection.py b/tests/rest/test_connection.py index 4450764e0..66ed9ed4d 100644 --- a/tests/rest/test_connection.py +++ b/tests/rest/test_connection.py @@ -222,3 +222,23 @@ def test_default_timeout(requests_mock): conn = connect(API_URL, default_timeout=2) assert conn.get("/foo").json() == '2' assert conn.get("/foo", timeout=5).json() == '5' + + +def test_execute_042(requests_mock): + requests_mock.get(API_URL, json={"api_version": "0.4.2"}) + conn = Connection(API_URL) + with mock.patch.object(conn, "request") as request: + conn.execute({"foo1": {"process_id": "foo"}}) + assert request.call_args_list == [ + mock.call("post", path="/result", json={"process_graph": {"foo1": {"process_id": "foo"}}}) + ] + + +def test_execute_100(requests_mock): + requests_mock.get(API_URL, json={"api_version": "1.0.0"}) + conn = Connection(API_URL) + with 
mock.patch.object(conn, "request") as request: + conn.execute({"foo1": {"process_id": "foo"}}) + assert request.call_args_list == [ + mock.call("post", path="/result", json={"process": {"process_graph": {"foo1": {"process_id": "foo"}}}}) + ] diff --git a/tests/rest/test_job.py b/tests/rest/test_job.py index 832c52261..7d02ea28d 100644 --- a/tests/rest/test_job.py +++ b/tests/rest/test_job.py @@ -14,6 +14,13 @@ def session040(requests_mock): return session +@pytest.fixture +def con100(requests_mock): + requests_mock.get(API_URL + "/", json={"api_version": "1.0.0"}) + con = openeo.connect(API_URL) + return con + + def test_execute_batch(session040, requests_mock, tmpdir): requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"}) requests_mock.post(API_URL + "/jobs", headers={"OpenEO-Identifier": "f00ba5"}) @@ -81,8 +88,8 @@ def print(msg): try: session040.load_collection("SENTINEL2").execute_batch( - outputfile=str(path), out_format="GTIFF", - max_poll_interval=.1, print=print + outputfile=str(path), out_format="GTIFF", + max_poll_interval=.1, print=print ) assert False @@ -110,3 +117,16 @@ def test_get_job_logs(session040, requests_mock): log_entries = session040.job('f00ba5').logs(offset="123abc") assert log_entries[0].message == "error processing batch job" + + +def test_create_job_100(con100, requests_mock): + def check_request(request): + assert request.json() == { + "process": {"process_graph": {"foo1": {"process_id": "foo"}}}, + "title": "Foo", 'description': None, + 'budget': None, 'plan': None, + } + return True + + requests_mock.post(API_URL + "/jobs", headers={"OpenEO-Identifier": "f00ba5"}, additional_matcher=check_request) + con100.create_job({"foo1": {"process_id": "foo"}}, title="Foo") From d829d7b5be748f40d30420f369d9102c7a90ef73 Mon Sep 17 00:00:00 2001 From: Stefaan Lippens Date: Thu, 2 Apr 2020 12:13:52 +0200 Subject: [PATCH 2/2] Job related code cleanup and finetuning --- openeo/rest/connection.py | 39 ++++++--------------------------------- openeo/rest/job.py | 14 +++++--------- 2 files changed, 11 insertions(+), 42 deletions(-) diff --git a/openeo/rest/connection.py b/openeo/rest/connection.py index 06d23af7f..5b80394fd 100644 --- a/openeo/rest/connection.py +++ b/openeo/rest/connection.py @@ -242,6 +242,7 @@ def user_jobs(self) -> dict: :return: jobs: Dict All jobs of the user """ + # TODO duplication with `list_jobs()` method return self.get('/jobs').json()["jobs"] def list_collections(self) -> List[dict]: @@ -301,7 +302,7 @@ def list_services(self) -> dict: :return: data_dict: Dict All available service types """ - #TODO return service objects + #TODO return parsed service objects return self.get('/services').json() def describe_collection(self, name) -> dict: @@ -327,12 +328,13 @@ def list_processes(self) -> dict: return self.get('/processes').json()["processes"] def list_jobs(self) -> dict: - # TODO: Maybe format the result so that there get Job classes returned. """ Lists all jobs of the authenticated user. :return: job_list: Dict of all jobs of the user. """ + # TODO: Maybe format the result so that there get Job classes returned. 
+ # TODO: duplication with `user_jobs()` method return self.get('/jobs').json()["jobs"] def validate_processgraph(self, process_graph): @@ -384,12 +386,10 @@ def remove_service(self, service_id: str): response = self.delete('/services/' + service_id) def job_results(self, job_id): - response = self.get("/jobs/{}/results".format(job_id)) - return self.parse_json_response(response) + return self.get("/jobs/{}/results".format(job_id)).json() def job_logs(self, job_id, offset): - response = self.get("/jobs/{}/logs".format(job_id), params={'offset': offset}) - return self.parse_json_response(response) + return self.get("/jobs/{}/logs".format(job_id), params={'offset': offset}).json() def list_files(self): """ @@ -489,33 +489,6 @@ def job(self,job_id:str): """ return RESTJob(job_id, self) - def parse_json_response(self, response: requests.Response): - """ - Parses json response, if an error occurs it raises an Exception. - - :param response: Response of a RESTful request - :return: response: JSON Response - """ - # TODO Deprecated: status handling is now in RestApiConnection - if response.status_code == 200 or response.status_code == 201: - return response.json() - else: - self._handle_error_response(response) - - def _handle_error_response(self, response): - # TODO replace this with `_raise_api_error` - if response.status_code == 502: - from requests.exceptions import ProxyError - raise ProxyError("The proxy returned an error, this could be due to a timeout.") - else: - message = None - if response.headers['Content-Type'] == 'application/json': - message = response.json().get('message', None) - if message: - message = response.text - - raise ConnectionAbortedError(message) - def get_outputformats(self) -> dict: """ Loads all available output formats. diff --git a/openeo/rest/job.py b/openeo/rest/job.py index 0e0d7b7e4..9ffe5f4fe 100644 --- a/openeo/rest/job.py +++ b/openeo/rest/job.py @@ -35,29 +35,25 @@ def __init__(self, job_id: str, connection: 'Connection'): def describe_job(self): """ Get all job information.""" # GET /jobs/{job_id} - request = self.connection.get("/jobs/{}".format(self.job_id)) - return self.connection.parse_json_response(request) + return self.connection.get("/jobs/{}".format(self.job_id)).json() def update_job(self, process_graph=None, output_format=None, output_parameters=None, title=None, description=None, plan=None, budget=None, additional=None): """ Update a job.""" # PATCH /jobs/{job_id} - pass + raise NotImplementedError def delete_job(self): """ Delete a job.""" # DELETE /jobs/{job_id} request = self.connection.delete("/jobs/{}".format(self.job_id)) - - return request.status_code + assert request.status_code == 204 def estimate_job(self): """ Calculate an time/cost estimate for a job.""" # GET /jobs/{job_id}/estimate - request = self.connection.get("/jobs/{}/estimate".format(self.job_id)) - - return self.connection.parse_json_response(request) + return self.connection.get("/jobs/{}/estimate".format(self.job_id)).json() def start_job(self): """ Start / queue a job for processing.""" @@ -77,7 +73,7 @@ def stop_job(self): def list_results(self, type=None): """ Get document with download links.""" # GET /jobs/{job_id}/results - pass + raise NotImplementedError def download_results(self, target: Union[str, pathlib.Path]) -> pathlib.Path: """ Download job results."""
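
A minimal standalone sketch of the payload dispatch that the new `Connection._build_request_with_process_graph` helper above implements. Assumptions: a free function with plain `X.Y.Z` version-string parsing stands in for the method and its `self._api_version.at_least("1.0.0")` check; the expected request shapes follow the new `test_execute_042` and `test_execute_100` tests in this patch.

    def build_request_with_process_graph(api_version: str, process_graph: dict, **kwargs) -> dict:
        """Wrap a flat process graph in the payload shape for /result, /services, /jobs, ..."""
        result = dict(kwargs)
        if tuple(int(p) for p in api_version.split(".")[:3]) >= (1, 0, 0):
            # openEO API 1.0.0 style: graph nested under a "process" object
            result["process"] = {"process_graph": process_graph}
        else:
            # openEO API 0.4.x style: flat top-level "process_graph" field
            result["process_graph"] = process_graph
        return result

    pg = {"foo1": {"process_id": "foo", "result": True}}
    assert build_request_with_process_graph("0.4.2", pg) == {"process_graph": pg}
    assert build_request_with_process_graph("1.0.0", pg, title="Foo") == {
        "process": {"process_graph": pg},
        "title": "Foo",
    }

Because the wrapping lives in this one helper, `download`, `execute`, `create_job` and `create_service` all stay version-agnostic: each passes a flat graph plus any extra fields and posts the returned dict as-is.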