Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix list index implementation #171

Merged
merged 7 commits into from
Sep 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions engine/badgerengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func (e *Engine) Begin(writable bool) (engine.Transaction, error) {
tx := e.DB.NewTransaction(writable)

return &Transaction{
ng: e,
tx: tx,
writable: writable,
}, nil
Expand All @@ -48,6 +49,7 @@ func (e *Engine) Close() error {

// A Transaction uses Badger's transactions.
type Transaction struct {
ng *Engine
tx *badger.Txn
writable bool
discarded bool
Expand Down Expand Up @@ -110,6 +112,7 @@ func (t *Transaction) GetStore(name []byte) (engine.Store, error) {
pkey := buildStorePrefixKey(name)

return &Store{
ng: t.ng,
tx: t.tx,
prefix: pkey,
writable: t.writable,
Expand Down
25 changes: 25 additions & 0 deletions engine/badgerengine/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

// A Store is an implementation of the engine.Store interface.
type Store struct {
ng *Engine
tx *badger.Txn
prefix []byte
writable bool
Expand Down Expand Up @@ -102,6 +103,30 @@ func (s *Store) Truncate() error {
return nil
}

// NextSequence returns a monotonically increasing integer.
func (s *Store) NextSequence() (uint64, error) {
if !s.writable {
return 0, engine.ErrTransactionReadOnly
}

// TODO: this is an ineficient way of generating sequences.
// use a bigger lease in the future.
seq, err := s.ng.DB.GetSequence([]byte(s.name), 1)
if err != nil {
return 0, err
}
defer seq.Release()

nb, err := seq.Next()
if err != nil {
return 0, err
}

// the first number in a Badger sequence is always zero
// but Genji expects the first to be 1.
return nb + 1, nil
}

// NewIterator uses a Badger iterator with default options.
// Only one iterator is allowed per read-write transaction.
func (s *Store) NewIterator(cfg engine.IteratorConfig) engine.Iterator {
Expand Down
9 changes: 9 additions & 0 deletions engine/boltengine/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ func (s *Store) Truncate() error {
return err
}

// NextSequence returns a monotonically increasing integer.
func (s *Store) NextSequence() (uint64, error) {
if !s.bucket.Writable() {
return 0, engine.ErrTransactionReadOnly
}

return s.bucket.NextSequence()
}

// NewIterator uses the bucket cursor.
func (s *Store) NewIterator(cfg engine.IteratorConfig) engine.Iterator {
return &iterator{
Expand Down
2 changes: 2 additions & 0 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ type Store interface {
Truncate() error
// NewIterator creates an iterator with the given config.
NewIterator(IteratorConfig) Iterator
// NextSequence returns a monotonically increasing integer.
NextSequence() (uint64, error)
}

// IteratorConfig is used to configure an iterator upon creation.
Expand Down
63 changes: 63 additions & 0 deletions engine/enginetest/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func TestSuite(t *testing.T, builder Builder) {
{"Store/Get", TestStoreGet},
{"Store/Delete", TestStoreDelete},
{"Store/Truncate", TestStoreTruncate},
{"Store/NextSequence", TestStoreNextSequence},
{"TestQueries", TestQueries},
{"TestQueriesSameTransaction", TestQueriesSameTransaction},
}
Expand Down Expand Up @@ -784,6 +785,68 @@ func TestStoreTruncate(t *testing.T, builder Builder) {
})
}

// TestStoreNextSequence verifies NextSequence behaviour.
func TestStoreNextSequence(t *testing.T, builder Builder) {
t.Run("Should fail if tx not writable", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()
tx, err := ng.Begin(true)
require.NoError(t, err)
err = tx.CreateStore([]byte("test"))
require.NoError(t, err)
err = tx.Commit()
require.NoError(t, err)

tx, err = ng.Begin(false)
require.NoError(t, err)
defer tx.Rollback()

st, err := tx.GetStore([]byte("test"))
require.NoError(t, err)

_, err = st.NextSequence()
require.Error(t, err)
})

t.Run("Should return the next sequence", func(t *testing.T) {
st, cleanup := storeBuilder(t, builder)
defer cleanup()

for i := uint64(1); i < 100; i++ {
s, err := st.NextSequence()
require.NoError(t, err)
require.Equal(t, i, s)
}
})

t.Run("Should store the last sequence", func(t *testing.T) {
ng, cleanup := builder()
defer cleanup()
tx, err := ng.Begin(true)
require.NoError(t, err)
err = tx.CreateStore([]byte("test"))
require.NoError(t, err)
st, err := tx.GetStore([]byte("test"))
require.NoError(t, err)

s1, err := st.NextSequence()
require.NoError(t, err)

err = tx.Commit()
require.NoError(t, err)

tx, err = ng.Begin(true)
require.NoError(t, err)
defer tx.Rollback()

st, err = tx.GetStore([]byte("test"))
require.NoError(t, err)
s2, err := st.NextSequence()
require.NoError(t, err)
require.Equal(t, s1+1, s2)
})
}

// TestQueries test simple queries against the engine.
func TestQueries(t *testing.T, builder Builder) {
t.Run("SELECT", func(t *testing.T) {
Expand Down
13 changes: 7 additions & 6 deletions engine/memoryengine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,17 @@ const btreeDegree = 12
// Engine is a simple memory engine implementation that stores data in
// an in-memory Btree. It allows multiple readers and one single writer.
type Engine struct {
closed bool
stores map[string]*btree.BTree

mu sync.RWMutex
closed bool
stores map[string]*btree.BTree
sequences map[string]uint64
mu sync.RWMutex
}

// NewEngine creates an in-memory engine.
func NewEngine() *Engine {
return &Engine{
stores: make(map[string]*btree.BTree),
stores: make(map[string]*btree.BTree),
sequences: make(map[string]uint64),
}
}

Expand Down Expand Up @@ -124,7 +125,7 @@ func (tx *transaction) GetStore(name []byte) (engine.Store, error) {
return nil, engine.ErrStoreNotFound
}

return &storeTx{tx: tx, tr: tr}, nil
return &storeTx{tx: tx, tr: tr, name: string(name)}, nil
}

func (tx *transaction) CreateStore(name []byte) error {
Expand Down
16 changes: 14 additions & 2 deletions engine/memoryengine/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ func (i *item) Less(than btree.Item) bool {

// storeTx implements an engine.Store.
type storeTx struct {
tr *btree.BTree
tx *transaction
tr *btree.BTree
tx *transaction
name string
}

func (s *storeTx) Put(k, v []byte) error {
Expand Down Expand Up @@ -159,6 +160,17 @@ func (s *storeTx) Truncate() error {
return nil
}

// NextSequence returns a monotonically increasing integer.
func (s *storeTx) NextSequence() (uint64, error) {
if !s.tx.writable {
return 0, engine.ErrTransactionReadOnly
}

s.tx.ng.sequences[s.name]++

return s.tx.ng.sequences[s.name], nil
}

func (s *storeTx) NewIterator(cfg engine.IteratorConfig) engine.Iterator {
return &iterator{
tx: s.tx,
Expand Down
13 changes: 4 additions & 9 deletions index/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,8 @@ import (
)

const (
// StorePrefix is the prefix used to name the index store.
StorePrefix = "i" + string(separator)

separator byte = 0x1E
// storePrefix is the prefix used to name the index stores.
storePrefix = "i"
)

var valueTypes = []document.ValueType{
Expand Down Expand Up @@ -75,13 +73,10 @@ func NewUniqueIndex(tx engine.Transaction, idxName string) *UniqueIndex {
func buildIndexName(name []byte, t document.ValueType) []byte {
var buf bytes.Buffer

// We can deterministically set the size of the buffer.
// The last 2 bytes are for the separator and the Type t.
buf.Grow(len(StorePrefix) + len(name) + 2)
buf.Grow(len(storePrefix) + len(name) + 1)

buf.WriteString(StorePrefix)
buf.WriteString(storePrefix)
buf.Write(name)
buf.WriteByte(separator)
buf.WriteByte(byte(t))

return buf.Bytes()
Expand Down
32 changes: 32 additions & 0 deletions index/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,38 @@ func TestIndexAscendGreaterThan(t *testing.T) {
require.Equal(t, 10, texts)
})
}

t.Run("Unique: false, Must iterate through similar values properly", func(t *testing.T) {
idx, cleanup := getIndex(t, false)
defer cleanup()

for i := int64(0); i < 100; i++ {
require.NoError(t, idx.Set(document.NewIntegerValue(1), key.AppendInt64(nil, i)))
require.NoError(t, idx.Set(document.NewTextValue("1"), key.AppendInt64(nil, i)))
}

var ints, texts int
i := int64(0)
err := idx.AscendGreaterOrEqual(document.Value{Type: document.IntegerValue}, func(val, rid []byte, isEqual bool) error {
requireEqualEncoded(t, document.NewDoubleValue(1), val)
require.Equal(t, key.AppendInt64(nil, i), rid)
i++
ints++
return nil
})

i = 0
err = idx.AscendGreaterOrEqual(document.Value{Type: document.TextValue}, func(val, rid []byte, isEqual bool) error {
requireEqualEncoded(t, document.NewTextValue("1"), val)
require.Equal(t, key.AppendInt64(nil, i), rid)
i++
texts++
return nil
})
require.NoError(t, err)
require.Equal(t, 100, ints)
require.Equal(t, 100, texts)
})
}

func TestIndexDescendLessOrEqual(t *testing.T) {
Expand Down
59 changes: 41 additions & 18 deletions index/list.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ func (idx *ListIndex) Set(v document.Value, k []byte) error {
return err
}

enc := key.AppendValue(nil, v)

buf := make([]byte, 0, len(enc)+len(k)+1)
buf = append(buf, enc...)
buf = append(buf, separator)
buf = append(buf, k...)
buf := key.AppendValue(nil, v)
seq, err := st.NextSequence()
if err != nil {
return err
}
buf = key.AppendInt64(buf, int64(seq))

return st.Put(buf, nil)
return st.Put(buf, k)
}

// Delete all the references to the key from the index.
Expand All @@ -52,19 +52,35 @@ func (idx *ListIndex) Delete(v document.Value, k []byte) error {
}
}

enc := key.AppendValue(nil, v)

st, err := getOrCreateStore(idx.tx, v.Type, idx.name)
if err != nil {
return err
}

buf := make([]byte, 0, len(enc)+len(k)+1)
buf = append(buf, enc...)
buf = append(buf, separator)
buf = append(buf, k...)
seek := key.AppendValue(nil, v)

it := st.NewIterator(engine.IteratorConfig{})
defer it.Close()

var buf []byte
var toDelete []byte
for it.Seek(seek); it.Valid(); it.Next() {
item := it.Item()
buf, err = item.ValueCopy(buf)
if err != nil {
return err
}
if bytes.Equal(buf, k) {
toDelete = item.Key()
break
}
}

if toDelete != nil {
return st.Delete(toDelete)
}

return st.Delete(buf)
return engine.ErrKeyNotFound
}

// AscendGreaterOrEqual seeks for the pivot and then goes through all the subsequent key value pairs in increasing order and calls the given function for each pair.
Expand Down Expand Up @@ -134,18 +150,25 @@ func (idx *ListIndex) iterateOnStore(pivot document.Value, reverse bool, fn func
seek = enc

if reverse {
seek = append(seek, separator, 0xFF)
seek = append(seek, 0xFF)
}
}

it := st.NewIterator(engine.IteratorConfig{Reverse: reverse})
defer it.Close()

var buf []byte
for it.Seek(seek); it.Valid(); it.Next() {
item := it.Item()
k := item.Key()
idx := bytes.LastIndexByte(k, separator)
err = fn(k[:idx], k[idx+1:], bytes.Equal(k[:idx], enc))
v := item.Key()
idx := len(v) - 8

buf, err = item.ValueCopy(buf)
if err != nil {
return err
}

err = fn(v[:idx], buf, bytes.Equal(v[:idx], enc))
if err != nil {
return err
}
Expand Down
Loading