diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index e083bd0728c7..3fc0c4e559f4 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -58,6 +58,7 @@ go_library( "replica_proposal_quota.go", "replica_protected_timestamp.go", "replica_raft.go", + "replica_raft_overload.go", "replica_raft_quiesce.go", "replica_raftstorage.go", "replica_range_lease.go", @@ -235,6 +236,7 @@ go_test( "client_replica_backpressure_test.go", "client_replica_circuit_breaker_test.go", "client_replica_gc_test.go", + "client_replica_raft_overload_test.go", "client_replica_test.go", "client_spanconfigs_test.go", "client_split_burst_test.go", @@ -283,6 +285,7 @@ go_test( "replica_probe_test.go", "replica_proposal_buf_test.go", "replica_protected_timestamp_test.go", + "replica_raft_overload_test.go", "replica_raft_test.go", "replica_raft_truncation_test.go", "replica_rangefeed_test.go", @@ -401,6 +404,7 @@ go_test( "//pkg/ts", "//pkg/ts/tspb", "//pkg/util", + "//pkg/util/admission/admissionpb", "//pkg/util/caller", "//pkg/util/circuit", "//pkg/util/contextutil", diff --git a/pkg/kv/kvserver/client_replica_raft_overload_test.go b/pkg/kv/kvserver/client_replica_raft_overload_test.go new file mode 100644 index 000000000000..e7bcab234852 --- /dev/null +++ b/pkg/kv/kvserver/client_replica_raft_overload_test.go @@ -0,0 +1,101 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +// + +package kvserver_test + +import ( + "context" + "sync/atomic" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/require" +) + +// TestReplicaRaftOverload is an end-to-end test verifying that leaseholders +// will "pause" followers that are on overloaded stores, and will unpause when +// the overload ends. +// +// This primarily tests the gossip signal as well as the mechanics of pausing, +// and does not check that "pausing" really results in no traffic being sent +// to the followers. 
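// Editor's aside, not part of the diff: the test below fakes I/O overload by
// gossiping an IOThreshold whose L0 counters vastly exceed their thresholds.
// IOThreshold.Score (pkg/util/admission/admissionpb/io_threshold.go, also
// touched at the end of this diff) normalizes those counters by their
// thresholds, so the faked values score far above the 1.0 configured via
// admission.kv.pause_replication_io_threshold. A rough sketch of that scoring,
// assuming the score is the larger of the two ratios; approxIOScore is a
// hypothetical helper for illustration, not a CockroachDB API.
func approxIOScore(numSubLevels, subLevelThreshold, numFiles, fileThreshold int64) float64 {
	// Normalize each L0 counter by its threshold; a result above 1 indicates overload.
	subLevelRatio := float64(numSubLevels) / float64(subLevelThreshold)
	fileRatio := float64(numFiles) / float64(fileThreshold)
	if subLevelRatio > fileRatio {
		return subLevelRatio
	}
	return fileRatio // the test's 1000000/1 values yield ~1e6, far past the setting
}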
+func TestReplicaRaftOverload(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + var on atomic.Value // bool + on.Store(false) + var args base.TestClusterArgs + args.ReplicationMode = base.ReplicationManual + args.ServerArgs.Knobs.Store = &kvserver.StoreTestingKnobs{StoreGossipIntercept: func(descriptor *roachpb.StoreDescriptor) { + if !on.Load().(bool) || descriptor.StoreID != 3 { + return + } + descriptor.Capacity.IOThreshold = admissionpb.IOThreshold{ + L0NumSubLevels: 1000000, + L0NumSubLevelsThreshold: 1, + L0NumFiles: 1000000, + L0NumFilesThreshold: 1, + } + }} + tc := testcluster.StartTestCluster(t, 3, args) + defer tc.Stopper().Stop(ctx) + + { + _, err := tc.ServerConn(0).Exec(`SET CLUSTER SETTING admission.kv.pause_replication_io_threshold = 1.0`) + require.NoError(t, err) + } + k := tc.ScratchRange(t) + tc.AddVotersOrFatal(t, k, tc.Targets(1, 2)...) + // Also split off another range that is on s3 so that we can verify that a + // quiesced range doesn't do anything unexpected, like not release the paused + // follower from its map (and thus messing with metrics when overload recovers). + tc.SplitRangeOrFatal(t, k.Next()) + + // Gossip faux IO overload from s3. s1 should pick up on that and pause followers. + on.Store(true) + require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */)) + testutils.SucceedsSoon(t, func() error { + // Touch the one range that is on s3 since it's likely quiesced, and wouldn't unquiesce + // if s3 becomes overloaded. Note that we do no such thing for the right sibling range, + // so it may or may not contribute here (depending on when it quiesces). + // + // See: https://github.com/cockroachdb/cockroach/issues/84252 + require.NoError(t, tc.Servers[0].DB().Put(ctx, tc.ScratchRange(t), "foo")) + s1 := tc.GetFirstStoreFromServer(t, 0) + require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */)) + if n := s1.Metrics().RaftPausedFollowerCount.Value(); n == 0 { + return errors.New("no paused followers") + } + return nil + }) + + // Now remove the gossip intercept and check again. The follower should un-pause immediately. + on.Store(false) + require.NoError(t, tc.GetFirstStoreFromServer(t, 2 /* n3 */).GossipStore(ctx, false /* useCached */)) + testutils.SucceedsSoon(t, func() error { + s1 := tc.GetFirstStoreFromServer(t, 0) + require.NoError(t, s1.ComputeMetrics(ctx, 0 /* tick */)) + if n := s1.Metrics().RaftPausedFollowerCount.Value(); n > 0 { + return errors.Errorf("%d paused followers", n) + } + return nil + }) +} diff --git a/pkg/kv/kvserver/kvserverpb/state.proto b/pkg/kv/kvserver/kvserverpb/state.proto index ac6cdb577f70..d14566cbbe3d 100644 --- a/pkg/kv/kvserver/kvserverpb/state.proto +++ b/pkg/kv/kvserver/kvserverpb/state.proto @@ -184,6 +184,7 @@ message RangeInfo { // The circuit breaker error, if any. This is nonzero if and only if the // circuit breaker on the source Replica is tripped. 
string circuit_breaker_error = 20; + repeated int32 paused_replicas = 21 [(gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/roachpb.ReplicaID"]; } // RangeSideTransportInfo describes a range's closed timestamp info communicated diff --git a/pkg/kv/kvserver/metrics.go b/pkg/kv/kvserver/metrics.go index 01eb6ebc9f5d..76d1bd2960f3 100644 --- a/pkg/kv/kvserver/metrics.go +++ b/pkg/kv/kvserver/metrics.go @@ -907,6 +907,20 @@ difficult to meaningfully interpret this metric.`, Unit: metric.Unit_COUNT, } + metaRaftFollowerPaused = metric.Metadata{ + Name: "admission.raft.paused_replicas", + Help: `Number of followers (i.e. Replicas) to which replication is currently paused to help them recover from I/O overload. + +Such Replicas will be ignored for the purposes of proposal quota, and will not +receive replication traffic. They are essentially treated as offline for the +purpose of replication. This serves as a crude form of admission control. + +The count is emitted by the leaseholder of each range. +.`, + Measurement: "Followers", + Unit: metric.Unit_COUNT, + } + // Replica queue metrics. metaMVCCGCQueueSuccesses = metric.Metadata{ Name: "queue.gc.process.success", @@ -1619,6 +1633,8 @@ type StoreMetrics struct { RaftLogFollowerBehindCount *metric.Gauge RaftLogTruncated *metric.Counter + RaftPausedFollowerCount *metric.Gauge + RaftEnqueuedPending *metric.Gauge RaftCoalescedHeartbeatsPending *metric.Gauge @@ -2095,6 +2111,8 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { RaftLogFollowerBehindCount: metric.NewGauge(metaRaftLogFollowerBehindCount), RaftLogTruncated: metric.NewCounter(metaRaftLogTruncated), + RaftPausedFollowerCount: metric.NewGauge(metaRaftFollowerPaused), + RaftEnqueuedPending: metric.NewGauge(metaRaftEnqueuedPending), // This Gauge measures the number of heartbeats queued up just before diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index bb51c0ab94b7..5115b7294ed2 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -139,6 +139,18 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( return false, nil } } + repl.mu.RLock() + _, destPaused := repl.mu.pausedFollowers[id] + repl.mu.RUnlock() + if ioThresh := repl.store.ioOverloadedStores.Load()[repDesc.StoreID]; ioThresh != nil && destPaused { + // If the destination is paused, be more hesitant to send snapshots. The destination being + // paused implies that we have recently checked that it's not required for quorum, and that + // we wish to conserve I/O on that store, which sending a snapshot counteracts. So hold back on + // the snapshot as well. + err := errors.Errorf("skipping snapshot; %s is overloaded: %s", repDesc, ioThresh) + repl.reportSnapshotStatus(ctx, repDesc.ReplicaID, err) + return false, err + } err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) diff --git a/pkg/kv/kvserver/raft_transport.go b/pkg/kv/kvserver/raft_transport.go index ed8564a03cf1..4e935a98b0f3 100644 --- a/pkg/kv/kvserver/raft_transport.go +++ b/pkg/kv/kvserver/raft_transport.go @@ -144,6 +144,12 @@ type RaftMessageHandler interface { ) error } +// TODO(tbg): remove all of these metrics. The "NodeID" in this struct refers to the remote NodeID, i.e. when we send +// a message it refers to the recipient and when we receive a message it refers to the sender. This doesn't map to +// metrics well, where everyone should report on their local decisions. 
Instead have a *RaftTransportMetrics struct +// that is per-Store and tracks metrics on behalf of that Store. +// +// See: https://github.com/cockroachdb/cockroach/issues/83917 type raftTransportStats struct { nodeID roachpb.NodeID queue int diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 57fddaec43e9..b6c520336a5d 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + "sort" "sync/atomic" "time" "unsafe" @@ -566,14 +567,6 @@ type Replica struct { // (making the assumption that all followers are live at that point), // and when the range unquiesces (marking all replicating followers as // live). - // - // TODO(tschottdorf): keeping a map on each replica seems to be - // overdoing it. We should map the replicaID to a NodeID and then use - // node liveness (or any sensible measure of the peer being around). - // The danger in doing so is that a single stuck replica on an otherwise - // functioning node could fill up the quota pool. We are already taking - // this kind of risk though: a replica that gets stuck on an otherwise - // live node will not lose leaseholdership. lastUpdateTimes lastUpdateTimesMap // Computed checksum at a snapshot UUID. @@ -653,6 +646,12 @@ type Replica struct { // Historical information about the command that set the closed timestamp. closedTimestampSetter closedTimestampSetterInfo + + // Followers to which replication traffic is currently dropped. + // + // Never mutated in place (always replaced wholesale), so can be leaked + // outside of the surrounding mutex. + pausedFollowers map[roachpb.ReplicaID]struct{} } // The raft log truncations that are pending. Access is protected by its own @@ -698,6 +697,11 @@ type Replica struct { // loadBasedSplitter keeps information about load-based splitting. loadBasedSplitter split.Decider + // TODO(tbg): this is effectively unused, we only use it to call ReportUnreachable + // when a heartbeat gets dropped but it's unclear whether a) that ever fires in + // practice b) if it provides any benefit. + // + // See: https://github.com/cockroachdb/cockroach/issues/84246 unreachablesMu struct { syncutil.Mutex remotes map[roachpb.ReplicaID]struct{} @@ -1288,7 +1292,16 @@ func (r *Replica) State(ctx context.Context) kvserverpb.RangeInfo { if err := r.breaker.Signal().Err(); err != nil { ri.CircuitBreakerError = err.Error() } - + if m := r.mu.pausedFollowers; len(m) > 0 { + var sl []roachpb.ReplicaID + for id := range m { + sl = append(sl, id) + } + sort.Slice(sl, func(i, j int) bool { + return sl[i] < sl[j] + }) + ri.PausedReplicas = sl + } return ri } diff --git a/pkg/kv/kvserver/replica_metrics.go b/pkg/kv/kvserver/replica_metrics.go index 847b9f4926d1..64d1e724ebb5 100644 --- a/pkg/kv/kvserver/replica_metrics.go +++ b/pkg/kv/kvserver/replica_metrics.go @@ -42,12 +42,13 @@ type ReplicaMetrics struct { // RangeCounter is true if the current replica is responsible for range-level // metrics (generally the leaseholder, if live, otherwise the first replica in the // range descriptor). 
- RangeCounter bool - Unavailable bool - Underreplicated bool - Overreplicated bool - RaftLogTooLarge bool - BehindCount int64 + RangeCounter bool + Unavailable bool + Underreplicated bool + Overreplicated bool + RaftLogTooLarge bool + BehindCount int64 + PausedFollowerCount int64 QuotaPoolPercentUsed int64 // [0,100] @@ -74,6 +75,7 @@ func (r *Replica) Metrics( qpCap = int64(q.Capacity()) // NB: max capacity is MaxInt64, see NewIntPool qpUsed = qpCap - qpAvail } + paused := r.mu.pausedFollowers r.mu.RUnlock() r.store.unquiescedReplicas.Lock() @@ -101,6 +103,7 @@ func (r *Replica) Metrics( raftLogSize, raftLogSizeTrusted, qpUsed, qpCap, + paused, ) } @@ -122,6 +125,7 @@ func calcReplicaMetrics( raftLogSize int64, raftLogSizeTrusted bool, qpUsed, qpCapacity int64, // quota pool used and capacity bytes + paused map[roachpb.ReplicaID]struct{}, ) ReplicaMetrics { var m ReplicaMetrics @@ -150,6 +154,7 @@ func calcReplicaMetrics( // behind. if m.Leader { m.BehindCount = calcBehindCount(raftStatus, desc, livenessMap) + m.PausedFollowerCount = int64(len(paused)) } m.LatchMetrics = latchMetrics diff --git a/pkg/kv/kvserver/replica_proposal_quota.go b/pkg/kv/kvserver/replica_proposal_quota.go index a89ee1b805db..3c0103fcf82d 100644 --- a/pkg/kv/kvserver/replica_proposal_quota.go +++ b/pkg/kv/kvserver/replica_proposal_quota.go @@ -144,6 +144,7 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( // cannot correspond to values beyond the applied index there's no reason // to consider progress beyond it as meaningful. minIndex := status.Applied + r.mu.internalRaftGroup.WithProgress(func(id uint64, _ raft.ProgressType, progress tracker.Progress) { rep, ok := r.mu.state.Desc.GetReplicaDescriptorByID(roachpb.ReplicaID(id)) if !ok { @@ -207,6 +208,14 @@ func (r *Replica) updateProposalQuotaRaftMuLocked( if progress.Match < r.mu.proposalQuotaBaseIndex { return } + if _, paused := r.mu.pausedFollowers[roachpb.ReplicaID(id)]; paused { + // We are dropping MsgApp to this store, so we are effectively treating + // it as non-live for the purpose of replication and are letting it fall + // behind intentionally. + // + // See #79215. + return + } if progress.Match > 0 && progress.Match < minIndex { minIndex = progress.Match } diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f95fc2d9cfd2..9563c9668359 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -30,6 +30,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" @@ -720,6 +721,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return unquiesceAndWakeLeader, nil }) r.mu.applyingEntries = len(rd.CommittedEntries) > 0 + pausedFollowers := r.mu.pausedFollowers r.mu.Unlock() if errors.Is(err, errRemoved) { // If we've been removed then just return. @@ -911,7 +913,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( msgApps, otherMsgs := splitMsgApps(rd.Messages) r.traceMessageSends(msgApps, "sending msgApp") - r.sendRaftMessagesRaftMuLocked(ctx, msgApps) + r.sendRaftMessagesRaftMuLocked(ctx, msgApps, pausedFollowers) // Use a more efficient write-only batch because we don't need to do any // reads from the batch. 
Any reads are performed on the underlying DB. @@ -1032,7 +1034,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // Update raft log entry cache. We clear any older, uncommitted log entries // and cache the latest ones. r.store.raftEntryCache.Add(r.RangeID, rd.Entries, true /* truncate */) - r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs) + r.sendRaftMessagesRaftMuLocked(ctx, otherMsgs, nil /* blocked */) r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") stats.tApplicationBegin = timeutil.Now() @@ -1154,9 +1156,13 @@ func maybeFatalOnRaftReadyErr(ctx context.Context, expl string, err error) (remo } } -// tick the Raft group, returning true if the raft group exists and is -// unquiesced; false otherwise. -func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (bool, error) { +// tick the Raft group, returning true if the raft group exists and should +// be queued for Ready processing; false otherwise. +func (r *Replica) tick( + ctx context.Context, + livenessMap liveness.IsLiveMap, + ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold, +) (bool, error) { r.unreachablesMu.Lock() remotes := r.unreachablesMu.remotes r.unreachablesMu.remotes = nil @@ -1180,6 +1186,54 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo return false, nil } + if r.replicaID == r.mu.leaderID && len(ioOverloadMap) > 0 && quotaPoolEnabledForRange(*r.descRLocked()) { + // When multiple followers are overloaded, we may not be able to exclude all + // of them from replication traffic due to quorum constraints. We would like + // a given Range to deterministically exclude the same store (chosen + // randomly), so that across multiple Ranges we have a chance of removing + // load from all overloaded Stores in the cluster. (It would be a bad idea + // to roll a per-Range dice here on every tick, since that would rapidly + // include and exclude individual followers from replication traffic, which + // would be akin to a high rate of packet loss. Once we've decided to ignore + // a follower, this decision should be somewhat stable for at least a few + // seconds). + // + // Note that we don't enable this mechanism for the liveness range (see + // quotaPoolEnabledForRange), simply to play it safe, as we know that the + // liveness range is unlikely to be a major contributor to any follower's + // I/O and wish to reduce the likelihood of a problem in replication pausing + // contributing to an outage of that critical range. + seed := int64(r.RangeID) + now := r.store.Clock().Now().GoTime() + d := computeExpendableOverloadedFollowersInput{ + replDescs: r.descRLocked().Replicas(), + ioOverloadMap: ioOverloadMap, + getProgressMap: func(_ context.Context) map[uint64]tracker.Progress { + prs := r.mu.internalRaftGroup.Status().Progress + updateRaftProgressFromActivity(ctx, prs, r.descRLocked().Replicas().AsProto(), func(id roachpb.ReplicaID) bool { + return r.mu.lastUpdateTimes.isFollowerActiveSince(ctx, id, now, r.store.cfg.RangeLeaseActiveDuration()) + }) + return prs + }, + minLiveMatchIndex: r.mu.proposalQuotaBaseIndex, + seed: seed, + } + r.mu.pausedFollowers, _ = computeExpendableOverloadedFollowers(ctx, d) + for replicaID := range r.mu.pausedFollowers { + // We're dropping messages to those followers (see handleRaftReady) but + // it's a good idea to tell raft not to even bother sending in the first + // place. 
Raft will react to this by moving the follower to probing state + // where it will be contacted only sporadically until it responds to an + // MsgApp (which it can only do once we stop dropping messages). Something + // similar would result naturally if we didn't report as unreachable, but + // with more wasted work. + r.mu.internalRaftGroup.ReportUnreachable(uint64(replicaID)) + } + } else if len(r.mu.pausedFollowers) > 0 { + // No store in the cluster is overloaded, or this replica is not raft leader. + r.mu.pausedFollowers = nil + } + now := r.store.Clock().NowAsClockTimestamp() if r.maybeQuiesceRaftMuLockedReplicaMuLocked(ctx, now, livenessMap) { return false, nil @@ -1191,6 +1245,14 @@ func (r *Replica) tick(ctx context.Context, livenessMap liveness.IsLiveMap) (boo // into the local Raft group. The leader won't hit that path, so we update // it whenever it ticks. In effect, this makes sure it always sees itself as // alive. + // + // Note that in a workload where the leader doesn't have inflight requests + // "most of the time" (i.e. occasional writes only on this range), it's quite + // likely that we'll never reach this line, since we'll return in the + // maybeQuiesceRaftMuLockedReplicaMuLocked branch above. + // + // This is likely unintentional, and the leader should likely consider itself + // live even when quiesced. if r.replicaID == r.mu.leaderID { r.mu.lastUpdateTimes.update(r.replicaID, timeutil.Now()) } @@ -1444,10 +1506,12 @@ func (r *Replica) maybeCoalesceHeartbeat( return true } -func (r *Replica) sendRaftMessagesRaftMuLocked(ctx context.Context, messages []raftpb.Message) { +func (r *Replica) sendRaftMessagesRaftMuLocked( + ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{}, +) { var lastAppResp raftpb.Message for _, message := range messages { - drop := false + _, drop := blocked[roachpb.ReplicaID(message.To)] switch message.Type { case raftpb.MsgApp: if util.RaceEnabled { @@ -1511,6 +1575,9 @@ func (r *Replica) sendRaftMessagesRaftMuLocked(ctx context.Context, messages []r } } + // TODO(tbg): record this to metrics. + // + // See: https://github.com/cockroachdb/cockroach/issues/83917 if !drop { r.sendRaftMessageRaftMuLocked(ctx, message) } diff --git a/pkg/kv/kvserver/replica_raft_overload.go b/pkg/kv/kvserver/replica_raft_overload.go new file mode 100644 index 000000000000..0495417a2613 --- /dev/null +++ b/pkg/kv/kvserver/replica_raft_overload.go @@ -0,0 +1,208 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver + +import ( + "context" + "math/rand" + "sort" + "sync/atomic" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/errors" + "go.etcd.io/etcd/raft/v3/tracker" +) + +var pauseReplicationIOThreshold = settings.RegisterFloatSetting( + settings.SystemOnly, + "admission.kv.pause_replication_io_threshold", + "pause replication to non-essential followers when their I/O admission control score exceeds the given threshold (zero to disable)", + // TODO(tbg): set a nonzero default. 
+ // See: https://github.com/cockroachdb/cockroach/issues/83920 + 0.0, + func(v float64) error { + if v == 0 { + return nil + } + const min = 0.3 + if v < min { + return errors.Errorf("minimum admissible nonzero value is %f", min) + } + return nil + }, +) + +type computeExpendableOverloadedFollowersInput struct { + replDescs roachpb.ReplicaSet + // TODO(tbg): all entries are overloaded, so consider removing the IOThreshold here + // because it's confusing. + ioOverloadMap map[roachpb.StoreID]*admissionpb.IOThreshold + // getProgressMap returns Raft's view of the progress map. This is only called + // when needed, and at most once. + getProgressMap func(context.Context) map[uint64]tracker.Progress + // seed is used to randomize selection of which followers to pause in case + // there are multiple followers that qualify, but quorum constraints require + // picking a subset. In practice, we set this to the RangeID to ensure maximum + // stability of the selection on a per-Range basis while encouraging randomness + // across ranges (which in turn should reduce load on all overloaded followers). + seed int64 + // In addition to being in StateReplicate in the progress map, we also only + // consider a follower live if its Match index matches or exceeds + // minLiveMatchIndex. This makes sure that a follower that is behind is not + // mistaken for one that can meaningfully contribute to quorum in the short + // term. Without this, it is - at least in theory - possible that as an + // overloaded formerly expendable store becomes non-overloaded, we will + // quickly mark another overloaded store as expendable under the assumption + // that the original store can now contribute to quorum. However, that store + // is likely behind on the log, and we should consider it as non-live until + // it has caught up. + minLiveMatchIndex uint64 +} + +type nonLiveReason byte + +const ( + nonLiveReasonInactive nonLiveReason = iota + nonLiveReasonPaused + nonLiveReasonBehind +) + +// computeExpendableOverloadedFollowers computes a set of followers that we can +// intentionally exempt from replication traffic (MsgApp) to help them avoid I/O +// overload. +// +// In the common case (no store or at least no follower store close to I/O +// overload), this method does very little work. +// +// If at least one follower is (close to being) overloaded, we determine the +// maximum set of such followers that we can afford not replicating to without +// losing quorum by successively reducing the set of overloaded followers by one +// randomly selected overloaded voter. The randomness makes it more likely that +// when there are multiple overloaded stores in the system that cannot be +// jointly excluded, both stores will in aggregate be relieved from +// approximately 50% of follower raft traffic. +// +// This method uses Raft's view of liveness and in particular will consider +// followers that haven't responded recently (including heartbeats) or are +// waiting for a snapshot as not live. In particular, a follower that is +// initially in the map may transfer out of the map by virtue of being cut off +// from the raft log via a truncation. This is acceptable, since the snapshot +// prevents the replica from receiving log traffic.
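// Editor's sketch, not part of the diff: the core shape of the algorithm the
// doc comment above describes, reduced to plain Go. canMakeQuorum stands in
// for ReplicaSet.CanMakeProgress and int keys stand in for replica IDs; both
// are simplifications for illustration, not CockroachDB APIs. Assumes "sort"
// and "math/rand" are imported.
func maxExpendableSketch(
	overloadedVoters map[int]struct{},
	canMakeQuorum func(paused map[int]struct{}) bool,
	rnd *rand.Rand, // seeded with the RangeID, as in the function that follows
) map[int]struct{} {
	// Start by pausing every overloaded voter, then hand candidates back one at
	// a time (picked by the seeded RNG, so the choice is stable for a given
	// range but varies across ranges) until quorum is attainable again.
	for len(overloadedVoters) > 0 && !canMakeQuorum(overloadedVoters) {
		ids := make([]int, 0, len(overloadedVoters))
		for id := range overloadedVoters {
			ids = append(ids, id)
		}
		sort.Ints(ids) // deterministic order before the random pick
		delete(overloadedVoters, ids[rnd.Intn(len(ids))])
	}
	return overloadedVoters
}
// The real function below additionally builds the nonLive map (inactive, paused,
// or lagging followers) and always pauses overloaded non-voters, since those
// never count towards quorum.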
+func computeExpendableOverloadedFollowers( + ctx context.Context, d computeExpendableOverloadedFollowersInput, +) (map[roachpb.ReplicaID]struct{}, map[roachpb.ReplicaID]nonLiveReason) { + var nonLive map[roachpb.ReplicaID]nonLiveReason + var liveOverloadedVoterCandidates map[roachpb.ReplicaID]struct{} + var liveOverloadedNonVoterCandidates map[roachpb.ReplicaID]struct{} + + var prs map[uint64]tracker.Progress + + for _, replDesc := range d.replDescs.AsProto() { + if _, overloaded := d.ioOverloadMap[replDesc.StoreID]; !overloaded { + continue + } + // There's at least one overloaded follower, so initialize + // extra state to determine which traffic we can drop without + // losing quorum. + if prs == nil { + prs = d.getProgressMap(ctx) + nonLive = map[roachpb.ReplicaID]nonLiveReason{} + for id, pr := range prs { + if !pr.RecentActive { + nonLive[roachpb.ReplicaID(id)] = nonLiveReasonInactive + } + if pr.IsPaused() { + nonLive[roachpb.ReplicaID(id)] = nonLiveReasonPaused + } + if pr.Match < d.minLiveMatchIndex { + nonLive[roachpb.ReplicaID(id)] = nonLiveReasonBehind + } + } + liveOverloadedVoterCandidates = map[roachpb.ReplicaID]struct{}{} + liveOverloadedNonVoterCandidates = map[roachpb.ReplicaID]struct{}{} + } + + // Mark replica on overloaded store as possibly pausable. + // + // NB: we make no distinction between non-live and live replicas at this + // point. That is, even if a replica is considered "non-live", we will still + // consider "additionally" pausing it. The first instinct was to avoid + // layering anything on top of a non-live follower, however a paused + // follower immediately becomes non-live, so if we want stable metrics on + // which followers are "paused", then we need the "pausing" state to + // overrule the "non-live" state. + if prs[uint64(replDesc.ReplicaID)].IsLearner { + liveOverloadedNonVoterCandidates[replDesc.ReplicaID] = struct{}{} + } else { + liveOverloadedVoterCandidates[replDesc.ReplicaID] = struct{}{} + } + } + + // Start out greedily with all overloaded candidates paused, and remove + // randomly chosen candidates until we think the raft group can obtain quorum. + var rnd *rand.Rand + for len(liveOverloadedVoterCandidates) > 0 { + up := d.replDescs.CanMakeProgress(func(replDesc roachpb.ReplicaDescriptor) bool { + rid := replDesc.ReplicaID + if _, ok := nonLive[rid]; ok { + return false // not live + } + if _, ok := liveOverloadedVoterCandidates[rid]; ok { + return false // want to drop traffic + } + if _, ok := liveOverloadedNonVoterCandidates[rid]; ok { + return false // want to drop traffic + } + return true // live for all we know + }) + if up { + // We've found the largest set of voters to drop traffic to + // without losing quorum. + break + } + var sl []roachpb.ReplicaID + for sid := range liveOverloadedVoterCandidates { + sl = append(sl, sid) + } + // Sort for determinism during tests. + sort.Slice(sl, func(i, j int) bool { + return sl[i] < sl[j] + }) + // Remove a random voter candidate, and loop around to see if we now have + // quorum. + if rnd == nil { + rnd = rand.New(rand.NewSource(d.seed)) + } + delete(liveOverloadedVoterCandidates, sl[rnd.Intn(len(sl))]) + } + + // Return union of non-voter and voter candidates. 
+ for nonVoter := range liveOverloadedNonVoterCandidates { + liveOverloadedVoterCandidates[nonVoter] = struct{}{} + } + return liveOverloadedVoterCandidates, nonLive +} + +type overloadedStoresMap atomic.Value // map[roachpb.StoreID]*admissionpb.IOThreshold + +func (osm *overloadedStoresMap) Load() map[roachpb.StoreID]*admissionpb.IOThreshold { + v, _ := (*atomic.Value)(osm).Load().(map[roachpb.StoreID]*admissionpb.IOThreshold) + return v +} + +func (osm *overloadedStoresMap) Swap( + m map[roachpb.StoreID]*admissionpb.IOThreshold, +) map[roachpb.StoreID]*admissionpb.IOThreshold { + v, _ := (*atomic.Value)(osm).Swap(m).(map[roachpb.StoreID]*admissionpb.IOThreshold) + return v +} diff --git a/pkg/kv/kvserver/replica_raft_overload_test.go b/pkg/kv/kvserver/replica_raft_overload_test.go new file mode 100644 index 000000000000..25ad84ea7e5e --- /dev/null +++ b/pkg/kv/kvserver/replica_raft_overload_test.go @@ -0,0 +1,154 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. +// + +package kvserver + +import ( + "context" + "fmt" + "sort" + "strconv" + "strings" + "testing" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/tracing" + "github.com/cockroachdb/datadriven" + "github.com/stretchr/testify/require" + "go.etcd.io/etcd/raft/v3/tracker" +) + +func TestReplicaRaftOverload_computeExpendableOverloadedFollowers(t *testing.T) { + defer leaktest.AfterTest(t)() + + datadriven.Walk(t, testutils.TestDataPath(t, "replica_raft_overload"), func(t *testing.T, path string) { + datadriven.RunTest(t, path, func(t *testing.T, d *datadriven.TestData) string { + var buf strings.Builder + tr := tracing.NewTracer() + ctx, finishAndGet := tracing.ContextWithRecordingSpan(context.Background(), tr, path) + defer func() { + if t.Failed() { + t.Logf("%s", finishAndGet()) + } + }() + require.Equal(t, "run", d.Cmd) + var seed uint64 + var replDescs roachpb.ReplicaSet + ioOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{} + snapshotMap := map[roachpb.ReplicaID]struct{}{} + downMap := map[roachpb.ReplicaID]struct{}{} + match := map[roachpb.ReplicaID]uint64{} + minLiveMatchIndex := uint64(0) // accept all live followers by default + for _, arg := range d.CmdArgs { + for i := range arg.Vals { + sl := strings.SplitN(arg.Vals[i], "@", 2) + if len(sl) == 1 { + sl = append(sl, "") + } + idS, matchS := sl[0], sl[1] + arg.Vals[i] = idS + + var id uint64 + arg.Scan(t, i, &id) + switch arg.Key { + case "min-live-match-index": + minLiveMatchIndex = id + case "voters", "learners": + replicaID := roachpb.ReplicaID(id) + if matchS != "" { + var err error + match[replicaID], err = strconv.ParseUint(matchS, 10, 64) + require.NoError(t, err) + } + typ := roachpb.ReplicaTypeVoterFull() + if arg.Key == "learners" { + typ = roachpb.ReplicaTypeNonVoter() + } + replDesc := roachpb.ReplicaDescriptor{ + NodeID: roachpb.NodeID(id), + StoreID: roachpb.StoreID(id), + ReplicaID: 
replicaID, + Type: typ, + } + replDescs.AddReplica(replDesc) + case "overloaded": + ioOverloadMap[roachpb.StoreID(id)] = &admissionpb.IOThreshold{ + L0NumSubLevels: 1000, + L0NumSubLevelsThreshold: 20, + L0NumFiles: 1, + L0NumFilesThreshold: 1, + } + case "snapshot": + snapshotMap[roachpb.ReplicaID(id)] = struct{}{} + case "down": + downMap[roachpb.ReplicaID(id)] = struct{}{} + case "seed": + d.ScanArgs(t, "seed", &seed) + default: + t.Fatalf("unknown: %s", arg.Key) + } + } + } + + getProgressMap := func(ctx context.Context) map[uint64]tracker.Progress { + log.Eventf(ctx, "getProgressMap was called") + + // First, set up a progress map in which all replicas are tracked and are live. + m := map[uint64]tracker.Progress{} + for _, replDesc := range replDescs.AsProto() { + pr := tracker.Progress{ + State: tracker.StateReplicate, + Match: match[replDesc.ReplicaID], + RecentActive: true, + IsLearner: replDesc.GetType() == roachpb.LEARNER || replDesc.GetType() == roachpb.NON_VOTER, + Inflights: tracker.NewInflights(1), // avoid NPE + } + m[uint64(replDesc.ReplicaID)] = pr + } + // Mark replicas as down or needing snapshot as configured. + for replicaID := range downMap { + id := uint64(replicaID) + pr := m[id] + pr.RecentActive = false + m[id] = pr + } + for replicaID := range snapshotMap { + id := uint64(replicaID) + pr := m[id] + pr.State = tracker.StateSnapshot + m[id] = pr + } + return m + } + + m, _ := computeExpendableOverloadedFollowers(ctx, computeExpendableOverloadedFollowersInput{ + replDescs: replDescs, + ioOverloadMap: ioOverloadMap, + getProgressMap: getProgressMap, + seed: int64(seed), + minLiveMatchIndex: minLiveMatchIndex, + }) + { + var sl []roachpb.ReplicaID + for replID := range m { + sl = append(sl, replID) + } + sort.Slice(sl, func(i, j int) bool { return sl[i] < sl[j] }) + fmt.Fprintln(&buf, sl) + } + return buf.String() + }) + }) +} diff --git a/pkg/kv/kvserver/replica_raft_quiesce.go b/pkg/kv/kvserver/replica_raft_quiesce.go index 3b4e3786c4d7..40157633ed45 100644 --- a/pkg/kv/kvserver/replica_raft_quiesce.go +++ b/pkg/kv/kvserver/replica_raft_quiesce.go @@ -182,7 +182,7 @@ func (r *Replica) canUnquiesceRLocked() bool { func (r *Replica) maybeQuiesceRaftMuLockedReplicaMuLocked( ctx context.Context, now hlc.ClockTimestamp, livenessMap liveness.IsLiveMap, ) bool { - status, lagging, ok := shouldReplicaQuiesce(ctx, r, now, livenessMap) + status, lagging, ok := shouldReplicaQuiesce(ctx, r, now, livenessMap, r.mu.pausedFollowers) if !ok { return false } @@ -267,7 +267,11 @@ func (s laggingReplicaSet) Less(i, j int) bool { return s[i].NodeID < s[j].NodeI // // NOTE: The last 3 conditions are fairly, but not completely, overlapping. func shouldReplicaQuiesce( - ctx context.Context, q quiescer, now hlc.ClockTimestamp, livenessMap liveness.IsLiveMap, + ctx context.Context, + q quiescer, + now hlc.ClockTimestamp, + livenessMap liveness.IsLiveMap, + pausedFollowers map[roachpb.ReplicaID]struct{}, ) (*raft.Status, laggingReplicaSet, bool) { if testingDisableQuiescence { return nil, nil, false @@ -354,6 +358,19 @@ func shouldReplicaQuiesce( return nil, nil, false } + if len(pausedFollowers) > 0 { + // TODO(tbg): we should use a mechanism similar to livenessMap below (including a + // callback that unquiesces when paused followers unpause, since they will by + // definition be lagging). This was a little too much code churn at the time + // at which this comment was written. 
+ // + // See: https://github.com/cockroachdb/cockroach/issues/84252 + if log.V(4) { + log.Infof(ctx, "not quiescing: overloaded followers %v", pausedFollowers) + } + return nil, nil, false + } + var foundSelf bool var lagging laggingReplicaSet for _, rep := range q.descRLocked().Replicas().Descriptors() { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index afc446a03214..9a0207b4d31c 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -8070,7 +8070,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { ticks := r.mu.ticks r.mu.Unlock() for ; (ticks % electionTicks) != 0; ticks++ { - if _, err := r.tick(ctx, nil); err != nil { + if _, err := r.tick(ctx, nil, nil); err != nil { t.Fatal(err) } } @@ -8121,7 +8121,7 @@ func TestReplicaRefreshPendingCommandsTicks(t *testing.T) { r.mu.Unlock() // Tick raft. - if _, err := r.tick(ctx, nil); err != nil { + if _, err := r.tick(ctx, nil, nil); err != nil { t.Fatal(err) } @@ -9187,7 +9187,8 @@ func TestReplicaMetrics(t *testing.T) { ctx, hlc.Timestamp{}, &cfg.RaftConfig, spanConfig, c.liveness, 0, &c.desc, c.raftStatus, kvserverpb.LeaseStatus{}, c.storeID, c.expected.Quiescent, c.expected.Ticking, - concurrency.LatchMetrics{}, concurrency.LockTableMetrics{}, c.raftLogSize, true, 0, 0) + concurrency.LatchMetrics{}, concurrency.LockTableMetrics{}, c.raftLogSize, + true, 0, 0, nil /* overloadMap */) require.Equal(t, c.expected, metrics) }) } @@ -9910,6 +9911,7 @@ type testQuiescer struct { // Not used to implement quiescer, but used by tests. livenessMap liveness.IsLiveMap + paused map[roachpb.ReplicaID]struct{} } func (q *testQuiescer) descRLocked() *roachpb.RangeDescriptor { @@ -10002,7 +10004,7 @@ func TestShouldReplicaQuiesce(t *testing.T) { }, } q = transform(q) - _, lagging, ok := shouldReplicaQuiesce(context.Background(), q, hlc.ClockTimestamp{}, q.livenessMap) + _, lagging, ok := shouldReplicaQuiesce(context.Background(), q, hlc.ClockTimestamp{}, q.livenessMap, q.paused) require.Equal(t, expected, ok) if ok { // Any non-live replicas should be in the laggingReplicaSet. @@ -10127,6 +10129,13 @@ func TestShouldReplicaQuiesce(t *testing.T) { return q }) } + // Verify no quiescence when a follower is paused. + test(false, func(q *testQuiescer) *testQuiescer { + q.paused = map[roachpb.ReplicaID]struct{}{ + q.desc.Replicas().AsProto()[0].ReplicaID: {}, + } + return q + }) } func TestFollowerQuiesceOnNotify(t *testing.T) { diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index 9c04b36da314..b90f08658c6e 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -930,6 +930,10 @@ type Store struct { // liveness. It is updated periodically in raftTickLoop() // and reactively in nodeIsLiveCallback() on liveness updates. livenessMap atomic.Value + // ioOverloadedStores is analogous to livenessMap, but stores a + // map[StoreID]*IOThreshold. It is gossip-backed but is not updated + // reactively, i.e. will refresh on each tick loop iteration only. + ioOverloadedStores overloadedStoresMap // cachedCapacity caches information on store capacity to prevent // expensive recomputations in case leases or replicas are rapidly @@ -2548,6 +2552,11 @@ func (s *Store) GossipStore(ctx context.Context, useCached bool) error { // Unique gossip key per store. gossipStoreKey := gossip.MakeStoreDescKey(storeDesc.StoreID) // Gossip store descriptor. 
+ if fn := s.cfg.TestingKnobs.StoreGossipIntercept; fn != nil { + // Give the interceptor a chance to see and/or mutate the descriptor we're about + // to gossip. + fn(storeDesc) + } return s.cfg.Gossip.AddInfoProto(gossipStoreKey, storeDesc, gossip.StoreTTL) } @@ -3126,6 +3135,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { underreplicatedRangeCount int64 overreplicatedRangeCount int64 behindCount int64 + pausedFollowerCount int64 locks int64 totalLockHoldDurationNanos int64 @@ -3184,6 +3194,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { overreplicatedRangeCount++ } } + pausedFollowerCount += metrics.PausedFollowerCount behindCount += metrics.BehindCount if qps, dur := rep.leaseholderStats.AverageRatePerSecond(); dur >= replicastats.MinStatsDuration { averageQueriesPerSecond += qps @@ -3244,6 +3255,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.UnderReplicatedRangeCount.Update(underreplicatedRangeCount) s.metrics.OverReplicatedRangeCount.Update(overreplicatedRangeCount) s.metrics.RaftLogFollowerBehindCount.Update(behindCount) + s.metrics.RaftPausedFollowerCount.Update(pausedFollowerCount) var averageLockHoldDurationNanos int64 var averageLockWaitDurationNanos int64 diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index f06897fadf21..6cc38f1bdc1f 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" "github.com/cockroachdb/cockroach/pkg/util/stop" @@ -645,10 +646,12 @@ func (s *Store) processTick(_ context.Context, rangeID roachpb.RangeID) bool { return false } livenessMap, _ := s.livenessMap.Load().(liveness.IsLiveMap) + storeOverloadMap := s.ioOverloadedStores.Load() start := timeutil.Now() ctx := r.raftCtx - exists, err := r.tick(ctx, livenessMap) + + exists, err := r.tick(ctx, livenessMap, storeOverloadMap) if err != nil { log.Errorf(ctx, "%v", err) } @@ -738,6 +741,7 @@ func (s *Store) raftTickLoop(ctx context.Context) { if s.cfg.NodeLiveness != nil { s.updateLivenessMap() } + s.updateStoreOverloadMap() s.unquiescedReplicas.Lock() // Why do we bother to ever queue a Replica on the Raft scheduler for @@ -759,6 +763,28 @@ func (s *Store) raftTickLoop(ctx context.Context) { } } +var shouldLogStoreOverloadMap = log.Every(10 * time.Second) + +func (s *Store) updateStoreOverloadMap() { + storeOverloadMap := map[roachpb.StoreID]*admissionpb.IOThreshold{} + overloadThresh := pauseReplicationIOThreshold.Get(&s.cfg.Settings.SV) + for _, sd := range s.allocator.StorePool.GetStores() { + if score, _ := sd.Capacity.IOThreshold.Score(); overloadThresh != 0 && score > overloadThresh { + ioThreshold := sd.Capacity.IOThreshold // need a copy + storeOverloadMap[sd.StoreID] = &ioThreshold + } + } + old := s.ioOverloadedStores.Swap(storeOverloadMap) + // Consider logging if we're going from seeing overloaded stores to seeing no overloaded stores, or when + // there are still overloaded stores and we haven't logged for a while. 
+ shouldLog := log.V(1) || + (len(old) > 0 && len(storeOverloadMap) == 0) || + (len(storeOverloadMap) > 0 && shouldLogStoreOverloadMap.ShouldLog()) + if shouldLog { + log.Infof(s.AnnotateCtx(context.Background()), "IO overloaded stores [threshold %.2f]: %+v (before: %+v)", overloadThresh, storeOverloadMap, old) + } +} + func (s *Store) updateLivenessMap() { nextMap := s.cfg.NodeLiveness.GetIsLiveMap() for nodeID, entry := range nextMap { diff --git a/pkg/kv/kvserver/testdata/replica_raft_overload/basic.txt b/pkg/kv/kvserver/testdata/replica_raft_overload/basic.txt new file mode 100644 index 000000000000..8a793d9bb5ff --- /dev/null +++ b/pkg/kv/kvserver/testdata/replica_raft_overload/basic.txt @@ -0,0 +1,180 @@ +############################## +# Basic sanity checks +############################## + +run +---- +[] + +run voters=(1) overloaded=(1) +---- +[] + +# Will pause as many learners as we can... +run voters=(1) learners=(2,3,4,5,6,7) overloaded=(2,4,6,7) +---- +[2 4 6 7] + +# ... even when we can't do anything about overloaded voters. +run voters=(1) learners=(2,3,4,5,6,7) overloaded=(1,2,4,6,7) +---- +[2 4 6 7] + +############################## +# RF=3 +############################## + +run voters=(1,2,3) +---- +[] + +run voters=(1,2,3) overloaded=(3) +---- +[3] + +run voters=(1,2,3) overloaded=(2,3) +---- +[3] + +run voters=(1,2,3) overloaded=(1,2,3) +---- +[3] + +run voters=(1,2,3) overloaded=(1,2,3) seed=3 +---- +[1] + +run voters=(1,2,3) learners=(4,5) +---- +[] + +run voters=(1,2,3) learners=(4,5) overloaded=(3,4) +---- +[3 4] + +run voters=(1,2,3) learners=(4,5) overloaded=(3,4,5) +---- +[3 4 5] + +run voters=(1,2,3) learners=(4,5) overloaded=(2,3,4,5) +---- +[3 4 5] + +run voters=(1,2,3) learners=(4,5) overloaded=(1,2,3,4,5) +---- +[3 4 5] + +run voters=(1,2,3) down=(3) overloaded=(1) +---- +[] + +run voters=(1,2,3) snapshot=(3) overloaded=(1) +---- +[] + +run voters=(1,2,3) down=(3) overloaded=(1,2) +---- +[] + +run voters=(1,2,3) snapshot=(3) overloaded=(1,2) +---- +[] + +# When a "down" follower is also overloaded, we consider +# pausing it (even though it's already "down"). If we didn't +# do this we'd pause a healthy follower, it will become non-healthy +# due to the pausing, and then we won't consider it paused, it will +# be caught up, get paused again, etc. +run voters=(1,2,3) down=(3) overloaded=(1,2,3) +---- +[3] + +run voters=(1,2,3) down=(3) overloaded=(3) +---- +[3] + +run voters=(1,2,3) learners=(4) down=(4) overloaded=(1,2) +---- +[2] + +run voters=(1,2,3) learners=(4) snapshot=(4) overloaded=(1,2) +---- +[2] + +run voters=(1,2,3) learners=(4) snapshot=(4) overloaded=(1,2,4) +---- +[2 4] + +# Won't pause 2 since a quorum is down... +run voters=(1,2,3) down=(1,2) overloaded=(2,3) +---- +[] + +# ... but if there's an overloaded learner we'll still pause it. 
+run voters=(1,2,3) learners=(4) down=(1,2) overloaded=(2,3,4) +---- +[4] + +############################## +# RF=4 +############################## + +run voters=(1,2,3,4) overloaded=(1,2,3,4) +---- +[2] + +run voters=(1,2,3,4) learners=(5) overloaded=(1,2,3,4) +---- +[2] + +run voters=(1,2,3,4) learners=(5) overloaded=(1,2,3,4,5) +---- +[2 5] + +############################## +# RF=5 +############################## + +run voters=(1,2,3,4,5) +---- +[] + +run voters=(1,2,3,4,5) overloaded=(1) +---- +[1] + +run voters=(1,2,3,4,5) overloaded=(1,2) +---- +[1 2] + +run voters=(1,2,3,4,5) overloaded=(1,2,3) +---- +[2 3] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4) +---- +[2 4] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) +---- +[1 4] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) snapshot=(1) +---- +[1 4] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) snapshot=(1,2) +---- +[] + +run voters=(1,2,3,4,5) overloaded=(1,2,3,4,5) snapshot=(1,2,3) +---- +[] + +run voters=(1,2,3,4,5) learners=(6) overloaded=(1,6) +---- +[1 6] + +run voters=(1,2,3,4,5) learners=(6,7) overloaded=(1,2,3,4,5,6,7) +---- +[1 4 6 7] diff --git a/pkg/kv/kvserver/testdata/replica_raft_overload/min_live_match_index b/pkg/kv/kvserver/testdata/replica_raft_overload/min_live_match_index new file mode 100644 index 000000000000..cc734b2bfd3a --- /dev/null +++ b/pkg/kv/kvserver/testdata/replica_raft_overload/min_live_match_index @@ -0,0 +1,24 @@ +# Start out in a situation in which s3 is overloaded. The quorum (1,2) is ahead +# of the min live index, so we can afford pausing replication to s3. +run voters=(1@100,2@80,3@60) overloaded=(3) min-live-match-index=60 +---- +[3] + +# We'll also pause s3 if it's out of the quota pool window. +run voters=(1@100,2@80,3@60) overloaded=(3) min-live-match-index=59 +---- +[3] + +# s3 quickly recovers while s2 becomes overloaded. However, s3 hasn't caught up +# yet, so we don't count it towards quorum yet, meaning that we will need to +# continue replicating to s2 despite it being overloaded, until s3 considered +# caught up. +run voters=(1@100,2@90,3@69) overloaded=(2) min-live-match-index=70 +---- +[] + +# s3 has caught up to the min-live-match-index of 70, so we can now pause +# replication to s2. +run voters=(1@100,2@90,3@70) overloaded=(2) min-live-match-index=70 +---- +[2] diff --git a/pkg/kv/kvserver/testing_knobs.go b/pkg/kv/kvserver/testing_knobs.go index ff36aff7d8c2..2ab292632729 100644 --- a/pkg/kv/kvserver/testing_knobs.go +++ b/pkg/kv/kvserver/testing_knobs.go @@ -416,6 +416,10 @@ type StoreTestingKnobs struct { // send snapshot semaphore. AfterSendSnapshotThrottle func() + // This method, if set, gets to see (and mutate, if desired) any local + // StoreDescriptor before it is being sent out on the Gossip network. + StoreGossipIntercept func(descriptor *roachpb.StoreDescriptor) + // EnqueueReplicaInterceptor intercepts calls to `store.Enqueue()`. EnqueueReplicaInterceptor func(queueName string, replica *Replica) } diff --git a/pkg/server/status/health_check.go b/pkg/server/status/health_check.go index 4153b78b94fd..5e5694244a15 100644 --- a/pkg/server/status/health_check.go +++ b/pkg/server/status/health_check.go @@ -43,12 +43,13 @@ var ( // large backlog but show no sign of processing times. var trackedMetrics = map[string]threshold{ // Gauges. 
- "ranges.unavailable": gaugeZero, - "ranges.underreplicated": gaugeZero, - "requests.backpressure.split": gaugeZero, - "requests.slow.latch": gaugeZero, - "requests.slow.lease": gaugeZero, - "requests.slow.raft": gaugeZero, + "ranges.unavailable": gaugeZero, + "ranges.underreplicated": gaugeZero, + "requests.backpressure.split": gaugeZero, + "requests.slow.latch": gaugeZero, + "requests.slow.lease": gaugeZero, + "requests.slow.raft": gaugeZero, + "admission.raft.paused_replicas": gaugeZero, // TODO(tbg): this fires too eagerly. On a large machine that can handle many // concurrent requests, we'll blow a limit that would be disastrous to a smaller // machine. This will be hard to fix. We could track the max goroutine count diff --git a/pkg/ts/catalog/chart_catalog.go b/pkg/ts/catalog/chart_catalog.go index de32c6423028..7ef7be978853 100644 --- a/pkg/ts/catalog/chart_catalog.go +++ b/pkg/ts/catalog/chart_catalog.go @@ -564,6 +564,12 @@ var charts = []sectionDescription{ "ranges.overreplicated", }, }, + { + Title: "Paused Followers", + Metrics: []string{ + "admission.raft.paused_replicas", + }, + }, { Title: "Operations", Metrics: []string{ diff --git a/pkg/util/admission/admissionpb/io_threshold.go b/pkg/util/admission/admissionpb/io_threshold.go index f4089fa47b8e..2ed07df22064 100644 --- a/pkg/util/admission/admissionpb/io_threshold.go +++ b/pkg/util/admission/admissionpb/io_threshold.go @@ -46,6 +46,7 @@ func (iot IOThreshold) Score() (float64, bool) { func (iot IOThreshold) SafeFormat(s interfaces.SafePrinter, _ rune) { if iot == (IOThreshold{}) { s.Printf("N/A") + return } sc, overload := iot.Score() s.Printf("%.3f", redact.SafeFloat(sc))