From 50841e1d95b170b61bdcad9dee989b3ba4ab0ce1 Mon Sep 17 00:00:00 2001 From: Yacov Manevich Date: Fri, 20 Sep 2024 19:39:06 +0200 Subject: [PATCH] Simplify common.Engine API Halt() is only used for the bootstrapper engine, and not invoked on other engine types. Context() is only used in tests. Therefore, this commit removes these methods from the common engine API to simplify it. Signed-off-by: Yacov Manevich --- chains/manager.go | 14 +++++- .../avalanche/bootstrap/bootstrapper.go | 46 ++++++++--------- snow/engine/common/engine.go | 12 ----- snow/engine/common/halter.go | 9 ++-- snow/engine/common/no_ops_handlers.go | 7 --- snow/engine/common/traced_engine.go | 12 ----- snow/engine/enginetest/engine.go | 26 ---------- snow/engine/snowman/bootstrap/bootstrapper.go | 7 +-- .../snowman/bootstrap/bootstrapper_test.go | 38 ++++++++------ snow/engine/snowman/bootstrap/config.go | 2 + snow/engine/snowman/bootstrap/storage.go | 7 ++- snow/engine/snowman/bootstrap/storage_test.go | 4 +- snow/engine/snowman/engine.go | 2 - snow/engine/snowman/syncer/state_syncer.go | 6 --- snow/networking/handler/handler.go | 50 +++++++------------ snow/networking/handler/handler_test.go | 6 +++ snow/networking/handler/health_test.go | 1 + snow/networking/router/chain_router_test.go | 7 +++ snow/networking/sender/sender_test.go | 3 ++ vms/platformvm/vm_test.go | 4 ++ 20 files changed, 112 insertions(+), 151 deletions(-) diff --git a/chains/manager.go b/chains/manager.go index a5375d6aacbc..bed26ebb8ada 100644 --- a/chains/manager.go +++ b/chains/manager.go @@ -888,6 +888,8 @@ func (m *manager) createAvalancheChain( return nil, err } + var halter common.Halter + // Asynchronously passes messages from the network to the consensus engine h, err := handler.New( ctx, @@ -900,6 +902,7 @@ func (m *manager) createAvalancheChain( connectedValidators, peerTracker, handlerReg, + halter.Halt, ) if err != nil { return nil, fmt.Errorf("error initializing network handler: %w", err) @@ -950,6 +953,7 @@ func (m *manager) createAvalancheChain( // create bootstrap gear bootstrapCfg := smbootstrap.Config{ + ShouldHalt: halter.Halted, NonVerifyingParse: block.ParseFunc(proposerVM.ParseLocalBlock), AllGetsServer: snowGetHandler, Ctx: ctx, @@ -1012,17 +1016,19 @@ func (m *manager) createAvalancheChain( avalancheBootstrapperConfig.StopVertexID = m.Upgrades.CortinaXChainStopVertexID } - avalancheBootstrapper, err := avbootstrap.New( + var avalancheBootstrapper common.BootstrapableEngine + avBoot, err := avbootstrap.New( avalancheBootstrapperConfig, snowmanBootstrapper.Start, avalancheMetrics, ) + avalancheBootstrapper = avBoot if err != nil { return nil, fmt.Errorf("error initializing avalanche bootstrapper: %w", err) } if m.TracingEnabled { - avalancheBootstrapper = common.TraceBootstrapableEngine(avalancheBootstrapper, m.Tracer) + avalancheBootstrapper = common.TraceBootstrapableEngine(avBoot, m.Tracer) } h.SetEngineManager(&handler.EngineManager{ @@ -1277,6 +1283,8 @@ func (m *manager) createSnowmanChain( return nil, err } + var halter common.Halter + // Asynchronously passes messages from the network to the consensus engine h, err := handler.New( ctx, @@ -1289,6 +1297,7 @@ func (m *manager) createSnowmanChain( connectedValidators, peerTracker, handlerReg, + halter.Halt, ) if err != nil { return nil, fmt.Errorf("couldn't initialize message handler: %w", err) @@ -1340,6 +1349,7 @@ func (m *manager) createSnowmanChain( // create bootstrap gear bootstrapCfg := smbootstrap.Config{ + ShouldHalt: halter.Halted, NonVerifyingParse: block.ParseFunc(proposerVM.ParseLocalBlock), AllGetsServer: snowGetHandler, Ctx: ctx, diff --git a/snow/engine/avalanche/bootstrap/bootstrapper.go b/snow/engine/avalanche/bootstrap/bootstrapper.go index 00f9ab64a458..e8393274c0f8 100644 --- a/snow/engine/avalanche/bootstrap/bootstrapper.go +++ b/snow/engine/avalanche/bootstrap/bootstrapper.go @@ -43,14 +43,14 @@ const ( epsilon = 1e-6 // small amount to add to time to avoid division by 0 ) -var _ common.BootstrapableEngine = (*bootstrapper)(nil) +var _ common.BootstrapableEngine = (*Bootstrapper)(nil) func New( config Config, onFinished func(ctx context.Context, lastReqID uint32) error, reg prometheus.Registerer, -) (common.BootstrapableEngine, error) { - b := &bootstrapper{ +) (*Bootstrapper, error) { + b := &Bootstrapper{ Config: config, StateSummaryFrontierHandler: common.NewNoOpStateSummaryFrontierHandler(config.Ctx.Log), @@ -72,12 +72,12 @@ func New( } // Note: To align with the Snowman invariant, it should be guaranteed the VM is -// not used until after the bootstrapper has been Started. -type bootstrapper struct { +// not used until after the Bootstrapper has been Started. +type Bootstrapper struct { Config common.Halter - // list of NoOpsHandler for messages dropped by bootstrapper + // list of NoOpsHandler for messages dropped by Bootstrapper common.StateSummaryFrontierHandler common.AcceptedStateSummaryHandler common.AcceptedFrontierHandler @@ -107,11 +107,11 @@ type bootstrapper struct { onFinished func(ctx context.Context, lastReqID uint32) error } -func (b *bootstrapper) Context() *snow.ConsensusContext { +func (b *Bootstrapper) Context() *snow.ConsensusContext { return b.Ctx } -func (b *bootstrapper) Clear(context.Context) error { +func (b *Bootstrapper) Clear(context.Context) error { b.Ctx.Lock.Lock() defer b.Ctx.Lock.Unlock() @@ -130,7 +130,7 @@ func (b *bootstrapper) Clear(context.Context) error { // Ancestors handles the receipt of multiple containers. Should be received in // response to a GetAncestors message to [nodeID] with request ID [requestID]. // Expects vtxs[0] to be the vertex requested in the corresponding GetAncestors. -func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error { +func (b *Bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, requestID uint32, vtxs [][]byte) error { request := common.Request{ NodeID: nodeID, RequestID: requestID, @@ -254,7 +254,7 @@ func (b *bootstrapper) Ancestors(ctx context.Context, nodeID ids.NodeID, request return b.process(ctx, verticesToProcess...) } -func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { +func (b *Bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID, requestID uint32) error { request := common.Request{ NodeID: nodeID, RequestID: requestID, @@ -276,7 +276,7 @@ func (b *bootstrapper) GetAncestorsFailed(ctx context.Context, nodeID ids.NodeID return b.fetch(ctx, vtxID) } -func (b *bootstrapper) Connected( +func (b *Bootstrapper) Connected( ctx context.Context, nodeID ids.NodeID, nodeVersion *version.Application, @@ -288,7 +288,7 @@ func (b *bootstrapper) Connected( return b.StartupTracker.Connected(ctx, nodeID, nodeVersion) } -func (b *bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error { +func (b *Bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) error { if err := b.VM.Disconnected(ctx, nodeID); err != nil { return err } @@ -296,16 +296,16 @@ func (b *bootstrapper) Disconnected(ctx context.Context, nodeID ids.NodeID) erro return b.StartupTracker.Disconnected(ctx, nodeID) } -func (*bootstrapper) Timeout(context.Context) error { +func (*Bootstrapper) Timeout(context.Context) error { return nil } -func (*bootstrapper) Gossip(context.Context) error { +func (*Bootstrapper) Gossip(context.Context) error { return nil } -func (b *bootstrapper) Shutdown(ctx context.Context) error { - b.Ctx.Log.Info("shutting down bootstrapper") +func (b *Bootstrapper) Shutdown(ctx context.Context) error { + b.Ctx.Log.Info("shutting down Bootstrapper") b.Ctx.Lock.Lock() defer b.Ctx.Lock.Unlock() @@ -313,11 +313,11 @@ func (b *bootstrapper) Shutdown(ctx context.Context) error { return b.VM.Shutdown(ctx) } -func (*bootstrapper) Notify(context.Context, common.Message) error { +func (*Bootstrapper) Notify(context.Context, common.Message) error { return nil } -func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error { +func (b *Bootstrapper) Start(ctx context.Context, startReqID uint32) error { b.Ctx.Log.Info("starting bootstrap") b.Ctx.State.Set(snow.EngineState{ @@ -388,7 +388,7 @@ func (b *bootstrapper) Start(ctx context.Context, startReqID uint32) error { return b.startSyncing(ctx, nil) } -func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) { +func (b *Bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) { b.Ctx.Lock.Lock() defer b.Ctx.Lock.Unlock() @@ -403,7 +403,7 @@ func (b *bootstrapper) HealthCheck(ctx context.Context) (interface{}, error) { // Add the vertices in [vtxIDs] to the set of vertices that we need to fetch, // and then fetch vertices (and their ancestors) until either there are no more // to fetch or we are at the maximum number of outstanding requests. -func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { +func (b *Bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { b.needToFetch.Add(vtxIDs...) for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < maxOutstandingGetAncestorsRequests { vtxID, _ := b.needToFetch.Pop() // Length checked in predicate above @@ -442,7 +442,7 @@ func (b *bootstrapper) fetch(ctx context.Context, vtxIDs ...ids.ID) error { } // Process the vertices in [vtxs]. -func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) error { +func (b *Bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) error { // Vertices that we need to process prioritized by vertices that are unknown // or the furthest down the DAG. Unknown vertices are prioritized to ensure // that once we have made it below a certain height in DAG traversal we do @@ -565,7 +565,7 @@ func (b *bootstrapper) process(ctx context.Context, vtxs ...avalanche.Vertex) er } // startSyncing starts bootstrapping. Process the vertices in [accepterContainerIDs]. -func (b *bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs []ids.ID) error { +func (b *Bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs []ids.ID) error { pendingContainerIDs := b.VtxBlocked.MissingIDs() // Append the list of accepted container IDs to pendingContainerIDs to ensure // we iterate over every container that must be traversed. @@ -592,7 +592,7 @@ func (b *bootstrapper) startSyncing(ctx context.Context, acceptedContainerIDs [] // checkFinish repeatedly executes pending transactions and requests new frontier blocks until there aren't any new ones // after which it finishes the bootstrap process -func (b *bootstrapper) checkFinish(ctx context.Context) error { +func (b *Bootstrapper) checkFinish(ctx context.Context) error { // If we still need to fetch vertices, we can't finish if len(b.VtxBlocked.MissingIDs()) > 0 { return nil diff --git a/snow/engine/common/engine.go b/snow/engine/common/engine.go index 537e33c5f192..b99b586f131f 100644 --- a/snow/engine/common/engine.go +++ b/snow/engine/common/engine.go @@ -9,7 +9,6 @@ import ( "github.com/ava-labs/avalanchego/api/health" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/validators" "github.com/ava-labs/avalanchego/utils/set" ) @@ -23,9 +22,6 @@ import ( type Engine interface { Handler - // Return the context of the chain this engine is working on - Context() *snow.ConsensusContext - // Start engine operations from given request ID Start(ctx context.Context, startReqID uint32) error @@ -425,14 +421,6 @@ type InternalHandler interface { // Gossip to the network a container on the accepted frontier Gossip(context.Context) error - // Halt this engine. - // - // This function will be called before the environment starts exiting. This - // function is special, in that it does not expect the chain's context lock - // to be held before calling this function. This function also does not - // require the engine to have been started. - Halt(context.Context) - // Shutdown this engine. // // This function will be called when the environment is exiting. diff --git a/snow/engine/common/halter.go b/snow/engine/common/halter.go index 1fcea981d2e4..d46e85b181df 100644 --- a/snow/engine/common/halter.go +++ b/snow/engine/common/halter.go @@ -3,15 +3,12 @@ package common -import ( - "context" - "sync/atomic" -) +import "sync/atomic" var _ Haltable = (*Halter)(nil) type Haltable interface { - Halt(context.Context) + Halt() Halted() bool } @@ -19,7 +16,7 @@ type Halter struct { halted uint32 } -func (h *Halter) Halt(context.Context) { +func (h *Halter) Halt() { atomic.StoreUint32(&h.halted, 1) } diff --git a/snow/engine/common/no_ops_handlers.go b/snow/engine/common/no_ops_handlers.go index 0b2247ab3cde..33ea424c968f 100644 --- a/snow/engine/common/no_ops_handlers.go +++ b/snow/engine/common/no_ops_handlers.go @@ -355,13 +355,6 @@ func (nop *noOpInternalHandler) Gossip(context.Context) error { return nil } -func (nop *noOpInternalHandler) Halt(context.Context) { - nop.log.Debug("dropping request", - zap.String("reason", "unhandled by this gear"), - zap.String("messageOp", "halt"), - ) -} - func (nop *noOpInternalHandler) Shutdown(context.Context) error { nop.log.Debug("dropping request", zap.String("reason", "unhandled by this gear"), diff --git a/snow/engine/common/traced_engine.go b/snow/engine/common/traced_engine.go index 4574f0c3364a..a1ca48ad8293 100644 --- a/snow/engine/common/traced_engine.go +++ b/snow/engine/common/traced_engine.go @@ -10,7 +10,6 @@ import ( "go.opentelemetry.io/otel/attribute" "github.com/ava-labs/avalanchego/ids" - "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/trace" "github.com/ava-labs/avalanchego/utils/set" "github.com/ava-labs/avalanchego/version" @@ -344,13 +343,6 @@ func (e *tracedEngine) Gossip(ctx context.Context) error { return e.engine.Gossip(ctx) } -func (e *tracedEngine) Halt(ctx context.Context) { - ctx, span := e.tracer.Start(ctx, "tracedEngine.Halt") - defer span.End() - - e.engine.Halt(ctx) -} - func (e *tracedEngine) Shutdown(ctx context.Context) error { ctx, span := e.tracer.Start(ctx, "tracedEngine.Shutdown") defer span.End() @@ -367,10 +359,6 @@ func (e *tracedEngine) Notify(ctx context.Context, msg Message) error { return e.engine.Notify(ctx, msg) } -func (e *tracedEngine) Context() *snow.ConsensusContext { - return e.engine.Context() -} - func (e *tracedEngine) Start(ctx context.Context, startReqID uint32) error { ctx, span := e.tracer.Start(ctx, "tracedEngine.Start", oteltrace.WithAttributes( attribute.Int64("requestID", int64(startReqID)), diff --git a/snow/engine/enginetest/engine.go b/snow/engine/enginetest/engine.go index 29c577f60325..1ee1b7206590 100644 --- a/snow/engine/enginetest/engine.go +++ b/snow/engine/enginetest/engine.go @@ -189,19 +189,6 @@ func (e *Engine) Start(ctx context.Context, startReqID uint32) error { return errStart } -func (e *Engine) Context() *snow.ConsensusContext { - if e.ContextF != nil { - return e.ContextF() - } - if !e.CantContext { - return nil - } - if e.T != nil { - require.FailNow(e.T, "Unexpectedly called Context") - } - return nil -} - func (e *Engine) Timeout(ctx context.Context) error { if e.TimeoutF != nil { return e.TimeoutF(ctx) @@ -228,19 +215,6 @@ func (e *Engine) Gossip(ctx context.Context) error { return errGossip } -func (e *Engine) Halt(ctx context.Context) { - if e.HaltF != nil { - e.HaltF(ctx) - return - } - if !e.CantHalt { - return - } - if e.T != nil { - require.FailNow(e.T, "Unexpectedly called Halt") - } -} - func (e *Engine) Shutdown(ctx context.Context) error { if e.ShutdownF != nil { return e.ShutdownF(ctx) diff --git a/snow/engine/snowman/bootstrap/bootstrapper.go b/snow/engine/snowman/bootstrap/bootstrapper.go index 966bdcf67d11..bece6f32a1b2 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper.go +++ b/snow/engine/snowman/bootstrap/bootstrapper.go @@ -68,7 +68,7 @@ var ( // called, so it must be guaranteed the VM is not used until after Start. type Bootstrapper struct { Config - common.Halter + shouldHalt func() bool *metrics // list of NoOpsHandler for messages dropped by bootstrapper @@ -120,6 +120,7 @@ type Bootstrapper struct { func New(config Config, onFinished func(ctx context.Context, lastReqID uint32) error) (*Bootstrapper, error) { metrics, err := newMetrics(config.Ctx.Registerer) return &Bootstrapper{ + shouldHalt: config.ShouldHalt, nonVerifyingParser: config.NonVerifyingParse, Config: config, metrics: metrics, @@ -649,7 +650,7 @@ func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error { numToExecute := b.tree.Len() err = execute( ctx, - b, + b.ShouldHalt, log, b.DB, &parseAcceptor{ @@ -673,7 +674,7 @@ func (b *Bootstrapper) tryStartExecuting(ctx context.Context) error { lastAccepted.Height(), ) } - if b.Halted() { + if b.shouldHalt() { return nil } diff --git a/snow/engine/snowman/bootstrap/bootstrapper_test.go b/snow/engine/snowman/bootstrap/bootstrapper_test.go index e35eb1d9990a..14024dc65fb6 100644 --- a/snow/engine/snowman/bootstrap/bootstrapper_test.go +++ b/snow/engine/snowman/bootstrap/bootstrapper_test.go @@ -20,6 +20,7 @@ import ( "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/snow/consensus/snowman" "github.com/ava-labs/avalanchego/snow/consensus/snowman/snowmantest" + "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/common/tracker" "github.com/ava-labs/avalanchego/snow/engine/enginetest" "github.com/ava-labs/avalanchego/snow/engine/snowman/block/blocktest" @@ -35,7 +36,7 @@ import ( var errUnknownBlock = errors.New("unknown block") -func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest.VM) { +func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest.VM, func()) { require := require.New(t) snowCtx := snowtest.Context(t, snowtest.CChainID) @@ -89,7 +90,10 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest peerTracker.Connected(peer, version.CurrentApp) + var halter common.Halter + return Config{ + ShouldHalt: halter.Halted, NonVerifyingParse: vm.ParseBlock, AllGetsServer: snowGetHandler, Ctx: ctx, @@ -103,7 +107,7 @@ func newConfig(t *testing.T) (Config, ids.NodeID, *enginetest.Sender, *blocktest AncestorsMaxContainersReceived: 2000, DB: memdb.New(), VM: vm, - }, peer, sender, vm + }, peer, sender, vm, halter.Halt } func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { @@ -140,6 +144,9 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { require.NoError(err) cfg := Config{ + ShouldHalt: func() bool { + return false + }, AllGetsServer: snowGetHandler, Ctx: ctx, Beacons: peers, @@ -213,7 +220,7 @@ func TestBootstrapperStartsOnlyIfEnoughStakeIsConnected(t *testing.T) { func TestBootstrapperSingleFrontier(t *testing.T) { require := require.New(t) - config, _, _, vm := newConfig(t) + config, _, _, vm, _ := newConfig(t) blks := snowmantest.BuildChain(1) initializeVMWithBlockchain(vm, blks) @@ -241,7 +248,7 @@ func TestBootstrapperSingleFrontier(t *testing.T) { func TestBootstrapperUnknownByzantineResponse(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -286,7 +293,7 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) { func TestBootstrapperPartialFetch(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(4) initializeVMWithBlockchain(vm, blks) @@ -336,7 +343,7 @@ func TestBootstrapperPartialFetch(t *testing.T) { func TestBootstrapperEmptyResponse(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -384,7 +391,7 @@ func TestBootstrapperEmptyResponse(t *testing.T) { func TestBootstrapperAncestors(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(4) initializeVMWithBlockchain(vm, blks) @@ -429,7 +436,7 @@ func TestBootstrapperAncestors(t *testing.T) { func TestBootstrapperFinalized(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(3) initializeVMWithBlockchain(vm, blks) @@ -471,7 +478,7 @@ func TestBootstrapperFinalized(t *testing.T) { func TestRestartBootstrapping(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(5) initializeVMWithBlockchain(vm, blks) @@ -532,7 +539,7 @@ func TestRestartBootstrapping(t *testing.T) { func TestBootstrapOldBlockAfterStateSync(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -575,7 +582,7 @@ func TestBootstrapOldBlockAfterStateSync(t *testing.T) { func TestBootstrapContinueAfterHalt(t *testing.T) { require := require.New(t) - config, _, _, vm := newConfig(t) + config, _, _, vm, halt := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) @@ -594,7 +601,7 @@ func TestBootstrapContinueAfterHalt(t *testing.T) { getBlockF := vm.GetBlockF vm.GetBlockF = func(ctx context.Context, blkID ids.ID) (snowman.Block, error) { - bs.Halt(ctx) + halt() return getBlockF(ctx, blkID) } @@ -672,6 +679,9 @@ func TestBootstrapNoParseOnNew(t *testing.T) { peerTracker.Connected(peer, version.CurrentApp) config := Config{ + ShouldHalt: func() bool { + return false + }, AllGetsServer: snowGetHandler, Ctx: ctx, Beacons: peers, @@ -702,7 +712,7 @@ func TestBootstrapNoParseOnNew(t *testing.T) { func TestBootstrapperReceiveStaleAncestorsMessage(t *testing.T) { require := require.New(t) - config, peerID, sender, vm := newConfig(t) + config, peerID, sender, vm, _ := newConfig(t) blks := snowmantest.BuildChain(3) initializeVMWithBlockchain(vm, blks) @@ -745,7 +755,7 @@ func TestBootstrapperReceiveStaleAncestorsMessage(t *testing.T) { func TestBootstrapperRollbackOnSetState(t *testing.T) { require := require.New(t) - config, _, _, vm := newConfig(t) + config, _, _, vm, _ := newConfig(t) blks := snowmantest.BuildChain(2) initializeVMWithBlockchain(vm, blks) diff --git a/snow/engine/snowman/bootstrap/config.go b/snow/engine/snowman/bootstrap/config.go index d501e37f5499..1211e68ebbb6 100644 --- a/snow/engine/snowman/bootstrap/config.go +++ b/snow/engine/snowman/bootstrap/config.go @@ -42,4 +42,6 @@ type Config struct { NonVerifyingParse block.ParseFunc Bootstrapped func() + + ShouldHalt func() bool } diff --git a/snow/engine/snowman/bootstrap/storage.go b/snow/engine/snowman/bootstrap/storage.go index bc5488e3870e..1ddbb5731a3e 100644 --- a/snow/engine/snowman/bootstrap/storage.go +++ b/snow/engine/snowman/bootstrap/storage.go @@ -13,7 +13,6 @@ import ( "github.com/ava-labs/avalanchego/database" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow/consensus/snowman" - "github.com/ava-labs/avalanchego/snow/engine/common" "github.com/ava-labs/avalanchego/snow/engine/snowman/block" "github.com/ava-labs/avalanchego/snow/engine/snowman/bootstrap/interval" "github.com/ava-labs/avalanchego/utils/logging" @@ -127,7 +126,7 @@ func process( // TODO: Replace usage of haltable with context cancellation. func execute( ctx context.Context, - haltable common.Haltable, + shouldHalt func() bool, log logging.Func, db database.Database, nonVerifyingParser block.Parser, @@ -172,7 +171,7 @@ func execute( var ( numProcessed = totalNumberToProcess - tree.Len() - halted = haltable.Halted() + halted = shouldHalt() ) if numProcessed >= minBlocksToCompact && !halted { log("compacting database after executing blocks...") @@ -196,7 +195,7 @@ func execute( zap.Uint64("numToExecute", totalNumberToProcess), ) - for !haltable.Halted() && iterator.Next() { + for !shouldHalt() && iterator.Next() { blkBytes := iterator.Value() blk, err := nonVerifyingParser.ParseBlock(ctx, blkBytes) if err != nil { diff --git a/snow/engine/snowman/bootstrap/storage_test.go b/snow/engine/snowman/bootstrap/storage_test.go index 44ac1621226d..cafb79514921 100644 --- a/snow/engine/snowman/bootstrap/storage_test.go +++ b/snow/engine/snowman/bootstrap/storage_test.go @@ -221,7 +221,7 @@ func TestExecute(t *testing.T) { unhalted := &common.Halter{} halted := &common.Halter{} - halted.Halt(context.Background()) + halted.Halt() tests := []struct { name string @@ -269,7 +269,7 @@ func TestExecute(t *testing.T) { require.NoError(execute( context.Background(), - test.haltable, + test.haltable.Halted, logging.NoLog{}.Info, db, parser, diff --git a/snow/engine/snowman/engine.go b/snow/engine/snowman/engine.go index bdc791c2cf4c..0998a9e9f56d 100644 --- a/snow/engine/snowman/engine.go +++ b/snow/engine/snowman/engine.go @@ -434,8 +434,6 @@ func (*Engine) Timeout(context.Context) error { return nil } -func (*Engine) Halt(context.Context) {} - func (e *Engine) Shutdown(ctx context.Context) error { e.Ctx.Log.Info("shutting down consensus engine") diff --git a/snow/engine/snowman/syncer/state_syncer.go b/snow/engine/snowman/syncer/state_syncer.go index 2fea946f3106..76e647e73a64 100644 --- a/snow/engine/snowman/syncer/state_syncer.go +++ b/snow/engine/snowman/syncer/state_syncer.go @@ -109,10 +109,6 @@ func New( } } -func (ss *stateSyncer) Context() *snow.ConsensusContext { - return ss.Ctx -} - func (ss *stateSyncer) Start(ctx context.Context, startReqID uint32) error { ss.Ctx.Log.Info("starting state sync") @@ -604,8 +600,6 @@ func (ss *stateSyncer) Shutdown(ctx context.Context) error { return ss.VM.Shutdown(ctx) } -func (*stateSyncer) Halt(context.Context) {} - func (*stateSyncer) Timeout(context.Context) error { return nil } diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 73e40ceee60f..9ab6c614c582 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -77,6 +77,8 @@ type Handler interface { // handler passes incoming messages from the network to the consensus engine. // (Actually, it receives the incoming messages from a ChainRouter, but same difference.) type handler struct { + haltBootstrapping func() + metrics *metrics // Useful for faking time in tests @@ -138,20 +140,22 @@ func New( peerTracker commontracker.Peers, p2pTracker *p2p.PeerTracker, reg prometheus.Registerer, + haltBootstrapping func(), ) (Handler, error) { h := &handler{ - ctx: ctx, - validators: validators, - msgFromVMChan: msgFromVMChan, - preemptTimeouts: subnet.OnBootstrapCompleted(), - gossipFrequency: gossipFrequency, - timeouts: make(chan struct{}, 1), - closingChan: make(chan struct{}), - closed: make(chan struct{}), - resourceTracker: resourceTracker, - subnet: subnet, - peerTracker: peerTracker, - p2pTracker: p2pTracker, + haltBootstrapping: haltBootstrapping, + ctx: ctx, + validators: validators, + msgFromVMChan: msgFromVMChan, + preemptTimeouts: subnet.OnBootstrapCompleted(), + gossipFrequency: gossipFrequency, + timeouts: make(chan struct{}, 1), + closingChan: make(chan struct{}), + closed: make(chan struct{}), + resourceTracker: resourceTracker, + subnet: subnet, + peerTracker: peerTracker, + p2pTracker: p2pTracker, } h.asyncMessagePool.SetLimit(threadPoolSize) @@ -316,7 +320,7 @@ func (h *handler) RegisterTimeout(d time.Duration) { // Note: It is possible for Stop to be called before/concurrently with Start. // // Invariant: Stop must never block. -func (h *handler) Stop(ctx context.Context) { +func (h *handler) Stop(_ context.Context) { h.closeOnce.Do(func() { h.startClosingTime = h.clock.Time() @@ -325,25 +329,7 @@ func (h *handler) Stop(ctx context.Context) { h.syncMessageQueue.Shutdown() h.asyncMessageQueue.Shutdown() close(h.closingChan) - - // TODO: switch this to use a [context.Context] with a cancel function. - // - // Don't process any more bootstrap messages. If a dispatcher is - // processing a bootstrap message, stop. We do this because if we - // didn't, and the engine was in the middle of executing state - // transitions during bootstrapping, we wouldn't be able to grab - // [h.ctx.Lock] until the engine finished executing state transitions, - // which may take a long time. As a result, the router would time out on - // shutting down this chain. - state := h.ctx.State.Get() - bootstrapper, ok := h.engineManager.Get(state.Type).Get(snow.Bootstrapping) - if !ok { - h.ctx.Log.Error("bootstrapping engine doesn't exist", - zap.Stringer("type", state.Type), - ) - return - } - bootstrapper.Halt(ctx) + h.haltBootstrapping() }) } diff --git a/snow/networking/handler/handler_test.go b/snow/networking/handler/handler_test.go index a2b5aa9acc8b..661ba17300fb 100644 --- a/snow/networking/handler/handler_test.go +++ b/snow/networking/handler/handler_test.go @@ -77,6 +77,7 @@ func TestHandlerDropsTimedOutMessages(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) handler := handlerIntf.(*handler) @@ -183,6 +184,7 @@ func TestHandlerClosesOnError(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) handler := handlerIntf.(*handler) @@ -285,6 +287,7 @@ func TestHandlerDropsGossipDuringBootstrapping(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) handler := handlerIntf.(*handler) @@ -375,6 +378,7 @@ func TestHandlerDispatchInternal(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -550,6 +554,7 @@ func TestDynamicEngineTypeDispatch(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -632,6 +637,7 @@ func TestHandlerStartError(t *testing.T) { commontracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) diff --git a/snow/networking/handler/health_test.go b/snow/networking/handler/health_test.go index aa082b042321..4e7e5732b8de 100644 --- a/snow/networking/handler/health_test.go +++ b/snow/networking/handler/health_test.go @@ -93,6 +93,7 @@ func TestHealthCheckSubnet(t *testing.T) { peerTracker, p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) diff --git a/snow/networking/router/chain_router_test.go b/snow/networking/router/chain_router_test.go index d2770ce1d211..149be6dc7b52 100644 --- a/snow/networking/router/chain_router_test.go +++ b/snow/networking/router/chain_router_test.go @@ -114,6 +114,7 @@ func TestShutdown(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -239,6 +240,7 @@ func TestConnectedAfterShutdownErrorLogRegression(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -371,6 +373,7 @@ func TestShutdownTimesOut(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -539,6 +542,7 @@ func TestRouterTimeout(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -1070,6 +1074,7 @@ func TestValidatorOnlyMessageDrops(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -1235,6 +1240,7 @@ func TestValidatorOnlyAllowedNodeMessageDrops(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -1486,6 +1492,7 @@ func newChainRouterTest(t *testing.T) (*ChainRouter, *enginetest.Engine) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(t, err) diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 79b9ca0ba087..1f7469575690 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -140,6 +140,7 @@ func TestTimeout(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -398,6 +399,7 @@ func TestReliableMessages(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) @@ -554,6 +556,7 @@ func TestReliableMessagesToMyself(t *testing.T) { commontracker.NewPeers(), p2pTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err) diff --git a/vms/platformvm/vm_test.go b/vms/platformvm/vm_test.go index 9c87556e7603..2d1740a650e0 100644 --- a/vms/platformvm/vm_test.go +++ b/vms/platformvm/vm_test.go @@ -1319,6 +1319,9 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { require.NoError(err) bootstrapConfig := bootstrap.Config{ + ShouldHalt: func() bool { + return false + }, NonVerifyingParse: vm.ParseBlock, AllGetsServer: snowGetHandler, Ctx: consensusCtx, @@ -1353,6 +1356,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) { tracker.NewPeers(), peerTracker, prometheus.NewRegistry(), + func() {}, ) require.NoError(err)