Skip to content

Commit

Permalink
Issue #604/#644 refactor out parse_remote_process_definition as stand…
Browse files Browse the repository at this point in the history
…alone utility

Finetune openeo.internal.processes.parse to properly support this
(e.g. leverage named tuples for immutability, less boilerplate and out of the box equality checks)
  • Loading branch information
soxofaan committed Oct 11, 2024
1 parent e4eedd6 commit 607f78e
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 59 deletions.
41 changes: 12 additions & 29 deletions openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from requests.adapters import HTTPAdapter, Retry

from openeo import BatchJob, Connection
from openeo.internal.processes.parse import Process, parse_remote_process_definition
from openeo.rest import OpenEoApiError
from openeo.util import deep_get, repr_truncate, rfc3339

Expand Down Expand Up @@ -841,40 +842,22 @@ def __init__(
self._namespace = namespace
self._parameter_defaults = parameter_defaults or {}

def _get_process_definition(self, connection: Connection) -> dict:
def _get_process_definition(self, connection: Connection) -> Process:
if isinstance(self._namespace, str) and re.match("https?://", self._namespace):
# Remote process definition handling
return self._get_remote_process_definition()
elif self._namespace is None:
return connection.user_defined_process(self._process_id).describe()
# Handling of a user-specific UDP
udp_raw = connection.user_defined_process(self._process_id).describe()
return Process.from_dict(udp_raw)
else:
raise NotImplementedError(
f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}"
)

@functools.lru_cache()
def _get_remote_process_definition(self) -> dict:
"""
Get process definition based on "Remote Process Definition Extension" spec
https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition
"""
assert isinstance(self._namespace, str) and re.match("https?://", self._namespace)
resp = requests.get(url=self._namespace)
resp.raise_for_status()
data = resp.json()
if isinstance(data, list):
# Handle process listing: filter out right process
processes = [p for p in data if p.get("id") == self._process_id]
if len(processes) != 1:
raise ValueError(f"Process {self._process_id!r} not found at {self._namespace}")
(data,) = processes

# Check for required fields of a process definition
if isinstance(data, dict) and "id" in data and "process_graph" in data:
process_definition = data
else:
raise ValueError(f"Invalid process definition at {self._namespace}")

return process_definition
def _get_remote_process_definition(self) -> Process:
return parse_remote_process_definition(namespace=self._namespace, process_id=self._process_id)

def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
"""
Expand All @@ -888,19 +871,19 @@ def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
"""

process_definition = self._get_process_definition(connection=connection)
parameters = process_definition.get("parameters", [])
parameters = process_definition.parameters or []
arguments = {}
for parameter in parameters:
name = parameter["name"]
schema = parameter.get("schema", {})
name = parameter.name
schema = parameter.schema
if name in row.index:
# Higherst priority: value from dataframe row
value = row[name]
elif name in self._parameter_defaults:
# Fallback on default values from constructor
value = self._parameter_defaults[name]
else:
if parameter.get("optional", False):
if parameter.optional:
continue
raise ValueError(f"Missing required parameter {name!r} for process {self._process_id!r}")

Expand Down
83 changes: 57 additions & 26 deletions openeo/internal/processes/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@
from __future__ import annotations

import json
import re
import typing
from pathlib import Path
from typing import Iterator, List, Union
from typing import Any, Iterator, List, Optional, Union

import requests


class Schema:
class Schema(typing.NamedTuple):
"""Schema description of an openEO process parameter or return value."""

def __init__(self, schema: Union[dict, list]):
self.schema = schema
schema: Union[dict, list]

@classmethod
def from_dict(cls, data: dict) -> Schema:
Expand All @@ -32,32 +32,31 @@ def is_process_graph(self) -> bool:
)


class Parameter:
"""openEO process parameter"""
_NO_DEFAULT = object()


class Parameter(typing.NamedTuple):
"""openEO process parameter"""
# TODO unify with openeo.api.process.Parameter?

NO_DEFAULT = object()

def __init__(self, name: str, description: str, schema: Schema, default=NO_DEFAULT, optional: bool = False):
self.name = name
self.description = description
self.schema = schema
self.default = default
self.optional = optional
name: str
description: str
schema: Schema
default: Any = _NO_DEFAULT
optional: bool = False

@classmethod
def from_dict(cls, data: dict) -> Parameter:
return cls(
name=data["name"],
description=data["description"],
schema=Schema.from_dict(data["schema"]),
default=data.get("default", cls.NO_DEFAULT),
default=data.get("default", _NO_DEFAULT),
optional=data.get("optional", False),
)

def has_default(self):
return self.default is not self.NO_DEFAULT
return self.default is not _NO_DEFAULT


class Returns:
Expand All @@ -73,24 +72,31 @@ def from_dict(cls, data: dict) -> Returns:


class Process(typing.NamedTuple):
"""An openEO process"""

"""
Container for a opneEO process definition of an openEO process,
covering pre-defined processes, user-defined processes,
remote process definitions, etc.
"""

# Common-denominator-wise only the process id is a required field in a process definition.
# Depending on the context in the openEO API, some other fields (e.g. "process_graph")
# may also be required.
id: str
parameters: List[Parameter]
returns: Returns
description: str = ""
summary: str = ""
parameters: Optional[List[Parameter]] = None
returns: Optional[Returns] = None
description: Optional[str] = None
summary: Optional[str] = None
# TODO: more properties?

@classmethod
def from_dict(cls, data: dict) -> Process:
"""Construct openEO process from dictionary values"""
return cls(
id=data["id"],
parameters=[Parameter.from_dict(d) for d in data["parameters"]],
returns=Returns.from_dict(data["returns"]),
description=data["description"],
summary=data["summary"],
parameters=[Parameter.from_dict(d) for d in data["parameters"]] if "parameters" in data else None,
returns=Returns.from_dict(data["returns"]) if "returns" in data else None,
description=data.get("description"),
summary=data.get("summary"),
)

@classmethod
Expand All @@ -114,3 +120,28 @@ def parse_all_from_dir(path: Union[str, Path], pattern="*.json") -> Iterator[Pro
"""Parse all openEO process files in given directory"""
for p in sorted(Path(path).glob(pattern)):
yield Process.from_json_file(p)


def parse_remote_process_definition(namespace: str, process_id: Optional[str] = None) -> Process:
"""
Parse a process definition as defined by the "Remote Process Definition Extension" spec
https://github.com/Open-EO/openeo-api/tree/draft/extensions/remote-process-definition
"""
if not re.match("https?://", namespace):
raise ValueError(f"Expected absolute URL, but got {namespace!r}")

resp = requests.get(url=namespace)
resp.raise_for_status()
data = resp.json()
assert isinstance(data, dict)

if "id" not in data and "processes" in data and isinstance(data["processes"], list):
# Handle process listing: filter out right process
if not isinstance(process_id, str):
raise ValueError(f"Working with process listing, but got invalid process id {process_id!r}")
processes = [p for p in data["processes"] if p.get("id") == process_id]
if len(processes) != 1:
raise LookupError(f"Process {process_id!r} not found in process listing {namespace!r}")
(data,) = processes

return Process.from_dict(data)
10 changes: 8 additions & 2 deletions tests/extra/test_job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -786,8 +786,14 @@ def remote_process_definitions(self, requests_mock):
json={
"id": "increment",
"parameters": [
{"name": "data", "schema": {"type": "number"}},
{"name": "increment", "schema": {"type": "number"}, "optional": True, "default": 1},
{"name": "data", "description": "data", "schema": {"type": "number"}},
{
"name": "increment",
"description": "increment",
"schema": {"type": "number"},
"optional": True,
"default": 1,
},
],
"process_graph": {
"process_id": "add",
Expand Down
112 changes: 110 additions & 2 deletions tests/internal/processes/test_parse.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,27 @@
from openeo.internal.processes.parse import Parameter, Process, Returns, Schema
import pytest

from openeo.internal.processes.parse import (
_NO_DEFAULT,
Parameter,
Process,
Returns,
Schema,
parse_remote_process_definition,
)


def test_schema():
s = Schema.from_dict({"type": "number"})
assert s.schema == {"type": "number"}


def test_schema_equality():
assert Schema({"type": "number"}) == Schema({"type": "number"})
assert Schema({"type": "number"}) == Schema.from_dict({"type": "number"})

assert Schema({"type": "number"}) != Schema({"type": "string"})


def test_parameter():
p = Parameter.from_dict({
"name": "foo",
Expand All @@ -15,7 +31,7 @@ def test_parameter():
assert p.name == "foo"
assert p.description == "Foo amount"
assert p.schema.schema == {"type": "number"}
assert p.default is Parameter.NO_DEFAULT
assert p.default is _NO_DEFAULT
assert p.optional is False


Expand All @@ -39,6 +55,14 @@ def test_parameter_default_none():
assert p.default is None


def test_parameter_equality():
p1 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "number"}})
p2 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "number"}})
p3 = Parameter.from_dict({"name": "foo", "description": "Foo", "schema": {"type": "string"}})
assert p1 == p2
assert p1 != p3


def test_returns():
r = Returns.from_dict({
"description": "Roo",
Expand Down Expand Up @@ -98,3 +122,87 @@ def test_process_from_json():
assert p.parameters[0].schema.schema == {"type": ["number", "null"]}
assert p.returns.description == "The computed absolute value."
assert p.returns.schema.schema == {"type": ["number", "null"], "minimum": 0}


def test_parse_remote_process_definition_minimal(requests_mock):
url = "https://example.com/ndvi.json"
requests_mock.get(url, json={"id": "ndvi"})
process = parse_remote_process_definition(url)
assert process.id == "ndvi"
assert process.parameters is None
assert process.returns is None
assert process.description is None
assert process.summary is None


def test_parse_remote_process_definition_parameters(requests_mock):
url = "https://example.com/ndvi.json"
requests_mock.get(
url,
json={
"id": "ndvi",
"parameters": [
{"name": "incr", "description": "Increment", "schema": {"type": "number"}},
{"name": "scales", "description": "Scales", "default": [1, 1], "schema": {"type": "number"}},
],
},
)
process = parse_remote_process_definition(url)
assert process.id == "ndvi"
assert process.parameters == [
Parameter(name="incr", description="Increment", schema=Schema({"type": "number"})),
Parameter(name="scales", description="Scales", default=[1, 1], schema=Schema({"type": "number"})),
]
assert process.returns is None
assert process.description is None
assert process.summary is None


def test_parse_remote_process_definition_listing(requests_mock):
url = "https://example.com/processes.json"
requests_mock.get(
url,
json={
"processes": [
{
"id": "ndvi",
"parameters": [{"name": "incr", "description": "Incr", "schema": {"type": "number"}}],
},
{
"id": "scale",
"parameters": [
{"name": "factor", "description": "Factor", "default": 1, "schema": {"type": "number"}}
],
},
],
"links": [],
},
)

# No process id given
with pytest.raises(ValueError, match="Working with process listing, but got invalid process id None"):
parse_remote_process_definition(url)

# Process id not found
with pytest.raises(LookupError, match="Process 'mehblargh' not found in process listing"):
parse_remote_process_definition(url, process_id="mehblargh")

# Valid proces id
process = parse_remote_process_definition(url, process_id="ndvi")
assert process.id == "ndvi"
assert process.parameters == [
Parameter(name="incr", description="Incr", schema=Schema({"type": "number"})),
]
assert process.returns is None
assert process.description is None
assert process.summary is None

# Another proces id
process = parse_remote_process_definition(url, process_id="scale")
assert process.id == "scale"
assert process.parameters == [
Parameter(name="factor", description="Factor", default=1, schema=Schema({"type": "number"})),
]
assert process.returns is None
assert process.description is None
assert process.summary is None

0 comments on commit 607f78e

Please sign in to comment.