Skip to content

Commit

Permalink
DPR2-1053: Stop pipeline before ingestion (#7537)
Browse files Browse the repository at this point in the history
  • Loading branch information
koladeadewuyi-moj authored Aug 16, 2024
1 parent 367fcfe commit 85fb7a7
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,65 @@ module "data_ingestion_pipeline" {
definition = jsonencode(
{
"Comment" : "Data Ingestion Pipeline Step Function",
"StartAt" : "Start DMS Replication Task",
"StartAt" : "Deactivate Archive Trigger",
"States" : {
"Deactivate Archive Trigger" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::glue:startJobRun.sync",
"Parameters" : {
"JobName" : var.glue_trigger_activation_job,
"Arguments" : {
"--dpr.glue.trigger.name" : var.archive_job_trigger_name,
"--dpr.glue.trigger.activate" : "false"
}
},
"Next" : "Stop Archive Job"
},
"Stop Archive Job" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::glue:startJobRun.sync",
"Parameters" : {
"JobName" : var.glue_stop_glue_instance_job,
"Arguments" : {
"--dpr.stop.glue.instance.job.name" : var.glue_archive_job
}
},
"Next" : "Stop DMS Replication Task"
},
"Stop DMS Replication Task" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::glue:startJobRun.sync",
"Parameters" : {
"JobName" : var.stop_dms_task_job,
"Arguments" : {
"--dpr.dms.replication.task.id" : var.replication_task_id
}
},
"Next" : "Stop Glue Streaming Job"
},
"Stop Glue Streaming Job" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::glue:startJobRun.sync",
"Parameters" : {
"JobName" : var.glue_stop_glue_instance_job,
"Arguments" : {
"--dpr.stop.glue.instance.job.name" : var.glue_reporting_hub_cdc_jobname
}
},
"Next" : "Empty All Data"
},
"Empty All Data" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::glue:startJobRun.sync",
"Parameters" : {
"JobName" : var.glue_s3_data_deletion_job,
"Arguments" : {
"--dpr.file.deletion.buckets" : "${var.s3_raw_bucket_id},${var.s3_raw_archive_bucket_id},${var.s3_structured_bucket_id},${var.s3_curated_bucket_id},${var.s3_temp_reload_bucket_id}",
"--dpr.config.key" : var.domain
}
},
"Next" : "Start DMS Replication Task"
},
"Start DMS Replication Task" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::aws-sdk:databasemigration:startReplicationTask",
Expand Down Expand Up @@ -120,6 +177,18 @@ module "data_ingestion_pipeline" {
"--dpr.config.key" : var.domain
}
},
"Next" : "Reactivate Archive Trigger"
},
"Reactivate Archive Trigger" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::glue:startJobRun.sync",
"Parameters" : {
"JobName" : var.glue_trigger_activation_job,
"Arguments" : {
"--dpr.glue.trigger.name" : var.archive_job_trigger_name,
"--dpr.glue.trigger.activate" : "true"
}
},
"End" : true
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,19 @@ variable "pipeline_additional_policies" {
default = []
}

variable "dms_replication_task_arn" {}
# Glue job used to clear data out of S3; the pipeline definitions pass it as
# the JobName of their "Empty ... Data" states.
variable "glue_s3_data_deletion_job" {
  type        = string
  default     = ""
  description = "Name of glue job which deletes parquet files from s3 bucket(s)"
}

# Required input (no default). Given a description so generated module docs
# and `terraform plan` prompts explain what is expected.
variable "dms_replication_task_arn" {
  description = "ARN of the DMS replication task"
  type        = string
}

# Required input (no default). The "Stop DMS Replication Task" state forwards
# this value to the stop-DMS-task glue job as --dpr.dms.replication.task.id.
variable "replication_task_id" {
  description = "ID of the DMS replication task to stop before ingestion starts"
  type        = string
}

variable "pipeline_notification_lambda_function" {
description = "Pipeline Notification Lambda Name"
Expand Down Expand Up @@ -61,6 +73,54 @@ variable "s3_raw_archive_bucket_id" {
default = ""
}

# One of the buckets listed in --dpr.file.deletion.buckets for the data
# deletion job.
variable "s3_structured_bucket_id" {
  type        = string
  default     = ""
  description = "S3, Structured Bucket ID"
}

# One of the buckets listed in --dpr.file.deletion.buckets for the data
# deletion job.
variable "s3_curated_bucket_id" {
  type        = string
  default     = ""
  description = "S3, Curated Bucket ID"
}

# Temporary reload bucket; included in the bucket list emptied by the
# "Empty All Data" state.
variable "s3_temp_reload_bucket_id" {
  type        = string
  default     = ""
  description = "S3 Bucket ID for the temporary location to store reload data"
}

# This job is not specific to the streaming job: the pipeline invokes it once
# for the archive job ("Stop Archive Job") and once for the reporting hub CDC
# job ("Stop Glue Streaming Job"), passing the target job name through
# --dpr.stop.glue.instance.job.name. The description is worded accordingly.
variable "glue_stop_glue_instance_job" {
  description = "Name of the glue job which stops the currently running instance of a given glue job"
  type        = string
  default     = ""
}

# Supplied as the JobName of the "Stop DMS Replication Task" state.
variable "stop_dms_task_job" {
  type        = string
  default     = ""
  description = "Name of job to stop a running DMS task"
}

# Used by both the "Deactivate Archive Trigger" and "Reactivate Archive
# Trigger" states; --dpr.glue.trigger.activate selects the direction.
variable "glue_trigger_activation_job" {
  description = "Name of the glue job which activates/deactivates a glue trigger"
  type        = string
  default     = ""
}

# Passed as --dpr.glue.trigger.name to the trigger activation job when the
# pipeline deactivates and later reactivates the archive trigger.
variable "archive_job_trigger_name" {
  description = "Name of the glue trigger for the archive job"
  type        = string
  default     = ""
}

# The "Stop Archive Job" state hands this name to the stop-glue-instance job
# via --dpr.stop.glue.instance.job.name.
variable "glue_archive_job" {
  type        = string
  default     = ""
  description = "Name of the glue job which archives the raw data"
}

variable "glue_s3_file_transfer_job" {
description = "Name of s3 file transfer job"
type = string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ module "reload_pipeline" {
"--dpr.config.key" : var.domain
}
},
"Next" : "Empty Structured and Curated Data"
"Next" : "Empty Raw, Structured and Curated Data"
},
"Empty Structured and Curated Data" : {
"Empty Raw, Structured and Curated Data" : {
"Type" : "Task",
"Resource" : "arn:aws:states:::glue:startJobRun.sync",
"Parameters" : {
"JobName" : var.glue_s3_data_deletion_job,
"Arguments" : {
"--dpr.file.deletion.buckets" : "${var.s3_structured_bucket_id},${var.s3_curated_bucket_id}",
"--dpr.file.deletion.buckets" : "${var.s3_raw_bucket_id},${var.s3_structured_bucket_id},${var.s3_curated_bucket_id}",
"--dpr.config.key" : var.domain
}
},
Expand Down

0 comments on commit 85fb7a7

Please sign in to comment.