Fix aggregator size estimation #155

Merged
merged 5 commits on Aug 23, 2021
Changes from 1 commit
50 changes: 43 additions & 7 deletions aggregate/aggregator.go
@@ -19,10 +19,25 @@ var (
const (
maximumRecordSize = 1024 * 1024 // 1 MB
defaultMaxAggRecordSize = 20 * 1024 // 20K
pKeyIdxSize = 8
aggProtobufBytes = 2 // Marshalling the data into protobuf adds an additional 2 bytes.
initialAggRecordSize = 0
fieldNumberSize = 1 // All field numbers are below 16, meaning they will only take up 1 byte
)
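
A note on why fieldNumberSize can be a flat 1: a protobuf field tag is the varint encoding of (field number << 3) | wire type, so any field number below 16 yields a tag value under 128 and fits in a single byte. A quick illustrative check (the field number and wire type here are example values, not taken from the aggregated-record proto):

tag := (15 << 3) | 2 // length-delimited field 15 -> tag value 122, < 128, so one varint byte
// field number 16 would give (16 << 3) | 2 = 130, which already needs a two-byte varint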

// Effectively just ceil(log base 128 of int)
// The size in bytes that the protobuf representation will take
func varint64Size(varint uint64) (size int) {
size = 1
for varint >= 0x80 {
size += 1
varint >>= 7
}
return size
}

func varintSize(varint int) (size int) {
return varint64Size(uint64(varint))
}
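
To see how the helper tracks protobuf varint encoding (7 payload bits per byte), here is a minimal standalone sketch; the function body mirrors the one added in this diff, while the main driver is mine, not part of the PR:

package main

import "fmt"

// varint64Size returns the number of bytes the protobuf varint encoding of the value takes.
func varint64Size(varint uint64) (size int) {
	size = 1
	for varint >= 0x80 {
		size += 1
		varint >>= 7
	}
	return size
}

func main() {
	// The size bumps at each 7-bit boundary: 128, 16384, ...
	for _, v := range []uint64{0, 127, 128, 16383, 16384} {
		fmt.Println(v, "->", varint64Size(v)) // prints 1, 1, 2, 2, 3
	}
}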

// Aggregator kinesis aggregator
type Aggregator struct {
partitionKeys map[string]uint64
@@ -38,6 +53,7 @@ func NewAggregator() *Aggregator {
partitionKeys: make(map[string]uint64, 0),
records: make([]*Record, 0),
maxAggRecordSize: defaultMaxAggRecordSize,
aggSize: initialAggRecordSize,
}
}

@@ -59,8 +75,16 @@ func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis
PartitionKey: aws.String(partitionKey),
}, nil
}
// Check if we need to add a new partition key, and if so, how much space it will take
pKeyIdx, pKeyAddedSize := a.checkPartitionKey(partitionKey)

// data field size is data length + size of the varint of the data length + data field number size
// partition key field size is size of the varint of the index + field number size
recordSize := dataSize + varintSize(dataSize) + fieldNumberSize + varint64Size(pKeyIdx) + fieldNumberSize
// Total size is record size + size of the varint of the record size + the record's field number size in the parent proto
addedSize := recordSize + varintSize(recordSize) + fieldNumberSize

if a.getSize()+dataSize+partitionKeySize+pKeyIdxSize >= maximumRecordSize {
if a.getSize()+addedSize+pKeyAddedSize >= maximumRecordSize {
// Aggregate records, and return
entry, err = a.AggregateRecords()
if err != nil {
@@ -76,7 +100,7 @@ func (a *Aggregator) AddRecord(partitionKey string, data []byte) (entry *kinesis
PartitionKeyIndex: &partitionKeyIndex,
})

a.aggSize += dataSize + pKeyIdxSize
a.aggSize += addedSize

return entry, err
}
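
A worked example of the new accounting, using the helpers and constants defined above with made-up numbers (a 100-byte payload and partition key index 3):

dataSize := 100 // len(data), a made-up payload size
recordSize := dataSize + varintSize(dataSize) + // 100 + 1 (varint length prefix of the data field)
	fieldNumberSize + // + 1 (data field tag)
	varint64Size(3) + fieldNumberSize // + 1 + 1 (partition key index value and its tag)
// recordSize == 104
addedSize := recordSize + varintSize(recordSize) + fieldNumberSize // 104 + 1 + 1
// addedSize == 106 bytes counted against maximumRecordSize
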
@@ -132,10 +156,22 @@ func (a *Aggregator) addPartitionKey(partitionKey string) uint64 {

idx := uint64(len(a.partitionKeys))
a.partitionKeys[partitionKey] = idx
a.aggSize += len([]byte(partitionKey))

partitionKeyLen := len([]byte(partitionKey))
a.aggSize += partitionKeyLen + varintSize(partitionKeyLen) + fieldNumberSize
return idx
}

func (a *Aggregator) checkPartitionKey(partitionKey string) (uint64, int) {
if idx, ok := a.partitionKeys[partitionKey]; ok {
return idx, 0
}

idx := uint64(len(a.partitionKeys))
partitionKeyLen := len([]byte(partitionKey))
return idx, partitionKeyLen + varintSize(partitionKeyLen) + fieldNumberSize
}
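
Both paths charge the same per-key cost; illustrative arithmetic for a hypothetical 7-byte key such as "user-42", again using the helpers defined above:

partitionKeyLen := len([]byte("user-42")) // 7
keyCost := partitionKeyLen + varintSize(partitionKeyLen) + fieldNumberSize // 7 + 1 + 1 = 9 bytes
// checkPartitionKey reports this cost without touching the map, so AddRecord can decide whether
// to flush first; addPartitionKey then charges the identical amount once the key is actually stored.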

func (a *Aggregator) getPartitionKeys() []string {
keys := make([]string, 0)
for pk := range a.partitionKeys {
@@ -146,11 +182,11 @@ func (a *Aggregator) getPartitionKeys() []string {

// getSize of protobuf records, partitionKeys, magicNumber, and md5sum in bytes
func (a *Aggregator) getSize() int {
return a.aggSize + kclMagicNumberLen + md5.Size + aggProtobufBytes
return kclMagicNumberLen + md5.Size + a.aggSize
}

func (a *Aggregator) clearBuffer() {
a.partitionKeys = make(map[string]uint64, 0)
a.records = make([]*Record, 0)
a.aggSize = 0
a.aggSize = initialAggRecordSize
}