diff --git a/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md b/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md new file mode 100644 index 00000000000..67614a8e35f --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-2nd-request.md @@ -0,0 +1,3 @@ +- `[blocksync]` Request a block from peer B if we are approaching pool's height + (less than 50 blocks) and the current peer A is slow in sending us the + block [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md b/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md new file mode 100644 index 00000000000..d01b3679866 --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-no-block-response.md @@ -0,0 +1,3 @@ +- `[blocksync]` Request the block N from peer B immediately after getting + `NoBlockResponse` from peer A + [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md b/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md new file mode 100644 index 00000000000..5c544401ba6 --- /dev/null +++ b/.changelog/unreleased/improvements/2475-blocksync-sort-peers.md @@ -0,0 +1,2 @@ +- `[blocksync]` Sort peers by download rate (the fastest peer is picked first) + [\#2475](https://github.com/cometbft/cometbft/pull/2475) diff --git a/CHANGELOG.md b/CHANGELOG.md index b91ed664f82..59ef01f70e5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ * [#11](https://github.com/osmosis-labs/cometbft/pull/11) Skip verification of commit sigs * [#13](https://github.com/osmosis-labs/cometbft/pull/13) Avoid double-saving ABCI responses * [#17](https://github.com/osmosis-labs/cometbft/pull/17) Set the max number of (concurrently) downloaded blocks to {peersCount * 20} +* [#18](https://github.com/osmosis-labs/cometbft/pull/18) Sort peers by download rate & multiple requests for closer blocks ## osmo-v23/v0.37.4-2 diff --git a/blocksync/pool.go b/blocksync/pool.go index 7645fd586d6..2d742293313 100644 --- a/blocksync/pool.go +++ b/blocksync/pool.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math" + "sort" "time" flow "github.com/cometbft/cometbft/libs/flowrate" @@ -34,9 +35,20 @@ const ( // enough. If a peer is not sending us data at at least that rate, we // consider them to have timedout and we disconnect. // - // Assuming a DSL connection (not a good choice) 128 Kbps (upload) ~ 15 KB/s, - // sending data across atlantic ~ 7.5 KB/s. - minRecvRate = 7680 + // Based on the experiments with [Osmosis](https://osmosis.zone/), the + // minimum rate could be as high as 500 KB/s. However, we're setting it to + // 128 KB/s for now to be conservative. + minRecvRate = 128 * 1024 // 128 KB/s + + // peerConnWait is the time that must have elapsed since the pool routine + // was created before we start making requests. This is to give the peer + // routine time to connect to peers. + peerConnWait = 3 * time.Second + + // If we're within minBlocksForSingleRequest blocks of the pool's height, we + // send 2 parallel requests to 2 peers for the same block. If we're further + // away, we send a single request. + minBlocksForSingleRequest = 50 ) var ( @@ -58,7 +70,8 @@ var ( // BlockPool keeps track of the block sync peers, block requests and block responses. type BlockPool struct { service.BaseService - startTime time.Time + startTime time.Time + startHeight int64 mtx cmtsync.Mutex // block requests @@ -66,7 +79,8 @@ type BlockPool struct { height int64 // the lowest key in requesters. // peers peers map[p2p.ID]*bpPeer - maxPeerHeight int64 // the biggest reported height + sortedPeers []*bpPeer // sorted by curRate, highest first + maxPeerHeight int64 // the biggest reported height requestsCh chan<- BlockRequest errorsCh chan<- peerError @@ -78,8 +92,9 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), - requesters: make(map[int64]*bpRequester), - height: start, + requesters: make(map[int64]*bpRequester), + height: start, + startHeight: start, requestsCh: requestsCh, errorsCh: errorsCh, @@ -91,8 +106,8 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p // OnStart implements service.Service by spawning requesters routine and recording // pool's start time. func (pool *BlockPool) OnStart() error { - go pool.makeRequestersRoutine() pool.startTime = time.Now() + go pool.makeRequestersRoutine() return nil } @@ -102,6 +117,14 @@ func (pool *BlockPool) makeRequestersRoutine() { return } + // Check if we are within peerConnWait seconds of start time + // This gives us some time to connect to peers before starting a wave of requests + if time.Since(pool.startTime) < peerConnWait { + // Calculate the duration to sleep until peerConnWait seconds have passed since pool.startTime + sleepDuration := peerConnWait - time.Since(pool.startTime) + time.Sleep(sleepDuration) + } + pool.mtx.Lock() var ( maxRequestersCreated = len(pool.requesters) >= len(pool.peers)*maxPendingRequestsPerPeer @@ -119,6 +142,8 @@ func (pool *BlockPool) makeRequestersRoutine() { time.Sleep(requestInterval) default: pool.makeNextRequester(nextHeight) + // Sleep for a bit to make the requests more ordered. + time.Sleep(requestInterval) } } } @@ -140,11 +165,16 @@ func (pool *BlockPool) removeTimedoutPeers() { "minRate", fmt.Sprintf("%d KB/s", minRecvRate/1024)) peer.didTimeout = true } + + peer.curRate = curRate } + if peer.didTimeout { pool.removePeer(peer.id) } } + + pool.sortPeers() } // IsCaughtUp returns true if this node is caught up, false - otherwise. @@ -187,81 +217,109 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) return } -// PopRequest pops the first block at pool.height. -// It must have been validated by 'second'.Commit from PeekTwoBlocks(). +// PopRequest removes the requester at pool.height and increments pool.height. func (pool *BlockPool) PopRequest() { pool.mtx.Lock() defer pool.mtx.Unlock() - if r := pool.requesters[pool.height]; r != nil { - /* The block can disappear at any time, due to removePeer(). - if r := pool.requesters[pool.height]; r == nil || r.block == nil { - PanicSanity("PopRequest() requires a valid block") - } - */ - if err := r.Stop(); err != nil { - pool.Logger.Error("Error stopping requester", "err", err) - } - delete(pool.requesters, pool.height) - pool.height++ - } else { + r := pool.requesters[pool.height] + if r == nil { panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) } + + if err := r.Stop(); err != nil { + pool.Logger.Error("Error stopping requester", "err", err) + } + delete(pool.requesters, pool.height) + pool.height++ + + // Notify the next minBlocksForSingleRequest requesters about new height, so + // they can potentially request a block from the second peer. + for i := int64(0); i < minBlocksForSingleRequest && i < int64(len(pool.requesters)); i++ { + pool.requesters[pool.height+i].newHeight(pool.height) + } } -// RedoRequest invalidates the block at pool.height, -// Remove the peer and redo request from others. +// RemovePeerAndRedoAllPeerRequests retries the request at the given height and +// all the requests made to the same peer. The peer is removed from the pool. // Returns the ID of the removed peer. -func (pool *BlockPool) RedoRequest(height int64) p2p.ID { +func (pool *BlockPool) RemovePeerAndRedoAllPeerRequests(height int64) p2p.ID { pool.mtx.Lock() defer pool.mtx.Unlock() request := pool.requesters[height] - peerID := request.getPeerID() - if peerID != p2p.ID("") { - // RemovePeer will redo all requesters associated with this peer. - pool.removePeer(peerID) - } + peerID := request.gotBlockFromPeerID() + // RemovePeer will redo all requesters associated with this peer. + pool.removePeer(peerID) return peerID } -// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it. +// RedoRequestFrom retries the request at the given height. It does not remove the +// peer. +func (pool *BlockPool) RedoRequestFrom(height int64, peerID p2p.ID) { + pool.mtx.Lock() + defer pool.mtx.Unlock() + + if requester, ok := pool.requesters[height]; ok { // If we requested this block + if requester.didRequestFrom(peerID) { // From this specific peer + requester.redo(peerID) + } + } +} + +// Deprecated: use RemovePeerAndRedoAllPeerRequests instead. +func (pool *BlockPool) RedoRequest(height int64) p2p.ID { + return pool.RemovePeerAndRedoAllPeerRequests(height) +} + +// AddBlock validates that the block comes from the peer it was expected from +// and calls the requester to store it. +// +// This requires an extended commit at the same height as the supplied block - +// the block contains the last commit, but we need the latest commit in case we +// need to switch over from block sync to consensus at this height. If the +// height of the extended commit and the height of the block do not match, we +// do not add the block and return an error. // TODO: ensure that blocks come in order for each peer. -func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) { +func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) error { pool.mtx.Lock() defer pool.mtx.Unlock() requester := pool.requesters[block.Height] if requester == nil { - pool.Logger.Info( - "peer sent us a block we didn't expect", - "peer", - peerID, - "curHeight", - pool.height, - "blockHeight", - block.Height) - diff := pool.height - block.Height - if diff < 0 { - diff *= -1 - } - const maxDiff = 100 // maximum difference between current and received block height - if diff > maxDiff { - pool.sendError(errors.New("peer sent us a block we didn't expect with a height too far ahead/behind"), peerID) + // Because we're issuing 2nd requests for closer blocks, it's possible to + // receive a block we've already processed from a second peer. Hence, we + // can't punish it. But if the peer sent us a block we clearly didn't + // request, we disconnect. + if block.Height > pool.height || block.Height < pool.startHeight { + err := fmt.Errorf("peer sent us block #%d we didn't expect (current height: %d, start height: %d)", + block.Height, pool.height, pool.startHeight) + pool.sendError(err, peerID) + return err } - return + + return fmt.Errorf("got an already committed block #%d (possibly from the slow peer %s)", block.Height, peerID) } if !requester.setBlock(block, peerID) { - err := errors.New("requester is different or block already exists") + err := fmt.Errorf("requested block #%d from %v, not %s", block.Height, requester.requestedFrom(), peerID) pool.sendError(err, peerID) - return + return err } peer := pool.peers[peerID] if peer != nil { peer.decrPending(blockSize) } + + return nil +} + +// Height returns the pool's height. +func (pool *BlockPool) Height() int64 { + pool.mtx.Lock() + defer pool.mtx.Unlock() + return pool.height } // MaxPeerHeight returns the highest reported height. @@ -284,6 +342,9 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) { peer = newBPPeer(pool, peerID, base, height) peer.setLogger(pool.Logger.With("peer", peerID)) pool.peers[peerID] = peer + // no need to sort because curRate is 0 at start. + // just add to the beginning so it's picked first by pickIncrAvailablePeer. + pool.sortedPeers = append([]*bpPeer{peer}, pool.sortedPeers...) } if height > pool.maxPeerHeight { @@ -302,7 +363,7 @@ func (pool *BlockPool) RemovePeer(peerID p2p.ID) { func (pool *BlockPool) removePeer(peerID p2p.ID) { for _, requester := range pool.requesters { - if requester.getPeerID() == peerID { + if requester.didRequestFrom(peerID) { requester.redo(peerID) } } @@ -314,6 +375,12 @@ func (pool *BlockPool) removePeer(peerID p2p.ID) { } delete(pool.peers, peerID) + for i, p := range pool.sortedPeers { + if p.id == peerID { + pool.sortedPeers = append(pool.sortedPeers[:i], pool.sortedPeers[i+1:]...) + break + } + } // Find a new peer with the biggest height and update maxPeerHeight if the // peer's height was the biggest. @@ -336,11 +403,14 @@ func (pool *BlockPool) updateMaxPeerHeight() { // Pick an available peer with the given height available. // If no peers are available, returns nil. -func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { +func (pool *BlockPool) pickIncrAvailablePeer(height int64, excludePeerID p2p.ID) *bpPeer { pool.mtx.Lock() defer pool.mtx.Unlock() - for _, peer := range pool.peers { + for _, peer := range pool.sortedPeers { + if peer.id == excludePeerID { + continue + } if peer.didTimeout { pool.removePeer(peer.id) continue @@ -354,9 +424,19 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { peer.incrPending() return peer } + return nil } +// Sort peers by curRate, highest first. +// +// CONTRACT: pool.mtx must be locked. +func (pool *BlockPool) sortPeers() { + sort.Slice(pool.sortedPeers, func(i, j int) bool { + return pool.sortedPeers[i].curRate > pool.sortedPeers[j].curRate + }) +} + func (pool *BlockPool) makeNextRequester(nextHeight int64) { pool.mtx.Lock() request := newBPRequester(pool, nextHeight) @@ -406,6 +486,7 @@ func (pool *BlockPool) debug() string { type bpPeer struct { didTimeout bool + curRate int64 numPending int32 height int64 base int64 @@ -478,27 +559,41 @@ func (peer *bpPeer) onTimeout() { //------------------------------------- +// bpRequester requests a block from a peer. +// +// If the height is within minBlocksForSingleRequest blocks of the pool's +// height, it will send an additional request to another peer. This is to avoid +// a situation where blocksync is stuck because of a single slow peer. Note +// that it's okay to send a single request when the requested height is far +// from the pool's height. If the peer is slow, it will timeout and be replaced +// with another peer. type bpRequester struct { service.BaseService - pool *BlockPool - height int64 - gotBlockCh chan struct{} - redoCh chan p2p.ID // redo may send multitime, add peerId to identify repeat - mtx cmtsync.Mutex - peerID p2p.ID - block *types.Block + pool *BlockPool + height int64 + gotBlockCh chan struct{} + redoCh chan p2p.ID // redo may got multiple messages, add peerId to identify repeat + newHeightCh chan int64 + + mtx cmtsync.Mutex + peerID p2p.ID + secondPeerID p2p.ID // alternative peer to request from (if close to pool's height) + gotBlockFrom p2p.ID + block *types.Block } func newBPRequester(pool *BlockPool, height int64) *bpRequester { bpr := &bpRequester{ - pool: pool, - height: height, - gotBlockCh: make(chan struct{}, 1), - redoCh: make(chan p2p.ID, 1), - - peerID: "", - block: nil, + pool: pool, + height: height, + gotBlockCh: make(chan struct{}, 1), + redoCh: make(chan p2p.ID, 1), + newHeightCh: make(chan int64, 1), + + peerID: "", + secondPeerID: "", + block: nil, } bpr.BaseService = *service.NewBaseService(nil, "bpRequester", bpr) return bpr @@ -509,14 +604,20 @@ func (bpr *bpRequester) OnStart() error { return nil } -// Returns true if the peer matches and block doesn't already exist. +// Returns true if the peer(s) match and block doesn't already exist. func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool { bpr.mtx.Lock() - if bpr.block != nil || bpr.peerID != peerID { + if bpr.peerID != peerID && bpr.secondPeerID != peerID { bpr.mtx.Unlock() return false } + if bpr.block != nil { + bpr.mtx.Unlock() + return true // getting a block from both peers is not an error + } + bpr.block = block + bpr.gotBlockFrom = peerID bpr.mtx.Unlock() select { @@ -532,19 +633,53 @@ func (bpr *bpRequester) getBlock() *types.Block { return bpr.block } -func (bpr *bpRequester) getPeerID() p2p.ID { +// Returns the IDs of peers we've requested a block from. +func (bpr *bpRequester) requestedFrom() []p2p.ID { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + peerIDs := make([]p2p.ID, 0, 2) + if bpr.peerID != "" { + peerIDs = append(peerIDs, bpr.peerID) + } + if bpr.secondPeerID != "" { + peerIDs = append(peerIDs, bpr.secondPeerID) + } + return peerIDs +} + +// Returns true if we've requested a block from the given peer. +func (bpr *bpRequester) didRequestFrom(peerID p2p.ID) bool { bpr.mtx.Lock() defer bpr.mtx.Unlock() - return bpr.peerID + return bpr.peerID == peerID || bpr.secondPeerID == peerID } -// This is called from the requestRoutine, upon redo(). -func (bpr *bpRequester) reset() { +// Returns the ID of the peer who sent us the block. +func (bpr *bpRequester) gotBlockFromPeerID() p2p.ID { bpr.mtx.Lock() defer bpr.mtx.Unlock() + return bpr.gotBlockFrom +} + +// Removes the block (IF we got it from the given peer) and resets the peer. +func (bpr *bpRequester) reset(peerID p2p.ID) (removedBlock bool) { + bpr.mtx.Lock() + defer bpr.mtx.Unlock() + + // Only remove the block if we got it from that peer. + if bpr.gotBlockFrom == peerID { + bpr.block = nil + bpr.gotBlockFrom = "" + removedBlock = true + } + + if bpr.peerID == peerID { + bpr.peerID = "" + } else { + bpr.secondPeerID = "" + } - bpr.peerID = "" - bpr.block = nil + return removedBlock } // Tells bpRequester to pick another peer and try again. @@ -557,34 +692,75 @@ func (bpr *bpRequester) redo(peerID p2p.ID) { } } +func (bpr *bpRequester) pickPeerAndSendRequest() { + bpr.mtx.Lock() + secondPeerID := bpr.secondPeerID + bpr.mtx.Unlock() + + var peer *bpPeer +PICK_PEER_LOOP: + for { + if !bpr.IsRunning() || !bpr.pool.IsRunning() { + return + } + peer = bpr.pool.pickIncrAvailablePeer(bpr.height, secondPeerID) + if peer == nil { + bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) + time.Sleep(requestInterval) + continue PICK_PEER_LOOP + } + break PICK_PEER_LOOP + } + bpr.mtx.Lock() + bpr.peerID = peer.id + bpr.mtx.Unlock() + + bpr.pool.sendRequest(bpr.height, peer.id) +} + +// Picks a second peer and sends a request to it. If the second peer is already +// set, does nothing. +func (bpr *bpRequester) pickSecondPeerAndSendRequest() { + bpr.mtx.Lock() + if bpr.secondPeerID != "" { + bpr.mtx.Unlock() + return + } + peerID := bpr.peerID + bpr.mtx.Unlock() + + secondPeer := bpr.pool.pickIncrAvailablePeer(bpr.height, peerID) + if secondPeer != nil { + bpr.mtx.Lock() + bpr.secondPeerID = secondPeer.id + bpr.mtx.Unlock() + + bpr.pool.sendRequest(bpr.height, secondPeer.id) + } +} + +// Informs the requester of a new pool's height. +func (bpr *bpRequester) newHeight(height int64) { + select { + case bpr.newHeightCh <- height: + default: + } +} + // Responsible for making more requests as necessary // Returns only when a block is found (e.g. AddBlock() is called) func (bpr *bpRequester) requestRoutine() { + gotBlock := false + OUTER_LOOP: for { - // Pick a peer to send request to. - var peer *bpPeer - PICK_PEER_LOOP: - for { - if !bpr.IsRunning() || !bpr.pool.IsRunning() { - return - } - peer = bpr.pool.pickIncrAvailablePeer(bpr.height) - if peer == nil { - bpr.Logger.Debug("No peers currently available; will retry shortly", "height", bpr.height) - time.Sleep(requestInterval) - continue PICK_PEER_LOOP - } - break PICK_PEER_LOOP + bpr.pickPeerAndSendRequest() + + poolHeight := bpr.pool.Height() + if bpr.height-poolHeight < minBlocksForSingleRequest { + bpr.pickSecondPeerAndSendRequest() } - bpr.mtx.Lock() - bpr.peerID = peer.id - bpr.mtx.Unlock() - to := time.NewTimer(requestRetrySeconds * time.Second) - // Send request and wait. - bpr.pool.sendRequest(bpr.height, peer.id) - WAIT_LOOP: for { select { case <-bpr.pool.Quit(): @@ -594,22 +770,34 @@ OUTER_LOOP: return case <-bpr.Quit(): return - case <-to.C: - bpr.Logger.Debug("Retrying block request after timeout", "height", bpr.height, "peer", bpr.peerID) - // Simulate a redo - bpr.reset() - continue OUTER_LOOP + case <-time.After(requestRetrySeconds * time.Second): + if !gotBlock { + bpr.Logger.Debug("Retrying block request(s) after timeout", "height", bpr.height, "peer", bpr.peerID, "secondPeerID", bpr.secondPeerID) + bpr.reset(bpr.peerID) + bpr.reset(bpr.secondPeerID) + continue OUTER_LOOP + } case peerID := <-bpr.redoCh: - if peerID == bpr.peerID { - bpr.reset() + if bpr.didRequestFrom(peerID) { + removedBlock := bpr.reset(peerID) + if removedBlock { + gotBlock = false + } + } + // If both peers returned NoBlockResponse or bad block, reschedule both + // requests. If not, wait for the other peer. + if len(bpr.requestedFrom()) == 0 { continue OUTER_LOOP - } else { - continue WAIT_LOOP + } + case newHeight := <-bpr.newHeightCh: + if !gotBlock && bpr.height-newHeight < minBlocksForSingleRequest { + // The operation is a noop if the second peer is already set. The cost is checking a mutex. + bpr.pickSecondPeerAndSendRequest() } case <-bpr.gotBlockCh: + gotBlock = true // We got a block! // Continue the for-loop and wait til Quit. - continue WAIT_LOOP } } } diff --git a/blocksync/pool_test.go b/blocksync/pool_test.go index 8309b19e42b..df991fae1c4 100644 --- a/blocksync/pool_test.go +++ b/blocksync/pool_test.go @@ -42,7 +42,7 @@ func (p testPeer) runInputRoutine() { // Request desired, pretend like we got the block immediately. func (p testPeer) simulateInput(input inputData) { block := &types.Block{Header: types.Header{Height: input.request.Height}} - input.pool.AddBlock(input.request.PeerID, block, 123) + _ = input.pool.AddBlock(input.request.PeerID, block, 123) // TODO: uncommenting this creates a race which is detected by: // https://github.com/golang/go/blob/2bd767b1022dd3254bcec469f0ee164024726486/src/testing/testing.go#L854-L856 // see: https://github.com/tendermint/tendermint/issues/3390#issue-418379890 diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 9867ac1b7e1..7fa2bbf73be 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -237,7 +237,9 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) { bcR.Switch.StopPeerForError(e.Src, err) return } - bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size()) + if err := bcR.pool.AddBlock(e.Src.ID(), bi, msg.Block.Size()); err != nil { + bcR.Logger.Error("failed to add block", "peer", e.Src, "err", err) + } case *bcproto.StatusRequest: // Send peer our state. e.Src.TrySendEnvelope(p2p.Envelope{ @@ -252,6 +254,7 @@ func (bcR *Reactor) ReceiveEnvelope(e p2p.Envelope) { bcR.pool.SetPeerRange(e.Src.ID(), msg.Base, msg.Height) case *bcproto.NoBlockResponse: bcR.Logger.Debug("Peer does not have requested block", "peer", e.Src, "height", msg.Height) + bcR.pool.RedoRequestFrom(msg.Height, e.Src.ID()) default: bcR.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(msg))) } @@ -399,14 +402,14 @@ FOR_LOOP: } bcR.Logger.Error("Error in validation", "err", err) - peerID := bcR.pool.RedoRequest(first.Height) + peerID := bcR.pool.RemovePeerAndRedoAllPeerRequests(first.Height) peer := bcR.Switch.Peers().Get(peerID) if peer != nil { // NOTE: we've already removed the peer's request, but we // still need to clean up the rest. bcR.Switch.StopPeerForError(peer, fmt.Errorf("Reactor validation error: %v", err)) } - peerID2 := bcR.pool.RedoRequest(second.Height) + peerID2 := bcR.pool.RemovePeerAndRedoAllPeerRequests(second.Height) peer2 := bcR.Switch.Peers().Get(peerID2) if peer2 != nil && peer2 != peer { // NOTE: we've already removed the peer's request, but we