Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

aws_lambda_event_source_mapping: maximum_batching_window_in_seconds #10051

Merged
18 changes: 17 additions & 1 deletion aws/resource_aws_lambda_event_source_mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,10 @@ func resourceAwsLambdaEventSourceMapping() *schema.Resource {
Optional: true,
Default: true,
},
"maximum_batching_window_in_seconds": {
Type: schema.TypeInt,
Optional: true,
},
"function_arn": {
Type: schema.TypeString,
Computed: true,
Expand Down Expand Up @@ -144,6 +148,10 @@ func resourceAwsLambdaEventSourceMappingCreate(d *schema.ResourceData, meta inte
params.BatchSize = aws.Int64(int64(batchSize.(int)))
}

if batchWindow, ok := d.GetOk("maximum_batching_window_in_seconds"); ok {
params.MaximumBatchingWindowInSeconds = aws.Int64(int64(batchWindow.(int)))
}

if startingPosition, ok := d.GetOk("starting_position"); ok {
params.StartingPosition = aws.String(startingPosition.(string))
}
Expand Down Expand Up @@ -210,6 +218,7 @@ func resourceAwsLambdaEventSourceMappingRead(d *schema.ResourceData, meta interf
}

d.Set("batch_size", eventSourceMappingConfiguration.BatchSize)
d.Set("maximum_batching_window_in_seconds", eventSourceMappingConfiguration.MaximumBatchingWindowInSeconds)
d.Set("event_source_arn", eventSourceMappingConfiguration.EventSourceArn)
d.Set("function_arn", eventSourceMappingConfiguration.FunctionArn)
d.Set("last_modified", eventSourceMappingConfiguration.LastModified)
Expand Down Expand Up @@ -278,7 +287,14 @@ func resourceAwsLambdaEventSourceMappingUpdate(d *schema.ResourceData, meta inte
Enabled: aws.Bool(d.Get("enabled").(bool)),
}

err := resource.Retry(5*time.Minute, func() *resource.RetryError {
// AWS API will fail if this parameter is set (even as default value) for sqs event source. Ideally this should be implemented in GO SDK or AWS API itself.
eventSourceArn, err := arn.Parse(d.Get("event_source_arn").(string))

if err == nil && eventSourceArn.Service != "sqs" {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bflad your proposed change swallows the err here if it were to exist, was that intentional?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

vs returning the err or printing a warning of sorts

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry if this was confusing, I should have elaborated on swallowing the error -- if anything, potentially returning a warning log would feel appropriate here since the ARN should have gone through multiple forms of validation at this point (either from Terraform or the API itself). Returning an error could leave operators in a potentially unrecoverable state if for some reason the event source ARN was not an ARN and they made it this far. Hope this makes sense. 😅

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense - thanks for the explanation!

params.MaximumBatchingWindowInSeconds = aws.Int64(int64(d.Get("maximum_batching_window_in_seconds").(int)))
}

err = resource.Retry(5*time.Minute, func() *resource.RetryError {
_, err := conn.UpdateEventSourceMapping(params)
if err != nil {
if isAWSErr(err, lambda.ErrCodeInvalidParameterValueException, "") ||
Expand Down
52 changes: 52 additions & 0 deletions aws/resource_aws_lambda_event_source_mapping_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,45 @@ func TestAccAWSLambdaEventSourceMapping_StartingPositionTimestamp(t *testing.T)
})
}

func TestAccAWSLambdaEventSourceMapping_BatchWindow(t *testing.T) {
var conf lambda.EventSourceMappingConfiguration
rName := acctest.RandomWithPrefix("tf-acc-test")
resourceName := "aws_lambda_event_source_mapping.test"
batchWindow := int64(5)
batchWindowUpdate := int64(10)

resource.ParallelTest(t, resource.TestCase{
PreCheck: func() { testAccPreCheck(t) },
Providers: testAccProviders,
CheckDestroy: testAccCheckLambdaEventSourceMappingDestroy,
Steps: []resource.TestStep{
{
Config: testAccAWSLambdaEventSourceMappingConfigKinesisBatchWindow(rName, batchWindow),
Check: resource.ComposeTestCheckFunc(
testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "maximum_batching_window_in_seconds", strconv.Itoa(int(batchWindow))),
),
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be great if there was an additional test step to verify updating maximum_batching_window_in_seconds 👍

{
ResourceName: resourceName,
ImportState: true,
ImportStateVerify: true,
ImportStateVerifyIgnore: []string{
"starting_position",
"starting_position_timestamp",
},
},
{
Config: testAccAWSLambdaEventSourceMappingConfigKinesisBatchWindow(rName, batchWindowUpdate),
Check: resource.ComposeTestCheckFunc(
testAccCheckAwsLambdaEventSourceMappingExists(resourceName, &conf),
resource.TestCheckResourceAttr(resourceName, "maximum_batching_window_in_seconds", strconv.Itoa(int(batchWindowUpdate))),
),
},
},
})
}

func testAccCheckAWSLambdaEventSourceMappingIsBeingDisabled(conf *lambda.EventSourceMappingConfiguration) resource.TestCheckFunc {
return func(s *terraform.State) error {
conn := testAccProvider.Meta().(*AWSClient).lambdaconn
Expand Down Expand Up @@ -554,6 +593,19 @@ resource "aws_lambda_event_source_mapping" "test" {
`, startingPositionTimestamp)
}

func testAccAWSLambdaEventSourceMappingConfigKinesisBatchWindow(rName string, batchWindow int64) string {
return testAccAWSLambdaEventSourceMappingConfigKinesisBase(rName) + fmt.Sprintf(`
resource "aws_lambda_event_source_mapping" "test" {
batch_size = 100
maximum_batching_window_in_seconds = %v
enabled = true
event_source_arn = "${aws_kinesis_stream.test.arn}"
function_name = "${aws_lambda_function.test.arn}"
starting_position = "TRIM_HORIZON"
}
`, batchWindow)
}

func testAccAWSLambdaEventSourceMappingConfig_kinesis(roleName, policyName, attName, streamName,
funcName, uFuncName string) string {
return fmt.Sprintf(`
Expand Down
1 change: 1 addition & 0 deletions website/docs/r/lambda_event_source_mapping.html.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ resource "aws_lambda_event_source_mapping" "example" {
## Argument Reference

* `batch_size` - (Optional) The largest number of records that Lambda will retrieve from your event source at the time of invocation. Defaults to `100` for DynamoDB and Kinesis, `10` for SQS.
* `maximum_batching_window_in_seconds` - (Optional) The maximum amount of time to gather records before invoking the function, in seconds. Records will continue to buffer until either `maximum_batching_window_in_seconds` expires or `batch_size` has been met. Defaults to as soon as records are available in the stream. If the batch it reads from the stream only has one record in it, Lambda only sends one record to the function.
* `event_source_arn` - (Required) The event source ARN - can be a Kinesis stream, DynamoDB stream, or SQS queue.
* `enabled` - (Optional) Determines if the mapping will be enabled on creation. Defaults to `true`.
* `function_name` - (Required) The name or the ARN of the Lambda function that will be subscribing to events.
Expand Down