Skip to content

Commit

Permalink
chore: remove mempool.postCheck (#158) (#181)
Browse files Browse the repository at this point in the history
  • Loading branch information
wetcod authored and Woosang Son committed Mar 8, 2021
1 parent 48bb17e commit fb98372
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 99 deletions.
2 changes: 1 addition & 1 deletion mempool/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestCacheAfterUpdate(t *testing.T) {
tx := types.Tx{byte(v)}
updateTxs = append(updateTxs, tx)
}
err := mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil, nil)
err := mempool.Update(int64(tcIndex), updateTxs, abciResponses(len(updateTxs), abci.CodeTypeOK), nil)
require.NoError(t, err)

for _, v := range tc.reAddIndices {
Expand Down
84 changes: 50 additions & 34 deletions mempool/clist_mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ type CListMempool struct {
height int64 // the last block Update()'d to
txsBytes int64 // total size of mempool, in bytes

reserved int // the number of checking tx and it should be considered when checking mempool full
reservedBytes int64 // size of checking tx and it should be considered when checking mempool full
reservedMtx sync.Mutex

// notify listeners (ie. consensus) when txs are available
notifiedTxsAvailable bool
txsAvailable chan struct{} // fires once for each height, when the mempool is not empty
Expand All @@ -49,7 +53,6 @@ type CListMempool struct {
// CheckTx or ReapMaxBytesMaxGas(ReapMaxTxs) methods.
updateMtx tmsync.RWMutex
preCheck PreCheckFunc
postCheck PostCheckFunc

wal *auto.AutoFile // a log of mempool txs
txs *clist.CList // concurrent linked-list of good txs
Expand Down Expand Up @@ -125,13 +128,6 @@ func WithPreCheck(f PreCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.preCheck = f }
}

// WithPostCheck sets a filter for the mempool to reject a tx if f(tx) returns
// false. This is ran after CheckTx. Only applies to the first created block.
// After that, Update overwrites the existing value.
func WithPostCheck(f PostCheckFunc) CListMempoolOption {
return func(mem *CListMempool) { mem.postCheck = f }
}

// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) CListMempoolOption {
return func(mem *CListMempool) { mem.metrics = metrics }
Expand Down Expand Up @@ -285,7 +281,16 @@ func (mem *CListMempool) CheckTx(tx types.Tx, cb func(*abci.Response), txInfo Tx

return ErrTxInCache
}
// END CACHE

// reserve mempool that should be called just before calling `mem.proxyAppConn.CheckTxAsync()`
if err := mem.reserve(int64(txSize)); err != nil {
// remove from cache
mem.cache.Remove(tx)
return err
}

// CONTRACT: `app.CheckTxAsync()` should check whether `GasWanted` is valid (0 <= GasWanted <= block.masGas)
reqRes := mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{Tx: tx})
reqRes.SetCallback(mem.reqResCb(tx, txInfo.SenderID, txInfo.SenderP2PID, cb))

Expand Down Expand Up @@ -395,6 +400,35 @@ func (mem *CListMempool) isFull(txSize int) error {
return nil
}

func (mem *CListMempool) reserve(txSize int64) error {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

var (
memSize = mem.Size()
txsBytes = mem.TxsBytes()
)

if memSize+mem.reserved >= mem.config.Size || txSize+mem.reservedBytes+txsBytes > mem.config.MaxTxsBytes {
return ErrMempoolIsFull{
memSize + mem.reserved, mem.config.Size,
txsBytes + mem.reservedBytes, mem.config.MaxTxsBytes,
}
}

mem.reserved++
mem.reservedBytes += txSize
return nil
}

func (mem *CListMempool) releaseReserve(txSize int64) {
mem.reservedMtx.Lock()
defer mem.reservedMtx.Unlock()

mem.reserved--
mem.reservedBytes -= txSize
}

// callback, which is called after the app checked the tx for the first time.
//
// The case where the app checks the tx for the second and subsequent times is
Expand All @@ -407,20 +441,7 @@ func (mem *CListMempool) resCbFirstTime(
) {
switch r := res.Value.(type) {
case *abci.Response_CheckTx:
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
// Check mempool isn't full again to reduce the chance of exceeding the
// limits.
if err := mem.isFull(len(tx)); err != nil {
// remove from cache (mempool might have a space later)
mem.cache.Remove(tx)
mem.logger.Error(err.Error())
return
}

if r.CheckTx.Code == abci.CodeTypeOK {
memTx := &mempoolTx{
height: mem.height,
gasWanted: r.CheckTx.GasWanted,
Expand All @@ -438,13 +459,16 @@ func (mem *CListMempool) resCbFirstTime(
} else {
// ignore bad transaction
mem.logger.Debug("rejected bad transaction",
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err", postCheckErr)
"tx", txID(tx), "peerID", peerP2PID, "res", r, "err")
mem.metrics.FailedTxs.Add(1)
if !mem.config.KeepInvalidTxsInCache {
// remove from cache (it might be good later)
mem.cache.Remove(tx)
}
}

// release `reserve` regardless it's OK or not (it might be good later)
mem.releaseReserve(int64(len(tx)))
default:
// ignore other messages
}
Expand All @@ -465,15 +489,11 @@ func (mem *CListMempool) resCbRecheck(req *abci.Request, res *abci.Response) {
memTx.tx,
tx))
}
var postCheckErr error
if mem.postCheck != nil {
postCheckErr = mem.postCheck(tx, r.CheckTx)
}
if (r.CheckTx.Code == abci.CodeTypeOK) && postCheckErr == nil {
if r.CheckTx.Code == abci.CodeTypeOK {
// Good, nothing to do.
} else {
// Tx became invalidated due to newly committed block.
mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r, "err", postCheckErr)
mem.logger.Debug("tx is no longer valid", "tx", txID(tx), "res", r, "err")

This comment has been minimized.

Copy link
@jinsan-line

jinsan-line Mar 9, 2021

I think we need to remove last "err" after merging this PR.

// NOTE: we remove tx from the cache because it might be good later
mem.removeTx(tx, mem.recheckCursor, !mem.config.KeepInvalidTxsInCache)
}
Expand Down Expand Up @@ -566,13 +586,12 @@ func (mem *CListMempool) ReapMaxTxs(max int) types.Txs {
return txs
}

// Lock() must be help by the caller during execution.
// Lock() must be held by the caller during execution.
func (mem *CListMempool) Update(
height int64,
txs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
preCheck PreCheckFunc,
postCheck PostCheckFunc,
) error {
// Set height
mem.height = height
Expand All @@ -581,9 +600,6 @@ func (mem *CListMempool) Update(
if preCheck != nil {
mem.preCheck = preCheck
}
if postCheck != nil {
mem.postCheck = postCheck
}

for i, tx := range txs {
if deliverTxResponses[i].Code == abci.CodeTypeOK {
Expand Down
38 changes: 15 additions & 23 deletions mempool/clist_mempool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,30 +147,22 @@ func TestMempoolFilters(t *testing.T) {
emptyTxArr := []types.Tx{[]byte{}}

nopPreFilter := func(tx types.Tx) error { return nil }
nopPostFilter := func(tx types.Tx, res *abci.ResponseCheckTx) error { return nil }

// each table driven test creates numTxsToCreate txs with checkTx, and at the end clears all remaining txs.
// each tx has 20 bytes
tests := []struct {
numTxsToCreate int
preFilter PreCheckFunc
postFilter PostCheckFunc
expectedNumTxs int
}{
{10, nopPreFilter, nopPostFilter, 10},
{10, PreCheckMaxBytes(10), nopPostFilter, 0},
{10, PreCheckMaxBytes(22), nopPostFilter, 10},
{10, nopPreFilter, PostCheckMaxGas(-1), 10},
{10, nopPreFilter, PostCheckMaxGas(0), 0},
{10, nopPreFilter, PostCheckMaxGas(1), 10},
{10, nopPreFilter, PostCheckMaxGas(3000), 10},
{10, PreCheckMaxBytes(10), PostCheckMaxGas(20), 0},
{10, PreCheckMaxBytes(30), PostCheckMaxGas(20), 10},
{10, PreCheckMaxBytes(22), PostCheckMaxGas(1), 10},
{10, PreCheckMaxBytes(22), PostCheckMaxGas(0), 0},
{10, nopPreFilter, 10},
{10, PreCheckMaxBytes(10), 0},
{10, PreCheckMaxBytes(20), 0},
{10, PreCheckMaxBytes(22), 10},
{10, PreCheckMaxBytes(30), 10},
}
for tcIndex, tt := range tests {
err := mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter, tt.postFilter)
err := mempool.Update(1, emptyTxArr, abciResponses(len(emptyTxArr), abci.CodeTypeOK), tt.preFilter)
require.NoError(t, err)
checkTxs(t, mempool, tt.numTxsToCreate, UnknownPeerID)
require.Equal(t, tt.expectedNumTxs, mempool.Size(), "mempool had the incorrect size, on test case %d", tcIndex)
Expand All @@ -186,7 +178,7 @@ func TestMempoolUpdate(t *testing.T) {

// 1. Adds valid txs to the cache
{
err := mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
err := mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
err = mempool.CheckTx([]byte{0x01}, nil, TxInfo{})
if assert.Error(t, err) {
Expand All @@ -198,7 +190,7 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x02}, nil, TxInfo{})
require.NoError(t, err)
err = mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
err = mempool.Update(1, []types.Tx{[]byte{0x02}}, abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.Zero(t, mempool.Size())
}
Expand All @@ -207,7 +199,7 @@ func TestMempoolUpdate(t *testing.T) {
{
err := mempool.CheckTx([]byte{0x03}, nil, TxInfo{})
require.NoError(t, err)
err = mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil, nil)
err = mempool.Update(1, []types.Tx{[]byte{0x03}}, abciResponses(1, 1), nil)
require.NoError(t, err)
assert.Zero(t, mempool.Size())

Expand Down Expand Up @@ -239,7 +231,7 @@ func TestMempool_KeepInvalidTxsInCache(t *testing.T) {
_ = app.DeliverTx(abci.RequestDeliverTx{Tx: a})
_ = app.DeliverTx(abci.RequestDeliverTx{Tx: b})
err = mempool.Update(1, []types.Tx{a, b},
[]*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil, nil)
[]*abci.ResponseDeliverTx{{Code: abci.CodeTypeOK}, {Code: 2}}, nil)
require.NoError(t, err)

// a must be added to the cache
Expand Down Expand Up @@ -294,7 +286,7 @@ func TestTxsAvailable(t *testing.T) {
// it should fire once now for the new height
// since there are still txs left
committedTxs, txs := txs[:50], txs[50:]
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(1, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureFire(t, mempool.TxsAvailable(), timeoutMS)
Expand All @@ -306,7 +298,7 @@ func TestTxsAvailable(t *testing.T) {

// now call update with all the txs. it should not fire as there are no txs left
committedTxs = append(txs, moreTxs...) //nolint: gocritic
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(2, committedTxs, abciResponses(len(committedTxs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
ensureNoFire(t, mempool.TxsAvailable(), timeoutMS)
Expand Down Expand Up @@ -365,7 +357,7 @@ func TestSerialReap(t *testing.T) {
binary.BigEndian.PutUint64(txBytes, uint64(i))
txs = append(txs, txBytes)
}
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil, nil); err != nil {
if err := mempool.Update(0, txs, abciResponses(len(txs), abci.CodeTypeOK), nil); err != nil {
t.Error(err)
}
}
Expand Down Expand Up @@ -536,7 +528,7 @@ func TestMempoolTxsBytes(t *testing.T) {
assert.EqualValues(t, 1, mempool.TxsBytes())

// 3. zero again after tx is removed by Update
err = mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil, nil)
err = mempool.Update(1, []types.Tx{[]byte{0x01}}, abciResponses(1, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.EqualValues(t, 0, mempool.TxsBytes())

Expand Down Expand Up @@ -586,7 +578,7 @@ func TestMempoolTxsBytes(t *testing.T) {
require.NotEmpty(t, res2.Data)

// Pretend like we committed nothing so txBytes gets rechecked and removed.
err = mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil, nil)
err = mempool.Update(1, []types.Tx{}, abciResponses(0, abci.CodeTypeOK), nil)
require.NoError(t, err)
assert.EqualValues(t, 0, mempool.TxsBytes())

Expand Down
25 changes: 0 additions & 25 deletions mempool/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type Mempool interface {
blockTxs types.Txs,
deliverTxResponses []*abci.ResponseDeliverTx,
newPreFn PreCheckFunc,
newPostFn PostCheckFunc,
) error

// FlushAppConn flushes the mempool connection to ensure async reqResCb calls are
Expand Down Expand Up @@ -85,11 +84,6 @@ type Mempool interface {
// transaction doesn't exceeded the block size.
type PreCheckFunc func(types.Tx) error

// PostCheckFunc is an optional filter executed after CheckTx and rejects
// transaction if false is returned. An example would be to ensure a
// transaction doesn't require more gas than available for the block.
type PostCheckFunc func(types.Tx, *abci.ResponseCheckTx) error

// TxInfo are parameters that get passed when attempting to add a tx to the
// mempool.
type TxInfo struct {
Expand All @@ -114,22 +108,3 @@ func PreCheckMaxBytes(maxBytes int64) PreCheckFunc {
return nil
}
}

// PostCheckMaxGas checks that the wanted gas is smaller or equal to the passed
// maxGas. Returns nil if maxGas is -1.
func PostCheckMaxGas(maxGas int64) PostCheckFunc {
return func(tx types.Tx, res *abci.ResponseCheckTx) error {
if maxGas == -1 {
return nil
}
if res.GasWanted < 0 {
return fmt.Errorf("gas wanted %d is negative",
res.GasWanted)
}
if res.GasWanted > maxGas {
return fmt.Errorf("gas wanted %d is greater than max gas %d",
res.GasWanted, maxGas)
}
return nil
}
}
1 change: 0 additions & 1 deletion mempool/mock/mempool.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ func (Mempool) Update(
_ types.Txs,
_ []*abci.ResponseDeliverTx,
_ mempl.PreCheckFunc,
_ mempl.PostCheckFunc,
) error {
return nil
}
Expand Down
1 change: 0 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,6 @@ func createMempoolAndMempoolReactor(config *cfg.Config, proxyApp proxy.AppConns,
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempoolLogger := logger.With("module", "mempool")
mempoolReactor := mempl.NewReactor(config.Mempool, mempool)
Expand Down
1 change: 0 additions & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,6 @@ func TestCreateProposalBlock(t *testing.T) {
state.LastBlockHeight,
mempl.WithMetrics(memplMetrics),
mempl.WithPreCheck(sm.TxPreCheck(state)),
mempl.WithPostCheck(sm.TxPostCheck(state)),
)
mempool.SetLogger(logger)

Expand Down
8 changes: 1 addition & 7 deletions state/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,13 +257,7 @@ func (blockExec *BlockExecutor) Commit(

// Update mempool.
updateMempoolStartTime := time.Now().UnixNano()
err = blockExec.mempool.Update(
block.Height,
block.Txs,
deliverTxResponses,
TxPreCheck(state),
TxPostCheck(state),
)
err = blockExec.mempool.Update(block.Height, block.Txs, deliverTxResponses, TxPreCheck(state))
updateMempoolEndTime := time.Now().UnixNano()

updateMempoolTimeMs := float64(updateMempoolEndTime-updateMempoolStartTime) / 1000000
Expand Down
6 changes: 0 additions & 6 deletions state/tx_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,3 @@ func TxPreCheck(state State) mempl.PreCheckFunc {
)
return mempl.PreCheckMaxBytes(maxDataBytes)
}

// TxPostCheck returns a function to filter transactions after processing.
// The function limits the gas wanted by a transaction to the block's maximum total gas.
func TxPostCheck(state State) mempl.PostCheckFunc {
return mempl.PostCheckMaxGas(state.ConsensusParams.Block.MaxGas)
}

0 comments on commit fb98372

Please sign in to comment.