From 56547ac174a30037a151c4d2b1a97357af4c6622 Mon Sep 17 00:00:00 2001 From: sumeerbhola Date: Wed, 27 Nov 2024 10:33:22 -0500 Subject: [PATCH] rac2,replica_rac2: add ForceFlushIndexChangedLocked to Processor, RangeController This is used to set the highest index up to which all send-queues in pull mode must be force-flushed. Informs #135601 Epic: CRDB-37515 Release note: None --- .../kvflowcontrol/rac2/range_controller.go | 237 ++++++--- .../rac2/range_controller_test.go | 60 ++- .../range_controller/force_flush_index | 478 ++++++++++++++++++ .../force_flush_index_push_pull | 349 +++++++++++++ .../kvflowcontrol/replica_rac2/processor.go | 71 ++- .../replica_rac2/processor_test.go | 9 +- .../replica_rac2/testdata/processor | 6 +- 7 files changed, 1096 insertions(+), 114 deletions(-) create mode 100644 pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index create mode 100644 pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index_push_pull diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go index 510ac8a43e8a..2e0ff31ebe28 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go @@ -37,9 +37,9 @@ import ( // closed if the term changes. // // Almost none of the methods are called with Replica.mu held. The caller and -// callee should order their mutexes before Replica.mu. The one exception is -// HoldsSendTokensLocked, which holds both raftMu and Replica.mu. The callee -// must not acquire its own mutex. +// callee should order their mutexes before Replica.mu. The exceptions are +// HoldsSendTokensLocked, ForceFlushIndexChangedLocked, which hold both raftMu +// and Replica.mu. The callee must not acquire its own mutex. // // RangeController dynamically switches between push and pull mode based on // RaftEvent handling. In general, the code here is oblivious to the fact that @@ -100,6 +100,13 @@ type RangeController interface { // // Requires replica.raftMu to be held. SetLeaseholderRaftMuLocked(ctx context.Context, replica roachpb.ReplicaID) + // ForceFlushIndexChangedLocked sets the force flush index, i.e., the index + // (inclusive) up to which all replicas with a send-queue must be + // force-flushed in MsgAppPull mode. It may be rarely called with no change + // to the index. + // + // Requires replica.raftMu and replica.mu to be held. + ForceFlushIndexChangedLocked(ctx context.Context, index uint64) // CloseRaftMuLocked closes the range controller. // // Requires replica.raftMu to be held. @@ -573,6 +580,9 @@ type RangeControllerInitState struct { // NextRaftIndex is the first index that will appear in the next non-empty // RaftEvent.Entries handled by this RangeController. NextRaftIndex uint64 + // FirstFlushIndex is an index up to (and including) which the + // rangeController running in pull mode must force-flush all send streams. + ForceFlushIndex uint64 } // rangeController is tied to a single leader term. @@ -587,8 +597,9 @@ type rangeController struct { replicaSet ReplicaSet // leaseholder can be NoReplicaID or not be in ReplicaSet, i.e., it is // eventually consistent with the set of replicas. 
- leaseholder roachpb.ReplicaID - nextRaftIndex uint64 + leaseholder roachpb.ReplicaID + nextRaftIndex uint64 + forceFlushIndex uint64 mu struct { // All the fields in this struct are modified while holding raftMu and @@ -673,11 +684,12 @@ func NewRangeController( log.VInfof(ctx, 1, "r%v creating range controller", o.RangeID) } rc := &rangeController{ - opts: o, - term: init.Term, - leaseholder: init.Leaseholder, - nextRaftIndex: init.NextRaftIndex, - replicaMap: make(map[roachpb.ReplicaID]*replicaState), + opts: o, + term: init.Term, + leaseholder: init.Leaseholder, + nextRaftIndex: init.NextRaftIndex, + forceFlushIndex: init.ForceFlushIndex, + replicaMap: make(map[roachpb.ReplicaID]*replicaState), } rc.scheduledMu.replicas = make(map[roachpb.ReplicaID]struct{}) rc.mu.waiterSetRefreshCh = make(chan struct{}) @@ -893,6 +905,10 @@ type existingSendStreamState struct { indexToSend uint64 } +// infinityEntryIndex is an exclusive upper-bound on the index of an actual +// entry. +const infinityEntryIndex uint64 = math.MaxUint64 + // constructRaftEventForReplica is called iff latestFollowerStateInfo.State is // StateReplicate. // @@ -910,7 +926,7 @@ func constructRaftEventForReplica( logSnapshot raft.LogSnapshot, scratchSendingEntries []entryFCState, ) (_ raftEventForReplica, scratch []entryFCState) { - firstNewEntryIndex, lastNewEntryIndex := uint64(math.MaxUint64), uint64(math.MaxUint64) + firstNewEntryIndex, lastNewEntryIndex := infinityEntryIndex, infinityEntryIndex if n := len(raftEventAppendState.newEntries); n > 0 { firstNewEntryIndex = raftEventAppendState.newEntries[0].id.index lastNewEntryIndex = raftEventAppendState.newEntries[n-1].id.index + 1 @@ -1129,18 +1145,18 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra // the new entries. rs.scratchVoterStreamState = rs.computeReplicaStreamStateRaftMuLocked(ctx, needsTokens) if (rs.scratchVoterStreamState.noSendQ && rs.scratchVoterStreamState.hasSendTokens) || - rs.scratchVoterStreamState.forceFlushing { + rs.scratchVoterStreamState.forceFlushStopIndex.active() { if rs.desc.IsVoterOldConfig() { votersContributingToQuorum[0]++ - if rs.scratchVoterStreamState.forceFlushing && - !rs.scratchVoterStreamState.forceFlushingBecauseLeaseholder { + if rs.scratchVoterStreamState.forceFlushStopIndex.untilInfinity() && + !rs.scratchVoterStreamState.forceFlushBecauseLeaseholder { numOptionalForceFlushes[0]++ } } if numSets > 1 && rs.desc.IsVoterNewConfig() { votersContributingToQuorum[1]++ - if rs.scratchVoterStreamState.forceFlushing && - !rs.scratchVoterStreamState.forceFlushingBecauseLeaseholder { + if rs.scratchVoterStreamState.forceFlushStopIndex.untilInfinity() && + !rs.scratchVoterStreamState.forceFlushBecauseLeaseholder { // We never actually use numOptionalForceFlushes[1]. Just doing this // for symmetry. numOptionalForceFlushes[1]++ @@ -1189,8 +1205,22 @@ func (rc *rangeController) HandleRaftEventRaftMuLocked(ctx context.Context, e Ra // there is no adjustment needed to ensure quorum. ss = rs.computeReplicaStreamStateRaftMuLocked(ctx, needsTokens) } + // Make a final adjustment to start force-flushing due to + // rc.forceFlushIndex. We deliberately leave this until the end, since + // quorum or leaseholder requirements may already have ensured that this + // replica must force-flush. + // + // NB: Next is exclusive and the first entry that has not yet been sent. + // And forceFlushIndex is inclusive. 
Therefore, [Next, forceFlushIndex] + // needs to have been sent for force-flush to not be needed, and we + // check for non-emptiness of the interval below. + if rs.scratchEvent.replicaStateInfo.Next <= rc.forceFlushIndex && + ss.isReplicate && !ss.noSendQ && + (!ss.forceFlushStopIndex.active() || uint64(ss.forceFlushStopIndex) < rc.forceFlushIndex) { + ss.forceFlushStopIndex = forceFlushStopIndex(rc.forceFlushIndex) + } rd = replicaDirective{ - forceFlush: ss.forceFlushing, + forceFlushStopIndex: ss.forceFlushStopIndex, hasSendTokens: ss.hasSendTokens, preventSendQNoForceFlush: ss.preventSendQNoForceFlush, } @@ -1248,7 +1278,13 @@ func (rc *rangeController) computeVoterDirectives( // NB: this also includes probeRecentlyNoSendQ. continue } - if rs.scratchVoterStreamState.forceFlushingBecauseLeaseholder { + if rs.scratchVoterStreamState.forceFlushBecauseLeaseholder { + // No choice in whether to force-flush, so not added to any slices. + continue + } + if rs.scratchVoterStreamState.forceFlushStopIndex.active() && + !rs.scratchVoterStreamState.forceFlushStopIndex.untilInfinity() { + // No choice in whether to force-flush, so not added to any slices. continue } // INVARIANTS: @@ -1268,7 +1304,7 @@ func (rc *rangeController) computeVoterDirectives( bucketedTokensSend: bucketedSendTokens, tokensEval: rs.evalTokenCounter.tokens(admissionpb.ElasticWorkClass), } - if rs.scratchVoterStreamState.forceFlushing { + if rs.scratchVoterStreamState.forceFlushStopIndex.active() { forceFlushingScores = append(forceFlushingScores, score) } else if rs.scratchVoterStreamState.noSendQ { candidateDenySendQScores = append(candidateDenySendQScores, score) @@ -1312,7 +1348,7 @@ func (rc *rangeController) computeVoterDirectives( } // Since there is a single set, this must be a member. rs := rc.replicaMap[forceFlushingScores[i].replicaID] - rs.scratchVoterStreamState.forceFlushing = false + rs.scratchVoterStreamState.forceFlushStopIndex = 0 gap++ } } else if gap > 0 { @@ -1356,7 +1392,8 @@ func (rc *rangeController) computeVoterDirectives( if !isSetMember { continue } - rs.scratchVoterStreamState.forceFlushing = true + + rs.scratchVoterStreamState.forceFlushStopIndex = forceFlushStopIndex(infinityEntryIndex) rs.scratchVoterStreamState.preventSendQNoForceFlush = false gap-- if i == 0 && len(voterSets) > 1 && rs.desc.IsVoterNewConfig() { @@ -1496,6 +1533,13 @@ func (rc *rangeController) SetLeaseholderRaftMuLocked( rc.updateWaiterSetsRaftMuLocked() } +// ForceFlushIndexChangedLocked implements RangeController. +func (rc *rangeController) ForceFlushIndexChangedLocked(ctx context.Context, index uint64) { + rc.opts.ReplicaMutexAsserter.RaftMuAssertHeld() + rc.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld() + rc.forceFlushIndex = index +} + // CloseRaftMuLocked closes the range controller. // // Requires replica.raftMu to be held. @@ -1859,8 +1903,6 @@ func (rc *rangeController) checkConsistencyRaftMuLocked(ctx context.Context) { // replicaState holds state for each replica. All methods are called with // raftMu held, hence it does not have its own mutex. -// -// TODO(sumeer): add mutex held assertions. type replicaState struct { parent *rangeController // stream aggregates across the streams for the same (tenant, store). This @@ -1889,14 +1931,27 @@ type replicaStreamState struct { // The remaining fields serve as output from replicaState and subsequent // input into replicaState. - // forceFlushing is true iff in StateReplicate and there is a send-queue and - // is being force-flushed. 
When provided as subsequent input, it should be - // interpreted as a directive that *may* change the current behavior, i.e., - // it may be asking the stream to start a force-flush or stop a force-flush. + // forceFlushStopIndex.active() is true iff in StateReplicate and there is a + // send-queue (!noSendQ) and is being force-flushed. When provided as + // subsequent input, it should be interpreted as a directive that *may* + // change the current behavior, i.e., it may be asking the stream to start a + // force-flush or stop a force-flush. // - // INVARIANT: forceFlushing => !noSendQ && !hasSendTokens. - forceFlushing bool - forceFlushingBecauseLeaseholder bool + // INVARIANT: forceFlushStopIndex.active() => !noSendQ && !hasSendTokens. + // + // When forceFlushStopIndex.active() is true and forceFlushStopIndex < + // infinityEntryIndex, the force-flush is being done due to the + // externally provided force-flush index. + // + // INVARIANT: forceFlushBecauseLeaseholder => + // forceFlushStopIndex==infinityEntryIndex. + forceFlushStopIndex forceFlushStopIndex + // A true value is always a directive, that is computed in the first-pass, + // in computeReplicaStreamStateRaftMuLocked. + forceFlushBecauseLeaseholder bool + // indexToSend is the state of the replicaSendStream. It is only populated + // in StateReplicate. + indexToSend uint64 // True only if noSendQ. When interpreted as a directive in subsequent // input, it may have been changed from false to true to prevent formation // of a send-queue. @@ -1914,8 +1969,8 @@ type replicaStreamState struct { // whether it has send tokens or should be force flushing. Only relevant for // pull mode. type replicaDirective struct { - forceFlush bool - hasSendTokens bool + forceFlushStopIndex forceFlushStopIndex + hasSendTokens bool // preventSendQNoForceFlush is only used for observability and debugging. preventSendQNoForceFlush bool } @@ -1939,9 +1994,6 @@ func NewReplicaState( // replicaSendStream maintains state for a replica to which we (typically) are // actively replicating. -// -// TODO(sumeer): assert that raftMu is held on the various methods that say it -// must be held. type replicaSendStream struct { parent *replicaState @@ -2037,23 +2089,23 @@ type replicaSendStream struct { // an index >= nextRaftIndexInitial and >= indexToSend. preciseSizeSum kvflowcontrol.Tokens - // tokenWatcherHandle, deductedForSchedulerTokens, forceFlushScheduled + // tokenWatcherHandle, deductedForSchedulerTokens, forceFlushStopIndex // can only be non-zero when connectedState == replicate, and the // send-queue is non-empty. // // INVARIANTS: // - // forceFlushScheduled => tokenWatcherHandle is zero and + // forceFlushStopIndex.active() => tokenWatcherHandle is zero and // deductedForSchedulerTokens == 0. // // tokenWatcherHandle is non-zero => deductedForSchedulerTokens == 0 and - // !forceFlushScheduled. + // !forceFlushStopIndex.active(). // // It follows from the above that: // // deductedForSchedulerTokens != 0 => tokenWatcherHandle is zero and - // !forceFlushScheduled. - forceFlushScheduled bool + // !forceFlushStopIndex.active() + forceFlushStopIndex forceFlushStopIndex tokenWatcherHandle SendTokenWatcherHandle deductedForSchedulerTokens kvflowcontrol.Tokens @@ -2242,13 +2294,15 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( ) replicaStreamState { rs.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() if rs.sendStream == nil { + // This is the zero value of replicaStreamState. Listed for readability. 
return replicaStreamState{ - isReplicate: false, - noSendQ: false, - forceFlushing: false, - forceFlushingBecauseLeaseholder: false, - hasSendTokens: false, - preventSendQNoForceFlush: false, + isReplicate: false, + noSendQ: false, + forceFlushStopIndex: 0, + forceFlushBecauseLeaseholder: false, + indexToSend: 0, + hasSendTokens: false, + preventSendQNoForceFlush: false, } } rss := rs.sendStream @@ -2267,16 +2321,15 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( // for these two situations is more complicated, and we accept the // slight increase in latency when applying this behavior in the latter // situation. - noSendQ: true, - forceFlushing: false, - forceFlushingBecauseLeaseholder: false, - hasSendTokens: true, + noSendQ: true, + hasSendTokens: true, } } vss := replicaStreamState{ isReplicate: true, noSendQ: rss.isEmptySendQueueStreamLocked(), - forceFlushing: rss.mu.sendQueue.forceFlushScheduled, + forceFlushStopIndex: rss.mu.sendQueue.forceFlushStopIndex, + indexToSend: rss.mu.sendQueue.indexToSend, preventSendQNoForceFlush: false, } if rs.desc.ReplicaID == rs.parent.leaseholder { @@ -2286,8 +2339,8 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( } else { // The leaseholder may not be force-flushing yet, but this will start // force-flushing. - vss.forceFlushing = true - vss.forceFlushingBecauseLeaseholder = true + vss.forceFlushStopIndex = forceFlushStopIndex(infinityEntryIndex) + vss.forceFlushBecauseLeaseholder = true } return vss } @@ -2297,7 +2350,7 @@ func (rs *replicaState) computeReplicaStreamStateRaftMuLocked( return vss } // Non-leaseholder and non-leader replica. - if vss.noSendQ && !vss.forceFlushing { + if vss.noSendQ { vss.hasSendTokens = true // If tokens are available, that is > 0, we decide we can send all the new // entries. This allows for a burst, but it is too complicated to make a @@ -2444,7 +2497,7 @@ func (rs *replicaState) scheduledRaftMuLocked( rss := rs.sendStream rss.mu.Lock() defer rss.mu.Unlock() - if !rss.mu.sendQueue.forceFlushScheduled && rss.mu.sendQueue.deductedForSchedulerTokens == 0 { + if !rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens == 0 { // NB: it is possible mode != rss.mu.mode, and we will ignore the change // here. This is fine in that we will pick up the change in the next // RaftEvent. @@ -2464,7 +2517,7 @@ func (rs *replicaState) scheduledRaftMuLocked( // 4MB. Don't want to hog the scheduler thread for too long. const MaxBytesToSend kvflowcontrol.Tokens = 4 << 20 bytesToSend := MaxBytesToSend - if !rss.mu.sendQueue.forceFlushScheduled && + if !rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens < bytesToSend { bytesToSend = rss.mu.sendQueue.deductedForSchedulerTokens } @@ -2515,8 +2568,17 @@ func (rs *replicaState) scheduledRaftMuLocked( return false, true } // Still have a send-queue. + if rss.mu.sendQueue.forceFlushStopIndex.active() && + uint64(rss.mu.sendQueue.forceFlushStopIndex) < rss.mu.sendQueue.indexToSend { + // It is possible that we don't have a quorum with no send-queue and we + // needed to rely on this force-flush until the send-queue was empty. That + // knowledge will become known in the next + // rangeController.HandleRaftEventRaftMuLocked, which will happen at the + // next tick. We accept a latency hiccup in this case for now. 
+ rss.mu.sendQueue.forceFlushStopIndex = 0 + } watchForTokens := - !rss.mu.sendQueue.forceFlushScheduled && rss.mu.sendQueue.deductedForSchedulerTokens == 0 + !rss.mu.sendQueue.forceFlushStopIndex.active() && rss.mu.sendQueue.deductedForSchedulerTokens == 0 if watchForTokens { rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx) } @@ -2577,7 +2639,8 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() rss.mu.AssertHeld() wasEmptySendQ := rss.isEmptySendQueueStreamLocked() - rss.tryHandleModeChangeRaftMuAndStreamLocked(ctx, event.mode, wasEmptySendQ, directive.forceFlush) + rss.tryHandleModeChangeRaftMuAndStreamLocked( + ctx, event.mode, wasEmptySendQ, directive.forceFlushStopIndex.active()) if event.mode == MsgAppPull { // MsgAppPull mode (i.e., followers). Populate sendingEntries. n := len(event.sendingEntries) @@ -2585,18 +2648,20 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( panic(errors.AssertionFailedf("pull mode must not have sending entries (leader=%t)", rss.parent.desc.ReplicaID == rss.parent.parent.opts.LocalReplicaID)) } - if directive.forceFlush { - if !rss.mu.sendQueue.forceFlushScheduled { + if directive.forceFlushStopIndex.active() { + if !rss.mu.sendQueue.forceFlushStopIndex.active() { // Must have a send-queue, so sendingEntries should stay empty // (these will be queued). - rss.startForceFlushRaftMuAndStreamLocked(ctx) + rss.startForceFlushRaftMuAndStreamLocked(ctx, directive.forceFlushStopIndex) + } else if rss.mu.sendQueue.forceFlushStopIndex != directive.forceFlushStopIndex { + rss.mu.sendQueue.forceFlushStopIndex = directive.forceFlushStopIndex } } else { - // INVARIANT: !directive.forceFlush. - if rss.mu.sendQueue.forceFlushScheduled { + // INVARIANT: !directive.forceFlushStopIndex.active() + if rss.mu.sendQueue.forceFlushStopIndex.active() { // Must have a send-queue, so sendingEntries should stay empty (these // will be queued). - rss.mu.sendQueue.forceFlushScheduled = false + rss.mu.sendQueue.forceFlushStopIndex = 0 rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1) rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx) if directive.hasSendTokens { @@ -2722,7 +2787,7 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( // NB: this will not do IO since everything here is in the unstable log // (see raft.LogSnapshot.unstable). 
slice, err := event.logSnapshot.LogSlice( - event.sendingEntries[0].id.index-1, event.sendingEntries[n-1].id.index, math.MaxInt64) + event.sendingEntries[0].id.index-1, event.sendingEntries[n-1].id.index, infinityEntryIndex) if err != nil { return false, err } @@ -2736,7 +2801,8 @@ func (rss *replicaSendStream) handleReadyEntriesRaftMuAndStreamLocked( } hasEmptySendQ := rss.isEmptySendQueueStreamLocked() - if event.mode == MsgAppPull && wasEmptySendQ && !hasEmptySendQ && !rss.mu.sendQueue.forceFlushScheduled { + if event.mode == MsgAppPull && wasEmptySendQ && !hasEmptySendQ && + !rss.mu.sendQueue.forceFlushStopIndex.active() { rss.startAttemptingToEmptySendQueueViaWatcherStreamLocked(ctx) } // NB: we don't special case to an empty send-queue in push mode, where Raft @@ -2791,11 +2857,13 @@ func (rss *replicaSendStream) tryHandleModeChangeRaftMuAndStreamLocked( } } -func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked(ctx context.Context) { +func (rss *replicaSendStream) startForceFlushRaftMuAndStreamLocked( + ctx context.Context, forceFlushStopIndex forceFlushStopIndex, +) { rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() rss.mu.AssertHeld() rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Inc(1) - rss.mu.sendQueue.forceFlushScheduled = true + rss.mu.sendQueue.forceFlushStopIndex = forceFlushStopIndex rss.parent.parent.scheduleReplica(rss.parent.desc.ReplicaID) rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, false) } @@ -2848,7 +2916,7 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendRaftMuAndStreamLocked( rss.mu.sendQueue.entryTokensApproximator.addStats( approximatedNumEntries, approximatedNumActualTokens) } - if !rss.mu.sendQueue.forceFlushScheduled { + if !rss.mu.sendQueue.forceFlushStopIndex.active() { // Subtract from already deducted tokens. 
beforeDeductedTokens := rss.mu.sendQueue.deductedForSchedulerTokens rss.mu.sendQueue.deductedForSchedulerTokens -= tokensNeeded @@ -2872,7 +2940,7 @@ func (rss *replicaSendStream) dequeueFromQueueAndSendRaftMuAndStreamLocked( } if tokensNeeded > 0 { flag := AdjNormal - if rss.mu.sendQueue.forceFlushScheduled { + if rss.mu.sendQueue.forceFlushStopIndex.active() { flag = AdjForceFlush } rss.parent.sendTokenCounter.Deduct(ctx, admissionpb.ElasticWorkClass, tokensNeeded, flag) @@ -2915,7 +2983,7 @@ func (rss *replicaSendStream) changeToProbeRaftMuAndStreamLocked( if !rss.isEmptySendQueueStreamLocked() { panic(errors.AssertionFailedf("transitioning to probeRecentlyNoSendQ when have a send-queue")) } - if rss.mu.sendQueue.forceFlushScheduled { + if rss.mu.sendQueue.forceFlushStopIndex.active() { panic(errors.AssertionFailedf("no send-queue but force-flushing")) } if rss.mu.sendQueue.deductedForSchedulerTokens != 0 || @@ -2929,8 +2997,8 @@ func (rss *replicaSendStream) stopAttemptingToEmptySendQueueRaftMuAndStreamLocke ) { rss.parent.parent.opts.ReplicaMutexAsserter.RaftMuAssertHeld() rss.mu.AssertHeld() - if rss.mu.sendQueue.forceFlushScheduled { - rss.mu.sendQueue.forceFlushScheduled = false + if rss.mu.sendQueue.forceFlushStopIndex.active() { + rss.mu.sendQueue.forceFlushStopIndex = 0 rss.parent.parent.opts.RangeControllerMetrics.SendQueue.ForceFlushedScheduledCount.Dec(1) } rss.stopAttemptingToEmptySendQueueViaWatcherRaftMuAndStreamLocked(ctx, disconnect) @@ -2980,7 +3048,7 @@ func (rss *replicaSendStream) startAttemptingToEmptySendQueueViaWatcherStreamLoc ctx context.Context, ) { rss.mu.AssertHeld() - if rss.mu.sendQueue.forceFlushScheduled { + if rss.mu.sendQueue.forceFlushStopIndex.active() { panic(errors.AssertionFailedf("already trying to empty send-queue using force-flush")) } if rss.mu.sendQueue.deductedForSchedulerTokens != 0 || @@ -3020,8 +3088,8 @@ func (rss *replicaSendStream) Notify(ctx context.Context) { queueSize = 4096 } flag := AdjNormal - if rss.mu.sendQueue.forceFlushScheduled { - flag = AdjForceFlush + if rss.mu.sendQueue.forceFlushStopIndex.active() { + panic(errors.AssertionFailedf("cannot be force-flushing")) } tokens := rss.parent.sendTokenCounter.TryDeduct(ctx, admissionpb.ElasticWorkClass, queueSize, flag) if tokens == 0 { @@ -3253,3 +3321,20 @@ func (a *entryTokensApproximator) meanTokensPerEntry() kvflowcontrol.Tokens { } return mean } + +// forceFlushStopIndex is the inclusive index to send before force-flush can +// stop. When set to infinityEntryIndex, force-flush must continue until the +// send-queue is empty. The zero value implies no force-flush, even though +// this index is inclusive, since index 0 is never used in CockroachDB's use +// of Raft (see stateloader.RaftInitialLogIndex). +type forceFlushStopIndex uint64 + +// active returns whether the stream is force-flushing. +func (i forceFlushStopIndex) active() bool { + return i != 0 +} + +// untilInfinity implies active. 
+func (i forceFlushStopIndex) untilInfinity() bool { + return uint64(i) == infinityEntryIndex +} diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go index 0ad8eb8ff378..e483c53cc29a 100644 --- a/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go @@ -9,7 +9,6 @@ import ( "cmp" "context" "fmt" - "math" "slices" "sort" "strconv" @@ -240,8 +239,12 @@ func (s *testingRCState) sendStreamString(rangeID roachpb.RangeID) string { testRepl.info.Match+1, rss.mu.sendQueue.indexToSend, rss.mu.sendQueue.indexToSend, rss.mu.sendQueue.nextRaftIndex, rss.mu.sendQueue.preciseSizeSum) - if rss.mu.sendQueue.forceFlushScheduled { - fmt.Fprintf(&b, " force-flushing") + if rss.mu.sendQueue.forceFlushStopIndex.active() { + var stopStr string + if !rss.mu.sendQueue.forceFlushStopIndex.untilInfinity() { + stopStr = fmt.Sprintf(" (stop=%d)", rss.mu.sendQueue.forceFlushStopIndex) + } + fmt.Fprintf(&b, " force-flushing%s", stopStr) } if rss.mu.sendQueue.deductedForSchedulerTokens > 0 { fmt.Fprintf(&b, " deducted=%v", rss.mu.sendQueue.deductedForSchedulerTokens) @@ -355,10 +358,11 @@ func (s *testingRCState) getOrInitRange( } init := RangeControllerInitState{ - Term: 1, - ReplicaSet: r.replicas(), - Leaseholder: r.localReplicaID, - NextRaftIndex: r.nextRaftIndex, + Term: 1, + ReplicaSet: r.replicas(), + Leaseholder: r.localReplicaID, + NextRaftIndex: r.nextRaftIndex, + ForceFlushIndex: r.forceFlushIndex, } options.ReplicaMutexAsserter.RaftMu.Lock() testRC.rc = NewRangeController(s.testCtx, options, init) @@ -545,11 +549,12 @@ func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av } type testingRange struct { - rangeID roachpb.RangeID - tenantID roachpb.TenantID - localReplicaID roachpb.ReplicaID - nextRaftIndex uint64 - replicaSet map[roachpb.ReplicaID]testingReplica + rangeID roachpb.RangeID + tenantID roachpb.TenantID + localReplicaID roachpb.ReplicaID + nextRaftIndex uint64 + forceFlushIndex uint64 + replicaSet map[roachpb.ReplicaID]testingReplica } // Used by simulation test. @@ -1196,6 +1201,35 @@ func TestRangeController(t *testing.T) { }() return state.rangeStateString() + case "set_force_flush_index": + var rangeID int + d.ScanArgs(t, "range_id", &rangeID) + var index int + d.ScanArgs(t, "index", &index) + mode := MsgAppPull + if d.HasArg("push-mode") { + mode = MsgAppPush + } + testRC := state.ranges[roachpb.RangeID(rangeID)] + func() { + testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock() + defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock() + testRC.rc.opts.ReplicaMutexAsserter.ReplicaMu.Lock() + defer testRC.rc.opts.ReplicaMutexAsserter.ReplicaMu.Unlock() + testRC.rc.ForceFlushIndexChangedLocked(ctx, uint64(index)) + }() + // Send an empty raft event in order to trigger any potential changes. + event := testRC.makeRaftEventWithReplicasState() + event.MsgAppMode = mode + func() { + testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock() + defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock() + require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, event)) + }() + // Sleep for a bit to allow any timers to fire. 
+ time.Sleep(20 * time.Millisecond) + return state.sendStreamString(roachpb.RangeID(rangeID)) + case "close_rcs": for _, r := range state.ranges { func() { @@ -1313,7 +1347,7 @@ func TestRangeController(t *testing.T) { } else { fromIndex := event.sendingEntryRange[replicaID].fromIndex toIndex := event.sendingEntryRange[replicaID].toIndex - entries, err := testRC.raftLog.Entries(fromIndex, toIndex+1, math.MaxUint64) + entries, err := testRC.raftLog.Entries(fromIndex, toIndex+1, infinityEntryIndex) require.NoError(t, err) msgApp.Entries = entries } diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index new file mode 100644 index 000000000000..2eeb748a43f9 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index @@ -0,0 +1,478 @@ +# Initialize a range with three replicas, none of which have send tokens. +init regular_init=0 elastic_init=0 +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1 + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=1 + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] +t1/s1: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Append five entries. Replica 1 has no send tokens, but is not allowed to +# form a send-queue since it is the leader. Replica 3 also has no send tokens, +# but is not allowed to form a send-queue to maintain quorum. +raft_event pull-mode +range_id=1 + entries + term=1 index=1 pri=NormalPri size=6MiB + term=1 index=2 pri=NormalPri size=6MiB + term=1 index=3 pri=NormalPri size=6MiB + term=1 index=4 pri=NormalPri size=6MiB + term=1 index=5 pri=NormalPri size=6MiB +---- +t1/s1: eval reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,6) precise_q_size=+30 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+30 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 3, lowPri: false entries: [1 2 3 4 5] +++++ + +# Force flush up to index 2. Replica 2 starts force-flushing. 
+set_force_flush_index range_id=1 index=2 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,6) precise_q_size=+30 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+30 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 1 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 1 is popped from replica 2. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,6) precise_q_size=+24 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+24 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [1] +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 2 is popped from replica 2, and force-flush +# stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,6) precise_q_size=+18 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [2] +++++ +schedule-controller-event-count: 2 + +# Force flush up to index 3. Replica 2 starts force-flushing. 
+set_force_flush_index range_id=1 index=3 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,6) precise_q_size=+18 MiB force-flushing (stop=3) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 3 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 3 is popped from replica 2, and force-flush +# stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [3] +++++ +schedule-controller-event-count: 3 + +# Force flush up to index 3, which is a noop. +set_force_flush_index range_id=1 index=3 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 3 + +# Force flush up to index 4. 
Replica 2 starts force-flushing. +set_force_flush_index range_id=1 index=4 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[3,4) send_queue=[4,6) precise_q_size=+12 MiB force-flushing (stop=4) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +schedule-controller-event-count: 4 +scheduled-replicas: 2 + +# Transition replica 3 to StateSnapshot. Replica 2 needs to force-flush, but +# since it is already force-flushing, nothing changes. +set_replicas pull-mode +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=4 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=6 + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=4 + store_id=3 replica_id=3 type=VOTER_FULL state=StateSnapshot next=6 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,4) send_queue=[4,6) precise_q_size=+12 MiB force-flushing (stop=4) +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +schedule-controller-event-count: 4 +scheduled-replicas: 2 + +# Scheduler event. Entry at index 4 is popped from replica 2, and force-flush +# stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[4,5) send_queue=[5,6) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [4] +++++ +schedule-controller-event-count: 4 + +# Schedule a raft event. It will get replica 2 to start force-flushing again. 
+raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=-30 MiB/+16 MiB ela=-30 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-30 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-24 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[4,5) send_queue=[5,6) precise_q_size=+6.0 MiB force-flushing +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +schedule-controller-event-count: 5 +scheduled-replicas: 2 + +# Entry 5 is popped and force-flushing stops. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+30 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[5,6) send_queue=[6,6) precise_q_size=+0 B +eval deducted: reg=+0 B ela=+30 MiB +eval original in send-q: reg=+0 B ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [5] +++++ +schedule-controller-event-count: 5 + +# Add another entry. Neither replica is allowed to form a send-queue. 
+raft_event pull-mode +range_id=1 + entries + term=1 index=6 pri=NormalPri size=6MiB +---- +t1/s1: eval reg=-36 MiB/+16 MiB ela=-36 MiB/+8.0 MiB + send reg=-36 MiB/+16 MiB ela=-36 MiB/+8.0 MiB +t1/s2: eval reg=-6.0 MiB/+16 MiB ela=-36 MiB/+8.0 MiB + send reg=-6.0 MiB/+16 MiB ela=-36 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[6,7) send_queue=[7,7) precise_q_size=+0 B +eval deducted: reg=+36 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 + term=1 index=6 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[6,7) send_queue=[7,7) precise_q_size=+0 B +eval deducted: reg=+6.0 MiB ela=+30 MiB +eval original in send-q: reg=+0 B ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 + term=1 index=4 tokens=6291456 + term=1 index=5 tokens=6291456 +NormalPri: + term=1 index=6 tokens=6291456 +++++ +(n3,s3):3: closed +++++ +MsgApps sent in pull mode: + to: 2, lowPri: false entries: [6] +++++ +schedule-controller-event-count: 5 diff --git a/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index_push_pull b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index_push_pull new file mode 100644 index 000000000000..785633082977 --- /dev/null +++ b/pkg/kv/kvserver/kvflowcontrol/rac2/testdata/range_controller/force_flush_index_push_pull @@ -0,0 +1,349 @@ +# Initialize a range with three replicas, none of which have send tokens. +init regular_init=0 elastic_init=0 +range_id=1 tenant_id=1 local_replica_id=1 next_raft_index=1 + store_id=1 replica_id=1 type=VOTER_FULL state=StateReplicate next=1 + store_id=2 replica_id=2 type=VOTER_FULL state=StateReplicate next=1 + store_id=3 replica_id=3 type=VOTER_FULL state=StateReplicate next=1 +---- +r1: [(n1,s1):1*,(n2,s2):2,(n3,s3):3] +t1/s1: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Replica 1 is the leader so sending [1,1) is ignored and everything is +# considered sent (the leader does not see MsgApps for itself). +raft_event +range_id=1 + entries + term=1 index=1 pri=NormalPri size=6MiB + term=1 index=2 pri=NormalPri size=6MiB + term=1 index=3 pri=NormalPri size=6MiB + sending + replica_id=1 [1,1) + replica_id=2 [1,1) + replica_id=3 [1,1) +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Replicas 2 and 3 have a send-queue which is possible in push mode. 
+stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ + +# Force flush up to index 1. Still in push-mode so it has no effect. +set_force_flush_index range_id=1 index=1 push-mode +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+18 MiB ela=+0 B +++++ + +# Switch to pull mode. +raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=+0 B/+8.0 MiB + +# Replica 3 starts force-flushing with no limit, since it was necessary for +# quorum. Replica 2 is force-flushing up to index 1. +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing (stop=1) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +schedule-controller-event-count: 1 +scheduled-replicas: 2 3 + +# Update the force-flush index to 2. This propagates to replica 2. Also, since +# replica 2 is seen to be force-flushing, the second-pass decision (see code), +# decides replica 3 does not need to force-flush for quorum. So both have a +# stop index of 2. 
+set_force_flush_index range_id=1 index=2 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,1) send_queue=[1,4) precise_q_size=+18 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+18 MiB ela=+0 B +++++ +schedule-controller-event-count: 1 +scheduled-replicas: 2 3 + +# Scheduler event. Entry at index 1 is popped from replicas 2, 3. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [1] + to: 3, lowPri: true entries: [1] +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 3 + +# Switch back to push mode. Force-flushing stops. +raft_event +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=-12 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB +t1/s3: eval reg=-12 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB +eval deducted: reg=+12 MiB ela=+6.0 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB +eval deducted: reg=+12 MiB ela=+6.0 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 3 + +# Switch back to pull mode. Force-flushing resumes. Replica 3 needs to +# force-flush for quorum, so the stopping point is infinity. 
+raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-6.0 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing (stop=2) +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[1,2) send_queue=[2,4) precise_q_size=+12 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+12 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 +++++ +schedule-controller-event-count: 2 +scheduled-replicas: 2 3 + +# Scheduler event. Entry at index 2 is popped from replicas 2, 3. Replica 2 +# stops force-flushing. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 2, lowPri: true entries: [2] + to: 3, lowPri: true entries: [2] +++++ +schedule-controller-event-count: 3 +scheduled-replicas: 3 + +# Switch back to push mode and then back to pull mode. Replica 2 should not be +# force-flushing and replica 3 should be force-flushing. 
+raft_event +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=-6.0 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB +t1/s3: eval reg=-6.0 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB + +raft_event pull-mode +range_id=1 +---- +t1/s1: eval reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=-18 MiB/+16 MiB ela=-18 MiB/+8.0 MiB +t1/s2: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB +t1/s3: eval reg=+0 B/+16 MiB ela=-18 MiB/+8.0 MiB + send reg=+0 B/+16 MiB ela=-12 MiB/+8.0 MiB + +stream_state range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB force-flushing +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +schedule-controller-event-count: 3 +scheduled-replicas: 3 + +# Replica 3 stops force-flushing. +handle_scheduler_event range_id=1 +---- +(n1,s1):1: state=replicate closed=false inflight=[1,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+18 MiB ela=+0 B +eval original in send-q: reg=+0 B ela=+0 B +NormalPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +(n2,s2):2: state=replicate closed=false inflight=[2,3) send_queue=[3,4) precise_q_size=+6.0 MiB watching-for-tokens +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+6.0 MiB ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 +++++ +(n3,s3):3: state=replicate closed=false inflight=[3,4) send_queue=[4,4) precise_q_size=+0 B +eval deducted: reg=+0 B ela=+18 MiB +eval original in send-q: reg=+0 B ela=+0 B +LowPri: + term=1 index=1 tokens=6291456 + term=1 index=2 tokens=6291456 + term=1 index=3 tokens=6291456 +++++ +MsgApps sent in pull mode: + to: 3, lowPri: true entries: [3] +++++ +schedule-controller-event-count: 3 diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go index 10449c5ff2b7..3bf9769efcca 100644 --- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go +++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor.go @@ -113,10 +113,11 @@ type ACWorkQueue interface { } type rangeControllerInitState struct { - term uint64 - replicaSet rac2.ReplicaSet - leaseholder roachpb.ReplicaID - nextRaftIndex uint64 + term uint64 + replicaSet rac2.ReplicaSet + leaseholder roachpb.ReplicaID + nextRaftIndex uint64 + forceFlushIndex uint64 // These fields are required options for the RangeController specific to the // replica and range, rather than the store or node, so we pass them as part // of the range controller init state. 
@@ -202,9 +203,10 @@ type SideChannelInfoUsingRaftMessageRequest struct {
 // We *strongly* prefer methods to be called without holding Replica.mu, since
 // then the callee (implementation of Processor) does not need to worry about
 // (a) deadlocks, since it sometimes needs to lock Replica.mu itself, (b) the
-// amount of work it is doing under this critical section. There are three
+// amount of work it is doing under this critical section. There are four
 // exceptions to this, due to difficulty in changing the calling code:
-// InitRaftLocked, OnDescChangedLocked, HoldsSendTokensLocked.
+// InitRaftLocked, OnDescChangedLocked, ForceFlushIndexChangedLocked,
+// HoldsSendTokensLocked.
 type Processor interface {
 	// InitRaftLocked is called when raft.RawNode is initialized for the
 	// Replica. NB: can be called twice before the Replica is fully initialized.
@@ -246,6 +248,14 @@ type Processor interface {
 	OnDescChangedLocked(
 		ctx context.Context, desc *roachpb.RangeDescriptor, tenantID roachpb.TenantID)
 
+	// ForceFlushIndexChangedLocked sets the force flush index, i.e., the index
+	// (inclusive) up to which all replicas with a send-queue must be
+	// force-flushed in MsgAppPull mode. It may be rarely called with no change
+	// to the index.
+	//
+	// Both Replica.raftMu and Replica.mu are held.
+	ForceFlushIndexChangedLocked(ctx context.Context, index uint64)
+
 	// HandleRaftReadyRaftMuLocked corresponds to processing that happens when
 	// Replica.handleRaftReadyRaftMuLocked is called. It must be called even
 	// if there was no Ready, since it can be used to advance Admitted, and do
@@ -403,6 +413,8 @@ type processorImpl struct {
 	// leaseholderID is the currently known leaseholder replica.
 	leaseholderID roachpb.ReplicaID
 
+	forceFlushIndex uint64
+
 	// State at a follower.
 	follower struct {
 		// isLeaderUsingV2Protocol is true when the leaderID indicated that it's
@@ -596,6 +608,23 @@ func (p *processorImpl) OnDescChangedLocked(
 	}
 }
 
+// ForceFlushIndexChangedLocked implements Processor.
+func (p *processorImpl) ForceFlushIndexChangedLocked(ctx context.Context, index uint64) {
+	p.opts.ReplicaMutexAsserter.RaftMuAssertHeld()
+	p.opts.ReplicaMutexAsserter.ReplicaMuAssertHeld()
+	if buildutil.CrdbTestBuild && p.forceFlushIndex > index {
+		panic(errors.AssertionFailedf("force-flush index decreased from %d to %d",
+			p.forceFlushIndex, index))
+	}
+	p.forceFlushIndex = index
+	if p.leader.rc != nil {
+		p.leader.rc.ForceFlushIndexChangedLocked(ctx, index)
+		// Schedule ready processing so that the RangeController can act on the
+		// change.
+		p.opts.RaftScheduler.EnqueueRaftReady(p.opts.RangeID)
+	}
+}
+
 // makeStateConsistentRaftMuLocked uses the union of the latest state
 // retrieved from RaftNode and the p.desc.replicas set to initialize or update
 // the internal state of processorImpl.
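The contract above (raftMu and Replica.mu both held, and an index that never regresses) is easy to misuse from the calling side. The following is a small, self-contained sketch of a compliant caller; every type and field in it is a hypothetical stand-in, not code from this patch or from the Replica implementation.

package main

import (
	"context"
	"fmt"
	"sync"
)

// miniProcessor models only the behavior this patch adds: it remembers the
// force-flush index and insists that it never decreases.
type miniProcessor struct {
	forceFlushIndex uint64
}

func (p *miniProcessor) ForceFlushIndexChangedLocked(_ context.Context, index uint64) {
	if index < p.forceFlushIndex {
		panic(fmt.Sprintf("force-flush index decreased from %d to %d", p.forceFlushIndex, index))
	}
	p.forceFlushIndex = index
}

// replicaLike is a hypothetical caller that acquires the mutexes in the
// documented order (raftMu first, then mu) before invoking the method.
type replicaLike struct {
	raftMu sync.Mutex
	mu     sync.Mutex
	proc   miniProcessor
}

func (r *replicaLike) updateForceFlushIndex(ctx context.Context, index uint64) {
	r.raftMu.Lock()
	defer r.raftMu.Unlock()
	r.mu.Lock()
	defer r.mu.Unlock()
	r.proc.ForceFlushIndexChangedLocked(ctx, index)
}

func main() {
	r := &replicaLike{}
	r.updateForceFlushIndex(context.Background(), 7)
	r.updateForceFlushIndex(context.Background(), 7) // a call with no change is allowed
	fmt.Println(r.proc.forceFlushIndex)              // 7
}

The real Processor additionally enqueues Raft ready processing so that a live RangeController reacts to the change; the sketch omits that step.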
@@ -720,16 +749,17 @@ func (p *processorImpl) createLeaderStateRaftMuLocked(
 	}
 	p.term = term
 	rc := p.opts.RangeControllerFactory.New(ctx, rangeControllerInitState{
-		term:           term,
-		replicaSet:     p.desc.replicas,
-		leaseholder:    p.leaseholderID,
-		nextRaftIndex:  nextUnstableIndex,
-		rangeID:        p.opts.RangeID,
-		tenantID:       p.desc.tenantID,
-		localReplicaID: p.opts.ReplicaID,
-		raftInterface:  p.raftInterface,
-		msgAppSender:   p.opts.MsgAppSender,
-		muAsserter:     p.opts.ReplicaMutexAsserter,
+		term:            term,
+		replicaSet:      p.desc.replicas,
+		leaseholder:     p.leaseholderID,
+		nextRaftIndex:   nextUnstableIndex,
+		forceFlushIndex: p.forceFlushIndex,
+		rangeID:         p.opts.RangeID,
+		tenantID:        p.desc.tenantID,
+		localReplicaID:  p.opts.ReplicaID,
+		raftInterface:   p.raftInterface,
+		msgAppSender:    p.opts.MsgAppSender,
+		muAsserter:      p.opts.ReplicaMutexAsserter,
 	})
 
 	func() {
@@ -1232,10 +1262,11 @@ func (f RangeControllerFactoryImpl) New(
 			Knobs:           f.knobs,
 		},
 		rac2.RangeControllerInitState{
-			Term:          state.term,
-			ReplicaSet:    state.replicaSet,
-			Leaseholder:   state.leaseholder,
-			NextRaftIndex: state.nextRaftIndex,
+			Term:            state.term,
+			ReplicaSet:      state.replicaSet,
+			Leaseholder:     state.leaseholder,
+			NextRaftIndex:   state.nextRaftIndex,
+			ForceFlushIndex: state.forceFlushIndex,
 		},
 	)
 }
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
index 40c6d395a6a7..db6127f5f745 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/processor_test.go
@@ -138,8 +138,9 @@ type testRangeControllerFactory struct {
 func (f *testRangeControllerFactory) New(
 	ctx context.Context, state rangeControllerInitState,
 ) rac2.RangeController {
-	fmt.Fprintf(f.b, " RangeControllerFactory.New(replicaSet=%s, leaseholder=%s, nextRaftIndex=%d)\n",
-		state.replicaSet, state.leaseholder, state.nextRaftIndex)
+	fmt.Fprintf(f.b,
+		" RangeControllerFactory.New(replicaSet=%s, leaseholder=%s, nextRaftIndex=%d, forceFlushIndex=%d)\n",
+		state.replicaSet, state.leaseholder, state.nextRaftIndex, state.forceFlushIndex)
 	rc := &testRangeController{b: f.b, waited: true}
 	f.rcs = append(f.rcs, rc)
 	return rc
@@ -227,6 +228,10 @@ func (c *testRangeController) SetLeaseholderRaftMuLocked(
 	fmt.Fprintf(c.b, " RangeController.SetLeaseholderRaftMuLocked(%s)\n", replica)
 }
 
+func (c *testRangeController) ForceFlushIndexChangedLocked(ctx context.Context, index uint64) {
+	fmt.Fprintf(c.b, " RangeController.ForceFlushIndexChangedLocked(%d)\n", index)
+}
+
 func (c *testRangeController) CloseRaftMuLocked(ctx context.Context) {
 	fmt.Fprintf(c.b, " RangeController.CloseRaftMuLocked\n")
 }
diff --git a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
index 5f28570d93dc..69a39915014c 100644
--- a/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
+++ b/pkg/kv/kvserver/kvflowcontrol/replica_rac2/testdata/processor
@@ -296,7 +296,7 @@ Raft: term: 52 leader: 5 leaseholder: 10 mark: {Term:51 Index:28} next-unstable:
 handle-raft-ready-and-admit entries=v1/i28/t46/pri0/time2/len100 leader-term=52
 ----
 HandleRaftReady:
- RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=28)
+ RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=28, forceFlushIndex=0)
  RangeController.AdmitRaftMuLocked(5, term:52, admitted:[LowPri:26,NormalPri:26,AboveNormalPri:26,HighPri:26])
  RangeController.HandleRaftEventRaftMuLocked([28])
 .....
@@ -392,7 +392,7 @@ handle-raft-ready-and-admit
 ----
 HandleRaftReady:
  RangeController.CloseRaftMuLocked
- RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=29)
+ RangeControllerFactory.New(replicaSet=[(n1,s2):5,(n11,s11):11], leaseholder=10, nextRaftIndex=29, forceFlushIndex=0)
  RangeController.HandleRaftEventRaftMuLocked([])
 .....
 
@@ -500,7 +500,7 @@ LogTracker: mark:{Term:50 Index:25}, stable:24, admitted:[24 24 24 24]
 # RangeController is created.
 set-enabled-level enabled-level=v1-encoding
 ----
- RangeControllerFactory.New(replicaSet=[(n11,s11):11,(n13,s13):13], leaseholder=5, nextRaftIndex=26)
+ RangeControllerFactory.New(replicaSet=[(n11,s11):11,(n13,s13):13], leaseholder=5, nextRaftIndex=26, forceFlushIndex=0)
 
 set-raft-state log-term=50 log-index=26 next-unstable-index=27
 ----
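The forceFlushIndex=0 printed in the expectations above is simply the zero value of the new processorImpl field: the processor records the most recently set force-flush index and hands it to any RangeController it creates later, and a processor that never saw an update passes 0, which (since raft entry indices start at 1) forces nothing. A tiny sketch of that pass-through follows; all names are hypothetical, not identifiers from this patch.

package main

import "fmt"

// miniProcessorState models only the pass-through added by this patch: the
// latest force-flush index is remembered and handed to a controller created
// afterwards.
type miniProcessorState struct {
	forceFlushIndex uint64
}

// miniControllerInit stands in for the init state given to a new controller.
type miniControllerInit struct {
	forceFlushIndex uint64
}

func (p *miniProcessorState) setForceFlushIndex(index uint64) {
	p.forceFlushIndex = index
}

func (p *miniProcessorState) newControllerInit() miniControllerInit {
	return miniControllerInit{forceFlushIndex: p.forceFlushIndex}
}

func main() {
	fresh := &miniProcessorState{}
	fmt.Println(fresh.newControllerInit().forceFlushIndex) // 0, as in the expectations above

	p := &miniProcessorState{}
	p.setForceFlushIndex(25)
	fmt.Println(p.newControllerInit().forceFlushIndex) // 25
}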