Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time_key and time_key_format options to add timestamp to records #20

Merged
merged 2 commits into from
Mar 3, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit:
* `data_keys`: By default, the whole log record will be sent to Kinesis. If you specify a key name(s) with this option, then only those keys and values will be sent to Kinesis. For example, if you are using the Fluentd Docker log driver, you can specify `data_keys log` and only the log message will be sent to Kinesis. If you specify multiple keys, they should be comma delimited.
* `role_arn`: ARN of an IAM role to assume (for cross account access).
* `endpoint`: Specify a custom endpoint for the Kinesis Firehose API.
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.

### Permissions

Expand Down
38 changes: 35 additions & 3 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package firehose

import (
"bytes"
"fmt"
"os"
"time"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/aws/aws-sdk-go/service/firehose"
fluentbit "github.com/fluent/fluent-bit-go/output"
jsoniter "github.com/json-iterator/go"
"github.com/lestrrat-go/strftime"
"github.com/sirupsen/logrus"
)

Expand All @@ -38,6 +40,11 @@ const (
maximumRecordSize = 1024000 // 1000 KiB
)

const (
// We use strftime format specifiers because this will one day be re-written in C
defaultTimeFmt = "%Y-%m-%dT%H:%M:%S"
)

// PutRecordBatcher contains the firehose PutRecordBatch method call
type PutRecordBatcher interface {
PutRecordBatch(input *firehose.PutRecordBatchInput) (*firehose.PutRecordBatchOutput, error)
Expand All @@ -48,6 +55,8 @@ type OutputPlugin struct {
region string
deliveryStream string
dataKeys string
timeKey string
fmtStrftime *strftime.Strftime
client PutRecordBatcher
records []*firehose.Record
dataLength int
Expand All @@ -56,8 +65,8 @@ type OutputPlugin struct {
PluginID int
}

// NewOutputPlugin creates a OutputPlugin object
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint string, pluginID int) (*OutputPlugin, error) {
// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKey, timeFmt string, pluginID int) (*OutputPlugin, error) {
sess, err := session.NewSession(&aws.Config{
Region: aws.String(region),
})
Expand All @@ -79,6 +88,18 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint string,
return nil, err
}

var timeFormatter *strftime.Strftime
if timeKey != "" {
if timeFmt == "" {
timeFmt = defaultTimeFmt
}
timeFormatter, err = strftime.New(timeFmt)
if err != nil {
logrus.Errorf("[firehose %d] Issue with strftime format in 'time_key_format'", pluginID)
return nil, err
}
}

return &OutputPlugin{
region: region,
deliveryStream: deliveryStream,
Expand All @@ -87,6 +108,8 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint string,
dataKeys: dataKeys,
backoff: plugins.NewBackoff(),
timer: timer,
timeKey: timeKey,
fmtStrftime: timeFormatter,
PluginID: pluginID,
}, nil
}
Expand Down Expand Up @@ -119,7 +142,16 @@ func newPutRecordBatcher(roleARN string, sess *session.Session, endpoint string)
// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full
// the return value is one of: FLB_OK FLB_RETRY
// API Errors lead to an FLB_RETRY, and all other errors are logged, the record is discarded and FLB_OK is returned
func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}) int {
func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int {
if output.timeKey != "" {
buf := new(bytes.Buffer)
err := output.fmtStrftime.Format(buf, *timeStamp)
if err != nil {
logrus.Errorf("[firehose %d] Could not create timestamp %v\n", output.PluginID, err)
return fluentbit.FLB_ERROR
}
record[output.timeKey] = buf.String()
}
data, err := output.processRecord(record)
if err != nil {
logrus.Errorf("[firehose %d] %v\n", output.PluginID, err)
Expand Down
6 changes: 4 additions & 2 deletions firehose/firehose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func TestAddRecord(t *testing.T) {
"somekey": []byte("some value"),
}

retCode := output.AddRecord(record)
timeStamp := time.Now()
retCode := output.AddRecord(record, &timeStamp)

assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
assert.Len(t, output.records, 1, "Expected output to contain 1 record")
Expand Down Expand Up @@ -82,7 +83,8 @@ func TestAddRecordAndFlush(t *testing.T) {
timer: timer,
}

retCode := output.AddRecord(record)
timeStamp := time.Now()
retCode := output.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")

err := output.Flush()
Expand Down
24 changes: 21 additions & 3 deletions fluent-bit-firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
)
import (
"fmt"
"time"
)

var (
Expand Down Expand Up @@ -67,12 +68,16 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin
logrus.Infof("[firehose %d] plugin parameter role_arn = '%s'\n", pluginID, roleARN)
endpoint := output.FLBPluginConfigKey(ctx, "endpoint")
logrus.Infof("[firehose %d] plugin parameter endpoint = '%s'\n", pluginID, endpoint)
timeKey := output.FLBPluginConfigKey(ctx, "time_key")
logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt)

if deliveryStream == "" || region == "" {
return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID)
}

return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, pluginID)
return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKey, timeKeyFmt, pluginID)
}

//export FLBPluginInit
Expand All @@ -91,6 +96,8 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
var count int
var ret int
var ts interface{}
var timestamp time.Time
var record map[interface{}]interface{}

// Create Fluent Bit decoder
Expand All @@ -102,12 +109,23 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int

for {
// Extract Record
ret, _, record = output.GetRecord(dec)
ret, ts, record = output.GetRecord(dec)
if ret != 0 {
break
}

retCode := firehoseOutput.AddRecord(record)
switch tts := ts.(type) {
case output.FLBTime:
timestamp = tts.Time
case uint64:
// when ts is of type uint64 it appears to
// be the amount of seconds since unix epoch.
timestamp = time.Unix(int64(tts), 0)
default:
timestamp = time.Now()
}

retCode := firehoseOutput.AddRecord(record, &timestamp)
if retCode != output.FLB_OK {
return retCode
}
Expand Down
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ require (
github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953
github.com/golang/mock v1.3.1
github.com/json-iterator/go v1.1.6
github.com/lestrrat-go/strftime v1.0.1
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.2.2
github.com/stretchr/testify v1.3.0
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ github.com/aws/aws-sdk-go v1.25.43 h1:R5YqHQFIulYVfgRySz9hvBRTWBjudISa+r0C8XQ1uf
github.com/aws/aws-sdk-go v1.25.43/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/cenkalti/backoff v2.1.1+incompatible h1:tKJnvO2kl0zmb/jA5UKAt4VoEVw1qxKWjE/Bpp46npY=
github.com/cenkalti/backoff v2.1.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fluent/fluent-bit-go v0.0.0-20190614024040-c017a8579953 h1:dDGtm4HU/xEd2vkhzkJimQ0tPoQ3AKo7cr6vnx+qg5c=
Expand All @@ -16,17 +17,26 @@ github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwK
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/strftime v1.0.1 h1:o7qz5pmLzPDLyGW4lG6JvTKPUfTFXwe+vOamIYWtnVU=
github.com/lestrrat-go/strftime v1.0.1/go.mod h1:E1nN3pCbtMSu1yjSVeyuRFVm/U0xoR76fd03sz+Qz4g=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw=
github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down