From 35d13bb35ce300d85f85289f72e07c6bd8d8e694 Mon Sep 17 00:00:00 2001 From: rharding6373 Date: Fri, 24 Jun 2022 14:45:23 -0700 Subject: [PATCH] sql: initialize sql instance during instance provider start Before this change, there was a race condition where the instance provider and the instance reader would start before the instance provider created a SQL instance, potentially causing the reader to not cache the instance before initialization was complete. This is a problem in multi-tenant environments, where we may not be able to plan queries if the reader does not know of any SQL instances. This change moves sql instance initialization into the instance provider's `Start()` function before starting the reader, so that the instance is guaranteed to be visible to the reader on its first rangefeed scan of the `system.sql_instances` table. Fixes: #82706 Fixes: #81807 Fixes: #81567 Release note: None --- .../streamingest/stream_ingestion_job_test.go | 1 - pkg/server/server_sql.go | 14 +++++---- .../instanceprovider/instanceprovider.go | 29 +++++++++++++++++-- .../instanceprovider/instanceprovider_test.go | 2 ++ .../instanceprovider/test_helpers.go | 11 ++++++- pkg/sql/sqlinstance/sqlinstance.go | 4 +-- 6 files changed, 49 insertions(+), 12 deletions(-) diff --git a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go index f36112d5fcbc..712b53b76e31 100644 --- a/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go +++ b/pkg/ccl/streamingccl/streamingest/stream_ingestion_job_test.go @@ -80,7 +80,6 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s func TestTenantStreaming(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 82706) skip.UnderRace(t, "slow under race") diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index d217411d748f..439ea8cf2edf 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -405,8 +405,12 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.AmbientCtx, cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs, ) + // If the node id is already populated, we only need to create a placeholder + // instance provider without initializing the instance, since this is not a + // SQL pod server. + _, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID() cfg.sqlInstanceProvider = instanceprovider.New( - cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.rangeFeedFactory, cfg.clock, + cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.rangeFeedFactory, cfg.clock, isNotSQLPod, ) if !codec.ForSystemTenant() { @@ -1156,7 +1160,7 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er return nil } -func (s *SQLServer) initInstanceID(ctx context.Context) error { +func (s *SQLServer) setInstanceID(ctx context.Context) error { if _, ok := s.sqlIDContainer.OptionalNodeID(); ok { // sqlIDContainer has already been initialized with a node ID, // we don't need to initialize a SQL instance ID in this case @@ -1185,8 +1189,8 @@ func (s *SQLServer) preStart( socketFile string, orphanedLeasesTimeThresholdNanos int64, ) error { - // The sqlliveness and sqlinstance subsystem should be started first to ensure instance ID is - // initialized prior to any other systems that need it. + // The sqlliveness and sqlinstance subsystem should be started first to ensure + // the instance ID is initialized prior to any other systems that need it. if err := s.startSQLLivenessAndInstanceProviders(ctx); err != nil { return err } @@ -1197,7 +1201,7 @@ func (s *SQLServer) preStart( if err := maybeCheckTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil { return err } - if err := s.initInstanceID(ctx); err != nil { + if err := s.setInstanceID(ctx); err != nil { return err } s.connManager = connManager diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go index c9abb511f57c..8ec201fbe416 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider.go @@ -47,6 +47,7 @@ type provider struct { instanceID base.SQLInstanceID sessionID sqlliveness.SessionID initError error + isSkeleton bool mu struct { syncutil.Mutex started bool @@ -62,6 +63,7 @@ func New( addr string, f *rangefeed.Factory, clock *hlc.Clock, + isSkeleton bool, ) sqlinstance.Provider { storage := instancestorage.NewStorage(db, codec, slProvider) reader := instancestorage.NewReader(storage, slProvider.CachedReader(), f, codec, clock, stopper) @@ -72,6 +74,7 @@ func New( session: slProvider, instanceAddr: addr, initialized: make(chan struct{}), + isSkeleton: isSkeleton, } return p } @@ -81,6 +84,12 @@ func (p *provider) Start(ctx context.Context) error { if p.started() { return p.initError } + // Initialize the instance. We need to do this before starting the reader, so + // that the reader sees the instance. + if err := p.initAndWait(ctx); err != nil { + return err + } + if err := p.Reader.Start(ctx); err != nil { p.initOnce.Do(func() { p.initError = err @@ -106,21 +115,35 @@ func (p *provider) Instance( if !p.started() { return base.SQLInstanceID(0), "", sqlinstance.NotStartedError } - - p.maybeInitialize() select { case <-ctx.Done(): return base.SQLInstanceID(0), "", ctx.Err() case <-p.stopper.ShouldQuiesce(): return base.SQLInstanceID(0), "", stop.ErrUnavailable + case <-p.initialized: + return p.instanceID, p.sessionID, p.initError + } +} + +func (p *provider) initAndWait(ctx context.Context) error { + if p.isSkeleton { + // Don't initialize the instance if the provider is a placeholder. + return nil + } + p.maybeInitialize() + select { + case <-ctx.Done(): + return ctx.Err() + case <-p.stopper.ShouldQuiesce(): + return stop.ErrUnavailable case <-p.initialized: if p.initError == nil { log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID) } else { log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError) } - return p.instanceID, p.sessionID, p.initError } + return p.initError } func (p *provider) maybeInitialize() { diff --git a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go index 1ccbcee894b0..15949afe64f7 100644 --- a/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go +++ b/pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go @@ -65,6 +65,7 @@ func TestInstanceProvider(t *testing.T) { defer stopper.Stop(ctx) instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr) slInstance.Start(ctx) + instanceProvider.InitAndWaitForTest(ctx) instanceID, sessionID, err := instanceProvider.Instance(ctx) require.NoError(t, err) require.Equal(t, expectedInstanceID, instanceID) @@ -101,6 +102,7 @@ func TestInstanceProvider(t *testing.T) { instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr") slInstance.Start(ctx) instanceProvider.ShutdownSQLInstanceForTest(ctx) + instanceProvider.InitAndWaitForTest(ctx) _, _, err := instanceProvider.Instance(ctx) require.Error(t, err) require.Equal(t, "instance never initialized", err.Error()) diff --git a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go index dd83dd00b727..a77e69ffb6c9 100644 --- a/pkg/sql/sqlinstance/instanceprovider/test_helpers.go +++ b/pkg/sql/sqlinstance/instanceprovider/test_helpers.go @@ -20,9 +20,10 @@ import ( ) // TestInstanceProvider exposes ShutdownSQLInstanceForTest -// method for testing purposes. +// and InitAndWaitForTest methods for testing purposes. type TestInstanceProvider interface { sqlinstance.Provider + InitAndWaitForTest(context.Context) ShutdownSQLInstanceForTest(context.Context) } @@ -43,6 +44,14 @@ func NewTestInstanceProvider( return p } +// InitAndWaitForTest explicitly calls initAndWait for testing purposes. +func (p *provider) InitAndWaitForTest(ctx context.Context) { + if err := p.initAndWait(ctx); err != nil { + //nolint:returnerrcheck + return + } +} + // ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes. func (p *provider) ShutdownSQLInstanceForTest(ctx context.Context) { p.shutdownSQLInstance(ctx) diff --git a/pkg/sql/sqlinstance/sqlinstance.go b/pkg/sql/sqlinstance/sqlinstance.go index 11247e2718dc..e87bef394aca 100644 --- a/pkg/sql/sqlinstance/sqlinstance.go +++ b/pkg/sql/sqlinstance/sqlinstance.go @@ -47,8 +47,8 @@ type Provider interface { // Instance returns the instance ID and sqlliveness.SessionID for the // current SQL instance. Instance(context.Context) (base.SQLInstanceID, sqlliveness.SessionID, error) - // Start starts the instanceprovider. This will block until - // the underlying instance data reader has been started. + // Start starts the instanceprovider and initializes the current SQL instance. + // This will block until the underlying instance data reader has been started. Start(context.Context) error }