Skip to content

Commit

Permalink
Merge pull request #34316 from hashicorp/f-pause-rep-tasks-endpoint-dms
Browse files Browse the repository at this point in the history
dms/endpoint: Pause replication tasks for updates
  • Loading branch information
YakDriver authored Nov 9, 2023
2 parents e4b2602 + 357ee9d commit a55d897
Show file tree
Hide file tree
Showing 5 changed files with 435 additions and 11 deletions.
3 changes: 3 additions & 0 deletions .changelog/34316.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dms_endpoint: Add `pause_replication_tasks`, which when set to `true`, pauses associated running replication tasks, regardless of whether they are managed by Terraform, prior to modifying the endpoint (only tasks paused by the resource will be restarted after the modification completes)
```
117 changes: 116 additions & 1 deletion internal/service/dms/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ func ResourceEndpoint() *schema.Resource {
Sensitive: true,
ConflictsWith: []string{"secrets_manager_access_role_arn", "secrets_manager_arn"},
},
"pause_replication_tasks": {
Type: schema.TypeBool,
Optional: true,
},
"port": {
Type: schema.TypeInt,
Optional: true,
Expand Down Expand Up @@ -1304,11 +1308,25 @@ func resourceEndpointUpdate(ctx context.Context, d *schema.ResourceData, meta in
}
}

_, err := conn.ModifyEndpointWithContext(ctx, input)
var tasks []*dms.ReplicationTask
if v, ok := d.GetOk("pause_replication_tasks"); ok && v.(bool) {
var err error
tasks, err = stopEndpointReplicationTasks(ctx, conn, d.Get("endpoint_arn").(string))
if err != nil {
return sdkdiag.AppendErrorf(diags, "pausing replication tasks before updating DMS Endpoint (%s): %s", d.Id(), err)
}
}

_, err := conn.ModifyEndpointWithContext(ctx, input)
if err != nil {
return sdkdiag.AppendErrorf(diags, "updating DMS Endpoint (%s): %s", d.Id(), err)
}

if v, ok := d.GetOk("pause_replication_tasks"); ok && v.(bool) && len(tasks) > 0 {
if err := startEndpointReplicationTasks(ctx, conn, d.Get("endpoint_arn").(string), tasks); err != nil {
return sdkdiag.AppendErrorf(diags, "starting replication tasks after updating DMS Endpoint (%s): %s", d.Id(), err)
}
}
}

return append(diags, resourceEndpointRead(ctx, d, meta)...)
Expand Down Expand Up @@ -1581,6 +1599,103 @@ func resourceEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) er
return nil
}

// steadyEndpointReplicationTasks looks up the replication tasks associated
// with the endpoint identified by arn and, for each one that is mid-transition,
// waits on it via waitReplicationTaskSteady.
//
// A task is considered mid-transition when its status is creating, deleting,
// modifying, stopping, or starting. Tasks in any other status (including
// running, failed, ready, and stopped) are left untouched. The first error
// from the lookup or from a wait is returned as-is.
func steadyEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string) error {
	found, err := FindReplicationTasksByEndpointARN(ctx, conn, arn)
	if err != nil {
		return err
	}

	for _, rt := range found {
		switch aws.StringValue(rt.Status) {
		case replicationTaskStatusCreating,
			replicationTaskStatusDeleting,
			replicationTaskStatusModifying,
			replicationTaskStatusStopping,
			replicationTaskStatusStarting:
			// Only in-flight transitions need to be waited out.
			if waitErr := waitReplicationTaskSteady(ctx, conn, aws.StringValue(rt.ReplicationTaskIdentifier)); waitErr != nil {
				return waitErr
			}
		}
	}

	return nil
}

// stopEndpointReplicationTasks stops every running replication task that is
// associated with the endpoint identified by arn and reports which tasks it
// stopped.
//
// It first calls steadyEndpointReplicationTasks so that tasks in a
// transitional status are waited on, then re-reads the task list. Only tasks
// whose status is running are passed to stopReplicationTask. A task whose stop
// attempt yields an InvalidResourceStateFault is skipped and not reported as
// stopped. Any other stop error aborts the loop; the tasks stopped so far are
// returned together with that error.
func stopEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string) ([]*dms.ReplicationTask, error) {
	if err := steadyEndpointReplicationTasks(ctx, conn, arn); err != nil {
		return nil, err
	}

	candidates, err := FindReplicationTasksByEndpointARN(ctx, conn, arn)
	if err != nil {
		return nil, err
	}

	var stopped []*dms.ReplicationTask
	for _, rt := range candidates {
		if aws.StringValue(rt.Status) != replicationTaskStatusRunning {
			continue
		}

		stopErr := stopReplicationTask(ctx, aws.StringValue(rt.ReplicationTaskIdentifier), conn)
		switch {
		case tfawserr.ErrCodeEquals(stopErr, dms.ErrCodeInvalidResourceStateFault):
			// The task could not be stopped from its current state; it was not
			// paused by us, so it must not be reported for restart.
		case stopErr != nil:
			return stopped, stopErr
		default:
			stopped = append(stopped, rt)
		}
	}

	return stopped, nil
}

// startEndpointReplicationTasks restarts the given replication tasks after the
// endpoint identified by arn has been modified.
//
// tasks is expected to be the list returned by stopEndpointReplicationTasks,
// i.e. the tasks this resource paused. When tasks is empty the function
// returns nil without making any API call. Otherwise it first calls
// steadyEndpointReplicationTasks, and then for each task:
//
//  1. requests a connection test between the endpoint and the task's
//     replication instance,
//  2. waits until the endpoint's connection test succeeds, and
//  3. starts the task via startReplicationTask.
//
// The first error encountered is wrapped with the step that failed and
// returned; remaining tasks are not processed.
func startEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string, tasks []*dms.ReplicationTask) error {
	if len(tasks) == 0 {
		return nil
	}

	if err := steadyEndpointReplicationTasks(ctx, conn, arn); err != nil {
		return err
	}

	for _, task := range tasks {
		_, err := conn.TestConnectionWithContext(ctx, &dms.TestConnectionInput{
			EndpointArn:            aws.String(arn),
			ReplicationInstanceArn: task.ReplicationInstanceArn,
		})

		// A connection test that is already in flight is not a failure: wait
		// for that test instead of requesting a new one. Skipping the task
		// here would leave a task that this resource paused permanently
		// stopped.
		if err != nil && !tfawserr.ErrMessageContains(err, dms.ErrCodeInvalidResourceStateFault, "already being tested") {
			return fmt.Errorf("testing connection: %w", err)
		}

		err = conn.WaitUntilTestConnectionSucceedsWithContext(ctx, &dms.DescribeConnectionsInput{
			Filters: []*dms.Filter{
				{
					Name:   aws.String("endpoint-arn"),
					Values: []*string{aws.String(arn)},
				},
			},
		})

		if err != nil {
			return fmt.Errorf("waiting until test connection succeeds: %w", err)
		}

		if err := startReplicationTask(ctx, conn, aws.StringValue(task.ReplicationTaskIdentifier)); err != nil {
			return fmt.Errorf("starting replication task: %w", err)
		}
	}

	return nil
}

func flattenOpenSearchSettings(settings *dms.ElasticsearchSettings) []map[string]interface{} {
if settings == nil {
return []map[string]interface{}{}
Expand Down
239 changes: 239 additions & 0 deletions internal/service/dms/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2112,6 +2112,43 @@ func TestAccDMSEndpoint_Redshift_SSEKMSKeyId(t *testing.T) {
})
}

// TestAccDMSEndpoint_pauseReplicationTasks applies the pause-replication-tasks
// configuration twice, first with port 3306 and then with port 3307, and after
// each apply checks that both endpoints and the replication task exist, that
// the target endpoint reports the applied port, and that the replication task
// status is "running".
func TestAccDMSEndpoint_pauseReplicationTasks(t *testing.T) {
	const (
		sourceEndpoint = "aws_dms_endpoint.source"
		targetEndpoint = "aws_dms_endpoint.target"
		taskResource   = "aws_dms_replication_task.test"
	)

	ctx := acctest.Context(t)
	rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)

	// Both steps differ only in the port, so build them from one template.
	stepForPort := func(port string) resource.TestStep {
		return resource.TestStep{
			Config: testAccEndpointConfig_pauseReplicationTasks(rName, port),
			Check: resource.ComposeTestCheckFunc(
				testAccCheckEndpointExists(ctx, sourceEndpoint),
				testAccCheckEndpointExists(ctx, targetEndpoint),
				testAccCheckReplicationTaskExists(ctx, taskResource),
				resource.TestCheckResourceAttr(targetEndpoint, "port", port),
				resource.TestCheckResourceAttr(taskResource, "status", "running"),
			),
		}
	}

	resource.ParallelTest(t, resource.TestCase{
		PreCheck:                 func() { acctest.PreCheck(ctx, t) },
		ErrorCheck:               acctest.ErrorCheck(t, dms.EndpointsID),
		ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
		CheckDestroy:             testAccCheckReplicationTaskDestroy(ctx),
		Steps: []resource.TestStep{
			stepForPort("3306"),
			stepForPort("3307"),
		},
	})
}

// testAccCheckResourceAttrRegionalHostname ensures the Terraform state exactly matches a formatted DNS hostname with region and partition DNS suffix
func testAccCheckResourceAttrRegionalHostname(resourceName, attributeName, serviceName, hostnamePrefix string) resource.TestCheckFunc {
return func(s *terraform.State) error {
Expand Down Expand Up @@ -4471,3 +4508,205 @@ resource "aws_kms_key" "test" {
}
`, rName))
}

// testAccEndpointConfig_pauseReplicationTasks returns the Terraform
// configuration used by the pause-replication-tasks acceptance test.
//
// The result is acctest.ConfigAvailableAZsNoOptIn() composed with a template
// that declares a VPC with two subnets, a security group, a source and a
// target Aurora MySQL cluster (one instance each), a DMS source and target
// endpoint with pause_replication_tasks = true, a DMS replication subnet
// group and replication instance, and a replication task with
// start_replication_task = true.
//
// rName is substituted for %[1] (resource names, identifiers, and tags) and
// port for %[2] (the port of both DMS endpoints). The "%%" sequences in
// table_mappings are fmt escapes that render as a single "%".
//
// The replication_task_settings value must stay on a single line: it is a
// quoted HCL string, which cannot contain a raw newline.
func testAccEndpointConfig_pauseReplicationTasks(rName, port string) string {
	return acctest.ConfigCompose(
		acctest.ConfigAvailableAZsNoOptIn(),
		fmt.Sprintf(`
data "aws_partition" "current" {}
data "aws_region" "current" {}
resource "aws_vpc" "test" {
cidr_block = "10.1.0.0/16"
tags = {
Name = %[1]q
}
}
resource "aws_subnet" "test1" {
cidr_block = "10.1.1.0/24"
availability_zone = data.aws_availability_zones.available.names[0]
vpc_id = aws_vpc.test.id
tags = {
Name = %[1]q
}
}
resource "aws_subnet" "test2" {
cidr_block = "10.1.2.0/24"
availability_zone = data.aws_availability_zones.available.names[1]
vpc_id = aws_vpc.test.id
tags = {
Name = "%[1]s-2"
}
}
resource "aws_security_group" "test" {
vpc_id = aws_vpc.test.id
ingress {
protocol = -1
self = true
from_port = 0
to_port = 0
}
egress {
from_port = 0
to_port = 0
protocol = "-1"
cidr_blocks = ["0.0.0.0/0"]
}
}
resource "aws_db_subnet_group" "test" {
name = %[1]q
subnet_ids = [aws_subnet.test1.id, aws_subnet.test2.id]
tags = {
Name = %[1]q
}
}
data "aws_rds_engine_version" "default" {
engine = "aurora-mysql"
}
data "aws_rds_orderable_db_instance" "test" {
engine = data.aws_rds_engine_version.default.engine
engine_version = data.aws_rds_engine_version.default.version
preferred_instance_classes = ["db.t3.small", "db.t3.medium", "db.t3.large"]
}
resource "aws_rds_cluster_parameter_group" "test" {
name = "%[1]s-pg-cluster"
family = data.aws_rds_engine_version.default.parameter_group_family
description = "DMS cluster parameter group"
parameter {
name = "binlog_format"
value = "ROW"
apply_method = "pending-reboot"
}
parameter {
name = "binlog_row_image"
value = "Full"
apply_method = "pending-reboot"
}
parameter {
name = "binlog_checksum"
value = "NONE"
apply_method = "pending-reboot"
}
}
resource "aws_rds_cluster" "source" {
cluster_identifier = "%[1]s-aurora-cluster-source"
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
database_name = "tftest"
master_username = "tftest"
master_password = "mustbeeightcharaters"
skip_final_snapshot = true
vpc_security_group_ids = [aws_security_group.test.id]
db_subnet_group_name = aws_db_subnet_group.test.name
db_cluster_parameter_group_name = aws_rds_cluster_parameter_group.test.name
}
resource "aws_rds_cluster_instance" "source" {
identifier = "%[1]s-source-primary"
cluster_identifier = aws_rds_cluster.source.id
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
instance_class = data.aws_rds_orderable_db_instance.test.instance_class
db_subnet_group_name = aws_db_subnet_group.test.name
}
resource "aws_rds_cluster" "target" {
cluster_identifier = "%[1]s-aurora-cluster-target"
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
database_name = "tftest"
master_username = "tftest"
master_password = "mustbeeightcharaters"
skip_final_snapshot = true
vpc_security_group_ids = [aws_security_group.test.id]
db_subnet_group_name = aws_db_subnet_group.test.name
}
resource "aws_rds_cluster_instance" "target" {
identifier = "%[1]s-target-primary"
cluster_identifier = aws_rds_cluster.target.id
engine = data.aws_rds_orderable_db_instance.test.engine
engine_version = data.aws_rds_orderable_db_instance.test.engine_version
instance_class = data.aws_rds_orderable_db_instance.test.instance_class
db_subnet_group_name = aws_db_subnet_group.test.name
}
resource "aws_dms_endpoint" "source" {
database_name = "tftest"
endpoint_id = "%[1]s-source"
endpoint_type = "source"
engine_name = "aurora"
password = "mustbeeightcharaters"
pause_replication_tasks = true
port = %[2]s
server_name = aws_rds_cluster.source.endpoint
username = "tftest"
}
resource "aws_dms_endpoint" "target" {
database_name = "tftest"
endpoint_id = "%[1]s-target"
endpoint_type = "target"
engine_name = "aurora"
password = "mustbeeightcharaters"
pause_replication_tasks = true
port = %[2]s
server_name = aws_rds_cluster.target.endpoint
username = "tftest"
}
resource "aws_dms_replication_subnet_group" "test" {
replication_subnet_group_id = %[1]q
replication_subnet_group_description = "terraform test for replication subnet group"
subnet_ids = [aws_subnet.test1.id, aws_subnet.test2.id]
}
resource "aws_dms_replication_instance" "test" {
allocated_storage = 5
auto_minor_version_upgrade = true
replication_instance_class = "dms.c4.large"
replication_instance_id = %[1]q
preferred_maintenance_window = "sun:00:30-sun:02:30"
publicly_accessible = false
replication_subnet_group_id = aws_dms_replication_subnet_group.test.replication_subnet_group_id
vpc_security_group_ids = [aws_security_group.test.id]
}
resource "aws_dms_replication_task" "test" {
migration_type = "full-load-and-cdc"
replication_instance_arn = aws_dms_replication_instance.test.replication_instance_arn
replication_task_id = %[1]q
replication_task_settings = "{\"BeforeImageSettings\":null,\"FailTaskWhenCleanTaskResourceFailed\":false,\"ChangeProcessingDdlHandlingPolicy\":{\"HandleSourceTableAltered\":true,\"HandleSourceTableDropped\":true,\"HandleSourceTableTruncated\":true},\"ChangeProcessingTuning\":{\"BatchApplyMemoryLimit\":500,\"BatchApplyPreserveTransaction\":true,\"BatchApplyTimeoutMax\":30,\"BatchApplyTimeoutMin\":1,\"BatchSplitSize\":0,\"CommitTimeout\":1,\"MemoryKeepTime\":60,\"MemoryLimitTotal\":1024,\"MinTransactionSize\":1000,\"StatementCacheSize\":50},\"CharacterSetSettings\":null,\"ControlTablesSettings\":{\"ControlSchema\":\"\",\"FullLoadExceptionTableEnabled\":false,\"HistoryTableEnabled\":false,\"HistoryTimeslotInMinutes\":5,\"StatusTableEnabled\":false,\"SuspendedTablesTableEnabled\":false},\"ErrorBehavior\":{\"ApplyErrorDeletePolicy\":\"IGNORE_RECORD\",\"ApplyErrorEscalationCount\":0,\"ApplyErrorEscalationPolicy\":\"LOG_ERROR\",\"ApplyErrorFailOnTruncationDdl\":false,\"ApplyErrorInsertPolicy\":\"LOG_ERROR\",\"ApplyErrorUpdatePolicy\":\"LOG_ERROR\",\"DataErrorEscalationCount\":0,\"DataErrorEscalationPolicy\":\"SUSPEND_TABLE\",\"DataErrorPolicy\":\"LOG_ERROR\",\"DataTruncationErrorPolicy\":\"LOG_ERROR\",\"EventErrorPolicy\":\"IGNORE\",\"FailOnNoTablesCaptured\":false,\"FailOnTransactionConsistencyBreached\":false,\"FullLoadIgnoreConflicts\":true,\"RecoverableErrorCount\":-1,\"RecoverableErrorInterval\":5,\"RecoverableErrorStopRetryAfterThrottlingMax\":false,\"RecoverableErrorThrottling\":true,\"RecoverableErrorThrottlingMax\":1800,\"TableErrorEscalationCount\":0,\"TableErrorEscalationPolicy\":\"STOP_TASK\",\"TableErrorPolicy\":\"SUSPEND_TABLE\"},\"FullLoadSettings\":{\"CommitRate\":10000,\"CreatePkAfterFullLoad\":false,\"MaxFullLoadSubTasks\":8,\"StopTaskCachedChangesApplied\":false,\"StopTaskCachedChangesNotApplied\":false,\"TargetTablePrepMode\":\"DROP_AND_CREATE\",\"TransactionConsistencyTimeout\":600},\"Logging\":{\"EnableLogging\":false,\"LogComponents\":[{\"Id\":\"TRANSFORMATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"IO\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_LOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"PERFORMANCE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SORTER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"REST_SERVER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"VALIDATOR_EXT\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_APPLY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TASK_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TABLES_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"METADATA_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_FACTORY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMON\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"ADDONS\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"DATA_STRUCTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMUNICATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_TRANSFER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]},\"LoopbackPreventionSettings\":null,\"PostProcessingRules\":null,\"StreamBufferSettings\":{\"CtrlStreamBufferSizeInMB\":5,\"StreamBufferCount\":3,\"StreamBufferSizeInMB\":8},\"TargetMetadata\":{\"BatchApplyEnabled\":false,\"FullLobMode\":false,\"InlineLobMaxSize\":0,\"LimitedSizeLobMode\":true,\"LoadMaxFileSize\":0,\"LobChunkSize\":0,\"LobMaxSize\":32,\"ParallelApplyBufferSize\":0,\"ParallelApplyQueuesPerThread\":0,\"ParallelApplyThreads\":0,\"ParallelLoadBufferSize\":0,\"ParallelLoadQueuesPerThread\":0,\"ParallelLoadThreads\":0,\"SupportLobs\":true,\"TargetSchema\":\"\",\"TaskRecoveryTableEnabled\":false},\"TTSettings\":{\"EnableTT\":false,\"TTRecordSettings\":null,\"TTS3Settings\":null}}"
source_endpoint_arn = aws_dms_endpoint.source.endpoint_arn
table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"testrule\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}"
start_replication_task = true
tags = {
Name = %[1]q
}
target_endpoint_arn = aws_dms_endpoint.target.endpoint_arn
depends_on = [aws_rds_cluster_instance.source, aws_rds_cluster_instance.target]
}
`, rName, port))
}
Loading

0 comments on commit a55d897

Please sign in to comment.