From f1ee974ec823c720696b7d3940d38de69c89c2c8 Mon Sep 17 00:00:00 2001
From: yihuang
Date: Thu, 22 Dec 2022 03:58:18 +0800
Subject: [PATCH] refactor: make cachekv store thread-safe again (#14378)

Co-authored-by: Aleksandr Bezobchuk
---
 CHANGELOG.md                          |  1 +
 store/cachekv/internal/btree.go       | 38 ++++++++------
 store/cachekv/internal/btree_test.go  |  3 +-
 store/cachekv/internal/memiterator.go | 24 ++-------
 store/cachekv/store.go                | 71 ++++++++++-----------
 5 files changed, 57 insertions(+), 80 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0d8c6e31f426..1b30e097b786 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -128,6 +128,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
 * (x/auth/vesting) [#13502](https://github.com/cosmos/cosmos-sdk/pull/13502) Add Amino Msg registration for `MsgCreatePeriodicVestingAccount`.
 * (x/auth)[#13780](https://github.com/cosmos/cosmos-sdk/pull/13780) `id` (type of int64) in `AccountAddressByID` grpc query is now deprecated, update to account-id(type of uint64) to use `AccountAddressByID`.
 * (x/group) [#13876](https://github.com/cosmos/cosmos-sdk/pull/13876) Fix group MinExecutionPeriod that is checked on execution now, instead of voting period end.
+* (store) [#14378](https://github.com/cosmos/cosmos-sdk/pull/14378) The `CacheKV` store is thread-safe again, with improved iteration and deletion logic. Iteration now operates on a strictly isolated view, which is a breaking change from the previous behavior.
 
 ### API Breaking Changes
 
diff --git a/store/cachekv/internal/btree.go b/store/cachekv/internal/btree.go
index c09b33fab217..d0340022f2c5 100644
--- a/store/cachekv/internal/btree.go
+++ b/store/cachekv/internal/btree.go
@@ -4,6 +4,7 @@ import (
 	"bytes"
 	"errors"
 
+	"github.com/cosmos/cosmos-sdk/store/types"
 	"github.com/tidwall/btree"
 )
 
@@ -21,23 +22,24 @@ var errKeyEmpty = errors.New("key cannot be empty")
 //
 // We choose tidwall/btree over google/btree here because it provides API to implement step iterator directly.
 type BTree struct {
-	tree btree.BTreeG[item]
+	tree *btree.BTreeG[item]
 }
 
 // NewBTree creates a wrapper around `btree.BTreeG`.
-func NewBTree() *BTree {
-	return &BTree{tree: *btree.NewBTreeGOptions(byKeys, btree.Options{
-		Degree: bTreeDegree,
-		// Contract: cachekv store must not be called concurrently
-		NoLocks: true,
-	})}
+func NewBTree() BTree {
+	return BTree{
+		tree: btree.NewBTreeGOptions(byKeys, btree.Options{
+			Degree:  bTreeDegree,
+			NoLocks: false,
+		}),
+	}
 }
 
-func (bt *BTree) Set(key, value []byte) {
+func (bt BTree) Set(key, value []byte) {
 	bt.tree.Set(newItem(key, value))
 }
 
-func (bt *BTree) Get(key []byte) []byte {
+func (bt BTree) Get(key []byte) []byte {
 	i, found := bt.tree.Get(newItem(key, nil))
 	if !found {
 		return nil
@@ -45,22 +47,30 @@ func (bt *BTree) Get(key []byte) []byte {
 	return i.value
 }
 
-func (bt *BTree) Delete(key []byte) {
+func (bt BTree) Delete(key []byte) {
 	bt.tree.Delete(newItem(key, nil))
 }
 
-func (bt *BTree) Iterator(start, end []byte) (*memIterator, error) { //nolint:revive
+func (bt BTree) Iterator(start, end []byte) (types.Iterator, error) {
 	if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
 		return nil, errKeyEmpty
 	}
-	return NewMemIterator(start, end, bt, make(map[string]struct{}), true), nil
+	return newMemIterator(start, end, bt, true), nil
 }
 
-func (bt *BTree) ReverseIterator(start, end []byte) (*memIterator, error) { //nolint:revive
+func (bt BTree) ReverseIterator(start, end []byte) (types.Iterator, error) {
 	if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) {
 		return nil, errKeyEmpty
 	}
-	return NewMemIterator(start, end, bt, make(map[string]struct{}), false), nil
+	return newMemIterator(start, end, bt, false), nil
+}
+
+// Copy the tree. This is a copy-on-write operation and is very fast because
+// it only performs a shallow copy.
+func (bt BTree) Copy() BTree {
+	return BTree{
+		tree: bt.tree.Copy(),
+	}
 }
 
 // item is a btree item with byte slices as keys and values
diff --git a/store/cachekv/internal/btree_test.go b/store/cachekv/internal/btree_test.go
index f85a8bbaf109..b6aa22db8e0f 100644
--- a/store/cachekv/internal/btree_test.go
+++ b/store/cachekv/internal/btree_test.go
@@ -3,6 +3,7 @@ package internal
 import (
 	"testing"
 
+	"github.com/cosmos/cosmos-sdk/store/types"
 	sdk "github.com/cosmos/cosmos-sdk/types"
 	"github.com/stretchr/testify/require"
 )
@@ -181,7 +182,7 @@ func TestDBIterator(t *testing.T) {
 	verifyIterator(t, ritr, nil, "reverse iterator with empty db")
 }
 
-func verifyIterator(t *testing.T, itr *memIterator, expected []int64, msg string) {
+func verifyIterator(t *testing.T, itr types.Iterator, expected []int64, msg string) {
 	i := 0
 	for itr.Valid() {
 		key := itr.Key()
diff --git a/store/cachekv/internal/memiterator.go b/store/cachekv/internal/memiterator.go
index 34c3796c0677..b2fa93d3f71c 100644
--- a/store/cachekv/internal/memiterator.go
+++ b/store/cachekv/internal/memiterator.go
@@ -11,7 +11,7 @@ import (
 var _ types.Iterator = (*memIterator)(nil)
 
 // memIterator iterates over iterKVCache items.
-// if key is nil, means it was deleted.
+// if value is nil, it means the key was deleted.
 // Implements Iterator.
type memIterator struct { iter btree.IterG[item] @@ -19,12 +19,10 @@ type memIterator struct { start []byte end []byte ascending bool - lastKey []byte - deleted map[string]struct{} valid bool } -func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{}, ascending bool) *memIterator { //nolint:revive +func newMemIterator(start, end []byte, items BTree, ascending bool) *memIterator { iter := items.tree.Iter() var valid bool if ascending { @@ -52,8 +50,6 @@ func NewMemIterator(start, end []byte, items *BTree, deleted map[string]struct{} start: start, end: end, ascending: ascending, - lastKey: nil, - deleted: deleted, valid: valid, } @@ -113,21 +109,7 @@ func (mi *memIterator) Key() []byte { } func (mi *memIterator) Value() []byte { - item := mi.iter.Item() - key := item.key - // We need to handle the case where deleted is modified and includes our current key - // We handle this by maintaining a lastKey object in the iterator. - // If the current key is the same as the last key (and last key is not nil / the start) - // then we are calling value on the same thing as last time. - // Therefore we don't check the mi.deleted to see if this key is included in there. - if _, ok := mi.deleted[string(key)]; ok { - if mi.lastKey == nil || !bytes.Equal(key, mi.lastKey) { - // not re-calling on old last key - return nil - } - } - mi.lastKey = key - return item.value + return mi.iter.Item().value } func (mi *memIterator) assertValid() { diff --git a/store/cachekv/store.go b/store/cachekv/store.go index e9f9d7195f4c..666a71257354 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -24,14 +24,11 @@ type cValue struct { } // Store wraps an in-memory cache around an underlying types.KVStore. -// If a cached value is nil but deleted is defined for the corresponding key, -// it means the parent doesn't have the key. (No need to delete upon Write()) type Store struct { mtx sync.Mutex cache map[string]*cValue - deleted map[string]struct{} unsortedCache map[string]struct{} - sortedCache *internal.BTree // always ascending sorted + sortedCache internal.BTree // always ascending sorted parent types.KVStore } @@ -41,7 +38,6 @@ var _ types.CacheKVStore = (*Store)(nil) func NewStore(parent types.KVStore) *Store { return &Store{ cache: make(map[string]*cValue), - deleted: make(map[string]struct{}), unsortedCache: make(map[string]struct{}), sortedCache: internal.NewBTree(), parent: parent, @@ -63,7 +59,7 @@ func (store *Store) Get(key []byte) (value []byte) { cacheValue, ok := store.cache[conv.UnsafeBytesToStr(key)] if !ok { value = store.parent.Get(key) - store.setCacheValue(key, value, false, false) + store.setCacheValue(key, value, false) } else { value = cacheValue.value } @@ -79,7 +75,7 @@ func (store *Store) Set(key []byte, value []byte) { types.AssertValidKey(key) types.AssertValidValue(value) - store.setCacheValue(key, value, false, true) + store.setCacheValue(key, value, true) } // Has implements types.KVStore. @@ -94,7 +90,7 @@ func (store *Store) Delete(key []byte) { defer store.mtx.Unlock() types.AssertValidKey(key) - store.setCacheValue(key, nil, true, true) + store.setCacheValue(key, nil, true) } // Implements Cachetypes.KVStore. 
@@ -102,7 +98,7 @@ func (store *Store) Write() {
 	store.mtx.Lock()
 	defer store.mtx.Unlock()
 
-	if len(store.cache) == 0 && len(store.deleted) == 0 && len(store.unsortedCache) == 0 {
+	if len(store.cache) == 0 && len(store.unsortedCache) == 0 {
 		store.sortedCache = internal.NewBTree()
 		return
 	}
@@ -122,19 +118,16 @@ func (store *Store) Write() {
 	// TODO: Consider allowing usage of Batch, which would allow the write to
 	// at least happen atomically.
 	for _, key := range keys {
-		if store.isDeleted(key) {
-			// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
-			// be sure if the underlying store might do a save with the byteslice or
-			// not. Once we get confirmation that .Delete is guaranteed not to
-			// save the byteslice, then we can assume only a read-only copy is sufficient.
-			store.parent.Delete([]byte(key))
-			continue
-		}
-
+		// We use []byte(key) instead of conv.UnsafeStrToBytes because we cannot
+		// be sure if the underlying store might do a save with the byteslice or
+		// not. Once we get confirmation that .Delete is guaranteed not to
+		// save the byteslice, then we can assume only a read-only copy is sufficient.
 		cacheValue := store.cache[key]
 		if cacheValue.value != nil {
-			// It already exists in the parent, hence delete it.
+			// A non-nil value means the key was set; write it through to the parent.
 			store.parent.Set([]byte(key), cacheValue.value)
+		} else {
+			store.parent.Delete([]byte(key))
 		}
 	}
 
@@ -144,9 +137,6 @@ func (store *Store) Write() {
 	for key := range store.cache {
 		delete(store.cache, key)
 	}
-	for key := range store.deleted {
-		delete(store.deleted, key)
-	}
 	for key := range store.unsortedCache {
 		delete(store.unsortedCache, key)
 	}
@@ -180,16 +170,24 @@ func (store *Store) iterator(start, end []byte, ascending bool) types.Iterator {
 	store.mtx.Lock()
 	defer store.mtx.Unlock()
 
-	var parent, cache types.Iterator
+	store.dirtyItems(start, end)
+	isoSortedCache := store.sortedCache.Copy()
+
+	var (
+		err           error
+		parent, cache types.Iterator
+	)
 
 	if ascending {
 		parent = store.parent.Iterator(start, end)
+		cache, err = isoSortedCache.Iterator(start, end)
 	} else {
 		parent = store.parent.ReverseIterator(start, end)
+		cache, err = isoSortedCache.ReverseIterator(start, end)
+	}
+	if err != nil {
+		panic(err)
 	}
-
-	store.dirtyItems(start, end)
-	cache = internal.NewMemIterator(start, end, store.sortedCache, store.deleted, ascending)
 
 	return internal.NewCacheMergeIterator(parent, cache, ascending)
 }
@@ -370,13 +368,7 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
 	}
 
 	for _, item := range unsorted {
-		if item.Value == nil {
-			// deleted element, tracked by store.deleted
-			// setting arbitrary value
-			store.sortedCache.Set(item.Key, []byte{})
-			continue
-		}
-
+		// sortedCache can store a `nil` value to represent a deleted item.
 		store.sortedCache.Set(item.Key, item.Value)
 	}
 }
@@ -385,23 +377,14 @@ func (store *Store) clearUnsortedCacheSubset(unsorted []*kv.Pair, sortState sort
 // etc
 
 // Only entrypoint to mutate store.cache.
-func (store *Store) setCacheValue(key, value []byte, deleted bool, dirty bool) {
+// A `nil` value means a deletion.
+func (store *Store) setCacheValue(key, value []byte, dirty bool) {
 	keyStr := conv.UnsafeBytesToStr(key)
 
 	store.cache[keyStr] = &cValue{
 		value: value,
 		dirty: dirty,
 	}
-	if deleted {
-		store.deleted[keyStr] = struct{}{}
-	} else {
-		delete(store.deleted, keyStr)
-	}
 	if dirty {
 		store.unsortedCache[keyStr] = struct{}{}
 	}
 }
-
-func (store *Store) isDeleted(key string) bool {
-	_, ok := store.deleted[key]
-	return ok
-}
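
Below are a few reviewer-facing sketches of the new behavior. They are hypothetical examples, not part of the patch, and they assume the tm-db memdb plus the store/dbadapter wrapper that the cachekv tests already use.

First, the isolated-iteration semantics from the changelog entry: an open iterator now works on a copy-on-write snapshot of the cache, so concurrent writes no longer leak into (or race with) an iteration already in progress.

package main

import (
	"fmt"

	dbm "github.com/tendermint/tm-db"

	"github.com/cosmos/cosmos-sdk/store/cachekv"
	"github.com/cosmos/cosmos-sdk/store/dbadapter"
)

func main() {
	// A cachekv store wrapping an in-memory parent store.
	parent := dbadapter.Store{DB: dbm.NewMemDB()}
	store := cachekv.NewStore(parent)

	store.Set([]byte("a"), []byte("1"))
	store.Set([]byte("b"), []byte("2"))

	// The iterator snapshots the cache (via the copy-on-write BTree.Copy),
	// so it observes the store as of this call.
	itr := store.Iterator(nil, nil)
	defer itr.Close()

	// Writes made while the iterator is live are NOT visible to it; this
	// is the breaking behavior change called out in the changelog entry.
	store.Set([]byte("c"), []byte("3"))
	store.Delete([]byte("a"))

	for ; itr.Valid(); itr.Next() {
		fmt.Printf("%s => %s\n", itr.Key(), itr.Value()) // a => 1, b => 2
	}
}

Previously the shared memIterator consulted the live store.deleted map mid-iteration, so a concurrent Delete could change what an already-open iterator returned.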
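
The snapshot comes from the new BTree.Copy, which defers to tidwall/btree's copy-on-write Copy; the patch also flips NoLocks to false, so the shared tree is additionally guarded by the btree's own internal lock. A standalone sketch of the copy-on-write primitive, using tidwall/btree directly (the cachekv wrapper lives in an internal package):

package main

import (
	"fmt"

	"github.com/tidwall/btree"
)

func main() {
	tree := btree.NewBTreeG(func(a, b int) bool { return a < b })
	for i := 1; i <= 3; i++ {
		tree.Set(i)
	}

	// Copy is cheap: both trees share nodes and only copy the shared
	// nodes lazily on their next write (copy-on-write).
	snapshot := tree.Copy()

	// Mutations to the original no longer affect the snapshot.
	tree.Set(4)
	tree.Delete(1)

	snapshot.Scan(func(v int) bool {
		fmt.Println(v) // prints 1, 2, 3: the snapshot is unchanged
		return true
	})
}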
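
Finally, with store.deleted gone, a nil cached value is the single marker for a deletion: the merge iterator treats a nil cache value as a deleted key, and Write turns it into a parent.Delete. A sketch of the flush semantics, under the same memdb assumption:

package main

import (
	"fmt"

	dbm "github.com/tendermint/tm-db"

	"github.com/cosmos/cosmos-sdk/store/cachekv"
	"github.com/cosmos/cosmos-sdk/store/dbadapter"
)

func main() {
	parent := dbadapter.Store{DB: dbm.NewMemDB()}
	parent.Set([]byte("a"), []byte("1"))

	store := cachekv.NewStore(parent)
	store.Delete([]byte("a"))           // cached as value == nil
	store.Set([]byte("b"), []byte("2")) // cached as a non-nil value

	// Nothing has reached the parent yet.
	fmt.Println(parent.Has([]byte("a"))) // true

	// Write flushes the cache: nil values become parent.Delete,
	// non-nil values become parent.Set.
	store.Write()

	fmt.Println(parent.Has([]byte("a"))) // false
	fmt.Println(parent.Has([]byte("b"))) // true
}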