Skip to content

Commit

Permalink
academy state machine (#1484)
Browse files Browse the repository at this point in the history
* max concurrency lambda

* add academy state machine in stg

* shorter lambda name

* add lambda zip output path

* fix subnet id value

* add JobName parameter

* step function definition and iam statements

* change concurrency calculation

* maths

* disable glue trigger, create eventbridge rule
add acdemy table filters

* deploy to stg and prod

* counts

* Revert "counts"

This reverts commit a7f728c.
  • Loading branch information
timburke-hackit authored Nov 6, 2023
1 parent da273fd commit b61339f
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 24 deletions.
6 changes: 3 additions & 3 deletions lambdas/calculate_max_concurrency/main.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
def calculate_max_concurrency(available_ips: int, ips_per_job: int) -> int:
return int(available_ips / ips_per_job)
return int((available_ips - 2) / ips_per_job)


def lambda_handler(event, context):
available_ips = event["available_ips"]
ips_per_job = event["ips_per_job"]
available_ips = event["AvailableIPs"]
ips_per_job = event["Workers"]
max_concurrency = calculate_max_concurrency(available_ips, ips_per_job)
return {"max_concurrency": max_concurrency}

Expand Down
153 changes: 132 additions & 21 deletions terraform/core/13-mssql-ingestion.tf
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,9 @@ resource "aws_glue_trigger" "filter_ingestion_tables" {
}

module "ingest_academy_revenues_and_benefits_housing_needs_to_landing_zone" {
for_each = local.table_filter_expressions
tags = module.tags.values
trigger_enabled = false
for_each = local.table_filter_expressions
tags = module.tags.values

source = "../modules/aws-glue-job"
is_live_environment = local.is_live_environment
Expand Down Expand Up @@ -199,7 +200,9 @@ module "copy_academy_revenues_to_raw_zone" {
## Academy State Machine

locals {
academy_state_machine_count = local.is_live_environment && !local.is_production_environment ? 1 : 0
academy_state_machine_count = local.is_live_environment ? 1 : 0
number_of_glue_workers = 2
academy_table_filters = ["^lbhaliverbviews_core_hbrent[s].*", "^lbhaliverbviews_core_hbc.*", "^lbhaliverbviews_core_hbuc.*", "^lbhaliverbviews_core_hbrentclaim", "^lbhaliverbviews_core_hbrenttrans", "^lbhaliverbviews_core_hbrent[^tsc].*", "^lbhaliverbviews_core_hbmember", "^lbhaliverbviews_core_hbincome", "^lbhaliverbviews_core_hb[abdefghjklnopsw]", "^lbhaliverbviews_core_ct[dt].*", "^lbhaliverbviews_current_ctax.*", "^lbhaliverbviews_current_[hbn].*", "^lbhaliverbviews_core_ct[abcefghijklmnopqrsvw].*", "^lbhaliverbviews_core_recov_event", "(^lbhaliverbviews_core_cr.*|^lbhaliverbviews_core_[ins].*|^lbhaliverbviews_xdbvw.*|^lbhaliverbviews_current_im.*)"]
}

module "academy_glue_job" {
Expand All @@ -209,20 +212,21 @@ module "academy_glue_job" {
is_live_environment = local.is_live_environment
is_production_environment = local.is_production_environment

job_name = "${local.short_identifier_prefix}Academy Revs & Bens Housing Needs Database Ingestion"
script_s3_object_key = aws_s3_object.ingest_database_tables_via_jdbc_connection.key
environment = var.environment
pydeequ_zip_key = aws_s3_object.pydeequ.key
helper_module_key = aws_s3_object.helpers.key
jdbc_connections = [module.academy_mssql_database_ingestion[0].jdbc_connection_name]
glue_role_arn = aws_iam_role.glue_role.arn
glue_temp_bucket_id = module.glue_temp_storage.bucket_id
glue_scripts_bucket_id = module.glue_scripts.bucket_id
spark_ui_output_storage_id = module.spark_ui_output_storage.bucket_id
glue_job_timeout = 420
glue_version = "4.0"
glue_job_worker_type = "G.1X"
number_of_workers_for_glue_job = 2
job_name = "${local.short_identifier_prefix}Academy Revs & Bens Housing Needs Database Ingestion"
script_s3_object_key = aws_s3_object.ingest_database_tables_via_jdbc_connection.key
environment = var.environment
pydeequ_zip_key = aws_s3_object.pydeequ.key
helper_module_key = aws_s3_object.helpers.key
jdbc_connections = [module.academy_mssql_database_ingestion[0].jdbc_connection_name]
glue_role_arn = aws_iam_role.glue_role.arn
glue_temp_bucket_id = module.glue_temp_storage.bucket_id
glue_scripts_bucket_id = module.glue_scripts.bucket_id
spark_ui_output_storage_id = module.spark_ui_output_storage.bucket_id
glue_job_timeout = 420
glue_version = "4.0"
glue_job_worker_type = "G.1X"
number_of_workers_for_glue_job = local.number_of_glue_workers
max_concurrent_runs_of_glue_job = length(local.academy_table_filters)
job_parameters = {
"--source_data_database" = module.academy_mssql_database_ingestion[0].ingestion_database_name
"--s3_ingestion_bucket_target" = "s3://${module.landing_zone.bucket_id}/academy/"
Expand Down Expand Up @@ -261,7 +265,7 @@ module "academy_state_machine" {
"Resource": "${module.max_concurrency_lambda[0].lambda_function_arn}",
"Parameters": {
"AvailableIPs.$": "$.SubnetResult.Subnets[0].AvailableIpAddressCount",
"NumTasks.$": "$.TaskCount"
"Workers.$": "${local.number_of_glue_workers}"
},
"ResultPath": "$.MaxConcurrencyResult",
"Next": "Map"
Expand All @@ -281,16 +285,21 @@ module "academy_state_machine" {
"JobName": "${module.academy_glue_job[0].job_name}",
"Parameters": {
"Arguments": {
"--table_filter_expression": "$.table_filter_expression"
"--table_filter_expression.$": "$.FilterString"
}
}
},
"End": true
}
}
},
"MaxConcurrencyPath": "$.MaxConcurrencyResult",
"End": true
"MaxConcurrencyPath": "$.MaxConcurrencyResult.max_concurrency",
"End": true,
"ItemsPath": "$.TableFilters",
"ItemSelector": {
"MaxConcurrency.$": "$.MaxConcurrencyResult.max_concurrency",
"FilterString.$": "$$.Map.Item.Value"
}
}
}
}
Expand Down Expand Up @@ -328,3 +337,105 @@ data "aws_iam_policy_document" "step_functions_assume_role_policy" {
}
}
}

data "aws_iam_policy_document" "academy_step_functions_policy" {
statement {
actions = [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
]
resources = ["*"]
}

statement {
actions = [
"glue:StartJobRun",
"glue:GetJobRun",
"glue:GetJobRuns",
"glue:BatchStopJobRun"
]
resources = ["*"]
}

statement {
actions = [
"lambda:InvokeFunction"
]
resources = [module.max_concurrency_lambda[0].lambda_function_arn]
}

statement {
actions = [
"ec2:DescribeSubnets"
]
resources = ["*"]
}
}

resource "aws_cloudwatch_event_rule" "academy_state_machine_trigger" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-state-machine-trigger"
tags = module.tags.values
description = "Trigger the Academy State Machine every weekday at 1am"
schedule_expression = "cron(0 1 ? * MON-FRI *)"
is_enabled = true
role_arn = aws_iam_role.academy_cloudwatch_execution_role[0].arn
}

resource "aws_cloudwatch_event_target" "academy_state_machine_trigger" {
count = local.academy_state_machine_count
rule = aws_cloudwatch_event_rule.academy_state_machine_trigger[0].name
arn = module.academy_state_machine[0].arn
input = <<EOF
{
"TableFilters": ${jsonencode(local.academy_table_filters)}
}
EOF
}

resource "aws_iam_role" "academy_cloudwatch_execution_role" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-cloudwatch-execution-role"
tags = module.tags.values
assume_role_policy = data.aws_iam_policy_document.cloudwatch_assume_role_policy.json
}

data "aws_iam_policy_document" "cloudwatch_assume_role_policy" {
statement {
actions = ["sts:AssumeRole"]

principals {
type = "Service"
identifiers = ["events.amazonaws.com"]
}
}
}

data "aws_iam_policy_document" "cloudwatch_execution_policy" {
statement {
actions = [
"states:StartExecution"
]
resources = [module.academy_state_machine[0].arn]
}
}

resource "aws_iam_policy_attachment" "academy_cloudwatch_execution_policy_attachment" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-cloudwatch-execution-policy-attachment"
policy_arn = aws_iam_policy.academy_cloudwatch_execution_policy[0].arn
roles = [aws_iam_role.academy_cloudwatch_execution_role[0].name]
}

resource "aws_iam_policy" "academy_cloudwatch_execution_policy" {
count = local.academy_state_machine_count
name = "${local.short_identifier_prefix}academy-cloudwatch-execution-policy"
tags = module.tags.values
policy = data.aws_iam_policy_document.cloudwatch_execution_policy.json
}

0 comments on commit b61339f

Please sign in to comment.