From 92187e19958f1f7a5906672f60e81fead51f07c7 Mon Sep 17 00:00:00 2001 From: Radu Berinde Date: Tue, 13 Aug 2024 13:31:42 -0700 Subject: [PATCH] db: fix cases where SeekPrefixGE prefix doesn't match key This commit adds `SeekPrefixGE` assertions verifying that the `prefix` actually is the prefix for the `key`. This was not always the case so the offending code paths are adjusted. In the future, we should create a wrapper iterator that verifies this sort of thing. Informs #3794 --- internal/base/comparer.go | 6 ++++ iterator.go | 11 ++----- level_iter.go | 3 ++ merging_iter.go | 43 ++++++++++++++++++---------- sstable/reader_iter_single_lvl.go | 37 ++++++++++++++++++------ sstable/reader_iter_two_lvl.go | 29 ++++++++++++------- sstable/testdata/virtual_reader_iter | 10 +++++++ 7 files changed, 98 insertions(+), 41 deletions(-) diff --git a/internal/base/comparer.go b/internal/base/comparer.go index 934ba820a9..46c6a67f96 100644 --- a/internal/base/comparer.go +++ b/internal/base/comparer.go @@ -159,6 +159,12 @@ func (s Split) Prefix(k []byte) []byte { return k[:i:i] } +// HasPrefix returns true if the given key has the given prefix. +func (s Split) HasPrefix(prefix, key []byte) bool { + i := s(key) + return bytes.Equal(prefix, key[:i:i]) +} + // DefaultSplit is a trivial implementation of Split which always returns the // full key. var DefaultSplit Split = func(key []byte) int { return len(key) } diff --git a/iterator.go b/iterator.go index ee85d4d027..230e10c2ea 100644 --- a/iterator.go +++ b/iterator.go @@ -1476,23 +1476,18 @@ func (i *Iterator) SeekPrefixGE(key []byte) bool { } // Make a copy of the prefix so that modifications to the key after // SeekPrefixGE returns does not affect the stored prefix. - if cap(i.prefixOrFullSeekKey) < prefixLen { - i.prefixOrFullSeekKey = make([]byte, prefixLen) - } else { - i.prefixOrFullSeekKey = i.prefixOrFullSeekKey[:prefixLen] - } i.hasPrefix = true - copy(i.prefixOrFullSeekKey, keyPrefix) + i.prefixOrFullSeekKey = append(i.prefixOrFullSeekKey[:0], keyPrefix...) 
if lowerBound := i.opts.GetLowerBound(); lowerBound != nil && i.cmp(key, lowerBound) < 0 { - if p := i.comparer.Split.Prefix(lowerBound); !bytes.Equal(i.prefixOrFullSeekKey, p) { + if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, lowerBound) { i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of lower bound") i.iterValidityState = IterExhausted return false } key = lowerBound } else if upperBound := i.opts.GetUpperBound(); upperBound != nil && i.cmp(key, upperBound) > 0 { - if p := i.comparer.Split.Prefix(upperBound); !bytes.Equal(i.prefixOrFullSeekKey, p) { + if !i.comparer.Split.HasPrefix(i.prefixOrFullSeekKey, upperBound) { i.err = errors.New("pebble: SeekPrefixGE supplied with key outside of upper bound") i.iterValidityState = IterExhausted return false diff --git a/level_iter.go b/level_iter.go index 53da788b88..0a241e12c4 100644 --- a/level_iter.go +++ b/level_iter.go @@ -655,6 +655,9 @@ func (l *levelIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV } func (l *levelIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV { + if invariants.Enabled && !l.split.HasPrefix(prefix, key) { + panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key)) + } if invariants.Enabled && l.lower != nil && l.cmp(key, l.lower) < 0 { panic(errors.AssertionFailedf("levelIter SeekGE to key %q violates lower bound %q", key, l.lower)) } diff --git a/merging_iter.go b/merging_iter.go index 254d26b8c0..f49a0b80fd 100644 --- a/merging_iter.go +++ b/merging_iter.go @@ -524,11 +524,9 @@ func (m *mergingIter) nextEntry(l *mergingIterLevel, succKey []byte) error { // P2. Care is taken to avoid ever advancing the iterator beyond the current // prefix. If nextEntry is ever invoked while we're already beyond the // current prefix, we're violating the invariant. - if invariants.Enabled && m.prefix != nil { - if p := m.split.Prefix(l.iterKV.K.UserKey); !bytes.Equal(m.prefix, p) { - m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s", - m.prefix, l.iterKV, debug.Stack()) - } + if invariants.Enabled && m.prefix != nil && !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) { + m.logger.Fatalf("mergingIter: prefix violation: nexting beyond prefix %q; existing heap root %q\n%s", + m.prefix, l.iterKV, debug.Stack()) } oldTopLevel := l.index @@ -905,6 +903,10 @@ func (m *mergingIter) findPrevEntry() *base.InternalKV { // // If an error occurs, seekGE returns the error without setting m.err. func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) error { + if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 { + m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack()) + } + // When seeking, we can use tombstones to adjust the key we seek to on each // level. 
Consider the series of range tombstones: // @@ -957,15 +959,11 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro } for ; level < len(m.levels); level++ { - if invariants.Enabled && m.lower != nil && m.heap.cmp(key, m.lower) < 0 { - m.logger.Fatalf("mergingIter: lower bound violation: %s < %s\n%s", key, m.lower, debug.Stack()) - } - l := &m.levels[level] if m.prefix != nil { l.iterKV = l.iter.SeekPrefixGE(m.prefix, key, flags) if l.iterKV != nil { - if !bytes.Equal(m.prefix, m.split.Prefix(l.iterKV.K.UserKey)) { + if !m.split.HasPrefix(m.prefix, l.iterKV.K.UserKey) { // Prevent keys without a matching prefix from being added to the heap by setting // iterKey and iterValue to their zero values before calling initMinHeap. l.iterKV = nil @@ -999,7 +997,21 @@ func (m *mergingIter) seekGE(key []byte, level int, flags base.SeekGEFlags) erro // Based on the containment condition tombstone.End > key, so // the assignment to key results in a monotonically // non-decreasing key across iterations of this loop. - // + if m.prefix != nil && !m.split.HasPrefix(m.prefix, l.tombstone.End) { + // Any keys with m.prefix on subsequent levels are under the tombstone. + // We still need to perform the seeks, in case the next seek uses + // the TrySeekUsingNext flag. + for level++; level < len(m.levels); level++ { + l := &m.levels[level] + if kv := l.iter.SeekPrefixGE(m.prefix, key, flags); kv == nil { + if err := l.iter.Error(); err != nil { + return err + } + } + l.iterKV = nil + } + break + } // The adjustment of key here can only move it to a larger key. // Since the caller of seekGE guaranteed that the original key // was greater than or equal to m.lower, the new key will @@ -1037,6 +1049,9 @@ func (m *mergingIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) * func (m *mergingIter) SeekPrefixGEStrict( prefix, key []byte, flags base.SeekGEFlags, ) *base.InternalKV { + if invariants.Enabled && !m.split.HasPrefix(prefix, key) { + panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key)) + } m.prefix = prefix m.err = m.seekGE(key, 0 /* start level */, flags) if m.err != nil { @@ -1044,10 +1059,8 @@ func (m *mergingIter) SeekPrefixGEStrict( } iterKV := m.findNextEntry() - if invariants.Enabled && iterKV != nil { - if !bytes.Equal(m.prefix, m.split.Prefix(iterKV.K.UserKey)) { - m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix) - } + if invariants.Enabled && iterKV != nil && !m.split.HasPrefix(m.prefix, iterKV.K.UserKey) { + m.logger.Fatalf("mergingIter: prefix violation: returning key %q without prefix %q\n", iterKV, m.prefix) } return iterKV } diff --git a/sstable/reader_iter_single_lvl.go b/sstable/reader_iter_single_lvl.go index fabe55a444..54b6bdfa2a 100644 --- a/sstable/reader_iter_single_lvl.go +++ b/sstable/reader_iter_single_lvl.go @@ -176,8 +176,13 @@ type singleLevelIterator[D any, PD block.DataBlockIterator[D]] struct { // present, should be used for prefix seeks or not. In some cases it is // beneficial to skip a filter block even if it exists (eg. if probability of // a match is high). - useFilterBlock bool - lastBloomFilterMatched bool + useFilterBlock bool + + // didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE + // or SeekPrefixGE without positioning the iterator internally. If this flag + // is set, the TrySeekUsingNext optimization is disabled on the next seek. + // This happens for example when the bloom filter excludes a prefix. 
+ didNotPositionOnLastSeekGE bool transforms IterTransforms @@ -665,6 +670,11 @@ func (i *singleLevelIterator[D, PD]) SeekGE(key []byte, flags base.SeekGEFlags) key = i.lower } } + if i.didNotPositionOnLastSeekGE { + // Iterator is not positioned based on last seek. + flags = flags.DisableTrySeekUsingNext() + i.didNotPositionOnLastSeekGE = false + } if flags.TrySeekUsingNext() { // The i.exhaustedBounds comparison indicates that the upper bound was @@ -817,6 +827,12 @@ func (i *singleLevelIterator[D, PD]) SeekPrefixGE( // TODO(bananabrick): We can optimize away this check for the level iter // if necessary. if i.cmp(key, i.lower) < 0 { + if !i.reader.Split.HasPrefix(prefix, i.lower) { + i.err = nil // clear any cached iteration error + // Disable the TrySeekUsingNext optimization next time. + i.didNotPositionOnLastSeekGE = true + return nil + } key = i.lower } } @@ -826,6 +842,15 @@ func (i *singleLevelIterator[D, PD]) SeekPrefixGE( func (i *singleLevelIterator[D, PD]) seekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, ) (kv *base.InternalKV) { + if invariants.Enabled && !i.reader.Split.HasPrefix(prefix, key) { + panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key)) + } + if i.didNotPositionOnLastSeekGE { + // Iterator is not positioned based on last seek. + flags = flags.DisableTrySeekUsingNext() + i.didNotPositionOnLastSeekGE = false + } + // NOTE: prefix is only used for bloom filter checking and not later work in // this method. Hence, we can use the existing iterator position if the last // SeekPrefixGE did not fail bloom filter matching. @@ -833,11 +858,6 @@ func (i *singleLevelIterator[D, PD]) seekPrefixGE( err := i.err i.err = nil // clear cached iteration error if i.useFilterBlock { - if !i.lastBloomFilterMatched { - // Iterator is not positioned based on last seek. - flags = flags.DisableTrySeekUsingNext() - } - i.lastBloomFilterMatched = false // Check prefix bloom filter. var mayContain bool mayContain, i.err = i.bloomFilterMayContain(prefix) @@ -848,9 +868,10 @@ func (i *singleLevelIterator[D, PD]) seekPrefixGE( // since the caller was allowed to call Next when SeekPrefixGE returned // nil. This is no longer allowed. PD(&i.data).Invalidate() + // Disable the TrySeekUsingNext optimization next time. + i.didNotPositionOnLastSeekGE = true return nil } - i.lastBloomFilterMatched = true } if flags.TrySeekUsingNext() { // The i.exhaustedBounds comparison indicates that the upper bound was diff --git a/sstable/reader_iter_two_lvl.go b/sstable/reader_iter_two_lvl.go index 593d9c4e5a..d2fb26f8ed 100644 --- a/sstable/reader_iter_two_lvl.go +++ b/sstable/reader_iter_two_lvl.go @@ -32,8 +32,12 @@ type twoLevelIterator[D any, PD block.DataBlockIterator[D]] struct { // useFilterBlock controls whether we consult the bloom filter in the // twoLevelIterator code. Note that secondLevel.useFilterBlock is always // false - any filtering happens at the top level. - useFilterBlock bool - lastBloomFilterMatched bool + useFilterBlock bool + // didNotPositionOnLastSeekGE is set to true if we completed a call to SeekGE + // or SeekPrefixGE without positioning the iterator internally. If this flag + // is set, the TrySeekUsingNext optimization is disabled on the next seek. + // This happens for example when the bloom filter excludes a prefix. 
+ didNotPositionOnLastSeekGE bool } var _ Iterator = (*twoLevelIterator[rowblk.Iter, *rowblk.Iter])(nil) @@ -371,9 +375,19 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE( // TODO(bananabrick): We can optimize away this check for the level iter // if necessary. if i.secondLevel.cmp(key, i.secondLevel.lower) < 0 { + if !i.secondLevel.reader.Split.HasPrefix(prefix, i.secondLevel.lower) { + i.secondLevel.err = nil // clear any cached iteration error + // Disable the TrySeekUsingNext optimization next time. + i.didNotPositionOnLastSeekGE = true + return nil + } key = i.secondLevel.lower } } + if i.didNotPositionOnLastSeekGE { + flags = flags.DisableTrySeekUsingNext() + i.didNotPositionOnLastSeekGE = false + } // NOTE: prefix is only used for bloom filter checking and not later work in // this method. Hence, we can use the existing iterator position if the last @@ -385,8 +399,7 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE( // The twoLevelIterator could be already exhausted. Utilize that when // trySeekUsingNext is true. See the comment about data-exhausted, PGDE, and // bounds-exhausted near the top of the file. - filterUsedAndDidNotMatch := i.useFilterBlock && !i.lastBloomFilterMatched - if flags.TrySeekUsingNext() && !filterUsedAndDidNotMatch && + if flags.TrySeekUsingNext() && (i.secondLevel.exhaustedBounds == +1 || (PD(&i.secondLevel.data).IsDataInvalidated() && i.secondLevel.index.IsDataInvalidated())) && err == nil { // Already exhausted, so return nil. @@ -395,11 +408,6 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE( // Check prefix bloom filter. if i.useFilterBlock { - if !i.lastBloomFilterMatched { - // Iterator is not positioned based on last seek. - flags = flags.DisableTrySeekUsingNext() - } - i.lastBloomFilterMatched = false var mayContain bool mayContain, i.secondLevel.err = i.secondLevel.bloomFilterMayContain(prefix) if i.secondLevel.err != nil || !mayContain { @@ -409,9 +417,10 @@ func (i *twoLevelIterator[D, PD]) SeekPrefixGE( // since the caller was allowed to call Next when SeekPrefixGE returned // nil. This is no longer allowed. PD(&i.secondLevel.data).Invalidate() + // Disable the TrySeekUsingNext optimization next time. + i.didNotPositionOnLastSeekGE = true return nil } - i.lastBloomFilterMatched = true } // Bloom filter matches. diff --git a/sstable/testdata/virtual_reader_iter b/sstable/testdata/virtual_reader_iter index 1c23a66bbb..8c634d074f 100644 --- a/sstable/testdata/virtual_reader_iter +++ b/sstable/testdata/virtual_reader_iter @@ -77,6 +77,11 @@ bounds: [dd#5,SET-ddd#6,SET] # Check lower bound enforcement during SeekPrefixGE. iter seek-prefix-ge d +---- +. + +iter +seek-prefix-ge dd next next ---- @@ -292,6 +297,11 @@ bounds: [dd#5,SET-ddd#6,SET] # Check lower bound enforcement during SeekPrefixGE. iter seek-prefix-ge d +---- +. + +iter +seek-prefix-ge dd next next ----
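
The contract all of these assertions rely on is the new Split.HasPrefix helper: `prefix` must be exactly Split.Prefix(key), not merely a byte prefix of it. The following is a minimal, self-contained sketch of that distinction (not part of the patch); the Split type and HasPrefix body are reproduced from internal/base/comparer.go, while userKeySplit is a hypothetical stand-in for a real Comparer's split function, not how Pebble actually splits keys.

package main

import (
	"bytes"
	"fmt"
)

// Split mirrors base.Split: it returns the length of the logical prefix of a
// key.
type Split func(key []byte) int

// HasPrefix reports whether prefix is exactly the logical prefix of key, as
// added to internal/base/comparer.go in this patch.
func (s Split) HasPrefix(prefix, key []byte) bool {
	i := s(key)
	return bytes.Equal(prefix, key[:i:i])
}

// userKeySplit is a hypothetical split: everything before a trailing
// "@<suffix>" is the prefix.
func userKeySplit(key []byte) int {
	if i := bytes.LastIndexByte(key, '@'); i >= 0 {
		return i
	}
	return len(key)
}

func main() {
	s := Split(userKeySplit)
	// "foo" is the logical prefix of "foo@7".
	fmt.Println(s.HasPrefix([]byte("foo"), []byte("foo@7"))) // true
	// "fo" is a byte prefix of "foo@7" but not its logical prefix. This is
	// the kind of mismatch the new assertions catch, e.g. when a seek key is
	// clamped to an iterator bound that has a different prefix.
	fmt.Println(s.HasPrefix([]byte("fo"), []byte("foo@7"))) // false
}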
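
The commit message mentions creating a wrapper iterator that verifies this sort of thing. A rough sketch of that idea, under a deliberately simplified interface, might look as follows; prefixSeeker, checkedIter, and fakeIter are hypothetical names, and the real internal iterator's SeekPrefixGE additionally takes seek flags and returns an *InternalKV rather than a bare user key.

package main

import (
	"bytes"
	"fmt"
)

// Split and HasPrefix as in internal/base/comparer.go.
type Split func(key []byte) int

func (s Split) HasPrefix(prefix, key []byte) bool {
	i := s(key)
	return bytes.Equal(prefix, key[:i:i])
}

// prefixSeeker is a stripped-down stand-in for the internal iterator
// interface; only the method relevant to the check is included.
type prefixSeeker interface {
	SeekPrefixGE(prefix, key []byte) (userKey []byte, ok bool)
}

// checkedIter enforces the SeekPrefixGE contract in both directions: the
// caller must pass prefix == Split.Prefix(key), and the wrapped iterator must
// not return a key outside that prefix.
type checkedIter struct {
	split Split
	iter  prefixSeeker
}

func (c checkedIter) SeekPrefixGE(prefix, key []byte) ([]byte, bool) {
	if !c.split.HasPrefix(prefix, key) {
		panic(fmt.Sprintf("invalid prefix %q for key %q", prefix, key))
	}
	userKey, ok := c.iter.SeekPrefixGE(prefix, key)
	if ok && !c.split.HasPrefix(prefix, userKey) {
		panic(fmt.Sprintf("returned key %q without prefix %q", userKey, prefix))
	}
	return userKey, ok
}

// fakeIter is a toy in-memory iterator over sorted keys, standing in for a
// real child iterator.
type fakeIter struct {
	split Split
	keys  [][]byte // sorted
}

func (f *fakeIter) SeekPrefixGE(prefix, key []byte) ([]byte, bool) {
	for _, k := range f.keys {
		if bytes.Compare(k, key) >= 0 {
			if !f.split.HasPrefix(prefix, k) {
				return nil, false // next key is outside the prefix
			}
			return k, true
		}
	}
	return nil, false
}

func main() {
	split := Split(func(key []byte) int {
		if i := bytes.LastIndexByte(key, '@'); i >= 0 {
			return i
		}
		return len(key)
	})
	it := checkedIter{
		split: split,
		iter:  &fakeIter{split: split, keys: [][]byte{[]byte("a@3"), []byte("b@1")}},
	}
	if k, ok := it.SeekPrefixGE([]byte("a"), []byte("a@1")); ok {
		fmt.Printf("found %q\n", k) // found "a@3"
	}
	// Passing a prefix that does not match the key, e.g.
	// it.SeekPrefixGE([]byte("a"), []byte("b@1")), would panic, which is the
	// same condition the in-place assertions report.
}

Checking both directions mirrors this patch: the input checks added to levelIter.SeekPrefixGE, mergingIter.SeekPrefixGEStrict, and singleLevelIterator.seekPrefixGE, plus the output check in mergingIter.SeekPrefixGEStrict, now expressed via HasPrefix.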