Release v0.5.8 #184

Merged · 50 commits · Nov 19, 2024
Commits (50) · changes from all commits
5c41087
Bump ruff from 0.6.9 to 0.7.1
dependabot[bot] Oct 24, 2024
0491f78
Merge pull request #174 from ASFHyP3/dependabot/pip/ruff-0.7.1
jtherrmann Oct 25, 2024
fa04eb1
Bump pystac-client from 0.8.4 to 0.8.5
dependabot[bot] Oct 28, 2024
dba9a6f
Bump cfn-lint from 1.18.0 to 1.18.1
dependabot[bot] Oct 28, 2024
2557e3c
Merge pull request #176 from ASFHyP3/dependabot/pip/cfn-lint-1.18.1
jtherrmann Oct 29, 2024
1a75a37
Merge pull request #175 from ASFHyP3/dependabot/pip/pystac-client-0.8.5
jtherrmann Oct 29, 2024
359292c
Bump cfn-lint from 1.18.1 to 1.18.3
dependabot[bot] Nov 4, 2024
1aede92
Merge pull request #177 from ASFHyP3/dependabot/pip/cfn-lint-1.18.3
jtherrmann Nov 4, 2024
255fdc3
Bump ruff from 0.7.1 to 0.7.2
dependabot[bot] Nov 4, 2024
dab5122
deduplication with s3
AndrewPlayer3 Nov 4, 2024
ddd691a
ruff
AndrewPlayer3 Nov 4, 2024
24e65a1
updated changelog
AndrewPlayer3 Nov 4, 2024
fcc1d29
Merge pull request #178 from ASFHyP3/dependabot/pip/ruff-0.7.2
jtherrmann Nov 4, 2024
b32d0ce
add reference geometry to pairs gdf
AndrewPlayer3 Nov 4, 2024
7f395e4
dont search every prefix
AndrewPlayer3 Nov 4, 2024
d91eae3
remove ref geom since sec is all we need
AndrewPlayer3 Nov 4, 2024
b964191
remove shapely polygon import
AndrewPlayer3 Nov 4, 2024
f2f9a5c
Batch uses + instead of extend
AndrewPlayer3 Nov 4, 2024
81c4375
ruff
AndrewPlayer3 Nov 4, 2024
d97f0bc
add numpy to requirements
AndrewPlayer3 Nov 4, 2024
1281b86
one liner
AndrewPlayer3 Nov 4, 2024
e691c52
add tests for region functions and s3 search; simplify selecting regi…
jhkennedy Nov 5, 2024
54e87be
ruff formatting
jhkennedy Nov 5, 2024
25b52af
add dedup s3 test
AndrewPlayer3 Nov 6, 2024
c845cc9
reset indexes not needed
AndrewPlayer3 Nov 6, 2024
3deb7ac
ruff
AndrewPlayer3 Nov 6, 2024
9a03565
refactor tests to use patch decorator
jhkennedy Nov 7, 2024
286eb2a
more verbose deduplication
jhkennedy Nov 7, 2024
02cc6b1
re-order changelog
jhkennedy Nov 7, 2024
bab883c
Merge pull request #179 from ASFHyP3/s3_deduplication
AndrewPlayer3 Nov 7, 2024
2b75e51
Bump numpy from 1.26.4 to 2.1.3
dependabot[bot] Nov 11, 2024
efce387
Bump cfn-lint from 1.18.3 to 1.18.4
dependabot[bot] Nov 11, 2024
c43fd76
Merge pull request #182 from ASFHyP3/dependabot/pip/cfn-lint-1.18.4
jtherrmann Nov 12, 2024
28969f0
Bump ruff from 0.7.2 to 0.7.3
dependabot[bot] Nov 12, 2024
b7555c1
Merge pull request #180 from ASFHyP3/dependabot/pip/ruff-0.7.3
jtherrmann Nov 12, 2024
5c80ebb
Update CHANGELOG.md
jtherrmann Nov 15, 2024
afeab5a
Merge branch 'develop' into dependabot/pip/numpy-2.1.3
jtherrmann Nov 15, 2024
9601fa8
Merge pull request #181 from ASFHyP3/dependabot/pip/numpy-2.1.3
jtherrmann Nov 15, 2024
42e7685
fix changelog
jtherrmann Nov 15, 2024
b026d6e
Merge pull request #183 from ASFHyP3/fix-changelog
jtherrmann Nov 16, 2024
f833454
Bump cfn-lint from 1.18.4 to 1.19.0
dependabot[bot] Nov 18, 2024
0db8fd5
Merge pull request #186 from ASFHyP3/dependabot/pip/cfn-lint-1.19.0
jtherrmann Nov 19, 2024
d5088e6
Bump ruff from 0.7.3 to 0.7.4
dependabot[bot] Nov 19, 2024
f649a8a
Merge pull request #185 from ASFHyP3/dependabot/pip/ruff-0.7.4
jtherrmann Nov 19, 2024
dac798a
no sign s3 read request
jtherrmann Nov 19, 2024
1ba07c3
Merge pull request #187 from ASFHyP3/anonymous-s3-read
jtherrmann Nov 19, 2024
59d5cd4
Reword changelog to make changes more clear
jhkennedy Nov 19, 2024
1756570
Update CHANGELOG.md
jhkennedy Nov 19, 2024
4ae5384
Update CHANGELOG.md
jhkennedy Nov 19, 2024
fa8bcb2
Merge pull request #188 from ASFHyP3/changelog-for-release
jtherrmann Nov 19, 2024
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.5.8]
### Changed
- As an incremental improvement to deduplication performance, its-live-monitoring now:
- searches the `s3://its-live-data` bucket directly for already published (succeeded) pairs.
- searches HyP3 ITS_LIVE via the API for pairs still pending or running, instead of searching for all previously submitted pairs.
- Upgrade numpy from 1.26.4 to 2.1.3

## [0.5.7]
### Fixed
108 changes: 103 additions & 5 deletions its_live_monitoring/src/main.py
@@ -5,9 +5,13 @@
import logging
import os
import sys
from typing import Iterable

import boto3
import botocore.config
import geopandas as gpd
import hyp3_sdk as sdk
import numpy as np
import pandas as pd

from landsat import (
@@ -35,11 +39,88 @@
log = logging.getLogger('its_live_monitoring')
log.setLevel(os.environ.get('LOGGING_LEVEL', 'INFO'))

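# The its-live-data bucket is publicly readable, so requests are sent unsigned (anonymous) — see PR #187.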
s3 = boto3.client(
's3',
config=botocore.config.Config(signature_version=botocore.UNSIGNED),
)

-def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
-    """Ensure we don't submit duplicate jobs to HyP3.
-
-    Search HyP3 jobs since the reference scene's acquisition date and remove already processed pairs
def point_to_region(lat: float, lon: float) -> str:
"""Returns a string (for example, N78W124) of a region name based on granule center point lat,lon."""
nw_hemisphere = 'S' if np.signbit(lat) else 'N'
ew_hemisphere = 'W' if np.signbit(lon) else 'E'

region_lat = int(np.abs(np.fix(lat / 10) * 10))
if region_lat == 90: # if you are exactly at a pole, put in lat = 80 bin
region_lat = 80

region_lon = int(np.abs(np.fix(lon / 10) * 10))
if region_lon >= 180: # if you are at the dateline, back off to the 170 bin
region_lon = 170

return f'{nw_hemisphere}{region_lat:02d}{ew_hemisphere}{region_lon:03d}'
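
# Illustrative spot checks (not part of the module; values follow from the binning above):
#   point_to_region(78.3, -123.5) -> 'N70W120'   (latitude truncates toward zero into the 70s bin)
#   point_to_region(-0.5, 0.5)    -> 'S00E000'   (np.signbit keeps -0.x in the southern hemisphere)
#   point_to_region(90.0, 179.9)  -> 'N80E170'   (pole and dateline clamp to the 80 and 170 bins)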


def regions_from_bounds(min_lon: float, min_lat: float, max_lon: float, max_lat: float) -> set[str]:
"""Returns a set of all region names within a bounding box."""
lats, lons = np.mgrid[min_lat : max_lat + 10 : 10, min_lon : max_lon + 10 : 10]
return {point_to_region(lat, lon) for lat, lon in zip(lats.ravel(), lons.ravel())}
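
# Illustrative: a bounding box spanning two 10-degree bins in each direction
#   regions_from_bounds(-125.0, 75.0, -115.0, 80.0)
#   -> {'N70W110', 'N70W120', 'N80W110', 'N80W120'}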


def get_key(tile_prefixes: Iterable[str], reference: str, secondary: str) -> str | None:
"""Search S3 for the key of a processed pair.

Args:
tile_prefixes: s3 tile path prefixes
reference: reference scene name
secondary: secondary scene name

Returns:
The key or None if one wasn't found.
"""
# NOTE: hyp3-autorift enforces earliest scene as the reference scene and will write files accordingly,
# but its-live-monitoring uses the latest scene as the reference scene, so enforce autorift convention
reference, secondary = sorted([reference, secondary])

for tile_prefix in tile_prefixes:
prefix = f'{tile_prefix}/{reference}_X_{secondary}'
response = s3.list_objects_v2(
Bucket='its-live-data',
Prefix=prefix,
)
for item in response.get('Contents', []):
if item['Key'].endswith('.nc'):
return item['Key']
return None
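
# Illustrative (hypothetical region prefix): with tile_prefixes=['velocity_image_pair/landsatOLI/v02/N70W120'],
# get_key sorts the two scene names, lists 's3://its-live-data/<tile_prefix>/<first>_X_<second>*',
# and returns the first key ending in '.nc', or None if no product has been published.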


def deduplicate_s3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Ensures that pairs aren't submitted if they already have a product in S3.

Args:
pairs: A GeoDataFrame containing *at least* these columns: `reference`, `reference_acquisition`, and
`secondary`.

Returns:
The pairs GeoDataFrame with any already submitted pairs removed.
"""
s2_prefix = 'velocity_image_pair/sentinel2/v02'
landsat_prefix = 'velocity_image_pair/landsatOLI/v02'
prefix = s2_prefix if pairs['reference'].iloc[0].startswith('S2') else landsat_prefix

regions = regions_from_bounds(*pairs['geometry'].total_bounds)
tile_prefixes = [f'{prefix}/{region}' for region in regions]

drop_indexes = []
for idx, reference, secondary in pairs[['reference', 'secondary']].itertuples():
if get_key(tile_prefixes=tile_prefixes, reference=reference, secondary=secondary):
drop_indexes.append(idx)

return pairs.drop(index=drop_indexes)
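
# Illustrative behavior: if get_key matches a published product for one of three rows in
# `pairs`, deduplicate_s3_pairs returns the other two rows with their indexes unchanged.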


def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Search HyP3 jobs since the reference scene's acquisition date and remove already submitted (in PENDING or RUNNING state) pairs.

Args:
pairs: A GeoDataFrame containing *at least* these columns: `reference`, `reference_acquisition`, and
@@ -48,13 +129,24 @@ def deduplicate_hyp3_pairs(pairs: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
Returns:
The pairs GeoDataFrame with any already submitted pairs removed.
"""
-jobs = HYP3.find_jobs(
pending_jobs = HYP3.find_jobs(
job_type='AUTORIFT',
start=pairs.iloc[0].reference_acquisition,
name=pairs.iloc[0].reference,
user_id=EARTHDATA_USERNAME,
status_code='PENDING',
)

running_jobs = HYP3.find_jobs(
job_type='AUTORIFT',
start=pairs.iloc[0].reference_acquisition,
name=pairs.iloc[0].reference,
user_id=EARTHDATA_USERNAME,
status_code='RUNNING',
)

jobs = pending_jobs + running_jobs
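# Note: hyp3_sdk.Batch implements '+', so the PENDING and RUNNING results combine into one batch.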

df = pd.DataFrame([job.job_parameters['granules'] for job in jobs], columns=['reference', 'secondary'])
df = df.set_index(['reference', 'secondary'])
pairs = pairs.set_index(['reference', 'secondary'])
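# With both frames indexed by (reference, secondary), already-submitted pairs line up by index for removal below.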
@@ -123,7 +215,13 @@ def process_scene(
if len(pairs) > 0:
pairs = deduplicate_hyp3_pairs(pairs)

-log.info(f'Deduplicated pairs; {len(pairs)} remaining')
log.info(f'Deduplicated HyP3 running/pending pairs; {len(pairs)} remaining')
with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', None):
log.debug(pairs.sort_values(by=['secondary'], ascending=False).loc[:, ['reference', 'secondary']])

pairs = deduplicate_s3_pairs(pairs)

log.info(f'Deduplicated already published pairs; {len(pairs)} remaining')
with pd.option_context('display.max_rows', None, 'display.max_columns', None, 'display.width', None):
log.debug(pairs.sort_values(by=['secondary'], ascending=False).loc[:, ['reference', 'secondary']])

4 changes: 2 additions & 2 deletions requirements-all.txt
@@ -1,6 +1,6 @@
-r requirements-its_live_monitoring.txt
-r requirements-status-messages.txt
-cfn-lint==1.18.0
-ruff==0.6.9
cfn-lint==1.19.0
ruff==0.7.4
pytest==8.3.3
responses==0.25.3
3 changes: 2 additions & 1 deletion requirements-its_live_monitoring.txt
@@ -1,6 +1,7 @@
geopandas==1.0.1
hyp3-sdk==7.0.1
pandas==2.2.3
-pystac-client==0.8.4
pystac-client==0.8.5
requests==2.32.3
shapely==2.0.6
numpy==2.1.3
12 changes: 12 additions & 0 deletions tests/conftest.py
@@ -36,6 +36,18 @@ def create_pystac_item(
return create_pystac_item


@pytest.fixture
def stac_search_factory():
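# Minimal stand-in for pystac_client's ItemSearch: the code under test only iterates the pages() result.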
class MockItemSearch:
def __init__(self, items: list[pystac.item.Item]):
self.items = items

def pages(self):
return [self.items]

return MockItemSearch


@pytest.fixture
def hyp3_job_factory():
def create_hyp3_job(granules: list) -> sdk.Job:
29 changes: 15 additions & 14 deletions tests/its_live_monitoring/test_landsat.py
@@ -1,11 +1,12 @@
from copy import deepcopy
from datetime import datetime
-from unittest.mock import MagicMock, patch
from unittest.mock import patch

import landsat


-def test_get_landsat_stac_item(pystac_item_factory):
@patch('landsat.LANDSAT_COLLECTION.get_item')
def test_get_landsat_stac_item(mock_landsat_get_item, pystac_item_factory):
scene = 'LC08_L1TP_138041_20240128_20240207_02_T1'
properties = {
'instruments': ['OLI'],
@@ -18,10 +19,8 @@ def test_get_landsat_stac_item(pystac_item_factory):
collection = 'landsat-c2l1'
expected_item = pystac_item_factory(id=scene, datetime=datetime.now(), properties=properties, collection=collection)

-with patch('landsat.LANDSAT_COLLECTION', MagicMock()):
-landsat.LANDSAT_COLLECTION.get_item.return_value = expected_item
-item = landsat.get_landsat_stac_item(scene)

mock_landsat_get_item.side_effect = [expected_item]
item = landsat.get_landsat_stac_item(scene)
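# Using side_effect with a one-item list (rather than return_value) also fails loudly if get_item is called more than once.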
assert item.collection_id == collection
assert item.properties == properties

@@ -99,7 +98,8 @@ def test_qualifies_for_processing(pystac_item_factory):
assert landsat.qualifies_for_landsat_processing(item)


-def test_get_landsat_pairs_for_reference_scene(pystac_item_factory):
@patch('landsat.LANDSAT_CATALOG.search')
def test_get_landsat_pairs_for_reference_scene(mock_landsat_search, pystac_item_factory, stac_search_factory):
properties = {
'instruments': ['OLI'],
'landsat:collection_category': 'T1',
@@ -129,9 +129,8 @@ def test_get_landsat_pairs_for_reference_scene(pystac_item_factory):
pystac_item_factory(id=scene, datetime=date_time, properties=properties, collection=collection)
)

-with patch('landsat.LANDSAT_CATALOG', MagicMock()):
-landsat.LANDSAT_CATALOG.search().pages.return_value = (sec_items,)
-df = landsat.get_landsat_pairs_for_reference_scene(ref_item)
mock_landsat_search.side_effect = [stac_search_factory(sec_items)]
df = landsat.get_landsat_pairs_for_reference_scene(ref_item)

assert (df['landsat:wrs_path'] == ref_item.properties['landsat:wrs_path']).all()
assert (df['landsat:wrs_row'] == ref_item.properties['landsat:wrs_row']).all()
@@ -140,7 +139,10 @@ def test_get_landsat_pairs_for_off_nadir_reference_scene(pystac_item_factory):
assert (df['reference'] == ref_item.id).all()


-def test_get_landsat_pairs_for_off_nadir_reference_scene(pystac_item_factory):
@patch('landsat.LANDSAT_CATALOG.search')
def test_get_landsat_pairs_for_off_nadir_reference_scene(
mock_landsat_search, pystac_item_factory, stac_search_factory
):
properties = {
'instruments': ['OLI'],
'landsat:collection_category': 'T1',
@@ -171,9 +173,8 @@ def test_get_landsat_pairs_for_off_nadir_reference_scene(pystac_item_factory):
props['view:off_nadir'] = off_nadir
sec_items.append(pystac_item_factory(id=scene, datetime=date_time, properties=props, collection=collection))

-with patch('landsat.LANDSAT_CATALOG', MagicMock()):
-landsat.LANDSAT_CATALOG.search().pages.return_value = (sec_items,)
-df = landsat.get_landsat_pairs_for_reference_scene(ref_item)
mock_landsat_search.side_effect = [stac_search_factory(sec_items)]
df = landsat.get_landsat_pairs_for_reference_scene(ref_item)

assert (df['view:off_nadir'] > 0).all()
