Skip to content

Commit

Permalink
improve snapshot performance (#3125)
Browse files Browse the repository at this point in the history
* improve snapshot performance

* delete clone interface of kv_cache

* unify clear logic

* improve miss hit

* fix tag usage

* address commit

* delete key if len is zero
  • Loading branch information
CoderZhi authored Mar 5, 2022
1 parent 0837c49 commit dd6e430
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 117 deletions.
86 changes: 63 additions & 23 deletions db/batch/batch_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@ type (

// cachedBatch implements the CachedBatch interface
cachedBatch struct {
lock sync.RWMutex
KVStoreCache
lock sync.RWMutex
kvStoreBatch *baseKVStoreBatch
tag int // latest snapshot + 1
batchShots []int // snapshots of batch are merely size of write queue at time of snapshot
cacheShots []KVStoreCache // snapshots of cache
caches []KVStoreCache // snapshots of cache
keyTags map[hash.Hash160][]int
tagKeys [][]hash.Hash160
}
)

Expand Down Expand Up @@ -189,9 +190,10 @@ func (b *baseKVStoreBatch) truncate(size int) {
func NewCachedBatch() CachedBatch {
return &cachedBatch{
kvStoreBatch: newBaseKVStoreBatch(),
KVStoreCache: NewKVCache(),
batchShots: make([]int, 0),
cacheShots: make([]KVStoreCache, 0),
caches: []KVStoreCache{NewKVCache()},
keyTags: map[hash.Hash160][]int{},
tagKeys: [][]hash.Hash160{{}},
}
}

Expand Down Expand Up @@ -224,22 +226,44 @@ func (cb *cachedBatch) Unlock() {
// ClearAndUnlock clears the write queue and unlocks the batch
func (cb *cachedBatch) ClearAndUnlock() {
defer cb.lock.Unlock()
cb.KVStoreCache.Clear()
cb.clear()
}

func (cb *cachedBatch) currentCache() KVStoreCache {
return cb.caches[len(cb.caches)-1]
}

func (cb *cachedBatch) clear() {
cb.kvStoreBatch.Clear()
// clear all saved snapshots
cb.tag = 0
cb.batchShots = nil
cb.cacheShots = nil
cb.batchShots = make([]int, 0)
cb.cacheShots = make([]KVStoreCache, 0)
cb.caches = []KVStoreCache{NewKVCache()}
cb.keyTags = map[hash.Hash160][]int{}
cb.tagKeys = [][]hash.Hash160{{}}
}

func (cb *cachedBatch) touchKey(h hash.Hash160) {
tags, ok := cb.keyTags[h]
if !ok {
cb.keyTags[h] = []int{cb.tag}
cb.tagKeys[cb.tag] = append(cb.tagKeys[cb.tag], h)
return
}
if tags[len(tags)-1] != cb.tag {
cb.keyTags[h] = append(tags, cb.tag)
cb.tagKeys[cb.tag] = append(cb.tagKeys[cb.tag], h)
}
}

// Put inserts a <key, value> record
func (cb *cachedBatch) Put(namespace string, key, value []byte, errorFormat string, errorArgs ...interface{}) {
cb.lock.Lock()
defer cb.lock.Unlock()
h := cb.hash(namespace, key)
cb.Write(h, value)
cb.touchKey(h)
cb.currentCache().Write(h, value)
cb.kvStoreBatch.batch(Put, namespace, key, value, errorFormat, errorArgs)
}

Expand All @@ -248,30 +272,35 @@ func (cb *cachedBatch) Delete(namespace string, key []byte, errorFormat string,
cb.lock.Lock()
defer cb.lock.Unlock()
h := cb.hash(namespace, key)
cb.Evict(h)
cb.touchKey(h)
cb.currentCache().Evict(h)
cb.kvStoreBatch.batch(Delete, namespace, key, nil, errorFormat, errorArgs)
}

// Clear clear the cached batch buffer
func (cb *cachedBatch) Clear() {
cb.lock.Lock()
defer cb.lock.Unlock()
cb.KVStoreCache.Clear()
cb.kvStoreBatch.Clear()
// clear all saved snapshots
cb.tag = 0
cb.batchShots = nil
cb.cacheShots = nil
cb.batchShots = make([]int, 0)
cb.cacheShots = make([]KVStoreCache, 0)
cb.clear()
}

// Get retrieves a record
func (cb *cachedBatch) Get(namespace string, key []byte) ([]byte, error) {
cb.lock.RLock()
defer cb.lock.RUnlock()
h := cb.hash(namespace, key)
return cb.Read(h)
var v []byte
err := ErrNotExist
if tags, ok := cb.keyTags[h]; ok {
for i := len(tags) - 1; i >= 0; i-- {
v, err = cb.caches[tags[i]].Read(h)
if errors.Cause(err) == ErrNotExist {
continue
}
break
}
}
return v, err
}

// Snapshot takes a snapshot of current cached batch
Expand All @@ -281,7 +310,8 @@ func (cb *cachedBatch) Snapshot() int {
defer func() { cb.tag++ }()
// save a copy of current batch/cache
cb.batchShots = append(cb.batchShots, cb.kvStoreBatch.Size())
cb.cacheShots = append(cb.cacheShots, cb.KVStoreCache.Clone())
cb.caches = append(cb.caches, NewKVCache())
cb.tagKeys = append(cb.tagKeys, []hash.Hash160{})
return cb.tag
}

Expand All @@ -296,9 +326,19 @@ func (cb *cachedBatch) Revert(snapshot int) error {
cb.tag = snapshot + 1
cb.batchShots = cb.batchShots[:cb.tag]
cb.kvStoreBatch.truncate(cb.batchShots[snapshot])
cb.cacheShots = cb.cacheShots[:cb.tag]
cb.KVStoreCache = nil
cb.KVStoreCache = cb.cacheShots[snapshot]
cb.caches = cb.caches[:cb.tag+1]
cb.caches[cb.tag].Clear()
for tag := cb.tag; tag < len(cb.tagKeys); tag++ {
keys := cb.tagKeys[tag]
for _, key := range keys {
cb.keyTags[key] = cb.keyTags[key][:len(cb.keyTags[key])-1]
if len(cb.keyTags[key]) == 0 {
delete(cb.keyTags, key)
}
}
}
cb.tagKeys = cb.tagKeys[:cb.tag+1]
cb.tagKeys[cb.tag] = []hash.Hash160{}
return nil
}

Expand Down
34 changes: 34 additions & 0 deletions db/batch/batch_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,15 @@ func TestCachedBatch(t *testing.T) {
}
return wi
}).SerializeQueue(nil, nil)))
cb.Clear()
require.Equal(0, cb.Size())
}

func TestSnapshot(t *testing.T) {
require := require.New(t)

cb := NewCachedBatch()
cb.Clear()
cb.Put(bucket1, testK1[0], testV1[0], "")
cb.Put(bucket1, testK1[1], testV1[1], "")
s0 := cb.Snapshot()
Expand Down Expand Up @@ -187,9 +190,16 @@ func TestSnapshot(t *testing.T) {
v, err = cb.Get(bucket1, testK2[2])
require.NoError(err)
require.Equal(testV2[2], v)
cb.Put(bucket1, testK2[2], testV2[1], "")
v, err = cb.Get(bucket1, testK2[2])
require.NoError(err)
require.Equal(testV2[1], v)

// snapshot 1
require.NoError(cb.Revert(2))
v, err = cb.Get(bucket1, testK2[2])
require.NoError(err)
require.Equal(testV2[2], v)
require.NoError(cb.Revert(1))
_, err = cb.Get(bucket1, testK1[0])
require.Equal(ErrAlreadyDeleted, err)
Expand Down Expand Up @@ -241,3 +251,27 @@ func BenchmarkCachedBatch_Digest(b *testing.B) {
require.NotEqual(b, hash.ZeroHash256, h)
}
}

func BenchmarkCachedBatch_Snapshot(b *testing.B) {
cb := NewCachedBatch()

for i := 0; i < b.N; i++ {
k := hash.Hash256b([]byte(strconv.Itoa(i)))
var v [1024]byte
for i := range v {
v[i] = byte(rand.Intn(8))
}
cb.Put(bucket1, k[:], v[:], "")
value, err := cb.Get(bucket1, k[:])
require.NoError(b, err)
require.True(b, bytes.Equal(value, v[:]))
sn := cb.Snapshot()
cb.Delete(bucket1, k[:], "")
_, err = cb.Get(bucket1, k[:])
require.Error(b, err)
cb.Revert(sn)
value, err = cb.Get(bucket1, k[:])
require.NoError(b, err)
require.True(b, bytes.Equal(value, v[:]))
}
}
18 changes: 0 additions & 18 deletions db/batch/kv_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ type (
Evict(hash.Hash160)
// Clear clear the cache
Clear()
// Clone clones the cache
Clone() KVStoreCache
}

// kvCache implements KVStoreCache interface
Expand Down Expand Up @@ -83,19 +81,3 @@ func (c *kvCache) Clear() {
c.cache = make(map[hash.Hash160][]byte)
c.deleted = make(map[hash.Hash160]struct{})
}

// Clone clones the cache
func (c *kvCache) Clone() KVStoreCache {
clone := kvCache{
cache: make(map[hash.Hash160][]byte),
deleted: make(map[hash.Hash160]struct{}),
}
// clone entries in map
for k, v := range c.cache {
clone.cache[k] = v
}
for k, v := range c.deleted {
clone.deleted[k] = v
}
return &clone
}
76 changes: 0 additions & 76 deletions db/batch/kv_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,80 +96,4 @@ func TestKvCache(t *testing.T) {
v, err = c.Read(k3)
require.NoError(err)
require.Equal(v, v3)

// 10. clone - make a comparison between both
cc := c.Clone()
require.Equal(cc, c)

v, err = cc.Read(k1)
require.NoError(err)
require.Equal(v, v1)

v, err = cc.Read(k2)
require.NoError(err)
require.Equal(v, v2)

v, err = cc.Read(k3)
require.NoError(err)
require.Equal(v, v3)

// 11. clear one and the other one will stay unchanged
c.Clear()

v, err = c.Read(k1)
require.Equal(err, ErrNotExist)
require.Nil(v)

v, err = c.Read(k2)
require.Equal(err, ErrNotExist)
require.Nil(v)

v, err = c.Read(k3)
require.Equal(err, ErrNotExist)
require.Nil(v)

v, err = cc.Read(k1)
require.NoError(err)
require.Equal(v, v1)

v, err = cc.Read(k2)
require.NoError(err)
require.Equal(v, v2)

v, err = cc.Read(k3)
require.NoError(err)
require.Equal(v, v3)

require.NotEqual(cc, c)

// 12. clone - make different changes and compare again
c.Write(k1, v2)
c.Write(k2, v3)
c.Write(k3, v1)

v, err = c.Read(k1)
require.NoError(err)
require.Equal(v, v2)

v, err = c.Read(k2)
require.NoError(err)
require.Equal(v, v3)

v, err = c.Read(k3)
require.NoError(err)
require.Equal(v, v1)

v, err = cc.Read(k1)
require.NoError(err)
require.Equal(v, v1)

v, err = cc.Read(k2)
require.NoError(err)
require.Equal(v, v2)

v, err = cc.Read(k3)
require.NoError(err)
require.Equal(v, v3)

require.NotEqual(cc, c)
}

0 comments on commit dd6e430

Please sign in to comment.