diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py
index d071bde0..39e8de23 100644
--- a/openeo_driver/_version.py
+++ b/openeo_driver/_version.py
@@ -1 +1 @@
-__version__ = "0.59.0a1"
+__version__ = "0.60.0a1"
diff --git a/openeo_driver/datacube.py b/openeo_driver/datacube.py
index 76831ac1..e33cd9b1 100644
--- a/openeo_driver/datacube.py
+++ b/openeo_driver/datacube.py
@@ -7,7 +7,7 @@
 import io
 
 import geopandas as gpd
-import numpy as np
+import numpy
 import pyproj
 import shapely.geometry
 import shapely.geometry.base
@@ -17,11 +17,13 @@ import requests
 
 from openeo.metadata import CollectionMetadata
-from openeo.util import ensure_dir
+from openeo.util import ensure_dir, str_truncate
+import openeo.udf
 from openeo_driver.datastructs import SarBackscatterArgs, ResolutionMergeArgs, StacAsset
 from openeo_driver.errors import FeatureUnsupportedException, InternalException
 from openeo_driver.util.geometry import GeometryBufferer, validate_geojson_coordinates
 from openeo_driver.util.ioformats import IOFORMATS
+from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph
 from openeo_driver.util.utm import area_in_square_meters
 from openeo_driver.utils import EvalEnv
 
@@ -214,13 +216,15 @@ class DriverVectorCube:
     These components are "joined" on the GeoPandas dataframe's index and DataArray first dimension
     """
     DIM_GEOMETRIES = "geometries"
-    FLATTEN_PREFIX = "vc"
+    DIM_BANDS = "bands"
+    DIM_PROPERTIES = "properties"
+    COLUMN_SELECTION_ALL = "all"
+    COLUMN_SELECTION_NUMERICAL = "numerical"
 
     def __init__(
         self,
         geometries: gpd.GeoDataFrame,
         cube: Optional[xarray.DataArray] = None,
-        flatten_prefix: str = FLATTEN_PREFIX,
     ):
         """
@@ -234,18 +238,77 @@ def __init__(
                 log.error(f"First cube dim should be {self.DIM_GEOMETRIES!r} but got dims {cube.dims!r}")
                 raise VectorCubeError("Cube's first dimension is invalid.")
             if not geometries.index.equals(cube.indexes[cube.dims[0]]):
-                log.error(f"Invalid VectorCube components {geometries.index!r} != {cube.indexes[cube.dims[0]]!r}")
+                log.error(f"Invalid VectorCube components {geometries.index=} != {cube.indexes[cube.dims[0]]=}")
                 raise VectorCubeError("Incompatible vector cube components")
         self._geometries: gpd.GeoDataFrame = geometries
         self._cube = cube
-        self._flatten_prefix = flatten_prefix
 
-    def with_cube(self, cube: xarray.DataArray, flatten_prefix: str = FLATTEN_PREFIX) -> "DriverVectorCube":
+    def with_cube(self, cube: xarray.DataArray) -> "DriverVectorCube":
         """Create new vector cube with same geometries but new cube"""
         log.info(f"Creating vector cube with new cube {cube.name!r}")
-        return type(self)(
-            geometries=self._geometries, cube=cube, flatten_prefix=flatten_prefix
-        )
+        return type(self)(geometries=self._geometries, cube=cube)
+
+    @classmethod
+    def from_geodataframe(
+        cls,
+        data: gpd.GeoDataFrame,
+        *,
+        columns_for_cube: Union[List[str], str] = COLUMN_SELECTION_NUMERICAL,
+        dimension_name: str = DIM_PROPERTIES,
+    ) -> "DriverVectorCube":
+        """
+        Build a DriverVectorCube from given GeoPandas data frame,
+        using the data frame geometries as vector cube geometries
+        and other columns (as specified) as cube values along a "bands" dimension
+
+        :param data: geopandas data frame
+        :param columns_for_cube: which data frame columns to use as cube values.
+            One of:
+            - "numerical": automatically pick numerical columns
+            - "all": use all columns as cube values
+            - list of column names
+        :param dimension_name: name of the "bands" dimension
+        :return: vector cube
+        """
+        available_columns = [c for c in data.columns if c != "geometry"]
+
+        if columns_for_cube is None:
+            # TODO #114: what should default selection be?
+            columns_for_cube = cls.COLUMN_SELECTION_NUMERICAL
+
+        if columns_for_cube == cls.COLUMN_SELECTION_NUMERICAL:
+            columns_for_cube = [c for c in available_columns if numpy.issubdtype(data[c].dtype, numpy.number)]
+        elif columns_for_cube == cls.COLUMN_SELECTION_ALL:
+            columns_for_cube = available_columns
+        elif isinstance(columns_for_cube, list):
+            # TODO #114 limit to subset with available columns (and automatically fill in missing columns with nodata)?
+            columns_for_cube = columns_for_cube
+        else:
+            raise ValueError(columns_for_cube)
+        assert isinstance(columns_for_cube, list)
+
+        if columns_for_cube:
+            cube_df = data[columns_for_cube]
+            # TODO: remove `columns_for_cube` from geopandas data frame?
+            #       Enabling that triggers failure of some existing tests that use `aggregate_spatial`
+            #       to "enrich" a vector cube with pre-existing properties
+            #       Also see https://github.com/Open-EO/openeo-api/issues/504
+            # geometries_df = data.drop(columns=columns_for_cube)
+            geometries_df = data
+
+            # TODO: leverage pandas `to_xarray` and xarray `to_array` instead of this manual building?
+            cube: xarray.DataArray = xarray.DataArray(
+                data=cube_df.values,
+                dims=[cls.DIM_GEOMETRIES, dimension_name],
+                coords={
+                    cls.DIM_GEOMETRIES: data.geometry.index.to_list(),
+                    dimension_name: cube_df.columns,
+                },
+            )
+            return cls(geometries=geometries_df, cube=cube)
+
+        else:
+            return cls(geometries=data)
 
     @classmethod
     def from_fiona(
@@ -258,15 +321,21 @@ def from_fiona(
         if len(paths) != 1:
            # TODO #114 EP-3981: support multiple paths
            raise FeatureUnsupportedException(message="Loading a vector cube from multiple files is not supported")
+        columns_for_cube = (options or {}).get("columns_for_cube", cls.COLUMN_SELECTION_NUMERICAL)
         # TODO #114 EP-3981: lazy loading like/with DelayedVector
         # note for GeoJSON: will consider Feature.id as well as Feature.properties.id
         if "parquet" == driver:
-            return cls.from_parquet(paths=paths)
+            return cls.from_parquet(paths=paths, columns_for_cube=columns_for_cube)
         else:
-            return cls(geometries=gpd.read_file(paths[0], driver=driver))
+            gdf = gpd.read_file(paths[0], driver=driver)
+            return cls.from_geodataframe(gdf, columns_for_cube=columns_for_cube)
 
     @classmethod
-    def from_parquet(cls, paths: List[Union[str, Path]]):
+    def from_parquet(
+        cls,
+        paths: List[Union[str, Path]],
+        columns_for_cube: Union[List[str], str] = COLUMN_SELECTION_NUMERICAL,
+    ):
         if len(paths) != 1:
             # TODO #114 EP-3981: support multiple paths
             raise FeatureUnsupportedException(
@@ -284,10 +353,14 @@ def from_parquet(cls, paths: List[Union[str, Path]]):
         if "OGC:CRS84" in str(df.crs) or "WGS 84 (CRS84)" in str(df.crs):
             # workaround for not being able to decode ogc:crs84
             df.crs = CRS.from_epsg(4326)
-        return cls(geometries=df)
+        return cls.from_geodataframe(df, columns_for_cube=columns_for_cube)
 
     @classmethod
-    def from_geojson(cls, geojson: dict) -> "DriverVectorCube":
+    def from_geojson(
+        cls,
+        geojson: dict,
+        columns_for_cube: Union[List[str], str] = COLUMN_SELECTION_NUMERICAL,
+    ) -> "DriverVectorCube":
         """Construct vector cube from GeoJson dict structure"""
         validate_geojson_coordinates(geojson)
         # TODO support more geojson types?
@@ -305,7 +378,8 @@ def from_geojson(cls, geojson: dict) -> "DriverVectorCube":
             raise FeatureUnsupportedException(
                 f"Can not construct DriverVectorCube from {geojson.get('type', type(geojson))!r}"
             )
-        return cls(geometries=gpd.GeoDataFrame.from_features(features))
+        gdf = gpd.GeoDataFrame.from_features(features)
+        return cls.from_geodataframe(gdf, columns_for_cube=columns_for_cube)
 
     @classmethod
     def from_geometry(
@@ -320,7 +394,9 @@ def from_geometry(
             geometry = [geometry]
         return cls(geometries=gpd.GeoDataFrame(geometry=geometry))
 
-    def _as_geopandas_df(self) -> gpd.GeoDataFrame:
+    def _as_geopandas_df(
+        self, flatten_prefix: Optional[str] = None, flatten_name_joiner: str = "~"
+    ) -> gpd.GeoDataFrame:
         """Join geometries and cube as a geopandas dataframe"""
         # TODO: avoid copy?
         df = self._geometries.copy(deep=True)
@@ -331,18 +407,20 @@ def _as_geopandas_df(self) -> gpd.GeoDataFrame:
             if self._cube.dims[1:]:
                 stacked = self._cube.stack(prop=self._cube.dims[1:])
                 log.info(f"Flattened cube component of vector cube to {stacked.shape[1]} properties")
+                name_prefix = [flatten_prefix] if flatten_prefix else []
                 for p in stacked.indexes["prop"]:
-                    name = "~".join(str(x) for x in [self._flatten_prefix] + list(p))
+                    name = flatten_name_joiner.join(str(x) for x in name_prefix + list(p))
                     # TODO: avoid column collisions?
                     df[name] = stacked.sel(prop=p)
             else:
-                df[self._flatten_prefix] = self._cube
+                # TODO: better fallback column/property name in this case?
+                df[flatten_prefix or "_vc"] = self._cube
 
         return df
 
-    def to_geojson(self) -> dict:
+    def to_geojson(self, flatten_prefix: Optional[str] = None) -> dict:
         """Export as GeoJSON FeatureCollection."""
-        return shapely.geometry.mapping(self._as_geopandas_df())
+        return shapely.geometry.mapping(self._as_geopandas_df(flatten_prefix=flatten_prefix))
 
     def to_wkt(self) -> List[str]:
         wkts = [str(g) for g in self._geometries.geometry]
@@ -366,7 +444,8 @@ def write_assets(
             )
             return self.to_legacy_save_result().write_assets(directory)
 
-        self._as_geopandas_df().to_file(path, driver=format_info.fiona_driver)
+        gdf = self._as_geopandas_df(flatten_prefix=options.get("flatten_prefix"))
+        gdf.to_file(path, driver=format_info.fiona_driver)
 
         if not format_info.multi_file:
             # single file format
@@ -461,6 +540,9 @@ def geometry_count(self) -> int:
     def get_geometries(self) -> Sequence[shapely.geometry.base.BaseGeometry]:
         return self._geometries.geometry
 
+    def get_cube(self) -> Optional[xarray.DataArray]:
+        return self._cube
+
     def get_ids(self) -> Optional[Sequence]:
         return self._geometries.get("id")
 
@@ -471,8 +553,9 @@ def get_xarray_cube_basics(self) -> Tuple[tuple, dict]:
         return dims, coords
 
     def __eq__(self, other):
-        return (isinstance(other, DriverVectorCube)
-                and np.array_equal(self._as_geopandas_df().values, other._as_geopandas_df().values))
+        return isinstance(other, DriverVectorCube) and numpy.array_equal(
+            self._as_geopandas_df().values, other._as_geopandas_df().values
+        )
 
     def fit_class_random_forest(
         self,
@@ -504,6 +587,49 @@ def buffer_points(self, distance: float = 10) -> "DriverVectorCube":
             ]
         )
 
+    def apply_dimension(
+        self,
+        process: dict,
+        *,
+        dimension: str,
+        target_dimension: Optional[str] = None,
+        context: Optional[dict] = None,
+        env: EvalEnv,
+    ) -> "DriverVectorCube":
+        single_run_udf = SingleRunUDFProcessGraph.parse_or_none(process)
+
+        if single_run_udf:
+            # Process with single "run_udf" node
+            # TODO: check provided dimension with actual dimension of the cube
+            if dimension in (self.DIM_BANDS, self.DIM_PROPERTIES) and target_dimension is None:
+                log.warning(
+                    f"Using experimental feature: DriverVectorCube.apply_dimension along dim {dimension} and empty cube"
+                )
+                # TODO: this is non-standard special case: vector cube with only geometries, but no "cube" data
+                gdf = self._as_geopandas_df()
+                feature_collection = openeo.udf.FeatureCollection(id="_", data=gdf)
+                udf_data = openeo.udf.UdfData(
+                    proj={"EPSG": self._geometries.crs.to_epsg()},
+                    feature_collection_list=[feature_collection],
+                    user_context=context,
+                )
+                log.info(f"[run_udf] Running UDF {str_truncate(single_run_udf.udf, width=256)!r} on {udf_data!r}")
+                result_data = env.backend_implementation.processing.run_udf(udf=single_run_udf.udf, data=udf_data)
+                log.info(f"[run_udf] UDF resulted in {result_data!r}")
+
+                if not isinstance(result_data, openeo.udf.UdfData):
+                    raise ValueError(f"UDF should return UdfData, but got {type(result_data)}")
+                result_features = result_data.get_feature_collection_list()
+                if not (result_features and len(result_features) == 1):
+                    raise ValueError(
+                        f"UDF should return a single feature collection but got {result_features and len(result_features)}"
+                    )
+                return DriverVectorCube(geometries=result_features[0].data)
+
+        raise FeatureUnsupportedException(
+            message=f"DriverVectorCube.apply_dimension with {dimension=} and {bool(single_run_udf)=}"
+        )
+
 
 class DriverMlModel:
     """Base class for driver-side 'ml-model' data structures"""
diff --git a/openeo_driver/dummy/dummy_backend.py b/openeo_driver/dummy/dummy_backend.py
index 4460229f..d2d29004 100644
--- a/openeo_driver/dummy/dummy_backend.py
+++ b/openeo_driver/dummy/dummy_backend.py
@@ -265,7 +265,7 @@ def assert_polygon_sequence(geometries: Union[Sequence, BaseMultipartGeometry])
             coords=coords,
             name="aggregate_spatial",
         )
-        return geometries.with_cube(cube=cube, flatten_prefix="agg")
+        return geometries.with_cube(cube=cube)
     elif isinstance(geometries, str):
         geometries = [geometry for geometry in DelayedVector(geometries).geometries]
         n_geometries = assert_polygon_sequence(geometries)
diff --git a/openeo_driver/util/pgparsing.py b/openeo_driver/util/pgparsing.py
new file mode 100644
index 00000000..dcca5b7a
--- /dev/null
+++ b/openeo_driver/util/pgparsing.py
@@ -0,0 +1,45 @@
+import dataclasses
+from typing import Optional
+
+
+class NotASingleRunUDFProcessGraph(ValueError):
+    pass
+
+
+@dataclasses.dataclass(frozen=True)
+class SingleRunUDFProcessGraph:
+    """
+    Container (and parser) for a callback process graph containing only a single `run_udf` node.
+    """
+
+    data: dict
+    udf: str
+    runtime: str
+    version: Optional[str] = None
+    context: Optional[dict] = None
+
+    @classmethod
+    def parse(cls, process_graph: dict) -> "SingleRunUDFProcessGraph":
+        try:
+            (node,) = process_graph.values()
+            assert node["process_id"] == "run_udf"
+            assert node["result"] is True
+            arguments = node["arguments"]
+            assert {"data", "udf", "runtime"}.issubset(arguments.keys())
+
+            return cls(
+                data=arguments["data"],
+                udf=arguments["udf"],
+                runtime=arguments["runtime"],
+                version=arguments.get("version"),
+                context=arguments.get("context") or {},
+            )
+        except Exception as e:
+            raise NotASingleRunUDFProcessGraph(str(e)) from e
+
+    @classmethod
+    def parse_or_none(cls, process_graph: dict) -> Optional["SingleRunUDFProcessGraph"]:
+        try:
+            return cls.parse(process_graph=process_graph)
+        except NotASingleRunUDFProcessGraph:
+            return None
diff --git a/tests/test_vectorcube.py b/tests/test_vectorcube.py
index 7c61a0d9..c3ce62ba 100644
--- a/tests/test_vectorcube.py
+++ b/tests/test_vectorcube.py
@@ -1,3 +1,6 @@
+import textwrap
+
+import geopandas
 import geopandas as gpd
 import numpy.testing
 import pyproj
@@ -9,6 +12,7 @@
 from openeo_driver.datacube import DriverVectorCube
 from openeo_driver.testing import DictSubSet, ApproxGeometry
 from openeo_driver.util.geometry import as_geojson_feature_collection
+from openeo_driver.utils import EvalEnv
 
 from .data import get_path
 
@@ -22,6 +26,10 @@ def gdf(self) -> gpd.GeoDataFrame:
         df = gpd.read_file(path)
         return df
 
+    @pytest.fixture
+    def vc(self, gdf) -> DriverVectorCube:
+        return DriverVectorCube(geometries=gdf)
+
     def test_basic(self, gdf):
         vc = DriverVectorCube(gdf)
         assert vc.get_bounding_box() == (1, 1, 5, 4)
@@ -85,37 +93,153 @@ def test_with_cube_to_geojson(self, gdf):
         dims += ("bands",)
         coords["bands"] = ["red", "green"]
         cube = xarray.DataArray(data=[[1, 2], [3, 4]], dims=dims, coords=coords)
-        vc2 = vc1.with_cube(cube, flatten_prefix="bandz")
-        assert vc1.to_geojson() == DictSubSet({
-            "type": "FeatureCollection",
-            "features": [
-                DictSubSet({
-                    "type": "Feature",
-                    "geometry": {"type": "Polygon", "coordinates": (((1, 1), (3, 1), (2, 3), (1, 1)),)},
-                    "properties": {"id": "first", "pop": 1234},
-                }),
-                DictSubSet({
-                    "type": "Feature",
-                    "geometry": {"type": "Polygon", "coordinates": (((4, 2), (5, 4), (3, 4), (4, 2)),)},
-                    "properties": {"id": "second", "pop": 5678},
-                }),
-            ]
-        })
-        assert vc2.to_geojson() == DictSubSet({
-            "type": "FeatureCollection",
-            "features": [
-                DictSubSet({
-                    "type": "Feature",
-                    "geometry": {"type": "Polygon", "coordinates": (((1, 1), (3, 1), (2, 3), (1, 1)),)},
-                    "properties": {"id": "first", "pop": 1234, "bandz~red": 1, "bandz~green": 2},
-                }),
-                DictSubSet({
-                    "type": "Feature",
-                    "geometry": {"type": "Polygon", "coordinates": (((4, 2), (5, 4), (3, 4), (4, 2)),)},
-                    "properties": {"id": "second", "pop": 5678, "bandz~red": 3, "bandz~green": 4},
-                }),
-            ]
-        })
+        vc2 = vc1.with_cube(cube)
+        assert vc1.to_geojson() == DictSubSet(
+            {
+                "type": "FeatureCollection",
+                "features": [
+                    DictSubSet(
+                        {
+                            "type": "Feature",
+                            "geometry": {"type": "Polygon", "coordinates": (((1, 1), (3, 1), (2, 3), (1, 1)),)},
+                            "properties": {"id": "first", "pop": 1234},
+                        }
+                    ),
+                    DictSubSet(
+                        {
+                            "type": "Feature",
+                            "geometry": {"type": "Polygon", "coordinates": (((4, 2), (5, 4), (3, 4), (4, 2)),)},
+                            "properties": {"id": "second", "pop": 5678},
+                        }
+                    ),
+                ],
+            }
+        )
+        assert vc2.to_geojson() == DictSubSet(
+            {
+                "type": "FeatureCollection",
+                "features": [
+                    DictSubSet(
+                        {
+                            "type": "Feature",
"geometry": {"type": "Polygon", "coordinates": (((1, 1), (3, 1), (2, 3), (1, 1)),)}, + "properties": {"id": "first", "pop": 1234, "red": 1, "green": 2}, + } + ), + DictSubSet( + { + "type": "Feature", + "geometry": {"type": "Polygon", "coordinates": (((4, 2), (5, 4), (3, 4), (4, 2)),)}, + "properties": {"id": "second", "pop": 5678, "red": 3, "green": 4}, + } + ), + ], + } + ) + assert vc2.to_geojson(flatten_prefix="bandz") == DictSubSet( + { + "type": "FeatureCollection", + "features": [ + DictSubSet( + { + "type": "Feature", + "geometry": {"type": "Polygon", "coordinates": (((1, 1), (3, 1), (2, 3), (1, 1)),)}, + "properties": {"id": "first", "pop": 1234, "bandz~red": 1, "bandz~green": 2}, + } + ), + DictSubSet( + { + "type": "Feature", + "geometry": {"type": "Polygon", "coordinates": (((4, 2), (5, 4), (3, 4), (4, 2)),)}, + "properties": {"id": "second", "pop": 5678, "bandz~red": 3, "bandz~green": 4}, + } + ), + ], + } + ) + + def test_from_geodataframe_default(self, gdf): + vc = DriverVectorCube.from_geodataframe(gdf) + assert vc.to_geojson() == DictSubSet( + { + "type": "FeatureCollection", + "features": [ + DictSubSet( + { + "type": "Feature", + "properties": {"id": "first", "pop": 1234}, + "geometry": { + "coordinates": (((1.0, 1.0), (3.0, 1.0), (2.0, 3.0), (1.0, 1.0)),), + "type": "Polygon", + }, + } + ), + DictSubSet( + { + "type": "Feature", + "properties": {"id": "second", "pop": 5678}, + "geometry": { + "coordinates": (((4.0, 2.0), (5.0, 4.0), (3.0, 4.0), (4.0, 2.0)),), + "type": "Polygon", + }, + } + ), + ], + } + ) + cube = vc.get_cube() + assert cube.dims == ("geometries", "properties") + assert cube.shape == (2, 1) + assert {k: list(v.values) for k, v in cube.coords.items()} == {"geometries": [0, 1], "properties": ["pop"]} + + @pytest.mark.parametrize( + ["columns_for_cube", "expected"], + [ + ("numerical", {"shape": (2, 1), "coords": {"geometries": [0, 1], "properties": ["pop"]}}), + ("all", {"shape": (2, 2), "coords": {"geometries": [0, 1], "properties": ["id", "pop"]}}), + ([], None), + (["id"], {"shape": (2, 1), "coords": {"geometries": [0, 1], "properties": ["id"]}}), + (["pop", "id"], {"shape": (2, 2), "coords": {"geometries": [0, 1], "properties": ["pop", "id"]}}), + # TODO: test specifying non-existent column (to be filled with no-data): + # (["pop", "nopenope"], {"shape": (2, 2), "coords": {"geometries": [0, 1], "properties": ["pop", "nopenope"]}}), + ], + ) + def test_from_geodataframe_columns_for_cube(self, gdf, columns_for_cube, expected): + vc = DriverVectorCube.from_geodataframe(gdf, columns_for_cube=columns_for_cube) + assert vc.to_geojson() == DictSubSet( + { + "type": "FeatureCollection", + "features": [ + DictSubSet( + { + "type": "Feature", + "properties": {"id": "first", "pop": 1234}, + "geometry": { + "coordinates": (((1.0, 1.0), (3.0, 1.0), (2.0, 3.0), (1.0, 1.0)),), + "type": "Polygon", + }, + } + ), + DictSubSet( + { + "type": "Feature", + "properties": {"id": "second", "pop": 5678}, + "geometry": { + "coordinates": (((4.0, 2.0), (5.0, 4.0), (3.0, 4.0), (4.0, 2.0)),), + "type": "Polygon", + }, + } + ), + ], + } + ) + cube = vc.get_cube() + if expected is None: + assert cube is None + else: + assert cube.dims == ("geometries", "properties") + assert cube.shape == expected["shape"] + assert {k: list(v.values) for k, v in cube.coords.items()} == expected["coords"] @pytest.mark.parametrize(["geojson", "expected"], [ ( @@ -335,7 +459,7 @@ def test_from_geometry(self, geometry, expected): ], ) def test_from_fiona(self, path, driver): - vc = 
+        vc = DriverVectorCube.from_fiona([path], driver=driver, options={"columns_for_cube": []})
         assert vc.to_geojson() == DictSubSet(
             {
                 "type": "FeatureCollection",
@@ -446,3 +570,52 @@ def test_buffer_points(self):
                 ],
             }
         )
+
+    @pytest.mark.parametrize("dimension", ["bands", "properties"])
+    def test_apply_dimension_run_udf_change_geometry(self, gdf, backend_implementation, dimension):
+        vc = DriverVectorCube.from_geodataframe(gdf, dimension_name=dimension)
+        udf = textwrap.dedent(
+            """
+            from openeo.udf import UdfData, FeatureCollection
+            def process_geometries(udf_data: UdfData) -> UdfData:
+                [feature_collection] = udf_data.get_feature_collection_list()
+                gdf = feature_collection.data
+                gdf["geometry"] = gdf["geometry"].buffer(distance=1, resolution=2)
+                udf_data.set_feature_collection_list([
+                    FeatureCollection(id="_", data=gdf),
+                ])
+            """
+        )
+        callback = {
+            "runudf1": {
+                "process_id": "run_udf",
+                "arguments": {"data": {"from_parameter": "data"}, "udf": udf, "runtime": "Python"},
+                "result": True,
+            }
+        }
+        env = EvalEnv({"backend_implementation": backend_implementation})
+        result = vc.apply_dimension(process=callback, dimension=dimension, env=env)
+        assert isinstance(result, DriverVectorCube)
+        feature_collection = result.to_geojson()
+        assert feature_collection == DictSubSet(
+            {
+                "type": "FeatureCollection",
+                "bbox": pytest.approx((0, 0, 6, 5), abs=0.1),
+                "features": [
+                    {
+                        "type": "Feature",
+                        "bbox": pytest.approx((0, 0, 4, 4), abs=0.1),
+                        "geometry": DictSubSet({"type": "Polygon"}),
+                        "id": "0",
+                        "properties": {"id": "first", "pop": 1234},
+                    },
+                    {
+                        "type": "Feature",
+                        "bbox": pytest.approx((2, 1, 6, 5), abs=0.1),
+                        "geometry": DictSubSet({"type": "Polygon"}),
+                        "id": "1",
+                        "properties": {"id": "second", "pop": 5678},
+                    },
+                ],
+            }
+        )
diff --git a/tests/test_views_execute.py b/tests/test_views_execute.py
index 43da0118..94701a5f 100644
--- a/tests/test_views_execute.py
+++ b/tests/test_views_execute.py
@@ -1047,7 +1047,7 @@ def test_aggregate_spatial_vector_cube_basic(api100, feature_collection_test_pat
         "lc": {"process_id": "load_collection", "arguments": {"id": "S2_FOOBAR", "bands": ["B02", "B03", "B04"]}},
         "lf": {
             "process_id": "load_uploaded_files",
-            "arguments": {"paths": [str(path)], "format": "GeoJSON"},
+            "arguments": {"paths": [str(path)], "format": "GeoJSON", "options": {"columns_for_cube": []}},
         },
         "ag": {
             "process_id": "aggregate_spatial",
@@ -1058,8 +1058,12 @@ def test_aggregate_spatial_vector_cube_basic(api100, feature_collection_test_pat
             "arguments": {
                 "data": {"from_node": "lc"},
                 "geometries": {"from_node": "lf"},
                 "reducer": {"process_graph": {
                     "mean": {"process_id": "mean", "arguments": {"data": {"from_parameter": "data"}}, "result": True}}
                 }
             },
+        },
+        "sr": {
+            "process_id": "save_result",
+            "arguments": {"data": {"from_node": "ag"}, "format": "GeoJSON", "options": {"flatten_prefix": "agg"}},
             "result": True
-        }
+        },
     }
 
     res = api100.check_result(pg)
@@ -1212,19 +1216,29 @@ def test_aggregate_spatial_vector_cube_dimensions(
         "lc": {"process_id": "load_collection", "arguments": {"id": "S2_FOOBAR", "bands": ["B02", "B03", "B04"]}},
         "lf": {
             "process_id": "load_uploaded_files",
-            "arguments": {"paths": [str(path)], "format": "GeoJSON"},
+            "arguments": {"paths": [str(path)], "format": "GeoJSON", "options": {"columns_for_cube": []}},
         },
         "ag": {
             "process_id": "aggregate_spatial",
             "arguments": {
                 "data": {"from_node": aggregate_data},
                 "geometries": {"from_node": "lf"},
-                "reducer": {"process_graph": {
-                    "mean": {"process_id": "mean", "arguments": {"data": {"from_parameter": "data"}}, "result": True}}
-                }
+                "reducer": {
"process_graph": { + "mean": { + "process_id": "mean", + "arguments": {"data": {"from_parameter": "data"}}, + "result": True, + } + } + }, }, + }, + "sr": { + "process_id": "save_result", + "arguments": {"data": {"from_node": "ag"}, "format": "GeoJSON", "options": {"flatten_prefix": "agg"}}, "result": True - } + }, } pg.update(preprocess_pg) res = api100.check_result(pg) @@ -3299,10 +3313,10 @@ def test_fit_class_random_forest(api100): "id": "0", "geometry": geom1, "properties": { - "agg~B02": 2.345, - "agg~B03": None, - "agg~B04": 2.0, - "agg~B08": 3.0, + "B02": 2.345, + "B03": None, + "B04": 2.0, + "B08": 3.0, "target": 0, }, } @@ -3313,10 +3327,10 @@ def test_fit_class_random_forest(api100): "id": "1", "geometry": geom2, "properties": { - "agg~B02": 4.0, - "agg~B03": 5.0, - "agg~B04": 6.0, - "agg~B08": 7.0, + "B02": 4.0, + "B03": 5.0, + "B04": 6.0, + "B08": 7.0, "target": 1, }, } diff --git a/tests/util/test_pgparsing.py b/tests/util/test_pgparsing.py new file mode 100644 index 00000000..a11f80c7 --- /dev/null +++ b/tests/util/test_pgparsing.py @@ -0,0 +1,78 @@ +import pytest + +from openeo_driver.util.pgparsing import SingleRunUDFProcessGraph, NotASingleRunUDFProcessGraph + + +class TestSingleRunUDFProcessGraph: + def test_parse_basic(self): + pg = { + "runudf1": { + "process_id": "run_udf", + "arguments": { + "data": {"from_parameter": "data"}, + "udf": "print('Hello world')", + "runtime": "Python", + }, + "result": True, + } + } + run_udf = SingleRunUDFProcessGraph.parse(pg) + assert run_udf.data == {"from_parameter": "data"} + assert run_udf.udf == "print('Hello world')" + assert run_udf.runtime == "Python" + assert run_udf.version is None + assert run_udf.context == {} + + @pytest.mark.parametrize( + "pg", + [ + { + "runudf1": { + "process_id": "run_udffffffffffffffff", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"udf": "x = 4", "runtime": "Python"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "runtime": "Python"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4"}, + "result": True, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + } + }, + { + "runudf1": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + "result": True, + }, + "runudf2": { + "process_id": "run_udf", + "arguments": {"data": {"from_parameter": "data"}, "udf": "x = 4", "runtime": "Python"}, + }, + }, + ], + ) + def test_parse_invalid(self, pg): + with pytest.raises(NotASingleRunUDFProcessGraph): + _ = SingleRunUDFProcessGraph.parse(pg)