Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingestor-api)!: store STAC item as string in DynamoDB #26

Merged
merged 8 commits into from
Mar 28, 2023
2 changes: 1 addition & 1 deletion lib/ingestor-api/runtime/src/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
from pypgstac.db import PgstacDB
from pypgstac.load import Methods

from .loader import Loader
from .schemas import StacCollection
from .utils import get_db_credentials
from .loader import Loader


def ingest(collection: StacCollection):
Expand Down
2 changes: 1 addition & 1 deletion lib/ingestor-api/runtime/src/ingestor.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def get_queued_ingestions(records: List["DynamodbRecord"]) -> Iterator[Ingestion
k: deserializer.deserialize(v)
for k, v in record["dynamodb"]["NewImage"].items()
}
ingestion = Ingestion.construct(**parsed)
ingestion = Ingestion.parse_obj(parsed)
if ingestion.status == Status.queued:
yield ingestion

Expand Down
2 changes: 1 addition & 1 deletion lib/ingestor-api/runtime/src/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def update_collection_summaries(self, collection_id: str) -> None:
)
)
cur.execute(
"SELECT dashboard.update_collection_default_summaries(%s)",
"SELECT dashboard.update_default_summaries(%s)",
[collection_id],
)
logger.info("Updating bbox for collection: {}.".format(collection_id))
Expand Down
28 changes: 21 additions & 7 deletions lib/ingestor-api/runtime/src/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,19 @@
import enum
import json
from datetime import datetime
from decimal import Decimal
from typing import TYPE_CHECKING, Dict, List, Optional
from typing import TYPE_CHECKING, Dict, List, Optional, Union
from urllib.parse import urlparse

from fastapi.encoders import jsonable_encoder
from fastapi.exceptions import RequestValidationError
from pydantic import BaseModel, PositiveInt, dataclasses, error_wrappers, validator
from pydantic import (
BaseModel,
Json,
PositiveInt,
dataclasses,
error_wrappers,
validator,
)
from stac_pydantic import Collection, Item, shared

from . import validators
Expand All @@ -23,7 +30,7 @@ def is_accessible(cls, href):
url = urlparse(href)

if url.scheme in ["https", "http"]:
validators.url_is_accessible(href)
validators.url_is_accessible(href=href)
elif url.scheme in ["s3"]:
validators.s3_object_is_accessible(
bucket=url.hostname, key=url.path.lstrip("/")
Expand Down Expand Up @@ -64,7 +71,7 @@ class Ingestion(BaseModel):
created_at: datetime = None
updated_at: datetime = None

item: Item
item: Union[Item, Json[Item]]

@validator("created_at", pre=True, always=True, allow_reuse=True)
@validator("updated_at", pre=True, always=True, allow_reuse=True)
Expand All @@ -84,9 +91,16 @@ def save(self, db: "services.Database"):
db.write(self)
return self

def dynamodb_dict(self, by_alias=True):
def dynamodb_dict(self):
"""DynamoDB-friendly serialization"""
return json.loads(self.json(by_alias=by_alias), parse_float=Decimal)
# convert to dictionary
output = self.dict(exclude={"item"})

# add STAC item as string
output["item"] = self.item.json()

# make JSON-friendly (will be able to do with Pydantic V2, https://github.com/pydantic/pydantic/issues/1409#issuecomment-1423995424)
return jsonable_encoder(output)


@dataclasses.dataclass
Expand Down
37 changes: 5 additions & 32 deletions lib/ingestor-api/runtime/src/utils.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import decimal
import json
from typing import Any, Dict, Sequence
from typing import Sequence

import boto3
import orjson
import pydantic
from pypgstac.db import PgstacDB
from pypgstac.load import Methods
from fastapi.encoders import jsonable_encoder

from .schemas import Ingestion
from .loader import Loader
from .schemas import Ingestion


class DbCreds(pydantic.BaseModel):
Expand All @@ -36,40 +34,15 @@ def get_db_credentials(secret_arn: str) -> DbCreds:
return DbCreds.parse_raw(response["SecretString"])


def convert_decimals_to_float(item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Return ``item`` re-built from JSON with every ``decimal.Decimal`` as a float.

    DynamoDB stores floats as Decimals. They are turned back into floats
    before the records reach pgSTAC, so that no issues arise when pgSTAC
    converts the records to JSON.
    """

    def decimal_to_float(obj):
        # Fallback serializer handed to orjson: only Decimals are accepted,
        # anything else is reported as unsupported via TypeError.
        if not isinstance(obj, decimal.Decimal):
            raise TypeError
        return float(obj)

    # Serialize with the Decimal fallback, then parse back into plain objects.
    encoded = orjson.dumps(item, default=decimal_to_float)
    return json.loads(encoded)


def load_items(creds: DbCreds, ingestions: Sequence[Ingestion]):
"""
Bulk insert STAC records into pgSTAC.
"""
with PgstacDB(dsn=creds.dsn_string, debug=True) as db:
loader = Loader(db=db)

items = [
# NOTE: Important to deserialize values to convert decimals to floats
convert_decimals_to_float(i.item)
for i in ingestions
]

print(f"Ingesting {len(items)} items")
# serialize to JSON-friendly dicts (won't be necessary in Pydantic v2, https://github.com/pydantic/pydantic/issues/1409#issuecomment-1423995424)
items = jsonable_encoder(i.item for i in ingestions)
loading_result = loader.load_items(
file=items,
# use insert_ignore to avoid overwriting existing items or upsert to replace
Expand Down
15 changes: 8 additions & 7 deletions lib/ingestor-api/runtime/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ def test_environ():
os.environ["JWKS_URL"] = "https://test-jwks.url"
os.environ["STAC_URL"] = "https://test-stac.url"
os.environ["DATA_ACCESS_ROLE"] = "arn:aws:iam::123456789012:role/test-role"
os.environ["DB_SECRET_ARN"] = "testing"


@pytest.fixture
Expand Down Expand Up @@ -101,7 +102,10 @@ def example_stac_item():
]
],
},
"properties": {"datetime": "2020-12-11T22:38:32.125000Z"},
"properties": {
"datetime": "2020-12-11T22:38:32.125000Z",
"eo:cloud_cover": 1,
},
"collection": "simple-collection",
"links": [
{
Expand All @@ -125,13 +129,13 @@ def example_stac_item():
],
"assets": {
"visual": {
"href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.tif", # noqa
"href": "https://TEST_API.com/open-cogs/stac-examples/20201211_223832_CS2.tif", # noqa
"type": "image/tiff; application=geotiff; profile=cloud-optimized",
"title": "3-Band Visual",
"roles": ["visual"],
},
"thumbnail": {
"href": "https://storage.googleapis.com/open-cogs/stac-examples/20201211_223832_CS2.jpg", # noqa
"href": "https://TEST_API.com/open-cogs/stac-examples/20201211_223832_CS2.jpg", # noqa
"title": "Thumbnail",
"type": "image/jpeg",
"roles": ["thumbnail"],
Expand Down Expand Up @@ -244,10 +248,7 @@ def client_authenticated(app):
"""
from src.dependencies import get_username

def skip_auth():
pass

app.dependency_overrides[get_username] = skip_auth
app.dependency_overrides[get_username] = lambda: 'test_user'
return TestClient(app)


Expand Down
60 changes: 60 additions & 0 deletions lib/ingestor-api/runtime/tests/test_ingestor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
from unittest.mock import patch

import pytest


@pytest.fixture()
def dynamodb_stream_event():
    """Provide a stub DynamoDB stream event whose ``Records`` entry is ``None``."""
    event = {"Records": None}
    return event


@pytest.fixture()
def get_queued_ingestions(example_ingestion):
    """
    Patch ``src.ingestor.get_queued_ingestions`` for the duration of a test.

    The autospec'd replacement returns an iterator over the single example
    ingestion; the mock itself is yielded so tests can inspect its calls.
    """
    stub = patch(
        "src.ingestor.get_queued_ingestions",
        return_value=iter([example_ingestion]),
        autospec=True,
    )
    with stub as mocked:
        yield mocked


@pytest.fixture()
def get_db_credentials():
    """
    Patch ``src.ingestor.get_db_credentials`` for the duration of a test.

    The autospec'd replacement returns an empty string; the mock is yielded.
    """
    stub = patch(
        "src.ingestor.get_db_credentials",
        return_value="",
        autospec=True,
    )
    with stub as mocked:
        yield mocked


@pytest.fixture()
def load_items():
    """
    Patch ``src.ingestor.load_items`` for the duration of a test.

    The autospec'd replacement returns ``0``; the mock is yielded so the test
    can assert on how it was called.
    """
    stub = patch(
        "src.ingestor.load_items",
        return_value=0,
        autospec=True,
    )
    with stub as mocked:
        yield mocked


@pytest.fixture()
def get_table(mock_table):
    """
    Patch ``src.ingestor.get_table`` for the duration of a test.

    The autospec'd replacement returns the ``mock_table`` fixture; the mock
    is yielded.
    """
    stub = patch(
        "src.ingestor.get_table",
        return_value=mock_table,
        autospec=True,
    )
    with stub as mocked:
        yield mocked


def test_handler(
    monkeypatch,
    test_environ,
    dynamodb_stream_event,
    example_ingestion,
    get_queued_ingestions,
    get_db_credentials,
    load_items,
    get_table,
    mock_table,
):
    """
    Run ``ingestor.handler`` with its collaborators patched by fixtures.

    Checks that the patched ``load_items`` is called exactly once with the
    patched credentials and the example ingestion, and that the ingestion's
    entry read back from ``mock_table`` has status ``"succeeded"``.
    """
    # Imported inside the test, after the environment/patch fixtures have been set up.
    import src.ingestor as ingestor

    ingestor.handler(dynamodb_stream_event, {})

    # "" is the return value configured on the patched get_db_credentials.
    expected_ingestions = [example_ingestion]
    load_items.assert_called_once_with(
        creds="",
        ingestions=expected_ingestions,
    )

    # Read the ingestion back from the table to inspect its stored status.
    item_key = {
        "created_by": example_ingestion.created_by,
        "id": example_ingestion.id,
    }
    stored = mock_table.get_item(Key=item_key)
    assert stored["Item"]["status"] == "succeeded"
Loading