From b999f698c30b012c9093939b65a7deb32f35e044 Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Fri, 28 Jun 2024 00:48:58 +0300 Subject: [PATCH] Bring back stop recheck --- mempool/v0/clist_mempool.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/mempool/v0/clist_mempool.go b/mempool/v0/clist_mempool.go index cf555c4c540..a85646ef01c 100644 --- a/mempool/v0/clist_mempool.go +++ b/mempool/v0/clist_mempool.go @@ -46,10 +46,11 @@ type CListMempool struct { // Track whether we're rechecking txs. // These are not protected by a mutex and are expected to be mutated in // serial (ie. by abci responses which are called in serial). - recheckCursor *clist.CElement // next expected response - recheckEnd *clist.CElement // re-checking stops here - isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished - recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided + recheckCursor *clist.CElement // next expected response + recheckEnd *clist.CElement // re-checking stops here + isRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished + recheckFull atomic.Bool // whether rechecking TXs cannot be completed before a new block is decided + stopRechecking atomic.Bool // true iff the rechecking process has begun and is not yet finished // Map for quick access to txs to record sender in CheckTx. // txsMap: txKey -> CElement @@ -149,6 +150,7 @@ func (mem *CListMempool) PreUpdate() { if rechecking != recheckFull { mem.logger.Debug("the state of recheckFull has flipped") } + mem.stopRechecking.Store(true) } // Safe for concurrent use by multiple goroutines. @@ -673,6 +675,7 @@ func (mem *CListMempool) Update( } func (mem *CListMempool) recheckTxs() { + mem.stopRechecking.Store(false) if mem.Size() == 0 { panic("recheckTxs is called, but the mempool is empty") } @@ -684,6 +687,10 @@ func (mem *CListMempool) recheckTxs() { // Push txs to proxyAppConn // NOTE: globalCb may be called concurrently. for e := mem.txs.Front(); e != nil; e = e.Next() { + if mem.stopRechecking.Load() { + break + } + memTx := e.Value.(*mempoolTx) mem.proxyAppConn.CheckTxAsync(abci.RequestCheckTx{ Tx: memTx.tx, @@ -691,6 +698,7 @@ func (mem *CListMempool) recheckTxs() { }) } + mem.stopRechecking.Store(false) mem.proxyAppConn.FlushAsync() }