Skip to content

Commit

Permalink
kvserver: allow voter demoting to get a lease, in case there's an inc…
Browse files Browse the repository at this point in the history
…oming voter

It is possible that we've entered a joint config where the leaseholder is being
replaced with an incoming voter. We try to transfer the lease away, but this fails.
In this case, we need the demoted voter to re-aquire the lease, as it might be an epoch
based lease, that doesn't expire. Otherwise we might be stuck without anyone else getting
the lease.

TODO: test and backport this

Release note: None
  • Loading branch information
shralex committed Jul 7, 2022
1 parent 82923fc commit dd22457
Show file tree
Hide file tree
Showing 11 changed files with 312 additions and 67 deletions.
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/replica_slice.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func NewReplicaSlice(
}
}
canReceiveLease := func(rDesc roachpb.ReplicaDescriptor) bool {
if err := roachpb.CheckCanReceiveLease(rDesc, desc.Replicas()); err != nil {
if err := roachpb.CheckCanReceiveLease(rDesc, desc.Replicas(), true /* leaseHolderRemovalAllowed */); err != nil {
return false
}
return true
Expand Down
3 changes: 2 additions & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -1479,8 +1479,9 @@ func (a *Allocator) ValidLeaseTargets(
) []roachpb.ReplicaDescriptor {
candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
replDescs := roachpb.MakeReplicaSet(existing)
lhRemovalAllowed := a.StorePool.St.Version.IsActive(ctx, clusterversion.EnableLeaseHolderRemoval)
for i := range existing {
if err := roachpb.CheckCanReceiveLease(existing[i], replDescs); err != nil {
if err := roachpb.CheckCanReceiveLease(existing[i], replDescs, lhRemovalAllowed); err != nil {
continue
}
// If we're not allowed to include the current replica, remove it from
Expand Down
6 changes: 5 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
Expand Down Expand Up @@ -67,9 +68,12 @@ func RequestLease(
Requested: args.Lease,
}

lhRemovalAllowed :=
cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx, clusterversion.EnableLeaseHolderRemoval)
// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
if err := roachpb.CheckCanReceiveLease(args.Lease.Replica, cArgs.EvalCtx.Desc().Replicas()); err != nil {
if err := roachpb.CheckCanReceiveLease(args.Lease.Replica, cArgs.EvalCtx.Desc().Replicas(),
lhRemovalAllowed); err != nil {
rErr.Message = err.Error()
return newFailedLeaseTrigger(false /* isTransfer */), rErr
}
Expand Down
62 changes: 47 additions & 15 deletions pkg/kv/kvserver/batcheval/cmd_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,10 @@ func TestLeaseCommandLearnerReplica(t *testing.T) {
clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 123)), time.Nanosecond /* maxOffset */)
cArgs := CommandArgs{
EvalCtx: (&MockEvalCtx{
StoreID: voterStoreID,
Desc: &desc,
Clock: clock,
ClusterSettings: cluster.MakeTestingClusterSettings(),
StoreID: voterStoreID,
Desc: &desc,
Clock: clock,
}).EvalContext(),
Args: &roachpb.TransferLeaseRequest{
Lease: roachpb.Lease{
Expand Down Expand Up @@ -265,17 +266,37 @@ func TestCheckCanReceiveLease(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

const none = roachpb.ReplicaType(-1)

for _, tc := range []struct {
leaseholderType roachpb.ReplicaType
eligible bool
leaseholderType roachpb.ReplicaType
anotherReplicaType roachpb.ReplicaType
eligibleLhRemovalEnabled bool
eligibleLhRemovalDisabled bool
}{
{leaseholderType: roachpb.VOTER_FULL, eligible: true},
{leaseholderType: roachpb.VOTER_INCOMING, eligible: true},
{leaseholderType: roachpb.VOTER_OUTGOING, eligible: false},
{leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, eligible: false},
{leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, eligible: false},
{leaseholderType: roachpb.LEARNER, eligible: false},
{leaseholderType: roachpb.NON_VOTER, eligible: false},
{leaseholderType: roachpb.VOTER_FULL, anotherReplicaType: none, eligibleLhRemovalEnabled: true, eligibleLhRemovalDisabled: true},
{leaseholderType: roachpb.VOTER_INCOMING, anotherReplicaType: none, eligibleLhRemovalEnabled: true, eligibleLhRemovalDisabled: true},

// A VOTER_OUTGOING should only be able to get the lease if there's a VOTER_INCOMING.
{leaseholderType: roachpb.VOTER_OUTGOING, anotherReplicaType: none, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_OUTGOING, anotherReplicaType: roachpb.VOTER_INCOMING, eligibleLhRemovalEnabled: true, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_OUTGOING, anotherReplicaType: roachpb.VOTER_OUTGOING, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_OUTGOING, anotherReplicaType: roachpb.VOTER_FULL, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},

// A VOTER_DEMOTING_LEARNER should only be able to get the lease if there's a VOTER_INCOMING.
{leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, anotherReplicaType: none, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, anotherReplicaType: roachpb.VOTER_INCOMING, eligibleLhRemovalEnabled: true, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, anotherReplicaType: roachpb.VOTER_FULL, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_DEMOTING_LEARNER, anotherReplicaType: roachpb.VOTER_OUTGOING, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},

// A VOTER_DEMOTING_NON_VOTER should only be able to get the lease if there's a VOTER_INCOMING.
{leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, anotherReplicaType: none, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, anotherReplicaType: roachpb.VOTER_INCOMING, eligibleLhRemovalEnabled: true, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, anotherReplicaType: roachpb.VOTER_FULL, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.VOTER_DEMOTING_NON_VOTER, anotherReplicaType: roachpb.VOTER_OUTGOING, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},

{leaseholderType: roachpb.LEARNER, anotherReplicaType: none, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
{leaseholderType: roachpb.NON_VOTER, anotherReplicaType: none, eligibleLhRemovalEnabled: false, eligibleLhRemovalDisabled: false},
} {
t.Run(tc.leaseholderType.String(), func(t *testing.T) {
repDesc := roachpb.ReplicaDescriptor{
Expand All @@ -285,14 +306,25 @@ func TestCheckCanReceiveLease(t *testing.T) {
rngDesc := roachpb.RangeDescriptor{
InternalReplicas: []roachpb.ReplicaDescriptor{repDesc},
}
err := roachpb.CheckCanReceiveLease(rngDesc.InternalReplicas[0], rngDesc.Replicas())
require.Equal(t, tc.eligible, err == nil, "err: %v", err)
if tc.anotherReplicaType != none {
anotherDesc := roachpb.ReplicaDescriptor{
ReplicaID: 2,
Type: &tc.anotherReplicaType,
}
rngDesc.InternalReplicas = append(rngDesc.InternalReplicas, anotherDesc)
}
err := roachpb.CheckCanReceiveLease(rngDesc.InternalReplicas[0], rngDesc.Replicas(), true)
require.Equal(t, tc.eligibleLhRemovalEnabled, err == nil, "err: %v", err)

err = roachpb.CheckCanReceiveLease(rngDesc.InternalReplicas[0], rngDesc.Replicas(), false)
require.Equal(t, tc.eligibleLhRemovalDisabled, err == nil, "err: %v", err)
})
}

t.Run("replica not in range desc", func(t *testing.T) {
repDesc := roachpb.ReplicaDescriptor{ReplicaID: 1}
rngDesc := roachpb.RangeDescriptor{}
require.Regexp(t, "replica.*not found", roachpb.CheckCanReceiveLease(repDesc, rngDesc.Replicas()))
require.Regexp(t, "replica.*not found", roachpb.CheckCanReceiveLease(repDesc,
rngDesc.Replicas(), true))
})
}
7 changes: 6 additions & 1 deletion pkg/kv/kvserver/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/readsummary/rspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/spanset"
Expand Down Expand Up @@ -76,9 +77,13 @@ func TransferLease(
newLease := args.Lease
args.Lease = roachpb.Lease{} // prevent accidental use below

lhRemovalAllowed := cArgs.EvalCtx.ClusterSettings().Version.IsActive(ctx,
clusterversion.EnableLeaseHolderRemoval)
// If this check is removed at some point, the filtering of learners on the
// sending side would have to be removed as well.
if err := roachpb.CheckCanReceiveLease(newLease.Replica, cArgs.EvalCtx.Desc().Replicas()); err != nil {
if err := roachpb.CheckCanReceiveLease(
newLease.Replica, cArgs.EvalCtx.Desc().Replicas(), lhRemovalAllowed,
); err != nil {
return newFailedLeaseTrigger(true /* isTransfer */), err
}

Expand Down
189 changes: 177 additions & 12 deletions pkg/kv/kvserver/client_lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,25 +164,25 @@ func TestGossipNodeLivenessOnLeaseChange(t *testing.T) {
}

// TestCannotTransferLeaseToVoterOutgoing ensures that the evaluation of lease
// requests for nodes which are already in the VOTER_OUTGOING state will fail.
func TestCannotTransferLeaseToVoterOutgoing(t *testing.T) {
// requests for nodes which are already in the VOTER_DEMOTING_LEARNER state will fail
// (in this test, there is no VOTER_INCOMING node).
func TestCannotTransferLeaseToVoterDemoting(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

knobs, ltk := makeReplicationTestKnobs()
// Add a testing knob to allow us to block the change replicas command
// while it is being proposed. When we detect that the change replicas
// command to move n3 to VOTER_OUTGOING has been evaluated, we'll send
// the request to transfer the lease to n3. The hope is that it will
// get past the sanity above latch acquisition prior to change replicas
// command committing.
var scratchRangeID atomic.Value
scratchRangeID.Store(roachpb.RangeID(0))
// command to move n3 to VOTER_DEMOTING_LEARNER has been evaluated, we'll
// send the request to transfer the lease to n3. The hope is that it will
// get past the sanity check above latch acquisition prior to the change
// replicas command committing.
var scratchRangeID int64
changeReplicasChan := make(chan chan struct{}, 1)
shouldBlock := func(args kvserverbase.ProposalFilterArgs) bool {
// Block if a ChangeReplicas command is removing a node from our range.
return args.Req.RangeID == scratchRangeID.Load().(roachpb.RangeID) &&
return args.Req.RangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) &&
args.Cmd.ReplicatedEvalResult.ChangeReplicas != nil &&
len(args.Cmd.ReplicatedEvalResult.ChangeReplicas.Removed()) > 0
}
Expand All @@ -205,7 +205,7 @@ func TestCannotTransferLeaseToVoterOutgoing(t *testing.T) {

scratchStartKey := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, scratchStartKey, tc.Targets(1, 2)...)
scratchRangeID.Store(desc.RangeID)
atomic.StoreInt64(&scratchRangeID, int64(desc.RangeID))
// Make sure n1 has the lease to start with.
err := tc.Server(0).DB().AdminTransferLease(context.Background(),
scratchStartKey, tc.Target(0).StoreID)
Expand All @@ -214,7 +214,7 @@ func TestCannotTransferLeaseToVoterOutgoing(t *testing.T) {
// The test proceeds as follows:
//
// - Send an AdminChangeReplicasRequest to remove n3 and add n4
// - Block the step that moves n3 to VOTER_OUTGOING on changeReplicasChan
// - Block the step that moves n3 to VOTER_DEMOTING_LEARNER on changeReplicasChan
// - Send an AdminLeaseTransfer to make n3 the leaseholder
// - Try really hard to make sure that the lease transfer at least gets to
// latch acquisition before unblocking the ChangeReplicas.
Expand All @@ -229,7 +229,6 @@ func TestCannotTransferLeaseToVoterOutgoing(t *testing.T) {
_, err = tc.Server(0).DB().AdminChangeReplicas(ctx,
scratchStartKey, desc, []roachpb.ReplicationChange{
{ChangeType: roachpb.REMOVE_VOTER, Target: tc.Target(2)},
{ChangeType: roachpb.ADD_VOTER, Target: tc.Target(3)},
})
require.NoError(t, err)
}()
Expand Down Expand Up @@ -262,7 +261,173 @@ func TestCannotTransferLeaseToVoterOutgoing(t *testing.T) {
close(ch)
wg.Wait()
})
}

// TestTransferLeaseToVoterOutgoingWithIncoming ensures that the evaluation of lease
// requests for nodes which are already in the VOTER_DEMOTING_LEARNER state succeeds
// when there is a VOTER_INCOMING node.
func TestTransferLeaseToVoterDemotingWithIncoming(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

knobs, ltk := makeReplicationTestKnobs()
// Add a testing knob to allow us to block the change replicas command
// while it is being proposed. When we detect that the change replicas
// command to move n3 to VOTER_DEMOTING_LEARNER has been evaluated, we'll
// send the request to transfer the lease to n3. The hope is that it will
// get past the sanity check above latch acquisition prior to the change
// replicas command committing.
var scratchRangeID int64
changeReplicasChan := make(chan chan struct{}, 1)
shouldBlock := func(args kvserverbase.ProposalFilterArgs) bool {
// Block if a ChangeReplicas command is removing a node from our range.
return args.Req.RangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) &&
args.Cmd.ReplicatedEvalResult.ChangeReplicas != nil &&
len(args.Cmd.ReplicatedEvalResult.ChangeReplicas.Removed()) > 0
}
blockIfShould := func(args kvserverbase.ProposalFilterArgs) {
if shouldBlock(args) {
ch := make(chan struct{})
changeReplicasChan <- ch
<-ch
}
}
knobs.Store.(*kvserver.StoreTestingKnobs).TestingProposalFilter = func(args kvserverbase.ProposalFilterArgs) *roachpb.Error {
blockIfShould(args)
return nil
}
tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

scratchStartKey := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, scratchStartKey, tc.Targets(1, 2)...)
atomic.StoreInt64(&scratchRangeID, int64(desc.RangeID))
// Make sure n1 has the lease to start with.
err := tc.Server(0).DB().AdminTransferLease(context.Background(),
scratchStartKey, tc.Target(0).StoreID)
require.NoError(t, err)

// The test proceeds as follows:
//
// - Send an AdminChangeReplicasRequest to remove n3 and add n4
// - Block the step that moves n3 to VOTER_DEMOTING_LEARNER on changeReplicasChan
// - Send an AdminLeaseTransfer to make n3 the leaseholder
// - Try really hard to make sure that the lease transfer at least gets to
// latch acquisition before unblocking the ChangeReplicas.
// - Unblock the ChangeReplicas.
// - Make sure the lease transfer succeeds.

ltk.withStopAfterJointConfig(func() {
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err = tc.Server(0).DB().AdminChangeReplicas(ctx,
scratchStartKey, desc, []roachpb.ReplicationChange{
{ChangeType: roachpb.REMOVE_VOTER, Target: tc.Target(2)},
{ChangeType: roachpb.ADD_VOTER, Target: tc.Target(3)},
})
require.NoError(t, err)
}()
ch := <-changeReplicasChan
wg.Add(1)
go func() {
defer wg.Done()
// Make sure the lease is currently on n1.
desc, err := tc.LookupRange(scratchStartKey)
require.NoError(t, err)
leaseHolder, err := tc.FindRangeLeaseHolder(desc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(0), leaseHolder,
errors.Errorf("Leaseholder supposed to be on n1."))
// Move the lease to n3, the VOTER_DEMOTING_LEARNER.
err = tc.Server(0).DB().AdminTransferLease(context.Background(),
scratchStartKey, tc.Target(2).StoreID)
require.NoError(t, err)
// Make sure the lease moved to n3.
leaseHolder, err = tc.FindRangeLeaseHolder(desc, nil)
require.NoError(t, err)
require.Equal(t, tc.Target(2), leaseHolder,
errors.Errorf("Leaseholder supposed to be on n3."))
}()
// Try really hard to make sure that our request makes it past the
// sanity check error to the evaluation error.
for i := 0; i < 100; i++ {
runtime.Gosched()
time.Sleep(time.Microsecond)
}
close(ch)
wg.Wait()
})
}

// TestTransferLeaseFailureDuringJointConfig reproduces
// https://github.com/cockroachdb/cockroach/issues/83687
// and makes sure that if lease transfer fails during a joint configuration
// the previous leaseholder will successfully re-aquire the lease.
func TestTransferLeaseFailureDuringJointConfig(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

knobs := base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{}}
// Add a testing knob to allow us to fall a lease transfer command
// while it is being proposed.
var scratchRangeID int64
shouldFailProposal := func(args kvserverbase.ProposalFilterArgs) bool {
// Block if a ChangeReplicas command is removing a node from our range.
return args.Req.RangeID == roachpb.RangeID(atomic.LoadInt64(&scratchRangeID)) &&
args.Req.IsSingleTransferLeaseRequest()
}
knobs.Store.(*kvserver.StoreTestingKnobs).TestingProposalFilter = func(args kvserverbase.ProposalFilterArgs) *roachpb.Error {
if shouldFailProposal(args) {
return roachpb.NewErrorf("Injecting lease transfer failure")
}
return nil
}
tc := testcluster.StartTestCluster(t, 4, base.TestClusterArgs{
ServerArgs: base.TestServerArgs{Knobs: knobs},
ReplicationMode: base.ReplicationManual,
})
defer tc.Stopper().Stop(ctx)

scratchStartKey := tc.ScratchRange(t)
desc := tc.AddVotersOrFatal(t, scratchStartKey, tc.Targets(1, 2)...)
// Make sure n1 has the lease to start with.
err := tc.Server(0).DB().AdminTransferLease(context.Background(),
scratchStartKey, tc.Target(0).StoreID)
require.NoError(t, err)

// The next lease transfer should fail.
atomic.StoreInt64(&scratchRangeID, int64(desc.RangeID))

_, err = tc.Server(0).DB().AdminChangeReplicas(ctx,
scratchStartKey, desc, []roachpb.ReplicationChange{
{ChangeType: roachpb.REMOVE_VOTER, Target: tc.Target(0)},
{ChangeType: roachpb.ADD_VOTER, Target: tc.Target(3)},
})
require.Error(t, err)
require.Regexp(t, "Injecting lease transfer failure", err)

desc, err = tc.LookupRange(scratchStartKey)
require.NoError(t, err)
require.True(t, desc.Replicas().InAtomicReplicationChange())

// Further lease transfers should succeed.
atomic.StoreInt64(&scratchRangeID, 0)
store := tc.GetFirstStoreFromServer(t, 0)
repl := store.LookupReplica(roachpb.RKey(scratchStartKey))
_, _, err = store.Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, err)
desc, err = tc.LookupRange(scratchStartKey)
require.NoError(t, err)
require.False(t, desc.Replicas().InAtomicReplicationChange())
}

// TestStoreLeaseTransferTimestampCacheRead verifies that the timestamp cache on
Expand Down
Loading

0 comments on commit dd22457

Please sign in to comment.