Merge pull request #22767 from asubiotto/lead-check
 storage: transfer raft leadership and wait grace period when draining
asubiotto authored Feb 27, 2018
2 parents 5c8b360 + 68577d7 commit 5fafb86
Showing 5 changed files with 92 additions and 32 deletions.
75 changes: 49 additions & 26 deletions pkg/storage/client_replica_test.go
@@ -568,6 +568,7 @@ func TestRangeTransferLease(t *testing.T) {
     // replica. The check is executed in a retry loop because the lease may not
     // have been applied yet.
     checkHasLease := func(t *testing.T, sender *storage.Stores) {
+        t.Helper()
         testutils.SucceedsSoon(t, func() error {
             return sendRead(sender).GoError()
         })
@@ -724,39 +725,57 @@ func TestRangeTransferLease(t *testing.T) {
         }
     })

-    // We have to ensure that replica0 is the raft leader and that replica1 has
-    // caught up to replica0 as draining code doesn't transfer leases to
-    // behind replicas.
-    testutils.SucceedsSoon(t, func() error {
-        r := mtc.getRaftLeader(rangeID)
-        if r == nil {
-            return errors.Errorf("could not find raft leader replica for range %d", rangeID)
-        }
-        desc, err := r.GetReplicaDescriptor()
-        if err != nil {
-            return errors.Wrap(err, "could not get replica descriptor")
-        }
-        if desc != replica0Desc {
-            return errors.Errorf("expected replica with id %v to be raft leader, instead got id %v", replica0Desc.ReplicaID, desc.ReplicaID)
-        }
-        return nil
-    })
-
-    testutils.SucceedsSoon(t, func() error {
-        status := replica0.RaftStatus()
-        progress, ok := status.Progress[uint64(replica1Desc.ReplicaID)]
-        if !ok {
-            return errors.Errorf("replica1 progress not found in progress map: %v", status.Progress)
-        }
-        if progress.Match < status.Commit {
-            return errors.New("replica1 failed to catch up")
-        }
-        return nil
-    })
+    // ensureLeaderAndRaftState is a helper function that blocks until leader is
+    // the raft leader and follower is up to date.
+    ensureLeaderAndRaftState := func(leader *storage.Replica, follower roachpb.ReplicaDescriptor) {
+        t.Helper()
+        leaderDesc, err := leader.GetReplicaDescriptor()
+        if err != nil {
+            t.Fatal(err)
+        }
+        testutils.SucceedsSoon(t, func() error {
+            r := mtc.getRaftLeader(rangeID)
+            if r == nil {
+                return errors.Errorf("could not find raft leader replica for range %d", rangeID)
+            }
+            desc, err := r.GetReplicaDescriptor()
+            if err != nil {
+                return errors.Wrap(err, "could not get replica descriptor")
+            }
+            if desc != leaderDesc {
+                return errors.Errorf(
+                    "expected replica with id %v to be raft leader, instead got id %v",
+                    leaderDesc.ReplicaID,
+                    desc.ReplicaID,
+                )
+            }
+            return nil
+        })
+
+        testutils.SucceedsSoon(t, func() error {
+            status := leader.RaftStatus()
+            progress, ok := status.Progress[uint64(follower.ReplicaID)]
+            if !ok {
+                return errors.Errorf(
+                    "replica %v progress not found in progress map: %v",
+                    follower.ReplicaID,
+                    status.Progress,
+                )
+            }
+            if progress.Match < status.Commit {
+                return errors.Errorf("replica %v failed to catch up", follower.ReplicaID)
+            }
+            return nil
+        })
+    }

     // DrainTransfer verifies that a draining store attempts to transfer away
     // range leases owned by its replicas.
     t.Run("DrainTransfer", func(t *testing.T) {
+        // We have to ensure that replica0 is the raft leader and that replica1 has
+        // caught up to replica0 as draining code doesn't transfer leases to
+        // behind replicas.
+        ensureLeaderAndRaftState(replica0, replica1Desc)
         mtc.stores[0].SetDraining(true)

         // Check that replica0 doesn't serve reads any more.

@@ -798,6 +817,10 @@ func TestRangeTransferLease(t *testing.T) {

         // Wait for extension to be blocked.
         <-extensionSem
+
+        // Make sure that replica 0 is up to date enough to receive the lease.
+        ensureLeaderAndRaftState(replica1, replica0Desc)
+
         // Drain node 1 with an extension in progress.
         go func() {
             mtc.stores[1].SetDraining(true)
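The catch-up criterion in the new ensureLeaderAndRaftState helper compares the follower's Match index (the highest log entry known to be replicated to it) against the leader's Commit index. Below is a minimal sketch of that check in isolation, assuming the etcd raft package CockroachDB vendored at the time; the package name and the isCaughtUp helper are hypothetical, not part of the commit:

package raftcheck

import (
	"fmt"

	"github.com/coreos/etcd/raft"
)

// isCaughtUp reports whether the follower with the given replica ID has
// replicated everything the leader has committed, mirroring the
// progress.Match < status.Commit test above. Illustrative helper only.
func isCaughtUp(status *raft.Status, followerID uint64) (bool, error) {
	progress, ok := status.Progress[followerID]
	if !ok {
		return false, fmt.Errorf("replica %d not found in progress map", followerID)
	}
	// Match lags Commit while the follower is still receiving entries.
	return progress.Match >= status.Commit, nil
}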
2 changes: 1 addition & 1 deletion pkg/storage/client_test.go
@@ -1221,7 +1221,7 @@ func (m *multiTestContext) getRaftLeader(rangeID roachpb.RangeID) *storage.Repli
             // status yet.
             continue
         }
-        if raftStatus.Term > latestTerm {
+        if raftStatus.Term > latestTerm || (raftLeaderRepl == nil && raftStatus.Term == latestTerm) {
             // If we find any newer term, it means any previous election is
             // invalid.
             raftLeaderRepl = nil
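The one-line change above fixes a selection bug: if the first replica visited is a follower at term T, latestTerm becomes T, and the strict > comparison then skips the actual leader at the same term, so getRaftLeader would return nil. A self-contained sketch of the loop with stub types (all names here are hypothetical; only the condition mirrors the real code):

package main

import "fmt"

// stubReplica is a hypothetical stand-in for *storage.Replica carrying just
// the raft state the loop inspects.
type stubReplica struct {
	term     uint64
	isLeader bool
}

// findLeader mirrors the selection loop in multiTestContext.getRaftLeader.
func findLeader(replicas []*stubReplica) *stubReplica {
	var leader *stubReplica
	var latestTerm uint64
	for _, r := range replicas {
		// The `leader == nil && r.term == latestTerm` clause is the fix: a
		// follower visited first at term T must not mask the leader at term T.
		if r.term > latestTerm || (leader == nil && r.term == latestTerm) {
			leader = nil // any previous election at an older term is invalid
			latestTerm = r.term
			if r.isLeader {
				leader = r
			}
		}
	}
	return leader
}

func main() {
	follower := &stubReplica{term: 5}
	leader := &stubReplica{term: 5, isLeader: true}
	// Without the extra clause, visiting the follower first would leave the
	// leader at the same term undetected.
	fmt.Println(findLeader([]*stubReplica{follower, leader}) == leader) // true
}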
5 changes: 5 additions & 0 deletions pkg/storage/replica.go
@@ -491,6 +491,11 @@ type Replica struct {
         // Note that there are two replicaStateLoaders, in raftMu and mu,
         // depending on which lock is being held.
         stateLoader stateloader.StateLoader
+
+        // draining specifies whether this replica is draining. Raft leadership
+        // transfers due to a lease change will be attempted even if the target does
+        // not have all the log entries.
+        draining bool
     }

     unreachablesMu struct {
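The new field sits inside Replica.mu, so by the surrounding convention it may only be touched while holding that mutex, which is exactly how the store.go hunk below manipulates it. A small sketch of that convention under the same assumption; the stub type and setter are hypothetical:

package storagesketch

import "sync"

// replicaStub is a hypothetical stand-in for storage.Replica, keeping only
// the mutex-protected state relevant here.
type replicaStub struct {
	mu struct {
		sync.Mutex
		draining bool
	}
}

// setDraining mirrors how SetDraining toggles the flag in store.go: always
// under r.mu, never via unsynchronized access.
func (r *replicaStub) setDraining(draining bool) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.mu.draining = draining
}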
7 changes: 5 additions & 2 deletions pkg/storage/replica_proposal.go
@@ -419,8 +419,11 @@ func (r *Replica) maybeTransferRaftLeadership(ctx context.Context, target roachp
     err := r.withRaftGroup(func(raftGroup *raft.RawNode) (bool, error) {
         // Only the raft leader can attempt a leadership transfer.
         if status := raftGroup.Status(); status.RaftState == raft.StateLeader {
-            // Only attempt this if the target has all the log entries.
-            if pr, ok := status.Progress[uint64(target)]; ok && pr.Match == r.mu.lastIndex {
+            // Only attempt this if the target has all the log entries. Although
+            // TransferLeader is supposed to do the right thing if the target is not
+            // caught up, this check avoids periods of 0 QPS:
+            // https://github.com/cockroachdb/cockroach/issues/22573#issuecomment-366106118
+            if pr, ok := status.Progress[uint64(target)]; (ok && pr.Match == r.mu.lastIndex) || r.mu.draining {
                 log.VEventf(ctx, 1, "transferring raft leadership to replica ID %v", target)
                 r.store.metrics.RangeRaftLeaderTransfers.Inc(1)
                 raftGroup.TransferLeader(uint64(target))
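Restated as a standalone predicate: leadership is transferred when the target has the full log (pr.Match == r.mu.lastIndex) or, after this change, unconditionally while draining, trading a possibly lagging new leader for not stranding leadership on a node that is shutting down. The sketch below is an illustrative restatement with a hypothetical name, not code from the commit:

package leadership

// shouldTransferRaftLeadership restates the guard in
// maybeTransferRaftLeadership as a pure function.
//   - targetTracked/targetMatch come from status.Progress[target]
//   - lastIndex is the leader's last log index (r.mu.lastIndex)
//   - draining is the new Replica.mu.draining flag
func shouldTransferRaftLeadership(
	targetTracked bool, targetMatch, lastIndex uint64, draining bool,
) bool {
	return (targetTracked && targetMatch == lastIndex) || draining
}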
35 changes: 32 additions & 3 deletions pkg/storage/store.go
@@ -943,13 +943,21 @@ func (s *Store) AnnotateCtx(ctx context.Context) context.Context {
     return s.cfg.AmbientCtx.AnnotateCtx(ctx)
 }

+const raftLeadershipTransferWait = 5 * time.Second
+
 // SetDraining (when called with 'true') causes incoming lease transfers to be
 // rejected, prevents all of the Store's Replicas from acquiring or extending
 // range leases, and attempts to transfer away any leases owned.
 // When called with 'false', returns to the normal mode of operation.
 func (s *Store) SetDraining(drain bool) {
     s.draining.Store(drain)
     if !drain {
+        newStoreReplicaVisitor(s).Visit(func(r *Replica) bool {
+            r.mu.Lock()
+            r.mu.draining = false
+            r.mu.Unlock()
+            return true
+        })
         return
     }

@@ -965,6 +973,11 @@ func (s *Store) SetDraining(drain bool) {
             r.AnnotateCtx(ctx), "storage.Store: draining replica", sem, true, /* wait */
             func(ctx context.Context) {
                 defer wg.Done()
+
+                r.mu.Lock()
+                r.mu.draining = true
+                r.mu.Unlock()
+
                 var drainingLease roachpb.Lease
                 for {
                     var llHandle *leaseRequestHandle

@@ -993,14 +1006,27 @@ func (s *Store) SetDraining(drain bool) {
                             log.Errorf(ctx, "could not get zone config for key %s when draining: %s", desc.StartKey, err)
                         }
                     }
-                    if _, err := s.replicateQueue.transferLease(
+                    transferred, err := s.replicateQueue.transferLease(
                         ctx,
                         r,
                         desc,
                         zone,
                         transferLeaseOptions{},
-                    ); log.V(1) && err != nil {
-                        log.Errorf(ctx, "error transferring lease when draining: %s", err)
-                    }
+                    )
+                    if log.V(1) {
+                        if transferred {
+                            log.Infof(ctx, "transferred lease %s for replica %s", drainingLease, desc)
+                        } else {
+                            // Note that a nil error means that there were no suitable
+                            // candidates.
+                            log.Errorf(
+                                ctx,
+                                "did not transfer lease %s for replica %s when draining: %s",
+                                drainingLease,
+                                desc,
+                                err,
+                            )
+                        }
+                    }
                 }
             }); err != nil {

@@ -1013,6 +1039,9 @@ func (s *Store) SetDraining(drain bool) {
         return true
     })
     wg.Wait()
+    if drain {
+        time.Sleep(raftLeadershipTransferWait)
+    }
 }

 // IsStarted returns true if the Store has been started.
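Taken together, the store.go changes make SetDraining(true) a three-step sequence: flag every replica as draining, transfer leases away (which, per the replica_proposal.go change, also moves raft leadership), then sleep raftLeadershipTransferWait so in-flight leadership transfers can settle before the node exits. A schematic sketch under those assumptions; drainStore and its stub types are hypothetical, not the real Store API:

package drain

import (
	"log"
	"time"
)

// raftLeadershipTransferWait matches the constant introduced in store.go.
const raftLeadershipTransferWait = 5 * time.Second

// replica is a stub standing in for *storage.Replica.
type replica struct{ draining bool }

// drainStore is a schematic restatement of Store.SetDraining(true).
func drainStore(replicas []*replica, transferLease func(*replica) (bool, error)) {
	for _, r := range replicas {
		// Step 1: flag the replica so maybeTransferRaftLeadership will hand
		// off raft leadership even if the target lacks some log entries.
		r.draining = true

		// Step 2: move the range lease away; a successful lease transfer also
		// triggers a raft leadership transfer toward the new leaseholder.
		if transferred, err := transferLease(r); !transferred {
			// A nil error here means no suitable candidate was found.
			log.Printf("did not transfer lease when draining: %v", err)
		}
	}
	// Step 3: grace period so in-flight raft leadership transfers complete
	// before the process shuts down.
	time.Sleep(raftLeadershipTransferWait)
}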
