diff --git a/blocksync/pool.go b/blocksync/pool.go index d973bbf5ca8..d978b4983a2 100644 --- a/blocksync/pool.go +++ b/blocksync/pool.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "math" + "sync" "sync/atomic" "time" @@ -66,19 +67,20 @@ type BlockPool struct { service.BaseService startTime time.Time - mtx cmtsync.Mutex + mtx sync.Mutex // block requests requesters map[int64][]*bpRequester height int64 // the lowest key in requesters. // peers + peersMtx sync.Mutex peers map[p2p.ID]*bpPeer maxPeerHeight int64 // the biggest reported height // atomic numPending int32 // number of requests pending assignment or block response - requestsCh chan<- BlockRequest - errorsCh chan<- peerError + errorsCh chan peerError + requestsCh chan BlockRequest lastBlockTime time.Time blockCount int64 @@ -86,7 +88,7 @@ type BlockPool struct { // NewBlockPool returns a new BlockPool with the height equal to start. Block // requests and errors will be sent to requestsCh and errorsCh accordingly. -func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool { +func NewBlockPool(start int64, requestsCh chan BlockRequest, errorsCh chan peerError) *BlockPool { bp := &BlockPool{ peers: make(map[p2p.ID]*bpPeer), requesters: make(map[int64][]*bpRequester), // Updated to hold a slice of requesters @@ -106,30 +108,24 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p func (pool *BlockPool) OnStart() error { go pool.makeRequestersRoutine() pool.startTime = time.Now() + go pool.startErrorHandler() return nil } // spawns requesters as needed func (pool *BlockPool) makeRequestersRoutine() { + pool.mtx.Lock() + defer pool.mtx.Unlock() for { if !pool.IsRunning() { break } _, numPending, lenRequesters := pool.GetStatus() - switch { - case numPending >= maxPendingRequests: - // sleep for a bit. - time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers - pool.removeTimedoutPeers() - case lenRequesters >= maxTotalRequesters: - // sleep for a bit. + if numPending >= maxPendingRequests || lenRequesters >= maxTotalRequesters { time.Sleep(requestIntervalMS * time.Millisecond) - // check for timed out peers pool.removeTimedoutPeers() - default: - // request for more blocks. + } else { pool.makeNextRequester() } } @@ -172,11 +168,11 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester // TODO: relax conditions, prevent abuse. func (pool *BlockPool) IsCaughtUp() bool { pool.mtx.Lock() - defer pool.mtx.Unlock() // Need at least 1 peer to be considered caught up. if len(pool.peers) == 0 { pool.Logger.Debug("Blockpool has no peers") + pool.mtx.Unlock() return false } @@ -188,6 +184,7 @@ func (pool *BlockPool) IsCaughtUp() bool { receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1) isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers + pool.mtx.Unlock() return isCaughtUp } @@ -197,7 +194,6 @@ func (pool *BlockPool) IsCaughtUp() bool { // The caller will verify the commit. func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) { pool.mtx.Lock() - defer pool.mtx.Unlock() if requestersAtHeight := pool.requesters[pool.height]; requestersAtHeight != nil { for _, requester := range requestersAtHeight { @@ -215,6 +211,7 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) } } } + pool.mtx.Unlock() return first, second } @@ -222,7 +219,6 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) // It must have been validated by 'second'.Commit from PeekTwoBlocks(). func (pool *BlockPool) PopRequest() { pool.mtx.Lock() - defer pool.mtx.Unlock() requestersAtHeight := pool.requesters[pool.height] if len(requestersAtHeight) > 0 { @@ -236,6 +232,8 @@ func (pool *BlockPool) PopRequest() { } else { panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height)) } + pool.mtx.Unlock() + } // RedoRequest invalidates the block at pool.height, @@ -260,17 +258,18 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID { } // If no requester had a peerID, return an empty p2p.ID. + return peerID } // AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it. // TODO: ensure that blocks come in order for each peer. func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) { - pool.mtx.Lock() + defer pool.mtx.Unlock() + // Increment the block count since a new block has been received pool.blockCount++ - pool.mtx.Unlock() // Calculate the blocks per second now := time.Now() @@ -285,9 +284,6 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int redEnd := "\033[39m" fmt.Printf("%sBlocks per second: %f%s\n", redStart, blocksPerSecond, redEnd) - pool.mtx.Lock() - defer pool.mtx.Unlock() - requesters := pool.requesters[block.Height] if requesters == nil { // Log that a block was received that we did not expect @@ -339,6 +335,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int break } } + } // makeRequesterForHeight creates a requester for a specific block height if it does not exist @@ -366,15 +363,13 @@ func (pool *BlockPool) makeRequesterForHeight(height int64) { // MaxPeerHeight returns the highest reported height. func (pool *BlockPool) MaxPeerHeight() int64 { - pool.mtx.Lock() - defer pool.mtx.Unlock() - return pool.maxPeerHeight + return atomic.LoadInt64(&pool.maxPeerHeight) } // SetPeerRange sets the peer's alleged blockchain base and height. func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) { - pool.mtx.Lock() - defer pool.mtx.Unlock() + pool.peersMtx.Lock() + defer pool.peersMtx.Unlock() peer := pool.peers[peerID] if peer != nil { @@ -386,8 +381,8 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) { pool.peers[peerID] = peer } - if height > pool.maxPeerHeight { - pool.maxPeerHeight = height + if height > atomic.LoadInt64(&pool.maxPeerHeight) { + atomic.StoreInt64(&pool.maxPeerHeight, height) } } @@ -449,7 +444,6 @@ func (pool *BlockPool) updateMaxPeerHeight() { // If no peers are available, returns nil. func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { pool.mtx.Lock() - defer pool.mtx.Unlock() var bestPeer *bpPeer var bestThroughput int64 @@ -475,6 +469,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer { bestPeer.incrPending() } + pool.mtx.Unlock() return bestPeer } @@ -503,10 +498,6 @@ func (pool *BlockPool) makeNextRequester() { } } -func (pool *BlockPool) requestersLen() int64 { - return int64(len(pool.requesters)) -} - func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) { if !pool.IsRunning() { return @@ -599,16 +590,6 @@ func (peer *bpPeer) incrPending() { peer.numPending++ } -func (peer *bpPeer) decrPending(recvSize int) { - peer.numPending-- - if peer.numPending == 0 { - peer.timeout.Stop() - } else { - peer.recvMonitor.Update(recvSize) - peer.resetTimeout() - } -} - func (peer *bpPeer) onTimeout() { peer.pool.mtx.Lock() defer peer.pool.mtx.Unlock() @@ -671,21 +652,16 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool { } func (bpr *bpRequester) getBlock() *types.Block { - bpr.mtx.Lock() - defer bpr.mtx.Unlock() return bpr.block } func (bpr *bpRequester) getPeerID() p2p.ID { - bpr.mtx.Lock() - defer bpr.mtx.Unlock() return bpr.peerID } // This is called from the requestRoutine, upon redo(). func (bpr *bpRequester) reset() { bpr.mtx.Lock() - defer bpr.mtx.Unlock() if bpr.block != nil { atomic.AddInt32(&bpr.pool.numPending, 1) @@ -693,6 +669,8 @@ func (bpr *bpRequester) reset() { bpr.peerID = "" bpr.block = nil + bpr.mtx.Unlock() + } // Tells bpRequester to pick another peer and try again. @@ -776,6 +754,10 @@ func (bpr *bpRequester) stop() { } bpr.isStopped = true close(bpr.gotBlockCh) + // Drain the channel to avoid sending on closed channel + for range bpr.redoCh { + // Drain the channel + } if err := bpr.Stop(); err != nil { bpr.Logger.Error("Error stopping requester", "err", err) } @@ -805,3 +787,35 @@ func (pool *BlockPool) reassignRequest(height int64) { // Create a new requester for this height to replace the stopped one. pool.makeRequesterForHeight(height) } + +func (pool *BlockPool) startErrorHandler() { + for { + select { + case err := <-pool.errorsCh: + pool.mtx.Lock() + pool.removePeer(err.peerID) + pool.Logger.Error("Error processing block", "error", err.err, "peer", err.peerID) + pool.mtx.Unlock() + case <-pool.Quit(): + return + } + } +} + +func (pool *BlockPool) startRequestHandler() { + + for { + select { + case request := <-pool.requestsCh: + // Find an available peer with matching height + peer := pool.pickIncrAvailablePeer(request.Height) // Consider locking if needed + if peer != nil { + // Send request to peer (likely through peer's communication mechanism) + } else { + // Handle the case where no peer is available + } + case <-pool.Quit(): + return + } + } +}