From 2ee60176156b76600ffe9eee9ccf15c5befd54ee Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 2 Mar 2020 18:37:45 -0800 Subject: [PATCH] Add time_key and time_key_format config options to add timestamp to records --- README.md | 2 ++ fluent-bit-kinesis.go | 24 +++++++++++++++++++++--- go.mod | 1 + go.sum | 6 ++++++ kinesis/kinesis.go | 37 +++++++++++++++++++++++++++++++++++-- kinesis/kinesis_test.go | 6 ++++-- 6 files changed, 69 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index c9bb42d..69d94d5 100644 --- a/README.md +++ b/README.md @@ -15,6 +15,8 @@ If you think you’ve found a potential security issue, please do not post it in * `role_arn`: ARN of an IAM role to assume (for cross account access). * `endpoint`: Specify a custom endpoint for the Kinesis Streams API. * `append_newline`: If you set append_newline as true, a newline will be addded after each log record. +* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. +* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. ### Permissions diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 62b3c3c..5d2fba6 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -17,6 +17,7 @@ import ( "C" "fmt" "strings" + "time" "unsafe" "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" @@ -61,6 +62,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, logrus.Infof("[kinesis %d] plugin parameter endpoint = '%s'", pluginID, endpoint) appendNewline := output.FLBPluginConfigKey(ctx, "append_newline") logrus.Infof("[kinesis %d] plugin parameter append_newline = %s", pluginID, appendNewline) + timeKey := output.FLBPluginConfigKey(ctx, "time_key") + logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey) + timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format") + logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt) if stream == "" || region == "" { return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID) @@ -78,7 +83,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, if strings.ToLower(appendNewline) == "true" { appendNL = true } - return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, appendNL, pluginID) + return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, appendNL, pluginID) } // The "export" comments have syntactic meaning @@ -104,6 +109,8 @@ func FLBPluginInit(ctx unsafe.Pointer) int { func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { var count int var ret int + var ts interface{} + var timestamp time.Time var record map[interface{}]interface{} // Create Fluent Bit decoder @@ -115,12 +122,23 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int for { //Extract Record - ret, _, record = output.GetRecord(dec) + ret, ts, record = output.GetRecord(dec) if ret != 0 { break } - retCode := kinesisOutput.AddRecord(record) + switch tts := ts.(type) { + case output.FLBTime: + timestamp = tts.Time + case uint64: + // when ts is of type uint64 it appears to + // be the amount of seconds since unix epoch. + timestamp = time.Unix(int64(tts), 0) + default: + timestamp = time.Now() + } + + retCode := kinesisOutput.AddRecord(record, ×tamp) if retCode != output.FLB_OK { return retCode } diff --git a/go.mod b/go.mod index 624c321..bf55aa9 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/fluent/fluent-bit-go v0.0.0-20190925192703-ea13c021720c github.com/golang/mock v1.3.1 github.com/json-iterator/go v1.1.7 + github.com/lestrrat-go/strftime v1.0.1 github.com/sirupsen/logrus v1.4.2 github.com/stretchr/testify v1.3.0 ) diff --git a/go.sum b/go.sum index 017757a..03fc7e7 100644 --- a/go.sum +++ b/go.sum @@ -21,12 +21,18 @@ github.com/json-iterator/go v1.1.7 h1:KfgG9LzI+pYjr4xvmz/5H4FXjokeP+rlHLhv3iH62F github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8= +github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is= +github.com/lestrrat-go/strftime v1.0.1 h1:o7qz5pmLzPDLyGW4lG6JvTKPUfTFXwe+vOamIYWtnVU= +github.com/lestrrat-go/strftime v1.0.1/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 221d6a0..d4f7211 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -17,6 +17,7 @@ package kinesis import ( + "bytes" "fmt" "math/rand" "os" @@ -31,6 +32,7 @@ import ( "github.com/aws/aws-sdk-go/service/kinesis" fluentbit "github.com/fluent/fluent-bit-go/output" jsoniter "github.com/json-iterator/go" + "github.com/lestrrat-go/strftime" "github.com/sirupsen/logrus" ) @@ -47,6 +49,11 @@ const ( partitionKeyMaxLength = 256 ) +const ( + // We use strftime format specifiers because this will one day be re-written in C + defaultTimeFmt = "%Y-%m-%dT%H:%M:%S" +) + // PutRecordsClient contains the kinesis PutRecords method call type PutRecordsClient interface { PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) @@ -69,6 +76,8 @@ type OutputPlugin struct { partitionKey string // Decides whether to append a newline after each data record appendNewline bool + timeKey string + fmtStrftime *strftime.Strftime lastInvalidPartitionKeyIndex int client PutRecordsClient records []*kinesis.PutRecordsRequestEntry @@ -80,7 +89,7 @@ type OutputPlugin struct { } // NewOutputPlugin creates an OutputPlugin object -func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint string, appendNewline bool, pluginID int) (*OutputPlugin, error) { +func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, appendNewline bool, pluginID int) (*OutputPlugin, error) { client, err := newPutRecordsClient(roleARN, region, endpoint) if err != nil { return nil, err @@ -103,6 +112,18 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint s buffer: make([]byte, 8), } + var timeFormatter *strftime.Strftime + if timeKey != "" { + if timeFmt == "" { + timeFmt = defaultTimeFmt + } + timeFormatter, err = strftime.New(timeFmt) + if err != nil { + logrus.Errorf("[kinesis %d] Issue with strftime format in 'time_key_format'", pluginID) + return nil, err + } + } + return &OutputPlugin{ stream: stream, client: client, @@ -110,6 +131,8 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint s dataKeys: dataKeys, partitionKey: partitionKey, appendNewline: appendNewline, + timeKey: timeKey, + fmtStrftime: timeFormatter, lastInvalidPartitionKeyIndex: -1, backoff: plugins.NewBackoff(), timer: timer, @@ -154,7 +177,17 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki // AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full // the return value is one of: FLB_OK FLB_RETRY // API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned -func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}) int { +func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int { + if outputPlugin.timeKey != "" { + buf := new(bytes.Buffer) + err := outputPlugin.fmtStrftime.Format(buf, *timeStamp) + if err != nil { + logrus.Errorf("[kinesis %d] Could not create timestamp %v\n", outputPlugin.PluginID, err) + return fluentbit.FLB_ERROR + } + record[outputPlugin.timeKey] = buf.String() + } + data, err := outputPlugin.processRecord(record) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index 682bc34..9769c28 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -74,7 +74,8 @@ func TestAddRecord(t *testing.T) { outputPlugin, _ := newMockOutputPlugin(nil) - retCode := outputPlugin.AddRecord(record) + timeStamp := time.Now() + retCode := outputPlugin.AddRecord(record, &timeStamp) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") assert.Len(t, outputPlugin.records, 1, "Expected output to contain 1 record") } @@ -93,7 +94,8 @@ func TestAddRecordAndFlush(t *testing.T) { outputPlugin, _ := newMockOutputPlugin(mockKinesis) - retCode := outputPlugin.AddRecord(record) + timeStamp := time.Now() + retCode := outputPlugin.AddRecord(record, &timeStamp) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") err := outputPlugin.Flush()