From 54c6d5d9d597e6a93f1dd1bb4a6932af0dd25f14 Mon Sep 17 00:00:00 2001
From: Stefaan Lippens
Date: Thu, 17 Oct 2019 10:20:01 +0200
Subject: [PATCH] Issue #72: remove pre-0.4 code branches from ImageCollectionClient

---
 openeo/rest/imagecollectionclient.py | 203 +++++++++++----------------
 1 file changed, 80 insertions(+), 123 deletions(-)

diff --git a/openeo/rest/imagecollectionclient.py b/openeo/rest/imagecollectionclient.py
index 749b5cbc6..5d8716e37 100644
--- a/openeo/rest/imagecollectionclient.py
+++ b/openeo/rest/imagecollectionclient.py
@@ -482,41 +482,35 @@ def apply_dimension(self, code: str, runtime=None, version="latest",dimension='t
         :return:
         :raises: CardinalityChangedError
         """
-
-        if self._api_version.at_least('0.4.0'):
-            process_id = 'apply_dimension'
-            if runtime:
-
-                callback = {
-                    'udf': self._create_run_udf(code, runtime, version)
-                }
-            else:
-                callback = {
-                    'process': {
-                        "arguments": {
-                            "data": {
-                                "from_argument": "data"
-                            }
-                        },
-                        "process_id": code,
-                        "result": True
-                    }
-                }
-            args = {
-                'data': {
-                    'from_node': self.node_id
-                },
-                'dimension': dimension,
-
+        process_id = 'apply_dimension'
+        if runtime:
+            callback = {
+                'udf': self._create_run_udf(code, runtime, version)
+            }
+        else:
+            callback = {
                 'process': {
-                    'callback': callback
+                    "arguments": {
+                        "data": {
+                            "from_argument": "data"
+                        }
+                    },
+                    "process_id": code,
+                    "result": True
                 }
             }
-            return self.graph_add_process(process_id, args)
-        else:
-            raise NotImplementedError("apply_dimension requires backend version >=0.4.0")
+        args = {
+            'data': {
+                'from_node': self.node_id
+            },
+            'dimension': dimension,
+            'process': {
+                'callback': callback
+            }
+        }
+        return self.graph_add_process(process_id, args)
 
-    def apply_tiles(self, code: str,runtime="Python",version="latest") -> 'ImageCollection':
+    def apply_tiles(self, code: str, runtime="Python", version="latest") -> 'ImageCollection':
         """Apply a function to the given set of tiles in this image collection.
 
         This type applies a simple function to one pixel of the input image or image collection.
@@ -526,34 +520,23 @@ def apply_tiles(self, code: str,runtime="Python",version="latest") -> 'ImageColl
 
         Code should follow the OpenEO UDF conventions.
 
+        TODO: Deprecated since 0.4.0?
+
         :param code: String representing Python code to be executed in the backend.
         """
-
-        if self._api_version.at_least('0.4.0'):
-            process_id = 'reduce'
-            args = {
-                'data': {
-                    'from_node': self.node_id
-                },
-                'dimension': 'spectral_bands',#TODO determine dimension based on datacube metadata
-                'binary': False,
-                'reducer': {
-                    'callback': {
-                        'udf': self._create_run_udf(code, runtime, version)
-                    }
+        process_id = 'reduce'
+        args = {
+            'data': {
+                'from_node': self.node_id
+            },
+            'dimension': 'spectral_bands',  # TODO determine dimension based on datacube metadata
+            'binary': False,
+            'reducer': {
+                'callback': {
+                    'udf': self._create_run_udf(code, runtime, version)
                 }
             }
-        else:
-
-            process_id = 'apply_tiles'
-            args = {
-                'data': {'from_node': self.node_id},
-                'code':{
-                    'language':'python',
-                    'source':code
-                }
-            }
-
+        }
         return self.graph_add_process(process_id, args)
 
     def _create_run_udf(self, code, runtime, version):
@@ -582,23 +565,20 @@ def reduce_tiles_over_time(self,code: str,runtime="Python",version="latest"):
         :param version: The UDF runtime version
         :return:
         """
-        if self._api_version.at_least('0.4.0'):
-            process_id = 'reduce'
-            args = {
-                'data': {
-                    'from_node': self.node_id
-                },
-                'dimension': 'temporal',#TODO determine dimension based on datacube metadata
-                'binary': False,
-                'reducer': {
-                    'callback': {
-                        'udf': self._create_run_udf(code, runtime, version)
-                    }
+        process_id = 'reduce'
+        args = {
+            'data': {
+                'from_node': self.node_id
+            },
+            'dimension': 'temporal',  # TODO determine dimension based on datacube metadata
+            'binary': False,
+            'reducer': {
+                'callback': {
+                    'udf': self._create_run_udf(code, runtime, version)
                 }
             }
-            return self.graph_add_process(process_id, args)
-        else:
-            raise NotImplementedError("apply_to_tiles_over_time requires backend version >=0.4.0")
+        }
+        return self.graph_add_process(process_id, args)
 
     def apply(self, process: str, data_argument='data',arguments={}) -> 'ImageCollection':
         process_id = 'apply'
@@ -847,18 +827,11 @@ def polygonal_standarddeviation_timeseries(self, polygon: Union[Polygon, MultiPo
     def _polygonal_timeseries(self, polygon: Union[Polygon, MultiPolygon, str], func: str) -> 'ImageCollection':
 
         def graph_add_aggregate_process(graph) -> 'ImageCollection':
-            process_id = 'aggregate_zonal'
-            if self._api_version.at_least('0.4.0'):
-                process_id = 'aggregate_polygon'
-
+            process_id = 'aggregate_polygon'
             args = {
                 'data': {'from_node': self.node_id},
-                'dimension': 'temporal',
-                'polygons': polygons
-            }
-            if self._api_version.at_least('0.4.0'):
-                del args['dimension']
-                args['reducer'] = {
+                'polygons': polygons,
+                'reducer': {
                     'callback': {
                         "unary": {
                             "arguments": {
@@ -871,22 +844,17 @@ def graph_add_aggregate_process(graph) -> 'ImageCollection':
                         }
                     }
                 }
-
+            }
             return graph.graph_add_process(process_id, args)
 
         if isinstance(polygon, str):
-            if self._api_version.at_least('0.4.0'):
-                with_read_vector = self.graph_add_process('read_vector', args={
-                    'filename': polygon
-                })
-
-                polygons = {
-                    'from_node': with_read_vector.node_id
-                }
-
-                return graph_add_aggregate_process(with_read_vector)
-            else:
-                raise NotImplementedError("filename requires backend version >=0.4.0")
+            with_read_vector = self.graph_add_process('read_vector', args={
+                'filename': polygon
+            })
+            polygons = {
+                'from_node': with_read_vector.node_id
+            }
+            return graph_add_aggregate_process(with_read_vector)
         else:
             polygons = mapping(polygon)
             polygons['crs'] = {
@@ -895,7 +863,6 @@ def graph_add_aggregate_process(graph) -> 'ImageCollection':
                     'name': 'EPSG:4326'
                 }
             }
-
             return graph_add_aggregate_process(self)
 
     def save_result(self, format: str, options: dict = None):
@@ -910,22 +877,18 @@ def save_result(self, format: str, options: dict = None):
 
     def download(self, outputfile: str, **format_options) -> str:
         """Extracts a geotiff from this image collection."""
-
-        if self._api_version.at_least('0.4.0'):
-            args = {
-                'data': {'from_node': self.node_id},
-                'options': format_options
-            }
-            if 'format' in format_options:
-                args['format'] = format_options.pop('format')
-            else:
-                raise ValueError("Please use the 'format' keyword argument to specify the output format. Use openeo.connection.Connection#list_file_types to retrieve available ouput formats for this backend.")
-            newcollection = self.graph_add_process("save_result",args)
-            newcollection.graph[newcollection.node_id]["result"] = True
-            return self.session.download(newcollection.graph, outputfile, format_options)
+        args = {
+            'data': {'from_node': self.node_id},
+            'options': format_options
+        }
+        if 'format' in format_options:
+            args['format'] = format_options.pop('format')
         else:
-            self.graph[self.node_id]["result"] = True
-            return self.session.download(self.graph, outputfile, format_options)
+            raise ValueError(
+                "Please use the 'format' keyword argument to specify the output format. Use openeo.connection.Connection#list_file_types to retrieve available ouput formats for this backend.")
+        newcollection = self.graph_add_process("save_result", args)
+        newcollection.graph[newcollection.node_id]["result"] = True
+        return self.session.download(newcollection.graph, outputfile, format_options)
 
     def tiled_viewing_service(self,**kwargs) -> Dict:
         newbuilder = self.builder.copy()
@@ -940,20 +903,14 @@ def send_job(self, out_format=None, **format_options) -> Job:
         :return: status: ClientJob resulting job.
         """
         if out_format:
-            graph = self.graph
-            if self._api_version.at_least('0.4.0'):
-                args = {
-                    'data': {'from_node': self.node_id},
-                    'options': format_options,
-                    'format': out_format
-                }
-                newcollection = self.graph_add_process("save_result", args)
-                newcollection.graph[newcollection.node_id]["result"] = True
-                return self.session.create_job(process_graph=newcollection.graph)
-            else:
-                return self.session.create_job(process_graph=graph, output_format=out_format,
-                                               output_parameters=format_options)
-
+            args = {
+                'data': {'from_node': self.node_id},
+                'options': format_options,
+                'format': out_format
+            }
+            newcollection = self.graph_add_process("save_result", args)
+            newcollection.graph[newcollection.node_id]["result"] = True
+            return self.session.create_job(process_graph=newcollection.graph)
         else:
            return self.session.create_job(process_graph=self.graph)
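
For anyone exercising the UDF paths touched above (apply_dimension, apply_tiles, reduce_tiles_over_time), a rough usage sketch follows. Only the method signatures come from the patch; the backend URL, collection id and UDF body are placeholder values, not part of this change.

    import openeo

    # Placeholder backend and collection id, for illustration only.
    connection = openeo.connect("https://openeo.example.com")
    cube = connection.imagecollection("SENTINEL2_L2A")

    # Trivial UDF body; real code should follow the openEO UDF conventions
    # referenced in the apply_tiles docstring.
    udf_code = "def apply_timeseries(series, context): return series"

    # With the pre-0.4 branches gone, these calls always build 0.4-style nodes:
    # 'reduce' nodes with a 'udf' callback for the UDF variants, and an
    # 'apply_dimension' node wrapping either a UDF or a named process.
    reduced = cube.reduce_tiles_over_time(udf_code, runtime="Python")
    tiled = cube.apply_tiles(udf_code)
    applied = cube.apply_dimension("mean", dimension="t")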
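
Likewise for the _polygonal_timeseries entry points, reusing the cube from the sketch above; polygonal_standarddeviation_timeseries is the public wrapper visible in the hunk context, and the file path and coordinates are made up.

    from shapely.geometry import Polygon

    # A file path now always goes through a 'read_vector' node feeding
    # 'aggregate_polygon'; the pre-0.4 NotImplementedError branch is gone.
    ts_from_file = cube.polygonal_standarddeviation_timeseries("fields.geojson")

    # A shapely geometry is inlined as GeoJSON with an EPSG:4326 crs entry,
    # matching the mapping(polygon) branch above.
    field = Polygon([(3.70, 51.00), (3.80, 51.00), (3.80, 51.10), (3.70, 51.10)])
    ts_from_geometry = cube.polygonal_standarddeviation_timeseries(field)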
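
Finally, the output path: download and send_job now always append a 'save_result' node and mark it as the result node, and download requires an explicit format keyword. Again a sketch continuing from the same cube, with placeholder file and format names.

    # download() wraps the graph in 'save_result'; omitting format= raises the
    # ValueError introduced above, which points at Connection#list_file_types.
    cube.download("composite.tiff", format="GTiff")

    # send_job() with an output format performs the same wrapping before the
    # batch job is created; without out_format the graph is submitted as-is.
    job = cube.send_job(out_format="GTiff")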