Skip to content

Commit

Permalink
server: split the tenant server creation into 3 stages
Browse files Browse the repository at this point in the history
The original motivation (and ultimate goal) for this commit is to
split the tenant server initialization into three phases: `New()`,
`PreStart()`, `AcceptClients()`, so as to reuse a common process startup
logic in a separate (later) commit.

To achieve this goal, this commit re-orders the initialization steps
in `server.NewTenantServer` (previously known as
`server.StartTenant`), and extracts many of them into a new method
`(*SQLServerWrapper).PreStart()`.

The specific order of the code in `NewTenantServer()` and
`(*SQLServerWrapper).PreStart()` was chosen to mirror the order of
things in `NewServer()` and `(*Server).PreStart()`.

Reasons for using the same order:

- it makes the review of this change easier: the reviewer can pull
  `server.go` and `tenant.go` and read them side-by-side, to satisfy
  themselves that the two implementations of
  `NewServer`/`NewTenantServer` and `PreStart` are equivalent.

- it will make it easier for future maintainers to keep them in sync.

- it helps us discover the fact that both sides share a lot of code.
  This opens an opportunity to merge them to a common implementation
  at a later stage.

While doing this work, care was also taken to discover which steps of
`(*Server).PreStart()` were *missing* from the tenant server
initialization. We found the following:

- the Sentry context enhancement (to report cluster ID, etc)
  was missing. This commit fixes that.

- several log entries that report the server configuration
  to the OPS channel were not emitted. This commit fixes that.

- the Graphite metric reporting was never enabled, even when
  configured. This commit fixes that.

- the Obs Service testing knobs (TestingKnobs.EventExporter) were
  not configured on the ObsServer instance. This commit fixes that.

- the `go.scheduler_latency` metric was not being measured.
  This commit fixes that.

Additionally, two followup issues were filed for the following
missing steps:

- missing support for the special file that blocks background jobs. (cockroachdb#90524)
- missing support for the system.eventlog cleanup loop. (cockroachdb#90521)

Release note: None
  • Loading branch information
knz committed Oct 26, 2022
1 parent 0c54d34 commit 616c822
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 56 deletions.
3 changes: 3 additions & 0 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) {

lateBoundServer := &Server{}

// The following initialization is mirrored in NewTenantServer().
// Please keep them in sync.

// Instantiate the API privilege checker.
//
// TODO(tbg): give adminServer only what it needs (and avoid circular deps).
Expand Down
138 changes: 93 additions & 45 deletions pkg/server/tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,80 +154,127 @@ func NewTenantServer(
if err != nil {
return nil, err
}

// The following initialization mirrors that of NewServer().
// Please keep them in sync.
// Instantiate the API privilege checker.
//
// TODO(tbg): give adminServer only what it needs (and avoid circular deps).
adminAuthzCheck := &adminPrivilegeChecker{
ie: args.circularInternalExecutor,
st: args.Settings,
makePlanner: nil,
}

// Instantiate the admin API server.
sAdmin := newTenantAdminServer(baseCfg.AmbientCtx)

// Instantiate the HTTP server.
// These callbacks help us avoid a dependency on gossip in httpServer.
parseNodeIDFn := func(s string) (roachpb.NodeID, bool, error) {
return roachpb.NodeID(0), false, errors.New("tenants cannot proxy to KV Nodes")
}
getNodeIDHTTPAddressFn := func(id roachpb.NodeID) (*util.UnresolvedAddr, error) {
return nil, errors.New("tenants cannot proxy to KV Nodes")
}
sHTTP := newHTTPServer(baseCfg, args.rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn)

// This is where we would be instantiating the SQL session registry
// in NewServer().
// This is currently performed in makeTenantSQLServerArgs().

// Instantiate the cache of closed SQL sessions.
closedSessionCache := sql.NewClosedSessionCache(
baseCfg.Settings, args.monitorAndMetrics.rootSQLMemoryMonitor, time.Now)
args.closedSessionCache = closedSessionCache

// Add the server tags to the startup context.
//
// We use args.BaseConfig here instead of baseCfg directly because
// makeTenantSQLArgs defines its own AmbientCtx instance and it's
// defined by-value.
ctx = args.BaseConfig.AmbientCtx.AnnotateCtx(ctx)

// Instantiate the status API server.
// The tenantStatusServer needs access to the sqlServer,
// but we also need the same object to set up the sqlServer.
// So construct the tenant status server with a nil sqlServer,
// and then assign it once an SQL server gets created. We are
// going to assume that the tenant status server won't require
// the SQL server object.
// the SQL server object until later.
sStatus := newTenantStatusServer(
baseCfg.AmbientCtx, nil,
args.sessionRegistry, args.closedSessionCache, args.remoteFlowRunner, baseCfg.Settings, nil,
args.rpcContext, args.stopper,
baseCfg.AmbientCtx,
adminAuthzCheck,
args.sessionRegistry,
args.closedSessionCache,
args.remoteFlowRunner,
baseCfg.Settings,
args.rpcContext,
args.stopper,
)

args.sqlStatusServer = sStatus
// Connect the admin server to the status service.
//
// TODO(knz): This would not be necessary if we could use the
// adminServer directly.
sAdmin.status = sStatus

// This is the location in NewServer() where we would be configuring
// the path to the special file that blocks background jobs.
// This should probably done here.
// See: https://github.com/cockroachdb/cockroach/issues/90524

// This is the location in NewServer() where we would be creating
// the eventsServer. This is currently performed in
// makeTenantSQLServerArgs().

// Instantiate the SQL server proper.
sqlServer, err := newSQLServer(ctx, args)
if err != nil {
return nil, err
}
adminAuthzCheck := &adminPrivilegeChecker{
ie: sqlServer.execCfg.InternalExecutor,
st: args.Settings,
makePlanner: func(opName string) (interface{}, func()) {
txn := args.db.NewTxn(ctx, "check-system-privilege")
return sql.NewInternalPlanner(
opName,
txn,
username.RootUserName(),
&sql.MemoryMetrics{},
sqlServer.execCfg,
sessiondatapb.SessionData{},
)
},
}
sStatus.privilegeChecker = adminAuthzCheck
sStatus.sqlServer = sqlServer

drainServer := newDrainServer(baseCfg, args.stopper, args.grpc, sqlServer)

sAdmin := newTenantAdminServer(baseCfg.AmbientCtx, sqlServer, sStatus, drainServer)
// Tell the authz server how to connect to SQL.
adminAuthzCheck.makePlanner = func(opName string) (interface{}, func()) {
// This is a hack to get around a Go package dependency cycle. See comment
// in sql/jobs/registry.go on planHookMaker.
txn := args.db.NewTxn(ctx, "check-system-privilege")
return sql.NewInternalPlanner(
opName,
txn,
username.RootUserName(),
&sql.MemoryMetrics{},
sqlServer.execCfg,
sessiondatapb.SessionData{},
)
}

// Create the authentication RPC server (login/logout).
sAuth := newAuthenticationServer(baseCfg.Config, sqlServer)

// Register and start gRPC service on pod. This is separate from the
// gRPC + Gateway services configured below.
// Connect the various servers to RPC.
for _, gw := range []grpcGatewayServer{sAdmin, sStatus, sAuth} {
gw.RegisterService(args.grpcServer)
gw.RegisterService(args.grpc.Server)
}

// Tell the status/admin servers how to access SQL structures.
//
// TODO(knz): If/when we want to support statement diagnostic requests
// in secondary tenants, this is where we would call setStmtDiagnosticsRequester(),
// like in NewServer().
sStatus.baseStatusServer.sqlServer = sqlServer
sAdmin.sqlServer = sqlServer

// Create the debug API server.
debugServer := debug.NewServer(
baseCfg.AmbientCtx,
args.Settings,
sqlServer.pgServer.HBADebugFn(),
sqlServer.execCfg.SQLStatusServer,
)

parseNodeIDFn := func(s string) (roachpb.NodeID, bool, error) {
return roachpb.NodeID(0), false, errors.New("tenants cannot proxy to KV Nodes")
}
getNodeIDHTTPAddressFn := func(id roachpb.NodeID) (*util.UnresolvedAddr, error) {
return nil, errors.New("tenants cannot proxy to KV Nodes")
}
sHTTP := newHTTPServer(baseCfg, args.rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn)
// Create a drain server.
drainServer := newDrainServer(baseCfg, args.stopper, args.grpc, sqlServer)
// Connect the admin server to the drain service.
//
// TODO(knz): This would not be necessary if we could use the
// adminServer directly.
sAdmin.drain = drainServer

sw := &SQLServerWrapper{
return &SQLServerWrapper{
clock: args.clock,
rpcContext: args.rpcContext,

Expand All @@ -254,8 +301,7 @@ func NewTenantServer(

externalStorageBuilder: args.externalStorageBuilder,
costController: args.costController,
}
return sw, nil
}, nil
}

// PreStart starts the server on the specified port(s) and
Expand Down Expand Up @@ -574,6 +620,8 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error {
}

nextLiveInstanceIDFn := makeNextLiveInstanceIDFn(s.sqlServer.sqlInstanceProvider, instanceID)

// Start the cost controller for this secondary tenant.
if err := s.costController.Start(
workersCtx, s.stopper, instanceID, s.sqlServer.sqlLivenessSessionID,
externalUsageFn, nextLiveInstanceIDFn,
Expand Down
10 changes: 1 addition & 9 deletions pkg/server/tenant_admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,17 +59,9 @@ func (t *tenantAdminServer) RegisterGateway(

var _ grpcGatewayServer = &tenantAdminServer{}

func newTenantAdminServer(
ambientCtx log.AmbientContext,
sqlServer *SQLServer,
status *tenantStatusServer,
drain *drainServer,
) *tenantAdminServer {
func newTenantAdminServer(ambientCtx log.AmbientContext) *tenantAdminServer {
return &tenantAdminServer{
AmbientContext: ambientCtx,
sqlServer: sqlServer,
drain: drain,
status: status,
}
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/server/tenant_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func newTenantStatusServer(
closedSessionCache *sql.ClosedSessionCache,
remoteFlowRunner *flowinfra.RemoteFlowRunner,
st *cluster.Settings,
sqlServer *SQLServer,
rpcCtx *rpc.Context,
stopper *stop.Stopper,
) *tenantStatusServer {
Expand All @@ -100,7 +99,6 @@ func newTenantStatusServer(
closedSessionCache: closedSessionCache,
remoteFlowRunner: remoteFlowRunner,
st: st,
sqlServer: sqlServer,
rpcCtx: rpcCtx,
stopper: stopper,
},
Expand Down

0 comments on commit 616c822

Please sign in to comment.