diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index 1c55b5f67b65..65ef0108bb97 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -79,7 +79,6 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s func TestTenantStreaming(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 82706) skip.UnderRace(t, "slow under race") diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 6d13540e8a99..7d84998cf61b 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -404,9 +404,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.AmbientCtx, cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, ) - cfg.sqlInstanceProvider = instanceprovider.New( - cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock, - ) + // If the node id is already populated, we only need to create a placeholder + // instance provider without initializing the instance, since this is not a + // SQL pod server. 
+ _, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID() + if isNotSQLPod { + cfg.sqlInstanceProvider = instanceprovider.NewPlaceholder() + } else { + cfg.sqlInstanceProvider = instanceprovider.New( + cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock, + ) + } if !codec.ForSystemTenant() { // In a multi-tenant environment, use the sqlInstanceProvider to resolve @@ -1108,6 +1116,24 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { }, nil } +func maybeCheckTenantAccess(ctx context.Context, codec keys.SQLCodec, db *kv.DB) error { + if codec.ForSystemTenant() { + // Skip check for system tenant and return early. + return nil + } + // Perform a simple read to the tenant in order to verify that this instance + // will be authorized to access the tenant keyspace. + err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + start := codec.TenantPrefix() + end := start.PrefixEnd() + if _, err := txn.Scan(ctx, start, end, 1); err != nil { + return err + } + return nil + }) + return err +} + // Checks if tenant exists. This function does a very superficial check to see if the system db // has been bootstrapped for the tenant. This is not a complete check and is only sufficient // to be used in the dev environment. @@ -1140,6 +1166,11 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er return err } } + // Check that this SQL instance will have access to this tenant's keyspace + // before initializing the instance. Otherwise, initialization may get stuck. 
+ if err := maybeCheckTenantAccess(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { + return err + } s.sqlLivenessProvider.Start(ctx) // sqlInstanceProvider must always be started after sqlLivenessProvider // as sqlInstanceProvider relies on the session initialized and maintained by @@ -1150,7 +1181,7 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er return nil } -func (s *SQLServer) initInstanceID(ctx context.Context) error { +func (s *SQLServer) setInstanceID(ctx context.Context) error { if _, ok := s.sqlIDContainer.OptionalNodeID(); ok { // sqlIDContainer has already been initialized with a node ID, // we don't need to initialize a SQL instance ID in this case @@ -1179,8 +1210,8 @@ func (s *SQLServer) preStart( socketFile string, orphanedLeasesTimeThresholdNanos int64, ) error { - // The sqlliveness and sqlinstance subsystem should be started first to ensure instance ID is - // initialized prior to any other systems that need it. + // The sqlliveness and sqlinstance subsystem should be started first to ensure + // the instance ID is initialized prior to any other systems that need it. 
if err := s.startSQLLivenessAndInstanceProviders(ctx); err != nil { return err } @@ -1191,7 +1222,7 @@ func (s *SQLServer) preStart( if err := maybeCheckTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { return err } - if err := s.initInstanceID(ctx); err != nil { + if err := s.setInstanceID(ctx); err != nil { return err } s.connManager = connManager diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go index 363573bf6583..8ba4faced5f9 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go @@ -36,7 +36,42 @@ type writer interface { ReleaseInstanceID(ctx context.Context, instanceID base.SQLInstanceID) error } -// provider implements the sqlinstance.Provider interface for access to the sqlinstance subsystem. +// placeholderProvider implements the sqlinstance.Provider interface as a +// placeholder for an instance provider, when an instance provider must be +// instantiated for a non-SQL instance. Its Start is a no-op, and Instance and +// the AddressResolver methods all return NotASQLInstanceError. +type placeholderProvider struct { + *instancestorage.Reader +} + +func NewPlaceholder() sqlinstance.Provider { + return &placeholderProvider{} +} + +// Start implements the sqlinstance.Provider interface. +func (p *placeholderProvider) Start(ctx context.Context) error { + return nil +} + +// Instance implements the sqlinstance.Provider interface. +func (p *placeholderProvider) Instance( + ctx context.Context, +) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) { + return base.SQLInstanceID(0), "", sqlinstance.NotASQLInstanceError +} + +// GetInstance implements the AddressResolver interface. 
+func (p *placeholderProvider) GetInstance(context.Context, base.SQLInstanceID) (sqlinstance.InstanceInfo, error) { + return sqlinstance.InstanceInfo{}, sqlinstance.NotASQLInstanceError +} + +// GetAllInstances implements the AddressResolver interface. +func (p *placeholderProvider) GetAllInstances(context.Context) ([]sqlinstance.InstanceInfo, error) { + return nil, sqlinstance.NotASQLInstanceError +} + +// provider implements the sqlinstance.Provider interface for access to the +// sqlinstance subsystem. type provider struct { *instancestorage.Reader storage writer @@ -85,6 +120,12 @@ func (p *provider) Start(ctx context.Context) error { if p.started() { return p.initError } + // Initialize the instance. We need to do this before starting the reader, so + // that the reader sees the instance. + if err := p.initAndWait(ctx); err != nil { + return err + } + if err := p.Reader.Start(ctx); err != nil { p.initOnce.Do(func() { p.initError = err @@ -110,21 +151,31 @@ func (p *provider) Instance( if !p.started() { return base.SQLInstanceID(0), "", sqlinstance.NotStartedError } - - p.maybeInitialize() select { case <-ctx.Done(): return base.SQLInstanceID(0), "", ctx.Err() case <-p.stopper.ShouldQuiesce(): return base.SQLInstanceID(0), "", stop.ErrUnavailable + case <-p.initialized: + return p.instanceID, p.sessionID, p.initError + } +} + +func (p *provider) initAndWait(ctx context.Context) error { + p.maybeInitialize() + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.stopper.ShouldQuiesce(): + return stop.ErrUnavailable case <-p.initialized: if p.initError == nil { log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID) } else { log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError) } - return p.instanceID, p.sessionID, p.initError } + return p.initError } func (p *provider) maybeInitialize() { diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go index 
1ccbcee894b0..15949afe64f7 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go @@ -65,6 +65,7 @@ func TestInstanceProvider(t *testing.T) { defer stopper.Stop(ctx) instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr) slInstance.Start(ctx) + instanceProvider.InitAndWaitForTest(ctx) instanceID, sessionID, err := instanceProvider.Instance(ctx) require.NoError(t, err) require.Equal(t, expectedInstanceID, instanceID) @@ -101,6 +102,7 @@ func TestInstanceProvider(t *testing.T) { instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr") slInstance.Start(ctx) instanceProvider.ShutdownSQLInstanceForTest(ctx) + instanceProvider.InitAndWaitForTest(ctx) _, _, err := instanceProvider.Instance(ctx) require.Error(t, err) require.Equal(t, "instance never initialized", err.Error()) diff --git a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go index dd83dd00b727..a1eb1c6ea42b 100644 --- a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go +++ b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go @@ -20,9 +20,10 @@ import ( ) // TestInstanceProvider exposes ShutdownSQLInstanceForTest -// method for testing purposes. +// and InitAndWaitForTest methods for testing purposes. type TestInstanceProvider interface { sqlinstance.Provider + InitAndWaitForTest(context.Context) ShutdownSQLInstanceForTest(context.Context) } @@ -43,6 +44,11 @@ func NewTestInstanceProvider( return p } +// InitAndWaitForTest explicitly calls initAndWait for testing purposes. +func (p *provider) InitAndWaitForTest(ctx context.Context) { + _ = p.initAndWait(ctx) +} + // ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes. 
func (p *provider) ShutdownSQLInstanceForTest(ctx context.Context) { p.shutdownSQLInstance(ctx) diff --git a/pkg/sql/sqlinstance/sqlinstance.go b/pkg/sql/sqlinstance/sqlinstance.go index 62d726e414b3..607eb20353e4 100644 --- a/pkg/sql/sqlinstance/sqlinstance.go +++ b/pkg/sql/sqlinstance/sqlinstance.go @@ -49,8 +49,8 @@ type Provider interface { // Instance returns the instance ID and sqlliveness.SessionID for the // current SQL instance. Instance(context.Context) (base.SQLInstanceID, sqlliveness.SessionID, error) - // Start starts the instanceprovider. This will block until - // the underlying instance data reader has been started. + // Start starts the instanceprovider and initializes the current SQL instance. + // This will block until the underlying instance data reader has been started. Start(context.Context) error } @@ -59,3 +59,7 @@ var NonExistentInstanceError = errors.Errorf("non existent SQL instance") // NotStartedError can be returned if the sqlinstance subsystem has not been started yet. var NotStartedError = errors.Errorf("sqlinstance subsystem not started") + +// NotASQLInstanceError can be returned if a function is not supported for +// non-SQL instances. +var NotASQLInstanceError = errors.Errorf("not supported for non-SQL instance")