mempool: rework lock discipline to mitigate callback deadlocks (#9030)
The priority mempool has a stricter synchronization requirement than the legacy
mempool. Under sufficiently heavy load, exclusive access can lead to deadlocks
when processing a large batch of transaction rechecks through an out-of-process
application using the socket client.

By design, a socket client stalls when its send buffer fills, and while stalled
it holds a lock shared with the receive thread. While the sender is blocked in
this state, any response the receive thread reads must wait for that shared
lock before its callback can be invoked.

If we're lucky, the server will then read the next request and make enough room
in the buffer for the sender to proceed. If not, however (e.g., if the next
request is bigger than the one just consumed), the receive thread stays
blocked: it is waiting on the lock and cannot read a response. Once the
server's output buffer fills, the system deadlocks.
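
For illustration, here is a minimal, hypothetical Go sketch of that failure
shape (the names are invented; this is not the actual socket-client code): a
sender blocks on a full buffer while holding a mutex the receive path needs
before it can deliver a callback. Run as-is, the Go runtime reports a deadlock.

package main

import "sync"

func main() {
	var mu sync.Mutex               // lock shared by the send and receive paths
	sendBuf := make(chan []byte, 1) // stand-in for the socket send buffer
	responses := make(chan []byte)  // responses arriving from the server

	// Receive thread: must take mu before it can invoke a response callback.
	go func() {
		for range responses {
			mu.Lock() // waits forever: the sender below still holds mu
			// ... invoke the response callback ...
			mu.Unlock()
		}
	}()

	go func() { responses <- []byte("resp") }() // a response is already waiting

	mu.Lock()
	sendBuf <- []byte("req1") // fills the send buffer
	sendBuf <- []byte("req2") // blocks with mu held; nothing can drain it: deadlock
	mu.Unlock()
}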

This can happen with any sufficiently busy workload, but it is more likely
during a large recheck in the v1 mempool, where the callbacks need exclusive
access to mempool state. As a workaround, process rechecks for the priority
mempool in their own goroutines outside the mempool mutex. Responses are still
subject to head-of-line blocking, but they no longer receive pushback from
contention on the mempool itself.
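
The change below uses the creachadair/taskgroup package to run those rechecks
with bounded parallelism. A minimal usage sketch of that pattern (hypothetical
task bodies, mirroring the Limit(2 * runtime.NumCPU()) call in the diff):

package main

import (
	"fmt"
	"runtime"

	"github.com/creachadair/taskgroup"
)

func main() {
	// Bound the group so at most 2*NumCPU tasks are active at once;
	// start enqueues one task into the group under that limit.
	g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())

	for i := 0; i < 8; i++ {
		i := i // capture the loop variable for the closure
		start(func() error {
			fmt.Println("recheck stand-in", i)
			return nil
		})
	}
	_ = g.Wait() // block until every started task has finished
}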
M. J. Fromberger authored Jul 19, 2022
1 parent 32761ec commit 22ed610
Showing 1 changed file with 73 additions and 110 deletions.
183 changes: 73 additions & 110 deletions internal/mempool/v1/mempool.go
@@ -3,12 +3,13 @@ package v1
 import (
 	"context"
 	"fmt"
-	"reflect"
+	"runtime"
 	"sort"
 	"sync"
 	"sync/atomic"
 	"time"
 
+	"github.com/creachadair/taskgroup"
 	abci "github.com/tendermint/tendermint/abci/types"
 	"github.com/tendermint/tendermint/config"
 	"github.com/tendermint/tendermint/internal/libs/clist"
@@ -41,8 +42,7 @@ type TxMempool struct {
 	cache mempool.TxCache // seen transactions
 
 	// Atomically-updated fields
-	txsBytes  int64 // atomic: the total size of all transactions in the mempool, in bytes
-	txRecheck int64 // atomic: the number of pending recheck calls
+	txsBytes int64 // atomic: the total size of all transactions in the mempool, in bytes
 
 	// Synchronized fields, protected by mtx.
 	mtx *sync.RWMutex
@@ -83,8 +83,6 @@ func NewTxMempool(
 		txmp.cache = mempool.NewLRUTxCache(cfg.CacheSize)
 	}
 
-	proxyAppConn.SetResponseCallback(txmp.recheckTxCallback)
-
 	for _, opt := range options {
 		opt(txmp)
 	}
@@ -182,7 +180,6 @@ func (txmp *TxMempool) CheckTx(
 	cb func(*abci.Response),
 	txInfo mempool.TxInfo,
 ) error {
-
 	// During the initial phase of CheckTx, we do not need to modify any state.
 	// A transaction will not actually be added to the mempool until it survives
 	// a call to the ABCI CheckTx method and size constraint checks.
@@ -224,31 +221,23 @@ func (txmp *TxMempool) CheckTx(
 		return err
 	}
 
-	// Initiate an ABCI CheckTx for this transaction. The callback is
-	// responsible for adding the transaction to the pool if it survives.
-	//
-	// N.B.: We have to issue the call outside the lock. In a local client,
-	// even an "async" call invokes its callback immediately which will make
-	// the callback deadlock trying to acquire the same lock. This isn't a
-	// problem with out-of-process calls, but this has to work for both.
-	reqRes, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{Tx: tx})
+	// Invoke an ABCI CheckTx for this transaction.
+	rsp, err := txmp.proxyAppConn.CheckTxSync(ctx, abci.RequestCheckTx{Tx: tx})
 	if err != nil {
 		txmp.cache.Remove(tx)
 		return err
 	}
-	reqRes.SetCallback(func(res *abci.Response) {
-		wtx := &WrappedTx{
-			tx:        tx,
-			hash:      tx.Key(),
-			timestamp: time.Now().UTC(),
-			height:    height,
-		}
-		wtx.SetPeer(txInfo.SenderID)
-		txmp.initialTxCallback(wtx, res)
-		if cb != nil {
-			cb(res)
-		}
-	})
+	wtx := &WrappedTx{
+		tx:        tx,
+		hash:      tx.Key(),
+		timestamp: time.Now().UTC(),
+		height:    height,
+	}
+	wtx.SetPeer(txInfo.SenderID)
+	txmp.addNewTransaction(wtx, rsp)
+	if cb != nil {
+		cb(&abci.Response{Value: &abci.Response_CheckTx{CheckTx: rsp}})
+	}
 	return nil
 }
 
@@ -304,10 +293,6 @@ func (txmp *TxMempool) Flush() {
 		cur = next
 	}
 	txmp.cache.Reset()
-
-	// Discard any pending recheck calls that may be in flight. The calls will
-	// still complete, but will have no effect on the mempool.
-	atomic.StoreInt64(&txmp.txRecheck, 0)
 }
 
 // allEntriesSorted returns a slice of all the transactions currently in the
@@ -403,12 +388,6 @@ func (txmp *TxMempool) Update(
 	newPreFn mempool.PreCheckFunc,
 	newPostFn mempool.PostCheckFunc,
 ) error {
-	// TODO(creachadair): This would be a nice safety check but requires Go 1.18.
-	// // Safety check: The caller is required to hold the lock.
-	// if txmp.mtx.TryLock() {
-	// 	txmp.mtx.Unlock()
-	// 	panic("mempool: Update caller does not hold the lock")
-	// }
 	// Safety check: Transactions and responses must match in number.
 	if len(blockTxs) != len(deliverTxResponses) {
 		panic(fmt.Sprintf("mempool: got %d transactions but %d DeliverTx responses",
@@ -456,9 +435,9 @@ func (txmp *TxMempool) Update(
 	return nil
 }
 
-// initialTxCallback handles the ABCI CheckTx response for the first time a
+// addNewTransaction handles the ABCI CheckTx response for the first time a
 // transaction is added to the mempool. A recheck after a block is committed
-// goes to the default callback (see recheckTxCallback).
+// goes to handleRecheckResult.
 //
 // If either the application rejected the transaction or a post-check hook is
 // defined and rejects the transaction, it is discarded.
@@ -469,31 +448,22 @@ func (txmp *TxMempool) Update(
 // transactions are evicted.
 //
 // Finally, the new transaction is added and size stats updated.
-func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
-	checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
-	if !ok {
-		txmp.logger.Error("mempool: received incorrect result type in CheckTx callback",
-			"expected", reflect.TypeOf(&abci.Response_CheckTx{}).Name(),
-			"got", reflect.TypeOf(res.Value).Name(),
-		)
-		return
-	}
-
+func (txmp *TxMempool) addNewTransaction(wtx *WrappedTx, checkTxRes *abci.ResponseCheckTx) {
 	txmp.mtx.Lock()
 	defer txmp.mtx.Unlock()
 
 	var err error
 	if txmp.postCheck != nil {
-		err = txmp.postCheck(wtx.tx, checkTxRes.CheckTx)
+		err = txmp.postCheck(wtx.tx, checkTxRes)
 	}
 
-	if err != nil || checkTxRes.CheckTx.Code != abci.CodeTypeOK {
+	if err != nil || checkTxRes.Code != abci.CodeTypeOK {
 		txmp.logger.Info(
 			"rejected bad transaction",
 			"priority", wtx.Priority(),
 			"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
 			"peer_id", wtx.peers,
-			"code", checkTxRes.CheckTx.Code,
+			"code", checkTxRes.Code,
 			"post_check_err", err,
 		)
 
@@ -508,13 +478,13 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
 		// If there was a post-check error, record its text in the result for
 		// debugging purposes.
 		if err != nil {
-			checkTxRes.CheckTx.MempoolError = err.Error()
+			checkTxRes.MempoolError = err.Error()
 		}
 		return
 	}
 
-	priority := checkTxRes.CheckTx.Priority
-	sender := checkTxRes.CheckTx.Sender
+	priority := checkTxRes.Priority
+	sender := checkTxRes.Sender
 
 	// Disallow multiple concurrent transactions from the same sender assigned
 	// by the ABCI application. As a special case, an empty sender is not
@@ -528,7 +498,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
 			"tx", fmt.Sprintf("%X", w.tx.Hash()),
 			"sender", sender,
 		)
-		checkTxRes.CheckTx.MempoolError =
+		checkTxRes.MempoolError =
 			fmt.Sprintf("rejected valid incoming transaction; tx already exists for sender %q (%X)",
 				sender, w.tx.Hash())
 		txmp.metrics.RejectedTxs.Add(1)
@@ -563,7 +533,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
 			"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
 			"err", err.Error(),
 		)
-		checkTxRes.CheckTx.MempoolError =
+		checkTxRes.MempoolError =
 			fmt.Sprintf("rejected valid incoming transaction; mempool is full (%X)",
 				wtx.tx.Hash())
 		txmp.metrics.RejectedTxs.Add(1)
@@ -609,7 +579,7 @@ func (txmp *TxMempool) initialTxCallback(wtx *WrappedTx, res *abci.Response) {
 		}
 	}
 
-	wtx.SetGasWanted(checkTxRes.CheckTx.GasWanted)
+	wtx.SetGasWanted(checkTxRes.GasWanted)
 	wtx.SetPriority(priority)
 	wtx.SetSender(sender)
 	txmp.insertTx(wtx)
@@ -636,33 +606,14 @@ func (txmp *TxMempool) insertTx(wtx *WrappedTx) {
 	atomic.AddInt64(&txmp.txsBytes, wtx.Size())
 }
 
-// recheckTxCallback handles the responses from ABCI CheckTx calls issued
-// during the recheck phase of a block Update. It updates the recheck counter
-// and removes any transactions invalidated by the application.
+// handleRecheckResult handles the responses from ABCI CheckTx calls issued
+// during the recheck phase of a block Update. It removes any transactions
+// invalidated by the application.
 //
-// This callback is NOT executed for the initial CheckTx on a new transaction;
-// that case is handled by initialTxCallback instead.
-func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response) {
-	checkTxRes, ok := res.Value.(*abci.Response_CheckTx)
-	if !ok {
-		// Don't log this; this is the default callback and other response types
-		// can safely be ignored.
-		return
-	}
-
-	// Check whether we are expecting recheck responses at this point.
-	// If not, we will ignore the response, this usually means the mempool was Flushed.
-	// If this is the "last" pending recheck, trigger a notification when it's been processed.
-	numLeft := atomic.AddInt64(&txmp.txRecheck, -1)
-	if numLeft == 0 {
-		defer txmp.notifyTxsAvailable() // notify waiters on return, if mempool is non-empty
-	} else if numLeft < 0 {
-		return
-	}
-
+// This method is NOT executed for the initial CheckTx on a new transaction;
+// that case is handled by addNewTransaction instead.
+func (txmp *TxMempool) handleRecheckResult(tx types.Tx, checkTxRes *abci.ResponseCheckTx) {
 	txmp.metrics.RecheckTimes.Add(1)
-	tx := types.Tx(req.GetCheckTx().Tx)
-
 	txmp.mtx.Lock()
 	defer txmp.mtx.Unlock()
 
@@ -678,11 +629,11 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
 	// If a postcheck hook is defined, call it before checking the result.
 	var err error
 	if txmp.postCheck != nil {
-		err = txmp.postCheck(tx, checkTxRes.CheckTx)
+		err = txmp.postCheck(tx, checkTxRes)
 	}
 
-	if checkTxRes.CheckTx.Code == abci.CodeTypeOK && err == nil {
-		wtx.SetPriority(checkTxRes.CheckTx.Priority)
+	if checkTxRes.Code == abci.CodeTypeOK && err == nil {
+		wtx.SetPriority(checkTxRes.Priority)
 		return // N.B. Size of mempool did not change
 	}
 

Expand All @@ -691,7 +642,7 @@ func (txmp *TxMempool) recheckTxCallback(req *abci.Request, res *abci.Response)
"priority", wtx.Priority(),
"tx", fmt.Sprintf("%X", wtx.tx.Hash()),
"err", err,
"code", checkTxRes.CheckTx.Code,
"code", checkTxRes.Code,
)
txmp.removeTxByElement(elt)
txmp.metrics.FailedTxs.Add(1)
@@ -716,33 +667,45 @@ func (txmp *TxMempool) recheckTransactions() {
 		"num_txs", txmp.Size(),
 		"height", txmp.height,
 	)
-	// N.B.: We have to issue the calls outside the lock. In a local client,
-	// even an "async" call invokes its callback immediately which will make the
-	// callback deadlock trying to acquire the same lock. This isn't a problem
-	// with out-of-process calls, but this has to work for both.
-	txmp.mtx.Unlock()
-	defer txmp.mtx.Lock()
 
-	ctx := context.TODO()
-	atomic.StoreInt64(&txmp.txRecheck, int64(txmp.txs.Len()))
+	// Collect transactions currently in the mempool requiring recheck.
+	wtxs := make([]*WrappedTx, 0, txmp.txs.Len())
 	for e := txmp.txs.Front(); e != nil; e = e.Next() {
-		wtx := e.Value.(*WrappedTx)
-
-		// The response for this CheckTx is handled by the default recheckTxCallback.
-		_, err := txmp.proxyAppConn.CheckTxAsync(ctx, abci.RequestCheckTx{
-			Tx:   wtx.tx,
-			Type: abci.CheckTxType_Recheck,
-		})
-		if err != nil {
-			txmp.logger.Error("failed to execute CheckTx during recheck",
-				"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
-			atomic.AddInt64(&txmp.txRecheck, -1)
-		}
+		wtxs = append(wtxs, e.Value.(*WrappedTx))
 	}
 
-	if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
-		txmp.logger.Error("failed to flush transactions during recheck", "err", err)
-	}
+	// Issue CheckTx calls for each remaining transaction, and when all the
+	// rechecks are complete signal watchers that transactions may be available.
+	go func() {
+		ctx := context.TODO()
+		g, start := taskgroup.New(nil).Limit(2 * runtime.NumCPU())
+
+		for _, wtx := range wtxs {
+			wtx := wtx
+			start(func() error {
+				rsp, err := txmp.proxyAppConn.CheckTxSync(ctx, abci.RequestCheckTx{
+					Tx:   wtx.tx,
+					Type: abci.CheckTxType_Recheck,
+				})
+				if err != nil {
+					txmp.logger.Error("failed to execute CheckTx during recheck",
+						"err", err, "hash", fmt.Sprintf("%x", wtx.tx.Hash()))
+				} else {
+					txmp.handleRecheckResult(wtx.tx, rsp)
+				}
+				return nil
+			})
+		}
+		if _, err := txmp.proxyAppConn.FlushAsync(ctx); err != nil {
+			txmp.logger.Error("failed to flush transactions during recheck", "err", err)
+		}
+
+		// When recheck is complete, trigger a notification for more transactions.
+		_ = g.Wait()
+		txmp.mtx.Lock()
+		defer txmp.mtx.Unlock()
+		txmp.notifyTxsAvailable()
+	}()
 }
 
 // canAddTx returns an error if we cannot insert the provided *WrappedTx into
