server: properly use workersCtx in PreStart
Release note: None
knz committed Oct 26, 2022
1 parent f9da6f3 commit 82e2456
Showing 2 changed files with 27 additions and 24 deletions.
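
The gist of the change: the caller-provided ctx in PreStart stays scoped to the startup operation (and its trace span), while long-lived background workers now consistently receive workersCtx, a context derived from context.Background() and annotated with the server's tags. Below is a minimal, self-contained sketch of that pattern; the annotate helper and the tag key are illustrative stand-ins, not CockroachDB APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type serverTagKey struct{}

// annotate stands in for Server.AnnotateCtx: it attaches server-wide
// metadata (for example a node ID) to a context.
func annotate(ctx context.Context) context.Context {
	return context.WithValue(ctx, serverTagKey{}, "n1")
}

// preStart shows the shape of the change: startup-scoped work runs on the
// caller's ctx, while background workers get a context derived from
// context.Background() so they are not tied to the startup operation.
func preStart(ctx context.Context) error {
	// Context for asynchronous workers, detached from the startup ctx.
	workersCtx := annotate(context.Background())

	// Synchronous startup step: stays on ctx.
	fmt.Println("startup step, tag:", ctx.Value(serverTagKey{}))

	// Long-running worker: uses workersCtx, not ctx.
	go func(ctx context.Context) {
		for i := 0; i < 2; i++ {
			time.Sleep(100 * time.Millisecond)
			fmt.Println("worker tick, tag:", ctx.Value(serverTagKey{}))
		}
	}(workersCtx)
	return nil
}

func main() {
	startupCtx := annotate(context.Background())
	if err := preStart(startupCtx); err != nil {
		panic(err)
	}
	time.Sleep(300 * time.Millisecond) // let the worker run for the demo
}
```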
8 changes: 4 additions & 4 deletions pkg/server/node.go
@@ -428,7 +428,7 @@ func (n *Node) AnnotateCtxWithSpan(
// to carry HTTP, only if httpAddr is non-null will this node accept
// proxied traffic from other nodes.
func (n *Node) start(
- ctx context.Context,
+ ctx, workersCtx context.Context,
addr, sqlAddr, httpAddr net.Addr,
state initState,
initialStart bool,
@@ -462,7 +462,7 @@ func (n *Node) start(
// Create stores from the engines that were already initialized.
for _, e := range state.initializedEngines {
s := kvserver.NewStore(ctx, n.storeCfg, e, &n.Descriptor)
- if err := s.Start(ctx, n.stopper); err != nil {
+ if err := s.Start(workersCtx, n.stopper); err != nil {
return errors.Wrap(err, "failed to start store")
}

@@ -519,7 +519,7 @@ func (n *Node) start(
// [1]: It's important to note that store IDs are allocated via a
// sequence ID generator stored in a system key.
n.additionalStoreInitCh = make(chan struct{})
- if err := n.stopper.RunAsyncTask(ctx, "initialize-additional-stores", func(ctx context.Context) {
+ if err := n.stopper.RunAsyncTask(workersCtx, "initialize-additional-stores", func(ctx context.Context) {
if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines, n.stopper); err != nil {
log.Fatalf(ctx, "while initializing additional stores: %v", err)
}
@@ -540,7 +540,7 @@ func (n *Node) start(
// with a given cluster version, but not if the server starts with a lower
// one and gets bumped immediately, which would be possible if gossip got
// started earlier).
- n.startGossiping(ctx, n.stopper)
+ n.startGossiping(workersCtx, n.stopper)

allEngines := append([]storage.Engine(nil), state.initializedEngines...)
allEngines = append(allEngines, state.uninitializedEngines...)
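The node.go half threads a second context into Node.start so that work spawned from it (stores, gossip, async store initialization) runs on the workers context rather than the startup context. Below is a toy component with the same two-context start signature; it assumes shutdown is modeled with a plain context cancel rather than CockroachDB's stopper, and none of the names are the real kvserver/gossip APIs.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

type node struct{}

// start mirrors the shape of Node.start after this change: ctx scopes the
// synchronous startup work, while workersCtx is handed to background work
// that outlives start.
func (n *node) start(ctx, workersCtx context.Context) error {
	// Synchronous initialization: bound to the startup context.
	if err := ctx.Err(); err != nil {
		return err
	}
	fmt.Println("node: stores initialized")

	// Background loop: bound to the workers context, so it keeps running
	// after the startup operation is over.
	go func(ctx context.Context) {
		ticker := time.NewTicker(100 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ctx.Done():
				fmt.Println("node: gossip loop stopped")
				return
			case <-ticker.C:
				fmt.Println("node: gossip tick")
			}
		}
	}(workersCtx)
	return nil
}

func main() {
	// Startup context: short-lived. Workers context: lives until shutdown,
	// modeled here with an explicit cancel instead of CockroachDB's stopper.
	startupCtx, cancelStartup := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancelStartup()
	workersCtx, stop := context.WithCancel(context.Background())

	n := &node{}
	if err := n.start(startupCtx, workersCtx); err != nil {
		panic(err)
	}
	time.Sleep(350 * time.Millisecond) // the worker keeps ticking past the startup timeout
	stop()
	time.Sleep(50 * time.Millisecond)
}
```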
43 changes: 23 additions & 20 deletions pkg/server/server.go
@@ -1074,9 +1074,12 @@ func (s *Server) Start(ctx context.Context) error {
func (s *Server) PreStart(ctx context.Context) error {
ctx = s.AnnotateCtx(ctx)

+ // Start a context for the asynchronous network workers.
+ workersCtx := s.AnnotateCtx(context.Background())
+
// Start the time sanity checker.
s.startTime = timeutil.Now()
- if err := s.startMonitoringForwardClockJumps(ctx); err != nil {
+ if err := s.startMonitoringForwardClockJumps(workersCtx); err != nil {
return err
}

@@ -1090,9 +1093,6 @@ func (s *Server) PreStart(ctx context.Context) error {
return err
}

- // Start a context for the asynchronous network workers.
- workersCtx := s.AnnotateCtx(context.Background())
-
// connManager tracks incoming connections accepted via listeners
// and automatically closes them when the stopper indicates a
// shutdown.
@@ -1344,7 +1344,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// incoming connections.
startRPCServer(workersCtx)
onInitServerReady()
- state, initialStart, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV)
+ state, initialStart, err := initServer.ServeAndWait(workersCtx, s.stopper, &s.cfg.Settings.SV)
if err != nil {
return errors.Wrap(err, "during init")
}
@@ -1433,7 +1433,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// cluster. Someone has to gossip the ClusterID before Gossip is connected,
// but this gossip only happens once the first range has a leaseholder, i.e.
// when a quorum of nodes has gone fully operational.
- _ = s.stopper.RunAsyncTask(ctx, "connect-gossip", func(ctx context.Context) {
+ _ = s.stopper.RunAsyncTask(workersCtx, "connect-gossip", func(ctx context.Context) {
log.Ops.Infof(ctx, "connecting to gossip network to verify cluster ID %q", state.clusterID)
select {
case <-s.gossip.Connected:
@@ -1445,7 +1445,7 @@ func (s *Server) PreStart(ctx context.Context) error {

// Start measuring the Go scheduler latency.
if err := schedulerlatency.StartSampler(
- ctx, s.st, s.stopper, s.registry, base.DefaultMetricsSampleInterval,
+ workersCtx, s.st, s.stopper, s.registry, base.DefaultMetricsSampleInterval,
); err != nil {
return err
}
@@ -1482,7 +1482,7 @@ func (s *Server) PreStart(ctx context.Context) error {
advHTTPAddrU := util.NewUnresolvedAddr("tcp", s.cfg.HTTPAdvertiseAddr)

if err := s.node.start(
- ctx,
+ ctx, workersCtx,
advAddrU,
advSQLAddrU,
advHTTPAddrU,
@@ -1500,7 +1500,7 @@ func (s *Server) PreStart(ctx context.Context) error {
if err := s.startPersistingHLCUpperBound(ctx, hlcUpperBoundExists); err != nil {
return err
}
- s.replicationReporter.Start(ctx, s.stopper)
+ s.replicationReporter.Start(workersCtx, s.stopper)

// Configure the Sentry reporter to add some additional context to reports.
sentry.ConfigureScope(func(scope *sentry.Scope) {
@@ -1524,7 +1524,7 @@ func (s *Server) PreStart(ctx context.Context) error {
)

// Begin recording runtime statistics.
- if err := startSampleEnvironment(s.AnnotateCtx(ctx),
+ if err := startSampleEnvironment(workersCtx,
s.ClusterSettings(),
s.stopper,
s.cfg.GoroutineDumpDirName,
@@ -1551,7 +1551,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// traffic may access it).
//
// See https://github.com/cockroachdb/cockroach/issues/73897.
- if err := s.protectedtsProvider.Start(ctx, s.stopper); err != nil {
+ if err := s.protectedtsProvider.Start(workersCtx, s.stopper); err != nil {
// TODO(knz,arul): This mechanism could probably be removed now.
// The PTS Cache is a thing from the past when secondary tenants
// couldn’t use protected timestamps. We started using span configs
@@ -1586,7 +1586,7 @@ func (s *Server) PreStart(ctx context.Context) error {

// Once all stores are initialized, check if offline storage recovery
// was done prior to start and record any actions appropriately.
- logPendingLossOfQuorumRecoveryEvents(ctx, s.node.stores)
+ logPendingLossOfQuorumRecoveryEvents(workersCtx, s.node.stores)

// Report server listen addresses to logs.
log.Ops.Infof(ctx, "starting %s server at %s (use: %s)",
@@ -1605,7 +1605,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// Begin the node liveness heartbeat. Add a callback which records the local
// store "last up" timestamp for every store whenever the liveness record is
// updated.
- s.nodeLiveness.Start(ctx, liveness.NodeLivenessStartOptions{
+ s.nodeLiveness.Start(workersCtx, liveness.NodeLivenessStartOptions{
Engines: s.engines,
OnSelfLive: func(ctx context.Context) {
now := s.clock.Now()
@@ -1624,7 +1624,7 @@ func (s *Server) PreStart(ctx context.Context) error {

if !s.cfg.SpanConfigsDisabled && s.spanConfigSubscriber != nil {
if subscriber, ok := s.spanConfigSubscriber.(*spanconfigkvsubscriber.KVSubscriber); ok {
- if err := subscriber.Start(ctx, s.stopper); err != nil {
+ if err := subscriber.Start(workersCtx, s.stopper); err != nil {
return err
}
}
@@ -1635,10 +1635,10 @@ func (s *Server) PreStart(ctx context.Context) error {
// to make sure this runs only on one node. SQL is used to actually GC. We
// count it as a KV operation since it grooms cluster-wide data, not
// something associated to SQL tenants.
- s.startSystemLogsGC(ctx)
+ s.startSystemLogsGC(workersCtx)

// Begin an async task to periodically purge old sessions in the system.web_sessions table.
- if err = startPurgeOldSessions(ctx, s.authentication); err != nil {
+ if err = startPurgeOldSessions(workersCtx, s.authentication); err != nil {
return err
}

@@ -1706,18 +1706,21 @@ func (s *Server) PreStart(ctx context.Context) error {
s.debug.RegisterClosedTimestampSideTransport(s.ctSender, s.node.storeCfg.ClosedTimestampReceiver)

// Start the closed timestamp loop.
- s.ctSender.Run(ctx, state.nodeID)
+ s.ctSender.Run(workersCtx, state.nodeID)

// Attempt to upgrade cluster version now that the sql server has been
// started. At this point we know that all startupmigrations have successfully
// been run so it is safe to upgrade to the binary's current version.
+ //
+ // NB: We run this under the startup ctx (not workersCtx) so as to ensure
+ // all the upgrade steps are traced, for use during troubleshooting.
s.startAttemptUpgrade(ctx)

- if err := s.node.tenantSettingsWatcher.Start(ctx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
+ if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
return errors.Wrap(err, "failed to initialize the tenant settings watcher")
}

- if err := s.kvProber.Start(ctx, s.stopper); err != nil {
+ if err := s.kvProber.Start(workersCtx, s.stopper); err != nil {
return errors.Wrapf(err, "failed to start KV prober")
}

@@ -1726,7 +1729,7 @@ func (s *Server) PreStart(ctx context.Context) error {
// startup fails, and write to range log once the server is running as we need
// to run sql statements to update rangelog.
publishPendingLossOfQuorumRecoveryEvents(
- ctx, s.node.execCfg.InternalExecutor, s.node.stores, s.stopper,
+ workersCtx, s.node.execCfg.InternalExecutor, s.node.stores, s.stopper,
)

log.Event(ctx, "server initialized")
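Note the one deliberate exception in server.go: startAttemptUpgrade keeps the startup ctx so that the upgrade steps remain visible in the startup trace. The toy program below illustrates that rationale with a fake trace recorder carried in the context; the recorder is purely illustrative and is not CockroachDB's tracing infrastructure.

```go
package main

import (
	"context"
	"fmt"
)

type traceKey struct{}

// trace is a toy stand-in for a startup trace span: events recorded here
// are what an operator would see when inspecting the startup operation.
type trace struct{ events []string }

// record appends an event to the trace carried by ctx, if any.
func record(ctx context.Context, ev string) {
	if t, ok := ctx.Value(traceKey{}).(*trace); ok {
		t.events = append(t.events, ev)
	}
}

func main() {
	// Startup context: carries the trace recorder.
	startupTrace := &trace{}
	ctx := context.WithValue(context.Background(), traceKey{}, startupTrace)

	// Workers context: derived from context.Background(), so it does not
	// carry the startup trace.
	workersCtx := context.Background()

	// Upgrade-style step: runs under ctx and shows up in the startup trace.
	record(ctx, "attempt cluster version upgrade")

	// Worker-style step: runs under workersCtx and leaves no mark there.
	record(workersCtx, "gossip heartbeat")

	fmt.Println("startup trace:", startupTrace.events)
}
```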
