Query Prereq Check #987

Merged · 15 commits · Jul 8, 2020
Changes from 10 commits
38 changes: 37 additions & 1 deletion flow/data_pipeline/data_pipeline.py
@@ -1,11 +1,13 @@
"""contains class and helper functions for the data pipeline."""
import pandas as pd
import boto3
from flow.data_pipeline.query import QueryStrings
from botocore.exceptions import ClientError
from flow.data_pipeline.query import QueryStrings, prerequisites
from time import time
from datetime import date
import csv
from io import StringIO
import json


def generate_trajectory_table(data_path, extra_info, partition_name):
@@ -143,6 +145,7 @@ def delete_obsolete_data(s3, latest_key, table, bucket="circles.data.pipeline"):


def update_baseline(s3, baseline_network, baseline_source_id):
"""Update the baseline source_id for the specified network."""
obj = s3.get_object(Bucket='circles.data.pipeline', Key='baseline_table/baselines.csv')['Body']
original_str = obj.read().decode()
reader = csv.DictReader(StringIO(original_str))
@@ -157,6 +160,39 @@ def update_baseline(s3, baseline_network, baseline_source_id):
Body=new_str.getvalue().replace('\r', '').encode())


def get_completed_queries(s3, source_id):
    """Return the set of queries already completed for the given source_id."""
    try:
        completed_queries_obj = \
            s3.get_object(Bucket='circles.data.pipeline', Key='lambda_temp/{}'.format(source_id))['Body']
        completed_queries = json.loads(completed_queries_obj.read().decode('utf-8'))
    except ClientError as e:
        if e.response['Error']['Code'] == 'NoSuchKey':
            completed_queries = set()
        else:
            raise
    return set(completed_queries)


def put_completed_queries(s3, completed_queries):
    """Serialize each completed-query set to JSON and store it in S3, keyed by source_id."""
    for source_id, completed_queries_set in completed_queries.items():
        completed_queries_json = json.dumps(list(completed_queries_set))
        s3.put_object(Bucket='circles.data.pipeline', Key='lambda_temp/{}'.format(source_id),
                      Body=completed_queries_json.encode('utf-8'))
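
Together, these two helpers keep a per-source_id record of finished queries under `lambda_temp/`. A minimal round-trip sketch, using a hypothetical in-memory stand-in for the boto3 S3 client (the bucket name and key layout follow the code above; the query names are only examples, not part of this PR):

```python
import io


class FakeS3:
    """Hypothetical in-memory stand-in for the boto3 S3 client (illustration only)."""

    def __init__(self):
        self.objects = {}

    def put_object(self, Bucket, Key, Body):
        self.objects[(Bucket, Key)] = Body

    def get_object(self, Bucket, Key):
        # return only the piece of the boto3 response that the helpers read
        return {'Body': io.BytesIO(self.objects[(Bucket, Key)])}


s3_stub = FakeS3()
put_completed_queries(s3_stub, {'flow_123': {'FACT_VEHICLE_TRACE', 'POWER_DEMAND_MODEL'}})
assert get_completed_queries(s3_stub, 'flow_123') == {'FACT_VEHICLE_TRACE', 'POWER_DEMAND_MODEL'}
```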


def get_ready_queries(completed_queries, new_query):
    """Return the queries that become runnable once new_query has completed."""
    readied_queries = []
    unfinished_queries = prerequisites.keys() - completed_queries
    updated_completed_queries = completed_queries | {new_query}
    for query_name in unfinished_queries:
        # only trigger queries that were not ready before new_query but are ready with it
        if not prerequisites[query_name][1].issubset(completed_queries):
            if prerequisites[query_name][1].issubset(updated_completed_queries):
                readied_queries.append((query_name, prerequisites[query_name][0]))
    return readied_queries
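
For intuition, a small sketch of the dependency resolution with a made-up `prerequisites` mapping of the shape this function assumes (query name mapped to its result table and its prerequisite queries); the real mapping is defined in flow.data_pipeline.query:

```python
# hypothetical prerequisites mapping: query name -> (result table, prerequisite queries)
prerequisites = {
    'FACT_ENERGY_TRACE': ('fact_energy_trace', {'FACT_VEHICLE_TRACE'}),
    'FACT_NETWORK_METRICS': ('fact_network_metrics', {'FACT_VEHICLE_TRACE', 'FACT_ENERGY_TRACE'}),
}

# FACT_ENERGY_TRACE just finished; only queries it newly unlocks are returned
print(get_ready_queries({'FACT_VEHICLE_TRACE'}, 'FACT_ENERGY_TRACE'))
# -> [('FACT_NETWORK_METRICS', 'fact_network_metrics')]
```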


class AthenaQuery:
"""Class used to run queries.

79 changes: 39 additions & 40 deletions flow/data_pipeline/lambda_function.py
@@ -1,16 +1,18 @@
"""lambda function on AWS Lambda."""
import boto3
from urllib.parse import unquote_plus
from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline
from flow.data_pipeline.query import tags, tables, network_using_edge, summary_tables
from flow.data_pipeline.query import X_FILTER, EDGE_FILTER, WARMUP_STEPS, HORIZON_STEPS
from flow.data_pipeline.data_pipeline import AthenaQuery, delete_obsolete_data, update_baseline, \
get_ready_queries, get_completed_queries, put_completed_queries
from flow.data_pipeline.query import tables, network_filters, summary_tables, triggers

s3 = boto3.client('s3')
queryEngine = AthenaQuery()


def lambda_handler(event, context):
"""Handle S3 put event on AWS Lambda."""
# stores the set of completed queries for each source_id
completed = {}
records = []
# do a pre-sweep to handle tasks other than initializing a query
for record in event['Records']:
@@ -19,58 +21,55 @@ def lambda_handler(event, context):
table = key.split('/')[0]
if table not in tables:
continue

# delete unwanted metadata files
if (key[-9:] == '.metadata'):
s3.delete_object(Bucket=bucket, Key=key)
continue

s3.delete_object(Bucket=bucket, Key=(key + '.metadata'))
# load the partition for the newly added table
query_date = key.split('/')[-3].split('=')[-1]
partition = key.split('/')[-2].split('=')[-1]
source_id = "flow_{}".format(partition.split('_')[1])
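        # for example (hypothetical values): a partition named "flow_123_POWER_DEMAND_MODEL"
        # yields source_id "flow_123" and, below, query_name "POWER_DEMAND_MODEL"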
if table == "fact_vehicle_trace":
query_name = "FACT_VEHICLE_TRACE"
else:
query_name = partition.replace(source_id, "")[1:]
queryEngine.repair_partition(table, query_date, partition)

# delete obsolete data
if table in summary_tables:
delete_obsolete_data(s3, key, table)

# add tables that need to start a query to the list
if table in tags.keys():
records.append((bucket, key, table, query_date, partition))
if query_name in triggers:
records.append((bucket, key, table, query_name, query_date, partition, source_id))

# initialize the queries
start_filter = WARMUP_STEPS
stop_filter = WARMUP_STEPS + HORIZON_STEPS
for bucket, key, table, query_date, partition in records:
source_id = "flow_{}".format(partition.split('_')[1])
for bucket, key, table, query_name, query_date, partition, source_id in records:
# retrieve the set of completed queries for this source_id if not already available
if source_id not in completed.keys():
completed[source_id] = get_completed_queries(s3, source_id)
# if this query was already recorded, skip it; this tolerates repeated Lambda executions for the same event
if query_name in completed[source_id]:
continue
# retrieve metadata and use it to determine the right filters
metadata_key = "fact_vehicle_trace/date={0}/partition_name={1}/{1}.csv".format(query_date, source_id)
response = s3.head_object(Bucket=bucket, Key=metadata_key)
loc_filter = X_FILTER
if 'network' in response["Metadata"]:
if response["Metadata"]['network'] in network_using_edge:
loc_filter = EDGE_FILTER
network = response["Metadata"]['network']
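            # network_filters (from flow.data_pipeline.query) is assumed to map each network
            # name to its 'loc_filter', 'warmup_steps' and 'horizon_steps' values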
loc_filter = network_filters[network]['loc_filter']
start_filter = network_filters[network]['warmup_steps']
stop_filter = network_filters[network]['horizon_steps']

# update baseline if needed
if table == 'fact_vehicle_trace' \
and 'is_baseline' in response['Metadata'] and response['Metadata']['is_baseline'] == 'True':
update_baseline(s3, response["Metadata"]['network'], source_id)

query_dict = tags[table]

# handle different energy models
if table == "fact_energy_trace":
energy_model_id = partition.replace(source_id, "")[1:]
query_dict = tags[energy_model_id]
update_baseline(s3, network, source_id)

readied_queries = get_ready_queries(completed[source_id], query_name)
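        # record the trigger itself as completed so repeated events for this source_id do not re-run it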
completed[source_id].add(query_name)
# initialize the readied queries and store their results at the appropriate locations
for table_name, query_list in query_dict.items():
for query_name in query_list:
result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
query_date,
source_id,
query_name)
queryEngine.run_query(query_name,
result_location,
query_date,
partition,
loc_filter=loc_filter,
start_filter=start_filter,
stop_filter=stop_filter)
for readied_query_name, table_name in readied_queries:
result_location = 's3://circles.data.pipeline/{}/date={}/partition_name={}_{}'.format(table_name,
query_date,
source_id,
readied_query_name)
queryEngine.run_query(readied_query_name, result_location, query_date, partition, loc_filter=loc_filter,
start_filter=start_filter, stop_filter=stop_filter)
# store all the updated sets of completed queries back to S3
put_completed_queries(s3, completed)
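
For reference, a minimal sketch of the kind of S3 put event the handler consumes and of how the object key decomposes. The event shape is the standard S3 notification format; the bucket name matches the code above, while the date, partition, and source_id values are made up:

```python
# hypothetical S3 put event, trimmed to the fields lambda_handler reads
event = {
    'Records': [{
        's3': {
            'bucket': {'name': 'circles.data.pipeline'},
            'object': {'key': 'fact_vehicle_trace/date=2020-07-08/partition_name=flow_123/flow_123.csv'},
        }
    }]
}

key = event['Records'][0]['s3']['object']['key']
table = key.split('/')[0]                               # 'fact_vehicle_trace'
query_date = key.split('/')[-3].split('=')[-1]          # '2020-07-08'
partition = key.split('/')[-2].split('=')[-1]           # 'flow_123'
source_id = 'flow_{}'.format(partition.split('_')[1])   # 'flow_123'
```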