Merge pull request #31 from /issues/5-integrate-workflows-api
Integrate workflows api
slesaad authored Dec 7, 2022
2 parents 6488a5c + 7695e85 commit bd182d6
Showing 17 changed files with 732 additions and 91 deletions.
220 changes: 220 additions & 0 deletions API_usage.md
@@ -0,0 +1,220 @@
# Guide on ingesting and publishing data to the VEDA data store & STAC API

VEDA uses a STAC catalog for data dissemination.
Scientists publish data to this STAC catalog to make it available to users.

Follow the guide below to publish datasets to the VEDA STAC catalog.

## Prepare the data

VEDA supports the inclusion of Cloud Optimized GeoTIFFs (COGs) in its data store.

### Creating COGs

1. Make sure the projection system (CRS) is embedded in the COG
2. Make sure there's an associated `NoData` value set in the COG
3. Make sure that the COG filename contains the datetime associated with the COG, in one of the formats shown below. Every datetime value in the filename must be preceded by the `_` (underscore) character (a small validation sketch follows the examples). Some examples are shown below:

#### Single datetime

- Year data: `nightlights_2012.tif`, `nightlights_2012-yearly.tif`
- Month data: `nightlights_201201.tif`, `nightlights_2012-01_monthly.tif`
- Day data: `nightlights_20120101day.tif`, `nightlights_2012-01-01_day.tif`

#### Datetime range

- Year data: `nightlights_2012_2014.tif`, `nightlights_2012_year_2015.tif`
- Month data: `nightlights_201201_201205.tif`, `nightlights_2012-01_month_2012-06_data.tif`
- Day data: `nightlights_20120101day_20121221.tif`, `nightlights_2012-01-01_to_2012-12-31_day.tif`

**Note that the date/datetime value is always preceded by an `_` (underscore).**
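
Before uploading, it can help to sanity-check each file against the three requirements above. The snippet below is a minimal sketch of such a check, assuming `rasterio` is installed; the regex and the example filename are illustrative only.

```python
# Minimal pre-upload sanity check, assuming `rasterio` is installed
# (pip install rasterio). The regex is illustrative only: it looks for an
# underscore-prefixed year, year-month, or year-month-day token.
import re

import rasterio


def check_cog(path: str) -> None:
    # 1. and 2. - projection and NoData must be embedded in the file
    with rasterio.open(path) as src:
        assert src.crs is not None, "COG has no embedded projection (CRS)"
        assert src.nodata is not None, "COG has no NoData value"

    # 3. - the filename must contain an underscore-prefixed datetime
    assert re.search(r"_\d{4}(-?\d{2})?(-?\d{2})?", path), \
        "no underscore-prefixed datetime found in filename"


# example usage (hypothetical filename)
check_cog("nightlights_2012-01-01_day.tif")
```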

## Upload to the VEDA data store

Once you have the COGs, obtain permissions to upload them to the `veda-data-store-staging` bucket.

Upload the data to a sensible location inside the bucket.
Example: `s3://veda-data-store-staging/<collection-name>/`
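
How you perform the upload is up to you. As a sketch, assuming your AWS credentials grant write access to the staging bucket, a `boto3`-based upload might look like this (the collection name and local directory are placeholders):

```python
# Sketch of the upload step with boto3; assumes AWS credentials with write
# access to the staging bucket. Collection name and local directory are placeholders.
from pathlib import Path

import boto3

s3 = boto3.client("s3")
bucket = "veda-data-store-staging"
collection_name = "my-collection"  # placeholder

for cog in Path("./cogs").glob("*.tif"):
    s3.upload_file(
        Filename=str(cog),
        Bucket=bucket,
        Key=f"{collection_name}/{cog.name}",
    )
```

The AWS CLI (for example `aws s3 sync`) works just as well if you prefer it.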

## Use the VEDA Ingestion API to schedule ingestion/publication of the data

### 1. Obtain credentials from a VEDA team member

Ask a VEDA team member to create credentials (username and password) for VEDA auth.

### 2. Export username and password

```bash
export username="slesa"
export password="xxxx"
```

### 3. Ingestion

#### Get token

```python
# Required imports
import os
import requests

# Pull username and password from environment variables
username = os.environ.get("username")
password = os.environ.get("password")

# base url for the workflows api
# experimental / subject to change in the future
base_url = "https://069xiins3b.execute-api.us-west-2.amazonaws.com/dev"

# endpoint to get the token from
token_url = f"{base_url}/token"

# authentication credentials to be passed to the token_url
body = {
    "username": username,
    "password": password,
}

# request token
response = requests.post(token_url, data=body)
if not response.ok:
    print(f"Token request failed: {response.status_code} {response.text}")

# get token from response
token = response.json().get("AccessToken")

# prepare headers for requests
headers = {
    "Authorization": f"Bearer {token}"
}

```

#### Ingest the collection

You'll first need to create a collection for your dataset.
Before you can do that, you'll need metadata about the collection, such as its spatial and temporal extent and license. See the `body` in the code snippet below.

Then, use the code snippet below to publish the collection.

```python
# url for collection ingestion
collection_ingestion_url = f"{base_url}/collections"

# prepare the body of the request,
# for a collection, it is a valid STAC record for the collection

body = {
    "id": "demo-social-vulnerability-index-overall",
    "type": "Collection",
    "title": "(Demo) Social Vulnerability Index (Overall)",
    "description": "Overall Social Vulnerability Index - Percentile ranking",
    "stac_version": "1.0.0",
    "license": "MIT",
    "links": [],
    "extent": {
        "spatial": {
            "bbox": [
                [
                    -178.23333334,
                    18.908332897999998,
                    -66.958333785,
                    71.383332688
                ]
            ]
        },
        "temporal": {
            "interval": [
                [
                    "2000-01-01T00:00:00Z",
                    "2018-01-01T00:00:00Z"
                ]
            ]
        }
    },
    "dashboard:is_periodic": False,
    "dashboard:time_density": "year",
    "item_assets": {
        "cog_default": {
            "type": "image/tiff; application=geotiff; profile=cloud-optimized",
            "roles": [
                "data",
                "layer"
            ],
            "title": "Default COG Layer",
            "description": "Cloud optimized default layer to display on map"
        }
    }
}

# make the request with the body and headers
response = requests.post(
    collection_ingestion_url,
    headers=headers,
    json=body
)

# look at the response
if response.ok:
    print(response.json())
else:
    print(f"Error: {response.status_code} {response.text}")
```
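
To confirm that the collection was published, you can query the STAC API for it. The base URL below is a placeholder; substitute the VEDA STAC API endpoint you were given.

```python
# Placeholder STAC API base URL; replace with the actual VEDA STAC API endpoint
stac_api_url = "https://<veda-stac-api>"

response = requests.get(f"{stac_api_url}/collections/{body['id']}")
if response.ok:
    print("Collection is published:", response.json().get("id"))
else:
    print("Collection not found yet:", response.status_code)
```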

#### Ingest items to a collection

Make sure that the respective collection has already been published using the instructions above.
Now you're ready to ingest items into that collection.

Follow the example below to ingest items to a collection:

```python
# url for workflow execution
workflow_execution_url = f"{base_url}/workflow-executions"

# prepare the body of the request
body = {
    "collection": "EPA-annual-emissions_1A_Combustion_Mobile",
    "prefix": "EIS/cog/EPA-inventory-2012/annual/",
    "bucket": "veda-data-store-staging",
    "filename_regex": "^(.*)Combustion_Mobile.tif$",
    "discovery": "s3",
    "upload": False,
    "start_datetime": "2012-01-01T00:00:00Z",
    "end_datetime": "2012-12-31T23:59:59Z",
    "cogify": False,
}

# make the request with the body and headers
response = requests.post(
    workflow_execution_url,
    headers=headers,
    json=body
)

# look at the response
if response.ok:
    print(response.json())
else:
    print(f"Error: {response.status_code} {response.text}")
```

#### Check the status of the execution

```python
# the id of the execution,
# available in the response of the workflow execution request above
execution_id = "xxx"

# url for execution status
execution_status_url = f"{workflow_execution_url}/{execution_id}"

# make the request
response = requests.get(
    execution_status_url,
    headers=headers,
)

if response.ok:
    print(response.json())
```
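
If you want to wait for the ingestion to finish, you can poll the status endpoint. The sketch below assumes the response JSON contains a `status` field with terminal values such as `SUCCEEDED` or `FAILED`; inspect a real response and adjust the field name and values to match.

```python
import time

# Poll until the execution reaches a terminal state.
# NOTE: the "status" field and its values are assumptions; inspect a real
# response and adjust the field name/values accordingly.
while True:
    response = requests.get(execution_status_url, headers=headers)
    response.raise_for_status()
    status = response.json().get("status")
    print("Current status:", status)
    if status in ("SUCCEEDED", "FAILED"):
        break
    time.sleep(30)
```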
12 changes: 10 additions & 2 deletions README.md
@@ -19,14 +19,18 @@ This codebase utilizes the [Pydantic SSM Settings](https://github.com/developmen
### Running API

1. Create virtual environment:

```bash
python3 -m venv .venv
source .venv/bin/activate
```

1. Install dependencies:

```
pip install -r api/requirements.txt
```

1. Run API:

```
@@ -35,6 +39,10 @@ This codebase utilizes the [Pydantic SSM Settings](https://github.com/developmen

_Note:_ If no `.env` file is present, the API will connect to resources in the `dev` deployment via [pydantic-ssm-settings](https://github.com/developmentseed/pydantic-ssm-settings). This requires that your `AWS_PROFILE` be set to the profile associated with the AWS account hosting the `dev` deployment.

# License
This project is licensed under **Apache 2**, see the [LICENSE](LICENSE) file for more details.
## Using the API

Please go through the [API Usage docs](API_usage.md) for a guide on ingesting and publishing data to the VEDA data store & STAC API.

## License

This project is licensed under **Apache 2**, see the [LICENSE](LICENSE) file for more details.
3 changes: 2 additions & 1 deletion api/requirements.txt
@@ -10,4 +10,5 @@ pypgstac==0.6.6
requests>=2.27.1
# Waiting for https://github.com/stac-utils/stac-pydantic/pull/116
stac-pydantic @ git+https://github.com/alukach/stac-pydantic.git@patch-1
ddbcereal==2.1.1
ddbcereal==2.1.1
python-multipart==0.0.5
109 changes: 109 additions & 0 deletions api/src/auth.py
@@ -0,0 +1,109 @@
import base64
import hashlib
import hmac
import logging
from typing import Dict
import requests

import boto3

from authlib.jose import JsonWebToken, JsonWebKey, KeySet, JWTClaims, errors
from cachetools import cached, TTLCache
from fastapi import Depends, HTTPException, security

from . import config


logger = logging.getLogger(__name__)

token_scheme = security.HTTPBearer()


def get_settings() -> config.Settings:
    from . import main

    return main.settings


def get_jwks_url(settings: config.Settings = Depends(get_settings)) -> str:
    return settings.jwks_url


@cached(TTLCache(maxsize=1, ttl=3600))
def get_jwks(jwks_url: str = Depends(get_jwks_url)) -> KeySet:
    with requests.get(jwks_url) as response:
        response.raise_for_status()
        return JsonWebKey.import_key_set(response.json())


def decode_token(
    token: security.HTTPAuthorizationCredentials = Depends(token_scheme),
    jwks: KeySet = Depends(get_jwks),
) -> JWTClaims:
    """
    Validate & decode JWT
    """
    try:
        claims = JsonWebToken(["RS256"]).decode(
            s=token.credentials,
            key=jwks,
            claims_options={
                # # Example of validating audience to match expected value
                # "aud": {"essential": True, "values": [APP_CLIENT_ID]}
            },
        )

        if "client_id" in claims:
            # Insert Cognito's `client_id` into `aud` claim if `aud` claim is unset
            claims.setdefault("aud", claims["client_id"])

        claims.validate()
        return claims
    except errors.JoseError:
        logger.exception("Unable to decode token")
        raise HTTPException(status_code=403, detail="Bad auth token")


def get_username(claims: security.HTTPBasicCredentials = Depends(decode_token)):
    return claims["sub"]


def _get_secret_hash(username: str, client_id: str, client_secret: str) -> str:
    # A keyed-hash message authentication code (HMAC) calculated using
    # the secret key of a user pool client and username plus the client
    # ID in the message.
    message = username + client_id
    dig = hmac.new(
        bytearray(client_secret, "utf-8"),
        msg=message.encode("UTF-8"),
        digestmod=hashlib.sha256,
    ).digest()
    return base64.b64encode(dig).decode()


def authenticate_and_get_token(
    username: str,
    password: str,
    user_pool_id: str,
    app_client_id: str,
    app_client_secret: str,
) -> Dict:
    client = boto3.client("cognito-idp")
    try:
        resp = client.admin_initiate_auth(
            UserPoolId=user_pool_id,
            ClientId=app_client_id,
            AuthFlow="ADMIN_USER_PASSWORD_AUTH",
            AuthParameters={
                "USERNAME": username,
                "PASSWORD": password,
                "SECRET_HASH": _get_secret_hash(
                    username, app_client_id, app_client_secret
                ),
            },
        )
    except client.exceptions.NotAuthorizedException:
        return {
            "message": "Login failed, please make sure the credentials are correct."
        }
    return resp["AuthenticationResult"]
10 changes: 10 additions & 0 deletions api/src/config.py
@@ -5,6 +5,7 @@


AwsArn = constr(regex=r"^arn:aws:iam::\d{12}:role/.+")
AwsStepArn = constr(regex=r"^arn:aws:states:.+:\d{12}:stateMachine:.+")


class Settings(BaseSettings):
@@ -28,3 +29,12 @@ class Config(AwsSsmSourceConfig):
    @classmethod
    def from_ssm(cls, stack: str):
        return cls(_secrets_dir=f"/{stack}")

    data_pipeline_arn: AwsStepArn = Field(
        description="ARN of AWS step function used to trigger data ingestion"
    )

    userpool_id: str = Field(description="The Cognito Userpool used for authentication")

    client_id: str = Field(description="The Cognito APP client ID")
    client_secret: str = Field(description="The Cognito APP client secret")