Skip to content

Commit

Permalink
kvserver: gracefully handle races during learner removals
Browse files Browse the repository at this point in the history
Previously, at the end of a replication change, if the `ChangeReplicas` request
found that the (demoted) learner replica it was trying to remove from the range
had already been removed (presumably because it raced with the mergeQueue, the
`StoreRebalancer`, or something else), it would error out. This was
unfortunate, because, for all practical purposes, the replication change _had
succeeded_.

We can now gracefully handle this instead by no-oping if we detect that the
replica we were trying to remove has already been removed.

Release note: None
  • Loading branch information
aayushshah15 committed Apr 7, 2022
1 parent 63ea913 commit 03d2b33
Show file tree
Hide file tree
Showing 6 changed files with 229 additions and 40 deletions.
134 changes: 99 additions & 35 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func splitTxnAttempt(
) error {
txn.SetDebugName(splitTxnName)

_, dbDescValue, err := conditionalGetDescValueFromDB(
_, dbDescValue, _, err := conditionalGetDescValueFromDB(
ctx, txn, oldDesc.StartKey, false /* forUpdate */, checkDescsEqual(oldDesc))
if err != nil {
return err
Expand Down Expand Up @@ -256,7 +256,7 @@ func splitTxnAttempt(
func splitTxnStickyUpdateAttempt(
ctx context.Context, txn *kv.Txn, desc *roachpb.RangeDescriptor, expiration hlc.Timestamp,
) error {
_, dbDescValue, err := conditionalGetDescValueFromDB(
_, dbDescValue, _, err := conditionalGetDescValueFromDB(
ctx, txn, desc.StartKey, false /* forUpdate */, checkDescsEqual(desc))
if err != nil {
return err
Expand Down Expand Up @@ -473,7 +473,7 @@ func (r *Replica) adminUnsplitWithDescriptor(
}

if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
_, dbDescValue, err := conditionalGetDescValueFromDB(
_, dbDescValue, _, err := conditionalGetDescValueFromDB(
ctx, txn, desc.StartKey, false /* forUpdate */, checkDescsEqual(desc))
if err != nil {
return err
Expand Down Expand Up @@ -613,7 +613,7 @@ func (r *Replica) AdminMerge(
// the merge's transaction record. It is critical to the range merge
// protocol that the transaction record be placed on the left hand
// side's descriptor, as the MergeTrigger depends on this.
_, dbOrigLeftDescValue, err := conditionalGetDescValueFromDB(
_, dbOrigLeftDescValue, _, err := conditionalGetDescValueFromDB(
ctx, txn, origLeftDesc.StartKey, true /* forUpdate */, checkDescsEqual(origLeftDesc))
if err != nil {
return err
Expand Down Expand Up @@ -1124,7 +1124,7 @@ func (r *Replica) changeReplicasImpl(

if removals := targets.nonVoterRemovals; len(removals) > 0 {
for _, rem := range removals {
iChgs := []internalReplicationChange{{target: rem, typ: internalChangeTypeRemove}}
iChgs := []internalReplicationChange{{target: rem, typ: internalChangeTypeRemoveNonVoter}}
var err error
desc, err = execChangeReplicasTxn(ctx, desc, reason, details, iChgs,
changeReplicasTxnArgs{
Expand Down Expand Up @@ -1278,6 +1278,27 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas(
})
}

// TestingRemoveLearner is used by tests to manually remove a learner replica.
//
// It runs a single replication-change transaction against beforeDesc that
// removes target as a learner, using ReasonAbandonedLearner as the reason. On
// success the descriptor returned by that transaction is handed back; on
// failure the error is wrapped and a nil descriptor is returned.
func (r *Replica) TestingRemoveLearner(
	ctx context.Context, beforeDesc *roachpb.RangeDescriptor, target roachpb.ReplicationTarget,
) (*roachpb.RangeDescriptor, error) {
	// A lone learner-removal change for the requested target.
	removal := []internalReplicationChange{
		{target: target, typ: internalChangeTypeRemoveLearner},
	}
	// Wire the transaction up with this replica's store, honoring the testing
	// knobs the store was configured with.
	txnArgs := changeReplicasTxnArgs{
		db:                                   r.store.DB(),
		liveAndDeadReplicas:                  r.store.allocator.storePool.liveAndDeadReplicas,
		logChange:                            r.store.logChange,
		testForceJointConfig:                 r.store.TestingKnobs().ReplicationAlwaysUseJointConfig,
		testAllowDangerousReplicationChanges: r.store.TestingKnobs().AllowDangerousReplicationChanges,
	}

	updatedDesc, err := execChangeReplicasTxn(
		ctx, beforeDesc, kvserverpb.ReasonAbandonedLearner, "", removal, txnArgs,
	)
	if err != nil {
		return nil, errors.Wrapf(err, `removing learners from %s`, beforeDesc)
	}
	return updatedDesc, nil
}

// maybeLeaveAtomicChangeReplicasAndRemoveLearners transitions out of the joint
// config (if there is one), and then removes all learners. After this function
// returns, all remaining replicas will be of type VOTER_FULL or NON_VOTER.
Expand All @@ -1289,6 +1310,9 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
return nil, err
}

if fn := r.store.TestingKnobs().BeforeRemovingDemotedLearner; fn != nil {
fn()
}
// Now the config isn't joint any more, but we may have demoted some voters
// into learners. These learners should go as well.
learners := desc.Replicas().LearnerDescriptors()
Expand All @@ -1311,7 +1335,7 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
var err error
desc, err = execChangeReplicasTxn(
ctx, desc, kvserverpb.ReasonAbandonedLearner, "",
[]internalReplicationChange{{target: target, typ: internalChangeTypeRemove}},
[]internalReplicationChange{{target: target, typ: internalChangeTypeRemoveLearner}},
changeReplicasTxnArgs{db: store.DB(),
liveAndDeadReplicas: store.allocator.storePool.liveAndDeadReplicas,
logChange: store.logChange,
Expand Down Expand Up @@ -1831,7 +1855,7 @@ func (r *Replica) execReplicationChangesForVoters(
}

for _, target := range voterRemovals {
typ := internalChangeTypeRemove
typ := internalChangeTypeRemoveLearner
if rDesc, ok := desc.GetReplicaDescriptor(target.StoreID); ok && rDesc.GetType() == roachpb.VOTER_FULL {
typ = internalChangeTypeDemoteVoterToLearner
}
Expand Down Expand Up @@ -1870,12 +1894,21 @@ func (r *Replica) tryRollbackRaftLearner(
details string,
) {
repDesc, ok := rangeDesc.GetReplicaDescriptor(target.StoreID)
isLearnerOrNonVoter := repDesc.GetType() == roachpb.LEARNER || repDesc.GetType() == roachpb.NON_VOTER
if !ok || !isLearnerOrNonVoter {
if !ok {
// There's no learner to roll back.
log.Event(ctx, "learner to roll back not found; skipping")
return
}
var removeChgType internalChangeType
switch repDesc.GetType() {
case roachpb.NON_VOTER:
removeChgType = internalChangeTypeRemoveNonVoter
case roachpb.LEARNER:
removeChgType = internalChangeTypeRemoveLearner
default:
log.Event(ctx, "replica to rollback is no longer a learner; skipping")
return
}

// If (for example) the promotion failed because of a context deadline
// exceeded, we do still want to clean up after ourselves, so always use a new
Expand All @@ -1886,7 +1919,7 @@ func (r *Replica) tryRollbackRaftLearner(
rollbackFn := func(ctx context.Context) error {
_, err := execChangeReplicasTxn(
ctx, rangeDesc, reason, details,
[]internalReplicationChange{{target: target, typ: internalChangeTypeRemove}},
[]internalReplicationChange{{target: target, typ: removeChgType}},
changeReplicasTxnArgs{
db: r.store.DB(),
liveAndDeadReplicas: r.store.allocator.storePool.liveAndDeadReplicas,
Expand Down Expand Up @@ -1940,7 +1973,8 @@ const (
// NB: can't remove multiple learners at once (need to remove at least one
// voter with them), see:
// https://github.com/cockroachdb/cockroach/pull/40268
internalChangeTypeRemove
internalChangeTypeRemoveLearner
internalChangeTypeRemoveNonVoter
)

// internalReplicationChange is a replication target together with an internal
Expand All @@ -1962,6 +1996,9 @@ func (c internalReplicationChanges) useJoint() bool {
c[0].typ == internalChangeTypeDemoteVoterToLearner
return len(c) > 1 || isDemotion
}
// isSingleLearnerRemoval reports whether the set of changes consists of
// exactly one change and that change is the removal of a learner replica.
func (c internalReplicationChanges) isSingleLearnerRemoval() bool {
	if len(c) != 1 {
		return false
	}
	return c[0].typ == internalChangeTypeRemoveLearner
}

func prepareChangeReplicasTrigger(
ctx context.Context,
Expand Down Expand Up @@ -2013,7 +2050,7 @@ func prepareChangeReplicasTrigger(
chg.target)
}
added = append(added, rDesc)
case internalChangeTypeRemove:
case internalChangeTypeRemoveLearner, internalChangeTypeRemoveNonVoter:
rDesc, ok := updatedDesc.GetReplicaDescriptor(chg.target.StoreID)
if !ok {
return nil, errors.Errorf("target %s not found", chg.target)
Expand Down Expand Up @@ -2169,19 +2206,39 @@ func execChangeReplicasTxn(

descKey := keys.RangeDescriptorKey(referenceDesc.StartKey)

check := func(kvDesc *roachpb.RangeDescriptor) bool {
check := func(kvDesc *roachpb.RangeDescriptor) (matched, skip bool) {
// NB: We might fail to find the range if the range has been merged away
// in which case we definitely want to fail the check below.
if kvDesc != nil && kvDesc.RangeID == referenceDesc.RangeID && chgs.leaveJoint() {
// If there are no changes, we're trying to leave a joint config,
// so that's all we care about. But since leaving a joint config
// is done opportunistically whenever one is encountered, this is
// more likely to race than other operations. So we verify literally
// nothing about the descriptor, but once we get the descriptor out
// from conditionalGetDescValueFromDB, we'll check if it's in a
// joint config and if not, noop.
return true
if kvDesc != nil && kvDesc.RangeID == referenceDesc.RangeID {
if chgs.leaveJoint() && !kvDesc.Replicas().InAtomicReplicationChange() {
// If there are no changes, we're trying to leave a joint config, so
// that's all we care about. But since leaving a joint config is done
// opportunistically whenever one is encountered, this is more likely to
// race than other operations. So we check whether the descriptor fetched
// from kv is still in a joint config and, if it is not, hint to the caller
// that it can no-op this replication change.
log.Infof(
ctx, "we were trying to exit a joint config but found that we are no longer in one; skipping",
)
return true /* matched */, true /* skip */
}
if chgs.isSingleLearnerRemoval() {
// If we're simply trying to remove a learner replica, but find that
// that learner has already been removed from the range, we can no-op.
learnerAlreadyRemoved := true
for _, repl := range kvDesc.Replicas().Descriptors() {
if repl.StoreID == chgs[0].target.StoreID {
learnerAlreadyRemoved = false
break
}
}
if learnerAlreadyRemoved {
log.Infof(ctx, "skipping learner removal because it was already removed")
return true /* matched */, true /* skip */
}
}
}

// Otherwise, check that the descriptors are equal.
//
// TODO(tbg): check that the replica sets are equal only. I was going to
Expand All @@ -2192,13 +2249,13 @@ func execChangeReplicasTxn(
if err := args.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
log.Event(ctx, "attempting txn")
txn.SetDebugName(replicaChangeTxnName)
desc, dbDescValue, err := conditionalGetDescValueFromDB(
desc, dbDescValue, skip, err := conditionalGetDescValueFromDB(
ctx, txn, referenceDesc.StartKey, false /* forUpdate */, check)
if err != nil {
return err
}
if chgs.leaveJoint() && !desc.Replicas().InAtomicReplicationChange() {
// Nothing to do. See comment in 'check' above for details.
if skip {
// The new descriptor already reflects what we needed to get done.
returnDesc = desc
return nil
}
Expand Down Expand Up @@ -2635,9 +2692,11 @@ func replicasCollocated(a, b []roachpb.ReplicaDescriptor) bool {
return true
}

func checkDescsEqual(desc *roachpb.RangeDescriptor) func(*roachpb.RangeDescriptor) bool {
return func(desc2 *roachpb.RangeDescriptor) bool {
return desc.Equal(desc2)
func checkDescsEqual(
desc *roachpb.RangeDescriptor,
) func(*roachpb.RangeDescriptor) (matched bool, skip bool) {
return func(desc2 *roachpb.RangeDescriptor) (matched, skip bool) {
return desc.Equal(desc2) /* matched */, false /* skip */
}
}

Expand All @@ -2646,6 +2705,10 @@ func checkDescsEqual(desc *roachpb.RangeDescriptor) func(*roachpb.RangeDescripto
// the raw fetched roachpb.Value. If the fetched value doesn't match the
// expectation, a ConditionFailedError is returned.
//
// The supplied `check` method verifies whether the descriptor fetched from kv
// matches expectations and returns a "skip" hint that the caller can use to
// optionally elide work that doesn't need to be done.
//
// The method allows callers to specify whether a locking read should be used or
// not. A locking read can be used to manage contention and avoid transaction
// restarts in a read-modify-write operation (which all users of this method
Expand All @@ -2672,29 +2735,30 @@ func conditionalGetDescValueFromDB(
txn *kv.Txn,
startKey roachpb.RKey,
forUpdate bool,
check func(*roachpb.RangeDescriptor) bool,
) (*roachpb.RangeDescriptor, []byte, error) {
check func(*roachpb.RangeDescriptor) (matched, skip bool),
) (kvDesc *roachpb.RangeDescriptor, kvDescBytes []byte, skip bool, err error) {
get := txn.Get
if forUpdate {
get = txn.GetForUpdate
}
descKey := keys.RangeDescriptorKey(startKey)
existingDescKV, err := get(ctx, descKey)
if err != nil {
return nil, nil, errors.Wrap(err, "fetching current range descriptor value")
return nil, nil, skip, errors.Wrap(err, "fetching current range descriptor value")
}
var existingDesc *roachpb.RangeDescriptor
if existingDescKV.Value != nil {
existingDesc = &roachpb.RangeDescriptor{}
if err := existingDescKV.Value.GetProto(existingDesc); err != nil {
return nil, nil, errors.Wrap(err, "decoding current range descriptor value")
return nil, nil, skip, errors.Wrap(err, "decoding current range descriptor value")
}
}

if !check(existingDesc) {
return nil, nil, &roachpb.ConditionFailedError{ActualValue: existingDescKV.Value}
matched, skip := check(existingDesc)
if !matched {
return nil, nil, false /* skip */, &roachpb.ConditionFailedError{ActualValue: existingDescKV.Value}
}
return existingDesc, existingDescKV.Value.TagAndDataBytes(), nil
return existingDesc, existingDescKV.Value.TagAndDataBytes(), skip, nil
}

// updateRangeDescriptor adds a ConditionalPut on the range descriptor. The
Expand Down
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/replica_learner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1079,6 +1079,88 @@ func TestLearnerOrJointConfigAdminRelocateRange(t *testing.T) {
check([]roachpb.ReplicationTarget{tc.Target(0), tc.Target(1), tc.Target(2)})
}

// TestDemotedLearnerRemovalHandlesRace tests that a `ChangeReplicas` request at
// the last step of removing a demoted learner replica does not fail if it finds
// that the learner has been already removed from the range.
//
// The test pauses a voter rebalance in the BeforeRemovingDemotedLearner testing
// knob, removes the learner out from under it via TestingRemoveLearner, then
// lets the rebalance proceed and asserts that it succeeds and that its trace
// records the "already removed" no-op.
func TestDemotedLearnerRemovalHandlesRace(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	// activateTestingKnob arms the knob below; it is set to 1 right before the
	// rebalance is started and atomically reset to 0 by the knob when it fires.
	var activateTestingKnob int64
	waitForRebalanceToBlockCh := make(chan struct{})
	blockDemotedLearnerRemovalCh := make(chan struct{})
	tc := testcluster.StartTestCluster(
		t, 3, base.TestClusterArgs{
			ReplicationMode: base.ReplicationManual,
			ServerArgs: base.TestServerArgs{
				Knobs: base.TestingKnobs{
					Store: &kvserver.StoreTestingKnobs{
						BeforeRemovingDemotedLearner: func() {
							// Disarm the knob in the same step that checks it, so that it
							// fires at most once. Closing waitForRebalanceToBlockCh a
							// second time would panic.
							if !atomic.CompareAndSwapInt64(&activateTestingKnob, 1, 0) {
								return
							}
							// Signal to the test that the rebalance is now trying to remove
							// the demoted learner.
							close(waitForRebalanceToBlockCh)
							// Wait for the test to manually remove the demoted learner
							// before letting the rebalance continue.
							blockDemotedLearnerRemovalCh <- struct{}{}
						},
					},
				},
			},
		},
	)
	defer tc.Stopper().Stop(ctx)

	scratchKey := tc.ScratchRange(t)
	// Add a new voting replica on node 2, then rebalance it to node 3. This will
	// block on blockDemotedLearnerRemovalCh.
	tc.AddVotersOrFatal(t, scratchKey, makeReplicationTargets(2)...)
	atomic.StoreInt64(&activateTestingKnob, 1)
	rebalanceCh := make(chan error)
	// finishAndGetRecording is assigned inside the async task and only read
	// after receiving from rebalanceCh, which orders the write before the read.
	var finishAndGetRecording func() tracing.Recording
	err := tc.Stopper().RunAsyncTask(ctx, "test", func(ctx context.Context) {
		ctx, finishAndGetRecording = tracing.ContextWithRecordingSpan(
			ctx, tc.Servers[0].Tracer(), "rebalance",
		)
		_, err := tc.RebalanceVoter(
			ctx,
			scratchKey,
			roachpb.ReplicationTarget{StoreID: 2, NodeID: 2}, /* src */
			roachpb.ReplicationTarget{StoreID: 3, NodeID: 3}, /* dest */
		)
		rebalanceCh <- err
	})
	require.NoError(t, err)
	// NOTE(review): this deferred function receives from
	// blockDemotedLearnerRemovalCh unconditionally, so if the knob is never
	// reached (e.g. the timeout below fires) it blocks until the knob sends.
	defer func() {
		// Unblock the rebalance and expect it to complete.
		<-blockDemotedLearnerRemovalCh
		require.NoError(t, <-rebalanceCh)
		trace := finishAndGetRecording()
		// Check that we actually detected that the learner was removed, and
		// no-oped.
		require.Contains(t, trace.String(), "skipping learner removal because it was already removed")
	}()

	select {
	case <-waitForRebalanceToBlockCh:
		// Continue.
	case <-time.After(15 * time.Second):
		t.Fatal("timed out waiting for rebalance to block")
	}

	// Manually remove the learner replica from the range while the rebalance is
	// still blocked in the knob; the rebalance is expected to tolerate this once
	// it is unblocked by the deferred function above.
	_, leaseRepl := getFirstStoreReplica(t, tc.Servers[0], scratchKey)
	require.NotNil(t, leaseRepl)
	beforeDesc := tc.LookupRangeOrFatal(t, scratchKey)
	_, err = leaseRepl.TestingRemoveLearner(
		ctx, &beforeDesc, roachpb.ReplicationTarget{StoreID: 2, NodeID: 2},
	)
	require.NoError(t, err)
}

func TestLearnerAndJointConfigAdminMerge(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading

0 comments on commit 03d2b33

Please sign in to comment.