Skip to content

Commit

Permalink
Issue Open-EO#129/EP-3352/Open-EO#125: fix 1.0-style process_graph su…
Browse files Browse the repository at this point in the history
…bmit requests
  • Loading branch information
soxofaan committed Apr 2, 2020
1 parent 822cf42 commit e94a42c
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 292 deletions.
63 changes: 32 additions & 31 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -365,12 +365,10 @@ def load_collection(self, collection_id: str, **kwargs) -> ImageCollectionClient
# Legacy alias.
imagecollection = load_collection


def create_service(self, graph, type, **kwargs):
def create_service(self, graph: dict, type: str, **kwargs):
# TODO: type hint for graph: is it a nested or a flat one?
kwargs["process_graph"] = graph
kwargs["type"] = type
response = self.post("/services", json=kwargs, expected_status=201)
req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
response = self.post(path="/services", json=req, expected_status=201)
return {
'url': response.headers.get('Location'),
'service_id': response.headers.get("OpenEO-Identifier"),
Expand Down Expand Up @@ -411,43 +409,48 @@ def create_file(self, path):
# No endpoint just returns a file object.
raise NotImplementedError()

def _build_request_with_process_graph(self, process_graph: dict, **kwargs) -> dict:
"""
Prepare a json payload with a process graph to submit to /result, /services, /jobs, ...
:param process_graph: flat dict representing a process graph
"""
result = kwargs
if self._api_version.at_least("1.0.0"):
result["process"] = {"process_graph": process_graph}
else:
result["process_graph"] = process_graph
return result

# TODO: Maybe rename to execute and merge with execute().
def download(self, graph, outputfile):
def download(self, graph: dict, outputfile):
"""
Downloads the result of a process graph synchronously, and save the result to the given file.
This method is useful to export binary content such as images. For json content, the execute method is recommended.
:param graph: Dict representing a process graph
:param graph: (flat) dict representing a process graph
:param outputfile: output file
:param format_options: formating options
:return: job_id: String
"""
request = {"process_graph": graph}
download_url = self.build_url("/result")
r = self.post(download_url, json=request, stream=True, timeout=1000)
request = self._build_request_with_process_graph(process_graph=graph)
r = self.post(path="/result", json=request, stream=True, timeout=1000)
with pathlib.Path(outputfile).open(mode="wb") as f:
shutil.copyfileobj(r.raw, f)

def execute(self, process_graph, output_format, output_parameters=None, budget=None):
def execute(self, process_graph: dict):
"""
Execute a process graph synchronously.
:param process_graph: Dict representing a process graph
:param output_format: String Output format of the execution
:param output_parameters: Dict of additional output parameters
:param budget: Budget
:return: job_id: String
:param process_graph: (flat) dict representing a process graph
"""
# TODO: add output_format to execution
return self.post(path="/result", json=process_graph).json()
req = self._build_request_with_process_graph(process_graph=process_graph)
return self.post(path="/result", json=req).json()

def create_job(self, process_graph: Dict, title: str = None, description: str = None,
def create_job(self, process_graph: dict, title: str = None, description: str = None,
plan: str = None, budget=None,
additional: Dict = None) -> RESTJob:
"""
Posts a job to the back end.
:param process_graph: String data of the job (e.g. process graph)
:param process_graph: (flat) dict representing process graph
:param title: String title of the job
:param description: String description of the job
:param plan: billing plan
Expand All @@ -456,17 +459,15 @@ def create_job(self, process_graph: Dict, title: str = None, description: str =
:return: job_id: String Job id of the new created job
"""
# TODO move all this (RESTJob factory) logic to RESTJob?
process_graph = {
"process_graph": process_graph,
"title": title,
"description": description,
"plan": plan,
"budget": budget
}
req = self._build_request_with_process_graph(
process_graph=process_graph,
title=title, description=description, plan=plan, budget=budget
)
if additional:
process_graph["job_options"] = additional
# TODO: get rid of this non-standard field?
req["job_options"] = additional

response = self.post("/jobs", process_graph)
response = self.post("/jobs", json=req)

if "openeo-identifier" in response.headers:
job_id = response.headers['openeo-identifier']
Expand Down
2 changes: 1 addition & 1 deletion openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -876,7 +876,7 @@ def send_job(self, out_format=None, job_options=None, **format_options) -> Job:

def execute(self) -> Dict:
"""Executes the process graph of the imagery. """
return self._connection.execute({"process_graph": self._pg.flatten()}, "")
return self._connection.execute(self._pg.flatten())

def to_graphviz(self):
"""
Expand Down
2 changes: 1 addition & 1 deletion openeo/rest/imagecollectionclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,7 @@ def execute(self) -> Dict:
"""Executes the process graph of the imagery. """
newbuilder = self.builder.shallow_copy()
newbuilder.processes[self.node_id]['result'] = True
return self.session.execute({"process_graph": newbuilder.processes},"")
return self.session.execute(newbuilder.processes)

####### HELPER methods #######

Expand Down
104 changes: 51 additions & 53 deletions tests/data/0.4.0/aggregate_zonal_path.json
Original file line number Diff line number Diff line change
@@ -1,61 +1,59 @@
{
"process_graph": {
"filterbbox1": {
"process_id": "filter_bbox",
"arguments": {
"data": {
"from_node": "loadcollection1"
},
"extent": {
"west": 3,
"east": 6,
"north": 52,
"south": 50,
"crs": "EPSG:4326"
}
"filterbbox1": {
"process_id": "filter_bbox",
"arguments": {
"data": {
"from_node": "loadcollection1"
},
"result": false
"extent": {
"west": 3,
"east": 6,
"north": 52,
"south": 50,
"crs": "EPSG:4326"
}
},
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
},
"result": false
"result": false
},
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
},
"readvector1": {
"arguments": {
"filename": "/some/path/to/GeometryCollection.geojson"
},
"process_id": "read_vector",
"result": false
"result": false
},
"readvector1": {
"arguments": {
"filename": "/some/path/to/GeometryCollection.geojson"
},
"aggregatepolygon1": {
"process_id": "aggregate_polygon",
"arguments": {
"data": {
"from_node": "filterbbox1"
},
"polygons": {
"from_node": "readvector1"
},
"reducer": {
"callback": {
"unary": {
"arguments": {
"data": {
"from_argument": "data"
}
},
"process_id": "mean",
"result": true
}
"process_id": "read_vector",
"result": false
},
"aggregatepolygon1": {
"process_id": "aggregate_polygon",
"arguments": {
"data": {
"from_node": "filterbbox1"
},
"polygons": {
"from_node": "readvector1"
},
"reducer": {
"callback": {
"unary": {
"arguments": {
"data": {
"from_argument": "data"
}
},
"process_id": "mean",
"result": true
}
}
},
"result": true
}
}
},
"result": true
}
}
}
144 changes: 71 additions & 73 deletions tests/data/0.4.0/aggregate_zonal_polygon.json
Original file line number Diff line number Diff line change
@@ -1,84 +1,82 @@
{
"process_graph": {
"filterbbox1": {
"process_id": "filter_bbox",
"arguments": {
"data": {
"from_node": "loadcollection1"
},
"extent": {
"west": 3,
"east": 6,
"north": 52,
"south": 50,
"crs": "EPSG:4326"
}
"filterbbox1": {
"process_id": "filter_bbox",
"arguments": {
"data": {
"from_node": "loadcollection1"
},
"result": false
"extent": {
"west": 3,
"east": 6,
"north": 52,
"south": 50,
"crs": "EPSG:4326"
}
},
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
},
"result": false
"result": false
},
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
},
"aggregatepolygon1": {
"process_id": "aggregate_polygon",
"arguments": {
"data": {
"from_node": "filterbbox1"
},
"polygons": {
"type": "Polygon",
"coordinates": [
"result": false
},
"aggregatepolygon1": {
"process_id": "aggregate_polygon",
"arguments": {
"data": {
"from_node": "filterbbox1"
},
"polygons": {
"type": "Polygon",
"coordinates": [
[
[
16.138916,
48.320647
],
[
16.524124,
48.320647
],
[
16.524124,
48.1386
],
[
[
16.138916,
48.320647
],
[
16.524124,
48.320647
],
[
16.524124,
48.1386
],
[
16.138916,
48.1386
],
[
16.138916,
48.320647
]
16.138916,
48.1386
],
[
16.138916,
48.320647
]
],
"crs": {
"type": "name",
"properties": {
"name": "EPSG:4326"
}
}
},
"reducer": {
"callback": {
"unary": {
"arguments": {
"data": {
"from_argument": "data"
}
},
"process_id": "mean",
"result": true
}
]
],
"crs": {
"type": "name",
"properties": {
"name": "EPSG:4326"
}
}
},
"result": true
}
"reducer": {
"callback": {
"unary": {
"arguments": {
"data": {
"from_argument": "data"
}
},
"process_id": "mean",
"result": true
}
}
}
},
"result": true
}
}
Loading

0 comments on commit e94a42c

Please sign in to comment.