From b0ac4fc6b457aab20fc49c626aad53333e4bb455 Mon Sep 17 00:00:00 2001
From: timburke-hackit <61045197+timburke-hackit@users.noreply.github.com>
Date: Wed, 17 Jan 2024 15:53:10 +0000
Subject: [PATCH] dpp 573/parking tables load lambda (#1551)

* sql procedure for staging and loading tables
* script for stage and load execution lambda
* add schema argument
* remove trailing comma, prevent Black auto-format
* formatting and exception handling
* move sql to lambda folder
* remove procedure creation
* read and parameterise sql from file
* output cluster arn
* terraform for lambda config
* add tags to roles
* fix lambda path
* policy attachment count
* add lambda zip output path
* braces around parameters
* remove braces
---
 .../redshift_stage_and_load_tables/main.py       |  69 ++++++++++
 .../stage_and_load_parquet.sql                   |  20 +++
 .../etl/38-parking-load-redshift-tables.tf       | 130 +++++++++++++++++
 terraform/modules/redshift/99-outputs.tf         |   6 +-
 4 files changed, 224 insertions(+), 1 deletion(-)
 create mode 100644 lambdas/redshift_stage_and_load_tables/main.py
 create mode 100644 lambdas/redshift_stage_and_load_tables/stage_and_load_parquet.sql
 create mode 100644 terraform/etl/38-parking-load-redshift-tables.tf

diff --git a/lambdas/redshift_stage_and_load_tables/main.py b/lambdas/redshift_stage_and_load_tables/main.py
new file mode 100644
index 000000000..407bdc79e
--- /dev/null
+++ b/lambdas/redshift_stage_and_load_tables/main.py
@@ -0,0 +1,69 @@
import os

import boto3

redshift_data_client = boto3.client("redshift-data")


def read_sql_from_file(file_path, params):
    with open(file_path, "r") as f:
        sql_template = f.read()
    return sql_template.format(**params)


def start_sql_execution(cluster_id, database, user, sql):
    # The Data API's ExecuteStatement accepts a single SQL statement, so the
    # rendered script is split on semicolons and submitted with
    # BatchExecuteStatement, which runs the whole batch in one transaction.
    # WithEvent=True publishes an EventBridge event when the batch finishes.
    statements = [s.strip() for s in sql.split(";") if s.strip()]
    response = redshift_data_client.batch_execute_statement(
        ClusterIdentifier=cluster_id,
        Database=database,
        DbUser=user,
        Sqls=statements,
        WithEvent=True,
    )

    return response["Id"]


def lambda_handler(event, context):
    cluster_id = os.environ["REDSHIFT_CLUSTER_ID"]
    redshift_database = os.environ["REDSHIFT_DBNAME"]
    user = os.environ["REDSHIFT_USER"]
    iam_role = os.environ["REDSHIFT_IAM_ROLE"]
    source_bucket = os.environ["SOURCE_BUCKET"]
    schema_name = os.environ["SCHEMA_NAME"]
    sql_file = os.environ["SQL_FILE"]

    import_year = event["year"]
    import_month = event["month"]
    import_day = event["day"]
    import_date = event["date"]
    source_database = event["database"]

    query_ids = []

    for table in event["tables"]:
        s3_path = f"s3://{source_bucket}/{source_database}/{table}/{import_year}/{import_month}/{import_day}/{import_date}/"

        params = {
            "schema_name": schema_name,
            "table_name": table,
            "s3_path": s3_path,
            "iam_role": iam_role,
        }

        sql = read_sql_from_file(sql_file, params)

        query_id = start_sql_execution(cluster_id, redshift_database, user, sql)
        query_ids.append(query_id)

    # The handler does not wait for the statements to finish; callers can
    # poll the returned query IDs with the Data API's DescribeStatement.
    return {
        "statusCode": 200,
        "body": "Started Redshift data staging and load for all tables!",
        "query_ids": query_ids,
    }
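The handler above is fire-and-forget: the Data API call returns as soon as the batch is queued, so the `query_ids` in the response are the only handle on the outcome. Below is a minimal sketch of a caller that polls those IDs to completion (the event payload mirrors the keys `lambda_handler` reads; the database and table names are illustrative, not part of this patch):

```python
import time

import boto3

redshift_data_client = boto3.client("redshift-data")

# Example invocation payload, mirroring the keys lambda_handler reads.
# The database and table names are hypothetical stand-ins.
example_event = {
    "year": "2024",
    "month": "01",
    "day": "17",
    "date": "2024-01-17",
    "database": "parking",
    "tables": ["permits", "cycle_hangars"],
}


def wait_for_statements(query_ids, poll_seconds=5):
    """Poll the Data API until every submitted statement reaches a terminal state."""
    pending = set(query_ids)
    while pending:
        for query_id in list(pending):
            desc = redshift_data_client.describe_statement(Id=query_id)
            if desc["Status"] == "FAILED":
                raise RuntimeError(f"Statement {query_id} failed: {desc.get('Error')}")
            if desc["Status"] in ("FINISHED", "ABORTED"):
                pending.discard(query_id)
        if pending:
            time.sleep(poll_seconds)
```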
diff --git a/lambdas/redshift_stage_and_load_tables/stage_and_load_parquet.sql b/lambdas/redshift_stage_and_load_tables/stage_and_load_parquet.sql
new file mode 100644
index 000000000..e45c55ae3
--- /dev/null
+++ b/lambdas/redshift_stage_and_load_tables/stage_and_load_parquet.sql
@@ -0,0 +1,20 @@
-- Rendered by main.py via str.format(): {schema_name}, {table_name}, {s3_path}
-- and {iam_role} are substituted before the statements are sent to Redshift.
-- The statements run as one transaction via BatchExecuteStatement.

-- Create the staging table with the same structure as the target table
CREATE TABLE IF NOT EXISTS {schema_name}.{table_name}_staging (LIKE {schema_name}.{table_name});

-- Load the Parquet files from S3 into the staging table
COPY {schema_name}.{table_name}_staging
FROM '{s3_path}'
IAM_ROLE '{iam_role}'
FORMAT AS PARQUET;

-- Copy the staged rows into the target table
INSERT INTO {schema_name}.{table_name}
SELECT * FROM {schema_name}.{table_name}_staging;

-- Empty the staging table for the next run
-- (note: TRUNCATE commits the current transaction in Redshift)
TRUNCATE {schema_name}.{table_name}_staging;
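Because the template is rendered with Python's `str.format`, every `{placeholder}` above is substituted verbatim before the SQL reaches Redshift; any literal brace the file ever needed would have to be doubled as `{{`/`}}` to survive rendering. A quick way to sanity-check the rendering locally, using hypothetical values in place of what `lambda_handler` assembles at runtime:

```python
# Render the template exactly as read_sql_from_file does; the values below
# are illustrative stand-ins, not taken from the patch.
params = {
    "schema_name": "parking_test",
    "table_name": "permits",  # hypothetical table name
    "s3_path": "s3://refined-zone/parking/permits/2024/01/17/2024-01-17/",
    "iam_role": "arn:aws:iam::123456789012:role/parking_redshift_copier",
}

with open("stage_and_load_parquet.sql") as f:
    rendered = f.read().format(**params)

print(rendered)  # e.g. COPY parking_test.permits_staging FROM 's3://refined-zone/...'
```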
diff --git a/terraform/etl/38-parking-load-redshift-tables.tf b/terraform/etl/38-parking-load-redshift-tables.tf
new file mode 100644
index 000000000..1dcac44df
--- /dev/null
+++ b/terraform/etl/38-parking-load-redshift-tables.tf
@@ -0,0 +1,130 @@
module "parking_load_redshift_tables" {
  count                          = local.is_live_environment && !local.is_production_environment ? 1 : 0
  source                         = "../modules/aws-lambda"
  lambda_name                    = "parking-load-redshift-tables"
  handler                        = "main.lambda_handler"
  lambda_artefact_storage_bucket = module.lambda_artefact_storage_data_source.bucket_id
  s3_key                         = "parking-load-redshift-tables.zip"
  lambda_source_dir              = "../../lambdas/redshift_stage_and_load_tables"
  lambda_output_path             = "../../lambdas/lambda-archives/parking-load-redshift-tables.zip"

  environment_variables = {
    "REDSHIFT_CLUSTER_ID" = module.redshift[0].cluster_id
    "REDSHIFT_DBNAME"     = "data_platform"
    "REDSHIFT_IAM_ROLE"   = aws_iam_role.parking_redshift_copier[0].arn
    "SOURCE_BUCKET"       = module.refined_zone_data_source.bucket_id
    "SCHEMA_NAME"         = "parking_test"
    "SQL_FILE"            = "stage_and_load_parquet.sql"
    "SECRET_NAME"         = "parking/redshift_stage_and_load_tables_user"
  }
}

# IAM policies and role for Redshift to access S3

resource "aws_iam_role" "parking_redshift_copier" {
  count = local.is_live_environment && !local.is_production_environment ? 1 : 0
  name  = "parking_redshift_copier"
  tags  = module.tags.values
  assume_role_policy = jsonencode({
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "redshift.amazonaws.com"
        }
      }
    ]
  })
}

data "aws_iam_policy_document" "redshift_list_and_get_s3" {
  statement {
    actions = [
      "s3:ListBucket",
      "s3:GetObject"
    ]
    resources = [
      module.refined_zone_data_source.bucket_arn,
      "${module.refined_zone_data_source.bucket_arn}/*"
    ]
  }
}

resource "aws_iam_policy" "redshift_list_and_get_s3" {
  count  = local.is_live_environment && !local.is_production_environment ? 1 : 0
  name   = "redshift_list_and_get_s3"
  policy = data.aws_iam_policy_document.redshift_list_and_get_s3.json
}

resource "aws_iam_role_policy_attachment" "redshift_list_and_get_s3" {
  count      = local.is_live_environment && !local.is_production_environment ? 1 : 0
  role       = aws_iam_role.parking_redshift_copier[0].name
  policy_arn = aws_iam_policy.redshift_list_and_get_s3[0].arn
}

resource "aws_redshift_cluster_iam_roles" "parking_redshift_copier" {
  count              = local.is_live_environment && !local.is_production_environment ? 1 : 0
  cluster_identifier = module.redshift[0].cluster_id
  iam_role_arns      = [aws_iam_role.parking_redshift_copier[0].arn]
}

# IAM policies and role for lambda executor

data "aws_iam_policy_document" "lambda_redshift_policy" {
  statement {
    actions = [
      "redshift-data:ExecuteStatement",
      "redshift-data:GetStatementResult",
      "redshift-data:DescribeStatement",
      "redshift-data:ListDatabases",
      "redshift-data:ListSchemas",
      "redshift-data:ListTables",
      "redshift-data:ListStatements",
      "redshift-data:CancelStatement"
    ]
    resources = [module.redshift[0].cluster_arn]
  }

  statement {
    actions = [
      "secretsmanager:GetSecretValue"
    ]
    # Secrets Manager appends a random six-character suffix to secret ARNs,
    # so the secret is matched with a trailing wildcard.
    resources = [
      "arn:aws:secretsmanager:eu-west-2:${data.aws_caller_identity.data_platform.account_id}:secret:parking/redshift_stage_and_load_tables_user*"
    ]
  }
}

resource "aws_iam_policy" "lambda_policy" {
  count  = local.is_live_environment && !local.is_production_environment ? 1 : 0
  name   = "lambda_redshift_policy"
  policy = data.aws_iam_policy_document.lambda_redshift_policy.json
}

resource "aws_iam_role" "lambda_role" {
  count = local.is_live_environment && !local.is_production_environment ? 1 : 0
  name  = "lambda_execution_role"
  tags  = module.tags.values

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Effect = "Allow",
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attach" {
  count      = local.is_live_environment && !local.is_production_environment ? 1 : 0
  role       = aws_iam_role.lambda_role[0].name
  policy_arn = aws_iam_policy.lambda_policy[0].arn
}
diff --git a/terraform/modules/redshift/99-outputs.tf b/terraform/modules/redshift/99-outputs.tf
index 0425a7c83..58b643b20 100644
--- a/terraform/modules/redshift/99-outputs.tf
+++ b/terraform/modules/redshift/99-outputs.tf
@@ -4,4 +4,8 @@ output "role_arn" {
 
 output "cluster_id" {
   value = aws_redshift_cluster.redshift_cluster.cluster_identifier
-}
\ No newline at end of file
+}
+
+output "cluster_arn" {
+  value = aws_redshift_cluster.redshift_cluster.arn
+}
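One loose end worth noting: the module above passes SECRET_NAME to the lambda and the IAM policy grants secretsmanager:GetSecretValue, but main.py reads the database user from a REDSHIFT_USER environment variable that the module never sets. A sketch of the presumably intended wiring, assuming the secret body is JSON with a "username" key (that key name is an assumption, not something this patch confirms):

```python
import json
import os

import boto3

secrets_client = boto3.client("secretsmanager")


def resolve_redshift_user():
    """Fetch the Redshift DbUser from the secret named by SECRET_NAME.

    Assumes the secret stores JSON with a "username" key; adjust to the
    secret's actual shape.
    """
    secret = secrets_client.get_secret_value(SecretId=os.environ["SECRET_NAME"])
    return json.loads(secret["SecretString"])["username"]
```

With something like this in place, lambda_handler could call resolve_redshift_user() instead of reading REDSHIFT_USER, which would make the SECRET_NAME variable and the GetSecretValue grant do real work.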