diff --git a/pkg/server/server.go b/pkg/server/server.go index b2eadf8349e9..3646a00a6dbd 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -743,6 +743,9 @@ func NewServer(cfg Config, stopper *stop.Stopper) (*Server, error) { lateBoundServer := &Server{} + // The following initialization is mirrored in NewTenantServer(). + // Please keep them in sync. + // Instantiate the API privilege checker. // // TODO(tbg): give adminServer only what it needs (and avoid circular deps). diff --git a/pkg/server/tenant.go b/pkg/server/tenant.go index 718163d74d64..cb08cddeee27 100644 --- a/pkg/server/tenant.go +++ b/pkg/server/tenant.go @@ -154,64 +154,111 @@ func NewTenantServer( if err != nil { return nil, err } + + // The following initialization mirrors that of NewServer(). + // Please keep them in sync. + // Instantiate the API privilege checker. + // + // TODO(tbg): give adminServer only what it needs (and avoid circular deps). + adminAuthzCheck := &adminPrivilegeChecker{ + ie: args.circularInternalExecutor, + st: args.Settings, + makePlanner: nil, + } + + // Instantiate the admin API server. + sAdmin := newTenantAdminServer(baseCfg.AmbientCtx) + + // Instantiate the HTTP server. + // These callbacks help us avoid a dependency on gossip in httpServer. + parseNodeIDFn := func(s string) (roachpb.NodeID, bool, error) { + return roachpb.NodeID(0), false, errors.New("tenants cannot proxy to KV Nodes") + } + getNodeIDHTTPAddressFn := func(id roachpb.NodeID) (*util.UnresolvedAddr, error) { + return nil, errors.New("tenants cannot proxy to KV Nodes") + } + sHTTP := newHTTPServer(baseCfg, args.rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn) + + // This is where we would be instantiating the SQL session registry + // in NewServer(). + // This is currently performed in makeTenantSQLServerArgs(). + + // Instantiate the cache of closed SQL sessions. closedSessionCache := sql.NewClosedSessionCache( baseCfg.Settings, args.monitorAndMetrics.rootSQLMemoryMonitor, time.Now) args.closedSessionCache = closedSessionCache - // Add the server tags to the startup context. - // - // We use args.BaseConfig here instead of baseCfg directly because - // makeTenantSQLArgs defines its own AmbientCtx instance and it's - // defined by-value. - ctx = args.BaseConfig.AmbientCtx.AnnotateCtx(ctx) - + // Instantiate the status API server. // The tenantStatusServer needs access to the sqlServer, // but we also need the same object to set up the sqlServer. // So construct the tenant status server with a nil sqlServer, // and then assign it once an SQL server gets created. We are // going to assume that the tenant status server won't require - // the SQL server object. + // the SQL server object until later. sStatus := newTenantStatusServer( - baseCfg.AmbientCtx, nil, - args.sessionRegistry, args.closedSessionCache, args.flowScheduler, baseCfg.Settings, nil, - args.rpcContext, args.stopper, + baseCfg.AmbientCtx, + adminAuthzCheck, + args.sessionRegistry, + args.closedSessionCache, + args.flowScheduler, + baseCfg.Settings, + args.rpcContext, + args.stopper, ) - args.sqlStatusServer = sStatus + // Connect the admin server to the status service. + // + // TODO(knz): This would not be necessary if we could use the + // adminServer directly. + sAdmin.status = sStatus + + // This is the location in NewServer() where we would be configuring + // the path to the special file that blocks background jobs. + // This should probably done here. + // See: https://github.com/cockroachdb/cockroach/issues/90524 + + // This is the location in NewServer() where we would be creating + // the eventsServer. This is currently performed in + // makeTenantSQLServerArgs(). + + // Instantiate the SQL server proper. sqlServer, err := newSQLServer(ctx, args) if err != nil { return nil, err } - adminAuthzCheck := &adminPrivilegeChecker{ - ie: sqlServer.execCfg.InternalExecutor, - st: args.Settings, - makePlanner: func(opName string) (interface{}, func()) { - txn := args.db.NewTxn(ctx, "check-system-privilege") - return sql.NewInternalPlanner( - opName, - txn, - username.RootUserName(), - &sql.MemoryMetrics{}, - sqlServer.execCfg, - sessiondatapb.SessionData{}, - ) - }, - } - sStatus.privilegeChecker = adminAuthzCheck - sStatus.sqlServer = sqlServer - - drainServer := newDrainServer(baseCfg, args.stopper, args.grpc, sqlServer) - sAdmin := newTenantAdminServer(baseCfg.AmbientCtx, sqlServer, sStatus, drainServer) + // Tell the authz server how to connect to SQL. + adminAuthzCheck.makePlanner = func(opName string) (interface{}, func()) { + // This is a hack to get around a Go package dependency cycle. See comment + // in sql/jobs/registry.go on planHookMaker. + txn := args.db.NewTxn(ctx, "check-system-privilege") + return sql.NewInternalPlanner( + opName, + txn, + username.RootUserName(), + &sql.MemoryMetrics{}, + sqlServer.execCfg, + sessiondatapb.SessionData{}, + ) + } + // Create the authentication RPC server (login/logout). sAuth := newAuthenticationServer(baseCfg.Config, sqlServer) - // Register and start gRPC service on pod. This is separate from the - // gRPC + Gateway services configured below. + // Connect the various servers to RPC. for _, gw := range []grpcGatewayServer{sAdmin, sStatus, sAuth} { - gw.RegisterService(args.grpcServer) + gw.RegisterService(args.grpc.Server) } + // Tell the status/admin servers how to access SQL structures. + // + // TODO(knz): If/when we want to support statement diagnostic requests + // in secondary tenants, this is where we would call setStmtDiagnosticsRequester(), + // like in NewServer(). + sStatus.baseStatusServer.sqlServer = sqlServer + sAdmin.sqlServer = sqlServer + + // Create the debug API server. debugServer := debug.NewServer( baseCfg.AmbientCtx, args.Settings, @@ -219,15 +266,15 @@ func NewTenantServer( sqlServer.execCfg.SQLStatusServer, ) - parseNodeIDFn := func(s string) (roachpb.NodeID, bool, error) { - return roachpb.NodeID(0), false, errors.New("tenants cannot proxy to KV Nodes") - } - getNodeIDHTTPAddressFn := func(id roachpb.NodeID) (*util.UnresolvedAddr, error) { - return nil, errors.New("tenants cannot proxy to KV Nodes") - } - sHTTP := newHTTPServer(baseCfg, args.rpcContext, parseNodeIDFn, getNodeIDHTTPAddressFn) + // Create a drain server. + drainServer := newDrainServer(baseCfg, args.stopper, args.grpc, sqlServer) + // Connect the admin server to the drain service. + // + // TODO(knz): This would not be necessary if we could use the + // adminServer directly. + sAdmin.drain = drainServer - sw := &SQLServerWrapper{ + return &SQLServerWrapper{ clock: args.clock, rpcContext: args.rpcContext, @@ -254,8 +301,7 @@ func NewTenantServer( externalStorageBuilder: args.externalStorageBuilder, costController: args.costController, - } - return sw, nil + }, nil } // PreStart starts the server on the specified port(s) and @@ -574,6 +620,8 @@ func (s *SQLServerWrapper) PreStart(ctx context.Context) error { } nextLiveInstanceIDFn := makeNextLiveInstanceIDFn(s.sqlServer.sqlInstanceProvider, instanceID) + + // Start the cost controller for this secondary tenant. if err := s.costController.Start( workersCtx, s.stopper, instanceID, s.sqlServer.sqlLivenessSessionID, externalUsageFn, nextLiveInstanceIDFn, diff --git a/pkg/server/tenant_admin.go b/pkg/server/tenant_admin.go index bce375ecbfee..b9d44b769df5 100644 --- a/pkg/server/tenant_admin.go +++ b/pkg/server/tenant_admin.go @@ -59,17 +59,9 @@ func (t *tenantAdminServer) RegisterGateway( var _ grpcGatewayServer = &tenantAdminServer{} -func newTenantAdminServer( - ambientCtx log.AmbientContext, - sqlServer *SQLServer, - status *tenantStatusServer, - drain *drainServer, -) *tenantAdminServer { +func newTenantAdminServer(ambientCtx log.AmbientContext) *tenantAdminServer { return &tenantAdminServer{ AmbientContext: ambientCtx, - sqlServer: sqlServer, - drain: drain, - status: status, } } diff --git a/pkg/server/tenant_status.go b/pkg/server/tenant_status.go index 29c2fb10d176..9cb048425f7b 100644 --- a/pkg/server/tenant_status.go +++ b/pkg/server/tenant_status.go @@ -87,7 +87,6 @@ func newTenantStatusServer( closedSessionCache *sql.ClosedSessionCache, flowScheduler *flowinfra.FlowScheduler, st *cluster.Settings, - sqlServer *SQLServer, rpcCtx *rpc.Context, stopper *stop.Stopper, ) *tenantStatusServer { @@ -100,7 +99,6 @@ func newTenantStatusServer( closedSessionCache: closedSessionCache, flowScheduler: flowScheduler, st: st, - sqlServer: sqlServer, rpcCtx: rpcCtx, stopper: stopper, },