Skip to content

Commit

Permalink
Merge pull request #32882 from DrFaust92/eventbridge-target-sagemaker
Browse files Browse the repository at this point in the history
cloudwatch_events_target - add sagemaker pipeline support
  • Loading branch information
ewbankkit authored Aug 7, 2023
2 parents d820b0f + 8a5e7ae commit 7a7b8df
Show file tree
Hide file tree
Showing 4 changed files with 263 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .changelog/32882.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:enhancement
resource/aws_cloudwatch_events_target: Add `sagemaker_pipeline_target` argument
```
101 changes: 101 additions & 0 deletions internal/service/events/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,33 @@ func ResourceTarget() *schema.Resource {
},
},
},
"sagemaker_pipeline_target": {
Type: schema.TypeList,
Optional: true,
MaxItems: 1,
DiffSuppressFunc: verify.SuppressMissingOptionalConfigurationBlock,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"pipeline_parameter_list": {
Type: schema.TypeSet,
Optional: true,
MaxItems: 200,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"name": {
Type: schema.TypeString,
Required: true,
},
"value": {
Type: schema.TypeString,
Required: true,
},
},
},
},
},
},
},
"sqs_target": {
Type: schema.TypeList,
Optional: true,
Expand Down Expand Up @@ -540,6 +567,12 @@ func resourceTargetRead(ctx context.Context, d *schema.ResourceData, meta interf
}
}

if t.SageMakerPipelineParameters != nil {
if err := d.Set("sagemaker_pipeline_target", flattenTargetSageMakerPipelineParameters(t.SageMakerPipelineParameters)); err != nil {
return diag.Errorf("setting sagemaker_pipeline_parameters: %s", err)
}
}

if t.SqsParameters != nil {
if err := d.Set("sqs_target", flattenTargetSQSParameters(t.SqsParameters)); err != nil {
return diag.Errorf("setting sqs_target: %s", err)
Expand Down Expand Up @@ -701,6 +734,10 @@ func buildPutTargetInputStruct(ctx context.Context, d *schema.ResourceData) *eve
e.SqsParameters = expandTargetSQSParameters(v.([]interface{}))
}

if v, ok := d.GetOk("sagemaker_pipeline_target"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
e.SageMakerPipelineParameters = expandTargetSageMakerPipelineParameters(v.([]interface{}))
}

if v, ok := d.GetOk("input_transformer"); ok && len(v.([]interface{})) > 0 && v.([]interface{})[0] != nil {
e.InputTransformer = expandTransformerParameters(v.([]interface{}))
}
Expand Down Expand Up @@ -918,6 +955,48 @@ func expandTargetSQSParameters(config []interface{}) *eventbridge.SqsParameters
return sqsParameters
}

func expandTargetSageMakerPipelineParameterList(tfList []interface{}) []*eventbridge.SageMakerPipelineParameter {
if len(tfList) == 0 {
return nil
}

var result []*eventbridge.SageMakerPipelineParameter

for _, tfMapRaw := range tfList {
if tfMapRaw == nil {
continue
}

tfMap := tfMapRaw.(map[string]interface{})

apiObject := &eventbridge.SageMakerPipelineParameter{}

if v, ok := tfMap["name"].(string); ok && v != "" {
apiObject.Name = aws.String(v)
}

if v, ok := tfMap["value"].(string); ok && v != "" {
apiObject.Value = aws.String(v)
}

result = append(result, apiObject)
}

return result
}

func expandTargetSageMakerPipelineParameters(config []interface{}) *eventbridge.SageMakerPipelineParameters {
sageMakerPipelineParameters := &eventbridge.SageMakerPipelineParameters{}
for _, c := range config {
param := c.(map[string]interface{})
if v, ok := param["pipeline_parameter_list"].(*schema.Set); ok && v.Len() > 0 {
sageMakerPipelineParameters.PipelineParameterList = expandTargetSageMakerPipelineParameterList(v.List())
}
}

return sageMakerPipelineParameters
}

func expandTargetHTTPParameters(tfMap map[string]interface{}) *eventbridge.HttpParameters {
if tfMap == nil {
return nil
Expand Down Expand Up @@ -1069,6 +1148,28 @@ func flattenTargetKinesisParameters(kinesisParameters *eventbridge.KinesisParame
return result
}

func flattenTargetSageMakerPipelineParameters(sageMakerParameters *eventbridge.SageMakerPipelineParameters) []map[string]interface{} {
config := make(map[string]interface{})
config["pipeline_parameter_list"] = flattenTargetSageMakerPipelineParameter(sageMakerParameters.PipelineParameterList)
result := []map[string]interface{}{config}
return result
}

func flattenTargetSageMakerPipelineParameter(pcs []*eventbridge.SageMakerPipelineParameter) []map[string]interface{} {
if len(pcs) == 0 {
return nil
}
results := make([]map[string]interface{}, 0)
for _, pc := range pcs {
c := make(map[string]interface{})
c["name"] = aws.StringValue(pc.Name)
c["value"] = aws.StringValue(pc.Value)

results = append(results, c)
}
return results
}

func flattenTargetSQSParameters(sqsParameters *eventbridge.SqsParameters) []map[string]interface{} {
config := make(map[string]interface{})
config["message_group_id"] = aws.StringValue(sqsParameters.MessageGroupId)
Expand Down
159 changes: 149 additions & 10 deletions internal/service/events/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,23 +38,26 @@ func TestAccEventsTarget_basic(t *testing.T) {
Steps: []resource.TestStep{
{
Config: testAccTargetConfig_basic(rName),
Check: resource.ComposeTestCheckFunc(
Check: resource.ComposeAggregateTestCheckFunc(
testAccCheckTargetExists(ctx, resourceName, &v),
resource.TestCheckResourceAttr(resourceName, "rule", rName),
resource.TestCheckResourceAttr(resourceName, "event_bus_name", "default"),
resource.TestCheckResourceAttr(resourceName, "target_id", rName),
resource.TestCheckResourceAttrPair(resourceName, "arn", snsTopicResourceName, "arn"),

resource.TestCheckResourceAttr(resourceName, "input", ""),
resource.TestCheckResourceAttr(resourceName, "input_path", ""),
resource.TestCheckResourceAttr(resourceName, "role_arn", ""),
resource.TestCheckResourceAttr(resourceName, "run_command_targets.#", "0"),
resource.TestCheckResourceAttr(resourceName, "batch_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "dead_letter_config.#", "0"),
resource.TestCheckResourceAttr(resourceName, "ecs_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "event_bus_name", "default"),
resource.TestCheckResourceAttr(resourceName, "http_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "input", ""),
resource.TestCheckResourceAttr(resourceName, "input_path", ""),
resource.TestCheckResourceAttr(resourceName, "input_transformer.#", "0"),
resource.TestCheckResourceAttr(resourceName, "kinesis_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "redshift_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "retry_policy.#", "0"),
resource.TestCheckResourceAttr(resourceName, "role_arn", ""),
resource.TestCheckResourceAttr(resourceName, "rule", rName),
resource.TestCheckResourceAttr(resourceName, "run_command_targets.#", "0"),
resource.TestCheckResourceAttr(resourceName, "sagemaker_pipeline_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "sqs_target.#", "0"),
resource.TestCheckResourceAttr(resourceName, "input_transformer.#", "0"),
resource.TestCheckResourceAttr(resourceName, "target_id", rName),
),
},
{
Expand Down Expand Up @@ -818,6 +821,41 @@ func TestAccEventsTarget_sqs(t *testing.T) {
})
}

func TestAccEventsTarget_sageMakerPipeline(t *testing.T) {
ctx := acctest.Context(t)
var v eventbridge.Target
rName := sdkacctest.RandomWithPrefix(acctest.ResourcePrefix)
resourceName := "aws_cloudwatch_event_target.test"

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { acctest.PreCheck(ctx, t) },
ErrorCheck: acctest.ErrorCheck(t, eventbridge.EndpointsID),
ProtoV5ProviderFactories: acctest.ProtoV5ProviderFactories,
CheckDestroy: testAccCheckTargetDestroy(ctx),
Steps: []resource.TestStep{
{
Config: testAccTargetConfig_sageMakerPipeline(rName),
Check: resource.ComposeTestCheckFunc(
testAccCheckTargetExists(ctx, resourceName, &v),
resource.TestCheckResourceAttrPair(resourceName, "arn", "aws_sagemaker_pipeline.test", "arn"),
resource.TestCheckResourceAttr(resourceName, "sagemaker_pipeline_target.#", "1"),
resource.TestCheckResourceAttr(resourceName, "sagemaker_pipeline_target.0.pipeline_parameter_list.#", "1"),
resource.TestCheckTypeSetElemNestedAttrs(resourceName, "sagemaker_pipeline_target.0.pipeline_parameter_list.*", map[string]string{
"name": "key",
"value": "value",
}),
),
},
{
ResourceName: resourceName,
ImportState: true,
ImportStateIdFunc: testAccTargetImportStateIdFunc(resourceName),
ImportStateVerify: true,
},
},
})
}

func TestAccEventsTarget_Input_transformer(t *testing.T) {
ctx := acctest.Context(t)
resourceName := "aws_cloudwatch_event_target.test"
Expand Down Expand Up @@ -2183,6 +2221,107 @@ resource "aws_sqs_queue" "test" {
`, rName)
}

func testAccTargetConfig_sageMakerPipeline(rName string) string {
return fmt.Sprintf(`
data "aws_partition" "current" {}
resource "aws_cloudwatch_event_rule" "test" {
name = %[1]q
description = "schedule_batch_test"
schedule_expression = "rate(5 minutes)"
}
resource "aws_cloudwatch_event_target" "test" {
arn = aws_sagemaker_pipeline.test.arn
rule = aws_cloudwatch_event_rule.test.id
role_arn = aws_iam_role.test.arn
sagemaker_pipeline_target {
pipeline_parameter_list {
name = "key"
value = "value"
}
}
target_id = %[1]q
}
resource "aws_iam_role" "test" {
name = %[1]q
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Principal": {
"Service": "events.${data.aws_partition.current.dns_suffix}"
},
"Effect": "Allow",
"Sid": ""
}
]
}
EOF
}
resource "aws_iam_role_policy" "test" {
name = %[1]q
role = aws_iam_role.test.id
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sagemaker:*",
"Effect": "Allow",
"Resource": [
"*"
]
}
]
}
EOF
}
resource "aws_iam_role" "sagemaker" {
name = "%[1]s-2"
path = "/"
assume_role_policy = data.aws_iam_policy_document.test.json
}
data "aws_iam_policy_document" "test" {
statement {
actions = ["sts:AssumeRole"]
principals {
type = "Service"
identifiers = ["sagemaker.amazonaws.com"]
}
}
}
resource "aws_sagemaker_pipeline" "test" {
pipeline_name = %[1]q
pipeline_display_name = %[1]q
role_arn = aws_iam_role.sagemaker.arn
pipeline_definition = jsonencode({
Version = "2020-12-01"
Steps = [{
Name = "Test"
Type = "Fail"
Arguments = {
ErrorMessage = "test"
}
}]
})
}
`, rName)
}

func testAccTargetConfig_inputTransformer(rName string, inputPathKeys []string) string {
var inputPaths, inputTemplates strings.Builder

Expand Down
10 changes: 10 additions & 0 deletions website/docs/r/cloudwatch_event_target.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ The following arguments are optional:
* `run_command_targets` - (Optional) Parameters used when you are using the rule to invoke Amazon EC2 Run Command. Documented below. A maximum of 5 are allowed.
* `redshift_target` - (Optional) Parameters used when you are using the rule to invoke an Amazon Redshift Statement. Documented below. A maximum of 1 are allowed.
* `retry_policy` - (Optional) Parameters used when you are providing retry policies. Documented below. A maximum of 1 are allowed.
* `sagemaker_pipeline_target` - (Optional) Parameters used when you are using the rule to invoke an Amazon SageMaker Pipeline. Documented below. A maximum of 1 are allowed.
* `sqs_target` - (Optional) Parameters used when you are using the rule to invoke an Amazon SQS Queue. Documented below. A maximum of 1 are allowed.
* `target_id` - (Optional) The unique target assignment ID. If missing, will generate a random, unique id.

Expand Down Expand Up @@ -572,6 +573,15 @@ For more information, see [Task Networking](https://docs.aws.amazon.com/AmazonEC

* `message_group_id` - (Optional) The FIFO message group ID to use as the target.

### sagemaker_pipeline_target

* `pipeline_parameter_list` - (Optional) List of Parameter names and values for SageMaker Model Building Pipeline execution.

#### pipeline_parameter_list

* `name` - (Required) Name of parameter to start execution of a SageMaker Model Building Pipeline.
* `value` - (Required) Value of parameter to start execution of a SageMaker Model Building Pipeline.

## Attribute Reference

This resource exports no additional attributes.
Expand Down

0 comments on commit 7a7b8df

Please sign in to comment.