Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: Make mempool update async from block.Commit (#3008) #131

Merged
merged 1 commit into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/v0.38.6/features/3008-mempool-async-update.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
- `[consensus]` Make mempool updates asynchronous from consensus Commit's,
reducing latency for reaching consensus timeouts.
([#3008](https://github.com/cometbft/cometbft/pull/3008))
4 changes: 3 additions & 1 deletion spec/abci/abci++_app_requirements.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,9 @@ will be received on the mempool connection during this processing step, providin
update all four
connection states to the latest committed state at the same time.

When `Commit` returns, CometBFT unlocks the mempool.
CometBFT unlocks the mempool after it has finished updating for the new block,
which occurs asynchronously from `Commit`.
See [Mempool Update](../mempool/mempool.md) for more information on what the `update` task does.

WARNING: if the ABCI app logic processing the `Commit` message sends a
`/broadcast_tx_sync` or `/broadcast_tx` and waits for the response
Expand Down
39 changes: 32 additions & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,34 +375,38 @@ func (blockExec *BlockExecutor) VerifyVoteExtension(ctx context.Context, vote *t
return nil
}

// Commit locks the mempool, runs the ABCI Commit message, and updates the
// Commit locks the mempool, runs the ABCI Commit message, and asynchronously starts updating the
// mempool.
// It returns the result of calling abci.Commit which is the height to retain (if any)).
// Commit returns the result of calling abci.Commit which is the height to retain (if any)).
// The application is expected to have persisted its state (if any) before returning
// from the ABCI Commit call. This is the only place where the application should
// persist its state.
// The Mempool must be locked during commit and update because state is
// typically reset on Commit and old txs must be replayed against committed
// state before new txs are run in the mempool, lest they be invalid.
// The mempool is unlocked when the Update routine completes, which is
// asynchronous from Commit.
func (blockExec *BlockExecutor) Commit(
state State,
block *types.Block,
abciResponse *abci.ResponseFinalizeBlock,
) (int64, error) {
blockExec.mempool.Lock()
defer blockExec.mempool.Unlock()
unlockMempool := func() { blockExec.mempool.Unlock() }

// while mempool is Locked, flush to ensure all async requests have completed
// in the ABCI app before Commit.
err := blockExec.mempool.FlushAppConn()
if err != nil {
blockExec.logger.Error("client error during mempool.FlushAppConn", "err", err)
unlockMempool()
blockExec.logger.Error("client error during mempool.FlushAppConn, flushing mempool", "err", err)
return 0, err
}

// Commit block, get hash back
res, err := blockExec.proxyApp.Commit(context.TODO())
if err != nil {
unlockMempool()
blockExec.logger.Error("client error during proxyAppConn.CommitSync", "err", err)
return 0, err
}
Expand All @@ -415,15 +419,36 @@ func (blockExec *BlockExecutor) Commit(
)

// Update mempool.
err = blockExec.mempool.Update(
go blockExec.asyncUpdateMempool(unlockMempool, block, state.Copy(), abciResponse)

return res.RetainHeight, nil
}

// updates the mempool with the latest state asynchronously.
func (blockExec *BlockExecutor) asyncUpdateMempool(
unlockMempool func(),
block *types.Block,
state State,
abciResponse *abci.ResponseFinalizeBlock,
) {
defer unlockMempool()

err := blockExec.mempool.Update(
block.Height,
block.Txs,
abciResponse.TxResults,
TxPreCheck(state),
TxPostCheck(state),
)

return res.RetainHeight, err
if err != nil {
// We panic in this case, out of legacy behavior. Before we made the mempool
// update complete asynchronously from Commit, we would panic if the mempool
// update failed. This is because we panic on any error within commit.
// We should consider changing this behavior in the future, as there is no
// need to panic if the mempool update failed. The most severe thing we
// would need to do is dump the mempool and restart it.
panic(fmt.Sprintf("client error during mempool.Update; error %v", err))
}
}

//---------------------------------------------------------
Expand Down
Loading