Skip to content

Commit

Permalink
Blocksync compatible refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
ValarDragon committed Dec 26, 2024
1 parent fcd17ce commit 9902795
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 80 deletions.
2 changes: 2 additions & 0 deletions blocksync/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -671,6 +671,8 @@ func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCo
return true // getting a block from both peers is not an error
}

// TODO: Shall we run block validation here?

bpr.block = block
bpr.extCommit = extCommit
bpr.gotBlockFrom = peerID
Expand Down
101 changes: 61 additions & 40 deletions blocksync/reactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
sm "github.com/cometbft/cometbft/state"
"github.com/cometbft/cometbft/store"
"github.com/cometbft/cometbft/types"
"github.com/pkg/errors"
)

const (
Expand Down Expand Up @@ -249,28 +250,31 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, src p2p.Peer) (queu
}

// handlePeerResponse decodes a BlockResponse received from src and hands the
// resulting block (plus optional extended commit) to the block pool.
// A peer that sends an undecodable message is stopped for error.
func (bcR *Reactor) handlePeerResponse(msg *bcproto.BlockResponse, src p2p.Peer) {
	bi, extCommit, err := blockFromMsg(msg)
	if err != nil {
		bcR.Logger.Error("Peer sent us invalid block", "peer", src, "msg", msg, "err", err)
		bcR.Switch.StopPeerForError(src, err)
		return
	}

	// NOTE(review): AddBlock errors are only logged; the peer is not stopped
	// here, matching the pre-refactor behavior.
	if err := bcR.pool.AddBlock(src.ID(), bi, extCommit, msg.Block.Size()); err != nil {
		bcR.Logger.Error("failed to add block", "peer", src, "err", err)
	}
}

func blockFromMsg(msg *bcproto.BlockResponse) (*types.Block, *types.ExtendedCommit, error) {
bi, err := types.BlockFromProto(msg.Block)
if err != nil {
return nil, nil, err
}
var extCommit *types.ExtendedCommit
if msg.ExtCommit != nil {
var err error
extCommit, err = types.ExtendedCommitFromProto(msg.ExtCommit)
if err != nil {
bcR.Logger.Error("failed to convert extended commit from proto",
"peer", src,
"err", err)
bcR.Switch.StopPeerForError(src, err)
return
}
}

if err := bcR.pool.AddBlock(src.ID(), bi, extCommit, msg.Block.Size()); err != nil {
bcR.Logger.Error("failed to add block", "peer", src, "err", err)
if err != nil {
return nil, nil, errors.Wrap(err, "failed to convert extended commit from proto")
}
return bi, extCommit, nil
}

// Receive implements Reactor by handling 4 types of messages (look below).
Expand Down Expand Up @@ -317,35 +321,26 @@ func (bcR *Reactor) localNodeBlocksTheChain(state sm.State) bool {
return val.VotingPower >= total/3
}

// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
bcR.metrics.Syncing.Set(1)
defer bcR.metrics.Syncing.Set(0)

trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond)
defer trySyncTicker.Stop()

statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()
// setupTickers creates the trySync and switchToConsensus tickers used by
// poolRoutine. If bcR.switchToConsensusMs is unset (0), it falls back to the
// default switchToConsensusIntervalSeconds converted to milliseconds.
// The returned cleanup function stops both tickers and must be called
// (typically deferred) when the caller is done with them.
func (bcR *Reactor) setupTickers() (trySyncTicker *time.Ticker, switchToConsensusTicker *time.Ticker, cleanup func()) {
	trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond)

	if bcR.switchToConsensusMs == 0 {
		bcR.switchToConsensusMs = switchToConsensusIntervalSeconds * 1000
	}
	switchToConsensusTicker = time.NewTicker(time.Duration(bcR.switchToConsensusMs) * time.Millisecond)

	cleanup = func() {
		trySyncTicker.Stop()
		switchToConsensusTicker.Stop()
	}
	return trySyncTicker, switchToConsensusTicker, cleanup
}

initialCommitHasExtensions := (bcR.initialState.LastBlockHeight > 0 && bcR.store.LoadBlockExtendedCommit(bcR.initialState.LastBlockHeight) != nil)
// Process our outbound peer block requests.
// This is architecturally confusing right now. It's called from within poolRoutine, but runs as a fully independent goroutine.
func (bcR *Reactor) sendPeerRequestRoutine() {
statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second)
defer statusUpdateTicker.Stop()

go func() {
for {
Expand Down Expand Up @@ -379,6 +374,31 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) {
}
}
}()
}

// Handle messages from the poolReactor telling the reactor what to do.
// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down!
func (bcR *Reactor) poolRoutine(stateSynced bool) {
bcR.metrics.Syncing.Set(1)
defer bcR.metrics.Syncing.Set(0)

trySyncTicker, switchToConsensusTicker, cleanupTickers := bcR.setupTickers()
defer cleanupTickers()

blocksSynced := uint64(0)

chainID := bcR.initialState.ChainID
state := bcR.initialState

lastHundred := time.Now()
lastRate := 0.0

didProcessCh := make(chan struct{}, 1)

initialCommitHasExtensions := (bcR.initialState.LastBlockHeight > 0 && bcR.store.LoadBlockExtendedCommit(bcR.initialState.LastBlockHeight) != nil)

// Process our outbound peer block requests.
bcR.sendPeerRequestRoutine()

FOR_LOOP:
for {
Expand Down Expand Up @@ -423,7 +443,7 @@ FOR_LOOP:
"initial_height", state.InitialHeight,
"max_peer_height", bcR.pool.MaxPeerHeight(),
)
continue FOR_LOOP
continue
}
if bcR.pool.IsCaughtUp() || bcR.localNodeBlocksTheChain(state) {
bcR.Logger.Info("Time to switch to consensus reactor!", "height", height)
Expand Down Expand Up @@ -461,7 +481,7 @@ FOR_LOOP:
if first == nil || second == nil {
// we need to have fetched two consecutive blocks in order to
// perform blocksync verification
continue FOR_LOOP
continue
}
// Some sanity checks on heights
if state.LastBlockHeight > 0 && state.LastBlockHeight+1 != first.Height {
Expand Down Expand Up @@ -532,7 +552,7 @@ FOR_LOOP:
// still need to clean up the rest.
bcR.Switch.StopPeerForError(peer2, ErrReactorValidation{Err: err})
}
continue FOR_LOOP
continue
}

bcR.pool.PopRequest()
Expand All @@ -559,14 +579,15 @@ FOR_LOOP:
blocksSynced++

if blocksSynced%100 == 0 {
// Calculate exponential moving average of blocks/second sync rate:
// Rate = 0.9 * previous_rate + 0.1 * (100 blocks / time_since_last_100_blocks)
lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds())
bcR.Logger.Info("Block Sync Rate", "height", bcR.pool.height,
"max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate)
lastHundred = time.Now()
}

continue FOR_LOOP

continue
case <-bcR.Quit():
break FOR_LOOP
case <-bcR.pool.Quit():
Expand Down
55 changes: 15 additions & 40 deletions blocksync/reactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,15 @@ type ReactorPair struct {
app proxy.AppConns
}

// stopReactorPairs stops the reactor and the app connection of every pair,
// failing the test immediately if any Stop call returns an error.
func stopReactorPairs(t *testing.T, reactorPairs []ReactorPair) {
	for i := range reactorPairs {
		pair := reactorPairs[i]
		require.NoError(t, pair.reactor.Stop())
		require.NoError(t, pair.app.Stop())
	}
}

func newReactor(
t *testing.T,
logger log.Logger,
Expand Down Expand Up @@ -201,14 +210,7 @@ func TestNoBlockResponse(t *testing.T) {
return s
}, p2p.Connect2Switches)

defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)
err = r.app.Stop()
require.NoError(t, err)
}
}()
defer stopReactorPairs(t, reactorPairs)

tests := []struct {
height int64
Expand Down Expand Up @@ -256,12 +258,7 @@ func TestBadBlockStopsPeer(t *testing.T) {
otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30)
otherChain := newReactor(t, log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight)

defer func() {
err := otherChain.reactor.Stop()
require.Error(t, err)
err = otherChain.app.Stop()
require.NoError(t, err)
}()
defer stopReactorPairs(t, []ReactorPair{otherChain})

reactorPairs := make([]ReactorPair, 4)

Expand All @@ -275,18 +272,10 @@ func TestBadBlockStopsPeer(t *testing.T) {
return s
}, p2p.Connect2Switches)

defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)

err = r.app.Stop()
require.NoError(t, err)
}
}()
defer stopReactorPairs(t, reactorPairs)

for {
time.Sleep(1 * time.Second)
time.Sleep(500 * time.Millisecond)
caughtUp := true
for _, r := range reactorPairs {
if !r.reactor.pool.IsCaughtUp() {
Expand Down Expand Up @@ -338,14 +327,7 @@ func TestCheckSwitchToConsensusLastHeightZero(t *testing.T) {
reactorPairs := make([]ReactorPair, 1, 2)
reactorPairs[0] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[0].reactor.switchToConsensusMs = 50
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)
err = r.app.Stop()
require.NoError(t, err)
}
}()
defer stopReactorPairs(t, reactorPairs)

reactorPairs = append(reactorPairs, newReactor(t, log.TestingLogger(), genDoc, privVals, maxBlockHeight))

Expand Down Expand Up @@ -404,14 +386,7 @@ func ExtendedCommitNetworkHelper(t *testing.T, maxBlockHeight int64, enableVoteE
reactorPairs := make([]ReactorPair, 1, 2)
reactorPairs[0] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0)
reactorPairs[0].reactor.switchToConsensusMs = 50
defer func() {
for _, r := range reactorPairs {
err := r.reactor.Stop()
require.NoError(t, err)
err = r.app.Stop()
require.NoError(t, err)
}
}()
defer stopReactorPairs(t, reactorPairs)

reactorPairs = append(reactorPairs, newReactor(t, log.TestingLogger(), genDoc, privVals, maxBlockHeight, invalidBlockHeightAt))

Expand Down

0 comments on commit 9902795

Please sign in to comment.