storage: start with all the system ranges at bootstrap
Before this patch, a store would be bootstrapped with a single range,
and then we'd rely on the split queue to create splits at a statically
defined list of keys and between system tables.
This patch changes the bootstrapping so that the store starts up with
all these ranges and we don't rely on the split queue any more.
This simplifies things: fewer moving pieces for new clusters. Besides
that, there was a problem before: all the ranges derived from the
original range started out with expiration-based leases. Once those
expired, most of them would transition to epoch-based leases. That
transition is disruptive - it clears the timestamp cache, among other
things - so transactions running at that time (about 4.5s after cluster
startup) would incur restarts. This was a problem for tests.
The fact that ranges started out with expiration-based leases was also
a problem for rangefeed tests, since rangefeeds don't work on such
ranges (as I understand it).

As a result of this patch, a bunch of tests now run with more realistic
stores: there are a million ways to create stores in tests, and before,
most of them got just a single range; now most get many.

Fixes #32495
Fixes #31182
Fixes #31065

Release note: None
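
For illustration, here is a rough Go sketch of the idea (simplified stand-in types and key names, not the actual bootstrap code): given the full, sorted list of bootstrap split keys, the store is seeded with one range per resulting span, instead of a single range that the split queue later carves up.

package main

import "fmt"

// rangeDescriptor is a simplified stand-in for roachpb.RangeDescriptor.
type rangeDescriptor struct {
	rangeID  int64
	startKey string
	endKey   string
}

// makeBootstrapRanges turns a sorted list of split keys into the initial set
// of ranges covering [startKey, endKey), one range per span between splits.
func makeBootstrapRanges(startKey, endKey string, splits []string) []rangeDescriptor {
	descs := make([]rangeDescriptor, 0, len(splits)+1)
	prev := startKey
	for _, split := range splits {
		descs = append(descs, rangeDescriptor{rangeID: int64(len(descs) + 1), startKey: prev, endKey: split})
		prev = split
	}
	return append(descs, rangeDescriptor{rangeID: int64(len(descs) + 1), startKey: prev, endKey: endKey})
}

func main() {
	// Illustrative split keys only.
	splits := []string{"/System/NodeLiveness", "/System/tsd", "/Table/0", "/Table/12"}
	for _, d := range makeBootstrapRanges("/Min", "/Max", splits) {
		fmt.Printf("r%d: [%s, %s)\n", d.rangeID, d.startKey, d.endKey)
	}
}

Because every such range exists from the moment the cluster is bootstrapped, each one can acquire an epoch-based lease right away instead of going through the disruptive expiration-to-epoch transition described above.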
andreimatei committed Jan 3, 2019
1 parent 046cca2 commit 352bdf4
Showing 58 changed files with 1,671 additions and 941 deletions.
1 change: 1 addition & 0 deletions pkg/base/test_server_args.go
@@ -172,5 +172,6 @@ const (
// ReplicationManual means that the split and replication queues of all
// servers are stopped, and the test must manually control splitting and
// replication through the TestServer.
// Note that the server starts with a number of system ranges.
ReplicationManual
)
15 changes: 0 additions & 15 deletions pkg/ccl/changefeedccl/changefeed_test.go
@@ -207,9 +207,6 @@ func TestChangefeedTimestamps(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
// HACK: remove this once #32495 is fixed.
maybeWaitForEpochLeases(t, f.Server())

ctx := context.Background()
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
@@ -299,9 +296,6 @@ func TestChangefeedResolvedFrequency(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
// HACK: remove this once #32495 is fixed.
maybeWaitForEpochLeases(t, f.Server())

sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)

@@ -577,9 +571,6 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
// HACK: remove this once #32495 is fixed.
maybeWaitForEpochLeases(t, f.Server())

sqlDB := sqlutils.MakeSQLRunner(db)

t.Run(`add column with default`, func(t *testing.T) {
@@ -905,9 +896,6 @@ func TestChangefeedMonitoring(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
// HACK: remove this once #32495 is fixed.
maybeWaitForEpochLeases(t, f.Server())

beforeEmitRowCh := make(chan struct{}, 2)
knobs := f.Server().(*server.TestServer).Cfg.TestingKnobs.
DistSQL.(*distsqlrun.TestingKnobs).
@@ -1131,9 +1119,6 @@ func TestChangefeedDataTTL(t *testing.T) {
defer leaktest.AfterTest(t)()

testFn := func(t *testing.T, db *gosql.DB, f testfeedFactory) {
// HACK: remove this once #32495 is fixed.
maybeWaitForEpochLeases(t, f.Server())

// Set a very simple channel-based, wait-and-resume function as the
// BeforeEmitRow hook.
var shouldWait int32
57 changes: 0 additions & 57 deletions pkg/ccl/changefeedccl/helpers_test.go
@@ -35,7 +35,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
@@ -670,62 +669,6 @@ func expectResolvedTimestamp(t testing.TB, f testfeed) hlc.Timestamp {
return parseTimeToHLC(t, valueRaw.CRDB.Resolved)
}

// maybeWaitForEpochLeases waits until all ranges serving user table data have
// epoch leases.
//
// Changefeed resolved timestamps rely on RangeFeed checkpoints which rely on
// closed timestamps which only work with epoch leases. This means that any
// changefeed test that _uses RangeFeed_ and _needs resolved timestamps_ should
// first wait until all the relevant ranges have epoch-leases.
//
// We added this to unblock RangeFeed work, but it takes ~10s, so we should fix
// it for real at some point. The permanent fix is being tracked in #32495.
func maybeWaitForEpochLeases(t *testing.T, s serverutils.TestServerInterface) {
// If it's not a rangefeed test, don't bother waiting.
if !strings.Contains(t.Name(), `rangefeed`) {
return
}

userTablesSpan := roachpb.RSpan{
Key: roachpb.RKey(keys.MakeTablePrefix(keys.MinUserDescID)),
EndKey: roachpb.RKeyMax,
}
testutils.SucceedsSoon(t, func() error {
ctx := context.Background()
var rangeDescs []roachpb.RangeDescriptor
if err := s.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
var err error
rangeDescs, err = allRangeDescriptors(ctx, txn)
return err
}); err != nil {
return err
}

// Force a lease acquisition so we don't get stuck waiting forever.
if _, err := s.DB().Scan(ctx, userTablesSpan.Key, userTablesSpan.EndKey, 0); err != nil {
return err
}

stores := s.GetStores().(*storage.Stores)
for _, rangeDesc := range rangeDescs {
if !rangeDesc.ContainsKeyRange(userTablesSpan.Key, userTablesSpan.EndKey) {
continue
}
replica, err := stores.GetReplicaForRangeID(rangeDesc.RangeID)
if err != nil {
return err
}
lease, _ := replica.GetLease()
if lease.Epoch == 0 {
err := errors.Errorf("%s does not have an epoch lease: should resolve in %s",
rangeDesc, lease.Expiration.GoTime().Sub(timeutil.Now()))
return err
}
}
return nil
})
}

func sinklessTest(testFn func(*testing.T, *gosql.DB, testfeedFactory)) func(*testing.T) {
return func(t *testing.T) {
ctx := context.Background()
3 changes: 0 additions & 3 deletions pkg/ccl/changefeedccl/validations_test.go
@@ -32,9 +32,6 @@ func TestValidations(t *testing.T) {
if strings.Contains(t.Name(), `rangefeed`) {
t.Skip(`#32946`)
}
// HACK: remove this once #32495 is fixed.
maybeWaitForEpochLeases(t, f.Server())

sqlDB := sqlutils.MakeSQLRunner(db)

t.Run("bank", func(t *testing.T) {
6 changes: 3 additions & 3 deletions pkg/cli/cli_test.go
@@ -2069,16 +2069,16 @@ func checkNodeStatus(t *testing.T, c cliTest, output string, start time.Time) {
// address. They don't need closer checks.
baseIdx := len(baseNodeColumnHeaders)

// Adding fields that need verification for --range flag.
// Adding fields that need verification for --ranges flag.
// We have to allow up to 1 unavailable/underreplicated range because
// sometimes we run the `node status` command before the server has fully
// initialized itself and it doesn't consider itself live yet. In such cases,
// there will only be one range covering the entire keyspace because it won't
// have been able to do any splits yet.
if nodeCtx.statusShowRanges || nodeCtx.statusShowAll {
testcases = append(testcases,
testCase{"leader_ranges", baseIdx, 3},
testCase{"leaseholder_ranges", baseIdx + 1, 3},
testCase{"leader_ranges", baseIdx, 22},
testCase{"leaseholder_ranges", baseIdx + 1, 22},
testCase{"ranges", baseIdx + 2, 22},
testCase{"unavailable_ranges", baseIdx + 3, 1},
testCase{"underreplicated_ranges", baseIdx + 4, 1},
10 changes: 9 additions & 1 deletion pkg/config/system.go
@@ -319,13 +319,17 @@ var staticSplits = []roachpb.RKey{
}

// StaticSplits are predefined split points in the system keyspace.
// Corresponding ranges are created at cluster bootstrap time.
//
// There are two reasons for a static split. First, spans that are critical to
// cluster stability, like the node liveness span, are split into their own
// ranges to ease debugging (see #17297). Second, spans in the system keyspace
// that can be targeted by zone configs, like the meta span and the timeseries
// span, are split off into their own ranges because zone configs cannot apply
// to fractions of a range.
//
// Note that these are not the only splits created at cluster bootstrap; splits
// between various system tables are also created.
func StaticSplits() []roachpb.RKey {
return staticSplits
}
@@ -338,10 +342,14 @@ func StaticSplits() []roachpb.RKey {
// system ranges that come before the system tables. The system-config range is
// somewhat special in that it can contain multiple SQL tables
// (/table/0-/table/<max-system-config-desc>) within a single range.
func (s *SystemConfig) ComputeSplitKey(startKey, endKey roachpb.RKey) roachpb.RKey {
func (s *SystemConfig) ComputeSplitKey(startKey, endKey roachpb.RKey) (rr roachpb.RKey) {
// Before dealing with splits necessitated by SQL tables, handle all of the
// static splits earlier in the keyspace. Note that this list must be kept in
// the proper order (ascending in the keyspace) for the logic below to work.
//
// For new clusters, the static splits correspond to ranges created at
// bootstrap time. Older stores might be used with a version with more
// staticSplits though, in which case this code is useful.
for _, split := range staticSplits {
if startKey.Less(split) {
if split.Less(endKey) {
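
The static-split scan in the hunk above boils down to walking the ascending list of predefined splits and returning the first one that falls strictly inside the requested span. A self-contained sketch of that logic with simplified key types (not the real roachpb.RKey machinery):

package main

import (
	"bytes"
	"fmt"
)

type rkey []byte

func less(a, b rkey) bool { return bytes.Compare(a, b) < 0 }

// computeStaticSplitKey returns the first predefined split point strictly
// inside (startKey, endKey). A nil return means no static split applies; the
// real ComputeSplitKey then falls through to splits between SQL tables.
func computeStaticSplitKey(staticSplits []rkey, startKey, endKey rkey) rkey {
	for _, split := range staticSplits {
		if less(startKey, split) {
			if less(split, endKey) {
				return split
			}
			// Splits are sorted ascending, so every remaining split is >= endKey.
			return nil
		}
	}
	return nil
}

func main() {
	// Illustrative keys only.
	splits := []rkey{rkey("/System/NodeLiveness"), rkey("/System/tsd"), rkey("/Table/0")}
	fmt.Printf("%s\n", computeStaticSplitKey(splits, rkey("/System/"), rkey("/Table/5")))
}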
16 changes: 10 additions & 6 deletions pkg/config/system_test.go
@@ -167,7 +167,8 @@ func TestGetLargestID(t *testing.T) {
ms := sqlbase.MakeMetadataSchema()
descIDs := ms.DescriptorIDs()
maxDescID := descIDs[len(descIDs)-1]
return testCase{ms.GetInitialValues(), uint32(maxDescID), 0, ""}
kvs, _ /* splits */ := ms.GetInitialValues()
return testCase{kvs, uint32(maxDescID), 0, ""}
}(),

// Test non-zero max.
@@ -259,8 +260,9 @@ func TestComputeSplitKeySystemRanges(t *testing.T) {
}

cfg := config.NewSystemConfig()
kvs, _ /* splits */ := sqlbase.MakeMetadataSchema().GetInitialValues()
cfg.SystemConfigEntries = config.SystemConfigEntries{
Values: sqlbase.MakeMetadataSchema().GetInitialValues(),
Values: kvs,
}
for tcNum, tc := range testCases {
splitKey := cfg.ComputeSplitKey(tc.start, tc.end)
@@ -290,10 +292,10 @@ func TestComputeSplitKeyTableIDs(t *testing.T) {

schema := sqlbase.MakeMetadataSchema()
// Real system tables only.
baseSql := schema.GetInitialValues()
baseSql, _ /* splits */ := schema.GetInitialValues()
// Real system tables plus some user stuff.
userSQL := append(schema.GetInitialValues(),
descriptor(start), descriptor(start+1), descriptor(start+5))
kvs, _ /* splits */ := schema.GetInitialValues()
userSQL := append(kvs, descriptor(start), descriptor(start+1), descriptor(start+5))
// Real system tables and partitioned user tables.
subzoneSQL := append(userSQL,
zoneConfig(start+1, subzone("a", ""), subzone("c", "e")),
@@ -419,8 +421,10 @@ func TestGetZoneConfigForKey(t *testing.T) {
config.ZoneConfigHook = originalZoneConfigHook
}()
cfg := config.NewSystemConfig()

kvs, _ /* splits */ := sqlbase.MakeMetadataSchema().GetInitialValues()
cfg.SystemConfigEntries = config.SystemConfigEntries{
Values: sqlbase.MakeMetadataSchema().GetInitialValues(),
Values: kvs,
}
for tcNum, tc := range testCases {
var objectID uint32
6 changes: 6 additions & 0 deletions pkg/keys/constants.go
@@ -335,3 +335,9 @@ const (
TableCommentType = 1
ColumnCommentType = 2
)

// PseudoTableIDs is the list of ids from above that are not real tables (i.e.
// there's no table descriptor). They're grouped here because the cluster
// bootstrap process needs to create splits for them; splits for the tables
// happen separately.
var PseudoTableIDs = []uint32{MetaRangesID, SystemRangesID, TimeseriesRangesID, LivenessRangesID}
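
As a rough sketch of how such IDs could feed the bootstrap split list (prefixes rendered as strings rather than the real keys.MakeTablePrefix encoding, and the ID values are illustrative): each pseudo-table ID maps to a table-prefix key that is merged with the prefixes of the real system tables.

package main

import (
	"fmt"
	"sort"
)

// tablePrefix is a simplified stand-in for keys.MakeTablePrefix.
func tablePrefix(id uint32) string { return fmt.Sprintf("/Table/%d", id) }

// bootstrapSplitKeys merges pseudo-table IDs (ranges with no table descriptor,
// e.g. the liveness and timeseries ranges) with real system-table IDs and
// returns the split keys in ascending ID order.
func bootstrapSplitKeys(pseudoTableIDs, systemTableIDs []uint32) []string {
	ids := append(append([]uint32(nil), pseudoTableIDs...), systemTableIDs...)
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	splits := make([]string, 0, len(ids))
	for _, id := range ids {
		splits = append(splits, tablePrefix(id))
	}
	return splits
}

func main() {
	// Hypothetical ID values for illustration only.
	fmt.Println(bootstrapSplitKeys([]uint32{16, 17, 18, 22}, []uint32{3, 4, 5, 11, 12}))
}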
3 changes: 2 additions & 1 deletion pkg/kv/txn_coord_sender_test.go
@@ -1082,7 +1082,8 @@ func setupMetricsTest(t *testing.T) (*localtestcluster.LocalTestCluster, TxnMetr
s := &localtestcluster.LocalTestCluster{
DBContext: &dbCtx,
// Liveness heartbeat txns mess up the metrics.
DontStartLivenessHeartbeat: true,
DisableLivenessHeartbeat: true,
DontCreateSystemRanges: true,
}
s.Start(t, testutils.NewNodeTestBaseContext(), InitFactoryForLocalTestCluster)

50 changes: 45 additions & 5 deletions pkg/server/admin_test.go
@@ -44,6 +44,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
@@ -1406,14 +1407,48 @@ func TestAdminAPIRangeLogByRangeID(t *testing.T) {
// TestAdminAPIRangeLogByRangeID).
func TestAdminAPIFullRangeLog(t *testing.T) {
defer leaktest.AfterTest(t)()
s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
s, db, _ := serverutils.StartServer(t,
base.TestServerArgs{
Knobs: base.TestingKnobs{
Store: &storage.StoreTestingKnobs{
DisableSplitQueue: true,
},
},
})
defer s.Stopper().Stop(context.Background())

expectedRanges, err := ExpectedInitialRangeCount(kvDB)
// Insert something in the rangelog table, otherwise it's empty for new
// clusters.
rows, err := db.Query(`SELECT count(1) FROM system.rangelog`)
if err != nil {
t.Fatal(err)
}
expectedEvents := expectedRanges - 1 // one for each split
if !rows.Next() {
t.Fatal("missing row")
}
var cnt int
if err := rows.Scan(&cnt); err != nil {
t.Fatal(err)
}
if err := rows.Close(); err != nil {
t.Fatal(err)
}
if cnt != 0 {
t.Fatalf("expected 0 rows in system.rangelog, found: %d", cnt)
}
const rangeID = 100
for i := 0; i < 10; i++ {
if _, err := db.Exec(
`INSERT INTO system.rangelog (
timestamp, "rangeID", "storeID", "eventType"
) VALUES (now(), $1, 1, $2)`,
rangeID,
storagepb.RangeLogEventType_add.String(),
); err != nil {
t.Fatal(err)
}
}
expectedEvents := 10

testCases := []struct {
hasLimit bool
@@ -1436,8 +1471,13 @@ func TestAdminAPIFullRangeLog(t *testing.T) {
if err := getAdminJSONProto(s, url, &resp); err != nil {
t.Fatal(err)
}
if e, a := tc.expected, len(resp.Events); e != a {
t.Fatalf("expected %d events, got %d", e, a)
events := resp.Events
if e, a := tc.expected, len(events); e != a {
var sb strings.Builder
for _, ev := range events {
sb.WriteString(ev.String() + "\n")
}
t.Fatalf("expected %d events, got %d:\n%s", e, a, sb.String())
}
})
}
21 changes: 20 additions & 1 deletion pkg/server/init.go
@@ -18,6 +18,7 @@ import (
"context"

"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
@@ -73,13 +74,31 @@ func (s *initServer) Bootstrap(
s.semaphore.acquire()
defer s.semaphore.release()

if err := s.server.node.bootstrap(ctx, s.server.engines, s.server.cfg.Settings.Version.BootstrapVersion()); err != nil {
if err := s.server.node.bootstrap(
ctx, s.server.engines, s.server.cfg.Settings.Version.BootstrapVersion(),
); err != nil {
if _, ok := err.(*duplicateBootstrapError); ok {
return nil, status.Errorf(codes.AlreadyExists, err.Error())
}
log.Error(ctx, "node bootstrap failed: ", err)
return nil, err
}
// Force all the system ranges through the replication queue so they
// upreplicate as quickly as possible when a new node joins. Without this
// code, the upreplication would be up to the whim of the scanner, which
// might be too slow for new clusters.
done := false
if err := s.server.node.stores.VisitStores(func(store *storage.Store) error {
if !done {
done = true
if err := store.ForceReplicationScanAndProcess(); err != nil {
return err
}
}
return nil
}); err != nil {
return nil, err
}

close(s.bootstrapped)
return &serverpb.BootstrapResponse{}, nil