Skip to content

Commit

Permalink
Introduce fast merge iterator
Browse files Browse the repository at this point in the history
This commit introduces fast merge iterator which uses a tree-based
structure instead of a heap to maintain all the iterators. This
implementation is about 2X faster than the heap implementation.

### Benchmarks
master-x.txt and new-x.txt are the result of `ReadMerged`
benchmarks with x tables. Each table contains 10e6 entries.
```
benchstat master-5.txt new-5.txt
name           old time/op  new time/op  delta
ReadMerged-16   826ms ± 1%   470ms ± 2%  -43.14%  (p=0.008 n=5+5)

benchstat master-16.txt new-16.txt
name           old time/op  new time/op  delta
ReadMerged-16   4.22s ± 1%   2.08s ± 2%  -50.72%  (p=0.008 n=5+5)

benchstat master-32.txt new-32.txt
name           old time/op  new time/op  delta
ReadMerged-16   10.6s ± 3%    4.8s ± 2%  -54.58%  (p=0.008 n=5+5)

benchstat master-64.txt new-64.txt
name           old time/op  new time/op  delta
ReadMerged-16   25.1s ± 1%   11.0s ± 2%  -55.94%  (p=0.008 n=5+5)

benchstat master-128.txt new-128.txt
name           old time/op  new time/op  delta
ReadMerged-16   58.5s ± 1%   25.8s ± 2%  -55.95%  (p=0.016 n=4+5)

```

Inspired by 
pingcap@d04b5eb
  • Loading branch information
Ibrahim Jarif authored Oct 21, 2019
1 parent 385da91 commit 73ea6e6
Show file tree
Hide file tree
Showing 6 changed files with 375 additions and 241 deletions.
5 changes: 3 additions & 2 deletions iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ var DefaultIteratorOptions = IteratorOptions{

// Iterator helps iterating over the KV pairs in a lexicographically sorted order.
type Iterator struct {
iitr *y.MergeIterator
iitr y.Iterator
txn *Txn
readTs uint64

Expand Down Expand Up @@ -466,9 +466,10 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
iters = append(iters, tables[i].NewUniIterator(opt.Reverse))
}
iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.

res := &Iterator{
txn: txn,
iitr: y.NewMergeIterator(iters, opt.Reverse),
iitr: table.NewMergeIterator(iters, opt.Reverse),
opt: opt,
readTs: txn.readTs,
}
Expand Down
2 changes: 1 addition & 1 deletion levels.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,7 @@ func (s *levelsController) compactBuildTables(
valid = append(valid, table)
}
iters = append(iters, table.NewConcatIterator(valid, false))
it := y.NewMergeIterator(iters, false)
it := table.NewMergeIterator(iters, false)
defer it.Close() // Important to close the iterator to do ref counting.

it.Rewind()
Expand Down
213 changes: 213 additions & 0 deletions table/merge_iterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
/*
* Copyright 2019 Dgraph Labs, Inc. and Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package table

import (
"github.com/dgraph-io/badger/y"
"github.com/pkg/errors"
)

// MergeIterator merges multiple iterators.
// NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
type MergeIterator struct {
left node
right node
small *node
reverse bool
}

type node struct {
valid bool
key []byte
iter y.Iterator

// The two iterators are type asserted from `y.Iterator`, used to inline more function calls.
// Calling functions on concrete types is much faster (about 25-30%) than calling the
// interface's function.
merge *MergeIterator
concat *ConcatIterator
}

func (n *node) setIterator(iter y.Iterator) {
n.iter = iter
// It's okay if the type assertion below fails and n.merge/n.concat are set to nil.
// We handle the nil values of merge and concat in all the methods.
n.merge, _ = iter.(*MergeIterator)
n.concat, _ = iter.(*ConcatIterator)
}

func (n *node) setKey() {
if n.merge != nil {
n.valid = n.merge.small.valid
if n.valid {
n.key = n.merge.small.key
}
} else if n.concat != nil {
n.valid = n.concat.Valid()
if n.valid {
n.key = n.concat.Key()
}
} else {
n.valid = n.iter.Valid()
if n.valid {
n.key = n.iter.Key()
}
}
}

func (n *node) next() {
if n.merge != nil {
n.merge.Next()
} else if n.concat != nil {
n.concat.Next()
} else {
n.iter.Next()
}
n.setKey()
}

func (n *node) rewind() {
n.iter.Rewind()
n.setKey()
}

func (n *node) seek(key []byte) {
n.iter.Seek(key)
n.setKey()
}

func (mi *MergeIterator) fix() {
if !mi.bigger().valid {
return
}
if !mi.small.valid {
mi.swapSmall()
return
}
cmp := y.CompareKeys(mi.small.key, mi.bigger().key)
// Both the keys are equal.
if cmp == 0 {
// In case of same keys, move the right iterator ahead.
mi.right.next()
if &mi.right == mi.small {
mi.swapSmall()
}
return
} else if cmp < 0 { // Small is less than bigger().
if mi.reverse {
mi.swapSmall()
} else {
// we don't need to do anything. Small already points to the smallest.
}
return
} else { // bigger() is less than small.
if mi.reverse {
// Do nothing since we're iterating in reverse. Small currently points to
// the bigger key and that's okay in reverse iteration.
} else {
mi.swapSmall()
}
return
}
}

func (mi *MergeIterator) bigger() *node {
if mi.small == &mi.left {
return &mi.right
}
return &mi.left
}

func (mi *MergeIterator) swapSmall() {
if mi.small == &mi.left {
mi.small = &mi.right
return
}
if mi.small == &mi.right {
mi.small = &mi.left
return
}
}

// Next returns the next element. If it is the same as the current key, ignore it.
func (mi *MergeIterator) Next() {
mi.small.next()
mi.fix()
}

// Rewind seeks to first element (or last element for reverse iterator).
func (mi *MergeIterator) Rewind() {
mi.left.rewind()
mi.right.rewind()
mi.fix()
}

// Seek brings us to element with key >= given key.
func (mi *MergeIterator) Seek(key []byte) {
mi.left.seek(key)
mi.right.seek(key)
mi.fix()
}

// Valid returns whether the MergeIterator is at a valid element.
func (mi *MergeIterator) Valid() bool {
return mi.small.valid
}

// Key returns the key associated with the current iterator.
func (mi *MergeIterator) Key() []byte {
return mi.small.key
}

// Value returns the value associated with the iterator.
func (mi *MergeIterator) Value() y.ValueStruct {
return mi.small.iter.Value()
}

// Close implements y.Iterator.
func (mi *MergeIterator) Close() error {
err1 := mi.left.iter.Close()
err2 := mi.right.iter.Close()
if err1 != nil {
return errors.Wrap(err1, "MergeIterator")
}
return errors.Wrap(err2, "MergeIterator")
}

// NewMergeIterator creates a merge iterator.
func NewMergeIterator(iters []y.Iterator, reverse bool) y.Iterator {
if len(iters) == 0 {
return nil
} else if len(iters) == 1 {
return iters[0]
} else if len(iters) == 2 {
mi := &MergeIterator{
reverse: reverse,
}
mi.left.setIterator(iters[0])
mi.right.setIterator(iters[1])
// Assign left iterator randomly. This will be fixed when user calls rewind/seek.
mi.small = &mi.left
return mi
}
mid := len(iters) / 2
return NewMergeIterator(
[]y.Iterator{
NewMergeIterator(iters[:mid], reverse),
NewMergeIterator(iters[mid:], reverse),
}, reverse)
}
Loading

0 comments on commit 73ea6e6

Please sign in to comment.