Skip to content

Commit

Permalink
Improve handling of overlap_resolver in merge_cubes (EP-3130 related)
Browse files Browse the repository at this point in the history
- 0.4-style ImageCollectionClient was not fully to spec
- 1.0-style DataCube: added handling of simple str overlap resolver
  • Loading branch information
soxofaan committed Mar 31, 2020
1 parent dfd1705 commit 822cf42
Show file tree
Hide file tree
Showing 11 changed files with 213 additions and 19 deletions.
2 changes: 1 addition & 1 deletion openeo/internal/graph_building.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ def to_process_graph_argument(value: Union['PGNode', str, dict]):
'reduce_dimension', 'aggregate_spatial', 'apply', 'merge_cubes', 'resample_cube_temporal'
"""
if isinstance(value, str):
# assume string with predefined reduce/appply process ("mean", "sum", ...)
# assume string with predefined reduce/apply process ("mean", "sum", ...)
return value
elif isinstance(value, PGNode):
return {"process_graph": value}
Expand Down
8 changes: 7 additions & 1 deletion openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -708,12 +708,18 @@ def mask_polygon(
)
)

def merge(self, other: 'DataCube', overlap_resolver: PGNode = None) -> 'DataCube':
def merge(self, other: 'DataCube', overlap_resolver: Union[str, PGNode] = None) -> 'DataCube':
arguments = {
'cube1': {'from_node': self._pg},
'cube2': {'from_node': other._pg},
}
if overlap_resolver:
if isinstance(overlap_resolver, str):
# Simple resolver (specified as process_id string)
overlap_resolver = PGNode(
process_id=overlap_resolver,
arguments={"data": [{"from_parameter": "x"}, {"from_parameter": "y"}]}
)
arguments["overlap_resolver"] = {"process_graph": overlap_resolver}
# TODO #125 context
# TODO: set metadata of reduced cube?
Expand Down
32 changes: 22 additions & 10 deletions openeo/rest/imagecollectionclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,12 +371,13 @@ def add(self, other:Union[ImageCollection,Union[int,float]]):

def _reduce_bands_binary(self, operator, other: 'ImageCollectionClient',arg_name='data'):
# first we create the callback
fallback_node = {'from_argument': 'data'}
my_builder = self._get_band_graph_builder()
other_builder = other._get_band_graph_builder()
merged = GraphBuilder.combine(operator=operator,
first=my_builder or fallback_node,
second=other_builder or fallback_node, arg_name=arg_name)
merged = GraphBuilder.combine(
operator=operator,
first=my_builder or {'from_argument': 'data'},
second=other_builder or {'from_argument': 'data'},
arg_name=arg_name)
# callback is ready, now we need to properly set up the reduce process that will invoke it
if my_builder is None and other_builder is None:
# there was no previous reduce step, perhaps this is a cube merge?
Expand Down Expand Up @@ -405,11 +406,12 @@ def _reduce_bands_binary(self, operator, other: 'ImageCollectionClient',arg_name
cube_list = list(merged.processes.values())[0]["arguments"][arg_name]
assert len(cube_list) == 2
# it is really not clear if this is the agreed way to go
cube_list[0]["from_argument"] = "cube1"
cube_list[1]["from_argument"] = "cube2"
cube_list[0]["from_argument"] = "x"
cube_list[1]["from_argument"] = "y"
the_node["arguments"]["overlap_resolver"] = {
'callback': merged.processes
}
the_node["arguments"]["binary"] = True
return ImageCollectionClient(node_id, cubes_merged, self.session, metadata=self.metadata)
else:
args = {
Expand Down Expand Up @@ -836,7 +838,7 @@ def mask(self, polygon: Union[Polygon, MultiPolygon,str]=None, srs="EPSG:4326",

return new_collection.graph_add_process(process_id, args)

def merge(self,other:'ImageCollection') -> 'ImageCollection':
def merge(self, other: 'ImageCollection', overlap_resolver: str = None) -> 'ImageCollection':
other_node = other.graph[other.node_id]
other_node['result'] = True
new_collection = self._graph_merge(other.graph)
Expand All @@ -847,10 +849,20 @@ def merge(self,other:'ImageCollection') -> 'ImageCollection':
cube2 = {
'from_node': mask_id
}
args={
'cube1':{'from_node':self.node_id},
'cube2':cube2
args = {
'cube1': {'from_node': self.node_id},
'cube2': cube2
}
if overlap_resolver:
# Assume simple math operation
# TODO support general overlap resolvers.
assert isinstance(overlap_resolver, str)
args["overlap_resolver"] = {"callback": {"r1": {
"process_id": overlap_resolver,
"arguments": {"data": [{"from_argument": "x"}, {"from_argument": "y"}]},
"result": True,
}}}
args["binary"] = True
return new_collection.graph_add_process('merge_cubes', args)


Expand Down
51 changes: 51 additions & 0 deletions tests/data/0.4.0/merge_cubes_max.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
{
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
},
"result": false
},
"loadcollection3": {
"process_id": "load_collection",
"arguments": {
"id": "MASK",
"spatial_extent": null,
"temporal_extent": null
},
"result": false
},
"mergecubes1": {
"process_id": "merge_cubes",
"arguments": {
"cube1": {
"from_node": "loadcollection1"
},
"cube2": {
"from_node": "loadcollection3"
},
"overlap_resolver": {
"callback": {
"r1": {
"process_id": "max",
"arguments": {
"data": [
{
"from_argument": "x"
},
{
"from_argument": "y"
}
]
},
"result": true
}
}
},
"binary": true
},
"result": false
}
}
32 changes: 32 additions & 0 deletions tests/data/0.4.0/merge_cubes_no_resolver.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
},
"result": false
},
"loadcollection3": {
"process_id": "load_collection",
"arguments": {
"id": "MASK",
"spatial_extent": null,
"temporal_extent": null
},
"result": false
},
"mergecubes1": {
"process_id": "merge_cubes",
"arguments": {
"cube1": {
"from_node": "loadcollection1"
},
"cube2": {
"from_node": "loadcollection3"
}
},
"result": false
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -127,17 +127,18 @@
"arguments": {
"expressions": [
{
"from_argument": "cube2"
"from_argument": "x"
},
{
"from_argument": "cube2"
"from_argument": "y"
}
]
},
"result": true
}
}
}
},
"binary": true
},
"result": false
},
Expand Down
48 changes: 48 additions & 0 deletions tests/data/1.0.0/merge_cubes_max.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
{
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
}
},
"loadcollection2": {
"process_id": "load_collection",
"arguments": {
"id": "MASK",
"spatial_extent": null,
"temporal_extent": null
}
},
"mergecubes1": {
"process_id": "merge_cubes",
"arguments": {
"cube1": {
"from_node": "loadcollection1"
},
"cube2": {
"from_node": "loadcollection2"
},
"overlap_resolver": {
"process_graph": {
"max1": {
"process_id": "max",
"arguments": {
"data": [
{
"from_parameter": "x"
},
{
"from_parameter": "y"
}
]
},
"result": true
}
}
}
},
"result": true
}
}
File renamed without changes.
30 changes: 30 additions & 0 deletions tests/data/1.0.0/merge_cubes_no_resolver.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
{
"loadcollection1": {
"process_id": "load_collection",
"arguments": {
"id": "S2",
"spatial_extent": null,
"temporal_extent": null
}
},
"loadcollection2": {
"process_id": "load_collection",
"arguments": {
"id": "MASK",
"spatial_extent": null,
"temporal_extent": null
}
},
"mergecubes1": {
"process_id": "merge_cubes",
"arguments": {
"cube1": {
"from_node": "loadcollection1"
},
"cube2": {
"from_node": "loadcollection2"
}
},
"result": true
}
}
File renamed without changes.
22 changes: 18 additions & 4 deletions tests/rest/datacube/test_bandmath.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,18 +225,18 @@ def test_logical_and(connection, api_version):
assert actual == load_json_resource('data/%s/logical_and.json' % api_version)


def test_cube_merge_or(connection, api_version):
def test_merge_cubes_or(connection, api_version):
    """Merging two boolean band cubes with `|` should produce a merge_cubes node
    with an `or` overlap resolver, matching the per-API-version fixture."""
    s2 = connection.load_collection("S2")
    b1 = s2.band("B02") > 1
    b2 = s2.band("B03") > 2
    b1 = b1.linear_scale_range(0, 1, 0, 2)
    b2 = b2.linear_scale_range(0, 1, 0, 2)
    combined = b1 | b2
    actual = get_download_graph(combined)
    # Fixture was renamed from cube_merge_or.json to merge_cubes_or.json;
    # only the new name is asserted (the stale duplicate assert was removed).
    assert actual == load_json_resource('data/%s/merge_cubes_or.json' % api_version)


def test_cube_merge_multiple(connection, api_version):
def test_merge_cubes_multiple(connection, api_version):
if api_version == "0.4.0":
pytest.skip("doesn't work in 0.4.0")
s2 = connection.load_collection("S2")
Expand All @@ -247,4 +247,18 @@ def test_cube_merge_multiple(connection, api_version):
assert sorted(n["process_id"] for n in actual.values()) == [
"linear_scale_range", "load_collection",
"merge_cubes", "merge_cubes", "reduce_dimension", "save_result"]
assert actual == load_json_resource('data/%s/cube_merge_multiple.json' % api_version)
assert actual == load_json_resource('data/%s/merge_cubes_multiple.json' % api_version)


def test_merge_cubes_no_resolver(connection, api_version):
    """Merging without an overlap resolver must omit the overlap_resolver argument
    from the merge_cubes node (compared against the per-API-version fixture)."""
    cube = connection.load_collection("S2")
    mask_cube = connection.load_collection("MASK")
    result = cube.merge(mask_cube)
    expected = load_json_resource('data/%s/merge_cubes_no_resolver.json' % api_version)
    assert result.graph == expected


def test_merge_cubes_max_resolver(connection, api_version):
    """A simple string overlap resolver ("max") should expand into a callback/
    process-graph invoking that process on the overlapping pixels x and y."""
    cube = connection.load_collection("S2")
    mask_cube = connection.load_collection("MASK")
    result = cube.merge(mask_cube, overlap_resolver="max")
    expected = load_json_resource('data/%s/merge_cubes_max.json' % api_version)
    assert result.graph == expected

0 comments on commit 822cf42

Please sign in to comment.