Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: Concurrently flush records in goroutines #28

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a4f0692
Remove exponential backoff code
PettitWesley May 15, 2020
b9e0975
experimental: return immediately and send data in a goroutine
PettitWesley May 18, 2020
d3096cf
Add retries to goroutine/concurrent mode
PettitWesley May 18, 2020
a5c8452
flush on exit does not make sense in goroutine/concurrent mode
PettitWesley May 18, 2020
195d299
tmp
PettitWesley May 18, 2020
4282eb7
its not working...
PettitWesley May 18, 2020
aee0813
Hmmm...
PettitWesley May 18, 2020
d4fdcb7
Need to use a pointer to a slice
PettitWesley May 18, 2020
ab8243b
Fix bug from backoff code change
PettitWesley May 18, 2020
b6bee2d
Concurrency fix: process incoming C data structures before returning
PettitWesley May 19, 2020
dcdc6e0
Fix record length bug
PettitWesley May 19, 2020
62c7464
Debugging
PettitWesley May 19, 2020
1dd656e
Debugging...
PettitWesley May 20, 2020
f95a0df
Debugging...
PettitWesley May 20, 2020
ce3e513
Oops
PettitWesley May 20, 2020
20229de
hmmm
PettitWesley May 20, 2020
cb5cff1
debugging
PettitWesley May 20, 2020
8f2af76
What is going on
PettitWesley May 20, 2020
c2b6133
makes no sense at all
PettitWesley May 20, 2020
ab59010
WAT THE
PettitWesley May 20, 2020
070ef3b
cats
PettitWesley May 20, 2020
2f45335
Call AddRecord before spawning goroutine
PettitWesley May 22, 2020
4b35c4d
Remove log message used for debug
PettitWesley May 22, 2020
f63d185
Remove invalid partition key field from output plugin- todo: fix later
PettitWesley May 22, 2020
a7d7962
Fix bug in error handling logic
PettitWesley May 22, 2020
949dd4e
Remove unneeded info statements
PettitWesley May 22, 2020
96dd60a
Fix retry logic for concurrent flushes.
zackwine May 29, 2020
87c39b4
Address code review. Rename backoff to currentRetries for clarity.
zackwine Jun 1, 2020
898263f
Address code review. Make limitsExceeded var to ensure message is va…
zackwine Jun 2, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 78 additions & 17 deletions fluent-bit-kinesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,21 @@ import (

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis"
kinesisAPI "github.com/aws/aws-sdk-go/service/kinesis"
"github.com/fluent/fluent-bit-go/output"
"github.com/sirupsen/logrus"
)

const (
// Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
maximumRecordsPerPut = 500
)

const (
retries = 6
concurrentRetryLimit = 4
)

var (
pluginInstances []*kinesis.OutputPlugin
)
Expand Down Expand Up @@ -107,19 +118,65 @@ func FLBPluginInit(ctx unsafe.Pointer) int {

//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
	instance := getPluginInstance(ctx)

	// Shed load when too many goroutines are already in their retry loops;
	// Fluent Bit will re-deliver this chunk later.
	if inFlight := instance.GetConcurrentRetries(); inFlight > concurrentRetryLimit {
		logrus.Infof("[kinesis] flush returning retry, too many concurrent retries (%d)\n", inFlight)
		return output.FLB_RETRY
	}

	// Decode the incoming C data into Go-owned structures before returning
	// (see commit "process incoming C data structures before returning").
	records, recordCount, status := unpackRecords(instance, data, length)
	if status != output.FLB_OK {
		logrus.Errorf("[kinesis] failed to unpackRecords\n")
		return status
	}

	// Deliver asynchronously; failures are retried inside the goroutine.
	go flushWithRetries(instance, tag, recordCount, records, retries)

	return output.FLB_OK
}

// flushWithRetries sends the unpacked records to Kinesis, retrying up to
// `retries` times on FLB_RETRY with a progressive backoff. It is run in its
// own goroutine by FLBPluginFlushCtx. Each failed attempt increments the
// plugin-wide concurrent-retry counter (used for load shedding and backoff
// scaling); the increments this goroutine made are released before returning.
func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) {
	var retCode, tries int

	currentRetries := kinesisOutput.GetConcurrentRetries()

	for tries = 0; tries < retries; tries++ {
		if currentRetries > 0 {
			// Wait while other goroutines are retrying, with progressive backoff.
			// BUG FIX: the original computed (2^currentRetries)*100, but `^` is
			// bitwise XOR in Go, not exponentiation — e.g. 2^2 == 0, so the
			// second retry level slept 0ms. A left shift gives the intended
			// power of two; cap the shift so the sleep stays bounded however
			// large the shared counter grows.
			shift := currentRetries
			if shift > concurrentRetryLimit {
				shift = concurrentRetryLimit
			}
			time.Sleep(time.Duration((1<<uint(shift))*100) * time.Millisecond)
		}

		logrus.Debugf("[kinesis] Sending (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries)
		retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records)
		if retCode != output.FLB_RETRY {
			break
		}
		currentRetries = kinesisOutput.AddConcurrentRetries(1)
		logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries)
	}

	// Release exactly the retry slots this goroutine claimed (one per failed attempt).
	if tries > 0 {
		kinesisOutput.AddConcurrentRetries(int64(-tries))
	}

	if retCode == output.FLB_ERROR {
		logrus.Errorf("[kinesis] Failed to flush (%d) records with error", len(records))
	}
	if retCode == output.FLB_RETRY {
		logrus.Errorf("[kinesis] Failed flush (%d) records after retries %d", len(records), retries)
	}
}

func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) {
var ret int
var ts interface{}
var timestamp time.Time
var record map[interface{}]interface{}
count := 0

records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut)

// Create Fluent Bit decoder
dec := output.NewDecoder(data, int(length))

kinesisOutput := getPluginInstance(ctx)
fluentTag := C.GoString(tag)
logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag)

for {
//Extract Record
ret, ts, record = output.GetRecord(dec)
Expand All @@ -138,16 +195,23 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
timestamp = time.Now()
}

retCode := kinesisOutput.AddRecord(record, &timestamp)
retCode := kinesisOutput.AddRecord(&records, record, &timestamp)
if retCode != output.FLB_OK {
return retCode
return nil, 0, retCode
}

count++
}
err := kinesisOutput.Flush()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", kinesisOutput.PluginID, err)
return output.FLB_ERROR

return records, count, output.FLB_OK
}

func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records *[]*kinesisAPI.PutRecordsRequestEntry) int {
fluentTag := C.GoString(tag)
logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag)
retCode := kinesisOutput.Flush(records)
if retCode != output.FLB_OK {
return retCode
}
logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag)

Expand All @@ -157,12 +221,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
//export FLBPluginExit
func FLBPluginExit() int {
// Before final exit, call Flush() for all the instances of the Output Plugin
for i := range pluginInstances {
err := pluginInstances[i].Flush()
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", pluginInstances[i].PluginID, err)
}
}
// for i := range pluginInstances {
// pluginInstances[i].Flush(records)
// }

return output.FLB_OK
}
Expand Down
Loading