Skip to content

Commit

Permalink
Issue Open-EO#72: remove pre-0.4 code branches from ImageCollectionCl…
Browse files Browse the repository at this point in the history
…ient
  • Loading branch information
soxofaan committed Oct 17, 2019
1 parent 9fad48c commit 43658c3
Showing 1 changed file with 80 additions and 123 deletions.
203 changes: 80 additions & 123 deletions openeo/rest/imagecollectionclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -482,41 +482,35 @@ def apply_dimension(self, code: str, runtime=None, version="latest",dimension='t
:return:
:raises: CardinalityChangedError
"""

if self._api_version.at_least('0.4.0'):
process_id = 'apply_dimension'
if runtime:

callback = {
'udf': self._create_run_udf(code, runtime, version)
}
else:
callback = {
'process': {
"arguments": {
"data": {
"from_argument": "data"
}
},
"process_id": code,
"result": True
}
}
args = {
'data': {
'from_node': self.node_id
},
'dimension': dimension,

process_id = 'apply_dimension'
if runtime:
callback = {
'udf': self._create_run_udf(code, runtime, version)
}
else:
callback = {
'process': {
'callback': callback
"arguments": {
"data": {
"from_argument": "data"
}
},
"process_id": code,
"result": True
}
}
return self.graph_add_process(process_id, args)
else:
raise NotImplementedError("apply_dimension requires backend version >=0.4.0")
args = {
'data': {
'from_node': self.node_id
},
'dimension': dimension,
'process': {
'callback': callback
}
}
return self.graph_add_process(process_id, args)

def apply_tiles(self, code: str,runtime="Python",version="latest") -> 'ImageCollection':
def apply_tiles(self, code: str, runtime="Python", version="latest") -> 'ImageCollection':
"""Apply a function to the given set of tiles in this image collection.
This type applies a simple function to one pixel of the input image or image collection.
Expand All @@ -526,34 +520,23 @@ def apply_tiles(self, code: str,runtime="Python",version="latest") -> 'ImageColl
Code should follow the OpenEO UDF conventions.
TODO: Deprecated since 0.4.0?
:param code: String representing Python code to be executed in the backend.
"""

if self._api_version.at_least('0.4.0'):
process_id = 'reduce'
args = {
'data': {
'from_node': self.node_id
},
'dimension': 'spectral_bands',#TODO determine dimension based on datacube metadata
'binary': False,
'reducer': {
'callback': {
'udf': self._create_run_udf(code, runtime, version)
}
process_id = 'reduce'
args = {
'data': {
'from_node': self.node_id
},
'dimension': 'spectral_bands', # TODO determine dimension based on datacube metadata
'binary': False,
'reducer': {
'callback': {
'udf': self._create_run_udf(code, runtime, version)
}
}
else:

process_id = 'apply_tiles'
args = {
'data': {'from_node': self.node_id},
'code':{
'language':'python',
'source':code
}
}

}
return self.graph_add_process(process_id, args)

def _create_run_udf(self, code, runtime, version):
Expand Down Expand Up @@ -582,23 +565,20 @@ def reduce_tiles_over_time(self,code: str,runtime="Python",version="latest"):
:param version: The UDF runtime version
:return:
"""
if self._api_version.at_least('0.4.0'):
process_id = 'reduce'
args = {
'data': {
'from_node': self.node_id
},
'dimension': 'temporal',#TODO determine dimension based on datacube metadata
'binary': False,
'reducer': {
'callback': {
'udf': self._create_run_udf(code, runtime, version)
}
process_id = 'reduce'
args = {
'data': {
'from_node': self.node_id
},
'dimension': 'temporal', # TODO determine dimension based on datacube metadata
'binary': False,
'reducer': {
'callback': {
'udf': self._create_run_udf(code, runtime, version)
}
}
return self.graph_add_process(process_id, args)
else:
raise NotImplementedError("apply_to_tiles_over_time requires backend version >=0.4.0")
}
return self.graph_add_process(process_id, args)

def apply(self, process: str, data_argument='data',arguments={}) -> 'ImageCollection':
process_id = 'apply'
Expand Down Expand Up @@ -845,18 +825,11 @@ def polygonal_standarddeviation_timeseries(self, polygon: Union[Polygon, MultiPo

def _polygonal_timeseries(self, polygon: Union[Polygon, MultiPolygon, str], func: str) -> 'ImageCollection':
def graph_add_aggregate_process(graph) -> 'ImageCollection':
process_id = 'aggregate_zonal'
if self._api_version.at_least('0.4.0'):
process_id = 'aggregate_polygon'

process_id = 'aggregate_polygon'
args = {
'data': {'from_node': self.node_id},
'dimension': 'temporal',
'polygons': polygons
}
if self._api_version.at_least('0.4.0'):
del args['dimension']
args['reducer'] = {
'polygons': polygons,
'reducer': {
'callback': {
"unary": {
"arguments": {
Expand All @@ -869,22 +842,17 @@ def graph_add_aggregate_process(graph) -> 'ImageCollection':
}
}
}

}
return graph.graph_add_process(process_id, args)

if isinstance(polygon, str):
if self._api_version.at_least('0.4.0'):
with_read_vector = self.graph_add_process('read_vector', args={
'filename': polygon
})

polygons = {
'from_node': with_read_vector.node_id
}

return graph_add_aggregate_process(with_read_vector)
else:
raise NotImplementedError("filename requires backend version >=0.4.0")
with_read_vector = self.graph_add_process('read_vector', args={
'filename': polygon
})
polygons = {
'from_node': with_read_vector.node_id
}
return graph_add_aggregate_process(with_read_vector)
else:
polygons = mapping(polygon)
polygons['crs'] = {
Expand All @@ -893,7 +861,6 @@ def graph_add_aggregate_process(graph) -> 'ImageCollection':
'name': 'EPSG:4326'
}
}

return graph_add_aggregate_process(self)

def save_result(self, format: str, options: dict = None):
Expand All @@ -908,22 +875,18 @@ def save_result(self, format: str, options: dict = None):

def download(self, outputfile: str, **format_options) -> str:
"""Extracts a geotiff from this image collection."""

if self._api_version.at_least('0.4.0'):
args = {
'data': {'from_node': self.node_id},
'options': format_options
}
if 'format' in format_options:
args['format'] = format_options.pop('format')
else:
raise ValueError("Please use the 'format' keyword argument to specify the output format. Use openeo.connection.Connection#list_file_types to retrieve available ouput formats for this backend.")
newcollection = self.graph_add_process("save_result",args)
newcollection.graph[newcollection.node_id]["result"] = True
return self.session.download(newcollection.graph, outputfile, format_options)
args = {
'data': {'from_node': self.node_id},
'options': format_options
}
if 'format' in format_options:
args['format'] = format_options.pop('format')
else:
self.graph[self.node_id]["result"] = True
return self.session.download(self.graph, outputfile, format_options)
raise ValueError(
"Please use the 'format' keyword argument to specify the output format. Use openeo.connection.Connection#list_file_types to retrieve available ouput formats for this backend.")
newcollection = self.graph_add_process("save_result", args)
newcollection.graph[newcollection.node_id]["result"] = True
return self.session.download(newcollection.graph, outputfile, format_options)

def tiled_viewing_service(self,**kwargs) -> Dict:
newbuilder = self.builder.copy()
Expand All @@ -938,20 +901,14 @@ def send_job(self, out_format=None, **format_options) -> Job:
:return: status: ClientJob resulting job.
"""
if out_format:
graph = self.graph
if self._api_version.at_least('0.4.0'):
args = {
'data': {'from_node': self.node_id},
'options': format_options,
'format': out_format
}
newcollection = self.graph_add_process("save_result", args)
newcollection.graph[newcollection.node_id]["result"] = True
return self.session.create_job(process_graph=newcollection.graph)
else:
return self.session.create_job(process_graph=graph, output_format=out_format,
output_parameters=format_options)

args = {
'data': {'from_node': self.node_id},
'options': format_options,
'format': out_format
}
newcollection = self.graph_add_process("save_result", args)
newcollection.graph[newcollection.node_id]["result"] = True
return self.session.create_job(process_graph=newcollection.graph)
else:
return self.session.create_job(process_graph=self.graph)

Expand Down

0 comments on commit 43658c3

Please sign in to comment.