Skip to content

Commit

Permalink
cli/start: lift the server init/startup in a separate function
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
knz committed Oct 28, 2022
1 parent e65c1e4 commit 03cac30
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 32 deletions.
7 changes: 5 additions & 2 deletions pkg/cli/mt_start_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/pebble/vfs"
"github.com/cockroachdb/redact"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -223,9 +224,11 @@ func runStartSQL(cmd *cobra.Command, args []string) error {
// its network sockets, but perhaps before it has done bootstrapping
serverCfg.ReadyFn = func(_ bool) { reportReadinessExternally(ctx, cmd, false /* waitForInit */) }

const serverType redact.SafeString = "SQL server"

// Beyond this point, the configuration is set and the server is
// ready to start.
log.Ops.Info(ctx, "starting cockroach SQL node")
log.Ops.Infof(ctx, "starting cockroach %s", serverType)

tenantServer, err := server.NewTenantServer(
ctx,
Expand Down Expand Up @@ -255,7 +258,7 @@ func runStartSQL(cmd *cobra.Command, args []string) error {

// Report the server identifiers and other server details
// in the same format as 'cockroach start'.
if err := reportServerInfo(ctx, tBegin, &serverCfg, st, false /* isHostNode */, false /* initialStart */, tenantClusterID); err != nil {
if err := reportServerInfo(ctx, tBegin, &serverCfg, st, serverType, false /* initialStart */, tenantClusterID); err != nil {
return err
}

Expand Down
98 changes: 68 additions & 30 deletions pkg/cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,22 +552,71 @@ If problems persist, please see %s.`

initGEOS(ctx)

const serverType redact.SafeString = "node"
// Beyond this point, the configuration is set and the server is
// ready to start.
log.Ops.Info(ctx, "starting cockroach node")

// Run the rest of the startup process in a goroutine separate from
// the main goroutine to avoid preventing proper handling of signals
// if we get stuck on something during initialization (#10138).
var serverStatusMu serverStatus
var s serverStartupInterface
shutdownReqC := make(chan server.ShutdownRequest, 1)

newServerFn := func(_ context.Context, serverCfg server.Config, stopper *stop.Stopper) (serverStartupInterface, error) {
return server.NewServer(serverCfg, stopper)
}

const isStorageClusterNode = true
maybeRunInitialSQL := func(ctx context.Context, s serverStartupInterface) error {
// Run SQL for new clusters.
//
// TODO(knz): If/when we want auto-creation of an initial admin user,
// this can be achieved here.
return runInitialSQL(ctx, s.(*server.Server), startSingleNode, "" /* adminUser */, "" /* adminPassword */)
}

srvStatus, serverShutdownReqC := createAndStartServerAsync(ctx,
tBegin, &serverCfg, stopper, startupSpan, newServerFn, maybeRunInitialSQL, serverType)

return waitForShutdown(
// NB: we delay the access to s, as it is assigned
// asynchronously in a goroutine above.
stopper, serverShutdownReqC, signalCh,
srvStatus)
}

// createAndStartServerAsync starts an async goroutine which instantiates
// the server and starts it.
// We run it in a separate goroutine because the instantiation&start
// could block, and we want to retain the option to start shutting down
// the process (e.g. via Ctrl+C on the terminal) even in that case.
// The shutdown logic thus starts running asynchronously, via waitForShutdown,
// concurrently with createAndStartServerAsync.
//
// The arguments are as follows:
// - tBegin: time when startup began; used to report statistics at the end of startup.
// - serverCfg: the server configuration.
// - stopper: the stopper used to start all the async tasks. This is the stopper
// used by the shutdown logic.
// - startupSpan: the tracing span for the context that was started earlier
// during startup. It needs to be finalized when the async goroutine completes.
// - newServerFn: a constructor function for the server object.
// - maybeRunInitialSQL: a callback that will be called after the server has
// initialized, but before it starts accepting clients.
// - serverType: a title used for the type of server. This is used
// when reporting the startup messages on the terminal & logs.
func createAndStartServerAsync(
ctx context.Context,
tBegin time.Time,
serverCfg *server.Config,
stopper *stop.Stopper,
startupSpan *tracing.Span,
newServerFn newServerFn,
maybeRunInitialSQL func(context.Context, serverStartupInterface) error,
serverType redact.SafeString,
) (srvStatus *serverStatus, serverShutdownReqC <-chan server.ShutdownRequest) {
var serverStatusMu serverStatus
var s serverStartupInterface
shutdownReqC := make(chan server.ShutdownRequest, 1)

log.Ops.Infof(ctx, "starting cockroach %s", serverType)

go func() {
// Ensure that the log files see the startup messages immediately.
Expand All @@ -592,7 +641,7 @@ If problems persist, please see %s.`
if err := func() error {
// Instantiate the server.
var err error
s, err = newServerFn(ctx, serverCfg, stopper)
s, err = newServerFn(ctx, *serverCfg, stopper)
if err != nil {
return errors.Wrap(err, "failed to start server")
}
Expand Down Expand Up @@ -641,17 +690,8 @@ If problems persist, please see %s.`
}
initialStart := s.InitialStart()

if isStorageClusterNode {
// Run SQL for new clusters.
//
// TODO(knz): If/when we want auto-creation of an initial admin user,
// this can be achieved here.
//
// TODO(knz): It's unfortunate that this conditional is piercing the
// veil of the serverStartupInterface type, but the alternative
// (extracting all the dependencies of runInitialSQL into the interface)
// is objectively worse.
if err := runInitialSQL(ctx, s.(*server.Server), startSingleNode, "" /* adminUser */, "" /* adminPassword */); err != nil {
if maybeRunInitialSQL != nil {
if err := maybeRunInitialSQL(ctx, s); err != nil {
return err
}
}
Expand All @@ -663,8 +703,8 @@ If problems persist, please see %s.`

// Now inform the user that the server is running and tell the
// user about its run-time derived parameters.
return reportServerInfo(ctx, tBegin, &serverCfg, s.ClusterSettings(),
isStorageClusterNode, initialStart, s.LogicalClusterID())
return reportServerInfo(ctx, tBegin, serverCfg, s.ClusterSettings(),
serverType, initialStart, s.LogicalClusterID())
}(); err != nil {
shutdownReqC <- server.MakeShutdownRequest(
server.ShutdownReasonServerStartupError, errors.Wrapf(err, "server startup failed"))
Expand All @@ -681,7 +721,9 @@ If problems persist, please see %s.`
}
}()

return waitForShutdown(stopper, shutdownReqC, signalCh, &serverStatusMu)
serverShutdownReqC = shutdownReqC
srvStatus = &serverStatusMu
return srvStatus, serverShutdownReqC
}

// serverStatus coordinates the async goroutine that starts the server
Expand Down Expand Up @@ -765,7 +807,7 @@ type serverShutdownInterface interface {
func waitForShutdown(
stopper *stop.Stopper,
shutdownC <-chan server.ShutdownRequest,
signalCh chan os.Signal,
signalCh <-chan os.Signal,
serverStatusMu *serverStatus,
) (returnErr error) {
shutdownCtx, shutdownSpan := serverCfg.AmbientCtx.AnnotateCtxWithSpan(context.Background(), "server shutdown")
Expand Down Expand Up @@ -999,17 +1041,13 @@ func reportServerInfo(
startTime time.Time,
serverCfg *server.Config,
st *cluster.Settings,
isHostNode, initialStart bool,
serverType redact.SafeString,
initialStart bool,
tenantClusterID uuid.UUID,
) error {
srvS := redact.SafeString("SQL server")
if isHostNode {
srvS = "node"
}

var buf redact.StringBuilder
info := build.GetInfo()
buf.Printf("CockroachDB %s starting at %s (took %0.1fs)\n", srvS, timeutil.Now(), timeutil.Since(startTime).Seconds())
buf.Printf("CockroachDB %s starting at %s (took %0.1fs)\n", serverType, timeutil.Now(), timeutil.Since(startTime).Seconds())
buf.Printf("build:\t%s %s @ %s (%s)\n",
redact.Safe(info.Distribution), redact.Safe(info.Tag), redact.Safe(info.Time), redact.Safe(info.GoVersion))
buf.Printf("webui:\t%s\n", log.SafeManaged(serverCfg.AdminURL()))
Expand Down Expand Up @@ -1072,7 +1110,7 @@ func reportServerInfo(
buf.Printf("tenant clusterID:\t%s\n", log.SafeManaged(tenantClusterID))
}
nodeID := serverCfg.BaseConfig.IDContainer.Get()
if isHostNode {
if !serverCfg.SQLConfig.TenantID.IsSet() {
if initialStart {
if nodeID == kvserver.FirstNodeID {
buf.Printf("status:\tinitialized new cluster\n")
Expand Down Expand Up @@ -1107,7 +1145,7 @@ func reportServerInfo(
return err
}
msgS := msg.ToString()
log.Ops.Infof(ctx, "%s startup completed:\n%s", srvS, msgS)
log.Ops.Infof(ctx, "%s startup completed:\n%s", serverType, msgS)
if !startCtx.inBackground && !log.LoggingToStderr(severity.INFO) {
fmt.Print(msgS.StripMarkers())
}
Expand Down

0 comments on commit 03cac30

Please sign in to comment.