diff --git a/.gitignore b/.gitignore index d69f9330bc92d..59b50782c35c8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ tivan .vagrant telegraf +.idea diff --git a/outputs/all/all.go b/outputs/all/all.go index 2c00f43f9f91a..08ebf254994f7 100644 --- a/outputs/all/all.go +++ b/outputs/all/all.go @@ -6,6 +6,7 @@ import ( _ "github.com/influxdb/telegraf/outputs/datadog" _ "github.com/influxdb/telegraf/outputs/influxdb" _ "github.com/influxdb/telegraf/outputs/kafka" + _ "github.com/influxdb/telegraf/outputs/kinesis" _ "github.com/influxdb/telegraf/outputs/librato" _ "github.com/influxdb/telegraf/outputs/mqtt" _ "github.com/influxdb/telegraf/outputs/nsq" diff --git a/outputs/kinesis/README.md b/outputs/kinesis/README.md new file mode 100644 index 0000000000000..3b75117aca24d --- /dev/null +++ b/outputs/kinesis/README.md @@ -0,0 +1,61 @@ +## Amazon Kinesis Output for Telegraf + +This is an experimental plugin that is still in the early stages of development. It will batch up all of the Points +in one Put request to Kinesis. This should save the number of API requests by a considerable level. + +## About Kinesis + +This is not the place to document all of the various Kinesis terms however it +maybe useful for users to review Amazons official documentation which is available +[here](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html). + +## Amazon Authentication + +This plugin uses a credential chain for Authentication with the Kinesis API endpoint. In the following order the plugin +will attempt to authenticate. +1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html) +2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) +3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk) + + +## Config + +For this output plugin to function correctly the following variables must be configured. + +* region +* streamname +* partitionkey + +### region + +The region is the Amazon region that you wish to connect to. Examples include but are not limited to +* us-west-1 +* us-west-2 +* us-east-1 +* ap-southeast-1 +* ap-southeast-2 + +### streamname + +The streamname is used by the plugin to ensure that data is sent to the correct Kinesis stream. It is important to +note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist the +plugin will result in telegraf exiting with an exit code of 1. + +### partitionkey + +This is used to group data within a stream. Currently this plugin only supports a single partitionkey. +Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable +solution to scale out. + +### format + +The format configuration value has been designated to allow people to change the format of the Point as written to +Kinesis. Right now there are two supported formats string and custom. + +#### string + +String is defined using the default Point.String() value and translated to []byte for the Kinesis stream. + +#### custom + +Custom is a string defined by a number of values in the FormatMetric() function. \ No newline at end of file diff --git a/outputs/kinesis/kinesis.go b/outputs/kinesis/kinesis.go new file mode 100644 index 0000000000000..1441317070ddb --- /dev/null +++ b/outputs/kinesis/kinesis.go @@ -0,0 +1,179 @@ +package kinesis + +import ( + "errors" + "fmt" + "log" + "os" + "sync/atomic" + "time" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds" + "github.com/aws/aws-sdk-go/aws/ec2metadata" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/kinesis" + + "github.com/influxdb/influxdb/client/v2" + "github.com/influxdb/telegraf/outputs" +) + +type KinesisOutput struct { + Region string `toml:"region"` + StreamName string `toml:"streamname"` + PartitionKey string `toml:"partitionkey"` + Format string `toml:"format"` + Debug bool `toml:"debug"` + svc *kinesis.Kinesis +} + +var sampleConfig = ` + # Amazon REGION of kinesis endpoint. + region = "ap-southeast-2" + # Kinesis StreamName must exist prior to starting telegraf. + streamname = "StreamName" + # PartitionKey as used for sharding data. + partitionkey = "PartitionKey" + # format of the Data payload in the kinesis PutRecord, supported + # String and Custom. + format = "string" + # debug will show upstream aws messages. + debug = false +` + +func (k *KinesisOutput) SampleConfig() string { + return sampleConfig +} + +func (k *KinesisOutput) Description() string { + return "Configuration for the AWS Kinesis output." +} + +func checkstream(l []*string, s string) bool { + // Check if the StreamName exists in the slice returned from the ListStreams API request. + for _, stream := range l { + if *stream == s { + return true + } + } + return false +} + +func (k *KinesisOutput) Connect() error { + // We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using + // environment variables, and then Shared Credentials. + if k.Debug { + log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region) + } + Config := &aws.Config{ + Region: aws.String(k.Region), + Credentials: credentials.NewChainCredentials( + []credentials.Provider{ + &ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())}, + &credentials.EnvProvider{}, + &credentials.SharedCredentialsProvider{}, + }), + } + svc := kinesis.New(session.New(Config)) + + KinesisParams := &kinesis.ListStreamsInput{ + Limit: aws.Int64(100), + } + + resp, err := svc.ListStreams(KinesisParams) + + if err != nil { + log.Printf("kinesis: Error in ListSteams API call : %+v \n", err) + } + + if checkstream(resp.StreamNames, k.StreamName) { + if k.Debug { + log.Printf("kinesis: Stream Exists") + } + k.svc = svc + return nil + } else { + log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName) + os.Exit(1) + } + return err +} + +func (k *KinesisOutput) Close() error { + return errors.New("Error") +} + +func FormatMetric(k *KinesisOutput, point *client.Point) (string, error) { + if k.Format == "string" { + return point.String(), nil + } else { + m := fmt.Sprintf("%+v,%+v,%+v", + point.Name(), + point.Tags(), + point.String()) + return m, nil + } +} + +func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration { + start := time.Now() + payload := &kinesis.PutRecordsInput{ + Records: r, + StreamName: aws.String(k.StreamName), + } + + if k.Debug { + resp, err := k.svc.PutRecords(payload) + if err != nil { + log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) + } + log.Printf("%+v \n", resp) + + } else { + _, err := k.svc.PutRecords(payload) + if err != nil { + log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error()) + } + } + return time.Since(start) +} + +func (k *KinesisOutput) Write(points []*client.Point) error { + var sz uint32 = 0 + + if len(points) == 0 { + return nil + } + + r := []*kinesis.PutRecordsRequestEntry{} + + for _, p := range points { + atomic.AddUint32(&sz, 1) + + metric, _ := FormatMetric(k, p) + d := kinesis.PutRecordsRequestEntry{ + Data: []byte(metric), + PartitionKey: aws.String(k.PartitionKey), + } + r = append(r, &d) + + if sz == 500 { + // Max Messages Per PutRecordRequest is 500 + elapsed := writekinesis(k, r) + log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed) + atomic.StoreUint32(&sz, 0) + r = nil + } + } + + writekinesis(k, r) + + return nil +} + +func init() { + outputs.Add("kinesis", func() outputs.Output { + return &KinesisOutput{} + }) +} diff --git a/outputs/kinesis/kinesis_test.go b/outputs/kinesis/kinesis_test.go new file mode 100644 index 0000000000000..76bf242d7511b --- /dev/null +++ b/outputs/kinesis/kinesis_test.go @@ -0,0 +1,40 @@ +package kinesis + +import ( + "testing" + "github.com/influxdb/telegraf/testutil" + "github.com/stretchr/testify/require" +) + + +func TestFormatMetric(t *testing.T) { + if testing.Short() { + t.Skip("Skipping integration test in short mode") + } + + k := &KinesisOutput{ + Format: "string", + } + + p := testutil.MockBatchPoints().Points()[0] + + valid_string := "test1,tag1=value1 value=1 1257894000000000000" + func_string, err := FormatMetric(k, p) + + if func_string != valid_string { + t.Error("Expected ", valid_string) + } + require.NoError(t, err) + + k = &KinesisOutput{ + Format: "custom", + } + + valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000" + func_custom, err := FormatMetric(k, p) + + if func_custom != valid_custom { + t.Error("Expected ", valid_custom) + } + require.NoError(t, err) +}