Skip to content

Commit

Permalink
Merge pull request #202 from ASFHyP3/develop
Browse files Browse the repository at this point in the history
Release v0.5.9 -- performance improvements
  • Loading branch information
jhkennedy authored Nov 25, 2024
2 parents 196bddb + 169885a commit 1e4d51a
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 68 deletions.
17 changes: 3 additions & 14 deletions .github/actions/deploy/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,6 @@ description: Deploy the ITS_LIVE Monitoring system
inputs:
STACK_NAME:
required: true
AWS_ACCESS_KEY_ID:
required: true
AWS_SECRET_ACCESS_KEY:
required: true
CF_TEMPLATE_BUCKET:
required: true
LANDSAT_TOPIC_ARN:
Expand All @@ -16,6 +12,8 @@ inputs:
required: true
HYP3_API:
required: true
HYP3_JOBS_TABLE:
required: true
LAMBDA_LOGGING_LEVEL:
required: true
EARTHDATA_USERNAME:
Expand All @@ -30,16 +28,6 @@ inputs:
runs:
using: composite
steps:
- uses: aws-actions/configure-aws-credentials@v1
with:
aws-access-key-id: ${{ inputs.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ inputs.AWS_SECRET_ACCESS_KEY }}
aws-region: us-west-2

- uses: actions/setup-python@v3
with:
python-version: 3.12

- name: Install
shell: bash
run: make install-lambda-deps
Expand All @@ -60,6 +48,7 @@ runs:
LandsatTopicArn=${{ inputs.LANDSAT_TOPIC_ARN }} \
Sentinel2TopicArn=${{ inputs.SENTINEL2_TOPIC_ARN }} \
Hyp3Api=${{ inputs.HYP3_API }} \
Hyp3JobsTable=${{ inputs.HYP3_JOBS_TABLE }} \
LambdaLoggingLevel=${{ inputs.LAMBDA_LOGGING_LEVEL }} \
EarthdataUsername=${{ inputs.EARTHDATA_USERNAME }} \
EarthdataPassword=${{ inputs.EARTHDATA_PASSWORD }} \
Expand Down
14 changes: 12 additions & 2 deletions .github/workflows/deploy-prod.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@ jobs:

steps:
- uses: actions/checkout@v4

- uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-west-2

- uses: actions/setup-python@v5
with:
python-version: 3.12

- uses: ./.github/actions/deploy
with:
STACK_NAME: its-live-monitoring-prod
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
EARTHDATA_USERNAME: ${{ secrets.EARTHDATA_USERNAME }}
EARTHDATA_PASSWORD: ${{ secrets.EARTHDATA_PASSWORD }}
CF_TEMPLATE_BUCKET: cf-templates-3o5lnspmwmzg-us-west-2
LANDSAT_TOPIC_ARN: arn:aws:sns:us-west-2:673253540267:public-c2-notify-v2
SENTINEL2_TOPIC_ARN: arn:aws:sns:eu-west-1:214830741341:NewSentinel2Product
HYP3_API: https://hyp3-its-live.asf.alaska.edu
HYP3_JOBS_TABLE: hyp3-its-live-JobsTable-178MIZ4ZXKL69
LAMBDA_LOGGING_LEVEL: INFO
PUBLISH_BUCKET: its-live-data
MATTERMOST_PAT: ${{ secrets.MATTERMOST_PAT }}
16 changes: 13 additions & 3 deletions .github/workflows/deploy-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,27 @@ jobs:

steps:
- uses: actions/checkout@v4

- uses: aws-actions/configure-aws-credentials@v4
with:
aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
aws-region: us-west-2

- uses: actions/setup-python@v5
with:
python-version: 3.12

- uses: ./.github/actions/deploy
with:
STACK_NAME: its-live-monitoring-test
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
EARTHDATA_USERNAME: ${{ secrets.EARTHDATA_USERNAME }}
EARTHDATA_PASSWORD: ${{ secrets.EARTHDATA_PASSWORD }}
CF_TEMPLATE_BUCKET: cf-templates-3o5lnspmwmzg-us-west-2
LANDSAT_TOPIC_ARN: arn:aws:sns:us-west-2:986442313181:its-live-notify-landsat-test
SENTINEL2_TOPIC_ARN: arn:aws:sns:eu-west-1:986442313181:its-live-notify-sentinel2-test
HYP3_API: https://hyp3-its-live.asf.alaska.edu
HYP3_API: https://hyp3-its-live-test.asf.alaska.edu
HYP3_JOBS_TABLE: hyp3-its-live-test-JobsTable-1FT990PLG5K45
LAMBDA_LOGGING_LEVEL: INFO
PUBLISH_BUCKET: its-live-data-test
MATTERMOST_PAT: ${{ secrets.MATTERMOST_PAT }}
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.5.9]
### Changed
- Improved deduplication performance by searching HyP3's DynamoDB jobs table directly for `PENDING` and `RUNNING` jobs, instead of using the `hyp3_sdk`.

### Fixed
- Mixed STAC Item datetime formats (e.g., occasionally not including microseconds) in the list of secondary scenes no longer cause a `ValueError` to be raised.

## [0.5.8]
### Changed
- As an incremental improvement to deduplication performance, its-live-monitoring now:
Expand Down
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ install-lambda-deps:

test_file ?= 'tests/'
test:
export $$(xargs < tests/cfg.env); \
pytest $(test_file)

landsat-integration:
Expand Down
27 changes: 20 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,23 +66,36 @@ aws sns publish \
--message file://${MESSAGE_FILE}
```

where `TOPIC_ARN` is the ARN of the test topic and `MESSAGE_FILE` is the path to a file containing the contents of the message you want published. Example message contents are provided in these files in the [`tests/integration`](tests/integration) directory:
where `TOPIC_ARN` is the ARN of the test topic and `MESSAGE_FILE` is the path to a file containing the contents of the message you want published. Example message contents are provided in these files in the [`tests/integration`](tests/integration) directory, two of which are described here:
* [`landsat-l8-valid.json`](tests/integration/landsat-l8-valid.json) - A message containing a Landsat 9 scene over ice that *should* be processed.
* [`landsat-l9-wrong-tier.json`](tests/integration/landsat-l9-wrong-tier.json) - A message containing a Landsat 9 scene *not* over ice that should be *filtered out* and *not* processed.

To submit the Landsat integration test payloads to the default Landsat test SNS topic, run:
To submit **all** the integration test payloads to the default test SNS topics, run:
```shell
make integration
```

>[!IMPORTANT]
> The integration tests will submit jobs to `hyp3-its-live-test`, which will publish products to `s3://its-live-data-test`. Notably `s3://its-live-data-test` has a lifecycle rule which will delete all products after 14 days. So to test deduplication of HyP3 and S3, you'll need to:
> 1. disable `hyp3-its-live-test`'s compute environment or start execution manager
> 2. submit the integration tests and see jobs submitted
> 3. submit the integration tests again to see _all_ jobs deduplicate with the hung jobs from the previous step
> 4. re-enable the compute environment or start execution manager and wait for all jobs to finish
> 5. once all jobs are finished, submit the integration tests again to see jobs deduplicate against the products in `s3://its-live-data-test`
>
> That means that fully testing its-live-monitoring requires _at least_ 3 rounds of integration testing!


To submit _just_ the Landsat integration test payloads to the default Landsat test SNS topic, run:
```shell
make landsat-integration
```
Likewise, to submit the Sentinel-2 integration test payloads to the default Sentinel-2 test SNS topic, run:
Likewise, to submit _just_ the Sentinel-2 integration test payloads to the default Sentinel-2 test SNS topic, run:
```shell
make Sentinel2-integration
```

To submit **all** the integration test payloads to the default test SNS topic, run:
```shell
make integration
```
or, you can submit to an alternative SNS topic like:
```shell
LANDSAT_TOPIC_ARN=foobar make landsat-integration
Expand Down
14 changes: 14 additions & 0 deletions cloudformation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ Parameters:
Hyp3Api:
Type: String

Hyp3JobsTable:
Type: String

LambdaLoggingLevel:
Type: String
Default: INFO
Expand Down Expand Up @@ -90,6 +93,7 @@ Resources:
Environment:
Variables:
HYP3_API: !Ref Hyp3Api
JOBS_TABLE_NAME: !Ref Hyp3JobsTable
LOGGING_LEVEL: !Ref LambdaLoggingLevel
EARTHDATA_USERNAME: !Ref EarthdataUsername
EARTHDATA_PASSWORD: !Ref EarthdataPassword
Expand Down Expand Up @@ -143,6 +147,16 @@ Resources:
- Effect: Allow
Action: sqs:*
Resource: !GetAtt Queue.Arn
- Effect: Allow
Action: dynamodb:Query
Resource: !Sub "arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/${Hyp3JobsTable}*"
- Effect: Allow
Action:
- s3:List*
- s3:Get*
Resource:
- !Sub "arn:aws:s3:::${PublishBucket}/*"
- !Sub "arn:aws:s3:::${PublishBucket}"

StatusMessages:
Type: AWS::CloudFormation::Stack
Expand Down
2 changes: 1 addition & 1 deletion its_live_monitoring/src/landsat.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,6 @@ def get_landsat_pairs_for_reference_scene(
features.append(feature)

df = gpd.GeoDataFrame.from_features(features)
df['datetime'] = pd.to_datetime(df.datetime)
df['datetime'] = pd.to_datetime(df.datetime, format='ISO8601')

return df
84 changes: 62 additions & 22 deletions its_live_monitoring/src/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
import logging
import os
import sys
from typing import Iterable
from datetime import datetime, timezone

import boto3
import botocore.config
import geopandas as gpd
import hyp3_sdk as sdk
import numpy as np
import pandas as pd
from boto3.dynamodb.conditions import Attr, Key

from landsat import (
get_landsat_pairs_for_reference_scene,
Expand All @@ -39,10 +39,8 @@
log = logging.getLogger('its_live_monitoring')
log.setLevel(os.environ.get('LOGGING_LEVEL', 'INFO'))

s3 = boto3.client(
's3',
config=botocore.config.Config(signature_version=botocore.UNSIGNED),
)
s3 = boto3.client('s3')
dynamo = boto3.resource('dynamodb')


def point_to_region(lat: float, lon: float) -> str:
Expand All @@ -67,7 +65,7 @@ def regions_from_bounds(min_lon: float, min_lat: float, max_lon: float, max_lat:
return {point_to_region(lat, lon) for lat, lon in zip(lats.ravel(), lons.ravel())}


def get_key(tile_prefixes: Iterable[str], reference: str, secondary: str) -> str | None:
def get_key(tile_prefixes: list[str], reference: str, secondary: str) -> str | None:
"""Search S3 for the key of a processed pair.
Args:
Expand All @@ -85,7 +83,7 @@ def get_key(tile_prefixes: Iterable[str], reference: str, secondary: str) -> str
for tile_prefix in tile_prefixes:
prefix = f'{tile_prefix}/{reference}_X_{secondary}'
response = s3.list_objects_v2(
Bucket='its-live-data',
Bucket=os.environ.get('PUBLISH_BUCKET', 'its-live-data'),
Prefix=prefix,
)
for item in response.get('Contents', []):
Expand Down Expand Up @@ -119,6 +117,57 @@ def deduplicate_s3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
return pairs.drop(index=drop_indexes)


def format_time(time: datetime) -> str:
    """Format a timezone-aware datetime as an ISO 8601 string in UTC.

    Args:
        time: a timezone-aware datetime object to format

    Returns:
        str: the time converted to UTC, in ISO 8601 format with seconds precision

    Raises:
        ValueError: if `time` is naive (has no tzinfo), since a naive datetime
            cannot be unambiguously converted to UTC.
    """
    if time.tzinfo is None:
        raise ValueError(f'missing tzinfo for datetime {time}')
    utc_time = time.astimezone(timezone.utc)
    return utc_time.isoformat(timespec='seconds')


def query_jobs_by_status_code(status_code: str, user: str, name: str, start: datetime) -> sdk.Batch:
"""Query dynamodb for jobs by status_code, then filter by user, name, and date.
Args:
status_code: `status_code` of the desired jobs
user: the `user_id` that submitted the jobs
name: the name of the jobs
start: the earliest submission date of the jobs
Returns:
sdk.Batch: batch of jobs matching the filters
"""
table = dynamo.Table(os.environ['JOBS_TABLE_NAME'])

key_expression = Key('status_code').eq(status_code)

filter_expression = Attr('user_id').eq(user) & Attr('name').eq(name) & Attr('request_time').gte(format_time(start))

params = {
'IndexName': 'status_code',
'KeyConditionExpression': key_expression,
'FilterExpression': filter_expression,
'ScanIndexForward': False,
}

jobs = []
while True:
response = table.query(**params)
jobs.extend(response['Items'])
if (next_key := response.get('LastEvaluatedKey')) is None:
break
params['ExclusiveStartKey'] = next_key

return sdk.Batch([sdk.Job.from_dict(job) for job in jobs])


def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Search HyP3 jobs since the reference scene's acquisition date and remove already submitted (in PENDING or RUNNING state) pairs.
Expand All @@ -129,22 +178,12 @@ def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
Returns:
The pairs GeoDataFrame with any already submitted pairs removed.
"""
pending_jobs = HYP3.find_jobs(
job_type='AUTORIFT',
start=pairs.iloc[0].reference_acquisition,
name=pairs.iloc[0].reference,
user_id=EARTHDATA_USERNAME,
status_code='PENDING',
pending_jobs = query_jobs_by_status_code(
'PENDING', EARTHDATA_USERNAME, pairs.iloc[0].reference, pairs.iloc[0].reference_acquisition
)

running_jobs = HYP3.find_jobs(
job_type='AUTORIFT',
start=pairs.iloc[0].reference_acquisition,
name=pairs.iloc[0].reference,
user_id=EARTHDATA_USERNAME,
status_code='RUNNING',
running_jobs = query_jobs_by_status_code(
'RUNNING', EARTHDATA_USERNAME, pairs.iloc[0].reference, pairs.iloc[0].reference_acquisition
)

jobs = pending_jobs + running_jobs

df = pd.DataFrame([job.job_parameters['granules'] for job in jobs], columns=['reference', 'secondary'])
Expand Down Expand Up @@ -219,6 +258,7 @@ def process_scene(
with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', None):
log.debug(pairs.sort_values(by=['secondary'], ascending=False).loc[:, ['reference', 'secondary']])

if len(pairs) > 0:
pairs = deduplicate_s3_pairs(pairs)

log.info(f'Deduplicated already published pairs; {len(pairs)} remaining')
Expand Down
4 changes: 2 additions & 2 deletions its_live_monitoring/src/sentinel2.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def get_sentinel2_stac_item(scene: str) -> pystac.Item:
items = [item for page in results.pages() for item in page]
if (n_items := len(items)) != 1:
raise ValueError(
f'{n_items} for {scene} found in Sentinel-2 STAC collection: '
f'{n_items} items for {scene} found in Sentinel-2 STAC collection: '
f'{SENTINEL2_CATALOG_API}/collections/{SENTINEL2_COLLECTION_NAME}'
)
item = items[0]
Expand Down Expand Up @@ -234,6 +234,6 @@ def get_sentinel2_pairs_for_reference_scene(
features.append(feature)

df = gpd.GeoDataFrame.from_features(features)
df['datetime'] = pd.to_datetime(df.datetime)
df['datetime'] = pd.to_datetime(df.datetime, format='ISO8601')

return df
5 changes: 3 additions & 2 deletions requirements-all.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
-r requirements-its_live_monitoring.txt
-r requirements-status-messages.txt
cfn-lint==1.19.0
ruff==0.7.4
cfn-lint==1.20.0
ruff==0.8.0
pytest==8.3.3
responses==0.25.3
moto==5.0.21
1 change: 1 addition & 0 deletions requirements-its_live_monitoring.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ pystac-client==0.8.5
requests==2.32.3
shapely==2.0.6
numpy==2.1.3
boto3==1.35.68
6 changes: 6 additions & 0 deletions tests/cfg.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
JOBS_TABLE_NAME=job-db-table
AWS_DEFAULT_REGION=us-west-2
AWS_ACCESS_KEY_ID=testing
AWS_SECRET_ACCESS_KEY=testing
AWS_SECURITY_TOKEN=testing
AWS_SESSION_TOKEN=testing
Loading

0 comments on commit 1e4d51a

Please sign in to comment.