Commit cb717a0

Merge pull request #131 from soxofaan/issue129-pg-submit

improve 1.0-style process graph submit requests

soxofaan authored Apr 2, 2020
2 parents 822cf42 + d829d7b
Showing 11 changed files with 338 additions and 334 deletions.
102 changes: 38 additions & 64 deletions openeo/rest/connection.py
@@ -242,6 +242,7 @@ def user_jobs(self) -> dict:
         :return: jobs: Dict All jobs of the user
         """
+        # TODO duplication with `list_jobs()` method
         return self.get('/jobs').json()["jobs"]

     def list_collections(self) -> List[dict]:
@@ -301,7 +302,7 @@ def list_services(self) -> dict:
         :return: data_dict: Dict All available service types
         """
-        #TODO return service objects
+        #TODO return parsed service objects
         return self.get('/services').json()

     def describe_collection(self, name) -> dict:

m-mohr (Member) commented on Apr 2, 2020:

@soxofaan Regarding the comment below "Maybe create some kind of Data class": you could either import PySTAC or take inspiration from it (see https://github.com/azavea/pystac/blob/develop/pystac/collection.py for example). Similarly, for the STAC Items returned for batch job results, you could have a look at https://github.com/azavea/pystac/blob/develop/pystac/item.py

soxofaan (Author, Member) commented on Apr 2, 2020:

Thanks for the tip, it indeed looks interesting to use such a library.

soxofaan (Author, Member) commented on Apr 2, 2020:

-> #133
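A minimal sketch of the PySTAC idea floated above, assuming the backend's collection metadata is close enough to a STAC Collection to parse directly (the backend URL and collection id are illustrative):

    import openeo
    import pystac

    connection = openeo.connect("https://openeo.example.com")  # hypothetical backend
    metadata = connection.describe_collection("S2")            # plain dict today
    collection = pystac.Collection.from_dict(metadata)         # parsed STAC object
    print(collection.id, collection.description)

Whether every openEO backend returns fully STAC-compliant metadata is an open question, presumably part of what #133 tracks.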

@@ -327,12 +328,13 @@ def list_processes(self) -> dict:
         return self.get('/processes').json()["processes"]

     def list_jobs(self) -> dict:
-        # TODO: Maybe format the result so that there get Job classes returned.
         """
         Lists all jobs of the authenticated user.
         :return: job_list: Dict of all jobs of the user.
         """
+        # TODO: Maybe format the result so that there get Job classes returned.
+        # TODO: duplication with `user_jobs()` method
         return self.get('/jobs').json()["jobs"]

     def validate_processgraph(self, process_graph):
@@ -365,12 +367,10 @@ def load_collection(self, collection_id: str, **kwargs) -> ImageCollectionClient
     # Legacy alias.
     imagecollection = load_collection

-
-    def create_service(self, graph, type, **kwargs):
-        kwargs["process_graph"] = graph
-        kwargs["type"] = type
-        response = self.post("/services", json=kwargs, expected_status=201)
+    def create_service(self, graph: dict, type: str, **kwargs):
+        # TODO: type hint for graph: is it a nested or a flat one?
+        req = self._build_request_with_process_graph(process_graph=graph, type=type, **kwargs)
+        response = self.post(path="/services", json=req, expected_status=201)
         return {
             'url': response.headers.get('Location'),
             'service_id': response.headers.get("OpenEO-Identifier"),
@@ -386,12 +386,10 @@ def remove_service(self, service_id: str):
         response = self.delete('/services/' + service_id)

     def job_results(self, job_id):
-        response = self.get("/jobs/{}/results".format(job_id))
-        return self.parse_json_response(response)
+        return self.get("/jobs/{}/results".format(job_id)).json()

     def job_logs(self, job_id, offset):
-        response = self.get("/jobs/{}/logs".format(job_id), params={'offset': offset})
-        return self.parse_json_response(response)
+        return self.get("/jobs/{}/logs".format(job_id), params={'offset': offset}).json()

     def list_files(self):
         """
@@ -411,43 +409,48 @@ def create_file(self, path):
         # No endpoint just returns a file object.
         raise NotImplementedError()

+    def _build_request_with_process_graph(self, process_graph: dict, **kwargs) -> dict:
+        """
+        Prepare a json payload with a process graph to submit to /result, /services, /jobs, ...
+        :param process_graph: flat dict representing a process graph
+        """
+        result = kwargs
+        if self._api_version.at_least("1.0.0"):
+            result["process"] = {"process_graph": process_graph}
+        else:
+            result["process_graph"] = process_graph
+        return result
+
     # TODO: Maybe rename to execute and merge with execute().
-    def download(self, graph, outputfile):
+    def download(self, graph: dict, outputfile):
         """
         Downloads the result of a process graph synchronously, and save the result to the given file.
         This method is useful to export binary content such as images. For json content, the execute method is recommended.
-        :param graph: Dict representing a process graph
+        :param graph: (flat) dict representing a process graph
         :param outputfile: output file
         :param format_options: formating options
         :return: job_id: String
         """
-        request = {"process_graph": graph}
-        download_url = self.build_url("/result")
-        r = self.post(download_url, json=request, stream=True, timeout=1000)
+        request = self._build_request_with_process_graph(process_graph=graph)
+        r = self.post(path="/result", json=request, stream=True, timeout=1000)
         with pathlib.Path(outputfile).open(mode="wb") as f:
             shutil.copyfileobj(r.raw, f)

-    def execute(self, process_graph, output_format, output_parameters=None, budget=None):
+    def execute(self, process_graph: dict):
         """
         Execute a process graph synchronously.
-        :param process_graph: Dict representing a process graph
-        :param output_format: String Output format of the execution
-        :param output_parameters: Dict of additional output parameters
-        :param budget: Budget
-        :return: job_id: String
+        :param process_graph: (flat) dict representing a process graph
         """
         # TODO: add output_format to execution
-        return self.post(path="/result", json=process_graph).json()
+        req = self._build_request_with_process_graph(process_graph=process_graph)
+        return self.post(path="/result", json=req).json()

-    def create_job(self, process_graph: Dict, title: str = None, description: str = None,
+    def create_job(self, process_graph: dict, title: str = None, description: str = None,
                    plan: str = None, budget=None,
                    additional: Dict = None) -> RESTJob:
         """
         Posts a job to the back end.
-        :param process_graph: String data of the job (e.g. process graph)
+        :param process_graph: (flat) dict representing process graph
         :param title: String title of the job
         :param description: String description of the job
         :param plan: billing plan
@@ -456,17 +459,15 @@ def create_job(self, process_graph: Dict, title: str = None, description: str =
         :return: job_id: String Job id of the new created job
         """
         # TODO move all this (RESTJob factory) logic to RESTJob?
-        process_graph = {
-            "process_graph": process_graph,
-            "title": title,
-            "description": description,
-            "plan": plan,
-            "budget": budget
-        }
+        req = self._build_request_with_process_graph(
+            process_graph=process_graph,
+            title=title, description=description, plan=plan, budget=budget
+        )
         if additional:
-            process_graph["job_options"] = additional
+            # TODO: get rid of this non-standard field?
+            req["job_options"] = additional

-        response = self.post("/jobs", process_graph)
+        response = self.post("/jobs", json=req)

         if "openeo-identifier" in response.headers:
             job_id = response.headers['openeo-identifier']
@@ -488,33 +489,6 @@ def job(self,job_id:str):
         """
         return RESTJob(job_id, self)

-    def parse_json_response(self, response: requests.Response):
-        """
-        Parses json response, if an error occurs it raises an Exception.
-        :param response: Response of a RESTful request
-        :return: response: JSON Response
-        """
-        # TODO Deprecated: status handling is now in RestApiConnection
-        if response.status_code == 200 or response.status_code == 201:
-            return response.json()
-        else:
-            self._handle_error_response(response)
-
-    def _handle_error_response(self, response):
-        # TODO replace this with `_raise_api_error`
-        if response.status_code == 502:
-            from requests.exceptions import ProxyError
-            raise ProxyError("The proxy returned an error, this could be due to a timeout.")
-        else:
-            message = None
-            if response.headers['Content-Type'] == 'application/json':
-                message = response.json().get('message', None)
-            if message:
-                message = response.text
-
-            raise ConnectionAbortedError(message)
-
     def get_outputformats(self) -> dict:
         """
         Loads all available output formats.
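The net effect of the new _build_request_with_process_graph helper is easiest to see as data. A standalone sketch of the two payload shapes it produces (the graph itself is illustrative):

    process_graph = {
        "loadcollection1": {
            "process_id": "load_collection",
            "arguments": {"id": "S2", "spatial_extent": None, "temporal_extent": None},
            "result": True,
        }
    }

    # openEO API < 1.0.0: the flat graph sits directly under "process_graph"
    payload_04 = {"process_graph": process_graph}

    # openEO API >= 1.0.0: the graph is wrapped in a "process" object
    payload_10 = {"process": {"process_graph": process_graph}}

Either shape is then posted as-is to /result, /services or /jobs, with extra fields such as title or budget merged in at the top level.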
2 changes: 1 addition & 1 deletion openeo/rest/datacube.py
@@ -876,7 +876,7 @@ def send_job(self, out_format=None, job_options=None, **format_options) -> Job:

     def execute(self) -> Dict:
         """Executes the process graph of the imagery. """
-        return self._connection.execute({"process_graph": self._pg.flatten()}, "")
+        return self._connection.execute(self._pg.flatten())

     def to_graphviz(self):
         """
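With this change, DataCube.execute() hands the flat graph to the connection, which now does the version-aware wrapping in one place instead of every caller duplicating it. A hedged usage sketch (backend URL and collection id are made up):

    import openeo

    connection = openeo.connect("https://openeo.example.com")
    cube = connection.load_collection("S2")
    result = cube.execute()  # connection wraps the flat graph and POSTs it to /result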
2 changes: 1 addition & 1 deletion openeo/rest/imagecollectionclient.py
@@ -1036,7 +1036,7 @@ def execute(self) -> Dict:
         """Executes the process graph of the imagery. """
         newbuilder = self.builder.shallow_copy()
         newbuilder.processes[self.node_id]['result'] = True
-        return self.session.execute({"process_graph": newbuilder.processes},"")
+        return self.session.execute(newbuilder.processes)

     ####### HELPER methods #######
14 changes: 5 additions & 9 deletions openeo/rest/job.py
@@ -35,29 +35,25 @@ def __init__(self, job_id: str, connection: 'Connection'):
     def describe_job(self):
         """ Get all job information."""
         # GET /jobs/{job_id}
-        request = self.connection.get("/jobs/{}".format(self.job_id))
-        return self.connection.parse_json_response(request)
+        return self.connection.get("/jobs/{}".format(self.job_id)).json()

     def update_job(self, process_graph=None, output_format=None,
                    output_parameters=None, title=None, description=None,
                    plan=None, budget=None, additional=None):
         """ Update a job."""
         # PATCH /jobs/{job_id}
-        pass
+        raise NotImplementedError

     def delete_job(self):
         """ Delete a job."""
         # DELETE /jobs/{job_id}
         request = self.connection.delete("/jobs/{}".format(self.job_id))
-
-        return request.status_code
+        assert request.status_code == 204

     def estimate_job(self):
         """ Calculate an time/cost estimate for a job."""
         # GET /jobs/{job_id}/estimate
-        request = self.connection.get("/jobs/{}/estimate".format(self.job_id))
-
-        return self.connection.parse_json_response(request)
+        return self.connection.get("/jobs/{}/estimate".format(self.job_id)).json()

     def start_job(self):
         """ Start / queue a job for processing."""
@@ -77,7 +73,7 @@ def stop_job(self):
     def list_results(self, type=None):
         """ Get document with download links."""
         # GET /jobs/{job_id}/results
-        pass
+        raise NotImplementedError

     def download_results(self, target: Union[str, pathlib.Path]) -> pathlib.Path:
         """ Download job results."""
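Taken together, the RESTJob helpers now either return parsed JSON directly or fail loudly instead of silently passing. A short usage sketch (the job id is made up):

    job = connection.job("j-1234")   # wrap an existing job id
    info = job.describe_job()        # dict parsed from GET /jobs/{job_id}
    estimate = job.estimate_job()    # dict parsed from GET /jobs/{job_id}/estimate
    job.delete_job()                 # AssertionError unless the backend answers 204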
104 changes: 51 additions & 53 deletions tests/data/0.4.0/aggregate_zonal_path.json
@@ -1,61 +1,59 @@
(The interleaved old/new rendering of this fixture diff does not survive as plain text; the change is a near-total reformatting of the file. The process graph it encodes, unchanged in substance, reads:)

{
  "process_graph": {
    "filterbbox1": {
      "process_id": "filter_bbox",
      "arguments": {
        "data": {
          "from_node": "loadcollection1"
        },
        "extent": {
          "west": 3,
          "east": 6,
          "north": 52,
          "south": 50,
          "crs": "EPSG:4326"
        }
      },
      "result": false
    },
    "loadcollection1": {
      "process_id": "load_collection",
      "arguments": {
        "id": "S2",
        "spatial_extent": null,
        "temporal_extent": null
      },
      "result": false
    },
    "readvector1": {
      "arguments": {
        "filename": "/some/path/to/GeometryCollection.geojson"
      },
      "process_id": "read_vector",
      "result": false
    },
    "aggregatepolygon1": {
      "process_id": "aggregate_polygon",
      "arguments": {
        "data": {
          "from_node": "filterbbox1"
        },
        "polygons": {
          "from_node": "readvector1"
        },
        "reducer": {
          "callback": {
            "unary": {
              "arguments": {
                "data": {
                  "from_argument": "data"
                }
              },
              "process_id": "mean",
              "result": true
            }
          }
        }
      },
      "result": true
    }
  }
}
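A fixture like this is typically loaded by the tests and compared against the payload the client builds. A minimal sketch of such a check, using only the fixture itself and assuming it runs from the repository root (the invariants shown are illustrative):

    import json

    with open("tests/data/0.4.0/aggregate_zonal_path.json") as f:
        fixture = json.load(f)

    # 0.4-style payload: the flat graph sits under a top-level "process_graph" key
    assert set(fixture) == {"process_graph"}
    assert fixture["process_graph"]["aggregatepolygon1"]["result"] is True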