use a 413 to trigger slicing #35

Merged
merged 2 commits on May 30, 2024
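
Before this change, query decided up front whether a request needed slicing, based on a hard-coded list of data routes and scoped parameters. With this change, query simply attempts the request and falls back to slicing only when the API answers 413 (request too large). A minimal sketch of the new flow from the caller's side, assuming the package layout in this repo; the route options, region, dates, and API key below are hypothetical examples, not taken from this PR:

from argovisHelpers import helpers

# hypothetical request: a month of Argo profiles over a small North Atlantic patch
profiles = helpers.query(
    'argo',
    options={
        'startDate': '2020-01-01T00:00:00Z',
        'endDate': '2020-02-01T00:00:00Z',
        'polygon': [[-60,30],[-50,30],[-50,40],[-60,40],[-60,30]],
    },
    apikey='<your-api-key>',
)
# query() first tries the request as-is; on a 413 it re-invokes itself with
# slice=True and splits the request into time or space bins, recombining and
# deduplicating the results before returning them.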
278 changes: 131 additions & 147 deletions argovisHelpers/helpers.py
@@ -3,151 +3,6 @@
from shapely.ops import orient
import geopandas as gpd

# networking helpers

def argofetch(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', suggestedLatency=0, verbose=False):
    # GET <apiroot>/<route>?<options> with <apikey> in the header.
    # raises on anything other than success or a 404.

    o = copy.deepcopy(options)
    for option in ['polygon', 'box']:
        if option in options:
            options[option] = str(options[option])

    dl = requests.get(apiroot.rstrip('/') + '/' + route.lstrip('/'), params = options, headers={'x-argokey': apikey})
    statuscode = dl.status_code
    if verbose:
        print(urllib.parse.unquote(dl.url))
    dl = dl.json()

    if statuscode==429:
        # user exceeded API limit; extract suggested wait and delay times, and try again
        wait = dl['delay'][0]
        latency = dl['delay'][1]
        time.sleep(wait*1.1)
        return argofetch(route, options=o, apikey=apikey, apiroot=apiroot, suggestedLatency=latency, verbose=verbose)

    if (statuscode!=404 and statuscode!=200) or (statuscode==200 and type(dl) is dict and 'code' in dl):
        if statuscode == 413:
            print('The temporospatial extent of your request is enormous! Consider using the `query` helper in this package to split it up into more manageable chunks.')
        elif statuscode >= 500 or (statuscode==200 and type(dl) is dict and 'code' in dl):
            print("Argovis' servers experienced an error. Please try your request again, and email [email protected] if this keeps happening; please include the full details of the request you made so we can help address it.")
        raise Exception(statuscode, dl)

    # no special action for 404 - a 404 due to a mangled route will return an error, while a valid search with no result will return [].

    return dl, suggestedLatency

def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', verbose=False):
    # middleware function between the user and a call to argofetch to make sure individual requests are reasonably scoped and timed.
    r = re.sub('^/', '', route)
    r = re.sub('/$', '', r)

    data_routes = ['argo', 'cchdo', 'drifters', 'tc', 'argotrajectories', 'easyocean', 'grids/rg09', 'grids/kg21', 'grids/glodap', 'timeseries/noaasst', 'timeseries/copernicussla', 'timeseries/ccmpwind', 'extended/ar']

    scoped_parameters = {
        'argo': ['id','platform', 'metadata'],
        'cchdo': ['id', 'woceline', 'cchdo_cruise', 'metadata'],
        'drifters': ['id', 'wmo', 'platform', 'metadata'],
        'tc': ['id', 'name', 'metadata'],
        'argotrajectories': ['id', 'platform', 'metadata'],
        'easyocean': ['id', 'woceline', 'metadata'],
        'grids/rg09': ['id'],
        'grids/kg21': ['id'],
        'grids/glodap': ['id'],
        'timeseries/noaasst': ['id'],
        'timeseries/copernicussla': ['id'],
        'timeseries/ccmpwind': ['id'],
        'extended/ar': ['id']
    }

    if r in data_routes and (not 'compression' in options or options['compression']!='minimal'):
        # these are potentially large requests that might need to be sliced up

        ## identify timeseries; they need to be recombined differently after slicing
        isTimeseries = r.split('/')[0] == 'timeseries'

        ## if a data query carries a scoped parameter, no need to slice up:
        if r in scoped_parameters and not set(scoped_parameters[r]).isdisjoint(options.keys()):
            return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]

        # should we slice by time or space?
        times = slice_timesteps(options, r)
        n_space = 2592 # number of 5x5 bins covering a globe
        if 'polygon' in options:
            pgons = split_polygon(options['polygon'])
            n_space = len(pgons)
        elif 'box' in options:
            boxes = split_box(options['box'])
            n_space = len(boxes)

        if isTimeseries or n_space < len(times):
            ## slice up in space bins
            ops = copy.deepcopy(options)
            results = []
            delay = 0

            if 'box' in options:
                boxes = split_box(options['box'])
                for i in range(len(boxes)):
                    ops['box'] = boxes[i]
                    increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
                    results += increment[0]
                    delay = increment[1]
                    time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay
            else:
                pgons = []
                if 'polygon' in options:
                    pgons = split_polygon(options['polygon'])
                else:
                    pgons = generate_global_cells()
                for i in range(len(pgons)):
                    ops['polygon'] = pgons[i]
                    increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
                    results += increment[0]
                    delay = increment[1]
                    time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay
            # smaller polygons will trace geodesics differently than full polygons, need to double-check;
            # do it for boxes too just to make sure nothing funny happened on the boundaries
            ops = copy.deepcopy(options)
            ops['compression'] = 'minimal'
            true_ids = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
            true_ids = [x[0] for x in true_ids[0]]
            fetched_ids = [x['_id'] for x in results]
            if len(fetched_ids) != len(list(set(fetched_ids))):
                # deduplicate anything scooped up by multiple cells, like on cell borders
                r = {x['_id']: x for x in results}
                results = [r[i] for i in list(r.keys())]
                fetched_ids = [x['_id'] for x in results]
            to_drop = [item for item in fetched_ids if item not in true_ids]
            to_add = [item for item in true_ids if item not in fetched_ids]
            for id in to_add:
                p, delay = argofetch(route, options={'id': id}, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
                results += p
            results = [x for x in results if x['_id'] not in to_drop]

        else:
            ## slice up in time bins
            results = []
            ops = copy.deepcopy(options)
            delay = 0
            for i in range(len(times)-1):
                ops['startDate'] = times[i]
                ops['endDate'] = times[i+1]
                increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
                results += increment[0]
                delay = increment[1]
                time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay

        # slicing can end up duplicating results in batchmeta requests; deduplicate
        if 'batchmeta' in options:
            results = list({x['_id']: x for x in results}.values())

        return results

    else:
        return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]

def slice_timesteps(options, r):
    # given a qsr option dict and data route, return a list of reasonable time divisions

@@ -216,8 +71,6 @@ def slice_timesteps(options, r):

    return times

# data munging helpers

def data_inflate(data_doc, metadata_doc=None):
    # given a single JSON <data_doc> downloaded from one of the standard data routes,
    # return the data document with the data key reinflated to per-level dictionaries.
@@ -360,3 +213,134 @@ def generate_global_cells(lonstep=5, latstep=5):
        lat = -90
        lon += lonstep
    return cells

def argofetch(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', suggestedLatency=0, verbose=False):
    # GET <apiroot>/<route>?<options> with <apikey> in the header.
    # raises on anything other than success or a 404.

    o = copy.deepcopy(options)
    for option in ['polygon', 'box']:
        if option in options:
            options[option] = str(options[option])

    dl = requests.get(apiroot.rstrip('/') + '/' + route.lstrip('/'), params = options, headers={'x-argokey': apikey})
    statuscode = dl.status_code
    if verbose:
        print(urllib.parse.unquote(dl.url))
    dl = dl.json()

    if statuscode==429:
        # user exceeded API limit; extract suggested wait and delay times, and try again
        wait = dl['delay'][0]
        latency = dl['delay'][1]
        time.sleep(wait*1.1)
        return argofetch(route, options=o, apikey=apikey, apiroot=apiroot, suggestedLatency=latency, verbose=verbose)

    if (statuscode!=404 and statuscode!=200) or (statuscode==200 and type(dl) is dict and 'code' in dl):
        if statuscode == 413:
            print('The temporospatial extent of your request is enormous! If you are using the query helper, it will now try to slice this request up for you. Try setting verbose=True to see how it is slicing this up.')
        elif statuscode >= 500 or (statuscode==200 and type(dl) is dict and 'code' in dl):
            print("Argovis' servers experienced an error. Please try your request again, and email [email protected] if this keeps happening; please include the full details of the request you made so we can help address it.")
        raise Exception(statuscode, dl)

    # no special action for 404 - a 404 due to a mangled route will return an error, while a valid search with no result will return [].

    return dl, suggestedLatency

def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', verbose=False, slice=False):
    # middleware function between the user and a call to argofetch to make sure individual requests are reasonably scoped and timed.
    r = re.sub('^/', '', route)
    r = re.sub('/$', '', r)

    # start by just trying the request, to determine if we need to slice it
    if not slice:
        try:
            q = argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)
            return q[0]
        except Exception as e:
            if e.args[0] == 413:
                # we need to slice
                return query(route=route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose, slice=True)
            else:
                print(e)
                return e.args

    # slice request up into a series of requests

    ## identify timeseries; they need to be recombined differently after slicing
    isTimeseries = r.split('/')[0] == 'timeseries'

    # should we slice by time or space?
    times = slice_timesteps(options, r)
    n_space = 2592 # number of 5x5 bins covering a globe
    if 'polygon' in options:
        pgons = split_polygon(options['polygon'])
        n_space = len(pgons)
    elif 'box' in options:
        boxes = split_box(options['box'])
        n_space = len(boxes)

    if isTimeseries or n_space < len(times):
        ## slice up in space bins
        ops = copy.deepcopy(options)
        results = []
        delay = 0

        if 'box' in options:
            boxes = split_box(options['box'])
            for i in range(len(boxes)):
                ops['box'] = boxes[i]
                increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
                results += increment[0]
                delay = increment[1]
                time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay
        else:
            pgons = []
            if 'polygon' in options:
                pgons = split_polygon(options['polygon'])
            else:
                pgons = generate_global_cells()
            for i in range(len(pgons)):
                ops['polygon'] = pgons[i]
                increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
                results += increment[0]
                delay = increment[1]
                time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay
        # smaller polygons will trace geodesics differently than full polygons, need to double-check;
        # do it for boxes too just to make sure nothing funny happened on the boundaries
        ops = copy.deepcopy(options)
        ops['compression'] = 'minimal'
        true_ids = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
        true_ids = [x[0] for x in true_ids[0]]
        fetched_ids = [x['_id'] for x in results]
        if len(fetched_ids) != len(list(set(fetched_ids))):
            # deduplicate anything scooped up by multiple cells, like on cell borders
            r = {x['_id']: x for x in results}
            results = [r[i] for i in list(r.keys())]
            fetched_ids = [x['_id'] for x in results]
        to_drop = [item for item in fetched_ids if item not in true_ids]
        to_add = [item for item in true_ids if item not in fetched_ids]
        for id in to_add:
            p, delay = argofetch(route, options={'id': id}, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
            results += p
        results = [x for x in results if x['_id'] not in to_drop]

    else:
        ## slice up in time bins
        results = []
        ops = copy.deepcopy(options)
        delay = 0
        for i in range(len(times)-1):
            ops['startDate'] = times[i]
            ops['endDate'] = times[i+1]
            increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
            results += increment[0]
            delay = increment[1]
            time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay

    # slicing can end up duplicating results in batchmeta requests; deduplicate
    if 'batchmeta' in options:
        results = list({x['_id']: x for x in results}.values())

    return results
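
For completeness, a minimal sketch of calling the reworked argofetch directly (the profile ID and API key below are placeholders, not from this PR). Unlike query, it returns a (data, suggestedLatency) tuple, retries 429s itself after the server-suggested wait, and raises on a 413, which is exactly what query catches to trigger slicing:

# hypothetical direct call; '<profile-id>' and '<your-api-key>' are placeholders
data, latency = argofetch('argo', options={'id': '<profile-id>'}, apikey='<your-api-key>')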

Binary file added dist/argovisHelpers-0.0.26-py3-none-any.whl
Binary file not shown.
Binary file added dist/argovisHelpers-0.0.26.tar.gz
Binary file not shown.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "argovisHelpers"
version = "0.0.25"
version = "0.0.26"
description = "Helper functions to consume and parse information from University of Colorado's Argovis API."
readme = "README.md"
authors = [{name = "Katie Mills" }]