Skip to content

Commit

Permalink
feat: async commit
Browse files Browse the repository at this point in the history
Closes: cosmos#16173
  • Loading branch information
yihuang committed May 16, 2023
1 parent 3f8fa79 commit 1e9d871
Show file tree
Hide file tree
Showing 10 changed files with 115 additions and 18 deletions.
2 changes: 1 addition & 1 deletion simapp/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func NewSimApp(
}
homePath := cast.ToString(appOpts.Get(flags.FlagHome))
// set the governance module account as the authority for conducting upgrades
app.UpgradeKeeper = upgradekeeper.NewKeeper(skipUpgradeHeights, keys[upgradetypes.StoreKey], appCodec, homePath, app.BaseApp, authtypes.NewModuleAddress(govtypes.ModuleName).String())
app.UpgradeKeeper = upgradekeeper.NewKeeper(skipUpgradeHeights, keys[upgradetypes.StoreKey], appCodec, homePath, app.BaseApp, authtypes.NewModuleAddress(govtypes.ModuleName).String(), app.CommitMultiStore())

// Register the proposal types
// Deprecated: Avoid adding new handlers, instead use the new proposal flow
Expand Down
86 changes: 75 additions & 11 deletions store/iavl/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"io"
"sync"

abci "github.com/cometbft/cometbft/abci/types"
cmtprotocrypto "github.com/cometbft/cometbft/proto/tendermint/crypto"
Expand All @@ -23,6 +24,7 @@ import (

const (
DefaultIAVLCacheSize = 500000
CommitQueueBuffer = 10 // TODO configurable
)

var (
Expand All @@ -35,9 +37,14 @@ var (

// Store Implements types.KVStore and CommitKVStore.
type Store struct {
tree Tree
logger log.Logger
metrics metrics.StoreMetrics
tree Tree
logger log.Logger
metrics metrics.StoreMetrics
initialVersion int64

mtx sync.Mutex
commitQueue chan<- int64
commitQuit <-chan error
}

// LoadStore returns an IAVL Store as a CommitKVStore. Internally, it will load the
Expand Down Expand Up @@ -132,15 +139,56 @@ func (st *Store) GetImmutable(version int64) (*Store, error) {
func (st *Store) Commit() types.CommitID {
defer st.metrics.MeasureSince("store", "iavl", "commit")

hash, version, err := st.tree.SaveVersion()
if err != nil {
panic(err)
}
commitId := st.workingCommitID()

return types.CommitID{
Version: version,
Hash: hash,
st.mtx.Lock()
defer st.mtx.Unlock()

if st.commitQueue == nil {
st.initAsyncCommit()
}
st.commitQueue <- commitId.Version

return commitId
}

func (st *Store) initAsyncCommit() {
commitQueue := make(chan int64, CommitQueueBuffer)
quitChan := make(chan error)

go func() {
defer close(quitChan)

for expVersion := range commitQueue {
_, version, err := st.tree.SaveVersion()
if err != nil {
quitChan <- err
break
}

if version != expVersion {
quitChan <- fmt.Errorf("version sanity check failed: %d != %d", expVersion, version)
break
}
}
}()

st.commitQueue = commitQueue
st.commitQuit = quitChan
}

// WaitAsyncCommit waits for the async commits to finish
func (st *Store) WaitAsyncCommit() error {
st.mtx.Lock()
defer st.mtx.Unlock()

close(st.commitQueue)
err := <-st.commitQuit

st.commitQueue = nil
st.commitQuit = nil

return err
}

// WorkingHash returns the hash of the current working tree.
Expand Down Expand Up @@ -276,11 +324,27 @@ func (st *Store) ReverseIterator(start, end []byte) types.Iterator {
}

// SetInitialVersion sets the initial version of the IAVL tree. It is used when
// starting a new chain at an arbitrary height.
// starting a new chain at an arbitrary height, or adding a new store in an upgrade.
func (st *Store) SetInitialVersion(version int64) {
st.initialVersion = version
st.tree.SetInitialVersion(uint64(version))
}

// workingCommitID returns the commit id without actual commit.
//
// FIXME should be done in iavl library.
func (st *Store) workingCommitID() types.CommitID {
version := st.tree.Version() + 1
if version == 1 && st.initialVersion > 0 {
version = st.initialVersion
}
return types.CommitID{
Hash: st.WorkingHash(),
Version: version,
}

}

// Exports the IAVL store at the given version, returning an iavl.Exporter for the tree.
func (st *Store) Export(version int64) (*iavl.Exporter, error) {
istore, err := st.GetImmutable(version)
Expand Down
4 changes: 4 additions & 0 deletions store/mem/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func (s Store) CacheWrapWithTrace(w io.Writer, tc types.TraceContext) types.Cach
// Commit performs a no-op as entries are persistent between commitments.
func (s *Store) Commit() (id types.CommitID) { return }

func (s *Store) WaitAsyncCommit() error {
return nil
}

func (s *Store) SetPruning(pruning pruningtypes.PruningOptions) {}

// GetPruning is a no-op as pruning options cannot be directly set on this store.
Expand Down
4 changes: 4 additions & 0 deletions store/rootmulti/dbadapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (cdsa commitDBStoreAdapter) Commit() types.CommitID {
}
}

func (cdsa commitDBStoreAdapter) WaitAsyncCommit() error {
return nil
}

func (cdsa commitDBStoreAdapter) LastCommitID() types.CommitID {
return types.CommitID{
Version: -1,
Expand Down
9 changes: 9 additions & 0 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,15 @@ func (rs *Store) Commit() types.CommitID {
}
}

// WaitAsyncCommit waits for the async commits to finish
func (rs *Store) WaitAsyncCommit() error {
errs := make([]error, 0, len(rs.stores))
for _, store := range rs.stores {
errs = append(errs, store.WaitAsyncCommit())
}
return errors.Join(errs...)
}

// WorkingHash returns the current hash of the store.
// it will be used to get the current app hash before commit.
func (rs *Store) WorkingHash() []byte {
Expand Down
4 changes: 4 additions & 0 deletions store/transient/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func (ts *Store) Commit() (id types.CommitID) {
return
}

func (ts *Store) WaitAsyncCommit() error {
return nil
}

func (ts *Store) SetPruning(_ pruningtypes.PruningOptions) {}

// GetPruning is a no-op as pruning options cannot be directly set on this store.
Expand Down
1 change: 1 addition & 0 deletions store/types/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ type Store interface {
type Committer interface {
Commit() CommitID
LastCommitID() CommitID
WaitAsyncCommit() error

// WorkingHash returns the hash of the KVStore's state before commit.
WorkingHash() []byte
Expand Down
4 changes: 4 additions & 0 deletions x/upgrade/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ func BeginBlocker(k *keeper.Keeper, ctx sdk.Context) {
panic(fmt.Errorf("unable to write upgrade info to filesystem: %s", err.Error()))
}

if err := k.WaitAsyncCommit(); err != nil {
panic(fmt.Errorf("async commit failed: %w", err))
}

upgradeMsg := BuildUpgradeNeededMsg(plan)
logger.Error(upgradeMsg)
panic(upgradeMsg)
Expand Down
8 changes: 7 additions & 1 deletion x/upgrade/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Keeper struct {
downgradeVerified bool // tells if we've already sanity checked that this binary version isn't being used against an old state.
authority string // the address capable of executing and canceling an upgrade. Usually the gov module account
initVersionMap module.VersionMap // the module version map at init genesis
cms storetypes.CommitMultiStore // wait for async commit to finish before trigger restart
}

// NewKeeper constructs an upgrade Keeper which requires the following arguments:
Expand All @@ -48,7 +49,7 @@ type Keeper struct {
// cdc - the app-wide binary codec
// homePath - root directory of the application's config
// vs - the interface implemented by baseapp which allows setting baseapp's protocol version field
func NewKeeper(skipUpgradeHeights map[int64]bool, storeKey storetypes.StoreKey, cdc codec.BinaryCodec, homePath string, vs xp.ProtocolVersionSetter, authority string) *Keeper {
func NewKeeper(skipUpgradeHeights map[int64]bool, storeKey storetypes.StoreKey, cdc codec.BinaryCodec, homePath string, vs xp.ProtocolVersionSetter, authority string, cms storetypes.CommitMultiStore) *Keeper {
k := &Keeper{
homePath: homePath,
skipUpgradeHeights: skipUpgradeHeights,
Expand All @@ -57,6 +58,7 @@ func NewKeeper(skipUpgradeHeights map[int64]bool, storeKey storetypes.StoreKey,
upgradeHandlers: map[string]types.UpgradeHandler{},
versionSetter: vs,
authority: authority,
cms: cms,
}

if upgradePlan, err := k.ReadUpgradeInfoFromDisk(); err == nil && upgradePlan.Height > 0 {
Expand Down Expand Up @@ -468,3 +470,7 @@ func (k *Keeper) SetDowngradeVerified(v bool) {
func (k Keeper) DowngradeVerified() bool {
return k.downgradeVerified
}

func (k Keeper) WaitAsyncCommit() error {
return k.cms.WaitAsyncCommit()
}
11 changes: 6 additions & 5 deletions x/upgrade/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,11 @@ func init() {
type ModuleInputs struct {
depinject.In

Config *modulev1.Module
Key *store.KVStoreKey
Cdc codec.Codec
AddressCodec address.Codec
Config *modulev1.Module
Key *store.KVStoreKey
Cdc codec.Codec
AddressCodec address.Codec
CommitMultiStore store.CommitMultiStore

AppOpts servertypes.AppOptions `optional:"true"`
}
Expand Down Expand Up @@ -215,7 +216,7 @@ func ProvideModule(in ModuleInputs) ModuleOutputs {
}

// set the governance module account as the authority for conducting upgrades
k := keeper.NewKeeper(skipUpgradeHeights, in.Key, in.Cdc, homePath, nil, authority.String())
k := keeper.NewKeeper(skipUpgradeHeights, in.Key, in.Cdc, homePath, nil, authority.String(), in.CommitMultiStore)
baseappOpt := func(app *baseapp.BaseApp) {
k.SetVersionSetter(app)
}
Expand Down

0 comments on commit 1e9d871

Please sign in to comment.