diff --git a/.changelog/34316.txt b/.changelog/34316.txt new file mode 100644 index 00000000000..01809aec633 --- /dev/null +++ b/.changelog/34316.txt @@ -0,0 +1,3 @@ +```release-note:enhancement +resource/aws_dms_endpoint: Add `pause_replication_tasks`, which when set to `true`, pauses associated running replication tasks, regardless if they are managed by Terraform, prior to modifying the endpoint (only tasks paused by the resource will be restarted after the modification completes) +``` \ No newline at end of file diff --git a/internal/service/dms/endpoint.go b/internal/service/dms/endpoint.go index b147104e692..c79a0a76656 100644 --- a/internal/service/dms/endpoint.go +++ b/internal/service/dms/endpoint.go @@ -339,6 +339,10 @@ func ResourceEndpoint() *schema.Resource { Sensitive: true, ConflictsWith: []string{"secrets_manager_access_role_arn", "secrets_manager_arn"}, }, + "pause_replication_tasks": { + Type: schema.TypeBool, + Optional: true, + }, "port": { Type: schema.TypeInt, Optional: true, @@ -1304,11 +1308,25 @@ func resourceEndpointUpdate(ctx context.Context, d *schema.ResourceData, meta in } } - _, err := conn.ModifyEndpointWithContext(ctx, input) + var tasks []*dms.ReplicationTask + if v, ok := d.GetOk("pause_replication_tasks"); ok && v.(bool) { + var err error + tasks, err = stopEndpointReplicationTasks(ctx, conn, d.Get("endpoint_arn").(string)) + if err != nil { + return sdkdiag.AppendErrorf(diags, "pausing replication tasks before updating DMS Endpoint (%s): %s", d.Id(), err) + } + } + _, err := conn.ModifyEndpointWithContext(ctx, input) if err != nil { return sdkdiag.AppendErrorf(diags, "updating DMS Endpoint (%s): %s", d.Id(), err) } + + if v, ok := d.GetOk("pause_replication_tasks"); ok && v.(bool) && len(tasks) > 0 { + if err := startEndpointReplicationTasks(ctx, conn, d.Get("endpoint_arn").(string), tasks); err != nil { + return sdkdiag.AppendErrorf(diags, "starting replication tasks after updating DMS Endpoint (%s): %s", d.Id(), err) + } + } } 
return append(diags, resourceEndpointRead(ctx, d, meta)...) @@ -1581,6 +1599,103 @@ func resourceEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoint) er return nil } +func steadyEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string) error { + tasks, err := FindReplicationTasksByEndpointARN(ctx, conn, arn) + if err != nil { + return err + } + + for _, task := range tasks { + rtID := aws.StringValue(task.ReplicationTaskIdentifier) + switch aws.StringValue(task.Status) { + case replicationTaskStatusRunning, replicationTaskStatusFailed, replicationTaskStatusReady, replicationTaskStatusStopped: + continue + case replicationTaskStatusCreating, replicationTaskStatusDeleting, replicationTaskStatusModifying, replicationTaskStatusStopping, replicationTaskStatusStarting: + if err := waitReplicationTaskSteady(ctx, conn, rtID); err != nil { + return err + } + } + } + + return nil +} + +func stopEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string) ([]*dms.ReplicationTask, error) { + if err := steadyEndpointReplicationTasks(ctx, conn, arn); err != nil { + return nil, err + } + + tasks, err := FindReplicationTasksByEndpointARN(ctx, conn, arn) + if err != nil { + return nil, err + } + + var stoppedTasks []*dms.ReplicationTask + for _, task := range tasks { + rtID := aws.StringValue(task.ReplicationTaskIdentifier) + switch aws.StringValue(task.Status) { + case replicationTaskStatusRunning: + err := stopReplicationTask(ctx, rtID, conn) + if tfawserr.ErrCodeEquals(err, dms.ErrCodeInvalidResourceStateFault) { + continue + } + + if err != nil { + return stoppedTasks, err + } + stoppedTasks = append(stoppedTasks, task) + default: + continue + } + } + + return stoppedTasks, nil +} + +func startEndpointReplicationTasks(ctx context.Context, conn *dms.DatabaseMigrationService, arn string, tasks []*dms.ReplicationTask) error { + if len(tasks) == 0 { + return nil + } + + if err := 
steadyEndpointReplicationTasks(ctx, conn, arn); err != nil { + return err + } + + for _, task := range tasks { + _, err := conn.TestConnectionWithContext(ctx, &dms.TestConnectionInput{ + EndpointArn: aws.String(arn), + ReplicationInstanceArn: task.ReplicationInstanceArn, + }) + + if tfawserr.ErrMessageContains(err, dms.ErrCodeInvalidResourceStateFault, "already being tested") { + continue + } + + if err != nil { + return fmt.Errorf("testing connection: %w", err) + } + + err = conn.WaitUntilTestConnectionSucceedsWithContext(ctx, &dms.DescribeConnectionsInput{ + Filters: []*dms.Filter{ + { + Name: aws.String("endpoint-arn"), + Values: []*string{aws.String(arn)}, + }, + }, + }) + + if err != nil { + return fmt.Errorf("waiting until test connection succeeds: %w", err) + } + + if err := startReplicationTask(ctx, conn, aws.StringValue(task.ReplicationTaskIdentifier)); err != nil { + return fmt.Errorf("starting replication task: %w", err) + } + } + + return nil +} + func flattenOpenSearchSettings(settings *dms.ElasticsearchSettings) []map[string]interface{} { if settings == nil { return []map[string]interface{}{} diff --git a/internal/service/dms/endpoint_test.go b/internal/service/dms/endpoint_test.go index bf355246b58..2401e7f01c4 100644 --- a/internal/service/dms/endpoint_test.go +++ b/internal/service/dms/endpoint_test.go @@ -2112,6 +2112,43 @@ func TestAccDMSEndpoint_Redshift_SSEKMSKeyId(t *testing.T) { }) } +func TestAccDMSEndpoint_pauseReplicationTasks(t *testing.T) { + ctx := acctest.Context(t) + rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix) + endpointNameSource := "aws_dms_endpoint.source" + endpointNameTarget := "aws_dms_endpoint.target" + replicationTaskName := "aws_dms_replication_task.test" + + resource.ParallelTest(t, resource.TestCase{ + PreCheck: func() { acctest.PreCheck(ctx, t) }, + ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID), + ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories, + CheckDestroy: 
testAccCheckReplicationTaskDestroy(ctx), + Steps: []resource.TestStep{ + { + Config: testAccEndpointConfig_pauseReplicationTasks(rName, "3306"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEndpointExists(ctx, endpointNameSource), + testAccCheckEndpointExists(ctx, endpointNameTarget), + testAccCheckReplicationTaskExists(ctx, replicationTaskName), + resource.TestCheckResourceAttr(endpointNameTarget, "port", "3306"), + resource.TestCheckResourceAttr(replicationTaskName, "status", "running"), + ), + }, + { + Config: testAccEndpointConfig_pauseReplicationTasks(rName, "3307"), + Check: resource.ComposeTestCheckFunc( + testAccCheckEndpointExists(ctx, endpointNameSource), + testAccCheckEndpointExists(ctx, endpointNameTarget), + testAccCheckReplicationTaskExists(ctx, replicationTaskName), + resource.TestCheckResourceAttr(endpointNameTarget, "port", "3307"), + resource.TestCheckResourceAttr(replicationTaskName, "status", "running"), + ), + }, + }, + }) +} + // testAccCheckResourceAttrRegionalHostname ensures the Terraform state exactly matches a formatted DNS hostname with region and partition DNS suffix func testAccCheckResourceAttrRegionalHostname(resourceName, attributeName, serviceName, hostnamePrefix string) resource.TestCheckFunc { return func(s *terraform.State) error { @@ -4471,3 +4508,205 @@ resource "aws_kms_key" "test" { } `, rName)) } + +func testAccEndpointConfig_pauseReplicationTasks(rName, port string) string { + return acctest.ConfigCompose( + acctest.ConfigAvailableAZsNoOptIn(), + fmt.Sprintf(` +data "aws_partition" "current" {} + +data "aws_region" "current" {} + +resource "aws_vpc" "test" { + cidr_block = "10.1.0.0/16" + + tags = { + Name = %[1]q + } +} + +resource "aws_subnet" "test1" { + cidr_block = "10.1.1.0/24" + availability_zone = data.aws_availability_zones.available.names[0] + vpc_id = aws_vpc.test.id + + tags = { + Name = %[1]q + } +} + +resource "aws_subnet" "test2" { + cidr_block = "10.1.2.0/24" + availability_zone = 
data.aws_availability_zones.available.names[1] + vpc_id = aws_vpc.test.id + + tags = { + Name = "%[1]s-2" + } +} + +resource "aws_security_group" "test" { + vpc_id = aws_vpc.test.id + + ingress { + protocol = -1 + self = true + from_port = 0 + to_port = 0 + } + + egress { + from_port = 0 + to_port = 0 + protocol = "-1" + cidr_blocks = ["0.0.0.0/0"] + } +} + +resource "aws_db_subnet_group" "test" { + name = %[1]q + subnet_ids = [aws_subnet.test1.id, aws_subnet.test2.id] + + tags = { + Name = %[1]q + } +} + +data "aws_rds_engine_version" "default" { + engine = "aurora-mysql" +} + +data "aws_rds_orderable_db_instance" "test" { + engine = data.aws_rds_engine_version.default.engine + engine_version = data.aws_rds_engine_version.default.version + preferred_instance_classes = ["db.t3.small", "db.t3.medium", "db.t3.large"] +} + +resource "aws_rds_cluster_parameter_group" "test" { + name = "%[1]s-pg-cluster" + family = data.aws_rds_engine_version.default.parameter_group_family + description = "DMS cluster parameter group" + + parameter { + name = "binlog_format" + value = "ROW" + apply_method = "pending-reboot" + } + + parameter { + name = "binlog_row_image" + value = "Full" + apply_method = "pending-reboot" + } + + parameter { + name = "binlog_checksum" + value = "NONE" + apply_method = "pending-reboot" + } +} + +resource "aws_rds_cluster" "source" { + cluster_identifier = "%[1]s-aurora-cluster-source" + engine = data.aws_rds_orderable_db_instance.test.engine + engine_version = data.aws_rds_orderable_db_instance.test.engine_version + database_name = "tftest" + master_username = "tftest" + master_password = "mustbeeightcharaters" + skip_final_snapshot = true + vpc_security_group_ids = [aws_security_group.test.id] + db_subnet_group_name = aws_db_subnet_group.test.name + db_cluster_parameter_group_name = aws_rds_cluster_parameter_group.test.name +} + +resource "aws_rds_cluster_instance" "source" { + identifier = "%[1]s-source-primary" + cluster_identifier = 
aws_rds_cluster.source.id + engine = data.aws_rds_orderable_db_instance.test.engine + engine_version = data.aws_rds_orderable_db_instance.test.engine_version + instance_class = data.aws_rds_orderable_db_instance.test.instance_class + db_subnet_group_name = aws_db_subnet_group.test.name +} + +resource "aws_rds_cluster" "target" { + cluster_identifier = "%[1]s-aurora-cluster-target" + engine = data.aws_rds_orderable_db_instance.test.engine + engine_version = data.aws_rds_orderable_db_instance.test.engine_version + database_name = "tftest" + master_username = "tftest" + master_password = "mustbeeightcharaters" + skip_final_snapshot = true + vpc_security_group_ids = [aws_security_group.test.id] + db_subnet_group_name = aws_db_subnet_group.test.name +} + +resource "aws_rds_cluster_instance" "target" { + identifier = "%[1]s-target-primary" + cluster_identifier = aws_rds_cluster.target.id + engine = data.aws_rds_orderable_db_instance.test.engine + engine_version = data.aws_rds_orderable_db_instance.test.engine_version + instance_class = data.aws_rds_orderable_db_instance.test.instance_class + db_subnet_group_name = aws_db_subnet_group.test.name +} + +resource "aws_dms_endpoint" "source" { + database_name = "tftest" + endpoint_id = "%[1]s-source" + endpoint_type = "source" + engine_name = "aurora" + password = "mustbeeightcharaters" + pause_replication_tasks = true + port = %[2]s + server_name = aws_rds_cluster.source.endpoint + username = "tftest" +} + +resource "aws_dms_endpoint" "target" { + database_name = "tftest" + endpoint_id = "%[1]s-target" + endpoint_type = "target" + engine_name = "aurora" + password = "mustbeeightcharaters" + pause_replication_tasks = true + port = %[2]s + server_name = aws_rds_cluster.target.endpoint + username = "tftest" +} + +resource "aws_dms_replication_subnet_group" "test" { + replication_subnet_group_id = %[1]q + replication_subnet_group_description = "terraform test for replication subnet group" + subnet_ids = [aws_subnet.test1.id, 
aws_subnet.test2.id] +} + +resource "aws_dms_replication_instance" "test" { + allocated_storage = 5 + auto_minor_version_upgrade = true + replication_instance_class = "dms.c4.large" + replication_instance_id = %[1]q + preferred_maintenance_window = "sun:00:30-sun:02:30" + publicly_accessible = false + replication_subnet_group_id = aws_dms_replication_subnet_group.test.replication_subnet_group_id + vpc_security_group_ids = [aws_security_group.test.id] +} + +resource "aws_dms_replication_task" "test" { + migration_type = "full-load-and-cdc" + replication_instance_arn = aws_dms_replication_instance.test.replication_instance_arn + replication_task_id = %[1]q + replication_task_settings = "{\"BeforeImageSettings\":null,\"FailTaskWhenCleanTaskResourceFailed\":false,\"ChangeProcessingDdlHandlingPolicy\":{\"HandleSourceTableAltered\":true,\"HandleSourceTableDropped\":true,\"HandleSourceTableTruncated\":true},\"ChangeProcessingTuning\":{\"BatchApplyMemoryLimit\":500,\"BatchApplyPreserveTransaction\":true,\"BatchApplyTimeoutMax\":30,\"BatchApplyTimeoutMin\":1,\"BatchSplitSize\":0,\"CommitTimeout\":1,\"MemoryKeepTime\":60,\"MemoryLimitTotal\":1024,\"MinTransactionSize\":1000,\"StatementCacheSize\":50},\"CharacterSetSettings\":null,\"ControlTablesSettings\":{\"ControlSchema\":\"\",\"FullLoadExceptionTableEnabled\":false,\"HistoryTableEnabled\":false,\"HistoryTimeslotInMinutes\":5,\"StatusTableEnabled\":false,\"SuspendedTablesTableEnabled\":false},\"ErrorBehavior\":{\"ApplyErrorDeletePolicy\":\"IGNORE_RECORD\",\"ApplyErrorEscalationCount\":0,\"ApplyErrorEscalationPolicy\":\"LOG_ERROR\",\"ApplyErrorFailOnTruncationDdl\":false,\"ApplyErrorInsertPolicy\":\"LOG_ERROR\",\"ApplyErrorUpdatePolicy\":\"LOG_ERROR\",\"DataErrorEscalationCount\":0,\"DataErrorEscalationPolicy\":\"SUSPEND_TABLE\",\"DataErrorPolicy\":\"LOG_ERROR\",\"DataTruncationErrorPolicy\":\"LOG_ERROR\",\"EventErrorPolicy\":\"IGNORE\",\"FailOnNoTablesCaptured\":false,\"FailOnTransactionConsistencyBreached\":false,\"FullLoa
dIgnoreConflicts\":true,\"RecoverableErrorCount\":-1,\"RecoverableErrorInterval\":5,\"RecoverableErrorStopRetryAfterThrottlingMax\":false,\"RecoverableErrorThrottling\":true,\"RecoverableErrorThrottlingMax\":1800,\"TableErrorEscalationCount\":0,\"TableErrorEscalationPolicy\":\"STOP_TASK\",\"TableErrorPolicy\":\"SUSPEND_TABLE\"},\"FullLoadSettings\":{\"CommitRate\":10000,\"CreatePkAfterFullLoad\":false,\"MaxFullLoadSubTasks\":8,\"StopTaskCachedChangesApplied\":false,\"StopTaskCachedChangesNotApplied\":false,\"TargetTablePrepMode\":\"DROP_AND_CREATE\",\"TransactionConsistencyTimeout\":600},\"Logging\":{\"EnableLogging\":false,\"LogComponents\":[{\"Id\":\"TRANSFORMATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"IO\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_LOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"PERFORMANCE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SORTER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"REST_SERVER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"VALIDATOR_EXT\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_APPLY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TASK_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TABLES_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"METADATA_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_FACTORY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMON\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"ADDONS\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"DATA_STRUCTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMUNICATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_TRANSFER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]},\"LoopbackPreventionSettings\":null,\"PostProcessingRules\":null,\"StreamBufferSettings\"
:{\"CtrlStreamBufferSizeInMB\":5,\"StreamBufferCount\":3,\"StreamBufferSizeInMB\":8},\"TargetMetadata\":{\"BatchApplyEnabled\":false,\"FullLobMode\":false,\"InlineLobMaxSize\":0,\"LimitedSizeLobMode\":true,\"LoadMaxFileSize\":0,\"LobChunkSize\":0,\"LobMaxSize\":32,\"ParallelApplyBufferSize\":0,\"ParallelApplyQueuesPerThread\":0,\"ParallelApplyThreads\":0,\"ParallelLoadBufferSize\":0,\"ParallelLoadQueuesPerThread\":0,\"ParallelLoadThreads\":0,\"SupportLobs\":true,\"TargetSchema\":\"\",\"TaskRecoveryTableEnabled\":false},\"TTSettings\":{\"EnableTT\":false,\"TTRecordSettings\":null,\"TTS3Settings\":null}}" + source_endpoint_arn = aws_dms_endpoint.source.endpoint_arn + table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"testrule\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}" + + start_replication_task = true + + tags = { + Name = %[1]q + } + + target_endpoint_arn = aws_dms_endpoint.target.endpoint_arn + + depends_on = [aws_rds_cluster_instance.source, aws_rds_cluster_instance.target] +} +`, rName, port)) +} diff --git a/internal/service/dms/replication_task.go b/internal/service/dms/replication_task.go index 162a3875c16..74dee1dae40 100644 --- a/internal/service/dms/replication_task.go +++ b/internal/service/dms/replication_task.go @@ -164,7 +164,7 @@ func resourceReplicationTaskCreate(ctx context.Context, d *schema.ResourceData, } if d.Get("start_replication_task").(bool) { - if err := startReplicationTask(ctx, d.Id(), conn); err != nil { + if err := startReplicationTask(ctx, conn, d.Id()); err != nil { return sdkdiag.AppendFromErr(diags, err) } } @@ -262,7 +262,7 @@ func resourceReplicationTaskUpdate(ctx context.Context, d *schema.ResourceData, } if d.Get("start_replication_task").(bool) { - err := startReplicationTask(ctx, d.Id(), conn) + err := startReplicationTask(ctx, conn, d.Id()) if err != nil { return sdkdiag.AppendFromErr(diags, err) } @@ -273,7 +273,7 @@ func 
resourceReplicationTaskUpdate(ctx context.Context, d *schema.ResourceData, status := d.Get("status").(string) if d.Get("start_replication_task").(bool) { if status != replicationTaskStatusRunning { - if err := startReplicationTask(ctx, d.Id(), conn); err != nil { + if err := startReplicationTask(ctx, conn, d.Id()); err != nil { return sdkdiag.AppendFromErr(diags, err) } } @@ -352,7 +352,7 @@ func replicationTaskRemoveReadOnlySettings(settings string) (*string, error) { return &cleanedSettingsString, nil } -func startReplicationTask(ctx context.Context, id string, conn *dms.DatabaseMigrationService) error { +func startReplicationTask(ctx context.Context, conn *dms.DatabaseMigrationService, id string) error { log.Printf("[DEBUG] Starting DMS Replication Task: (%s)", id) task, err := FindReplicationTaskByID(ctx, conn, id) @@ -423,7 +423,10 @@ func FindReplicationTaskByID(ctx context.Context, conn *dms.DatabaseMigrationSer }, }, } + return FindReplicationTask(ctx, conn, input) +} +func FindReplicationTask(ctx context.Context, conn *dms.DatabaseMigrationService, input *dms.DescribeReplicationTasksInput) (*dms.ReplicationTask, error) { var results []*dms.ReplicationTask err := conn.DescribeReplicationTasksPagesWithContext(ctx, input, func(page *dms.DescribeReplicationTasksOutput, lastPage bool) bool { @@ -463,6 +466,51 @@ func FindReplicationTaskByID(ctx context.Context, conn *dms.DatabaseMigrationSer return results[0], nil } +func FindReplicationTasksByEndpointARN(ctx context.Context, conn *dms.DatabaseMigrationService, arn string) ([]*dms.ReplicationTask, error) { + input := &dms.DescribeReplicationTasksInput{ + Filters: []*dms.Filter{ + { + Name: aws.String("endpoint-arn"), + Values: []*string{aws.String(arn)}, + }, + }, + } + + var results []*dms.ReplicationTask + + err := conn.DescribeReplicationTasksPagesWithContext(ctx, input, func(page *dms.DescribeReplicationTasksOutput, lastPage bool) bool { + if page == nil { + return !lastPage + } + + for _, task := range 
page.ReplicationTasks { + if task == nil { + continue + } + + switch aws.StringValue(task.Status) { + case replicationTaskStatusRunning, replicationTaskStatusStarting: + results = append(results, task) + } + } + + return !lastPage + }) + + if tfawserr.ErrCodeEquals(err, dms.ErrCodeResourceNotFoundFault) { + return nil, &retry.NotFoundError{ + LastError: err, + LastRequest: input, + } + } + + if err != nil { + return nil, err + } + + return results, nil +} + func statusReplicationTask(ctx context.Context, conn *dms.DatabaseMigrationService, id string) retry.StateRefreshFunc { return func() (interface{}, string, error) { output, err := FindReplicationTaskByID(ctx, conn, id) @@ -549,12 +597,30 @@ func waitReplicationTaskRunning(ctx context.Context, conn *dms.DatabaseMigration func waitReplicationTaskStopped(ctx context.Context, conn *dms.DatabaseMigrationService, id string) error { stateConf := &retry.StateChangeConf{ - Pending: []string{replicationTaskStatusStopping, replicationTaskStatusRunning}, - Target: []string{replicationTaskStatusStopped}, - Refresh: statusReplicationTask(ctx, conn, id), - Timeout: replicationTaskRunningTimeout, - MinTimeout: 10 * time.Second, - Delay: 60 * time.Second, // Wait 30 secs before starting + Pending: []string{replicationTaskStatusStopping, replicationTaskStatusRunning}, + Target: []string{replicationTaskStatusStopped}, + Refresh: statusReplicationTask(ctx, conn, id), + Timeout: replicationTaskRunningTimeout, + MinTimeout: 10 * time.Second, + Delay: 60 * time.Second, // Wait 60 secs before starting + ContinuousTargetOccurence: 2, + } + + // Wait, catching any errors + _, err := stateConf.WaitForStateContext(ctx) + + return err +} + +func waitReplicationTaskSteady(ctx context.Context, conn *dms.DatabaseMigrationService, id string) error { + stateConf := &retry.StateChangeConf{ + Pending: []string{replicationTaskStatusCreating, replicationTaskStatusDeleting, replicationTaskStatusModifying, replicationTaskStatusStopping, 
replicationTaskStatusStarting}, + Target: []string{replicationTaskStatusFailed, replicationTaskStatusReady, replicationTaskStatusStopped, replicationTaskStatusRunning}, + Refresh: statusReplicationTask(ctx, conn, id), + Timeout: replicationTaskRunningTimeout, + MinTimeout: 10 * time.Second, + Delay: 60 * time.Second, // Wait 60 secs before starting + ContinuousTargetOccurence: 2, } // Wait, catching any errors diff --git a/website/docs/r/dms_endpoint.html.markdown b/website/docs/r/dms_endpoint.html.markdown index 0d66bae67e4..77903a417a4 100644 --- a/website/docs/r/dms_endpoint.html.markdown +++ b/website/docs/r/dms_endpoint.html.markdown @@ -58,6 +58,7 @@ The following arguments are optional: * `kinesis_settings` - (Optional) Configuration block for Kinesis settings. See below. * `mongodb_settings` - (Optional) Configuration block for MongoDB settings. See below. * `password` - (Optional) Password to be used to login to the endpoint database. +* `pause_replication_tasks` - (Optional) Whether to pause associated running replication tasks, regardless of whether they are managed by Terraform, prior to modifying the endpoint. Only tasks paused by the resource will be restarted after the modification completes. Default is `false`. * `port` - (Optional) Port used by the endpoint database. * `redshift_settings` - (Optional) Configuration block for Redshift settings. See below. * `s3_settings` - (Optional) (**Deprecated**, use the [`aws_dms_s3_endpoint`](/docs/providers/aws/r/dms_s3_endpoint.html) resource instead) Configuration block for S3 settings. See below.