dpp 573/parking tables load lambda (#1551)
* sql procedure for staging and loading tables

* script for stage and load execution lambda

* add schema argument

* remove trailing comma, prevent Black auto-format

* formatting and exception handling

* move sql to lambda folder

* remove procedure creation

* read and parameterise sql from file

* output cluster arn

* terraform for lambda config

* add tags to roles

* fix lambda path

* policy attachment count

* add lambda zip output path

* braces around parameters

* remove braces
timburke-hackit authored Jan 17, 2024
1 parent 8fb7961 commit b0ac4fc
Showing 4 changed files with 227 additions and 1 deletion.
62 changes: 62 additions & 0 deletions lambdas/redshift_stage_and_load_tables/main.py
@@ -0,0 +1,62 @@
import os

import boto3

redshift_data_client = boto3.client("redshift-data")


def read_sql_from_file(file_path, params):
    """Read a SQL template and substitute its {placeholder} parameters."""
    with open(file_path, "r") as f:
        sql_template = f.read()
    return sql_template.format(**params)


def start_sql_execution(cluster_id, database, user, sql):
    """Submit the SQL to the Redshift Data API and return the statement ID."""
    response = redshift_data_client.execute_statement(
        ClusterIdentifier=cluster_id,
        Database=database,
        DbUser=user,
        Sql=sql,
        WithEvent=True,
    )

    return response["Id"]


def lambda_handler(event, context):
    # Static configuration from the Lambda environment.
    cluster_id = os.environ["REDSHIFT_CLUSTER_ID"]
    redshift_database = os.environ["REDSHIFT_DBNAME"]
    user = os.environ["REDSHIFT_USER"]
    iam_role = os.environ["REDSHIFT_IAM_ROLE"]
    source_bucket = os.environ["SOURCE_BUCKET"]
    schema_name = os.environ["SCHEMA_NAME"]
    sql_file = os.environ["SQL_FILE"]

    # Per-invocation parameters from the triggering event.
    import_year = event["year"]
    import_month = event["month"]
    import_day = event["day"]
    import_date = event["date"]
    source_database = event["database"]

    query_ids = []

    for table in event["tables"]:
        s3_path = f"s3://{source_bucket}/{source_database}/{table}/{import_year}/{import_month}/{import_day}/{import_date}/"

        params = {
            "schema_name": schema_name,
            "table_name": table,
            "s3_path": s3_path,
            "iam_role": iam_role,
        }

        sql = read_sql_from_file(sql_file, params)

        query_id = start_sql_execution(cluster_id, redshift_database, user, sql)
        query_ids.append(query_id)

    return {
        "statusCode": 200,
        "body": "Started Redshift data staging and load for all tables!",
        "query_ids": query_ids,
    }
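For reference, a minimal invocation sketch, assuming an event payload whose keys mirror the event[...] lookups in lambda_handler above (the payload values and table names are illustrative, not taken from the repository):

import json

import boto3

# Hypothetical payload: keys match the handler's event lookups; values are examples.
event = {
    "year": "2024",
    "month": "01",
    "day": "17",
    "date": "2024-01-17",
    "database": "parking",
    "tables": ["permits", "cautions"],
}

lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
    FunctionName="parking-load-redshift-tables",  # lambda_name from the Terraform below
    Payload=json.dumps(event).encode("utf-8"),
)
print(json.loads(response["Payload"].read()))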
32 changes: 32 additions & 0 deletions lambdas/redshift_stage_and_load_tables/stage_and_load_parquet.sql
@@ -0,0 +1,32 @@
-- Placeholders in braces are substituted by read_sql_from_file in main.py
-- before execution, so plain statements are used here: EXECUTE format(...)
-- is only valid inside a stored procedure, and the procedure wrapper was removed.
BEGIN;

-- Create the staging table
CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}_staging (LIKE {schema_name}.{table_name});

-- Load data from S3 into the staging table
COPY {schema_name}.{table_name}_staging
FROM '{s3_path}'
IAM_ROLE '{iam_role}'
FORMAT AS PARQUET;

-- Insert data from staging to main table
INSERT INTO {schema_name}.{table_name}
SELECT * FROM {schema_name}.{table_name}_staging;

-- Truncate staging table
TRUNCATE {schema_name}.{table_name}_staging;

COMMIT;
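To check how the braced placeholders resolve, the template can be rendered locally with the same str.format substitution that read_sql_from_file in main.py applies (the parameter values below are illustrative):

# Render the SQL template locally with sample values; this mirrors
# read_sql_from_file in main.py but runs outside the Lambda.
params = {
    "schema_name": "parking_test",
    "table_name": "permits",
    "s3_path": "s3://my-refined-zone/parking/permits/2024/01/17/2024-01-17/",
    "iam_role": "arn:aws:iam::123456789012:role/parking_redshift_copier",
}

with open("stage_and_load_parquet.sql", "r") as f:
    print(f.read().format(**params))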
128 changes: 128 additions & 0 deletions terraform/etl/38-parking-load-redshift-tables.tf
@@ -0,0 +1,128 @@
module "parking_load_redshift_tables" {
count = local.is_live_environment && !local.is_production_environment ? 1 : 0
source = "../modules/aws-lambda"
lambda_name = "parking-load-redshift-tables"
handler = "main.lambda_handler"
lambda_artefact_storage_bucket = module.lambda_artefact_storage_data_source.bucket_id
s3_key = "parking-load-redshift-tables.zip"
lambda_source_dir = "../../lambdas/redshift_stage_and_load_tables"
lambda_output_path = "../../lambdas/lambda-archives/parking-load-redshift-tables.zip"

environment_variables = {
"REDSHIFT_CLUSTER_ID" = module.redshift[0].cluster_id
"REDSHIFT_DBNAME" = "data_platform"
"REDSHIFT_IAM_ROLE" = aws_iam_role.parking_redshift_copier[0].arn
"SOURCE_BUCKET" = module.refined_zone_data_source.bucket_id
"SCHEMA_NAME" = "parking_test"
"SQL_FILE" = "stage_and_load_parquet.sql"
"SECRET_NAME" = "parking/redshift_stage_and_load_tables_user"
}
}

# IAM policies and role for Redshift to access S3

resource "aws_iam_role" "parking_redshift_copier" {
  count = local.is_live_environment && !local.is_production_environment ? 1 : 0
  name  = "parking_redshift_copier"
  tags  = module.tags.values
  assume_role_policy = jsonencode({
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "redshift.amazonaws.com"
        }
      }
    ]
  })
}

data "aws_iam_policy_document" "redshift_list_and_get_s3" {
  statement {
    actions = [
      "s3:ListBucket",
      "s3:GetObject"
    ]
    resources = [
      module.refined_zone_data_source.bucket_arn,
      "${module.refined_zone_data_source.bucket_arn}/*"
    ]
  }
}

resource "aws_iam_policy" "redshift_list_and_get_s3" {
  count  = local.is_live_environment && !local.is_production_environment ? 1 : 0
  name   = "redshift_list_and_get_s3"
  policy = data.aws_iam_policy_document.redshift_list_and_get_s3.json
}

resource "aws_iam_role_policy_attachment" "redshift_list_and_get_s3" {
  count      = local.is_live_environment && !local.is_production_environment ? 1 : 0
  role       = aws_iam_role.parking_redshift_copier[0].name
  policy_arn = aws_iam_policy.redshift_list_and_get_s3[0].arn
}

resource "aws_redshift_cluster_iam_roles" "parking_redshift_copier" {
  count              = local.is_live_environment && !local.is_production_environment ? 1 : 0
  cluster_identifier = module.redshift[0].cluster_id
  iam_role_arns      = [aws_iam_role.parking_redshift_copier[0].arn]
}

# IAM policies and role for lambda executor

data "aws_iam_policy_document" "lambda_redshift_policy" {
  statement {
    actions = [
      "redshift-data:ExecuteStatement",
      "redshift-data:GetStatementResult",
      "redshift-data:DescribeStatement",
      "redshift-data:ListDatabases",
      "redshift-data:ListSchemas",
      "redshift-data:ListTables",
      "redshift-data:ListStatements",
      "redshift-data:CancelStatement"
    ]
    resources = [module.redshift[0].cluster_arn]
  }

  statement {
    actions = [
      "secretsmanager:GetSecretValue"
    ]
    resources = [
      # Secrets Manager appends a random suffix to secret ARNs, so match with a wildcard
      "arn:aws:secretsmanager:eu-west-2:${data.aws_caller_identity.data_platform.account_id}:secret:parking/redshift_stage_and_load_tables_user-*"
    ]
  }
}

resource "aws_iam_policy" "lambda_policy" {
count = local.is_live_environment && !local.is_production_environment ? 1 : 0
name = "lambda_redshift_policy"
policy = data.aws_iam_policy_document.lambda_redshift_policy.json
}

resource "aws_iam_role" "lambda_role" {
name = "lambda_execution_role"
tags = module.tags.values

assume_role_policy = jsonencode({
Version = "2012-10-17",
Statement = [
{
Action = "sts:AssumeRole",
Effect = "Allow",
Principal = {
Service = "lambda.amazonaws.com"
}
}
]
})
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
count = local.is_live_environment && !local.is_production_environment ? 1 : 0
role = aws_iam_role.lambda_role.name
policy_arn = aws_iam_policy.lambda_policy[0].arn
}
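The lambda_redshift_policy above grants redshift-data:DescribeStatement, so a caller can poll the query IDs returned by the handler until each statement finishes. A minimal sketch of such a poller (this helper is hypothetical; the handler itself returns without waiting):

import time

import boto3

redshift_data_client = boto3.client("redshift-data")


def wait_for_statement(query_id, poll_seconds=5):
    # Poll the Redshift Data API until the statement reaches a terminal state.
    while True:
        status = redshift_data_client.describe_statement(Id=query_id)["Status"]
        if status in ("FINISHED", "FAILED", "ABORTED"):
            return status
        time.sleep(poll_seconds)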
6 changes: 5 additions & 1 deletion terraform/modules/redshift/99-outputs.tf
@@ -4,4 +4,8 @@ output "role_arn" {

output "cluster_id" {
  value = aws_redshift_cluster.redshift_cluster.cluster_identifier
}

output "cluster_arn" {
  value = aws_redshift_cluster.redshift_cluster.arn
}
