Skip to content

Commit

Permalink
rac2,replica_rac2: add ForceFlushIndexChangedLocked to Processor, Ran…
Browse files Browse the repository at this point in the history
…geController

This is used to set the highest index up to which all send-queues in
pull mode must be force-flushed.

Informs #135601

Epic: CRDB-37515

Release note: None
  • Loading branch information
sumeerbhola committed Nov 27, 2024
1 parent a86b127 commit 2b1cfc9
Show file tree
Hide file tree
Showing 7 changed files with 1,091 additions and 114 deletions.
234 changes: 158 additions & 76 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller.go

Large diffs are not rendered by default.

60 changes: 47 additions & 13 deletions pkg/kv/kvserver/kvflowcontrol/rac2/range_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"cmp"
"context"
"fmt"
"math"
"slices"
"sort"
"strconv"
Expand Down Expand Up @@ -240,8 +239,12 @@ func (s *testingRCState) sendStreamString(rangeID roachpb.RangeID) string {
testRepl.info.Match+1, rss.mu.sendQueue.indexToSend,
rss.mu.sendQueue.indexToSend,
rss.mu.sendQueue.nextRaftIndex, rss.mu.sendQueue.preciseSizeSum)
if rss.mu.sendQueue.forceFlushScheduled {
fmt.Fprintf(&b, " force-flushing")
if rss.mu.sendQueue.forceFlushStopIndex.active() {
var stopStr string
if !rss.mu.sendQueue.forceFlushStopIndex.untilInfinity() {
stopStr = fmt.Sprintf(" (stop=%d)", rss.mu.sendQueue.forceFlushStopIndex)
}
fmt.Fprintf(&b, " force-flushing%s", stopStr)
}
if rss.mu.sendQueue.deductedForSchedulerTokens > 0 {
fmt.Fprintf(&b, " deducted=%v", rss.mu.sendQueue.deductedForSchedulerTokens)
Expand Down Expand Up @@ -355,10 +358,11 @@ func (s *testingRCState) getOrInitRange(
}

init := RangeControllerInitState{
Term: 1,
ReplicaSet: r.replicas(),
Leaseholder: r.localReplicaID,
NextRaftIndex: r.nextRaftIndex,
Term: 1,
ReplicaSet: r.replicas(),
Leaseholder: r.localReplicaID,
NextRaftIndex: r.nextRaftIndex,
ForceFlushIndex: r.forceFlushIndex,
}
options.ReplicaMutexAsserter.RaftMu.Lock()
testRC.rc = NewRangeController(s.testCtx, options, init)
Expand Down Expand Up @@ -545,11 +549,12 @@ func (r *testingRCRange) admit(ctx context.Context, storeID roachpb.StoreID, av
}

type testingRange struct {
rangeID roachpb.RangeID
tenantID roachpb.TenantID
localReplicaID roachpb.ReplicaID
nextRaftIndex uint64
replicaSet map[roachpb.ReplicaID]testingReplica
rangeID roachpb.RangeID
tenantID roachpb.TenantID
localReplicaID roachpb.ReplicaID
nextRaftIndex uint64
forceFlushIndex uint64
replicaSet map[roachpb.ReplicaID]testingReplica
}

// Used by simulation test.
Expand Down Expand Up @@ -1196,6 +1201,35 @@ func TestRangeController(t *testing.T) {
}()
return state.rangeStateString()

case "set_force_flush_index":
var rangeID int
d.ScanArgs(t, "range_id", &rangeID)
var index int
d.ScanArgs(t, "index", &index)
mode := MsgAppPull
if d.HasArg("push-mode") {
mode = MsgAppPush
}
testRC := state.ranges[roachpb.RangeID(rangeID)]
func() {
testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
testRC.rc.opts.ReplicaMutexAsserter.ReplicaMu.Lock()
defer testRC.rc.opts.ReplicaMutexAsserter.ReplicaMu.Unlock()
testRC.rc.ForceFlushIndexChangedLocked(ctx, uint64(index))
}()
// Send an empty raft event in order to trigger any potential changes.
event := testRC.makeRaftEventWithReplicasState()
event.MsgAppMode = mode
func() {
testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Lock()
defer testRC.rc.opts.ReplicaMutexAsserter.RaftMu.Unlock()
require.NoError(t, testRC.rc.HandleRaftEventRaftMuLocked(ctx, event))
}()
// Sleep for a bit to allow any timers to fire.
time.Sleep(20 * time.Millisecond)
return state.sendStreamString(roachpb.RangeID(rangeID))

case "close_rcs":
for _, r := range state.ranges {
func() {
Expand Down Expand Up @@ -1313,7 +1347,7 @@ func TestRangeController(t *testing.T) {
} else {
fromIndex := event.sendingEntryRange[replicaID].fromIndex
toIndex := event.sendingEntryRange[replicaID].toIndex
entries, err := testRC.raftLog.Entries(fromIndex, toIndex+1, math.MaxUint64)
entries, err := testRC.raftLog.Entries(fromIndex, toIndex+1, infinityEntryIndex)
require.NoError(t, err)
msgApp.Entries = entries
}
Expand Down
Loading

0 comments on commit 2b1cfc9

Please sign in to comment.