From a4f0692e2e6e4c1d8e5d6d70b661f435889471a4 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 15 May 2020 15:17:25 -0700 Subject: [PATCH 01/29] Remove exponential backoff code --- fluent-bit-kinesis.go | 12 ++++-------- kinesis/kinesis.go | 34 ++++++++++++++++------------------ kinesis/kinesis_test.go | 5 ++--- 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 5d2fba6..96dfd63 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -144,10 +144,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } count++ } - err := kinesisOutput.Flush() - if err != nil { - logrus.Errorf("[kinesis %d] %v\n", kinesisOutput.PluginID, err) - return output.FLB_ERROR + retCode := kinesisOutput.Flush() + if retCode != output.FLB_OK { + return retCode } logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag) @@ -158,10 +157,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func FLBPluginExit() int { // Before final exit, call Flush() for all the instances of the Output Plugin for i := range pluginInstances { - err := pluginInstances[i].Flush() - if err != nil { - logrus.Errorf("[kinesis %d] %v\n", pluginInstances[i].PluginID, err) - } + pluginInstances[i].Flush() } return output.FLB_OK diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index fd02b78..22005f7 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -82,7 +82,6 @@ type OutputPlugin struct { client PutRecordsClient records []*kinesis.PutRecordsRequestEntry dataLength int - backoff *plugins.Backoff timer *plugins.Timeout PluginID int random *random @@ -134,7 +133,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey: timeKey, fmtStrftime: timeFormatter, lastInvalidPartitionKeyIndex: -1, - backoff: plugins.NewBackoff(), timer: timer, PluginID: pluginID, random: random, @@ -199,12 
+197,11 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, newRecordSize := len(data) + len(partitionKey) if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { - err = outputPlugin.sendCurrentBatch() + retCode, err := outputPlugin.sendCurrentBatch() if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) - // If FluentBit fails to send logs, it will retry rather than discarding the logs - return fluentbit.FLB_RETRY } + return retCode } outputPlugin.records = append(outputPlugin.records, &kinesis.PutRecordsRequestEntry{ @@ -216,8 +213,13 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, } // Flush sends the current buffer of log records -func (outputPlugin *OutputPlugin) Flush() error { - return outputPlugin.sendCurrentBatch() +// Returns FLB_OK, FLB_RETRY, FLB_ERROR +func (outputPlugin *OutputPlugin) Flush() int { + retCode, err := outputPlugin.sendCurrentBatch() + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + } + return retCode } func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) { @@ -251,12 +253,11 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch() error { +func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { logrus.Errorf("[kinesis %d] Invalid partition key. 
Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } - outputPlugin.backoff.Wait() outputPlugin.timer.Check() response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ @@ -269,11 +270,9 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() error { if aerr, ok := err.(awserr.Error); ok { if aerr.Code() == kinesis.ErrCodeProvisionedThroughputExceededException { logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - // Backoff and Retry - outputPlugin.backoff.StartBackoff() } } - return err + return fluentbit.FLB_RETRY, err } logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records)) @@ -282,12 +281,12 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() error { // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) error { +func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) { outputPlugin.timer.Start() - return fmt.Errorf("PutRecords request returned with no records successfully recieved") + return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } logrus.Warnf("[kinesis %d] %d records failed to be delivered. 
Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount)) @@ -299,8 +298,8 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord failedRecords = append(failedRecords, outputPlugin.records[i]) } if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { - // Backoff and Retry - outputPlugin.backoff.StartBackoff() + logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) + return fluentbit.FLB_RETRY, nil } } @@ -313,11 +312,10 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord } else { // request fully succeeded outputPlugin.timer.Reset() - outputPlugin.backoff.Reset() outputPlugin.records = outputPlugin.records[:0] outputPlugin.dataLength = 0 } - return nil + return fluentbit.FLB_OK, nil } // randomString generates a random string of length 8 diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index 9769c28..1cff431 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -40,7 +40,6 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug dataKeys: "", partitionKey: "", lastInvalidPartitionKeyIndex: -1, - backoff: plugins.NewBackoff(), timer: timer, PluginID: 0, random: random, @@ -98,6 +97,6 @@ func TestAddRecordAndFlush(t *testing.T) { retCode := outputPlugin.AddRecord(record, &timeStamp) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") - err := outputPlugin.Flush() - assert.NoError(t, err, "Unexpected error calling flush") + retCode = outputPlugin.Flush() + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") } From b9e09755947a5edb82f50e65c7a82ebd8fe253de Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:04:42 -0700 Subject: [PATCH 02/29] experimental: return immediately and send data in a goroutine --- fluent-bit-kinesis.go | 21 +++++++++++++++++--- 
kinesis/kinesis.go | 45 ++++++++++++++++++++----------------------- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 96dfd63..35fdb55 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -22,10 +22,16 @@ import ( "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis" + kinesisAPI "github.com/aws/aws-sdk-go/service/kinesis" "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) +const ( + // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords + maximumRecordsPerPut = 500 +) + var ( pluginInstances []*kinesis.OutputPlugin ) @@ -107,6 +113,11 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { + go pluginConcurrentFlush(ctx, data, length, tag) + return output.FLB_OK +} + +func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { var count int var ret int var ts interface{} @@ -120,6 +131,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int fluentTag := C.GoString(tag) logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) + // Each flush must have its own output buffer, since flushes can be concurrent + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + for { //Extract Record ret, ts, record = output.GetRecord(dec) @@ -138,13 +152,13 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int timestamp = time.Now() } - retCode := kinesisOutput.AddRecord(record, &timestamp) + retCode := kinesisOutput.AddRecord(records, record, &timestamp) if retCode != output.FLB_OK { return retCode } count++ } - retCode := kinesisOutput.Flush() + retCode := kinesisOutput.Flush(records) if retCode != output.FLB_OK { return retCode } @@
-155,9 +169,10 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int //export FLBPluginExit func FLBPluginExit() int { + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) // Before final exit, call Flush() for all the instances of the Output Plugin for i := range pluginInstances { - pluginInstances[i].Flush() + pluginInstances[i].Flush(records) } return output.FLB_OK diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 22005f7..c22467e 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -80,7 +80,6 @@ type OutputPlugin struct { fmtStrftime *strftime.Strftime lastInvalidPartitionKeyIndex int client PutRecordsClient - records []*kinesis.PutRecordsRequestEntry dataLength int timer *plugins.Timeout PluginID int @@ -94,7 +93,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, return nil, err } - records := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) timer, err := plugins.NewTimeout(func(d time.Duration) { logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, d.String()) logrus.Errorf("[kinesis %d] Quitting Fluent Bit", pluginID) @@ -126,7 +124,6 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, return &OutputPlugin{ stream: stream, client: client, - records: records, dataKeys: dataKeys, partitionKey: partitionKey, appendNewline: appendNewline, @@ -175,7 +172,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki // AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full // the return value is one of: FLB_OK FLB_RETRY // API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned -func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int { +func (outputPlugin *OutputPlugin) AddRecord(records 
[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { if outputPlugin.timeKey != "" { buf := new(bytes.Buffer) err := outputPlugin.fmtStrftime.Format(buf, *timeStamp) @@ -186,7 +183,7 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, record[outputPlugin.timeKey] = buf.String() } - partitionKey := outputPlugin.getPartitionKey(record) + partitionKey := outputPlugin.getPartitionKey(records, record) data, err := outputPlugin.processRecord(record, partitionKey) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -196,15 +193,15 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, newRecordSize := len(data) + len(partitionKey) - if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { - retCode, err := outputPlugin.sendCurrentBatch() + if len(records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { + retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } return retCode } - outputPlugin.records = append(outputPlugin.records, &kinesis.PutRecordsRequestEntry{ + records = append(records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }) @@ -214,8 +211,8 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, // Flush sends the current buffer of log records // Returns FLB_OK, FLB_RETRY, FLB_ERROR -func (outputPlugin *OutputPlugin) Flush() int { - retCode, err := outputPlugin.sendCurrentBatch() +func (outputPlugin *OutputPlugin) Flush(records []*kinesis.PutRecordsRequestEntry) int { + retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } @@ -253,15 +250,15 @@ func (outputPlugin *OutputPlugin) 
processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { +func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { - logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data) + logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } outputPlugin.timer.Check() response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ - Records: outputPlugin.records, + Records: records, StreamName: aws.String(outputPlugin.stream), }) if err != nil { @@ -274,17 +271,17 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) { } return fluentbit.FLB_RETRY, err } - logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(outputPlugin.records)) + logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(records)) - return outputPlugin.processAPIResponse(response) + return outputPlugin.processAPIResponse(records, response) } // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecordsOutput) (int, error) { +func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) - if aws.Int64Value(response.FailedRecordCount) == int64(len(outputPlugin.records)) { + if aws.Int64Value(response.FailedRecordCount) == int64(len(records)) { outputPlugin.timer.Start() return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } @@ -295,7 +292,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord for i, record := range response.Records { if record.ErrorMessage != nil { logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) - failedRecords = append(failedRecords, outputPlugin.records[i]) + failedRecords = append(failedRecords, records[i]) } if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) @@ -303,16 +300,16 @@ func (outputPlugin *OutputPlugin) processAPIResponse(response *kinesis.PutRecord } } - outputPlugin.records = outputPlugin.records[:0] - outputPlugin.records = append(outputPlugin.records, failedRecords...) + records = records[:0] + records = append(records, failedRecords...) 
outputPlugin.dataLength = 0 - for _, record := range outputPlugin.records { + for _, record := range records { outputPlugin.dataLength += len(record.Data) } } else { // request fully succeeded outputPlugin.timer.Reset() - outputPlugin.records = outputPlugin.records[:0] + records = records[:0] outputPlugin.dataLength = 0 } return fluentbit.FLB_OK, nil @@ -329,7 +326,7 @@ func (outputPlugin *OutputPlugin) randomString() string { // getPartitionKey returns the value for a given valid key // if the given key is emapty or invalid, it returns a random string -func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string { +func (outputPlugin *OutputPlugin) getPartitionKey(records []*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}) string { partitionKey := outputPlugin.partitionKey if partitionKey != "" { for k, v := range record { @@ -344,7 +341,7 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa } } } - outputPlugin.lastInvalidPartitionKeyIndex = len(outputPlugin.records) % maximumRecordsPerPut + outputPlugin.lastInvalidPartitionKeyIndex = len(records) % maximumRecordsPerPut } return outputPlugin.randomString() } From d3096cf27ee1eb912c1ff1a1c703eeac2797f19d Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:13:22 -0700 Subject: [PATCH 03/29] Add retries to goroutine/concurrent mode --- fluent-bit-kinesis.go | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 35fdb55..b3d76d2 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -32,6 +32,10 @@ const ( maximumRecordsPerPut = 500 ) +const ( + retries = 2 +) + var ( pluginInstances []*kinesis.OutputPlugin ) @@ -113,10 +117,19 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - go pluginConcurrentFlush(ctx, data, length, 
tag) + go flushWithRetries(ctx, data, length, tag, retries) return output.FLB_OK } +func flushWithRetries(ctx, data unsafe.Pointer, length C.int, tag *C.char, retries int) { + for i := 0; i < retries; i++ { + retCode := pluginConcurrentFlush(ctx, data, length, tag) + if retCode != output.FLB_RETRY { + break + } + } +} + func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { var count int var ret int From a5c845238f872d60c7b4b61ded0e4d346f1fd433 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:24:28 -0700 Subject: [PATCH 04/29] flush on exit does not make sense in goroutine/concurrent mode --- fluent-bit-kinesis.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index b3d76d2..9ba9ece 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -182,11 +182,10 @@ func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) //export FLBPluginExit func FLBPluginExit() int { - records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) // Before final exit, call Flush() for all the instances of the Output Plugin - for i := range pluginInstances { - pluginInstances[i].Flush(records) - } + // for i := range pluginInstances { + // pluginInstances[i].Flush(records) + // } return output.FLB_OK } From 195d299b2608aba718d3b987cdf5869256d2a51b Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 22:41:55 -0700 Subject: [PATCH 05/29] tmp --- kinesis/kinesis.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index c22467e..d41701f 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -251,7 +251,12 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface } func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { + if len(records) == 0 { + return fluentbit.FLB_OK, nil + } 
if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { + logrus.Infof("lastInvalidPartitionKeyIndex: %d\n", outputPlugin.lastInvalidPartitionKeyIndex) + logrus.Infof("len(records): %d\n", len(records)) logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } From 4282eb702952b58ec35a2ef8e406c0c1209889cc Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:03:22 -0700 Subject: [PATCH 06/29] its not working... --- kinesis/kinesis.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index d41701f..5fa6b8f 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -254,18 +254,17 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecords if len(records) == 0 { return fluentbit.FLB_OK, nil } - if outputPlugin.lastInvalidPartitionKeyIndex >= 0 { - logrus.Infof("lastInvalidPartitionKeyIndex: %d\n", outputPlugin.lastInvalidPartitionKeyIndex) - logrus.Infof("len(records): %d\n", len(records)) + if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(records) { logrus.Errorf("[kinesis %d] Invalid partition key. 
Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } outputPlugin.timer.Check() - + logrus.Infof("About to send %d records to %s", len(records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ Records: records, StreamName: aws.String(outputPlugin.stream), }) + logrus.Infof("Tried send %d records", len(records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() From aee0813a149c0f42cc9eeb4c5faa8dd8fe72fc75 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:10:24 -0700 Subject: [PATCH 07/29] Hmmm... --- kinesis/kinesis.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 5fa6b8f..f3c58a4 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -252,6 +252,7 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { if len(records) == 0 { + logrus.Info("No records") return fluentbit.FLB_OK, nil } if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(records) { @@ -264,7 +265,7 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecords Records: records, StreamName: aws.String(outputPlugin.stream), }) - logrus.Infof("Tried send %d records", len(records)) + logrus.Infof("Tried to send %d records", len(records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() From d4fdcb7f99e9cc720cf97fbef674bb185e262e82 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:35:09 -0700 Subject: [PATCH 08/29] Need to use a pointer 
to a slice --- fluent-bit-kinesis.go | 4 ++-- kinesis/kinesis.go | 40 ++++++++++++++++++++-------------------- 2 files changed, 22 insertions(+), 22 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 9ba9ece..1747edb 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -165,13 +165,13 @@ func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) timestamp = time.Now() } - retCode := kinesisOutput.AddRecord(records, record, &timestamp) + retCode := kinesisOutput.AddRecord(&records, record, &timestamp) if retCode != output.FLB_OK { return retCode } count++ } - retCode := kinesisOutput.Flush(records) + retCode := kinesisOutput.Flush(&records) if retCode != output.FLB_OK { return retCode } diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index f3c58a4..9834d36 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -172,7 +172,7 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki // AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full // the return value is one of: FLB_OK FLB_RETRY // API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned -func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { +func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { if outputPlugin.timeKey != "" { buf := new(bytes.Buffer) err := outputPlugin.fmtStrftime.Format(buf, *timeStamp) @@ -183,7 +183,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest record[outputPlugin.timeKey] = buf.String() } - partitionKey := outputPlugin.getPartitionKey(records, record) + partitionKey := outputPlugin.getPartitionKey(*records, record) data, err := outputPlugin.processRecord(record, partitionKey) if
err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -193,7 +193,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest newRecordSize := len(data) + len(partitionKey) - if len(records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { + if len(*records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -201,7 +201,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest return retCode } - records = append(records, &kinesis.PutRecordsRequestEntry{ + *records = append(*records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }) @@ -211,7 +211,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records []*kinesis.PutRecordsRequest // Flush sends the current buffer of log records // Returns FLB_OK, FLB_RETRY, FLB_ERROR -func (outputPlugin *OutputPlugin) Flush(records []*kinesis.PutRecordsRequestEntry) int { +func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int { retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) @@ -250,22 +250,22 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecordsRequestEntry) (int, error) { - if len(records) == 0 { +func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry) (int, error) { + if len(*records) == 0 { logrus.Info("No records") return fluentbit.FLB_OK, nil } - if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(records) { - logrus.Errorf("[kinesis %d] Invalid 
partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, records[outputPlugin.lastInvalidPartitionKeyIndex].Data) + if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(*records) { + logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, (*records)[outputPlugin.lastInvalidPartitionKeyIndex].Data) outputPlugin.lastInvalidPartitionKeyIndex = -1 } outputPlugin.timer.Check() - logrus.Infof("About to send %d records to %s", len(records), outputPlugin.stream) + logrus.Infof("About to send %d records to %s", len(*records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ - Records: records, + Records: *records, StreamName: aws.String(outputPlugin.stream), }) - logrus.Infof("Tried to send %d records", len(records)) + logrus.Infof("Tried to send %d records", len(*records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() @@ -276,17 +276,17 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records []*kinesis.PutRecords } return fluentbit.FLB_RETRY, err } - logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(records)) + logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(*records)) return outputPlugin.processAPIResponse(records, response) } // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { +func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) - if aws.Int64Value(response.FailedRecordCount) == int64(len(records)) { + if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) { outputPlugin.timer.Start() return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } @@ -297,7 +297,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecor for i, record := range response.Records { if record.ErrorMessage != nil { logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) - failedRecords = append(failedRecords, records[i]) + failedRecords = append(failedRecords, (*records)[i]) } if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) @@ -305,16 +305,16 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records []*kinesis.PutRecor } } - records = records[:0] - records = append(records, failedRecords...) + *records = (*records)[:0] + *records = append(*records, failedRecords...) 
outputPlugin.dataLength = 0 - for _, record := range records { + for _, record := range *records { outputPlugin.dataLength += len(record.Data) } } else { // request fully succeeded outputPlugin.timer.Reset() - records = records[:0] + *records = (*records)[:0] outputPlugin.dataLength = 0 } return fluentbit.FLB_OK, nil From ab8243bcd813ee6c49d5c4738679acb64703d026 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Sun, 17 May 2020 23:44:53 -0700 Subject: [PATCH 09/29] Fix bug from backoff code change --- kinesis/kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 9834d36..24638c2 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -197,8 +197,8 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques retCode, err := outputPlugin.sendCurrentBatch(records) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + return retCode } - return retCode } *records = append(*records, &kinesis.PutRecordsRequestEntry{ From b6bee2dc400af5440487e8b0f298a69c8829b0b0 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Mon, 18 May 2020 23:41:11 -0700 Subject: [PATCH 10/29] Concurrency fix: process incoming C data structures before returning --- fluent-bit-kinesis.go | 52 +++++++++++++++++++++++++++++++------------ 1 file changed, 38 insertions(+), 14 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 1747edb..db3c71b 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -117,36 +117,33 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - go flushWithRetries(ctx, data, length, tag, retries) + events, timestamps := unpackRecords(data, length) + go flushWithRetries(ctx, tag, events, timestamps, retries) return output.FLB_OK } -func flushWithRetries(ctx, data unsafe.Pointer, length C.int, tag *C.char, retries int) { 
+func flushWithRetries(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time, retries int) { for i := 0; i < retries; i++ { - retCode := pluginConcurrentFlush(ctx, data, length, tag) + retCode := pluginConcurrentFlush(ctx, tag, events, timestamps) if retCode != output.FLB_RETRY { break } } } -func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { +func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time) { var count int var ret int var ts interface{} var timestamp time.Time var record map[interface{}]interface{} + records = make([]map[interface{}]interface{}, int(length)) + timestamps = make([]time.Time, int(length)) + // Create Fluent Bit decoder dec := output.NewDecoder(data, int(length)) - kinesisOutput := getPluginInstance(ctx) - fluentTag := C.GoString(tag) - logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - - // Each flush must have its own output buffer, since flushes can be concurrent - records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) - for { //Extract Record ret, ts, record = output.GetRecord(dec) @@ -165,17 +162,44 @@ func pluginConcurrentFlush(ctx, data unsafe.Pointer, length C.int, tag *C.char) timestamp = time.Now() } - retCode := kinesisOutput.AddRecord(&records, record, ×tamp) + records = append(records, record) + timestamps = append(timestamps, timestamp) + + count++ + } + + return records, timestamps +} + +func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time) int { + var i int = 0 + var timestamp time.Time + var event map[interface{}]interface{} + + kinesisOutput := getPluginInstance(ctx) + fluentTag := C.GoString(tag) + logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) + + // Each flush must have its own output buffe r, since flushes can be 
concurrent + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + + for { + if i >= len(events) || i >= len(timestamps) { + break + } + event = events[i] + timestamp = timestamps[i] + retCode := kinesisOutput.AddRecord(&records, event, ×tamp) if retCode != output.FLB_OK { return retCode } - count++ + i++ } retCode := kinesisOutput.Flush(&records) if retCode != output.FLB_OK { return retCode } - logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag) + logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, i, fluentTag) return output.FLB_OK } From dcdc6e059f99231c56090ec13e876fbe7306a84e Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 00:55:21 -0700 Subject: [PATCH 11/29] Fix record length bug --- fluent-bit-kinesis.go | 28 ++++++++++++---------------- 1 file changed, 12 insertions(+), 16 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index db3c71b..86626e2 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -117,29 +117,29 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - events, timestamps := unpackRecords(data, length) - go flushWithRetries(ctx, tag, events, timestamps, retries) + events, timestamps, count := unpackRecords(data, length) + go flushWithRetries(ctx, tag, count, events, timestamps, retries) return output.FLB_OK } -func flushWithRetries(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time, retries int) { +func flushWithRetries(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time, retries int) { for i := 0; i < retries; i++ { - retCode := pluginConcurrentFlush(ctx, tag, events, timestamps) + retCode := pluginConcurrentFlush(ctx, tag, count, events, timestamps) if retCode != 
output.FLB_RETRY { break } } } -func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time) { - var count int +func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time, count int) { var ret int var ts interface{} var timestamp time.Time var record map[interface{}]interface{} + count = 0 - records = make([]map[interface{}]interface{}, int(length)) - timestamps = make([]time.Time, int(length)) + records = make([]map[interface{}]interface{}, 100) + timestamps = make([]time.Time, 100) // Create Fluent Bit decoder dec := output.NewDecoder(data, int(length)) @@ -168,11 +168,10 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} count++ } - return records, timestamps + return records, timestamps, count } -func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interface{}]interface{}, timestamps []time.Time) int { - var i int = 0 +func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time) int { var timestamp time.Time var event map[interface{}]interface{} @@ -183,10 +182,7 @@ func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interfa // Each flush must have its own output buffe r, since flushes can be concurrent records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) - for { - if i >= len(events) || i >= len(timestamps) { - break - } + for i := 0; i < count; i++ { event = events[i] timestamp = timestamps[i] retCode := kinesisOutput.AddRecord(&records, event, ×tamp) @@ -199,7 +195,7 @@ func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, events []map[interfa if retCode != output.FLB_OK { return retCode } - logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, i, fluentTag) + logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", 
kinesisOutput.PluginID, count, fluentTag) return output.FLB_OK } From 62c7464d24fa8f004266e4ea84c874f16d579c6e Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 01:47:52 -0700 Subject: [PATCH 12/29] Debugging --- kinesis/kinesis.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 24638c2..ad220b3 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -191,6 +191,8 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } + logrus.Infof("Processing record %\ns", string(data)) + newRecordSize := len(data) + len(partitionKey) if len(*records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { From 1dd656eca8ac1bbcf00121fa8e0e1083738ab8ad Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:23:26 -0700 Subject: [PATCH 13/29] Debugging... --- fluent-bit-kinesis.go | 13 +++++++++++++ kinesis/kinesis.go | 2 +- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 86626e2..9320b8f 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -26,6 +26,7 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) +import jsoniter "github.com/json-iterator/go" const ( // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords @@ -162,6 +163,18 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} timestamp = time.Now() } + if record == nil { + logrus.Info("unpack: null record") + } else { + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err != nil { + logrus.Infof("unpack: %s\n", err) + } else { + logrus.Infof("unpack: %s\n", string(data)) + } + } + records = append(records, record) timestamps = append(timestamps, timestamp) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 
ad220b3..7a86ba2 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -191,7 +191,7 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } - logrus.Infof("Processing record %\ns", string(data)) + logrus.Infof("Processing record %s\n", string(data)) newRecordSize := len(data) + len(partitionKey) From f95a0df44acdaafa6c7132cd644f9f22616d0583 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:32:43 -0700 Subject: [PATCH 14/29] Debugging... --- fluent-bit-kinesis.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 9320b8f..ebb55bf 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -138,6 +138,7 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} var timestamp time.Time var record map[interface{}]interface{} count = 0 + all_good := true records = make([]map[interface{}]interface{}, 100) timestamps = make([]time.Time, 100) @@ -165,11 +166,15 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} if record == nil { logrus.Info("unpack: null record") + all_good = false } else { var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) if err != nil { - logrus.Infof("unpack: %s\n", err) + if len(data) == 0 { + logrus.Info("unpack: record has zero length") + all_good = false + } } else { logrus.Infof("unpack: %s\n", string(data)) } @@ -180,6 +185,12 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} count++ } + logrus.Infof("Processed %d records", count) + if all_good { + logrus.Info("All good") + } else { + logrus.Info("Not all good") + } return records, timestamps, count } From ce3e513987a239865b892abe04d68970dbef9d1b Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:39:13 -0700 Subject: [PATCH 15/29] Oops --- fluent-bit-kinesis.go | 2 -- 1 file 
changed, 2 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index ebb55bf..3c0cfce 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -175,8 +175,6 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("unpack: record has zero length") all_good = false } - } else { - logrus.Infof("unpack: %s\n", string(data)) } } From 20229de0c79375c6ed0a5a48b094af9fcc54fded Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 17:51:14 -0700 Subject: [PATCH 16/29] hmmm --- fluent-bit-kinesis.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 3c0cfce..ae8f4a6 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -170,7 +170,7 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} } else { var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) - if err != nil { + if err == nil { if len(data) == 0 { logrus.Info("unpack: record has zero length") all_good = false From cb5cff191f6fcd58586e7dcb207bbee19232ddf3 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 20:10:43 -0700 Subject: [PATCH 17/29] debugging --- fluent-bit-kinesis.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index ae8f4a6..54099a7 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -175,6 +175,9 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("unpack: record has zero length") all_good = false } + } else { + logrus.Info("unpack: unmarshal error") + all_good = false } } From 8f2af76b3b826af5bfc7b435260a868eb4f742f8 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 22:56:12 -0700 Subject: [PATCH 18/29] What is going on --- fluent-bit-kinesis.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git 
a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 54099a7..2750638 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -193,6 +193,21 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("Not all good") } + for i := 0; i < count; i++ { + record = records[i] + if record == nil { + logrus.Infof("unpack: %d is null\n", i) + continue + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err == nil { + logrus.Infof("unpack: %s\n", string(data)) + } else { + logrus.Info("unpack 2: unmarshal error") + } + } + return records, timestamps, count } @@ -207,6 +222,21 @@ func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, count int, events [] // Each flush must have its own output buffe r, since flushes can be concurrent records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + for i := 0; i < count; i++ { + event = events[i] + if event == nil { + logrus.Infof("flush: %d is null\n", i) + continue + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(event) + if err == nil { + logrus.Infof("flush: %s\n", string(data)) + } else { + logrus.Info("flush: unmarshal error") + } + } + for i := 0; i < count; i++ { event = events[i] timestamp = timestamps[i] From c2b6133b3684ea06623cd239697fa5f853718ab3 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 23:09:18 -0700 Subject: [PATCH 19/29] makes no sense at all --- fluent-bit-kinesis.go | 1 - 1 file changed, 1 deletion(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 2750638..7b4fd3c 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -197,7 +197,6 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} record = records[i] if record == nil { logrus.Infof("unpack: %d is null\n", i) - continue } var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) From 
ab590103cab2e5a4afa9d02f56ba36f18d8dbd42 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 23:17:28 -0700 Subject: [PATCH 20/29] WAT THE --- fluent-bit-kinesis.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 7b4fd3c..304a481 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -167,18 +167,13 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} if record == nil { logrus.Info("unpack: null record") all_good = false + } + var json = jsoniter.ConfigCompatibleWithStandardLibrary + data, err := json.Marshal(record) + if err == nil { + logrus.Infof("unpack 2: %s\n", string(data)) } else { - var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) - if err == nil { - if len(data) == 0 { - logrus.Info("unpack: record has zero length") - all_good = false - } - } else { - logrus.Info("unpack: unmarshal error") - all_good = false - } + logrus.Info("unpack 2: unmarshal error") } records = append(records, record) @@ -196,12 +191,12 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} for i := 0; i < count; i++ { record = records[i] if record == nil { - logrus.Infof("unpack: %d is null\n", i) + logrus.Infof("unpack 2: %d is null\n", i) } var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) if err == nil { - logrus.Infof("unpack: %s\n", string(data)) + logrus.Infof("unpack 2: %s\n", string(data)) } else { logrus.Info("unpack 2: unmarshal error") } From 070ef3b57201c9d2bb7d2fc0b9ce5917942ee386 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Tue, 19 May 2020 23:26:29 -0700 Subject: [PATCH 21/29] cats --- fluent-bit-kinesis.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 304a481..7a0f8ce 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -168,6 +168,7 
@@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} logrus.Info("unpack: null record") all_good = false } + logrus.Info("unpack: %v", record) var json = jsoniter.ConfigCompatibleWithStandardLibrary data, err := json.Marshal(record) if err == nil { @@ -190,6 +191,7 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} for i := 0; i < count; i++ { record = records[i] + logrus.Info("unpack 2: %v", record) if record == nil { logrus.Infof("unpack 2: %d is null\n", i) } From 2f45335f6e666cef2d9dfb96da7c158381a24e3d Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 21 May 2020 23:00:40 -0700 Subject: [PATCH 22/29] Call AddRecord before spawning goroutine --- fluent-bit-kinesis.go | 94 ++++++++----------------------------------- kinesis/kinesis.go | 53 +++++++++++++----------- 2 files changed, 46 insertions(+), 101 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 7a0f8ce..c90221f 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -26,7 +26,6 @@ import ( "github.com/fluent/fluent-bit-go/output" "github.com/sirupsen/logrus" ) -import jsoniter "github.com/json-iterator/go" const ( // Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords @@ -118,30 +117,32 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { - events, timestamps, count := unpackRecords(data, length) - go flushWithRetries(ctx, tag, count, events, timestamps, retries) + kinesisOutput := getPluginInstance(ctx) + events, count, retCode := unpackRecords(kinesisOutput, data, length) + if retCode != output.FLB_OK { + return retCode + } + go flushWithRetries(kinesisOutput, tag, count, events, retries) return output.FLB_OK } -func flushWithRetries(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time, retries int) 
{ +func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { for i := 0; i < retries; i++ { - retCode := pluginConcurrentFlush(ctx, tag, count, events, timestamps) + retCode := pluginConcurrentFlush(kinesisOutput, tag, count, records) if retCode != output.FLB_RETRY { break } } } -func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{}]interface{}, timestamps []time.Time, count int) { +func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) { var ret int var ts interface{} var timestamp time.Time var record map[interface{}]interface{} - count = 0 - all_good := true + count := 0 - records = make([]map[interface{}]interface{}, 100) - timestamps = make([]time.Time, 100) + records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) // Create Fluent Bit decoder dec := output.NewDecoder(data, int(length)) @@ -164,84 +165,21 @@ func unpackRecords(data unsafe.Pointer, length C.int) (records []map[interface{} timestamp = time.Now() } - if record == nil { - logrus.Info("unpack: null record") - all_good = false - } - logrus.Info("unpack: %v", record) - var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) - if err == nil { - logrus.Infof("unpack 2: %s\n", string(data)) - } else { - logrus.Info("unpack 2: unmarshal error") + retCode := kinesisOutput.AddRecord(&records, record, ×tamp) + if retCode != output.FLB_OK { + return nil, 0, retCode } - records = append(records, record) - timestamps = append(timestamps, timestamp) - count++ } - logrus.Infof("Processed %d records", count) - if all_good { - logrus.Info("All good") - } else { - logrus.Info("Not all good") - } - - for i := 0; i < count; i++ { - record = records[i] - logrus.Info("unpack 2: %v", record) - if record == nil { - logrus.Infof("unpack 2: %d is null\n", i) - } - var 
json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(record) - if err == nil { - logrus.Infof("unpack 2: %s\n", string(data)) - } else { - logrus.Info("unpack 2: unmarshal error") - } - } - return records, timestamps, count + return records, count, output.FLB_OK } -func pluginConcurrentFlush(ctx unsafe.Pointer, tag *C.char, count int, events []map[interface{}]interface{}, timestamps []time.Time) int { - var timestamp time.Time - var event map[interface{}]interface{} - - kinesisOutput := getPluginInstance(ctx) +func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry) int { fluentTag := C.GoString(tag) logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - // Each flush must have its own output buffe r, since flushes can be concurrent - records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut) - - for i := 0; i < count; i++ { - event = events[i] - if event == nil { - logrus.Infof("flush: %d is null\n", i) - continue - } - var json = jsoniter.ConfigCompatibleWithStandardLibrary - data, err := json.Marshal(event) - if err == nil { - logrus.Infof("flush: %s\n", string(data)) - } else { - logrus.Info("flush: unmarshal error") - } - } - - for i := 0; i < count; i++ { - event = events[i] - timestamp = timestamps[i] - retCode := kinesisOutput.AddRecord(&records, event, ×tamp) - if retCode != output.FLB_OK { - return retCode - } - i++ - } retCode := kinesisOutput.Flush(&records) if retCode != output.FLB_OK { return retCode diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 7a86ba2..0f7569e 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -80,7 +80,6 @@ type OutputPlugin struct { fmtStrftime *strftime.Strftime lastInvalidPartitionKeyIndex int client PutRecordsClient - dataLength int timer *plugins.Timeout PluginID int random *random @@ -169,9 +168,8 @@ func newPutRecordsClient(roleARN string, 
awsRegion string, endpoint string) (*ki return client, nil } -// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full -// the return value is one of: FLB_OK FLB_RETRY -// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned +// AddRecord accepts a record and adds it to the buffer +// the return value is one of: FLB_OK FLB_RETRY FLB_ERROR func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { if outputPlugin.timeKey != "" { buf := new(bytes.Buffer) @@ -191,30 +189,39 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } - logrus.Infof("Processing record %s\n", string(data)) - - newRecordSize := len(data) + len(partitionKey) - - if len(*records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize { - retCode, err := outputPlugin.sendCurrentBatch(records) - if err != nil { - logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) - return retCode - } - } + logrus.Infof("Adding record %s\n", string(data)) *records = append(*records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }) - outputPlugin.dataLength += newRecordSize return fluentbit.FLB_OK } // Flush sends the current buffer of log records // Returns FLB_OK, FLB_RETRY, FLB_ERROR func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int { - retCode, err := outputPlugin.sendCurrentBatch(records) + // Each flush must have its own output buffe r, since flushes can be concurrent + requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) + dataLength := 0 + + for _, record := range *records { + newRecordSize := len(record.Data) + len(aws.StringValue(record.PartitionKey)) + + if len(requestBuf) == maximumRecordsPerPut || 
(dataLength+newRecordSize) > maximumPutRecordBatchSize { + retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength) + if err != nil { + logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + return retCode + } + } + + requestBuf = append(requestBuf, record) + dataLength += newRecordSize + } + + // send any remaining records + retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } @@ -252,7 +259,7 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface return data, nil } -func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry) (int, error) { +func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) { if len(*records) == 0 { logrus.Info("No records") return fluentbit.FLB_OK, nil @@ -280,12 +287,12 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord } logrus.Debugf("[kinesis %d] Sent %d events to Kinesis\n", outputPlugin.PluginID, len(*records)) - return outputPlugin.processAPIResponse(records, response) + return outputPlugin.processAPIResponse(records, dataLength, response) } // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made -func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, response *kinesis.PutRecordsOutput) (int, error) { +func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) { @@ -309,15 +316,15 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco *records = (*records)[:0] *records = append(*records, failedRecords...) - outputPlugin.dataLength = 0 + *dataLength = 0 for _, record := range *records { - outputPlugin.dataLength += len(record.Data) + *dataLength += len(record.Data) } } else { // request fully succeeded outputPlugin.timer.Reset() *records = (*records)[:0] - outputPlugin.dataLength = 0 + *dataLength = 0 } return fluentbit.FLB_OK, nil } From 4b35c4d5f685c90300cac550bac5afdfbcc9204a Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 21 May 2020 23:15:40 -0700 Subject: [PATCH 23/29] Remove log message used for debug --- kinesis/kinesis.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 0f7569e..d984e83 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -189,8 +189,6 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques return fluentbit.FLB_OK } - logrus.Infof("Adding record %s\n", string(data)) - *records = append(*records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), @@ -261,7 +259,6 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) { if len(*records) == 0 { - logrus.Info("No records") return 
fluentbit.FLB_OK, nil } if outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(*records) { From f63d185f281c585e7d5ddb97dd6da1751390bd46 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Thu, 21 May 2020 23:26:52 -0700 Subject: [PATCH 24/29] Remove invalid partition key field from output plugin- todo: fix later --- kinesis/kinesis.go | 41 +++++++++++++++++------------------------ 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index d984e83..9ff3452 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -75,14 +75,13 @@ type OutputPlugin struct { // Partition key decides in which shard of your stream the data belongs to partitionKey string // Decides whether to append a newline after each data record - appendNewline bool - timeKey string - fmtStrftime *strftime.Strftime - lastInvalidPartitionKeyIndex int - client PutRecordsClient - timer *plugins.Timeout - PluginID int - random *random + appendNewline bool + timeKey string + fmtStrftime *strftime.Strftime + client PutRecordsClient + timer *plugins.Timeout + PluginID int + random *random } // NewOutputPlugin creates an OutputPlugin object @@ -121,17 +120,16 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, } return &OutputPlugin{ - stream: stream, - client: client, - dataKeys: dataKeys, - partitionKey: partitionKey, - appendNewline: appendNewline, - timeKey: timeKey, - fmtStrftime: timeFormatter, - lastInvalidPartitionKeyIndex: -1, - timer: timer, - PluginID: pluginID, - random: random, + stream: stream, + client: client, + dataKeys: dataKeys, + partitionKey: partitionKey, + appendNewline: appendNewline, + timeKey: timeKey, + fmtStrftime: timeFormatter, + timer: timer, + PluginID: pluginID, + random: random, }, nil } @@ -261,10 +259,6 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord if len(*records) == 0 { return fluentbit.FLB_OK, nil } - if 
outputPlugin.lastInvalidPartitionKeyIndex >= 0 && outputPlugin.lastInvalidPartitionKeyIndex <= len(*records) { - logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, (*records)[outputPlugin.lastInvalidPartitionKeyIndex].Data) - outputPlugin.lastInvalidPartitionKeyIndex = -1 - } outputPlugin.timer.Check() logrus.Infof("About to send %d records to %s", len(*records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ @@ -352,7 +346,6 @@ func (outputPlugin *OutputPlugin) getPartitionKey(records []*kinesis.PutRecordsR } } } - outputPlugin.lastInvalidPartitionKeyIndex = len(records) % maximumRecordsPerPut } return outputPlugin.randomString() } From a7d796282a00d4e7c113794af1533ee61813ebab Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 22 May 2020 00:10:33 -0700 Subject: [PATCH 25/29] Fix bug in error handling logic --- fluent-bit-kinesis.go | 1 + kinesis/kinesis.go | 2 ++ 2 files changed, 3 insertions(+) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index c90221f..82c4508 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -128,6 +128,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { for i := 0; i < retries; i++ { + // TODO: Would probably want to backoff before retrying? 
retCode := pluginConcurrentFlush(kinesisOutput, tag, count, records) if retCode != output.FLB_RETRY { break diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 9ff3452..4530230 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -208,6 +208,8 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) + } + if retCode != fluentbit.FLB_OK { return retCode } } From 949dd4e4520250d023979113e52398f3832b4b46 Mon Sep 17 00:00:00 2001 From: Wesley Pettit Date: Fri, 22 May 2020 00:36:12 -0700 Subject: [PATCH 26/29] Remove unneeded info statements --- kinesis/kinesis.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 4530230..b8fd64f 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -262,12 +262,10 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord return fluentbit.FLB_OK, nil } outputPlugin.timer.Check() - logrus.Infof("About to send %d records to %s", len(*records), outputPlugin.stream) response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{ Records: *records, StreamName: aws.String(outputPlugin.stream), }) - logrus.Infof("Tried to send %d records", len(*records)) if err != nil { logrus.Errorf("[kinesis %d] PutRecords failed with %v\n", outputPlugin.PluginID, err) outputPlugin.timer.Start() From 96dd60a90dbe56f5cde8d5a30b2aa20c565b41f6 Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Fri, 29 May 2020 13:09:55 -0400 Subject: [PATCH 27/29] Fix retry logic for concurrent flushes. Fixes duplicate logs when kinesis limits are exceeded. Reduces the likelihood of dropped logs when kinesis provides backpressure (improvements in retries). Adds log messages to indicate when and how many records are dropped due to retry timeouts. 
Signed-off-by: Zack Wine --- fluent-bit-kinesis.go | 42 +++++++++++++++++++++++++++++++++++------- kinesis/kinesis.go | 36 +++++++++++++++++++++++++++++++----- 2 files changed, 66 insertions(+), 12 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 82c4508..8b59d25 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -33,7 +33,8 @@ const ( ) const ( - retries = 2 + retries = 6 + concurrentRetryLimit = 4 ) var ( @@ -118,8 +119,16 @@ func FLBPluginInit(ctx unsafe.Pointer) int { //export FLBPluginFlushCtx func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int { kinesisOutput := getPluginInstance(ctx) + + curRetries := kinesisOutput.GetConcurrentRetries() + if curRetries > concurrentRetryLimit { + logrus.Infof("[kinesis] flush returning retry, too many concurrent retries (%d)\n", curRetries) + return output.FLB_RETRY + } + events, count, retCode := unpackRecords(kinesisOutput, data, length) if retCode != output.FLB_OK { + logrus.Errorf("[kinesis] failed to unpackRecords\n") return retCode } go flushWithRetries(kinesisOutput, tag, count, events, retries) @@ -127,12 +136,32 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { - for i := 0; i < retries; i++ { - // TODO: Would probably want to backoff before retrying? 
- retCode := pluginConcurrentFlush(kinesisOutput, tag, count, records) + var retCode, tries int + + backoff := kinesisOutput.GetConcurrentRetries() + + for tries = 0; tries < retries; tries++ { + if backoff > 0 { + // Wait if other goroutines are in backoff mode, as well as implement a progressive backoff + time.Sleep(time.Duration((2^backoff)*100) * time.Millisecond) + } + + logrus.Debugf("[kinesis] Sending (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records) if retCode != output.FLB_RETRY { break } + backoff = kinesisOutput.AddConcurrentRetries(1) + logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + } + if tries > 0 { + kinesisOutput.AddConcurrentRetries(int64(-tries)) + } + if retCode == output.FLB_ERROR { + logrus.Errorf("[kinesis] Failed to flush (%d) records with error", len(records)) + } + if retCode == output.FLB_RETRY { + logrus.Errorf("[kinesis] Failed flush (%d) records after retries %d", len(records), retries) } } @@ -177,11 +206,10 @@ func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, len return records, count, output.FLB_OK } -func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry) int { +func pluginConcurrentFlush(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records *[]*kinesisAPI.PutRecordsRequestEntry) int { fluentTag := C.GoString(tag) logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag) - - retCode := kinesisOutput.Flush(&records) + retCode := kinesisOutput.Flush(records) if retCode != output.FLB_OK { return retCode } diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index b8fd64f..d11b7ae 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -21,6 +21,7 @@ import ( "fmt" "math/rand" "os" + "sync/atomic" "time" 
"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" @@ -82,6 +83,8 @@ type OutputPlugin struct { timer *plugins.Timeout PluginID int random *random + // Used to implement backoff for concurrent flushes + concurrentRetries int64 } // NewOutputPlugin creates an OutputPlugin object @@ -201,7 +204,7 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) dataLength := 0 - for _, record := range *records { + for i, record := range *records { newRecordSize := len(record.Data) + len(aws.StringValue(record.PartitionKey)) if len(requestBuf) == maximumRecordsPerPut || (dataLength+newRecordSize) > maximumPutRecordBatchSize { @@ -210,6 +213,10 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } if retCode != fluentbit.FLB_OK { + unsent := (*records)[i:] + // requestBuf will contain records sendCurrentBatch failed to send, + // combine those with the records yet to be sent/batched + *records = append(requestBuf, unsent...) return retCode } } @@ -223,6 +230,8 @@ func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEnt if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } + // requestBuf will contain records sendCurrentBatch failed to send + *records = requestBuf return retCode } @@ -284,6 +293,9 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) 
no progress has been made func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { + + var retCode int = fluentbit.FLB_OK + if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedRecordCount) == int64(len(*records)) { @@ -291,7 +303,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco return fluentbit.FLB_RETRY, fmt.Errorf("PutRecords request returned with no records successfully recieved") } - logrus.Warnf("[kinesis %d] %d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount)) + logrus.Warnf("[kinesis %d] %d/%d records failed to be delivered. Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount), len(*records)) failedRecords := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount)) // try to resend failed records for i, record := range response.Records { @@ -299,12 +311,16 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) failedRecords = append(failedRecords, (*records)[i]) } + if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { - logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - return fluentbit.FLB_RETRY, nil + retCode = fluentbit.FLB_RETRY } } + if retCode == fluentbit.FLB_RETRY { + logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) + } + *records = (*records)[:0] *records = append(*records, failedRecords...) 
*dataLength = 0 @@ -317,7 +333,7 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco *records = (*records)[:0] *dataLength = 0 } - return fluentbit.FLB_OK, nil + return retCode, nil } // randomString generates a random string of length 8 @@ -361,3 +377,13 @@ func stringOrByteArray(v interface{}) string { return "" } } + +// GetConcurrentRetries value (goroutine safe) +func (outputPlugin *OutputPlugin) GetConcurrentRetries() int64 { + return atomic.LoadInt64(&outputPlugin.concurrentRetries) +} + +// AddConcurrentRetries will update the value (goroutine safe) +func (outputPlugin *OutputPlugin) AddConcurrentRetries(val int64) int64 { + return atomic.AddInt64(&outputPlugin.concurrentRetries, int64(val)) +} From 87c39b447cfc60e75726ca774792977a8a496beb Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Mon, 1 Jun 2020 13:55:29 -0400 Subject: [PATCH 28/29] Address code review. Rename backoff to currentRetries for clarity. Signed-off-by: Zack Wine --- fluent-bit-kinesis.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index 8b59d25..5516498 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -138,21 +138,21 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func flushWithRetries(kinesisOutput *kinesis.OutputPlugin, tag *C.char, count int, records []*kinesisAPI.PutRecordsRequestEntry, retries int) { var retCode, tries int - backoff := kinesisOutput.GetConcurrentRetries() + currentRetries := kinesisOutput.GetConcurrentRetries() for tries = 0; tries < retries; tries++ { - if backoff > 0 { - // Wait if other goroutines are in backoff mode, as well as implement a progressive backoff - time.Sleep(time.Duration((2^backoff)*100) * time.Millisecond) + if currentRetries > 0 { + // Wait if other goroutines are retrying, as well as implement a progressive backoff + time.Sleep(time.Duration((2^currentRetries)*100) * 
time.Millisecond) } - logrus.Debugf("[kinesis] Sending (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + logrus.Debugf("[kinesis] Sending (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) retCode = pluginConcurrentFlush(kinesisOutput, tag, count, &records) if retCode != output.FLB_RETRY { break } - backoff = kinesisOutput.AddConcurrentRetries(1) - logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, backoff=(%d)", records, len(records), backoff) + currentRetries = kinesisOutput.AddConcurrentRetries(1) + logrus.Infof("[kinesis] Going to retry with (%p) (%d) records, currentRetries=(%d)", records, len(records), currentRetries) } if tries > 0 { kinesisOutput.AddConcurrentRetries(int64(-tries)) From 898263f7f6166ff711f207811cd9ca3bf0fb095e Mon Sep 17 00:00:00 2001 From: Zack Wine Date: Tue, 2 Jun 2020 14:14:48 -0400 Subject: [PATCH 29/29] Address code review. Make limitsExceeded var to ensure message is valid. Signed-off-by: Zack Wine --- kinesis/kinesis.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index d11b7ae..dd12574 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -295,6 +295,7 @@ func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecord func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int, response *kinesis.PutRecordsOutput) (int, error) { var retCode int = fluentbit.FLB_OK + var limitsExceeded bool if aws.Int64Value(response.FailedRecordCount) > 0 { // start timer if all records failed (no progress has been made) @@ -314,10 +315,11 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { retCode = fluentbit.FLB_RETRY + limitsExceeded = true } } - if retCode == fluentbit.FLB_RETRY { + if limitsExceeded { logrus.Warnf("[kinesis 
%d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) }