diff --git a/lib/ingestor-api/runtime/src/collection.py b/lib/ingestor-api/runtime/src/collection.py
index 9a72e35..d6f11fa 100644
--- a/lib/ingestor-api/runtime/src/collection.py
+++ b/lib/ingestor-api/runtime/src/collection.py
@@ -3,9 +3,9 @@
 from pypgstac.db import PgstacDB
 from pypgstac.load import Methods
 
+from .loader import Loader
 from .schemas import StacCollection
 from .utils import get_db_credentials
-from .loader import Loader
 
 
 def ingest(collection: StacCollection):
diff --git a/lib/ingestor-api/runtime/src/ingestor.py b/lib/ingestor-api/runtime/src/ingestor.py
index a89caaa..c2025d4 100644
--- a/lib/ingestor-api/runtime/src/ingestor.py
+++ b/lib/ingestor-api/runtime/src/ingestor.py
@@ -23,7 +23,7 @@ def get_queued_ingestions(records: List["DynamodbRecord"]) -> Iterator[Ingestion
             k: deserializer.deserialize(v)
             for k, v in record["dynamodb"]["NewImage"].items()
         }
-        ingestion = Ingestion.construct(**parsed)
+        ingestion = Ingestion.parse_obj(parsed)
         if ingestion.status == Status.queued:
             yield ingestion
 
diff --git a/lib/ingestor-api/runtime/src/loader.py b/lib/ingestor-api/runtime/src/loader.py
index b2012e5..0871bd1 100644
--- a/lib/ingestor-api/runtime/src/loader.py
+++ b/lib/ingestor-api/runtime/src/loader.py
@@ -27,7 +27,7 @@ def update_collection_summaries(self, collection_id: str) -> None:
                     )
                 )
                 cur.execute(
-                    "SELECT dashboard.update_collection_default_summaries(%s)",
+                    "SELECT dashboard.update_default_summaries(%s)",
                     [collection_id],
                 )
                 logger.info("Updating bbox for collection: {}.".format(collection_id))
diff --git a/lib/ingestor-api/runtime/src/schemas.py b/lib/ingestor-api/runtime/src/schemas.py
index 6732145..79db560 100644
--- a/lib/ingestor-api/runtime/src/schemas.py
+++ b/lib/ingestor-api/runtime/src/schemas.py
@@ -3,12 +3,19 @@
 import enum
 import json
 from datetime import datetime
-from decimal import Decimal
-from typing import TYPE_CHECKING, Dict, List, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
 from urllib.parse import urlparse
 
+from fastapi.encoders import jsonable_encoder
 from fastapi.exceptions import RequestValidationError
-from pydantic import BaseModel, PositiveInt, dataclasses, error_wrappers, validator
+from pydantic import (
+    BaseModel,
+    Json,
+    PositiveInt,
+    dataclasses,
+    error_wrappers,
+    validator,
+)
 from stac_pydantic import Collection, Item, shared
 
 from . import validators
@@ -23,7 +30,7 @@ def is_accessible(cls, href):
         url = urlparse(href)
 
         if url.scheme in ["https", "http"]:
-            validators.url_is_accessible(href)
+            validators.url_is_accessible(href=href)
         elif url.scheme in ["s3"]:
             validators.s3_object_is_accessible(
                 bucket=url.hostname, key=url.path.lstrip("/")
@@ -64,7 +71,7 @@ class Ingestion(BaseModel):
     created_at: datetime = None
     updated_at: datetime = None
 
-    item: Item
+    item: Union[Item, Json[Item]]
 
     @validator("created_at", pre=True, always=True, allow_reuse=True)
     @validator("updated_at", pre=True, always=True, allow_reuse=True)
@@ -84,9 +91,16 @@ def save(self, db: "services.Database"):
         db.write(self)
         return self
 
-    def dynamodb_dict(self, by_alias=True):
+    def dynamodb_dict(self):
         """DynamoDB-friendly serialization"""
-        return json.loads(self.json(by_alias=by_alias), parse_float=Decimal)
+        # convert to dictionary
+        output = self.dict(exclude={"item"})
+
+        # add STAC item as string
+        output["item"] = self.item.json()
+
+        # make JSON-friendly (will be able to do with Pydantic V2, https://github.com/pydantic/pydantic/issues/1409#issuecomment-1423995424)
+        return jsonable_encoder(output)
 
 
 @dataclasses.dataclass
diff --git a/lib/ingestor-api/runtime/src/utils.py b/lib/ingestor-api/runtime/src/utils.py
index bcec2e7..1da2b57 100644
--- a/lib/ingestor-api/runtime/src/utils.py
+++ b/lib/ingestor-api/runtime/src/utils.py
@@ -1,15 +1,13 @@
-import decimal
-import json
-from typing import Any, Dict, Sequence
+from typing import Sequence
 
 import boto3
-import orjson
 import pydantic
 from pypgstac.db import PgstacDB
 from pypgstac.load import Methods
+from fastapi.encoders import jsonable_encoder
 
-from .schemas import Ingestion
 from .loader import Loader
+from .schemas import Ingestion
 
 
 class DbCreds(pydantic.BaseModel):
@@ -36,26 +34,6 @@ def get_db_credentials(secret_arn: str) -> DbCreds:
     return DbCreds.parse_raw(response["SecretString"])
 
 
-def convert_decimals_to_float(item: Dict[str, Any]) -> Dict[str, Any]:
-    """
-    DynamoDB stores floats as Decimals. We want to convert them back to floats
-    before inserting them into pgSTAC to avoid any issues when the records are
-    converted to JSON by pgSTAC.
-    """
-
-    def decimal_to_float(obj):
-        if isinstance(obj, decimal.Decimal):
-            return float(obj)
-        raise TypeError
-
-    return json.loads(
-        orjson.dumps(
-            item,
-            default=decimal_to_float,
-        )
-    )
-
-
 def load_items(creds: DbCreds, ingestions: Sequence[Ingestion]):
     """
     Bulk insert STAC records into pgSTAC.
@@ -63,13 +41,8 @@ def load_items(creds: DbCreds, ingestions: Sequence[Ingestion]):
     with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
         loader = Loader(db=db)
 
-        items = [
-            # NOTE: Important to deserialize values to convert decimals to floats
-            convert_decimals_to_float(i.item)
-            for i in ingestions
-        ]
-
-        print(f"Ingesting {len(items)} items")
+        # serialize to JSON-friendly dicts (won't be necessary in Pydantic v2, https://github.com/pydantic/pydantic/issues/1409#issuecomment-1423995424)
+        items = jsonable_encoder(i.item for i in ingestions)
         loading_result = loader.load_items(
             file=items,
             # use insert_ignore to avoid overwritting existing items or upsert to replace
diff --git a/lib/ingestor-api/runtime/tests/conftest.py b/lib/ingestor-api/runtime/tests/conftest.py
index c58a757..f2d357e 100644
--- a/lib/ingestor-api/runtime/tests/conftest.py
+++ b/lib/ingestor-api/runtime/tests/conftest.py
@@ -20,6 +20,7 @@ def test_environ():
     os.environ["JWKS_URL"] = "https://test-jwks.url"
     os.environ["STAC_URL"] = "https://test-stac.url"
     os.environ["DATA_ACCESS_ROLE"] = "arn:aws:iam::123456789012:role/test-role"
+    os.environ["DB_SECRET_ARN"] = "testing"
 
 
 @pytest.fixture
@@ -101,7 +102,10 @@ def example_stac_item():
                 ]
             ],
         },
-        "properties": {"datetime": "2020-12-11T22:38:32.125000Z"},
+        "properties": {
+            "datetime": "2020-12-11T22:38:32.125000Z",
+            "eo:cloud_cover": 1,
+        },
         "collection": "simple-collection",
         "links": [
             {
@@ -125,13 +129,13 @@ def example_stac_item():
         ],
         "assets": {
             "visual": {
-                "href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.tif",  # noqa
+                "href": "https://TEST_API.com/open-cogs/stac-examples/20201211_223832_CS2.tif",  # noqa
                 "type": "image/tiff; application=geotiff; profile=cloud-optimized",
                 "title": "3-Band Visual",
                 "roles": ["visual"],
             },
             "thumbnail": {
-                "href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.jpg",  # noqa
+                "href": "https://TEST_API.com/open-cogs/stac-examples/20201211_223832_CS2.jpg",  # noqa
                 "title": "Thumbnail",
                 "type": "image/jpeg",
                 "roles": ["thumbnail"],
@@ -244,10 +248,7 @@ def client_authenticated(app):
     """
     from src.dependencies import get_username
 
-    def skip_auth():
-        pass
-
-    app.dependency_overrides[get_username] = skip_auth
+    app.dependency_overrides[get_username] = lambda: 'test_user'
     return TestClient(app)
 
 
diff --git a/lib/ingestor-api/runtime/tests/test_ingestor.py b/lib/ingestor-api/runtime/tests/test_ingestor.py
new file mode 100644
index 0000000..72e393b
--- /dev/null
+++ b/lib/ingestor-api/runtime/tests/test_ingestor.py
@@ -0,0 +1,60 @@
+from unittest.mock import patch
+
+import pytest
+
+
+@pytest.fixture()
+def dynamodb_stream_event():
+    return {"Records": None}
+
+
+@pytest.fixture()
+def get_queued_ingestions(example_ingestion):
+    with patch(
+        "src.ingestor.get_queued_ingestions",
+        return_value=iter([example_ingestion]),
+        autospec=True,
+    ) as m:
+        yield m
+
+
+@pytest.fixture()
+def get_db_credentials():
+    with patch("src.ingestor.get_db_credentials", return_value="", autospec=True) as m:
+        yield m
+
+
+@pytest.fixture()
+def load_items():
+    with patch("src.ingestor.load_items", return_value=0, autospec=True) as m:
+        yield m
+
+
+@pytest.fixture()
+def get_table(mock_table):
+    with patch("src.ingestor.get_table", return_value=mock_table, autospec=True) as m:
+        yield m
+
+
+def test_handler(
+    monkeypatch,
+    test_environ,
+    dynamodb_stream_event,
+    example_ingestion,
+    get_queued_ingestions,
+    get_db_credentials,
+    load_items,
+    get_table,
+    mock_table,
+):
+    import src.ingestor as ingestor
+
+    ingestor.handler(dynamodb_stream_event, {})
+    load_items.assert_called_once_with(
+        creds="",
+        ingestions=list([example_ingestion]),
+    )
+    response = mock_table.get_item(
+        Key={"created_by": example_ingestion.created_by, "id": example_ingestion.id}
+    )
+    assert response["Item"]["status"] == "succeeded"
diff --git a/lib/ingestor-api/runtime/tests/test_registration.py b/lib/ingestor-api/runtime/tests/test_registration.py
index d748128..55721de 100644
--- a/lib/ingestor-api/runtime/tests/test_registration.py
+++ b/lib/ingestor-api/runtime/tests/test_registration.py
@@ -2,9 +2,12 @@
 import json
 from datetime import timedelta
 from typing import TYPE_CHECKING, List
+from unittest.mock import call, patch
 
+from fastapi.encoders import jsonable_encoder
 import pytest
 
+
 if TYPE_CHECKING:
     from fastapi.testclient import TestClient
     from src import schemas, services
@@ -12,6 +15,121 @@
 ingestion_endpoint = "/ingestions"
 
 
+@pytest.fixture()
+def collection_exists():
+    with patch("src.validators.collection_exists", return_value=True) as m:
+        yield m
+
+
+@pytest.fixture()
+def collection_missing():
+    def bad_collection(collection_id: str):
+        raise ValueError("MOCKED MISSING COLLECTION ERROR")
+
+    with patch("src.validators.collection_exists", side_effect=bad_collection) as m:
+        yield m
+
+
+@pytest.fixture()
+def asset_exists():
+    with patch("src.validators.url_is_accessible", return_value=True) as m:
+        yield m
+
+
+@pytest.fixture()
+def asset_missing():
+    def bad_asset_url(href: str):
+        raise ValueError("MOCKED INACCESSIBLE URL ERROR")
+
+    with patch("src.validators.url_is_accessible", side_effect=bad_asset_url) as m:
+        yield m
+
+
+class TestCreate:
+    @pytest.fixture(autouse=True)
+    def setup(
+        self,
+        api_client: "TestClient",
+        mock_table: "services.Table",
+        example_ingestion: "schemas.Ingestion",
+    ):
+        from src import services
+
+        self.api_client = api_client
+        self.mock_table = mock_table
+        self.db = services.Database(self.mock_table)
+        self.example_ingestion = example_ingestion
+
+    def test_unauthenticated_create(self):
+        response = self.api_client.post(
+            ingestion_endpoint,
+            json=jsonable_encoder(self.example_ingestion.item),
+        )
+
+        assert response.status_code == 403
+
+    def test_create(self, client_authenticated, collection_exists, asset_exists):
+        response = self.api_client.post(
+            ingestion_endpoint,
+            json=jsonable_encoder(self.example_ingestion.item),
+        )
+
+        assert response.status_code == 201
+        collection_exists.assert_called_once_with(
+            collection_id=self.example_ingestion.item.collection
+        )
+
+        stored_data = self.db.fetch_many(status="queued")["items"]
+        assert len(stored_data) == 1
+        assert json.loads(stored_data[0].json(by_alias=True)) == response.json()
+
+    def test_validates_missing_collection(
+        self, client_authenticated, collection_missing, asset_exists
+    ):
+        response = self.api_client.post(
+            ingestion_endpoint,
+            json=jsonable_encoder(self.example_ingestion.item),
+        )
+
+        collection_missing.assert_called_once_with(
+            collection_id=self.example_ingestion.item.collection
+        )
+        assert response.status_code == 422, "should get validation error"
+        assert (
+            len(self.db.fetch_many(status="queued")["items"]) == 0
+        ), "data should not be stored in DB"
+
+    def test_validates_missing_assets(
+        self, client_authenticated, collection_exists, asset_missing
+    ):
+        response = self.api_client.post(
+            ingestion_endpoint,
+            json=jsonable_encoder(self.example_ingestion.item),
+        )
+
+        collection_exists.assert_called_once_with(
+            collection_id=self.example_ingestion.item.collection
+        )
+        asset_missing.assert_has_calls(
+            [
+                call(href=asset.href)
+                for asset in self.example_ingestion.item.assets.values()
+            ],
+            any_order=True,
+        )
+        assert response.status_code == 422, "should get validation error"
+        for asset_type in self.example_ingestion.item.assets.keys():
+            assert any(
+                [
+                    err["loc"] == ["body", "assets", asset_type, "href"]
+                    for err in response.json()["detail"]
+                ]
+            ), "should reference asset type in validation error response"
+        assert (
+            len(self.db.fetch_many(status="queued")["items"]) == 0
+        ), "data should not be stored in DB"
+
+
 class TestList:
     @pytest.fixture(autouse=True)
     def setup(
@@ -36,11 +154,11 @@ def populate_table(self, count=100) -> List["schemas.Ingestion"]:
 
     def test_simple_lookup(self):
         self.mock_table.put_item(Item=self.example_ingestion.dynamodb_dict())
-
+        ingestion = jsonable_encoder(self.example_ingestion)
         response = self.api_client.get(ingestion_endpoint)
         assert response.status_code == 200
         assert response.json() == {
-            "items": [json.loads(self.example_ingestion.json(by_alias=True))],
+            "items": [ingestion],
             "next": None,
         }
 
@@ -57,10 +175,7 @@ def test_next_response(self):
         response = self.api_client.get(ingestion_endpoint, params={"limit": limit})
         assert response.status_code == 200
         assert json.loads(base64.b64decode(response.json()["next"])) == expected_next
-        assert response.json()["items"] == [
-            json.loads(ingestion.json(by_alias=True))
-            for ingestion in example_ingestions[:limit]
-        ]
+        assert response.json()["items"] == jsonable_encoder(example_ingestions[:limit])
 
     @pytest.mark.skip(reason="Test is currently broken")
     def test_get_next_page(self):
diff --git a/lib/ingestor-api/runtime/tests/test_utils.py b/lib/ingestor-api/runtime/tests/test_utils.py
new file mode 100644
index 0000000..f03bb1d
--- /dev/null
+++ b/lib/ingestor-api/runtime/tests/test_utils.py
@@ -0,0 +1,35 @@
+from unittest.mock import Mock, patch
+
+import pytest
+from pypgstac.load import Methods
+from fastapi.encoders import jsonable_encoder
+from src.utils import DbCreds
+
+
+@pytest.fixture()
+def loader():
+    with patch("src.utils.Loader", autospec=True) as m:
+        yield m
+
+
+@pytest.fixture()
+def pgstacdb():
+    with patch("src.utils.PgstacDB", autospec=True) as m:
+        m.return_value.__enter__.return_value = Mock()
+        yield m
+
+
+@pytest.fixture()
+def dbcreds():
+    dbcreds = DbCreds(username="", password="", host="", port=1, dbname="", engine="")
+    return dbcreds
+
+
+def test_load_items(loader, pgstacdb, example_ingestion, dbcreds):
+    import src.utils as utils
+
+    utils.load_items(dbcreds, list([example_ingestion]))
+    loader.return_value.load_items.assert_called_once_with(
+        file=jsonable_encoder([example_ingestion.item]),
+        insert_mode=Methods.upsert,
+    )