Skip to content

Commit

Permalink
Merge pull request #33 from zackwine/goroutines-optional
Browse files Browse the repository at this point in the history
Concurrently flush records in goroutines with concurrency level
  • Loading branch information
sonofachamp authored Jul 10, 2020
2 parents 1f77af8 + 7d52cb2 commit faecc50
Show file tree
Hide file tree
Showing 4 changed files with 274 additions and 101 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ If you think you’ve found a potential security issue, please do not post it in
* `endpoint`: Specify a custom endpoint for the Kinesis Streams API.
* `append_newline`: If you set append_newline as true, a newline will be addded after each log record.
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.
* `experimental_concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default `experimental_concurrency` is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `experimental_concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached. Enabling concurrency will increase resource usage (memory and CPU).
* `experimental_concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted before records are dropped.

### Permissions

Expand Down
93 changes: 75 additions & 18 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,25 @@ package main
import (
"C"
"fmt"
"strconv"
"strings"
"time"
"unsafe"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis"
kinesisAPI "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/fluent/fluent-bit-go/output"
"github.com/sirupsen/logrus"
)

const (
// Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
maximumRecordsPerPut = 500
maximumConcurrency = 10
defaultConcurrentRetries = 4
)

var (
pluginInstances []*kinesis.OutputPlugin
)
Expand Down Expand Up @@ -66,6 +75,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt)
concurrency := output.FLBPluginConfigKey(ctx, "experimental_concurrency")
logrus.Infof("[firehose %d] plugin parameter experimental_concurrency = '%s'\n", pluginID, concurrency)
concurrencyRetries := output.FLBPluginConfigKey(ctx, "experimental_concurrency_retries")
logrus.Infof("[firehose %d] plugin parameter experimental_concurrency_retries = '%s'\n", pluginID, concurrencyRetries)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
Expand All @@ -83,7 +96,41 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
if strings.ToLower(appendNewline) == "true" {
appendNL = true
}
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, appendNL, pluginID)

var concurrencyInt, concurrencyRetriesInt int
var err error
if concurrency != "" {
concurrencyInt, err = strconv.Atoi(concurrency)
if err != nil {
logrus.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value %s specified: %v", pluginID, concurrency, err)
return nil, err
}
if concurrencyInt < 0 {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency)
}

if concurrencyInt > maximumConcurrency {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency)
}

if concurrencyInt > 0 {
logrus.Warnf("[kinesis %d] WARNING: Enabling concurrency can lead to data loss. If 'experimental_concurrency_retries' is reached data will be lost.", pluginID)
}
}

if concurrencyRetries != "" {
concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries)
if err != nil {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err)
}
if concurrencyRetriesInt < 0 {
return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries)
}
} else {
concurrencyRetriesInt = defaultConcurrentRetries
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, concurrencyInt, concurrencyRetriesInt, appendNL, pluginID)
}

// The "export" comments have syntactic meaning
Expand All @@ -107,19 +154,37 @@ func FLBPluginInit(ctx unsafe.Pointer) int {

//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
var count int
kinesisOutput := getPluginInstance(ctx)

fluentTag := C.GoString(tag)

events, count, retCode := unpackRecords(kinesisOutput, data, length)
if retCode != output.FLB_OK {
logrus.Errorf("[kinesis %d] failed to unpackRecords with tag: %s\n", kinesisOutput.PluginID, fluentTag)

return retCode
}

logrus.Debugf("[kinesis %d] Flushing %d logs with tag: %s\n", kinesisOutput.PluginID, count, fluentTag)
if kinesisOutput.Concurrency > 0 {
return kinesisOutput.FlushConcurrent(count, events)
}

return kinesisOutput.Flush(&events)
}

func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) {
var ret int
var ts interface{}
var timestamp time.Time
var record map[interface{}]interface{}
count := 0

records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut)

// Create Fluent Bit decoder
dec := output.NewDecoder(data, int(length))

kinesisOutput := getPluginInstance(ctx)
fluentTag := C.GoString(tag)
logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag)

for {
//Extract Record
ret, ts, record = output.GetRecord(dec)
Expand All @@ -138,27 +203,19 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
timestamp = time.Now()
}

retCode := kinesisOutput.AddRecord(record, &timestamp)
retCode := kinesisOutput.AddRecord(&records, record, &timestamp)
if retCode != output.FLB_OK {
return retCode
return nil, 0, retCode
}

count++
}
retCode := kinesisOutput.Flush()
if retCode != output.FLB_OK {
return retCode
}
logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag)

return output.FLB_OK
return records, count, output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
// Before final exit, call Flush() for all the instances of the Output Plugin
for i := range pluginInstances {
pluginInstances[i].Flush()
}

return output.FLB_OK
}
Expand Down
Loading

0 comments on commit faecc50

Please sign in to comment.