Skip to content

Commit

Permalink
Fix merge iterator duplicates issue (#1157)
Browse files Browse the repository at this point in the history
Fixes: #1153

The tree-based merge iterator would not skip duplicate keys.
This commit fixes it. 
If merge iterator consists of 3 iterators
```
it1 = [1, 1, 1]
it2 = [1, 1, 1, 1, 2]
it3 = [1, 1, 1, 1, 2]
```
The result should be
```
[1, 2]
```
  • Loading branch information
Ibrahim Jarif authored Dec 18, 2019
1 parent cfa3870 commit 779d9a0
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 19 deletions.
26 changes: 21 additions & 5 deletions table/merge_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,20 @@
package table

import (
"bytes"

"github.com/dgraph-io/badger/v2/y"
"github.com/pkg/errors"
)

// MergeIterator merges multiple iterators.
// NOTE: MergeIterator owns the array of iterators and is responsible for closing them.
type MergeIterator struct {
left node
right node
small *node
left node
right node
small *node

curKey []byte
reverse bool
}

Expand Down Expand Up @@ -145,22 +149,34 @@ func (mi *MergeIterator) swapSmall() {

// Next returns the next element. If it is the same as the current key, ignore it.
func (mi *MergeIterator) Next() {
mi.small.next()
mi.fix()
for mi.Valid() {
if !bytes.Equal(mi.small.key, mi.curKey) {
break
}
mi.small.next()
mi.fix()
}
mi.setCurrent()
}

func (mi *MergeIterator) setCurrent() {
mi.curKey = append(mi.curKey[:0], mi.small.key...)
}

// Rewind seeks to first element (or last element for reverse iterator).
func (mi *MergeIterator) Rewind() {
mi.left.rewind()
mi.right.rewind()
mi.fix()
mi.setCurrent()
}

// Seek brings us to element with key >= given key.
func (mi *MergeIterator) Seek(key []byte) {
mi.left.seek(key)
mi.right.seek(key)
mi.fix()
mi.setCurrent()
}

// Valid returns whether the MergeIterator is at a valid element.
Expand Down
92 changes: 78 additions & 14 deletions table/merge_iterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,28 +164,60 @@ func TestMergeMore(t *testing.T) {
it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, false)
it4 := newSimpleIterator([]string{"1", "7", "9"}, []string{"d1", "d7", "d9"}, false)
t.Run("forward", func(t *testing.T) {
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, false)
expectedKeys := []string{"1", "2", "3", "5", "7", "9"}
expectedVals := []string{"a1", "b2", "a3", "b5", "a7", "d9"}
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
t.Run("no duplicates", func(t *testing.T) {
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, false)
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
})
t.Run("duplicates", func(t *testing.T) {
it5 := newSimpleIterator(
[]string{"1", "1", "3", "7"},
[]string{"a1", "a1-1", "a3", "a7"},
false)
mergeIt := NewMergeIterator([]y.Iterator{it5, it2, it3, it4}, false)
expectedKeys := []string{"1", "2", "3", "5", "7", "9"}
expectedVals := []string{"a1", "b2", "a3", "b5", "a7", "d9"}
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
})
})
t.Run("reverse", func(t *testing.T) {
it.reversed = true
it2.reversed = true
it3.reversed = true
it4.reversed = true
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, true)
expectedKeys := []string{"9", "7", "5", "3", "2", "1"}
expectedVals := []string{"d9", "a7", "b5", "a3", "b2", "a1"}
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
t.Run("no duplicates", func(t *testing.T) {
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, true)
expectedKeys := []string{"9", "7", "5", "3", "2", "1"}
expectedVals := []string{"d9", "a7", "b5", "a3", "b2", "a1"}
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
})
t.Run("duplicates", func(t *testing.T) {
it5 := newSimpleIterator(
[]string{"1", "1", "3", "7"},
[]string{"a1", "a1-1", "a3", "a7"},
true)
mergeIt := NewMergeIterator([]y.Iterator{it5, it2, it3, it4}, true)
expectedKeys := []string{"9", "7", "5", "3", "2", "1"}
expectedVals := []string{"d9", "a7", "b5", "a3", "b2", "a1-1"}
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
})
})
}

Expand Down Expand Up @@ -304,3 +336,35 @@ func TestMergeIteratorDuplicate(t *testing.T) {
require.Equal(t, expectedVals, v)
})
}

func TestMergeDuplicates(t *testing.T) {
it := newSimpleIterator([]string{"1", "1", "1"}, []string{"a1", "a3", "a7"}, false)
it2 := newSimpleIterator([]string{"1", "1", "1"}, []string{"b2", "b3", "b5"}, false)
it3 := newSimpleIterator([]string{"1"}, []string{"c1"}, false)
it4 := newSimpleIterator([]string{"1", "1", "2"}, []string{"d1", "d7", "d9"}, false)
t.Run("forward", func(t *testing.T) {
expectedKeys := []string{"1", "2"}
expectedVals := []string{"a1", "d9"}
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, false)
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
})

t.Run("reverse", func(t *testing.T) {
it.reversed = true
it2.reversed = true
it3.reversed = true
it4.reversed = true
expectedKeys := []string{"2", "1"}
expectedVals := []string{"d9", "a7"}
mergeIt := NewMergeIterator([]y.Iterator{it, it2, it3, it4}, true)
mergeIt.Rewind()
k, v := getAll(mergeIt)
require.EqualValues(t, expectedKeys, k)
require.EqualValues(t, expectedVals, v)
closeAndCheck(t, mergeIt, 4)
})
}

0 comments on commit 779d9a0

Please sign in to comment.