issue-193: Add Dynamo DB Table for SWOT Prior Lakes #209

Merged Aug 1, 2024 · 23 commits

Commits
b935576
add code to handle prior lakes shapefiles, add test prior lake data
torimcd Jul 16, 2024
83f7054
update terraform to add prior lake table
torimcd Jul 16, 2024
04a1f10
fix tests, change to smaller test data file, changelog
torimcd Jul 16, 2024
ed55e27
linting
torimcd Jul 16, 2024
f06d128
reconfigure main load_data method to make more readable and pass linting
torimcd Jul 17, 2024
5fb53d7
lint
torimcd Jul 17, 2024
11732db
lint
torimcd Jul 17, 2024
80e3162
fix string casting to lower storage req & update test responses to ha…
torimcd Jul 18, 2024
76cc6ef
update load benchmarking function for linting and add unit test
torimcd Jul 18, 2024
6334a3c
try parent collection for lakes
torimcd Jul 30, 2024
808d3c6
update version parsing for parent collection
torimcd Jul 30, 2024
d08dbeb
fix case error
torimcd Jul 30, 2024
72e1362
fix lake id reference
torimcd Jul 30, 2024
fda028b
add logging to troubleshoot too large features
torimcd Jul 30, 2024
0febe06
add item size logging and remove error raise for batch write
torimcd Jul 30, 2024
787d7df
clean up logging statements & move numeric_columns assignment
torimcd Aug 1, 2024
87c0b46
Merge branch 'develop' into feature/issue-193
torimcd Aug 1, 2024
cee8b20
update batch logging statement
torimcd Aug 1, 2024
af49235
Merge branch 'develop' into feature/issue-193
frankinspace Aug 1, 2024
be6ea7f
Rename constant
frankinspace Aug 1, 2024
0258aa4
Fix temp dir security risk https://rules.sonarsource.com/python/RSPEC…
frankinspace Aug 1, 2024
49c4c7a
Fix temp dir security risk https://rules.sonarsource.com/python/RSPEC…
frankinspace Aug 1, 2024
7633374
fix code coverage calculation
frankinspace Aug 1, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -8,6 +8,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- Issue 198 - Implement track ingest lambda function CMR and Hydrocron queries
- Issue 193 - Add new Dynamo table for prior lake data
### Changed
### Deprecated
### Removed
@@ -1,5 +1,5 @@
"""
Unpacks SWOT Reach & Node Shapefiles
Unpacks SWOT Shapefiles
"""
import os.path
import json
@@ -20,7 +20,7 @@

def read_shapefile(filepath, obscure_data, columns, s3_resource=None):
"""
Reads a SWOT River Reach shapefile packaged as a zip
Reads a SWOT shapefile packaged as a zip

Parameters
----------
@@ -67,16 +67,16 @@ def read_shapefile(filepath, obscure_data, columns, s3_resource=None):
with zipfile.ZipFile(filepath) as archive:
shp_xml_tree = ET.fromstring(archive.read(filename[:-4] + ".shp.xml"))

numeric_columns = shp_file[columns].select_dtypes(include=[np.number]).columns
if obscure_data:
numeric_columns = shp_file[columns].select_dtypes(include=[np.number]).columns

shp_file[numeric_columns] = np.where(
(np.rint(shp_file[numeric_columns]) != -999) &
(np.rint(shp_file[numeric_columns]) != -99999999) &
(np.rint(shp_file[numeric_columns]) != -999999999999),
np.random.default_rng().integers(low=2, high=10)*shp_file[numeric_columns],
shp_file[numeric_columns])

shp_file = shp_file.astype(str)
filename_attrs = parse_from_filename(filename)

xml_attrs = parse_metadata_from_shpxml(shp_xml_tree)
@@ -92,7 +92,7 @@ def read_shapefile(filepath, obscure_data, columns, s3_resource=None):

def parse_metadata_from_shpxml(xml_elem):
"""
Read the SWORD version number from the shp.xml file
Read the prior database (SWORD or PLD) version number from the shp.xml file
and add to the database fields

Parameters
@@ -108,14 +108,16 @@ def parse_metadata_from_shpxml(xml_elem):
# get SWORD version
for globs in xml_elem.findall('global_attributes'):
prior_db_files = globs.find('xref_prior_river_db_files').text
metadata_attrs = {'sword_version': prior_db_files[-5:-3]}

metadata_attrs = {
'sword_version': prior_db_files[-5:-3]
}
# get PLD version
for globs in xml_elem.findall('global_metadata'):
prior_db_files = globs.find('xref_prior_lake_db_file').text
metadata_attrs = {'PLD_version': prior_db_files[-10:-7]}

# get units on fields that have them
for child in xml_elem:
if child.tag == 'attributes':
if child.tag in ('attributes', 'attribute_metadata'):
for field in child:
try:
units = field.find('units').text
@@ -130,27 +132,29 @@ def parse_metadata_from_shpxml(xml_elem):
return metadata_attrs


def assemble_attributes(file_as_str, attributes):
def assemble_attributes(geodf, attributes):
"""
Helper function to concat file attributes to records

Parameters
----------
file_as_str : string
The file records as a string
geodf : geodataframe
The file records as a geodataframe

attributes : dict
A dictionary of attributes to concatenate
"""

items = []

for _index, row in file_as_str.iterrows():
# rework to use dataframe instead of file as string
for _index, row in geodf.iterrows():

shp_attrs = json.loads(
row.to_json(default_handler=str))

item_attrs = shp_attrs | attributes

item_attrs = {key: str(item_attrs[key]) for key in item_attrs.keys()}
items.append(item_attrs)

return items
@@ -177,14 +181,18 @@ def parse_from_filename(filename):
collection = ""
collection_version = ""

if 'Reach' in filename:
if 'RiverSP_Reach' in filename:
collection = constants.SWOT_REACH_COLLECTION_NAME
collection_version = constants.SWOT_REACH_COLLECTION_VERSION

if 'Node' in filename:
if 'RiverSP_Node' in filename:
collection = constants.SWOT_NODE_COLLECTION_NAME
collection_version = constants.SWOT_NODE_COLLECTION_VERSION

if 'LakeSP_Prior' in filename:
collection = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME
collection_version = constants.SWOT_PRIOR_LAKE_COLLECTION_VERSION

filename_attrs = {
'cycle_id': filename_components[5],
'pass_id': filename_components[6],
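For context on the obscure_data branch in read_shapefile above, here is a standalone sketch of the fill-value-aware scaling it applies, using a plain pandas DataFrame in place of the shapefile geodataframe. The column names and values are made up for illustration and are not taken from the real collections.

```python
import numpy as np
import pandas as pd

# Made-up stand-in for the geodataframe returned by reading the shapefile.
df = pd.DataFrame({"wse": [12.5, -999999999999.0, 20.1],
                   "area_total": [100.0, 250.0, -999.0]})

# Same column selection as read_shapefile: only numeric columns are obscured.
numeric_columns = df.select_dtypes(include=[np.number]).columns

# Scale real values by one random factor in [2, 9]; leave fill values untouched.
df[numeric_columns] = np.where(
    (np.rint(df[numeric_columns]) != -999) &
    (np.rint(df[numeric_columns]) != -99999999) &
    (np.rint(df[numeric_columns]) != -999999999999),
    np.random.default_rng().integers(low=2, high=10) * df[numeric_columns],
    df[numeric_columns])

print(df)
```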
82 changes: 51 additions & 31 deletions hydrocron/db/load_data.py
@@ -11,7 +11,7 @@
from botocore.exceptions import ClientError

from hydrocron.db import HydrocronTable
from hydrocron.db.io import swot_reach_node_shp
from hydrocron.db.io import swot_shp
from hydrocron.utils import connection
from hydrocron.utils import constants

@@ -49,6 +49,9 @@ def lambda_handler(event, _):  # noqa: E501 # pylint: disable=W0613
case constants.SWOT_NODE_TABLE_NAME:
collection_shortname = constants.SWOT_NODE_COLLECTION_NAME
feature_type = 'Node'
case constants.SWOT_PRIOR_LAKE_TABLE_NAME:
collection_shortname = constants.SWOT_PRIOR_LAKE_COLLECTION_NAME
feature_type = 'LakeSP_prior'
case constants.DB_TEST_TABLE_NAME:
collection_shortname = constants.SWOT_REACH_COLLECTION_NAME
feature_type = 'Reach'
@@ -95,14 +98,17 @@ def granule_handler(event, _):
if ("Node" in granule_path) & (table_name != constants.SWOT_NODE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Node data into table: '{table_name}'")

if ("LakeSP_Prior" in granule_path) & (table_name != constants.SWOT_PRIOR_LAKE_TABLE_NAME):
raise TableMisMatch(f"Error: Cannot load Prior Lake data into table: '{table_name}'")

logging.info("Value of load_benchmarking_data is: %s", load_benchmarking_data)

obscure_data = "true" in os.getenv("OBSCURE_DATA").lower()
logging.info("Value of obscure_data is: %s", obscure_data)

if load_benchmarking_data == "True":
logging.info("Loading benchmarking data")
items = swot_reach_node_shp.load_benchmarking_data()
items = swot_shp.load_benchmarking_data()
else:
logging.info("Setting up S3 connection")
s3_resource = connection.s3_resource
@@ -158,6 +164,18 @@ def cnm_handler(event, _):
InvocationType='Event',
Payload=event2)

if 'LakeSP_Prior' in granule_uri:
event2 = ('{"body": {"granule_path": "' + granule_uri
+ '","table_name": "' + constants.SWOT_PRIOR_LAKE_TABLE_NAME
+ '","load_benchmarking_data": "' + load_benchmarking_data + '"}}')

logging.info("Invoking granule load lambda with event json %s", str(event2))

lambda_client.invoke(
FunctionName=os.environ['GRANULE_LAMBDA_FUNCTION_NAME'],
InvocationType='Event',
Payload=event2)


def find_new_granules(collection_shortname, start_date, end_date):
"""
@@ -208,20 +226,28 @@ def read_data(granule_path, obscure_data, s3_resource=None):

if 'Reach' in granule_path:
logging.info("Start reading reach shapefile")
items = swot_reach_node_shp.read_shapefile(
items = swot_shp.read_shapefile(
granule_path,
obscure_data,
constants.REACH_DATA_COLUMNS,
s3_resource=s3_resource)

if 'Node' in granule_path:
logging.info("Start reading node shapefile")
items = swot_reach_node_shp.read_shapefile(
items = swot_shp.read_shapefile(
granule_path,
obscure_data,
constants.NODE_DATA_COLUMNS,
s3_resource=s3_resource)

if 'LakeSP_Prior' in granule_path:
logging.info("Start reading prior lake shapefile")
items = swot_shp.read_shapefile(
granule_path,
obscure_data,
constants.PRIOR_LAKE_DATA_COLUMNS,
s3_resource=s3_resource)

return items


@@ -247,33 +273,27 @@ def load_data(dynamo_resource, table_name, items):
raise MissingTable(f"Hydrocron table '{table_name}' does not exist.") from err
raise err

if hydrocron_table.table_name == constants.SWOT_REACH_TABLE_NAME:

if len(items) > 5:
logging.info("Batch adding %s reach items", len(items))
for i in range(5):
logging.info("Item reach_id: %s", items[i]['reach_id'])
hydrocron_table.batch_fill_table(items)

else:
logging.info("Adding reach items to table individually")
for item_attrs in items:
logging.info("Item reach_id: %s", item_attrs['reach_id'])
hydrocron_table.add_data(**item_attrs)

elif hydrocron_table.table_name == constants.SWOT_NODE_TABLE_NAME:

if len(items) > 5:
logging.info("Batch adding %s node items", len(items))
for i in range(5):
logging.info("Item node_id: %s", items[i]['node_id'])
hydrocron_table.batch_fill_table(items)
match hydrocron_table.table_name:
case constants.SWOT_REACH_TABLE_NAME:
feature_name = 'reach'
feature_id = feature_name + '_id'
case constants.SWOT_NODE_TABLE_NAME:
feature_name = 'node'
feature_id = feature_name + '_id'
case constants.SWOT_PRIOR_LAKE_TABLE_NAME:
feature_name = 'prior_lake'
feature_id = 'lake_id'
case _:
logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name)
return

else:
logging.info("Adding node items to table individually")
for item_attrs in items:
logging.info("Item node_id: %s", item_attrs['node_id'])
hydrocron_table.add_data(**item_attrs)
if len(items) > 5:
logging.info("Batch adding %s %s items. First 5 feature ids in batch: ", len(items), feature_name)
for i in range(5):
logging.info("Item %s: %s", feature_id, items[i][feature_id])
hydrocron_table.batch_fill_table(items)

else:
logging.warning('Items cannot be parsed, file reader not implemented for table %s', hydrocron_table.table_name)
logging.info("Adding %s items to table individually", feature_name)
for item_attrs in items:
logging.info("Item %s: %s", feature_id, item_attrs[feature_id])
hydrocron_table.add_data(**item_attrs)
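The cnm_handler above assembles the downstream event JSON by string concatenation. As a sketch, the same payload can be built with json.dumps, which handles quoting automatically; the S3 URI and table name below are made-up placeholders, not the real constants.

```python
import json

# Hypothetical values; the real code reads these from the CNM message and
# from hydrocron.utils.constants.
granule_uri = "s3://example-bucket/SWOT_L2_HR_LakeSP_Prior_010_100_GR_example_01.zip"
table_name = "hydrocron-swot-prior-lake-table"  # placeholder for SWOT_PRIOR_LAKE_TABLE_NAME
load_benchmarking_data = "False"

# Equivalent of the concatenated event2 string passed as the Lambda Payload.
event2 = json.dumps({
    "body": {
        "granule_path": granule_uri,
        "table_name": table_name,
        "load_benchmarking_data": load_benchmarking_data,
    }
})

print(event2)
```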
17 changes: 15 additions & 2 deletions hydrocron/db/schema.py
@@ -2,6 +2,7 @@
Hydrocron Table module
"""
import logging
import sys
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key

@@ -114,12 +115,24 @@ def batch_fill_table(self, items):
try:
with table.batch_writer() as writer:
for item in items:
writer.put_item(Item=item)
logger.info(
"Item %s size: %s",
item[self.partition_key_name],
str(sys.getsizeof(item))
)
if sys.getsizeof(item) < 300000:
writer.put_item(Item=item)
else:
logger.warning(
"Item too large, could not load: %s %s",
self.partition_key_name,
item[self.partition_key_name]
)
continue
logger.info("Loaded data into table %s.", table.name)

except ClientError:
logger.exception("Couldn't load data into table %s.", table.name)
raise

def run_query(self, partition_key, sort_key=None):
"""
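One note on the size guard in batch_fill_table: sys.getsizeof reports only the in-memory footprint of the dict container, not the nested key and value strings, so it is a rough proxy for DynamoDB's 400 KB per-item limit. A minimal sketch of a closer estimate, assuming items are flat dicts of strings as produced by assemble_attributes:

```python
import sys

def approx_dynamo_item_size(item: dict) -> int:
    """Approximate DynamoDB item size: UTF-8 bytes of attribute names plus values."""
    return sum(len(k.encode("utf-8")) + len(str(v).encode("utf-8"))
               for k, v in item.items())

# Made-up prior-lake item; real items carry many more attributes.
item = {"lake_id": "6350036102", "geometry": "POLYGON ((...))" * 2000}

print("sys.getsizeof:", sys.getsizeof(item))            # container size only
print("approx item size:", approx_dynamo_item_size(item))

if approx_dynamo_item_size(item) < 300000:              # margin under the 400 KB limit
    print("within threshold; safe to batch write")
else:
    print("too large; skip or trim before writing")
```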