Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Academy state machine #1484

Merged
merged 14 commits into from
Nov 6, 2023
6 changes: 3 additions & 3 deletions lambdas/calculate_max_concurrency/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
def calculate_max_concurrency(available_ips: int, ips_per_job: int) -> int:
return int(available_ips / ips_per_job)
return int((available_ips - 2) / ips_per_job)


def lambda_handler(event, context):
available_ips = event["available_ips"]
ips_per_job = event["ips_per_job"]
available_ips = event["AvailableIPs"]
ips_per_job = event["Workers"]
max_concurrency = calculate_max_concurrency(available_ips, ips_per_job)
return {"max_concurrency": max_concurrency}

Expand Down
154 changes: 133 additions & 21 deletions terraform/core/13-mssql-ingestion.tf
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ resource "aws_glue_trigger" "filter_ingestion_tables" {
}

module "ingest_academy_revenues_and_benefits_housing_needs_to_landing_zone" {
for_each = local.table_filter_expressions
tags = module.tags.values
trigger_enabled = false
for_each = local.table_filter_expressions
tags = module.tags.values

source = "../modules/aws-glue-job"
is_live_environment = local.is_live_environment
Expand Down Expand Up @@ -199,7 +200,9 @@ module "copy_academy_revenues_to_raw_zone" {
## Academy State Machine

locals {
academy_state_machine_count = local.is_live_environment && !local.is_production_environment ? 1 : 0
academy_state_machine_count = local.is_live_environment ? 1 : 0
number_of_glue_workers = 2
academy_table_filters = ["^lbhaliverbviews_core_hbrent[s].*", "^lbhaliverbviews_core_hbc.*", "^lbhaliverbviews_core_hbuc.*", "^lbhaliverbviews_core_hbrentclaim", "^lbhaliverbviews_core_hbrenttrans", "^lbhaliverbviews_core_hbrent[^tsc].*", "^lbhaliverbviews_core_hbmember", "^lbhaliverbviews_core_hbincome", "^lbhaliverbviews_core_hb[abdefghjklnopsw]", "^lbhaliverbviews_core_ct[dt].*", "^lbhaliverbviews_current_ctax.*", "^lbhaliverbviews_current_[hbn].*", "^lbhaliverbviews_core_ct[abcefghijklmnopqrsvw].*", "^lbhaliverbviews_core_recov_event", "(^lbhaliverbviews_core_cr.*|^lbhaliverbviews_core_[ins].*|^lbhaliverbviews_xdbvw.*|^lbhaliverbviews_current_im.*)"]
}

module "academy_glue_job" {
Expand All @@ -209,20 +212,21 @@ module "academy_glue_job" {
is_live_environment = local.is_live_environment
is_production_environment = local.is_production_environment

job_name = "${local.short_identifier_prefix}Academy Revs & Bens Housing Needs Database Ingestion"
script_s3_object_key = aws_s3_object.ingest_database_tables_via_jdbc_connection.key
environment = var.environment
pydeequ_zip_key = aws_s3_object.pydeequ.key
helper_module_key = aws_s3_object.helpers.key
jdbc_connections = [module.academy_mssql_database_ingestion[0].jdbc_connection_name]
glue_role_arn = aws_iam_role.glue_role.arn
glue_temp_bucket_id = module.glue_temp_storage.bucket_id
glue_scripts_bucket_id = module.glue_scripts.bucket_id
spark_ui_output_storage_id = module.spark_ui_output_storage.bucket_id
glue_job_timeout = 420
glue_version = "4.0"
glue_job_worker_type = "G.1X"
number_of_workers_for_glue_job = 2
job_name = "${local.short_identifier_prefix}Academy Revs & Bens Housing Needs Database Ingestion"
script_s3_object_key = aws_s3_object.ingest_database_tables_via_jdbc_connection.key
environment = var.environment
pydeequ_zip_key = aws_s3_object.pydeequ.key
helper_module_key = aws_s3_object.helpers.key
jdbc_connections = [module.academy_mssql_database_ingestion[0].jdbc_connection_name]
glue_role_arn = aws_iam_role.glue_role.arn
glue_temp_bucket_id = module.glue_temp_storage.bucket_id
glue_scripts_bucket_id = module.glue_scripts.bucket_id
spark_ui_output_storage_id = module.spark_ui_output_storage.bucket_id
glue_job_timeout = 420
glue_version = "4.0"
glue_job_worker_type = "G.1X"
number_of_workers_for_glue_job = local.number_of_glue_workers
max_concurrent_runs_of_glue_job = length(local.academy_table_filters)
job_parameters = {
"--source_data_database" = module.academy_mssql_database_ingestion[0].ingestion_database_name
"--s3_ingestion_bucket_target" = "s3://${module.landing_zone.bucket_id}/academy/"
Expand Down Expand Up @@ -261,7 +265,7 @@ module "academy_state_machine" {
"Resource": "${module.max_concurrency_lambda[0].lambda_function_arn}",
"Parameters": {
"AvailableIPs.$": "$.SubnetResult.Subnets[0].AvailableIpAddressCount",
"NumTasks.$": "$.TaskCount"
"Workers.$": "${local.number_of_glue_workers}"
},
"ResultPath": "$.MaxConcurrencyResult",
"Next": "Map"
Expand All @@ -278,18 +282,24 @@ module "academy_state_machine" {
"Type": "Task",
"Resource": "arn:aws:states:::glue:startJobRun.sync",
"Parameters": {
"JobName": "${module.academy_glue_job[0].job_name}",
"Parameters": {
"Arguments": {
"--table_filter_expression": "$.table_filter_expression"
"--table_filter_expression.$": "$.FilterString"
}
}
},
"End": true
}
}
},
"MaxConcurrencyPath": "$.MaxConcurrencyResult",
"End": true
"MaxConcurrencyPath": "$.MaxConcurrencyResult.max_concurrency",
"End": true,
"ItemsPath": "$.TableFilters",
"ItemSelector": {
"MaxConcurrency.$": "$.MaxConcurrencyResult.max_concurrency",
"FilterString.$": "$$.Map.Item.Value"
}
}
}
}
Expand Down Expand Up @@ -327,3 +337,105 @@ data "aws_iam_policy_document" "step_functions_assume_role_policy" {
}
}
}

data "aws_iam_policy_document" "academy_step_functions_policy" {
statement {
actions = [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
]
resources = ["*"]
}

statement {
actions = [
"glue:StartJobRun",
"glue:GetJobRun",
"glue:GetJobRuns",
"glue:BatchStopJobRun"
]
resources = ["*"]
}

statement {
actions = [
"lambda:InvokeFunction"
]
resources = [module.max_concurrency_lambda[0].lambda_function_arn]
}

statement {
actions = [
"ec2:DescribeSubnets"
]
resources = ["*"]
}
}

resource "aws_cloudwatch_event_rule" "academy_state_machine_trigger" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-state-machine-trigger"
tags = module.tags.values
description = "Trigger the Academy State Machine every weekday at 1am"
schedule_expression = "cron(0 1 ? * MON-FRI *)"
is_enabled = true
role_arn = aws_iam_role.academy_cloudwatch_execution_role[0].arn
}

resource "aws_cloudwatch_event_target" "academy_state_machine_trigger" {
count = local.academy_state_machine_count
rule = aws_cloudwatch_event_rule.academy_state_machine_trigger[0].name
arn = module.academy_state_machine[0].arn
input = <<EOF
{
"TableFilters": ${jsonencode(local.academy_table_filters)}
}
EOF
}

resource "aws_iam_role" "academy_cloudwatch_execution_role" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-cloudwatch-execution-role"
tags = module.tags.values
assume_role_policy = data.aws_iam_policy_document.cloudwatch_assume_role_policy.json
}

data "aws_iam_policy_document" "cloudwatch_assume_role_policy" {
statement {
actions = ["sts:AssumeRole"]

principals {
type = "Service"
identifiers = ["events.amazonaws.com"]
}
}
}

data "aws_iam_policy_document" "cloudwatch_execution_policy" {
statement {
actions = [
"states:StartExecution"
]
resources = [module.academy_state_machine[0].arn]
}
}

resource "aws_iam_policy_attachment" "academy_cloudwatch_execution_policy_attachment" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-cloudwatch-execution-policy-attachment"
policy_arn = aws_iam_policy.academy_cloudwatch_execution_policy[0].arn
roles = [aws_iam_role.academy_cloudwatch_execution_role[0].name]
}

resource "aws_iam_policy" "academy_cloudwatch_execution_policy" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-cloudwatch-execution-policy"
tags = module.tags.values
policy = data.aws_iam_policy_document.cloudwatch_execution_policy.json
}
Loading