diff --git a/README.md b/README.md index 33679d1..1c31748 100644 --- a/README.md +++ b/README.md @@ -31,7 +31,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit: * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis. * `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. You can also use `%L` for milliseconds and `%f` for microseconds. If you are using ECS FireLens, make sure you are running Amazon ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision. * `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced. - +* `simple_aggregation`: Option that allows the plugin to send multiple log events in the same record, as long as the combined record does not exceed the maximum record size (1 MiB). It joins together as many log records as possible into a single Firehose record and delimits them with a newline. This is useful to enable when your destination supports aggregation, such as S3. Defaults to `false`; set to `true` to enable this option. ### Permissions The plugin requires `firehose:PutRecordBatch` permissions. 
diff --git a/firehose/firehose.go b/firehose/firehose.go index 21aada5..6bd097d 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -54,22 +54,23 @@ type PutRecordBatcher interface { // OutputPlugin sends log records to firehose type OutputPlugin struct { - region string - deliveryStream string - dataKeys string - timeKey string - fmtStrftime *strftime.Strftime - logKey string - client PutRecordBatcher - records []*firehose.Record - dataLength int - timer *plugins.Timeout - PluginID int - replaceDots string + region string + deliveryStream string + dataKeys string + timeKey string + fmtStrftime *strftime.Strftime + logKey string + client PutRecordBatcher + records []*firehose.Record + dataLength int + timer *plugins.Timeout + PluginID int + replaceDots string + simpleAggregation bool } // NewOutputPlugin creates an OutputPlugin object -func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int) (*OutputPlugin, error) { +func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int, simpleAggregation bool) (*OutputPlugin, error) { client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint, pluginID) if err != nil { return nil, err @@ -100,17 +101,18 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint } return &OutputPlugin{ - region: region, - deliveryStream: deliveryStream, - client: client, - records: records, - dataKeys: dataKeys, - timer: timer, - timeKey: timeKey, - fmtStrftime: timeFormatter, - logKey: logKey, - PluginID: pluginID, - replaceDots: replaceDots, + region: region, + deliveryStream: deliveryStream, + client: client, + records: records, + dataKeys: dataKeys, + timer: timer, + timeKey: timeKey, + fmtStrftime: timeFormatter, + logKey: logKey, + PluginID: pluginID, + replaceDots: replaceDots, + 
simpleAggregation: simpleAggregation, }, nil } @@ -207,9 +209,13 @@ func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeSt } } - output.records = append(output.records, &firehose.Record{ - Data: data, - }) + if output.simpleAggregation && len(output.records) > 0 && len(output.records[len(output.records)-1].Data) + newDataSize <= maximumRecordSize { + output.records[len(output.records)-1].Data = append(output.records[len(output.records)-1].Data, data...) + } else { + output.records = append(output.records, &firehose.Record{ + Data: data, + }) + } output.dataLength += newDataSize return fluentbit.FLB_OK } diff --git a/firehose/firehose_test.go b/firehose/firehose_test.go index befc9d5..36681f7 100644 --- a/firehose/firehose_test.go +++ b/firehose/firehose_test.go @@ -55,6 +55,36 @@ func TestAddRecord(t *testing.T) { assert.Len(t, output.records, 1, "Expected output to contain 1 record") } +func TestAddRecordWithSimpleAggregationEnable(t *testing.T) { + timer, _ := plugins.NewTimeout(func(d time.Duration) { + logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) + logrus.Error("[firehose] Quitting Fluent Bit") + os.Exit(1) + }) + output := OutputPlugin{ + region: "us-east-1", + deliveryStream: "stream", + dataKeys: "", + client: nil, + records: make([]*firehose.Record, 0, 500), + timer: timer, + simpleAggregation: true, + } + + record := map[interface{}]interface{}{ + "somekey": []byte("some value"), + } + + timeStamp1 := time.Now() + retCode1 := output.AddRecord(record, &timeStamp1) + timeStamp2 := time.Now() + retCode2 := output.AddRecord(record, &timeStamp2) + + assert.Equal(t, retCode1, fluentbit.FLB_OK, "Expected return code to be FLB_OK") + assert.Equal(t, retCode2, fluentbit.FLB_OK, "Expected return code to be FLB_OK") + assert.Len(t, output.records, 1, "Expected output to contain 1 record") +} + func TestTruncateLargeLogEvent(t *testing.T) { timer, _ := plugins.NewTimeout(func(d time.Duration) { 
logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d) diff --git a/fluent-bit-firehose.go b/fluent-bit-firehose.go index cc8857c..eb153e2 100644 --- a/fluent-bit-firehose.go +++ b/fluent-bit-firehose.go @@ -16,7 +16,6 @@ package main import ( "C" "unsafe" - "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/firehose" "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/fluent/fluent-bit-go/output" @@ -78,12 +77,15 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin logrus.Infof("[firehose %d] plugin parameter log_key = '%s'", pluginID, logKey) replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots") logrus.Infof("[firehose %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots) - + simpleAggregation := plugins.GetBoolParam(output.FLBPluginConfigKey(ctx, "simple_aggregation"), false) + logrus.Infof("[firehose %d] plugin parameter simple_aggregation = '%v'", + pluginID, simpleAggregation) if deliveryStream == "" || region == "" { return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID) } - return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, pluginID) + return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, + timeKeyFmt, logKey, replaceDots, pluginID, simpleAggregation) } //export FLBPluginInit diff --git a/plugins/plugins.go b/plugins/plugins.go index e83b80e..f8f4eac 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -231,7 +231,8 @@ func contains(s []string, e string) bool { return false } -func getBoolParam(param string, defaultVal bool) bool { +// GetBoolParam is used for boolean config setup +func GetBoolParam(param string, defaultVal bool) bool { val := strings.ToLower(param) if val == "true" { return true diff 
--git a/plugins/plugins_test.go b/plugins/plugins_test.go index cd3a390..a4b93ce 100644 --- a/plugins/plugins_test.go +++ b/plugins/plugins_test.go @@ -68,10 +68,10 @@ func TestDataKeys(t *testing.T) { } func TestGetBoolParam(t *testing.T) { - value1 := getBoolParam("true", false) + value1 := GetBoolParam("true", false) assert.Equal(t, value1, true, "Expected option value is true") - value2 := getBoolParam("false", false) + value2 := GetBoolParam("false", false) assert.Equal(t, value2, false, "Expected option value is false") - value3 := getBoolParam("fakeString", false) + value3 := GetBoolParam("fakeString", false) assert.Equal(t, value3, false, "Expected option value is false") }