Skip to content

Commit

Permalink
refactor: make cachekv store thread-safe again (#14378)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Bezobchuk <[email protected]>
  • Loading branch information
yihuang and alexanderbez authored Dec 21, 2022
1 parent 9983148 commit f1ee974
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 80 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (x/auth/vesting) [#13502](https://github.com/cosmos/cosmos-sdk/pull/13502) Add Amino Msg registration for `MsgCreatePeriodicVestingAccount`.
* (x/auth)[#13780](https://github.com/cosmos/cosmos-sdk/pull/13780) `id` (type of int64) in `AccountAddressByID` grpc query is now deprecated, update to account-id(type of uint64) to use `AccountAddressByID`.
* (x/group) [#13876](https://github.com/cosmos/cosmos-sdk/pull/13876) Fix group MinExecutionPeriod that is checked on execution now, instead of voting period end.
* (store) [#14378](https://github.com/cosmos/cosmos-sdk/pull/14378) The `CacheKV` store is thread-safe again, which includes improved iteration and deletion logic. Iteration is on a strictly isolated view now, which is breaking from previous behavior.

### API Breaking Changes

Expand Down
38 changes: 24 additions & 14 deletions store/cachekv/internal/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"errors"

"github.com/cosmos/cosmos-sdk/store/types"
"github.com/tidwall/btree"
)

Expand All @@ -21,46 +22,55 @@ var errKeyEmpty = errors.New("key cannot be empty")
//
// We choose tidwall/btree over google/btree here because it provides API to implement step iterator directly.
type BTree struct {
tree btree.BTreeG[item]
tree *btree.BTreeG[item]
}

// NewBTree creates a wrapper around `btree.BTreeG`.
func NewBTree() *BTree {
return &BTree{tree: *btree.NewBTreeGOptions(byKeys, btree.Options{
Degree: bTreeDegree,
// Contract: cachekv store must not be called concurrently
NoLocks: true,
})}
func NewBTree() BTree {
return BTree{
tree: btree.NewBTreeGOptions(byKeys, btree.Options{
Degree: bTreeDegree,
NoLocks: false,
}),
}
}

func (bt *BTree) Set(key, value []byte) {
func (bt BTree) Set(key, value []byte) {
bt.tree.Set(newItem(key, value))
}

func (bt *BTree) Get(key []byte) []byte {
func (bt BTree) Get(key []byte) []byte {
i, found := bt.tree.Get(newItem(key, nil))
if !found {
return nil
}
return i.value
}

func (bt *BTree) Delete(key []byte) {
func (bt BTree) Delete(key []byte) {
bt.tree.Delete(newItem(key, nil))
}

func (bt *BTree) Iterator(start, end []byte) (*memIterator, error) { //nolint:revive
func (bt BTree) Iterator(start, end []byte) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}
return NewMemIterator(start, end, bt, make(map[string]struct{}), true), nil
return newMemIterator(start, end, bt, true), nil
}

func (bt *BTree) ReverseIterator(start, end []byte) (*memIterator, error) { //nolint:revive
func (bt BTree) ReverseIterator(start, end []byte) (types.Iterator, error) {
if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
return nil, errKeyEmpty
}
return NewMemIterator(start, end, bt, make(map[string]struct{}), false), nil
return newMemIterator(start, end, bt, false), nil
}

// Copy the tree. This is a copy-on-write operation and is very fast because
// it only performs a shadowed copy.
func (bt BTree) Copy() BTree {
return BTree{
tree: bt.tree.Copy(),
}
}

// item is a btree item with byte slices as keys and values
Expand Down
3 changes: 2 additions & 1 deletion store/cachekv/internal/btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"testing"

"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -181,7 +182,7 @@ func TestDBIterator(t *testing.T) {
verifyIterator(t, ritr, nil, "reverse iterator with empty db")
}

func verifyIterator(t *testing.T, itr *memIterator, expected []int64, msg string) {
func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg string) {
i := 0
for itr.Valid() {
key := itr.Key()
Expand Down
24 changes: 3 additions & 21 deletions store/cachekv/internal/memiterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,18 @@ import (
var _ types.Iterator = (*memIterator)(nil)

// memIterator iterates over iterKVCache items.
// if key is nil, means it was deleted.
// if value is nil, means it was deleted.
// Implements Iterator.
type memIterator struct {
iter btree.IterG[item]

start []byte
end []byte
ascending bool
lastKey []byte
deleted map[string]struct{}
valid bool
}

func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}, ascending bool) *memIterator { //nolint:revive
func newMemIterator(start, end []byte, items BTree, ascending bool) *memIterator {
iter := items.tree.Iter()
var valid bool
if ascending {
Expand Down Expand Up @@ -52,8 +50,6 @@ func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}
start: start,
end: end,
ascending: ascending,
lastKey: nil,
deleted: deleted,
valid: valid,
}

Expand Down Expand Up @@ -113,21 +109,7 @@ func (mi *memIterator) Key() []byte {
}

func (mi *memIterator) Value() []byte {
item := mi.iter.Item()
key := item.key
// We need to handle the case where deleted is modified and includes our current key
// We handle this by maintaining a lastKey object in the iterator.
// If the current key is the same as the last key (and last key is not nil / the start)
// then we are calling value on the same thing as last time.
// Therefore we don't check the mi.deleted to see if this key is included in there.
if _, ok := mi.deleted[string(key)]; ok {
if mi.lastKey == nil || !bytes.Equal(key, mi.lastKey) {
// not re-calling on old last key
return nil
}
}
mi.lastKey = key
return item.value
return mi.iter.Item().value
}

func (mi *memIterator) assertValid() {
Expand Down
71 changes: 27 additions & 44 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@ type cValue struct {
}

// Store wraps an in-memory cache around an underlying types.KVStore.
// If a cached value is nil but deleted is defined for the corresponding key,
// it means the parent doesn't have the key. (No need to delete upon Write())
type Store struct {
mtx sync.Mutex
cache map[string]*cValue
deleted map[string]struct{}
unsortedCache map[string]struct{}
sortedCache *internal.BTree // always ascending sorted
sortedCache internal.BTree // always ascending sorted
parent types.KVStore
}

Expand All @@ -41,7 +38,6 @@ var _ types.CacheKVStore = (*Store)(nil)
func NewStore(parent types.KVStore) *Store {
return &Store{
cache: make(map[string]*cValue),
deleted: make(map[string]struct{}),
unsortedCache: make(map[string]struct{}),
sortedCache: internal.NewBTree(),
parent: parent,
Expand All @@ -63,7 +59,7 @@ func (store *Store) Get(key []byte) (value []byte) {
cacheValue, ok := store.cache[conv.UnsafeBytesToStr(key)]
if !ok {
value = store.parent.Get(key)
store.setCacheValue(key, value, false, false)
store.setCacheValue(key, value, false)
} else {
value = cacheValue.value
}
Expand All @@ -79,7 +75,7 @@ func (store *Store) Set(key []byte, value []byte) {
types.AssertValidKey(key)
types.AssertValidValue(value)

store.setCacheValue(key, value, false, true)
store.setCacheValue(key, value, true)
}

// Has implements types.KVStore.
Expand All @@ -94,15 +90,15 @@ func (store *Store) Delete(key []byte) {
defer store.mtx.Unlock()

types.AssertValidKey(key)
store.setCacheValue(key, nil, true, true)
store.setCacheValue(key, nil, true)
}

// Implements Cachetypes.KVStore.
func (store *Store) Write() {
store.mtx.Lock()
defer store.mtx.Unlock()

if len(store.cache) == 0 && len(store.deleted) == 0 && len(store.unsortedCache) == 0 {
if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
store.sortedCache = internal.NewBTree()
return
}
Expand All @@ -122,19 +118,16 @@ func (store *Store) Write() {
// TODO: Consider allowing usage of Batch, which would allow the write to
// at least happen atomically.
for _, key := range keys {
if store.isDeleted(key) {
// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
store.parent.Delete([]byte(key))
continue
}

// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
// be sure if the underlying store might do a save with the byteslice or
// not. Once we get confirmation that .Delete is guaranteed not to
// save the byteslice, then we can assume only a read-only copy is sufficient.
cacheValue := store.cache[key]
if cacheValue.value != nil {
// It already exists in the parent, hence delete it.
// It already exists in the parent, hence update it.
store.parent.Set([]byte(key), cacheValue.value)
} else {
store.parent.Delete([]byte(key))
}
}

Expand All @@ -144,9 +137,6 @@ func (store *Store) Write() {
for key := range store.cache {
delete(store.cache, key)
}
for key := range store.deleted {
delete(store.deleted, key)
}
for key := range store.unsortedCache {
delete(store.unsortedCache, key)
}
Expand Down Expand Up @@ -180,16 +170,24 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
store.mtx.Lock()
defer store.mtx.Unlock()

var parent, cache types.Iterator
store.dirtyItems(start, end)
isoSortedCache := store.sortedCache.Copy()

var (
err error
parent, cache types.Iterator
)

if ascending {
parent = store.parent.Iterator(start, end)
cache, err = isoSortedCache.Iterator(start, end)
} else {
parent = store.parent.ReverseIterator(start, end)
cache, err = isoSortedCache.ReverseIterator(start, end)
}
if err != nil {
panic(err)
}

store.dirtyItems(start, end)
cache = internal.NewMemIterator(start, end, store.sortedCache, store.deleted, ascending)

return internal.NewCacheMergeIterator(parent, cache, ascending)
}
Expand Down Expand Up @@ -370,13 +368,7 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
}

for _, item := range unsorted {
if item.Value == nil {
// deleted element, tracked by store.deleted
// setting arbitrary value
store.sortedCache.Set(item.Key, []byte{})
continue
}

// sortedCache is able to store `nil` value to represent deleted items.
store.sortedCache.Set(item.Key, item.Value)
}
}
Expand All @@ -385,23 +377,14 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
// etc

// Only entrypoint to mutate store.cache.
func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
// A `nil` value means a deletion.
func (store *Store) setCacheValue(key, value []byte, dirty bool) {
keyStr := conv.UnsafeBytesToStr(key)
store.cache[keyStr] = &cValue{
value: value,
dirty: dirty,
}
if deleted {
store.deleted[keyStr] = struct{}{}
} else {
delete(store.deleted, keyStr)
}
if dirty {
store.unsortedCache[keyStr] = struct{}{}
}
}

func (store *Store) isDeleted(key string) bool {
_, ok := store.deleted[key]
return ok
}

0 comments on commit f1ee974

Please sign in to comment.