From 990279561bc6659822681d178e1b59d514626d1b Mon Sep 17 00:00:00 2001 From: Dev Ojha Date: Thu, 26 Dec 2024 12:02:49 -0600 Subject: [PATCH] Blocksync compatible refactoring --- blocksync/pool.go | 2 + blocksync/reactor.go | 101 +++++++++++++++++++++++--------------- blocksync/reactor_test.go | 55 ++++++--------------- 3 files changed, 78 insertions(+), 80 deletions(-) diff --git a/blocksync/pool.go b/blocksync/pool.go index 2020f98eb9e..2dfdaf21a91 100644 --- a/blocksync/pool.go +++ b/blocksync/pool.go @@ -671,6 +671,8 @@ func (bpr *bpRequester) setBlock(block *types.Block, extCommit *types.ExtendedCo return true // getting a block from both peers is not an error } + // TODO: Shall we run block validation here? + bpr.block = block bpr.extCommit = extCommit bpr.gotBlockFrom = peerID diff --git a/blocksync/reactor.go b/blocksync/reactor.go index 853041c5f12..281284b2cd7 100644 --- a/blocksync/reactor.go +++ b/blocksync/reactor.go @@ -13,6 +13,7 @@ import ( sm "github.com/cometbft/cometbft/state" "github.com/cometbft/cometbft/store" "github.com/cometbft/cometbft/types" + "github.com/pkg/errors" ) const ( @@ -249,28 +250,31 @@ func (bcR *Reactor) respondToPeer(msg *bcproto.BlockRequest, src p2p.Peer) (queu } func (bcR *Reactor) handlePeerResponse(msg *bcproto.BlockResponse, src p2p.Peer) { - bi, err := types.BlockFromProto(msg.Block) + bi, extCommit, err := blockFromMsg(msg) if err != nil { bcR.Logger.Error("Peer sent us invalid block", "peer", src, "msg", msg, "err", err) bcR.Switch.StopPeerForError(src, err) return } + + if err := bcR.pool.AddBlock(src.ID(), bi, extCommit, msg.Block.Size()); err != nil { + bcR.Logger.Error("failed to add block", "peer", src, "err", err) + } +} + +func blockFromMsg(msg *bcproto.BlockResponse) (*types.Block, *types.ExtendedCommit, error) { + bi, err := types.BlockFromProto(msg.Block) + if err != nil { + return nil, nil, err + } var extCommit *types.ExtendedCommit if msg.ExtCommit != nil { - var err error extCommit, err = types.ExtendedCommitFromProto(msg.ExtCommit) - if err != nil { - bcR.Logger.Error("failed to convert extended commit from proto", - "peer", src, - "err", err) - bcR.Switch.StopPeerForError(src, err) - return - } } - - if err := bcR.pool.AddBlock(src.ID(), bi, extCommit, msg.Block.Size()); err != nil { - bcR.Logger.Error("failed to add block", "peer", src, "err", err) + if err != nil { + return nil, nil, errors.Wrap(err, "failed to convert extended commit from proto") } + return bi, extCommit, nil } // Receive implements Reactor by handling 4 types of messages (look below). @@ -317,35 +321,26 @@ func (bcR *Reactor) localNodeBlocksTheChain(state sm.State) bool { return val.VotingPower >= total/3 } -// Handle messages from the poolReactor telling the reactor what to do. -// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! -func (bcR *Reactor) poolRoutine(stateSynced bool) { - bcR.metrics.Syncing.Set(1) - defer bcR.metrics.Syncing.Set(0) - - trySyncTicker := time.NewTicker(trySyncIntervalMS * time.Millisecond) - defer trySyncTicker.Stop() - - statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) - defer statusUpdateTicker.Stop() +func (bcR *Reactor) setupTickers() (trySyncTicker *time.Ticker, switchToConsensusTicker *time.Ticker, cleanup func()) { + trySyncTicker = time.NewTicker(trySyncIntervalMS * time.Millisecond) if bcR.switchToConsensusMs == 0 { bcR.switchToConsensusMs = switchToConsensusIntervalSeconds * 1000 } - switchToConsensusTicker := time.NewTicker(time.Duration(bcR.switchToConsensusMs) * time.Millisecond) - defer switchToConsensusTicker.Stop() - - blocksSynced := uint64(0) + switchToConsensusTicker = time.NewTicker(time.Duration(bcR.switchToConsensusMs) * time.Millisecond) - chainID := bcR.initialState.ChainID - state := bcR.initialState - - lastHundred := time.Now() - lastRate := 0.0 - - didProcessCh := make(chan struct{}, 1) + cleanup = func() { + trySyncTicker.Stop() + switchToConsensusTicker.Stop() + } + return trySyncTicker, switchToConsensusTicker, cleanup +} - initialCommitHasExtensions := (bcR.initialState.LastBlockHeight > 0 && bcR.store.LoadBlockExtendedCommit(bcR.initialState.LastBlockHeight) != nil) +// Process our outbound peer block requests. +// This is architecturally confusing right now. Its called from within poolRoutine, but is a fully independent thread. +func (bcR *Reactor) sendPeerRequestRoutine() { + statusUpdateTicker := time.NewTicker(statusUpdateIntervalSeconds * time.Second) + defer statusUpdateTicker.Stop() go func() { for { @@ -379,6 +374,31 @@ func (bcR *Reactor) poolRoutine(stateSynced bool) { } } }() +} + +// Handle messages from the poolReactor telling the reactor what to do. +// NOTE: Don't sleep in the FOR_LOOP or otherwise slow it down! +func (bcR *Reactor) poolRoutine(stateSynced bool) { + bcR.metrics.Syncing.Set(1) + defer bcR.metrics.Syncing.Set(0) + + trySyncTicker, switchToConsensusTicker, cleanupTickers := bcR.setupTickers() + defer cleanupTickers() + + blocksSynced := uint64(0) + + chainID := bcR.initialState.ChainID + state := bcR.initialState + + lastHundred := time.Now() + lastRate := 0.0 + + didProcessCh := make(chan struct{}, 1) + + initialCommitHasExtensions := (bcR.initialState.LastBlockHeight > 0 && bcR.store.LoadBlockExtendedCommit(bcR.initialState.LastBlockHeight) != nil) + + // Process our outbound peer block requests. + bcR.sendPeerRequestRoutine() FOR_LOOP: for { @@ -423,7 +443,7 @@ FOR_LOOP: "initial_height", state.InitialHeight, "max_peer_height", bcR.pool.MaxPeerHeight(), ) - continue FOR_LOOP + continue } if bcR.pool.IsCaughtUp() || bcR.localNodeBlocksTheChain(state) { bcR.Logger.Info("Time to switch to consensus reactor!", "height", height) @@ -461,7 +481,7 @@ FOR_LOOP: if first == nil || second == nil { // we need to have fetched two consecutive blocks in order to // perform blocksync verification - continue FOR_LOOP + continue } // Some sanity checks on heights if state.LastBlockHeight > 0 && state.LastBlockHeight+1 != first.Height { @@ -532,7 +552,7 @@ FOR_LOOP: // still need to clean up the rest. bcR.Switch.StopPeerForError(peer2, ErrReactorValidation{Err: err}) } - continue FOR_LOOP + continue } bcR.pool.PopRequest() @@ -559,14 +579,15 @@ FOR_LOOP: blocksSynced++ if blocksSynced%100 == 0 { + // Calculate exponential moving average of blocks/second sync rate: + // Rate = 0.9 * previous_rate + 0.1 * (100 blocks / time_since_last_100_blocks) lastRate = 0.9*lastRate + 0.1*(100/time.Since(lastHundred).Seconds()) bcR.Logger.Info("Block Sync Rate", "height", bcR.pool.height, "max_peer_height", bcR.pool.MaxPeerHeight(), "blocks/s", lastRate) lastHundred = time.Now() } - continue FOR_LOOP - + continue case <-bcR.Quit(): break FOR_LOOP case <-bcR.pool.Quit(): diff --git a/blocksync/reactor_test.go b/blocksync/reactor_test.go index 7960958b7b0..3154f1fba84 100644 --- a/blocksync/reactor_test.go +++ b/blocksync/reactor_test.go @@ -60,6 +60,15 @@ type ReactorPair struct { app proxy.AppConns } +func stopReactorPairs(t *testing.T, reactorPairs []ReactorPair) { + for _, r := range reactorPairs { + err := r.reactor.Stop() + require.NoError(t, err) + err = r.app.Stop() + require.NoError(t, err) + } +} + func newReactor( t *testing.T, logger log.Logger, @@ -201,14 +210,7 @@ func TestNoBlockResponse(t *testing.T) { return s }, p2p.Connect2Switches) - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) - err = r.app.Stop() - require.NoError(t, err) - } - }() + defer stopReactorPairs(t, reactorPairs) tests := []struct { height int64 @@ -256,12 +258,7 @@ func TestBadBlockStopsPeer(t *testing.T) { otherGenDoc, otherPrivVals := randGenesisDoc(1, false, 30) otherChain := newReactor(t, log.TestingLogger(), otherGenDoc, otherPrivVals, maxBlockHeight) - defer func() { - err := otherChain.reactor.Stop() - require.Error(t, err) - err = otherChain.app.Stop() - require.NoError(t, err) - }() + defer stopReactorPairs(t, []ReactorPair{otherChain}) reactorPairs := make([]ReactorPair, 4) @@ -275,18 +272,10 @@ func TestBadBlockStopsPeer(t *testing.T) { return s }, p2p.Connect2Switches) - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) - - err = r.app.Stop() - require.NoError(t, err) - } - }() + defer stopReactorPairs(t, reactorPairs) for { - time.Sleep(1 * time.Second) + time.Sleep(500 * time.Millisecond) caughtUp := true for _, r := range reactorPairs { if !r.reactor.pool.IsCaughtUp() { @@ -338,14 +327,7 @@ func TestCheckSwitchToConsensusLastHeightZero(t *testing.T) { reactorPairs := make([]ReactorPair, 1, 2) reactorPairs[0] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0) reactorPairs[0].reactor.switchToConsensusMs = 50 - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) - err = r.app.Stop() - require.NoError(t, err) - } - }() + defer stopReactorPairs(t, reactorPairs) reactorPairs = append(reactorPairs, newReactor(t, log.TestingLogger(), genDoc, privVals, maxBlockHeight)) @@ -404,14 +386,7 @@ func ExtendedCommitNetworkHelper(t *testing.T, maxBlockHeight int64, enableVoteE reactorPairs := make([]ReactorPair, 1, 2) reactorPairs[0] = newReactor(t, log.TestingLogger(), genDoc, privVals, 0) reactorPairs[0].reactor.switchToConsensusMs = 50 - defer func() { - for _, r := range reactorPairs { - err := r.reactor.Stop() - require.NoError(t, err) - err = r.app.Stop() - require.NoError(t, err) - } - }() + defer stopReactorPairs(t, reactorPairs) reactorPairs = append(reactorPairs, newReactor(t, log.TestingLogger(), genDoc, privVals, maxBlockHeight, invalidBlockHeightAt))