Skip to content

Commit

Permalink
Introduce writeBatch.adaptiveSize field
Browse files Browse the repository at this point in the history
  • Loading branch information
linasm committed Apr 21, 2021
1 parent 2a26a83 commit ba0140e
Showing 1 changed file with 14 additions and 7 deletions.
21 changes: 14 additions & 7 deletions src/dbnode/ts/writes/write_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (
const (
// preallocateBatchCoeff is used for allocating write batches of slightly bigger
// capacity than needed for the current request, in order to reduce allocations on
// subsequent reuse of pooled write batch.
// subsequent reuse of pooled write batch (effective when writeBatch.adaptiveSize == true).
preallocateBatchCoeff = 1.2
)

Expand All @@ -53,6 +53,11 @@ type writeBatch struct {
// writeBatch itself gets finalized.
finalizeAnnotationFn FinalizeAnnotationFn
finalizeFn func(WriteBatch)

// adaptiveSize means that we create writeBatch with nil slices originally,
// and then allocate/expand them based on the actual batch size (this provides
// more resilience when dealing with small batch sizes).
adaptiveSize bool
}

// NewWriteBatch creates a new WriteBatch.
Expand All @@ -62,11 +67,12 @@ func NewWriteBatch(
finalizeFn func(WriteBatch),
) WriteBatch {
var (
adaptiveSize = initialBatchSize == 0
writes []BatchWrite
pendingIndex []PendingIndexInsert
)

if initialBatchSize > 0 {
if !adaptiveSize {
writes = make([]BatchWrite, 0, initialBatchSize)
pendingIndex = make([]PendingIndexInsert, 0, initialBatchSize)
// Leaving nil slices if initialBatchSize == 0,
Expand All @@ -78,6 +84,7 @@ func NewWriteBatch(
pendingIndex: pendingIndex,
ns: ns,
finalizeFn: finalizeFn,
adaptiveSize: adaptiveSize,
}
}

Expand Down Expand Up @@ -122,12 +129,12 @@ func (b *writeBatch) Reset(
ns ident.ID,
) {
// Preallocate slightly more when not using initialBatchSize.
preallocateBatchCap := int(float32(batchSize) * preallocateBatchCoeff)
adaptiveBatchCap := int(float32(batchSize) * preallocateBatchCoeff)

if batchSize > cap(b.writes) {
batchCap := batchSize
if cap(b.writes) == 0 {
batchCap = preallocateBatchCap
if b.adaptiveSize {
batchCap = adaptiveBatchCap
}
b.writes = make([]BatchWrite, 0, batchCap)
} else {
Expand All @@ -136,8 +143,8 @@ func (b *writeBatch) Reset(

if batchSize > cap(b.pendingIndex) {
batchCap := batchSize
if cap(b.pendingIndex) == 0 {
batchCap = preallocateBatchCap
if b.adaptiveSize {
batchCap = adaptiveBatchCap
}
b.pendingIndex = make([]PendingIndexInsert, 0, batchCap)
} else {
Expand Down

0 comments on commit ba0140e

Please sign in to comment.