Commit

Merge remote-tracking branch 'origin/master' into issue635-job-manager-initialize
soxofaan committed Oct 7, 2024
2 parents 8233fdf + 08fed55 commit f88d307
Showing 9 changed files with 274 additions and 40 deletions.
19 changes: 16 additions & 3 deletions CHANGELOG.md
@@ -9,12 +9,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Added `DataCube.load_stac()` to also support creating a `load_stac` based cube without a connection ([#638](https://github.com/Open-EO/openeo-python-client/issues/638))
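
  A minimal sketch of this new connection-less usage (the STAC URL is a placeholder; `to_json()` is used here just to show that no backend round-trip is needed, and a failed metadata extraction from the URL is only logged, not fatal):

  ```python
  from openeo.rest.datacube import DataCube

  # Placeholder STAC URL, for illustration only.
  url = "https://example.com/stac/collections/sentinel-2-l2a"

  # No Connection object involved:
  cube = DataCube.load_stac(url=url, bands=["B04", "B08"])

  # The resulting process graph can be inspected/exported offline:
  print(cube.to_json())
  ```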

### Changed

### Removed

### Fixed

- When using `DataCube.load_collection()` without a connection, it is not necessary anymore to also explicitly set `fetch_metadata=False` ([#638](https://github.com/Open-EO/openeo-python-client/issues/638))
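
  A short sketch of the fixed behavior (collection id and extent are placeholders):

  ```python
  from openeo.rest.datacube import DataCube

  # Previously this required `fetch_metadata=False`; now the metadata fetch
  # is skipped automatically when no connection is given.
  cube = DataCube.load_collection(
      "SENTINEL2_L2A",  # placeholder collection id
      temporal_extent=["2024-05-01", "2024-06-01"],
  )
  ```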


## [0.32.0] - 2024-09-27

### Added

- `load_stac`/`metadata_from_stac`: add support for extracting actual temporal dimension metadata ([#567](https://github.com/Open-EO/openeo-python-client/issues/567))
- `MultiBackendJobManager`: add `cancel_running_job_after` option to automatically cancel jobs that are running for too long ([#590](https://github.com/Open-EO/openeo-python-client/issues/590)) (see the sketch after this list)
- Added `openeo.api.process.Parameter` helper to easily create a "spatial_extent" UDP parameter
- Wrap OIDC token request failure in more descriptive `OidcException` (related to [#624](https://github.com/Open-EO/openeo-python-client/issues/624))
- Added `auto_add_save_result` option (on by default) to disable automatic addition of `save_result` node on `download`/`create_job`/`execute_batch` ([#513](https://github.com/Open-EO/openeo-python-client/issues/513))
- Add support for `apply_vectorcube` UDF signature in `run_udf_code` ([Open-EO/openeo-geopyspark-driver#881](https://github.com/Open-EO/openeo-geopyspark-driver/issues/811))
- `MultiBackendJobManager`: add API to run the update loop in a separate thread, allowing controlled interruption.
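
  A rough end-to-end sketch of the two `MultiBackendJobManager` entries above. This assumes the thread API is exposed as `start_job_thread()`/`stop_job_thread()` and that `cancel_running_job_after` is a constructor option in seconds; the backend URL, collection id, and dataframe columns are placeholders:

  ```python
  import pandas as pd

  import openeo
  from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager

  connection = openeo.connect("https://openeo.example").authenticate_oidc()  # placeholder backend


  def start_job(row: pd.Series, connection: openeo.Connection, **kwargs):
      # Build and return a BatchJob for one dataframe row (sketch).
      cube = connection.load_collection("SENTINEL2_L2A", temporal_extent=[row["start"], row["end"]])
      return cube.create_job(title=f"job {row['start']}")


  manager = MultiBackendJobManager(
      # Automatically cancel jobs running longer than 2 hours (assumed to be in seconds):
      cancel_running_job_after=2 * 60 * 60,
  )
  manager.add_backend("example", connection=connection)

  job_db = CsvJobDatabase("jobs.csv")
  job_db.initialize_from_df(pd.DataFrame({"start": ["2024-05-01"], "end": ["2024-06-01"]}))

  # Run the update loop in a background thread instead of blocking in run_jobs():
  manager.start_job_thread(start_job=start_job, job_db=job_db)
  try:
      ...  # do other work while jobs are tracked in the background
  finally:
      manager.stop_job_thread()
  ```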

### Changed
@@ -29,8 +44,6 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
and implicit `save_result` addition from `download()`/`create_job()` calls with `format`
(related to [#623](https://github.com/Open-EO/openeo-python-client/issues/623), [#401](https://github.com/Open-EO/openeo-python-client/issues/401), [#583](https://github.com/Open-EO/openeo-python-client/issues/583))

### Removed

### Fixed

- `apply_dimension` with a `target_dimension` argument was not correctly adjusting datacube metadata on the client side, causing a mismatch.
2 changes: 1 addition & 1 deletion openeo/_version.py
@@ -1 +1 @@
__version__ = "0.32.0a2"
__version__ = "0.33.0a1"
6 changes: 3 additions & 3 deletions openeo/extra/job_management.py
@@ -341,7 +341,7 @@ def run_loop():
sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0
and not self._stop_thread
):
-                self._job_update_loop(df, job_db, start_job)
+                self._job_update_loop(job_db=job_db, start_job=start_job)

# Do sequence of micro-sleeps to allow for quick thread exit
for _ in range(int(max(1, self.poll_sleep))):
@@ -462,10 +462,10 @@ def run_jobs(
job_db.initialize_from_df(df)

while sum(job_db.count_by_status(statuses=["not_started", "created", "queued", "running"]).values()) > 0:
-            self._job_update_loop(df, job_db, start_job)
+            self._job_update_loop(job_db=job_db, start_job=start_job)
time.sleep(self.poll_sleep)

-    def _job_update_loop(self, df, job_db, start_job):
+    def _job_update_loop(self, job_db: JobDatabaseInterface, start_job: Callable[[], BatchJob]):
"""
Inner loop logic of job management:
go through the necessary jobs to check for status updates,
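The `run_loop` hunk above relies on a stop flag plus micro-sleeps for controlled interruption. A self-contained sketch of that idiom (illustrative names, not the actual `MultiBackendJobManager` internals):

```python
import time


class PollingWorker:
    """Illustration of the stop-flag + micro-sleep idiom from the diff above."""

    def __init__(self, poll_sleep: float = 30):
        self.poll_sleep = poll_sleep
        self._stop_thread = False

    def run_loop(self):
        while not self._stop_thread:
            ...  # do one round of work (e.g. poll job statuses)
            # Sleep in ~1 second slices instead of one long sleep,
            # so a stop request is honored within about a second.
            for _ in range(int(max(1, self.poll_sleep))):
                time.sleep(1)
                if self._stop_thread:
                    break

    def stop(self):
        # Typically called from another thread to interrupt run_loop().
        self._stop_thread = True
```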
4 changes: 3 additions & 1 deletion openeo/rest/_datacube.py
@@ -37,8 +37,10 @@ class _ProcessGraphAbstraction(_FromNodeMixin, FlatGraphableMixin):
raster data cubes, vector cubes, ML models, ...
"""

-    def __init__(self, pgnode: PGNode, connection: Connection):
+    def __init__(self, pgnode: PGNode, connection: Union[Connection, None]):
self._pg = pgnode
+        # TODO: now that connection can officially be None:
+        #   improve exceptions in cases where it is still assumed to be a real connection (download, create_job, ...)
self._connection = connection

def __str__(self):
35 changes: 13 additions & 22 deletions openeo/rest/connection.py
@@ -44,7 +44,6 @@
CollectionMetadata,
SpatialDimension,
TemporalDimension,
-    metadata_from_stac,
)
from openeo.rest import (
DEFAULT_DOWNLOAD_CHUNK_SIZE,
@@ -785,7 +784,7 @@ def authenticate_oidc(
print("Authenticated using device code flow.")
return con

-    def authenticate_oidc_access_token(self, access_token: str, provider_id: Optional[str] = None) -> None:
+    def authenticate_oidc_access_token(self, access_token: str, provider_id: Optional[str] = None) -> Connection:
"""
Set up authorization headers directly with an OIDC access token.
@@ -800,10 +799,14 @@ def authenticate_oidc_access_token(self, access_token: str, provider_id: Optiona
against the backend's list of providers to avoid and related OIDC configuration
.. versionadded:: 0.31.0
+        .. versionchanged:: 0.33.0
+            Return connection object to support chaining.
"""
provider_id, _ = self._get_oidc_provider(provider_id=provider_id, parse_info=False)
self.auth = OidcBearerAuth(provider_id=provider_id, access_token=access_token)
self._oidc_auth_renewer = None
+        return self
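
With the new return value, connection setup can be chained in one expression (a sketch; URL and token are placeholders):

```python
import openeo

connection = openeo.connect("https://openeo.example").authenticate_oidc_access_token(
    access_token="eyJhbGciOi...",  # access token obtained out-of-band (placeholder)
)
```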

def request(
self,
@@ -1411,26 +1414,14 @@ def load_stac(
Argument ``temporal_extent``: add support for year/month shorthand notation
as discussed at :ref:`date-shorthand-handling`.
"""
-        # TODO #425 move this implementation to `DataCube` and just forward here (like with `load_collection`)
-        # TODO #425 detect actual metadata from URL
-        arguments = {"url": url}
-        # TODO #425 more normalization/validation of extent/band parameters
-        if spatial_extent:
-            arguments["spatial_extent"] = spatial_extent
-        if temporal_extent:
-            arguments["temporal_extent"] = DataCube._get_temporal_extent(extent=temporal_extent)
-        if bands:
-            arguments["bands"] = bands
-        if properties:
-            arguments["properties"] = {
-                prop: build_child_callback(pred, parent_parameters=["value"]) for prop, pred in properties.items()
-            }
-        cube = self.datacube_from_process(process_id="load_stac", **arguments)
-        try:
-            cube.metadata = metadata_from_stac(url)
-        except Exception:
-            _log.warning(f"Failed to extract cube metadata from STAC URL {url}", exc_info=True)
-        return cube
+        return DataCube.load_stac(
+            url=url,
+            spatial_extent=spatial_extent,
+            temporal_extent=temporal_extent,
+            bands=bands,
+            properties=properties,
+            connection=self,
+        )
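
After this refactor, `Connection.load_stac()` is a thin wrapper around the new classmethod, so both entry points should yield the same process graph (a sketch with placeholder URLs):

```python
import openeo
from openeo.rest.datacube import DataCube

con = openeo.connect("https://openeo.example")  # placeholder backend
url = "https://example.com/stac/collection.json"  # placeholder STAC URL

cube_a = con.load_stac(url, bands=["B04"])
cube_b = DataCube.load_stac(url, bands=["B04"], connection=con)
assert cube_a.flat_graph() == cube_b.flat_graph()
```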

def load_stac_from_job(
self,
146 changes: 138 additions & 8 deletions openeo/rest/datacube.py
@@ -40,6 +40,7 @@
CollectionMetadata,
SpatialDimension,
TemporalDimension,
+    metadata_from_stac,
)
from openeo.processes import ProcessBuilder
from openeo.rest import BandMathException, OpenEoClientException, OperatorException
@@ -84,7 +85,7 @@ class DataCube(_ProcessGraphAbstraction):
# TODO: set this based on back-end or user preference?
_DEFAULT_RASTER_FORMAT = "GTiff"

-    def __init__(self, graph: PGNode, connection: Connection, metadata: Optional[CollectionMetadata] = None):
+    def __init__(self, graph: PGNode, connection: Optional[Connection], metadata: Optional[CollectionMetadata] = None):
super().__init__(pgnode=graph, connection=connection)
self.metadata: Optional[CollectionMetadata] = metadata

@@ -137,7 +138,7 @@ def _assert_valid_dimension_name(self, name: str) -> str:
def load_collection(
cls,
collection_id: Union[str, Parameter],
-        connection: Connection = None,
+        connection: Optional[Connection] = None,
spatial_extent: Union[Dict[str, float], Parameter, None] = None,
temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None,
bands: Union[None, List[str], Parameter] = None,
@@ -151,7 +152,8 @@ def load_collection(
Create a new Raster Data cube.
:param collection_id: image collection identifier
-        :param connection: The connection to use to connect with the backend.
+        :param connection: The backend connection to use.
+            Can be ``None`` to work without connection and collection metadata.
:param spatial_extent: limit data to specified bounding box or polygons
:param temporal_extent: limit data to specified temporal interval.
Typically, just a two-item list or tuple containing start and end date.
@@ -190,7 +192,7 @@
if isinstance(collection_id, Parameter):
fetch_metadata = False
metadata: Optional[CollectionMetadata] = (
-            connection.collection_metadata(collection_id) if fetch_metadata else None
+            connection.collection_metadata(collection_id) if connection and fetch_metadata else None
)
if bands:
if isinstance(bands, str):
@@ -259,6 +261,133 @@ def load_disk_collection(cls, connection: Connection, file_format: str, glob_pat
)
return cls(graph=pg, connection=connection)

@classmethod
def load_stac(
cls,
url: str,
spatial_extent: Union[Dict[str, float], Parameter, None] = None,
temporal_extent: Union[Sequence[InputDate], Parameter, str, None] = None,
bands: Optional[List[str]] = None,
properties: Optional[Dict[str, Union[str, PGNode, Callable]]] = None,
connection: Optional[Connection] = None,
) -> DataCube:
"""
Loads data from a static STAC catalog or a STAC API Collection and returns the data as a processable :py:class:`DataCube`.
A batch job result can be loaded by providing a reference to it.
If supported by the underlying metadata and file format, the data that is added to the data cube can be
restricted with the parameters ``spatial_extent``, ``temporal_extent`` and ``bands``.
If no data is available for the given extents, a ``NoDataAvailable`` error is thrown.
Remarks:
* The bands (and all dimensions that specify nominal dimension labels) are expected to be ordered as
specified in the metadata if the ``bands`` parameter is set to ``null``.
* If no additional parameter is specified this would imply that the whole data set is expected to be loaded.
Due to the large size of many data sets, this is not recommended and may be optimized by back-ends to only
load the data that is actually required after evaluating subsequent processes such as filters.
This means that the values should be processed only after the data has been limited to the required extent
and as a consequence also to a manageable size.
:param url: The URL to a static STAC catalog (STAC Item, STAC Collection, or STAC Catalog)
or a specific STAC API Collection that allows to filter items and to download assets.
This includes batch job results, which itself are compliant to STAC.
For external URLs, authentication details such as API keys or tokens may need to be included in the URL.
Batch job results can be specified in two ways:
- For Batch job results at the same back-end, a URL pointing to the corresponding batch job results
endpoint should be provided. The URL usually ends with ``/jobs/{id}/results`` and ``{id}``
is the corresponding batch job ID.
- For external results, a signed URL must be provided. Not all back-ends support signed URLs,
which are provided as a link with the link relation `canonical` in the batch job result metadata.
:param spatial_extent:
Limits the data to load to the specified bounding box or polygons.
For raster data, the process loads the pixel into the data cube if the point at the pixel center intersects
with the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
For vector data, the process loads the geometry into the data cube if the geometry is fully within the
bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
Empty geometries may only be in the data cube if no spatial extent has been provided.
The GeoJSON can be one of the following feature types:
* A ``Polygon`` or ``MultiPolygon`` geometry,
* a ``Feature`` with a ``Polygon`` or ``MultiPolygon`` geometry, or
* a ``FeatureCollection`` containing at least one ``Feature`` with ``Polygon`` or ``MultiPolygon`` geometries.
Set this parameter to ``None`` to set no limit for the spatial extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_bbox()`` or ``filter_spatial()`` directly after loading unbounded data.
:param temporal_extent:
Limits the data to load to the specified left-closed temporal interval.
Applies to all temporal dimensions.
The interval has to be specified as an array with exactly two elements:
1. The first element is the start of the temporal interval.
The specified instance in time is **included** in the interval.
2. The second element is the end of the temporal interval.
The specified instance in time is **excluded** from the interval.
The second element must always be greater/later than the first element.
Otherwise, a `TemporalExtentEmpty` exception is thrown.
Also supports open intervals by setting one of the boundaries to ``None``, but never both.
Set this parameter to ``None`` to set no limit for the temporal extent.
Be careful with this when loading large datasets. It is recommended to use this parameter instead of
using ``filter_temporal()`` directly after loading unbounded data.
:param bands:
Only adds the specified bands into the data cube so that bands that don't match the list
of band names are not available. Applies to all dimensions of type `bands`.
Either the unique band name (metadata field ``name`` in bands) or one of the common band names
(metadata field ``common_name`` in bands) can be specified.
If the unique band name and the common name conflict, the unique band name has a higher priority.
The order of the specified array defines the order of the bands in the data cube.
If multiple bands match a common name, all matched bands are included in the original order.
It is recommended to use this parameter instead of using ``filter_bands()`` directly after loading unbounded data.
:param properties:
Limits the data by metadata properties to include only data in the data cube which
all given conditions return ``True`` for (AND operation).
Specify key-value-pairs with the key being the name of the metadata property,
which can be retrieved with the openEO Data Discovery for Collections.
The value must be a condition (user-defined process) to be evaluated against a STAC API.
This parameter is not supported for static STAC.
:param connection: The connection to use to connect with the backend.
.. versionadded:: 0.33.0
"""
arguments = {"url": url}
# TODO #425 more normalization/validation of extent/band parameters
if spatial_extent:
arguments["spatial_extent"] = spatial_extent
if temporal_extent:
arguments["temporal_extent"] = DataCube._get_temporal_extent(extent=temporal_extent)
if bands:
arguments["bands"] = bands
if properties:
arguments["properties"] = {
prop: build_child_callback(pred, parent_parameters=["value"]) for prop, pred in properties.items()
}
graph = PGNode("load_stac", arguments=arguments)
try:
metadata = metadata_from_stac(url)
except Exception:
log.warning(f"Failed to extract cube metadata from STAC URL {url}", exc_info=True)
metadata = None
return cls(graph=graph, connection=connection, metadata=metadata)
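
Putting the new classmethod together, a sketch with all parameters (URL, extents, and the cloud-cover threshold are placeholder values; the lambda is turned into a child process via `build_child_callback`):

```python
from openeo.rest.datacube import DataCube

cube = DataCube.load_stac(
    url="https://example.com/stac/sentinel-2-l2a",  # placeholder
    spatial_extent={"west": 5.0, "south": 51.0, "east": 5.1, "north": 51.1},
    temporal_extent=["2024-05", "2024-06"],  # month shorthand is normalized client-side
    bands=["B04", "B08"],
    properties={"eo:cloud_cover": lambda x: x <= 50},
    connection=None,  # graph building works without a backend
)
print(cube.to_json())
```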

@classmethod
def _get_temporal_extent(
cls,
@@ -2082,10 +2211,11 @@ def save_result(
format: str = _DEFAULT_RASTER_FORMAT,
options: Optional[dict] = None,
) -> DataCube:
-        formats = set(self._connection.list_output_formats().keys())
-        # TODO: map format to correct casing too?
-        if format.lower() not in {f.lower() for f in formats}:
-            raise ValueError("Invalid format {f!r}. Should be one of {s}".format(f=format, s=formats))
+        if self._connection:
+            formats = set(self._connection.list_output_formats().keys())
+            # TODO: map format to correct casing too?
+            if format.lower() not in {f.lower() for f in formats}:
+                raise ValueError("Invalid format {f!r}. Should be one of {s}".format(f=format, s=formats))
return self.process(
process_id="save_result",
arguments={
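A quick sketch of the guarded `save_result` behavior above: with a live connection the format is validated against the backend's output format listing, while a connection-less cube defers that validation to the backend at execution time (placeholder URL):

```python
from openeo.rest.datacube import DataCube

cube = DataCube.load_stac("https://example.com/stac/collection.json")  # placeholder
# No connection: no client-side check against `list_output_formats()`;
# the format string is simply embedded in the process graph.
result = cube.save_result(format="GTiff")
```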
2 changes: 1 addition & 1 deletion openeo/rest/mlmodel.py
@@ -27,7 +27,7 @@ class MlModel(_ProcessGraphAbstraction):
.. versionadded:: 0.10.0
"""

-    def __init__(self, graph: PGNode, connection: Connection):
+    def __init__(self, graph: PGNode, connection: Union[Connection, None]):
super().__init__(pgnode=graph, connection=connection)

def save_ml_model(self, options: Optional[dict] = None):
2 changes: 1 addition & 1 deletion openeo/rest/vectorcube.py
@@ -40,7 +40,7 @@ class VectorCube(_ProcessGraphAbstraction):

_DEFAULT_VECTOR_FORMAT = "GeoJSON"

-    def __init__(self, graph: PGNode, connection: Connection, metadata: Optional[CubeMetadata] = None):
+    def __init__(self, graph: PGNode, connection: Union[Connection, None], metadata: Optional[CubeMetadata] = None):
super().__init__(pgnode=graph, connection=connection)
self.metadata = metadata
