server: use longer-lived context for purgeOldSessions #90693

Closed
Changes from 2 commits
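
As the diff below shows, this change threads a longer-lived "workers" context through server startup: synchronous, traced startup work keeps using the startup `ctx`, while background tasks such as the `system.web_sessions` purge loop receive a context derived from `context.Background()` (via `s.AnnotateCtx(context.Background())`). The following is a minimal, self-contained sketch of why that distinction matters; all names in it are hypothetical and it is not CockroachDB code.

```go
package main

import (
	"context"
	"fmt"
)

func main() {
	// The startup context is scoped to server startup and may be cancelled
	// (and its tracing span finished) once startup completes.
	startupCtx, cancelStartup := context.WithCancel(context.Background())

	// A worker context derived from the startup context inherits its
	// cancellation, so long-running workers would stop along with it:
	derivedCtx, cancelDerived := context.WithCancel(startupCtx)
	defer cancelDerived()

	// A worker context derived from context.Background() (the approach taken
	// in this diff) lives until the server shuts it down explicitly:
	workersCtx, cancelWorkers := context.WithCancel(context.Background())
	defer cancelWorkers()

	cancelStartup() // startup is over

	fmt.Println("derived worker ctx cancelled:", derivedCtx.Err() != nil)   // true
	fmt.Println("long-lived worker ctx cancelled:", workersCtx.Err() != nil) // false
}
```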
pkg/server/node.go (4 additions, 4 deletions)
```diff
@@ -428,7 +428,7 @@ func (n *Node) AnnotateCtxWithSpan(
 // to carry HTTP, only if httpAddr is non-null will this node accept
 // proxied traffic from other nodes.
 func (n *Node) start(
-    ctx context.Context,
+    ctx, workersCtx context.Context,
     addr, sqlAddr, httpAddr net.Addr,
     state initState,
     initialStart bool,
@@ -462,7 +462,7 @@ func (n *Node) start(
     // Create stores from the engines that were already initialized.
     for _, e := range state.initializedEngines {
         s := kvserver.NewStore(ctx, n.storeCfg, e, &n.Descriptor)
-        if err := s.Start(ctx, n.stopper); err != nil {
+        if err := s.Start(workersCtx, n.stopper); err != nil {
             return errors.Wrap(err, "failed to start store")
         }
@@ -519,7 +519,7 @@ func (n *Node) start(
     // [1]: It's important to note that store IDs are allocated via a
     // sequence ID generator stored in a system key.
     n.additionalStoreInitCh = make(chan struct{})
-    if err := n.stopper.RunAsyncTask(ctx, "initialize-additional-stores", func(ctx context.Context) {
+    if err := n.stopper.RunAsyncTask(workersCtx, "initialize-additional-stores", func(ctx context.Context) {
         if err := n.initializeAdditionalStores(ctx, state.uninitializedEngines, n.stopper); err != nil {
             log.Fatalf(ctx, "while initializing additional stores: %v", err)
         }
@@ -540,7 +540,7 @@ func (n *Node) start(
     // with a given cluster version, but not if the server starts with a lower
     // one and gets bumped immediately, which would be possible if gossip got
     // started earlier).
-    n.startGossiping(ctx, n.stopper)
+    n.startGossiping(workersCtx, n.stopper)

     allEngines := append([]storage.Engine(nil), state.initializedEngines...)
     allEngines = append(allEngines, state.uninitializedEngines...)
```
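
`Node.start` now takes both contexts: `ctx` for work that completes during startup and `workersCtx` for goroutines that outlive it. A rough sketch of that signature split is below, using a stand-in for the stopper's `RunAsyncTask` (hypothetical helper, not the real API):

```go
package nodesketch

import "context"

// runAsyncTask is a stand-in for stopper.RunAsyncTask: it runs fn on its own
// goroutine with the context it was handed.
func runAsyncTask(ctx context.Context, name string, fn func(context.Context)) error {
	go fn(ctx)
	return nil
}

// start mirrors the two-context shape of Node.start above (simplified):
// synchronous setup uses the traced startup ctx, while anything spawned
// asynchronously uses the longer-lived workersCtx so it is not tied to the
// startup context's lifetime.
func start(ctx, workersCtx context.Context) error {
	if err := ctx.Err(); err != nil { // synchronous, startup-scoped check
		return err
	}
	return runAsyncTask(workersCtx, "initialize-additional-stores", func(taskCtx context.Context) {
		<-taskCtx.Done() // placeholder for long-running background work
	})
}
```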
pkg/server/server.go (30 additions, 20 deletions)
```diff
@@ -1036,9 +1036,12 @@ func (s *Server) Start(ctx context.Context) error {
 func (s *Server) PreStart(ctx context.Context) error {
     ctx = s.AnnotateCtx(ctx)

+    // Start a context for the asynchronous network workers.
+    workersCtx := s.AnnotateCtx(context.Background())
+
     // Start the time sanity checker.
     s.startTime = timeutil.Now()
-    if err := s.startMonitoringForwardClockJumps(ctx); err != nil {
+    if err := s.startMonitoringForwardClockJumps(workersCtx); err != nil {
         return err
     }

@@ -1052,9 +1055,6 @@ func (s *Server) PreStart(ctx context.Context) error {
         return err
     }

-    // Start a context for the asynchronous network workers.
-    workersCtx := s.AnnotateCtx(context.Background())
-
     // connManager tracks incoming connections accepted via listeners
     // and automatically closes them when the stopper indicates a
     // shutdown.
@@ -1302,7 +1302,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     // incoming connections.
     startRPCServer(workersCtx)
     onInitServerReady()
-    state, initialStart, err := initServer.ServeAndWait(ctx, s.stopper, &s.cfg.Settings.SV)
+    state, initialStart, err := initServer.ServeAndWait(workersCtx, s.stopper, &s.cfg.Settings.SV)
     if err != nil {
         return errors.Wrap(err, "during init")
     }
@@ -1394,7 +1394,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     // cluster. Someone has to gossip the ClusterID before Gossip is connected,
     // but this gossip only happens once the first range has a leaseholder, i.e.
     // when a quorum of nodes has gone fully operational.
-    _ = s.stopper.RunAsyncTask(ctx, "connect-gossip", func(ctx context.Context) {
+    _ = s.stopper.RunAsyncTask(workersCtx, "connect-gossip", func(ctx context.Context) {
         log.Ops.Infof(ctx, "connecting to gossip network to verify cluster ID %q", state.clusterID)
         select {
         case <-s.gossip.Connected:
@@ -1405,7 +1405,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     })

     if err := schedulerlatency.StartSampler(
-        ctx, s.st, s.stopper, s.registry, base.DefaultMetricsSampleInterval,
+        workersCtx, s.st, s.stopper, s.registry, base.DefaultMetricsSampleInterval,
     ); err != nil {
         return err
     }
@@ -1441,7 +1441,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     advHTTPAddrU := util.NewUnresolvedAddr("tcp", s.cfg.HTTPAdvertiseAddr)

     if err := s.node.start(
-        ctx,
+        ctx, workersCtx,
         advAddrU,
         advSQLAddrU,
         advHTTPAddrU,
@@ -1459,7 +1459,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     if err := s.startPersistingHLCUpperBound(ctx, hlcUpperBoundExists); err != nil {
         return err
     }
-    s.replicationReporter.Start(ctx, s.stopper)
+    s.replicationReporter.Start(workersCtx, s.stopper)

     sentry.ConfigureScope(func(scope *sentry.Scope) {
         scope.SetTags(map[string]string{
@@ -1482,7 +1482,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     )

     // Begin recording runtime statistics.
-    if err := startSampleEnvironment(s.AnnotateCtx(ctx),
+    if err := startSampleEnvironment(workersCtx,
         s.ClusterSettings(),
         s.stopper,
         s.cfg.GoroutineDumpDirName,
@@ -1508,7 +1508,13 @@ func (s *Server) PreStart(ctx context.Context) error {
     // traffic may access it).
     //
     // See https://github.com/cockroachdb/cockroach/issues/73897.
-    if err := s.protectedtsProvider.Start(ctx, s.stopper); err != nil {
+    // TODO(knz,arul): This mechanism could probably be removed now.
+    // The PTS Cache is a thing from the past when secondary tenants
+    // couldn’t use protected timestamps. We started using span configs
+    // (in both the system and secondary tenants) to store PTS
+    // information in 22.1, at which point the PTS cache was only kept
+    // around to migrate between the old and new subsystems.
+    if err := s.protectedtsProvider.Start(workersCtx, s.stopper); err != nil {
         return err
     }
@@ -1537,7 +1543,7 @@ func (s *Server) PreStart(ctx context.Context) error {

     // Once all stores are initialized, check if offline storage recovery
     // was done prior to start and record any actions appropriately.
-    logPendingLossOfQuorumRecoveryEvents(ctx, s.node.stores)
+    logPendingLossOfQuorumRecoveryEvents(workersCtx, s.node.stores)

     log.Ops.Infof(ctx, "starting %s server at %s (use: %s)",
         redact.Safe(s.cfg.HTTPRequestScheme()), log.SafeManaged(s.cfg.HTTPAddr), log.SafeManaged(s.cfg.HTTPAdvertiseAddr))
@@ -1555,7 +1561,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     // Begin the node liveness heartbeat. Add a callback which records the local
     // store "last up" timestamp for every store whenever the liveness record is
     // updated.
-    s.nodeLiveness.Start(ctx, liveness.NodeLivenessStartOptions{
+    s.nodeLiveness.Start(workersCtx, liveness.NodeLivenessStartOptions{
         Engines: s.engines,
         OnSelfLive: func(ctx context.Context) {
             now := s.clock.Now()
@@ -1574,7 +1580,7 @@ func (s *Server) PreStart(ctx context.Context) error {

     if !s.cfg.SpanConfigsDisabled && s.spanConfigSubscriber != nil {
         if subscriber, ok := s.spanConfigSubscriber.(*spanconfigkvsubscriber.KVSubscriber); ok {
-            if err := subscriber.Start(ctx, s.stopper); err != nil {
+            if err := subscriber.Start(workersCtx, s.stopper); err != nil {
                 return err
             }
         }
@@ -1585,10 +1591,10 @@ func (s *Server) PreStart(ctx context.Context) error {
     // to make sure this runs only on one node. SQL is used to actually GC. We
     // count it as a KV operation since it grooms cluster-wide data, not
     // something associated to SQL tenants.
-    s.startSystemLogsGC(ctx)
+    s.startSystemLogsGC(workersCtx)

     // Begin an async task to periodically purge old sessions in the system.web_sessions table.
-    if err = startPurgeOldSessions(ctx, s.authentication); err != nil {
+    if err = startPurgeOldSessions(workersCtx, s.authentication); err != nil {
         return err
     }

@@ -1647,18 +1653,22 @@ func (s *Server) PreStart(ctx context.Context) error {
     }
     s.debug.RegisterClosedTimestampSideTransport(s.ctSender, s.node.storeCfg.ClosedTimestampReceiver)

-    s.ctSender.Run(ctx, state.nodeID)
+    // Start the closed timestamp loop.
+    s.ctSender.Run(workersCtx, state.nodeID)

     // Attempt to upgrade cluster version now that the sql server has been
     // started. At this point we know that all startupmigrations have successfully
     // been run so it is safe to upgrade to the binary's current version.
+    //
+    // NB: We run this under the startup ctx (not workersCtx) so as to ensure
+    // all the upgrade steps are traced, for use during troubleshooting.
     s.startAttemptUpgrade(ctx)

-    if err := s.node.tenantSettingsWatcher.Start(ctx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
+    if err := s.node.tenantSettingsWatcher.Start(workersCtx, s.sqlServer.execCfg.SystemTableIDResolver); err != nil {
         return errors.Wrap(err, "failed to initialize the tenant settings watcher")
     }

-    if err := s.kvProber.Start(ctx, s.stopper); err != nil {
+    if err := s.kvProber.Start(workersCtx, s.stopper); err != nil {
         return errors.Wrapf(err, "failed to start KV prober")
     }

@@ -1667,7 +1677,7 @@ func (s *Server) PreStart(ctx context.Context) error {
     // startup fails, and write to range log once the server is running as we need
     // to run sql statements to update rangelog.
     publishPendingLossOfQuorumRecoveryEvents(
-        ctx, s.node.execCfg.InternalExecutor, s.node.stores, s.stopper,
+        workersCtx, s.node.execCfg.InternalExecutor, s.node.stores, s.stopper,
     )

     log.Event(ctx, "server initialized")
```
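
The purge task that gives the PR its title now also runs under `workersCtx`. Its real implementation lives elsewhere in the server package; the sketch below only illustrates the kind of ticker loop involved, with an assumed (not the actual) signature:

```go
package serversketch

import (
	"context"
	"log"
	"time"
)

// startPurgeOldSessionsSketch is an assumed stand-in for startPurgeOldSessions:
// a loop that periodically deletes expired rows from system.web_sessions.
// Because it runs for the lifetime of the server, it needs the long-lived
// workersCtx rather than the startup ctx.
func startPurgeOldSessionsSketch(workersCtx context.Context, purge func(context.Context) error) error {
	go func() {
		ticker := time.NewTicker(time.Hour)
		defer ticker.Stop()
		for {
			select {
			case <-workersCtx.Done():
				return // server shutdown
			case <-ticker.C:
				if err := purge(workersCtx); err != nil {
					log.Printf("purging old web sessions: %v", err)
				}
			}
		}
	}()
	return nil
}
```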
pkg/server/tenant.go (1 addition, 1 deletion)
```diff
@@ -286,7 +286,7 @@ func startTenantInternal(
     httpServer.handleHealth(gwMux)

     // Begin an async task to periodically purge old sessions in the system.web_sessions table.
-    if err := startPurgeOldSessions(ctx, authServer); err != nil {
+    if err := startPurgeOldSessions(background, authServer); err != nil {
         return nil, nil, nil, "", "", err
     }

```
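
The tenant server applies the same fix: the purge loop gets a background-derived context (named `background` in the diff) rather than the startup `ctx`. How that context is derived is not shown in this diff; the sketch below assumes it is an annotated copy of `context.Background()`, with a toy tag mechanism standing in for the real ambient-context machinery:

```go
package tenantsketch

import "context"

type serverTagKey struct{}

// annotateBackground attaches a log-style tag to a fresh background context.
// This is only a stand-in for deriving an annotated, long-lived context; the
// real derivation in tenant.go is not part of this diff.
func annotateBackground(tag string) context.Context {
	return context.WithValue(context.Background(), serverTagKey{}, tag)
}

// startTenantSketch shows the shape of the tenant.go change: the session
// purge task receives the long-lived background context, not the startup ctx.
func startTenantSketch(ctx context.Context, startPurge func(context.Context) error) error {
	_ = ctx // the startup ctx is still used for startup-scoped work elsewhere
	background := annotateBackground("tenant-server")
	return startPurge(background)
}
```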