script for stage and load execution lambda
timburke-hackit committed Nov 1, 2023
1 parent b48c7a9 commit 0ebf4e7
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions lambdas/redshift_stage_and_load_tables/main.py
import os

import boto3

# Redshift Data API client: runs SQL against the cluster without managing
# database connections inside the Lambda.
redshift_data_client = boto3.client("redshift-data")


def start_sql_execution(cluster_id, database, user, sql):
    """Submit a SQL statement via the Redshift Data API and return its statement ID."""
    response = redshift_data_client.execute_statement(
        ClusterIdentifier=cluster_id,
        Database=database,
        DbUser=user,
        Sql=sql,
        WithEvent=True,  # emit an EventBridge event when the statement finishes
    )

    return response["Id"]


def lambda_handler(event, context):
    # Redshift connection details and the source bucket come from the environment.
    cluster_id = os.environ["REDSHIFT_CLUSTER_ID"]
    redshift_database = os.environ["REDSHIFT_DBNAME"]
    user = os.environ["REDSHIFT_USER"]
    iam_role = os.environ["REDSHIFT_IAM_ROLE"]
    source_bucket = os.environ["SOURCE_BUCKET"]

    # Partition values identifying the snapshot to load.
    import_year = event["year"]
    import_month = event["month"]
    import_day = event["day"]
    import_date = event["date"]
    source_database = event["database"]

    query_ids = []

    # Kick off an asynchronous stage-and-load stored procedure call per table.
    for table in event["tables"]:
        s3_path = f"s3://{source_bucket}/{source_database}/{table}/{import_year}/{import_month}/{import_day}/{import_date}/"
        sql = f"CALL stage_and_load_parquet('{s3_path}', '{iam_role}', '{table}')"
        query_id = start_sql_execution(cluster_id, redshift_database, user, sql)
        query_ids.append(query_id)

    return {
        "statusCode": 200,
        "body": "Started Redshift data staging and load for all tables!",
        "query_ids": query_ids,
    }
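
For reference, a minimal sketch of the invocation event this handler expects. The field names come from the keys read in lambda_handler above; the values and table names shown are hypothetical and depend on whatever pipeline triggers the Lambda.

# Hypothetical example event — key names match those read by lambda_handler;
# values are illustrative only.
example_event = {
    "year": "2023",
    "month": "11",
    "day": "01",
    "date": "2023-11-01",
    "database": "source_db",
    "tables": ["table_one", "table_two"],
}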
