diff --git a/argovisHelpers/helpers.py b/argovisHelpers/helpers.py
index 210bc2e..46a5da7 100644
--- a/argovisHelpers/helpers.py
+++ b/argovisHelpers/helpers.py
@@ -3,151 +3,6 @@ from shapely.ops import orient
 import geopandas as gpd
 
-# networking helpers
-
-def argofetch(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', suggestedLatency=0, verbose=False):
-    # GET <apiroot>/<route>?<options> with <apikey> in the header.
-    # raises on anything other than success or a 404.
-
-    o = copy.deepcopy(options)
-    for option in ['polygon', 'box']:
-        if option in options:
-            options[option] = str(options[option])
-
-    dl = requests.get(apiroot.rstrip('/') + '/' + route.lstrip('/'), params = options, headers={'x-argokey': apikey})
-    statuscode = dl.status_code
-    if verbose:
-        print(urllib.parse.unquote(dl.url))
-    dl = dl.json()
-
-    if statuscode==429:
-        # user exceeded API limit, extract suggested wait and delay times, and try again
-        wait = dl['delay'][0]
-        latency = dl['delay'][1]
-        time.sleep(wait*1.1)
-        return argofetch(route, options=o, apikey=apikey, apiroot=apiroot, suggestedLatency=latency, verbose=verbose)
-
-    if (statuscode!=404 and statuscode!=200) or (statuscode==200 and type(dl) is dict and 'code' in dl):
-        if statuscode == 413:
-            print('The temporospatial extent of your request is enormous! Consider using the `query` helper in this package to split it up into more manageable chunks.')
-        elif statuscode >= 500 or (statuscode==200 and type(dl) is dict and 'code' in dl):
-            print("Argovis' servers experienced an error. Please try your request again, and email argovis@colorado.edu if this keeps happening; please include the full details of the the request you made so we can help address.")
-        raise Exception(statuscode, dl)
-
-    # no special action for 404 - a 404 due to a mangled route will return an error, while a valid search with no result will return [].
-
-    return dl, suggestedLatency
-
-def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', verbose=False):
-    # middleware function between the user and a call to argofetch to make sure individual requests are reasonably scoped and timed.
-    r = re.sub('^/', '', route)
-    r = re.sub('/$', '', r)
-
-    data_routes = ['argo', 'cchdo', 'drifters', 'tc', 'argotrajectories', 'easyocean', 'grids/rg09', 'grids/kg21', 'grids/glodap', 'timeseries/noaasst', 'timeseries/copernicussla', 'timeseries/ccmpwind', 'extended/ar']
-
-    scoped_parameters = {
-        'argo': ['id','platform', 'metadata'],
-        'cchdo': ['id', 'woceline', 'cchdo_cruise', 'metadata'],
-        'drifters': ['id', 'wmo', 'platform', 'metadata'],
-        'tc': ['id', 'name', 'metadata'],
-        'argotrajectories': ['id', 'platform', 'metadata'],
-        'easyocean': ['id', 'woceline', 'metadata'],
-        'grids/rg09': ['id'],
-        'grids/kg21': ['id'],
-        'grids/glodap': ['id'],
-        'timeseries/noaasst': ['id'],
-        'timeseries/copernicussla': ['id'],
-        'timeseries/ccmpwind': ['id'],
-        'extended/ar': ['id']
-    }
-
-    if r in data_routes and (not 'compression' in options or options['compression']!='minimal'):
-        # these are potentially large requests that might need to be sliced up
-
-        ## identify timeseries, need to be recombined differently after slicing
-        isTimeseries = r.split('/')[0] == 'timeseries'
-
-        ## if a data query carries a scoped parameter, no need to slice up:
-        if r in scoped_parameters and not set(scoped_parameters[r]).isdisjoint(options.keys()):
-            return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]
-
-        # should we slice by time or space?
-        times = slice_timesteps(options, r)
-        n_space = 2592 # number of 5x5 bins covering a globe
-        if 'polygon' in options:
-            pgons = split_polygon(options['polygon'])
-            n_space = len(pgons)
-        elif 'box' in options:
-            boxes = split_box(options['box'])
-            n_space = len(boxes)
-
-        if isTimeseries or n_space < len(times):
-            ## slice up in space bins
-            ops = copy.deepcopy(options)
-            results = []
-            delay = 0
-
-            if 'box' in options:
-                boxes = split_box(options['box'])
-                for i in range(len(boxes)):
-                    ops['box'] = boxes[i]
-                    increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
-                    results += increment[0]
-                    delay = increment[1]
-                    time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
-            else:
-                pgons = []
-                if 'polygon' in options:
-                    pgons = split_polygon(options['polygon'])
-                else:
-                    pgons = generate_global_cells()
-                for i in range(len(pgons)):
-                    ops['polygon'] = pgons[i]
-                    increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
-                    results += increment[0]
-                    delay = increment[1]
-                    time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
-                # smaller polygons will trace geodesics differently than full polygons, need to doublecheck;
-                # do it for boxes too just to make sure nothing funny happened on the boundaries
-                ops = copy.deepcopy(options)
-                ops['compression'] = 'minimal'
-                true_ids = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
-                true_ids = [x[0] for x in true_ids[0]]
-                fetched_ids = [x['_id'] for x in results]
-                if len(fetched_ids) != len(list(set(fetched_ids))):
-                    # deduplicate anything scooped up by multiple cells, like on cell borders
-                    r = {x['_id']: x for x in results}
-                    results = [r[i] for i in list(r.keys())]
-                    fetched_ids = [x['_id'] for x in results]
-                to_drop = [item for item in fetched_ids if item not in true_ids]
-                to_add = [item for item in true_ids if item not in fetched_ids]
-                for id in to_add:
-                    p, delay = argofetch(route, options={'id': id}, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
-                    results += p
-                results = [x for x in results if x['_id'] not in to_drop]
-
-        else:
-            ## slice up in time bins
-            results = []
-            ops = copy.deepcopy(options)
-            delay = 0
-            for i in range(len(times)-1):
-                ops['startDate'] = times[i]
-                ops['endDate'] = times[i+1]
-                increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
-                results += increment[0]
-                delay = increment[1]
-                time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
-
-        # slicing can end up duplicating results in batchmeta requests, deduplicate
-        if 'batchmeta' in options:
-            results = list({x['_id']: x for x in results}.values())
-
-        return results
-
-    else:
-        return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]
-
 def slice_timesteps(options, r):
     # given a qsr option dict and data route, return a list of reasonable time divisions
@@ -216,8 +71,6 @@ def slice_timesteps(options, r):
 
     return times
 
-# data munging helpers
-
 def data_inflate(data_doc, metadata_doc=None):
     # given a single JSON downloaded from one of the standard data routes,
     # return the data document with the data key reinflated to per-level dictionaries.
@@ -360,3 +213,134 @@ def generate_global_cells(lonstep=5, latstep=5):
         lat = -90
         lon += lonstep
     return cells
+
+def argofetch(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', suggestedLatency=0, verbose=False):
+    # GET <apiroot>/<route>?<options> with <apikey> in the header.
+    # raises on anything other than success or a 404.
+
+    o = copy.deepcopy(options)
+    for option in ['polygon', 'box']:
+        if option in options:
+            options[option] = str(options[option])
+
+    dl = requests.get(apiroot.rstrip('/') + '/' + route.lstrip('/'), params = options, headers={'x-argokey': apikey})
+    statuscode = dl.status_code
+    if verbose:
+        print(urllib.parse.unquote(dl.url))
+    dl = dl.json()
+
+    if statuscode==429:
+        # user exceeded API limit, extract suggested wait and delay times, and try again
+        wait = dl['delay'][0]
+        latency = dl['delay'][1]
+        time.sleep(wait*1.1)
+        return argofetch(route, options=o, apikey=apikey, apiroot=apiroot, suggestedLatency=latency, verbose=verbose)
+
+    if (statuscode!=404 and statuscode!=200) or (statuscode==200 and type(dl) is dict and 'code' in dl):
+        if statuscode == 413:
+            print('The temporospatial extent of your request is enormous! If you are using the query helper, it will now try to slice this request up for you. Try setting verbose=True to see how it is slicing this up.')
+        elif statuscode >= 500 or (statuscode==200 and type(dl) is dict and 'code' in dl):
+            print("Argovis' servers experienced an error. Please try your request again, and email argovis@colorado.edu if this keeps happening; please include the full details of the request you made so we can help address it.")
+        raise Exception(statuscode, dl)
+
+    # no special action for 404 - a 404 due to a mangled route will return an error, while a valid search with no result will return [].
+
+    return dl, suggestedLatency
+
+def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.edu/', verbose=False, slice=False):
+    # middleware function between the user and a call to argofetch to make sure individual requests are reasonably scoped and timed.
+    r = re.sub('^/', '', route)
+    r = re.sub('/$', '', r)
+
+    # start by just trying the request, to determine if we need to slice it
+    if not slice:
+        try:
+            q = argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)
+            return q[0]
+        except Exception as e:
+            if e.args[0] == 413:
+                # we need to slice
+                return query(route=route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose, slice=True)
+            else:
+                print(e)
+                return e.args
+
+    # slice request up into a series of requests
+
+    ## identify timeseries, need to be recombined differently after slicing
+    isTimeseries = r.split('/')[0] == 'timeseries'
+
+    # should we slice by time or space?
+    times = slice_timesteps(options, r)
+    n_space = 2592 # number of 5x5 bins covering a globe
+    if 'polygon' in options:
+        pgons = split_polygon(options['polygon'])
+        n_space = len(pgons)
+    elif 'box' in options:
+        boxes = split_box(options['box'])
+        n_space = len(boxes)
+
+    if isTimeseries or n_space < len(times):
+        ## slice up in space bins
+        ops = copy.deepcopy(options)
+        results = []
+        delay = 0
+
+        if 'box' in options:
+            boxes = split_box(options['box'])
+            for i in range(len(boxes)):
+                ops['box'] = boxes[i]
+                increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+                results += increment[0]
+                delay = increment[1]
+                time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
+        else:
+            pgons = []
+            if 'polygon' in options:
+                pgons = split_polygon(options['polygon'])
+            else:
+                pgons = generate_global_cells()
+            for i in range(len(pgons)):
+                ops['polygon'] = pgons[i]
+                increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+                results += increment[0]
+                delay = increment[1]
+                time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
+            # smaller polygons will trace geodesics differently than full polygons, need to double-check;
+            # do it for boxes too just to make sure nothing funny happened on the boundaries
+            ops = copy.deepcopy(options)
+            ops['compression'] = 'minimal'
+            true_ids = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+            true_ids = [x[0] for x in true_ids[0]]
+            fetched_ids = [x['_id'] for x in results]
+            if len(fetched_ids) != len(list(set(fetched_ids))):
+                # deduplicate anything scooped up by multiple cells, like on cell borders
+                r = {x['_id']: x for x in results}
+                results = [r[i] for i in list(r.keys())]
+                fetched_ids = [x['_id'] for x in results]
+            to_drop = [item for item in fetched_ids if item not in true_ids]
+            to_add = [item for item in true_ids if item not in fetched_ids]
+            for id in to_add:
+                p, delay = argofetch(route, options={'id': id}, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+                results += p
+            results = [x for x in results if x['_id'] not in to_drop]
+
+    else:
+        ## slice up in time bins
+        results = []
+        ops = copy.deepcopy(options)
+        delay = 0
+        for i in range(len(times)-1):
+            ops['startDate'] = times[i]
+            ops['endDate'] = times[i+1]
+            increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+            results += increment[0]
+            delay = increment[1]
+            time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
+
+    # slicing can end up duplicating results in batchmeta requests, deduplicate
+    if 'batchmeta' in options:
+        results = list({x['_id']: x for x in results}.values())
+
+    return results
+
diff --git a/dist/argovisHelpers-0.0.26-py3-none-any.whl b/dist/argovisHelpers-0.0.26-py3-none-any.whl
new file mode 100644
index 0000000..e603140
Binary files /dev/null and b/dist/argovisHelpers-0.0.26-py3-none-any.whl differ
diff --git a/dist/argovisHelpers-0.0.26.tar.gz b/dist/argovisHelpers-0.0.26.tar.gz
new file mode 100644
index 0000000..c265e06
Binary files /dev/null and b/dist/argovisHelpers-0.0.26.tar.gz differ
diff --git a/pyproject.toml b/pyproject.toml
index 31e46fc..432873f 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "argovisHelpers"
"argovisHelpers" -version = "0.0.25" +version = "0.0.26" description = "Helper functions to consume and parse information from University of Colorado's Argovis API." readme = "README.md" authors = [{name = "Katie Mills" }]