From bc0119940762a4c2905d9e1492f3b83bd943c59c Mon Sep 17 00:00:00 2001 From: Trevor Pounds Date: Sun, 14 Oct 2018 16:00:15 -0400 Subject: [PATCH 1/2] Update AWS SDK to v1.15.54. --- Gopkg.lock | 6 +++--- Gopkg.toml | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 80a6277bd4211..823a248ecc98b 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -137,7 +137,7 @@ revision = "f2867c24984aa53edec54a138c03db934221bdea" [[projects]] - digest = "1:65a05bde9b02f645c73afa61c9f6af92d94d726c81a268f45cc70218bd58de65" + digest = "1:996727880e06dcf037f712c4d046e241d1b1b01844636fefb0fbaa480cfd230e" name = "github.com/aws/aws-sdk-go" packages = [ "aws", @@ -173,8 +173,8 @@ "service/sts", ] pruneopts = "" - revision = "8cf662a972fa7fba8f2c1ec57648cf840e2bb401" - version = "v1.14.30" + revision = "bf8067ceb6e7f51e150c218972dccfeeed892b85" + version = "v1.15.54" [[projects]] branch = "master" diff --git a/Gopkg.toml b/Gopkg.toml index 2fa3e4c4069fb..c54ac601565f8 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -16,7 +16,7 @@ [[constraint]] name = "github.com/aws/aws-sdk-go" - version = "1.14.8" + version = "1.15.54" [[constraint]] name = "github.com/bsm/sarama-cluster" From d87715127346bbd3b04e34c606ab707e33eeabee Mon Sep 17 00:00:00 2001 From: Trevor Pounds Date: Sun, 14 Oct 2018 17:03:28 -0400 Subject: [PATCH 2/2] Simplify Kinesis stream validation. Updates the plugin's stream discovery to use DescribeStreamSummary[1] in favor of ListStreams[2]. The API switch has the following benefits. * Faster Startup - Previously, the configured stream had to be compared against all available streams (possibly paginating the results) to check its existence. Now it just calls a single API. * Improved Security - Previously, the AWS IAM credentials had to have read access to list all available streams. Now the AWS IAM credentials can restrict operations to a specific Kinesis stream ARN. * Improve API Limits - Per the AWS documentation, DescribeStreamSummary[1] has a limit of 20 transactions per second while ListStreams[2] has a limit of 5 transaction per second. [1] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html [2] https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html --- plugins/outputs/kinesis/kinesis.go | 40 ++++++------------------------ 1 file changed, 7 insertions(+), 33 deletions(-) diff --git a/plugins/outputs/kinesis/kinesis.go b/plugins/outputs/kinesis/kinesis.go index 0143791460768..402f95156e0a7 100644 --- a/plugins/outputs/kinesis/kinesis.go +++ b/plugins/outputs/kinesis/kinesis.go @@ -2,7 +2,6 @@ package kinesis import ( "log" - "os" "time" "github.com/aws/aws-sdk-go/aws" @@ -115,17 +114,11 @@ func (k *KinesisOutput) Description() string { return "Configuration for the AWS Kinesis output." } -func checkstream(l []*string, s string) bool { - // Check if the StreamName exists in the slice returned from the ListStreams API request. - for _, stream := range l { - if *stream == s { - return true - } +func (k *KinesisOutput) Connect() error { + if k.Partition == nil { + log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition") } - return false -} -func (k *KinesisOutput) Connect() error { // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using // environment variables, and then Shared Credentials. if k.Debug { @@ -145,29 +138,10 @@ func (k *KinesisOutput) Connect() error { configProvider := credentialConfig.Credentials() svc := kinesis.New(configProvider) - KinesisParams := &kinesis.ListStreamsInput{ - Limit: aws.Int64(100), - } - - resp, err := svc.ListStreams(KinesisParams) - - if err != nil { - log.Printf("E! kinesis: Error in ListSteams API call : %+v \n", err) - } - - if checkstream(resp.StreamNames, k.StreamName) { - if k.Debug { - log.Printf("E! kinesis: Stream Exists") - } - k.svc = svc - return nil - } else { - log.Printf("E! kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) - os.Exit(1) - } - if k.Partition == nil { - log.Print("E! kinesis : Deprecated paritionkey configuration in use, please consider using outputs.kinesis.partition") - } + _, err := svc.DescribeStreamSummary(&kinesis.DescribeStreamSummaryInput{ + StreamName: aws.String(k.StreamName), + }) + k.svc = svc return err }