diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index f36112d5fcbc..712b53b76e31 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -80,7 +80,6 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s func TestTenantStreaming(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 82706) skip.UnderRace(t, "slow under race") diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index d217411d748f..439ea8cf2edf 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -405,8 +405,12 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.AmbientCtx, cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, ) + // If the node id is already populated, we only need to create a placeholder + // instance provider without initializing the instance, since this is not a + // SQL pod server. + _, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID() cfg.sqlInstanceProvider = instanceprovider.New( - cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.rangeFeedFactory, cfg.clock, + cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.rangeFeedFactory, cfg.clock, isNotSQLPod, ) if !codec.ForSystemTenant() { @@ -1156,7 +1160,7 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er return nil } -func (s *SQLServer) initInstanceID(ctx context.Context) error { +func (s *SQLServer) setInstanceID(ctx context.Context) error { if _, ok := s.sqlIDContainer.OptionalNodeID(); ok { // sqlIDContainer has already been initialized with a node ID, // we don't need to initialize a SQL instance ID in this case @@ -1185,8 +1189,8 @@ func (s *SQLServer) preStart( socketFile string, orphanedLeasesTimeThresholdNanos int64, ) error { - // The sqlliveness and sqlinstance subsystem should be started first to ensure instance ID is - // initialized prior to any other systems that need it. + // The sqlliveness and sqlinstance subsystem should be started first to ensure + // the instance ID is initialized prior to any other systems that need it. if err := s.startSQLLivenessAndInstanceProviders(ctx); err != nil { return err } @@ -1197,7 +1201,7 @@ func (s *SQLServer) preStart( if err := maybeCheckTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { return err } - if err := s.initInstanceID(ctx); err != nil { + if err := s.setInstanceID(ctx); err != nil { return err } s.connManager = connManager diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go index c9abb511f57c..8ec201fbe416 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go @@ -47,6 +47,7 @@ type provider struct { instanceID base.SQLInstanceID sessionID sqlliveness.SessionID initError error + isSkeleton bool mu struct { syncutil.Mutex started bool @@ -62,6 +63,7 @@ func New( addr string, f *rangefeed.Factory, clock *hlc.Clock, + isSkeleton bool, ) sqlinstance.Provider { storage := instancestorage.NewStorage(db, codec, slProvider) reader := instancestorage.NewReader(storage, slProvider.CachedReader(), f, codec, clock, stopper) @@ -72,6 +74,7 @@ func New( session: slProvider, instanceAddr: addr, initialized: make(chan struct{}), + isSkeleton: isSkeleton, } return p } @@ -81,6 +84,12 @@ func (p *provider) Start(ctx context.Context) error { if p.started() { return p.initError } + // Initialize the instance. We need to do this before starting the reader, so + // that the reader sees the instance. + if err := p.initAndWait(ctx); err != nil { + return err + } + if err := p.Reader.Start(ctx); err != nil { p.initOnce.Do(func() { p.initError = err @@ -106,21 +115,35 @@ func (p *provider) Instance( if !p.started() { return base.SQLInstanceID(0), "", sqlinstance.NotStartedError } - - p.maybeInitialize() select { case <-ctx.Done(): return base.SQLInstanceID(0), "", ctx.Err() case <-p.stopper.ShouldQuiesce(): return base.SQLInstanceID(0), "", stop.ErrUnavailable + case <-p.initialized: + return p.instanceID, p.sessionID, p.initError + } +} + +func (p *provider) initAndWait(ctx context.Context) error { + if p.isSkeleton { + // Don't initialize the instance if the provider is a placeholder. + return nil + } + p.maybeInitialize() + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.stopper.ShouldQuiesce(): + return stop.ErrUnavailable case <-p.initialized: if p.initError == nil { log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID) } else { log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError) } - return p.instanceID, p.sessionID, p.initError } + return p.initError } func (p *provider) maybeInitialize() { diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go index 1ccbcee894b0..15949afe64f7 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go @@ -65,6 +65,7 @@ func TestInstanceProvider(t *testing.T) { defer stopper.Stop(ctx) instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr) slInstance.Start(ctx) + instanceProvider.InitAndWaitForTest(ctx) instanceID, sessionID, err := instanceProvider.Instance(ctx) require.NoError(t, err) require.Equal(t, expectedInstanceID, instanceID) @@ -101,6 +102,7 @@ func TestInstanceProvider(t *testing.T) { instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr") slInstance.Start(ctx) instanceProvider.ShutdownSQLInstanceForTest(ctx) + instanceProvider.InitAndWaitForTest(ctx) _, _, err := instanceProvider.Instance(ctx) require.Error(t, err) require.Equal(t, "instance never initialized", err.Error()) diff --git a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go index dd83dd00b727..a77e69ffb6c9 100644 --- a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go +++ b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go @@ -20,9 +20,10 @@ import ( ) // TestInstanceProvider exposes ShutdownSQLInstanceForTest -// method for testing purposes. +// and InitAndWaitForTest methods for testing purposes. type TestInstanceProvider interface { sqlinstance.Provider + InitAndWaitForTest(context.Context) ShutdownSQLInstanceForTest(context.Context) } @@ -43,6 +44,14 @@ func NewTestInstanceProvider( return p } +// InitAndWaitForTest explicitly calls initAndWait for testing purposes. +func (p *provider) InitAndWaitForTest(ctx context.Context) { + if err := p.initAndWait(ctx); err != nil { + //nolint:returnerrcheck + return + } +} + // ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes. func (p *provider) ShutdownSQLInstanceForTest(ctx context.Context) { p.shutdownSQLInstance(ctx) diff --git a/pkg/sql/sqlinstance/sqlinstance.go b/pkg/sql/sqlinstance/sqlinstance.go index 11247e2718dc..e87bef394aca 100644 --- a/pkg/sql/sqlinstance/sqlinstance.go +++ b/pkg/sql/sqlinstance/sqlinstance.go @@ -47,8 +47,8 @@ type Provider interface { // Instance returns the instance ID and sqlliveness.SessionID for the // current SQL instance. Instance(context.Context) (base.SQLInstanceID, sqlliveness.SessionID, error) - // Start starts the instanceprovider. This will block until - // the underlying instance data reader has been started. + // Start starts the instanceprovider and initializes the current SQL instance. + // This will block until the underlying instance data reader has been started. Start(context.Context) error }