Skip to content

Commit

Permalink
Merge PR #4265: CacheKVStore keep sorted items
Browse files Browse the repository at this point in the history
  • Loading branch information
mossid authored and alexanderbez committed May 15, 2019
1 parent d8bd7bc commit dfef88d
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 36 deletions.
1 change: 1 addition & 0 deletions .pending/improvements/sdk/2286-Improve-perform
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#2286 Improve performance of CacheKVStore iterator.
45 changes: 33 additions & 12 deletions store/cachekv/memiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cachekv

import (
"bytes"
"container/list"

cmn "github.com/tendermint/tendermint/libs/common"
dbm "github.com/tendermint/tendermint/libs/db"
Expand All @@ -12,20 +13,30 @@ import (
// Implements Iterator.
type memIterator struct {
start, end []byte
items []cmn.KVPair
items []*cmn.KVPair
ascending bool
}

func newMemIterator(start, end []byte, items []cmn.KVPair) *memIterator {
itemsInDomain := make([]cmn.KVPair, 0)
for _, item := range items {
if dbm.IsKeyInDomain(item.Key, start, end) {
itemsInDomain = append(itemsInDomain, item)
func newMemIterator(start, end []byte, items *list.List, ascending bool) *memIterator {
itemsInDomain := make([]*cmn.KVPair, 0)
var entered bool
for e := items.Front(); e != nil; e = e.Next() {
item := e.Value.(*cmn.KVPair)
if !dbm.IsKeyInDomain(item.Key, start, end) {
if entered {
break
}
continue
}
itemsInDomain = append(itemsInDomain, item)
entered = true
}

return &memIterator{
start: start,
end: end,
items: itemsInDomain,
start: start,
end: end,
items: itemsInDomain,
ascending: ascending,
}
}

Expand All @@ -45,17 +56,27 @@ func (mi *memIterator) assertValid() {

func (mi *memIterator) Next() {
mi.assertValid()
mi.items = mi.items[1:]
if mi.ascending {
mi.items = mi.items[1:]
} else {
mi.items = mi.items[:len(mi.items)-1]
}
}

func (mi *memIterator) Key() []byte {
mi.assertValid()
return mi.items[0].Key
if mi.ascending {
return mi.items[0].Key
}
return mi.items[len(mi.items)-1].Key
}

func (mi *memIterator) Value() []byte {
mi.assertValid()
return mi.items[0].Value
if mi.ascending {
return mi.items[0].Value
}
return mi.items[len(mi.items)-1].Value
}

func (mi *memIterator) Close() {
Expand Down
73 changes: 51 additions & 22 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cachekv

import (
"bytes"
"container/list"
"io"
"sort"
"sync"
Expand All @@ -24,18 +25,22 @@ type cValue struct {

// Store wraps an in-memory cache around an underlying types.KVStore.
type Store struct {
mtx sync.Mutex
cache map[string]cValue
parent types.KVStore
mtx sync.Mutex
cache map[string]*cValue
unsortedCache map[string]struct{}
sortedCache *list.List // always ascending sorted
parent types.KVStore
}

var _ types.CacheKVStore = (*Store)(nil)

// nolint
func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]cValue),
parent: parent,
cache: make(map[string]*cValue),
unsortedCache: make(map[string]struct{}),
sortedCache: list.New(),
parent: parent,
}
}

Expand Down Expand Up @@ -116,7 +121,9 @@ func (store *Store) Write() {
}

// Clear the cache
store.cache = make(map[string]cValue)
store.cache = make(map[string]*cValue)
store.unsortedCache = make(map[string]struct{})
store.sortedCache = list.New()
}

//----------------------------------------
Expand Down Expand Up @@ -146,6 +153,9 @@ func (store *Store) ReverseIterator(start, end []byte) types.Iterator {
}

func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()

var parent, cache types.Iterator

if ascending {
Expand All @@ -154,43 +164,62 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
parent = store.parent.ReverseIterator(start, end)
}

items := store.dirtyItems(start, end, ascending)
cache = newMemIterator(start, end, items)
store.dirtyItems(start, end)
cache = newMemIterator(start, end, store.sortedCache, ascending)

return newCacheMergeIterator(parent, cache, ascending)
}

// Constructs a slice of dirty items, to use w/ memIterator.
func (store *Store) dirtyItems(start, end []byte, ascending bool) []cmn.KVPair {
items := make([]cmn.KVPair, 0)
func (store *Store) dirtyItems(start, end []byte) {
unsorted := make([]*cmn.KVPair, 0)

for key, cacheValue := range store.cache {
if !cacheValue.dirty {
continue
}
for key := range store.unsortedCache {
cacheValue := store.cache[key]
if dbm.IsKeyInDomain([]byte(key), start, end) {
items = append(items, cmn.KVPair{Key: []byte(key), Value: cacheValue.value})
unsorted = append(unsorted, &cmn.KVPair{Key: []byte(key), Value: cacheValue.value})
delete(store.unsortedCache, key)
}
}

sort.Slice(items, func(i, j int) bool {
if ascending {
return bytes.Compare(items[i].Key, items[j].Key) < 0
}
return bytes.Compare(items[i].Key, items[j].Key) > 0
sort.Slice(unsorted, func(i, j int) bool {
return bytes.Compare(unsorted[i].Key, unsorted[j].Key) < 0
})

return items
for e := store.sortedCache.Front(); e != nil && len(unsorted) != 0; {
uitem := unsorted[0]
sitem := e.Value.(*cmn.KVPair)
comp := bytes.Compare(uitem.Key, sitem.Key)
switch comp {
case -1:
unsorted = unsorted[1:]
store.sortedCache.InsertBefore(uitem, e)
case 1:
e = e.Next()
case 0:
unsorted = unsorted[1:]
e.Value = uitem
e = e.Next()
}
}

for _, kvp := range unsorted {
store.sortedCache.PushBack(kvp)
}

}

//----------------------------------------
// etc

// Only entrypoint to mutate store.cache.
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
store.cache[string(key)] = cValue{
store.cache[string(key)] = &cValue{
value: value,
deleted: deleted,
dirty: dirty,
}
if dirty {
store.unsortedCache[string(key)] = struct{}{}
}
}
46 changes: 46 additions & 0 deletions store/cachekv/store_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package cachekv_test

import (
"crypto/rand"
"sort"
"testing"

dbm "github.com/tendermint/tendermint/libs/db"

"github.com/cosmos/cosmos-sdk/store/cachekv"
"github.com/cosmos/cosmos-sdk/store/dbadapter"
)

func benchmarkCacheKVStoreIterator(numKVs int, b *testing.B) {
mem := dbadapter.Store{DB: dbm.NewMemDB()}
cstore := cachekv.NewStore(mem)
keys := make([]string, numKVs, numKVs)

for i := 0; i < numKVs; i++ {
key := make([]byte, 32)
value := make([]byte, 32)

_, _ = rand.Read(key)
_, _ = rand.Read(value)

keys[i] = string(key)
cstore.Set(key, value)
}

sort.Strings(keys)

for n := 0; n < b.N; n++ {
iter := cstore.Iterator([]byte(keys[0]), []byte(keys[numKVs-1]))

for _ = iter.Key(); iter.Valid(); iter.Next() {
}

iter.Close()
}
}

func BenchmarkCacheKVStoreIterator500(b *testing.B) { benchmarkCacheKVStoreIterator(500, b) }
func BenchmarkCacheKVStoreIterator1000(b *testing.B) { benchmarkCacheKVStoreIterator(1000, b) }
func BenchmarkCacheKVStoreIterator10000(b *testing.B) { benchmarkCacheKVStoreIterator(10000, b) }
func BenchmarkCacheKVStoreIterator50000(b *testing.B) { benchmarkCacheKVStoreIterator(50000, b) }
func BenchmarkCacheKVStoreIterator100000(b *testing.B) { benchmarkCacheKVStoreIterator(100000, b) }
4 changes: 2 additions & 2 deletions store/cachekv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,8 @@ func TestCacheKVMergeIteratorRandom(t *testing.T) {
st := newCacheKVStore()
truth := dbm.NewMemDB()

start, end := 25, 75
max := 100
start, end := 25, 975
max := 1000
setRange(st, truth, start, end)

// do an op, test the iterator
Expand Down

0 comments on commit dfef88d

Please sign in to comment.