Skip to content

Commit

Permalink
db: fix cases where SeekPrefixGE prefix doesn't match key
Browse files Browse the repository at this point in the history
This commit adds `SeekPrefixGE` assertions verifying that `prefix` is
actually the prefix of `key`. This was not always the case, so the
offending code paths are adjusted.

In the future, we should create a wrapper iterator that verifies this
sort of thing.

Informs #3794
  • Loading branch information
RaduBerinde committed Aug 13, 2024
1 parent 80a5615 commit 92187e1
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 41 deletions.
6 changes: 6 additions & 0 deletions internal/base/comparer.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ func (s Split) Prefix(k []byte) []byte {
return k[:i:i]
}

// HasPrefix reports whether prefix is exactly the prefix portion of key, where
// the prefix portion is the leading s(key) bytes of key. A prefix that is
// merely a leading substring of key's prefix portion does not match.
func (s Split) HasPrefix(prefix, key []byte) bool {
	keyPrefix := key[:s(key)]
	return bytes.Equal(keyPrefix, prefix)
}

// DefaultSplit is the trivial Split implementation: it reports the full
// length of the key, so the entire key is treated as its own prefix.
var DefaultSplit Split = func(k []byte) int {
	return len(k)
}
Expand Down
11 changes: 3 additions & 8 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1476,23 +1476,18 @@ func (i *Iterator) SeekPrefixGE(key []byte) bool {
}
// Make a copy of the prefix so that modifications to the key after
// SeekPrefixGE returns do not affect the stored prefix.
if cap(i.prefixOrFullSeekKey) < prefixLen {
i.prefixOrFullSeekKey = make([]byte, prefixLen)
} else {
i.prefixOrFullSeekKey = i.prefixOrFullSeekKey[:prefixLen]
}
i.hasPrefix = true
copy(i.prefixOrFullSeekKey, keyPrefix)
i.prefixOrFullSeekKey = append(i.prefixOrFullSeekKey[:0], keyPrefix...)

if lowerBound := i.opts.GetLowerBound(); lowerBound != nil && i.cmp(key, lowerBound) < 0 {
if p := i.comparer.Split.Prefix(lowerBound); !bytes.Equal(i.prefixOrFullSeekKey, p) {
if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, lowerBound) {
i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of lower bound")
i.iterValidityState = IterExhausted
return false
}
key = lowerBound
} else if upperBound := i.opts.GetUpperBound(); upperBound != nil && i.cmp(key, upperBound) > 0 {
if p := i.comparer.Split.Prefix(upperBound); !bytes.Equal(i.prefixOrFullSeekKey, p) {
if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, upperBound) {
i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of upper bound")
i.iterValidityState = IterExhausted
return false
Expand Down
3 changes: 3 additions & 0 deletions level_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,6 +655,9 @@ func (l *levelIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV
}

func (l *levelIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV {
if invariants.Enabled && !l.split.HasPrefix(prefix, key) {
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
}
if invariants.Enabled && l.lower != nil && l.cmp(key, l.lower) < 0 {
panic(errors.AssertionFailedf("levelIter SeekGE to key %q violates lower bound %q", key, l.lower))
}
Expand Down
43 changes: 28 additions & 15 deletions merging_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,11 +524,9 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error {
// P2. Care is taken to avoid ever advancing the iterator beyond the current
// prefix. If nextEntry is ever invoked while we're already beyond the
// current prefix, we're violating the invariant.
if invariants.Enabled && m.prefix != nil {
if p := m.split.Prefix(l.iterKV.K.UserKey); !bytes.Equal(m.prefix, p) {
m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
m.prefix, l.iterKV, debug.Stack())
}
if invariants.Enabled && m.prefix != nil && !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) {
m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s",
m.prefix, l.iterKV, debug.Stack())
}

oldTopLevel := l.index
Expand Down Expand Up @@ -905,6 +903,10 @@ func (m *mergingIter) findPrevEntry() *base.InternalKV {
//
// If an error occurs, seekGE returns the error without setting m.err.
func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) error {
if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
}

// When seeking, we can use tombstones to adjust the key we seek to on each
// level. Consider the series of range tombstones:
//
Expand Down Expand Up @@ -957,15 +959,11 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
}

for ; level < len(m.levels); level++ {
if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 {
m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack())
}

l := &m.levels[level]
if m.prefix != nil {
l.iterKV = l.iter.SeekPrefixGE(m.prefix, key, flags)
if l.iterKV != nil {
if !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) {
if !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) {
// Prevent keys without a matching prefix from being added to the heap by
// setting iterKV to nil before calling initMinHeap.
l.iterKV = nil
Expand Down Expand Up @@ -999,7 +997,21 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro
// Based on the containment condition tombstone.End > key, so
// the assignment to key results in a monotonically
// non-decreasing key across iterations of this loop.
//
if m.prefix != nil && !m.split.HasPrefix(m.prefix, l.tombstone.End) {
// Any keys with m.prefix on subsequent levels are under the tombstone.
// We still need to perform the seeks, in case the next seek uses
// the TrySeekUsingNext flag.
for level++; level < len(m.levels); level++ {
l := &m.levels[level]
if kv := l.iter.SeekPrefixGE(m.prefix, key, flags); kv == nil {
if err := l.iter.Error(); err != nil {
return err
}
}
l.iterKV = nil
}
break
}
// The adjustment of key here can only move it to a larger key.
// Since the caller of seekGE guaranteed that the original key
// was greater than or equal to m.lower, the new key will
Expand Down Expand Up @@ -1037,17 +1049,18 @@ func (m *mergingIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *
func (m *mergingIter) SeekPrefixGEStrict(
prefix, key []byte, flags base.SeekGEFlags,
) *base.InternalKV {
if invariants.Enabled && !m.split.HasPrefix(prefix, key) {
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
}
m.prefix = prefix
m.err = m.seekGE(key, 0 /* start level */, flags)
if m.err != nil {
return nil
}

iterKV := m.findNextEntry()
if invariants.Enabled && iterKV != nil {
if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
}
if invariants.Enabled && iterKV != nil && !m.split.HasPrefix(m.prefix, iterKV.K.UserKey) {
m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix)
}
return iterKV
}
Expand Down
37 changes: 29 additions & 8 deletions sstable/reader_iter_single_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,8 +176,13 @@ type singleLevelIterator[D any, PD block.DataBlockIterator[D]] struct {
// present, should be used for prefix seeks or not. In some cases it is
// beneficial to skip a filter block even if it exists (eg. if probability of
// a match is high).
useFilterBlock bool
lastBloomFilterMatched bool
useFilterBlock bool

// didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE
// or SeekPrefixGE without positioning the iterator internally. If this flag
// is set, the TrySeekUsingNext optimization is disabled on the next seek.
// This happens for example when the bloom filter excludes a prefix.
didNotPositionOnLastSeekGE bool

transforms IterTransforms

Expand Down Expand Up @@ -665,6 +670,11 @@ func (i *singleLevelIterator[D, PD]) SeekGE(key []byte, flags base.SeekGEFlags)
key = i.lower
}
}
if i.didNotPositionOnLastSeekGE {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
i.didNotPositionOnLastSeekGE = false
}

if flags.TrySeekUsingNext() {
// The i.exhaustedBounds comparison indicates that the upper bound was
Expand Down Expand Up @@ -817,6 +827,12 @@ func (i *singleLevelIterator[D, PD]) SeekPrefixGE(
// TODO(bananabrick): We can optimize away this check for the level iter
// if necessary.
if i.cmp(key, i.lower) < 0 {
if !i.reader.Split.HasPrefix(prefix, i.lower) {
i.err = nil // clear any cached iteration error
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
key = i.lower
}
}
Expand All @@ -826,18 +842,22 @@ func (i *singleLevelIterator[D, PD]) SeekPrefixGE(
func (i *singleLevelIterator[D, PD]) seekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (kv *base.InternalKV) {
if invariants.Enabled && !i.reader.Split.HasPrefix(prefix, key) {
panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
}
if i.didNotPositionOnLastSeekGE {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
i.didNotPositionOnLastSeekGE = false
}

// NOTE: prefix is only used for bloom filter checking and not later work in
// this method. Hence, we can use the existing iterator position if the last
// SeekPrefixGE did not fail bloom filter matching.

err := i.err
i.err = nil // clear cached iteration error
if i.useFilterBlock {
if !i.lastBloomFilterMatched {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
}
i.lastBloomFilterMatched = false
// Check prefix bloom filter.
var mayContain bool
mayContain, i.err = i.bloomFilterMayContain(prefix)
Expand All @@ -848,9 +868,10 @@ func (i *singleLevelIterator[D, PD]) seekPrefixGE(
// since the caller was allowed to call Next when SeekPrefixGE returned
// nil. This is no longer allowed.
PD(&i.data).Invalidate()
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
i.lastBloomFilterMatched = true
}
if flags.TrySeekUsingNext() {
// The i.exhaustedBounds comparison indicates that the upper bound was
Expand Down
29 changes: 19 additions & 10 deletions sstable/reader_iter_two_lvl.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,12 @@ type twoLevelIterator[D any, PD block.DataBlockIterator[D]] struct {
// useFilterBlock controls whether we consult the bloom filter in the
// twoLevelIterator code. Note that secondLevel.useFilterBlock is always
// false - any filtering happens at the top level.
useFilterBlock bool
lastBloomFilterMatched bool
useFilterBlock bool
// didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE
// or SeekPrefixGE without positioning the iterator internally. If this flag
// is set, the TrySeekUsingNext optimization is disabled on the next seek.
// This happens for example when the bloom filter excludes a prefix.
didNotPositionOnLastSeekGE bool
}

var _ Iterator = (*twoLevelIterator[rowblk.Iter, *rowblk.Iter])(nil)
Expand Down Expand Up @@ -371,9 +375,19 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(
// TODO(bananabrick): We can optimize away this check for the level iter
// if necessary.
if i.secondLevel.cmp(key, i.secondLevel.lower) < 0 {
if !i.secondLevel.reader.Split.HasPrefix(prefix, i.secondLevel.lower) {
i.secondLevel.err = nil // clear any cached iteration error
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
key = i.secondLevel.lower
}
}
if i.didNotPositionOnLastSeekGE {
flags = flags.DisableTrySeekUsingNext()
i.didNotPositionOnLastSeekGE = false
}

// NOTE: prefix is only used for bloom filter checking and not later work in
// this method. Hence, we can use the existing iterator position if the last
Expand All @@ -385,8 +399,7 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(
// The twoLevelIterator could be already exhausted. Utilize that when
// trySeekUsingNext is true. See the comment about data-exhausted, PGDE, and
// bounds-exhausted near the top of the file.
filterUsedAndDidNotMatch := i.useFilterBlock && !i.lastBloomFilterMatched
if flags.TrySeekUsingNext() && !filterUsedAndDidNotMatch &&
if flags.TrySeekUsingNext() &&
(i.secondLevel.exhaustedBounds == +1 || (PD(&i.secondLevel.data).IsDataInvalidated() && i.secondLevel.index.IsDataInvalidated())) &&
err == nil {
// Already exhausted, so return nil.
Expand All @@ -395,11 +408,6 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(

// Check prefix bloom filter.
if i.useFilterBlock {
if !i.lastBloomFilterMatched {
// Iterator is not positioned based on last seek.
flags = flags.DisableTrySeekUsingNext()
}
i.lastBloomFilterMatched = false
var mayContain bool
mayContain, i.secondLevel.err = i.secondLevel.bloomFilterMayContain(prefix)
if i.secondLevel.err != nil || !mayContain {
Expand All @@ -409,9 +417,10 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE(
// since the caller was allowed to call Next when SeekPrefixGE returned
// nil. This is no longer allowed.
PD(&i.secondLevel.data).Invalidate()
// Disable the TrySeekUsingNext optimization next time.
i.didNotPositionOnLastSeekGE = true
return nil
}
i.lastBloomFilterMatched = true
}

// Bloom filter matches.
Expand Down
10 changes: 10 additions & 0 deletions sstable/testdata/virtual_reader_iter
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ bounds: [dd#5,SET-ddd#6,SET]
# Check lower bound enforcement during SeekPrefixGE.
iter
seek-prefix-ge d
----
.

iter
seek-prefix-ge dd
next
next
----
Expand Down Expand Up @@ -292,6 +297,11 @@ bounds: [dd#5,SET-ddd#6,SET]
# Check lower bound enforcement during SeekPrefixGE.
iter
seek-prefix-ge d
----
.

iter
seek-prefix-ge dd
next
next
----
Expand Down

0 comments on commit 92187e1

Please sign in to comment.