provider/aws: Add support for cloudwatch_logging_options to AWS Kinesis Firehose Delivery Streams #8671

Merged · 1 commit · Sep 22, 2016
@@ -13,6 +13,34 @@ import (
"github.com/hashicorp/terraform/helper/schema"
)

func cloudWatchLoggingOptionsSchema() *schema.Schema {
return &schema.Schema{
Type: schema.TypeSet,
MaxItems: 1,
Optional: true,
Computed: true,
Elem: &schema.Resource{
Schema: map[string]*schema.Schema{
"enabled": {
Type: schema.TypeBool,
Optional: true,
Default: false,
},

"log_group_name": {
Type: schema.TypeString,
Optional: true,
},

"log_stream_name": {
Type: schema.TypeString,
Optional: true,
},
},
},
}
}

func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
return &schema.Resource{
Create: resourceAwsKinesisFirehoseDeliveryStreamCreate,
@@ -135,6 +163,8 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Type: schema.TypeString,
Optional: true,
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
},
@@ -179,6 +209,8 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
Type: schema.TypeString,
Required: true,
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
},
@@ -286,6 +318,8 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
return
},
},

"cloudwatch_logging_options": cloudWatchLoggingOptionsSchema(),
},
},
},
@@ -314,7 +348,7 @@ func resourceAwsKinesisFirehoseDeliveryStream() *schema.Resource {
func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration {
s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{})

return &firehose.S3DestinationConfiguration{
configuration := &firehose.S3DestinationConfiguration{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
@@ -325,22 +359,35 @@ func createS3Config(d *schema.ResourceData) *firehose.S3DestinationConfiguration
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func updateS3Config(d *schema.ResourceData) *firehose.S3DestinationUpdate {
s3 := d.Get("s3_configuration").([]interface{})[0].(map[string]interface{})

return &firehose.S3DestinationUpdate{
configuration := &firehose.S3DestinationUpdate{
BucketARN: aws.String(s3["bucket_arn"].(string)),
RoleARN: aws.String(s3["role_arn"].(string)),
BufferingHints: &firehose.BufferingHints{
IntervalInSeconds: aws.Int64((int64)(s3["buffer_interval"].(int))),
SizeInMBs: aws.Int64((int64)(s3["buffer_size"].(int))),
},
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
Prefix: extractPrefixConfiguration(s3),
CompressionFormat: aws.String(s3["compression_format"].(string)),
EncryptionConfiguration: extractEncryptionConfiguration(s3),
CloudWatchLoggingOptions: extractCloudWatchLoggingConfiguration(s3),
}

if _, ok := s3["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(s3)
}

return configuration
}

func extractEncryptionConfiguration(s3 map[string]interface{}) *firehose.EncryptionConfiguration {
@@ -357,6 +404,29 @@ func extractEncryptionConfiguration(s3 map[string]interface{}) *firehose.Encrypt
}
}

func extractCloudWatchLoggingConfiguration(s3 map[string]interface{}) *firehose.CloudWatchLoggingOptions {
config := s3["cloudwatch_logging_options"].(*schema.Set).List()
if len(config) == 0 {
return nil
}

loggingConfig := config[0].(map[string]interface{})
loggingOptions := &firehose.CloudWatchLoggingOptions{
Enabled: aws.Bool(loggingConfig["enabled"].(bool)),
}

if v, ok := loggingConfig["log_group_name"]; ok {
loggingOptions.LogGroupName = aws.String(v.(string))
}

if v, ok := loggingConfig["log_stream_name"]; ok {
loggingOptions.LogStreamName = aws.String(v.(string))
}

return loggingOptions
}

func extractPrefixConfiguration(s3 map[string]interface{}) *string {
if v, ok := s3["prefix"]; ok {
return aws.String(v.(string))
@@ -374,14 +444,20 @@ func createRedshiftConfig(d *schema.ResourceData, s3Config *firehose.S3Destinati

redshift := rl[0].(map[string]interface{})

return &firehose.RedshiftDestinationConfiguration{
configuration := &firehose.RedshiftDestinationConfiguration{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Configuration: s3Config,
}, nil
}

if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}

return configuration, nil
}

func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3DestinationUpdate) (*firehose.RedshiftDestinationUpdate, error) {
@@ -393,14 +469,20 @@ func updateRedshiftConfig(d *schema.ResourceData, s3Update *firehose.S3Destinati

redshift := rl[0].(map[string]interface{})

return &firehose.RedshiftDestinationUpdate{
configuration := &firehose.RedshiftDestinationUpdate{
ClusterJDBCURL: aws.String(redshift["cluster_jdbcurl"].(string)),
Password: aws.String(redshift["password"].(string)),
Username: aws.String(redshift["username"].(string)),
RoleARN: aws.String(redshift["role_arn"].(string)),
CopyCommand: extractCopyCommandConfiguration(redshift),
S3Update: s3Update,
}, nil
}

if _, ok := redshift["cloudwatch_logging_options"]; ok {
configuration.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(redshift)
}

return configuration, nil
}

func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3DestinationConfiguration) (*firehose.ElasticsearchDestinationConfiguration, error) {
@@ -422,6 +504,10 @@ func createElasticsearchConfig(d *schema.ResourceData, s3Config *firehose.S3Dest
S3Configuration: s3Config,
}

if _, ok := es["cloudwatch_logging_options"]; ok {
config.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(es)
}

if indexRotationPeriod, ok := es["index_rotation_period"]; ok {
config.IndexRotationPeriod = aws.String(indexRotationPeriod.(string))
}
@@ -451,6 +537,10 @@ func updateElasticsearchConfig(d *schema.ResourceData, s3Update *firehose.S3Dest
S3Update: s3Update,
}

if _, ok := es["cloudwatch_logging_options"]; ok {
update.CloudWatchLoggingOptions = extractCloudWatchLoggingConfiguration(es)
}

if indexRotationPeriod, ok := es["index_rotation_period"]; ok {
update.IndexRotationPeriod = aws.String(indexRotationPeriod.(string))
}
@@ -36,6 +36,26 @@ func TestAccAWSKinesisFirehoseDeliveryStream_s3basic(t *testing.T) {
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3WithCloudwatchLogging(t *testing.T) {
var stream firehose.DeliveryStreamDescription
ri := acctest.RandInt()

resource.Test(t, resource.TestCase{
PreCheck: testAccKinesisFirehosePreCheck(t),
Providers: testAccProviders,
CheckDestroy: testAccCheckKinesisFirehoseDeliveryStreamDestroy,
Steps: []resource.TestStep{
resource.TestStep{
Config: testAccKinesisFirehoseDeliveryStreamConfig_s3WithCloudwatchLogging(os.Getenv("AWS_ACCOUNT_ID"), ri),
Check: resource.ComposeTestCheckFunc(
testAccCheckKinesisFirehoseDeliveryStreamExists("aws_kinesis_firehose_delivery_stream.test_stream", &stream),
testAccCheckAWSKinesisFirehoseDeliveryStreamAttributes(&stream, nil, nil, nil),
),
},
},
})
}

func TestAccAWSKinesisFirehoseDeliveryStream_s3ConfigUpdates(t *testing.T) {
var stream firehose.DeliveryStreamDescription

@@ -340,6 +360,100 @@ EOF

`

func testAccKinesisFirehoseDeliveryStreamConfig_s3WithCloudwatchLogging(accountId string, rInt int) string {
return fmt.Sprintf(`
resource "aws_iam_role" "firehose" {
name = "tf_acctest_firehose_delivery_role_%d"
assume_role_policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Principal": {
"Service": "firehose.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "%s"
}
}
}
]
}
EOF
}

resource "aws_iam_role_policy" "firehose" {
name = "tf_acctest_firehose_delivery_policy_%d"
role = "${aws_iam_role.firehose.id}"
policy = <<EOF
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "",
"Effect": "Allow",
"Action": [
"s3:AbortMultipartUpload",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::${aws_s3_bucket.bucket.id}",
"arn:aws:s3:::${aws_s3_bucket.bucket.id}/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:putLogEvents"
],
"Resource": [
"arn:aws:logs::log-group:/aws/kinesisfirehose/*"
]
}
]
}
EOF
}

resource "aws_s3_bucket" "bucket" {
bucket = "tf-test-bucket-%d"
acl = "private"
}

resource "aws_cloudwatch_log_group" "test" {
name = "example-%d"
}

resource "aws_cloudwatch_log_stream" "test" {
name = "sample-log-stream-test-%d"
log_group_name = "${aws_cloudwatch_log_group.test.name}"
}

resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
name = "terraform-kinesis-firehose-basictest-cloudwatch"
destination = "s3"
s3_configuration {
role_arn = "${aws_iam_role.firehose.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
cloudwatch_logging_options {
enabled = true
log_group_name = "${aws_cloudwatch_log_group.test.name}"
log_stream_name = "${aws_cloudwatch_log_stream.test.name}"
}
}
}
`, rInt, accountId, rInt, rInt, rInt, rInt)
}

var testAccKinesisFirehoseDeliveryStreamConfig_s3basic = testAccKinesisFirehoseDeliveryStreamBaseConfig + `
resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
depends_on = ["aws_iam_role_policy.firehose"]
@@ -93,7 +93,7 @@ resource "aws_elasticsearch_domain" "test_cluster" {

resource "aws_kinesis_firehose_delivery_stream" "test_stream" {
name = "terraform-kinesis-firehose-test-stream"
destination = "elasticsearch"
destination = "redshift"
s3_configuration {
role_arn = "${aws_iam_role.firehose_role.arn}"
bucket_arn = "${aws_s3_bucket.bucket.arn}"
@@ -137,6 +137,7 @@ The `s3_configuration` object supports the following:
* `compression_format` - (Optional) The compression format. If no value is specified, the default is UNCOMPRESSED. Other supported values are GZIP, ZIP & Snappy. If the destination is Redshift, you cannot use ZIP or Snappy.
* `kms_key_arn` - (Optional) If set, the stream will encrypt data using the key in KMS; otherwise, no encryption will be used.
* `cloudwatch_logging_options` - (Optional) The CloudWatch Logging Options for the delivery stream. More details are given below.

The `redshift_configuration` object supports the following:

@@ -147,6 +148,7 @@ The `redshift_configuration` object supports the following:
* `data_table_name` - (Required) The name of the table in the Redshift cluster that the S3 bucket will copy to.
* `copy_options` - (Optional) Copy options for copying the data from the S3 intermediate bucket into Redshift.
* `data_table_columns` - (Optional) The data table columns that will be targeted by the copy command.
* `cloudwatch_logging_options` - (Optional) The CloudWatch Logging Options for the delivery stream. More details are given below.

The `elasticsearch_configuration` object supports the following:

@@ -159,6 +161,15 @@ The `elasticsearch_configuration` object supports the following:
* `role_arn` - (Required) The ARN of the IAM role to be assumed by Firehose for calling the Amazon ES Configuration API and for indexing documents. The pattern needs to be `arn:.*`.
* `s3_backup_mode` - (Optional) Defines how documents should be delivered to Amazon S3. Valid values are `FailedDocumentsOnly` and `AllDocuments`. Default value is `FailedDocumentsOnly`.
* `type_name` - (Required) The Elasticsearch type name with maximum length of 100 characters.
* `cloudwatch_logging_options` - (Optional) The CloudWatch Logging Options for the delivery stream. More details are given below.

The `cloudwatch_logging_options` object supports the following:

* `enabled` - (Optional) Enables or disables logging. Defaults to `false`.
* `log_group_name` - (Optional) The CloudWatch group name for logging. This value is required if `enabled` is `true`.
* `log_stream_name` - (Optional) The CloudWatch log stream name for logging. This value is required if `enabled` is `true`.
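
As a usage reference, here is a minimal sketch of enabling CloudWatch logging on an S3 destination, modelled on the acceptance test configuration in this PR; the referenced `aws_iam_role.firehose`, `aws_s3_bucket.bucket`, `aws_cloudwatch_log_group.example`, and `aws_cloudwatch_log_stream.example` resources are assumed to be defined elsewhere in the configuration:

```
resource "aws_kinesis_firehose_delivery_stream" "example" {
  name        = "terraform-kinesis-firehose-example"
  destination = "s3"

  s3_configuration {
    role_arn   = "${aws_iam_role.firehose.arn}"
    bucket_arn = "${aws_s3_bucket.bucket.arn}"

    # Firehose writes delivery error logs to this log group/stream.
    cloudwatch_logging_options {
      enabled         = true
      log_group_name  = "${aws_cloudwatch_log_group.example.name}"
      log_stream_name = "${aws_cloudwatch_log_stream.example.name}"
    }
  }
}
```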



## Attributes Reference
