Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrently flush records in goroutines with concurrency level #33

Merged
merged 42 commits into from
Jul 10, 2020
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
a4f0692
Remove exponential backoff code
PettitWesley May 15, 2020
b9e0975
experimental: return immediately and send data in a goroutine
PettitWesley May 18, 2020
d3096cf
Add retries to goroutine/concurrent mode
PettitWesley May 18, 2020
a5c8452
flush on exit does not make sense in goroutine/concurrent mode
PettitWesley May 18, 2020
195d299
tmp
PettitWesley May 18, 2020
4282eb7
its not working...
PettitWesley May 18, 2020
aee0813
Hmmm...
PettitWesley May 18, 2020
d4fdcb7
Need to use a pointer to a slice
PettitWesley May 18, 2020
ab8243b
Fix bug from backoff code change
PettitWesley May 18, 2020
b6bee2d
Concurrency fix: process incoming C data structures before returning
PettitWesley May 19, 2020
dcdc6e0
Fix record length bug
PettitWesley May 19, 2020
62c7464
Debugging
PettitWesley May 19, 2020
1dd656e
Debugging...
PettitWesley May 20, 2020
f95a0df
Debugging...
PettitWesley May 20, 2020
ce3e513
Oops
PettitWesley May 20, 2020
20229de
hmmm
PettitWesley May 20, 2020
cb5cff1
debugging
PettitWesley May 20, 2020
8f2af76
What is going on
PettitWesley May 20, 2020
c2b6133
makes no sense at all
PettitWesley May 20, 2020
ab59010
WAT THE
PettitWesley May 20, 2020
070ef3b
cats
PettitWesley May 20, 2020
2f45335
Call AddRecord before spawning goroutine
PettitWesley May 22, 2020
4b35c4d
Remove log message used for debug
PettitWesley May 22, 2020
f63d185
Remove invalid partition key field from output plugin- todo: fix later
PettitWesley May 22, 2020
a7d7962
Fix bug in error handling logic
PettitWesley May 22, 2020
949dd4e
Remove unneeded info statements
PettitWesley May 22, 2020
7f3209b
Fix retry logic for concurrent flushes.
zackwine May 29, 2020
a25c093
Address code review. Rename backoff to currentRetries for clarity.
zackwine Jun 1, 2020
ada60a3
Address code review. Make limitsExceeded var to ensure message is va…
zackwine Jun 2, 2020
96dd60a
Fix retry logic for concurrent flushes.
zackwine May 29, 2020
87c39b4
Address code review. Rename backoff to currentRetries for clarity.
zackwine Jun 1, 2020
898263f
Address code review. Make limitsExceeded var to ensure message is va…
zackwine Jun 2, 2020
1ea9c17
Add concurrency option, and make concurrent goroutine flushes optional.
zackwine Jun 10, 2020
1797e5a
Merge branch 'goroutines' into goroutines-optional
zackwine Jun 10, 2020
bf432b4
Address code review. Update docs, and make retries configurable.
zackwine Jun 15, 2020
1285807
Merge master
zackwine Jun 22, 2020
a824c33
Address code review items.
zackwine Jun 23, 2020
e714a07
Address code review items (2).
zackwine Jun 24, 2020
99e7a95
Address code review. Add data loss warning for concurrency.
zackwine Jul 9, 2020
53cd757
Address code review. Rename the concurrency feature experimental.
zackwine Jul 10, 2020
8d294d5
Clean up imports.
zackwine Jul 10, 2020
7d52cb2
Fix unit tests.
zackwine Jul 10, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ If you think you’ve found a potential security issue, please do not post it in
* `endpoint`: Specify a custom endpoint for the Kinesis Streams API.
* `append_newline`: If you set append_newline as true, a newline will be addded after each log record.
* `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.
* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.
* `concurrency`: Specify a limit of concurrent go routines for flushing records to kinesis. By default concurrency is set to 0 and records are flushed in Fluent Bit's single thread. This means that requests to Kinesis will block the execution of Fluent Bit as whole. If this value is set to `4` for example then calls to Flush records from fluentbit will spawn concurrent go routines until the limit of `4` concurrent go routines are running. Once the `concurrency` limit is reached calls to Flush will return a retry code. The upper limit of the `concurrency` option is `10`.
* `concurrency_retries`: Specify a limit to the number of retries concurrent goroutines will attempt. By default `4` retries will be attempted.

### Permissions

Expand Down
89 changes: 71 additions & 18 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,18 @@ import (

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis"
kinesisAPI "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/fluent/fluent-bit-go/output"
"github.com/sirupsen/logrus"
)
import "strconv"

const (
// Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
maximumRecordsPerPut = 500
maximumConcurrency = 10
defaultConcurrentRetries = 4
)

var (
pluginInstances []*kinesis.OutputPlugin
Expand Down Expand Up @@ -66,6 +75,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt)
concurrency := output.FLBPluginConfigKey(ctx, "concurrency")
logrus.Infof("[firehose %d] plugin parameter concurrency = '%s'\n", pluginID, concurrency)
concurrencyRetries := output.FLBPluginConfigKey(ctx, "concurrency_retries")
logrus.Infof("[firehose %d] plugin parameter concurrency_retries = '%s'\n", pluginID, concurrencyRetries)

if stream == "" || region == "" {
return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
Expand All @@ -83,7 +96,37 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
if strings.ToLower(appendNewline) == "true" {
appendNL = true
}
return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, appendNL, pluginID)

var concurrencyInt, concurrencyRetriesInt int
var err error
if concurrency != "" {
concurrencyInt, err = strconv.Atoi(concurrency)
if err != nil {
logrus.Errorf("[kinesis %d] Invalid 'concurrency' value %s specified: %v", pluginID, concurrency, err)
return nil, err
}
if concurrencyInt < 0 {
zackwine marked this conversation as resolved.
Show resolved Hide resolved
return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency)
}

if concurrencyInt > maximumConcurrency {
return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency)
}
}

if concurrencyRetries != "" {
concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries)
if err != nil {
return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err)
}
if concurrencyRetriesInt < 0 {
return nil, fmt.Errorf("[kinesis %d] Invalid 'concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries)
}
} else {
concurrencyRetriesInt = defaultConcurrentRetries
}

return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, concurrencyInt, concurrencyRetriesInt, appendNL, pluginID)
}

// The "export" comments have syntactic meaning
Expand All @@ -107,19 +150,37 @@ func FLBPluginInit(ctx unsafe.Pointer) int {

//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
var count int
kinesisOutput := getPluginInstance(ctx)

fluentTag := C.GoString(tag)

events, count, retCode := unpackRecords(kinesisOutput, data, length)
if retCode != output.FLB_OK {
logrus.Errorf("[kinesis %d] failed to unpackRecords with tag: %s\n", kinesisOutput.PluginID, fluentTag)

return retCode
}

logrus.Debugf("[kinesis %d] Flushing %d logs with tag: %s\n", kinesisOutput.PluginID, count, fluentTag)
if kinesisOutput.Concurrency > 0 {
return kinesisOutput.FlushConcurrent(count, events)
}

return kinesisOutput.Flush(&events)
}

func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) {
var ret int
var ts interface{}
var timestamp time.Time
var record map[interface{}]interface{}
count := 0

records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut)

// Create Fluent Bit decoder
dec := output.NewDecoder(data, int(length))

kinesisOutput := getPluginInstance(ctx)
fluentTag := C.GoString(tag)
logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag)

for {
//Extract Record
ret, ts, record = output.GetRecord(dec)
Expand All @@ -138,27 +199,19 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
timestamp = time.Now()
}

retCode := kinesisOutput.AddRecord(record, &timestamp)
retCode := kinesisOutput.AddRecord(&records, record, &timestamp)
if retCode != output.FLB_OK {
return retCode
return nil, 0, retCode
}

count++
}
retCode := kinesisOutput.Flush()
if retCode != output.FLB_OK {
return retCode
}
logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag)

return output.FLB_OK
return records, count, output.FLB_OK
}

//export FLBPluginExit
func FLBPluginExit() int {
// Before final exit, call Flush() for all the instances of the Output Plugin
for i := range pluginInstances {
pluginInstances[i].Flush()
}

return output.FLB_OK
}
Expand Down
Loading