diff --git a/batch.go b/batch.go index 65d5d3e74c..556874e2eb 100644 --- a/batch.go +++ b/batch.go @@ -5,6 +5,7 @@ package pebble import ( + "bytes" "context" "encoding/binary" "fmt" @@ -1686,7 +1687,16 @@ func (i *batchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.InternalKV } func (i *batchIter) SeekPrefixGE(prefix, key []byte, flags base.SeekGEFlags) *base.InternalKV { - return i.SeekGE(key, flags) + kv := i.SeekGE(key, flags) + if kv == nil { + return nil + } + // If the key doesn't have the sought prefix, return nil. + if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) { + i.kv = base.InternalKV{} + return nil + } + return kv } func (i *batchIter) SeekLT(key []byte, flags base.SeekLTFlags) *base.InternalKV { @@ -2160,7 +2170,15 @@ func (i *flushableBatchIter) SeekGE(key []byte, flags base.SeekGEFlags) *base.In func (i *flushableBatchIter) SeekPrefixGE( prefix, key []byte, flags base.SeekGEFlags, ) *base.InternalKV { - return i.SeekGE(key, flags) + kv := i.SeekGE(key, flags) + if kv == nil { + return nil + } + // If the key doesn't have the sought prefix, return nil. + if !bytes.Equal(i.batch.comparer.Split.Prefix(kv.K.UserKey), prefix) { + return nil + } + return kv } // SeekLT implements internalIterator.SeekLT, as documented in the pebble diff --git a/data_test.go b/data_test.go index c5d569f515..59c06a994f 100644 --- a/data_test.go +++ b/data_test.go @@ -339,7 +339,7 @@ func printIterState( hasPoint, hasRange := iter.HasPointAndRange() fmt.Fprintf(b, "%s:%s (", iter.Key(), validityStateStr) if hasPoint { - fmt.Fprintf(b, "%s, ", iter.Value()) + fmt.Fprintf(b, "%s, ", formatASCIIValue(iter.Value())) } else { fmt.Fprint(b, "., ") } @@ -388,13 +388,24 @@ func formatASCIIKey(b []byte) string { return string(b) } +func formatASCIIValue(b []byte) string { + if len(b) > 1<<10 { + return fmt.Sprintf("[LARGE VALUE len=%d]", len(b)) + } + if bytes.IndexFunc(b, func(r rune) bool { return r < '!' 
|| r > 'z' }) != -1 { + // This value is not just legible ASCII characters. Quote it. + return fmt.Sprintf("%q", b) + } + return string(b) +} + func writeRangeKeys(b io.Writer, iter *Iterator) { rangeKeys := iter.RangeKeys() for j := 0; j < len(rangeKeys); j++ { if j > 0 { fmt.Fprint(b, ",") } - fmt.Fprintf(b, " %s=%s", rangeKeys[j].Suffix, rangeKeys[j].Value) + fmt.Fprintf(b, " %s=%s", rangeKeys[j].Suffix, formatASCIIValue(rangeKeys[j].Value)) } } diff --git a/iterator_histories_test.go b/iterator_histories_test.go index 89b70760e3..2e51fa2f79 100644 --- a/iterator_histories_test.go +++ b/iterator_histories_test.go @@ -192,6 +192,16 @@ func TestIterHistories(t *testing.T) { return err.Error() } return "" + case "disable-flushes": + d.mu.Lock() + d.mu.compact.flushing = true + d.mu.Unlock() + return "" + case "enable-flushes": + d.mu.Lock() + d.mu.compact.flushing = false + d.mu.Unlock() + return "" case "get": var reader Reader = d if arg, ok := td.Arg("reader"); ok { diff --git a/testdata/iter_histories/prefix_iteration b/testdata/iter_histories/prefix_iteration index 8816b62719..19b24c5743 100644 --- a/testdata/iter_histories/prefix_iteration +++ b/testdata/iter_histories/prefix_iteration @@ -382,3 +382,76 @@ stats ---- . stats: seeked 1 times (1 internal); stepped 0 times (0 internal) + +# Test that a prefix seek through a batch iterator enforces the prefix +# strictly. + +reset +---- + +batch name=foo +set b@1 b@1 +set d@9 d@9 +set g@4 g@4 +set e@2 e@2 +---- +wrote 4 keys to batch "foo" + +# The stats should indicate only 3 KVs were ever surfaced to the merging iterator. + +combined-iter reader=foo name=fooiter +seek-prefix-ge b@10 +seek-prefix-ge c@10 +seek-prefix-ge d@10 +seek-prefix-ge g@2 +seek-prefix-ge e@2 +stats +---- +b@1: (b@1, .) +. +d@9: (d@9, .) +. +e@2: (e@2, .) 
+stats: seeked 5 times (5 internal); stepped 0 times (0 internal); blocks: 0B cached; points: 3 (9B keys, 9B values) + +# Test the above case but with a large committed batch (which should be a +# flushableBatchIter). + +define memtable-size=65536 +---- + +# We disable flushes to avoid scheduling a flush that might race with our +# iterator. If the iterator observed the state after the large batch has been +# flushed to sstables, we would see nonzero block bytes appear in the iterator +# stats. +disable-flushes +---- + +batch commit +set b@1 <rand-bytes=10000> +set d@9 <rand-bytes=10000> +set g@4 <rand-bytes=10000> +set e@2 <rand-bytes=10000> +---- +committed 4 keys + +lsm +---- + +combined-iter +seek-prefix-ge b@10 +seek-prefix-ge c@10 +seek-prefix-ge d@10 +seek-prefix-ge g@2 +seek-prefix-ge e@2 +stats +---- +b@1: ([LARGE VALUE len=10000], .) +. +d@9: ([LARGE VALUE len=10000], .) +. +e@2: ([LARGE VALUE len=10000], .) +stats: seeked 5 times (5 internal); stepped 0 times (0 internal); blocks: 0B cached; points: 3 (9B keys, 29KB values) + +enable-flushes +----