Fix partition key computation for aggregation #158

Merged · 1 commit · Aug 26, 2021
51 changes: 40 additions & 11 deletions aggregate/aggregator.go
@@ -4,6 +4,7 @@ import (
"crypto/md5"
"fmt"

"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/sirupsen/logrus"
@@ -30,52 +31,80 @@ type Aggregator struct {
records []*Record
aggSize int // Size of both records, and partitionKeys in bytes
maxAggRecordSize int
+	stringGen        *util.RandomStringGenerator
}

// NewAggregator create a new aggregator
-func NewAggregator() *Aggregator {
+func NewAggregator(stringGen *util.RandomStringGenerator) *Aggregator {

return &Aggregator{
partitionKeys: make(map[string]uint64, 0),
records: make([]*Record, 0),
maxAggRecordSize: defaultMaxAggRecordSize,
aggSize: initialAggRecordSize,
+		stringGen:        stringGen,
}
}
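Note: `util.RandomStringGenerator` is the new dependency injected here; the old inline `random` struct in kinesis.go is deleted further down in this diff. A minimal sketch of what the util type presumably looks like, reconstructed from its call sites (`NewRandomStringGenerator(size)`, the `Size` field, `RandomString()`) and from the code removed from kinesis.go — the real util package may differ:

```go
package util

import (
	"math/rand"
	"time"
)

const partitionKeyCharset = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// RandomStringGenerator produces fixed-length random strings, reusing
// a single buffer so the hot path stays allocation-free.
type RandomStringGenerator struct {
	seededRandom *rand.Rand
	buffer       []byte
	Size         int
}

// NewRandomStringGenerator creates a generator for strings of length size.
func NewRandomStringGenerator(size int) *RandomStringGenerator {
	return &RandomStringGenerator{
		seededRandom: rand.New(rand.NewSource(time.Now().UnixNano())),
		buffer:       make([]byte, size),
		Size:         size,
	}
}

// RandomString returns a new random string of length Size.
func (gen *RandomStringGenerator) RandomString() string {
	for i := range gen.buffer {
		gen.buffer[i] = partitionKeyCharset[gen.seededRandom.Intn(len(partitionKeyCharset))]
	}
	return string(gen.buffer)
}
```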

// AddRecord to the aggregate buffer.
// Will return a kinesis PutRecordsRequest once buffer is full, or if the data exceeds the aggregate limit.
-func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {
+func (a *Aggregator) AddRecord(partitionKey string, hasPartitionKey bool, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {

-	partitionKeySize := len([]byte(partitionKey))
-	if partitionKeySize < 1 {
-		return nil, fmt.Errorf("Invalid partition key provided")
+	if hasPartitionKey {
+		partitionKeySize := len([]byte(partitionKey))
+		if partitionKeySize < 1 {
+			return nil, fmt.Errorf("Invalid partition key provided")
+		}
	}

dataSize := len(data)

// If this is a very large record, then don't aggregate it.
if dataSize >= a.maxAggRecordSize {
+		if !hasPartitionKey {
+			partitionKey = a.stringGen.RandomString()
+		}
return &kinesis.PutRecordsRequestEntry{
Data: data,
PartitionKey: aws.String(partitionKey),
}, nil
}

+	if !hasPartitionKey {
+		if len(a.partitionKeys) > 0 {
+			// Take any partition key from the map, as long as one exists
+			for k, _ := range a.partitionKeys {
Contributor Author: I'm open to suggestions for how to do this better, not sure how to get keys out of a map properly.

Contributor: Could I ask a question about what "first" means here? Does it mean the first element/key that was put in the map? Then I am not sure you could get it from the map through iteration.

Contributor Author: I think I generally just want any key that is in the map; basically we choose a "random" key for the record, but AFAICT this key does not matter, since the real partition key is just the first key, so this is just a way to save space. I'll update the comment.

+				partitionKey = k
+				break
+			}
+		} else {
+			partitionKey = a.stringGen.RandomString()
+		}
+	}
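Regarding the thread above: Go deliberately randomizes map iteration order, so there is no "first inserted" key to retrieve, and a single-iteration range loop is the standard way to pull an arbitrary key. A self-contained sketch (names here are illustrative):

```go
package main

import "fmt"

// anyKey returns an arbitrary key from m, and false if m is empty.
// Go randomizes map iteration order, so this yields "some key",
// never reliably the first one inserted.
func anyKey(m map[string]uint64) (string, bool) {
	for k := range m {
		return k, true // stop after one iteration
	}
	return "", false
}

func main() {
	keys := map[string]uint64{"alpha": 0, "beta": 1}
	if k, ok := anyKey(keys); ok {
		fmt.Println("picked:", k) // may print either key
	}
}
```

As the author says, any existing key works here because it only has to index the aggregate's key table; `for k := range` (dropping the blank identifier) is the slightly more idiomatic spelling of the loop in the diff.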

// Check if we need to add a new partition key, and if we do how much space it will take
pKeyIdx, pKeyAddedSize := a.checkPartitionKey(partitionKey)

// data field size is proto size of data + data field number size
// partition key field size is varint of index size + field number size
-	recordSize := protowire.SizeBytes(dataSize) + fieldNumberSize + protowire.SizeVarint(pKeyIdx) + fieldNumberSize
-	// Total size is proto size of data + field number of parent proto
-	addedSize := protowire.SizeBytes(recordSize) + fieldNumberSize
+	dataFieldSize := protowire.SizeBytes(dataSize) + fieldNumberSize
+	pkeyFieldSize := protowire.SizeVarint(pKeyIdx) + fieldNumberSize
+	// Total size is byte size of data + pkey field + field number of parent proto

-	if a.getSize()+addedSize+pKeyAddedSize >= maximumRecordSize {
-		// Aggregate records, and return
+	if a.getSize()+protowire.SizeBytes(dataFieldSize+pkeyFieldSize)+fieldNumberSize+pKeyAddedSize >= maximumRecordSize {
+		// Aggregate records, and return if error
entry, err = a.AggregateRecords()
if err != nil {
return entry, err
}

+		if !hasPartitionKey {
+			// choose a new partition key if needed now that we've aggregated the previous records
+			partitionKey = a.stringGen.RandomString()
+		}
+		// Recompute field size, since it changed
+		pKeyIdx, _ = a.checkPartitionKey(partitionKey)
+		pkeyFieldSize = protowire.SizeVarint(pKeyIdx) + fieldNumberSize
Contributor Author: This also fixes a sizing bug: when we switch records we may be off on the size slightly (it may shrink if the previous pKeyIdx was > 128).
}
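The sizing bug mentioned in the comment above is a varint-width effect: a partition key index below 128 encodes in one byte, larger indexes take more, and flushing presumably resets the key table so the next index is small again. A small illustration, assuming google.golang.org/protobuf/encoding/protowire (the package this file already uses):

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/encoding/protowire"
)

func main() {
	// Varints carry 7 payload bits per byte, so the encoded width
	// jumps at 128, then again at 16384, and so on.
	fmt.Println(protowire.SizeVarint(127)) // 1
	fmt.Println(protowire.SizeVarint(128)) // 2
	// After AggregateRecords flushes, checkPartitionKey hands out a
	// fresh, small index, so pkeyFieldSize can shrink; hence the
	// recompute above instead of reusing the stale size.
}
```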

// Add new record, and update aggSize
@@ -86,7 +115,7 @@ func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis.PutRecordsRequestEntry, err error) {
PartitionKeyIndex: &partitionKeyIndex,
})

-	a.aggSize += addedSize
+	a.aggSize += protowire.SizeBytes(dataFieldSize+pkeyFieldSize) + fieldNumberSize

return entry, err
}
21 changes: 18 additions & 3 deletions aggregate/aggregator_test.go
@@ -3,19 +3,34 @@ package aggregate
import (
"testing"

"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/stretchr/testify/assert"
)

const concurrencyRetryLimit = 4

func TestAddRecordCalculatesCorrectSize(t *testing.T) {
-	aggregator := NewAggregator()
+	generator := util.NewRandomStringGenerator(18)
+	aggregator := NewAggregator(generator)

_, err := aggregator.AddRecord("test partition key", []byte("test value"))
_, err := aggregator.AddRecord("", false, []byte("test value"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
assert.Equal(t, 36, aggregator.aggSize, "Expected aggregator to compute correct size")

-	_, err = aggregator.AddRecord("test partition key 2", []byte("test value 2"))
+	_, err = aggregator.AddRecord("test partition key 2", true, []byte("test value 2"))
assert.Equal(t, nil, err, "Expected aggregator not to return error")
assert.Equal(t, 76, aggregator.aggSize, "Expected aggregator to compute correct size")
}

+func TestAddRecordDoesNotAddNewRandomPartitionKey(t *testing.T) {
+	generator := util.NewRandomStringGenerator(18)
+	aggregator := NewAggregator(generator)
+
+	_, err := aggregator.AddRecord("", false, []byte("test value"))
+	assert.Equal(t, nil, err, "Expected aggregator not to return error")
+	assert.Equal(t, 36, aggregator.aggSize, "Expected aggregator to compute correct size")
+
+	_, err = aggregator.AddRecord("", false, []byte("test value 2"))
+	assert.Equal(t, nil, err, "Expected aggregator not to return error")
+	assert.Equal(t, 1, len(aggregator.partitionKeys), "Expected aggregator to reuse partitionKey value")
+}
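For readers new to the aggregator API: per the AddRecord doc comment above, a non-nil entry comes back only when the buffer fills, and whatever remains buffered must be flushed explicitly at the end of a batch. A hedged usage sketch of that calling pattern (the helper and variable names here are invented):

```go
package example

import (
	"github.com/aws/amazon-kinesis-streams-for-fluent-bit/aggregate"
	"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
	"github.com/aws/aws-sdk-go/service/kinesis"
)

// buildEntries aggregates raw log payloads into Kinesis entries.
func buildEntries(logs [][]byte) []*kinesis.PutRecordsRequestEntry {
	agg := aggregate.NewAggregator(util.NewRandomStringGenerator(18))
	var entries []*kinesis.PutRecordsRequestEntry
	for _, data := range logs {
		// false = no user-configured partition key; the aggregator
		// reuses a key from its table or generates a random one.
		entry, err := agg.AddRecord("", false, data)
		if err != nil {
			continue // drop the single bad record, keep the batch going
		}
		if entry != nil { // buffer filled: one complete aggregated record
			entries = append(entries, entry)
		}
	}
	// Flush whatever is still buffered at the end of the batch.
	if entry, err := agg.AggregateRecords(); err == nil && entry != nil {
		entries = append(entries, entry)
	}
	return entries
}
```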
2 changes: 1 addition & 1 deletion fluent-bit-kinesis.go
@@ -113,7 +113,7 @@ func newKinesisOutput(ctx unsafe.Pointer, pluginID int) (*kinesis.OutputPlugin,
}

if isAggregate && partitionKey != "" {
logrus.Errorf("[kinesis %d] WARNING: The options 'aggregation' and 'partition_key' should not be used simaltaniously", pluginID)
logrus.Errorf("[kinesis %d] WARNING: The options 'aggregation' and 'partition_key' should not be used simultaneously", pluginID)
}

var concurrencyInt, concurrencyRetriesInt int
76 changes: 28 additions & 48 deletions kinesis/kinesis.go
@@ -20,14 +20,14 @@ import (
"bytes"
"compress/zlib"
"fmt"
"math/rand"
"os"
"strings"
"sync/atomic"
"time"

"github.com/aws/amazon-kinesis-firehose-for-fluent-bit/plugins"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/aggregate"
"github.com/aws/amazon-kinesis-streams-for-fluent-bit/util"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/awserr"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
@@ -42,8 +42,7 @@ import (
)

const (
-	partitionKeyCharset = "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
-	truncatedSuffix     = "[Truncated...]"
+	truncatedSuffix = "[Truncated...]"
)

const (
Expand All @@ -65,11 +64,6 @@ type PutRecordsClient interface {
PutRecords(input *kinesis.PutRecordsInput) (*kinesis.PutRecordsOutput, error)
}

-type random struct {
-	seededRandom *rand.Rand
-	buffer       []byte
-}

// CompressionType indicates the type of compression to apply to each record
type CompressionType string

@@ -98,16 +92,15 @@ type OutputPlugin struct {
client PutRecordsClient
timer *plugins.Timeout
PluginID int
-	random                *random
+	stringGen             *util.RandomStringGenerator
Concurrency int
concurrencyRetryLimit int
// Concurrency is the limit, goroutineCount represents the running goroutines
	goroutineCount int32
// Used to implement backoff for concurrent flushes
concurrentRetries uint32
isAggregate bool
aggregator *aggregate.Aggregator
-	aggregatePartitionKey string
compression CompressionType
// If specified, dots in key names should be replaced with other symbols
replaceDots string
@@ -130,11 +123,7 @@ func NewOutputPlugin(region, stream, dataKeys, partitionKey, roleARN, kinesisEnd
return nil, err
}

-	seededRand := rand.New(rand.NewSource(time.Now().UnixNano()))
-	random := &random{
-		seededRandom: seededRand,
-		buffer:       make([]byte, 8),
-	}
+	stringGen := util.NewRandomStringGenerator(8)

var timeFormatter *strftime.Strftime
if timeKey != "" {
@@ -150,7 +139,7 @@

var aggregator *aggregate.Aggregator
if isAggregate {
-		aggregator = aggregate.NewAggregator()
+		aggregator = aggregate.NewAggregator(stringGen)
}

return &OutputPlugin{
@@ -164,7 +153,7 @@
logKey: logKey,
timer: timer,
PluginID: pluginID,
-		random:                random,
+		stringGen:             stringGen,
Concurrency: concurrency,
concurrencyRetryLimit: retryLimit,
isAggregate: isAggregate,
@@ -249,23 +238,30 @@ func (outputPlugin *OutputPlugin) AddRecord(records *[]*kinesis.PutRecordsRequestEntry
record[outputPlugin.timeKey] = buf.String()
}

-	partitionKey := outputPlugin.getPartitionKey(record)
-	logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey)
-	data, err := outputPlugin.processRecord(record, partitionKey)
+	partitionKey, hasPartitionKey := outputPlugin.getPartitionKey(record)
+	var partitionKeyLen = len(partitionKey)
+	if !hasPartitionKey {
+		partitionKeyLen = outputPlugin.stringGen.Size
+	}
+	data, err := outputPlugin.processRecord(record, partitionKeyLen)
if err != nil {
logrus.Errorf("[kinesis %d] %v\n", outputPlugin.PluginID, err)
// discard this single bad record instead and let the batch continue
return fluentbit.FLB_OK
}

if !outputPlugin.isAggregate {
+		if !hasPartitionKey {
+			partitionKey = outputPlugin.stringGen.RandomString()
+		}
+		logrus.Debugf("[kinesis %d] Got value: %s for a given partition key.\n", outputPlugin.PluginID, partitionKey)
*records = append(*records, &kinesis.PutRecordsRequestEntry{
Data: data,
PartitionKey: aws.String(partitionKey),
})
} else {
// Use the KPL aggregator to buffer records isAggregate is true
-		aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, data)
+		aggRecord, err := outputPlugin.aggregator.AddRecord(partitionKey, hasPartitionKey, data)
if err != nil {
logrus.Errorf("[kinesis %d] Failed to aggregate record %v\n", outputPlugin.PluginID, err)
// discard this single bad record instead and let the batch continue
@@ -275,7 +271,6 @@
// If aggRecord isn't nil, then a full kinesis record has been aggregated
if aggRecord != nil {
*records = append(*records, aggRecord)
-			outputPlugin.aggregatePartitionKey = outputPlugin.randomString()
}
}

@@ -294,7 +289,6 @@ func (outputPlugin *OutputPlugin) FlushAggregatedRecords(records *[]*kinesis.Put

if aggRecord != nil {
*records = append(*records, aggRecord)
-		outputPlugin.aggregatePartitionKey = outputPlugin.randomString()
}

return fluentbit.FLB_OK
@@ -426,7 +420,7 @@ func replaceDots(obj map[interface{}]interface{}, replacement string) map[interface{}]interface{} {
return obj
}

-func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) {
+func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKeyLen int) ([]byte, error) {
if outputPlugin.dataKeys != "" {
record = plugins.DataKeys(outputPlugin.dataKeys, record)
}
@@ -473,9 +467,9 @@ func (outputPlugin *OutputPlugin) processRecord(record map[interface{}]interface{}, partitionKey string) ([]byte, error) {
}
}

-	if len(data)+len(partitionKey) > maximumRecordSize {
-		logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+len(partitionKey), outputPlugin.stream)
-		data = data[:maximumRecordSize-len(partitionKey)-len(truncatedSuffix)]
+	if len(data)+partitionKeyLen > maximumRecordSize {
+		logrus.Warnf("[kinesis %d] Found record with %d bytes, truncating to 1MB, stream=%s\n", outputPlugin.PluginID, len(data)+partitionKeyLen, outputPlugin.stream)
+		data = data[:maximumRecordSize-partitionKeyLen-len(truncatedSuffix)]
data = append(data, []byte(truncatedSuffix)...)
}
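Worth spelling out why processRecord now takes a length rather than the key itself: when no partition key is configured, the key that will eventually accompany the record does not exist yet, so the truncation budget uses stringGen.Size as a stand-in. A minimal sketch of the arithmetic, with maximumRecordSize assumed to be the 1 MB Kinesis limit the warning message mentions:

```go
package example

const (
	maximumRecordSize = 1024 * 1024 // assumed: 1 MiB Kinesis record limit
	truncatedSuffix   = "[Truncated...]"
)

// truncateIfNeeded mirrors the logic above: the data plus the (actual
// or future random) partition key must fit in one Kinesis record.
func truncateIfNeeded(data []byte, partitionKeyLen int) []byte {
	if len(data)+partitionKeyLen > maximumRecordSize {
		data = data[:maximumRecordSize-partitionKeyLen-len(truncatedSuffix)]
		data = append(data, []byte(truncatedSuffix)...)
	}
	return data
}
```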

@@ -554,15 +548,6 @@ func (outputPlugin *OutputPlugin) processAPIResponse(records *[]*kinesis.PutReco
return retCode, nil
}

-// randomString generates a random string of length 8
-// it uses the math/rand library
-func (outputPlugin *OutputPlugin) randomString() string {
-	for i := range outputPlugin.random.buffer {
-		outputPlugin.random.buffer[i] = partitionKeyCharset[outputPlugin.random.seededRandom.Intn(len(partitionKeyCharset))]
-	}
-	return string(outputPlugin.random.buffer)
-}

func getFromMap(dataKey string, record map[interface{}]interface{}) interface{} {
for k, v := range record {
currentKey := stringOrByteArray(k)
Expand All @@ -575,8 +560,9 @@ func getFromMap(dataKey string, record map[interface{}]interface{}) interface{}
}

// getPartitionKey returns the value for a given valid key
-// if the given key is empty or invalid, it returns a random string
-func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) string {
+// if the given key is empty or invalid, it returns empty
+// second return value indicates whether a partition key was found or not
+func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interface{}) (string, bool) {
partitionKey := outputPlugin.partitionKey
if partitionKey != "" {
partitionKeys := strings.Split(partitionKey, "->")
Expand All @@ -589,25 +575,19 @@ func (outputPlugin *OutputPlugin) getPartitionKey(record map[interface{}]interfa
if len(value) > partitionKeyMaxLength {
value = value[0:partitionKeyMaxLength]
}
-				return value
+				return value, true
}
}
_, ok := newRecord.(map[interface{}]interface{})
if ok {
record = newRecord.(map[interface{}]interface{})
} else {
logrus.Errorf("[kinesis %d] The partition key could not be found in the record, using a random string instead", outputPlugin.PluginID)
-			return outputPlugin.randomString()
+			return "", false
}
}
}
-	if outputPlugin.isAggregate {
-		if outputPlugin.aggregatePartitionKey == "" {
-			outputPlugin.aggregatePartitionKey = outputPlugin.randomString()
-		}
-		return outputPlugin.aggregatePartitionKey
-	}
-	return outputPlugin.randomString()
+	return "", false
}
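The "->" syntax walked above lets the partition_key option point into nested records. A simplified, self-contained sketch of that lookup (the real method also truncates values to partitionKeyMaxLength and handles []byte keys via getFromMap):

```go
package main

import (
	"fmt"
	"strings"
)

// lookup walks a "->"-delimited key path through nested maps, returning
// (value, true) on success and ("", false) when any segment is missing.
func lookup(path string, record map[interface{}]interface{}) (string, bool) {
	keys := strings.Split(path, "->")
	for i, key := range keys {
		value, ok := record[key]
		if !ok {
			return "", false
		}
		if i == len(keys)-1 {
			s, ok := value.(string)
			return s, ok
		}
		if record, ok = value.(map[interface{}]interface{}); !ok {
			return "", false
		}
	}
	return "", false
}

func main() {
	record := map[interface{}]interface{}{
		"container": map[interface{}]interface{}{"name": "nginx-1"},
	}
	fmt.Println(lookup("container->name", record)) // nginx-1 true
	fmt.Println(lookup("container->id", record))   // "" false
}
```

With the (string, bool) return, the caller decides what a miss means: the non-aggregated path generates a random key per record, while the aggregated path reuses a key from the aggregator's table to save space.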

func zlibCompress(data []byte) ([]byte, error) {