Skip to content

Commit

Permalink
Add an option to send multiple log events as a record
Browse files Browse the repository at this point in the history
Signed-off-by: Drew Zhang <[email protected]>
  • Loading branch information
DrewZhang13 committed Mar 5, 2021
1 parent b13662c commit 7622b07
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 35 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Run `make` to build `./bin/firehose.so`. Then use with Fluent Bit:
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`. You can also use `%L` for milliseconds and `%f` for microseconds. If you are using ECS FireLens, make sure you are running Amazon ECS Container Agent v1.42.0 or later, otherwise the timestamps associated with your container logs will only have second precision.
* `replace_dots`: Replace dot characters in key names with the value of this option. For example, if you add `replace_dots _` in your config then all occurrences of `.` will be replaced with an underscore. By default, dots will not be replaced.

* `simple_aggregation`: Option to allow plugin send multiple log events in the same record if current record not exceed the maximumRecordSize (1 MiB). It joins together as many log records as possible into a single Firehose record and delimits them with newline. It's good to enable if your destination supports aggregation like S3. Default to be `false`, set to `true` to enable this option.
### Permissions

The plugin requires `firehose:PutRecordBatch` permissions.
Expand Down
60 changes: 33 additions & 27 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,22 +54,23 @@ type PutRecordBatcher interface {

// OutputPlugin sends log records to firehose
type OutputPlugin struct {
region string
deliveryStream string
dataKeys string
timeKey string
fmtStrftime *strftime.Strftime
logKey string
client PutRecordBatcher
records []*firehose.Record
dataLength int
timer *plugins.Timeout
PluginID int
replaceDots string
region string
deliveryStream string
dataKeys string
timeKey string
fmtStrftime *strftime.Strftime
logKey string
client PutRecordBatcher
records []*firehose.Record
dataLength int
timer *plugins.Timeout
PluginID int
replaceDots string
simpleAggregation bool
}

// NewOutputPlugin creates an OutputPlugin object
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int) (*OutputPlugin, error) {
func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeFmt, logKey, replaceDots string, pluginID int, simpleAggregation bool) (*OutputPlugin, error) {
client, err := newPutRecordBatcher(roleARN, region, firehoseEndpoint, stsEndpoint, pluginID)
if err != nil {
return nil, err
Expand Down Expand Up @@ -100,17 +101,18 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint
}

return &OutputPlugin{
region: region,
deliveryStream: deliveryStream,
client: client,
records: records,
dataKeys: dataKeys,
timer: timer,
timeKey: timeKey,
fmtStrftime: timeFormatter,
logKey: logKey,
PluginID: pluginID,
replaceDots: replaceDots,
region: region,
deliveryStream: deliveryStream,
client: client,
records: records,
dataKeys: dataKeys,
timer: timer,
timeKey: timeKey,
fmtStrftime: timeFormatter,
logKey: logKey,
PluginID: pluginID,
replaceDots: replaceDots,
simpleAggregation: simpleAggregation,
}, nil
}

Expand Down Expand Up @@ -207,9 +209,13 @@ func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeSt
}
}

output.records = append(output.records, &firehose.Record{
Data: data,
})
if output.simpleAggregation && len(output.records) > 0 && len(output.records[len(output.records)-1].Data) + newDataSize <= maximumRecordSize {
output.records[len(output.records)-1].Data = append(output.records[len(output.records)-1].Data, data...)
} else {
output.records = append(output.records, &firehose.Record{
Data: data,
})
}
output.dataLength += newDataSize
return fluentbit.FLB_OK
}
Expand Down
30 changes: 30 additions & 0 deletions firehose/firehose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,36 @@ func TestAddRecord(t *testing.T) {
assert.Len(t, output.records, 1, "Expected output to contain 1 record")
}

func TestAddRecordWithSimpleAggregationEnable(t *testing.T) {
timer, _ := plugins.NewTimeout(func(d time.Duration) {
logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d)
logrus.Error("[firehose] Quitting Fluent Bit")
os.Exit(1)
})
output := OutputPlugin{
region: "us-east-1",
deliveryStream: "stream",
dataKeys: "",
client: nil,
records: make([]*firehose.Record, 0, 500),
timer: timer,
simpleAggregation: true,
}

record := map[interface{}]interface{}{
"somekey": []byte("some value"),
}

timeStamp1 := time.Now()
retCode1 := output.AddRecord(record, &timeStamp1)
timeStamp2 := time.Now()
retCode2 := output.AddRecord(record, &timeStamp2)

assert.Equal(t, retCode1, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
assert.Equal(t, retCode2, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
assert.Len(t, output.records, 1, "Expected output to contain 1 record")
}

func TestTruncateLargeLogEvent(t *testing.T) {
timer, _ := plugins.NewTimeout(func(d time.Duration) {
logrus.Errorf("[firehose] timeout threshold reached: Failed to send logs for %v\n", d)
Expand Down
8 changes: 5 additions & 3 deletions fluent-bit-firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package main
import (
"C"
"unsafe"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/firehose"
"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/fluent/fluent-bit-go/output"
Expand Down Expand Up @@ -78,12 +77,15 @@ func newFirehoseOutput(ctx unsafe.Pointer, pluginID int) (*firehose.OutputPlugin
logrus.Infof("[firehose %d] plugin parameter log_key = '%s'", pluginID, logKey)
replaceDots := output.FLBPluginConfigKey(ctx, "replace_dots")
logrus.Infof("[firehose %d] plugin parameter replace_dots = '%s'", pluginID, replaceDots)

simpleAggregation := plugins.GetBoolParam(output.FLBPluginConfigKey(ctx, "simple_aggregation"), false)
logrus.Infof("[firehose %d] plugin parameter simple_aggregation = '%v'",
pluginID, simpleAggregation)
if deliveryStream == "" || region == "" {
return nil, fmt.Errorf("[firehose %d] delivery_stream and region are required configuration parameters", pluginID)
}

return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey, timeKeyFmt, logKey, replaceDots, pluginID)
return firehose.NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, firehoseEndpoint, stsEndpoint, timeKey,
timeKeyFmt, logKey, replaceDots, pluginID, simpleAggregation)
}

//export FLBPluginInit
Expand Down
3 changes: 2 additions & 1 deletion plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,8 @@ func contains(s []string, e string) bool {
return false
}

func getBoolParam(param string, defaultVal bool) bool {
// GetBoolParam is used for boolean config setup
func GetBoolParam(param string, defaultVal bool) bool {
val := strings.ToLower(param)
if val == "true" {
return true
Expand Down
6 changes: 3 additions & 3 deletions plugins/plugins_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,10 @@ func TestDataKeys(t *testing.T) {
}

func TestGetBoolParam(t *testing.T) {
value1 := getBoolParam("true", false)
value1 := GetBoolParam("true", false)
assert.Equal(t, value1, true, "Expected option value is true")
value2 := getBoolParam("false", false)
value2 := GetBoolParam("false", false)
assert.Equal(t, value2, false, "Expected option value is false")
value3 := getBoolParam("fakeString", false)
value3 := GetBoolParam("fakeString", false)
assert.Equal(t, value3, false, "Expected option value is false")
}

0 comments on commit 7622b07

Please sign in to comment.