Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
90427: instancestorage: asynchronously pre-allocate instance IDs in sql_instances r=JeffSwenson,ajwerner a=jaylim-crl

Related to #85737.

Previously, when a SQL pod started up, we would allocate an instance ID for it
synchronously. This can be a problem for multi-region setups because that would
now involve a read and write across all regions. This commit is part of the
work to make the system.sql_instances table REGIONAL BY ROW to reduce cold
start times. Here, we would asynchronously pre-allocate instance IDs, and the
startup process will then claim an ID for the SQL pod. Once the sql_instances
table is made REGIONAL BY ROW, we can update the logic to pre-allocate for
every region, and only perform a regional lookup when claiming an ID.

At the same time, we will now use a cached reader of the sqlLivenessProvider
with the SQL instance storage. This is fine since the cached reader is on the
conservative side, and will return true on IsAlive if it does not know. The
only downside to this is that we leave instance rows lying around in the
sql_instances table longer, but that is fine since they will eventually be
reclaimed once the cache gets updated.

Epic: None

Release note (sql change): The `system.sql_instances` table now includes
pre-allocated ID entries, where all the fields except `id` will be NULL.

91170: dev: add -flex-types argument for sqlite logic tests by default r=yuzefovich a=yuzefovich

This commit adds a shortcut for specifying the `-flex-types` test argument of the logic tests, which is needed to run sqlite targets. It also defaults to passing this argument if only sqlite targets are specified. We use this flag in CI, so this reduces the possibility of confusion.

Found when working on #58089.

Epic: None

Release note: None

91388: loqrecovery: prohibit plans with any descriptor changes r=tbg a=aliher1911

Previously, loss of quorum recovery allowed plan creation in cases where a descriptor change didn't change the survivor replica. This is not sufficient, as the replica will still be checked against its status and will panic when this entry is applied.
This diff changes the validation behaviour to treat such a change as a problem and to require the force flag to override it.

Release note: None

Fixes #91271

Co-authored-by: Jay <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
  • Loading branch information
4 people committed Nov 8, 2022
4 parents 666fdee + f4dc2a0 + e4cfc1f + cceef95 commit d8ebd29
Show file tree
Hide file tree
Showing 18 changed files with 1,233 additions and 241 deletions.
2 changes: 1 addition & 1 deletion dev
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fi
set -euo pipefail

# Bump this counter to force rebuilding `dev` on all machines.
DEV_VERSION=59
DEV_VERSION=60

THIS_DIR=$(cd "$(dirname "$0")" && pwd)
BINARY_DIR=$THIS_DIR/bin/dev-versions
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/serverccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,10 @@ go_test(
"//pkg/server",
"//pkg/server/serverpb",
"//pkg/server/systemconfigwatcher/systemconfigwatchertest",
"//pkg/settings/cluster",
"//pkg/sql",
"//pkg/sql/distsql",
"//pkg/sql/sqlinstance/instancestorage",
"//pkg/sql/tests",
"//pkg/testutils",
"//pkg/testutils/serverutils",
Expand Down
45 changes: 45 additions & 0 deletions pkg/ccl/serverccl/server_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,19 @@ import (
"io"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl/licenseccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/systemconfigwatcher/systemconfigwatchertest"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/distsql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlinstance/instancestorage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
Expand Down Expand Up @@ -235,6 +239,47 @@ func TestTenantRowIDs(t *testing.T) {
require.Equal(t, numRows, rowCount)
}

// TestTenantInstanceIDReclaimLoop checks that starting a tenant also starts
// the sql_instances reclaim loop. It does so by waiting until the number of
// preallocated rows (rows whose addr is NULL) is back at the configured
// count.
func TestTenantInstanceIDReclaimLoop(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)
	ctx := context.Background()

	settings := cluster.MakeTestingClusterSettings()
	// The tenant is created explicitly further down, so the default test
	// tenant is disabled.
	clusterArgs := base.TestClusterArgs{
		ServerArgs: base.TestServerArgs{
			Settings:                 settings,
			DisableDefaultTestTenant: true,
		},
	}
	tc := serverutils.StartNewTestCluster(t, 3, clusterArgs)
	defer tc.Stopper().Stop(ctx)

	// Override the reclaim loop interval and the preallocated row count
	// before the tenant starts.
	sv := &tc.Server(0).ClusterSettings().SV
	instancestorage.ReclaimLoopInterval.Override(ctx, sv, 250*time.Millisecond)
	instancestorage.PreallocatedCount.Override(ctx, sv, 5)

	tenantArgs := base.TestTenantArgs{
		TenantID: serverutils.TestTenantID(),
		Settings: settings,
	}
	_, tenantConn := serverutils.StartTenant(t, tc.Server(0), tenantArgs)
	defer tenantConn.Close()
	runner := sqlutils.MakeSQLRunner(tenantConn)

	var preallocated int64
	testutils.SucceedsSoon(t, func() error {
		row := runner.QueryRow(t, `SELECT count(*) FROM system.sql_instances WHERE addr IS NULL`)
		row.Scan(&preallocated)
		// PreallocatedCount is 5. Starting the tenant claims one row, which
		// leaves 4; the count only returns to 5 if the reclaim loop runs.
		if preallocated != 5 {
			return fmt.Errorf("waiting for preallocated rows")
		}
		return nil
	})
}

// TestNoInflightTracesVirtualTableOnTenant verifies that internal inflight traces table
// is correctly handled by tenants (which don't provide this functionality as of now).
func TestNoInflightTracesVirtualTableOnTenant(t *testing.T) {
Expand Down
27 changes: 21 additions & 6 deletions pkg/cmd/dev/testlogic.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,13 @@ import (
)

const (
bigtestFlag = "bigtest"
filesFlag = "files"
subtestsFlag = "subtests"
configsFlag = "config"
showSQLFlag = "show-sql"
noGenFlag = "no-gen"
bigtestFlag = "bigtest"
filesFlag = "files"
subtestsFlag = "subtests"
configsFlag = "config"
showSQLFlag = "show-sql"
noGenFlag = "no-gen"
flexTypesFlag = "flex-types"
)

func makeTestLogicCmd(runE func(cmd *cobra.Command, args []string) error) *cobra.Command {
Expand Down Expand Up @@ -61,6 +62,7 @@ func makeTestLogicCmd(runE func(cmd *cobra.Command, args []string) error) *cobra
testLogicCmd.Flags().String(stressArgsFlag, "", "additional arguments to pass to stress")
testLogicCmd.Flags().String(testArgsFlag, "", "additional arguments to pass to go test binary")
testLogicCmd.Flags().Bool(showDiffFlag, false, "generate a diff for expectation mismatches when possible")
testLogicCmd.Flags().Bool(flexTypesFlag, false, "tolerate when a result column is produced with a different numeric type")

addCommonBuildFlags(testLogicCmd)
return testLogicCmd
Expand Down Expand Up @@ -88,6 +90,7 @@ func (d *dev) testlogic(cmd *cobra.Command, commandLine []string) error {
stressCmdArgs = mustGetFlagString(cmd, stressArgsFlag)
testArgs = mustGetFlagString(cmd, testArgsFlag)
showDiff = mustGetFlagBool(cmd, showDiffFlag)
flexTypes = mustGetFlagBool(cmd, flexTypesFlag)
)
if rewrite {
ignoreCache = true
Expand Down Expand Up @@ -132,15 +135,19 @@ func (d *dev) testlogic(cmd *cobra.Command, commandLine []string) error {

var targets []string
args := []string{"test"}
var hasNonSqlite bool
for _, choice := range choices {
var testsDir string
switch choice {
case "base":
testsDir = "//pkg/sql/logictest/tests"
hasNonSqlite = true
case "ccl":
testsDir = "//pkg/ccl/logictestccl/tests"
hasNonSqlite = true
case "opt":
testsDir = "//pkg/sql/opt/exec/execbuilder/tests"
hasNonSqlite = true
case "sqlite":
testsDir = "//pkg/sql/sqlitelogictest/tests"
bigtest = true
Expand Down Expand Up @@ -216,6 +223,14 @@ func (d *dev) testlogic(cmd *cobra.Command, commandLine []string) error {
if len(files) > 0 {
args = append(args, "--test_arg", "-show-sql")
}
if !hasNonSqlite {
// If we only have sqlite targets, then we always append --flex-types
// argument to simulate what we do in the CI.
flexTypes = true
}
if flexTypes {
args = append(args, "--test_arg", "-flex-types")
}

if rewrite {
if stress {
Expand Down
17 changes: 8 additions & 9 deletions pkg/kv/kvserver/loqrecovery/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,15 +503,14 @@ func checkDescriptor(rankedDescriptors rankedReplicas) (problems []Problem) {
},
})
case loqrecoverypb.DescriptorChangeType_ReplicaChange:
// Check if our own replica is being removed as part of descriptor
// change.
_, ok := change.Desc.GetReplicaDescriptor(rankedDescriptors.storeID())
if !ok {
problems = append(problems, rangeReplicaRemoval{
rangeID: rankedDescriptors.rangeID(),
span: rankedDescriptors.span(),
})
}
// Any change of descriptor even if it doesn't change current replica
// is not safe to apply if we change replica id.
// Until we have a way to remove this change, we should treat this as
// a problem.
problems = append(problems, rangeReplicaChange{
rangeID: rankedDescriptors.rangeID(),
span: rankedDescriptors.span(),
})
}
}
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ ok
make-plan
----
ERROR: loss of quorum recovery error
range has unapplied descriptor change that removes current replica
range has unapplied descriptor change
r2: /{Table/1-Max}

make-plan force=true
Expand Down
26 changes: 7 additions & 19 deletions pkg/kv/kvserver/loqrecovery/testdata/pending_descriptor_changes
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ok
make-plan
----
ERROR: loss of quorum recovery error
range has unapplied descriptor change that removes current replica
range has unapplied descriptor change
r2: /{Table/1-Max}


Expand Down Expand Up @@ -109,7 +109,8 @@ range has unapplied merge operation


# Check that ranges with pending descriptor changes where the change removes other replicas are
# considered safe to proceed with.
# considered unsafe to proceed with. This is forbidden because any change will fail if replica
# id of survivor replica reverts.
replication-data
- StoreID: 1
RangeID: 1
Expand All @@ -132,7 +133,7 @@ replication-data
RangeAppliedIndex: 11
RaftCommittedIndex: 14
DescriptorUpdates:
- Type: 2 # pending descriptor update where replicas 2 3 are replaced with 4 which is considered safe
- Type: 2 # pending descriptor update where replicas 2 3 are replaced with 4 which is not considered safe
Replicas:
- { NodeID: 1, StoreID: 1, ReplicaID: 1}
- { NodeID: 4, StoreID: 4, ReplicaID: 4}
Expand All @@ -145,22 +146,9 @@ ok

make-plan
----
- RangeID: 1
StartKey: /Min
OldReplicaID: 1
NewReplica:
NodeID: 1
StoreID: 1
ReplicaID: 14
NextReplicaID: 15
- RangeID: 2
StartKey: /Table/1
OldReplicaID: 1
NewReplica:
NodeID: 1
StoreID: 1
ReplicaID: 14
NextReplicaID: 15
ERROR: loss of quorum recovery error
range has unapplied descriptor change
r2: /{Table/1-Max}


# Check that if descriptor didn't lose quorum, we should not fail if raft log contains future changes
Expand Down
8 changes: 4 additions & 4 deletions pkg/kv/kvserver/loqrecovery/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,18 @@ func (i rangeMerge) Span() roachpb.Span {
return i.span
}

type rangeReplicaRemoval struct {
type rangeReplicaChange struct {
rangeID roachpb.RangeID
span roachpb.Span
}

func (i rangeReplicaRemoval) String() string {
return fmt.Sprintf("range has unapplied descriptor change that removes current replica\n r%d: %v",
func (i rangeReplicaChange) String() string {
return fmt.Sprintf("range has unapplied descriptor change\n r%d: %v",
i.rangeID,
i.span)
}

func (i rangeReplicaRemoval) Span() roachpb.Span {
func (i rangeReplicaChange) Span() roachpb.Span {
return i.span
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -496,14 +496,15 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*SQLServer, error) {
return nil, errors.AssertionFailedf("non-system codec used for SQL pod")
}

cfg.sqlInstanceStorage = instancestorage.NewStorage(cfg.db, codec, cfg.sqlLivenessProvider)
cfg.sqlInstanceStorage = instancestorage.NewStorage(
cfg.db, codec, cfg.sqlLivenessProvider.CachedReader(), cfg.Settings)
cfg.sqlInstanceReader = instancestorage.NewReader(
cfg.sqlInstanceStorage,
cfg.sqlLivenessProvider.CachedReader(),
cfg.rangeFeedFactory,
codec, cfg.clock, cfg.stopper)

// In a multi-tenant environment, use the sqlInstanceProvider to resolve
// In a multi-tenant environment, use the sqlInstanceReader to resolve
// SQL pod addresses.
addressResolver := func(nodeID roachpb.NodeID) (net.Addr, error) {
info, err := cfg.sqlInstanceReader.GetInstance(cfg.rpcContext.MasterCtx, base.SQLInstanceID(nodeID))
Expand Down Expand Up @@ -1317,7 +1318,13 @@ func (s *SQLServer) preStart(
if err != nil {
return err
}
// Allocate our instance ID.
// Start instance ID reclaim loop.
if err := s.sqlInstanceStorage.RunInstanceIDReclaimLoop(
ctx, stopper, timeutil.DefaultTimeSource{}, session.Expiration,
); err != nil {
return err
}
// Acquire our instance ID.
instanceID, err := s.sqlInstanceStorage.CreateInstance(
ctx, session.ID(), session.Expiration(), s.cfg.AdvertiseAddr, s.distSQLServer.Locality)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ SELECT * FROM t

# Check sql instance locality in the secondary tenant.
query IT
SELECT id, locality FROM system.sql_instances
SELECT id, locality FROM system.sql_instances WHERE locality IS NOT NULL
----
1 {"Tiers": "region=test"}
2 {"Tiers": "region=test1"}
Expand Down
10 changes: 9 additions & 1 deletion pkg/sql/sqlinstance/instancestorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ go_library(
"//pkg/kv/kvclient/rangefeed",
"//pkg/multitenant",
"//pkg/roachpb",
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/catalog",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
Expand All @@ -32,8 +34,11 @@ go_library(
"//pkg/util/hlc",
"//pkg/util/json",
"//pkg/util/log",
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/syncutil/singleflight",
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
],
)
Expand All @@ -42,19 +47,22 @@ go_test(
name = "instancestorage_test",
srcs = [
"instancereader_test.go",
"instancestorage_internal_test.go",
"instancestorage_test.go",
"main_test.go",
],
args = ["-test.timeout=295s"],
embed = [":instancestorage"],
deps = [
":instancestorage",
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient/rangefeed",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/settings/cluster",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/systemschema",
"//pkg/sql/sqlinstance",
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/sqlinstance/instancestorage/instancereader.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,10 @@ func (r *Reader) getAllLiveInstances(ctx context.Context) ([]instancerow, error)
{
truncated := rows[:0]
for _, row := range rows {
// Skip instances which are preallocated.
if row.isAvailable() {
continue
}
isAlive, err := r.slReader.IsAlive(ctx, row.sessionID)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/sqlinstance/instancestorage/instancereader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestReader(t *testing.T) {
tDB.Exec(t, schema)
tableID := getTableID(t, tDB, dbName, "sql_instances")
slStorage := slstorage.NewFakeStorage()
storage := instancestorage.NewTestingStorage(s.DB(), keys.SystemSQLCodec, tableID, slStorage)
storage := instancestorage.NewTestingStorage(s.DB(), keys.SystemSQLCodec, tableID, slStorage, s.ClusterSettings())
reader := instancestorage.NewTestingReader(storage, slStorage, s.RangeFeedFactory().(*rangefeed.Factory), keys.SystemSQLCodec, tableID, s.Clock(), s.Stopper())
return storage, slStorage, s.Clock(), reader
}
Expand Down
Loading

0 comments on commit d8ebd29

Please sign in to comment.