diff --git a/README.md b/README.md
index 69d94d5..78fa1b7 100644
--- a/README.md
+++ b/README.md
@@ -16,7 +16,9 @@ If you think you’ve found a potential security issue, please do not post it in
 * `endpoint`: Specify a custom endpoint for the Kinesis Streams API.
 * `append_newline`: If you set append_newline as true, a newline will be addded after each log record.
 * `time_key`: Add the timestamp to the record under this key. By default the timestamp from Fluent Bit will not be added to records sent to Kinesis.
-* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.
+* `time_key_format`: [strftime](http://man7.org/linux/man-pages/man3/strftime.3.html) compliant format string for the timestamp; for example, `%Y-%m-%dT%H:%M:%S%z`. This option is used with `time_key`.
+* `experimental_concurrency`: Specify a limit on the number of concurrent goroutines used to flush records to Kinesis. By default `experimental_concurrency` is set to `0` and records are flushed in Fluent Bit's single thread, which means that requests to Kinesis block the execution of Fluent Bit. If this value is set to `4`, for example, then each call to flush records from Fluent Bit spawns a goroutine, up to a limit of `4` concurrent goroutines; once the limit is reached, calls to flush return a retry code. The upper limit of the `experimental_concurrency` option is `10`. WARNING: Enabling `experimental_concurrency` can lead to data loss if the retry count is reached, and it increases resource usage (memory and CPU).
+* `experimental_concurrency_retries`: Specify a limit on the number of retries each concurrent goroutine will attempt. By default, `4` retries will be attempted before records are dropped.
 
 ### Permissions
 
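For illustration, a minimal output stanza exercising the new options might look like the following; the stream name and region are placeholders, and this assumes the plugin is registered under the name `kinesis` as in the plugin's usual setup:

```
[OUTPUT]
    Name                             kinesis
    Match                            *
    region                           us-west-2
    stream                           my-kinesis-stream
    experimental_concurrency         4
    experimental_concurrency_retries 4
```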
diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go
index 96dfd63..aa8f1f5 100644
--- a/fluent-bit-kinesis.go
+++ b/fluent-bit-kinesis.go
@@ -16,16 +16,25 @@ package main
 import (
 	"C"
 	"fmt"
+	"strconv"
 	"strings"
 	"time"
 	"unsafe"
 
 	"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
 	"github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis"
+	kinesisAPI "github.com/aws/aws-sdk-go/service/kinesis"
 	"github.com/fluent/fluent-bit-go/output"
 	"github.com/sirupsen/logrus"
 )
 
+const (
+	// Kinesis API Limit https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/#Kinesis.PutRecords
+	maximumRecordsPerPut     = 500
+	maximumConcurrency       = 10
+	defaultConcurrentRetries = 4
+)
+
 var (
 	pluginInstances []*kinesis.OutputPlugin
 )
@@ -66,6 +75,10 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
 	logrus.Infof("[firehose %d] plugin parameter time_key = '%s'\n", pluginID, timeKey)
 	timeKeyFmt := output.FLBPluginConfigKey(ctx, "time_key_format")
 	logrus.Infof("[firehose %d] plugin parameter time_key_format = '%s'\n", pluginID, timeKeyFmt)
+	concurrency := output.FLBPluginConfigKey(ctx, "experimental_concurrency")
+	logrus.Infof("[kinesis %d] plugin parameter experimental_concurrency = '%s'\n", pluginID, concurrency)
+	concurrencyRetries := output.FLBPluginConfigKey(ctx, "experimental_concurrency_retries")
+	logrus.Infof("[kinesis %d] plugin parameter experimental_concurrency_retries = '%s'\n", pluginID, concurrencyRetries)
 
 	if stream == "" || region == "" {
 		return nil, fmt.Errorf("[kinesis %d] stream and region are required configuration parameters", pluginID)
@@ -83,7 +96,41 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
 	if strings.ToLower(appendNewline) == "true" {
 		appendNL = true
 	}
-	return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, appendNL, pluginID)
+
+	var concurrencyInt, concurrencyRetriesInt int
+	var err error
+	if concurrency != "" {
+		concurrencyInt, err = strconv.Atoi(concurrency)
+		if err != nil {
+			logrus.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value %s specified: %v", pluginID, concurrency, err)
+			return nil, err
+		}
+		if concurrencyInt < 0 {
+			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be a non-negative number", pluginID, concurrency)
+		}
+
+		if concurrencyInt > maximumConcurrency {
+			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency' value (%s) specified, must be less than or equal to %d", pluginID, concurrency, maximumConcurrency)
+		}
+
+		if concurrencyInt > 0 {
+			logrus.Warnf("[kinesis %d] WARNING: Enabling concurrency can lead to data loss. If 'experimental_concurrency_retries' is reached data will be lost.", pluginID)
+		}
+	}
+
+	if concurrencyRetries != "" {
+		concurrencyRetriesInt, err = strconv.Atoi(concurrencyRetries)
+		if err != nil {
+			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified: %v", pluginID, concurrencyRetries, err)
+		}
+		if concurrencyRetriesInt < 0 {
+			return nil, fmt.Errorf("[kinesis %d] Invalid 'experimental_concurrency_retries' value (%s) specified, must be a non-negative number", pluginID, concurrencyRetries)
+		}
+	} else {
+		concurrencyRetriesInt = defaultConcurrentRetries
+	}
+
+	return kinesis.NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeKeyFmt, concurrencyInt, concurrencyRetriesInt, appendNL, pluginID)
 }
 
 // The "export" comments have syntactic meaning
@@ -107,19 +154,37 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
 
 //export FLBPluginFlushCtx
 func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int {
-	var count int
+	kinesisOutput := getPluginInstance(ctx)
+
+	fluentTag := C.GoString(tag)
+
+	events, count, retCode := unpackRecords(kinesisOutput, data, length)
+	if retCode != output.FLB_OK {
+		logrus.Errorf("[kinesis %d] failed to unpackRecords with tag: %s\n", kinesisOutput.PluginID, fluentTag)
+
+		return retCode
+	}
+
+	logrus.Debugf("[kinesis %d] Flushing %d logs with tag: %s\n", kinesisOutput.PluginID, count, fluentTag)
+	if kinesisOutput.Concurrency > 0 {
+		return kinesisOutput.FlushConcurrent(count, events)
+	}
+
+	return kinesisOutput.Flush(&events)
+}
+
+func unpackRecords(kinesisOutput *kinesis.OutputPlugin, data unsafe.Pointer, length C.int) ([]*kinesisAPI.PutRecordsRequestEntry, int, int) {
 	var ret int
 	var ts interface{}
 	var timestamp time.Time
 	var record map[interface{}]interface{}
+	count := 0
+
+	records := make([]*kinesisAPI.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
 
 	// Create Fluent Bit decoder
 	dec := output.NewDecoder(data, int(length))
 
-	kinesisOutput := getPluginInstance(ctx)
-	fluentTag := C.GoString(tag)
-	logrus.Debugf("[kinesis %d] Found logs with tag: %s\n", kinesisOutput.PluginID, fluentTag)
-
 	for {
 		//Extract Record
 		ret, ts, record = output.GetRecord(dec)
@@ -138,27 +203,19 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
 			timestamp = time.Now()
 		}
 
-		retCode := kinesisOutput.AddRecord(record, &timestamp)
+		retCode := kinesisOutput.AddRecord(&records, record, &timestamp)
 		if retCode != output.FLB_OK {
-			return retCode
+			return nil, 0, retCode
 		}
+		count++
 	}
 
-	retCode := kinesisOutput.Flush()
-	if retCode != output.FLB_OK {
-		return retCode
-	}
-	logrus.Debugf("[kinesis %d] Processed %d events with tag %s\n", kinesisOutput.PluginID, count, fluentTag)
-
-	return output.FLB_OK
+	return records, count, output.FLB_OK
 }
 
 //export FLBPluginExit
 func FLBPluginExit() int {
-	// Before final exit, call Flush() for all the instances of the Output Plugin
-	for i := range pluginInstances {
-		pluginInstances[i].Flush()
-	}
 	return output.FLB_OK
 }
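With this change nothing is buffered inside the plugin between flushes: each call to `FLBPluginFlushCtx` unpacks its own slice of entries (pre-sized to `maximumRecordsPerPut`) and hands it off, which is also why the final Flush loop disappears from `FLBPluginExit`. The actual batching against the PutRecords limits (at most 500 records and 5 MiB per request) happens in `Flush` in the next file. A small standalone sketch of the size accounting it uses, with made-up entry contents and constant names:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

// Stand-in limits mirroring the documented PutRecords caps.
const (
	maxRecordsPerPut  = 500
	maxPutRequestSize = 5 * 1024 * 1024
)

func main() {
	entry := &kinesis.PutRecordsRequestEntry{
		Data:         []byte(`{"msg":"hello"}`),
		PartitionKey: aws.String("example-key"),
	}
	// The plugin measures an entry as data bytes plus partition key bytes.
	size := len(entry.Data) + len(aws.StringValue(entry.PartitionKey))
	fmt.Printf("entry is %d bytes; batches cap at %d entries or %d bytes\n",
		size, maxRecordsPerPut, maxPutRequestSize)
}
```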
"github.com/fluent/fluent-bit-go/output" jsoniter "github.com/json-iterator/go" "github.com/lestrrat-go/strftime" @@ -75,26 +77,28 @@ type OutputPlugin struct { // Partition key decides in which shard of your stream the data belongs to partitionKey string // Decides whether to append a newline after each data record - appendNewline bool - timeKey string - fmtStrftime *strftime.Strftime - lastInvalidPartitionKeyIndex int - client PutRecordsClient - records []*kinesis.PutRecordsRequestEntry - dataLength int - timer *plugins.Timeout - PluginID int - random *random + appendNewline bool + timeKey string + fmtStrftime *strftime.Strftime + client PutRecordsClient + timer *plugins.Timeout + PluginID int + random *random + Concurrency int + concurrencyRetryLimit int + // Concurrency is the limit, goroutineCount represents the running goroutines + goroutineCount int32 + // Used to implement backoff for concurrent flushes + concurrentRetries uint32 } // NewOutputPlugin creates an OutputPlugin object -func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, appendNewline bool, pluginID int) (*OutputPlugin, error) { +func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, timeKey, timeFmt string, concurrency, retryLimit int, appendNewline bool, pluginID int) (*OutputPlugin, error) { client, err := newPutRecordsClient(roleARN, region, endpoint) if err != nil { return nil, err } - records := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut) timer, err := plugins.NewTimeout(func(d time.Duration) { logrus.Errorf("[kinesis %d] timeout threshold reached: Failed to send logs for %s\n", pluginID, d.String()) logrus.Errorf("[kinesis %d] Quitting Fluent Bit", pluginID) @@ -124,18 +128,18 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, endpoint, } return &OutputPlugin{ - stream: stream, - client: client, - records: records, - dataKeys: dataKeys, - partitionKey: partitionKey, - appendNewline: appendNewline, - timeKey: timeKey, - fmtStrftime: timeFormatter, - lastInvalidPartitionKeyIndex: -1, - timer: timer, - PluginID: pluginID, - random: random, + stream: stream, + client: client, + dataKeys: dataKeys, + partitionKey: partitionKey, + appendNewline: appendNewline, + timeKey: timeKey, + fmtStrftime: timeFormatter, + timer: timer, + PluginID: pluginID, + random: random, + Concurrency: concurrency, + concurrencyRetryLimit: retryLimit, }, nil } @@ -172,10 +176,9 @@ func newPutRecordsClient(roleARN string, awsRegion string, endpoint string) (*ki return client, nil } -// AddRecord accepts a record and adds it to the buffer, flushing the buffer if it is full -// the return value is one of: FLB_OK FLB_RETRY -// API Errors lead to an FLB_RETRY, and data processing errors are logged, the record is discarded and FLB_OK is returned -func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeStamp *time.Time) int { +// AddRecord accepts a record and adds it to the buffer +// the return value is one of: FLB_OK FLB_RETRY FLB_ERROR +func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry, record map[interface{}]interface{}, timeStamp *time.Time) int { if outputPlugin.timeKey != "" { buf := new(bytes.Buffer) err := outputPlugin.fmtStrftime.Format(buf, *timeStamp) @@ -194,36 +197,120 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{}, return fluentbit.FLB_OK } - newRecordSize := len(data) + len(partitionKey) - - 
@@ -194,36 +197,120 @@ func (outputPlugin *OutputPlugin) AddRecord(record map[interface{}]interface{},
 		return fluentbit.FLB_OK
 	}
 
-	newRecordSize := len(data) + len(partitionKey)
-
-	if len(outputPlugin.records) == maximumRecordsPerPut || (outputPlugin.dataLength+newRecordSize) > maximumPutRecordBatchSize {
-		retCode, err := outputPlugin.sendCurrentBatch()
-		if err != nil {
-			logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
-		}
-		if retCode != fluentbit.FLB_OK {
-			return retCode
-		}
-	}
-
-	outputPlugin.records = append(outputPlugin.records, &kinesis.PutRecordsRequestEntry{
+	*records = append(*records, &kinesis.PutRecordsRequestEntry{
 		Data:         data,
 		PartitionKey: aws.String(partitionKey),
 	})
-	outputPlugin.dataLength += newRecordSize
 	return fluentbit.FLB_OK
 }
 
 // Flush sends the current buffer of log records
 // Returns FLB_OK, FLB_RETRY, FLB_ERROR
-func (outputPlugin *OutputPlugin) Flush() int {
-	retCode, err := outputPlugin.sendCurrentBatch()
+func (outputPlugin *OutputPlugin) Flush(records *[]*kinesis.PutRecordsRequestEntry) int {
+	// Use a different buffer to batch the logs
+	requestBuf := make([]*kinesis.PutRecordsRequestEntry, 0, maximumRecordsPerPut)
+	dataLength := 0
+
+	for i, record := range *records {
+		newRecordSize := len(record.Data) + len(aws.StringValue(record.PartitionKey))
+
+		if len(requestBuf) == maximumRecordsPerPut || (dataLength+newRecordSize) > maximumPutRecordBatchSize {
+			retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength)
+			if err != nil {
+				logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
+			}
+			if retCode != fluentbit.FLB_OK {
+				unsent := (*records)[i:]
+				// requestBuf will contain records sendCurrentBatch failed to send,
+				// combine those with the records yet to be sent/batched
+				*records = append(requestBuf, unsent...)
+				return retCode
+			}
+		}
+
+		requestBuf = append(requestBuf, record)
+		dataLength += newRecordSize
+	}
+
+	// send any remaining records
+	retCode, err := outputPlugin.sendCurrentBatch(&requestBuf, &dataLength)
 	if err != nil {
 		logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
 	}
+
+	if retCode == output.FLB_OK {
+		logrus.Debugf("[kinesis %d] Flushed %d logs\n", outputPlugin.PluginID, len(*records))
+	}
+
+	// requestBuf will contain records sendCurrentBatch failed to send
+	*records = requestBuf
 	return retCode
 }
 
+// FlushWithRetries sends the current buffer of log records, with retries
+func (outputPlugin *OutputPlugin) FlushWithRetries(count int, records []*kinesis.PutRecordsRequestEntry) {
+	var retCode, tries int
+
+	currentRetries := outputPlugin.getConcurrentRetries()
+	outputPlugin.addGoroutineCount(1)
+
+	for tries = 0; tries < outputPlugin.concurrencyRetryLimit; tries++ {
+		if currentRetries > 0 {
+			// Wait if other goroutines are retrying, as well as implement a progressive backoff
+			if currentRetries > uint32(outputPlugin.concurrencyRetryLimit) {
+				time.Sleep(time.Duration((1<<uint(outputPlugin.concurrencyRetryLimit))*100) * time.Millisecond)
+			} else {
+				time.Sleep(time.Duration((1<<currentRetries)*100) * time.Millisecond)
+			}
+		}
+
+		retCode = outputPlugin.Flush(&records)
+		if retCode != output.FLB_RETRY {
+			break
+		}
+		currentRetries = outputPlugin.addConcurrentRetries(1)
+	}
+
+	outputPlugin.addGoroutineCount(-1)
+	if tries > 0 {
+		outputPlugin.addConcurrentRetries(-tries)
+	}
+
+	switch retCode {
+	case output.FLB_ERROR:
+		logrus.Errorf("[kinesis %d] Failed to send (%d) records with error", outputPlugin.PluginID, len(records))
+	case output.FLB_RETRY:
+		logrus.Errorf("[kinesis %d] Failed to send (%d) records after retries %d", outputPlugin.PluginID, len(records), outputPlugin.concurrencyRetryLimit)
+	case output.FLB_OK:
+		logrus.Debugf("[kinesis %d] Flushed %d records\n", outputPlugin.PluginID, count)
+	}
+}
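To make the backoff concrete: assuming the `(1<<n)*100ms` progression shown above, a goroutine that sees `n` outstanding concurrent retries sleeps 200ms, 400ms, 800ms, then 1.6s for n = 1..4, with the exponent capped at the retry limit. A runnable sketch of just that arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	const retryLimit = 4 // mirrors defaultConcurrentRetries
	for n := uint(1); n <= retryLimit; n++ {
		// Same shape as FlushWithRetries: exponential in the number of
		// outstanding concurrent retries, in 100ms units.
		delay := time.Duration((1<<n)*100) * time.Millisecond
		fmt.Printf("retries=%d -> sleep %v\n", n, delay)
	}
}
```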
+
+// FlushConcurrent sends the current buffer of log records in a goroutine with retries
+// Returns FLB_OK, FLB_RETRY
+// Will return FLB_RETRY if the limit of concurrency has been reached
+func (outputPlugin *OutputPlugin) FlushConcurrent(count int, records []*kinesis.PutRecordsRequestEntry) int {
+
+	runningGoRoutines := outputPlugin.getGoroutineCount()
+	if runningGoRoutines+1 > int32(outputPlugin.Concurrency) {
+		logrus.Infof("[kinesis %d] flush returning retry, concurrency limit reached (%d)\n", outputPlugin.PluginID, runningGoRoutines)
+		return output.FLB_RETRY
+	}
+
+	curRetries := outputPlugin.getConcurrentRetries()
+	if curRetries > 0 {
+		logrus.Infof("[kinesis %d] flush returning retry, kinesis retries in progress (%d)\n", outputPlugin.PluginID, curRetries)
+		return output.FLB_RETRY
+	}
+
+	go outputPlugin.FlushWithRetries(count, records)
+
+	return output.FLB_OK
+}
+
 func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) {
 	if outputPlugin.dataKeys != "" {
 		record = plugins.DataKeys(outputPlugin.dataKeys, record)
@@ -255,15 +342,13 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface
 	return data, nil
 }
 
-func (outputPlugin *OutputPlugin) sendCurrentBatch() (int, error) {
-	if outputPlugin.lastInvalidPartitionKeyIndex >= 0 {
-		logrus.Errorf("[kinesis %d] Invalid partition key. Failed to find partition_key %s in log record %s", outputPlugin.PluginID, outputPlugin.partitionKey, outputPlugin.records[outputPlugin.lastInvalidPartitionKeyIndex].Data)
-		outputPlugin.lastInvalidPartitionKeyIndex = -1
+func (outputPlugin *OutputPlugin) sendCurrentBatch(records *[]*kinesis.PutRecordsRequestEntry, dataLength *int) (int, error) {
+	if len(*records) == 0 {
+		return fluentbit.FLB_OK, nil
 	}
 	outputPlugin.timer.Check()
-
 	response, err := outputPlugin.client.PutRecords(&kinesis.PutRecordsInput{
-		Records:    outputPlugin.records,
+		Records:    *records,
 		StreamName: aws.String(outputPlugin.stream),
 	})
 	if err != nil {
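The next hunk reworks `processAPIResponse` around the same buffer-passing style: rather than resending the whole batch when only part of it failed, the entries whose per-record result carries an error are filtered out positionally and kept for retry. A standalone sketch of that filtering pattern, with fabricated response values:

```go
package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

func main() {
	sent := []*kinesis.PutRecordsRequestEntry{
		{Data: []byte("a"), PartitionKey: aws.String("k1")},
		{Data: []byte("b"), PartitionKey: aws.String("k2")},
	}
	// A PutRecords response is positional: Records[i] corresponds to sent[i].
	response := &kinesis.PutRecordsOutput{
		FailedRecordCount: aws.Int64(1),
		Records: []*kinesis.PutRecordsResultEntry{
			{SequenceNumber: aws.String("fake-sequence-number")},
			{
				ErrorCode:    aws.String(kinesis.ErrCodeProvisionedThroughputExceededException),
				ErrorMessage: aws.String("Rate exceeded"),
			},
		},
	}

	// Keep only the entries whose result carries an error, to retry just those.
	failed := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount))
	for i, result := range response.Records {
		if result.ErrorMessage != nil {
			failed = append(failed, sent[i])
		}
	}
	fmt.Printf("retrying %d of %d records\n", len(failed), len(sent))
}
```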
Will retry.\n", outputPlugin.PluginID, aws.Int64Value(response.FailedRecordCount), len(*records)) failedRecords := make([]*kinesis.PutRecordsRequestEntry, 0, aws.Int64Value(response.FailedRecordCount)) // try to resend failed records for i, record := range response.Records { if record.ErrorMessage != nil { logrus.Debugf("[kinesis %d] Record failed to send with error: %s\n", outputPlugin.PluginID, aws.StringValue(record.ErrorMessage)) - failedRecords = append(failedRecords, outputPlugin.records[i]) + failedRecords = append(failedRecords, (*records)[i]) } + if aws.StringValue(record.ErrorCode) == kinesis.ErrCodeProvisionedThroughputExceededException { - logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) - return fluentbit.FLB_RETRY, nil + retCode = fluentbit.FLB_RETRY + limitsExceeded = true } } - outputPlugin.records = outputPlugin.records[:0] - outputPlugin.records = append(outputPlugin.records, failedRecords...) - outputPlugin.dataLength = 0 - for _, record := range outputPlugin.records { - outputPlugin.dataLength += len(record.Data) + if limitsExceeded { + logrus.Warnf("[kinesis %d] Throughput limits for the stream may have been exceeded.", outputPlugin.PluginID) + } + + *records = (*records)[:0] + *records = append(*records, failedRecords...) + *dataLength = 0 + for _, record := range *records { + *dataLength += len(record.Data) } } else { // request fully succeeded outputPlugin.timer.Reset() - outputPlugin.records = outputPlugin.records[:0] - outputPlugin.dataLength = 0 + *records = (*records)[:0] + *dataLength = 0 } - return fluentbit.FLB_OK, nil + return retCode, nil } // randomString generates a random string of length 8 @@ -330,7 +424,7 @@ func (outputPlugin *OutputPlugin) randomString() string { } // getPartitionKey returns the value for a given valid key -// if the given key is emapty or invalid, it returns a random string +// if the given key is empty or invalid, it returns a random string func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string { partitionKey := outputPlugin.partitionKey if partitionKey != "" { @@ -346,7 +440,6 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa } } } - outputPlugin.lastInvalidPartitionKeyIndex = len(outputPlugin.records) % maximumRecordsPerPut } return outputPlugin.randomString() } @@ -362,3 +455,23 @@ func stringOrByteArray(v interface{}) string { return "" } } + +// getConcurrentRetries value (goroutine safe) +func (outputPlugin *OutputPlugin) getConcurrentRetries() uint32 { + return atomic.LoadUint32(&outputPlugin.concurrentRetries) +} + +// addConcurrentRetries will update the value (goroutine safe) +func (outputPlugin *OutputPlugin) addConcurrentRetries(val int) uint32 { + return atomic.AddUint32(&outputPlugin.concurrentRetries, uint32(val)) +} + +// getConcurrentRetries value (goroutine safe) +func (outputPlugin *OutputPlugin) getGoroutineCount() int32 { + return atomic.LoadInt32(&outputPlugin.goroutineCount) +} + +// addConcurrentRetries will update the value (goroutine safe) +func (outputPlugin *OutputPlugin) addGoroutineCount(val int) int32 { + return atomic.AddInt32(&outputPlugin.goroutineCount, int32(val)) +} diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index 1cff431..20149a3 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -18,7 +18,6 @@ import ( // newMockOutputPlugin creates an mock OutputPlugin object func newMockOutputPlugin(client 
diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go
index 1cff431..20149a3 100644
--- a/kinesis/kinesis_test.go
+++ b/kinesis/kinesis_test.go
@@ -18,7 +18,6 @@ import (
 
 // newMockOutputPlugin creates an mock OutputPlugin object
 func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlugin, error) {
-	records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)
 
 	timer, _ := plugins.NewTimeout(func(d time.Duration) {
 		logrus.Errorf("[kinesis] timeout threshold reached: Failed to send logs for %v", d)
@@ -34,15 +33,13 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug
 	}
 
 	return &OutputPlugin{
-		stream:                       "stream",
-		client:                       client,
-		records:                      records,
-		dataKeys:                     "",
-		partitionKey:                 "",
-		lastInvalidPartitionKeyIndex: -1,
-		timer:                        timer,
-		PluginID:                     0,
-		random:                       random,
+		stream:       "stream",
+		client:       client,
+		dataKeys:     "",
+		partitionKey: "",
+		timer:        timer,
+		PluginID:     0,
+		random:       random,
 	}, nil
 }
 
@@ -67,6 +64,8 @@ func TestStringOrByteArray(t *testing.T) {
 }
 
 func TestAddRecord(t *testing.T) {
+	records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)
+
 	record := map[interface{}]interface{}{
 		"testkey": []byte("test value"),
 	}
@@ -74,12 +73,14 @@ func TestAddRecord(t *testing.T) {
 	outputPlugin, _ := newMockOutputPlugin(nil)
 
 	timeStamp := time.Now()
-	retCode := outputPlugin.AddRecord(record, &timeStamp)
+	retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
 
 	assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
-	assert.Len(t, outputPlugin.records, 1, "Expected output to contain 1 record")
+	assert.Len(t, records, 1, "Expected output to contain 1 record")
 }
 
 func TestAddRecordAndFlush(t *testing.T) {
+	records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)
+
 	record := map[interface{}]interface{}{
 		"testkey": []byte("test value"),
 	}
@@ -94,9 +95,9 @@ func TestAddRecordAndFlush(t *testing.T) {
 	outputPlugin, _ := newMockOutputPlugin(mockKinesis)
 
 	timeStamp := time.Now()
-	retCode := outputPlugin.AddRecord(record, &timeStamp)
+	retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
 	assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
 
-	retCode = outputPlugin.Flush()
+	retCode = outputPlugin.Flush(&records)
 	assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
 }
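Since `sendCurrentBatch` now returns early on an empty buffer, a flush with no records never touches the client. A hypothetical extra test along these lines (not part of the change) would therefore pass even with a nil mock:

```go
func TestFlushEmptyRecords(t *testing.T) {
	records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)
	outputPlugin, _ := newMockOutputPlugin(nil)

	// No entries were added, so Flush should return FLB_OK without
	// ever calling PutRecords on the (nil) client.
	retCode := outputPlugin.Flush(&records)
	assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")
}
```

Given the goroutines this change introduces, running the suite with the race detector (`go test -race ./...`) is a sensible habit.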