Skip to content

Commit

Permalink
db: store *Comparer on Batch
Browse files Browse the repository at this point in the history
Store a pointer to a Comparer on the Batch struct, rather than copying
individual fields during initialization. These fields were not accessed
directly during hot paths, so the indirection is insignificant. This will ease
use of Comparer.Split in new places, such as the batch iterator early exiting
during a SeekPrefixGE.
  • Loading branch information
jbowens committed Jul 26, 2024
1 parent e36d078 commit d7fbb86
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 51 deletions.
83 changes: 35 additions & 48 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,11 +270,9 @@ type batchInternal struct {
// batches. Large batches will set the data field to nil when committed as
// the data has been moved to a flushableBatch and inserted into the queue of
// memtables.
data []byte
cmp Compare
formatKey base.FormatKey
abbreviatedKey AbbreviatedKey
opts batchOptions
data []byte
comparer *base.Comparer
opts batchOptions

// An upper bound on required space to add this batch to a memtable.
// Note that although batches are limited to 4 GiB in size, that limit
Expand Down Expand Up @@ -464,12 +462,10 @@ func newBatchWithSize(db *DB, size int, opts ...BatchOption) *Batch {

func newIndexedBatch(db *DB, comparer *Comparer) *Batch {
i := indexedBatchPool.Get().(*indexedBatch)
i.batch.cmp = comparer.Compare
i.batch.formatKey = comparer.FormatKey
i.batch.abbreviatedKey = comparer.AbbreviatedKey
i.batch.comparer = comparer
i.batch.db = db
i.batch.index = &i.index
i.batch.index.Init(&i.batch.data, i.batch.cmp, i.batch.abbreviatedKey)
i.batch.index.Init(&i.batch.data, comparer.Compare, comparer.AbbreviatedKey)
i.batch.opts.ensureDefaults()
return &i.batch
}
Expand Down Expand Up @@ -508,9 +504,7 @@ func (b *Batch) release() {
// field. Without using an atomic to clear that field the Go race detector
// complains.
b.reset()
b.cmp = nil
b.formatKey = nil
b.abbreviatedKey = nil
b.comparer = nil

if b.index == nil {
batchPool.Put(b)
Expand Down Expand Up @@ -637,14 +631,14 @@ func (b *Batch) Apply(batch *Batch, _ *WriteOptions) error {
b.tombstones = nil
b.tombstonesSeqNum = 0
if b.rangeDelIndex == nil {
b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
}
err = b.rangeDelIndex.Add(uint32(offset))
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
b.rangeKeys = nil
b.rangeKeysSeqNum = 0
if b.rangeKeyIndex == nil {
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
}
err = b.rangeKeyIndex.Add(uint32(offset))
default:
Expand Down Expand Up @@ -1014,7 +1008,7 @@ func (b *Batch) DeleteRangeDeferred(startLen, endLen int) *DeferredBatchOp {
b.tombstonesSeqNum = 0
// Range deletions are rare, so we lazily allocate the index for them.
if b.rangeDelIndex == nil {
b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
b.rangeDelIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
}
b.deferredOp.index = b.rangeDelIndex
}
Expand Down Expand Up @@ -1069,7 +1063,7 @@ func (b *Batch) incrementRangeKeysCount() {
b.rangeKeysSeqNum = 0
// Range keys are rare, so we lazily allocate the index for them.
if b.rangeKeyIndex == nil {
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.cmp, b.abbreviatedKey)
b.rangeKeyIndex = batchskl.NewSkiplist(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
}
b.deferredOp.index = b.rangeKeyIndex
}
Expand Down Expand Up @@ -1283,7 +1277,6 @@ func (b *Batch) newInternalIter(o *IterOptions) *batchIter {

func (b *Batch) initInternalIter(o *IterOptions, iter *batchIter) {
*iter = batchIter{
cmp: b.cmp,
batch: b,
iter: b.index.NewIter(o.GetLowerBound(), o.GetUpperBound()),
// NB: We explicitly do not propagate the batch snapshot to the point
Expand Down Expand Up @@ -1321,7 +1314,7 @@ func (b *Batch) newRangeDelIter(o *IterOptions, batchSnapshot base.SeqNum) *keys

func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
if b.rangeDelIndex == nil {
iter.Init(b.cmp, nil)
iter.Init(b.comparer.Compare, nil)
return
}

Expand All @@ -1335,26 +1328,25 @@ func (b *Batch) initRangeDelIter(_ *IterOptions, iter *keyspan.Iter, batchSnapsh
// cleared.
nextSeqNum := b.nextSeqNum()
if b.tombstones != nil && b.tombstonesSeqNum <= batchSnapshot {
iter.Init(b.cmp, b.tombstones)
iter.Init(b.comparer.Compare, b.tombstones)
return
}

tombstones := make([]keyspan.Span, 0, b.countRangeDels)
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Cmp: b.comparer.Compare,
Format: b.comparer.FormatKey,
Emit: func(s keyspan.Span) {
tombstones = append(tombstones, s)
},
}
it := &batchIter{
cmp: b.cmp,
batch: b,
iter: b.rangeDelIndex.NewIter(nil, nil),
snapshot: batchSnapshot,
}
fragmentRangeDels(frag, it, int(b.countRangeDels))
iter.Init(b.cmp, tombstones)
iter.Init(b.comparer.Compare, tombstones)

// If we just read all the tombstones in the batch (eg, batchSnapshot was
// set to b.nextSeqNum()), then cache the tombstones so that a subsequent
Expand Down Expand Up @@ -1397,7 +1389,7 @@ func (b *Batch) newRangeKeyIter(o *IterOptions, batchSnapshot base.SeqNum) *keys

func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapshot base.SeqNum) {
if b.rangeKeyIndex == nil {
iter.Init(b.cmp, nil)
iter.Init(b.comparer.Compare, nil)
return
}

Expand All @@ -1410,26 +1402,25 @@ func (b *Batch) initRangeKeyIter(_ *IterOptions, iter *keyspan.Iter, batchSnapsh
// sequence number the cache would've been cleared.
nextSeqNum := b.nextSeqNum()
if b.rangeKeys != nil && b.rangeKeysSeqNum <= batchSnapshot {
iter.Init(b.cmp, b.rangeKeys)
iter.Init(b.comparer.Compare, b.rangeKeys)
return
}

rangeKeys := make([]keyspan.Span, 0, b.countRangeKeys)
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Cmp: b.comparer.Compare,
Format: b.comparer.FormatKey,
Emit: func(s keyspan.Span) {
rangeKeys = append(rangeKeys, s)
},
}
it := &batchIter{
cmp: b.cmp,
batch: b,
iter: b.rangeKeyIndex.NewIter(nil, nil),
snapshot: batchSnapshot,
}
fragmentRangeKeys(frag, it, int(b.countRangeKeys))
iter.Init(b.cmp, rangeKeys)
iter.Init(b.comparer.Compare, rangeKeys)

// If we just read all the range keys in the batch (eg, batchSnapshot was
// set to b.nextSeqNum()), then cache the range keys so that a subsequent
Expand Down Expand Up @@ -1553,13 +1544,11 @@ func (b *Batch) reset() {
// Zero out the struct, retaining only the fields necessary for manual
// reuse.
b.batchInternal = batchInternal{
data: b.data,
cmp: b.cmp,
formatKey: b.formatKey,
abbreviatedKey: b.abbreviatedKey,
opts: b.opts,
index: b.index,
db: b.db,
data: b.data,
comparer: b.comparer,
opts: b.opts,
index: b.index,
db: b.db,
}
b.applied.Store(false)
if b.data != nil {
Expand All @@ -1576,7 +1565,7 @@ func (b *Batch) reset() {
}
}
if b.index != nil {
b.index.Init(&b.data, b.cmp, b.abbreviatedKey)
b.index.Init(&b.data, b.comparer.Compare, b.comparer.AbbreviatedKey)
}
}

Expand Down Expand Up @@ -1658,7 +1647,6 @@ func (b *Batch) CommitStats() BatchCommitStats {
// Note: batchIter mirrors the implementation of flushableBatchIter. Keep the
// two in sync.
type batchIter struct {
cmp Compare
batch *Batch
iter batchskl.Iterator
kv base.InternalKV
Expand Down Expand Up @@ -1698,7 +1686,6 @@ func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV
}

func (i *batchIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
i.err = nil // clear cached iteration error
return i.SeekGE(key, flags)
}

Expand Down Expand Up @@ -1849,9 +1836,9 @@ type flushableBatchEntry struct {
// flushableBatch wraps an existing batch and provides the interfaces needed
// for making the batch flushable (i.e. able to mimic a memtable).
type flushableBatch struct {
cmp Compare
formatKey base.FormatKey
data []byte
cmp Compare
comparer *base.Comparer
data []byte

// The base sequence number for the entries in the batch. This is the same
// value as Batch.seqNum() and is cached here for performance.
Expand Down Expand Up @@ -1883,10 +1870,10 @@ var _ flushable = (*flushableBatch)(nil)
// of the batch data.
func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error) {
b := &flushableBatch{
data: batch.data,
cmp: comparer.Compare,
formatKey: comparer.FormatKey,
offsets: make([]flushableBatchEntry, 0, batch.Count()),
data: batch.data,
cmp: comparer.Compare,
comparer: comparer,
offsets: make([]flushableBatchEntry, 0, batch.Count()),
}
if b.data != nil {
// Note that this sequence number is not correct when this batch has not
Expand Down Expand Up @@ -1969,7 +1956,7 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error
if len(rangeDelOffsets) > 0 {
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Format: b.comparer.FormatKey,
Emit: func(s keyspan.Span) {
b.tombstones = append(b.tombstones, s)
},
Expand All @@ -1986,7 +1973,7 @@ func newFlushableBatch(batch *Batch, comparer *Comparer) (*flushableBatch, error
if len(rangeKeyOffsets) > 0 {
frag := &keyspan.Fragmenter{
Cmp: b.cmp,
Format: b.formatKey,
Format: b.comparer.FormatKey,
Emit: func(s keyspan.Span) {
b.rangeKeys = append(b.rangeKeys, s)
},
Expand Down
4 changes: 1 addition & 3 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,9 +528,7 @@ func TestIndexedBatchReset(t *testing.T) {
require.Equal(t, 1, indexCount(b.index))

b.Reset()
require.NotNil(t, b.cmp)
require.NotNil(t, b.formatKey)
require.NotNil(t, b.abbreviatedKey)
require.NotNil(t, b.comparer)
require.NotNil(t, b.index)
require.Nil(t, b.rangeDelIndex)
require.Nil(t, b.rangeKeyIndex)
Expand Down

0 comments on commit d7fbb86

Please sign in to comment.