74765: roachpb: introduce the concept of `SystemSpanConfig` and related protos r=arulajmani a=arulajmani

This patch is motivated by the desire to let the host tenant lay
protected timestamps on one or all secondary tenants' keyspace. It
also provides a mechanism to allow secondary tenants to lay protected
timestamps on their entire keyspace without updating every span
configuration.

We introduce the concept of `SystemSpanConfig` and
`SystemSpanConfigTarget` to enable this. We tie these together using a
`SystemSpanConfigEntry`.

A `SystemSpanConfig` is a system installed configuration that can apply
to multiple spans. It only contains protected timestamp information.

A `SystemSpanConfigTarget` is used to specify the spans a
`SystemSpanConfig` applies over. It can be used to target the entire
(logical) cluster or a particular secondary tenant. We will ensure that
only the host tenant can target secondary tenants in a future PR that
actually persists `SystemSpanConfigs`.
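
As a rough Go sketch of how these pieces relate (field names here are
illustrative assumptions, not the actual proto definitions):

    // Illustrative sketch only; the real definitions live in pkg/roachpb
    // and may differ in naming and shape.
    package spanconfigsketch

    import (
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/util/hlc"
    )

    // SystemSpanConfig carries only protected timestamp information and
    // can apply to many spans at once.
    type SystemSpanConfig struct {
        ProtectedTimestamps []hlc.Timestamp
    }

    // SystemSpanConfigTarget names the keyspace a SystemSpanConfig applies
    // over: the entire (logical) cluster, or one secondary tenant's keyspace.
    type SystemSpanConfigTarget struct {
        // TenantID, when set, targets that tenant's keyspace; when nil, the
        // config targets the whole cluster.
        TenantID *roachpb.TenantID
    }

    // SystemSpanConfigEntry ties a target to its config.
    type SystemSpanConfigEntry struct {
        Target SystemSpanConfigTarget
        Config SystemSpanConfig
    }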

We will persist `SystemSpanConfigs` in `system.span_configurations` in
a future patch. The `SystemSpanConfigTarget` will be encoded into
special reserved keys when we do so.

This change introduces the notion of a hierarchy to span configurations.
The configuration that applies to a span will now be the `SpanConfig`
stored in `system.span_configurations` combined with all the
`SystemSpanConfigs` that apply to the span. This can be at most 4
levels deep -- for a secondary tenant's range, the secondary tenant can
install a `SystemSpanConfig` that applies to all its ranges, the host
tenant can install a `SystemSpanConfig` that applies to all ranges of
the secondary tenant, and the host tenant can install a
`SystemSpanConfig` that applies to all ranges.
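
Continuing the illustrative sketch above (an assumption about shape, not
the shipped implementation), the combination could look like:

    // effectiveProtections sketches the hierarchy described above: the
    // protections applying to a span are those already on its stored
    // SpanConfig (passed in as own) plus those from every SystemSpanConfig
    // whose target covers the span.
    func effectiveProtections(
        own []hlc.Timestamp, applicable []SystemSpanConfig,
    ) []hlc.Timestamp {
        out := append([]hlc.Timestamp(nil), own...)
        for _, sys := range applicable {
            out = append(out, sys.ProtectedTimestamps...)
        }
        return out
    }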

These protos form the data model that will later be used to enable
protected timestamp support for secondary tenants via the span config
infrastructure. They will be used by components such as the
`SQLTranslator`, `KVAccessor`, and `Reconciler`.

Release note: None

75233: kvserver: avoid clobbering replica conf r=irfansharif a=irfansharif

Fixes #75109. There are two ways to read the configuration applied over
a given replica:
  (a) the copy embedded in each replica struct
  (b) spanconfig.StoreReader, hanging off the store struct

The interface in (b) is implemented by both the span configs
infrastructure and the legacy system config span it's designed to
replace; it's typically used by KV queues (see #69172). When switching
between subsystems in KV (controlled by spanconfig.store.enabled), we
transparently switch the source for (b). We then also use the
right source to refresh (a) for every replica. Incremental updates
thereafter mutate (a) directly. As you'd expect, we need to take special
care that only one subsystem is writing to (a) at a given point in time,
a guarantee that was not upheld before this commit. This bug manifested
after we enabled span configs by default (see #73876), and likely
affected many tests.

To prevent the system config span from clobbering (a) when span configs
are enabled, we just need to check spanconfig.store.enabled at the
right spot. To repro:

    # Reliably fails with 1-2m on my GCE worker before this patch,
    # doesn't after.
    dev test pkg/kv/kvserver \
      -f TestBackpressureNotAppliedWhenReducingRangeSize \
      -v --show-logs --timeout 15m --stress
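
A minimal sketch of the guard (identifier names below are assumptions,
not necessarily the shipped kvserver/spanconfig API):

    package kvserversketch

    import (
        "github.com/cockroachdb/cockroach/pkg/kv/kvserver"
        "github.com/cockroachdb/cockroach/pkg/roachpb"
        "github.com/cockroachdb/cockroach/pkg/settings/cluster"
        "github.com/cockroachdb/cockroach/pkg/spanconfig/spanconfigstore"
    )

    // applySystemConfigSpanConf is a sketch only: the legacy system config
    // span path refreshes the replica-embedded config (a) solely when span
    // configs are disabled.
    func applySystemConfigSpanConf(
        st *cluster.Settings, repl *kvserver.Replica, conf roachpb.SpanConfig,
    ) {
        if spanconfigstore.EnabledSetting.Get(&st.SV) {
            // spanconfig.store.enabled is true: the span configs
            // infrastructure is the sole writer of (a), so don't clobber it.
            return
        }
        repl.SetSpanConfig(conf)
    }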

Release note: None

75280: sql: deprecate TableDescriptor.GCMutations r=postamar,ajwerner a=stevendanna

This field appears unused. The schema changer adds entries that the GC
job subsequently removes, but the only other code that made use of this
field (outside of tests) was FindIndexByID, which used it only to return
a special error that nothing checks for.

Release note: None

Co-authored-by: arulajmani <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
4 people committed Jan 25, 2022
4 parents 421a767 + 89decea + dedcec7 + c08ae50 commit c956f4c
Showing 19 changed files with 279 additions and 367 deletions.
126 changes: 48 additions & 78 deletions pkg/jobs/registry_test.go
@@ -93,38 +93,16 @@ func writeMutation(
}
}

func writeGCMutation(
t *testing.T,
kvDB *kv.DB,
tableDesc *tabledesc.Mutable,
m descpb.TableDescriptor_GCDescriptorMutation,
) {
tableDesc.GCMutations = append(tableDesc.GCMutations, m)
tableDesc.Version++
if err := catalog.ValidateSelf(tableDesc); err != nil {
t.Fatal(err)
}
if err := kvDB.Put(
context.Background(),
catalogkeys.MakeDescMetadataKey(keys.SystemSQLCodec, tableDesc.GetID()),
tableDesc.DescriptorProto(),
); err != nil {
t.Fatal(err)
}
}

type mutationOptions struct {
// Set if the desc should have any mutations of any sort.
hasMutation bool
// Set if the mutation being inserted is a GCMutation.
hasGCMutation bool
// Set if the desc should have a job that is dropping it.
hasDropJob bool
}

func (m mutationOptions) string() string {
return fmt.Sprintf("hasMutation=%s_hasGCMutation=%s_hasDropJob=%s",
strconv.FormatBool(m.hasMutation), strconv.FormatBool(m.hasGCMutation),
return fmt.Sprintf("hasMutation=%s_hasDropJob=%s",
strconv.FormatBool(m.hasMutation),
strconv.FormatBool(m.hasDropJob))
}

@@ -172,7 +150,7 @@ func TestRegistryGC(t *testing.T) {
writeJob := func(name string, created, finished time.Time, status Status, mutOptions mutationOptions) string {
tableName := constructTableName(name, mutOptions)
if _, err := sqlDB.Exec(fmt.Sprintf(`
CREATE DATABASE IF NOT EXISTS t;
CREATE DATABASE IF NOT EXISTS t;
CREATE TABLE t."%s" (k VARCHAR PRIMARY KEY DEFAULT 'default', v VARCHAR,i VARCHAR NOT NULL DEFAULT 'i');
INSERT INTO t."%s" VALUES('a', 'foo');
`, tableName, tableName)); err != nil {
@@ -187,10 +165,6 @@ INSERT INTO t."%s" VALUES('a', 'foo');
writeColumnMutation(t, kvDB, tableDesc, "i", descpb.DescriptorMutation{State: descpb.
DescriptorMutation_DELETE_AND_WRITE_ONLY, Direction: descpb.DescriptorMutation_DROP})
}
if mutOptions.hasGCMutation {
writeGCMutation(t, kvDB, tableDesc, descpb.TableDescriptor_GCDescriptorMutation{})
}

payload, err := protoutil.Marshal(&jobspb.Payload{
Description: name,
// register a mutation on the table so that jobs that reference
@@ -222,58 +196,54 @@ INSERT INTO t."%s" VALUES('a', 'foo');

// Test the descriptor when any of the following are set.
// 1. Mutations
// 2. GC Mutations
// 3. A drop job
// 2. A drop job
for _, hasMutation := range []bool{true, false} {
for _, hasGCMutation := range []bool{true, false} {
for _, hasDropJob := range []bool{true, false} {
if !hasMutation && !hasGCMutation && !hasDropJob {
continue
}
mutOptions := mutationOptions{
hasMutation: hasMutation,
hasGCMutation: hasGCMutation,
hasDropJob: hasDropJob,
}
oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusFailed, mutOptions)
oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusRevertFailed, mutOptions)
oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
StatusCanceled, mutOptions)
newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
mutOptions)
newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)

db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})
for _, hasDropJob := range []bool{true, false} {
if !hasMutation && !hasDropJob {
continue
}
mutOptions := mutationOptions{
hasMutation: hasMutation,
hasDropJob: hasDropJob,
}
oldRunningJob := writeJob("old_running", muchEarlier, time.Time{}, StatusRunning, mutOptions)
oldSucceededJob := writeJob("old_succeeded", muchEarlier, muchEarlier.Add(time.Minute), StatusSucceeded, mutOptions)
oldFailedJob := writeJob("old_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusFailed, mutOptions)
oldRevertFailedJob := writeJob("old_revert_failed", muchEarlier, muchEarlier.Add(time.Minute),
StatusRevertFailed, mutOptions)
oldCanceledJob := writeJob("old_canceled", muchEarlier, muchEarlier.Add(time.Minute),
StatusCanceled, mutOptions)
newRunningJob := writeJob("new_running", earlier, earlier.Add(time.Minute), StatusRunning,
mutOptions)
newSucceededJob := writeJob("new_succeeded", earlier, earlier.Add(time.Minute), StatusSucceeded, mutOptions)
newFailedJob := writeJob("new_failed", earlier, earlier.Add(time.Minute), StatusFailed, mutOptions)
newRevertFailedJob := writeJob("new_revert_failed", earlier, earlier.Add(time.Minute), StatusRevertFailed, mutOptions)
newCanceledJob := writeJob("new_canceled", earlier, earlier.Add(time.Minute),
StatusCanceled, mutOptions)

db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldSucceededJob}, {oldFailedJob}, {oldRevertFailedJob}, {oldCanceledJob},
{newRunningJob}, {newSucceededJob}, {newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, earlier); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newSucceededJob},
{newFailedJob}, {newRevertFailedJob}, {newCanceledJob}})

if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})

// Delete the revert failed, and running jobs for the next run of the
// test.
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
require.NoError(t, err)
if err := s.JobRegistry().(*Registry).cleanupOldJobs(ctx, ts.Add(time.Minute*-10)); err != nil {
t.Fatal(err)
}
db.CheckQueryResults(t, `SELECT id FROM system.jobs ORDER BY id`, [][]string{
{oldRunningJob}, {oldRevertFailedJob}, {newRunningJob}, {newRevertFailedJob}})

// Delete the revert failed, and running jobs for the next run of the
// test.
_, err := sqlDB.Exec(`DELETE FROM system.jobs WHERE id = $1 OR id = $2 OR id = $3 OR id = $4`,
oldRevertFailedJob, newRevertFailedJob, oldRunningJob, newRunningJob)
require.NoError(t, err)
}
}
}
1 change: 1 addition & 0 deletions pkg/kv/kvserver/BUILD.bazel
@@ -230,6 +230,7 @@ go_test(
"client_split_burst_test.go",
"client_split_test.go",
"client_status_test.go",
"client_store_test.go",
"client_tenant_test.go",
"client_test.go",
"closed_timestamp_test.go",
15 changes: 7 additions & 8 deletions pkg/kv/kvserver/client_split_test.go
@@ -1011,11 +1011,11 @@ func TestStoreZoneUpdateAndRangeSplit(t *testing.T) {

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// This test was written with the SystemConfigSpan in mind.
DisableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
// This test was written with the SystemConfigSpan in mind.
UseSystemConfigSpanForQueues: true,
},
},
})
@@ -1082,11 +1082,11 @@ func TestStoreRangeSplitWithMaxBytesUpdate(t *testing.T) {

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
// This test was written with the system config span in mind.
DisableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
// This test was written with the system config span in mind.
UseSystemConfigSpanForQueues: true,
},
},
})
@@ -3562,12 +3562,11 @@ func TestStoreRangeSplitAndMergeWithGlobalReads(t *testing.T) {

ctx := context.Background()
serv, _, _ := serverutils.StartServer(t, base.TestServerArgs{
DisableSpanConfigs: true,
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
DisableMergeQueue: true,
// This test was written with the system config span in mind.
UseSystemConfigSpanForQueues: true,
TestingResponseFilter: respFilter,
DisableMergeQueue: true,
TestingResponseFilter: respFilter,
},
},
})
72 changes: 72 additions & 0 deletions pkg/kv/kvserver/client_store_test.go
@@ -0,0 +1,72 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package kvserver_test

import (
"context"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/spanconfig"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
)

// TestStoreSetRangesMaxBytes creates a set of ranges via splitting and then
// sets the config zone to a custom max bytes value to verify the ranges' max
// bytes are updated appropriately.
func TestStoreSetRangesMaxBytes(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const expMaxBytes, defaultMaxBytes = 420 << 20, 512 << 20
ctx := context.Background()
tc := testcluster.StartTestCluster(t, 1,
base.TestClusterArgs{
ServerArgs: base.TestServerArgs{
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ConfigureScratchRange: true,
},
},
},
},
)
defer tc.Stopper().Stop(ctx)
store := tc.GetFirstStoreFromServer(t, 0)
tdb := sqlutils.MakeSQLRunner(tc.Conns[0])

tdb.Exec(t, `SET CLUSTER SETTING kv.closed_timestamp.target_duration = '100ms'`) // speeds up the test

testKey := tc.ScratchRange(t)
testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(roachpb.RKey(testKey))
if got := repl.GetMaxBytes(); got != defaultMaxBytes {
return errors.Errorf("range max bytes values did not start at %d; got %d", defaultMaxBytes, got)
}
return nil
})

tdb.Exec(t, `ALTER RANGE DEFAULT CONFIGURE ZONE USING range_max_bytes = $1`, expMaxBytes)

testutils.SucceedsSoon(t, func() error {
repl := store.LookupReplica(roachpb.RKey(testKey))
if got := repl.GetMaxBytes(); got != expMaxBytes {
return errors.Errorf("range max bytes values did not change to %d; got %d", expMaxBytes, got)
}
return nil
})
}
65 changes: 34 additions & 31 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -245,59 +245,62 @@ func TestReplicateQueueDownReplicate(t *testing.T) {
skip.UnderRace(t, "takes >1min under race")

ctx := context.Background()
const replicaCount = 3

// The goal of this test is to ensure that down replication occurs correctly
// using the replicate queue, and to ensure that's the case, the test
// cluster needs to be kept in auto replication mode.
tc := testcluster.StartTestCluster(t, replicaCount+2,
// The goal of this test is to ensure that down replication occurs
// correctly using the replicate queue, and to ensure that's the case,
// the test cluster needs to be kept in auto replication mode.
tc := testcluster.StartTestCluster(t, 3,
base.TestClusterArgs{
ReplicationMode: base.ReplicationAuto,
ServerArgs: base.TestServerArgs{
ScanMinIdleTime: 10 * time.Millisecond,
ScanMaxIdleTime: 10 * time.Millisecond,
Knobs: base.TestingKnobs{
SpanConfig: &spanconfig.TestingKnobs{
ConfigureScratchRange: true,
},
},
},
},
)
defer tc.Stopper().Stop(ctx)

// Disable the replication queues so that the range we're about to create
// doesn't get down-replicated too soon.
tc.ToggleReplicateQueues(false)

testKey := tc.ScratchRange(t)
desc := tc.LookupRangeOrFatal(t, testKey)
// At the end of StartTestCluster(), all ranges have 5 replicas since they're
// all "system ranges". When the ScratchRange() splits its range, it also
// starts up with 5 replicas. Since it's not a system range, its default zone
// config asks for 3x replication, and the replication queue will
// down-replicate it.
require.Len(t, desc.Replicas().Descriptors(), 5)
// Re-enable the replication queue.
tc.ToggleReplicateQueues(true)
testutils.SucceedsSoon(t, func() error {
desc := tc.LookupRangeOrFatal(t, testKey)
if got := len(desc.Replicas().Descriptors()); got != 3 {
return errors.Newf("expected 3 replicas for scratch range, found %d", got)
}
return nil
})

_, err := tc.ServerConn(0).Exec(
`ALTER RANGE DEFAULT CONFIGURE ZONE USING num_replicas = 1`,
)
require.NoError(t, err)

for _, s := range tc.Servers {
require.NoError(t, s.Stores().VisitStores(func(s *kvserver.Store) error {
require.NoError(t, s.ForceReplicationScanAndProcess())
return nil
}))
}

// Now wait until the replicas have been down-replicated back to the
// desired number.
testutils.SucceedsSoon(t, func() error {
descriptor, err := tc.LookupRange(testKey)
if err != nil {
t.Fatal(err)
}
if len(descriptor.InternalReplicas) != replicaCount {
return errors.Errorf("replica count, want %d, current %d", replicaCount, len(desc.InternalReplicas))
desc := tc.LookupRangeOrFatal(t, testKey)
if got := len(desc.Replicas().Descriptors()); got != 1 {
return errors.Errorf("expected 1 replica, found %d", got)
}
return nil
})

desc := tc.LookupRangeOrFatal(t, testKey)
infos, err := filterRangeLog(
tc.Conns[0], desc.RangeID, kvserverpb.RangeLogEventType_remove_voter, kvserverpb.ReasonRangeOverReplicated,
)
if err != nil {
t.Fatal(err)
}
if len(infos) < 1 {
t.Fatalf("found no downreplication due to over-replication in the range logs")
}
require.NoError(t, err)
require.Truef(t, len(infos) >= 1, "found no down replication due to over-replication in the range logs")
}

func scanAndGetNumNonVoters(
