Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time_key and time_key_format config options #17

Merged
merged 1 commit into from
Mar 3, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -15,6 +15,8 @@ If you think you’ve found a potential security issue, please do not post it in
* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `endpoint`: Specify a custom endpoint for the Kinesis Streams API.
* `append_newline`: If you set append_newline as true, a newline will be addded after each log record.
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.

### Permissions

24 changes: 21 additions & 3 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ import (
"C"
"fmt"
"strings"
"time"
"unsafe"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
@@ -61,6 +62,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, endpoint)
appendNewline := output.FLBPluginConfigKey(ctx, "append_newline")
logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline)
timeKey := output.FLBPluginConfigKey(ctx, "time_key")
logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
@@ -78,7 +83,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
if strings.ToLower(appendNewline) == "true" {
appendNL = true
}
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, appendNL, pluginID)
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, appendNL, pluginID)
}

// The "export" comments have syntactic meaning
@@ -104,6 +109,8 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
var count int
var ret int
var ts interface{}
var timestamp time.Time
var record map[interface{}]interface{}

// Create Fluent Bit decoder
@@ -115,12 +122,23 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int

for {
//Extract Record
ret, _, record = output.GetRecord(dec)
ret, ts, record = output.GetRecord(dec)
if ret != 0 {
break
}

retCode := kinesisOutput.AddRecord(record)
switch tts := ts.(type) {
case output.FLBTime:
timestamp = tts.Time
case uint64:
// when ts is of type uint64 it appears to
// be the amount of seconds since unix epoch.
timestamp = time.Unix(int64(tts), 0)
default:
timestamp = time.Now()
}

retCode := kinesisOutput.AddRecord(record, &timestamp)
if retCode != output.FLB_OK {
return retCode
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ require (
github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c
github.com/golang/mock v1.3.1
github.com/json-iterator/go v1.1.7
github.com/lestrrat-go/strftime v1.0.1
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.3.0
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -21,12 +21,18 @@ github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62F
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/strftime v1.0.1 h1:o7qz5pmLzPDLyGW4lG6JvTKPUfTFXwe+vOamIYWtnVU=
github.com/lestrrat-go/strftime v1.0.1/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
37 changes: 35 additions & 2 deletions kinesis/kinesis.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
package kinesis

import (
"bytes"
"fmt"
"math/rand"
"os"
@@ -31,6 +32,7 @@ import (
"github.com/aws/aws-sdk-go/service/kinesis"
fluentbit "github.com/fluent/fluent-bit-go/output"
jsoniter "github.com/json-iterator/go"
"github.com/lestrrat-go/strftime"
"github.com/sirupsen/logrus"
)

@@ -47,6 +49,11 @@ const (
partitionKeyMaxLength = 256
)

const (
// We use strftime format specifiers because this will one day be re-written in C
defaultTimeFmt = "%Y-%m-%dT%H:%M:%S"
)

// PutRecordsClient contains the kinesis PutRecords method call
type PutRecordsClient interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
@@ -69,6 +76,8 @@ type OutputPlugin struct {
partitionKey string
// Decides whether to append a newline after each data record
appendNewline bool
timeKey string
fmtStrftime *strftime.Strftime
lastInvalidPartitionKeyIndex int
client PutRecordsClient
records []*kinesis.PutRecordsRequestEntry
@@ -80,7 +89,7 @@ type OutputPlugin struct {
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint string, appendNewline bool, pluginID int) (*OutputPlugin, error) {
func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, appendNewline bool, pluginID int) (*OutputPlugin, error) {
client, err := newPutRecordsClient(roleARN, region, endpoint)
if err != nil {
return nil, err
@@ -103,13 +112,27 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint s
buffer: make([]byte, 8),
}

var timeFormatter *strftime.Strftime
if timeKey != "" {
if timeFmt == "" {
timeFmt = defaultTimeFmt
}
timeFormatter, err = strftime.New(timeFmt)
if err != nil {
logrus.Errorf("[kinesis %d] Issue with strftime format in 'time_key_format'", pluginID)
return nil, err
}
}

return &OutputPlugin{
stream: stream,
client: client,
records: records,
dataKeys: dataKeys,
partitionKey: partitionKey,
appendNewline: appendNewline,
timeKey: timeKey,
fmtStrftime: timeFormatter,
lastInvalidPartitionKeyIndex: -1,
backoff: plugins.NewBackoff(),
timer: timer,
@@ -154,7 +177,17 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki
// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full
// the return value is one of: FLB_OK FLB_RETRY
// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned
func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}) int {
func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int {
if outputPlugin.timeKey != "" {
buf := new(bytes.Buffer)
err := outputPlugin.fmtStrftime.Format(buf, *timeStamp)
if err != nil {
logrus.Errorf("[kinesis %d] Could not create timestamp %v\n", outputPlugin.PluginID, err)
return fluentbit.FLB_ERROR
}
record[outputPlugin.timeKey] = buf.String()
}

data, err := outputPlugin.processRecord(record)
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
6 changes: 4 additions & 2 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
@@ -74,7 +74,8 @@ func TestAddRecord(t *testing.T) {

outputPlugin, _ := newMockOutputPlugin(nil)

retCode := outputPlugin.AddRecord(record)
timeStamp := time.Now()
retCode := outputPlugin.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
assert.Len(t, outputPlugin.records, 1, "Expected output to contain 1 record")
}
@@ -93,7 +94,8 @@ func TestAddRecordAndFlush(t *testing.T) {

outputPlugin, _ := newMockOutputPlugin(mockKinesis)

retCode := outputPlugin.AddRecord(record)
timeStamp := time.Now()
retCode := outputPlugin.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")

err := outputPlugin.Flush()