diff --git a/firehose/firehose.go b/firehose/firehose.go index 48d22c5..f2578cf 100644 --- a/firehose/firehose.go +++ b/firehose/firehose.go @@ -60,7 +60,6 @@ type OutputPlugin struct { client PutRecordBatcher records []*firehose.Record dataLength int - backoff *plugins.Backoff timer *plugins.Timeout PluginID int } @@ -106,7 +105,6 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKe client: client, records: records, dataKeys: dataKeys, - backoff: plugins.NewBackoff(), timer: timer, timeKey: timeKey, fmtStrftime: timeFormatter, @@ -162,11 +160,12 @@ func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeSt newDataSize := len(data) if len(output.records) == maximumRecordsPerPut || (output.dataLength+newDataSize) > maximumPutRecordBatchSize { - err = output.sendCurrentBatch() + retCode, err := output.sendCurrentBatch() if err != nil { logrus.Errorf("[firehose %d] %v\n", output.PluginID, err) - // send failures are retryable - return fluentbit.FLB_RETRY + } + if retCode != fluentbit.FLB_OK { + return retCode } } @@ -178,8 +177,13 @@ func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeSt } // Flush sends the current buffer of records -func (output *OutputPlugin) Flush() error { - return output.sendCurrentBatch() +// Returns FLB_OK, FLB_RETRY, FLB_ERROR +func (output *OutputPlugin) Flush() int { + retCode, err := output.sendCurrentBatch() + if err != nil { + logrus.Errorf("[firehose %d] %v\n", output.PluginID, err) + } + return retCode } func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) { @@ -211,8 +215,7 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([ return data, nil } -func (output *OutputPlugin) sendCurrentBatch() error { - output.backoff.Wait() +func (output *OutputPlugin) sendCurrentBatch() (int, error) { output.timer.Check() response, err := output.client.PutRecordBatch(&firehose.PutRecordBatchInput{ @@ 
-225,12 +228,9 @@ func (output *OutputPlugin) sendCurrentBatch() error { if aerr, ok := err.(awserr.Error); ok { if aerr.Code() == firehose.ErrCodeServiceUnavailableException { logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID) - // https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch - // Firehose recommends backoff when this error is encountered - output.backoff.StartBackoff() } } - return err + return fluentbit.FLB_RETRY, err } logrus.Debugf("[firehose %d] Sent %d events to Firehose\n", output.PluginID, len(output.records)) @@ -239,12 +239,12 @@ func (output *OutputPlugin) sendCurrentBatch() error { // processAPIResponse processes the successful and failed records // it returns an error iff no records succeeded (i.e.) no progress has been made -func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatchOutput) error { +func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatchOutput) (int, error) { if aws.Int64Value(response.FailedPutCount) > 0 { // start timer if all records failed (no progress has been made) if aws.Int64Value(response.FailedPutCount) == int64(len(output.records)) { output.timer.Start() - return fmt.Errorf("PutRecordBatch request returned with no records successfully recieved") + return fluentbit.FLB_RETRY, fmt.Errorf("PutRecordBatch request returned with no records successfully received") } logrus.Warnf("[firehose %d] %d records failed to be delivered. 
Will retry.\n", output.PluginID, aws.Int64Value(response.FailedPutCount)) @@ -256,9 +256,8 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch failedRecords = append(failedRecords, output.records[i]) } if aws.StringValue(record.ErrorCode) == firehose.ErrCodeServiceUnavailableException { - // https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch - // Firehose recommends backoff when this error is encountered - output.backoff.StartBackoff() + logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID) + return fluentbit.FLB_RETRY, nil } } @@ -272,10 +271,9 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch } else { // request fully succeeded output.timer.Reset() - output.backoff.Reset() output.records = output.records[:0] output.dataLength = 0 } - return nil + return fluentbit.FLB_OK, nil } diff --git a/firehose/firehose_test.go b/firehose/firehose_test.go index 5ecf0d5..650d306 100644 --- a/firehose/firehose_test.go +++ b/firehose/firehose_test.go @@ -40,7 +40,6 @@ func TestAddRecord(t *testing.T) { dataKeys: "", client: nil, records: make([]*firehose.Record, 0, 500), - backoff: plugins.NewBackoff(), timer: timer, } @@ -79,7 +78,6 @@ func TestAddRecordAndFlush(t *testing.T) { dataKeys: "", client: mockFirehose, records: make([]*firehose.Record, 0, 500), - backoff: plugins.NewBackoff(), timer: timer, } @@ -87,7 +85,7 @@ func TestAddRecordAndFlush(t *testing.T) { retCode := output.AddRecord(record, &timeStamp) assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") - err := output.Flush() - assert.NoError(t, err, "Unexpected error calling flush") + retCode = output.Flush() + assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK") } diff --git a/fluent-bit-firehose.go b/fluent-bit-firehose.go index d4d1e47..e5ae2dc 100644 --- a/fluent-bit-firehose.go +++ 
b/fluent-bit-firehose.go @@ -131,10 +131,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int } count++ } - err := firehoseOutput.Flush() - if err != nil { - logrus.Errorf("[firehose %d] %v\n", firehoseOutput.PluginID, err) - return output.FLB_ERROR + retCode := firehoseOutput.Flush() + if retCode != output.FLB_OK { + return retCode } logrus.Debugf("[firehose %d] Processed %d events with tag %s\n", firehoseOutput.PluginID, count, fluentTag) @@ -145,10 +144,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int func FLBPluginExit() int { // Before final exit, call Flush() for all the instances of the Output Plugin for i := range pluginInstances { - err := pluginInstances[i].Flush() - if err != nil { - logrus.Errorf("[firehose %d] %v\n", pluginInstances[i].PluginID, err) - } + pluginInstances[i].Flush() } return output.FLB_OK diff --git a/plugins/plugins.go b/plugins/plugins.go index 38f67dc..b8bd206 100644 --- a/plugins/plugins.go +++ b/plugins/plugins.go @@ -20,7 +20,6 @@ import ( "strings" "time" - retry "github.com/cenkalti/backoff" "github.com/sirupsen/logrus" ) @@ -29,53 +28,6 @@ const ( sendFailureTimeoutEnvVar = "SEND_FAILURE_TIMEOUT" ) -const ( - initialInterval = 100 * time.Millisecond - maxInterval = 10 * time.Second -) - -// Backoff wraps github.com/cenkalti/backoff -// Wait() is called for each AWS API call that may need back off -// But backoff only occurs if StartBackoff() has previously been called -// Reset() should be called whenever backoff can end. 
-type Backoff struct { - doBackoff bool - expBackoff *retry.ExponentialBackOff -} - -// Reset ends the exponential backoff -func (b *Backoff) Reset() { - b.doBackoff = false - b.expBackoff.Reset() -} - -// Wait enacts the exponential backoff, if StartBackoff() has been called -func (b *Backoff) Wait() { - if b.doBackoff { - d := b.expBackoff.NextBackOff() - logrus.Debugf("[go plugin] In exponential backoff, waiting %v", d) - time.Sleep(d) - } -} - -// StartBackoff begins exponential backoff -// its a no-op if backoff has already started -func (b *Backoff) StartBackoff() { - b.doBackoff = true -} - -// NewBackoff creates a new Backoff struct with default values -func NewBackoff() *Backoff { - b := retry.NewExponentialBackOff() - b.InitialInterval = initialInterval - b.MaxElapsedTime = 0 // The backoff object never expires - b.MaxInterval = maxInterval - return &Backoff{ - doBackoff: false, - expBackoff: b, - } -} - // Timeout is a simple timeout for single-threaded programming // (Goroutines are expensive in Cgo) type Timeout struct {