Skip to content

Commit

Permalink
Remove exponential backoff code #23
Browse files Browse the repository at this point in the history
  • Loading branch information
PettitWesley authored Jun 1, 2020
1 parent 7252a82 commit c370445
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 80 deletions.
38 changes: 18 additions & 20 deletions firehose/firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ type OutputPlugin struct {
client PutRecordBatcher
records []*firehose.Record
dataLength int
backoff *plugins.Backoff
timer *plugins.Timeout
PluginID int
}
Expand Down Expand Up @@ -106,7 +105,6 @@ func NewOutputPlugin(region, deliveryStream, dataKeys, roleARN, endpoint, timeKe
client: client,
records: records,
dataKeys: dataKeys,
backoff: plugins.NewBackoff(),
timer: timer,
timeKey: timeKey,
fmtStrftime: timeFormatter,
Expand Down Expand Up @@ -162,11 +160,12 @@ func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeSt
newDataSize := len(data)

if len(output.records) == maximumRecordsPerPut || (output.dataLength+newDataSize) > maximumPutRecordBatchSize {
err = output.sendCurrentBatch()
retCode, err := output.sendCurrentBatch()
if err != nil {
logrus.Errorf("[firehose %d] %v\n", output.PluginID, err)
// send failures are retryable
return fluentbit.FLB_RETRY
}
if retCode != fluentbit.FLB_OK {
return retCode
}
}

Expand All @@ -178,8 +177,13 @@ func (output *OutputPlugin) AddRecord(record map[interface{}]interface{}, timeSt
}

// Flush sends the current buffer of records
func (output *OutputPlugin) Flush() error {
return output.sendCurrentBatch()
// Returns FLB_OK, FLB_RETRY, FLB_ERROR
func (output *OutputPlugin) Flush() int {
retCode, err := output.sendCurrentBatch()
if err != nil {
logrus.Errorf("[firehose %d] %v\n", output.PluginID, err)
}
return retCode
}

func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([]byte, error) {
Expand Down Expand Up @@ -211,8 +215,7 @@ func (output *OutputPlugin) processRecord(record map[interface{}]interface{}) ([
return data, nil
}

func (output *OutputPlugin) sendCurrentBatch() error {
output.backoff.Wait()
func (output *OutputPlugin) sendCurrentBatch() (int, error) {
output.timer.Check()

response, err := output.client.PutRecordBatch(&firehose.PutRecordBatchInput{
Expand All @@ -225,12 +228,9 @@ func (output *OutputPlugin) sendCurrentBatch() error {
if aerr, ok := err.(awserr.Error); ok {
if aerr.Code() == firehose.ErrCodeServiceUnavailableException {
logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID)
// https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch
// Firehose recommends backoff when this error is encountered
output.backoff.StartBackoff()
}
}
return err
return fluentbit.FLB_RETRY, err
}
logrus.Debugf("[firehose %d] Sent %d events to Firehose\n", output.PluginID, len(output.records))

Expand All @@ -239,12 +239,12 @@ func (output *OutputPlugin) sendCurrentBatch() error {

// processAPIResponse processes the successful and failed records
// it returns an error iff no records succeeded (i.e.) no progress has been made
func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatchOutput) error {
func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatchOutput) (int, error) {
if aws.Int64Value(response.FailedPutCount) > 0 {
// start timer if all records failed (no progress has been made)
if aws.Int64Value(response.FailedPutCount) == int64(len(output.records)) {
output.timer.Start()
return fmt.Errorf("PutRecordBatch request returned with no records successfully recieved")
return fluentbit.FLB_RETRY, fmt.Errorf("PutRecordBatch request returned with no records successfully recieved")
}

logrus.Warnf("[firehose %d] %d records failed to be delivered. Will retry.\n", output.PluginID, aws.Int64Value(response.FailedPutCount))
Expand All @@ -256,9 +256,8 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch
failedRecords = append(failedRecords, output.records[i])
}
if aws.StringValue(record.ErrorCode) == firehose.ErrCodeServiceUnavailableException {
// https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch
// Firehose recommends backoff when this error is encountered
output.backoff.StartBackoff()
logrus.Warnf("[firehose %d] Throughput limits for the delivery stream may have been exceeded.", output.PluginID)
return fluentbit.FLB_RETRY, nil
}
}

Expand All @@ -272,10 +271,9 @@ func (output *OutputPlugin) processAPIResponse(response *firehose.PutRecordBatch
} else {
// request fully succeeded
output.timer.Reset()
output.backoff.Reset()
output.records = output.records[:0]
output.dataLength = 0
}

return nil
return fluentbit.FLB_OK, nil
}
6 changes: 2 additions & 4 deletions firehose/firehose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ func TestAddRecord(t *testing.T) {
dataKeys: "",
client: nil,
records: make([]*firehose.Record, 0, 500),
backoff: plugins.NewBackoff(),
timer: timer,
}

Expand Down Expand Up @@ -79,15 +78,14 @@ func TestAddRecordAndFlush(t *testing.T) {
dataKeys: "",
client: mockFirehose,
records: make([]*firehose.Record, 0, 500),
backoff: plugins.NewBackoff(),
timer: timer,
}

timeStamp := time.Now()
retCode := output.AddRecord(record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")

err := output.Flush()
assert.NoError(t, err, "Unexpected error calling flush")
retCode = output.Flush()
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected return code to be FLB_OK")

}
12 changes: 4 additions & 8 deletions fluent-bit-firehose.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,9 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
}
count++
}
err := firehoseOutput.Flush()
if err != nil {
logrus.Errorf("[firehose %d] %v\n", firehoseOutput.PluginID, err)
return output.FLB_ERROR
retCode := firehoseOutput.Flush()
if retCode != output.FLB_OK {
return retCode
}
logrus.Debugf("[firehose %d] Processed %d events with tag %s\n", firehoseOutput.PluginID, count, fluentTag)

Expand All @@ -145,10 +144,7 @@ func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, tag *C.char) int
func FLBPluginExit() int {
// Before final exit, call Flush() for all the instances of the Output Plugin
for i := range pluginInstances {
err := pluginInstances[i].Flush()
if err != nil {
logrus.Errorf("[firehose %d] %v\n", pluginInstances[i].PluginID, err)
}
pluginInstances[i].Flush()
}

return output.FLB_OK
Expand Down
48 changes: 0 additions & 48 deletions plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"strings"
"time"

retry "github.com/cenkalti/backoff"
"github.com/sirupsen/logrus"
)

Expand All @@ -29,53 +28,6 @@ const (
sendFailureTimeoutEnvVar = "SEND_FAILURE_TIMEOUT"
)

const (
initialInterval = 100 * time.Millisecond
maxInterval = 10 * time.Second
)

// Backoff wraps github.com/cenkalti/backoff
// Wait() is called for each AWS API call that may need back off
// But backoff only occurs if StartBackoff() has previously been called
// Reset() should be called whenever backoff can end.
type Backoff struct {
doBackoff bool
expBackoff *retry.ExponentialBackOff
}

// Reset ends the exponential backoff
func (b *Backoff) Reset() {
b.doBackoff = false
b.expBackoff.Reset()
}

// Wait enacts the exponential backoff, if StartBackoff() has been called
func (b *Backoff) Wait() {
if b.doBackoff {
d := b.expBackoff.NextBackOff()
logrus.Debugf("[go plugin] In exponential backoff, waiting %v", d)
time.Sleep(d)
}
}

// StartBackoff begins exponential backoff
// its a no-op if backoff has already started
func (b *Backoff) StartBackoff() {
b.doBackoff = true
}

// NewBackoff creates a new Backoff struct with default values
func NewBackoff() *Backoff {
b := retry.NewExponentialBackOff()
b.InitialInterval = initialInterval
b.MaxElapsedTime = 0 // The backoff object never expires
b.MaxInterval = maxInterval
return &Backoff{
doBackoff: false,
expBackoff: b,
}
}

// Timeout is a simple timeout for single-threaded programming
// (Goroutines are expensive in Cgo)
type Timeout struct {
Expand Down

0 comments on commit c370445

Please sign in to comment.