Skip to content

Commit

Permalink
*: add simpleLevelIterator, reduce merging levels in external iter
Browse files Browse the repository at this point in the history
Currently, we create a new merging iterator level for each file
passed into NewExternalIter. This is unnecessary for most use-cases
of creating ExternalIters around lots of sstables, as we can externally
guarantee that many of those sstables won't have overlapping points
with each other. We can have the caller pass this knowledge
by specifying a [][]sstable.ReadableFile where each sub-slice
obeys level invariants for files within it, and is also already
sorted by user keys.

This change makes the interface change to allow for the above
optimization, and also adds a `simpleLevelIter` that implements
forward iteration within a single "level". For files that don't
contain range deletes, we shove all the point iters into one
`simpleLevelIter`, greatly reducing merging iterator levels
and speeding up its operations by a lot.

Fixes cockroachdb/cockroach#83051.
  • Loading branch information
itsbilal committed Aug 19, 2022
1 parent cd7f076 commit 406c1dc
Show file tree
Hide file tree
Showing 5 changed files with 448 additions and 45 deletions.
323 changes: 291 additions & 32 deletions external_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,69 @@
package pebble

import (
"fmt"
"sort"

"github.com/cockroachdb/errors"
"github.com/cockroachdb/pebble/internal/base"
"github.com/cockroachdb/pebble/internal/keyspan"
"github.com/cockroachdb/pebble/internal/manifest"
"github.com/cockroachdb/pebble/sstable"
)

// NewExternalIter takes an input set of sstable files which may overlap
// arbitrarily and returns an Iterator over the merged contents of the sstables.
// ExternalIterOption provide an interface to specify open-time options to
// NewExternalIter.
type ExternalIterOption interface {
// iterApply is called on the iterator during opening in order to set internal
// parameters.
iterApply(*Iterator)
// readerOptions returns any reader options added by this iter option.
readerOptions() []sstable.ReaderOption
}

type externalIterReaderOptions struct {
opts []sstable.ReaderOption
}

func (e *externalIterReaderOptions) iterApply(iterator *Iterator) {
// Do nothing.
}

func (e *externalIterReaderOptions) readerOptions() []sstable.ReaderOption {
return e.opts
}

// ExternalIterReaderOptions returns an ExternalIterOption that specifies
// sstable.ReaderOptions to be applied on sstable readers in NewExternalIter.
func ExternalIterReaderOptions(opts ...sstable.ReaderOption) ExternalIterOption {
return &externalIterReaderOptions{opts: opts}
}

// ExternalIterForwardOnly is an ExternalIterOption that specifies this iterator
// will only be used for forward positioning operations (First, SeekGE, Next).
// This could enable optimizations that take advantage of this invariant.
// Behaviour when a reverse positioning operation is done on an iterator
// opened with this option is unpredictable, though in most cases it should.
type ExternalIterForwardOnly struct{}

func (e ExternalIterForwardOnly) iterApply(iter *Iterator) {
iter.forwardOnly = true
}

func (e ExternalIterForwardOnly) readerOptions() []sstable.ReaderOption {
return nil
}

// NewExternalIter takes an input 2d array of sstable files which may overlap
// across subarrays but not within a subarray (at least as far as points are
// concerned; range keys are allowed to overlap arbitrarily even within a
// subarray), and returns an Iterator over the merged contents of the sstables.
// Input sstables may contain point keys, range keys, range deletions, etc. The
// input files slice must be sorted in reverse chronological ordering. A key in
// a file at a lower index will shadow a key with an identical user key
// contained within a file at a higher index.
// input files slice must be sorted in reverse chronological ordering. A key in a
// file at a lower index subarray will shadow a key with an identical user key
// contained within a file at a higher index subarray. Each subarray must be
// sorted in internal key order, where lower index files contain keys that sort
// left of files with higher indexes.
//
// Input sstables must only contain keys with the zero sequence number.
//
Expand All @@ -27,26 +77,41 @@ import (
func NewExternalIter(
o *Options,
iterOpts *IterOptions,
files []sstable.ReadableFile,
extraReaderOpts ...sstable.ReaderOption,
files [][]sstable.ReadableFile,
extraOpts ...ExternalIterOption,
) (it *Iterator, err error) {
if iterOpts != nil {
if err := validateExternalIterOpts(iterOpts); err != nil {
return nil, err
}
}

var readers []*sstable.Reader
var readers [][]*sstable.Reader

// Ensure we close all the opened readers if we error out.
defer func() {
if err != nil {
for i := range readers {
_ = readers[i].Close()
for j := range readers[i] {
_ = readers[i][j].Close()
}
}
}
}()
readers, err = openExternalTables(o, files, o.MakeReaderOptions(), extraReaderOpts...)
seqNumOffset := 0
var extraReaderOpts []sstable.ReaderOption
for i := range extraOpts {
extraReaderOpts = append(extraReaderOpts, extraOpts[i].readerOptions()...)
}
for _, levelFiles := range files {
seqNumOffset += len(levelFiles)
}
for _, levelFiles := range files {
var subReaders []*sstable.Reader
seqNumOffset -= len(levelFiles)
subReaders, err = openExternalTables(o, levelFiles, seqNumOffset, o.MakeReaderOptions(), extraReaderOpts...)
readers = append(readers, subReaders)
}
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -83,6 +148,9 @@ func NewExternalIter(
dbi.opts = *iterOpts
dbi.saveBounds(iterOpts.LowerBound, iterOpts.UpperBound)
}
for i := range extraOpts {
extraOpts[i].iterApply(dbi)
}
finishInitializingExternal(dbi)
return dbi, nil
}
Expand Down Expand Up @@ -117,24 +185,46 @@ func finishInitializingExternal(it *Iterator) {
if len(it.externalReaders) > cap(mlevels) {
mlevels = make([]mergingIterLevel, 0, len(it.externalReaders))
}
for _, r := range it.externalReaders {
var (
rangeDelIter keyspan.FragmentIterator
pointIter internalIterator
err error
)
pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound)
if err == nil {
rangeDelIter, err = r.NewRawRangeDelIter()
for _, readers := range it.externalReaders {
var combinedIters []internalIterator
for _, r := range readers {
var (
rangeDelIter keyspan.FragmentIterator
pointIter internalIterator
err error
)
pointIter, err = r.NewIter(it.opts.LowerBound, it.opts.UpperBound)
if err == nil {
rangeDelIter, err = r.NewRawRangeDelIter()
}
if err != nil {
pointIter = &errorIter{err: err}
rangeDelIter = &errorKeyspanIter{err: err}
}
if err == nil && rangeDelIter == nil && pointIter != nil && it.forwardOnly {
// TODO(bilal): Consider implementing range key pausing in
// simpleLevelIter so we can reduce mergingIterLevels even more by
// sending all sstable iterators to combinedIters, not just those
// corresponding to sstables without range deletes.
combinedIters = append(combinedIters, pointIter)
continue
}
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(pointIter),
rangeDelIter: rangeDelIter,
})
}
if err != nil {
pointIter = &errorIter{err: err}
rangeDelIter = &errorKeyspanIter{err: err}
if len(combinedIters) > 0 {
sli := &simpleLevelIter{
cmp: it.cmp,
iters: combinedIters,
}
sli.init(it.opts)
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(sli),
rangeDelIter: nil,
})
}
mlevels = append(mlevels, mergingIterLevel{
iter: base.WrapIterWithStats(pointIter),
rangeDelIter: rangeDelIter,
})
}
it.alloc.merging.init(&it.opts, it.cmp, it.split, mlevels...)
it.alloc.merging.snapshot = base.InternalKeySeqNumMax
Expand All @@ -154,11 +244,22 @@ func finishInitializingExternal(it *Iterator) {
it.opts.LowerBound, it.opts.UpperBound,
&it.hasPrefix, &it.prefixOrFullSeekKey,
)
for _, r := range it.externalReaders {
if rki, err := r.NewRawRangeKeyIter(); err != nil {
it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
} else if rki != nil {
it.rangeKey.iterConfig.AddLevel(rki)
// We could take advantage of the lack of overlaps in range keys within
// each slice in it.externalReaders, and generate keyspan.LevelIters
// out of those. However, since range keys are expected to be sparse to
// begin with, the performance gain might not be significant enough to
// warrant it.
//
// TODO(bilal): Explore adding a simpleRangeKeyLevelIter that does not
// operate on FileMetadatas (similar to simpleLevelIter), and implements
// this optimization.
for _, readers := range it.externalReaders {
for _, r := range readers {
if rki, err := r.NewRawRangeKeyIter(); err != nil {
it.rangeKey.iterConfig.AddLevel(&errorKeyspanIter{err: err})
} else if rki != nil {
it.rangeKey.iterConfig.AddLevel(rki)
}
}
}
}
Expand All @@ -171,6 +272,7 @@ func finishInitializingExternal(it *Iterator) {
func openExternalTables(
o *Options,
files []sstable.ReadableFile,
seqNumOffset int,
readerOpts sstable.ReaderOptions,
extraReaderOpts ...sstable.ReaderOption,
) (readers []*sstable.Reader, err error) {
Expand All @@ -182,8 +284,165 @@ func openExternalTables(
}
// Use the index of the file in files as the sequence number for all of
// its keys.
r.Properties.GlobalSeqNum = uint64(len(files) - i)
r.Properties.GlobalSeqNum = uint64(len(files) - i + seqNumOffset)
readers = append(readers, r)
}
return readers, err
}

// simpleLevelIter is similar to a levelIter in that it merges the points
// from multiple point iterators that are non-overlapping in the key ranges
// they return. It is only expected to support forward iteration and forward
// regular seeking; reverse iteration and prefix seeking is not supported.
// Intended to be a low-overhead, non-FileMetadata dependent option for
// NewExternalIter. To optimize seeking and forward iteration, it maintains
// two slices of child iterators; one of all iterators, and a subset of it that
// contains just the iterators that contain point keys within the current
// bounds.
//
// Note that this levelIter does not support pausing at file boundaries
// in case of range tombstones in this file that could apply to points outside
// of this file (and outside of this level). This is sufficient for optimizing
// the main use cases of NewExternalIter, however for completeness it would make
// sense to build this pausing functionality in.
type simpleLevelIter struct {
cmp Compare
lowerBound []byte
iters []internalIterator
filtered []internalIterator
firstKeys [][]byte
firstKeysBuf []byte
currentIdx int
}

// init initializes this simpleLevelIter.
func (s *simpleLevelIter) init(opts IterOptions) {
s.currentIdx = 0
s.lowerBound = opts.LowerBound
s.resetFilteredIters()
}

func (s *simpleLevelIter) resetFilteredIters() {
s.filtered = s.filtered[:0]
s.firstKeys = s.firstKeys[:0]
s.firstKeysBuf = s.firstKeysBuf[:0]
for i := range s.iters {
var iterKey *base.InternalKey
if s.lowerBound != nil {
iterKey, _ = s.iters[i].SeekGE(s.lowerBound, base.SeekGEFlagsNone)
} else {
iterKey, _ = s.iters[i].First()
}
if iterKey != nil {
s.filtered = append(s.filtered, s.iters[i])
bufStart := len(s.firstKeysBuf)
s.firstKeysBuf = append(s.firstKeysBuf, iterKey.UserKey...)
s.firstKeys = append(s.firstKeys, s.firstKeysBuf[bufStart:bufStart+len(iterKey.UserKey)])
}
}
}

func (s *simpleLevelIter) SeekGE(key []byte, flags base.SeekGEFlags) (*base.InternalKey, []byte) {
// Find the first file that is entirely >= key. The file before that could
// contain the key we're looking for.
n := sort.Search(len(s.firstKeys), func(i int) bool {
return s.cmp(key, s.firstKeys[i]) <= 0
})
if n > 0 {
s.currentIdx = n - 1
} else {
s.currentIdx = n
}
if s.currentIdx < len(s.filtered) {
if iterKey, val := s.filtered[s.currentIdx].SeekGE(key, flags); iterKey != nil {
return iterKey, val
}
s.currentIdx++
}
return s.skipEmptyFileForward(key, flags)
}

func (s *simpleLevelIter) skipEmptyFileForward(
seekKey []byte, flags base.SeekGEFlags,
) (*base.InternalKey, []byte) {
var iterKey *base.InternalKey
var val []byte
for s.currentIdx >= 0 && s.currentIdx < len(s.filtered) {
if seekKey != nil {
iterKey, val = s.filtered[s.currentIdx].SeekGE(seekKey, flags)
} else if s.lowerBound != nil {
iterKey, val = s.filtered[s.currentIdx].SeekGE(s.lowerBound, flags)
} else {
iterKey, val = s.filtered[s.currentIdx].First()
}
if iterKey != nil {
return iterKey, val
}
s.currentIdx++
}
return nil, nil
}

func (s *simpleLevelIter) SeekPrefixGE(
prefix, key []byte, flags base.SeekGEFlags,
) (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) SeekLT(key []byte, flags base.SeekLTFlags) (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) First() (*base.InternalKey, []byte) {
s.currentIdx = 0
return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone)
}

func (s *simpleLevelIter) Last() (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) Next() (*base.InternalKey, []byte) {
if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
return nil, nil
}
if iterKey, val := s.filtered[s.currentIdx].Next(); iterKey != nil {
return iterKey, val
}
s.currentIdx++
return s.skipEmptyFileForward(nil /* seekKey */, base.SeekGEFlagsNone)
}

func (s *simpleLevelIter) Prev() (*base.InternalKey, []byte) {
panic("unimplemented")
}

func (s *simpleLevelIter) Error() error {
return nil
}

func (s *simpleLevelIter) Close() error {
var err error
for i := range s.iters {
err = firstError(err, s.iters[i].Close())
}
return err
}

func (s *simpleLevelIter) SetBounds(lower, upper []byte) {
s.currentIdx = -1
s.lowerBound = lower
for i := range s.iters {
s.iters[i].SetBounds(lower, upper)
}
s.resetFilteredIters()
}

func (s *simpleLevelIter) String() string {
if s.currentIdx < 0 || s.currentIdx >= len(s.filtered) {
return "simpleLevelIter: current=<nil>"
}
return fmt.Sprintf("simpleLevelIter: current=%s", s.filtered[s.currentIdx])
}

var _ internalIterator = &simpleLevelIter{}
Loading

0 comments on commit 406c1dc

Please sign in to comment.