Skip to content

Commit

Permalink
feat: Optimistic Execution (backport #16581) (#18205)
Browse files Browse the repository at this point in the history
Co-authored-by: Facundo Medica <[email protected]>
Co-authored-by: Facundo <[email protected]>
  • Loading branch information
3 people authored Oct 23, 2023
1 parent 1376e0d commit 42dbfc4
Show file tree
Hide file tree
Showing 11 changed files with 359 additions and 20 deletions.
5 changes: 3 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
### Features

* (server) [#18162](https://github.com/cosmos/cosmos-sdk/pull/18162) Start gRPC & API server in standalone mode.
* (baseapp) [#16581](https://github.com/cosmos/cosmos-sdk/pull/16581) Implement Optimistic Execution as an experimental feature (not enabled by default).

### Improvements

Expand Down Expand Up @@ -326,7 +327,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (crypto/keyring) [#13734](https://github.com/cosmos/cosmos-sdk/pull/13834) The keyring's `Sign` method now takes a new `signMode` argument. It is only used if the signing key is a Ledger hardware device. You can set it to 0 in all other cases.
* (snapshots) [14048](https://github.com/cosmos/cosmos-sdk/pull/14048) Move the Snapshot package to the store package. This is done in an effort group all storage related logic under one package.
* (signing) [#13701](https://github.com/cosmos/cosmos-sdk/pull/) Add `context.Context` as an argument `x/auth/signing.VerifySignature`.
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.
* (store) [#11825](https://github.com/cosmos/cosmos-sdk/pull/11825) Make extension snapshotter interface safer to use, renamed the util function `WriteExtensionItem` to `WriteExtensionPayload`.

### Client Breaking Changes

Expand Down Expand Up @@ -401,4 +402,4 @@ Ref: https://keepachangelog.com/en/1.0.0/

## Previous Versions

[CHANGELOG of previous versions](https://github.com/cosmos/cosmos-sdk/blob/main/CHANGELOG.md#v0470---2023-03-14).
[CHANGELOG of previous versions](https://github.com/cosmos/cosmos-sdk/blob/main/CHANGELOG.md#v0470---2023-03-14).
95 changes: 83 additions & 12 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
// processed the first block, as we want to avoid overwriting the finalizeState
// after state changes during InitChain.
if req.Height > app.initialHeight {
// abort any running OE
app.optimisticExec.Abort()
app.setState(execModeFinalize, header)
}

Expand Down Expand Up @@ -534,6 +536,19 @@ func (app *BaseApp) ProcessProposal(req *abci.RequestProcessProposal) (resp *abc
return &abci.ResponseProcessProposal{Status: abci.ResponseProcessProposal_REJECT}, nil
}

// Only execute optimistic execution if the proposal is accepted, OE is
// enabled and the block height is greater than the initial height. During
// the first block we'll be carrying state from InitChain, so it would be
// impossible for us to easily revert.
// After the first block has been processed, the next blocks will get executed
// optimistically, so that when the ABCI client calls `FinalizeBlock` the app
// can have a response ready.
if resp.Status == abci.ResponseProcessProposal_ACCEPT &&
app.optimisticExec.Enabled() &&
req.Height > app.initialHeight {
app.optimisticExec.Execute(req)
}

return resp, nil
}

Expand Down Expand Up @@ -671,17 +686,11 @@ func (app *BaseApp) VerifyVoteExtension(req *abci.RequestVerifyVoteExtension) (r
return resp, err
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
// EndBlock (if defined).
//
// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if
// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be
// skipped. This is to support compatibility with proposers injecting vote
// extensions into the proposal, which should not themselves be executed in cases
// where they adhere to the sdk.Tx interface.
func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
// internalFinalizeBlock executes the block, called by the Optimistic
// Execution flow or by the FinalizeBlock ABCI method. The context received is
// only used to handle early cancellation, for anything related to state app.finalizeBlockState.ctx
// must be used.
func (app *BaseApp) internalFinalizeBlock(ctx context.Context, req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
var events []abci.Event

if err := app.checkHalt(req.Height, req.Time); err != nil {
Expand Down Expand Up @@ -754,6 +763,15 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
return nil, err
}

// First check for an abort signal after beginBlock, as it's the first place
// we spend any significant amount of time.
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

events = append(events, beginBlock.Events...)

// Iterate over all raw transactions in the proposal and attempt to execute
Expand All @@ -780,6 +798,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
)
}

// check after every tx if we should abort
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

txResults = append(txResults, response)
}

Expand All @@ -792,6 +818,14 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
return nil, err
}

// check after endBlock if we should abort, to avoid propagating the result
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// continue
}

events = append(events, endBlock.Events...)
cp := app.GetConsensusParams(app.finalizeBlockState.ctx)

Expand All @@ -800,10 +834,47 @@ func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.Respons
TxResults: txResults,
ValidatorUpdates: endBlock.ValidatorUpdates,
ConsensusParamUpdates: &cp,
AppHash: app.workingHash(),
}, nil
}

// FinalizeBlock will execute the block proposal provided by RequestFinalizeBlock.
// Specifically, it will execute an application's BeginBlock (if defined), followed
// by the transactions in the proposal, finally followed by the application's
// EndBlock (if defined).
//
// For each raw transaction, i.e. a byte slice, BaseApp will only execute it if
// it adheres to the sdk.Tx interface. Otherwise, the raw transaction will be
// skipped. This is to support compatibility with proposers injecting vote
// extensions into the proposal, which should not themselves be executed in cases
// where they adhere to the sdk.Tx interface.
func (app *BaseApp) FinalizeBlock(req *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error) {
if app.optimisticExec.Initialized() {
// check if the hash we got is the same as the one we are executing
aborted := app.optimisticExec.AbortIfNeeded(req.Hash)
// Wait for the OE to finish, regardless of whether it was aborted or not
res, err := app.optimisticExec.WaitResult()

// only return if we are not aborting
if !aborted {
if res != nil {
res.AppHash = app.workingHash()
}
return res, err
}

// if it was aborted, we need to reset the state
app.finalizeBlockState = nil
app.optimisticExec.Reset()
}

// if no OE is running, just run the block (this is either a block replay or a OE that got aborted)
res, err := app.internalFinalizeBlock(context.Background(), req)
if res != nil {
res.AppHash = app.workingHash()
}
return res, err
}

// checkHalt checkes if height or time exceeds halt-height or halt-time respectively.
func (app *BaseApp) checkHalt(height int64, time time.Time) error {
var halt bool
Expand Down
41 changes: 41 additions & 0 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2312,3 +2312,44 @@ func TestABCI_PrepareProposal_Panic(t *testing.T) {
require.NoError(t, err)
require.Equal(t, 3, len(resPrepareProposal.Txs))
}

func TestOptimisticExecution(t *testing.T) {
suite := NewBaseAppSuite(t, baseapp.SetOptimisticExecution())

_, err := suite.baseApp.InitChain(&abci.RequestInitChain{
ConsensusParams: &cmtproto.ConsensusParams{},
})
require.NoError(t, err)

// run 50 blocks
for i := 0; i < 50; i++ {
tx := newTxCounter(t, suite.txConfig, 0, 1)
txBytes, err := suite.txConfig.TxEncoder()(tx)
require.NoError(t, err)

reqProcProp := abci.RequestProcessProposal{
Txs: [][]byte{txBytes},
Height: suite.baseApp.LastBlockHeight() + 1,
Hash: []byte("some-hash" + strconv.FormatInt(suite.baseApp.LastBlockHeight()+1, 10)),
}

respProcProp, err := suite.baseApp.ProcessProposal(&reqProcProp)
require.Equal(t, abci.ResponseProcessProposal_ACCEPT, respProcProp.Status)
require.NoError(t, err)

reqFinalizeBlock := abci.RequestFinalizeBlock{
Height: reqProcProp.Height,
Txs: reqProcProp.Txs,
Hash: reqProcProp.Hash,
}

respFinalizeBlock, err := suite.baseApp.FinalizeBlock(&reqFinalizeBlock)
require.NoError(t, err)
require.Len(t, respFinalizeBlock.TxResults, 1)

_, err = suite.baseApp.Commit()
require.NoError(t, err)
}

require.Equal(t, int64(50), suite.baseApp.LastBlockHeight())
}
6 changes: 6 additions & 0 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"cosmossdk.io/store/snapshots"
storetypes "cosmossdk.io/store/types"

"github.com/cosmos/cosmos-sdk/baseapp/oe"
"github.com/cosmos/cosmos-sdk/codec"
codectypes "github.com/cosmos/cosmos-sdk/codec/types"
servertypes "github.com/cosmos/cosmos-sdk/server/types"
Expand Down Expand Up @@ -178,6 +179,11 @@ type BaseApp struct {
chainID string

cdc codec.Codec

// optimisticExec contains the context required for Optimistic Execution,
// including the goroutine handling.This is experimental and must be enabled
// by developers.
optimisticExec *oe.OptimisticExecution
}

// NewBaseApp returns a reference to an initialized BaseApp. It accepts a
Expand Down
157 changes: 157 additions & 0 deletions baseapp/oe/optimistic_execution.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package oe

import (
"bytes"
"context"
"encoding/hex"
"math/rand"
"sync"
"time"

abci "github.com/cometbft/cometbft/abci/types"

"cosmossdk.io/log"
)

// FinalizeBlockFunc is the function that is called by the OE to finalize the
// block. It is the same as the one in the ABCI app.
type FinalizeBlockFunc func(context.Context, *abci.RequestFinalizeBlock) (*abci.ResponseFinalizeBlock, error)

// OptimisticExecution is a struct that contains the OE context. It is used to
// run the FinalizeBlock function in a goroutine, and to abort it if needed.
type OptimisticExecution struct {
finalizeBlockFunc FinalizeBlockFunc // ABCI FinalizeBlock function with a context
logger log.Logger

mtx sync.Mutex
stopCh chan struct{}
request *abci.RequestFinalizeBlock
response *abci.ResponseFinalizeBlock
err error
cancelFunc func() // cancel function for the context
initialized bool // A boolean value indicating whether the struct has been initialized

// debugging/testing options
abortRate int // number from 0 to 100 that determines the percentage of OE that should be aborted
}

// NewOptimisticExecution initializes the Optimistic Execution context but does not start it.
func NewOptimisticExecution(logger log.Logger, fn FinalizeBlockFunc, opts ...func(*OptimisticExecution)) *OptimisticExecution {
logger = logger.With(log.ModuleKey, "oe")
oe := &OptimisticExecution{logger: logger, finalizeBlockFunc: fn}
for _, opt := range opts {
opt(oe)
}
return oe
}

// WithAbortRate sets the abort rate for the OE. The abort rate is a number from
// 0 to 100 that determines the percentage of OE that should be aborted.
// This is for testing purposes only and must not be used in production.
func WithAbortRate(rate int) func(*OptimisticExecution) {
return func(oe *OptimisticExecution) {
oe.abortRate = rate
}
}

// Reset resets the OE context. Must be called whenever we want to invalidate
// the current OE.
func (oe *OptimisticExecution) Reset() {
oe.mtx.Lock()
defer oe.mtx.Unlock()
oe.request = nil
oe.response = nil
oe.err = nil
oe.initialized = false
}

func (oe *OptimisticExecution) Enabled() bool {
return oe != nil
}

// Initialized returns true if the OE was initialized, meaning that it contains
// a request and it was run or it is running.
func (oe *OptimisticExecution) Initialized() bool {
if oe == nil {
return false
}
oe.mtx.Lock()
defer oe.mtx.Unlock()

return oe.initialized
}

// Execute initializes the OE and starts it in a goroutine.
func (oe *OptimisticExecution) Execute(req *abci.RequestProcessProposal) {
oe.mtx.Lock()
defer oe.mtx.Unlock()

oe.stopCh = make(chan struct{})
oe.request = &abci.RequestFinalizeBlock{
Txs: req.Txs,
DecidedLastCommit: req.ProposedLastCommit,
Misbehavior: req.Misbehavior,
Hash: req.Hash,
Height: req.Height,
Time: req.Time,
NextValidatorsHash: req.NextValidatorsHash,
ProposerAddress: req.ProposerAddress,
}

oe.logger.Debug("OE started", "height", req.Height, "hash", hex.EncodeToString(req.Hash), "time", req.Time.String())
ctx, cancel := context.WithCancel(context.Background())
oe.cancelFunc = cancel
oe.initialized = true

go func() {
start := time.Now()
resp, err := oe.finalizeBlockFunc(ctx, oe.request)
oe.mtx.Lock()
executionTime := time.Since(start)
oe.logger.Debug("OE finished", "duration", executionTime.String(), "height", req.Height, "hash", hex.EncodeToString(req.Hash))
oe.response, oe.err = resp, err
close(oe.stopCh)
oe.mtx.Unlock()
}()
}

// AbortIfNeeded aborts the OE if the request hash is not the same as the one in
// the running OE. Returns true if the OE was aborted.
func (oe *OptimisticExecution) AbortIfNeeded(reqHash []byte) bool {
if oe == nil {
return false
}

oe.mtx.Lock()
defer oe.mtx.Unlock()

if !bytes.Equal(oe.request.Hash, reqHash) {
oe.logger.Error("OE aborted due to hash mismatch", "oe_hash", hex.EncodeToString(oe.request.Hash), "req_hash", hex.EncodeToString(reqHash), "oe_height", oe.request.Height, "req_height", oe.request.Height)
oe.cancelFunc()
return true
} else if oe.abortRate > 0 && rand.Intn(100) < oe.abortRate {
// this is for test purposes only, we can emulate a certain percentage of
// OE needed to be aborted.
oe.cancelFunc()
oe.logger.Error("OE aborted due to test abort rate")
return true
}

return false
}

// Abort aborts the OE unconditionally and waits for it to finish.
func (oe *OptimisticExecution) Abort() {
if oe == nil || oe.cancelFunc == nil {
return
}

oe.cancelFunc()
<-oe.stopCh
}

// WaitResult waits for the OE to finish and returns the result.
func (oe *OptimisticExecution) WaitResult() (*abci.ResponseFinalizeBlock, error) {
<-oe.stopCh
return oe.response, oe.err
}
Loading

0 comments on commit 42dbfc4

Please sign in to comment.