Skip to content

Commit

Permalink
sql: initialize sql instance during instance provider start
Browse files Browse the repository at this point in the history
Before this change, there was a race condition where the instance
provider and the instance reader would start before the instance
provider created a SQL instance, potentially causing the reader to not
cache the instance before initialization was complete. This is
a problem in multi-tenant environments, where we may not be able to plan
queries if the reader does not know of any SQL instances.

This change moves sql instance initialization into the instance
provider's `Start()` function before starting the reader, so that the
instance is guaranteed to be visible to the reader on its first
rangefeed scan of the `system.sql_instances` table.

Fixes: cockroachdb#82706
Fixes: cockroachdb#81807
Fixes: cockroachdb#81567

Release note: None
  • Loading branch information
rharding6373 committed Jun 30, 2022
1 parent b630c08 commit 2c3afe4
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func verifyIngestionStats(t *testing.T, streamID int64, cutoverTime time.Time, s
func TestTenantStreaming(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
skip.WithIssue(t, 82706)

skip.UnderRace(t, "slow under race")

Expand Down
45 changes: 38 additions & 7 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,17 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
cfg.AmbientCtx,
cfg.stopper, cfg.clock, cfg.db, codec, cfg.Settings, sqllivenessKnobs,
)
cfg.sqlInstanceProvider = instanceprovider.New(
cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock,
)
// If the node id is already populated, we only need to create a placeholder
// instance provider without initializing the instance, since this is not a
// SQL pod server.
_, isNotSQLPod := cfg.nodeIDContainer.OptionalNodeID()
if isNotSQLPod {
cfg.sqlInstanceProvider = instanceprovider.NewPlaceholder()
} else {
cfg.sqlInstanceProvider = instanceprovider.New(
cfg.stopper, cfg.db, codec, cfg.sqlLivenessProvider, cfg.advertiseAddr, cfg.Locality, cfg.rangeFeedFactory, cfg.clock,
)
}

if !codec.ForSystemTenant() {
// In a multi-tenant environment, use the sqlInstanceProvider to resolve
Expand Down Expand Up @@ -1108,6 +1116,24 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
}, nil
}

// maybeCheckTenantAccess verifies that this instance is authorized to access
// the keyspace of the tenant described by codec. It issues a single scan,
// limited by the final argument 1, over the tenant's prefix span inside a
// transaction and returns whatever error that transaction produces. The
// system tenant is exempt from the check and always yields nil.
func maybeCheckTenantAccess(ctx context.Context, codec keys.SQLCodec, db *kv.DB) error {
	if codec.ForSystemTenant() {
		// Nothing to verify for the system tenant.
		return nil
	}
	// A simple read against the tenant keyspace is enough to surface an
	// authorization failure; the scanned rows themselves are discarded.
	return db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
		prefix := codec.TenantPrefix()
		_, scanErr := txn.Scan(ctx, prefix, prefix.PrefixEnd(), 1)
		return scanErr
	})
}

// Checks if tenant exists. This function does a very superficial check to see if the system db
// has been bootstrapped for the tenant. This is not a complete check and is only sufficient
// to be used in the dev environment.
Expand Down Expand Up @@ -1140,6 +1166,11 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er
return err
}
}
// Check that this SQL instance will have access to this tenant's keyspace
// before initializing the instance. Otherwise, initialization may get stuck.
if err := maybeCheckTenantAccess(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil {
return err
}
s.sqlLivenessProvider.Start(ctx)
// sqlInstanceProvider must always be started after sqlLivenessProvider
// as sqlInstanceProvider relies on the session initialized and maintained by
Expand All @@ -1150,7 +1181,7 @@ func (s *SQLServer) startSQLLivenessAndInstanceProviders(ctx context.Context) er
return nil
}

func (s *SQLServer) initInstanceID(ctx context.Context) error {
func (s *SQLServer) setInstanceID(ctx context.Context) error {
if _, ok := s.sqlIDContainer.OptionalNodeID(); ok {
// sqlIDContainer has already been initialized with a node ID,
// we don't need to initialize a SQL instance ID in this case
Expand Down Expand Up @@ -1179,8 +1210,8 @@ func (s *SQLServer) preStart(
socketFile string,
orphanedLeasesTimeThresholdNanos int64,
) error {
// The sqlliveness and sqlinstance subsystem should be started first to ensure instance ID is
// initialized prior to any other systems that need it.
// The sqlliveness and sqlinstance subsystem should be started first to ensure
// the instance ID is initialized prior to any other systems that need it.
if err := s.startSQLLivenessAndInstanceProviders(ctx); err != nil {
return err
}
Expand All @@ -1191,7 +1222,7 @@ func (s *SQLServer) preStart(
if err := maybeCheckTenantExists(ctx, s.execCfg.Codec, s.execCfg.DB); err != nil {
return err
}
if err := s.initInstanceID(ctx); err != nil {
if err := s.setInstanceID(ctx); err != nil {
return err
}
s.connManager = connManager
Expand Down
59 changes: 55 additions & 4 deletions pkg/sql/sqlinstance/instanceprovider/instanceprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,42 @@ type writer interface {
ReleaseInstanceID(ctx context.Context, instanceID base.SQLInstanceID) error
}

// provider implements the sqlinstance.Provider interface for access to the sqlinstance subsystem.
// placeholderProvider implements the sqlinstance.Provider interface as a
// placeholder for an instance provider, when an instance provider must be
// instantiated for a non-SQL instance. It does not back a real SQL instance:
// Start is a no-op, and Instance, GetInstance and GetAllInstances all return
// sqlinstance.NotASQLInstanceError.
//
// NOTE(review): NewPlaceholder leaves the embedded Reader nil and Start never
// starts it, so any Reader method that is promoted rather than overridden here
// would presumably be invoked on a nil receiver — confirm no caller relies on
// one.
type placeholderProvider struct {
*instancestorage.Reader
}

// NewPlaceholder returns a sqlinstance.Provider backed by a zero-valued
// placeholderProvider, i.e. one whose embedded Reader is nil.
func NewPlaceholder() sqlinstance.Provider {
	return new(placeholderProvider)
}

// Start implements the sqlinstance.Provider interface. It is a no-op that
// always returns nil; in particular it does not start the embedded Reader.
func (p *placeholderProvider) Start(ctx context.Context) error {
return nil
}

// Instance implements the sqlinstance.Provider interface. The placeholder has
// no SQL instance of its own, so it always returns a zero instance ID, an
// empty session ID and sqlinstance.NotASQLInstanceError.
func (p *placeholderProvider) Instance(
ctx context.Context,
) (_ base.SQLInstanceID, _ sqlliveness.SessionID, err error) {
return base.SQLInstanceID(0), "", sqlinstance.NotASQLInstanceError
}

// GetInstance implements the AddressResolver interface. It ignores its
// arguments and always returns an empty InstanceInfo together with
// sqlinstance.NotASQLInstanceError.
func (p *placeholderProvider) GetInstance(context.Context, base.SQLInstanceID) (sqlinstance.InstanceInfo, error) {
return sqlinstance.InstanceInfo{}, sqlinstance.NotASQLInstanceError
}

// GetAllInstances implements the AddressResolver interface. It always returns
// a nil slice together with sqlinstance.NotASQLInstanceError.
func (p *placeholderProvider) GetAllInstances(context.Context) ([]sqlinstance.InstanceInfo, error) {
return nil, sqlinstance.NotASQLInstanceError
}

// provider implements the sqlinstance.Provider interface for access to the
// sqlinstance subsystem.
type provider struct {
*instancestorage.Reader
storage writer
Expand Down Expand Up @@ -85,6 +120,12 @@ func (p *provider) Start(ctx context.Context) error {
if p.started() {
return p.initError
}
// Initialize the instance. We need to do this before starting the reader, so
// that the reader sees the instance.
if err := p.initAndWait(ctx); err != nil {
return err
}

if err := p.Reader.Start(ctx); err != nil {
p.initOnce.Do(func() {
p.initError = err
Expand All @@ -110,21 +151,31 @@ func (p *provider) Instance(
if !p.started() {
return base.SQLInstanceID(0), "", sqlinstance.NotStartedError
}

p.maybeInitialize()
select {
case <-ctx.Done():
return base.SQLInstanceID(0), "", ctx.Err()
case <-p.stopper.ShouldQuiesce():
return base.SQLInstanceID(0), "", stop.ErrUnavailable
case <-p.initialized:
return p.instanceID, p.sessionID, p.initError
}
}

// initAndWait calls maybeInitialize and then blocks until one of the
// following happens:
//
//   - ctx is canceled, in which case ctx.Err() is returned;
//   - the stopper starts quiescing, in which case stop.ErrUnavailable is
//     returned;
//   - the initialized channel becomes readable, in which case the outcome is
//     logged on the Ops channel and p.initError (nil on success) is returned.
func (p *provider) initAndWait(ctx context.Context) error {
	p.maybeInitialize()
	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-p.stopper.ShouldQuiesce():
		return stop.ErrUnavailable
	case <-p.initialized:
		// Initialization has finished one way or the other; record the
		// outcome before handing the error back to the caller.
		if p.initError == nil {
			log.Ops.Infof(ctx, "created SQL instance %d", p.instanceID)
		} else {
			log.Ops.Warningf(ctx, "error creating SQL instance: %s", p.initError)
		}
	}
	return p.initError
}

func (p *provider) maybeInitialize() {
Expand Down
2 changes: 2 additions & 0 deletions pkg/sql/sqlinstance/instanceprovider/instanceprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestInstanceProvider(t *testing.T) {
defer stopper.Stop(ctx)
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, addr)
slInstance.Start(ctx)
instanceProvider.InitAndWaitForTest(ctx)
instanceID, sessionID, err := instanceProvider.Instance(ctx)
require.NoError(t, err)
require.Equal(t, expectedInstanceID, instanceID)
Expand Down Expand Up @@ -101,6 +102,7 @@ func TestInstanceProvider(t *testing.T) {
instanceProvider := instanceprovider.NewTestInstanceProvider(stopper, slInstance, "addr")
slInstance.Start(ctx)
instanceProvider.ShutdownSQLInstanceForTest(ctx)
instanceProvider.InitAndWaitForTest(ctx)
_, _, err := instanceProvider.Instance(ctx)
require.Error(t, err)
require.Equal(t, "instance never initialized", err.Error())
Expand Down
8 changes: 7 additions & 1 deletion pkg/sql/sqlinstance/instanceprovider/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import (
)

// TestInstanceProvider extends sqlinstance.Provider with the
// ShutdownSQLInstanceForTest and InitAndWaitForTest methods for testing
// purposes.
type TestInstanceProvider interface {
sqlinstance.Provider
InitAndWaitForTest(context.Context)
ShutdownSQLInstanceForTest(context.Context)
}

Expand All @@ -43,6 +44,11 @@ func NewTestInstanceProvider(
return p
}

// InitAndWaitForTest explicitly calls initAndWait for testing purposes.
func (p *provider) InitAndWaitForTest(ctx context.Context) {
// The error is deliberately discarded: the TestInstanceProvider interface
// declares this method without a return value.
_ = p.initAndWait(ctx)
}

// ShutdownSQLInstanceForTest explicitly calls shutdownSQLInstance for testing purposes.
func (p *provider) ShutdownSQLInstanceForTest(ctx context.Context) {
p.shutdownSQLInstance(ctx)
Expand Down
8 changes: 6 additions & 2 deletions pkg/sql/sqlinstance/sqlinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ type Provider interface {
// Instance returns the instance ID and sqlliveness.SessionID for the
// current SQL instance.
Instance(context.Context) (base.SQLInstanceID, sqlliveness.SessionID, error)
// Start starts the instanceprovider. This will block until
// the underlying instance data reader has been started.
// Start starts the instanceprovider and initializes the current SQL instance.
// This will block until the underlying instance data reader has been started.
Start(context.Context) error
}

Expand All @@ -59,3 +59,7 @@ var NonExistentInstanceError = errors.Errorf("non existent SQL instance")

// NotStartedError can be returned if the sqlinstance subsystem has not been started yet.
var NotStartedError = errors.Errorf("sqlinstance subsystem not started")

// NotASQLInstanceError can be returned if a function is not supported for
// non-SQL instances.
var NotASQLInstanceError = errors.Errorf("not supported for non-SQL instance")

0 comments on commit 2c3afe4

Please sign in to comment.