diff --git a/external_iterator.go b/external_iterator.go index c16c5f9501..974d9989f2 100644 --- a/external_iterator.go +++ b/external_iterator.go @@ -5,6 +5,9 @@ package pebble import ( + "fmt" + "sort" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble/internal/base" "github.com/cockroachdb/pebble/internal/keyspan" @@ -12,12 +15,59 @@ import ( "github.com/cockroachdb/pebble/sstable" ) -// NewExternalIter takes an input set of sstable files which may overlap -// arbitrarily and returns an Iterator over the merged contents of the sstables. +// ExternalIterOption provide an interface to specify open-time options to +// NewExternalIter. +type ExternalIterOption interface { + // iterApply is called on the iterator during opening in order to set internal + // parameters. + iterApply(*Iterator) + // readerOptions returns any reader options added by this iter option. + readerOptions() []sstable.ReaderOption +} + +type externalIterReaderOptions struct { + opts []sstable.ReaderOption +} + +func (e *externalIterReaderOptions) iterApply(iterator *Iterator) { + // Do nothing. +} + +func (e *externalIterReaderOptions) readerOptions() []sstable.ReaderOption { + return e.opts +} + +// ExternalIterReaderOptions returns an ExternalIterOption that specifies +// sstable.ReaderOptions to be applied on sstable readers in NewExternalIter. +func ExternalIterReaderOptions(opts ...sstable.ReaderOption) ExternalIterOption { + return &externalIterReaderOptions{opts: opts} +} + +// ExternalIterForwardOnly is an ExternalIterOption that specifies this iterator +// will only be used for forward positioning operations (First, SeekGE, Next). +// This could enable optimizations that take advantage of this invariant. +// Behaviour when a reverse positioning operation is done on an iterator +// opened with this option is unpredictable, though in most cases it should. +type ExternalIterForwardOnly struct{} + +func (e ExternalIterForwardOnly) iterApply(iter *Iterator) { + iter.forwardOnly = true +} + +func (e ExternalIterForwardOnly) readerOptions() []sstable.ReaderOption { + return nil +} + +// NewExternalIter takes an input 2d array of sstable files which may overlap +// across subarrays but not within a subarray (at least as far as points are +// concerned; range keys are allowed to overlap arbitrarily even within a +// subarray), and returns an Iterator over the merged contents of the sstables. // Input sstables may contain point keys, range keys, range deletions, etc. The -// input files slice must be sorted in reverse chronological ordering. A key in -// a file at a lower index will shadow a key with an identical user key -// contained within a file at a higher index. +// input files slice must be sorted in reverse chronological ordering. A key in a +// file at a lower index subarray will shadow a key with an identical user key +// contained within a file at a higher index subarray. Each subarray must be +// sorted in internal key order, where lower index files contain keys that sort +// left of files with higher indexes. // // Input sstables must only contain keys with the zero sequence number. // @@ -27,8 +77,8 @@ import ( func NewExternalIter( o *Options, iterOpts *IterOptions, - files []sstable.ReadableFile, - extraReaderOpts ...sstable.ReaderOption, + files [][]sstable.ReadableFile, + extraOpts ...ExternalIterOption, ) (it *Iterator, err error) { if iterOpts != nil { if err := validateExternalIterOpts(iterOpts); err != nil { @@ -36,17 +86,32 @@ func NewExternalIter( } } - var readers []*sstable.Reader + var readers [][]*sstable.Reader // Ensure we close all the opened readers if we error out. defer func() { if err != nil { for i := range readers { - _ = readers[i].Close() + for j := range readers[i] { + _ = readers[i][j].Close() + } } } }() - readers, err = openExternalTables(o, files, o.MakeReaderOptions(), extraReaderOpts...) + seqNumOffset := 0 + var extraReaderOpts []sstable.ReaderOption + for i := range extraOpts { + extraReaderOpts = append(extraReaderOpts, extraOpts[i].readerOptions()...) + } + for _, levelFiles := range files { + seqNumOffset += len(levelFiles) + } + for _, levelFiles := range files { + var subReaders []*sstable.Reader + seqNumOffset -= len(levelFiles) + subReaders, err = openExternalTables(o, levelFiles, seqNumOffset, o.MakeReaderOptions(), extraReaderOpts...) + readers = append(readers, subReaders) + } if err != nil { return nil, err } @@ -83,6 +148,9 @@ func NewExternalIter( dbi.opts = *iterOpts dbi.saveBounds(iterOpts.LowerBound, iterOpts.UpperBound) } + for i := range extraOpts { + extraOpts[i].iterApply(dbi) + } finishInitializingExternal(dbi) return dbi, nil } @@ -117,24 +185,46 @@ func finishInitializingExternal(it *Iterator) { if len(it.externalReaders) > cap(mlevels) { mlevels = make([]mergingIterLevel, 0, len(it.externalReaders)) } - for _, r := range it.externalReaders { - var ( - rangeDelIter keyspan.FragmentIterator - pointIter internalIterator - err error - ) - pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound) - if err == nil { - rangeDelIter, err = r.NewRawRangeDelIter() + for _, readers := range it.externalReaders { + var combinedIters []internalIterator + for _, r := range readers { + var ( + rangeDelIter keyspan.FragmentIterator + pointIter internalIterator + err error + ) + pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound) + if err == nil { + rangeDelIter, err = r.NewRawRangeDelIter() + } + if err != nil { + pointIter = &errorIter{err: err} + rangeDelIter = &errorKeyspanIter{err: err} + } + if err == nil && rangeDelIter == nil && pointIter != nil && it.forwardOnly { + // TODO(bilal): Consider implementing range key pausing in + // simpleLevelIter so we can reduce mergingIterLevels even more by + // sending all sstable iterators to combinedIters, not just those + // corresponding to sstables without range deletes. + combinedIters = append(combinedIters, pointIter) + continue + } + mlevels = append(mlevels, mergingIterLevel{ + iter: base.WrapIterWithStats(pointIter), + rangeDelIter: rangeDelIter, + }) } - if err != nil { - pointIter = &errorIter{err: err} - rangeDelIter = &errorKeyspanIter{err: err} + if len(combinedIters) > 0 { + sli := &simpleLevelIter{ + cmp: it.cmp, + iters: combinedIters, + } + sli.init(it.opts) + mlevels = append(mlevels, mergingIterLevel{ + iter: base.WrapIterWithStats(sli), + rangeDelIter: nil, + }) } - mlevels = append(mlevels, mergingIterLevel{ - iter: base.WrapIterWithStats(pointIter), - rangeDelIter: rangeDelIter, - }) } it.alloc.merging.init(&it.opts, it.cmp, it.split, mlevels...) it.alloc.merging.snapshot = base.InternalKeySeqNumMax @@ -154,11 +244,22 @@ func finishInitializingExternal(it *Iterator) { it.opts.LowerBound, it.opts.UpperBound, &it.hasPrefix, &it.prefixOrFullSeekKey, ) - for _, r := range it.externalReaders { - if rki, err := r.NewRawRangeKeyIter(); err != nil { - it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) - } else if rki != nil { - it.rangeKey.iterConfig.AddLevel(rki) + // We could take advantage of the lack of overlaps in range keys within + // each slice in it.externalReaders, and generate keyspan.LevelIters + // out of those. However, since range keys are expected to be sparse to + // begin with, the performance gain might not be significant enough to + // warrant it. + // + // TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not + // operate on FileMetadatas (similar to simpleLevelIter), and implements + // this optimization. + for _, readers := range it.externalReaders { + for _, r := range readers { + if rki, err := r.NewRawRangeKeyIter(); err != nil { + it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err}) + } else if rki != nil { + it.rangeKey.iterConfig.AddLevel(rki) + } } } } @@ -171,6 +272,7 @@ func finishInitializingExternal(it *Iterator) { func openExternalTables( o *Options, files []sstable.ReadableFile, + seqNumOffset int, readerOpts sstable.ReaderOptions, extraReaderOpts ...sstable.ReaderOption, ) (readers []*sstable.Reader, err error) { @@ -182,8 +284,165 @@ func openExternalTables( } // Use the index of the file in files as the sequence number for all of // its keys. - r.Properties.GlobalSeqNum = uint64(len(files) - i) + r.Properties.GlobalSeqNum = uint64(len(files) - i + seqNumOffset) readers = append(readers, r) } return readers, err } + +// simpleLevelIter is similar to a levelIter in that it merges the points +// from multiple point iterators that are non-overlapping in the key ranges +// they return. It is only expected to support forward iteration and forward +// regular seeking; reverse iteration and prefix seeking is not supported. +// Intended to be a low-overhead, non-FileMetadata dependent option for +// NewExternalIter. To optimize seeking and forward iteration, it maintains +// two slices of child iterators; one of all iterators, and a subset of it that +// contains just the iterators that contain point keys within the current +// bounds. +// +// Note that this levelIter does not support pausing at file boundaries +// in case of range tombstones in this file that could apply to points outside +// of this file (and outside of this level). This is sufficient for optimizing +// the main use cases of NewExternalIter, however for completeness it would make +// sense to build this pausing functionality in. +type simpleLevelIter struct { + cmp Compare + lowerBound []byte + iters []internalIterator + filtered []internalIterator + firstKeys [][]byte + firstKeysBuf []byte + currentIdx int +} + +// init initializes this simpleLevelIter. +func (s *simpleLevelIter) init(opts IterOptions) { + s.currentIdx = 0 + s.lowerBound = opts.LowerBound + s.resetFilteredIters() +} + +func (s *simpleLevelIter) resetFilteredIters() { + s.filtered = s.filtered[:0] + s.firstKeys = s.firstKeys[:0] + s.firstKeysBuf = s.firstKeysBuf[:0] + for i := range s.iters { + var iterKey *base.InternalKey + if s.lowerBound != nil { + iterKey, _ = s.iters[i].SeekGE(s.lowerBound, base.SeekGEFlagsNone) + } else { + iterKey, _ = s.iters[i].First() + } + if iterKey != nil { + s.filtered = append(s.filtered, s.iters[i]) + bufStart := len(s.firstKeysBuf) + s.firstKeysBuf = append(s.firstKeysBuf, iterKey.UserKey...) + s.firstKeys = append(s.firstKeys, s.firstKeysBuf[bufStart:bufStart+len(iterKey.UserKey)]) + } + } +} + +func (s *simpleLevelIter) SeekGE(key []byte, flags base.SeekGEFlags) (*base.InternalKey, []byte) { + // Find the first file that is entirely >= key. The file before that could + // contain the key we're looking for. + n := sort.Search(len(s.firstKeys), func(i int) bool { + return s.cmp(key, s.firstKeys[i]) <= 0 + }) + if n > 0 { + s.currentIdx = n - 1 + } else { + s.currentIdx = n + } + if s.currentIdx < len(s.filtered) { + if iterKey, val := s.filtered[s.currentIdx].SeekGE(key, flags); iterKey != nil { + return iterKey, val + } + s.currentIdx++ + } + return s.skipEmptyFileForward(key, flags) +} + +func (s *simpleLevelIter) skipEmptyFileForward( + seekKey []byte, flags base.SeekGEFlags, +) (*base.InternalKey, []byte) { + var iterKey *base.InternalKey + var val []byte + for s.currentIdx >= 0 && s.currentIdx < len(s.filtered) { + if seekKey != nil { + iterKey, val = s.filtered[s.currentIdx].SeekGE(seekKey, flags) + } else if s.lowerBound != nil { + iterKey, val = s.filtered[s.currentIdx].SeekGE(s.lowerBound, flags) + } else { + iterKey, val = s.filtered[s.currentIdx].First() + } + if iterKey != nil { + return iterKey, val + } + s.currentIdx++ + } + return nil, nil +} + +func (s *simpleLevelIter) SeekPrefixGE( + prefix, key []byte, flags base.SeekGEFlags, +) (*base.InternalKey, []byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) SeekLT(key []byte, flags base.SeekLTFlags) (*base.InternalKey, []byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) First() (*base.InternalKey, []byte) { + s.currentIdx = 0 + return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone) +} + +func (s *simpleLevelIter) Last() (*base.InternalKey, []byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) Next() (*base.InternalKey, []byte) { + if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) { + return nil, nil + } + if iterKey, val := s.filtered[s.currentIdx].Next(); iterKey != nil { + return iterKey, val + } + s.currentIdx++ + return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone) +} + +func (s *simpleLevelIter) Prev() (*base.InternalKey, []byte) { + panic("unimplemented") +} + +func (s *simpleLevelIter) Error() error { + return nil +} + +func (s *simpleLevelIter) Close() error { + var err error + for i := range s.iters { + err = firstError(err, s.iters[i].Close()) + } + return err +} + +func (s *simpleLevelIter) SetBounds(lower, upper []byte) { + s.currentIdx = -1 + s.lowerBound = lower + for i := range s.iters { + s.iters[i].SetBounds(lower, upper) + } + s.resetFilteredIters() +} + +func (s *simpleLevelIter) String() string { + if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) { + return "simpleLevelIter: current=" + } + return fmt.Sprintf("simpleLevelIter: current=%s", s.filtered[s.currentIdx]) +} + +var _ internalIterator = &simpleLevelIter{} diff --git a/external_iterator_test.go b/external_iterator_test.go index 134a4de4d8..999bd81f0b 100644 --- a/external_iterator_test.go +++ b/external_iterator_test.go @@ -39,9 +39,12 @@ func TestExternalIterator(t *testing.T) { return "" case "iter": opts := IterOptions{KeyTypes: IterKeyTypePointsAndRanges} - var files []sstable.ReadableFile + var externalIterOpts []ExternalIterOption + var files [][]sstable.ReadableFile for _, arg := range td.CmdArgs { switch arg.Key { + case "fwd-only": + externalIterOpts = append(externalIterOpts, ExternalIterForwardOnly{}) case "mask-suffix": opts.RangeKeyMasking.Suffix = []byte(arg.Vals[0]) case "lower": @@ -52,11 +55,11 @@ func TestExternalIterator(t *testing.T) { for _, v := range arg.Vals { f, err := mem.Open(v) require.NoError(t, err) - files = append(files, f) + files = append(files, []sstable.ReadableFile{f}) } } } - it, err := NewExternalIter(o, &opts, files) + it, err := NewExternalIter(o, &opts, files, externalIterOpts...) require.NoError(t, err) return runIterCmd(td, it, true /* close iter */) default: @@ -64,3 +67,62 @@ func TestExternalIterator(t *testing.T) { } }) } + +func TestSimpleLevelIter(t *testing.T) { + mem := vfs.NewMem() + o := &Options{ + FS: mem, + Comparer: testkeys.Comparer, + FormatMajorVersion: FormatRangeKeys, + } + o.EnsureDefaults() + d, err := Open("", o) + require.NoError(t, err) + defer func() { require.NoError(t, d.Close()) }() + + datadriven.RunTest(t, "testdata/simple_level_iter", func(td *datadriven.TestData) string { + switch td.Cmd { + case "reset": + mem = vfs.NewMem() + return "" + case "build": + if err := runBuildCmd(td, d, mem); err != nil { + return err.Error() + } + return "" + case "iter": + var files []sstable.ReadableFile + for _, arg := range td.CmdArgs { + switch arg.Key { + case "files": + for _, v := range arg.Vals { + f, err := mem.Open(v) + require.NoError(t, err) + files = append(files, f) + } + } + } + readers, err := openExternalTables(o, files, 0, o.MakeReaderOptions()) + require.NoError(t, err) + defer func() { + for i := range readers { + _ = readers[i].Close() + } + }() + var internalIters []internalIterator + for i := range readers { + iter, err := readers[i].NewIter(nil, nil) + require.NoError(t, err) + internalIters = append(internalIters, iter) + } + it := &simpleLevelIter{cmp: o.Comparer.Compare, iters: internalIters} + it.init(IterOptions{}) + + response := runInternalIterCmd(td, it) + require.NoError(t, it.Close()) + return response + default: + return fmt.Sprintf("unknown command: %s", td.Cmd) + } + }) +} diff --git a/iterator.go b/iterator.go index 7bbbfd63d4..ec9a5e104b 100644 --- a/iterator.go +++ b/iterator.go @@ -187,7 +187,7 @@ type Iterator struct { prefixOrFullSeekKey []byte readSampling readSampling stats IteratorStats - externalReaders []*sstable.Reader + externalReaders [][]*sstable.Reader // Following fields used when constructing an iterator stack, eg, in Clone // and SetOptions or when re-fragmenting a batch's range keys/range dels. @@ -235,6 +235,9 @@ type Iterator struct { // Used for deriving the value of SeekPrefixGE(..., trySeekUsingNext), // and SeekGE/SeekLT optimizations lastPositioningOp lastPositioningOpKind + // Used for an optimization in external iterators to reduce the number of + // merging levels. + forwardOnly bool // Used in some tests to disable the random disabling of seek optimizations. forceEnableSeekOpt bool } @@ -1753,8 +1756,10 @@ func (i *Iterator) Close() error { i.readState = nil } - for _, r := range i.externalReaders { - err = firstError(err, r.Close()) + for _, readers := range i.externalReaders { + for _, r := range readers { + err = firstError(err, r.Close()) + } } // Close the closer for the current value if one was open. diff --git a/testdata/external_iterator b/testdata/external_iterator index f694e8f032..76a14740f5 100644 --- a/testdata/external_iterator +++ b/testdata/external_iterator @@ -10,7 +10,7 @@ del-range c z # Test that a delete range in a more recent file shadows keys in an # earlier file. -iter files=(2, 1) +iter files=(2, 1) fwd-only first next ---- @@ -26,7 +26,7 @@ set f f # the rangedel. Since the point keys are assigned a higher sequence # number, they should NOT be shadowed by the rangedel. -iter files=(3, 2, 1) +iter files=(3, 2, 1) fwd-only first next next @@ -49,7 +49,7 @@ build 5 range-key-del b d ---- -iter files=(5, 4, 3, 2, 1) +iter files=(5, 4, 3, 2, 1) fwd-only first next next @@ -111,7 +111,7 @@ build ek range-key-set e k @5 foo ---- -iter files=(ag, ek) +iter files=(ag, ek) fwd-only first next ---- @@ -131,7 +131,7 @@ set k@3 v set p@4 v ---- -iter files=(points, ag, ek) mask-suffix=@7 +iter files=(points, ag, ek) mask-suffix=@7 fwd-only first next next @@ -154,7 +154,7 @@ range-key-set a k @4 bar range-key-set a k @1 bax ---- -iter files=(points, ag, ek, stacked) +iter files=(points, ag, ek, stacked) fwd-only first next ---- @@ -163,7 +163,7 @@ a@4: (v, [a-k) @5=foo, @4=bar, @1=bax) # Test mutating the external iterator's options through SetOptions. -iter files=(points, ag, ek) +iter files=(points, ag, ek) fwd-only set-options key-types=point first next diff --git a/testdata/simple_level_iter b/testdata/simple_level_iter new file mode 100644 index 0000000000..76cd65825a --- /dev/null +++ b/testdata/simple_level_iter @@ -0,0 +1,77 @@ +build 1 +set b b +set c c +---- + + +iter files=(1) +first +next +next +---- +b:b +c:c +. + +build 2 +set d d +set f f +---- + +iter files=(1, 2) +first +next +next +next +---- +b:b +c:c +d:d +f:f + +# Test seeks within files. + +iter files=(1, 2) +seek-ge bb +next +next +next +---- +c:c +d:d +f:f +. + +iter files=(1, 2) +seek-ge a +next +next +next +---- +b:b +c:c +d:d +f:f + +iter files=(1, 2) +seek-ge d +next +next +---- +d:d +f:f +. + +iter files=(1, 2) +seek-ge f +next +---- +f:f +. + +iter files=(1, 2) +seek-ge ff +next +---- +. +.