Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add kinesis output support #428

Closed
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
tivan
.vagrant
telegraf
.idea
1 change: 1 addition & 0 deletions outputs/all/all.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
_ "github.com/influxdb/telegraf/outputs/datadog"
_ "github.com/influxdb/telegraf/outputs/influxdb"
_ "github.com/influxdb/telegraf/outputs/kafka"
_ "github.com/influxdb/telegraf/outputs/kinesis"
_ "github.com/influxdb/telegraf/outputs/librato"
_ "github.com/influxdb/telegraf/outputs/mqtt"
_ "github.com/influxdb/telegraf/outputs/nsq"
Expand Down
61 changes: 61 additions & 0 deletions outputs/kinesis/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
## Amazon Kinesis Output for Telegraf

This is an experimental plugin that is still in the early stages of development. It will batch up all of the Points
in one Put request to Kinesis. This should save the number of API requests by a considerable level.

## About Kinesis

This is not the place to document all of the various Kinesis terms however it
maybe useful for users to review Amazons official documentation which is available
[here](http://docs.aws.amazon.com/kinesis/latest/dev/key-concepts.html).

## Amazon Authentication

This plugin uses a credential chain for Authentication with the Kinesis API endpoint. In the following order the plugin
will attempt to authenticate.
1. [IAMS Role](http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/iam-roles-for-amazon-ec2.html)
2. [Environment Variables](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk)
3. [Shared Credentials](https://github.com/aws/aws-sdk-go/wiki/configuring-sdk)


## Config

For this output plugin to function correctly the following variables must be configured.

* region
* streamname
* partitionkey

### region

The region is the Amazon region that you wish to connect to. Examples include but are not limited to
* us-west-1
* us-west-2
* us-east-1
* ap-southeast-1
* ap-southeast-2

### streamname

The streamname is used by the plugin to ensure that data is sent to the correct Kinesis stream. It is important to
note that the stream *MUST* be pre-configured for this plugin to function correctly. If the stream does not exist the
plugin will result in telegraf exiting with an exit code of 1.

### partitionkey

This is used to group data within a stream. Currently this plugin only supports a single partitionkey.
Manually configuring different hosts, or groups of hosts with manually selected partitionkeys might be a workable
solution to scale out.

### format

The format configuration value has been designated to allow people to change the format of the Point as written to
Kinesis. Right now there are two supported formats string and custom.

#### string

String is defined using the default Point.String() value and translated to []byte for the Kinesis stream.

#### custom

Custom is a string defined by a number of values in the FormatMetric() function.
179 changes: 179 additions & 0 deletions outputs/kinesis/kinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package kinesis

import (
"errors"
"fmt"
"log"
"os"
"sync/atomic"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/ec2rolecreds"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"

"github.com/influxdb/influxdb/client/v2"
"github.com/influxdb/telegraf/outputs"
)

type KinesisOutput struct {
Region string `toml:"region"`
StreamName string `toml:"streamname"`
PartitionKey string `toml:"partitionkey"`
Format string `toml:"format"`
Debug bool `toml:"debug"`
svc *kinesis.Kinesis
}

var sampleConfig = `
# Amazon REGION of kinesis endpoint.
region = "ap-southeast-2"
# Kinesis StreamName must exist prior to starting telegraf.
streamname = "StreamName"
# PartitionKey as used for sharding data.
partitionkey = "PartitionKey"
# format of the Data payload in the kinesis PutRecord, supported
# String and Custom.
format = "string"
# debug will show upstream aws messages.
debug = false
`

func (k *KinesisOutput) SampleConfig() string {
return sampleConfig
}

func (k *KinesisOutput) Description() string {
return "Configuration for the AWS Kinesis output."
}

func checkstream(l []*string, s string) bool {
// Check if the StreamName exists in the slice returned from the ListStreams API request.
for _, stream := range l {
if *stream == s {
return true
}
}
return false
}

func (k *KinesisOutput) Connect() error {
// We attempt first to create a session to Kinesis using an IAMS role, if that fails it will fall through to using
// environment variables, and then Shared Credentials.
if k.Debug {
log.Printf("kinesis: Establishing a connection to Kinesis in %+v", k.Region)
}
Config := &aws.Config{
Region: aws.String(k.Region),
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&ec2rolecreds.EC2RoleProvider{Client: ec2metadata.New(session.New())},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
}),
}
svc := kinesis.New(session.New(Config))

KinesisParams := &kinesis.ListStreamsInput{
Limit: aws.Int64(100),
}

resp, err := svc.ListStreams(KinesisParams)

if err != nil {
log.Printf("kinesis: Error in ListSteams API call : %+v \n", err)
}

if checkstream(resp.StreamNames, k.StreamName) {
if k.Debug {
log.Printf("kinesis: Stream Exists")
}
k.svc = svc
return nil
} else {
log.Printf("kinesis : You have configured a StreamName %+v which does not exist. exiting.", k.StreamName)
os.Exit(1)
}
return err
}

func (k *KinesisOutput) Close() error {
return errors.New("Error")
}

func FormatMetric(k *KinesisOutput, point *client.Point) (string, error) {
if k.Format == "string" {
return point.String(), nil
} else {
m := fmt.Sprintf("%+v,%+v,%+v",
point.Name(),
point.Tags(),
point.String())
return m, nil
}
}

func writekinesis(k *KinesisOutput, r []*kinesis.PutRecordsRequestEntry) time.Duration {
start := time.Now()
payload := &kinesis.PutRecordsInput{
Records: r,
StreamName: aws.String(k.StreamName),
}

if k.Debug {
resp, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
log.Printf("%+v \n", resp)

} else {
_, err := k.svc.PutRecords(payload)
if err != nil {
log.Printf("kinesis: Unable to write to Kinesis : %+v \n", err.Error())
}
}
return time.Since(start)
}

func (k *KinesisOutput) Write(points []*client.Point) error {
var sz uint32 = 0

if len(points) == 0 {
return nil
}

r := []*kinesis.PutRecordsRequestEntry{}

for _, p := range points {
atomic.AddUint32(&sz, 1)

metric, _ := FormatMetric(k, p)
d := kinesis.PutRecordsRequestEntry{
Data: []byte(metric),
PartitionKey: aws.String(k.PartitionKey),
}
r = append(r, &d)

if sz == 500 {
// Max Messages Per PutRecordRequest is 500
elapsed := writekinesis(k, r)
log.Printf("Wrote a %+v point batch to Kinesis in %+v.\n", sz, elapsed)
atomic.StoreUint32(&sz, 0)
r = nil
}
}

writekinesis(k, r)

return nil
}

func init() {
outputs.Add("kinesis", func() outputs.Output {
return &KinesisOutput{}
})
}
40 changes: 40 additions & 0 deletions outputs/kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kinesis

import (
"testing"
"github.com/influxdb/telegraf/testutil"
"github.com/stretchr/testify/require"
)


func TestFormatMetric(t *testing.T) {
if testing.Short() {
t.Skip("Skipping integration test in short mode")
}

k := &KinesisOutput{
Format: "string",
}

p := testutil.MockBatchPoints().Points()[0]

valid_string := "test1,tag1=value1 value=1 1257894000000000000"
func_string, err := FormatMetric(k, p)

if func_string != valid_string {
t.Error("Expected ", valid_string)
}
require.NoError(t, err)

k = &KinesisOutput{
Format: "custom",
}

valid_custom := "test1,map[tag1:value1],test1,tag1=value1 value=1 1257894000000000000"
func_custom, err := FormatMetric(k, p)

if func_custom != valid_custom {
t.Error("Expected ", valid_custom)
}
require.NoError(t, err)
}