regional slicing #30

Merged 13 commits on Apr 24, 2024
6 changes: 3 additions & 3 deletions .github/workflows/test.yaml
@@ -8,15 +8,15 @@ on:
jobs:
test:
runs-on: ubuntu-latest
container: argovis/argovis_helpers:test-base-231026
container: argovis/argovis_helpers:test-base-240418

services:
database:
image: argovis/testdb:0.40
image: argovis/testdb:0.41
redis:
image: redis:7.0.2
api:
image: argovis/api:2.22.0
image: argovis/api:2.24.3
env:
ARGONODE: core

2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
FROM python:3.9

RUN pip install requests pytest area numpy scipy
RUN pip install requests pytest area numpy scipy shapely geopandas
WORKDIR /app
COPY . .
324 changes: 235 additions & 89 deletions argovisHelpers/helpers.py
@@ -1,4 +1,7 @@
import requests, datetime, copy, time, re, area, math, urllib
import requests, datetime, copy, time, re, area, math, urllib, json
from shapely.geometry import shape, box, Polygon
from shapely.ops import orient
import geopandas as gpd

# networking helpers

@@ -58,6 +61,95 @@ def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.ed
'extended/ar': ['id']
}

winding = False
if 'winding' in options:
winding = options['winding'] == 'true'

if r in data_routes and (not 'compression' in options or options['compression']!='minimal'):
# these are potentially large requests that might need to be sliced up

## identify timeseries, need to be recombined differently after slicing
isTimeseries = r.split('/')[0] == 'timeseries'

## if a data query carries a scoped parameter, no need to slice up:
if r in scoped_parameters and not set(scoped_parameters[r]).isdisjoint(options.keys()):
return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]

# should we slice by time or space?
times = slice_timesteps(options, r)
n_space = 2592 # number of 5x5 bins covering a globe
if 'polygon' in options:
pgons = split_polygon(options['polygon'], winding=winding)
n_space = len(pgons)
elif 'box' in options:
boxes = split_box(options['box'])
n_space = len(boxes)

if isTimeseries or n_space < len(times):
## slice up in space bins
ops = copy.deepcopy(options)
results = []
delay = 0

if 'box' in options:
boxes = split_box(options['box'])
for i in range(len(boxes)):
ops['box'] = boxes[i]
increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
results += increment[0]
delay = increment[1]
time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
else:
pgons = []
if 'polygon' in options:
pgons = split_polygon(options['polygon'], winding=winding)
else:
pgons = generate_global_cells()
for i in range(len(pgons)):
ops['polygon'] = pgons[i]
increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
results += increment[0]
delay = increment[1]
time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
# smaller polygons trace geodesics differently than the full polygon, so double-check the result;
# do it for boxes too, just to make sure nothing went wrong on the boundaries
ops = copy.deepcopy(options)
ops['compression'] = 'minimal'
true_ids = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
true_ids = [x[0] for x in true_ids[0]]
fetched_ids = [x['_id'] for x in results]
if len(fetched_ids) != len(list(set(fetched_ids))):
# deduplicate anything scooped up by multiple cells, like on cell borders
r = {x['_id']: x for x in results}
results = [r[i] for i in list(r.keys())]
fetched_ids = [x['_id'] for x in results]
to_drop = [item for item in fetched_ids if item not in true_ids]
to_add = [item for item in true_ids if item not in fetched_ids]
for id in to_add:
p, delay = argofetch(route, options={'id': id}, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
results += p
results = [x for x in results if x['_id'] not in to_drop]
return results
else:
## slice up in time bins
results = []
ops = copy.deepcopy(options)
delay = 0
for i in range(len(times)-1):
ops['startDate'] = times[i]
ops['endDate'] = times[i+1]
increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
results += increment[0]
delay = increment[1]
time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
return results

else:
return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]

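For orientation, a minimal sketch of how this sliced query path might be exercised; the import alias, region, dates, and key below are illustrative placeholders, not part of this diff:

from argovisHelpers import helpers as avh

# hypothetical 20x20 degree region; query() decides whether to slice this
# request into time windows or 5x5 degree spatial chunks before fetching
region = [[-20,40],[0,40],[0,60],[-20,60],[-20,40]]
profiles = avh.query('argo', options={
    'polygon': region,
    'startDate': '2020-01-01T00:00:00Z',
    'endDate': '2020-02-01T00:00:00Z'
}, apikey='<your-api-key>', apiroot='https://argovis-api.colorado.edu/')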
def slice_timesteps(options, r):
# given a qsr option dict and data route, return a list of reasonable time divisions

earliest_records = {
'argo': parsetime("1997-07-27T20:26:20.002Z"),
'cchdo': parsetime("1972-07-23T09:11:00.000Z"),
Expand Down Expand Up @@ -91,74 +183,41 @@ def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.ed
'extended/ar': parsetime("2022-01-01T21:00:00Z")
}

if r in data_routes:
# these are potentially large requests that might need to be sliced up

## identify timeseries, need to be recombined differently after slicing
isTimeseries = r.split('/')[0] == 'timeseries'

## if a data query carries a scoped parameter, no need to slice up:
if r in scoped_parameters and not set(scoped_parameters[r]).isdisjoint(options.keys()):
return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]

## slice up in time bins:
start = None
end = None
coerced_time = 0
if 'startDate' in options:
start = parsetime(options['startDate'])
else:
start = earliest_records[r]
coerced_time += 1
if 'endDate' in options:
end = parsetime(options['endDate'])
else:
end = last_records[r]
coerced_time += 1
coerced_time = coerced_time == 2 # ie both timestamps have been coerced on by the helpers

### determine appropriate bin size
maxbulk = 1000000 # should be <= maxbulk used in generating an API 413
timestep = 30 # days

if 'polygon' in options:
extent = area.area({'type':'Polygon','coordinates':[ options['polygon'] ]}) / 13000 / 1000000 # poly area in units of 13000 sq. km. blocks
timestep = min(400, math.floor(maxbulk / extent))
elif 'multipolygon' in options:
extents = [area.area({'type':'Polygon','coordinates':[x]}) / 13000 / 1000000 for x in options['multipolygon']]
extent = min(extents)
timestep = min(400,math.floor(maxbulk / extent))
elif 'box' in options:
extent = area.area({'type':'Polygon','coordinates':[[ options['box'][0], [options['box'][1][0], options['box'][0][0]], options['box'][1], [options['box'][0][0], options['box'][1][0]], options['box'][0]]]}) / 13000 / 1000000
timestep = min(400, math.floor(maxbulk / extent))

delta = datetime.timedelta(days=timestep)
times = [start]
while times[-1] + delta < end:
times.append(times[-1]+delta)
times.append(end)
times = [parsetime(x) for x in times]
results = []
ops = copy.deepcopy(options)
delay = 0
for i in range(len(times)-1):
ops['startDate'] = times[i]
ops['endDate'] = times[i+1]
increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
if isTimeseries and len(increment) > 0:
results = combine_dicts(results, increment[0])
else:
results += increment[0]
delay = increment[1]
time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
if isTimeseries and coerced_time:
# remove timeseries from data document, only there because the slicing forced it so
results = [{k: v for k, v in d.items() if k != 'timeseries'} for d in results]
return results

maxbulk = 2000000 # should be <= maxbulk used in generating an API 413
timestep = 30 # days
extent = 360000000 / 13000 # 360M sq km, all the oceans

if 'polygon' in options:
extent = area.area({'type':'Polygon','coordinates':[ options['polygon'] ]}) / 13000 / 1000000 # poly area in units of 13000 sq. km. blocks
elif 'multipolygon' in options:
extents = [area.area({'type':'Polygon','coordinates':[x]}) / 13000 / 1000000 for x in options['multipolygon']]
extent = min(extents)
elif 'box' in options:
extent = area.area({'type':'Polygon','coordinates':[[ options['box'][0], [options['box'][1][0], options['box'][0][0]], options['box'][1], [options['box'][0][0], options['box'][1][0]], options['box'][0]]]}) / 13000 / 1000000

timestep = math.floor(maxbulk / extent)

## slice up in time bins:
start = None
end = None
if 'startDate' in options:
start = parsetime(options['startDate'])
else:
return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]

start = earliest_records[r]
if 'endDate' in options:
end = parsetime(options['endDate'])
else:
end = last_records[r]

delta = datetime.timedelta(days=timestep)
times = [start]
while times[-1] + delta < end:
times.append(times[-1]+delta)
times.append(end)
times = [parsetime(x) for x in times]

return times

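As a rough worked example of the bin sizing above (numbers purely illustrative):

import math

maxbulk = 2000000
global_extent = 360000000 / 13000            # ~27692 blocks for all the oceans
print(math.floor(maxbulk / global_extent))   # 72 -> global requests slice into ~72-day windows

regional_extent = 1300000 / 13000            # a ~1.3M sq km polygon is 100 blocks
print(math.floor(maxbulk / regional_extent)) # 20000 -> effectively a single time bin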
# data munging helpers

def data_inflate(data_doc, metadata_doc=None):
@@ -225,26 +284,113 @@ def combine_data_lists(lists):
combined_list.append(combined_sublist)
return combined_list

def combine_dicts(list1, list2):
# given two lists of timeseries data objects,
# combine them matching on geolocation and level
def split_polygon(coords, max_lon_size=5, max_lat_size=5, winding=False):
# slice a geojson polygon up into a list of smaller polygons of maximum extent in lon and lat

combined_list = []
for dict1 in list1:
combined = False
for dict2 in list2:
if dict1.get('geolocation') == dict2.get('geolocation') and dict1.get('level') == dict2.get('level'):
combined_dict = dict1.copy()
combined_dict['timeseries'] += dict2.get('timeseries', [])
data = combine_data_lists([dict1.get('data', []), dict2.get('data', [])])
if len(data) > 0:
combined_dict['data'] = data
combined_list.append(combined_dict)
combined = True
list2.remove(dict2) # Remove combined element from list2
break
if not combined:
combined_list.append(dict1)
combined_list.extend(list2) # Append remaining elements from list2
return combined_list
# if a polygon bridges the dateline and wraps its longitudes around,
# we need to detect this and un-wrap.
# assume bounded region is the smaller region unless winding is being enforced, per mongo
coords = dont_wrap_dateline(coords)

polygon = shape({"type": "Polygon", "coordinates": [coords]})

smaller_polygons = []

min_lon, min_lat, max_lon, max_lat = polygon.bounds

if winding and is_cw(coords):
# if winding is being enforced and the polygon is cw wound,
# we're looking for everything outside the polygon.

lon = min_lon-10
lat = -90
while lon < min_lon + 360:
while lat < max_lat+10: # < 90:
bounding_box = box(lon, lat, lon + max_lon_size, lat + max_lat_size)
chunk = bounding_box.difference(polygon)
if not chunk.is_empty:
# Convert the Shapely geometry to a GeoJSON polygon and add it to the list
shapes = json.loads(gpd.GeoSeries([chunk]).to_json())
if shapes['features'][0]['geometry']['type'] == 'Polygon':
smaller_polygons.append(shapes['features'][0]['geometry']['coordinates'][0])
elif shapes['features'][0]['geometry']['type'] == 'MultiPolygon':
for poly in shapes['features'][0]['geometry']['coordinates']:
smaller_polygons.append(poly[0])

lat += max_lat_size
lat = -90
lon += max_lon_size
else:
# Split the polygon interior into smaller polygons

lon = min_lon
lat = min_lat
while lon < max_lon:
while lat < max_lat:
# Create a bounding box for the current chunk
bounding_box = box(lon, lat, lon + max_lon_size, lat + max_lat_size)

# Intersect the bounding box with the original polygon
chunk = polygon.intersection(bounding_box)

# If the intersection is not empty, add it to the list of smaller polygons
if not chunk.is_empty:
# Convert the Shapely geometry to a GeoJSON polygon and add it to the list
shapes = json.loads(gpd.GeoSeries([chunk]).to_json())
if shapes['features'][0]['geometry']['type'] == 'Polygon':
smaller_polygons.append(shapes['features'][0]['geometry']['coordinates'][0])
elif shapes['features'][0]['geometry']['type'] == 'MultiPolygon':
for poly in shapes['features'][0]['geometry']['coordinates']:
smaller_polygons.append(poly[0])

lat += max_lat_size
lat = min_lat
lon += max_lon_size

return smaller_polygons

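A quick illustrative check of the behaviour intended here (assumed inputs, not part of the diff): a 10x10 degree square should come back as four 5x5 chunks.

from argovisHelpers.helpers import split_polygon

square = [[0,0],[10,0],[10,10],[0,10],[0,0]]
chunks = split_polygon(square)   # default 5x5 degree chunks
print(len(chunks))               # expected: 4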
def split_box(box, max_lon_size=5, max_lat_size=5):
# slice a box up into a list of smaller boxes of maximum extent in lon and lat

if box[0][0] > box[1][0]:
# unwrap the dateline
box[1][0] += 360

smaller_boxes = []
lon = box[0][0]
lat = box[0][1]
while lon < box[1][0]:
while lat < box[1][1]:
smaller_boxes.append([[lon, lat],[min(box[1][0], lon + max_lon_size), min(box[1][1], lat + max_lat_size)]])
lat += max_lat_size
lat = box[0][1]
lon += max_lon_size

return smaller_boxes

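Similarly, a small sketch (assumed inputs) of how a dateline-crossing box is handled: the eastern longitude is unwrapped to 190 before slicing.

from argovisHelpers.helpers import split_box

b = [[170, -5], [-170, 5]]   # lower-left, upper-right corners, crossing the dateline
pieces = split_box(b)
print(len(pieces))           # expected: 8 (4 longitude steps x 2 latitude steps)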
def dont_wrap_dateline(coords):
# given a list of polygon coords, return them, ensuring they don't wrap modulo 360 over the dateline.

for i in range(len(coords)-1):
if coords[i][0]*coords[i+1][0] < 0 and abs(coords[i][0] - coords[i+1][0]) > 180:
# i.e. if any geodesic edge crosses the dateline with a sign change, we need to remap.
return [[lon + 360 if lon < 0 else lon, lat] for lon, lat in coords]

return coords

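For illustration (assumed example): a ring whose edges cross the dateline has its negative longitudes shifted by 360.

from argovisHelpers.helpers import dont_wrap_dateline

ring = [[175, 10], [-175, 10], [-175, -10], [175, -10], [175, 10]]
print(dont_wrap_dateline(ring))
# expected: [[175, 10], [185, 10], [185, -10], [175, -10], [175, 10]]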
def generate_global_cells(lonstep=5, latstep=5):
cells = []
lon = -180
lat = -90
while lon < 180:
while lat < 90:
cells.append([[lon,lat],[lon+lonstep,lat],[lon+lonstep,lat+latstep],[lon,lat+latstep],[lon,lat]])

lat += latstep
lat = -90
lon += lonstep
return cells

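This ties back to the n_space = 2592 constant used in query() above: 360/5 = 72 longitude steps times 180/5 = 36 latitude steps gives 2592 cells. An illustrative check:

from argovisHelpers.helpers import generate_global_cells

print(len(generate_global_cells()))   # expected: 2592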
def is_cw(coords):
unwrap = dont_wrap_dateline(coords)
return Polygon(unwrap) == orient(Polygon(unwrap), sign=-1.0)
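Finally, an illustrative check of the winding helper (assumed inputs): a clockwise ring should be reported as such, while its counter-clockwise mirror should not.

from argovisHelpers.helpers import is_cw

print(is_cw([[0,0],[0,5],[5,5],[5,0],[0,0]]))   # expected: True  (clockwise)
print(is_cw([[0,0],[5,0],[5,5],[0,5],[0,0]]))   # expected: False (counter-clockwise)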
Binary file added dist/argovisHelpers-0.0.23-py3-none-any.whl
Binary file not shown.
Binary file added dist/argovisHelpers-0.0.23.tar.gz
Binary file not shown.
6 changes: 4 additions & 2 deletions pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "argovisHelpers"
version = "0.0.22"
version = "0.0.23"
description = "Helper functions to consume and parse information from University of Colorado's Argovis API."
readme = "README.md"
authors = [{name = "Katie Mills" }]
@@ -16,7 +16,9 @@ classifiers = [
]
dependencies = [
"requests",
"area"
"area",
"shapely==1.8.0",
"geopandas==0.14.3"
]
requires-python = ">=3.9"
