Merge pull request #36968 from nikhil-goenka/f-aws_msk_replicator
f-aws_msk_replicator: Argument for Replication starting position
ewbankkit authored Jul 15, 2024
2 parents 4b2c6fc + b34fa2a commit 5840bf8
Showing 4 changed files with 97 additions and 32 deletions.
3 changes: 3 additions & 0 deletions .changelog/36968.txt
@@ -0,0 +1,3 @@
+```release-note:enhancement
+resource/aws_msk_replicator: Add `starting_position` argument
+```
98 changes: 73 additions & 25 deletions internal/service/kafka/replicator.go
@@ -159,6 +159,22 @@ func resourceReplicator() *schema.Resource {
 					Optional: true,
 					Default:  true,
 				},
+				"starting_position": {
+					Type:     schema.TypeList,
+					Optional: true,
+					Computed: true,
+					MaxItems: 1,
+					Elem: &schema.Resource{
+						Schema: map[string]*schema.Schema{
+							names.AttrType: {
+								Type:             schema.TypeString,
+								Optional:         true,
+								ForceNew:         true,
+								ValidateDiagFunc: enum.Validate[types.ReplicationStartingPositionType](),
+							},
+						},
+					},
+				},
 				"topics_to_exclude": {
 					Type:     schema.TypeSet,
 					Optional: true,
@@ -522,21 +538,21 @@ func flattenConsumerGroupReplication(apiObject *types.ConsumerGroupReplication) map[string]interface{} {
 	tfMap := map[string]interface{}{}
 
 	if v := apiObject.ConsumerGroupsToReplicate; v != nil {
-		tfMap["consumer_groups_to_replicate"] = flex.FlattenStringValueSet(v)
+		tfMap["consumer_groups_to_replicate"] = v
 	}
 
 	if v := apiObject.ConsumerGroupsToExclude; v != nil {
-		tfMap["consumer_groups_to_exclude"] = flex.FlattenStringValueSet(v)
-	}
-
-	if aws.ToBool(apiObject.SynchroniseConsumerGroupOffsets) {
-		tfMap["synchronise_consumer_group_offsets"] = apiObject.SynchroniseConsumerGroupOffsets
+		tfMap["consumer_groups_to_exclude"] = v
 	}
 
 	if aws.ToBool(apiObject.DetectAndCopyNewConsumerGroups) {
 		tfMap["detect_and_copy_new_consumer_groups"] = apiObject.DetectAndCopyNewConsumerGroups
 	}
 
+	if aws.ToBool(apiObject.SynchroniseConsumerGroupOffsets) {
+		tfMap["synchronise_consumer_group_offsets"] = apiObject.SynchroniseConsumerGroupOffsets
+	}
+
 	return tfMap
 }

@@ -547,24 +563,42 @@ func flattenTopicReplication(apiObject *types.TopicReplication) map[string]interface{} {
 
 	tfMap := map[string]interface{}{}
 
+	if aws.ToBool(apiObject.CopyAccessControlListsForTopics) {
+		tfMap["copy_access_control_lists_for_topics"] = apiObject.CopyAccessControlListsForTopics
+	}
+
+	if aws.ToBool(apiObject.CopyTopicConfigurations) {
+		tfMap["copy_topic_configurations"] = apiObject.CopyTopicConfigurations
+	}
+
+	if aws.ToBool(apiObject.DetectAndCopyNewTopics) {
+		tfMap["detect_and_copy_new_topics"] = apiObject.DetectAndCopyNewTopics
+	}
+
+	if v := apiObject.StartingPosition; v != nil {
+		tfMap["starting_position"] = []interface{}{flattenReplicationStartingPosition(v)}
+	}
+
 	if v := apiObject.TopicsToReplicate; v != nil {
-		tfMap["topics_to_replicate"] = flex.FlattenStringValueSet(v)
+		tfMap["topics_to_replicate"] = v
 	}
 
 	if v := apiObject.TopicsToExclude; v != nil {
-		tfMap["topics_to_exclude"] = flex.FlattenStringValueSet(v)
+		tfMap["topics_to_exclude"] = v
 	}
 
-	if aws.ToBool(apiObject.CopyTopicConfigurations) {
-		tfMap["copy_topic_configurations"] = apiObject.CopyTopicConfigurations
-	}
-
-	if aws.ToBool(apiObject.CopyAccessControlListsForTopics) {
-		tfMap["copy_access_control_lists_for_topics"] = apiObject.CopyAccessControlListsForTopics
-	}
-
-	if aws.ToBool(apiObject.DetectAndCopyNewTopics) {
-		tfMap["detect_and_copy_new_topics"] = apiObject.DetectAndCopyNewTopics
-	}
-
 	return tfMap
 }
+
+func flattenReplicationStartingPosition(apiObject *types.ReplicationStartingPosition) map[string]interface{} {
+	if apiObject == nil {
+		return nil
+	}
+
+	tfMap := map[string]interface{}{}
+
+	if v := apiObject.Type; v != "" {
+		tfMap[names.AttrType] = v
+	}
+
+	return tfMap
+}
@@ -606,11 +640,11 @@ func flattenKafkaClusterClientVPCConfig(apiObject *types.KafkaClusterClientVpcConfig) map[string]interface{} {
 	tfMap := map[string]interface{}{}
 
 	if v := apiObject.SecurityGroupIds; v != nil {
-		tfMap["security_groups_ids"] = flex.FlattenStringValueSet(v)
+		tfMap["security_groups_ids"] = v
 	}
 
 	if v := apiObject.SubnetIds; v != nil {
-		tfMap[names.AttrSubnetIDs] = flex.FlattenStringValueSet(v)
+		tfMap[names.AttrSubnetIDs] = v
 	}
 
 	return tfMap
@@ -749,6 +783,22 @@ func expandConsumerGroupReplication(tfMap map[string]interface{}) *types.ConsumerGroupReplication {
 func expandTopicReplication(tfMap map[string]interface{}) *types.TopicReplication {
 	apiObject := &types.TopicReplication{}
 
+	if v, ok := tfMap["copy_access_control_lists_for_topics"].(bool); ok {
+		apiObject.CopyAccessControlListsForTopics = aws.Bool(v)
+	}
+
+	if v, ok := tfMap["copy_topic_configurations"].(bool); ok {
+		apiObject.CopyTopicConfigurations = aws.Bool(v)
+	}
+
+	if v, ok := tfMap["detect_and_copy_new_topics"].(bool); ok {
+		apiObject.DetectAndCopyNewTopics = aws.Bool(v)
+	}
+
+	if v, ok := tfMap["starting_position"].([]interface{}); ok && len(v) > 0 && v[0] != nil {
+		apiObject.StartingPosition = expandReplicationStartingPosition(v[0].(map[string]interface{}))
+	}
+
 	if v, ok := tfMap["topics_to_replicate"].(*schema.Set); ok && v.Len() > 0 {
 		apiObject.TopicsToReplicate = flex.ExpandStringValueSet(v)
 	}
@@ -757,16 +807,14 @@ func expandTopicReplication(tfMap map[string]interface{}) *types.TopicReplication {
 		apiObject.TopicsToExclude = flex.ExpandStringValueSet(v)
 	}
 
-	if v, ok := tfMap["copy_topic_configurations"].(bool); ok {
-		apiObject.CopyTopicConfigurations = aws.Bool(v)
-	}
-
-	if v, ok := tfMap["copy_access_control_lists_for_topics"].(bool); ok {
-		apiObject.CopyAccessControlListsForTopics = aws.Bool(v)
-	}
-
-	if v, ok := tfMap["detect_and_copy_new_topics"].(bool); ok {
-		apiObject.DetectAndCopyNewTopics = aws.Bool(v)
-	}
-
 	return apiObject
 }
 
+func expandReplicationStartingPosition(tfMap map[string]interface{}) *types.ReplicationStartingPosition {
+	apiObject := &types.ReplicationStartingPosition{}
+
+	if v, ok := tfMap[names.AttrType].(string); ok {
+		apiObject.Type = types.ReplicationStartingPositionType(v)
+	}
+
+	return apiObject
+}
21 changes: 15 additions & 6 deletions internal/service/kafka/replicator_test.go
@@ -53,9 +53,10 @@ func TestAccKafkaReplicator_basic(t *testing.T) {
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.0.vpc_config.0.security_groups_ids.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.subnet_ids.#", acctest.Ct3),
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.security_groups_ids.#", acctest.Ct1),
-					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.target_compression_type", "NONE"),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.starting_position.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.topics_to_replicate.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", acctest.Ct1),
 				),
 			},
 			{
@@ -66,6 +67,7 @@
 		},
 	})
 }
+
 func TestAccKafkaReplicator_update(t *testing.T) {
 	ctx := acctest.Context(t)
 	if testing.Short() {
@@ -100,9 +102,10 @@ func TestAccKafkaReplicator_update(t *testing.T) {
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.0.vpc_config.0.security_groups_ids.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.subnet_ids.#", acctest.Ct3),
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.security_groups_ids.#", acctest.Ct1),
-					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.target_compression_type", "NONE"),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.starting_position.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.topics_to_replicate.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", acctest.Ct1),
 				),
 			},
 			{
@@ -122,16 +125,18 @@
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.0.vpc_config.0.security_groups_ids.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.subnet_ids.#", acctest.Ct3),
 					resource.TestCheckResourceAttr(resourceName, "kafka_cluster.1.vpc_config.0.security_groups_ids.#", acctest.Ct1),
-					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", acctest.Ct3),
-					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_exclude.#", acctest.Ct1),
-					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.synchronise_consumer_group_offsets", acctest.CtFalse),
-					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.detect_and_copy_new_consumer_groups", acctest.CtFalse),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.target_compression_type", "NONE"),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.starting_position.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.starting_position.0.type", "EARLIEST"),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.topics_to_replicate.#", acctest.Ct3),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.topics_to_exclude.#", acctest.Ct1),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.copy_topic_configurations", acctest.CtFalse),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.copy_access_control_lists_for_topics", acctest.CtFalse),
 					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.topic_replication.0.detect_and_copy_new_topics", acctest.CtFalse),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_replicate.#", acctest.Ct3),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.consumer_groups_to_exclude.#", acctest.Ct1),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.synchronise_consumer_group_offsets", acctest.CtFalse),
+					resource.TestCheckResourceAttr(resourceName, "replication_info_list.0.consumer_group_replication.0.detect_and_copy_new_consumer_groups", acctest.CtFalse),
 				),
 			},
 			{
@@ -620,6 +625,10 @@ resource "aws_msk_replicator" "test" {
     copy_topic_configurations = false
     topics_to_replicate       = ["topic1", "topic2", "topic3"]
    topics_to_exclude         = ["topic-4"]
+
+    starting_position {
+      type = "EARLIEST"
+    }
   }
 
   consumer_group_replication {
7 changes: 6 additions & 1 deletion website/docs/r/msk_replicator.html.markdown
@@ -89,7 +89,8 @@ The following arguments are required:
 * `target_kafka_cluster_arn` - (Required) The ARN of the target Kafka cluster.
 * `target_compression_type` - (Required) The type of compression to use writing records to target Kafka cluster.
 * `topic_replication` - (Required) Configuration relating to topic replication.
-* `consumer_group_replication` - (Required) Confguration relating to consumer group replication.
+* `starting_position` - (Optional) Configuration for specifying the position in the topics to start replicating from.
+* `consumer_group_replication` - (Required) Configuration relating to consumer group replication.

### topic_replication Argument Reference

@@ -106,6 +107,10 @@ The following arguments are required:
 * `detect_and_copy_new_consumer_groups` - (Optional) Whether to periodically check for new consumer groups.
 * `synchronise_consumer_group_offsets` - (Optional) Whether to periodically write the translated offsets to __consumer_offsets topic in target cluster.
 
+### starting_position
+
+* `type` - (Optional) The type of replication starting position. Supports `LATEST` and `EARLIEST`.
+
## Attribute Reference

This resource exports the following attributes in addition to the arguments above:
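For reference, a minimal configuration exercising the new argument might look like the following sketch. It is illustrative only: the IAM role, MSK clusters, subnets, and security groups referenced here are hypothetical resources assumed to be defined elsewhere.

```terraform
resource "aws_msk_replicator" "example" {
  replicator_name            = "example-replicator"
  description                = "Replicate topics from the earliest available offsets"
  service_execution_role_arn = aws_iam_role.replicator.arn # assumed to exist

  # Source cluster.
  kafka_cluster {
    amazon_msk_cluster {
      msk_cluster_arn = aws_msk_cluster.source.arn # assumed to exist
    }
    vpc_config {
      subnet_ids          = aws_subnet.source[*].id        # assumed to exist
      security_groups_ids = [aws_security_group.source.id] # assumed to exist
    }
  }

  # Target cluster.
  kafka_cluster {
    amazon_msk_cluster {
      msk_cluster_arn = aws_msk_cluster.target.arn # assumed to exist
    }
    vpc_config {
      subnet_ids          = aws_subnet.target[*].id        # assumed to exist
      security_groups_ids = [aws_security_group.target.id] # assumed to exist
    }
  }

  replication_info_list {
    source_kafka_cluster_arn = aws_msk_cluster.source.arn
    target_kafka_cluster_arn = aws_msk_cluster.target.arn
    target_compression_type  = "NONE"

    topic_replication {
      topics_to_replicate = [".*"]

      # Copy pre-existing records as well as new writes. `type` is ForceNew,
      # so changing it replaces the replicator.
      starting_position {
        type = "EARLIEST"
      }
    }

    consumer_group_replication {
      consumer_groups_to_replicate = [".*"]
    }
  }
}
```

Because the schema marks `starting_position` as Computed, omitting the block simply records the service-side default in state rather than producing a persistent diff.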
