Skip to content

Commit

Permalink
fix(baseapp): introduce mutex to state (#18846)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Bezobchuk <[email protected]>
  • Loading branch information
nivasan1 and alexanderbez authored Dec 21, 2023
1 parent 055487e commit c519104
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 48 deletions.
82 changes: 41 additions & 41 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// done after the finalizeBlockState and context have been set as it's persisted
// to state.
if req.ConsensusParams != nil {
err := app.StoreConsensusParams(app.finalizeBlockState.ctx, *req.ConsensusParams)
err := app.StoreConsensusParams(app.finalizeBlockState.Context(), *req.ConsensusParams)
if err != nil {
return nil, err
}
Expand All @@ -82,28 +82,28 @@ func (app *BaseApp) InitChain(req *abci.RequestInitChain) (*abci.ResponseInitCha
// handler, the block height is zero by default. However, after Commit is called
// the height needs to reflect the true block height.
initHeader.Height = req.InitialHeight
app.checkState.ctx = app.checkState.ctx.WithBlockHeader(initHeader).
app.checkState.SetContext(app.checkState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockHeader(initHeader).
}))
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockHeader(initHeader).
WithHeaderInfo(coreheader.Info{
ChainID: req.ChainId,
Height: req.InitialHeight,
Time: req.Time,
})
}))
}()

if app.initChainer == nil {
return &abci.ResponseInitChain{}, nil
}

// add block gas meter for any genesis transactions (allow infinite gas)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(storetypes.NewInfiniteGasMeter())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(storetypes.NewInfiniteGasMeter()))

res, err := app.initChainer(app.finalizeBlockState.ctx, req)
res, err := app.initChainer(app.finalizeBlockState.Context(), req)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -410,7 +410,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
return nil, errors.New("PrepareProposal called with invalid height")
}

app.prepareProposalState.ctx = app.getContextForProposal(app.prepareProposalState.ctx, req.Height).
app.prepareProposalState.SetContext(app.getContextForProposal(app.prepareProposalState.Context(), req.Height).
WithVoteInfos(toVoteInfo(req.LocalLastCommit.Votes)). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithProposer(req.ProposerAddress).
Expand All @@ -425,11 +425,11 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))

app.prepareProposalState.ctx = app.prepareProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.ctx))
app.prepareProposalState.SetContext(app.prepareProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.prepareProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.prepareProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -444,7 +444,7 @@ func (app *BaseApp) PrepareProposal(req *abci.RequestPrepareProposal) (resp *abc
}
}()

resp, err = app.prepareProposal(app.prepareProposalState.ctx, req)
resp, err = app.prepareProposal(app.prepareProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to prepare proposal", "height", req.Height, "time", req.Time, "err", err)
return &abci.ResponsePrepareProposal{Txs: req.Txs}, nil
Expand Down Expand Up @@ -502,7 +502,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
app.setState(execModeFinalize, header)
}

app.processProposalState.ctx = app.getContextForProposal(app.processProposalState.ctx, req.Height).
app.processProposalState.SetContext(app.getContextForProposal(app.processProposalState.Context(), req.Height).
WithVoteInfos(req.ProposedLastCommit.Votes). // this is a set of votes that are not finalized yet, wait for commit
WithBlockHeight(req.Height).
WithHeaderHash(req.Hash).
Expand All @@ -519,11 +519,11 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
ChainID: app.chainID,
Height: req.Height,
Time: req.Time,
})
}))

app.processProposalState.ctx = app.processProposalState.ctx.
WithConsensusParams(app.GetConsensusParams(app.processProposalState.ctx)).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.ctx))
app.processProposalState.SetContext(app.processProposalState.Context().
WithConsensusParams(app.GetConsensusParams(app.processProposalState.Context())).
WithBlockGasMeter(app.getBlockGasMeter(app.processProposalState.Context())))

defer func() {
if err := recover(); err != nil {
Expand All @@ -538,7 +538,7 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
}
}()

resp, err = app.processProposal(app.processProposalState.ctx, req)
resp, err = app.processProposal(app.processProposalState.Context(), req)
if err != nil {
app.logger.Error("failed to process proposal", "height", req.Height, "time", req.Time, "hash", fmt.Sprintf("%X", req.Hash), "err", err)
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
Expand Down Expand Up @@ -578,7 +578,7 @@ func (app *BaseApp) ExtendVote(_ context.Context, req *abci.RequestExtendVote) (
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -654,7 +654,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
// finalizeBlockState context, otherwise we don't get the uncommitted data
// from InitChain.
if req.Height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()
} else {
ms := app.cms.CacheMultiStore()
ctx = sdk.NewContext(ms, false, app.logger).WithStreamingManager(app.streamingManager).WithChainID(app.chainID).WithBlockHeight(req.Height)
Expand Down Expand Up @@ -696,7 +696,7 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r

// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.Context()
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
var events []abci.Event
Expand Down Expand Up @@ -732,7 +732,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}

// Context is now updated with Header information.
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().
WithBlockHeader(header).
WithHeaderHash(req.Hash).
WithHeaderInfo(coreheader.Info{
Expand All @@ -742,24 +742,24 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
Hash: req.Hash,
AppHash: app.LastCommitID().Hash,
}).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.ctx)).
WithConsensusParams(app.GetConsensusParams(app.finalizeBlockState.Context())).
WithVoteInfos(req.DecidedLastCommit.Votes).
WithExecMode(sdk.ExecModeFinalize).
WithCometInfo(corecomet.Info{
Evidence: sdk.ToSDKEvidence(req.Misbehavior),
ValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
LastCommit: sdk.ToSDKCommitInfo(req.DecidedLastCommit),
})
}))

// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter := app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))

if app.checkState != nil {
app.checkState.ctx = app.checkState.ctx.
app.checkState.SetContext(app.checkState.Context().
WithBlockGasMeter(gasMeter).
WithHeaderHash(req.Hash)
WithHeaderHash(req.Hash))
}

if err := app.preBlock(req); err != nil {
Expand All @@ -783,8 +783,8 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
events = append(events, beginBlock.Events...)

// Reset the gas meter so that the AnteHandlers aren't required to
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.ctx)
app.finalizeBlockState.ctx = app.finalizeBlockState.ctx.WithBlockGasMeter(gasMeter)
gasMeter = app.getBlockGasMeter(app.finalizeBlockState.Context())
app.finalizeBlockState.SetContext(app.finalizeBlockState.Context().WithBlockGasMeter(gasMeter))

// Iterate over all raw transactions in the proposal and attempt to execute
// them, gathering the execution results.
Expand Down Expand Up @@ -825,7 +825,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
app.finalizeBlockState.ms = app.finalizeBlockState.ms.SetTracingContext(nil).(storetypes.CacheMultiStore)
}

endBlock, err := app.endBlock(app.finalizeBlockState.ctx)
endBlock, err := app.endBlock(app.finalizeBlockState.Context())
if err != nil {
return nil, err
}
Expand All @@ -839,7 +839,7 @@ func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.Request
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())

return &abci.ResponseFinalizeBlock{
Events: events,
Expand Down Expand Up @@ -887,7 +887,7 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons

// call the streaming service hooks with the FinalizeBlock messages
for _, streamingListener := range app.streamingManager.ABCIListeners {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.ctx, *req, *res); err != nil {
if err := streamingListener.ListenFinalizeBlock(app.finalizeBlockState.Context(), *req, *res); err != nil {
app.logger.Error("ListenFinalizeBlock listening hook failed", "height", req.Height, "err", err)
}
}
Expand Down Expand Up @@ -921,11 +921,11 @@ func (app *BaseApp) checkHalt(height int64, time time.Time) error {
// against that height and gracefully halt if it matches the latest committed
// height.
func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
header := app.finalizeBlockState.ctx.BlockHeader()
header := app.finalizeBlockState.Context().BlockHeader()
retainHeight := app.GetBlockRetentionHeight(header.Height)

if app.precommiter != nil {
app.precommiter(app.finalizeBlockState.ctx)
app.precommiter(app.finalizeBlockState.Context())
}

rms, ok := app.cms.(*rootmulti.Store)
Expand All @@ -941,7 +941,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {

abciListeners := app.streamingManager.ABCIListeners
if len(abciListeners) > 0 {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
blockHeight := ctx.BlockHeight()
changeSet := app.cms.PopStateCache()

Expand All @@ -961,7 +961,7 @@ func (app *BaseApp) Commit() (*abci.ResponseCommit, error) {
app.finalizeBlockState = nil

if app.prepareCheckStater != nil {
app.prepareCheckStater(app.checkState.ctx)
app.prepareCheckStater(app.checkState.Context())
}

// The SnapshotIfApplicable method will create the snapshot by starting the goroutine
Expand Down Expand Up @@ -1126,7 +1126,7 @@ func (app *BaseApp) FilterPeerByID(info string) *abci.ResponseQuery {
// access any state changes made in InitChain.
func (app *BaseApp) getContextForProposal(ctx sdk.Context, height int64) sdk.Context {
if height == app.initialHeight {
ctx, _ = app.finalizeBlockState.ctx.CacheContext()
ctx, _ = app.finalizeBlockState.Context().CacheContext()

// clear all context data set during InitChain to avoid inconsistent behavior
ctx = ctx.WithHeaderInfo(coreheader.Info{}).WithBlockHeader(cmtproto.Header{})
Expand Down Expand Up @@ -1236,7 +1236,7 @@ func (app *BaseApp) CreateQueryContext(height int64, prove bool) (sdk.Context, e
ctx := sdk.NewContext(cacheMS, true, app.logger).
WithMinGasPrices(app.minGasPrices).
WithBlockHeight(height).
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(app.checkState.ctx.BlockHeader())
WithGasMeter(storetypes.NewGasMeter(app.queryGasLimit)).WithBlockHeader(app.checkState.Context().BlockHeader())

if height != lastBlockHeight {
rms, ok := app.cms.(*rootmulti.Store)
Expand Down Expand Up @@ -1304,7 +1304,7 @@ func (app *BaseApp) GetBlockRetentionHeight(commitHeight int64) int64 {
// evidence parameters instead of computing an estimated number of blocks based
// on the unbonding period and block commitment time as the two should be
// equivalent.
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)
cp := app.GetConsensusParams(app.finalizeBlockState.Context())
if cp.Evidence != nil && cp.Evidence.MaxAgeNumBlocks > 0 {
retentionHeight = commitHeight - cp.Evidence.MaxAgeNumBlocks
}
Expand Down
12 changes: 6 additions & 6 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (app *BaseApp) setState(mode execMode, header cmtproto.Header) {

switch mode {
case execModeCheck:
baseState.ctx = baseState.ctx.WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices)
baseState.SetContext(baseState.Context().WithIsCheckTx(true).WithMinGasPrices(app.minGasPrices))
app.checkState = baseState

case execModePrepareProposal:
Expand Down Expand Up @@ -656,7 +656,7 @@ func (app *BaseApp) getContextForTx(mode execMode, txBytes []byte) sdk.Context {
if modeState == nil {
panic(fmt.Sprintf("state is nil for mode %v", mode))
}
ctx := modeState.ctx.
ctx := modeState.Context().
WithTxBytes(txBytes)
// WithVoteInfos(app.voteInfos) // TODO: identify if this is needed

Expand Down Expand Up @@ -696,7 +696,7 @@ func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context

func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
if app.preBlocker != nil {
ctx := app.finalizeBlockState.ctx
ctx := app.finalizeBlockState.Context()
rsp, err := app.preBlocker(ctx, req)
if err != nil {
return err
Expand All @@ -708,7 +708,7 @@ func (app *BaseApp) preBlock(req *abci.RequestFinalizeBlock) error {
// GasMeter must be set after we get a context with updated consensus params.
gasMeter := app.getBlockGasMeter(ctx)
ctx = ctx.WithBlockGasMeter(gasMeter)
app.finalizeBlockState.ctx = ctx
app.finalizeBlockState.SetContext(ctx)
}
}
return nil
Expand All @@ -721,7 +721,7 @@ func (app *BaseApp) beginBlock(req *abci.RequestFinalizeBlock) (sdk.BeginBlock,
)

if app.beginBlocker != nil {
resp, err = app.beginBlocker(app.finalizeBlockState.ctx)
resp, err = app.beginBlocker(app.finalizeBlockState.Context())
if err != nil {
return resp, err
}
Expand Down Expand Up @@ -783,7 +783,7 @@ func (app *BaseApp) endBlock(ctx context.Context) (sdk.EndBlock, error) {
var endblock sdk.EndBlock

if app.endBlocker != nil {
eb, err := app.endBlocker(app.finalizeBlockState.ctx)
eb, err := app.endBlocker(app.finalizeBlockState.Context())
if err != nil {
return endblock, err
}
Expand Down
15 changes: 14 additions & 1 deletion baseapp/state.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package baseapp

import (
"sync"

storetypes "cosmossdk.io/store/types"

sdk "github.com/cosmos/cosmos-sdk/types"
)

type state struct {
ms storetypes.CacheMultiStore
ms storetypes.CacheMultiStore

mtx sync.RWMutex
ctx sdk.Context
}

Expand All @@ -17,7 +21,16 @@ func (st *state) CacheMultiStore() storetypes.CacheMultiStore {
return st.ms.CacheMultiStore()
}

// SetContext updates the state's context to the context provided.
func (st *state) SetContext(ctx sdk.Context) {
st.mtx.Lock()
defer st.mtx.Unlock()
st.ctx = ctx
}

// Context returns the Context of the state.
func (st *state) Context() sdk.Context {
st.mtx.RLock()
defer st.mtx.RUnlock()
return st.ctx
}

0 comments on commit c519104

Please sign in to comment.