Skip to content

Commit

Permalink
Issue #442: make collection-metadata-based normalization optional
Browse files Browse the repository at this point in the history
Allow alternative operation modes where less/no metadata is available
  • Loading branch information
soxofaan committed Sep 14, 2023
1 parent 4e70e58 commit 5c2201a
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 84 deletions.
3 changes: 3 additions & 0 deletions openeo/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,7 @@ def band_common_names(self) -> List[str]:
return self.band_dimension.common_names

def get_band_index(self, band: Union[int, str]) -> int:
    """Resolve the given band reference (name or index) to its integer index,
    by delegating to the band dimension."""
    # TODO: eliminate this shortcut for smaller API surface
    band_dimension = self.band_dimension
    return band_dimension.band_index(band)

def filter_bands(self, band_names: List[Union[int, str]]) -> CollectionMetadata:
Expand Down Expand Up @@ -452,6 +453,8 @@ def rename_dimension(self, source: str, target: str) -> CollectionMetadata:
def reduce_dimension(self, dimension_name: str) -> CollectionMetadata:
"""Create new metadata object by collapsing/reducing a dimension."""
# TODO: option to keep reduced dimension (with a single value)?
# TODO: rename argument to `name` for more internal consistency
# TODO: merge with drop_dimension (which does the same).
self.assert_valid_dimension(dimension_name)
loc = self.dimension_names().index(dimension_name)
dimensions = self._dimensions[:loc] + self._dimensions[loc + 1:]
Expand Down
20 changes: 0 additions & 20 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1183,15 +1183,6 @@ def load_result(
:return: a :py:class:`DataCube`
"""
# TODO: add check that back-end supports `load_result` process?
metadata = CollectionMetadata(
{},
dimensions=[
SpatialDimension(name="x", extent=[]),
SpatialDimension(name="y", extent=[]),
TemporalDimension(name="t", extent=[]),
BandDimension(name="bands", bands=[Band(name="unknown")]),
],
)
cube = self.datacube_from_process(
process_id="load_result",
id=id,
Expand All @@ -1201,7 +1192,6 @@ def load_result(
bands=bands,
),
)
cube.metadata = metadata
return cube

@openeo_process
Expand Down Expand Up @@ -1309,15 +1299,6 @@ def load_stac(
"""
# TODO #425 move this implementation to `DataCube` and just forward here (like with `load_collection`)
# TODO #425 detect actual metadata from URL
metadata = CollectionMetadata(
{},
dimensions=[
SpatialDimension(name="x", extent=[]),
SpatialDimension(name="y", extent=[]),
TemporalDimension(name="t", extent=[]),
BandDimension(name="bands", bands=[Band(name="unknown")]),
],
)
arguments = {"url": url}
# TODO #425 more normalization/validation of extent/band parameters
if spatial_extent:
Expand All @@ -1331,7 +1312,6 @@ def load_stac(
prop: build_child_callback(pred, parent_parameters=["value"]) for prop, pred in properties.items()
}
cube = self.datacube_from_process(process_id="load_stac", **arguments)
cube.metadata = metadata
return cube

def load_ml_model(self, id: Union[str, BatchJob]) -> MlModel:
Expand Down
134 changes: 75 additions & 59 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@
log = logging.getLogger(__name__)




class DataCube(_ProcessGraphAbstraction):
"""
Class representing a openEO (raster) data cube.
Expand All @@ -79,9 +77,9 @@ class DataCube(_ProcessGraphAbstraction):
# TODO: set this based on back-end or user preference?
_DEFAULT_RASTER_FORMAT = "GTiff"

def __init__(self, graph: PGNode, connection: Connection, metadata: CollectionMetadata = None):
def __init__(self, graph: PGNode, connection: Connection, metadata: Optional[CollectionMetadata] = None):
super().__init__(pgnode=graph, connection=connection)
self.metadata = CollectionMetadata.get_or_create(metadata)
self.metadata: Optional[CollectionMetadata] = metadata

def process(
self,
Expand Down Expand Up @@ -118,6 +116,15 @@ def process_with_node(self, pg: PGNode, metadata: Optional[CollectionMetadata] =
# TODO: deprecate `process_with_node``: little added value over just calling DataCube() directly
return DataCube(graph=pg, connection=self._connection, metadata=metadata or self.metadata)

def _do_metadata_normalization(self) -> bool:
    """Tell whether metadata-based normalization/validation
    (of dimension names, band names, ...) should be applied:
    only when actual collection metadata is attached to this cube.
    """
    metadata = self.metadata
    return isinstance(metadata, CollectionMetadata)

def _assert_valid_dimension_name(self, name: str) -> str:
if self._do_metadata_normalization():
self.metadata.assert_valid_dimension(name)
return name

@classmethod
@openeo_process
def load_collection(
Expand Down Expand Up @@ -157,17 +164,15 @@ def load_collection(
}
if isinstance(collection_id, Parameter):
fetch_metadata = False
metadata = connection.collection_metadata(collection_id) if fetch_metadata else None
metadata: Optional[CollectionMetadata] = (
connection.collection_metadata(collection_id) if fetch_metadata else None
)
if bands:
if isinstance(bands, str):
bands = [bands]
if metadata:
bands = [b if isinstance(b, str) else metadata.band_dimension.band_name(b) for b in bands]
metadata = metadata.filter_bands(bands)
else:
# Ensure minimal metadata with best effort band dimension guess (based on `bands` argument).
band_dimension = BandDimension("bands", bands=[Band(name=b) for b in bands])
metadata = CollectionMetadata({}, dimensions=[band_dimension])
arguments['bands'] = bands
if max_cloud_cover:
properties = properties or {}
Expand Down Expand Up @@ -216,17 +221,7 @@ def load_disk_collection(cls, connection: Connection, file_format: str, glob_pat
'options': options
}
)

metadata = CollectionMetadata(
{},
dimensions=[
SpatialDimension(name="x", extent=[]),
SpatialDimension(name="y", extent=[]),
TemporalDimension(name="t", extent=[]),
BandDimension(name="bands", bands=[Band(name="unknown")]),
],
)
return cls(graph=pg, connection=connection, metadata=metadata)
return cls(graph=pg, connection=connection)

@classmethod
def _get_temporal_extent(
Expand Down Expand Up @@ -437,13 +432,13 @@ def filter_bands(self, bands: Union[List[Union[str, int]], str]) -> DataCube:
"""
if isinstance(bands, str):
bands = [bands]
bands = [self.metadata.band_dimension.band_name(b) for b in bands]
if self._do_metadata_normalization():
bands = [self.metadata.band_dimension.band_name(b) for b in bands]
cube = self.process(
process_id="filter_bands",
arguments={"data": THIS, "bands": bands},
metadata=self.metadata.filter_bands(bands) if self.metadata else None,
)
if cube.metadata:
cube.metadata = cube.metadata.filter_bands(bands)
return cube

band_filter = legacy_alias(filter_bands, "band_filter", since="0.1.0")
Expand All @@ -455,14 +450,14 @@ def band(self, band: Union[str, int]) -> DataCube:
:param band: band name, band common name or band index.
:return: a DataCube instance
"""
band_index = self.metadata.get_band_index(band)
return self.reduce_bands(reducer=PGNode(
process_id='array_element',
arguments={
'data': {'from_parameter': 'data'},
'index': band_index
},
))
if self._do_metadata_normalization():
band = self.metadata.band_dimension.band_index(band)
return self.reduce_bands(
reducer=PGNode(
process_id="array_element",
arguments={"data": {"from_parameter": "data"}, "index": band},
)
)

@openeo_process
def resample_spatial(
Expand Down Expand Up @@ -1084,7 +1079,7 @@ def apply_dimension(
arguments = {
"data": THIS,
"process": process,
"dimension": self.metadata.assert_valid_dimension(dimension),
"dimension": self._assert_valid_dimension_name(dimension),
}
if target_dimension is not None:
arguments["target_dimension"] = target_dimension
Expand Down Expand Up @@ -1129,15 +1124,18 @@ def reduce_dimension(
process=reducer, parent_parameters=["data", "context"], connection=self.connection
)

return self.process_with_node(ReduceNode(
process_id=process_id,
data=self,
reducer=reducer,
dimension=self.metadata.assert_valid_dimension(dimension),
context=context,
# TODO #123 is it (still) necessary to make "band" math a special case?
band_math_mode=band_math_mode
), metadata=self.metadata.reduce_dimension(dimension_name=dimension))
return self.process_with_node(
ReduceNode(
process_id=process_id,
data=self,
reducer=reducer,
dimension=self._assert_valid_dimension_name(dimension),
context=context,
# TODO #123 is it (still) necessary to make "band" math a special case?
band_math_mode=band_math_mode,
),
metadata=self.metadata.reduce_dimension(dimension_name=dimension) if self.metadata else None,
)

# @openeo_process
def chunk_polygon(
Expand Down Expand Up @@ -1189,15 +1187,22 @@ def reduce_bands(self, reducer: Union[str, PGNode, typing.Callable, UDF]) -> Dat
:param reducer: "child callback" function, see :ref:`callbackfunctions`
"""
return self.reduce_dimension(dimension=self.metadata.band_dimension.name, reducer=reducer, band_math_mode=True)
return self.reduce_dimension(
dimension=self.metadata.band_dimension.name if self.metadata else "bands",
reducer=reducer,
band_math_mode=True,
)

def reduce_temporal(self, reducer: Union[str, PGNode, typing.Callable, UDF]) -> DataCube:
    """
    Shortcut for :py:meth:`reduce_dimension` along the temporal dimension

    :param reducer: "child callback" function, see :ref:`callbackfunctions`
    """
    # Defect fixed: this span contained diff residue — both the pre-change
    # `return` (single-line, metadata required) and the post-change one —
    # which is not valid Python. Keep only the post-change version, which
    # falls back to the standard dimension name "t" when no metadata is attached.
    return self.reduce_dimension(
        dimension=self.metadata.temporal_dimension.name if self.metadata else "t",
        reducer=reducer,
    )

@deprecated(
"Use :py:meth:`reduce_bands` with :py:class:`UDF <openeo.rest._datacube.UDF>` as reducer.",
Expand Down Expand Up @@ -1227,7 +1232,7 @@ def add_dimension(self, name: str, label: str, type: Optional[str] = None):
return self.process(
process_id="add_dimension",
arguments=dict_no_none({"data": self, "name": name, "label": label, "type": type}),
metadata=self.metadata.add_dimension(name=name, label=label, type=type)
metadata=self.metadata.add_dimension(name=name, label=label, type=type) if self.metadata else None,
)

@openeo_process
Expand All @@ -1245,7 +1250,7 @@ def drop_dimension(self, name: str):
return self.process(
process_id="drop_dimension",
arguments={"data": self, "name": name},
metadata=self.metadata.drop_dimension(name=name),
metadata=self.metadata.drop_dimension(name=name) if self.metadata else None,
)

@deprecated(
Expand Down Expand Up @@ -1500,9 +1505,12 @@ def ndvi(self, nir: str = None, red: str = None, target_band: str = None) -> Dat
:return: a DataCube instance
"""
if target_band is None:
if self.metadata is None:
metadata = None
elif target_band is None:
metadata = self.metadata.reduce_dimension(self.metadata.band_dimension.name)
else:
# TODO: first drop "bands" dim and re-add it with single "ndvi" band
metadata = self.metadata.append_band(Band(name=target_band, common_name="ndvi"))
return self.process(
process_id="ndvi",
Expand All @@ -1522,16 +1530,16 @@ def rename_dimension(self, source: str, target: str):
:return: A new datacube with the dimension renamed.
"""
if target in self.metadata.dimension_names():
if self._do_metadata_normalization() and target in self.metadata.dimension_names():
raise ValueError('Target dimension name conflicts with existing dimension: %s.' % target)
return self.process(
process_id="rename_dimension",
arguments=dict_no_none(
data=THIS,
source=self.metadata.assert_valid_dimension(source),
source=self._assert_valid_dimension_name(source),
target=target,
),
metadata=self.metadata.rename_dimension(source, target),
metadata=self.metadata.rename_dimension(source, target) if self.metadata else None,
)

@openeo_process
Expand All @@ -1549,11 +1557,11 @@ def rename_labels(self, dimension: str, target: list, source: list = None) -> Da
process_id="rename_labels",
arguments=dict_no_none(
data=THIS,
dimension=self.metadata.assert_valid_dimension(dimension),
dimension=self._assert_valid_dimension_name(dimension),
target=target,
source=source,
),
metadata=self.metadata.rename_labels(dimension, target, source),
metadata=self.metadata.rename_labels(dimension, target, source) if self.metadata else None,
)

@openeo_process(mode="apply")
Expand Down Expand Up @@ -1670,12 +1678,20 @@ def merge_cubes(
arguments = {"cube1": self, "cube2": other}
if overlap_resolver:
arguments["overlap_resolver"] = build_child_callback(overlap_resolver, parent_parameters=["x", "y"])
# Minimal client side metadata merging
merged_metadata = self.metadata
if self.metadata.has_band_dimension() and isinstance(other, DataCube) and other.metadata.has_band_dimension():
if (
self.metadata
and self.metadata.has_band_dimension()
and isinstance(other, DataCube)
and other.metadata
and other.metadata.has_band_dimension()
):
# Minimal client side metadata merging
merged_metadata = self.metadata
for b in other.metadata.band_dimension.bands:
if b not in merged_metadata.bands:
merged_metadata = merged_metadata.append_band(b)
else:
merged_metadata = None
# Overlapping bands without overlap resolver will give an error in the backend
if context:
arguments["context"] = context
Expand Down Expand Up @@ -1755,8 +1771,7 @@ def raster_to_vector(self) -> VectorCube:
:return: a :py:class:`~openeo.rest.vectorcube.VectorCube`
"""
pg_node = PGNode(process_id="raster_to_vector", arguments={"data": self})
# TODO: properly update metadata (e.g. "geometry" dimension) related to #457
return VectorCube(pg_node, connection=self._connection, metadata=self.metadata)
return VectorCube(pg_node, connection=self._connection)

####VIEW methods #######

Expand Down Expand Up @@ -2330,9 +2345,10 @@ def dimension_labels(self, dimension: str) -> DataCube:
:param dimension: The name of the dimension to get the labels for.
"""
dimension_names = self.metadata.dimension_names()
if dimension_names and dimension not in dimension_names:
raise ValueError(f"Invalid dimension name {dimension!r}, should be one of {dimension_names}")
if self._do_metadata_normalization():
dimension_names = self.metadata.dimension_names()
if dimension_names and dimension not in dimension_names:
raise ValueError(f"Invalid dimension name {dimension!r}, should be one of {dimension_names}")
return self.process(process_id="dimension_labels", arguments={"data": THIS, "dimension": dimension})

@openeo_process
Expand Down
9 changes: 4 additions & 5 deletions openeo/rest/vectorcube.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ class VectorCube(_ProcessGraphAbstraction):
A geometry is specified in a 'coordinate reference system'. https://www.w3.org/TR/sdw-bp/#dfn-coordinate-reference-system-(crs)
"""

def __init__(self, graph: PGNode, connection: Connection, metadata: CollectionMetadata = None):
def __init__(self, graph: PGNode, connection: Connection, metadata: Optional[CollectionMetadata] = None):
super().__init__(pgnode=graph, connection=connection)
self.metadata = metadata or self._build_metadata()
self.metadata = metadata

@classmethod
def _build_metadata(cls, add_properties: bool = False) -> CollectionMetadata:
Expand All @@ -48,7 +48,7 @@ def _build_metadata(cls, add_properties: bool = False) -> CollectionMetadata:
dimensions = [Dimension(name="geometry", type="geometry")]
if add_properties:
dimensions.append(Dimension(name="properties", type="other"))
# TODO: use a more generic metadata container than "collection" metadata
# TODO #464: use a more generic metadata container than "collection" metadata
return CollectionMetadata(metadata={}, dimensions=dimensions)

def process(
Expand Down Expand Up @@ -533,8 +533,7 @@ def apply_dimension(
{
"data": THIS,
"process": process,
# TODO: drop `just_warn`?
"dimension": self.metadata.assert_valid_dimension(dimension, just_warn=True),
"dimension": dimension,
"target_dimension": target_dimension,
"context": context,
}
Expand Down

0 comments on commit 5c2201a

Please sign in to comment.