Skip to content

Commit

Permalink
Merge mainline
Browse files Browse the repository at this point in the history
  • Loading branch information
zackwine committed Sep 4, 2020
2 parents b3acbaf + dd7ce3b commit 32ed197
Showing 1 changed file with 77 additions and 10 deletions.
87 changes: 77 additions & 10 deletions kinesis/kinesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/aggregate"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis/mock_kinesis"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
Expand All @@ -16,8 +17,10 @@ import (
"github.com/stretchr/testify/assert"
)

const concurrencyRetryLimit = 4

// newMockOutputPlugin creates an mock OutputPlugin object
func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlugin, error) {
func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient, isAggregate bool) (*OutputPlugin, error) {

timer, _ := plugins.NewTimeout(func(d time.Duration) {
logrus.Errorf("[kinesis] timeout threshold reached: Failed to send logs for %v", d)
Expand All @@ -32,14 +35,22 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient) (*OutputPlug
buffer: b,
}

var aggregator *aggregate.Aggregator
if isAggregate {
aggregator = aggregate.NewAggregator()
}

return &OutputPlugin{
stream: "stream",
client: client,
dataKeys: "",
partitionKey: "",
timer: timer,
PluginID: 0,
random: random,
stream: "stream",
client: client,
dataKeys: "",
partitionKey: "",
timer: timer,
PluginID: 0,
random: random,
concurrencyRetryLimit: concurrencyRetryLimit,
isAggregate: isAggregate,
aggregator: aggregator,
}, nil
}

Expand Down Expand Up @@ -70,7 +81,7 @@ func TestAddRecord(t *testing.T) {
"testkey": []byte("test value"),
}

outputPlugin, _ := newMockOutputPlugin(nil)
outputPlugin, _ := newMockOutputPlugin(nil, false)

timeStamp := time.Now()
retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
Expand All @@ -92,7 +103,7 @@ func TestAddRecordAndFlush(t *testing.T) {
FailedRecordCount: aws.Int64(0),
}, nil)

outputPlugin, _ := newMockOutputPlugin(mockKinesis)
outputPlugin, _ := newMockOutputPlugin(mockKinesis, false)

timeStamp := time.Now()
retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
Expand All @@ -111,6 +122,62 @@ func TestZlibCompression(t *testing.T) {
assert.Lessf(t, len(compressedBuf), len(testData), "Compressed data buffer should contain fewer bytes")
}

func TestAddRecordAndFlushAggregate(t *testing.T) {
records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)

record := map[interface{}]interface{}{
"testkey": []byte("test value"),
}

ctrl := gomock.NewController(t)
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)

mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
FailedRecordCount: aws.Int64(0),
}, nil)

outputPlugin, _ := newMockOutputPlugin(mockKinesis, true)

checkIsAggregate := outputPlugin.IsAggregate()
assert.Equal(t, checkIsAggregate, true, "Expected IsAggregate() to return true")

timeStamp := time.Now()
retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected AddRecord return code to be FLB_OK")

retCode = outputPlugin.FlushAggregatedRecords(&records)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushAggregatedRecords return code to be FLB_OK")

retCode = outputPlugin.Flush(&records)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected Flush return code to be FLB_OK")
}

func TestAddRecordWithConcurrency(t *testing.T) {
records := make([]*kinesis.PutRecordsRequestEntry, 0, 500)

record := map[interface{}]interface{}{
"testkey": []byte("test value"),
}

ctrl := gomock.NewController(t)
mockKinesis := mock_kinesis.NewMockPutRecordsClient(ctrl)

mockKinesis.EXPECT().PutRecords(gomock.Any()).Return(&kinesis.PutRecordsOutput{
FailedRecordCount: aws.Int64(0),
}, nil)

outputPlugin, _ := newMockOutputPlugin(mockKinesis, false)
// Enable concurrency
outputPlugin.Concurrency = 2

timeStamp := time.Now()
retCode := outputPlugin.AddRecord(&records, record, &timeStamp)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected AddRecord return code to be FLB_OK")

retCode = outputPlugin.FlushConcurrent(len(records), records)
assert.Equal(t, retCode, fluentbit.FLB_OK, "Expected FlushConcurrent return code to be FLB_OK")
}

func TestZlibCompressionEmpty(t *testing.T) {

_, err := zlibCompress(nil)
Expand Down

0 comments on commit 32ed197

Please sign in to comment.