Query Prereq Check (#987)
* prereq dict added to query

* prereq checking mechanism implemented, not tested yet

* prereq checking tested

* change to more flexible filter handling

* make safety_rate and safety_max_value floats

* ignore nulls in fact_top_scores

* fix typo

* remove unneeded import

* replace unnecessary use of list with set

* add queries to pre-bin histogram data

* fix the serialization issue with set: convert to list before writing as json

* fix query

* fix query

* fixed query bug

Co-authored-by: liljonnystyle <[email protected]>
brentgryffindor and liljonnystyle authored Jul 8, 2020
1 parent 5b7e8b2 commit c4ba7ad
Showing 3 changed files with 221 additions and 113 deletions.
40 changes: 39 additions & 1 deletion flow/data_pipeline/data_pipeline.py
@@ -1,11 +1,13 @@
"""contains class and helper functions for the data pipeline."""
import pandas as pd
import boto3
from flow.data_pipeline.query import QueryStrings
from botocore.exceptions import ClientError
from flow.data_pipeline.query import QueryStrings, prerequisites
from time import time
from datetime import date
import csv
from io import StringIO
import json


def generate_trajectory_table(data_path, extra_info, partition_name):
@@ -158,6 +160,42 @@ def update_baseline(s3, baseline_network, baseline_source_id):
Body=new_str.getvalue().replace('\r', '').encode())


def get_completed_queries(s3, source_id):
"""Return the deserialized list of completed queries from S3."""
try:
completed_queries_obj = \
s3.get_object(Bucket='circles.data.pipeline', Key='lambda_temp/{}'.format(source_id))['Body']
completed_queries = json.loads(completed_queries_obj.read().decode('utf-8'))
except ClientError as e:
if e.response['Error']['Code'] == 'NoSuchKey':
completed_queries = set()
else:
raise
return set(completed_queries)


def put_completed_queries(s3, completed_queries):
"""Put all the completed queries lists into S3 as in a serialized json format."""
for source_id, completed_queries_set in completed_queries.items():
completed_queries_list = list(completed_queries_set)
completed_queries_json = json.dumps(completed_queries_list)
s3.put_object(Bucket='circles.data.pipeline', Key='lambda_temp/{}'.format(source_id),
Body=completed_queries_json.encode('utf-8'))
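
These two helpers round-trip each source_id's progress record through S3; the set is converted to a list before writing because json cannot serialize a set (the serialization fix noted in the commit message). A minimal usage sketch, where the source_id and query name are placeholders, not real values:

# Hypothetical usage sketch; "flow_abc123" and the query name are placeholders.
completed = {"flow_abc123": get_completed_queries(s3, "flow_abc123")}  # empty set if no record exists yet
completed["flow_abc123"].add("FACT_VEHICLE_TRACE")
put_completed_queries(s3, completed)  # written back as a JSON list under lambda_temp/flow_abc123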


def get_ready_queries(completed_queries, new_query):
"""Return queries whose prerequisite queries are completed."""
readied_queries = []
unfinished_queries = prerequisites.keys() - completed_queries
updated_completed_queries = completed_queries.copy()
updated_completed_queries.add(new_query)
for query_name in unfinished_queries:
if not prerequisites[query_name][1].issubset(completed_queries):
if prerequisites[query_name][1].issubset(updated_completed_queries):
readied_queries.append((query_name, prerequisites[query_name][0]))
return readied_queries
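
get_ready_queries indexes prerequisites positionally: element 0 of each entry is the destination table and element 1 is the set of queries that must finish first. A minimal sketch of the shape this code assumes for the prerequisites dict in flow/data_pipeline/query.py; apart from FACT_VEHICLE_TRACE, the query and table names below are placeholders, not the actual entries:

# Hypothetical illustration of the structure get_ready_queries() expects;
# the real mapping lives in flow/data_pipeline/query.py.
prerequisites = {
    # query name: (destination table, set of prerequisite query names)
    "FACT_VEHICLE_TRACE": ("fact_vehicle_trace", set()),
    "FACT_ENERGY_TRACE": ("fact_energy_trace", {"FACT_VEHICLE_TRACE"}),
    "FACT_NETWORK_METRICS": ("fact_network_metrics", {"FACT_VEHICLE_TRACE", "FACT_ENERGY_TRACE"}),
}

# With FACT_VEHICLE_TRACE already recorded, completing FACT_ENERGY_TRACE readies the third query:
# get_ready_queries({"FACT_VEHICLE_TRACE"}, "FACT_ENERGY_TRACE")
# -> [("FACT_NETWORK_METRICS", "fact_network_metrics")]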


class AthenaQuery:
"""Class used to run queries.
79 changes: 39 additions & 40 deletions flow/data_pipeline/lambda_function.py
@@ -1,16 +1,18 @@
"""lambda function on AWS Lambda."""
import boto3
from urllib.parse import unquote_plus
from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline
from flow.data_pipeline.query import tags, tables, network_using_edge, summary_tables
from flow.data_pipeline.query import X_FILTER, EDGE_FILTER, WARMUP_STEPS, HORIZON_STEPS
from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline, \
get_ready_queries, get_completed_queries, put_completed_queries
from flow.data_pipeline.query import tables, network_filters, summary_tables, triggers

s3 = boto3.client('s3')
queryEngine = AthenaQuery()


def lambda_handler(event, context):
"""Handle S3 put event on AWS Lambda."""
# stores the set of completed queries for each source_id
completed = {}
records = []
# do a pre-sweep to handle tasks other than initializing a query
for record in event['Records']:
@@ -19,58 +21,55 @@ def lambda_handler(event, context):
table = key.split('/')[0]
if table not in tables:
continue

# delete unwanted metadata files
if (key[-9:] == '.metadata'):
s3.delete_object(Bucket=bucket, Key=key)
continue

s3.delete_object(Bucket=bucket, Key=(key + '.metadata'))
# load the partition for newly added table
query_date = key.split('/')[-3].split('=')[-1]
partition = key.split('/')[-2].split('=')[-1]
source_id = "flow_{}".format(partition.split('_')[1])
if table == "fact_vehicle_trace":
query_name = "FACT_VEHICLE_TRACE"
else:
query_name = partition.replace(source_id, "")[1:]
queryEngine.repair_partition(table, query_date, partition)

# delete obsolete data
if table in summary_tables:
delete_obsolete_data(s3, key, table)

# add tables that need to start a query to the list
if table in tags.keys():
records.append((bucket, key, table, query_date, partition))
if query_name in triggers:
records.append((bucket, key, table, query_name, query_date, partition, source_id))

# initialize the queries
start_filter = WARMUP_STEPS
stop_filter = WARMUP_STEPS + HORIZON_STEPS
for bucket, key, table, query_date, partition in records:
source_id = "flow_{}".format(partition.split('_')[1])
for bucket, key, table, query_name, query_date, partition, source_id in records:
# retrieve the set of completed queries for this source_id if not already available
if source_id not in completed.keys():
completed[source_id] = get_completed_queries(s3, source_id)
# if the query was already recorded, skip it; this tolerates repeated executions by Lambda
if query_name in completed[source_id]:
continue
# retrieve metadata and use it to determine the right filters
metadata_key = "fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv".format(query_date, source_id)
response = s3.head_object(Bucket=bucket, Key=metadata_key)
loc_filter = X_FILTER
if 'network' in response["Metadata"]:
if response["Metadata"]['network'] in network_using_edge:
loc_filter = EDGE_FILTER
network = response["Metadata"]['network']
loc_filter = network_filters[network]['loc_filter']
start_filter = network_filters[network]['warmup_steps']
stop_filter = network_filters[network]['horizon_steps']

# update baseline if needed
if table == 'fact_vehicle_trace' \
and 'is_baseline' in response['Metadata'] and response['Metadata']['is_baseline'] == 'True':
update_baseline(s3, response["Metadata"]['network'], source_id)

query_dict = tags[table]

# handle different energy models
if table == "fact_energy_trace":
energy_model_id = partition.replace(source_id, "")[1:]
query_dict = tags[energy_model_id]
update_baseline(s3, network, source_id)

readied_queries = get_ready_queries(completed[source_id], query_name)
completed[source_id].add(query_name)
# initialize queries and store them at appropriate locations
for table_name, query_list in query_dict.items():
for query_name in query_list:
result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
query_date,
source_id,
query_name)
queryEngine.run_query(query_name,
result_location,
query_date,
partition,
loc_filter=loc_filter,
start_filter=start_filter,
stop_filter=stop_filter)
for readied_query_name, table_name in readied_queries:
result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
query_date,
source_id,
readied_query_name)
queryEngine.run_query(readied_query_name, result_location, query_date, partition, loc_filter=loc_filter,
start_filter=start_filter, stop_filter=stop_filter)
# store all the updated sets of completed queries back to S3
put_completed_queries(s3, completed)
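
The handler relies on two structures from flow/data_pipeline/query.py that are not shown in this diff: triggers, the query names whose completion should kick off downstream queries, and network_filters, a per-network mapping of the SQL location filter and step bounds passed to run_query. A minimal sketch of the shapes implied by the lookups above; the network name, query names, and values are illustrative assumptions, not the real definitions:

# Hypothetical shapes inferred from how lambda_handler indexes these objects;
# the actual definitions live in flow/data_pipeline/query.py.
triggers = ["FACT_VEHICLE_TRACE", "POWER_DEMAND_MODEL"]  # query names that enqueue follow-up work

network_filters = {
    # network name (read from the S3 object's metadata) -> filters passed to run_query()
    "example_network": {
        "loc_filter": "x BETWEEN 500 AND 2300",  # illustrative SQL predicate
        "warmup_steps": 600,
        "horizon_steps": 3000,
    },
}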
