kvserver: perform initial upreplication of non-voters synchronously
Resolves #63199

Before this commit, we relied on the raft snapshot queue to
asynchronously perform the initial upreplication of non-voting replicas.
This meant that by the time `AdminChangeReplicas` (and consequently,
`AdminRelocateRange`) returned to its client, non-voters were not
guaranteed to have been initialized. This was a deliberate decision and
was, thus far, believed to be copacetic. However, this decision subtly
made range merges (of ranges that have any number of non-voters)
extremely unlikely to succeed, while causing severe disruption to
foreground traffic on the right-hand side of a merge. This was because
the `mergeQueue` first calls `AdminRelocateRange` on the right-hand
side range in order to collocate its replicas with the replicas of the
left-hand side range. If the `mergeQueue` happened to relocate any
non-voting replicas, they were likely to still be waiting for their
initial snapshot by the time the `AdminMerge` attempted to subsume the
RHS. Essentially, this meant that we were subsuming the RHS of a merge
while some of its replicas weren't even initialized. This would cause
the merge to fail and, in the interim, block all traffic over the RHS
range for a 5-second window.

This commit fixes the unfortunate sequence of events described above by
making the behavior of `AdminChangeReplicas` more symmetric for voting
and non-voting replicas. Now, if `AdminChangeReplicas` successfully
returns, its client can safely assume that all new replicas have at
least been upreplicated via an initial snapshot.

Release note: None
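
To make the new contract concrete, the sketch below contrasts the old
fire-and-forget path with the new synchronous one. It is a minimal
illustration only, not the actual kvserver code; the type and helper names
(`replicaChanger`, `sendInitialSnapshot`, `enqueueForRaftSnapshot`) are
hypothetical.

// Hypothetical sketch of the behavior change described above; none of these
// names correspond to the real kvserver implementation.
package sketch

import (
	"context"
	"fmt"
)

// replicationTarget identifies a store that should receive a new replica.
type replicationTarget struct {
	NodeID, StoreID int
}

// replicaChanger abstracts the two ways a newly added replica can be caught up.
type replicaChanger struct {
	// sendInitialSnapshot streams an initial snapshot and returns only once
	// the target replica has applied it (i.e. is initialized).
	sendInitialSnapshot func(ctx context.Context, t replicationTarget) error
	// enqueueForRaftSnapshot asks the asynchronous raft snapshot queue to
	// eventually send a snapshot to the target.
	enqueueForRaftSnapshot func(t replicationTarget)
}

// addNonVoters adds non-voting replicas for the given targets.
//
// With synchronous == false (the old behavior), the call may return while the
// new non-voters are still uninitialized, which is what allowed a merge to
// attempt to subsume a right-hand side with uninitialized replicas. With
// synchronous == true (the new behavior), a caller can assume that every new
// replica has received its initial snapshot by the time this returns.
func (rc *replicaChanger) addNonVoters(
	ctx context.Context, targets []replicationTarget, synchronous bool,
) error {
	for _, t := range targets {
		if !synchronous {
			// Old behavior: fire-and-forget; initialization happens later.
			rc.enqueueForRaftSnapshot(t)
			continue
		}
		// New behavior: block until the initial snapshot has been applied.
		if err := rc.sendInitialSnapshot(ctx, t); err != nil {
			return fmt.Errorf("initial upreplication of non-voter on %+v failed: %w", t, err)
		}
	}
	return nil
}

Under the synchronous model, the mergeQueue's `AdminRelocateRange` call cannot
return while a relocated non-voter is still waiting for its snapshot, which is
what closes the failed-merge window described above.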
aayushshah15 committed Apr 9, 2021
1 parent 651184b commit 6fca1ce
Showing 17 changed files with 579 additions and 365 deletions.
18 changes: 0 additions & 18 deletions pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -578,24 +578,6 @@ func TestFollowerReadsWithStaleDescriptor(t *testing.T) {
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE VOTERS VALUES (ARRAY[1], 1)`)
n1.Exec(t, `ALTER TABLE test EXPERIMENTAL_RELOCATE NON_VOTERS VALUES (ARRAY[3], 1)`)

// Wait until the new non-voter is upreplicated to n3.
testutils.SucceedsSoon(
t, func() error {
return tc.Server(2).GetStores().(*kvserver.Stores).VisitStores(
func(s *kvserver.Store) error {
repl := s.LookupReplica(tablePrefix)
if repl == nil {
return errors.Errorf("no replica found on store %s", s)
}
if !repl.IsInitialized() {
return errors.Errorf("non-voter not initialized")
}
return nil
},
)
},
)

// Execute the query again and assert the cache is updated. This query will
// not be executed as a follower read since it attempts to use n2 which
// doesn't have a replica any more and then it tries n1 which returns an
73 changes: 73 additions & 0 deletions pkg/kv/kvserver/client_merge_test.go
@@ -4471,6 +4471,79 @@ func TestMergeQueueSeesNonVoters(t *testing.T) {
}
}

// TestMergeQueueWithSlowNonVoterSnaps aims to check that non-voting replicas
// are initially upreplicated through a synchronously-sent snapshot inside of
// `AdminChangeReplicas`, like voting replicas are. Otherwise, range merges
// could be allowed to proceed with subsuming the right-hand side range while it
// still has uninitialized non-voters.
//
// Regression test for https://github.com/cockroachdb/cockroach/issues/63199.
func TestMergeQueueWithSlowNonVoterSnaps(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderShort(t, "this test sleeps for a few seconds")

ctx := context.Background()
var delaySnapshotTrap atomic.Value
var clusterArgs = base.TestClusterArgs{
// We don't want the replicate queue mucking with our test, so disable it.
ReplicationMode: base.ReplicationManual,
ServerArgsPerNode: map[int]base.TestServerArgs{
1: {
Knobs: base.TestingKnobs{
Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(header *kvserver.SnapshotRequest_Header) error {
val := delaySnapshotTrap.Load()
if val != nil {
fn := val.(func() error)
return fn()
}
return nil
},
},
},
},
},
}

dbName := "testdb"
tableName := "kv"
numNodes := 3
tc, _ := setupTestClusterWithDummyRange(t, clusterArgs, dbName, tableName, numNodes)
defer tc.Stopper().Stop(ctx)
// We're controlling merge queue operation via
// `store.SetMergeQueueActive`, so enable the cluster setting here.
_, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING kv.range_merge.queue_enabled=true`)
require.NoError(t, err)

store, err := tc.Server(0).GetStores().(*kvserver.Stores).GetStore(1)
require.Nil(t, err)
// We're going to split the dummy range created above with an empty
// expiration time. Disable the merge queue before splitting so that the
// split ranges aren't immediately merged.
store.SetMergeQueueActive(false)
leftDesc, rightDesc := splitDummyRangeInTestCluster(
t, tc, dbName, tableName, hlc.Timestamp{}, /* splitExpirationTime */
)
require.Equal(t, 1, len(leftDesc.Replicas().Descriptors()))
require.Equal(t, 1, len(rightDesc.Replicas().Descriptors()))

// Add non-voters for the LHS and RHS on servers 1 and 2 respectively so that
// the merge queue logic has to explicitly relocate the RHS non-voter to
// server 1, in order to align replica sets to proceed with the merge.
tc.AddNonVotersOrFatal(t, leftDesc.StartKey.AsRawKey(), tc.Target(1))
tc.AddNonVotersOrFatal(t, rightDesc.StartKey.AsRawKey(), tc.Target(2))

delaySnapshotTrap.Store(func() error {
time.Sleep(5 * time.Second)
return nil
})
store.SetMergeQueueActive(true)
store.MustForceMergeScanAndProcess()
verifyMerged(t, store, leftDesc.StartKey, rightDesc.StartKey)
}

func TestInvalidSubsumeRequest(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/client_migration_test.go
@@ -186,7 +186,7 @@ func TestMigrateWithInflightSnapshot(t *testing.T) {
 if processErr != nil {
 return processErr
 }
-const msg = `skipping snapshot; replica is likely a learner in the process of being added: (n2,s2):2LEARNER`
+const msg = `skipping snapshot; replica is likely a LEARNER in the process of being added: (n2,s2):2LEARNER`
 formattedTrace := trace.String()
 if !strings.Contains(formattedTrace, msg) {
 return errors.Errorf(`expected "%s" in trace got:\n%s`, msg, formattedTrace)
Expand Down
17 changes: 10 additions & 7 deletions pkg/kv/kvserver/client_split_test.go
@@ -3323,18 +3323,21 @@ func TestSplitTriggerMeetsUnexpectedReplicaID(t *testing.T) {
 ctx := context.Background()

 blockPromoteCh := make(chan struct{})
-var skipLearnerSnaps int32
+var skipSnaps int32
 withoutLearnerSnap := func(fn func()) {
-atomic.StoreInt32(&skipLearnerSnaps, 1)
+atomic.StoreInt32(&skipSnaps, 1)
 fn()
-atomic.StoreInt32(&skipLearnerSnaps, 0)
+atomic.StoreInt32(&skipSnaps, 0)
 }
 knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
-ReplicaSkipLearnerSnapshot: func() bool {
-return atomic.LoadInt32(&skipLearnerSnaps) != 0
+ReplicaSkipInitialSnapshot: func() bool {
+return atomic.LoadInt32(&skipSnaps) != 0
 },
-ReplicaAddStopAfterLearnerSnapshot: func(targets []roachpb.ReplicationTarget) bool {
-if atomic.LoadInt32(&skipLearnerSnaps) != 0 {
+RaftSnapshotQueueSkipReplica: func() bool {
+return atomic.LoadInt32(&skipSnaps) != 0
+},
+VoterAddStopAfterLearnerSnapshot: func(targets []roachpb.ReplicationTarget) bool {
+if atomic.LoadInt32(&skipSnaps) != 0 {
 return false
 }
 if len(targets) > 0 && targets[0].StoreID == 2 {
4 changes: 4 additions & 0 deletions pkg/kv/kvserver/merge_queue.go
@@ -352,6 +352,10 @@ func (mq *mergeQueue) process(
 } else if err != nil {
 // While range merges are unstable, be extra cautious and mark every error
 // as purgatory-worthy.
+//
+// TODO(aayush): Merges are indeed stable now, we can be smarter here about
+// which errors should be marked as purgatory-worthy.
+log.Warningf(ctx, "%v", err)
 return false, rangeMergePurgatoryError{err}
 }
 if testingAggressiveConsistencyChecks {
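
For context on the purgatory-worthy errors mentioned in the comment above: a
replica queue can park a range for a later retry by returning an error wrapped
in a marker type, which is what `rangeMergePurgatoryError` does here. The
sketch below is a generic illustration of that pattern, not the actual
kvserver purgatory API; all names in it are assumptions.

// Generic sketch of a purgatory-style retry marker; the names here are
// assumptions and do not mirror the actual kvserver implementation.
package sketch

import "errors"

// purgatoryMarkedError wraps a processing error to signal that the replica
// should be parked in the queue's purgatory and retried later, rather than
// being dropped from the queue.
type purgatoryMarkedError struct{ cause error }

func (e purgatoryMarkedError) Error() string { return "purgatory: " + e.cause.Error() }
func (e purgatoryMarkedError) Unwrap() error { return e.cause }

// shouldSendToPurgatory reports whether a processing error asks for a retry
// from purgatory.
func shouldSendToPurgatory(err error) bool {
	var pe purgatoryMarkedError
	return errors.As(err, &pe)
}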