diff --git a/aggregate/aggregator.go b/aggregate/aggregator.go index 7353be8..0e82c45 100644 --- a/aggregate/aggregator.go +++ b/aggregate/aggregator.go @@ -4,6 +4,7 @@ import ( "crypto/md5" "fmt" + "github.com/aws/amazon-kinesis-streams-for-fluent-bit/util" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" "github.com/sirupsen/logrus" @@ -30,52 +31,80 @@ type Aggregator struct { records []*Record aggSize int // Size of both records, and partitionKeys in bytes maxAggRecordSize int + stringGen *util.RandomStringGenerator } // NewAggregator create a new aggregator -func NewAggregator() *Aggregator { +func NewAggregator(stringGen *util.RandomStringGenerator) *Aggregator { return &Aggregator{ partitionKeys: make(map[string]uint64, 0), records: make([]*Record, 0), maxAggRecordSize: defaultMaxAggRecordSize, aggSize: initialAggRecordSize, + stringGen: stringGen, } } // AddRecord to the aggregate buffer. // Will return a kinesis PutRecordsRequest once buffer is full, or if the data exceeds the aggregate limit. -func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) { +func (a *Aggregator) AddRecord(partitionKey string, hasPartitionKey bool, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) { - partitionKeySize := len([]byte(partitionKey)) - if partitionKeySize < 1 { - return nil, fmt.Errorf("Invalid partition key provided") + if hasPartitionKey { + partitionKeySize := len([]byte(partitionKey)) + if partitionKeySize < 1 { + return nil, fmt.Errorf("Invalid partition key provided") + } } dataSize := len(data) // If this is a very large record, then don't aggregate it. if dataSize >= a.maxAggRecordSize { + if !hasPartitionKey { + partitionKey = a.stringGen.RandomString() + } return &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }, nil } + + if !hasPartitionKey { + if len(a.partitionKeys) > 0 { + // Take any partition key from the map, as long as one exists + for k, _ := range a.partitionKeys { + partitionKey = k + break + } + } else { + partitionKey = a.stringGen.RandomString() + } + } + // Check if we need to add a new partition key, and if we do how much space it will take pKeyIdx, pKeyAddedSize := a.checkPartitionKey(partitionKey) // data field size is proto size of data + data field number size // partition key field size is varint of index size + field number size - recordSize := protowire.SizeBytes(dataSize) + fieldNumberSize + protowire.SizeVarint(pKeyIdx) + fieldNumberSize - // Total size is proto size of data + field number of parent proto - addedSize := protowire.SizeBytes(recordSize) + fieldNumberSize + dataFieldSize := protowire.SizeBytes(dataSize) + fieldNumberSize + pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize + // Total size is byte size of data + pkey field + field number of parent proto - if a.getSize()+addedSize+pKeyAddedSize >= maximumRecordSize { - // Aggregate records, and return + if a.getSize()+protowire.SizeBytes(dataFieldSize+pkeyFieldSize)+fieldNumberSize+pKeyAddedSize >= maximumRecordSize { + // Aggregate records, and return if error entry, err = a.AggregateRecords() if err != nil { return entry, err } + + if !hasPartitionKey { + // choose a new partition key if needed now that we've aggregated the previous records + partitionKey = a.stringGen.RandomString() + } + // Recompute field size, since it changed + pKeyIdx, _ = a.checkPartitionKey(partitionKey) + pkeyFieldSize = protowire.SizeVarint(pKeyIdx) + fieldNumberSize } // Add new record, and update aggSize @@ -86,7 +115,7 @@ func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis PartitionKeyIndex: &partitionKeyIndex, }) - a.aggSize += addedSize + a.aggSize += protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize return entry, err } diff --git a/aggregate/aggregator_test.go b/aggregate/aggregator_test.go index a46e3cf..2316d34 100644 --- a/aggregate/aggregator_test.go +++ b/aggregate/aggregator_test.go @@ -3,19 +3,34 @@ package aggregate import ( "testing" + "github.com/aws/amazon-kinesis-streams-for-fluent-bit/util" "github.com/stretchr/testify/assert" ) const concurrencyRetryLimit = 4 func TestAddRecordCalculatesCorrectSize(t *testing.T) { - aggregator := NewAggregator() + generator := util.NewRandomStringGenerator(18) + aggregator := NewAggregator(generator) - _, err := aggregator.AddRecord("test partition key", []byte("test value")) + _, err := aggregator.AddRecord("", false, []byte("test value")) assert.Equal(t, nil, err, "Expected aggregator not to return error") assert.Equal(t, 36, aggregator.aggSize, "Expected aggregator to compute correct size") - _, err = aggregator.AddRecord("test partition key 2", []byte("test value 2")) + _, err = aggregator.AddRecord("test partition key 2", true, []byte("test value 2")) assert.Equal(t, nil, err, "Expected aggregator not to return error") assert.Equal(t, 76, aggregator.aggSize, "Expected aggregator to compute correct size") } + +func TestAddRecordDoesNotAddNewRandomPartitionKey(t *testing.T) { + generator := util.NewRandomStringGenerator(18) + aggregator := NewAggregator(generator) + + _, err := aggregator.AddRecord("", false, []byte("test value")) + assert.Equal(t, nil, err, "Expected aggregator not to return error") + assert.Equal(t, 36, aggregator.aggSize, "Expected aggregator to compute correct size") + + _, err = aggregator.AddRecord("", false, []byte("test value 2")) + assert.Equal(t, nil, err, "Expected aggregator not to return error") + assert.Equal(t, 1, len(aggregator.partitionKeys), "Expected aggregator to reuse partitionKey value") +} diff --git a/fluent-bit-kinesis.go b/fluent-bit-kinesis.go index b08ff56..71f0304 100644 --- a/fluent-bit-kinesis.go +++ b/fluent-bit-kinesis.go @@ -113,7 +113,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin, } if isAggregate && partitionKey != "" { - logrus.Errorf("[kinesis %d] WARNING: The options 'aggregation' and 'partition_key' should not be used simaltaniously", pluginID) + logrus.Errorf("[kinesis %d] WARNING: The options 'aggregation' and 'partition_key' should not be used simultaneously", pluginID) } var concurrencyInt, concurrencyRetriesInt int diff --git a/kinesis/kinesis.go b/kinesis/kinesis.go index 14e0b15..de19190 100644 --- a/kinesis/kinesis.go +++ b/kinesis/kinesis.go @@ -20,7 +20,6 @@ import ( "bytes" "compress/zlib" "fmt" - "math/rand" "os" "strings" "sync/atomic" @@ -28,6 +27,7 @@ import ( "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/amazon-kinesis-streams-for-fluent-bit/aggregate" + "github.com/aws/amazon-kinesis-streams-for-fluent-bit/util" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials/stscreds" @@ -42,8 +42,7 @@ import ( ) const ( - partitionKeyCharset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" - truncatedSuffix = "[Truncated...]" + truncatedSuffix = "[Truncated...]" ) const ( @@ -65,11 +64,6 @@ type PutRecordsClient interface { PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error) } -type random struct { - seededRandom *rand.Rand - buffer []byte -} - // CompressionType indicates the type of compression to apply to each record type CompressionType string @@ -98,16 +92,15 @@ type OutputPlugin struct { client PutRecordsClient timer *plugins.Timeout PluginID int - random *random + stringGen *util.RandomStringGenerator Concurrency int concurrencyRetryLimit int // Concurrency is the limit, goroutineCount represents the running goroutines - goroutineCount int32 + goroutineCount int32 // Used to implement backoff for concurrent flushes concurrentRetries uint32 isAggregate bool aggregator *aggregate.Aggregator - aggregatePartitionKey string compression CompressionType // If specified, dots in key names should be replaced with other symbols replaceDots string @@ -130,11 +123,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd return nil, err } - seededRand := rand.New(rand.NewSource(time.Now().UnixNano())) - random := &random{ - seededRandom: seededRand, - buffer: make([]byte, 8), - } + stringGen := util.NewRandomStringGenerator(8) var timeFormatter *strftime.Strftime if timeKey != "" { @@ -150,7 +139,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd var aggregator *aggregate.Aggregator if isAggregate { - aggregator = aggregate.NewAggregator() + aggregator = aggregate.NewAggregator(stringGen) } return &OutputPlugin{ @@ -164,7 +153,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd logKey: logKey, timer: timer, PluginID: pluginID, - random: random, + stringGen: stringGen, Concurrency: concurrency, concurrencyRetryLimit: retryLimit, isAggregate: isAggregate, @@ -249,9 +238,12 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques record[outputPlugin.timeKey] = buf.String() } - partitionKey := outputPlugin.getPartitionKey(record) - logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey) - data, err := outputPlugin.processRecord(record, partitionKey) + partitionKey, hasPartitionKey := outputPlugin.getPartitionKey(record) + var partitionKeyLen = len(partitionKey) + if !hasPartitionKey { + partitionKeyLen = outputPlugin.stringGen.Size + } + data, err := outputPlugin.processRecord(record, partitionKeyLen) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) // discard this single bad record instead and let the batch continue @@ -259,13 +251,17 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques } if !outputPlugin.isAggregate { + if !hasPartitionKey { + partitionKey = outputPlugin.stringGen.RandomString() + } + logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey) *records = append(*records, &kinesis.PutRecordsRequestEntry{ Data: data, PartitionKey: aws.String(partitionKey), }) } else { // Use the KPL aggregator to buffer records isAggregate is true - aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, data) + aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, hasPartitionKey, data) if err != nil { logrus.Errorf("[kinesis %d] Failed to aggregate record %v\n", outputPlugin.PluginID, err) // discard this single bad record instead and let the batch continue @@ -275,7 +271,6 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsReques // If aggRecord isn't nil, then a full kinesis record has been aggregated if aggRecord != nil { *records = append(*records, aggRecord) - outputPlugin.aggregatePartitionKey = outputPlugin.randomString() } } @@ -294,7 +289,6 @@ func (outputPlugin *OutputPlugin) FlushAggregatedRecords(records *[]*kinesis.Put if aggRecord != nil { *records = append(*records, aggRecord) - outputPlugin.aggregatePartitionKey = outputPlugin.randomString() } return fluentbit.FLB_OK @@ -426,7 +420,7 @@ func replaceDots(obj map[interface{}]interface{}, replacement string) map[interf return obj } -func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) { +func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKeyLen int) ([]byte, error) { if outputPlugin.dataKeys != "" { record = plugins.DataKeys(outputPlugin.dataKeys, record) } @@ -473,9 +467,9 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface } } - if len(data)+len(partitionKey) > maximumRecordSize { - logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+len(partitionKey), outputPlugin.stream) - data = data[:maximumRecordSize-len(partitionKey)-len(truncatedSuffix)] + if len(data)+partitionKeyLen > maximumRecordSize { + logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+partitionKeyLen, outputPlugin.stream) + data = data[:maximumRecordSize-partitionKeyLen-len(truncatedSuffix)] data = append(data, []byte(truncatedSuffix)...) } @@ -554,15 +548,6 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco return retCode, nil } -// randomString generates a random string of length 8 -// it uses the math/rand library -func (outputPlugin *OutputPlugin) randomString() string { - for i := range outputPlugin.random.buffer { - outputPlugin.random.buffer[i] = partitionKeyCharset[outputPlugin.random.seededRandom.Intn(len(partitionKeyCharset))] - } - return string(outputPlugin.random.buffer) -} - func getFromMap(dataKey string, record map[interface{}]interface{}) interface{} { for k, v := range record { currentKey := stringOrByteArray(k) @@ -575,8 +560,9 @@ func getFromMap(dataKey string, record map[interface{}]interface{}) interface{} } // getPartitionKey returns the value for a given valid key -// if the given key is empty or invalid, it returns a random string -func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string { +// if the given key is empty or invalid, it returns empty +// second return value indicates whether a partition key was found or not +func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) (string, bool) { partitionKey := outputPlugin.partitionKey if partitionKey != "" { partitionKeys := strings.Split(partitionKey, "->") @@ -589,7 +575,7 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa if len(value) > partitionKeyMaxLength { value = value[0:partitionKeyMaxLength] } - return value + return value, true } } _, ok := newRecord.(map[interface{}]interface{}) @@ -597,17 +583,11 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa record = newRecord.(map[interface{}]interface{}) } else { logrus.Errorf("[kinesis %d] The partition key could not be found in the record, using a random string instead", outputPlugin.PluginID) - return outputPlugin.randomString() + return "", false } } } - if outputPlugin.isAggregate { - if outputPlugin.aggregatePartitionKey == "" { - outputPlugin.aggregatePartitionKey = outputPlugin.randomString() - } - return outputPlugin.aggregatePartitionKey - } - return outputPlugin.randomString() + return "", false } func zlibCompress(data []byte) ([]byte, error) { diff --git a/kinesis/kinesis_test.go b/kinesis/kinesis_test.go index d728aee..3e6e977 100644 --- a/kinesis/kinesis_test.go +++ b/kinesis/kinesis_test.go @@ -2,7 +2,6 @@ package kinesis import ( "encoding/json" - "math/rand" "os" "testing" "time" @@ -10,6 +9,7 @@ import ( "github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins" "github.com/aws/amazon-kinesis-streams-for-fluent-bit/aggregate" "github.com/aws/amazon-kinesis-streams-for-fluent-bit/kinesis/mock_kinesis" + "github.com/aws/amazon-kinesis-streams-for-fluent-bit/util" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/service/kinesis" fluentbit "github.com/fluent/fluent-bit-go/output" @@ -29,16 +29,11 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient, isAggregate os.Exit(1) }) - seededRand := rand.New(rand.NewSource(time.Now().UnixNano())) - b := make([]byte, 8) - random := &random{ - seededRandom: seededRand, - buffer: b, - } + stringGen := util.NewRandomStringGenerator(8) var aggregator *aggregate.Aggregator if isAggregate { - aggregator = aggregate.NewAggregator() + aggregator = aggregate.NewAggregator(stringGen) } return &OutputPlugin{ @@ -48,7 +43,7 @@ func newMockOutputPlugin(client *mock_kinesis.MockPutRecordsClient, isAggregate partitionKey: "", timer: timer, PluginID: 0, - random: random, + stringGen: stringGen, concurrencyRetryLimit: concurrencyRetryLimit, isAggregate: isAggregate, aggregator: aggregator, @@ -102,7 +97,7 @@ func TestTruncateLargeLogEvent(t *testing.T) { timeStamp := time.Now() retCode := outputPlugin.AddRecord(&records, record, &timeStamp) - actualData, err := outputPlugin.processRecord(record, "testKey") + actualData, err := outputPlugin.processRecord(record, len("testKey")) if err != nil { logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err) } @@ -256,28 +251,34 @@ func TestGetPartitionKey(t *testing.T) { //test getPartitionKey() with single partition key outputPlugin, _ := newMockOutputPlugin(nil, false) outputPlugin.partitionKey = "testKey" - value := outputPlugin.getPartitionKey(record) + value, hasValue := outputPlugin.getPartitionKey(record) + assert.Equal(t, true, hasValue, "Should find value") assert.Equal(t, value, "test value with no nested keys") //test getPartitionKey() with nested partition key outputPlugin.partitionKey = "testKeyWithOneNestedKey->nestedKey" - value = outputPlugin.getPartitionKey(record) + value, hasValue = outputPlugin.getPartitionKey(record) + assert.Equal(t, true, hasValue, "Should find value") assert.Equal(t, value, "test value with one nested key") outputPlugin.partitionKey = "testKeyWithNestedKeys->outerKey->innerKey" - value = outputPlugin.getPartitionKey(record) + value, hasValue = outputPlugin.getPartitionKey(record) + assert.Equal(t, true, hasValue, "Should find value") assert.Equal(t, value, "test value with inner key") //test getPartitionKey() with partition key not found outputPlugin.partitionKey = "some key" - value = outputPlugin.getPartitionKey(record) - assert.Len(t, value, 8, "This should be a random string") + value, hasValue = outputPlugin.getPartitionKey(record) + assert.Equal(t, false, hasValue, "Should not find value") + assert.Len(t, value, 0, "This should be an empty string") outputPlugin.partitionKey = "testKeyWithOneNestedKey" - value = outputPlugin.getPartitionKey(record) - assert.Len(t, value, 8, "This should be a random string") + value, hasValue = outputPlugin.getPartitionKey(record) + assert.Equal(t, false, hasValue, "Should not find value") + assert.Len(t, value, 0, "This should be an empty string") outputPlugin.partitionKey = "testKeyWithOneNestedKey->someKey" - value = outputPlugin.getPartitionKey(record) - assert.Len(t, value, 8, "This should be a random string") + value, hasValue = outputPlugin.getPartitionKey(record) + assert.Equal(t, false, hasValue, "Should not find value") + assert.Len(t, value, 0, "This should be an empty string") } diff --git a/util/random.go b/util/random.go new file mode 100644 index 0000000..2e430dc --- /dev/null +++ b/util/random.go @@ -0,0 +1,34 @@ +package util + +import ( + "math/rand" + "time" +) + +const ( + partitionKeyCharset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" +) + +type RandomStringGenerator struct { + seededRandom *rand.Rand + buffer []byte + Size int +} + +// Provides a generator of random strings of provided length +// it uses the math/rand library +func NewRandomStringGenerator(stringSize int) *RandomStringGenerator { + + return &RandomStringGenerator{ + seededRandom: rand.New(rand.NewSource(time.Now().UnixNano())), + buffer: make([]byte, stringSize), + Size: stringSize, + } +} + +func (gen *RandomStringGenerator) RandomString() string { + for i := range gen.buffer { + gen.buffer[i] = partitionKeyCharset[gen.seededRandom.Intn(len(partitionKeyCharset))] + } + return string(gen.buffer) +}