Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DMS - Replication task - Allow date usage on cdc_start_time parameter #31917

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/31917.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_dms_replication_task: allow cdc_start_time parameter to use RFC3339 formatted date additionally to a UNIX timestamp.
```
29 changes: 20 additions & 9 deletions internal/service/dms/replication_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func ResourceReplicationTask() *schema.Resource {
ConflictsWith: []string{"cdc_start_time"},
},
"cdc_start_time": {
Type: schema.TypeString,
Optional: true,
// Requires a Unix timestamp in seconds. Example 1484346880
Type: schema.TypeString,
Optional: true,
ValidateFunc: verify.ValidStringDateOrPositiveInt,
ConflictsWith: []string{"cdc_start_position"},
},
"migration_type": {
Expand Down Expand Up @@ -135,11 +135,19 @@ func resourceReplicationTaskCreate(ctx context.Context, d *schema.ResourceData,
}

if v, ok := d.GetOk("cdc_start_time"); ok {
seconds, err := strconv.ParseInt(v.(string), 10, 64)
// Check if input is RFC3339 date string or UNIX timestamp.
dateTime, err := time.Parse(time.RFC3339, v.(string))

if err != nil {
return sdkdiag.AppendErrorf(diags, "DMS create replication task. Invalid CDC Unix timestamp: %s", err)
// Not a valid RF3339 date, checking if this is a UNIX timestamp.
seconds, err := strconv.ParseInt(v.(string), 10, 64)
if err != nil {
return sdkdiag.AppendErrorf(diags, "DMS create replication task. Invalid Unix timestamp given for cdc_start_time parameter: %s", err)
}
request.CdcStartTime = aws.Time(time.Unix(seconds, 0))
} else {
request.CdcStartTime = aws.Time(dateTime)
}
request.CdcStartTime = aws.Time(time.Unix(seconds, 0))
}

if v, ok := d.GetOk("replication_task_settings"); ok {
Expand Down Expand Up @@ -224,11 +232,14 @@ func resourceReplicationTaskUpdate(ctx context.Context, d *schema.ResourceData,
}

if d.HasChange("cdc_start_time") {
seconds, err := strconv.ParseInt(d.Get("cdc_start_time").(string), 10, 64)
// Parse the RFC3339 date string into a time.Time object
dateTime, err := time.Parse(time.RFC3339, d.Get("cdc_start_time").(string))

if err != nil {
return sdkdiag.AppendErrorf(diags, "DMS update replication task. Invalid CRC Unix timestamp: %s", err)
return sdkdiag.AppendErrorf(diags, "DMS update replication task. Invalid cdc_start_time value: %s", err)
}
input.CdcStartTime = aws.Time(time.Unix(seconds, 0))

input.CdcStartTime = aws.Time(dateTime)
}

if d.HasChange("replication_task_settings") {
Expand Down
89 changes: 86 additions & 3 deletions internal/service/dms/replication_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@ package dms_test
import (
"context"
"fmt"
"regexp"
"testing"

dms "github.com/aws/aws-sdk-go/service/databasemigrationservice"
sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
"github.com/hashicorp/terraform-plugin-testing/helper/resource"
Expand All @@ -14,6 +11,11 @@ import (
"github.com/hashicorp/terraform-provider-aws/internal/conns"
tfdms "github.com/hashicorp/terraform-provider-aws/internal/service/dms"
"github.com/hashicorp/terraform-provider-aws/internal/tfresource"
"regexp"
"strconv"
"strings"
"testing"
"time"
)

func TestAccDMSReplicationTask_basic(t *testing.T) {
Expand Down Expand Up @@ -179,6 +181,70 @@ func TestAccDMSReplicationTask_cdcStartPosition(t *testing.T) {
})
}

func TestAccDMSReplicationTask_cdcStartTime_rfc3339_date(t *testing.T) {
ctx := acctest.Context(t)
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_dms_replication_task.test"

currentTime := time.Now().UTC()
rfc3339Time := currentTime.Format(time.RFC3339)
awsDmsExpectedOutput := strings.TrimRight(rfc3339Time, "Z") // AWS API drop "Z" part.

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckReplicationTaskDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccReplicationTaskConfig_cdcStartTime(rName, rfc3339Time),
Check: resource.ComposeTestCheckFunc(
testAccCheckReplicationTaskExists(ctx, resourceName),
resource.TestCheckResourceAttr(resourceName, "cdc_start_position", awsDmsExpectedOutput),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerifyIgnore: []string{"start_replication_task"},
},
},
})
}

func TestAccDMSReplicationTask_cdcStartTime_unix_timestamp(t *testing.T) {
ctx := acctest.Context(t)
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_dms_replication_task.test"

currentTime := time.Now().UTC()
rfc3339Time := currentTime.Format(time.RFC3339)
awsDmsExpectedOutput := strings.TrimRight(rfc3339Time, "Z") // AWS API drop "Z" part.
dateTime, _ := time.Parse(time.RFC3339, rfc3339Time)
unixDateTime := strconv.Itoa(int(dateTime.Unix()))

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, dms.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckReplicationTaskDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccReplicationTaskConfig_cdcStartTime(rName, unixDateTime),
Check: resource.ComposeTestCheckFunc(
testAccCheckReplicationTaskExists(ctx, resourceName),
resource.TestCheckResourceAttr(resourceName, "cdc_start_position", awsDmsExpectedOutput),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateVerifyIgnore: []string{"start_replication_task"},
},
},
})
}

func TestAccDMSReplicationTask_startReplicationTask(t *testing.T) {
ctx := acctest.Context(t)
if testing.Short() {
Expand Down Expand Up @@ -462,6 +528,23 @@ resource "aws_dms_replication_task" "test" {
`, cdcStartPosition, rName))
}

func testAccReplicationTaskConfig_cdcStartTime(rName, cdcStartPosition string) string {
return acctest.ConfigCompose(
replicationTaskConfigBase(rName),
fmt.Sprintf(`
resource "aws_dms_replication_task" "test" {
cdc_start_time = %[1]q
migration_type = "cdc"
replication_instance_arn = aws_dms_replication_instance.test.replication_instance_arn
replication_task_id = %[2]q
replication_task_settings = "{\"BeforeImageSettings\":null,\"FailTaskWhenCleanTaskResourceFailed\":false,\"ChangeProcessingDdlHandlingPolicy\":{\"HandleSourceTableAltered\":true,\"HandleSourceTableDropped\":true,\"HandleSourceTableTruncated\":true},\"ChangeProcessingTuning\":{\"BatchApplyMemoryLimit\":500,\"BatchApplyPreserveTransaction\":true,\"BatchApplyTimeoutMax\":30,\"BatchApplyTimeoutMin\":1,\"BatchSplitSize\":0,\"CommitTimeout\":1,\"MemoryKeepTime\":60,\"MemoryLimitTotal\":1024,\"MinTransactionSize\":1000,\"StatementCacheSize\":50},\"CharacterSetSettings\":null,\"ControlTablesSettings\":{\"ControlSchema\":\"\",\"FullLoadExceptionTableEnabled\":false,\"HistoryTableEnabled\":false,\"HistoryTimeslotInMinutes\":5,\"StatusTableEnabled\":false,\"SuspendedTablesTableEnabled\":false},\"ErrorBehavior\":{\"ApplyErrorDeletePolicy\":\"IGNORE_RECORD\",\"ApplyErrorEscalationCount\":0,\"ApplyErrorEscalationPolicy\":\"LOG_ERROR\",\"ApplyErrorFailOnTruncationDdl\":false,\"ApplyErrorInsertPolicy\":\"LOG_ERROR\",\"ApplyErrorUpdatePolicy\":\"LOG_ERROR\",\"DataErrorEscalationCount\":0,\"DataErrorEscalationPolicy\":\"SUSPEND_TABLE\",\"DataErrorPolicy\":\"LOG_ERROR\",\"DataTruncationErrorPolicy\":\"LOG_ERROR\",\"EventErrorPolicy\":\"IGNORE\",\"FailOnNoTablesCaptured\":false,\"FailOnTransactionConsistencyBreached\":false,\"FullLoadIgnoreConflicts\":true,\"RecoverableErrorCount\":-1,\"RecoverableErrorInterval\":5,\"RecoverableErrorStopRetryAfterThrottlingMax\":false,\"RecoverableErrorThrottling\":true,\"RecoverableErrorThrottlingMax\":1800,\"TableErrorEscalationCount\":0,\"TableErrorEscalationPolicy\":\"STOP_TASK\",\"TableErrorPolicy\":\"SUSPEND_TABLE\"},\"FullLoadSettings\":{\"CommitRate\":10000,\"CreatePkAfterFullLoad\":false,\"MaxFullLoadSubTasks\":8,\"StopTaskCachedChangesApplied\":false,\"StopTaskCachedChangesNotApplied\":false,\"TargetTablePrepMode\":\"DROP_AND_CREATE\",\"TransactionConsistencyTimeout\":600},\"Logging\":{\"EnableLogging\":false,\"LogComponents\":[{\"Id\":\"TRANSFORMATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_UNLOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"IO\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_LOAD\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"PERFORMANCE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SOURCE_CAPTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"SORTER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"REST_SERVER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"VALIDATOR_EXT\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TARGET_APPLY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TASK_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"TABLES_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"METADATA_MANAGER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_FACTORY\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMON\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"ADDONS\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"DATA_STRUCTURE\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"COMMUNICATION\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"},{\"Id\":\"FILE_TRANSFER\",\"Severity\":\"LOGGER_SEVERITY_DEFAULT\"}]},\"LoopbackPreventionSettings\":null,\"PostProcessingRules\":null,\"StreamBufferSettings\":{\"CtrlStreamBufferSizeInMB\":5,\"StreamBufferCount\":3,\"StreamBufferSizeInMB\":8},\"TargetMetadata\":{\"BatchApplyEnabled\":false,\"FullLobMode\":false,\"InlineLobMaxSize\":0,\"LimitedSizeLobMode\":true,\"LoadMaxFileSize\":0,\"LobChunkSize\":0,\"LobMaxSize\":32,\"ParallelApplyBufferSize\":0,\"ParallelApplyQueuesPerThread\":0,\"ParallelApplyThreads\":0,\"ParallelLoadBufferSize\":0,\"ParallelLoadQueuesPerThread\":0,\"ParallelLoadThreads\":0,\"SupportLobs\":true,\"TargetSchema\":\"\",\"TaskRecoveryTableEnabled\":false},\"TTSettings\":{\"EnableTT\":false,\"TTRecordSettings\":null,\"TTS3Settings\":null}}"
source_endpoint_arn = aws_dms_endpoint.source.endpoint_arn
table_mappings = "{\"rules\":[{\"rule-type\":\"selection\",\"rule-id\":\"1\",\"rule-name\":\"1\",\"object-locator\":{\"schema-name\":\"%%\",\"table-name\":\"%%\"},\"rule-action\":\"include\"}]}"
target_endpoint_arn = aws_dms_endpoint.target.endpoint_arn
}
`, cdcStartPosition, rName))
}

func testAccReplicationTaskConfig_start(rName string, startTask bool, ruleName string) string {
return acctest.ConfigCompose(
acctest.ConfigAvailableAZsNoOptIn(),
Expand Down
10 changes: 7 additions & 3 deletions website/docs/r/dms_replication_task.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ Provides a DMS (Data Migration Service) replication task resource. DMS replicati
```terraform
# Create a new replication task
resource "aws_dms_replication_task" "test" {
cdc_start_time = 1484346880
cdc_start_time = "1993-05-21T05:50:00Z"
migration_type = "full-load"
replication_instance_arn = aws_dms_replication_instance.test-dms-replication-instance-tf.replication_instance_arn
replication_task_id = "test-dms-replication-task-tf"
Expand All @@ -37,8 +37,12 @@ resource "aws_dms_replication_task" "test" {

The following arguments are supported:

* `cdc_start_position` - (Optional, Conflicts with `cdc_start_time`) Indicates when you want a change data capture (CDC) operation to start. The value can be in date, checkpoint, or LSN/SCN format depending on the source engine. For more information, see [Determining a CDC native start point](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Task.CDC.html#CHAP_Task.CDC.StartPoint.Native).
* `cdc_start_time` - (Optional, Conflicts with `cdc_start_position`) The Unix timestamp integer for the start of the Change Data Capture (CDC) operation.
* `cdc_start_position` - (Optional, Conflicts with `cdc_start_time`) Indicates when you want a change data capture (CDC)
operation to start. The value can be a RFC3339 formatted date, a checkpoint, or a LSN/SCN format depending on the
source engine. For more information,
see [Determining a CDC native start point](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Task.CDC.html#CHAP_Task.CDC.StartPoint.Native).
* `cdc_start_time` - (Optional, Conflicts with `cdc_start_position`) RFC3339 formatted date string or UNIX timestamp for
the start of the Change Data Capture (CDC) operation.
* `migration_type` - (Required) The migration type. Can be one of `full-load | cdc | full-load-and-cdc`.
* `replication_instance_arn` - (Required) The Amazon Resource Name (ARN) of the replication instance.
* `replication_task_id` - (Required) The replication task identifier.
Expand Down