Skip to content

Commit

Permalink
rebased with upstream/master
Browse files Browse the repository at this point in the history
  • Loading branch information
AltayAliyev committed Mar 2, 2018
1 parent d33f0ba commit 3380f00
Show file tree
Hide file tree
Showing 3 changed files with 272 additions and 113 deletions.
151 changes: 118 additions & 33 deletions aws/resource_aws_dms_endpoint.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package aws

import (
"fmt"
"log"
"strings"
"time"
Expand Down Expand Up @@ -71,6 +72,7 @@ func resourceAwsDmsEndpoint() *schema.Resource {
"redshift",
"sybase",
"sqlserver",
"s3",
}, false),
},
"extra_connection_attributes": {
Expand All @@ -79,11 +81,12 @@ func resourceAwsDmsEndpoint() *schema.Resource {
Optional: true,
},
"kms_key_arn": {
Type: schema.TypeString,
Computed: true,
Optional: true,
ForceNew: true,
ValidateFunc: validateArn,
Type: schema.TypeString,
Computed: true,
Optional: true,
ForceNew: true,
ConflictsWith: []string{"bucket_name", "bucket_folder"},
ValidateFunc: validateArn,
},
"password": {
Type: schema.TypeString,
Expand Down Expand Up @@ -117,6 +120,14 @@ func resourceAwsDmsEndpoint() *schema.Resource {
Type: schema.TypeString,
Optional: true,
},
"bucket_name": {
Type: schema.TypeString,
Optional: true,
},
"bucket_folder": {
Type: schema.TypeString,
Optional: true,
},
},
}
}
Expand All @@ -131,12 +142,42 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro
Tags: dmsTagsFromMap(d.Get("tags").(map[string]interface{})),
}

switch d.Get("engine_name").(string) {
// if dynamodb then add required params
if d.Get("engine_name").(string) == "dynamodb" {
case "dynamodb":
request.DynamoDbSettings = &dms.DynamoDbSettings{
ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)),
}
} else {
case "s3":
request.S3Settings = &dms.S3Settings{
BucketName: aws.String(d.Get("bucket_name").(string)),
BucketFolder: aws.String(d.Get("bucket_folder").(string)),
ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)),

// By default extra variables (should be set):
CompressionType: aws.String("GZIP"),
CsvDelimiter: aws.String(","),
CsvRowDelimiter: aws.String("\\n"),
}

// if extra_connection_attributes is set. Then parse the varaiables.
if v, ok := d.GetOk("extra_connection_attributes"); ok {
elems := strings.Split(v.(string), ";")
if len(elems) > 0 {
for _, elem := range elems {
vals := strings.Split(elem, "=")
if strings.Contains(strings.ToLower(vals[0]), "compressiontype") {
request.S3Settings.CompressionType = aws.String(vals[1])
} else if strings.Contains(strings.ToLower(vals[0]), "csvdelimiter") {
request.S3Settings.CsvDelimiter = aws.String(vals[1])
} else if strings.Contains(strings.ToLower(vals[0]), "csvrowdelimiter") {
request.S3Settings.CsvRowDelimiter = aws.String(vals[1])
}
}
}
}

default:
request.Password = aws.String(d.Get("password").(string))
request.Port = aws.Int64(int64(d.Get("port").(int)))
request.ServerName = aws.String(d.Get("server_name").(string))
Expand All @@ -148,17 +189,17 @@ func resourceAwsDmsEndpointCreate(d *schema.ResourceData, meta interface{}) erro
if v, ok := d.GetOk("extra_connection_attributes"); ok {
request.ExtraConnectionAttributes = aws.String(v.(string))
}
if v, ok := d.GetOk("kms_key_arn"); ok {
request.KmsKeyId = aws.String(v.(string))
}
if v, ok := d.GetOk("ssl_mode"); ok {
request.SslMode = aws.String(v.(string))
}
}

if v, ok := d.GetOk("certificate_arn"); ok {
request.CertificateArn = aws.String(v.(string))
}
if v, ok := d.GetOk("kms_key_arn"); ok {
request.KmsKeyId = aws.String(v.(string))
}
if v, ok := d.GetOk("ssl_mode"); ok {
request.SslMode = aws.String(v.(string))
}

log.Println("[DEBUG] DMS create endpoint:", request)

Expand Down Expand Up @@ -228,6 +269,11 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
}
hasChanges := false

if d.HasChange("endpoint_type") {
request.EndpointType = aws.String(d.Get("endpoint_type").(string))
hasChanges = true
}

if d.HasChange("certificate_arn") {
request.CertificateArn = aws.String(d.Get("certificate_arn").(string))
hasChanges = true
Expand All @@ -238,26 +284,50 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
hasChanges = true
}

if d.HasChange("service_access_role") {
request.DynamoDbSettings = &dms.DynamoDbSettings{
ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)),
}
hasChanges = true
}

if d.HasChange("endpoint_type") {
request.EndpointType = aws.String(d.Get("endpoint_type").(string))
hasChanges = true
}

if d.HasChange("engine_name") {
request.EngineName = aws.String(d.Get("engine_name").(string))
hasChanges = true
}

if d.HasChange("extra_connection_attributes") {
request.ExtraConnectionAttributes = aws.String(d.Get("extra_connection_attributes").(string))
hasChanges = true
switch d.Get("engine_name").(string) {
case "dynamodb":
if d.HasChange("service_access_role") {
request.DynamoDbSettings = &dms.DynamoDbSettings{
ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)),
}
hasChanges = true
}
case "s3":
if d.HasChange("service_access_role") || d.HasChange("bucket_name") || d.HasChange("bucket_folder") || d.HasChange("extra_connection_attributes") {
request.S3Settings = &dms.S3Settings{
ServiceAccessRoleArn: aws.String(d.Get("service_access_role").(string)),
BucketFolder: aws.String(d.Get("bucket_folder").(string)),
BucketName: aws.String(d.Get("bucket_name").(string)),
}

elems := strings.Split(d.Get("extra_connection_attributes").(string), ";")
if len(elems) > 0 {
for _, elem := range elems {
vals := strings.Split(elem, "=")
if strings.Contains(strings.ToLower(vals[0]), "compressiontype") {
request.S3Settings.CompressionType = aws.String(vals[1])
} else if strings.Contains(strings.ToLower(vals[0]), "csvdelimiter") {
request.S3Settings.CsvDelimiter = aws.String(vals[1])
} else if strings.Contains(strings.ToLower(vals[0]), "csvrowdelimiter") {
request.S3Settings.CsvRowDelimiter = aws.String(vals[1])
}
}
}

hasChanges = true

goto DONE
}
default:
if d.HasChange("extra_connection_attributes") {
request.ExtraConnectionAttributes = aws.String(d.Get("extra_connection_attributes").(string))
hasChanges = true
}
}

if d.HasChange("password") {
Expand Down Expand Up @@ -292,6 +362,7 @@ func resourceAwsDmsEndpointUpdate(d *schema.ResourceData, meta interface{}) erro
}
}

DONE:
if hasChanges {
log.Println("[DEBUG] DMS update endpoint:", request)

Expand Down Expand Up @@ -333,22 +404,36 @@ func resourceAwsDmsEndpointSetState(d *schema.ResourceData, endpoint *dms.Endpoi
d.Set("endpoint_type", strings.ToLower(*endpoint.EndpointType))
d.Set("engine_name", endpoint.EngineName)

if *endpoint.EngineName == "dynamodb" {
switch *endpoint.EngineName {
case "dynamodb":
if endpoint.DynamoDbSettings != nil {
d.Set("service_access_role", endpoint.DynamoDbSettings.ServiceAccessRoleArn)
} else {
d.Set("service_access_role", "")
}
} else {
case "s3":
if endpoint.S3Settings != nil {
d.Set("service_access_role", endpoint.S3Settings.ServiceAccessRoleArn)
d.Set("bucket_folder", endpoint.S3Settings.BucketFolder)
d.Set("bucket_name", endpoint.S3Settings.BucketName)
d.Set("extra_connection_attributes",
aws.String(fmt.Sprintf("compressionType=%s;csvDelimiter=%s;csvRowDelimiter=%s",
*endpoint.S3Settings.CompressionType, *endpoint.S3Settings.CsvDelimiter, *endpoint.S3Settings.CsvRowDelimiter)))
} else {
d.Set("service_access_role", "")
d.Set("bucket_folder", "")
d.Set("bucket_name", "")
d.Set("extra_connection_attributes", "")
}
default:
d.Set("database_name", endpoint.DatabaseName)
d.Set("extra_connection_attributes", endpoint.ExtraConnectionAttributes)
d.Set("port", endpoint.Port)
d.Set("server_name", endpoint.ServerName)
d.Set("username", endpoint.Username)
d.Set("kms_key_arn", endpoint.KmsKeyId)
d.Set("ssl_mode", endpoint.SslMode)
}

d.Set("kms_key_arn", endpoint.KmsKeyId)
d.Set("ssl_mode", endpoint.SslMode)

return nil
}
Loading

0 comments on commit 3380f00

Please sign in to comment.