diff --git a/store/CHANGELOG.md b/store/CHANGELOG.md index 6cf411505792..c1b260b592ea 100644 --- a/store/CHANGELOG.md +++ b/store/CHANGELOG.md @@ -34,6 +34,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ ### Improvements +* [#20817](https://github.com/cosmos/cosmos-sdk/pull/20817) Parallelize the `CacheMultiStore.Write` method. * [#19770](https://github.com/cosmos/cosmos-sdk/pull/19770) Upgrade IAVL to IAVL v1.1.1. ## v1.0.2 (January 10, 2024) diff --git a/store/cachemulti/benchmark_test.go b/store/cachemulti/benchmark_test.go new file mode 100644 index 000000000000..4e7aa88cc396 --- /dev/null +++ b/store/cachemulti/benchmark_test.go @@ -0,0 +1,70 @@ +package cachemulti + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + coretesting "cosmossdk.io/core/testing" + "cosmossdk.io/log" + dbm "cosmossdk.io/store/db" + "cosmossdk.io/store/iavl" + "cosmossdk.io/store/types" +) + +func setupStore(b *testing.B, storeCount uint) (Store, map[string]types.StoreKey) { + b.Helper() + + db := coretesting.NewMemDB() + storeKeys := make(map[string]types.StoreKey) + stores := make(map[types.StoreKey]types.CacheWrapper) + for i := uint(0); i < storeCount; i++ { + key := types.NewKVStoreKey(fmt.Sprintf("store%d", i)) + storeKeys[key.Name()] = key + sdb := dbm.NewPrefixDB(db, []byte(key.Name())) + istore, err := iavl.LoadStore(sdb, log.NewNopLogger(), key, types.CommitID{}, 1000, false, nil) + require.NoError(b, err) + stores[key] = types.KVStore(istore) + } + + return NewStore(db, stores, storeKeys, nil, types.TraceContext{}), storeKeys +} + +func benchmarkStore(b *testing.B, storeCount, runnerCount, keyCount uint) { + b.Helper() + store, storeKeys := setupStore(b, storeCount) + b.ResetTimer() + + b.ReportAllocs() + for i := 0; i < b.N; i++ { + b.StopTimer() + for _, key := range storeKeys { + cstore := store.GetKVStore(key) + for j := uint(0); j < keyCount; j++ { + dataKey := fmt.Sprintf("key%s-%d", key.Name(), j) + dataValue := fmt.Sprintf("value%s-%d", key.Name(), j) + cstore.Set([]byte(dataKey), []byte(dataValue)) + } + } + b.StartTimer() + err := store.writeStoresParallel(int(runnerCount)) + require.NoError(b, err) + } +} + +func BenchmarkCacheMultiStore(b *testing.B) { + storeCounts := []uint{2, 4, 8, 16, 32} + runnerCounts := []uint{1, 2, 4, 8, 16} + keyCounts := []uint{100, 1000, 10000} + + for _, storeCount := range storeCounts { + for _, keyCount := range keyCounts { + for _, runnerCount := range runnerCounts { + b.Run(fmt.Sprintf("storeCount=%d/runnerCount=%d/keyCount=%d/", storeCount, runnerCount, keyCount), func(sub *testing.B) { + benchmarkStore(sub, storeCount, runnerCount, keyCount) + }) + } + } + } +} diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go index 42742f566ed9..959e837fe25f 100644 --- a/store/cachemulti/store.go +++ b/store/cachemulti/store.go @@ -1,8 +1,10 @@ package cachemulti import ( + "errors" "fmt" "io" + "sync" corestore "cosmossdk.io/core/store" "cosmossdk.io/store/cachekv" @@ -11,9 +13,14 @@ import ( "cosmossdk.io/store/types" ) -// storeNameCtxKey is the TraceContext metadata key that identifies -// the store which emitted a given trace. -const storeNameCtxKey = "store_name" +const ( + // storeNameCtxKey is the TraceContext metadata key that identifies + // the store which emitted a given trace. + storeNameCtxKey = "store_name" + // maxRunners is the maximum number of concurrent goroutines that + // can be used to write to the underlying stores in parallel. + maxRunners = 4 +) //---------------------------------------- // Store @@ -121,9 +128,46 @@ func (cms Store) GetStoreType() types.StoreType { // Write calls Write on each underlying store. func (cms Store) Write() { cms.db.Write() - for _, store := range cms.stores { - store.Write() + + if err := cms.writeStoresParallel(maxRunners); err != nil { + panic(err) + } +} + +func (cms Store) writeStoresParallel(runnerCount int) error { + sem := make(chan struct{}, runnerCount) // Semaphore to limit number of concurrent goroutines + errChan := make(chan error, len(cms.stores)) // Channel to collect errors from goroutines + var wg sync.WaitGroup + + for storeKey, store := range cms.stores { + wg.Add(1) + sem <- struct{}{} + + go func() { + defer func() { + wg.Done() + <-sem // Release the slot + + if r := recover(); r != nil { + errChan <- fmt.Errorf("panic in Write for store %s: %v", storeKey.Name(), r) + } + }() + store.Write() + }() + } + + go func() { + wg.Wait() + close(errChan) + }() + + // Collect errors from goroutines + var allErrors []error + for err := range errChan { + allErrors = append(allErrors, err) } + + return errors.Join(allErrors...) } // Implements CacheWrapper. diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index 31d24ed1205c..086f85a614bb 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -1031,3 +1031,37 @@ func TestCommitStores(t *testing.T) { }) } } + +func TestCacheMultiStoreWrite(t *testing.T) { + db := coretesting.NewMemDB() + ms := newMultiStoreWithMounts(db, pruningtypes.NewPruningOptions(pruningtypes.PruningNothing)) + require.NoError(t, ms.LoadLatestVersion()) + + cacheMulti := ms.CacheMultiStore() + + toVersion := int64(100) + keyCount := 100 + storeKeys := []types.StoreKey{testStoreKey1, testStoreKey2, testStoreKey3} + for i := int64(1); i <= toVersion; i++ { + for _, storeKey := range storeKeys { + store := cacheMulti.GetKVStore(storeKey) + for j := 0; j < keyCount; j++ { + store.Set([]byte(fmt.Sprintf("key-%d-%d", i, j)), []byte(fmt.Sprintf("value-%d-%d", i, j))) + } + } + cacheMulti.Write() + ms.Commit() + } + + // check the data + for _, storeKey := range storeKeys { + store := cacheMulti.GetKVStore(storeKey) + for i := int64(1); i <= toVersion; i++ { + for j := 0; j < keyCount; j++ { + key := []byte(fmt.Sprintf("key-%d-%d", i, j)) + value := store.Get(key) + require.Equal(t, []byte(fmt.Sprintf("value-%d-%d", i, j)), value) + } + } + } +}