diff --git a/pkg/ccl/backupccl/system_schema.go b/pkg/ccl/backupccl/system_schema.go index 181ad48e5165..04e23ee2bd0d 100644 --- a/pkg/ccl/backupccl/system_schema.go +++ b/pkg/ccl/backupccl/system_schema.go @@ -525,7 +525,7 @@ var systemTableBackupConfiguration = map[string]systemBackupConfiguration{ systemschema.TenantUsageTable.GetName(): { shouldIncludeInClusterBackup: optOutOfClusterBackup, }, - systemschema.SQLInstancesTable.GetName(): { + systemschema.SQLInstancesTable().GetName(): { shouldIncludeInClusterBackup: optOutOfClusterBackup, }, systemschema.SpanConfigurationsTable.GetName(): { diff --git a/pkg/ccl/multiregionccl/BUILD.bazel b/pkg/ccl/multiregionccl/BUILD.bazel index 4299e8b92f47..a93ef2546d9a 100644 --- a/pkg/ccl/multiregionccl/BUILD.bazel +++ b/pkg/ccl/multiregionccl/BUILD.bazel @@ -60,19 +60,22 @@ go_test( "//pkg/security/securitytest", "//pkg/server", "//pkg/server/serverpb", + "//pkg/settings/cluster", "//pkg/sql", "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/descs", "//pkg/sql/catalog/desctestutils", - "//pkg/sql/enum", + "//pkg/sql/catalog/tabledesc", + "//pkg/sql/catalog/typedesc", "//pkg/sql/execinfra", "//pkg/sql/parser", "//pkg/sql/rowenc", "//pkg/sql/sem/tree", - "//pkg/sql/sqlliveness/slstorage", + "//pkg/sql/sqlliveness", "//pkg/sql/sqltestutils", "//pkg/sql/tests", + "//pkg/sql/types", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", @@ -80,15 +83,12 @@ go_test( "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/envutil", - "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", "//pkg/util/randutil", "//pkg/util/syncutil", - "//pkg/util/timeutil", "//pkg/util/tracing", "//pkg/util/tracing/tracingpb", - "//pkg/util/uuid", "@com_github_cockroachdb_apd_v3//:apd", "@com_github_cockroachdb_datadriven//:datadriven", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ccl/multiregionccl/multiregion_system_table_test.go b/pkg/ccl/multiregionccl/multiregion_system_table_test.go index 7b0dab9004ec..51893decd4f9 100644 --- a/pkg/ccl/multiregionccl/multiregion_system_table_test.go +++ b/pkg/ccl/multiregionccl/multiregion_system_table_test.go @@ -10,131 +10,224 @@ package multiregionccl import ( "context" - "fmt" + gosql "database/sql" "testing" - "time" "github.com/cockroachdb/apd/v3" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl/multiregionccltestutils" "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/enum" - "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/envutil" - "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/timeutil" - "github.com/cockroachdb/cockroach/pkg/util/uuid" + "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) -func createSqllivenessTable( - t *testing.T, db *sqlutils.SQLRunner, dbName string, -) (tableID descpb.ID) { - t.Helper() - db.Exec(t, fmt.Sprintf(` - CREATE DATABASE IF NOT EXISTS "%s" - WITH PRIMARY REGION "us-east1" - REGIONS "us-east1", "us-east2", "us-east3" - `, dbName)) - - // expiration needs to be column 2. slstorage.Table assumes the column id. - // session_uuid and crdb_region are identified by their location in the - // primary key. - db.Exec(t, fmt.Sprintf(` - CREATE TABLE "%s".sqlliveness ( - session_uuid BYTES NOT NULL, - expiration DECIMAL NOT NULL, - crdb_region "%s".public.crdb_internal_region, - PRIMARY KEY(crdb_region, session_uuid) - ) LOCALITY REGIONAL BY ROW; - `, dbName, dbName)) - db.QueryRow(t, ` - select u.id - from system.namespace t - join system.namespace u - on t.id = u."parentID" - where t.name = $1 and u.name = $2`, - dbName, "sqlliveness").Scan(&tableID) - return tableID -} - -func TestMrSystemDatabase(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() +// alterCrdbRegionType converts the crdb_region []byte column in a system +// database table into the system database's enum type. +func alterCrdbRegionType( + ctx context.Context, tableID descpb.ID, db *kv.DB, executor descs.TxnManager, +) error { + flags := tree.CommonLookupFlags{ + Required: true, + AvoidLeased: true, + } + objFlags := tree.ObjectLookupFlags{ + CommonLookupFlags: flags, + } - _, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(t, 3, base.TestingKnobs{}) - defer cleanup() + getRegionEnum := func(systemDB catalog.DatabaseDescriptor, txn *kv.Txn, collection *descs.Collection) (*typedesc.Mutable, *types.T, error) { + enumID, err := systemDB.MultiRegionEnumID() + if err != nil { + return nil, nil, err + } + enumTypeDesc, err := collection.GetMutableTypeByID(ctx, txn, enumID, objFlags) + if err != nil { + return nil, nil, err + } + schema, err := collection.GetImmutableSchemaByID(ctx, txn, enumTypeDesc.GetParentSchemaID(), flags) + if err != nil { + return nil, nil, err + } + enumName := tree.MakeQualifiedTypeName(systemDB.GetName(), schema.GetName(), enumTypeDesc.GetName()) + enumType, err := enumTypeDesc.MakeTypesT(ctx, &enumName, nil) + if err != nil { + return nil, nil, err + } + return enumTypeDesc, enumType, nil + } - tDB := sqlutils.MakeSQLRunner(sqlDB) + getMutableColumn := func(table *tabledesc.Mutable, name string) (*descpb.ColumnDescriptor, error) { + for i := range table.Columns { + if table.Columns[i].Name == name { + return &table.Columns[i], nil + } + } + return nil, errors.New("crdb_region column not found") + } - t.Run("Sqlliveness", func(t *testing.T) { - row := tDB.QueryRow(t, `SELECT crdb_region, session_uuid, expiration FROM system.sqlliveness LIMIT 1`) - var sessionUUID string - var crdbRegion string - var rawExpiration apd.Decimal - row.Scan(&crdbRegion, &sessionUUID, &rawExpiration) + err := executor.DescsTxn(ctx, db, func(ctx context.Context, txn *kv.Txn, collection *descs.Collection) error { + _, systemDB, err := collection.GetImmutableDatabaseByID(ctx, txn, keys.SystemDatabaseID, flags) + if err != nil { + return err + } + + enumTypeDesc, enumType, err := getRegionEnum(systemDB, txn, collection) + if err != nil { + return err + } + + // Change the crdb_region column's type to the enum + tableDesc, err := collection.GetMutableTableByID(ctx, txn, tableID, objFlags) + if err != nil { + return err + } + column, err := getMutableColumn(tableDesc, "crdb_region") + if err != nil { + return err + } + column.Type = enumType + if err := collection.WriteDesc(ctx, false, tableDesc, txn); err != nil { + return err + } + + // Add a back reference to the enum + enumTypeDesc.AddReferencingDescriptorID(tableID) + return collection.WriteDesc(ctx, false, enumTypeDesc, txn) }) + if err != nil { + return errors.Wrapf(err, "unable to change crdb_region from []byte to the multi-region enum for table %d", tableID) + } + return err } -func TestRbrSqllivenessTable(t *testing.T) { +func TestMrSystemDatabase(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() - ctx := context.Background() + // Enable settings required for configuring a tenant's system database as multi-region. + cs := cluster.MakeTestingClusterSettings() + sql.SecondaryTenantsMultiRegionAbstractionsEnabled.Override(context.Background(), &cs.SV, true) + sql.SecondaryTenantZoneConfigsEnabled.Override(context.Background(), &cs.SV, true) - cluster, sqlDB, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(t, 3, base.TestingKnobs{}) + cluster, _, cleanup := multiregionccltestutils.TestingCreateMultiRegionCluster(t, 3, base.TestingKnobs{}, multiregionccltestutils.WithSettings(cs)) defer cleanup() - kvDB := cluster.Servers[0].DB() - settings := cluster.Servers[0].Cfg.Settings - stopper := cluster.Servers[0].Stopper() - - tDB := sqlutils.MakeSQLRunner(sqlDB) - - t0 := time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC) - timeSource := timeutil.NewManualTime(t0) - clock := hlc.NewClock(timeSource, base.DefaultMaxClockOffset) - - setup := func(t *testing.T) *slstorage.Storage { - dbName := t.Name() - tableID := createSqllivenessTable(t, tDB, dbName) - var ambientCtx log.AmbientContext - // rbrIndexID is the index id used to access the regional by row index in - // tests. In production it will be index 2, but the freshly created test table - // will have index 1. - const rbrIndexID = 1 - return slstorage.NewTestingStorage(ambientCtx, stopper, clock, kvDB, keys.SystemSQLCodec, settings, - tableID, rbrIndexID, timeSource.NewTimer) + + id, err := roachpb.MakeTenantID(11) + require.NoError(t, err) + + tenantArgs := base.TestTenantArgs{ + Settings: cs, + TenantID: id, + Locality: *cluster.Servers[0].Locality(), } + tenantServer, tenantSQL := serverutils.StartTenant(t, cluster.Servers[0], tenantArgs) - t.Run("SqlRead", func(t *testing.T) { - storage := setup(t) + tDB := sqlutils.MakeSQLRunner(tenantSQL) - initialUUID := uuid.MakeV4() - session, err := slstorage.MakeSessionID(enum.One, initialUUID) - require.NoError(t, err) + tDB.Exec(t, `ALTER DATABASE system SET PRIMARY REGION "us-east1"`) + tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east2"`) + tDB.Exec(t, `ALTER DATABASE system ADD REGION "us-east3"`) - writeExpiration := clock.Now().Add(10, 00) - require.NoError(t, storage.Insert(ctx, session, writeExpiration)) + ctx := context.Background() + executor := tenantServer.ExecutorConfig().(sql.ExecutorConfig) + // Changing the type of the crdb_region field is required to modify the + // types with SET LOCALITY REGIONAL BY ROW. + require.NoError(t, alterCrdbRegionType(ctx, keys.SqllivenessID, executor.DB, executor.InternalExecutorFactory)) + require.NoError(t, alterCrdbRegionType(ctx, keys.SQLInstancesTableID, executor.DB, executor.InternalExecutorFactory)) + + // Run schema validations to ensure the manual descriptor modifications are + // okay. + tDB.CheckQueryResults(t, `SELECT * FROM crdb_internal.invalid_objects`, [][]string{}) + + t.Run("Sqlliveness", func(t *testing.T) { + tDB.Exec(t, `ALTER TABLE system.sqlliveness SET LOCALITY REGIONAL BY ROW`) + row := tDB.QueryRow(t, `SELECT crdb_region, session_uuid, expiration FROM system.sqlliveness LIMIT 1`) var sessionUUID string var crdbRegion string var rawExpiration apd.Decimal - - row := tDB.QueryRow(t, fmt.Sprintf(`SELECT crdb_region, session_uuid, expiration FROM "%s".sqlliveness`, t.Name())) row.Scan(&crdbRegion, &sessionUUID, &rawExpiration) + require.Equal(t, "us-east1", crdbRegion) - require.Contains(t, []string{"us-east1", "us-east2", "us-east3"}, crdbRegion) - require.Equal(t, sessionUUID, string(initialUUID.GetBytes())) - - readExpiration, err := hlc.DecimalToHLC(&rawExpiration) - require.NoError(t, err) + }) - require.Equal(t, writeExpiration, readExpiration) + t.Run("Sqlinstances", func(t *testing.T) { + tDB.Exec(t, `ALTER TABLE system.sql_instances SET LOCALITY REGIONAL BY ROW`) + + t.Run("InUse", func(t *testing.T) { + query := ` + SELECT id, addr, session_id, locality, crdb_region + FROM system.sql_instances + WHERE session_id IS NOT NULL + ` + rows := tDB.Query(t, query) + require.True(t, rows.Next()) + for { + var id base.SQLInstanceID + var addr, locality string + var crdb_region string + var session sqlliveness.SessionID + + require.NoError(t, rows.Scan(&id, &addr, &session, &locality, &crdb_region)) + + require.True(t, 0 < id) + require.NotEmpty(t, addr) + require.NotEmpty(t, locality) + require.NotEmpty(t, session) + require.NotEmpty(t, crdb_region) + + require.Equal(t, "us-east1", crdb_region) + + if !rows.Next() { + break + } + } + require.NoError(t, rows.Close()) + }) + + t.Run("Preallocated", func(t *testing.T) { + query := ` + SELECT id, addr, session_id, locality, crdb_region + FROM system.sql_instances + WHERE session_id IS NULL + ` + rows := tDB.Query(t, query) + require.True(t, rows.Next()) + for { + var id base.SQLInstanceID + var addr, locality, session gosql.NullString + var crdb_region string + + require.NoError(t, rows.Scan(&id, &addr, &session, &locality, &crdb_region)) + + require.True(t, 0 < id) + require.False(t, addr.Valid) + require.False(t, locality.Valid) + require.False(t, session.Valid) + require.NotEmpty(t, crdb_region) + + if !rows.Next() { + break + } + } + require.NoError(t, rows.Close()) + }) }) } diff --git a/pkg/ccl/multiregionccl/multiregionccltestutils/BUILD.bazel b/pkg/ccl/multiregionccl/multiregionccltestutils/BUILD.bazel index b25c99ad3cfb..a6d2e2e60bac 100644 --- a/pkg/ccl/multiregionccl/multiregionccltestutils/BUILD.bazel +++ b/pkg/ccl/multiregionccl/multiregionccltestutils/BUILD.bazel @@ -9,6 +9,7 @@ go_library( deps = [ "//pkg/base", "//pkg/roachpb", + "//pkg/settings/cluster", "//pkg/testutils/testcluster", "@com_github_cockroachdb_errors//:errors", ], diff --git a/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go b/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go index 98083a07fa13..f2c97cfbf776 100644 --- a/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go +++ b/pkg/ccl/multiregionccl/multiregionccltestutils/testutils.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/errors" ) @@ -24,6 +25,7 @@ type multiRegionTestClusterParams struct { baseDir string replicationMode base.TestClusterReplicationMode useDatabase string + settings *cluster.Settings } // MultiRegionTestClusterParamsOption is an option that can be passed to @@ -55,6 +57,13 @@ func WithUseDatabase(db string) MultiRegionTestClusterParamsOption { } } +// WithSettings is used to configure the settings the cluster is created with. +func WithSettings(settings *cluster.Settings) MultiRegionTestClusterParamsOption { + return func(params *multiRegionTestClusterParams) { + params.settings = settings + } +} + // TestingCreateMultiRegionCluster creates a test cluster with numServers number // of nodes and the provided testing knobs applied to each of the nodes. Every // node is placed in its own locality, named "us-east1", "us-east2", and so on. @@ -97,6 +106,7 @@ func TestingCreateMultiRegionClusterWithRegionList( for _, region := range regionNames { for i := 0; i < serversPerRegion; i++ { serverArgs[totalServerCount] = base.TestServerArgs{ + Settings: params.settings, Knobs: knobs, ExternalIODir: params.baseDir, UseDatabase: params.useDatabase, diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index 978c16180d60..2d7f9f9e22ef 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -500,7 +500,7 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) { cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings) cfg.sqlInstanceReader = instancestorage.NewReader( cfg.sqlInstanceStorage, - cfg.sqlLivenessProvider.CachedReader(), + cfg.sqlLivenessProvider, cfg.rangeFeedFactory, codec, cfg.clock, cfg.stopper) diff --git a/pkg/sql/catalog/bootstrap/metadata.go b/pkg/sql/catalog/bootstrap/metadata.go index 7067fd48880b..badb03e60947 100644 --- a/pkg/sql/catalog/bootstrap/metadata.go +++ b/pkg/sql/catalog/bootstrap/metadata.go @@ -345,7 +345,7 @@ func addSystemDescriptorsToSchema(target *MetadataSchema) { target.AddDescriptor(systemschema.TransactionStatisticsTable) target.AddDescriptor(systemschema.DatabaseRoleSettingsTable) target.AddDescriptorForSystemTenant(systemschema.TenantUsageTable) - target.AddDescriptor(systemschema.SQLInstancesTable) + target.AddDescriptor(systemschema.SQLInstancesTable()) target.AddDescriptorForSystemTenant(systemschema.SpanConfigurationsTable) // Tables introduced in 22.1. diff --git a/pkg/sql/catalog/systemschema/system.go b/pkg/sql/catalog/systemschema/system.go index 0bad8c7a6ed4..a1662d55a9ba 100644 --- a/pkg/sql/catalog/systemschema/system.go +++ b/pkg/sql/catalog/systemschema/system.go @@ -658,6 +658,17 @@ CREATE TABLE system.sql_instances ( FAMILY "primary" (id, addr, session_id, locality) )` + MrSQLInstancesTableSchema = ` +CREATE TABLE system.sql_instances ( + id INT NOT NULL, + addr STRING, + session_id BYTES, + locality JSONB, + crdb_region BYTES NOT NULL, + CONSTRAINT "primary" PRIMARY KEY (crdb_region, id), + FAMILY "primary" (crdb_region, id, addr, session_id, locality) +)` + SpanConfigurationsTableSchema = ` CREATE TABLE system.span_configurations ( start_key BYTES NOT NULL, @@ -909,7 +920,7 @@ func MakeSystemTables() []SystemTable { TransactionStatisticsTable, DatabaseRoleSettingsTable, TenantUsageTable, - SQLInstancesTable, + SQLInstancesTable(), SpanConfigurationsTable, TenantSettingsTable, SpanCountTable, @@ -2482,31 +2493,68 @@ var ( }, )) - // SQLInstancesTable is the descriptor for the sqlinstances table - // It stores information about all the SQL instances for a tenant - // and their associated session, locality, and address information. - SQLInstancesTable = makeSystemTable( - SQLInstancesTableSchema, - systemTable( - catconstants.SQLInstancesTableName, - keys.SQLInstancesTableID, - []descpb.ColumnDescriptor{ - {Name: "id", ID: 1, Type: types.Int, Nullable: false}, - {Name: "addr", ID: 2, Type: types.String, Nullable: true}, - {Name: "session_id", ID: 3, Type: types.Bytes, Nullable: true}, - {Name: "locality", ID: 4, Type: types.Jsonb, Nullable: true}, - }, - []descpb.ColumnFamilyDescriptor{ - { - Name: "primary", - ID: 0, - ColumnNames: []string{"id", "addr", "session_id", "locality"}, - ColumnIDs: []descpb.ColumnID{1, 2, 3, 4}, - DefaultColumnID: 0, + // SQLInstancesTable is the descriptor for the sqlinstances table. It + // stores information about all the SQL instances for a tenant and their + // associated session, locality, and address information. + // + // TODO(jeffswenson): remove the function wrapper around the + // SQLInstanceTable descriptor. See TestSupportMultiRegion for context. + SQLInstancesTable = func() SystemTable { + if TestSupportMultiRegion() { + return makeSystemTable( + MrSQLInstancesTableSchema, + systemTable( + catconstants.SQLInstancesTableName, + keys.SQLInstancesTableID, + []descpb.ColumnDescriptor{ + {Name: "id", ID: 1, Type: types.Int, Nullable: false}, + {Name: "addr", ID: 2, Type: types.String, Nullable: true}, + {Name: "session_id", ID: 3, Type: types.Bytes, Nullable: true}, + {Name: "locality", ID: 4, Type: types.Jsonb, Nullable: true}, + {Name: "crdb_region", ID: 5, Type: types.Bytes, Nullable: false}, + }, + []descpb.ColumnFamilyDescriptor{ + { + Name: "primary", + ID: 0, + ColumnNames: []string{"id", "addr", "session_id", "locality", "crdb_region"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3, 4, 5}, + DefaultColumnID: 0, + }, + }, + descpb.IndexDescriptor{ + Name: "primary", + ID: 2, + Unique: true, + KeyColumnNames: []string{"crdb_region", "id"}, + KeyColumnDirections: []catpb.IndexColumn_Direction{catpb.IndexColumn_ASC, catpb.IndexColumn_ASC}, + KeyColumnIDs: []descpb.ColumnID{5, 1}, + }, + )) + } + return makeSystemTable( + SQLInstancesTableSchema, + systemTable( + catconstants.SQLInstancesTableName, + keys.SQLInstancesTableID, + []descpb.ColumnDescriptor{ + {Name: "id", ID: 1, Type: types.Int, Nullable: false}, + {Name: "addr", ID: 2, Type: types.String, Nullable: true}, + {Name: "session_id", ID: 3, Type: types.Bytes, Nullable: true}, + {Name: "locality", ID: 4, Type: types.Jsonb, Nullable: true}, }, - }, - pk("id"), - )) + []descpb.ColumnFamilyDescriptor{ + { + Name: "primary", + ID: 0, + ColumnNames: []string{"id", "addr", "session_id", "locality"}, + ColumnIDs: []descpb.ColumnID{1, 2, 3, 4}, + DefaultColumnID: 0, + }, + }, + pk("id"), + )) + } // SpanConfigurationsTable is the descriptor for the system tenant's span // configurations table. diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 336a02f30521..af4bd70c2649 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -201,11 +201,11 @@ var allowCrossDatabaseSeqReferences = settings.RegisterBoolSetting( // tenant. const SecondaryTenantsZoneConfigsEnabledSettingName = "sql.zone_configs.allow_for_secondary_tenant.enabled" -// secondaryTenantZoneConfigsEnabled controls if secondary tenants are allowed +// SecondaryTenantZoneConfigsEnabled controls if secondary tenants are allowed // to set zone configurations. It has no effect for the system tenant. // // This setting has no effect on zone configurations that have already been set. -var secondaryTenantZoneConfigsEnabled = settings.RegisterBoolSetting( +var SecondaryTenantZoneConfigsEnabled = settings.RegisterBoolSetting( settings.TenantReadOnly, SecondaryTenantsZoneConfigsEnabledSettingName, "allow secondary tenants to set zone configurations; does not affect the system tenant", diff --git a/pkg/sql/set_zone_config.go b/pkg/sql/set_zone_config.go index 06379f1baf51..51274b8065d5 100644 --- a/pkg/sql/set_zone_config.go +++ b/pkg/sql/set_zone_config.go @@ -240,7 +240,7 @@ func (p *planner) SetZoneConfig(ctx context.Context, n *tree.SetZoneConfig) (pla } if !p.ExecCfg().Codec.ForSystemTenant() && - !secondaryTenantZoneConfigsEnabled.Get(&p.ExecCfg().Settings.SV) { + !SecondaryTenantZoneConfigsEnabled.Get(&p.ExecCfg().Settings.SV) { // Return an unimplemented error here instead of referencing the cluster // setting here as zone configurations for secondary tenants are intended to // be hidden. diff --git a/pkg/sql/sqlinstance/instancestorage/BUILD.bazel b/pkg/sql/sqlinstance/instancestorage/BUILD.bazel index 9a0e1e19e770..9e4d2dd05aec 100644 --- a/pkg/sql/sqlinstance/instancestorage/BUILD.bazel +++ b/pkg/sql/sqlinstance/instancestorage/BUILD.bazel @@ -16,6 +16,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient/rangefeed", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/multitenant", "//pkg/roachpb", "//pkg/settings", @@ -23,12 +24,12 @@ go_library( "//pkg/sql/catalog", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/systemschema", - "//pkg/sql/rowenc", + "//pkg/sql/enum", "//pkg/sql/rowenc/valueside", "//pkg/sql/sem/tree", "//pkg/sql/sqlinstance", "//pkg/sql/sqlliveness", - "//pkg/sql/types", + "//pkg/sql/sqlliveness/slstorage", "//pkg/util/encoding", "//pkg/util/grpcutil", "//pkg/util/hlc", @@ -37,7 +38,6 @@ go_library( "//pkg/util/retry", "//pkg/util/stop", "//pkg/util/syncutil", - "//pkg/util/syncutil/singleflight", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", ], @@ -50,6 +50,7 @@ go_test( "instancestorage_internal_test.go", "instancestorage_test.go", "main_test.go", + "row_codec_test.go", ], args = ["-test.timeout=295s"], embed = [":instancestorage"], @@ -65,6 +66,7 @@ go_test( "//pkg/settings/cluster", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/systemschema", + "//pkg/sql/enum", "//pkg/sql/sqlinstance", "//pkg/sql/sqlliveness", "//pkg/sql/sqlliveness/slstorage", @@ -72,6 +74,8 @@ go_test( "//pkg/testutils/serverutils", "//pkg/testutils/sqlutils", "//pkg/testutils/testcluster", + "//pkg/util/encoding", + "//pkg/util/envutil", "//pkg/util/hlc", "//pkg/util/leaktest", "//pkg/util/log", @@ -79,6 +83,7 @@ go_test( "//pkg/util/stop", "//pkg/util/syncutil", "//pkg/util/timeutil", + "//pkg/util/uuid", "@com_github_cockroachdb_errors//:errors", "@com_github_stretchr_testify//require", ], diff --git a/pkg/sql/sqlinstance/instancestorage/instancereader_test.go b/pkg/sql/sqlinstance/instancestorage/instancereader_test.go index a7e97c25a756..2f4f44e616a8 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancereader_test.go +++ b/pkg/sql/sqlinstance/instancestorage/instancereader_test.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" @@ -72,7 +73,7 @@ func TestReader(t *testing.T) { t.Run("basic-get-instance-data", func(t *testing.T) { storage, slStorage, clock, reader := setup(t) require.NoError(t, reader.Start(ctx)) - const sessionID = sqlliveness.SessionID("session_id") + sessionID := makeSession() const addr = "addr" locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}} // Set a high enough expiration to ensure the session stays @@ -111,9 +112,10 @@ func TestReader(t *testing.T) { require.NoError(t, reader.Start(ctx)) // Set up expected test data. + region := enum.One instanceIDs := []base.SQLInstanceID{1, 2, 3} addresses := []string{"addr1", "addr2", "addr3"} - sessionIDs := []sqlliveness.SessionID{"session1", "session2", "session3"} + sessionIDs := []sqlliveness.SessionID{makeSession(), makeSession(), makeSession()} localities := []roachpb.Locality{ {Tiers: []roachpb.Tier{{Key: "region", Value: "region1"}}}, {Tiers: []roachpb.Tier{{Key: "region", Value: "region2"}}}, @@ -166,7 +168,7 @@ func TestReader(t *testing.T) { // Release an instance and verify only active instances are returned. { - err := storage.ReleaseInstanceID(ctx, instanceIDs[0]) + err := storage.ReleaseInstanceID(ctx, region, instanceIDs[0]) if err != nil { t.Fatal(err) } @@ -200,7 +202,7 @@ func TestReader(t *testing.T) { // the latest instance information is returned. This heuristic is used // when instance information isn't released correctly prior to SQL instance shutdown. { - sessionID := sqlliveness.SessionID("session4") + sessionID := makeSession() locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region4"}}} sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) id, err := storage.CreateInstance(ctx, sessionID, sessionExpiry, addresses[2], locality) @@ -228,9 +230,10 @@ func TestReader(t *testing.T) { storage, slStorage, clock, reader := setup(t) require.NoError(t, reader.Start(ctx)) // Create three instances and release one. + region := enum.One instanceIDs := [...]base.SQLInstanceID{1, 2, 3} addresses := [...]string{"addr1", "addr2", "addr3"} - sessionIDs := [...]sqlliveness.SessionID{"session1", "session2", "session3"} + sessionIDs := [...]sqlliveness.SessionID{makeSession(), makeSession(), makeSession()} localities := [...]roachpb.Locality{ {Tiers: []roachpb.Tier{{Key: "region", Value: "region1"}}}, {Tiers: []roachpb.Tier{{Key: "region", Value: "region2"}}}, @@ -270,7 +273,7 @@ func TestReader(t *testing.T) { // Verify request for released instance data results in an error. { - err := storage.ReleaseInstanceID(ctx, instanceIDs[0]) + err := storage.ReleaseInstanceID(ctx, region, instanceIDs[0]) if err != nil { t.Fatal(err) } diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage.go b/pkg/sql/sqlinstance/instancestorage/instancestorage.go index e54e2c871513..b42674675c2c 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancestorage.go +++ b/pkg/sql/sqlinstance/instancestorage/instancestorage.go @@ -16,23 +16,24 @@ import ( "context" "math" "math/rand" - "sort" "time" "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" + "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/stop" - "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" ) @@ -70,13 +71,17 @@ var PreallocatedCount = settings.RegisterIntSetting( var errNoPreallocatedRows = errors.New("no preallocated rows") // Storage implements the storage layer for the sqlinstance subsystem. +// +// SQL Instance IDs must be globally unique. The SQL Instance table may be +// partitioned by region. In order to allow for fast cold starts, SQL Instances +// are pre-allocated into each region. If a sql_instance row does not have a +// session id, it is available for immediate use. It is also legal to reclaim +// instances ids if the owning session has expired. type Storage struct { - codec keys.SQLCodec - db *kv.DB - slReader sqlliveness.Reader - rowcodec rowCodec - settings *cluster.Settings - reclaimGroup singleflight.Group + db *kv.DB + slReader sqlliveness.Reader + rowcodec rowCodec + settings *cluster.Settings // TestingKnobs refers to knobs used for testing. TestingKnobs struct { // JitteredIntervalFn corresponds to the function used to jitter the @@ -87,6 +92,7 @@ type Storage struct { // instancerow encapsulates data for a single row within the sql_instances table. type instancerow struct { + region []byte instanceID base.SQLInstanceID addr string sessionID sqlliveness.SessionID @@ -97,11 +103,7 @@ type instancerow struct { // isAvailable returns true if the instance row hasn't been claimed by a SQL pod // (i.e. available for claiming), or false otherwise. func (r *instancerow) isAvailable() bool { - return r.addr == "" -} - -type dbScan interface { - Scan(ctx context.Context, begin, end interface{}, maxRows int64) ([]kv.KeyValue, error) + return r.sessionID == "" } // NewTestingStorage constructs a new storage with control for the database @@ -115,7 +117,6 @@ func NewTestingStorage( ) *Storage { s := &Storage{ db: db, - codec: codec, rowcodec: makeRowCodec(codec, sqlInstancesTableID), slReader: slReader, settings: settings, @@ -145,6 +146,14 @@ func (s *Storage) CreateInstance( if len(sessionID) == 0 { return base.SQLInstanceID(0), errors.New("no session information for instance") } + + region, _, err := slstorage.UnsafeDecodeSessionID(sessionID) + if err != nil { + return base.SQLInstanceID(0), errors.Wrap(err, "unable to determine region for sql_instance") + } + + // TODO(jeffswenson): advance session expiration. This can get stuck in a + // loop if the session already expired. ctx = multitenant.WithTenantCostControlExemption(ctx) assignInstance := func() (base.SQLInstanceID, error) { var availableID base.SQLInstanceID @@ -163,12 +172,14 @@ func (s *Storage) CreateInstance( return err } - availableID, err = s.getAvailableInstanceIDForRegion(ctx, txn) + // Try to retrieve an available instance ID. This blocks until one + // is available. + availableID, err = s.getAvailableInstanceIDForRegion(ctx, region, txn) if err != nil { return err } - key := s.rowcodec.encodeKey(availableID) + key := s.rowcodec.encodeKey(region, availableID) value, err := s.rowcodec.encodeValue(addr, sessionID, locality) if err != nil { log.Warningf(ctx, "failed to encode row for instance id %d: %v", availableID, err) @@ -183,10 +194,8 @@ func (s *Storage) CreateInstance( } return availableID, nil } - var err error - // There's a possibility where there are no available rows, and the cached - // reader takes time to fully find out that the sessions are dead, so retry - // with a backoff. + // It's possible that all allocated IDs are claimed, so retry with a back + // off. opts := retry.Options{ InitialBackoff: 50 * time.Millisecond, MaxBackoff: 200 * time.Millisecond, @@ -194,7 +203,7 @@ func (s *Storage) CreateInstance( } for r := retry.StartWithCtx(ctx, opts); r.Next(); { log.Infof(ctx, "assigning instance id to addr %s", addr) - instanceID, err = assignInstance() + instanceID, err := assignInstance() // Instance was successfully assigned an ID. if err == nil { return instanceID, err @@ -202,10 +211,19 @@ func (s *Storage) CreateInstance( if !errors.Is(err, errNoPreallocatedRows) { return base.SQLInstanceID(0), err } - // If the transaction failed because there were no pre-allocated rows, - // trigger reclaiming, and retry. This blocks until the reclaim process - // completes. - if err := s.generateAvailableInstanceRows(ctx, sessionExpiration); err != nil { + // If assignInstance failed because there are no available rows, + // allocate new instance IDs for the local region. + // + // There is a choice during start up: + // 1. Allocate for every region. + // 2. Allocate only for the local region. + // + // Allocating only for the local region removes one global round trip. + // In the uncontended case, allocating locally requires reading from + // every region, then writing to the local region. Allocating globally + // would require one round trip for reading and one round trip for + // writes. + if err := s.generateAvailableInstanceRows(ctx, [][]byte{region}, sessionExpiration); err != nil { log.Warningf(ctx, "failed to generate available instance rows: %v", err) } } @@ -221,163 +239,141 @@ func (s *Storage) CreateInstance( // TODO(jaylim-crl): Store current region enum in s once we implement regional // by row for the sql_instances table. func (s *Storage) getAvailableInstanceIDForRegion( - ctx context.Context, db dbScan, + ctx context.Context, region []byte, txn *kv.Txn, ) (base.SQLInstanceID, error) { - rows, err := s.getRegionalInstanceRows(ctx, db) + rows, err := s.getInstanceRows(ctx, region, txn, lock.WaitPolicy_SkipLocked) if err != nil { return base.SQLInstanceID(0), err } - sortAvailableRowsFirst(rows) - for i := 0; i < len(rows); i++ { - if rows[i].isAvailable() { - return rows[i].instanceID, nil + for _, row := range rows { + if row.isAvailable() { + return row.instanceID, nil } + } + for _, row := range rows { // If the row has already been used, check if the session is alive. // This is beneficial since the rows already belong to the same region. // We will only do this after checking all **available** instance IDs. - sessionAlive, _ := s.slReader.IsAlive(ctx, rows[i].sessionID) + // If there are no locally available regions, the caller needs to + // consult all regions to determine which IDs are safe to allocate. + sessionAlive, _ := s.slReader.IsAlive(ctx, row.sessionID) if !sessionAlive { - return rows[i].instanceID, nil + return row.instanceID, nil } } + return base.SQLInstanceID(0), errNoPreallocatedRows } -// idsToReclaim retrieves two lists of instance IDs, one to claim, and another -// to delete. -func (s *Storage) idsToReclaim( - ctx context.Context, db dbScan, -) (toClaim []base.SQLInstanceID, toDelete []base.SQLInstanceID, _ error) { - rows, err := s.getGlobalInstanceRows(ctx, db) - if err != nil { - return nil, nil, err +// reclaimRegion will reclaim instances belonging to expired sessions and +// delete surplus sessions. reclaimRegion should only be called by the +// background clean up and allocation job. +func (s *Storage) reclaimRegion(ctx context.Context, region []byte) error { + // In a separate transaction, read all rows that exist in the region. This + // allows us to check for expired sessions outside of a transaction. The + // expired sessions are stored in a map that is consulted in the clean up + // transaction. This is safe because once a session is in-active, it will + // never become active again. + var instances []instancerow + if err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + var err error + instances, err = s.getInstanceRows(ctx, region, txn, lock.WaitPolicy_Block) + return err + }); err != nil { + return err } - sortAvailableRowsFirst(rows) - availableCount := 0 - for _, row := range rows { - if row.isAvailable() { - availableCount++ - } else { - // Instance row has already been claimed. - sessionAlive, _ := s.slReader.IsAlive(ctx, row.sessionID) - if !sessionAlive { - toClaim = append(toClaim, row.instanceID) - } + // Build a map of expired regions + isExpired := map[sqlliveness.SessionID]bool{} + for i := range instances { + alive, err := s.slReader.IsAlive(ctx, instances[i].sessionID) + if err != nil { + return err } + isExpired[instances[i].sessionID] = !alive } - // Since PreallocatedCount is a cluster setting, there could be a scenario - // where we have more pre-allocated rows than requested. In that case, we - // will just ignore anyway. Eventually, it will converge to the requested - // count. - claimCount := int(math.Max(float64(int(PreallocatedCount.Get(&s.settings.SV))-availableCount), 0)) - - // Truncate toClaim, delete the rest, and we are done here. - if len(toClaim) > claimCount { - return toClaim[:claimCount], toClaim[claimCount:], nil - } - - // Sort in ascending order of instance IDs for the loop below. - sort.SliceStable(rows, func(idx1, idx2 int) bool { - return rows[idx1].instanceID < rows[idx2].instanceID - }) - - // Insufficient toClaim instances. Initialize prevInstanceID with starter - // value of 0 as instanceIDs begin from 1. - prevInstanceID := base.SQLInstanceID(0) - for i := 0; i < len(rows) && len(toClaim) < claimCount; { - // Check for a gap between adjacent instance IDs indicating the - // availability of an unused instance ID. - if rows[i].instanceID-prevInstanceID > 1 { - toClaim = append(toClaim, prevInstanceID+1) - prevInstanceID = prevInstanceID + 1 - } else { - prevInstanceID = rows[i].instanceID - i++ + // Reclaim and delete rows + target := int(PreallocatedCount.Get(&s.settings.SV)) + return s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + instances, err := s.getInstanceRows(ctx, region, txn, lock.WaitPolicy_Block) + if err != nil { + return err } - } - remainingCount := claimCount - len(toClaim) - for i := 1; i <= remainingCount; i++ { - toClaim = append(toClaim, prevInstanceID+base.SQLInstanceID(i)) - } - return toClaim, toDelete, nil -} + toReclaim, toDelete := idsToReclaim(target, instances, isExpired) -// sortAvailableRowsFirst sorts rows such that all available rows are placed -// in front. -func sortAvailableRowsFirst(rows []instancerow) { - sort.SliceStable(rows, func(idx1, idx2 int) bool { - addr1, addr2 := rows[idx1].addr, rows[idx2].addr - switch { - case addr1 == "" && addr2 == "": - // Both are available. - return rows[idx1].instanceID < rows[idx2].instanceID - case addr1 == "": - // addr1 should go before addr2. - return true - case addr2 == "": - // addr2 should go before addr1. - return false - default: - // Both are used. - return rows[idx1].instanceID < rows[idx2].instanceID + writeBatch := txn.NewBatch() + for _, instance := range toReclaim { + availableValue, err := s.rowcodec.encodeAvailableValue() + if err != nil { + return err + } + writeBatch.Put(s.rowcodec.encodeKey(region, instance), availableValue) + } + for _, instance := range toDelete { + writeBatch.Del(s.rowcodec.encodeKey(region, instance)) } - }) -} -// getGlobalInstanceRows decodes and returns all instance rows across all -// regions from the sql_instances table. -// -// TODO(jaylim-crl): For now, global and regional are the same. -func (s *Storage) getGlobalInstanceRows( - ctx context.Context, db dbScan, -) (instances []instancerow, _ error) { - return s.getRegionalInstanceRows(ctx, db) + return txn.CommitInBatch(ctx, writeBatch) + }) } -// getRegionalInstanceRows decodes and returns all instance rows associated +// getInstanceRows decodes and returns all instance rows associated // with a given region from the sql_instances table. This returns both used and // available instance rows. // // TODO(rima): Add locking mechanism to prevent thrashing at startup in the // case where multiple instances attempt to initialize their instance IDs // simultaneously. -// -// TODO(jaylim-crl): This currently fetches all rows. We want to only fetch rows -// associated with the region of the SQL pod. This method will likely need to -// take in a region (or if we stored the region in s). -func (s *Storage) getRegionalInstanceRows( - ctx context.Context, db dbScan, -) (instances []instancerow, _ error) { - start := s.rowcodec.makeIndexPrefix() - end := start.PrefixEnd() - // Fetch all rows. The expected data size is small, so it should - // be okay to fetch all rows together. - const maxRows = 0 - rows, err := db.Scan(ctx, start, end, maxRows) - if err != nil { +func (s *Storage) getInstanceRows( + ctx context.Context, region []byte, txn *kv.Txn, waitPolicy lock.WaitPolicy, +) ([]instancerow, error) { + var start roachpb.Key + if region == nil { + start = s.rowcodec.makeIndexPrefix() + } else { + start = s.rowcodec.makeRegionPrefix(region) + } + + // Scan the entire range + batch := txn.NewBatch() + batch.Header.WaitPolicy = waitPolicy + batch.Scan(start, start.PrefixEnd()) + if err := txn.Run(ctx, batch); err != nil { return nil, err } + if len(batch.Results) != 1 { + return nil, errors.AssertionFailedf("expected exactly on batch result found %d: %+v", len(batch.Results), batch.Results[1]) + } + if err := batch.Results[0].Err; err != nil { + return nil, err + } + rows := batch.Results[0].Rows + + // Convert the result to instancerows + instances := make([]instancerow, len(rows)) for i := range rows { - instance, err := s.rowcodec.decodeRow(rows[i].Key, rows[i].Value) + var err error + instances[i], err = s.rowcodec.decodeRow(rows[i].Key, rows[i].Value) if err != nil { return nil, err } - instances = append(instances, instance) } return instances, nil } -// ReleaseInstanceID deletes an instance ID record. The instance ID can then be -// reused by another SQL pod of the same tenant. -func (s *Storage) ReleaseInstanceID(ctx context.Context, id base.SQLInstanceID) error { +// ReleaseInstanceID deletes an instance ID record. The instance ID becomes +// available to be reused by another SQL pod of the same tenant. +// TODO(jeffswenson): delete this, it is unused. +func (s *Storage) ReleaseInstanceID( + ctx context.Context, region []byte, id base.SQLInstanceID, +) error { // TODO(andrei): Ensure that we do not delete an instance ID that we no longer // own, instead of deleting blindly. - key := s.rowcodec.encodeKey(id) + key := s.rowcodec.encodeKey(region, id) ctx = multitenant.WithTenantCostControlExemption(ctx) if _, err := s.db.Del(ctx, key); err != nil { return errors.Wrapf(err, "could not delete instance %d", id) @@ -393,6 +389,9 @@ func (s *Storage) RunInstanceIDReclaimLoop( ts timeutil.TimeSource, sessionExpirationFn func() hlc.Timestamp, ) error { + // TODO(jeffswenson): load regions from the system database enum. + regions := [][]byte{enum.One} + return stopper.RunAsyncTask(ctx, "instance-id-reclaim-loop", func(ctx context.Context) { ctx, cancel := stopper.WithCancelOnQuiesce(ctx) defer cancel() @@ -411,7 +410,18 @@ func (s *Storage) RunInstanceIDReclaimLoop( return case <-timer.Ch(): timer.MarkRead() - if err := s.generateAvailableInstanceRows(ctx, sessionExpirationFn()); err != nil { + + // Mark instances that belong to expired sessions as available + // and delete surplus IDs. Cleaning up surplus IDs is necessary + // to avoid ID exhaustion. + for _, region := range regions { + if err := s.reclaimRegion(ctx, region); err != nil { + log.Warningf(ctx, "failed to reclaim instances in region '%v': %v", region, err) + } + } + + // Allocate new ids regions that do not have enough pre-allocated sql instances. + if err := s.generateAvailableInstanceRows(ctx, regions, sessionExpirationFn()); err != nil { log.Warningf(ctx, "failed to generate available instance rows: %v", err) } } @@ -430,44 +440,85 @@ func (s *Storage) RunInstanceIDReclaimLoop( // insufficient. One global KV read and write would be sufficient for **all** // regions. func (s *Storage) generateAvailableInstanceRows( - ctx context.Context, sessionExpiration hlc.Timestamp, + ctx context.Context, regions [][]byte, sessionExpiration hlc.Timestamp, ) error { ctx = multitenant.WithTenantCostControlExemption(ctx) - // We don't care about the results here. - _, _, err := s.reclaimGroup.Do("reclaim-instance-ids", func() (interface{}, error) { - err := s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - // Set the transaction deadline to the session expiration to ensure - // transaction commits before the session expires. - err := txn.UpdateDeadline(ctx, sessionExpiration) - if err != nil { - return err - } + target := int(PreallocatedCount.Get(&s.settings.SV)) + return s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { + instances, err := s.getInstanceRows(ctx, nil /*global*/, txn, lock.WaitPolicy_Block) + if err != nil { + return err + } - // Fetch IDs to claim and delete. - toClaim, toDelete, err := s.idsToReclaim(ctx, txn) + b := txn.NewBatch() + for _, row := range idsToAllocate(target, regions, instances) { + value, err := s.rowcodec.encodeAvailableValue() if err != nil { - return err + return errors.Wrapf(err, "failed to encode row for instance id %d", row.instanceID) } + b.Put(s.rowcodec.encodeKey(row.region, row.instanceID), value) + } + return txn.CommitInBatch(ctx, b) + }) +} - b := txn.NewBatch() - for _, instanceID := range toClaim { - key := s.rowcodec.encodeKey(instanceID) - value, err := s.rowcodec.encodeValue("", sqlliveness.SessionID([]byte{}), roachpb.Locality{}) - if err != nil { - log.Warningf(ctx, "failed to encode row for instance id %d: %v", instanceID, err) - return err - } - b.Put(key, value) - } - for _, instanceID := range toDelete { - key := s.rowcodec.encodeKey(instanceID) - b.Del(key) +// idsToReclaim determines which instance rows with sessions should be +// reclaimed and which surplus instances should be deleted. +func idsToReclaim( + target int, instances []instancerow, isExpired map[sqlliveness.SessionID]bool, +) (toReclaim []base.SQLInstanceID, toDelete []base.SQLInstanceID) { + available := 0 + for _, instance := range instances { + free := instance.isAvailable() || isExpired[instance.sessionID] + switch { + case !free: + /* skip since it is in use */ + case target <= available: + toDelete = append(toDelete, instance.instanceID) + case instance.isAvailable(): + available += 1 + case isExpired[instance.sessionID]: + available += 1 + toReclaim = append(toReclaim, instance.instanceID) + } + } + return toReclaim, toDelete +} + +// idsToAllocate inspects the allocated instances and determines which IDs +// should be allocated. It avoids any ID that is present in the existing +// instances. It only allocates for the passed in regions. +func idsToAllocate( + target int, regions [][]byte, instances []instancerow, +) (toAllocate []instancerow) { + availablePerRegion := map[string]int{} + existingIDs := map[base.SQLInstanceID]struct{}{} + for _, row := range instances { + existingIDs[row.instanceID] = struct{}{} + if row.isAvailable() { + availablePerRegion[string(row.region)] += 1 + } + } + + lastID := base.SQLInstanceID(0) + nextID := func() base.SQLInstanceID { + for { + lastID += 1 + _, exists := existingIDs[lastID] + if !exists { + return lastID } - return txn.CommitInBatch(ctx, b) - }) - return nil, err - }) - return err + } + } + + for _, region := range regions { + available := availablePerRegion[string(region)] + for i := 0; i+available < target; i++ { + toAllocate = append(toAllocate, instancerow{region: region, instanceID: nextID()}) + } + } + + return toAllocate } // jitteredInterval returns a randomly jittered (+/-15%) duration. diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go b/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go index 5a8451711098..94a488a45ec9 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go +++ b/pkg/sql/sqlinstance/instancestorage/instancestorage_internal_test.go @@ -13,7 +13,6 @@ package instancestorage import ( "context" gosql "database/sql" - "fmt" "sort" "strings" "testing" @@ -26,6 +25,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness/slstorage" @@ -36,9 +36,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/stretchr/testify/require" ) +func makeSession() sqlliveness.SessionID { + session, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + if err != nil { + panic(err) + } + return session +} + func TestGetAvailableInstanceIDForRegion(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -47,11 +56,19 @@ func TestGetAvailableInstanceIDForRegion(t *testing.T) { s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) defer s.Stopper().Stop(ctx) + getAvailableInstanceID := func(storage *Storage, region []byte) (id base.SQLInstanceID, err error) { + err = storage.db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error { + id, err = storage.getAvailableInstanceIDForRegion(ctx, region, txn) + return err + }) + return + } + t.Run("no rows", func(t *testing.T) { stopper, storage, _, _ := setup(t, sqlDB, kvDB, s.ClusterSettings()) defer stopper.Stop(ctx) - id, err := storage.getAvailableInstanceIDForRegion(ctx, storage.db) + id, err := getAvailableInstanceID(storage, nil) require.Error(t, err, errNoPreallocatedRows.Error()) require.Equal(t, base.SQLInstanceID(0), id) }) @@ -62,13 +79,15 @@ func TestGetAvailableInstanceIDForRegion(t *testing.T) { defer stopper.Stop(ctx) // Pre-allocate four instances. + region := enum.One instanceIDs := [...]base.SQLInstanceID{4, 3, 2, 1} addresses := [...]string{"addr4", "addr3", "addr2", "addr1"} - sessionIDs := [...]sqlliveness.SessionID{"session4", "session3", "session2", "session1"} + sessionIDs := [...]sqlliveness.SessionID{makeSession(), makeSession(), makeSession(), makeSession()} sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) for _, id := range instanceIDs { require.NoError(t, storage.CreateInstanceDataForTest( ctx, + region, id, "", sqlliveness.SessionID([]byte{}), @@ -79,168 +98,189 @@ func TestGetAvailableInstanceIDForRegion(t *testing.T) { // Take instance 4. 1 should be prioritized. claim(ctx, t, instanceIDs[0], addresses[0], sessionIDs[0], sessionExpiry, storage, slStorage) - id, err := storage.getAvailableInstanceIDForRegion(ctx, storage.db) + id, err := getAvailableInstanceID(storage, region) require.NoError(t, err) require.Equal(t, base.SQLInstanceID(1), id) // Take instance 1 and 2. 3 should be prioritized. claim(ctx, t, instanceIDs[2], addresses[2], sessionIDs[2], sessionExpiry, storage, slStorage) claim(ctx, t, instanceIDs[3], addresses[3], sessionIDs[3], sessionExpiry, storage, slStorage) - id, err = storage.getAvailableInstanceIDForRegion(ctx, storage.db) + id, err = getAvailableInstanceID(storage, region) require.NoError(t, err) require.Equal(t, base.SQLInstanceID(3), id) // Take instance 3. No rows left. claim(ctx, t, instanceIDs[1], addresses[1], sessionIDs[1], sessionExpiry, storage, slStorage) - id, err = storage.getAvailableInstanceIDForRegion(ctx, storage.db) + id, err = getAvailableInstanceID(storage, region) require.Error(t, err, errNoPreallocatedRows.Error()) require.Equal(t, base.SQLInstanceID(0), id) // Make instance 3 and 4 expire. 3 should be prioritized. require.NoError(t, slStorage.Delete(ctx, sessionIDs[0])) require.NoError(t, slStorage.Delete(ctx, sessionIDs[1])) - id, err = storage.getAvailableInstanceIDForRegion(ctx, storage.db) + id, err = getAvailableInstanceID(storage, region) require.NoError(t, err) require.Equal(t, base.SQLInstanceID(3), id) }) } -func TestIDsToReclaim(t *testing.T) { +func TestIdsToAllocate(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - ctx := context.Background() - s, sqlDB, kvDB := serverutils.StartServer(t, base.TestServerArgs{}) - defer s.Stopper().Stop(ctx) + type testCase struct { + name string + target int + regions [][]byte + instances []instancerow + toAllocate []instancerow + } - const preallocatedCount = 5 - PreallocatedCount.Override(ctx, &s.ClusterSettings().SV, preallocatedCount) + nonEmptySession := makeSession() + regions := [][]byte{{0}, {1}, {2}, {3}, {4}} - sortAsc := func(ids []base.SQLInstanceID) { - sort.SliceStable(ids, func(i, j int) bool { - return ids[i] < ids[j] + for _, tc := range []testCase{ + { + name: "initial-allocation", + target: 2, + regions: [][]byte{regions[1], regions[2], regions[3]}, + toAllocate: []instancerow{ + {region: regions[1], instanceID: 1}, + {region: regions[1], instanceID: 2}, + {region: regions[2], instanceID: 3}, + {region: regions[2], instanceID: 4}, + {region: regions[3], instanceID: 5}, + {region: regions[3], instanceID: 6}, + }, + }, + { + name: "partial-allocation", + target: 2, + regions: [][]byte{regions[1], regions[2], regions[3]}, + instances: []instancerow{ + {region: regions[1], instanceID: 1, sessionID: nonEmptySession}, + {region: regions[1], instanceID: 2}, + /* 3 is a hole */ + {region: regions[2], instanceID: 4, sessionID: nonEmptySession}, + /* 5 and 6 are holes */ + {region: regions[2], instanceID: 7, sessionID: nonEmptySession}, + }, + toAllocate: []instancerow{ + {region: regions[1], instanceID: 3}, + {region: regions[2], instanceID: 5}, + {region: regions[2], instanceID: 6}, + {region: regions[3], instanceID: 8}, + {region: regions[3], instanceID: 9}, + }, + }, + } { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.toAllocate, idsToAllocate(tc.target, tc.regions, tc.instances)) }) } +} - t.Run("no rows", func(t *testing.T) { - stopper, storage, _, _ := setup(t, sqlDB, kvDB, s.ClusterSettings()) - defer stopper.Stop(ctx) - - toClaim, toDelete, err := storage.idsToReclaim(ctx, storage.db) - require.NoError(t, err) - require.Len(t, toClaim, preallocatedCount) - require.Equal(t, []base.SQLInstanceID{1, 2, 3, 4, 5}, toClaim) - require.Len(t, toDelete, 0) - }) - - t.Run("nothing to reclaim", func(t *testing.T) { - const expiration = time.Minute - stopper, storage, _, clock := setup(t, sqlDB, kvDB, s.ClusterSettings()) - defer stopper.Stop(ctx) - - // Pre-allocate two instances. - instanceIDs := [...]base.SQLInstanceID{1, 2, 3, 4, 5} - sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - for _, id := range instanceIDs { - require.NoError(t, storage.CreateInstanceDataForTest( - ctx, - id, - "", - sqlliveness.SessionID([]byte{}), - sessionExpiry, - roachpb.Locality{}, - )) - } - - toClaim, toDelete, err := storage.idsToReclaim(ctx, storage.db) - require.NoError(t, err) - require.Len(t, toClaim, 0) - require.Len(t, toDelete, 0) - }) - - t.Run("preallocated rows", func(t *testing.T) { - const expiration = time.Minute - stopper, storage, slStorage, clock := setup(t, sqlDB, kvDB, s.ClusterSettings()) - defer stopper.Stop(ctx) - - // Pre-allocate two instances. - instanceIDs := [...]base.SQLInstanceID{5, 2} - addresses := [...]string{"addr5", "addr2"} - sessionIDs := [...]sqlliveness.SessionID{"session5", "session2"} - sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - for _, id := range instanceIDs { - require.NoError(t, storage.CreateInstanceDataForTest( - ctx, - id, - "", - sqlliveness.SessionID([]byte{}), - sessionExpiry, - roachpb.Locality{}, - )) - } - - toClaim, toDelete, err := storage.idsToReclaim(ctx, storage.db) - sortAsc(toClaim) - require.NoError(t, err) - require.Len(t, toClaim, 3) - require.Equal(t, []base.SQLInstanceID{1, 3, 4}, toClaim) - require.Len(t, toDelete, 0) - - // Take instance 5. - claim(ctx, t, instanceIDs[0], addresses[0], sessionIDs[0], sessionExpiry, storage, slStorage) - toClaim, toDelete, err = storage.idsToReclaim(ctx, storage.db) - sortAsc(toClaim) - require.NoError(t, err) - require.Len(t, toClaim, 4) - require.Equal(t, []base.SQLInstanceID{1, 3, 4, 6}, toClaim) - require.Len(t, toDelete, 0) +func TestIdsToReclaim(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) - // Take instance 2. - claim(ctx, t, instanceIDs[1], addresses[1], sessionIDs[1], sessionExpiry, storage, slStorage) - toClaim, toDelete, err = storage.idsToReclaim(ctx, storage.db) - sortAsc(toClaim) - require.NoError(t, err) - require.Len(t, toClaim, 5) - require.Equal(t, []base.SQLInstanceID{1, 3, 4, 6, 7}, toClaim) - require.Len(t, toDelete, 0) + type instance struct { + id int + session sqlliveness.SessionID + dead bool + } - // Make instance 5 expire. - require.NoError(t, slStorage.Delete(ctx, sessionIDs[0])) - toClaim, toDelete, err = storage.idsToReclaim(ctx, storage.db) - sortAsc(toClaim) - require.NoError(t, err) - require.Len(t, toClaim, 5) - require.Equal(t, []base.SQLInstanceID{1, 3, 4, 5, 6}, toClaim) - require.Len(t, toDelete, 0) - }) + type testCase struct { + name string + target int + instances []instance + toReclaim []base.SQLInstanceID + toDelete []base.SQLInstanceID + } - t.Run("too many expired rows", func(t *testing.T) { - const expiration = time.Minute - stopper, storage, _, clock := setup(t, sqlDB, kvDB, s.ClusterSettings()) - defer stopper.Stop(ctx) + sessions := make([]sqlliveness.SessionID, 10) + for i := range sessions { + sessions[i] = makeSession() + } - // Pre-allocate 10 instances. - instanceIDs := [...]base.SQLInstanceID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} - sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - for _, id := range instanceIDs { - require.NoError(t, storage.CreateInstanceDataForTest( - ctx, - id, - fmt.Sprintf("addr%d", id), - sqlliveness.SessionID([]byte(fmt.Sprintf("session%d", id))), - sessionExpiry, - roachpb.Locality{}, - )) - } + for _, tc := range []testCase{ + { + name: "reclaim-and-delete", + target: 2, + instances: []instance{ + {id: 1, session: sessions[1], dead: true}, + {id: 2, session: sessions[2], dead: true}, + {id: 3, session: sessions[3], dead: true}, + {id: 4, session: sessions[4]}, + {id: 5, session: ""}, + }, + toReclaim: []base.SQLInstanceID{1, 2}, + toDelete: []base.SQLInstanceID{3, 5}, + }, + { + name: "reclaim", + target: 5, + instances: []instance{ + {id: 1, session: sessions[1], dead: true}, + {id: 2, session: sessions[2], dead: true}, + {id: 3, session: sessions[3], dead: true}, + {id: 4, session: sessions[4]}, + {id: 5, session: ""}, + }, + toReclaim: []base.SQLInstanceID{1, 2, 3}, + }, + { + name: "delete", + target: 2, + instances: []instance{ + {id: 1}, + {id: 2}, + {id: 3}, + {id: 4}, + }, + toDelete: []base.SQLInstanceID{3, 4}, + }, + { + name: "all-in-use", + target: 5, + instances: []instance{ + {id: 1, session: sessions[1]}, + {id: 2, session: sessions[2]}, + {id: 3, session: sessions[3]}, + {id: 4, session: sessions[4]}, + }, + }, + { + name: "all-pre-allocated", + target: 5, + instances: []instance{ + {id: 1}, + {id: 2}, + {id: 3}, + {id: 4}, + {id: 5}, + }, + }, + { + name: "empty", + target: 5, + }, + } { + t.Run(tc.name, func(t *testing.T) { + instances := make([]instancerow, len(tc.instances)) + isExpired := map[sqlliveness.SessionID]bool{} + for i, instance := range tc.instances { + instances[i] = instancerow{instanceID: base.SQLInstanceID(instance.id), sessionID: instance.session} + if instance.session != "" && instance.dead { + isExpired[instance.session] = true + } + } - toClaim, toDelete, err := storage.idsToReclaim(ctx, storage.db) - sortAsc(toClaim) - require.NoError(t, err) - require.Len(t, toClaim, 5) - require.Equal(t, []base.SQLInstanceID{1, 2, 3, 4, 5}, toClaim) - require.Len(t, toDelete, 5) - require.Equal(t, []base.SQLInstanceID{6, 7, 8, 9, 10}, toDelete) - }) + toReclaim, toDelete := idsToReclaim(tc.target, instances, isExpired) + require.Equal(t, tc.toReclaim, toReclaim) + require.Equal(t, tc.toDelete, toDelete) + }) + } } func TestGenerateAvailableInstanceRows(t *testing.T) { @@ -255,13 +295,15 @@ func TestGenerateAvailableInstanceRows(t *testing.T) { const preallocatedCount = 5 PreallocatedCount.Override(ctx, &s.ClusterSettings().SV, preallocatedCount) + regions := [][]byte{enum.One} + t.Run("nothing preallocated", func(t *testing.T) { stopper, storage, _, clock := setup(t, sqlDB, kvDB, s.ClusterSettings()) defer stopper.Stop(ctx) sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - require.NoError(t, storage.generateAvailableInstanceRows(ctx, sessionExpiry)) + require.NoError(t, storage.generateAvailableInstanceRows(ctx, regions, sessionExpiry)) instances, err := storage.GetAllInstancesDataForTest(ctx) sortInstancesForTest(instances) @@ -280,14 +322,16 @@ func TestGenerateAvailableInstanceRows(t *testing.T) { sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) + region := enum.One instanceIDs := [...]base.SQLInstanceID{1, 3, 5, 8} addresses := [...]string{"addr1", "addr3", "addr5", "addr8"} - sessionIDs := [...]sqlliveness.SessionID{"session1", "session3", "session5", "session8"} + sessionIDs := [...]sqlliveness.SessionID{makeSession(), makeSession(), makeSession(), makeSession()} // Preallocate first two, and claim the other two (have not expired) - for i := 0; i < 2; i++ { + for _, i := range []int{0, 1} { require.NoError(t, storage.CreateInstanceDataForTest( ctx, + region, instanceIDs[i], "", sqlliveness.SessionID([]byte{}), @@ -295,17 +339,17 @@ func TestGenerateAvailableInstanceRows(t *testing.T) { roachpb.Locality{}, )) } - for i := 2; i < 4; i++ { + for _, i := range []int{2, 3} { claim(ctx, t, instanceIDs[i], addresses[i], sessionIDs[i], sessionExpiry, storage, slStorage) } // Generate available rows. - require.NoError(t, storage.generateAvailableInstanceRows(ctx, sessionExpiry)) + require.NoError(t, storage.generateAvailableInstanceRows(ctx, regions, sessionExpiry)) instances, err := storage.GetAllInstancesDataForTest(ctx) sortInstancesForTest(instances) require.NoError(t, err) - require.Equal(t, preallocatedCount+2, len(instances)) + require.Equal(t, preallocatedCount+2, len(instances), "instances: %+v", instances) { var foundIDs []base.SQLInstanceID for i := 0; i < preallocatedCount; i++ { @@ -327,12 +371,12 @@ func TestGenerateAvailableInstanceRows(t *testing.T) { } // Delete the expired rows. - require.NoError(t, storage.generateAvailableInstanceRows(ctx, sessionExpiry)) + require.NoError(t, storage.reclaimRegion(ctx, region)) instances, err = storage.GetAllInstancesDataForTest(ctx) sortInstancesForTest(instances) require.NoError(t, err) - require.Equal(t, preallocatedCount, len(instances)) + require.Equal(t, preallocatedCount, len(instances), "instances: %+v", instances) { var foundIDs []base.SQLInstanceID for i := 0; i < preallocatedCount; i++ { @@ -340,7 +384,7 @@ func TestGenerateAvailableInstanceRows(t *testing.T) { require.Empty(t, instances[i].SessionID) require.Empty(t, instances[i].InstanceAddr) } - require.Equal(t, []base.SQLInstanceID{1, 2, 3, 4, 6}, foundIDs) + require.Equal(t, []base.SQLInstanceID{1, 2, 3, 4, 5}, foundIDs) } // Claim 1 and 3, and make them expire. @@ -350,7 +394,7 @@ func TestGenerateAvailableInstanceRows(t *testing.T) { } // Should reclaim expired rows. - require.NoError(t, storage.generateAvailableInstanceRows(ctx, sessionExpiry)) + require.NoError(t, storage.reclaimRegion(ctx, region)) instances, err = storage.GetAllInstancesDataForTest(ctx) sortInstancesForTest(instances) @@ -363,7 +407,7 @@ func TestGenerateAvailableInstanceRows(t *testing.T) { require.Empty(t, instances[i].SessionID) require.Empty(t, instances[i].InstanceAddr) } - require.Equal(t, []base.SQLInstanceID{1, 2, 3, 4, 6}, foundIDs) + require.Equal(t, []base.SQLInstanceID{1, 2, 3, 4, 5}, foundIDs) } }) } @@ -430,8 +474,11 @@ func claim( storage *Storage, slStorage *slstorage.FakeStorage, ) { + t.Helper() + region, _, err := slstorage.UnsafeDecodeSessionID(sessionID) + require.NoError(t, err) require.NoError(t, slStorage.Insert(ctx, sessionID, sessionExpiration)) require.NoError(t, storage.CreateInstanceDataForTest( - ctx, instanceID, addr, sessionID, sessionExpiration, roachpb.Locality{}, + ctx, region, instanceID, addr, sessionID, sessionExpiration, roachpb.Locality{}, )) } diff --git a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go index f049d47f0355..c1e0ed917ab3 100644 --- a/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go +++ b/pkg/sql/sqlinstance/instancestorage/instancestorage_test.go @@ -26,6 +26,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" @@ -39,10 +40,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/stop" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/uuid" "github.com/cockroachdb/errors" "github.com/stretchr/testify/require" ) +func makeSession() sqlliveness.SessionID { + session, err := slstorage.MakeSessionID(enum.One, uuid.MakeV4()) + if err != nil { + panic(err) + } + return session +} + // TestStorage verifies that instancestorage stores and retrieves SQL instance data correctly. // Also, it verifies that released instance IDs are correctly updated within the database // and reused for new SQL instances. @@ -78,7 +88,7 @@ func TestStorage(t *testing.T) { stopper, storage, _, clock := setup(t) defer stopper.Stop(ctx) const id = base.SQLInstanceID(1) - const sessionID = sqlliveness.SessionID("session_id") + sessionID := makeSession() const addr = "addr" locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test"}, {Key: "az", Value: "a"}}} const expiration = time.Minute @@ -95,9 +105,10 @@ func TestStorage(t *testing.T) { defer stopper.Stop(ctx) // Create three instances and release one. + region := enum.One instanceIDs := [...]base.SQLInstanceID{1, 2, 3, 4, 5} addresses := [...]string{"addr1", "addr2", "addr3", "addr4", "addr5"} - sessionIDs := [...]sqlliveness.SessionID{"session1", "session2", "session3", "session4", "session5"} + sessionIDs := [...]sqlliveness.SessionID{makeSession(), makeSession(), makeSession(), makeSession(), makeSession()} localities := [...]roachpb.Locality{ {Tiers: []roachpb.Tier{{Key: "region", Value: "region1"}}}, {Tiers: []roachpb.Tier{{Key: "region", Value: "region2"}}}, @@ -106,7 +117,7 @@ func TestStorage(t *testing.T) { {Tiers: []roachpb.Tier{{Key: "region", Value: "region5"}}}, } sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) - for index := 0; index < 3; index++ { + for _, index := range []int{0, 1, 2} { instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index]) require.NoError(t, err) require.NoError(t, slStorage.Insert(ctx, sessionIDs[index], sessionExpiry)) @@ -119,13 +130,13 @@ func TestStorage(t *testing.T) { sortInstances(instances) require.NoError(t, err) require.Equal(t, preallocatedCount, len(instances)) - for i := 0; i < 3; i++ { + for _, i := range []int{0, 1, 2} { require.Equal(t, instanceIDs[i], instances[i].InstanceID) require.Equal(t, sessionIDs[i], instances[i].SessionID) require.Equal(t, addresses[i], instances[i].InstanceAddr) require.Equal(t, localities[i], instances[i].Locality) } - for i := 3; i < 5; i++ { + for _, i := range []int{3, 4} { require.Equal(t, base.SQLInstanceID(i+1), instances[i].InstanceID) require.Empty(t, instances[i].SessionID) require.Empty(t, instances[i].InstanceAddr) @@ -134,7 +145,7 @@ func TestStorage(t *testing.T) { } // Create two more instances. - for index := 3; index < 5; index++ { + for _, index := range []int{3, 4} { instanceID, err := storage.CreateInstance(ctx, sessionIDs[index], sessionExpiry, addresses[index], localities[index]) require.NoError(t, err) require.NoError(t, slStorage.Insert(ctx, sessionIDs[index], sessionExpiry)) @@ -147,7 +158,7 @@ func TestStorage(t *testing.T) { sortInstances(instances) require.NoError(t, err) require.Equal(t, preallocatedCount, len(instances)) - for i := 0; i < 5; i++ { + for i := range instances { require.Equal(t, instanceIDs[i], instances[i].InstanceID) require.Equal(t, sessionIDs[i], instances[i].SessionID) require.Equal(t, addresses[i], instances[i].InstanceAddr) @@ -157,7 +168,7 @@ func TestStorage(t *testing.T) { // Release an instance and verify all instances are returned. { - require.NoError(t, storage.ReleaseInstanceID(ctx, instanceIDs[0])) + require.NoError(t, storage.ReleaseInstanceID(ctx, region, instanceIDs[0])) instances, err := storage.GetAllInstancesDataForTest(ctx) require.NoError(t, err) require.Equal(t, preallocatedCount-1, len(instances)) @@ -171,15 +182,15 @@ func TestStorage(t *testing.T) { } // Verify instance ID associated with an expired session gets reused. + sessionID6 := makeSession() + addr6 := "addr6" + locality6 := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region6"}}} { require.NoError(t, slStorage.Delete(ctx, sessionIDs[4])) var err error var instanceID base.SQLInstanceID - newSessionID := sqlliveness.SessionID("session6") - newAddr := "addr6" - newLocality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region6"}}} - require.NoError(t, slStorage.Insert(ctx, newSessionID, sessionExpiry)) - instanceID, err = storage.CreateInstance(ctx, newSessionID, sessionExpiry, newAddr, newLocality) + require.NoError(t, slStorage.Insert(ctx, sessionID6, sessionExpiry)) + instanceID, err = storage.CreateInstance(ctx, sessionID6, sessionExpiry, addr6, locality6) require.NoError(t, err) require.Equal(t, instanceIDs[4], instanceID) var instances []sqlinstance.InstanceInfo @@ -191,9 +202,9 @@ func TestStorage(t *testing.T) { for index, instance := range instances { foundIDs = append(foundIDs, instance.InstanceID) if index == 3 { - require.Equal(t, newSessionID, instance.SessionID) - require.Equal(t, newAddr, instance.InstanceAddr) - require.Equal(t, newLocality, instance.Locality) + require.Equal(t, sessionID6, instance.SessionID) + require.Equal(t, addr6, instance.InstanceAddr) + require.Equal(t, locality6, instance.Locality) continue } require.Equal(t, sessionIDs[index+1], instance.SessionID) @@ -207,7 +218,7 @@ func TestStorage(t *testing.T) { { var err error var instanceID base.SQLInstanceID - newSessionID := sqlliveness.SessionID("session7") + newSessionID := makeSession() newAddr := "addr7" newLocality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region7"}}} instanceID, err = storage.CreateInstance(ctx, newSessionID, sessionExpiry, newAddr, newLocality) @@ -226,12 +237,9 @@ func TestStorage(t *testing.T) { require.Equal(t, newAddr, instances[index].InstanceAddr) require.Equal(t, newLocality, instances[index].Locality) case 4: - newSessionID := sqlliveness.SessionID("session6") - newAddr := "addr6" - newLocality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "region6"}}} - require.Equal(t, newSessionID, instances[index].SessionID) - require.Equal(t, newAddr, instances[index].InstanceAddr) - require.Equal(t, newLocality, instances[index].Locality) + require.Equal(t, sessionID6, instances[index].SessionID) + require.Equal(t, addr6, instances[index].InstanceAddr) + require.Equal(t, locality6, instances[index].Locality) default: require.Equal(t, sessionIDs[index], instances[index].SessionID) require.Equal(t, addresses[index], instances[index].InstanceAddr) @@ -271,13 +279,13 @@ func TestSQLAccess(t *testing.T) { storage := instancestorage.NewTestingStorage( kvDB, keys.SystemSQLCodec, tableID, slstorage.NewFakeStorage(), s.ClusterSettings()) const ( - sessionID = sqlliveness.SessionID("session") addr = "addr" tierStr = "region=test1,zone=test2" localityStr = "{\"Tiers\": \"" + tierStr + "\"}" expiration = time.Minute expectedNumCols = 4 ) + sessionID := makeSession() var locality roachpb.Locality require.NoError(t, locality.Set(tierStr)) instanceID, err := storage.CreateInstance(ctx, sessionID, clock.Now().Add(expiration.Nanoseconds(), 0), addr, locality) @@ -343,10 +351,10 @@ func TestConcurrentCreateAndRelease(t *testing.T) { runsPerWorker = 100 workers = 100 controllerSteps = 100 - sessionID = sqlliveness.SessionID("session") addr = "addr" expiration = time.Minute ) + sessionID := makeSession() locality := roachpb.Locality{Tiers: []roachpb.Tier{{Key: "region", Value: "test-region"}}} sessionExpiry := clock.Now().Add(expiration.Nanoseconds(), 0) err := slStorage.Insert(ctx, sessionID, sessionExpiry) @@ -354,7 +362,8 @@ func TestConcurrentCreateAndRelease(t *testing.T) { t.Fatal(err) } var ( - state = struct { + region = enum.One + state = struct { syncutil.RWMutex liveInstances map[base.SQLInstanceID]struct{} freeInstances map[base.SQLInstanceID]struct{} @@ -396,7 +405,7 @@ func TestConcurrentCreateAndRelease(t *testing.T) { if i == -1 { return } - require.NoError(t, storage.ReleaseInstanceID(ctx, i)) + require.NoError(t, storage.ReleaseInstanceID(ctx, region, i)) state.freeInstances[i] = struct{}{} delete(state.liveInstances, i) } @@ -424,7 +433,7 @@ func TestConcurrentCreateAndRelease(t *testing.T) { t.Helper() state.RLock() defer state.RUnlock() - instanceInfo, err := storage.GetInstanceDataForTest(ctx, i) + instanceInfo, err := storage.GetInstanceDataForTest(ctx, region, i) if _, free := state.freeInstances[i]; free { require.Error(t, err) require.ErrorIs(t, err, sqlinstance.NonExistentInstanceError) @@ -539,9 +548,10 @@ func TestReclaimLoop(t *testing.T) { } // Consume two rows. + region := enum.One instanceIDs := [...]base.SQLInstanceID{1, 2} addresses := [...]string{"addr1", "addr2"} - sessionIDs := [...]sqlliveness.SessionID{"session1", "session2"} + sessionIDs := [...]sqlliveness.SessionID{makeSession(), makeSession()} localities := [...]roachpb.Locality{ {Tiers: []roachpb.Tier{{Key: "region", Value: "region1"}}}, {Tiers: []roachpb.Tier{{Key: "region", Value: "region2"}}}, @@ -550,6 +560,7 @@ func TestReclaimLoop(t *testing.T) { require.NoError(t, slStorage.Insert(ctx, sessionIDs[i], sessionExpiry)) require.NoError(t, storage.CreateInstanceDataForTest( ctx, + region, id, addresses[i], sessionIDs[i], diff --git a/pkg/sql/sqlinstance/instancestorage/row_codec.go b/pkg/sql/sqlinstance/instancestorage/row_codec.go index 67c27ae86bef..20e3cbe9f778 100644 --- a/pkg/sql/sqlinstance/instancestorage/row_codec.go +++ b/pkg/sql/sqlinstance/instancestorage/row_codec.go @@ -11,39 +11,143 @@ package instancestorage import ( + "bytes" + "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" - "github.com/cockroachdb/cockroach/pkg/sql/rowenc" + "github.com/cockroachdb/cockroach/pkg/sql/enum" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlliveness" - "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/json" "github.com/cockroachdb/errors" ) +type keyCodec interface { + // makeIndexPrefix returns a roachpb.Key that is the prefix for all encoded + // keys and can be used to scan the entire table. + makeIndexPrefix() roachpb.Key + + // makeRegionPrefix returns a roachpb.Key that is the prefix for all keys + // in the region and can be used to scan the region. + makeRegionPrefix(region []byte) roachpb.Key + + // encodeKey makes a key for the sql_instance table. + encodeKey(region []byte, id base.SQLInstanceID) roachpb.Key + + // decodeKey decodes a sql_instance table key into its logical components. + decodeKey(key roachpb.Key) (region []byte, id base.SQLInstanceID, err error) +} + // rowCodec encodes/decodes rows from the sql_instances table. type rowCodec struct { + keyCodec codec keys.SQLCodec columns []catalog.Column decoder valueside.Decoder tableID descpb.ID } +// rbrKeyCodec is used by the regional by row compatible sql_instances index format. +type rbrKeyCodec struct { + indexPrefix roachpb.Key +} + +func (c rbrKeyCodec) makeIndexPrefix() roachpb.Key { + return c.indexPrefix.Clone() +} + +func (c rbrKeyCodec) makeRegionPrefix(region []byte) roachpb.Key { + return encoding.EncodeBytesAscending(c.indexPrefix.Clone(), region) +} + +func (c rbrKeyCodec) encodeKey(region []byte, instanceID base.SQLInstanceID) roachpb.Key { + key := c.makeRegionPrefix(region) + key = encoding.EncodeVarintAscending(key, int64(instanceID)) + key = keys.MakeFamilyKey(key, 0) + return key +} + +func (c rbrKeyCodec) decodeKey(key roachpb.Key) (region []byte, id base.SQLInstanceID, err error) { + if !bytes.HasPrefix(key, c.indexPrefix) { + return nil, 0, errors.Newf("sql_instances table key has an invalid prefix: %v", key) + } + rem := key[len(c.indexPrefix):] + + rem, region, err = encoding.DecodeBytesAscending(rem, nil) + if err != nil { + return nil, 0, errors.Newf("failed to decode region from sql_instances key: %v", key) + } + + _, rawID, err := encoding.DecodeVarintAscending(rem) + if err != nil { + return nil, 0, errors.Wrapf(err, "failed to decode sql instance id from key: %v", key) + } + + return region, base.SQLInstanceID(rawID), nil +} + +// rbtKeyCodec is used by the legacy sql_instances index format. +type rbtKeyCodec struct { + indexPrefix roachpb.Key + codec keys.SQLCodec +} + +func (c rbtKeyCodec) makeIndexPrefix() roachpb.Key { + return c.indexPrefix.Clone() +} + +func (c rbtKeyCodec) makeRegionPrefix(region []byte) roachpb.Key { + return c.indexPrefix.Clone() +} + +func (c *rbtKeyCodec) encodeKey(_ []byte, instanceID base.SQLInstanceID) roachpb.Key { + key := c.indexPrefix.Clone() + key = encoding.EncodeVarintAscending(key, int64(instanceID)) + return keys.MakeFamilyKey(key, 0) +} + +func (c *rbtKeyCodec) decodeKey(key roachpb.Key) (region []byte, id base.SQLInstanceID, err error) { + if !bytes.HasPrefix(key, c.indexPrefix) { + return nil, 0, errors.Newf("sql_instances table key has an invalid prefix: %v", key) + } + rem := key[len(c.indexPrefix):] + + _, rawID, err := encoding.DecodeVarintAscending(rem) + if err != nil { + return nil, 0, errors.Wrapf(err, "failed to decode sql instance id from key: %v", key) + } + + return enum.One, base.SQLInstanceID(rawID), nil +} + // MakeRowCodec makes a new rowCodec for the sql_instances table. func makeRowCodec(codec keys.SQLCodec, tableID descpb.ID) rowCodec { - columns := systemschema.SQLInstancesTable.PublicColumns() + columns := systemschema.SQLInstancesTable().PublicColumns() + + var key keyCodec + if systemschema.TestSupportMultiRegion() { + key = &rbrKeyCodec{ + indexPrefix: codec.IndexPrefix(uint32(tableID), 2), + } + } else { + key = &rbtKeyCodec{ + indexPrefix: codec.IndexPrefix(uint32(tableID), 1), + } + } + return rowCodec{ - codec: codec, - columns: columns, - decoder: valueside.MakeDecoder(columns), - tableID: tableID, + keyCodec: key, + codec: codec, + columns: columns, + decoder: valueside.MakeDecoder(columns), + tableID: tableID, } } @@ -51,15 +155,16 @@ func makeRowCodec(codec keys.SQLCodec, tableID descpb.ID) rowCodec { // or uninitialized. If it is, the fields stored in the value will be left with // their default values. func (d *rowCodec) decodeRow(key roachpb.Key, value *roachpb.Value) (instancerow, error) { - instanceID, err := d.decodeKey(key) + region, instanceID, err := d.decodeKey(key) if err != nil { return instancerow{}, err } r := instancerow{ + region: region, instanceID: instanceID, } - if value == nil || !value.IsPresent() { + if !value.IsPresent() { return r, nil } @@ -71,19 +176,6 @@ func (d *rowCodec) decodeRow(key roachpb.Key, value *roachpb.Value) (instancerow return r, nil } -// makeIndexPrefix returns a roachpb.Key that is the prefix for all encoded -// keys and can be used to scan the entire table. -func (d *rowCodec) makeIndexPrefix() roachpb.Key { - return d.codec.IndexPrefix(uint32(d.tableID), 1) -} - -// encodeKey converts the instanceID into an encoded key for the table. -func (d *rowCodec) encodeKey(instanceID base.SQLInstanceID) roachpb.Key { - key := d.makeIndexPrefix() - key = encoding.EncodeVarintAscending(key, int64(instanceID)) - return keys.MakeFamilyKey(key, 0) -} - // encodeValue encodes the sql_instance columns into a kv value. func (d *rowCodec) encodeValue( addr string, sessionID sqlliveness.SessionID, locality roachpb.Locality, @@ -128,19 +220,12 @@ func (d *rowCodec) encodeValue( return v, nil } -// decodeKey decodes a sql_instance key into its logical components. -func (d *rowCodec) decodeKey(key roachpb.Key) (base.SQLInstanceID, error) { - types := []*types.T{d.columns[0].GetType()} - row := make([]rowenc.EncDatum, 1) - _, _, err := rowenc.DecodeIndexKey(d.codec, types, row, nil, key) +func (d *rowCodec) encodeAvailableValue() (*roachpb.Value, error) { + value, err := d.encodeValue("", sqlliveness.SessionID([]byte{}), roachpb.Locality{}) if err != nil { - return base.SQLInstanceID(0), errors.Wrap(err, "failed to decode key") - } - var alloc tree.DatumAlloc - if err := row[0].EnsureDecoded(types[0], &alloc); err != nil { - return base.SQLInstanceID(0), err + return nil, errors.Wrap(err, "failed to encode available sql_instances value") } - return base.SQLInstanceID(tree.MustBeDInt(row[0].Datum)), nil + return value, nil } // decodeRow decodes a row of the sql_instances table. diff --git a/pkg/sql/sqlinstance/instancestorage/row_codec_test.go b/pkg/sql/sqlinstance/instancestorage/row_codec_test.go new file mode 100644 index 000000000000..90d1a2a5884b --- /dev/null +++ b/pkg/sql/sqlinstance/instancestorage/row_codec_test.go @@ -0,0 +1,108 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package instancestorage + +import ( + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/systemschema" + "github.com/cockroachdb/cockroach/pkg/sql/enum" + "github.com/cockroachdb/cockroach/pkg/util/encoding" + "github.com/cockroachdb/cockroach/pkg/util/envutil" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +const ( + tableID = 42 + tenantID = 1337 +) + +func TestRowCodec(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + tenantID, err := roachpb.MakeTenantID(tenantID) + require.NoError(t, err) + codec := keys.MakeSQLCodec(tenantID) + + t.Run("RegionalByRow", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "1")() + testEncoder(t, makeRowCodec(codec, tableID), tenantID) + }) + t.Run("RegionalByTable", func(t *testing.T) { + defer envutil.TestSetEnv(t, "COCKROACH_MR_SYSTEM_DATABASE", "0")() + testEncoder(t, makeRowCodec(codec, tableID), tenantID) + }) +} + +func testEncoder(t *testing.T, codec rowCodec, expectedID roachpb.TenantID) { + region := []byte{103} /* 103 is an arbitrary value */ + if !systemschema.TestSupportMultiRegion() { + // region is always enum.One if the system database is not configured + // for multi-region. + region = enum.One + } + + t.Run("IndexPrefix", func(t *testing.T) { + prefix := codec.makeIndexPrefix() + + rem, decodedID, err := keys.DecodeTenantPrefix(prefix) + require.NoError(t, err) + require.Equal(t, expectedID, decodedID) + + _, decodedTableID, indexID, err := keys.DecodeTableIDIndexID(rem) + require.NoError(t, err) + require.Equal(t, decodedTableID, uint32(tableID)) + + if systemschema.TestSupportMultiRegion() { + require.Equal(t, indexID, uint32(2)) + } else { + require.Equal(t, indexID, uint32(1)) + } + }) + + t.Run("RegionPrefix", func(t *testing.T) { + prefix := codec.makeRegionPrefix(region) + + rem, decodedID, err := keys.DecodeTenantPrefix(prefix) + require.NoError(t, err) + require.Equal(t, expectedID, decodedID) + + rem, decodedTableID, indexID, err := keys.DecodeTableIDIndexID(rem) + require.NoError(t, err) + require.Equal(t, decodedTableID, uint32(tableID)) + + if systemschema.TestSupportMultiRegion() { + require.Equal(t, indexID, uint32(2)) + _, decodedRegion, err := encoding.DecodeBytesAscending(rem, nil) + require.Equal(t, region, decodedRegion) + require.NoError(t, err) + } else { + require.Equal(t, indexID, uint32(1)) + } + }) + + t.Run("RoundTripKey", func(t *testing.T) { + id := base.SQLInstanceID(42) + key := codec.encodeKey(region, id) + + decodedRegion, decodedID, err := codec.decodeKey(key) + + require.NoError(t, err) + require.Equal(t, decodedID, id) + require.Equal(t, region, decodedRegion) + }) +} diff --git a/pkg/sql/sqlinstance/instancestorage/test_helpers.go b/pkg/sql/sqlinstance/instancestorage/test_helpers.go index 08d84134e9c0..76ffe2651b58 100644 --- a/pkg/sql/sqlinstance/instancestorage/test_helpers.go +++ b/pkg/sql/sqlinstance/instancestorage/test_helpers.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/multitenant" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sqlinstance" @@ -76,6 +77,7 @@ func (f *FakeStorage) ReleaseInstanceID(_ context.Context, id base.SQLInstanceID // table for testing purposes. func (s *Storage) CreateInstanceDataForTest( ctx context.Context, + region []byte, instanceID base.SQLInstanceID, addr string, sessionID sqlliveness.SessionID, @@ -90,7 +92,7 @@ func (s *Storage) CreateInstanceDataForTest( if err != nil { return err } - key := s.rowcodec.encodeKey(instanceID) + key := s.rowcodec.encodeKey(region, instanceID) value, err := s.rowcodec.encodeValue(addr, sessionID, locality) if err != nil { return err @@ -104,9 +106,9 @@ func (s *Storage) CreateInstanceDataForTest( // GetInstanceDataForTest returns instance data directly from raw storage for // testing purposes. func (s *Storage) GetInstanceDataForTest( - ctx context.Context, instanceID base.SQLInstanceID, + ctx context.Context, region []byte, instanceID base.SQLInstanceID, ) (sqlinstance.InstanceInfo, error) { - k := s.rowcodec.encodeKey(instanceID) + k := s.rowcodec.encodeKey(region, instanceID) ctx = multitenant.WithTenantCostControlExemption(ctx) row, err := s.db.Get(ctx, k) if err != nil { @@ -136,7 +138,7 @@ func (s *Storage) GetAllInstancesDataForTest( var rows []instancerow ctx = multitenant.WithTenantCostControlExemption(ctx) err = s.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error { - rows, err = s.getGlobalInstanceRows(ctx, txn) + rows, err = s.getInstanceRows(ctx, nil /*global*/, txn, lock.WaitPolicy_Block) return err }) if err != nil { diff --git a/pkg/sql/sqlinstance/sqlinstance.go b/pkg/sql/sqlinstance/sqlinstance.go index ac4571903678..68fd090574f4 100644 --- a/pkg/sql/sqlinstance/sqlinstance.go +++ b/pkg/sql/sqlinstance/sqlinstance.go @@ -27,6 +27,7 @@ import ( // InstanceInfo exposes information on a SQL instance such as ID, network // address, the associated sqlliveness.SessionID, and the instance's locality. type InstanceInfo struct { + Region []byte InstanceID base.SQLInstanceID InstanceAddr string SessionID sqlliveness.SessionID diff --git a/pkg/upgrade/upgrades/alter_sql_instances_locality.go b/pkg/upgrade/upgrades/alter_sql_instances_locality.go index 220a214e2b97..de8ff5950f44 100644 --- a/pkg/upgrade/upgrades/alter_sql_instances_locality.go +++ b/pkg/upgrade/upgrades/alter_sql_instances_locality.go @@ -34,7 +34,7 @@ func alterSystemSQLInstancesAddLocality( query: addLocalityCol, schemaExistsFn: hasColumn, } - if err := migrateTable(ctx, cs, d, op, keys.SQLInstancesTableID, systemschema.SQLInstancesTable); err != nil { + if err := migrateTable(ctx, cs, d, op, keys.SQLInstancesTableID, systemschema.SQLInstancesTable()); err != nil { return err } return nil diff --git a/pkg/upgrade/upgrades/alter_sql_instances_locality_test.go b/pkg/upgrade/upgrades/alter_sql_instances_locality_test.go index a350db892e7f..0925a50ed227 100644 --- a/pkg/upgrade/upgrades/alter_sql_instances_locality_test.go +++ b/pkg/upgrade/upgrades/alter_sql_instances_locality_test.go @@ -64,7 +64,8 @@ func TestAlterSystemSqlInstancesTable(t *testing.T) { ) // Inject the old copy of the descriptor. - upgrades.InjectLegacyTable(ctx, t, s, systemschema.SQLInstancesTable, getDeprecatedSqlInstancesDescriptor) + sqlInstancesTable := systemschema.SQLInstancesTable() + upgrades.InjectLegacyTable(ctx, t, s, sqlInstancesTable, getDeprecatedSqlInstancesDescriptor) // Validate that the table sql_instances has the old schema. upgrades.ValidateSchemaExists( ctx, @@ -72,7 +73,7 @@ func TestAlterSystemSqlInstancesTable(t *testing.T) { s, sqlDB, keys.SQLInstancesTableID, - systemschema.SQLInstancesTable, + systemschema.SQLInstancesTable(), []string{}, validationSchemas, false, /* expectExists */ @@ -92,7 +93,7 @@ func TestAlterSystemSqlInstancesTable(t *testing.T) { s, sqlDB, keys.SQLInstancesTableID, - systemschema.SQLInstancesTable, + sqlInstancesTable, []string{}, validationSchemas, true, /* expectExists */