diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml
index 36c31d8..c44bdc4 100644
--- a/.github/workflows/test.yaml
+++ b/.github/workflows/test.yaml
@@ -8,15 +8,15 @@ on:
 jobs:
   test:
     runs-on: ubuntu-latest
-    container: argovis/argovis_helpers:test-base-231026
+    container: argovis/argovis_helpers:test-base-240418
     services:
       database:
-        image: argovis/testdb:0.40
+        image: argovis/testdb:0.41
       redis:
         image: redis:7.0.2
       api:
-        image: argovis/api:2.22.0
+        image: argovis/api:2.24.3
         env:
           ARGONODE: core
diff --git a/Dockerfile b/Dockerfile
index fd8efed..7ff0b02 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,5 +1,5 @@
 FROM python:3.9
-RUN pip install requests pytest area numpy scipy
+RUN pip install requests pytest area numpy scipy shapely geopandas
 WORKDIR /app
 COPY . .
diff --git a/argovisHelpers/helpers.py b/argovisHelpers/helpers.py
index a9bb72e..e694a53 100644
--- a/argovisHelpers/helpers.py
+++ b/argovisHelpers/helpers.py
@@ -1,4 +1,7 @@
-import requests, datetime, copy, time, re, area, math, urllib
+import requests, datetime, copy, time, re, area, math, urllib, json
+from shapely.geometry import shape, box, Polygon
+from shapely.ops import orient
+import geopandas as gpd

 # networking helpers

@@ -58,6 +61,95 @@ def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.ed
         'extended/ar': ['id']
     }

+    winding = False
+    if 'winding' in options:
+        winding = options['winding'] == 'true'
+
+    if r in data_routes and (not 'compression' in options or options['compression']!='minimal'):
+        # these are potentially large requests that might need to be sliced up
+
+        ## identify timeseries, need to be recombined differently after slicing
+        isTimeseries = r.split('/')[0] == 'timeseries'
+
+        ## if a data query carries a scoped parameter, no need to slice up:
+        if r in scoped_parameters and not set(scoped_parameters[r]).isdisjoint(options.keys()):
+            return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]
+
+        # should we slice by time or space?
+        times = slice_timesteps(options, r)
+        n_space = 2592 # number of 5x5 bins covering a globe
+        if 'polygon' in options:
+            pgons = split_polygon(options['polygon'], winding=winding)
+            n_space = len(pgons)
+        elif 'box' in options:
+            boxes = split_box(options['box'])
+            n_space = len(boxes)
+
+        if isTimeseries or n_space < len(times):
+            ## slice up in space bins
+            ops = copy.deepcopy(options)
+            results = []
+            delay = 0
+
+            if 'box' in options:
+                boxes = split_box(options['box'])
+                for i in range(len(boxes)):
+                    ops['box'] = boxes[i]
+                    increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+                    results += increment[0]
+                    delay = increment[1]
+                    time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay
+            else:
+                pgons = []
+                if 'polygon' in options:
+                    pgons = split_polygon(options['polygon'], winding=winding)
+                else:
+                    pgons = generate_global_cells()
+                for i in range(len(pgons)):
+                    ops['polygon'] = pgons[i]
+                    increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+                    results += increment[0]
+                    delay = increment[1]
+                    time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay
+            # smaller polygons will trace geodesics differently than full polygons, need to double-check;
+            # do it for boxes too just to make sure nothing funny happened on the boundaries
+            ops = copy.deepcopy(options)
+            ops['compression'] = 'minimal'
+            true_ids = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+            true_ids = [x[0] for x in true_ids[0]]
+            fetched_ids = [x['_id'] for x in results]
+            if len(fetched_ids) != len(list(set(fetched_ids))):
+                # deduplicate anything scooped up by multiple cells, like on cell borders
+                r = {x['_id']: x for x in results}
+                results = [r[i] for i in list(r.keys())]
+                fetched_ids = [x['_id'] for x in results]
+            to_drop = [item for item in fetched_ids if item not in true_ids]
+            to_add = [item for item in true_ids if item not in fetched_ids]
+            for id in to_add:
+                p, delay = argofetch(route, options={'id': id}, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+                results += p
+            results = [x for x in results if x['_id'] not in to_drop]
+            return results
+        else:
+            ## slice up in time bins
+            results = []
+            ops = copy.deepcopy(options)
+            delay = 0
+            for i in range(len(times)-1):
+                ops['startDate'] = times[i]
+                ops['endDate'] = times[i+1]
+                increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
+                results += increment[0]
+                delay = increment[1]
+                time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of the delay
+            return results
+
+    else:
+        return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]
+
+def slice_timesteps(options, r):
+    # given a qsr option dict and data route, return a list of reasonable time divisions
+
     earliest_records = {
         'argo': parsetime("1997-07-27T20:26:20.002Z"),
         'cchdo': parsetime("1972-07-23T09:11:00.000Z"),
@@ -91,74 +183,41 @@ def query(route, options={}, apikey='', apiroot='https://argovis-api.colorado.ed
         'extended/ar': parsetime("2022-01-01T21:00:00Z")
     }

-    if r in data_routes:
-        # these are potentially large requests that might need to be sliced up
-
-        ## identify timeseries, need to be recombined differently after slicing
-        isTimeseries = r.split('/')[0] == 'timeseries'
-
-        ## if a data query carries a scoped parameter, no need to slice up:
-        if r in scoped_parameters and not set(scoped_parameters[r]).isdisjoint(options.keys()):
-            return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]
-
-        ## slice up in time bins:
-        start = None
-        end = None
-        coerced_time = 0
-        if 'startDate' in options:
-            start = parsetime(options['startDate'])
-        else:
-            start = earliest_records[r]
-            coerced_time += 1
-        if 'endDate' in options:
-            end = parsetime(options['endDate'])
-        else:
-            end = last_records[r]
-            coerced_time += 1
-        coerced_time = coerced_time == 2 # ie both timestamps have been coerced on by the helpers
-
-        ### determine appropriate bin size
-        maxbulk = 1000000 # should be <= maxbulk used in generating an API 413
-        timestep = 30 # days
-
-        if 'polygon' in options:
-            extent = area.area({'type':'Polygon','coordinates':[ options['polygon'] ]}) / 13000 / 1000000 # poly area in units of 13000 sq. km. blocks
-            timestep = min(400, math.floor(maxbulk / extent))
-        elif 'multipolygon' in options:
-            extents = [area.area({'type':'Polygon','coordinates':[x]}) / 13000 / 1000000 for x in options['multipolygon']]
-            extent = min(extents)
-            timestep = min(400,math.floor(maxbulk / extent))
-        elif 'box' in options:
-            extent = area.area({'type':'Polygon','coordinates':[[ options['box'][0], [options['box'][1][0], options['box'][0][0]], options['box'][1], [options['box'][0][0], options['box'][1][0]], options['box'][0]]]}) / 13000 / 1000000
-            timestep = min(400, math.floor(maxbulk / extent))
-
-        delta = datetime.timedelta(days=timestep)
-        times = [start]
-        while times[-1] + delta < end:
-            times.append(times[-1]+delta)
-        times.append(end)
-        times = [parsetime(x) for x in times]
-        results = []
-        ops = copy.deepcopy(options)
-        delay = 0
-        for i in range(len(times)-1):
-            ops['startDate'] = times[i]
-            ops['endDate'] = times[i+1]
-            increment = argofetch(route, options=ops, apikey=apikey, apiroot=apiroot, suggestedLatency=delay, verbose=verbose)
-            if isTimeseries and len(increment) > 0:
-                results = combine_dicts(results, increment[0])
-            else:
-                results += increment[0]
-            delay = increment[1]
-            time.sleep(increment[1]*0.8) # assume the synchronous request is supplying at least some of delay
-        if isTimeseries and coerced_time:
-            # remove timeseries from data document, only there because the slicing forced it so
-            results = [{k: v for k, v in d.items() if k != 'timeseries'} for d in results]
-        return results
-
+    maxbulk = 2000000 # should be <= maxbulk used in generating an API 413
+    timestep = 30 # days
+    extent = 360000000 / 13000 #// 360M sq km, all the oceans
+
+    if 'polygon' in options:
+        extent = area.area({'type':'Polygon','coordinates':[ options['polygon'] ]}) / 13000 / 1000000 # poly area in units of 13000 sq. km. blocks
+    elif 'multipolygon' in options:
+        extents = [area.area({'type':'Polygon','coordinates':[x]}) / 13000 / 1000000 for x in options['multipolygon']]
+        extent = min(extents)
+    elif 'box' in options:
+        extent = area.area({'type':'Polygon','coordinates':[[ options['box'][0], [options['box'][1][0], options['box'][0][0]], options['box'][1], [options['box'][0][0], options['box'][1][0]], options['box'][0]]]}) / 13000 / 1000000
+
+    timestep = math.floor(maxbulk / extent)
+
+    ## slice up in time bins:
+    start = None
+    end = None
+    if 'startDate' in options:
+        start = parsetime(options['startDate'])
     else:
-        return argofetch(route, options=options, apikey=apikey, apiroot=apiroot, verbose=verbose)[0]
-
+        start = earliest_records[r]
+    if 'endDate' in options:
+        end = parsetime(options['endDate'])
+    else:
+        end = last_records[r]
+
+    delta = datetime.timedelta(days=timestep)
+    times = [start]
+    while times[-1] + delta < end:
+        times.append(times[-1]+delta)
+    times.append(end)
+    times = [parsetime(x) for x in times]
+
+    return times
+
 # data munging helpers

 def data_inflate(data_doc, metadata_doc=None):
@@ -225,26 +284,113 @@ def combine_data_lists(lists):
         combined_list.append(combined_sublist)
     return combined_list

-def combine_dicts(list1, list2):
-    # given two lists of timeseries data objects,
-    # combine them matching on geolocation and level
+def split_polygon(coords, max_lon_size=5, max_lat_size=5, winding=False):
+    # slice a geojson polygon up into a list of smaller polygons of maximum extent in lon and lat

-    combined_list = []
-    for dict1 in list1:
-        combined = False
-        for dict2 in list2:
-            if dict1.get('geolocation') == dict2.get('geolocation') and dict1.get('level') == dict2.get('level'):
-                combined_dict = dict1.copy()
-                combined_dict['timeseries'] += dict2.get('timeseries', [])
-                data = combine_data_lists([dict1.get('data', []), dict2.get('data', [])])
-                if len(data) > 0:
-                    combined_dict['data'] = data
-                combined_list.append(combined_dict)
-                combined = True
-                list2.remove(dict2) # Remove combined element from list2
-                break
-        if not combined:
-            combined_list.append(dict1)
-    combined_list.extend(list2) # Append remaining elements from list2
-    return combined_list
+    # if a polygon bridges the dateline and wraps its longitudes around,
+    # we need to detect this and un-wrap.
+    # assume bounded region is the smaller region unless winding is being enforced, per mongo
+    coords = dont_wrap_dateline(coords)
+
+    polygon = shape({"type": "Polygon", "coordinates": [coords]})
+
+    smaller_polygons = []
+    min_lon, min_lat, max_lon, max_lat = polygon.bounds
+
+    if winding and is_cw(coords):
+        # if winding is being enforced and the polygon is cw wound,
+        # we're looking for everything outside the polygon.
+
+        lon = min_lon-10
+        lat = -90
+        while lon < min_lon + 360:
+            while lat < max_lat+10: # < 90:
+                bounding_box = box(lon, lat, lon + max_lon_size, lat + max_lat_size)
+                chunk = bounding_box.difference(polygon)
+                if not chunk.is_empty:
+                    # Convert the Shapely geometry to a GeoJSON polygon and add it to the list
+                    shapes = json.loads(gpd.GeoSeries([chunk]).to_json())
+                    if shapes['features'][0]['geometry']['type'] == 'Polygon':
+                        smaller_polygons.append(shapes['features'][0]['geometry']['coordinates'][0])
+                    elif shapes['features'][0]['geometry']['type'] == 'MultiPolygon':
+                        for poly in shapes['features'][0]['geometry']['coordinates']:
+                            smaller_polygons.append(poly[0])
+
+                lat += max_lat_size
+            lat = -90
+            lon += max_lon_size
+    else:
+        # Split the polygon interior into smaller polygons
+
+        lon = min_lon
+        lat = min_lat
+        while lon < max_lon:
+            while lat < max_lat:
+                # Create a bounding box for the current chunk
+                bounding_box = box(lon, lat, lon + max_lon_size, lat + max_lat_size)
+
+                # Intersect the bounding box with the original polygon
+                chunk = polygon.intersection(bounding_box)
+
+                # If the intersection is not empty, add it to the list of smaller polygons
+                if not chunk.is_empty:
+                    # Convert the Shapely geometry to a GeoJSON polygon and add it to the list
+                    shapes = json.loads(gpd.GeoSeries([chunk]).to_json())
+                    if shapes['features'][0]['geometry']['type'] == 'Polygon':
+                        smaller_polygons.append(shapes['features'][0]['geometry']['coordinates'][0])
+                    elif shapes['features'][0]['geometry']['type'] == 'MultiPolygon':
+                        for poly in shapes['features'][0]['geometry']['coordinates']:
+                            smaller_polygons.append(poly[0])
+
+                lat += max_lat_size
+            lat = min_lat
+            lon += max_lon_size
+
+    return smaller_polygons
+
+def split_box(box, max_lon_size=5, max_lat_size=5):
+    # slice a box up into a list of smaller boxes of maximum extent in lon and lat
+
+    if box[0][0] > box[1][0]:
+        # unwrap the dateline
+        box[1][0] += 360
+
+    smaller_boxes = []
+    lon = box[0][0]
+    lat = box[0][1]
+    while lon < box[1][0]:
+        while lat < box[1][1]:
+            smaller_boxes.append([[lon, lat],[min(box[1][0], lon + max_lon_size), min(box[1][1], lat + max_lat_size)]])
+            lat += max_lat_size
+        lat = box[0][1]
+        lon += max_lon_size
+
+    return smaller_boxes
+
+def dont_wrap_dateline(coords):
+    # given a list of polygon coords, return them ensuring they dont modulo 360 over the dateline.
+
+    for i in range(len(coords)-1):
+        if coords[i][0]*coords[i+1][0] < 0 and abs(coords[i][0] - coords[i+1][0]) > 180:
+            # ie if any geodesic edge crosses the dateline with a modulo, we need to remap.
+            return [[lon + 360 if lon < 0 else lon, lat] for lon, lat in coords]
+
+    return coords
+
+def generate_global_cells(lonstep=5, latstep=5):
+    cells = []
+    lon = -180
+    lat = -90
+    while lon < 180:
+        while lat < 90:
+            cells.append([[lon,lat],[lon+lonstep,lat],[lon+lonstep,lat+latstep],[lon,lat+latstep],[lon,lat]])

+            lat += latstep
+        lat = -90
+        lon += lonstep
+    return cells
+
+def is_cw(coords):
+    unwrap = dont_wrap_dateline(coords)
+    return Polygon(unwrap) == orient(Polygon(unwrap), sign=-1.0)
diff --git a/dist/argovisHelpers-0.0.23-py3-none-any.whl b/dist/argovisHelpers-0.0.23-py3-none-any.whl
new file mode 100644
index 0000000..3211e65
Binary files /dev/null and b/dist/argovisHelpers-0.0.23-py3-none-any.whl differ
diff --git a/dist/argovisHelpers-0.0.23.tar.gz b/dist/argovisHelpers-0.0.23.tar.gz
new file mode 100644
index 0000000..c6aac14
Binary files /dev/null and b/dist/argovisHelpers-0.0.23.tar.gz differ
diff --git a/pyproject.toml b/pyproject.toml
index b802101..887845c 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

 [project]
 name = "argovisHelpers"
-version = "0.0.22"
+version = "0.0.23"
 description = "Helper functions to consume and parse information from University of Colorado's Argovis API."
 readme = "README.md"
 authors = [{name = "Katie Mills" }]
@@ -16,7 +16,9 @@ classifiers = [
 ]
 dependencies = [
     "requests",
-    "area"
+    "area",
+    "shapely==1.8.0",
+    "geopandas==0.14.3"
 ]

 requires-python = ">=3.9"
diff --git a/tests/tests.py b/tests/tests.py
index 8ef8c9a..0f3c836 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -161,25 +161,6 @@ def test_combine_data_lists(apiroot, apikey):
     assert helpers.combine_data_lists([a,b]) == [[1,2,5,6],[3,4,7,8]], 'failed to combine two data lists'
     assert helpers.combine_data_lists([a,b,c]) == [[1,2,5,6,10,11],[3,4,7,8,12,13]], 'failed to combine three data lists'

-def test_combine_dicts(apiroot, apikey):
-    '''
-    check basic behavior of combine_dics
-    '''
-
-    x = {'geolocation':{'type': 'Point', 'coordinates':[0,0]}, 'level':0, 'timeseries':[0,1,2], 'data': [[1,2,3],[4,5,6]]}
-    y = {'geolocation':{'type': 'Point', 'coordinates':[10,10]}, 'level':1, 'timeseries':[0,1,2], 'data': [[10,20,30],[40,50,60]]}
-    z = {'geolocation':{'type': 'Point', 'coordinates':[20,20]}, 'level':2, 'timeseries':[0,1,2], 'data': [[100,200,300],[400,500,600]]}
-
-    X = {'geolocation':{'type': 'Point', 'coordinates':[0,0]}, 'level':0, 'timeseries':[3,4,5], 'data': [[11,21,31],[41,51,61]]}
-    Y = {'geolocation':{'type': 'Point', 'coordinates':[10,10]}, 'level':1, 'timeseries':[3,4,5], 'data': [[101,201,301],[401,501,601]]}
-    Z = {'geolocation':{'type': 'Point', 'coordinates':[20,20]}, 'level':2.1, 'timeseries':[3,4,5], 'data': [[1001,2001,3001],[4001,5001,6001]]}
-
-    assert helpers.combine_dicts([x,y,z], [X,Z,Y]) == [
-            {'geolocation':{'type': 'Point', 'coordinates':[0,0]}, 'level':0, 'timeseries':[0,1,2,3,4,5], 'data': [[1,2,3,11,21,31],[4,5,6,41,51,61]]},
-            {'geolocation':{'type': 'Point', 'coordinates':[10,10]}, 'level':1, 'timeseries':[0,1,2,3,4,5], 'data': [[10,20,30,101,201,301],[40,50,60,401,501,601]]},
-            {'geolocation':{'type': 'Point', 'coordinates':[20,20]}, 'level':2, 'timeseries':[0,1,2], 'data': [[100,200,300],[400,500,600]]},
-            {'geolocation':{'type': 'Point', 'coordinates':[20,20]}, 'level':2.1, 'timeseries':[3,4,5], 'data': [[1001,2001,3001],[4001,5001,6001]]}
-        ], 'failed to combine timeseries fragments correctly'

 def test_timeseries_recombo(apiroot, apikey):
     '''
@@ -188,6 +169,7 @@ def test_timeseries_recombo(apiroot, apikey):

     slice_response = helpers.query('/timeseries/ccmpwind', options={'startDate':'1995-01-01T00:00:00Z', 'endDate':'2019-01-01T00:00:00Z', 'polygon': [[-10,-10],[10,-10],[10,10],[-10,10],[-10,-10]], 'data':'all'}, apikey=apikey, apiroot=apiroot)
     noslice_response = helpers.query('/timeseries/ccmpwind', options={'startDate':'1995-01-01T00:00:00Z', 'endDate':'2019-01-01T00:00:00Z', 'id': '0.125_0.125', 'data':'all'}, apikey=apikey, apiroot=apiroot)
+
     assert slice_response[0]['data'] == noslice_response[0]['data'], 'mismatch on data recombination'
     assert slice_response[0]['timeseries'] == noslice_response[0]['timeseries'], 'mismatch on timestamp recombination'

@@ -202,3 +184,32 @@ def test_timeseries_recombo_edges(apiroot, apikey):
     assert 'timeseries' not in response[0], 'make sure timeseries recombination doesnt coerce a timeseries key onto a document that shouldnt have one'


+def test_is_cw(apiroot, apikey):
+    '''
+    check basic behavior of cw checker
+    '''
+
+    assert helpers.is_cw([[0,0],[0,10],[10,10],[10,0],[0,0]]), 'basic CW example failed'
+    assert not helpers.is_cw([[0,0],[10,0],[10,10],[0,10],[0,0]]), 'basic CCW example failed'
+    assert helpers.is_cw([[175,0],[175,10],[-175,10],[-175,0],[175,0]]), 'CW wrapping dateline example failed'
+    assert helpers.is_cw([[175,0],[175,10],[185,10],[185,0],[175,0]]), 'CW example crossing dateline failed'
+
+def test_generate_global_cells(apiroot, apikey):
+    '''
+    check basic behavior of generate_global_cells
+    '''
+
+    assert len(helpers.generate_global_cells()) == 2592, 'global 5x5 grid generated wrong number of cells'
+    assert helpers.generate_global_cells()[0] == [[-180,-90],[-175,-90],[-175,-85],[-180,-85],[-180,-90]], 'first cell of global 5x5 grid generated incorrectly'
+
+def test_dont_wrap_dateline(apiroot, apikey):
+    '''
+    check basic behavior of dont_wrap_dateline
+    '''
+
+    assert helpers.dont_wrap_dateline([[-175,0],[-175,10],[175,10],[175,0],[-175,0]]) == [[185,0],[185,10],[175,10],[175,0],[185,0]], 'basic dateline unwrap failed'
+    assert helpers.dont_wrap_dateline([[-175,0],[175,0],[175,10],[-175,10],[-175,0]]) == [[185,0],[175,0],[175,10],[185,10],[185,0]], 'unwrap cw'
+    assert helpers.dont_wrap_dateline([[5,0],[-5,0],[-5,5],[5,5],[5,0]]) == [[5,0],[-5,0],[-5,5],[5,5],[5,0]], 'unwrap shouldnt affect meridian crossing'
+
+
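
Note (illustrative, not part of the patch): from the caller's side the new slicing is transparent; helpers.query keeps its existing signature and fans a large request out over 5x5-degree cells or time bins behind the scenes. The sketch below assumes the 'argo' data route and uses a placeholder date range, region, and API key; only the option names ('startDate', 'endDate', 'polygon', 'winding') and the query signature come from this diff.

from argovisHelpers import helpers

APIKEY = 'your-api-key'                                  # placeholder
region = [[-30,0],[-20,0],[-20,10],[-30,10],[-30,0]]     # arbitrary example polygon, CCW wound

# query() builds candidate time bins with slice_timesteps() and candidate spatial
# cells with split_polygon()/split_box(), then slices along whichever axis needs
# fewer requests (timeseries routes are always sliced spatially); partial results
# are concatenated, deduplicated on _id, and reconciled against a
# compression=minimal listing of the full region.
profiles = helpers.query('argo',
                         options={'startDate': '2020-01-01T00:00:00Z',
                                  'endDate': '2021-01-01T00:00:00Z',
                                  'polygon': region,
                                  'winding': 'true'},    # opt in to strict winding, per the new option
                         apikey=APIKEY)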
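
A worked example of the new time-bin sizing in slice_timesteps, using only constants that appear in this diff (maxbulk = 2000000, one block = 13000 sq. km, whole-ocean area taken as 360M sq. km):

import math

maxbulk = 2000000              # should be <= the maxbulk the API uses when emitting a 413
extent = 360000000 / 13000     # whole ocean, in 13000 sq. km blocks: ~27692
timestep = math.floor(maxbulk / extent)
print(timestep)                # 72 -> an unconstrained global query is cut into ~72-day bins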
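
A minimal sketch of the new box and grid helpers; the expected values mirror the assertions added to tests/tests.py, plus one computed split_box example (a 10x10-degree box should split into four 5x5 boxes):

from argovisHelpers import helpers

# a 10x10-degree box is cut into four 5x5-degree boxes
assert len(helpers.split_box([[0, 0], [10, 10]])) == 4

# longitudes that wrap the dateline are remapped onto 0-360 so the region stays contiguous
assert helpers.dont_wrap_dateline([[-175,0],[-175,10],[175,10],[175,0],[-175,0]]) == \
       [[185,0],[185,10],[175,10],[175,0],[185,0]]

# the fallback global grid is 72 x 36 = 2592 cells of 5x5 degrees
assert len(helpers.generate_global_cells()) == 2592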
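
And for polygon splitting and winding detection: the region below is the one used in test_timeseries_recombo, and the winding checks mirror test_is_cw. The cell count of 16 is an assumption (it holds if every 5x5-degree intersection of that 20x20-degree square is non-empty):

from argovisHelpers import helpers

region = [[-10,-10],[10,-10],[10,10],[-10,10],[-10,-10]]   # CCW, 20x20 degrees
assert len(helpers.split_polygon(region)) == 16            # assumed: a 4x4 grid of 5x5-degree chunks

# winding convention: when the 'winding' option is enforced, a CW ring (per is_cw)
# selects everything outside the ring rather than the bounded interior
assert helpers.is_cw([[0,0],[0,10],[10,10],[10,0],[0,0]])
assert not helpers.is_cw([[0,0],[10,0],[10,10],[0,10],[0,0]])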