Merge pull request #32848 from nrajb/f-msk-cluster-policy
Adding new resource aws_msk_cluster_policy
ewbankkit authored Aug 11, 2023
2 parents 0865a74 + 1eccae0 commit 1bedef1
Showing 5 changed files with 403 additions and 0 deletions.
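
For orientation, here is a minimal configuration sketch for the new resource, inferred from the schema in cluster_policy.go and the acceptance-test configuration below; the resource names and account ID are illustrative, not part of this change:

```hcl
resource "aws_msk_cluster_policy" "example" {
  # cluster_arn identifies the MSK cluster the policy is attached to.
  cluster_arn = aws_msk_cluster.example.arn

  # policy is an IAM-style resource policy document, stored as normalized JSON.
  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [{
      Sid    = "ExampleMskClusterPolicy"
      Effect = "Allow"
      Principal = {
        AWS = "arn:aws:iam::123456789012:root"
      }
      Action = [
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
        "kafka:Describe*",
        "kafka:Get*",
      ]
      Resource = aws_msk_cluster.example.arn
    }]
  })
}
```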
3 changes: 3 additions & 0 deletions .changelog/32848.txt
@@ -0,0 +1,3 @@
```release-note:new-resource
aws_msk_cluster_policy
```
164 changes: 164 additions & 0 deletions internal/service/kafka/cluster_policy.go
@@ -0,0 +1,164 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package kafka

import (
    "context"
    "log"

    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/service/kafka"
    "github.com/aws/aws-sdk-go-v2/service/kafka/types"
    "github.com/hashicorp/terraform-plugin-sdk/v2/diag"
    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/structure"
    "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"
    "github.com/hashicorp/terraform-provider-aws/internal/conns"
    "github.com/hashicorp/terraform-provider-aws/internal/errs"
    "github.com/hashicorp/terraform-provider-aws/internal/errs/sdkdiag"
    "github.com/hashicorp/terraform-provider-aws/internal/tfresource"
    "github.com/hashicorp/terraform-provider-aws/internal/verify"
)

// @SDKResource("aws_msk_cluster_policy", name="Cluster Policy")
func ResourceClusterPolicy() *schema.Resource {
    return &schema.Resource{
        CreateWithoutTimeout: resourceClusterPolicyPut,
        ReadWithoutTimeout:   resourceClusterPolicyRead,
        UpdateWithoutTimeout: resourceClusterPolicyPut,
        DeleteWithoutTimeout: resourceClusterPolicyDelete,

        Importer: &schema.ResourceImporter{
            StateContext: schema.ImportStatePassthroughContext,
        },

        Schema: map[string]*schema.Schema{
            "cluster_arn": {
                Type:     schema.TypeString,
                Required: true,
            },
            "current_version": {
                Type:     schema.TypeString,
                Computed: true,
            },
            "policy": {
                Type:                  schema.TypeString,
                Required:              true,
                ValidateFunc:          validation.StringIsJSON,
                DiffSuppressFunc:      verify.SuppressEquivalentPolicyDiffs,
                DiffSuppressOnRefresh: true,
                StateFunc: func(v interface{}) string {
                    json, _ := structure.NormalizeJsonString(v)
                    return json
                },
            },
        },
    }
}

func resourceClusterPolicyPut(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
    var diags diag.Diagnostics
    conn := meta.(*conns.AWSClient).KafkaClient(ctx)

    policy, err := structure.NormalizeJsonString(d.Get("policy").(string))
    if err != nil {
        return sdkdiag.AppendFromErr(diags, err)
    }

    clusterARN := d.Get("cluster_arn").(string)
    in := &kafka.PutClusterPolicyInput{
        ClusterArn: aws.String(clusterARN),
        Policy:     aws.String(policy),
    }

    _, err = conn.PutClusterPolicy(ctx, in)

    if err != nil {
        return sdkdiag.AppendErrorf(diags, "setting MSK Cluster Policy (%s): %s", clusterARN, err)
    }

    if d.IsNewResource() {
        d.SetId(clusterARN)
    }

    return append(diags, resourceClusterPolicyRead(ctx, d, meta)...)
}

func resourceClusterPolicyRead(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
    var diags diag.Diagnostics
    conn := meta.(*conns.AWSClient).KafkaClient(ctx)

    output, err := FindClusterPolicyByARN(ctx, conn, d.Id())

    if !d.IsNewResource() && tfresource.NotFound(err) {
        log.Printf("[WARN] MSK Cluster Policy (%s) not found, removing from state", d.Id())
        d.SetId("")
        return diags
    }

    if err != nil {
        return sdkdiag.AppendErrorf(diags, "reading MSK Cluster Policy (%s): %s", d.Id(), err)
    }

    d.Set("cluster_arn", d.Id())
    d.Set("current_version", output.CurrentVersion)
    if output.Policy != nil {
        policyToSet, err := verify.PolicyToSet(d.Get("policy").(string), aws.ToString(output.Policy))
        if err != nil {
            return sdkdiag.AppendFromErr(diags, err)
        }

        d.Set("policy", policyToSet)
    } else {
        d.Set("policy", nil)
    }

    return diags
}

func resourceClusterPolicyDelete(ctx context.Context, d *schema.ResourceData, meta interface{}) diag.Diagnostics {
    var diags diag.Diagnostics
    conn := meta.(*conns.AWSClient).KafkaClient(ctx)

    log.Printf("[INFO] Deleting MSK Cluster Policy: %s", d.Id())
    _, err := conn.DeleteClusterPolicy(ctx, &kafka.DeleteClusterPolicyInput{
        ClusterArn: aws.String(d.Id()),
    })

    if errs.IsA[*types.NotFoundException](err) {
        return diags
    }

    if err != nil {
        return sdkdiag.AppendErrorf(diags, "deleting MSK Cluster Policy (%s): %s", d.Id(), err)
    }

    return diags
}

// FindClusterPolicyByARN returns the cluster policy attached to the MSK cluster with the
// given ARN, or a retry.NotFoundError if no policy exists.
func FindClusterPolicyByARN(ctx context.Context, conn *kafka.Client, id string) (*kafka.GetClusterPolicyOutput, error) {
    in := &kafka.GetClusterPolicyInput{
        ClusterArn: aws.String(id),
    }

    out, err := conn.GetClusterPolicy(ctx, in)

    if errs.IsA[*types.NotFoundException](err) {
        return nil, &retry.NotFoundError{
            LastError:   err,
            LastRequest: in,
        }
    }

    if err != nil {
        return nil, err
    }

    if out == nil {
        return nil, tfresource.NewEmptyResultError(in)
    }

    return out, nil
}
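
Because the resource ID is set to the cluster ARN and import is a plain state passthrough, an existing policy can be adopted by importing with the cluster ARN. A sketch using a Terraform 1.5+ import block, with a made-up ARN:

```hcl
import {
  # The ID expected by aws_msk_cluster_policy is the cluster ARN (hypothetical value shown).
  to = aws_msk_cluster_policy.example
  id = "arn:aws:kafka:us-east-1:123456789012:cluster/example/0123abcd-0123-abcd-0123-0123456789ab-1"
}
```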
160 changes: 160 additions & 0 deletions internal/service/kafka/cluster_policy_test.go
@@ -0,0 +1,160 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package kafka_test

import (
    "context"
    "fmt"
    "regexp"
    "testing"

    "github.com/aws/aws-sdk-go-v2/service/kafka"
    sdkacctest "github.com/hashicorp/terraform-plugin-testing/helper/acctest"
    "github.com/hashicorp/terraform-plugin-testing/helper/resource"
    "github.com/hashicorp/terraform-plugin-testing/terraform"
    "github.com/hashicorp/terraform-provider-aws/internal/acctest"
    "github.com/hashicorp/terraform-provider-aws/internal/conns"
    tfkafka "github.com/hashicorp/terraform-provider-aws/internal/service/kafka"
    "github.com/hashicorp/terraform-provider-aws/internal/tfresource"
    "github.com/hashicorp/terraform-provider-aws/names"
)

func TestAccKafkaClusterPolicy_basic(t *testing.T) {
    ctx := acctest.Context(t)
    var clusterpolicy kafka.GetClusterPolicyOutput
    rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
    resourceName := "aws_msk_cluster_policy.test"

    resource.ParallelTest(t, resource.TestCase{
        PreCheck: func() {
            acctest.PreCheck(ctx, t)
            acctest.PreCheckPartitionHasService(t, names.Kafka)
            testAccPreCheck(ctx, t)
        },
        ErrorCheck:               acctest.ErrorCheck(t, names.Kafka),
        ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
        CheckDestroy:             testAccCheckClusterPolicyDestroy(ctx),
        Steps: []resource.TestStep{
            {
                Config: testAccClusterPolicyConfig_basic(rName),
                Check: resource.ComposeAggregateTestCheckFunc(
                    testAccCheckClusterPolicyExists(ctx, resourceName, &clusterpolicy),
                    resource.TestCheckResourceAttrSet(resourceName, "current_version"),
                    resource.TestMatchResourceAttr(resourceName, "policy", regexp.MustCompile(`"kafka:CreateVpcConnection","kafka:GetBootstrapBrokers"`)),
                ),
            },
            {
                ResourceName:      resourceName,
                ImportState:       true,
                ImportStateVerify: true,
            },
        },
    })
}

func TestAccKafkaClusterPolicy_disappears(t *testing.T) {
    ctx := acctest.Context(t)
    var clusterpolicy kafka.GetClusterPolicyOutput
    rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
    resourceName := "aws_msk_cluster_policy.test"

    resource.ParallelTest(t, resource.TestCase{
        PreCheck: func() {
            acctest.PreCheck(ctx, t)
            acctest.PreCheckPartitionHasService(t, names.Kafka)
            testAccPreCheck(ctx, t)
        },
        ErrorCheck:               acctest.ErrorCheck(t, names.Kafka),
        ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
        CheckDestroy:             testAccCheckClusterPolicyDestroy(ctx),
        Steps: []resource.TestStep{
            {
                Config: testAccClusterPolicyConfig_basic(rName),
                Check: resource.ComposeTestCheckFunc(
                    testAccCheckClusterPolicyExists(ctx, resourceName, &clusterpolicy),
                    acctest.CheckResourceDisappears(ctx, acctest.Provider, tfkafka.ResourceClusterPolicy(), resourceName),
                ),
                ExpectNonEmptyPlan: true,
            },
        },
    })
}

func testAccCheckClusterPolicyDestroy(ctx context.Context) resource.TestCheckFunc {
    return func(s *terraform.State) error {
        conn := acctest.Provider.Meta().(*conns.AWSClient).KafkaClient(ctx)

        for _, rs := range s.RootModule().Resources {
            if rs.Type != "aws_msk_cluster_policy" {
                continue
            }

            _, err := tfkafka.FindClusterPolicyByARN(ctx, conn, rs.Primary.ID)

            if tfresource.NotFound(err) {
                continue
            }

            if err != nil {
                return err
            }

            return fmt.Errorf("MSK Cluster Policy %s still exists", rs.Primary.ID)
        }

        return nil
    }
}

func testAccCheckClusterPolicyExists(ctx context.Context, n string, v *kafka.GetClusterPolicyOutput) resource.TestCheckFunc {
    return func(s *terraform.State) error {
        rs, ok := s.RootModule().Resources[n]
        if !ok {
            return fmt.Errorf("Not found: %s", n)
        }

        conn := acctest.Provider.Meta().(*conns.AWSClient).KafkaClient(ctx)

        output, err := tfkafka.FindClusterPolicyByARN(ctx, conn, rs.Primary.ID)

        if err != nil {
            return err
        }

        *v = *output

        return nil
    }
}

func testAccClusterPolicyConfig_basic(rName string) string {
    return acctest.ConfigCompose(testAccVPCConnectionConfig_basic(rName), `
data "aws_caller_identity" "current" {}

data "aws_partition" "current" {}

resource "aws_msk_cluster_policy" "test" {
  cluster_arn = aws_msk_cluster.test.arn

  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [{
      Sid    = "testMskClusterPolicy"
      Effect = "Allow"
      Principal = {
        "AWS" = "arn:${data.aws_partition.current.partition}:iam::${data.aws_caller_identity.current.account_id}:root"
      }
      Action = [
        "kafka:Describe*",
        "kafka:Get*",
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
      ]
      Resource = aws_msk_cluster.test.arn
    }]
  })

  depends_on = [aws_msk_vpc_connection.test]
}
`)
}
5 changes: 5 additions & 0 deletions internal/service/kafka/service_package_gen.go

(Generated file; diff not rendered.)