storage: quiesce ranges which have non-live replicas
Previously, all replicas had to be completely up to date before a range
could quiesce. This made the loss of a node in a cluster with many
ranges an expensive proposition, as a significant number of ranges
could be kept unquiesced for as long as the node was down.

This change refreshes a liveness map from the `NodeLiveness`
object on each iteration of the Raft ticker loop and passes it to
`Replica.tick()`, allowing the leader to disregard non-live nodes
when making its should-quiesce determination.

Release note (performance improvement): prevent dead nodes in clusters
with many ranges from causing unnecessarily high CPU usage.

Note that this PR requires cockroachdb#26908 to function properly.

Fixes cockroachdb#9446
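
For orientation before the diffs, the sketch below distills the new follower check added to `shouldReplicaQuiesce`: a range may quiesce only if every replica in the descriptor is either caught up to the leader's applied index or sits on a node the liveness map reports as non-live. The types are pared-down stand-ins for the roachpb/raft types, and the other quiesce preconditions (lease ownership, pending proposals, pending raft work) are omitted, so treat it as an illustration rather than the actual implementation.

```go
package main

import "fmt"

// NodeID and ReplicaID are simplified stand-ins for the roachpb identifiers.
type NodeID int
type ReplicaID uint64

// replicaDesc holds the two descriptor fields the check needs.
type replicaDesc struct {
	NodeID    NodeID
	ReplicaID ReplicaID
}

// progress mirrors the one raft.Progress field the check reads.
type progress struct {
	Match uint64
}

// followersAllowQuiesce reproduces the follower portion of the new logic:
// every replica in the descriptor must either be caught up to the leader's
// applied index or sit on a node reported as non-live. A nil livenessMap
// disables the exception, and a node missing from the map counts as non-live.
func followersAllowQuiesce(
	replicas []replicaDesc,
	prog map[ReplicaID]progress,
	applied uint64,
	livenessMap map[NodeID]bool,
) bool {
	for _, rep := range replicas {
		p, ok := prog[rep.ReplicaID]
		if !ok {
			return false // replica unknown to the raft progress map
		}
		if p.Match == applied {
			continue // fully caught up
		}
		if livenessMap != nil && !livenessMap[rep.NodeID] {
			continue // behind, but its node is down: don't let it block quiescence
		}
		return false // a live replica is behind
	}
	return true
}

func main() {
	replicas := []replicaDesc{{1, 1}, {2, 2}, {3, 3}}
	prog := map[ReplicaID]progress{1: {Match: 10}, 2: {Match: 10}, 3: {Match: 7}}
	live := map[NodeID]bool{1: true, 2: true, 3: false}

	fmt.Println(followersAllowQuiesce(replicas, prog, 10, live)) // true: node 3 is down
	fmt.Println(followersAllowQuiesce(replicas, prog, 10, nil))  // false: nil map, replica 3 lags
}
```

A nil liveness map reproduces the old behavior: any lagging replica, live or not, keeps the range unquiesced.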
spencerkimball committed Jul 13, 2018
1 parent e484e51 commit c47ace3
Showing 3 changed files with 133 additions and 21 deletions.
68 changes: 51 additions & 17 deletions pkg/storage/replica.go
@@ -1556,6 +1556,10 @@ func (r *Replica) Desc() *roachpb.RangeDescriptor {
return r.mu.state.Desc
}

func (r *Replica) descRLocked() *roachpb.RangeDescriptor {
return r.mu.state.Desc
}

// NodeID returns the ID of the node this replica belongs to.
func (r *Replica) NodeID() roachpb.NodeID {
return r.store.nodeDesc.NodeID
@@ -3604,6 +3608,12 @@ func (r *Replica) quiesceLocked() bool {
return true
}

func (r *Replica) unquiesce() {
r.mu.Lock()
defer r.mu.Unlock()
r.unquiesceLocked()
}

func (r *Replica) unquiesceLocked() {
if r.mu.quiescent {
ctx := r.AnnotateCtx(context.TODO())
@@ -4082,7 +4092,7 @@ func fatalOnRaftReadyErr(ctx context.Context, expl string, err error) {

// tick the Raft group, returning any error and true if the raft group exists
// and false otherwise.
func (r *Replica) tick() (bool, error) {
func (r *Replica) tick(livenessMap map[roachpb.NodeID]bool) (bool, error) {
r.unreachablesMu.Lock()
remotes := r.unreachablesMu.remotes
r.unreachablesMu.remotes = nil
@@ -4106,7 +4116,7 @@ func (r *Replica) tick() (bool, error) {
if r.mu.quiescent {
return false, nil
}
if r.maybeQuiesceLocked() {
if r.maybeQuiesceLocked(livenessMap) {
return false, nil
}

@@ -4165,31 +4175,36 @@ func (r *Replica) tick() (bool, error) {
// it will either still apply to the recipient or the recipient will have moved
// forward and the quiesce message will fall back to being a heartbeat.
//
// The supplied livenessMap maps from node ID to a boolean indicating
// liveness. A range may be quiesced in the presence of non-live
// replicas if the remaining live replicas all meet the quiesce
// requirements. When a node considered non-live becomes live, the
// node liveness instance invokes a callback which causes each node to
// wake up any ranges containing replicas owned by the newly-live
// node, allowing the out-of-date replicas to be brought back up to date.
// If livenessMap is nil, liveness data will not be used, meaning no range
// will quiesce if any replicas are behind, whether or not they are live.
// If a node is missing from livenessMap, it is treated as not live.
//
// TODO(peter): There remains a scenario in which a follower is left unquiesced
// while the leader is quiesced: the follower's receive queue is full and the
// "quiesce" message is dropped. This seems very very unlikely because if the
// follower isn't keeping up with raft messages it is unlikely that the leader
// would quiesce. The fallout from this situation are undesirable raft
// elections which will cause throughput hiccups to the range, but not
// correctness issues.
//
// TODO(peter): When a node goes down, any range which has a replica on the
// down node will not quiesce. This could be a significant performance
// impact. Additionally, when the node comes back up we want to bring any
// replicas it contains back up to date. Right now this will be handled because
// those ranges never quiesce. One thought for handling both these scenarios is
// to hook into the StorePool and its notion of "down" nodes. But that might
// not be sensitive enough.
func (r *Replica) maybeQuiesceLocked() bool {
func (r *Replica) maybeQuiesceLocked(livenessMap map[roachpb.NodeID]bool) bool {
ctx := r.AnnotateCtx(context.TODO())
status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals))
status, ok := shouldReplicaQuiesce(ctx, r, r.store.Clock().Now(), len(r.mu.proposals), livenessMap)
if !ok {
return false
}
return r.quiesceAndNotifyLocked(ctx, status)
}

type quiescer interface {
descRLocked() *roachpb.RangeDescriptor
raftStatusRLocked() *raft.Status
raftLastIndexLocked() (uint64, error)
hasRaftReadyRLocked() bool
@@ -4221,7 +4236,11 @@ func (r *Replica) maybeTransferRaftLeader(
// facilitate testing. Returns the raft.Status and true on success, and (nil,
// false) on failure.
func shouldReplicaQuiesce(
ctx context.Context, q quiescer, now hlc.Timestamp, numProposals int,
ctx context.Context,
q quiescer,
now hlc.Timestamp,
numProposals int,
livenessMap map[roachpb.NodeID]bool,
) (*raft.Status, bool) {
if numProposals != 0 {
if log.V(4) {
@@ -4286,15 +4305,30 @@ func shouldReplicaQuiesce(
}
return nil, false
}

var foundSelf bool
for id, progress := range status.Progress {
if id == status.ID {
for _, rep := range q.descRLocked().Replicas {
if uint64(rep.ReplicaID) == status.ID {
foundSelf = true
}
if progress.Match != status.Applied {
if progress, ok := status.Progress[uint64(rep.ReplicaID)]; !ok {
if log.V(4) {
log.Infof(ctx, "not quiescing: could not locate replica %d in progress: %+v",
rep.ReplicaID, progress)
}
return nil, false
} else if progress.Match != status.Applied {
// Skip any node in the descriptor which is not live.
if livenessMap != nil && !livenessMap[rep.NodeID] {
if log.V(4) {
log.Infof(ctx, "skipping node %d because not live. Progress=%+v",
rep.NodeID, progress)
}
continue
}
if log.V(4) {
log.Infof(ctx, "not quiescing: replica %d match (%d) != applied (%d)",
id, progress.Match, status.Applied)
rep.ReplicaID, progress.Match, status.Applied)
}
return nil, false
}
46 changes: 43 additions & 3 deletions pkg/storage/replica_test.go
@@ -8152,7 +8152,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
ticks := r.mu.ticks
r.mu.Unlock()
for ; (ticks % electionTicks) != 0; ticks++ {
if _, err := r.tick(); err != nil {
if _, err := r.tick(nil); err != nil {
t.Fatal(err)
}
}
@@ -8206,7 +8206,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) {
r.mu.Unlock()

// Tick raft.
if _, err := r.tick(); err != nil {
if _, err := r.tick(nil); err != nil {
t.Fatal(err)
}

@@ -9349,11 +9349,17 @@ func TestSplitMsgApps(t *testing.T) {
}

type testQuiescer struct {
desc roachpb.RangeDescriptor
numProposals int
status *raft.Status
lastIndex uint64
raftReady bool
ownsValidLease bool
livenessMap map[roachpb.NodeID]bool
}

func (q *testQuiescer) descRLocked() *roachpb.RangeDescriptor {
return &q.desc
}

func (q *testQuiescer) raftStatusRLocked() *raft.Status {
@@ -9390,6 +9396,13 @@ func TestShouldReplicaQuiesce(t *testing.T) {
// true. The transform function is intended to perform one mutation to
// this quiescer so that shouldReplicaQuiesce will return false.
q := &testQuiescer{
desc: roachpb.RangeDescriptor{
Replicas: []roachpb.ReplicaDescriptor{
{NodeID: 1, ReplicaID: 1},
{NodeID: 2, ReplicaID: 2},
{NodeID: 3, ReplicaID: 3},
},
},
status: &raft.Status{
ID: 1,
HardState: raftpb.HardState{
@@ -9409,9 +9422,14 @@ func TestShouldReplicaQuiesce(t *testing.T) {
lastIndex: logIndex,
raftReady: false,
ownsValidLease: true,
livenessMap: map[roachpb.NodeID]bool{
1: true,
2: true,
3: true,
},
}
q = transform(q)
_, ok := shouldReplicaQuiesce(context.Background(), q, hlc.Timestamp{}, q.numProposals)
_, ok := shouldReplicaQuiesce(context.Background(), q, hlc.Timestamp{}, q.numProposals, q.livenessMap)
if expected != ok {
t.Fatalf("expected %v, but found %v", expected, ok)
}
@@ -9471,6 +9489,28 @@ func TestShouldReplicaQuiesce(t *testing.T) {
q.raftReady = true
return q
})
// Create a mismatch between the raft progress replica IDs and the
// replica IDs in the range descriptor.
for i := 0; i < 3; i++ {
test(false, func(q *testQuiescer) *testQuiescer {
q.desc.Replicas[i].ReplicaID = roachpb.ReplicaID(4 + i)
return q
})
}
// Pass a nil liveness map.
test(true, func(q *testQuiescer) *testQuiescer {
q.livenessMap = nil
return q
})
// Verify quiesce even when replica progress doesn't match, if
// the replica is on a non-live node.
for _, i := range []uint64{1, 2, 3} {
test(true, func(q *testQuiescer) *testQuiescer {
q.livenessMap[roachpb.NodeID(i)] = false
q.status.Progress[i] = raft.Progress{Match: invalidIndex}
return q
})
}
}

func TestReplicaRecomputeStats(t *testing.T) {
40 changes: 39 additions & 1 deletion pkg/storage/store.go
@@ -529,6 +529,10 @@ type Store struct {

scheduler *raftScheduler

// livenessMap is a map from nodeID to a bool indicating
// liveness. It is updated periodically in raftTickLoop().
livenessMap atomic.Value

// cachedCapacity caches information on store capacity to prevent
// expensive recomputations in case leases or replicas are rapidly
// rebalancing.
@@ -1423,6 +1427,12 @@ func (s *Store) Start(ctx context.Context, stopper *stop.Stopper) error {
s.cfg.Transport.Listen(s.StoreID(), s)
s.processRaft(ctx)

// Register a callback to unquiesce any ranges with replicas on a
// node transitioning from non-live to live.
if s.cfg.NodeLiveness != nil {
s.cfg.NodeLiveness.RegisterCallback(s.nodeIsLiveCallback)
}

// Gossip is only ever nil while bootstrapping a cluster and
// in unittests.
if s.cfg.Gossip != nil {
@@ -3699,17 +3709,41 @@ func (s *Store) processTick(ctx context.Context, rangeID roachpb.RangeID) bool {
if !ok {
return false
}
livenessMap, _ := s.livenessMap.Load().(map[roachpb.NodeID]bool)

start := timeutil.Now()
r := (*Replica)(value)
exists, err := r.tick()
exists, err := r.tick(livenessMap)
if err != nil {
log.Error(ctx, err)
}
s.metrics.RaftTickingDurationNanos.Inc(timeutil.Since(start).Nanoseconds())
return exists // ready
}

// nodeIsLiveCallback is invoked when a node transitions from non-live
// to live. It iterates through all of the store's replicas and
// unquiesces any whose range contains a replica on the newly-live
// node. Note that this mechanism can race with concurrent invocations
// of processTick, which may hold a copy of the previous livenessMap in
// which the now-live node is still down. Such races should be rare,
// however, and we expect the newly-live node to eventually unquiesce
// the range.
func (s *Store) nodeIsLiveCallback(nodeID roachpb.NodeID) {
// Update the liveness map.
s.livenessMap.Store(s.cfg.NodeLiveness.GetIsLiveMap())

s.mu.replicas.Range(func(k int64, v unsafe.Pointer) bool {
r := (*Replica)(v)
for _, rep := range r.Desc().Replicas {
if rep.NodeID == nodeID {
r.unquiesce()
}
}
return true
})
}

func (s *Store) processRaft(ctx context.Context) {
if s.cfg.TestingKnobs.DisableProcessRaft {
return
@@ -3736,6 +3770,10 @@ func (s *Store) raftTickLoop(ctx context.Context) {
select {
case <-ticker.C:
rangeIDs = rangeIDs[:0]
// Update the liveness map.
if s.cfg.NodeLiveness != nil {
s.livenessMap.Store(s.cfg.NodeLiveness.GetIsLiveMap())
}

s.unquiescedReplicas.Lock()
// Why do we bother to ever queue a Replica on the Raft scheduler for

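The store.go changes follow a publish/consume pattern: `raftTickLoop` refreshes the liveness map from `NodeLiveness.GetIsLiveMap()` once per tick-loop iteration and publishes it through an `atomic.Value`, `processTick` loads the latest snapshot for each replica without extra locking, and the `nodeIsLiveCallback` registered in `Store.Start` wakes affected ranges when a node comes back. Below is a rough, self-contained sketch of just the publish/consume part, with the liveness source mocked out; only the `livenessMap`, `raftTickLoop`, and `processTick` names come from the diff, and the rest is invented for illustration.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// NodeID is a simplified stand-in for roachpb.NodeID.
type NodeID int

// store carries only the field relevant here: an atomic.Value holding the
// most recently fetched liveness map. The tick loop writes it; per-replica
// tick processing reads it without any additional locking.
type store struct {
	livenessMap atomic.Value // holds a map[NodeID]bool
}

// fetchIsLiveMap stands in for NodeLiveness.GetIsLiveMap().
func fetchIsLiveMap() map[NodeID]bool {
	return map[NodeID]bool{1: true, 2: true, 3: false}
}

// raftTickLoop refreshes the liveness snapshot once per loop iteration,
// mirroring the update added to Store.raftTickLoop in the diff.
func (s *store) raftTickLoop(stop <-chan struct{}) {
	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			s.livenessMap.Store(fetchIsLiveMap())
		case <-stop:
			return
		}
	}
}

// processTick loads whatever snapshot is current. The comma-ok type assertion
// tolerates the case where nothing has been stored yet: livenessMap is then
// nil and the quiesce check falls back to requiring every replica to be
// caught up.
func (s *store) processTick() map[NodeID]bool {
	livenessMap, _ := s.livenessMap.Load().(map[NodeID]bool)
	return livenessMap
}

func main() {
	s := &store{}
	stop := make(chan struct{})
	go s.raftTickLoop(stop)
	defer close(stop)

	fmt.Println("before first tick:", s.processTick()) // nil snapshot
	time.Sleep(25 * time.Millisecond)
	fmt.Println("after ticks:      ", s.processTick()) // map with node 3 non-live
}
```

Refreshing the map once per tick-loop iteration, rather than once per replica, keeps the cost of consulting `NodeLiveness` constant regardless of how many ranges the store holds; the registered callback covers the opposite transition, so a recovering node's replicas are woken and caught up promptly.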