Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
faddat committed Feb 25, 2024
1 parent fd052e2 commit adb0032
Showing 1 changed file with 64 additions and 50 deletions.
114 changes: 64 additions & 50 deletions blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"fmt"
"math"
"sync"
"sync/atomic"
"time"

Expand Down Expand Up @@ -66,27 +67,28 @@ type BlockPool struct {
service.BaseService
startTime time.Time

mtx cmtsync.Mutex
mtx sync.Mutex
// block requests
requesters map[int64][]*bpRequester
height int64 // the lowest key in requesters.
// peers
peersMtx sync.Mutex
peers map[p2p.ID]*bpPeer
maxPeerHeight int64 // the biggest reported height

// atomic
numPending int32 // number of requests pending assignment or block response

requestsCh chan<- BlockRequest
errorsCh chan<- peerError
errorsCh chan peerError
requestsCh chan BlockRequest

lastBlockTime time.Time
blockCount int64
}

// NewBlockPool returns a new BlockPool with the height equal to start. Block
// requests and errors will be sent to requestsCh and errorsCh accordingly.
func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- peerError) *BlockPool {
func NewBlockPool(start int64, requestsCh chan BlockRequest, errorsCh chan peerError) *BlockPool {
bp := &BlockPool{
peers: make(map[p2p.ID]*bpPeer),
requesters: make(map[int64][]*bpRequester), // Updated to hold a slice of requesters
Expand All @@ -106,30 +108,24 @@ func NewBlockPool(start int64, requestsCh chan<- BlockRequest, errorsCh chan<- p
func (pool *BlockPool) OnStart() error {
go pool.makeRequestersRoutine()
pool.startTime = time.Now()
go pool.startErrorHandler()
return nil
}

// spawns requesters as needed
func (pool *BlockPool) makeRequestersRoutine() {
pool.mtx.Lock()
defer pool.mtx.Unlock()
for {
if !pool.IsRunning() {
break
}

_, numPending, lenRequesters := pool.GetStatus()
switch {
case numPending >= maxPendingRequests:
// sleep for a bit.
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
case lenRequesters >= maxTotalRequesters:
// sleep for a bit.
if numPending >= maxPendingRequests || lenRequesters >= maxTotalRequesters {
time.Sleep(requestIntervalMS * time.Millisecond)
// check for timed out peers
pool.removeTimedoutPeers()
default:
// request for more blocks.
} else {
pool.makeNextRequester()
}
}
Expand Down Expand Up @@ -172,11 +168,11 @@ func (pool *BlockPool) GetStatus() (height int64, numPending int32, lenRequester
// TODO: relax conditions, prevent abuse.
func (pool *BlockPool) IsCaughtUp() bool {
pool.mtx.Lock()
defer pool.mtx.Unlock()

// Need at least 1 peer to be considered caught up.
if len(pool.peers) == 0 {
pool.Logger.Debug("Blockpool has no peers")
pool.mtx.Unlock()
return false
}

Expand All @@ -188,6 +184,7 @@ func (pool *BlockPool) IsCaughtUp() bool {
receivedBlockOrTimedOut := pool.height > 0 || time.Since(pool.startTime) > 5*time.Second
ourChainIsLongestAmongPeers := pool.maxPeerHeight == 0 || pool.height >= (pool.maxPeerHeight-1)
isCaughtUp := receivedBlockOrTimedOut && ourChainIsLongestAmongPeers
pool.mtx.Unlock()
return isCaughtUp
}

Expand All @@ -197,7 +194,6 @@ func (pool *BlockPool) IsCaughtUp() bool {
// The caller will verify the commit.
func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block) {
pool.mtx.Lock()
defer pool.mtx.Unlock()

if requestersAtHeight := pool.requesters[pool.height]; requestersAtHeight != nil {
for _, requester := range requestersAtHeight {
Expand All @@ -215,14 +211,14 @@ func (pool *BlockPool) PeekTwoBlocks() (first *types.Block, second *types.Block)
}
}
}
pool.mtx.Unlock()
return first, second
}

// PopRequest pops the first block at pool.height.
// It must have been validated by 'second'.Commit from PeekTwoBlocks().
func (pool *BlockPool) PopRequest() {
pool.mtx.Lock()
defer pool.mtx.Unlock()

requestersAtHeight := pool.requesters[pool.height]
if len(requestersAtHeight) > 0 {
Expand All @@ -236,6 +232,8 @@ func (pool *BlockPool) PopRequest() {
} else {
panic(fmt.Sprintf("Expected requester to pop, got nothing at height %v", pool.height))
}
pool.mtx.Unlock()

}

// RedoRequest invalidates the block at pool.height,
Expand All @@ -260,17 +258,18 @@ func (pool *BlockPool) RedoRequest(height int64) p2p.ID {
}

// If no requester had a peerID, return an empty p2p.ID.

return peerID
}

// AddBlock validates that the block comes from the peer it was expected from and calls the requester to store it.
// TODO: ensure that blocks come in order for each peer.
func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int) {

pool.mtx.Lock()
defer pool.mtx.Unlock()

// Increment the block count since a new block has been received
pool.blockCount++
pool.mtx.Unlock()

// Calculate the blocks per second
now := time.Now()
Expand All @@ -285,9 +284,6 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
redEnd := "\033[39m"
fmt.Printf("%sBlocks per second: %f%s\n", redStart, blocksPerSecond, redEnd)

pool.mtx.Lock()
defer pool.mtx.Unlock()

requesters := pool.requesters[block.Height]
if requesters == nil {
// Log that a block was received that we did not expect
Expand Down Expand Up @@ -339,6 +335,7 @@ func (pool *BlockPool) AddBlock(peerID p2p.ID, block *types.Block, blockSize int
break
}
}

}

// makeRequesterForHeight creates a requester for a specific block height if it does not exist
Expand Down Expand Up @@ -366,15 +363,13 @@ func (pool *BlockPool) makeRequesterForHeight(height int64) {

// MaxPeerHeight returns the highest reported height.
func (pool *BlockPool) MaxPeerHeight() int64 {
pool.mtx.Lock()
defer pool.mtx.Unlock()
return pool.maxPeerHeight
return atomic.LoadInt64(&pool.maxPeerHeight)
}

// SetPeerRange sets the peer's alleged blockchain base and height.
func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
pool.mtx.Lock()
defer pool.mtx.Unlock()
pool.peersMtx.Lock()
defer pool.peersMtx.Unlock()

peer := pool.peers[peerID]
if peer != nil {
Expand All @@ -386,8 +381,8 @@ func (pool *BlockPool) SetPeerRange(peerID p2p.ID, base int64, height int64) {
pool.peers[peerID] = peer
}

if height > pool.maxPeerHeight {
pool.maxPeerHeight = height
if height > atomic.LoadInt64(&pool.maxPeerHeight) {
atomic.StoreInt64(&pool.maxPeerHeight, height)
}
}

Expand Down Expand Up @@ -449,7 +444,6 @@ func (pool *BlockPool) updateMaxPeerHeight() {
// If no peers are available, returns nil.
func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
pool.mtx.Lock()
defer pool.mtx.Unlock()

var bestPeer *bpPeer
var bestThroughput int64
Expand All @@ -475,6 +469,7 @@ func (pool *BlockPool) pickIncrAvailablePeer(height int64) *bpPeer {
bestPeer.incrPending()
}

pool.mtx.Unlock()
return bestPeer
}

Expand Down Expand Up @@ -503,10 +498,6 @@ func (pool *BlockPool) makeNextRequester() {
}
}

func (pool *BlockPool) requestersLen() int64 {
return int64(len(pool.requesters))
}

func (pool *BlockPool) sendRequest(height int64, peerID p2p.ID) {
if !pool.IsRunning() {
return
Expand Down Expand Up @@ -599,16 +590,6 @@ func (peer *bpPeer) incrPending() {
peer.numPending++
}

func (peer *bpPeer) decrPending(recvSize int) {
peer.numPending--
if peer.numPending == 0 {
peer.timeout.Stop()
} else {
peer.recvMonitor.Update(recvSize)
peer.resetTimeout()
}
}

func (peer *bpPeer) onTimeout() {
peer.pool.mtx.Lock()
defer peer.pool.mtx.Unlock()
Expand Down Expand Up @@ -671,28 +652,25 @@ func (bpr *bpRequester) setBlock(block *types.Block, peerID p2p.ID) bool {
}

func (bpr *bpRequester) getBlock() *types.Block {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.block
}

func (bpr *bpRequester) getPeerID() p2p.ID {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()
return bpr.peerID
}

// This is called from the requestRoutine, upon redo().
func (bpr *bpRequester) reset() {
bpr.mtx.Lock()
defer bpr.mtx.Unlock()

if bpr.block != nil {
atomic.AddInt32(&bpr.pool.numPending, 1)
}

bpr.peerID = ""
bpr.block = nil
bpr.mtx.Unlock()

}

// Tells bpRequester to pick another peer and try again.
Expand Down Expand Up @@ -776,6 +754,10 @@ func (bpr *bpRequester) stop() {
}
bpr.isStopped = true
close(bpr.gotBlockCh)
// Drain the channel to avoid sending on closed channel
for range bpr.redoCh {
// Drain the channel
}
if err := bpr.Stop(); err != nil {
bpr.Logger.Error("Error stopping requester", "err", err)
}
Expand Down Expand Up @@ -805,3 +787,35 @@ func (pool *BlockPool) reassignRequest(height int64) {
// Create a new requester for this height to replace the stopped one.
pool.makeRequesterForHeight(height)
}

func (pool *BlockPool) startErrorHandler() {
for {
select {
case err := <-pool.errorsCh:
pool.mtx.Lock()
pool.removePeer(err.peerID)
pool.Logger.Error("Error processing block", "error", err.err, "peer", err.peerID)
pool.mtx.Unlock()
case <-pool.Quit():
return
}
}
}

func (pool *BlockPool) startRequestHandler() {

for {
select {
case request := <-pool.requestsCh:
// Find an available peer with matching height
peer := pool.pickIncrAvailablePeer(request.Height) // Consider locking if needed
if peer != nil {
// Send request to peer (likely through peer's communication mechanism)
} else {
// Handle the case where no peer is available
}
case <-pool.Quit():
return
}
}
}

0 comments on commit adb0032

Please sign in to comment.