Skip to content

Commit

Permalink
kvserver: track when the set of overloaded stores changes
Browse files Browse the repository at this point in the history
This commit makes the following changes:

- track *all* IOThresholds in the store's map, not just the ones for
  overloaded stores.
- improve the container for these IOThresholds to be easier to work
  with.
- Rename all uses of the container away from "overload" towards
  "IOThreshold".
- add a Sequence() method that is bumped whenever the set of Stores
  whose IOThreshold score indicates I/O overload changes.

I originally started to work on this to address cockroachdb#84465, but realized
that we couldn't "just" leave the set of paused followers untouched
absent sequence changes. This is because the set of paused followers
has additional inputs, most importantly the set of live followers.
This set is per-Replica and subject to change, so we can't be too
sure the outcome would be the same, and we do want to be reactive
to followers becoming nonresponsive by, if necessary, unpausing
followers.

I think we will have to address cockroachdb#84465 by reducing the frequency
at which the paused stores are revisited, but adding an eager
pass whenever the sequence is bumped.

Additionally, for cockroachdb#84252, we are likely also going to be able to rely on
the sequence number to trigger unquiescing of ranges that were
previously quiesced in the presence of a paused follower.

Touches cockroachdb#84465.
Touches cockroachdb#84252.

Release note: None
  • Loading branch information
tbg committed Aug 11, 2022
1 parent e011dd4 commit 3d44442
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 56 deletions.
12 changes: 12 additions & 0 deletions pkg/cmd/roachtest/tests/foo_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
// Copyright 2022 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
//

package tests
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -636,7 +636,7 @@ type Replica struct {
// Followers to which replication traffic is currently dropped.
//
// Never mutated in place (always replaced wholesale), so can be leaked
// outside of the surrounding mutex.
// outside the surrounding mutex.
pausedFollowers map[roachpb.ReplicaID]struct{}
}

Expand Down
7 changes: 5 additions & 2 deletions pkg/kv/kvserver/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -2601,12 +2601,15 @@ func (r *Replica) sendSnapshot(
return err
}

if ioThresh := r.store.ioOverloadedStores.Load()[recipient.StoreID]; ioThresh != nil && destPaused {
if destPaused {
// If the destination is paused, be more hesitant to send snapshots. The destination being
// paused implies that we have recently checked that it's not required for quorum, and that
// we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on
// the snapshot as well.
return errors.Errorf("skipping snapshot; %s is overloaded: %s", recipient, ioThresh)
return errors.Errorf(
"skipping snapshot; %s is overloaded: %s",
recipient, r.store.ioThresholds.Current().IOThreshold(recipient.StoreID),
)
}

// Check follower snapshots cluster setting.
Expand Down
7 changes: 2 additions & 5 deletions pkg/kv/kvserver/replica_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
Expand Down Expand Up @@ -1164,9 +1163,7 @@ func maybeFatalOnRaftReadyErr(ctx context.Context, expl string, err error) (remo
// tick the Raft group, returning true if the raft group exists and should
// be queued for Ready processing; false otherwise.
func (r *Replica) tick(
ctx context.Context,
livenessMap liveness.IsLiveMap,
ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold,
ctx context.Context, livenessMap liveness.IsLiveMap, ioThresholdMap *ioThresholdMap,
) (bool, error) {
r.unreachablesMu.Lock()
remotes := r.unreachablesMu.remotes
Expand All @@ -1191,7 +1188,7 @@ func (r *Replica) tick(
return false, nil
}

r.updatePausedFollowersLocked(ctx, ioOverloadMap)
r.updatePausedFollowersLocked(ctx, ioThresholdMap)

now := r.store.Clock().NowAsClockTimestamp()
if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) {
Expand Down
115 changes: 94 additions & 21 deletions pkg/kv/kvserver/replica_raft_overload.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@ import (
"context"
"math/rand"
"sort"
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3/tracker"
)
Expand All @@ -42,12 +42,16 @@ var pauseReplicationIOThreshold = settings.RegisterFloatSetting(
},
)

type ioThresholdMapI interface {
// OverloadedWithThreshold returns true if the store's score exceeds the threshold.
OverloadedWithThreshold(_ roachpb.StoreID, threshold float64) bool
}

type computeExpendableOverloadedFollowersInput struct {
self roachpb.ReplicaID
replDescs roachpb.ReplicaSet
// TODO(tbg): all entries are overloaded, so consdier removing the IOThreshold here
// because it's confusing.
ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold
self roachpb.ReplicaID
replDescs roachpb.ReplicaSet
ioOverloadMap ioThresholdMapI
threshold float64 // cut-off above which stores are considered overloaded
// getProgressMap returns Raft's view of the progress map. This is only called
// when needed, and at most once.
getProgressMap func(context.Context) map[uint64]tracker.Progress
Expand Down Expand Up @@ -86,7 +90,7 @@ const (
// overload), this method does very little work.
//
// If at least one follower is (close to being) overloaded, we determine the
// maximum set of such followers that we can afford not replicating to without
// maximum set of such followers that we can afford not to replicate to without
// losing quorum by successively reducing the set of overloaded followers by one
// randomly selected overloaded voter. The randomness makes it more likely that
// when there are multiple overloaded stores in the system that cannot be
Expand All @@ -108,7 +112,7 @@ func computeExpendableOverloadedFollowers(
var prs map[uint64]tracker.Progress

for _, replDesc := range d.replDescs.AsProto() {
if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded || replDesc.ReplicaID == d.self {
if overloaded := d.ioOverloadMap.OverloadedWithThreshold(replDesc.StoreID, d.threshold); !overloaded || replDesc.ReplicaID == d.self {
continue
}
// There's at least one overloaded follower, so initialize
Expand Down Expand Up @@ -193,26 +197,94 @@ func computeExpendableOverloadedFollowers(
return liveOverloadedVoterCandidates, nonLive
}

type overloadedStoresMap atomic.Value // map[roachpb.StoreID]*admissionpb.IOThreshold
type ioThresholdMap struct {
seq int
m map[roachpb.StoreID]*admissionpb.IOThreshold
}

var _ ioThresholdMapI = (*ioThresholdMap)(nil)

func (osm *ioThresholdMap) Overloaded(id roachpb.StoreID) bool {
_, overloaded := osm.m[id].Score()
return overloaded
}

// OverloadedWithThreshold implements ioThresholdMapI.
func (osm *ioThresholdMap) OverloadedWithThreshold(id roachpb.StoreID, threshold float64) bool {
sc, _ := osm.m[id].Score()
return sc > threshold
}

func (osm *ioThresholdMap) NumOverloaded() int {
var n int
for id := range osm.m {
if osm.Overloaded(id) {
n++
}
}
return n
}

func (osm *ioThresholdMap) IOThreshold(id roachpb.StoreID) *admissionpb.IOThreshold {
return osm.m[id]
}

// Sequence allows distinguishing sets of overloaded stores. Whenever an
// ioThresholdMap is created, it inherits the sequence of its predecessor,
// incrementing only when the set of overloaded stores has changed in the
// transition.
func (osm *ioThresholdMap) Sequence() int {
return osm.seq
}

func (osm *overloadedStoresMap) Load() map[roachpb.StoreID]*admissionpb.IOThreshold {
v, _ := (*atomic.Value)(osm).Load().(map[roachpb.StoreID]*admissionpb.IOThreshold)
return v
type ioThresholds struct {
mu struct {
syncutil.Mutex
inner *ioThresholdMap // always replaced wholesale, so can leak out of mu
}
}

func (osm *overloadedStoresMap) Swap(
func (osm *ioThresholds) Current() *ioThresholdMap {
osm.mu.Lock()
defer osm.mu.Unlock()
return osm.mu.inner
}

// Replace replaces the stored view of stores for which we track IOThresholds.
// If the set of overloaded stores (i.e. with a score of >=1) changes in the
// process, the updated view will have an incremented Sequence().
func (osm *ioThresholds) Replace(
m map[roachpb.StoreID]*admissionpb.IOThreshold,
) map[roachpb.StoreID]*admissionpb.IOThreshold {
v, _ := (*atomic.Value)(osm).Swap(m).(map[roachpb.StoreID]*admissionpb.IOThreshold)
return v
) (prev, cur *ioThresholdMap) {
osm.mu.Lock()
defer osm.mu.Unlock()
last := osm.mu.inner
if last == nil {
last = &ioThresholdMap{}
}
next := &ioThresholdMap{seq: last.seq, m: m}
var delta int
for id := range last.m {
if last.Overloaded(id) != next.Overloaded(id) {
delta = 1
break
}
}
for id := range next.m {
if last.Overloaded(id) != next.Overloaded(id) {
delta = 1
break
}
}
next.seq += delta
osm.mu.inner = next
return last, next
}

func (r *Replica) updatePausedFollowersLocked(
ctx context.Context, ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold,
) {
func (r *Replica) updatePausedFollowersLocked(ctx context.Context, ioThresholdMap *ioThresholdMap) {
r.mu.pausedFollowers = nil

if len(ioOverloadMap) == 0 {
if ioThresholdMap.NumOverloaded() == 0 {
return
}

Expand Down Expand Up @@ -254,7 +326,8 @@ func (r *Replica) updatePausedFollowersLocked(
d := computeExpendableOverloadedFollowersInput{
self: r.replicaID,
replDescs: r.descRLocked().Replicas(),
ioOverloadMap: ioOverloadMap,
ioOverloadMap: ioThresholdMap,
threshold: pauseReplicationIOThreshold.Get(&r.store.cfg.Settings.SV),
getProgressMap: func(_ context.Context) map[uint64]tracker.Progress {
prs := r.mu.internalRaftGroup.Status().Progress
updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool {
Expand Down
5 changes: 3 additions & 2 deletions pkg/kv/kvserver/replica_raft_overload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
var seed uint64
var replDescs roachpb.ReplicaSet
var self roachpb.ReplicaID
ioOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
ioOverloadMap := &ioThresholdMap{m: map[roachpb.StoreID]*admissionpb.IOThreshold{}}
snapshotMap := map[roachpb.ReplicaID]struct{}{}
downMap := map[roachpb.ReplicaID]struct{}{}
match := map[roachpb.ReplicaID]uint64{}
Expand Down Expand Up @@ -87,7 +87,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
}
replDescs.AddReplica(replDesc)
case "overloaded":
ioOverloadMap[roachpb.StoreID(id)] = &admissionpb.IOThreshold{
ioOverloadMap.m[roachpb.StoreID(id)] = &admissionpb.IOThreshold{
L0NumSubLevels: 1000,
L0NumSubLevelsThreshold: 20,
L0NumFiles: 1,
Expand Down Expand Up @@ -140,6 +140,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
self: self,
replDescs: replDescs,
ioOverloadMap: ioOverloadMap,
threshold: 1.0,
getProgressMap: getProgressMap,
seed: int64(seed),
minLiveMatchIndex: minLiveMatchIndex,
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -932,10 +932,10 @@ type Store struct {
// liveness. It is updated periodically in raftTickLoop()
// and reactively in nodeIsLiveCallback() on liveness updates.
livenessMap atomic.Value
// ioOverloadedStores is analogous to livenessMap, but stores a
// map[StoreID]*IOThreshold. It is gossip-backed but is not updated
// ioThresholds is analogous to livenessMap, but stores the *IOThresholds for
// the stores in the cluster . It is gossip-backed but is not updated
// reactively, i.e. will refresh on each tick loop iteration only.
ioOverloadedStores overloadedStoresMap
ioThresholds ioThresholds

// cachedCapacity caches information on store capacity to prevent
// expensive recomputations in case leases or replicas are rapidly
Expand Down
31 changes: 14 additions & 17 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,12 +646,12 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool {
return false
}
livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap)
storeOverloadMap := s.ioOverloadedStores.Load()
ioThresholds := s.ioThresholds.Current()

start := timeutil.Now()
ctx := r.raftCtx

exists, err := r.tick(ctx, livenessMap, storeOverloadMap)
exists, err := r.tick(ctx, livenessMap, ioThresholds)
if err != nil {
log.Errorf(ctx, "%v", err)
}
Expand Down Expand Up @@ -741,7 +741,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
if s.cfg.NodeLiveness != nil {
s.updateLivenessMap()
}
s.updateStoreOverloadMap()
s.updateIOThresholdMap()

s.unquiescedReplicas.Lock()
// Why do we bother to ever queue a Replica on the Raft scheduler for
Expand All @@ -765,23 +765,20 @@ func (s *Store) raftTickLoop(ctx context.Context) {

var shouldLogStoreOverloadMap = log.Every(10 * time.Second)

func (s *Store) updateStoreOverloadMap() {
storeOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
overloadThresh := pauseReplicationIOThreshold.Get(&s.cfg.Settings.SV)
func (s *Store) updateIOThresholdMap() {
ioThresholdMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
for _, sd := range s.allocator.StorePool.GetStores() {
if score, _ := sd.Capacity.IOThreshold.Score(); overloadThresh != 0 && score > overloadThresh {
ioThreshold := sd.Capacity.IOThreshold // need a copy
storeOverloadMap[sd.StoreID] = &ioThreshold
}
ioThreshold := sd.Capacity.IOThreshold // need a copy
ioThresholdMap[sd.StoreID] = &ioThreshold
}
old := s.ioOverloadedStores.Swap(storeOverloadMap)
// Consider logging if we're going from seeing overloaded stores to seeing no overloaded stores, or when
// there are still overloaded stores and we haven't logged for a while.
shouldLog := log.V(1) ||
(len(old) > 0 && len(storeOverloadMap) == 0) ||
(len(storeOverloadMap) > 0 && shouldLogStoreOverloadMap.ShouldLog())
old, cur := s.ioThresholds.Replace(ioThresholdMap)
// Log whenever the set of overloaded stores changes.
shouldLog := log.V(1) || old.seq != cur.seq
if shouldLog {
log.Infof(s.AnnotateCtx(context.Background()), "IO overloaded stores [threshold %.2f]: %+v (before: %+v)", overloadThresh, storeOverloadMap, old)
log.Infof(
s.AnnotateCtx(context.Background()), "IO overloaded stores [threshold %.2f]: %+v (before: %+v)",
pauseReplicationIOThreshold.Get(&s.cfg.Settings.SV), ioThresholdMap, old,
)
}
}

Expand Down
10 changes: 5 additions & 5 deletions pkg/util/admission/admissionpb/io_threshold.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ import (
// to compactions falling behind (though that may change if we increase the
// max number of compactions). And we will need to incorporate overload due to
// disk bandwidth bottleneck.
func (iot IOThreshold) Score() (float64, bool) {
if iot == (IOThreshold{}) {
func (iot *IOThreshold) Score() (float64, bool) {
if iot == nil {
return 0, false
}
f := math.Max(
Expand All @@ -43,8 +43,8 @@ func (iot IOThreshold) Score() (float64, bool) {
}

// SafeFormat implements redact.SafeFormatter.
func (iot IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) {
if iot == (IOThreshold{}) {
func (iot *IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) {
if iot == nil {
s.Printf("N/A")
return
}
Expand All @@ -55,6 +55,6 @@ func (iot IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) {
}
}

func (iot IOThreshold) String() string {
func (iot *IOThreshold) String() string {
return redact.StringWithoutMarkers(iot)
}

0 comments on commit 3d44442

Please sign in to comment.