From 1bc3aac987c712a4ba9c980625980c3b8d4982ab Mon Sep 17 00:00:00 2001 From: Torao Takami Date: Tue, 19 Jul 2022 17:09:51 +0900 Subject: [PATCH] Remove prefetch functionality (#596) * remove prefetch functionality and write by multi-threading * apply goimports * fix a few trivial diff --- baseapp/abci.go | 6 --- baseapp/grpcrouter.go | 11 ---- server/mock/store.go | 4 -- server/mock/store_test.go | 1 - simapp/ante_handler.go | 2 +- store/cachekv/store.go | 9 ---- store/cachekv/store_test.go | 1 - store/cachemulti/store.go | 21 +------- store/dbadapter/store.go | 9 ---- store/dbadapter/store_test.go | 2 - store/gaskv/store.go | 6 --- store/gaskv/store_test.go | 1 - store/iavl/metrics.go | 24 +++------ store/iavl/prefetch.go | 86 -------------------------------- store/iavl/store.go | 20 +------- store/iavl/store_test.go | 2 - store/iavl/tree.go | 1 - store/listenkv/store.go | 11 ---- store/prefix/store.go | 5 -- store/prefix/store_test.go | 1 - store/rootmulti/store.go | 85 +++++++------------------------ store/rootmulti/store_test.go | 12 ----- store/tracekv/store.go | 6 --- store/tracekv/store_test.go | 1 - store/types/store.go | 3 -- x/auth/ante/ante.go | 2 +- x/auth/ante/sigverify.go | 9 +--- x/auth/ante/sigverify_test.go | 2 +- x/auth/keeper/account.go | 6 --- x/auth/keeper/keeper.go | 2 - x/auth/keeper/keeper_test.go | 2 - x/auth/types/expected_keepers.go | 1 - x/bank/keeper/grpc_query.go | 9 ---- x/bank/keeper/view.go | 52 ------------------- x/bank/types/expected_keepers.go | 1 - x/feegrant/expected_keepers.go | 1 - x/gov/types/expected_keepers.go | 1 - 37 files changed, 31 insertions(+), 387 deletions(-) delete mode 100644 store/iavl/prefetch.go diff --git a/baseapp/abci.go b/baseapp/abci.go index b2f93696ef..78c3244fa1 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -21,8 +21,6 @@ import ( "github.com/line/lbm-sdk/telemetry" sdk "github.com/line/lbm-sdk/types" sdkerrors "github.com/line/lbm-sdk/types/errors" - - iavlstore "github.com/line/lbm-sdk/store/iavl" ) // InitChain implements the ABCI interface. It runs the initialization logic @@ -280,13 +278,11 @@ func (app *BaseApp) CheckTxAsync(req abci.RequestCheckTx, callback abci.CheckTxC func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx { // NOTE: This is safe because Ostracon holds a lock on the mempool for Rechecking. app.setCheckState(req.Header) - iavlstore.PausePrefetcher() return abci.ResponseBeginRecheckTx{Code: abci.CodeTypeOK} } // EndRecheckTx implements the ABCI interface. func (app *BaseApp) EndRecheckTx(req abci.RequestEndRecheckTx) abci.ResponseEndRecheckTx { - iavlstore.ResumePrefetcher() return abci.ResponseEndRecheckTx{Code: abci.CodeTypeOK} } @@ -344,9 +340,7 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) { // The write to the DeliverTx state writes all state transitions to the root // MultiStore (app.cms) so when Commit() is called is persists those values. app.deliverState.ms.Write() - iavlstore.PausePrefetcher() commitID := app.cms.Commit() - iavlstore.ResumePrefetcher() app.logger.Info("commit synced", "commit", fmt.Sprintf("%X", commitID)) // iavl, db & disk stats diff --git a/baseapp/grpcrouter.go b/baseapp/grpcrouter.go index 8804633aaf..c0d43a23a2 100644 --- a/baseapp/grpcrouter.go +++ b/baseapp/grpcrouter.go @@ -2,7 +2,6 @@ package baseapp import ( "fmt" - "sync" gogogrpc "github.com/gogo/protobuf/grpc" "google.golang.org/grpc" @@ -20,7 +19,6 @@ var protoCodec = encoding.GetCodec(proto.Name) // GRPCQueryRouter routes ABCI Query requests to GRPC handlers type GRPCQueryRouter struct { - lck sync.Mutex routes map[string]GRPCQueryHandler interfaceRegistry codectypes.InterfaceRegistry serviceData []serviceData @@ -37,7 +35,6 @@ var _ gogogrpc.Server = &GRPCQueryRouter{} // NewGRPCQueryRouter creates a new GRPCQueryRouter func NewGRPCQueryRouter() *GRPCQueryRouter { return &GRPCQueryRouter{ - lck: sync.Mutex{}, routes: map[string]GRPCQueryHandler{}, } } @@ -49,8 +46,6 @@ type GRPCQueryHandler = func(ctx sdk.Context, req abci.RequestQuery) (abci.Respo // Route returns the GRPCQueryHandler for a given query route path or nil // if not found func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler { - qrt.lck.Lock() - defer qrt.lck.Unlock() handler, found := qrt.routes[path] if !found { return nil @@ -64,9 +59,6 @@ func (qrt *GRPCQueryRouter) Route(path string) GRPCQueryHandler { // This functions PANICS: // - if a protobuf service is registered twice. func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interface{}) { - qrt.lck.Lock() - defer qrt.lck.Unlock() - // adds a top-level query handler based on the gRPC service name for _, method := range sd.Methods { fqName := fmt.Sprintf("/%s/%s", sd.ServiceName, method.MethodName) @@ -127,9 +119,6 @@ func (qrt *GRPCQueryRouter) RegisterService(sd *grpc.ServiceDesc, handler interf // SetInterfaceRegistry sets the interface registry for the router. This will // also register the interface reflection gRPC service. func (qrt *GRPCQueryRouter) SetInterfaceRegistry(interfaceRegistry codectypes.InterfaceRegistry) { - // qrt.lck.Lock() - // defer qrt.lck.Unlock() - qrt.interfaceRegistry = interfaceRegistry // Once we have an interface registry, we can register the interface // registry reflection gRPC service. diff --git a/server/mock/store.go b/server/mock/store.go index 3022b2bb19..3794bab938 100644 --- a/server/mock/store.go +++ b/server/mock/store.go @@ -180,10 +180,6 @@ func (kv kvStore) Delete(key []byte) { delete(kv.store, string(key)) } -func (kv kvStore) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - return 0, 0, nil -} - func (kv kvStore) Prefix(prefix []byte) sdk.KVStore { panic("not implemented") } diff --git a/server/mock/store_test.go b/server/mock/store_test.go index 538ba5addf..7a40eaa59d 100644 --- a/server/mock/store_test.go +++ b/server/mock/store_test.go @@ -27,7 +27,6 @@ func TestStore(t *testing.T) { require.False(t, store.Has(k)) store.Set(k, v) require.True(t, store.Has(k)) - store.Prefetch(k, false) require.Equal(t, v, store.Get(k)) store.Delete(k) require.False(t, store.Has(k)) diff --git a/simapp/ante_handler.go b/simapp/ante_handler.go index 525431f174..562291e857 100644 --- a/simapp/ante_handler.go +++ b/simapp/ante_handler.go @@ -31,7 +31,7 @@ func NewAnteHandler( // ante.NewDeductFeeDecorator(ak, bankKeeper), ante.NewSigGasConsumeDecorator(ak, sigGasConsumer), ante.NewSigVerificationDecorator(ak, signModeHandler), - ante.NewIncrementSequenceDecorator(ak, bankKeeper), + ante.NewIncrementSequenceDecorator(ak), ibcante.NewAnteDecorator(channelKeeper), ) } diff --git a/store/cachekv/store.go b/store/cachekv/store.go index bab801d60f..40e3210335 100644 --- a/store/cachekv/store.go +++ b/store/cachekv/store.go @@ -94,15 +94,6 @@ func (store *Store) Delete(key []byte) { store.setCacheValue(key, nil, true, true) } -// Prefetch implements types.KVStore. -func (store *Store) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "prefetch") - - // do not update cache - types.AssertValidKey(key) - return store.parent.Prefetch(key, forSet) -} - // Implements Cachetypes.KVStore. func (store *Store) Write() { store.mtx.Lock() diff --git a/store/cachekv/store_test.go b/store/cachekv/store_test.go index 041e203a9b..0e6a4ec5b6 100644 --- a/store/cachekv/store_test.go +++ b/store/cachekv/store_test.go @@ -33,7 +33,6 @@ func TestCacheKVStore(t *testing.T) { // put something in mem and in cache mem.Set(keyFmt(1), valFmt(1)) st.Set(keyFmt(1), valFmt(1)) - st.Prefetch(keyFmt(1), false) require.Equal(t, valFmt(1), st.Get(keyFmt(1))) // update it in cache, shoudn't change mem diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go index 6097d8a42f..5b88cfdd13 100644 --- a/store/cachemulti/store.go +++ b/store/cachemulti/store.go @@ -3,7 +3,6 @@ package cachemulti import ( "fmt" "io" - "sync" tmdb "github.com/line/tm-db/v2" @@ -138,26 +137,8 @@ func (cms Store) GetStoreType() types.StoreType { // Write calls Write on each underlying store. func (cms Store) Write() { cms.db.Write() - var wg sync.WaitGroup - var panicMsg interface{} for _, store := range cms.stores { - wg.Add(1) - go func(s types.CacheWrap) { - defer func() { - if msg := recover(); msg != nil { - if panicMsg == nil { - panicMsg = msg - } - } - wg.Done() - }() - - s.Write() - }(store) - } - wg.Wait() - if panicMsg != nil { - panic(panicMsg) + store.Write() } } diff --git a/store/dbadapter/store.go b/store/dbadapter/store.go index 7db4c9366d..c9efbc57bf 100644 --- a/store/dbadapter/store.go +++ b/store/dbadapter/store.go @@ -51,15 +51,6 @@ func (dsa Store) Delete(key []byte) { } } -// Prefetch wraps the underlying DB's Get method panicing on error. -func (dsa Store) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - v, err := dsa.DB.Get(key) - if err != nil { - return 0, 0, nil - } - return 1, 1, v -} - // Iterator wraps the underlying DB's Iterator method panicing on error. func (dsa Store) Iterator(start, end []byte) types.Iterator { iter, err := dsa.DB.Iterator(start, end) diff --git a/store/dbadapter/store_test.go b/store/dbadapter/store_test.go index 540714201a..16bca79416 100644 --- a/store/dbadapter/store_test.go +++ b/store/dbadapter/store_test.go @@ -35,8 +35,6 @@ func TestAccessors(t *testing.T) { retFoo := []byte("xxx") mockDB.EXPECT().Get(gomock.Eq(key)).Times(1).Return(retFoo, nil) require.True(t, bytes.Equal(retFoo, store.Get(key))) - mockDB.EXPECT().Get(gomock.Eq(key)).Times(1).Return(retFoo, nil) - store.Prefetch(key, false) mockDB.EXPECT().Get(gomock.Eq(key)).Times(1).Return(nil, errFoo) require.Panics(t, func() { store.Get(key) }) diff --git a/store/gaskv/store.go b/store/gaskv/store.go index 2c03aac482..2c273bdcdf 100644 --- a/store/gaskv/store.go +++ b/store/gaskv/store.go @@ -71,12 +71,6 @@ func (gs *Store) Delete(key []byte) { gs.parent.Delete(key) } -// Implements KVStore. -func (gs *Store) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - defer telemetry.MeasureSince(time.Now(), "store", "gaskv", "load") - return gs.parent.Prefetch(key, forSet) -} - // Iterator implements the KVStore interface. It returns an iterator which // incurs a flat gas cost for seeking to the first key/value pair and a variable // gas cost based on the current value's length if the iterator is valid. diff --git a/store/gaskv/store_test.go b/store/gaskv/store_test.go index addf187e89..df1842bb76 100644 --- a/store/gaskv/store_test.go +++ b/store/gaskv/store_test.go @@ -32,7 +32,6 @@ func TestGasKVStoreBasic(t *testing.T) { require.Panics(t, func() { st.Set([]byte(""), []byte("value")) }, "setting an empty key should panic") require.Empty(t, st.Get(keyFmt(1)), "Expected `key1` to be empty") - st.Prefetch(keyFmt(1), false) st.Set(keyFmt(1), valFmt(1)) require.Equal(t, valFmt(1), st.Get(keyFmt(1))) st.Delete(keyFmt(1)) diff --git a/store/iavl/metrics.go b/store/iavl/metrics.go index 8100b6df33..d594e0f443 100644 --- a/store/iavl/metrics.go +++ b/store/iavl/metrics.go @@ -15,11 +15,10 @@ const ( // Metrics contains metrics exposed by this package. type Metrics struct { - IAVLCacheHits metrics.Gauge - IAVLCacheMisses metrics.Gauge - IAVLCacheEntries metrics.Gauge - IAVLCacheBytes metrics.Gauge - IAVLCachePeakBytes metrics.Gauge + IAVLCacheHits metrics.Gauge + IAVLCacheMisses metrics.Gauge + IAVLCacheEntries metrics.Gauge + IAVLCacheBytes metrics.Gauge } // PrometheusMetrics returns Metrics build using Prometheus client library. @@ -55,23 +54,16 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { Name: "bytes_size", Help: "Cache bytes size of the iavl cache", }, labels).With(labelsAndValues...), - IAVLCachePeakBytes: prometheus.NewGaugeFrom(stdprometheus.GaugeOpts{ - Namespace: namespace, - Subsystem: MetricsSubsystem, - Name: "peak_bytes_size", - Help: "Peak cache bytes size of the iavl cache", - }, labels).With(labelsAndValues...), } } // NopMetrics returns no-op Metrics. func NopMetrics() *Metrics { return &Metrics{ - IAVLCacheHits: discard.NewGauge(), - IAVLCacheMisses: discard.NewGauge(), - IAVLCacheEntries: discard.NewGauge(), - IAVLCacheBytes: discard.NewGauge(), - IAVLCachePeakBytes: discard.NewGauge(), + IAVLCacheHits: discard.NewGauge(), + IAVLCacheMisses: discard.NewGauge(), + IAVLCacheEntries: discard.NewGauge(), + IAVLCacheBytes: discard.NewGauge(), } } diff --git a/store/iavl/prefetch.go b/store/iavl/prefetch.go deleted file mode 100644 index 4cef6edd36..0000000000 --- a/store/iavl/prefetch.go +++ /dev/null @@ -1,86 +0,0 @@ -package iavl - -import ( - "runtime" - "sync/atomic" - "time" - - "github.com/line/lbm-sdk/telemetry" -) - -var ( - usePrefetch = 0 - prefetchCommiters int64 - prefetchJobs chan func() - prefetchLocks chan bool - prefetchToken chan bool - - prefetchJobsSize = 100000 // should be pending queue * 4 -) - -func StartPrefetch() { - usePrefetch = 1 -} - -func StopPrefetch() { - usePrefetch = -1 -} - -func PausePrefetcher() { - if atomic.AddInt64(&prefetchCommiters, 1) == 1 { - prefetchToken <- true - } -} - -func ResumePrefetcher() { - if atomic.AddInt64(&prefetchCommiters, -1) == 0 { - <-prefetchToken - } -} - -func prefetcher() { - workers := runtime.NumCPU() / 4 - if workers < 4 { - workers = 4 - } - - prefetchJobs = make(chan func(), prefetchJobsSize) - prefetchLocks = make(chan bool, workers) - prefetchToken = make(chan bool, 1) - - go func() { - for { - f := <-prefetchJobs - if len(prefetchToken) != 0 { - prefetchToken <- true - <-prefetchToken - } - prefetchLocks <- true - go func(f func()) { - f() - <-prefetchLocks - }(f) - } - }() -} - -// Implements type.KVStore. -func (st *Store) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - if usePrefetch != 1 { - return - } - defer telemetry.MeasureSince(time.Now(), "store", "iavl", "load") - select { - case prefetchJobs <- func() { - defer func() { - // ignore panic - recover() - }() - st.tree.Prefetch(key, forSet) - }: - // good - default: - // drop this request - } - return -} diff --git a/store/iavl/store.go b/store/iavl/store.go index 30cce50ab3..0b27bcab49 100644 --- a/store/iavl/store.go +++ b/store/iavl/store.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "io" - "os" "time" ics23 "github.com/confio/ics23/go" @@ -55,17 +54,13 @@ func (cms *CacheManagerSingleton) GetCache() types.Cache { func NewCacheManagerSingleton(cacheSize int, provider MetricsProvider) types.CacheManager { cm := &CacheManagerSingleton{ - cache: NewFastCache(cacheSize), - // cache: NewRistrettoCache(cacheSize), - // cache: NewFreeCache(cacheSize), + cache: NewFastCache(cacheSize), metrics: provider(), } startCacheMetricUpdator(cm.cache, cm.metrics) return cm } -var peakCacheBytes uint64 - func startCacheMetricUpdator(cache types.Cache, metrics *Metrics) { // Execution time of `fastcache.UpdateStats()` can increase linearly as cache entries grows // So we update the metrics with a separate go route. @@ -76,10 +71,6 @@ func startCacheMetricUpdator(cache types.Cache, metrics *Metrics) { metrics.IAVLCacheMisses.Set(float64(misses)) metrics.IAVLCacheEntries.Set(float64(entries)) metrics.IAVLCacheBytes.Set(float64(bytes)) - if bytes > peakCacheBytes { - peakCacheBytes = bytes - metrics.IAVLCachePeakBytes.Set(float64(bytes)) - } time.Sleep(10 * time.Second) } }() @@ -258,15 +249,6 @@ func (st *Store) Delete(key []byte) { st.tree.Remove(key) } -func init() { - if os.Getenv("USE_PREFETCH") != "NO" { - usePrefetch = 1 - } else { - usePrefetch = -1 - } - prefetcher() -} - // DeleteVersions deletes a series of versions from the MutableTree. An error // is returned if any single version is invalid or the delete fails. All writes // happen in a single batch with a single commit. diff --git a/store/iavl/store_test.go b/store/iavl/store_test.go index a78e2efbd6..90df384295 100644 --- a/store/iavl/store_test.go +++ b/store/iavl/store_test.go @@ -81,8 +81,6 @@ func TestLoadStore(t *testing.T) { hStore, err := store.GetImmutable(verH) require.NoError(t, err) require.Equal(t, string(hStore.Get([]byte("hello"))), "hallo") - hStore.Prefetch([]byte("hello"), false) - hStore.Prefetch([]byte("hello"), true) // Querying an existing store at some previous pruned height Hp hpStore, err := store.GetImmutable(verHp) diff --git a/store/iavl/tree.go b/store/iavl/tree.go index 75dea2ba0e..214f4cc255 100644 --- a/store/iavl/tree.go +++ b/store/iavl/tree.go @@ -31,7 +31,6 @@ type ( GetVersionedWithProof(key []byte, version int64) ([]byte, *iavl.RangeProof, error) GetImmutable(version int64) (*iavl.ImmutableTree, error) SetInitialVersion(version uint64) - Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) } // immutableTree is a simple wrapper around a reference to an iavl.ImmutableTree diff --git a/store/listenkv/store.go b/store/listenkv/store.go index cb9e590bb2..4fc829b339 100644 --- a/store/listenkv/store.go +++ b/store/listenkv/store.go @@ -2,10 +2,8 @@ package listenkv import ( "io" - "time" "github.com/line/lbm-sdk/store/types" - "github.com/line/lbm-sdk/telemetry" ) var _ types.KVStore = &Store{} @@ -47,15 +45,6 @@ func (s *Store) Delete(key []byte) { s.onWrite(true, key, nil) } -// Prefetch implements types.KVStore. -func (s *Store) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - defer telemetry.MeasureSince(time.Now(), "store", "cachekv", "prefetch") - - // do not update cache - types.AssertValidKey(key) - return s.parent.Prefetch(key, forSet) -} - // Has implements the KVStore interface. It delegates the Has call to the // parent KVStore. func (s *Store) Has(key []byte) bool { diff --git a/store/prefix/store.go b/store/prefix/store.go index f67319dcd7..36744517e5 100644 --- a/store/prefix/store.go +++ b/store/prefix/store.go @@ -86,11 +86,6 @@ func (s Store) Delete(key []byte) { s.parent.Delete(s.key(key)) } -// Implements KVStore -func (s Store) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - return s.parent.Prefetch(s.key(key), forSet) -} - // Implements KVStore // Check https://github.com/tendermint/tendermint/blob/master/libs/db/prefix_db.go#L106 func (s Store) Iterator(start, end []byte) types.Iterator { diff --git a/store/prefix/store_test.go b/store/prefix/store_test.go index 9ef6378321..72f1551448 100644 --- a/store/prefix/store_test.go +++ b/store/prefix/store_test.go @@ -65,7 +65,6 @@ func testPrefixStore(t *testing.T, baseStore types.KVStore, prefix []byte) { key := kvps[i].key value := kvps[i].value require.True(t, prefixPrefixStore.Has(key)) - prefixPrefixStore.Prefetch(key, false) require.Equal(t, value, prefixPrefixStore.Get(key)) key = append([]byte("prefix"), key...) diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index 4bc4b34e03..3a50320981 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -9,7 +9,6 @@ import ( "math" "sort" "strings" - "sync" protoio "github.com/gogo/protobuf/io" gogotypes "github.com/gogo/protobuf/types" @@ -64,9 +63,6 @@ type Store struct { interBlockCache types.MultiStorePersistentCache iavlCacheManager types.CacheManager - pruneLock *sync.Mutex - prunePass chan bool - listeners map[types.StoreKey][]types.WriteListener } @@ -87,8 +83,6 @@ func NewStore(db tmdb.DB) *Store { stores: make(map[types.StoreKey]types.CommitKVStore), keysByName: make(map[string]types.StoreKey), pruneHeights: make([]int64, 0), - pruneLock: &sync.Mutex{}, - prunePass: make(chan bool, 1), listeners: make(map[types.StoreKey][]types.WriteListener), iavlCacheSize: iavl.DefaultIAVLCacheSize, } @@ -407,21 +401,16 @@ func (rs *Store) Commit() types.CommitID { // - KeepEvery % (height - KeepRecent) != 0 as that means the height is not // a 'snapshot' height. if rs.pruningOpts.KeepEvery == 0 || pruneHeight%int64(rs.pruningOpts.KeepEvery) != 0 { - rs.pruneLock.Lock() rs.pruneHeights = append(rs.pruneHeights, pruneHeight) - rs.pruneLock.Unlock() } } // batch prune if the current height is a pruning interval height if rs.pruningOpts.Interval > 0 && version%int64(rs.pruningOpts.Interval) == 0 { - go rs.pruneStores() + rs.pruneStores() } - rs.pruneLock.Lock() - pruneHeights := rs.pruneHeights - rs.pruneLock.Unlock() - flushMetadata(rs.db, version, rs.lastCommitInfo, pruneHeights) + flushMetadata(rs.db, version, rs.lastCommitInfo, rs.pruneHeights) return types.CommitID{ Version: version, @@ -429,32 +418,10 @@ func (rs *Store) Commit() types.CommitID { } } -func (rs *Store) prunePassEnter() bool { - select { - case rs.prunePass <- true: - return true - default: - return false - } -} - -func (rs *Store) prunePassLeave() { - <-rs.prunePass -} - // pruneStores will batch delete a list of heights from each mounted sub-store. // Afterwards, pruneHeights is reset. func (rs *Store) pruneStores() { - if !rs.prunePassEnter() { - return - } - defer rs.prunePassLeave() - - rs.pruneLock.Lock() - pruneHeights := make([]int64, len(rs.pruneHeights)) - copy(pruneHeights, rs.pruneHeights) - rs.pruneLock.Unlock() - if len(pruneHeights) == 0 { + if len(rs.pruneHeights) == 0 { return } @@ -464,7 +431,7 @@ func (rs *Store) pruneStores() { // it to get the underlying IAVL store. store = rs.GetCommitKVStore(key) - if err := store.(*iavl.Store).DeleteVersions(pruneHeights...); err != nil { + if err := store.(*iavl.Store).DeleteVersions(rs.pruneHeights...); err != nil { if err == iavltree.ErrBusy { return } @@ -475,21 +442,7 @@ func (rs *Store) pruneStores() { } } - // remove pruned heights from rs.pruneHeights - // new rs.pruneHeights will be updated by next flushMetaData cycle - rs.pruneLock.Lock() - rph := map[int64]bool{} - for _, h := range pruneHeights { - rph[h] = true - } - nph := make([]int64, 0, len(rs.pruneHeights)) - for _, h := range rs.pruneHeights { - if _, ok := rph[h]; !ok { - nph = append(nph, h) - } - } - rs.pruneHeights = nph - rs.pruneLock.Unlock() + rs.pruneHeights = make([]int64, 0) } // CacheWrap implements CacheWrapper/Store/CommitStore. @@ -1019,24 +972,20 @@ func getLatestVersion(db tmdb.DB) int64 { // Commits each store and returns a new commitInfo. func commitStores(version int64, storeMap map[types.StoreKey]types.CommitKVStore) *types.CommitInfo { - storeInfos := make([]types.StoreInfo, len(storeMap)) + storeInfos := make([]types.StoreInfo, 0, len(storeMap)) - var wg sync.WaitGroup - ix := 0 for key, store := range storeMap { - wg.Add(1) - go func(i int, k types.StoreKey, s types.CommitKVStore) { - commitID := s.Commit() - - si := types.StoreInfo{} - si.Name = k.Name() - si.CommitId = commitID - storeInfos[i] = si - wg.Done() - }(ix, key, store) - ix++ - } - wg.Wait() + commitID := store.Commit() + + if store.GetStoreType() == types.StoreTypeTransient { + continue + } + + si := types.StoreInfo{} + si.Name = key.Name() + si.CommitId = commitID + storeInfos = append(storeInfos, si) + } return &types.CommitInfo{ Version: version, diff --git a/store/rootmulti/store_test.go b/store/rootmulti/store_test.go index f2fcd51227..4744cbd77b 100644 --- a/store/rootmulti/store_test.go +++ b/store/rootmulti/store_test.go @@ -11,12 +11,10 @@ import ( "io/ioutil" "math/rand" "testing" - "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - iavltree "github.com/line/iavl/v2" abci "github.com/line/ostracon/abci/types" tmdb "github.com/line/tm-db/v2" "github.com/line/tm-db/v2/memdb" @@ -108,8 +106,6 @@ func TestCacheMultiStoreWithVersion(t *testing.T) { kvStore := cms.GetKVStore(ms.keysByName["store1"]) require.NotNil(t, kvStore) require.Equal(t, kvStore.Get(k), v) - kvStore.Prefetch(k, true) - kvStore.Prefetch(k, false) // require we cannot commit (write) to a cache-versioned multi-store require.Panics(t, func() { @@ -559,11 +555,7 @@ func TestMultiStore_PruningRestart(t *testing.T) { // commit one more block and ensure the heights have been pruned ms.Commit() - // pruning is background job, sleeps for a few seconds for the pruning to finish - time.Sleep(5 * time.Second) - ms.pruneLock.Lock() require.Empty(t, ms.pruneHeights) - ms.pruneLock.Unlock() for _, v := range pruneHeights { _, err := ms.CacheMultiStoreWithVersion(v) @@ -1056,7 +1048,3 @@ func hashStores(stores map[types.StoreKey]types.CommitKVStore) []byte { } return sdkmaps.HashFromMap(m) } - -func init() { - iavltree.PruningThreshold = 1 << 20 -} diff --git a/store/tracekv/store.go b/store/tracekv/store.go index e43c11130c..9cd5d4f32f 100644 --- a/store/tracekv/store.go +++ b/store/tracekv/store.go @@ -78,12 +78,6 @@ func (tkv *Store) Has(key []byte) bool { return tkv.parent.Has(key) } -// Prefetch implements the KVStore interface. It traces a read operation and -// delegates a Get call to the parent KVStore. -func (tkv *Store) Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) { - return tkv.parent.Prefetch(key, forSet) -} - // Iterator implements the KVStore interface. It delegates the Iterator call // the to the parent KVStore. func (tkv *Store) Iterator(start, end []byte) types.Iterator { diff --git a/store/tracekv/store_test.go b/store/tracekv/store_test.go index a4b5cae8a9..9bbd682850 100644 --- a/store/tracekv/store_test.go +++ b/store/tracekv/store_test.go @@ -68,7 +68,6 @@ func TestTraceKVStoreGet(t *testing.T) { store := newTraceKVStore(&buf) buf.Reset() value := store.Get(tc.key) - store.Prefetch(tc.key, false) require.Equal(t, tc.expectedValue, value) require.Equal(t, tc.expectedOut, buf.String()) diff --git a/store/types/store.go b/store/types/store.go index fe2a476ec1..e93520f42b 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -228,9 +228,6 @@ type KVStore interface { // Delete deletes the key. Panics on nil key. Delete(key []byte) - // Prefetch fetches the key'ed object, filling ibc & iavl cache along the way. - Prefetch(key []byte, forSet bool) (hits, misses int, value []byte) - // Iterator over a domain of keys in ascending order. End is exclusive. // Start must be less than end, or the Iterator is invalid. // Iterator must be closed by caller. diff --git a/x/auth/ante/ante.go b/x/auth/ante/ante.go index b35003b09a..2b45934605 100644 --- a/x/auth/ante/ante.go +++ b/x/auth/ante/ante.go @@ -51,7 +51,7 @@ func NewAnteHandler(options HandlerOptions) (sdk.AnteHandler, error) { NewValidateSigCountDecorator(options.AccountKeeper), NewSigGasConsumeDecorator(options.AccountKeeper, sigGasConsumer), NewSigVerificationDecorator(options.AccountKeeper, options.SignModeHandler), - NewIncrementSequenceDecorator(options.AccountKeeper, options.BankKeeper), + NewIncrementSequenceDecorator(options.AccountKeeper), } return sdk.ChainAnteDecorators(anteDecorators...), nil diff --git a/x/auth/ante/sigverify.go b/x/auth/ante/sigverify.go index d3ba8712f5..b7744ae0e3 100644 --- a/x/auth/ante/sigverify.go +++ b/x/auth/ante/sigverify.go @@ -394,13 +394,11 @@ func (svd *SigVerificationDecorator) checkCache(sigKey string, txHash []byte) (v // client. It is recommended to instead use multiple messages in a tx. type IncrementSequenceDecorator struct { ak AccountKeeper - bk types.BankKeeper } -func NewIncrementSequenceDecorator(ak AccountKeeper, bk types.BankKeeper) IncrementSequenceDecorator { +func NewIncrementSequenceDecorator(ak AccountKeeper) IncrementSequenceDecorator { return IncrementSequenceDecorator{ ak: ak, - bk: bk, } } @@ -420,11 +418,6 @@ func (isd IncrementSequenceDecorator) AnteHandle(ctx sdk.Context, tx sdk.Tx, sim isd.ak.SetAccount(ctx, acc) } - // prefetching - if ctx.IsCheckTx() { - isd.bk.Prefetch(ctx, tx) - } - return next(ctx, tx, simulate) } diff --git a/x/auth/ante/sigverify_test.go b/x/auth/ante/sigverify_test.go index 657993cd60..ac4b776f10 100644 --- a/x/auth/ante/sigverify_test.go +++ b/x/auth/ante/sigverify_test.go @@ -367,7 +367,7 @@ func (suite *AnteTestSuite) TestIncrementSequenceDecorator() { tx, err := suite.CreateTestTx(privs, accNums, accSeqs, suite.ctx.ChainID()) suite.Require().NoError(err) - isd := ante.NewIncrementSequenceDecorator(suite.app.AccountKeeper, suite.app.BankKeeper) + isd := ante.NewIncrementSequenceDecorator(suite.app.AccountKeeper) antehandler := sdk.ChainAnteDecorators(isd) testCases := []struct { diff --git a/x/auth/keeper/account.go b/x/auth/keeper/account.go index 9f2e558555..0b42acd43b 100644 --- a/x/auth/keeper/account.go +++ b/x/auth/keeper/account.go @@ -42,12 +42,6 @@ func (ak AccountKeeper) GetAccount(ctx sdk.Context, addr sdk.AccAddress) types.A return ak.decodeAccount(bz) } -// Prefetch implements AccountKeeperI. -func (ak AccountKeeper) Prefetch(ctx sdk.Context, addr sdk.AccAddress, forSet bool) { - store := ctx.KVStore(ak.key) - store.Prefetch(types.AddressStoreKey(addr), forSet) -} - // GetAllAccounts returns all accounts in the accountKeeper. func (ak AccountKeeper) GetAllAccounts(ctx sdk.Context) (accounts []types.AccountI) { ak.IterateAccounts(ctx, func(acc types.AccountI) (stop bool) { diff --git a/x/auth/keeper/keeper.go b/x/auth/keeper/keeper.go index cad4379cbe..b7e2fb56e2 100644 --- a/x/auth/keeper/keeper.go +++ b/x/auth/keeper/keeper.go @@ -43,8 +43,6 @@ type AccountKeeperI interface { // Fetch the sequence of an account at a specified address. GetSequence(sdk.Context, sdk.AccAddress) (uint64, error) - // Prefetch an account, i.e. pre-fetch - Prefetch(sdk.Context, sdk.AccAddress, bool) // Fetch the next account number, and increment the internal counter. GetNextAccountNumber(sdk.Context) uint64 } diff --git a/x/auth/keeper/keeper_test.go b/x/auth/keeper/keeper_test.go index 7a4f2d5d6a..10dc961ce2 100644 --- a/x/auth/keeper/keeper_test.go +++ b/x/auth/keeper/keeper_test.go @@ -61,8 +61,6 @@ func TestAccountMapperGetSet(t *testing.T) { require.EqualValues(t, 0, acc.GetSequence()) // NewAccount doesn't call Set, so it's still nil - app.AccountKeeper.Prefetch(ctx, addr, true) - app.AccountKeeper.Prefetch(ctx, addr, false) require.Nil(t, app.AccountKeeper.GetAccount(ctx, addr)) // set some values on the account and save it diff --git a/x/auth/types/expected_keepers.go b/x/auth/types/expected_keepers.go index dcf00106e9..dd1fd5d9a2 100644 --- a/x/auth/types/expected_keepers.go +++ b/x/auth/types/expected_keepers.go @@ -7,5 +7,4 @@ import ( // BankKeeper defines the contract needed for supply related APIs (noalias) type BankKeeper interface { SendCoinsFromAccountToModule(ctx sdk.Context, senderAddr sdk.AccAddress, recipientModule string, amt sdk.Coins) error - Prefetch(ctx sdk.Context, tx sdk.Tx) } diff --git a/x/bank/keeper/grpc_query.go b/x/bank/keeper/grpc_query.go index 078719f8f4..ac86873d64 100644 --- a/x/bank/keeper/grpc_query.go +++ b/x/bank/keeper/grpc_query.go @@ -34,15 +34,6 @@ func (k BaseKeeper) Balance(ctx context.Context, req *types.QueryBalanceRequest) sdkCtx := sdk.UnwrapSDKContext(ctx) balance := k.GetBalance(sdkCtx, sdk.AccAddress(req.Address), req.Denom) - if balance.Amount.Int64() == 0 { - go func() { - store := sdkCtx.KVStore(k.storeKey) - balancesStore := prefix.NewStore(store, types.BalancesPrefix) - accountStore := prefix.NewStore(balancesStore, AddressToPrefixKey(sdk.AccAddress(req.Address))) - k.ak.Prefetch(sdkCtx, sdk.AccAddress(req.Address), true) - accountStore.Prefetch([]byte(req.Denom), true) - }() - } return &types.QueryBalanceResponse{Balance: &balance}, nil } diff --git a/x/bank/keeper/view.go b/x/bank/keeper/view.go index cc76144840..8bffb5e75e 100644 --- a/x/bank/keeper/view.go +++ b/x/bank/keeper/view.go @@ -27,8 +27,6 @@ type ViewKeeper interface { LockedCoins(ctx sdk.Context, addr sdk.AccAddress) sdk.Coins SpendableCoins(ctx sdk.Context, addr sdk.AccAddress) sdk.Coins - Prefetch(ctx sdk.Context, tx sdk.Tx) - IterateAccountBalances(ctx sdk.Context, addr sdk.AccAddress, cb func(coin sdk.Coin) (stop bool)) IterateAllBalances(ctx sdk.Context, cb func(address sdk.AccAddress, coin sdk.Coin) (stop bool)) } @@ -112,56 +110,6 @@ func (k BaseViewKeeper) GetBalance(ctx sdk.Context, addr sdk.AccAddress, denom s return balance } -func (k BaseViewKeeper) Prefetch(ctx sdk.Context, tx sdk.Tx) { - store := ctx.KVStore(k.storeKey) - balancesStore := prefix.NewStore(store, types.BalancesPrefix) - - for _, msg := range tx.GetMsgs() { - switch msg := msg.(type) { - case *types.MsgSend: - addrs := map[string]bool{} - denoms := map[string]bool{} - addrs[msg.FromAddress] = false - addrs[msg.ToAddress] = true - for _, a := range msg.Amount { - denoms[a.Denom] = true - } - for a, isReceiver := range addrs { - addr := sdk.AccAddress(a) - k.ak.Prefetch(ctx, addr, isReceiver) - accountStore := prefix.NewStore(balancesStore, AddressToPrefixKey(addr)) - for denom := range denoms { - accountStore.Prefetch([]byte(denom), isReceiver) - } - } - - case *types.MsgMultiSend: - addrs := map[string]bool{} - denoms := map[string]bool{} - for _, i := range msg.Inputs { - addrs[i.Address] = false - for _, a := range i.Coins { - denoms[a.Denom] = true - } - } - for _, i := range msg.Outputs { - addrs[i.Address] = true - for _, a := range i.Coins { - denoms[a.Denom] = true - } - } - for a, isReceiver := range addrs { - addr := sdk.AccAddress(a) - k.ak.Prefetch(ctx, addr, isReceiver) - accountStore := prefix.NewStore(balancesStore, AddressToPrefixKey(addr)) - for denom := range denoms { - accountStore.Prefetch([]byte(denom), isReceiver) - } - } - } - } -} - // IterateAccountBalances iterates over the balances of a single account and // provides the token balance to a callback. If true is returned from the // callback, iteration is halted. diff --git a/x/bank/types/expected_keepers.go b/x/bank/types/expected_keepers.go index bda99111bf..df37aa4d4d 100644 --- a/x/bank/types/expected_keepers.go +++ b/x/bank/types/expected_keepers.go @@ -15,7 +15,6 @@ type AccountKeeper interface { GetAllAccounts(ctx sdk.Context) []types.AccountI HasAccount(ctx sdk.Context, addr sdk.AccAddress) bool SetAccount(ctx sdk.Context, acc types.AccountI) - Prefetch(ctx sdk.Context, addr sdk.AccAddress, forSet bool) IterateAccounts(ctx sdk.Context, process func(types.AccountI) bool) diff --git a/x/feegrant/expected_keepers.go b/x/feegrant/expected_keepers.go index 80ae17dfe2..17dce0849a 100644 --- a/x/feegrant/expected_keepers.go +++ b/x/feegrant/expected_keepers.go @@ -19,5 +19,4 @@ type AccountKeeper interface { type BankKeeper interface { SpendableCoins(ctx sdk.Context, addr sdk.AccAddress) sdk.Coins SendCoinsFromAccountToModule(ctx sdk.Context, senderAddr sdk.AccAddress, recipientModule string, amt sdk.Coins) error - Prefetch(ctx sdk.Context, tx sdk.Tx) } diff --git a/x/gov/types/expected_keepers.go b/x/gov/types/expected_keepers.go index 7be6c07d00..4bc3c8a1a3 100644 --- a/x/gov/types/expected_keepers.go +++ b/x/gov/types/expected_keepers.go @@ -47,7 +47,6 @@ type BankKeeper interface { SendCoinsFromModuleToAccount(ctx sdk.Context, senderModule string, recipientAddr sdk.AccAddress, amt sdk.Coins) error SendCoinsFromAccountToModule(ctx sdk.Context, senderAddr sdk.AccAddress, recipientModule string, amt sdk.Coins) error BurnCoins(ctx sdk.Context, name string, amt sdk.Coins) error - Prefetch(ctx sdk.Context, tx sdk.Tx) } // Event Hooks