diff --git a/pkg/cmd/roachtest/tests/foo_test.go b/pkg/cmd/roachtest/tests/foo_test.go
new file mode 100644
index 000000000000..33a01543efcf
--- /dev/null
+++ b/pkg/cmd/roachtest/tests/foo_test.go
@@ -0,0 +1,12 @@
+// Copyright 2022 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+//
+
+package tests
diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go
index 2862b5ace9c8..1ce438c56de5 100644
--- a/pkg/kv/kvserver/replica.go
+++ b/pkg/kv/kvserver/replica.go
@@ -636,7 +636,7 @@ type Replica struct {
 		// Followers to which replication traffic is currently dropped.
 		//
 		// Never mutated in place (always replaced wholesale), so can be leaked
-		// outside of the surrounding mutex.
+		// outside the surrounding mutex.
 		pausedFollowers map[roachpb.ReplicaID]struct{}
 	}

diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go
index f48c2423dd6b..9adc86ffc2c0 100644
--- a/pkg/kv/kvserver/replica_command.go
+++ b/pkg/kv/kvserver/replica_command.go
@@ -2601,12 +2601,15 @@ func (r *Replica) sendSnapshot(
 		return err
 	}

-	if ioThresh := r.store.ioOverloadedStores.Load()[recipient.StoreID]; ioThresh != nil && destPaused {
+	if destPaused {
 		// If the destination is paused, be more hesitant to send snapshots. The destination being
 		// paused implies that we have recently checked that it's not required for quorum, and that
 		// we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on
 		// the snapshot as well.
-		return errors.Errorf("skipping snapshot; %s is overloaded: %s", recipient, ioThresh)
+		return errors.Errorf(
+			"skipping snapshot; %s is overloaded: %s",
+			recipient, r.store.ioThresholds.Current().IOThreshold(recipient.StoreID),
+		)
 	}

 	// Check follower snapshots cluster setting.
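// Editor's sketch -- not part of the change. The sendSnapshot hunk above boils
// down to the decision below; the helper name and signature are hypothetical
// (the real logic stays inline in (*Replica).sendSnapshot), while
// ioThresholdMap.IOThreshold is the accessor introduced later in this diff in
// replica_raft_overload.go.
package kvserver

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/errors"
)

func shouldSkipSnapshotToPausedFollower(
	destPaused bool, recipient roachpb.ReplicaDescriptor, cur *ioThresholdMap,
) error {
	if !destPaused {
		return nil
	}
	// A paused follower was recently determined not to be required for quorum,
	// and we want to conserve I/O on its store; sending a snapshot would undo that.
	return errors.Errorf(
		"skipping snapshot; %s is overloaded: %s",
		recipient, cur.IOThreshold(recipient.StoreID),
	)
}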
diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go
index fd9cc8ee0a2b..22998029aca1 100644
--- a/pkg/kv/kvserver/replica_raft.go
+++ b/pkg/kv/kvserver/replica_raft.go
@@ -30,7 +30,6 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util"
-	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
 	"github.com/cockroachdb/cockroach/pkg/util/encoding"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
@@ -1164,9 +1163,7 @@ func maybeFatalOnRaftReadyErr(ctx context.Context, expl string, err error) (remo
 // tick the Raft group, returning true if the raft group exists and should
 // be queued for Ready processing; false otherwise.
 func (r *Replica) tick(
-	ctx context.Context,
-	livenessMap liveness.IsLiveMap,
-	ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold,
+	ctx context.Context, livenessMap liveness.IsLiveMap, ioThresholdMap *ioThresholdMap,
 ) (bool, error) {
 	r.unreachablesMu.Lock()
 	remotes := r.unreachablesMu.remotes
@@ -1191,7 +1188,7 @@ func (r *Replica) tick(
 		return false, nil
 	}

-	r.updatePausedFollowersLocked(ctx, ioOverloadMap)
+	r.updatePausedFollowersLocked(ctx, ioThresholdMap)

 	now := r.store.Clock().NowAsClockTimestamp()
 	if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) {
diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go
index 003ede45322e..b7f0c2a79d67 100644
--- a/pkg/kv/kvserver/replica_raft_overload.go
+++ b/pkg/kv/kvserver/replica_raft_overload.go
@@ -14,11 +14,11 @@ import (
 	"context"
 	"math/rand"
 	"sort"
-	"sync/atomic"

 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
 	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/errors"
 	"go.etcd.io/etcd/raft/v3/tracker"
 )
@@ -42,12 +42,16 @@ var pauseReplicationIOThreshold = settings.RegisterFloatSetting(
 	},
 )

+type ioThresholdMapI interface {
+	// OverloadedWithThreshold returns true if the store's score exceeds the threshold.
+	OverloadedWithThreshold(_ roachpb.StoreID, threshold float64) bool
+}
+
 type computeExpendableOverloadedFollowersInput struct {
-	self      roachpb.ReplicaID
-	replDescs roachpb.ReplicaSet
-	// TODO(tbg): all entries are overloaded, so consdier removing the IOThreshold here
-	// because it's confusing.
-	ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold
+	self          roachpb.ReplicaID
+	replDescs     roachpb.ReplicaSet
+	ioOverloadMap ioThresholdMapI
+	threshold     float64 // cut-off above which stores are considered overloaded
 	// getProgressMap returns Raft's view of the progress map. This is only called
 	// when needed, and at most once.
 	getProgressMap func(context.Context) map[uint64]tracker.Progress
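// Editor's sketch -- not part of the change. The new ioThresholdMapI seam lets
// computeExpendableOverloadedFollowers be driven by anything that can answer
// "is this store's score above the cut-off?". The stub below (scoresByStore is
// a made-up name) could stand in for the gossip-backed ioThresholdMap in a test.
package kvserver

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// scoresByStore maps each store to a fixed I/O overload score.
type scoresByStore map[roachpb.StoreID]float64

// OverloadedWithThreshold implements ioThresholdMapI.
func (s scoresByStore) OverloadedWithThreshold(id roachpb.StoreID, threshold float64) bool {
	return s[id] > threshold
}

var _ ioThresholdMapI = scoresByStore(nil)

// Example use with the input struct above (other fields omitted):
//
//	d := computeExpendableOverloadedFollowersInput{
//		ioOverloadMap: scoresByStore{1: 0.3, 2: 1.5},
//		threshold:     1.0, // only store 2 counts as overloaded
//	}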
@@ -86,7 +90,7 @@ const (
 // overload), this method does very little work.
 //
 // If at least one follower is (close to being) overloaded, we determine the
-// maximum set of such followers that we can afford not replicating to without
+// maximum set of such followers that we can afford not to replicate to without
 // losing quorum by successively reducing the set of overloaded followers by one
 // randomly selected overloaded voter. The randomness makes it more likely that
 // when there are multiple overloaded stores in the system that cannot be
@@ -108,7 +112,7 @@ func computeExpendableOverloadedFollowers(
 	var prs map[uint64]tracker.Progress

 	for _, replDesc := range d.replDescs.AsProto() {
-		if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded || replDesc.ReplicaID == d.self {
+		if overloaded := d.ioOverloadMap.OverloadedWithThreshold(replDesc.StoreID, d.threshold); !overloaded || replDesc.ReplicaID == d.self {
 			continue
 		}
 		// There's at least one overloaded follower, so initialize
@@ -193,26 +197,94 @@ func computeExpendableOverloadedFollowers(
 	return liveOverloadedVoterCandidates, nonLive
 }

-type overloadedStoresMap atomic.Value // map[roachpb.StoreID]*admissionpb.IOThreshold
+type ioThresholdMap struct {
+	seq int
+	m   map[roachpb.StoreID]*admissionpb.IOThreshold
+}
+
+var _ ioThresholdMapI = (*ioThresholdMap)(nil)
+
+func (osm *ioThresholdMap) Overloaded(id roachpb.StoreID) bool {
+	_, overloaded := osm.m[id].Score()
+	return overloaded
+}
+
+// OverloadedWithThreshold implements ioThresholdMapI.
+func (osm *ioThresholdMap) OverloadedWithThreshold(id roachpb.StoreID, threshold float64) bool {
+	sc, _ := osm.m[id].Score()
+	return sc > threshold
+}
+
+func (osm *ioThresholdMap) NumOverloaded() int {
+	var n int
+	for id := range osm.m {
+		if osm.Overloaded(id) {
+			n++
+		}
+	}
+	return n
+}
+
+func (osm *ioThresholdMap) IOThreshold(id roachpb.StoreID) *admissionpb.IOThreshold {
+	return osm.m[id]
+}
+
+// Sequence allows distinguishing sets of overloaded stores. Whenever an
+// ioThresholdMap is created, it inherits the sequence of its predecessor,
+// incrementing only when the set of overloaded stores has changed in the
+// transition.
+func (osm *ioThresholdMap) Sequence() int {
+	return osm.seq
+}

-func (osm *overloadedStoresMap) Load() map[roachpb.StoreID]*admissionpb.IOThreshold {
-	v, _ := (*atomic.Value)(osm).Load().(map[roachpb.StoreID]*admissionpb.IOThreshold)
-	return v
+type ioThresholds struct {
+	mu struct {
+		syncutil.Mutex
+		inner *ioThresholdMap // always replaced wholesale, so can leak out of mu
+	}
 }

-func (osm *overloadedStoresMap) Swap(
+func (osm *ioThresholds) Current() *ioThresholdMap {
+	osm.mu.Lock()
+	defer osm.mu.Unlock()
+	return osm.mu.inner
+}
+
+// Replace replaces the stored view of stores for which we track IOThresholds.
+// If the set of overloaded stores (i.e. with a score of >=1) changes in the
+// process, the updated view will have an incremented Sequence().
+func (osm *ioThresholds) Replace(
 	m map[roachpb.StoreID]*admissionpb.IOThreshold,
-) map[roachpb.StoreID]*admissionpb.IOThreshold {
-	v, _ := (*atomic.Value)(osm).Swap(m).(map[roachpb.StoreID]*admissionpb.IOThreshold)
-	return v
+) (prev, cur *ioThresholdMap) {
+	osm.mu.Lock()
+	defer osm.mu.Unlock()
+	last := osm.mu.inner
+	if last == nil {
+		last = &ioThresholdMap{}
+	}
+	next := &ioThresholdMap{seq: last.seq, m: m}
+	var delta int
+	for id := range last.m {
+		if last.Overloaded(id) != next.Overloaded(id) {
+			delta = 1
+			break
+		}
+	}
+	for id := range next.m {
+		if last.Overloaded(id) != next.Overloaded(id) {
+			delta = 1
+			break
+		}
+	}
+	next.seq += delta
+	osm.mu.inner = next
+	return last, next
 }

-func (r *Replica) updatePausedFollowersLocked(
-	ctx context.Context, ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold,
-) {
+func (r *Replica) updatePausedFollowersLocked(ctx context.Context, ioThresholdMap *ioThresholdMap) {
 	r.mu.pausedFollowers = nil
-	if len(ioOverloadMap) == 0 {
+	if ioThresholdMap.NumOverloaded() == 0 {
 		return
 	}
@@ -254,7 +326,8 @@ func (r *Replica) updatePausedFollowersLocked(
 	d := computeExpendableOverloadedFollowersInput{
 		self:      r.replicaID,
 		replDescs: r.descRLocked().Replicas(),
-		ioOverloadMap: ioOverloadMap,
+		ioOverloadMap:  ioThresholdMap,
+		threshold:      pauseReplicationIOThreshold.Get(&r.store.cfg.Settings.SV),
 		getProgressMap: func(_ context.Context) map[uint64]tracker.Progress {
 			prs := r.mu.internalRaftGroup.Status().Progress
 			updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool {
diff --git a/pkg/kv/kvserver/replica_raft_overload_test.go b/pkg/kv/kvserver/replica_raft_overload_test.go
index 842d08cd268e..65d12f379473 100644
--- a/pkg/kv/kvserver/replica_raft_overload_test.go
+++ b/pkg/kv/kvserver/replica_raft_overload_test.go
@@ -47,7 +47,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
 			var seed uint64
 			var replDescs roachpb.ReplicaSet
 			var self roachpb.ReplicaID
-			ioOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
+			ioOverloadMap := &ioThresholdMap{m: map[roachpb.StoreID]*admissionpb.IOThreshold{}}
 			snapshotMap := map[roachpb.ReplicaID]struct{}{}
 			downMap := map[roachpb.ReplicaID]struct{}{}
 			match := map[roachpb.ReplicaID]uint64{}
@@ -87,7 +87,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
 				}
 				replDescs.AddReplica(replDesc)
 			case "overloaded":
-				ioOverloadMap[roachpb.StoreID(id)] = &admissionpb.IOThreshold{
+				ioOverloadMap.m[roachpb.StoreID(id)] = &admissionpb.IOThreshold{
 					L0NumSubLevels:          1000,
 					L0NumSubLevelsThreshold: 20,
 					L0NumFiles:              1,
@@ -140,6 +140,7 @@ func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T)
 				self:              self,
 				replDescs:         replDescs,
 				ioOverloadMap:     ioOverloadMap,
+				threshold:         1.0,
 				getProgressMap:    getProgressMap,
 				seed:              int64(seed),
 				minLiveMatchIndex: minLiveMatchIndex,
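// Editor's sketch -- not part of the change. A hypothetical snippet, assumed to
// live in package kvserver next to the test above, that exercises the
// Sequence() semantics of ioThresholds.Replace: the sequence is carried over on
// every Replace and bumps only when the set of overloaded stores (score >=1,
// per the Replace comment above) changes. The IOThreshold values are made up
// and assume the usual L0 file/sub-level fields in admissionpb.
package kvserver

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

func demoIOThresholdsSequence() {
	healthy := &admissionpb.IOThreshold{
		L0NumSubLevels: 10, L0NumSubLevelsThreshold: 20, // score 0.5
		L0NumFiles: 1, L0NumFilesThreshold: 1000,
	}
	overloaded := &admissionpb.IOThreshold{
		L0NumSubLevels: 1000, L0NumSubLevelsThreshold: 20, // score 50
		L0NumFiles: 1, L0NumFilesThreshold: 1000,
	}

	var ts ioThresholds
	_, cur := ts.Replace(map[roachpb.StoreID]*admissionpb.IOThreshold{1: healthy})
	fmt.Println(cur.Sequence(), cur.NumOverloaded()) // 0 0

	// Store 1 becomes overloaded: the overloaded set changed, so the sequence bumps.
	_, cur = ts.Replace(map[roachpb.StoreID]*admissionpb.IOThreshold{1: overloaded})
	fmt.Println(cur.Sequence(), cur.NumOverloaded()) // 1 1

	// Same overloaded set as before: no bump. This is what updateIOThresholdMap
	// (store_raft.go below) keys its logging off.
	_, cur = ts.Replace(map[roachpb.StoreID]*admissionpb.IOThreshold{1: overloaded})
	fmt.Println(cur.Sequence(), cur.NumOverloaded()) // 1 1

	// Store 1 recovers: the set changed again, so the sequence bumps again.
	_, cur = ts.Replace(map[roachpb.StoreID]*admissionpb.IOThreshold{1: healthy})
	fmt.Println(cur.Sequence(), cur.NumOverloaded()) // 2 0
}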
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index aa72c7f8c7af..0233815a3f2d 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -932,10 +932,10 @@ type Store struct {
 	// liveness. It is updated periodically in raftTickLoop()
 	// and reactively in nodeIsLiveCallback() on liveness updates.
 	livenessMap atomic.Value
-	// ioOverloadedStores is analogous to livenessMap, but stores a
-	// map[StoreID]*IOThreshold. It is gossip-backed but is not updated
+	// ioThresholds is analogous to livenessMap, but stores the *IOThresholds for
+	// the stores in the cluster. It is gossip-backed but is not updated
 	// reactively, i.e. will refresh on each tick loop iteration only.
-	ioOverloadedStores overloadedStoresMap
+	ioThresholds ioThresholds

 	// cachedCapacity caches information on store capacity to prevent
 	// expensive recomputations in case leases or replicas are rapidly
diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go
index 6cc38f1bdc1f..afa98dd605bc 100644
--- a/pkg/kv/kvserver/store_raft.go
+++ b/pkg/kv/kvserver/store_raft.go
@@ -646,12 +646,12 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool {
 		return false
 	}
 	livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap)
-	storeOverloadMap := s.ioOverloadedStores.Load()
+	ioThresholds := s.ioThresholds.Current()

 	start := timeutil.Now()
 	ctx := r.raftCtx

-	exists, err := r.tick(ctx, livenessMap, storeOverloadMap)
+	exists, err := r.tick(ctx, livenessMap, ioThresholds)
 	if err != nil {
 		log.Errorf(ctx, "%v", err)
 	}
@@ -741,7 +741,7 @@ func (s *Store) raftTickLoop(ctx context.Context) {
 			if s.cfg.NodeLiveness != nil {
 				s.updateLivenessMap()
 			}
-			s.updateStoreOverloadMap()
+			s.updateIOThresholdMap()

 			s.unquiescedReplicas.Lock()
 			// Why do we bother to ever queue a Replica on the Raft scheduler for
@@ -765,23 +765,20 @@ func (s *Store) raftTickLoop(ctx context.Context) {

 var shouldLogStoreOverloadMap = log.Every(10 * time.Second)

-func (s *Store) updateStoreOverloadMap() {
-	storeOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
-	overloadThresh := pauseReplicationIOThreshold.Get(&s.cfg.Settings.SV)
+func (s *Store) updateIOThresholdMap() {
+	ioThresholdMap := map[roachpb.StoreID]*admissionpb.IOThreshold{}
 	for _, sd := range s.allocator.StorePool.GetStores() {
-		if score, _ := sd.Capacity.IOThreshold.Score(); overloadThresh != 0 && score > overloadThresh {
-			ioThreshold := sd.Capacity.IOThreshold // need a copy
-			storeOverloadMap[sd.StoreID] = &ioThreshold
-		}
+		ioThreshold := sd.Capacity.IOThreshold // need a copy
+		ioThresholdMap[sd.StoreID] = &ioThreshold
 	}
-	old := s.ioOverloadedStores.Swap(storeOverloadMap)
-	// Consider logging if we're going from seeing overloaded stores to seeing no overloaded stores, or when
-	// there are still overloaded stores and we haven't logged for a while.
-	shouldLog := log.V(1) ||
-		(len(old) > 0 && len(storeOverloadMap) == 0) ||
-		(len(storeOverloadMap) > 0 && shouldLogStoreOverloadMap.ShouldLog())
+	old, cur := s.ioThresholds.Replace(ioThresholdMap)
+	// Log whenever the set of overloaded stores changes.
+	shouldLog := log.V(1) || old.seq != cur.seq
 	if shouldLog {
-		log.Infof(s.AnnotateCtx(context.Background()), "IO overloaded stores [threshold %.2f]: %+v (before: %+v)", overloadThresh, storeOverloadMap, old)
+		log.Infof(
+			s.AnnotateCtx(context.Background()), "IO overloaded stores [threshold %.2f]: %+v (before: %+v)",
+			pauseReplicationIOThreshold.Get(&s.cfg.Settings.SV), ioThresholdMap, old,
+		)
 	}
 }
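// Editor's note -- not part of the change. updateIOThresholdMap above now keeps
// an IOThreshold for every store (previously only stores already above
// pauseReplicationIOThreshold were retained), and logging is keyed off the
// Sequence() bump instead of map sizes. The pause cut-off is applied at use
// time in updatePausedFollowersLocked, roughly as in the hypothetical helper
// below, which ignores the setting's disabled (zero) state for brevity.
package kvserver

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
)

// pausableAboveSetting re-reads the cluster setting on every call, so changing
// the threshold takes effect on the next Raft tick without waiting for new
// gossip-backed IOThreshold data.
func pausableAboveSetting(m *ioThresholdMap, id roachpb.StoreID, st *cluster.Settings) bool {
	return m.OverloadedWithThreshold(id, pauseReplicationIOThreshold.Get(&st.SV))
}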
diff --git a/pkg/util/admission/admissionpb/io_threshold.go b/pkg/util/admission/admissionpb/io_threshold.go
index 2ed07df22064..ab088938aaca 100644
--- a/pkg/util/admission/admissionpb/io_threshold.go
+++ b/pkg/util/admission/admissionpb/io_threshold.go
@@ -31,8 +31,8 @@ import (
 // to compactions falling behind (though that may change if we increase the
 // max number of compactions). And we will need to incorporate overload due to
 // disk bandwidth bottleneck.
-func (iot IOThreshold) Score() (float64, bool) {
-	if iot == (IOThreshold{}) {
+func (iot *IOThreshold) Score() (float64, bool) {
+	if iot == nil {
 		return 0, false
 	}
 	f := math.Max(
@@ -43,8 +43,8 @@
 }

 // SafeFormat implements redact.SafeFormatter.
-func (iot IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) {
-	if iot == (IOThreshold{}) {
+func (iot *IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) {
+	if iot == nil {
 		s.Printf("N/A")
 		return
 	}
@@ -55,6 +55,6 @@
 	}
 }

-func (iot IOThreshold) String() string {
+func (iot *IOThreshold) String() string {
 	return redact.StringWithoutMarkers(iot)
 }
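// Editor's sketch -- not part of the change. The switch to pointer receivers
// above makes a nil *IOThreshold mean "no data", which is what ioThresholdMap
// relies on when a StoreID is absent from its map (osm.m[id] is nil). A
// hypothetical standalone snippet:
package admissionpbdemo

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
)

func demoNilIOThreshold() {
	var iot *admissionpb.IOThreshold // e.g. a lookup for an unknown store
	score, overloaded := iot.Score() // safe: Score checks for a nil receiver
	fmt.Println(score, overloaded)   // 0 false
	fmt.Println(iot.String())        // "N/A", via the nil branch in SafeFormat
}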