From 51654dac733cdb8f3690273b69b5c36487dbcf24 Mon Sep 17 00:00:00 2001
From: Jake Mensch
Date: Sun, 15 Mar 2020 08:26:27 -0700
Subject: [PATCH 1/6] ingest fixes

---
 server/src/app.py                    |  74 +++--
 server/src/services/databaseOrm.py   | 122 ++-------
 server/src/services/socrataClient.py |  51 ++++
 server/src/services/sqlIngest.py     | 391 ++++++++++-----------------
 server/src/settings.example.cfg      |   1 +
 5 files changed, 276 insertions(+), 363 deletions(-)
 create mode 100644 server/src/services/socrataClient.py

diff --git a/server/src/app.py b/server/src/app.py
index 8f4ca68b9..be3fe15b6 100644
--- a/server/src/app.py
+++ b/server/src/app.py
@@ -97,27 +97,61 @@ async def sample_route(request):
 @app.route('/ingest', methods=["POST"])
 @compress.compress()
 async def ingest(request):
-    """Accept POST requests with a list of years to import.
-       Query parameter name is 'years', and parameter value is
-       a comma-separated list of years to import.
-       Ex. '/ingest?years=2015,2016,2017' """
-    current_year = datetime.now().year
-    querySize = request.args.get("querySize", None)
-    limit = request.args.get("limit", None)
-    ALLOWED_YEARS = [year for year in range(2015, current_year+1)]
-    if not request.args.get("years"):
-        return json({"error": "'years' parameter is required."})
-    years = set([int(year) for year in request.args.get("years").split(",")])
-    if not all(year in ALLOWED_YEARS for year in years):
-        return json({"error":
-                    f"'years' param values must be one of {ALLOWED_YEARS}"})
-    loader = DataHandler(app.config['Settings'])
-    loader.populateFullDatabase(yearRange=years,
-                                querySize=querySize,
-                                limit=limit)
-    return_data = {'response': 'ingest ok'}
-    return json(return_data)
+    """
+    Query parameters:
+        years:
+            a comma-separated list of years to import.
+            Ex. '/ingest?years=2015,2016,2017'
+            defaults to range(2015, 2021)
+        limit:
+            the max number of records per year
+            defaults to 2000000
+        querySize:
+            the number of records per request to socrata
+            defaults to 50000
+
+    Counts:
+        These are the counts you can expect if you do the full ingest:
+
+        2015: 237305
+        2016: 952486
+        2017: 1131558
+        2018: 1210075
+        2019: 1308093
+        2020: 319628 (and counting)
+
+        GET https://data.lacity.org/resource/{ID}.json?$select=count(srnumber)
+
+    Hint:
+        Run /ingest without params to get all socrata data
+    """
+    # parse params
+    years = request.args.get('years', None)
+    limit = request.args.get('limit', None)
+    querySize = request.args.get('querySize', None)
+
+    # validate params
+    if years is None:
+        years = range(2015, 2021)
+    else:
+        current_year = datetime.now().year
+        allowed_years = [year for year in range(2015, current_year+1)]
+        years = set([int(year) for year in years.split(',')])
+        if not all(year in allowed_years for year in years):
+            return json({
+                'error': f"'years' param values must be one of {allowed_years}"
+            })
+
+    limit = int(limit) if limit else 2000000
+    querySize = int(querySize) if querySize else 50000
+    querySize = min([limit, querySize])
+
+    # get data
+    loader = DataHandler(app.config['Settings']['Database'])
+    data = await loader.populateDatabase(years=years,
+                                         limit=limit,
+                                         querySize=querySize)
+    return json(data)
 
 
 @app.route('/update')
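For anyone trying the new endpoint locally, here is a sketch of a request against it. The base URL is an assumption (adjust host and port to your dev setup), it uses the third-party requests package, and note that a later patch in this series switches the route from POST to GET.

    # Sketch: exercising /ingest with the requests package.
    # http://localhost:5000 is an assumed dev address, not part of the patch.
    # The route is POST at this point; patch 5 below switches it to GET.
    import requests

    response = requests.post(
        'http://localhost:5000/ingest',
        params={
            'years': '2019,2020',  # comma-separated list of years
            'limit': 100000,       # max records per year
            'querySize': 25000,    # records per request to Socrata
        },
    )
    print(response.json())  # the ingestion/cleaning report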
diff --git a/server/src/services/databaseOrm.py b/server/src/services/databaseOrm.py
index 4b03220a7..9a7563985 100644
--- a/server/src/services/databaseOrm.py
+++ b/server/src/services/databaseOrm.py
@@ -1,4 +1,4 @@
-from sqlalchemy import Column, Integer, String, DateTime, Float
+from sqlalchemy import Column, Integer, String, DateTime, Float, JSON
 from sqlalchemy.ext.declarative import declarative_base
 
 
@@ -16,20 +16,33 @@ def _asdict(self):
 
 class Ingest(Base, Mixin):
     __tablename__ = 'ingest_staging_table'
-    srnumber = Column(String(50), primary_key=True, unique=True)
+
+    # a temporary primary key
+    id = Column(Integer, primary_key=True, autoincrement=True)
+
+    # becomes the primary key after deduplication
+    srnumber = Column(String)
+
+    # dates
     createddate = Column(DateTime)
     updateddate = Column(DateTime)
+    servicedate = Column(DateTime)
+    closeddate = Column(DateTime)
+
+    # about
+    requesttype = Column(String)
+    requestsource = Column(String)
     actiontaken = Column(String)
     owner = Column(String)
-    requesttype = Column(String)
     status = Column(String)
-    requestsource = Column(String)
     createdbyuserorganization = Column(String)
     mobileos = Column(String)
     anonymous = Column(String)
     assignto = Column(String)
-    servicedate = Column(String)
-    closeddate = Column(String)
+
+    # location
+    latitude = Column(Float)
+    longitude = Column(Float)
     addressverified = Column(String)
     approximateaddress = Column(String)
     address = Column(String)
@@ -38,96 +51,17 @@ class Ingest(Base, Mixin):
     streetname = Column(String)
     suffix = Column(String)
     zipcode = Column(String)
-    latitude = Column(String)
-    longitude = Column(String)
-    location = Column(String)
-    tbmpage = Column(String)
-    tbmcolumn = Column(String)
-    tbmrow = Column(String)
+    location = Column(JSON)
+
+    # politics
     apc = Column(String)
-    cd = Column(String)
+    cd = Column(Integer)
     cdmember = Column(String)
-    nc = Column(String)
+    nc = Column(Integer)
     ncname = Column(String)
     policeprecinct = Column(String)
 
-
-insertFields = {'srnumber': String(50),
-                'createddate': DateTime,
-                'updateddate': DateTime,
-                'actiontaken': String(30),
-                'owner': String(10),
-                'requesttype': String(30),
-                'status': String(20),
-                'requestsource': String(30),
-                'createdbyuserorganization': String(16),
-                'mobileos': String(10),
-                'anonymous': String(10),
-                'assignto': String(20),
-                'servicedate': String(30),
-                'closeddate': String(30),
-                'addressverified': String(16),
-                'approximateaddress': String(20),
-                'address': String(250),
-                'housenumber': String(10),
-                'direction': String(10),
-                'streetname': String(50),
-                'suffix': String(10),
-                'zipcode': Integer,
-                'latitude': Float,
-                'longitude': Float,
-                'location': String(250),
-                'tbmpage': Integer,
-                'tbmcolumn': String(10),
-                'tbmrow': Float,
-                'apc': String(30),
-                'cd': Float,
-                'cdmember': String(30),
-                'nc': Float,
-                'ncname': String(100),
-                'policeprecinct': String(30)}
-
-
-readFields = {'SRNumber': str,
-              'CreatedDate': str,
-              'UpdatedDate': str,
-              'ActionTaken': str,
-              'Owner': str,
-              'RequestType': str,
-              'Status': str,
-              'RequestSource': str,
-              'MobileOS': str,
-              'Anonymous': str,
-              'AssignTo': str,
-              'ServiceDate': str,
-              'ClosedDate': str,
-              'AddressVerified': str,
-              'ApproximateAddress': str,
-              'Address': str,
-              'HouseNumber': str,
-              'Direction': str,
-              'StreetName': str,
-              'Suffix': str,
-              'ZipCode': str,
-              'Latitude': str,
-              'Longitude': str,
-              'Location': str,
-              'TBMPage': str,
-              'TBMColumn': str,
-              'TBMRow': str,
-              'APC': str,
-              'CD': str,
-              'CDMember': str,
-              'NC': str,
-              'NCName': str,
-              'PolicePrecinct': str}
-
-
-tableFields = ['srnumber', 'createddate', 'updateddate', 'actiontaken',
-               'owner', 'requesttype', 'status', 'requestsource',
-               'createdbyuserorganization', 'mobileos', 'anonymous',
-               'assignto', 'servicedate', 'closeddate', 'addressverified',
-               'approximateaddress', 'address', 'housenumber', 'direction',
-               'streetname', 'suffix', 'zipcode', 'latitude', 'longitude',
-               'location', 'tbmpage', 'tbmcolumn', 'tbmrow', 'apc', 'cd',
-               'cdmember', 'nc', 'ncname', 'policeprecinct']
+    # misc
+    tbmpage = Column(String)
+    tbmcolumn = Column(String)
+    tbmrow = Column(Integer)
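The temporary autoincrement id above, with srnumber promoted to primary key only after deduplication, is what lets raw Socrata pages land in the staging table even when they contain repeats. A self-contained sketch of the same pattern, using an in-memory SQLite database and a hypothetical two-column model rather than the project's schema:

    # Sketch: staging rows under an autoincrement id so duplicate
    # srnumbers survive the insert; dedup happens later in SQL.
    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()


    class Staging(Base):
        __tablename__ = 'staging'
        id = Column(Integer, primary_key=True, autoincrement=True)
        srnumber = Column(String)  # duplicates tolerated for now


    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    # bulk_insert_mappings takes plain dicts, so Socrata's JSON rows can
    # be loaded without constructing ORM objects one at a time
    session.bulk_insert_mappings(Staging, [
        {'srnumber': '1-100'},
        {'srnumber': '1-100'},  # duplicate row inserts cleanly
    ])
    session.commit()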
diff --git a/server/src/services/socrataClient.py b/server/src/services/socrataClient.py
new file mode 100644
index 000000000..a8b5bf964
--- /dev/null
+++ b/server/src/services/socrataClient.py
@@ -0,0 +1,51 @@
+"""
+This is a simple wrapper for Socrata that handles a couple of issues:
+1. retrying requests in the event of a timeout or other failure
+2. grabbing the dataset id from the config based on the year
+3. automatically closing the client when we're done using it
+
+Usage is like this:
+    socrata = SocrataClient()
+    results = socrata.get(year, **kwargs)
+
+kwargs are all of the normal socrata kwargs - select, limit, etc.
+"""
+
+from sodapy import Socrata
+from configparser import ConfigParser
+
+
+class SocrataClient:
+    def __init__(self):
+        config = ConfigParser()
+        config.read('./settings.cfg')
+        config = config['Socrata']
+
+        domain = config['DOMAIN']
+        token = None if config['TOKEN'] == 'None' else config['TOKEN']
+        timeout = int(config['TIMEOUT'])
+
+        self.client = Socrata(domain, token, timeout=timeout)
+        self.attempts = int(config['ATTEMPTS'])
+        self.config = config
+
+    def __del__(self):
+        self.client.close()
+
+    def dataset_id(self, year):
+        return self.config['AP' + str(year)]
+
+    def get(self, year, **kwargs):
+        id = self.dataset_id(year)
+        for attempt in range(self.attempts):
+            try:
+                return self.client.get(id, **kwargs)
+            except Exception as e:
+                if attempt < self.attempts - 1:
+                    continue
+                else:
+                    raise e
+
+    def get_metadata(self, year):
+        id = self.dataset_id(year)
+        return self.client.get_metadata(id)
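Beyond the usage in the module docstring, the wrapper composes naturally with offset paging, which is how sqlIngest.py consumes it below. A sketch (the year, cap, and page size are arbitrary, and it assumes a settings.cfg with the [Socrata] section added at the end of this patch):

    # Sketch: paging through one dataset year with the wrapper above.
    # Run from server/src/services so the import and ./settings.cfg resolve.
    from socrataClient import SocrataClient

    socrata = SocrataClient()
    page_size = 1000  # arbitrary

    for offset in range(0, 5000, page_size):
        rows = socrata.get(2019, select='*', offset=offset, limit=page_size)
        print('offset {}: {} rows'.format(offset, len(rows)))
        if len(rows) < page_size:
            break  # dataset exhausted before the cap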
diff --git a/server/src/services/sqlIngest.py b/server/src/services/sqlIngest.py
index cf152444c..0e7154a5e 100644
--- a/server/src/services/sqlIngest.py
+++ b/server/src/services/sqlIngest.py
@@ -1,254 +1,147 @@
-import os
+from sqlalchemy import create_engine
+from sqlalchemy.orm import sessionmaker
+from sqlalchemy.sql import text
 import time
-import numpy as np
-import pandas as pd
-import sqlalchemy as db
-from sodapy import Socrata
-from configparser import ConfigParser
-if __name__ == '__main__':
-    # Contains db specs and field definitions
-    from databaseOrm import tableFields, insertFields, readFields
-else:
-    from .databaseOrm import tableFields, insertFields, readFields
+from .databaseOrm import Ingest, Base
+from .socrataClient import SocrataClient
+
+
+def log(message):
+    print(message, flush=True)
+
+
+class Timer():
+    def __init__(self):
+        self.start = time.perf_counter()
+
+    def end(self):
+        return round((time.perf_counter() - self.start) / 60, 2)
 
 
 class DataHandler:
-    def __init__(self, config=None, configFilePath=None, separator=','):
-        self.data = None
-        self.config = config
-        self.dbString = None if not self.config \
-            else self.config['Database']['DB_CONNECTION_STRING']
-        self.token = None if config['Socrata']['TOKEN'] == 'None' \
-            else config['Socrata']['TOKEN']
-        self.timeout = int(self.config['Socrata']['TIMEOUT'])
-        self.filePath = None
-        self.configFilePath = configFilePath
-        self.separator = separator
-        self.fields = tableFields
-        self.insertParams = insertFields
-        self.readParams = readFields
-        self.dialect = self.dbString.split(':')[0]
-
-    def loadData(self, fileName="2018_mini"):
-        '''Load dataset into pandas object'''
-        if self.separator == ',':
-            dataFile = fileName + ".csv"
-        else:
-            dataFile = fileName + ".tsv"
-
-        self.filePath = os.path.join(self.config['Database']['DATA_DIRECTORY'],
-                                     dataFile)
-
-        print('Loading dataset %s' % self.filePath)
-        self.data = pd.read_table(self.filePath,
-                                  sep=self.separator,
-                                  na_values=['nan'],
-                                  dtype=self.readParams)
-
-    def elapsedTimer(self, timeVal):
-        '''Simple timer method to report on elapsed time for each method'''
-        return (time.time() - timeVal) / 60
-
-    def cleanData(self):
-        '''Perform general data filtering'''
-        print('Cleaning data...')
-        cleanTimer = time.time()
-        data = self.data
-        zipIndex = (data['zipcode'].str.isdigit()) | (data['zipcode'].isna())
-        data['zipcode'].loc[~zipIndex] = np.nan
-        # Format dates as datetime (Time intensive)
-        if 'createddate' in data.columns:
-            data['createddate'] = pd.to_datetime(data['createddate'])
-        if 'closeddate' in data.columns:
-            data['closeddate'] = pd.to_datetime(data['closeddate'])
-        if 'servicedate' in data.columns:
-            data['servicedate'] = pd.to_datetime(data['servicedate'])
-        data['location'] = data.location.astype(str)
-        # Check for column consistency
-        for f in self.fields:
-            if f not in self.data.columns:
-                print('\tcolumn %s missing - substituting NaN values' % f)
-                data[f] = np.NaN
-        for f in data.columns:
-            if f not in self.fields:
-                print('\tcolumn %s not in defined set - dropping column' % f)
-        data = data[self.fields]
-        # self.data = self.data.drop(f)
-        self.data = data
-        print('\tCleaning Complete: %.1f minutes' %
-              self.elapsedTimer(cleanTimer))
-
-    def ingestData(self, ingestMethod='replace',
-                   tableName='ingest_staging_table'):
-        '''Set up connection to database'''
-        print('Inserting data into ' + self.dialect + ' instance...')
-        ingestTimer = time.time()
-        data = self.data.copy()  # shard deepcopy for other endpoint operations
-        engine = db.create_engine(self.dbString)
-        newColumns = [column.replace(' ', '_').lower() for column in data]
-        data.columns = newColumns
-        # Ingest data
-        # Schema is same as database in MySQL;
-        # schema here is set to db name in connection string
-        data.to_sql(tableName,
-                    engine,
-                    if_exists=ingestMethod,
-                    schema='public',
-                    index=False,
-                    chunksize=10,
-                    method='multi',
-                    dtype=self.insertParams)
-        print('\tIngest Complete: %.1f minutes' %
-              self.elapsedTimer(ingestTimer))
-
-    def dumpFilteredCsvFile(self,
-                            dataset,
-                            startDate,
-                            requestType,
-                            councilName):
-        '''Output data as CSV by council name, request type, and
-        start date (pulls to current date). Arguments should be passed
-        as strings. Date values must be formatted %Y-%m-%d.'''
-        df = dataset.copy()  # Shard deepcopy to allow multiple endpoints
-        # Data filtering
-        dateFilter = df['createddate'] > startDate
-        requestFilter = df['requesttype'] == requestType
-        councilFilter = df['ncname'] == councilName
-        df = df[dateFilter & requestFilter & councilFilter]
-        # Return string object for routing to download
-        return df.to_csv()
-
-    def saveCsvFile(self, filename):
-        '''Save contents of self.data to CSV output'''
-        self.data.to_csv(filename, index=False)
-
-    def fetchSocrata(self,
-                     year=2019,
-                     querySize=10000,
-                     totalRequestRecords=10**7):
-        '''Fetch data from Socrata connection and return pandas dataframe'''
-        # Load config files
-        print('Retrieving partial Socrata query...')
-        socrata_domain = self.config['Socrata']['DOMAIN']
-        socrata_dataset_identifier = self.config['Socrata']['AP' + str(year)]
-        socrata_token = self.token
-        # Establish connection to Socrata resource
-        client = Socrata(socrata_domain, socrata_token)
-        client.timeout = self.timeout
-        # Fetch data
-        # Loop for querying dataset
-        tableInit = False
-        query = int(querySize)
-        maxRecords = int(totalRequestRecords)
-        for i in range(0, maxRecords, query):
-            fetchTimer = time.time()
-            print('Fetching %d records with offset %d up to a max of %d'
-                  % (query, i, maxRecords))
-            results = client.get(socrata_dataset_identifier,
-                                 offset=i,
-                                 select="*",
-                                 order="updateddate DESC",
-                                 limit=query)
-            if not results:
+    def __init__(self, config=None):
+        self.engine = create_engine(config['DB_CONNECTION_STRING'])
+        self.session = sessionmaker(bind=self.engine)()
+        self.socrata = SocrataClient()
+
+    def __del__(self):
+        self.session.close()
+
+    def resetDatabase(self):
+        log('\nResetting database.')
+        Base.metadata.drop_all(self.engine)
+        Base.metadata.create_all(self.engine)
+
+    def fetchData(self, year, offset, limit):
+        log('\tFetching {} rows, offset {}'.format(limit, offset))
+        return self.socrata.get(year,
+                                select="*",
+                                offset=offset,
+                                limit=limit)
+
+    def insertData(self, rows):
+        self.session.bulk_insert_mappings(Ingest, rows)
+        self.session.commit()
+
+    def ingestYear(self, year, limit, querySize):
+        log('\nIngesting up to {} rows for year {}'.format(limit, year))
+        timer = Timer()
+
+        rowsInserted = 0
+        endReached = False
+
+        for offset in range(0, limit, querySize):
+            rows = self.fetchData(year, offset, querySize)
+            self.insertData(rows)
+            rowsInserted += len(rows)
+
+            if len(rows) < querySize:
+                endReached = True
                 break
-            tempDf = pd.DataFrame.from_dict(results)
-            self.data = tempDf
-            self.cleanData()
-            if not tableInit:
-                self.ingestData(ingestMethod='replace')
-                tableInit = True
-            else:
-                self.ingestData(ingestMethod='append')
-            print('%d records retrieved in %.2f minutes' %
-                  (self.data.shape[0], self.elapsedTimer(fetchTimer)))
-
-    def fetchSocrataFull(self, year=2019, limit=10**7):
-        '''Fetch entirety of dataset via Socrata'''
-        # Load config files
-        print('Downloading %d data from Socrata data source...' % year)
-        downloadTimer = time.time()
-        socrata_domain = self.config['Socrata']['DOMAIN']
-        socrata_dataset_identifier = self.config['Socrata']['AP' + str(year)]
-        socrata_token = self.token
-        # Establish connection to Socrata resource
-        client = Socrata(socrata_domain, socrata_token)
-        results = client.get(socrata_dataset_identifier, limit=limit)
-        self.data = pd.DataFrame.from_dict(results)
-        print('\tDownload Complete: %.1f minutes' %
-              self.elapsedTimer(downloadTimer))
-
-    def populateFullDatabase(self,
-                             yearRange=range(2015, 2021),
-                             querySize=None,
-                             limit=None):
-        '''Fetches all data from Socrata to populate database
-        Default operation is to fetch data from 2015-2020
-        !!! Be aware that each fresh import will wipe the
-        existing staging table'''
-        print('Performing {} population from data source'.format(self.dialect))
-        globalTimer = time.time()
-        for y in yearRange:
-            self.fetchSocrata(year=y,
-                              querySize=querySize,
-                              totalRequestRecords=limit)
-
-        print('All Operations Complete: %.1f minutes' %
-              self.elapsedTimer(globalTimer))
-
-    def updateDatabase(self):
-        '''Incrementally updates database with contents of data attribute
-        overwriting pre-existing records with the same srnumber'''
-        def fix_nan_vals(resultDict):
-            '''sqlAlchemy will not take NaT or NaN values for
-            insert in some fields. They must be replaced
-            with None values'''
-            for key in resultDict:
-                if resultDict[key] is pd.NaT or resultDict[key] is np.nan:
-                    resultDict[key] = None
-                # Also doesn't like nested dictionaries
-                if type(resultDict[key]) is dict:
-                    resultDict[key] = str(resultDict[key])
-            return resultDict
-
-        print('Updating database with new records...')
-        engine = db.create_engine(self.dbString)
-        metadata = db.MetaData()
-        staging = db.Table('ingest_staging_table',
-                           metadata,
-                           autoload=True,
-                           autoload_with=engine)
-        connection = engine.connect()
-        row = None
-        updateTimer = time.time()
-        updated = 0
-        inserted = 0
-        for srnumber in self.data.srnumber:
-            stmt = (db.select([staging])
-                    .where(staging.columns.srnumber == srnumber))
-            results = connection.execute(stmt).fetchall()
-            # print(srnumber, results)
-            # Delete the record if it is already there
-            if len(results) > 0:
-                delete_stmt = (db.delete(staging)
-                               .where(staging.columns.srnumber == srnumber))
-                connection.execute(delete_stmt)
-                updated += 1
-            else:
-                inserted += 1
-            # Write record
-            insert_stmt = db.insert(staging)
-            row = self.data[self.data.srnumber == srnumber].to_dict('results')
-            row = [fix_nan_vals(r) for r in row]
-            connection.execute(insert_stmt, row)
-        print('Operation Complete: %d inserts, %d updates in %.2f minutes' %
-              (inserted, updated, self.elapsedTimer(updateTimer)))
-
-
-if __name__ == "__main__":
-    '''Class DataHandler workflow from initial load to SQL population'''
-    config = ConfigParser()
-    config.read('../settings.cfg')
-    loader = DataHandler(config)
-    loader.fetchSocrataFull()
-    loader.cleanData()
-    loader.ingestData(tableName='ingest_staging_table')
+
+        minutes = timer.end()
+        log('\tDone with {} after {} minutes.'.format(year, minutes))
+        log('\tRows inserted: {}'.format(rowsInserted))
+
+        return {
+            'year': year,
+            'rowsInserted': rowsInserted,
+            'endReached': endReached,
+            'minutesElapsed': minutes,
+        }
+
+    def cleanTable(self):
+        def exec_sql(sql):
+            with self.engine.connect() as conn:
+                return conn.execute(text(sql))
+
+        def dropDuplicates(table, report):
+            rows = exec_sql(f"""
+                DELETE FROM {table} a USING {table} b
+                WHERE a.id < b.id AND a.srnumber = b.srnumber;
+            """)
+
+            report.append({
+                'description': 'dropped duplicate rows by srnumber',
+                'rows': rows.rowcount
+            })
+
+        def switchPrimaryKey(table, report):
+            exec_sql(f"""
+                ALTER TABLE {table} DROP COLUMN id;
+                ALTER TABLE {table} ADD PRIMARY KEY (srnumber);
+            """)
+
+            report.append({
+                'description': 'switched primary key column to srnumber',
+                'rows': 'N/A'
+            })
+
+        def removeInvalidClosedDates(table, report):
+            result = exec_sql(f"""
+                UPDATE {table}
+                SET closeddate = NULL
+                WHERE closeddate::timestamp < createddate::timestamp;
+            """)
+
+            report.append({
+                'description': 'removed invalid closed dates',
+                'rowsAffected': result.rowcount
+            })
+
+        log('\nCleaning ingest table.')
+        table = Ingest.__tablename__
+        report = []
+
+        dropDuplicates(table, report)
+        switchPrimaryKey(table, report)
+        removeInvalidClosedDates(table, report)
+
+        return report
+
+    async def populateDatabase(self,
+                               years=range(2015, 2021),
+                               limit=2000000,
+                               querySize=50000):
+        log('\nPopulating database for years: {}'.format(list(years)))
+        timer = Timer()
+
+        self.resetDatabase()
+
+        insertReport = []
+        for year in years:
+            inserts = self.ingestYear(year, limit, querySize)
+            insertReport.append(inserts)
+
+        cleanReport = self.cleanTable()
+
+        minutes = timer.end()
+        log('\nDone with ingestion after {} minutes.\n'.format(minutes))
+
+        report = {
+            'insertion': insertReport,
+            'cleaning': cleanReport,
+            'totalMinutesElapsed': minutes
+        }
+        log(report)
+        return report
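The dedup step in cleanTable relies on PostgreSQL's DELETE ... USING self-join, which keeps the copy of each srnumber with the highest id. A standalone sketch of the idiom against a scratch table; it assumes a reachable Postgres, and the connection string, table, and values are placeholders:

    # Sketch: the DELETE ... USING dedup idiom (PostgreSQL-specific).
    # The connection string is a placeholder -- point it at a scratch db.
    from sqlalchemy import create_engine
    from sqlalchemy.sql import text

    engine = create_engine('postgresql://user:pass@localhost:5432/scratch')

    with engine.connect() as conn:
        conn.execute(text("""
            CREATE TABLE IF NOT EXISTS t (id serial PRIMARY KEY, srnumber text);
            INSERT INTO t (srnumber) VALUES ('1-100'), ('1-100'), ('1-200');
        """))
        result = conn.execute(text("""
            DELETE FROM t a USING t b
            WHERE a.id < b.id AND a.srnumber = b.srnumber;
        """))
        print('rows deleted:', result.rowcount)  # 1: the lower-id '1-100'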
diff --git a/server/src/settings.example.cfg b/server/src/settings.example.cfg
index 845340958..0866ce574 100644
--- a/server/src/settings.example.cfg
+++ b/server/src/settings.example.cfg
@@ -13,6 +13,7 @@ REDACTED = REDACTED
 [Socrata]
 TOKEN = None
 TIMEOUT = 90
+ATTEMPTS = 5
 DOMAIN = data.lacity.org
 AP2020 = rq3b-xjk8
 AP2019 = pvft-t768

From 0c5e42b79ce4000168074583fd98f0bdeed64d49 Mon Sep 17 00:00:00 2001
From: Jake Mensch
Date: Sun, 29 Mar 2020 16:55:19 -0700
Subject: [PATCH 2/6] linting

---
 server/src/app.py                | 2 +-
 server/src/services/sqlIngest.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/src/app.py b/server/src/app.py
index be3fe15b6..32d8cf010 100644
--- a/server/src/app.py
+++ b/server/src/app.py
@@ -119,7 +119,7 @@ async def ingest(request):
         2018: 1210075
         2019: 1308093
         2020: 319628 (and counting)
-    
+
         GET https://data.lacity.org/resource/{ID}.json?$select=count(srnumber)
 
     Hint:
diff --git a/server/src/services/sqlIngest.py b/server/src/services/sqlIngest.py
index 0e7154a5e..5677b5724 100644
--- a/server/src/services/sqlIngest.py
+++ b/server/src/services/sqlIngest.py
@@ -49,7 +49,7 @@ def ingestYear(self, year, limit, querySize):
 
         rowsInserted = 0
        endReached = False
-    
+
         for offset in range(0, limit, querySize):
             rows = self.fetchData(year, offset, querySize)
             self.insertData(rows)
From cb8b0aa157371c2696e6f9546eed845e299f171a Mon Sep 17 00:00:00 2001
From: Jake Mensch
Date: Sun, 29 Mar 2020 19:07:05 -0700
Subject: [PATCH 3/6] pretty-printing final report

---
 server/src/services/sqlIngest.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/server/src/services/sqlIngest.py b/server/src/services/sqlIngest.py
index 5677b5724..4f728cbe2 100644
--- a/server/src/services/sqlIngest.py
+++ b/server/src/services/sqlIngest.py
@@ -2,6 +2,7 @@
 from sqlalchemy.orm import sessionmaker
 from sqlalchemy.sql import text
 import time
+import json
 
 from .databaseOrm import Ingest, Base
 from .socrataClient import SocrataClient
@@ -143,5 +144,5 @@ async def populateDatabase(self,
             'cleaning': cleanReport,
             'totalMinutesElapsed': minutes
         }
-        log(report)
+        log(json.dumps(report, indent=2))
         return report

From ecb2bd3f2f5fd056d3877055a0b6ef6e41540fad Mon Sep 17 00:00:00 2001
From: Jake Mensch
Date: Mon, 30 Mar 2020 21:10:21 -0700
Subject: [PATCH 4/6] passing config from app.py to socrata

---
 server/src/app.py                    | 2 +-
 server/src/services/socrataClient.py | 5 +----
 server/src/services/sqlIngest.py     | 6 ++++--
 3 files changed, 6 insertions(+), 7 deletions(-)

diff --git a/server/src/app.py b/server/src/app.py
index 32d8cf010..5a0a1d7e9 100644
--- a/server/src/app.py
+++ b/server/src/app.py
@@ -147,7 +147,7 @@ async def ingest(request):
     querySize = min([limit, querySize])
 
     # get data
-    loader = DataHandler(app.config['Settings']['Database'])
+    loader = DataHandler(app.config['Settings'])
     data = await loader.populateDatabase(years=years,
                                          limit=limit,
                                          querySize=querySize)
diff --git a/server/src/services/socrataClient.py b/server/src/services/socrataClient.py
index a8b5bf964..028321fc1 100644
--- a/server/src/services/socrataClient.py
+++ b/server/src/services/socrataClient.py
@@ -12,13 +12,10 @@
 """
 
 from sodapy import Socrata
-from configparser import ConfigParser
 
 
 class SocrataClient:
-    def __init__(self):
-        config = ConfigParser()
-        config.read('./settings.cfg')
+    def __init__(self, config=None):
         config = config['Socrata']
 
         domain = config['DOMAIN']
diff --git a/server/src/services/sqlIngest.py b/server/src/services/sqlIngest.py
index 4f728cbe2..32dd37f5c 100644
--- a/server/src/services/sqlIngest.py
+++ b/server/src/services/sqlIngest.py
@@ -21,9 +21,11 @@ def end(self):
 
 class DataHandler:
     def __init__(self, config=None):
-        self.engine = create_engine(config['DB_CONNECTION_STRING'])
+        dbString = config['Database']['DB_CONNECTION_STRING']
+
+        self.engine = create_engine(dbString)
         self.session = sessionmaker(bind=self.engine)()
-        self.socrata = SocrataClient()
+        self.socrata = SocrataClient(config)
 
     def __del__(self):
         self.session.close()

From bf1a739a0a7ded87a301ad9a480d97fbe8f77121 Mon Sep 17 00:00:00 2001
From: Jake Mensch
Date: Mon, 30 Mar 2020 21:15:13 -0700
Subject: [PATCH 5/6] switched ingest to GET

---
 server/src/app.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/server/src/app.py b/server/src/app.py
index 5a0a1d7e9..2072245e5 100644
--- a/server/src/app.py
+++ b/server/src/app.py
@@ -94,7 +94,7 @@ async def sample_route(request):
     return json(sample_dataset)
 
 
-@app.route('/ingest', methods=["POST"])
+@app.route('/ingest', methods=["GET"])
 @compress.compress()
 async def ingest(request):
     """
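For reference, the report that patch 3 pretty-prints has the shape below. The values are illustrative only; the keys mirror those assembled in populateDatabase, ingestYear, and cleanTable:

    # Sketch: the shape of the final ingestion report. Numbers are
    # made up for illustration; keys come from sqlIngest.py above.
    import json

    report = {
        'insertion': [
            {'year': 2019, 'rowsInserted': 1308093,
             'endReached': True, 'minutesElapsed': 12.5},
        ],
        'cleaning': [
            {'description': 'dropped duplicate rows by srnumber', 'rows': 42},
            {'description': 'switched primary key column to srnumber',
             'rows': 'N/A'},
            {'description': 'removed invalid closed dates', 'rowsAffected': 17},
        ],
        'totalMinutesElapsed': 13.1,
    }
    print(json.dumps(report, indent=2))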
From cb1efe98e833847c30aad718d3129c30c05e35f0 Mon Sep 17 00:00:00 2001
From: Jake Mensch
Date: Mon, 30 Mar 2020 21:33:47 -0700
Subject: [PATCH 6/6] added default ingestion params to settings.cfg

---
 server/src/app.py                | 35 +++++++++++++++-----------------
 server/src/services/sqlIngest.py |  5 +----
 server/src/settings.example.cfg  |  5 +++++
 3 files changed, 22 insertions(+), 23 deletions(-)

diff --git a/server/src/app.py b/server/src/app.py
index 2072245e5..0b365e8c6 100644
--- a/server/src/app.py
+++ b/server/src/app.py
@@ -102,13 +102,10 @@ async def ingest(request):
         years:
             a comma-separated list of years to import.
             Ex. '/ingest?years=2015,2016,2017'
-            defaults to range(2015, 2021)
         limit:
             the max number of records per year
-            defaults to 2000000
         querySize:
             the number of records per request to socrata
-            defaults to 50000
 
     Counts:
         These are the counts you can expect if you do the full ingest:
@@ -125,25 +122,25 @@ async def ingest(request):
     Hint:
         Run /ingest without params to get all socrata data
     """
+
     # parse params
-    years = request.args.get('years', None)
-    limit = request.args.get('limit', None)
-    querySize = request.args.get('querySize', None)
+    defaults = app.config['Settings']['Ingestion']
+
+    years = request.args.get('years', defaults['YEARS'])
+    limit = request.args.get('limit', defaults['LIMIT'])
+    querySize = request.args.get('querySize', defaults['QUERY_SIZE'])
 
     # validate params
-    if years is None:
-        years = range(2015, 2021)
-    else:
-        current_year = datetime.now().year
-        allowed_years = [year for year in range(2015, current_year+1)]
-        years = set([int(year) for year in years.split(',')])
-        if not all(year in allowed_years for year in years):
-            return json({
-                'error': f"'years' param values must be one of {allowed_years}"
-            })
-
-    limit = int(limit) if limit else 2000000
-    querySize = int(querySize) if querySize else 50000
+    current_year = datetime.now().year
+    allowed_years = [year for year in range(2015, current_year+1)]
+    years = set([int(year) for year in years.split(',')])
+    if not all(year in allowed_years for year in years):
+        return json({
+            'error': f"'years' param values must be one of {allowed_years}"
+        })
+
+    limit = int(limit)
+    querySize = int(querySize)
     querySize = min([limit, querySize])
 
     # get data
diff --git a/server/src/services/sqlIngest.py b/server/src/services/sqlIngest.py
index 32dd37f5c..7a147b6fb 100644
--- a/server/src/services/sqlIngest.py
+++ b/server/src/services/sqlIngest.py
@@ -122,10 +122,7 @@ def removeInvalidClosedDates(table, report):
 
         return report
 
-    async def populateDatabase(self,
-                               years=range(2015, 2021),
-                               limit=2000000,
-                               querySize=50000):
+    async def populateDatabase(self, years=[], limit=None, querySize=None):
         log('\nPopulating database for years: {}'.format(list(years)))
         timer = Timer()
 
diff --git a/server/src/settings.example.cfg b/server/src/settings.example.cfg
index 0866ce574..58c38faf4 100644
--- a/server/src/settings.example.cfg
+++ b/server/src/settings.example.cfg
@@ -21,3 +21,8 @@ AP2018 = h65r-yf5i
 AP2017 = d4vt-q4t5
 AP2016 = ndkd-k878
 AP2015 = ms7h-a45h
+
+[Ingestion]
+YEARS=2015,2016,2017,2018,2019,2020
+LIMIT=2000000
+QUERY_SIZE=50000
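These defaults come back through the same ConfigParser settings object the app already loads, and ConfigParser values are always strings, which is why app.py above still calls int() and splits the years. A minimal sketch of the round-trip:

    # Sketch: reading the new [Ingestion] defaults. ConfigParser returns
    # strings, hence the int()/split(',') conversions mirrored in app.py.
    from configparser import ConfigParser

    config = ConfigParser()
    config.read('settings.cfg')  # copied from settings.example.cfg

    defaults = config['Ingestion']
    years = [int(year) for year in defaults['YEARS'].split(',')]
    limit = int(defaults['LIMIT'])
    querySize = int(defaults['QUERY_SIZE'])
    print(years, limit, querySize)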