diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go
index 5d2fba6..5516498 100644
--- a/fluent-bit-kinesis.go
+++ b/fluent-bit-kinesis.go
@@ -22,10 +22,21 @@ import (
 	"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
 	"github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis"
+	kinesisAPI "github.com/aws/aws-sdk-go/service/kinesis"
 	"github.com/fluent/fluent-bit-go/output"
 	"github.com/sirupsen/logrus"
 )
 
+const (
+	// Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
+	maximumRecordsPerPut = 500
+)
+
+const (
+	retries              = 6
+	concurrentRetryLimit = 4
+)
+
 var (
 	pluginInstances []*kinesis.OutputPlugin
 )
@@ -107,19 +118,65 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
 
 //export FLBPluginFlushCtx
 func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
-	var count int
+	kinesisOutput := getPluginInstance(ctx)
+
+	curRetries := kinesisOutput.GetConcurrentRetries()
+	if curRetries > concurrentRetryLimit {
+		logrus.Infof("[kinesis] flush returning retry, too many concurrent retries (%d)\n", curRetries)
+		return output.FLB_RETRY
+	}
+
+	events, count, retCode := unpackRecords(kinesisOutput, data, length)
+	if retCode != output.FLB_OK {
+		logrus.Errorf("[kinesis] failed to unpackRecords\n")
+		return retCode
+	}
+	go flushWithRetries(kinesisOutput, tag, count, events, retries)
+	return output.FLB_OK
+}
+
+func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) {
+	var retCode, tries int
+
+	currentRetries := kinesisOutput.GetConcurrentRetries()
+
+	for tries = 0; tries < retries; tries++ {
+		if currentRetries > 0 {
+			// Wait if other goroutines are retrying, as well as implement a progressive backoff
+			time.Sleep(time.Duration((1<<currentRetries)*100) * time.Millisecond)
+		}
+
+		logrus.Debugf("[kinesis] Sending (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries)
+		retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records)
+		if retCode != output.FLB_RETRY {
+			break
+		}
+		currentRetries = kinesisOutput.AddConcurrentRetries(1)
+		logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries)
+	}
+	if tries > 0 {
+		kinesisOutput.AddConcurrentRetries(int64(-tries))
+	}
+	if retCode == output.FLB_ERROR {
+		logrus.Errorf("[kinesis] Failed to flush (%d) records with error", len(records))
+	}
+	if retCode == output.FLB_RETRY {
+		logrus.Errorf("[kinesis] Failed to flush (%d) records after %d retries", len(records), retries)
+	}
+}
+
+func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) {
 	var ret int
 	var ts interface{}
 	var timestamp time.Time
 	var record map[interface{}]interface{}
+	count := 0
+
+	records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
 
 	// Create Fluent Bit decoder
 	dec := output.NewDecoder(data, int(length))
 
-	kinesisOutput := getPluginInstance(ctx)
-	fluentTag := C.GoString(tag)
-	logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag)
-
 	for {
 		//Extract Record
 		ret, ts, record = output.GetRecord(dec)
@@ -138,16 +195,23 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
 			timestamp = time.Now()
 		}
 
-		retCode := kinesisOutput.AddRecord(record, &timestamp)
+		retCode := kinesisOutput.AddRecord(&records, record, &timestamp)
 		if retCode != output.FLB_OK {
-			return retCode
+			return nil, 0, retCode
 		}
+		count++
 	}
 
-	err := kinesisOutput.Flush()
-	if err != nil {
-		logrus.Errorf("[kinesis %d] %v\n", kinesisOutput.PluginID, err)
-		return output.FLB_ERROR
+
+	return records, count, output.FLB_OK
+}
+
+func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records *[]*kinesisAPI.PutRecordsRequestEntry) int {
+	fluentTag := C.GoString(tag)
+	logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag)
+	retCode := kinesisOutput.Flush(records)
+	if retCode != output.FLB_OK {
+		return retCode
 	}
 
 	logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag)
@@ -157,12 +221,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
 
 //export FLBPluginExit
 func FLBPluginExit() int {
 	// Before final exit, call Flush() for all the instances of the Output Plugin
-	for i := range pluginInstances {
-		err := pluginInstances[i].Flush()
-		if err != nil {
-			logrus.Errorf("[kinesis %d] %v\n", pluginInstances[i].PluginID, err)
-		}
-	}
+	// for i := range pluginInstances {
+	// 	pluginInstances[i].Flush(records)
+	// }
 
 	return output.FLB_OK
 }
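Editor's note on the retry path above: each flush is handed to a goroutine that retries up to `retries` times, sleeping 2**currentRetries * 100ms between attempts so that concurrent retries back off progressively. Since Go's `^` operator is XOR rather than exponentiation, the power of two is spelled as a left shift in the patched line. A minimal, self-contained sketch of the resulting schedule (`backoffDelay` is an illustrative name, not part of the patch):

```go
package main

import (
	"fmt"
	"time"
)

// backoffDelay mirrors the sleep in flushWithRetries: 2**n * 100ms,
// written as a shift because Go's ^ is bitwise XOR, not exponentiation.
func backoffDelay(currentRetries int64) time.Duration {
	return time.Duration(100<<currentRetries) * time.Millisecond
}

func main() {
	for n := int64(1); n <= 4; n++ {
		fmt.Printf("currentRetries=%d -> sleep %v\n", n, backoffDelay(n))
	}
	// currentRetries=1 -> sleep 200ms
	// currentRetries=2 -> sleep 400ms
	// currentRetries=3 -> sleep 800ms
	// currentRetries=4 -> sleep 1.6s
}
```

With `concurrentRetryLimit = 4`, the longest sleep a goroutine takes before FLBPluginFlushCtx starts shedding new flushes back to Fluent Bit is therefore on the order of 1.6s per attempt.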
diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go
index fd02b78..dd12574 100644
--- a/kinesis/kinesis.go
+++ b/kinesis/kinesis.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"math/rand"
 	"os"
+	"sync/atomic"
 	"time"
 
 	"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
@@ -75,17 +76,15 @@ type OutputPlugin struct {
 	// Partition key decides in which shard of your stream the data belongs to
 	partitionKey string
 	// Decides whether to append a newline after each data record
-	appendNewline                bool
-	timeKey                      string
-	fmtStrftime                  *strftime.Strftime
-	lastInvalidPartitionKeyIndex int
-	client                       PutRecordsClient
-	records                      []*kinesis.PutRecordsRequestEntry
-	dataLength                   int
-	backoff                      *plugins.Backoff
-	timer                        *plugins.Timeout
-	PluginID                     int
-	random                       *random
+	appendNewline bool
+	timeKey       string
+	fmtStrftime   *strftime.Strftime
+	client        PutRecordsClient
+	timer         *plugins.Timeout
+	PluginID      int
+	random        *random
+	// Used to implement backoff for concurrent flushes
+	concurrentRetries int64
 }
 
 // NewOutputPlugin creates an OutputPlugin object
@@ -95,7 +94,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint,
 		return nil, err
 	}
 
-	records := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
 	timer, err := plugins.NewTimeout(func(d time.Duration) {
 		logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, d.String())
 		logrus.Errorf("[kinesis %d] Quitting Fluent Bit", pluginID)
@@ -125,19 +123,16 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint,
 	}
 
 	return &OutputPlugin{
-		stream:                       stream,
-		client:                       client,
-		records:                      records,
-		dataKeys:                     dataKeys,
-		partitionKey:                 partitionKey,
-		appendNewline:                appendNewline,
-		timeKey:                      timeKey,
-		fmtStrftime:                  timeFormatter,
-		lastInvalidPartitionKeyIndex: -1,
-		backoff:                      plugins.NewBackoff(),
-		timer:                        timer,
-		PluginID:                     pluginID,
-		random:                       random,
+		stream:        stream,
+		client:        client,
+		dataKeys:      dataKeys,
+		partitionKey:  partitionKey,
+		appendNewline: appendNewline,
+		timeKey:       timeKey,
+		fmtStrftime:   timeFormatter,
+		timer:         timer,
+		PluginID:      pluginID,
+		random:        random,
 	}, nil
 }
@@ -174,10 +169,9 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki
 	return client, nil
 }
 
-// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full
-// the return value is one of: FLB_OK FLB_RETRY
-// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned
-func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int {
+// AddRecord accepts a record and adds it to the buffer
+// the return value is one of: FLB_OK FLB_RETRY FLB_ERROR
+func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int {
 	if outputPlugin.timeKey != "" {
 		buf := new(bytes.Buffer)
 		err := outputPlugin.fmtStrftime.Format(buf, *timeStamp)
@@ -188,7 +182,7 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{},
 		record[outputPlugin.timeKey] = buf.String()
 	}
 
-	partitionKey := outputPlugin.getPartitionKey(record)
+	partitionKey := outputPlugin.getPartitionKey(*records, record)
 	data, err := outputPlugin.processRecord(record, partitionKey)
 	if err != nil {
 		logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
@@ -196,28 +190,49 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{},
 		return fluentbit.FLB_OK
 	}
 
-	newRecordSize := len(data) + len(partitionKey)
-
-	if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize {
-		err = outputPlugin.sendCurrentBatch()
-		if err != nil {
-			logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
-			// If FluentBit fails to send logs, it will retry rather than discarding the logs
-			return fluentbit.FLB_RETRY
-		}
-	}
-
-	outputPlugin.records = append(outputPlugin.records, &kinesis.PutRecordsRequestEntry{
+	*records = append(*records, &kinesis.PutRecordsRequestEntry{
 		Data:         data,
 		PartitionKey: aws.String(partitionKey),
 	})
-	outputPlugin.dataLength += newRecordSize
 	return fluentbit.FLB_OK
 }
 
 // Flush sends the current buffer of log records
-func (outputPlugin *OutputPlugin) Flush() error {
-	return outputPlugin.sendCurrentBatch()
+// Returns FLB_OK, FLB_RETRY, FLB_ERROR
+func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int {
+	// Each flush must have its own output buffer, since flushes can be concurrent
+	requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
+	dataLength := 0
+
+	for i, record := range *records {
+		newRecordSize := len(record.Data) + len(aws.StringValue(record.PartitionKey))
+
+		if len(requestBuf) == maximumRecordsPerPut || (dataLength+newRecordSize) > maximumPutRecordBatchSize {
+			retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength)
+			if err != nil {
+				logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
+			}
+			if retCode != fluentbit.FLB_OK {
+				unsent := (*records)[i:]
+				// requestBuf will contain records sendCurrentBatch failed to send,
+				// combine those with the records yet to be sent/batched
+				*records = append(requestBuf, unsent...)
+				return retCode
+			}
+		}
+
+		requestBuf = append(requestBuf, record)
+		dataLength += newRecordSize
+	}
+
+	// send any remaining records
+	retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength)
+	if err != nil {
+		logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
+	}
+	// requestBuf will contain records sendCurrentBatch failed to send
+	*records = requestBuf
+	return retCode
 }
 
 func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) {
@@ -251,16 +266,13 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
 	return data, nil
 }
 
-func (outputPlugin *OutputPlugin) sendCurrentBatch() error {
-	if outputPlugin.lastInvalidPartitionKeyIndex >= 0 {
-		logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data)
-		outputPlugin.lastInvalidPartitionKeyIndex = -1
+func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) {
+	if len(*records) == 0 {
+		return fluentbit.FLB_OK, nil
 	}
-	outputPlugin.backoff.Wait()
 	outputPlugin.timer.Check()
-
 	response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{
-		Records:    outputPlugin.records,
+		Records:    *records,
 		StreamName: aws.String(outputPlugin.stream),
 	})
 	if err != nil {
@@ -269,55 +281,61 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() error {
 		if aerr, ok := err.(awserr.Error); ok {
 			if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException {
 				logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
-				// Backoff and Retry
-				outputPlugin.backoff.StartBackoff()
 			}
 		}
-		return err
+		return fluentbit.FLB_RETRY, err
 	}
-	logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records))
+	logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(*records))
 
-	return outputPlugin.processAPIResponse(response)
+	return outputPlugin.processAPIResponse(records, dataLength, response)
 }
 
 // processAPIResponse processes the successful and failed records
 // it returns an error iff no records succeeded (i.e.) no progress has been made
-func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) error {
+func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) {
+
+	var retCode int = fluentbit.FLB_OK
+	var limitsExceeded bool
+
 	if aws.Int64Value(response.FailedRecordCount) > 0 {
 		// start timer if all records failed (no progress has been made)
-		if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) {
+		if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) {
 			outputPlugin.timer.Start()
-			return fmt.Errorf("PutRecords request returned with no records successfully recieved")
+			return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully received")
 		}
 
-		logrus.Warnf("[kinesis %d] %d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount))
+		logrus.Warnf("[kinesis %d] %d/%d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount), len(*records))
 		failedRecords := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount))
 		// try to resend failed records
 		for i, record := range response.Records {
 			if record.ErrorMessage != nil {
 				logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage))
-				failedRecords = append(failedRecords, outputPlugin.records[i])
+				failedRecords = append(failedRecords, (*records)[i])
 			}
+
 			if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException {
-				// Backoff and Retry
-				outputPlugin.backoff.StartBackoff()
+				retCode = fluentbit.FLB_RETRY
+				limitsExceeded = true
 			}
 		}
 
-		outputPlugin.records = outputPlugin.records[:0]
-		outputPlugin.records = append(outputPlugin.records, failedRecords...)
-		outputPlugin.dataLength = 0
-		for _, record := range outputPlugin.records {
-			outputPlugin.dataLength += len(record.Data)
+		if limitsExceeded {
+			logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID)
+		}
+
+		*records = (*records)[:0]
+		*records = append(*records, failedRecords...)
+		*dataLength = 0
+		for _, record := range *records {
+			*dataLength += len(record.Data)
 		}
 	} else {
 		// request fully succeeded
 		outputPlugin.timer.Reset()
-		outputPlugin.backoff.Reset()
-		outputPlugin.records = outputPlugin.records[:0]
-		outputPlugin.dataLength = 0
+		*records = (*records)[:0]
+		*dataLength = 0
 	}
-	return nil
+	return retCode, nil
 }
 
 // randomString generates a random string of length 8
@@ -331,7 +349,7 @@ func (outputPlugin *OutputPlugin) randomString() string {
 
 // getPartitionKey returns the value for a given valid key
 // if the given key is emapty or invalid, it returns a random string
-func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string {
+func (outputPlugin *OutputPlugin) getPartitionKey(records []*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}) string {
 	partitionKey := outputPlugin.partitionKey
 	if partitionKey != "" {
 		for k, v := range record {
@@ -346,7 +364,6 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa
 			}
 		}
 	}
-	outputPlugin.lastInvalidPartitionKeyIndex = len(outputPlugin.records) % maximumRecordsPerPut
 	return outputPlugin.randomString()
 }
@@ -362,3 +379,13 @@ func stringOrByteArray(v interface{}) string {
 		return ""
 	}
 }
+
+// GetConcurrentRetries returns the current number of concurrent retries (goroutine safe)
+func (outputPlugin *OutputPlugin) GetConcurrentRetries() int64 {
+	return atomic.LoadInt64(&outputPlugin.concurrentRetries)
+}
+
+// AddConcurrentRetries adds val to the counter and returns the new value (goroutine safe)
+func (outputPlugin *OutputPlugin) AddConcurrentRetries(val int64) int64 {
+	return atomic.AddInt64(&outputPlugin.concurrentRetries, val)
+}
calling flush") + retCode = outputPlugin.Flush() + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") }