Skip to content

Commit

Permalink
Issue #457/#104/#672 add automatic load_url support
Browse files Browse the repository at this point in the history
when providing URL to aggregate_spatial, mask_polygon, ...
  • Loading branch information
soxofaan committed Nov 27, 2024
1 parent 5a3e6f4 commit a239fe5
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 27 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Automatically use `load_url` when providing a URL as geometries to `DataCube.aggregate_spatial()`, `DataCube.mask_polygon()`, etc. ([#104](https://github.com/Open-EO/openeo-python-client/issues/104), [#457](https://github.com/Open-EO/openeo-python-client/issues/457))

### Changed

- `MultiBackendJobManager`: costs has been added as a column in tracking databases ([[#588](https://github.com/Open-EO/openeo-python-client/issues/588)])
Expand Down
99 changes: 85 additions & 14 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import datetime
import logging
import pathlib
import re
import typing
import urllib.parse
import warnings
from builtins import staticmethod
from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple, Union
Expand Down Expand Up @@ -584,7 +586,9 @@ def filter_bbox(
)

@openeo_process
def filter_spatial(self, geometries) -> DataCube:
def filter_spatial(
self, geometries: Union[shapely.geometry.base.BaseGeometry, dict, str, pathlib.Path, Parameter, VectorCube]
) -> DataCube:
"""
Limits the data cube over the spatial dimensions to the specified geometries.
Expand All @@ -597,10 +601,24 @@ def filter_spatial(self, geometries) -> DataCube:
More specifically, pixels outside of the bounding box of the given geometry will not be available after filtering.
All pixels inside the bounding box that are not retained will be set to null (no data).
:param geometries: One or more geometries used for filtering, specified as GeoJSON in EPSG:4326.
:param geometries: One or more geometries used for filtering, Can be provided in different ways:
- a shapely geometry
- a GeoJSON-style dictionary,
- a public URL to the geometries in a vector format that is supported by the backend
(also see :py:func:`Connection.list_file_formats() <openeo.rest.connection.Connection.list_file_formats>`),
e.g. GeoJSON, GeoParquet, etc.
A ``load_url`` process will automatically be added to the process graph.
- a path (that is valid for the back-end) to a GeoJSON file.
- a :py:class:`~openeo.rest.vectorcube.VectorCube` instance.
- a :py:class:`~openeo.api.process.Parameter` instance.
:return: A data cube restricted to the specified geometries. The dimensions and dimension properties (name,
type, labels, reference system and resolution) remain unchanged, except that the spatial dimensions have less
(or the same) dimension labels.
.. versionchanged:: 0.36.0
Support passing a URL as ``geometries`` argument, which will be loaded with the ``load_url`` process.
"""
valid_geojson_types = [
"Point", "MultiPoint", "LineString", "MultiLineString",
Expand Down Expand Up @@ -1052,15 +1070,29 @@ def _get_geometry_argument(
:param crs: value that encodes a coordinate reference system.
See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument.
"""
if isinstance(geometry, Parameter):
return geometry
elif isinstance(geometry, _FromNodeMixin):
return geometry.from_node()

if isinstance(geometry, str) and re.match(r"^https?://", geometry, flags=re.I):
# Geometry provided as URL: load with `load_url` (with best-effort format guess)
url = urllib.parse.urlparse(geometry)
suffix = pathlib.Path(url.path.lower()).suffix
format = {
".json": "GeoJSON",
".geojson": "GeoJSON",
".pq": "Parquet",
".parquet": "Parquet",
".geoparquet": "Parquet",
}.get(suffix, suffix.split(".")[-1])
return self.connection.load_url(url=geometry, format=format)

if isinstance(geometry, (str, pathlib.Path)):
# Assumption: `geometry` is path to polygon is a path to vector file at backend.
# TODO #104: `read_vector` is non-standard process.
# TODO: If path exists client side: load it client side?
return PGNode(process_id="read_vector", arguments={"filename": str(geometry)})
elif isinstance(geometry, Parameter):
return geometry
elif isinstance(geometry, _FromNodeMixin):
return geometry.from_node()

if isinstance(geometry, shapely.geometry.base.BaseGeometry):
geometry = mapping(geometry)
Expand Down Expand Up @@ -1107,8 +1139,18 @@ def aggregate_spatial(
Aggregates statistics for one or more geometries (e.g. zonal statistics for polygons)
over the spatial dimensions.
:param geometries: a shapely geometry, a GeoJSON-style dictionary,
a public GeoJSON URL, or a path (that is valid for the back-end) to a GeoJSON file.
:param geometries: The geometries to aggregate in. Can be provided in different ways:
- a shapely geometry
- a GeoJSON-style dictionary,
- a public URL to the geometries in a vector format that is supported by the backend
(also see :py:func:`Connection.list_file_formats() <openeo.rest.connection.Connection.list_file_formats>`),
e.g. GeoJSON, GeoParquet, etc.
A ``load_url`` process will automatically be added to the process graph.
- a path (that is valid for the back-end) to a GeoJSON file.
- a :py:class:`~openeo.rest.vectorcube.VectorCube` instance.
- a :py:class:`~openeo.api.process.Parameter` instance.
:param reducer: the "child callback":
the name of a single openEO process,
or a callback function as discussed in :ref:`callbackfunctions`,
Expand All @@ -1128,10 +1170,13 @@ def aggregate_spatial(
By default, longitude-latitude (EPSG:4326) is assumed.
See :py:func:`openeo.util.normalize_crs` for more details about additional normalization that is applied to this argument.
:param context: Additional data to be passed to the reducer process.
.. note:: this ``crs`` argument is a non-standard/experimental feature, only supported by specific back-ends.
See https://github.com/Open-EO/openeo-processes/issues/235 for details.
:param context: Additional data to be passed to the reducer process.
.. versionchanged:: 0.36.0
Support passing a URL as ``geometries`` argument, which will be loaded with the ``load_url`` process.
"""
valid_geojson_types = [
"Point", "MultiPoint", "LineString", "MultiLineString",
Expand Down Expand Up @@ -1461,8 +1506,18 @@ def apply_polygon(
the GeometriesOverlap exception is thrown.
Each sub data cube is passed individually to the given process.
:param geometries: Polygons, provided as a shapely geometry, a GeoJSON-style dictionary,
a public GeoJSON URL, or a path (that is valid for the back-end) to a GeoJSON file.
:param geometries: Can be provided in different ways:
- a shapely geometry
- a GeoJSON-style dictionary,
- a public URL to the geometries in a vector format that is supported by the backend
(also see :py:func:`Connection.list_file_formats() <openeo.rest.connection.Connection.list_file_formats>`),
e.g. GeoJSON, GeoParquet, etc.
A ``load_url`` process will automatically be added to the process graph.
- a path (that is valid for the back-end) to a GeoJSON file.
- a :py:class:`~openeo.rest.vectorcube.VectorCube` instance.
- a :py:class:`~openeo.api.process.Parameter` instance.
:param process: "child callback" function, see :ref:`callbackfunctions`
:param mask_value: The value used for pixels outside the polygon.
:param context: Additional data to be passed to the process.
Expand All @@ -1473,6 +1528,9 @@ def apply_polygon(
Argument ``polygons`` was renamed to ``geometries``.
While deprecated, the old name ``polygons`` is still supported
as keyword argument for backwards compatibility.
.. versionchanged:: 0.36.0
Support passing a URL as ``geometries`` argument, which will be loaded with the ``load_url`` process.
"""
# TODO drop support for legacy `polygons` argument:
# remove `kwargs, remove default `None` value for `geometries` and `process`
Expand Down Expand Up @@ -1957,14 +2015,27 @@ def mask_polygon(
The pixel values are replaced with the value specified for `replacement`,
which defaults to `no data`.
:param mask: The geometry to mask with: a shapely geometry, a GeoJSON-style dictionary,
a public GeoJSON URL, or a path (that is valid for the back-end) to a GeoJSON file.
:param mask: The geometry to mask with.an be provided in different ways:
- a shapely geometry
- a GeoJSON-style dictionary,
- a public URL to the geometries in a vector format that is supported by the backend
(also see :py:func:`Connection.list_file_formats() <openeo.rest.connection.Connection.list_file_formats>`),
e.g. GeoJSON, GeoParquet, etc.
A ``load_url`` process will automatically be added to the process graph.
- a path (that is valid for the back-end) to a GeoJSON file.
- a :py:class:`~openeo.rest.vectorcube.VectorCube` instance.
- a :py:class:`~openeo.api.process.Parameter` instance.
:param srs: The spatial reference system of the provided polygon.
By default longitude-latitude (EPSG:4326) is assumed.
.. note:: this ``srs`` argument is a non-standard/experimental feature, only supported by specific back-ends.
See https://github.com/Open-EO/openeo-processes/issues/235 for details.
:param replacement: the value to replace the masked pixels with
.. versionchanged:: 0.36.0
Support passing a URL as ``geometries`` argument, which will be loaded with the ``load_url`` process.
"""
valid_geojson_types = ["Polygon", "MultiPolygon", "GeometryCollection", "Feature", "FeatureCollection"]
mask = self._get_geometry_argument(mask, valid_geojson_types=valid_geojson_types, crs=srs)
Expand Down
162 changes: 149 additions & 13 deletions tests/rest/datacube/test_datacube100.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,21 +347,47 @@ def test_filter_bbox_args_and_kwargs_conflict(con100: Connection, args, kwargs,
con100.load_collection("S2").filter_bbox(*args, **kwargs)


def test_filter_spatial(con100: Connection, recwarn):
img = con100.load_collection("S2")
def test_filter_spatial(con100: Connection):
cube = con100.load_collection("S2")
polygon = shapely.geometry.box(0, 0, 1, 1)
masked = img.filter_spatial(geometries=polygon)
assert sorted(masked.flat_graph().keys()) == ["filterspatial1", "loadcollection1"]
assert masked.flat_graph()["filterspatial1"] == {
"process_id": "filter_spatial",
"arguments": {
"data": {"from_node": "loadcollection1"},
"geometries": {
"type": "Polygon",
"coordinates": (((1.0, 0.0), (1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)),),
}
masked = cube.filter_spatial(geometries=polygon)
assert get_download_graph(masked, drop_save_result=True, drop_load_collection=True) == {
"filterspatial1": {
"process_id": "filter_spatial",
"arguments": {
"data": {"from_node": "loadcollection1"},
"geometries": {
"type": "Polygon",
"coordinates": [[[1.0, 0.0], [1.0, 1.0], [0.0, 1.0], [0.0, 0.0], [1.0, 0.0]]],
},
},
}
}


@pytest.mark.parametrize(
["url", "expected_format"],
[
("https://example.com/geometry.json", "GeoJSON"),
("https://example.com/geometry.geojson", "GeoJSON"),
("https://example.com/geometry.GeoJSON", "GeoJSON"),
("https://example.com/geometry.pq", "Parquet"),
("https://example.com/geometry.parquet", "Parquet"),
("https://example.com/geometry.GeoParquet", "Parquet"),
],
)
def test_filter_spatial_geometry_url(con100: Connection, url, expected_format):
cube = con100.load_collection("S2")
masked = cube.filter_spatial(geometries=url)
assert get_download_graph(masked, drop_save_result=True, drop_load_collection=True) == {
"loadurl1": {
"process_id": "load_url",
"arguments": {"url": url, "format": expected_format},
},
"filterspatial1": {
"process_id": "filter_spatial",
"arguments": {"data": {"from_node": "loadcollection1"}, "geometries": {"from_node": "loadurl1"}},
},
"result": True
}


Expand Down Expand Up @@ -595,6 +621,44 @@ def test_aggregate_spatial_geometry_from_node(con100: Connection, get_geometries
}


@pytest.mark.parametrize(
["url", "expected_format"],
[
("https://example.com/geometry.json", "GeoJSON"),
("https://example.com/geometry.geojson", "GeoJSON"),
("https://example.com/geometry.GeoJSON", "GeoJSON"),
("https://example.com/geometry.pq", "Parquet"),
("https://example.com/geometry.parquet", "Parquet"),
("https://example.com/geometry.GeoParquet", "Parquet"),
],
)
def test_aggregate_spatial_geometry_url(con100: Connection, url, expected_format):
cube = con100.load_collection("S2")
result = cube.aggregate_spatial(geometries=url, reducer="mean")
assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == {
"loadurl1": {
"process_id": "load_url",
"arguments": {"url": url, "format": expected_format},
},
"aggregatespatial1": {
"process_id": "aggregate_spatial",
"arguments": {
"data": {"from_node": "loadcollection1"},
"geometries": {"from_node": "loadurl1"},
"reducer": {
"process_graph": {
"mean1": {
"process_id": "mean",
"arguments": {"data": {"from_parameter": "data"}},
"result": True,
}
}
},
},
},
}


def test_aggregate_spatial_window(con100: Connection):
img = con100.load_collection("S2")
size = [5, 3]
Expand Down Expand Up @@ -810,6 +874,35 @@ def test_mask_polygon_from_node(con100: Connection, get_geometries):
}


@pytest.mark.parametrize(
["url", "expected_format"],
[
("https://example.com/geometry.json", "GeoJSON"),
("https://example.com/geometry.geojson", "GeoJSON"),
("https://example.com/geometry.GeoJSON", "GeoJSON"),
("https://example.com/geometry.pq", "Parquet"),
("https://example.com/geometry.parquet", "Parquet"),
("https://example.com/geometry.GeoParquet", "Parquet"),
],
)
def test_mask_polygon_geometry_url(con100: Connection, url, expected_format):
cube = con100.load_collection("S2")
masked = cube.mask_polygon(mask=url)
assert get_download_graph(masked, drop_save_result=True, drop_load_collection=True) == {
"loadurl1": {
"process_id": "load_url",
"arguments": {"url": url, "format": expected_format},
},
"maskpolygon1": {
"process_id": "mask_polygon",
"arguments": {
"data": {"from_node": "loadcollection1"},
"mask": {"from_node": "loadurl1"},
},
},
}


def test_mask_raster(con100: Connection):
img = con100.load_collection("S2")
mask = con100.load_collection("MASK")
Expand Down Expand Up @@ -1768,6 +1861,49 @@ def test_apply_polygon_context(con100: Connection, geometries_argument, geometri
}


@pytest.mark.parametrize(
["url", "expected_format"],
[
("https://example.com/geometry.json", "GeoJSON"),
("https://example.com/geometry.geojson", "GeoJSON"),
("https://example.com/geometry.GeoJSON", "GeoJSON"),
("https://example.com/geometry.pq", "Parquet"),
("https://example.com/geometry.parquet", "Parquet"),
("https://example.com/geometry.GeoParquet", "Parquet"),
],
)
def test_apply_polygon_geometry_url(con100: Connection, url, expected_format):
cube = con100.load_collection("S2")
process = UDF(code="myfancycode", runtime="Python")
result = cube.apply_polygon(geometries=url, process=process)
assert get_download_graph(result, drop_save_result=True, drop_load_collection=True) == {
"loadurl1": {
"process_id": "load_url",
"arguments": {"url": url, "format": expected_format},
},
"applypolygon1": {
"process_id": "apply_polygon",
"arguments": {
"data": {"from_node": "loadcollection1"},
"geometries": {"from_node": "loadurl1"},
"process": {
"process_graph": {
"runudf1": {
"process_id": "run_udf",
"arguments": {
"data": {"from_parameter": "data"},
"runtime": "Python",
"udf": "myfancycode",
},
"result": True,
}
}
},
},
},
}


def test_metadata_load_collection_100(con100, requests_mock):
requests_mock.get(API_URL + "/collections/SENTINEL2", json={
"cube:dimensions": {
Expand Down

0 comments on commit a239fe5

Please sign in to comment.