From 1e9d871f4aa44504b1a05a1907b0876b976bd457 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Tue, 16 May 2023 17:02:25 +0800 Subject: [PATCH] feat: async commit Closes: #16173 --- simapp/app.go | 2 +- store/iavl/store.go | 86 +++++++++++++++++++++++++++++++----- store/mem/store.go | 4 ++ store/rootmulti/dbadapter.go | 4 ++ store/rootmulti/store.go | 9 ++++ store/transient/store.go | 4 ++ store/types/store.go | 1 + x/upgrade/abci.go | 4 ++ x/upgrade/keeper/keeper.go | 8 +++- x/upgrade/module.go | 11 ++--- 10 files changed, 115 insertions(+), 18 deletions(-) diff --git a/simapp/app.go b/simapp/app.go index 06a625e6a084..39d1b622b0ca 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -317,7 +317,7 @@ func NewSimApp( } homePath := cast.ToString(appOpts.Get(flags.FlagHome)) // set the governance module account as the authority for conducting upgrades - app.UpgradeKeeper = upgradekeeper.NewKeeper(skipUpgradeHeights, keys[upgradetypes.StoreKey], appCodec, homePath, app.BaseApp, authtypes.NewModuleAddress(govtypes.ModuleName).String()) + app.UpgradeKeeper = upgradekeeper.NewKeeper(skipUpgradeHeights, keys[upgradetypes.StoreKey], appCodec, homePath, app.BaseApp, authtypes.NewModuleAddress(govtypes.ModuleName).String(), app.CommitMultiStore()) // Register the proposal types // Deprecated: Avoid adding new handlers, instead use the new proposal flow diff --git a/store/iavl/store.go b/store/iavl/store.go index a772bb637bc3..31c2c0f082d3 100644 --- a/store/iavl/store.go +++ b/store/iavl/store.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "sync" abci "github.com/cometbft/cometbft/abci/types" cmtprotocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto" @@ -23,6 +24,7 @@ import ( const ( DefaultIAVLCacheSize = 500000 + CommitQueueBuffer = 10 // TODO configurable ) var ( @@ -35,9 +37,14 @@ var ( // Store Implements types.KVStore and CommitKVStore. type Store struct { - tree Tree - logger log.Logger - metrics metrics.StoreMetrics + tree Tree + logger log.Logger + metrics metrics.StoreMetrics + initialVersion int64 + + mtx sync.Mutex + commitQueue chan<- int64 + commitQuit <-chan error } // LoadStore returns an IAVL Store as a CommitKVStore. Internally, it will load the @@ -132,15 +139,56 @@ func (st *Store) GetImmutable(version int64) (*Store, error) { func (st *Store) Commit() types.CommitID { defer st.metrics.MeasureSince("store", "iavl", "commit") - hash, version, err := st.tree.SaveVersion() - if err != nil { - panic(err) - } + commitId := st.workingCommitID() - return types.CommitID{ - Version: version, - Hash: hash, + st.mtx.Lock() + defer st.mtx.Unlock() + + if st.commitQueue == nil { + st.initAsyncCommit() } + st.commitQueue <- commitId.Version + + return commitId +} + +func (st *Store) initAsyncCommit() { + commitQueue := make(chan int64, CommitQueueBuffer) + quitChan := make(chan error) + + go func() { + defer close(quitChan) + + for expVersion := range commitQueue { + _, version, err := st.tree.SaveVersion() + if err != nil { + quitChan <- err + break + } + + if version != expVersion { + quitChan <- fmt.Errorf("version sanity check failed: %d != %d", expVersion, version) + break + } + } + }() + + st.commitQueue = commitQueue + st.commitQuit = quitChan +} + +// WaitAsyncCommit waits for the async commits to finish +func (st *Store) WaitAsyncCommit() error { + st.mtx.Lock() + defer st.mtx.Unlock() + + close(st.commitQueue) + err := <-st.commitQuit + + st.commitQueue = nil + st.commitQuit = nil + + return err } // WorkingHash returns the hash of the current working tree. @@ -276,11 +324,27 @@ func (st *Store) ReverseIterator(start, end []byte) types.Iterator { } // SetInitialVersion sets the initial version of the IAVL tree. It is used when -// starting a new chain at an arbitrary height. +// starting a new chain at an arbitrary height, or adding a new store in an upgrade. func (st *Store) SetInitialVersion(version int64) { + st.initialVersion = version st.tree.SetInitialVersion(uint64(version)) } +// workingCommitID returns the commit id without actual commit. +// +// FIXME should be done in iavl library. +func (st *Store) workingCommitID() types.CommitID { + version := st.tree.Version() + 1 + if version == 1 && st.initialVersion > 0 { + version = st.initialVersion + } + return types.CommitID{ + Hash: st.WorkingHash(), + Version: version, + } + +} + // Exports the IAVL store at the given version, returning an iavl.Exporter for the tree. func (st *Store) Export(version int64) (*iavl.Exporter, error) { istore, err := st.GetImmutable(version) diff --git a/store/mem/store.go b/store/mem/store.go index b819d7536302..584be6d1f3e9 100644 --- a/store/mem/store.go +++ b/store/mem/store.go @@ -49,6 +49,10 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach // Commit performs a no-op as entries are persistent between commitments. func (s *Store) Commit() (id types.CommitID) { return } +func (s *Store) WaitAsyncCommit() error { + return nil +} + func (s *Store) SetPruning(pruning pruningtypes.PruningOptions) {} // GetPruning is a no-op as pruning options cannot be directly set on this store. diff --git a/store/rootmulti/dbadapter.go b/store/rootmulti/dbadapter.go index 4f32ada4e9ff..97ceeb4173c7 100644 --- a/store/rootmulti/dbadapter.go +++ b/store/rootmulti/dbadapter.go @@ -30,6 +30,10 @@ func (cdsa commitDBStoreAdapter) Commit() types.CommitID { } } +func (cdsa commitDBStoreAdapter) WaitAsyncCommit() error { + return nil +} + func (cdsa commitDBStoreAdapter) LastCommitID() types.CommitID { return types.CommitID{ Version: -1, diff --git a/store/rootmulti/store.go b/store/rootmulti/store.go index d057fcf5b23f..d2014a43894b 100644 --- a/store/rootmulti/store.go +++ b/store/rootmulti/store.go @@ -497,6 +497,15 @@ func (rs *Store) Commit() types.CommitID { } } +// WaitAsyncCommit waits for the async commits to finish +func (rs *Store) WaitAsyncCommit() error { + errs := make([]error, 0, len(rs.stores)) + for _, store := range rs.stores { + errs = append(errs, store.WaitAsyncCommit()) + } + return errors.Join(errs...) +} + // WorkingHash returns the current hash of the store. // it will be used to get the current app hash before commit. func (rs *Store) WorkingHash() []byte { diff --git a/store/transient/store.go b/store/transient/store.go index 6f393279f571..aed00c2eac10 100644 --- a/store/transient/store.go +++ b/store/transient/store.go @@ -30,6 +30,10 @@ func (ts *Store) Commit() (id types.CommitID) { return } +func (ts *Store) WaitAsyncCommit() error { + return nil +} + func (ts *Store) SetPruning(_ pruningtypes.PruningOptions) {} // GetPruning is a no-op as pruning options cannot be directly set on this store. diff --git a/store/types/store.go b/store/types/store.go index e2a83bf5636d..89c3be9fb1ff 100644 --- a/store/types/store.go +++ b/store/types/store.go @@ -21,6 +21,7 @@ type Store interface { type Committer interface { Commit() CommitID LastCommitID() CommitID + WaitAsyncCommit() error // WorkingHash returns the hash of the KVStore's state before commit. WorkingHash() []byte diff --git a/x/upgrade/abci.go b/x/upgrade/abci.go index ae72c678edf2..22e9abd74470 100644 --- a/x/upgrade/abci.go +++ b/x/upgrade/abci.go @@ -73,6 +73,10 @@ func BeginBlocker(k *keeper.Keeper, ctx sdk.Context) { panic(fmt.Errorf("unable to write upgrade info to filesystem: %s", err.Error())) } + if err := k.WaitAsyncCommit(); err != nil { + panic(fmt.Errorf("async commit failed: %w", err)) + } + upgradeMsg := BuildUpgradeNeededMsg(plan) logger.Error(upgradeMsg) panic(upgradeMsg) diff --git a/x/upgrade/keeper/keeper.go b/x/upgrade/keeper/keeper.go index 143f8225fd5e..1815a3693982 100644 --- a/x/upgrade/keeper/keeper.go +++ b/x/upgrade/keeper/keeper.go @@ -40,6 +40,7 @@ type Keeper struct { downgradeVerified bool // tells if we've already sanity checked that this binary version isn't being used against an old state. authority string // the address capable of executing and canceling an upgrade. Usually the gov module account initVersionMap module.VersionMap // the module version map at init genesis + cms storetypes.CommitMultiStore // wait for async commit to finish before trigger restart } // NewKeeper constructs an upgrade Keeper which requires the following arguments: @@ -48,7 +49,7 @@ type Keeper struct { // cdc - the app-wide binary codec // homePath - root directory of the application's config // vs - the interface implemented by baseapp which allows setting baseapp's protocol version field -func NewKeeper(skipUpgradeHeights map[int64]bool, storeKey storetypes.StoreKey, cdc codec.BinaryCodec, homePath string, vs xp.ProtocolVersionSetter, authority string) *Keeper { +func NewKeeper(skipUpgradeHeights map[int64]bool, storeKey storetypes.StoreKey, cdc codec.BinaryCodec, homePath string, vs xp.ProtocolVersionSetter, authority string, cms storetypes.CommitMultiStore) *Keeper { k := &Keeper{ homePath: homePath, skipUpgradeHeights: skipUpgradeHeights, @@ -57,6 +58,7 @@ func NewKeeper(skipUpgradeHeights map[int64]bool, storeKey storetypes.StoreKey, upgradeHandlers: map[string]types.UpgradeHandler{}, versionSetter: vs, authority: authority, + cms: cms, } if upgradePlan, err := k.ReadUpgradeInfoFromDisk(); err == nil && upgradePlan.Height > 0 { @@ -468,3 +470,7 @@ func (k *Keeper) SetDowngradeVerified(v bool) { func (k Keeper) DowngradeVerified() bool { return k.downgradeVerified } + +func (k Keeper) WaitAsyncCommit() error { + return k.cms.WaitAsyncCommit() +} diff --git a/x/upgrade/module.go b/x/upgrade/module.go index 77938040fbf3..77009590f2a5 100644 --- a/x/upgrade/module.go +++ b/x/upgrade/module.go @@ -177,10 +177,11 @@ func init() { type ModuleInputs struct { depinject.In - Config *modulev1.Module - Key *store.KVStoreKey - Cdc codec.Codec - AddressCodec address.Codec + Config *modulev1.Module + Key *store.KVStoreKey + Cdc codec.Codec + AddressCodec address.Codec + CommitMultiStore store.CommitMultiStore AppOpts servertypes.AppOptions `optional:"true"` } @@ -215,7 +216,7 @@ func ProvideModule(in ModuleInputs) ModuleOutputs { } // set the governance module account as the authority for conducting upgrades - k := keeper.NewKeeper(skipUpgradeHeights, in.Key, in.Cdc, homePath, nil, authority.String()) + k := keeper.NewKeeper(skipUpgradeHeights, in.Key, in.Cdc, homePath, nil, authority.String(), in.CommitMultiStore) baseappOpt := func(app *baseapp.BaseApp) { k.SetVersionSetter(app) }