From b61339fe9aedaa8427881fd863e2c2e0fedc95b6 Mon Sep 17 00:00:00 2001 From: timburke-hackit <61045197+timburke-hackit@users.noreply.github.com> Date: Mon, 6 Nov 2023 20:17:42 +0000 Subject: [PATCH] academy state machine (#1484) * max concurrency lambda * add academy state machine in stg * shorter lambda name * add lambda zip output path * fix subnet id value * add JobName parameter * step function definition and iam statements * change concurrency calculation * maths * disable glue trigger, create eventbridge rule add acdemy table filters * deploy to stg and prod * counts * Revert "counts" This reverts commit a7f728cfe6542c54f9267f14e8229e5ae8f3afcd. --- lambdas/calculate_max_concurrency/main.py | 6 +- terraform/core/13-mssql-ingestion.tf | 153 +++++++++++++++++++--- 2 files changed, 135 insertions(+), 24 deletions(-) diff --git a/lambdas/calculate_max_concurrency/main.py b/lambdas/calculate_max_concurrency/main.py index be53a81e3..e2af0e01f 100644 --- a/lambdas/calculate_max_concurrency/main.py +++ b/lambdas/calculate_max_concurrency/main.py @@ -1,10 +1,10 @@ def calculate_max_concurrency(available_ips: int, ips_per_job: int) -> int: - return int(available_ips / ips_per_job) + return int((available_ips - 2) / ips_per_job) def lambda_handler(event, context): - available_ips = event["available_ips"] - ips_per_job = event["ips_per_job"] + available_ips = event["AvailableIPs"] + ips_per_job = event["Workers"] max_concurrency = calculate_max_concurrency(available_ips, ips_per_job) return {"max_concurrency": max_concurrency} diff --git a/terraform/core/13-mssql-ingestion.tf b/terraform/core/13-mssql-ingestion.tf index 61072b3d0..c43a9551a 100644 --- a/terraform/core/13-mssql-ingestion.tf +++ b/terraform/core/13-mssql-ingestion.tf @@ -62,8 +62,9 @@ resource "aws_glue_trigger" "filter_ingestion_tables" { } module "ingest_academy_revenues_and_benefits_housing_needs_to_landing_zone" { - for_each = local.table_filter_expressions - tags = module.tags.values + trigger_enabled = false + for_each = local.table_filter_expressions + tags = module.tags.values source = "../modules/aws-glue-job" is_live_environment = local.is_live_environment @@ -199,7 +200,9 @@ module "copy_academy_revenues_to_raw_zone" { ## Academy State Machine locals { - academy_state_machine_count = local.is_live_environment && !local.is_production_environment ? 1 : 0 + academy_state_machine_count = local.is_live_environment ? 1 : 0 + number_of_glue_workers = 2 + academy_table_filters = ["^lbhaliverbviews_core_hbrent[s].*", "^lbhaliverbviews_core_hbc.*", "^lbhaliverbviews_core_hbuc.*", "^lbhaliverbviews_core_hbrentclaim", "^lbhaliverbviews_core_hbrenttrans", "^lbhaliverbviews_core_hbrent[^tsc].*", "^lbhaliverbviews_core_hbmember", "^lbhaliverbviews_core_hbincome", "^lbhaliverbviews_core_hb[abdefghjklnopsw]", "^lbhaliverbviews_core_ct[dt].*", "^lbhaliverbviews_current_ctax.*", "^lbhaliverbviews_current_[hbn].*", "^lbhaliverbviews_core_ct[abcefghijklmnopqrsvw].*", "^lbhaliverbviews_core_recov_event", "(^lbhaliverbviews_core_cr.*|^lbhaliverbviews_core_[ins].*|^lbhaliverbviews_xdbvw.*|^lbhaliverbviews_current_im.*)"] } module "academy_glue_job" { @@ -209,20 +212,21 @@ module "academy_glue_job" { is_live_environment = local.is_live_environment is_production_environment = local.is_production_environment - job_name = "${local.short_identifier_prefix}Academy Revs & Bens Housing Needs Database Ingestion" - script_s3_object_key = aws_s3_object.ingest_database_tables_via_jdbc_connection.key - environment = var.environment - pydeequ_zip_key = aws_s3_object.pydeequ.key - helper_module_key = aws_s3_object.helpers.key - jdbc_connections = [module.academy_mssql_database_ingestion[0].jdbc_connection_name] - glue_role_arn = aws_iam_role.glue_role.arn - glue_temp_bucket_id = module.glue_temp_storage.bucket_id - glue_scripts_bucket_id = module.glue_scripts.bucket_id - spark_ui_output_storage_id = module.spark_ui_output_storage.bucket_id - glue_job_timeout = 420 - glue_version = "4.0" - glue_job_worker_type = "G.1X" - number_of_workers_for_glue_job = 2 + job_name = "${local.short_identifier_prefix}Academy Revs & Bens Housing Needs Database Ingestion" + script_s3_object_key = aws_s3_object.ingest_database_tables_via_jdbc_connection.key + environment = var.environment + pydeequ_zip_key = aws_s3_object.pydeequ.key + helper_module_key = aws_s3_object.helpers.key + jdbc_connections = [module.academy_mssql_database_ingestion[0].jdbc_connection_name] + glue_role_arn = aws_iam_role.glue_role.arn + glue_temp_bucket_id = module.glue_temp_storage.bucket_id + glue_scripts_bucket_id = module.glue_scripts.bucket_id + spark_ui_output_storage_id = module.spark_ui_output_storage.bucket_id + glue_job_timeout = 420 + glue_version = "4.0" + glue_job_worker_type = "G.1X" + number_of_workers_for_glue_job = local.number_of_glue_workers + max_concurrent_runs_of_glue_job = length(local.academy_table_filters) job_parameters = { "--source_data_database" = module.academy_mssql_database_ingestion[0].ingestion_database_name "--s3_ingestion_bucket_target" = "s3://${module.landing_zone.bucket_id}/academy/" @@ -261,7 +265,7 @@ module "academy_state_machine" { "Resource": "${module.max_concurrency_lambda[0].lambda_function_arn}", "Parameters": { "AvailableIPs.$": "$.SubnetResult.Subnets[0].AvailableIpAddressCount", - "NumTasks.$": "$.TaskCount" + "Workers.$": "${local.number_of_glue_workers}" }, "ResultPath": "$.MaxConcurrencyResult", "Next": "Map" @@ -281,7 +285,7 @@ module "academy_state_machine" { "JobName": "${module.academy_glue_job[0].job_name}", "Parameters": { "Arguments": { - "--table_filter_expression": "$.table_filter_expression" + "--table_filter_expression.$": "$.FilterString" } } }, @@ -289,8 +293,13 @@ module "academy_state_machine" { } } }, - "MaxConcurrencyPath": "$.MaxConcurrencyResult", - "End": true + "MaxConcurrencyPath": "$.MaxConcurrencyResult.max_concurrency", + "End": true, + "ItemsPath": "$.TableFilters", + "ItemSelector": { + "MaxConcurrency.$": "$.MaxConcurrencyResult.max_concurrency", + "FilterString.$": "$$.Map.Item.Value" + } } } } @@ -328,3 +337,105 @@ data "aws_iam_policy_document" "step_functions_assume_role_policy" { } } } + +data "aws_iam_policy_document" "academy_step_functions_policy" { + statement { + actions = [ + "logs:CreateLogDelivery", + "logs:GetLogDelivery", + "logs:UpdateLogDelivery", + "logs:DeleteLogDelivery", + "logs:ListLogDeliveries", + "logs:PutResourcePolicy", + "logs:DescribeResourcePolicies", + "logs:DescribeLogGroups" + ] + resources = ["*"] + } + + statement { + actions = [ + "glue:StartJobRun", + "glue:GetJobRun", + "glue:GetJobRuns", + "glue:BatchStopJobRun" + ] + resources = ["*"] + } + + statement { + actions = [ + "lambda:InvokeFunction" + ] + resources = [module.max_concurrency_lambda[0].lambda_function_arn] + } + + statement { + actions = [ + "ec2:DescribeSubnets" + ] + resources = ["*"] + } +} + +resource "aws_cloudwatch_event_rule" "academy_state_machine_trigger" { + count = local.academy_state_machine_count + name = "${local.short_identifier_prefix}academy-state-machine-trigger" + tags = module.tags.values + description = "Trigger the Academy State Machine every weekday at 1am" + schedule_expression = "cron(0 1 ? * MON-FRI *)" + is_enabled = true + role_arn = aws_iam_role.academy_cloudwatch_execution_role[0].arn +} + +resource "aws_cloudwatch_event_target" "academy_state_machine_trigger" { + count = local.academy_state_machine_count + rule = aws_cloudwatch_event_rule.academy_state_machine_trigger[0].name + arn = module.academy_state_machine[0].arn + input = <