diff --git a/pkg/cmd/roachtest/tests/jepsen.go b/pkg/cmd/roachtest/tests/jepsen.go
index 754e2b63f013..dbba02c2ffa3 100644
--- a/pkg/cmd/roachtest/tests/jepsen.go
+++ b/pkg/cmd/roachtest/tests/jepsen.go
@@ -48,7 +48,7 @@ process for those artifacts and that would add some traceability.
 
 If you want to make a change to jepsen (like upgrade the version to resolve
 issues with env incompatibility or bump jdbc driver versions), you can create
-a pull request for tc-nightly-master branch and after merging build a new
+a pull request for tc-nightly-main branch and after merging build a new
 artifact using:
 
 # install build dependencies and build tools
@@ -59,7 +59,7 @@ chmod +x lein
 # clone repository and checkout release branch
 git clone https://github.com/cockroachdb/jepsen
 cd jepsen/cockroachdb
-git checkout tc-nightly-master
+git checkout tc-nightly-main
 
 # build executable jar
 ~/lein uberjar
@@ -81,7 +81,7 @@ const jepsenRepo = "https://github.com/cockroachdb/jepsen"
 const repoBranch = "tc-nightly"
 const gcpPath = "https://storage.googleapis.com/cockroach-jepsen"
 
-const binaryVersion = "0.1.0-3d7c345d-standalone"
+const binaryVersion = "0.1.0-21cbebe-standalone"
 
 var jepsenNemeses = []struct {
 	name, config string
diff --git a/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go b/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go
index 0c40833f0798..6606c6247cde 100644
--- a/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go
+++ b/pkg/kv/kvserver/asim/storerebalancer/replica_rankings.go
@@ -29,5 +29,5 @@ func hottestRanges(
 		accumulator.AddReplica(candidateReplica)
 	}
 	replRankings.Update(accumulator)
-	return replRankings.TopLoad()
+	return replRankings.TopLoad(dim)
 }
diff --git a/pkg/kv/kvserver/replica_rankings.go b/pkg/kv/kvserver/replica_rankings.go
index f0b68e4016a6..147fd5062ac0 100644
--- a/pkg/kv/kvserver/replica_rankings.go
+++ b/pkg/kv/kvserver/replica_rankings.go
@@ -102,10 +102,15 @@ func NewReplicaRankings() *ReplicaRankings {
 // TODO(kvoli): When adding another load dimension to be balanced upon, it will
 // be necessary to clarify the semantics of this API. This is especially true
 // since the UI is coupled to this function.
-func NewReplicaAccumulator(dimension load.Dimension) *RRAccumulator {
-	res := &RRAccumulator{}
-	res.dim.val = func(r CandidateReplica) float64 {
-		return r.RangeUsageInfo().Load().Dim(dimension)
+func NewReplicaAccumulator(dims ...load.Dimension) *RRAccumulator {
+	res := &RRAccumulator{
+		dims: map[load.Dimension]*rrPriorityQueue{},
+	}
+	for _, dim := range dims {
+		res.dims[dim] = &rrPriorityQueue{}
+		res.dims[dim].val = func(r CandidateReplica) float64 {
+			return r.RangeUsageInfo().Load().Dim(dim)
+		}
 	}
 	return res
 }
@@ -118,13 +123,13 @@ func (rr *ReplicaRankings) Update(acc *RRAccumulator) {
 }
 
 // TopLoad returns the highest load CandidateReplicas that are tracked.
-func (rr *ReplicaRankings) TopLoad() []CandidateReplica {
+func (rr *ReplicaRankings) TopLoad(dimension load.Dimension) []CandidateReplica {
 	rr.mu.Lock()
 	defer rr.mu.Unlock()
 	// If we have a new set of data, consume it. Otherwise, just return the most
 	// recently consumed data.
-	if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dim.Len() > 0 {
-		rr.mu.byDim = consumeAccumulator(&rr.mu.dimAccumulator.dim)
+	if rr.mu.dimAccumulator != nil && rr.mu.dimAccumulator.dims[dimension].Len() > 0 {
+		rr.mu.byDim = consumeAccumulator(rr.mu.dimAccumulator.dims[dimension])
 	}
 	return rr.mu.byDim
 }
@@ -138,23 +143,30 @@ func (rr *ReplicaRankings) TopLoad() []CandidateReplica {
 // prevents concurrent loaders of data from messing with each other -- the last
 // `update`d accumulator will win.
 type RRAccumulator struct {
-	dim rrPriorityQueue
+	dims map[load.Dimension]*rrPriorityQueue
 }
 
 // AddReplica adds a replica to the replica accumulator.
 func (a *RRAccumulator) AddReplica(repl CandidateReplica) {
+	for dim := range a.dims {
+		a.addReplicaForDimension(repl, dim)
+	}
+}
+
+func (a *RRAccumulator) addReplicaForDimension(repl CandidateReplica, dim load.Dimension) {
+	rr := a.dims[dim]
 	// If the heap isn't full, just push the new replica and return.
-	if a.dim.Len() < numTopReplicasToTrack {
-		heap.Push(&a.dim, repl)
+	if rr.Len() < numTopReplicasToTrack {
+		heap.Push(rr, repl)
 		return
 	}
 	// Otherwise, conditionally push if the new replica is more deserving than
 	// the current tip of the heap.
-	if a.dim.val(repl) > a.dim.val(a.dim.entries[0]) {
-		heap.Pop(&a.dim)
-		heap.Push(&a.dim, repl)
+	if rr.val(repl) > rr.val(rr.entries[0]) {
+		heap.Pop(rr)
+		heap.Push(rr, repl)
 	}
 }
 
 func consumeAccumulator(pq *rrPriorityQueue) []CandidateReplica {
diff --git a/pkg/kv/kvserver/replica_rankings_test.go b/pkg/kv/kvserver/replica_rankings_test.go
index 0f8770d8459e..0e02c07861d7 100644
--- a/pkg/kv/kvserver/replica_rankings_test.go
+++ b/pkg/kv/kvserver/replica_rankings_test.go
@@ -37,52 +37,65 @@ func TestReplicaRankings(t *testing.T) {
 	defer leaktest.AfterTest(t)()
 	defer log.Scope(t).Close(t)
 
+	dimensions := []aload.Dimension{aload.Queries, aload.CPU}
 	rr := NewReplicaRankings()
 
 	testCases := []struct {
-		replicasByQPS []float64
+		replicasByLoad []float64
 	}{
-		{replicasByQPS: []float64{}},
-		{replicasByQPS: []float64{0}},
-		{replicasByQPS: []float64{1, 0}},
-		{replicasByQPS: []float64{3, 2, 1, 0}},
-		{replicasByQPS: []float64{3, 3, 2, 2, 1, 1, 0, 0}},
-		{replicasByQPS: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}},
+		{replicasByLoad: []float64{}},
+		{replicasByLoad: []float64{0}},
+		{replicasByLoad: []float64{1, 0}},
+		{replicasByLoad: []float64{3, 2, 1, 0}},
+		{replicasByLoad: []float64{3, 3, 2, 2, 1, 1, 0, 0}},
+		{replicasByLoad: []float64{1.1, 1.0, 0.9, -0.9, -1.0, -1.1}},
 	}
 
 	for _, tc := range testCases {
-		acc := NewReplicaAccumulator(aload.Queries)
-
-		// Randomize the order of the inputs each time the test is run.
-		want := make([]float64, len(tc.replicasByQPS))
-		copy(want, tc.replicasByQPS)
-		rand.Shuffle(len(tc.replicasByQPS), func(i, j int) {
-			tc.replicasByQPS[i], tc.replicasByQPS[j] = tc.replicasByQPS[j], tc.replicasByQPS[i]
-		})
-
-		for i, replQPS := range tc.replicasByQPS {
-			acc.AddReplica(candidateReplica{
-				Replica: &Replica{RangeID: roachpb.RangeID(i)},
-				usage:   allocator.RangeUsageInfo{QueriesPerSecond: replQPS},
+		for _, dimension := range dimensions {
+			acc := NewReplicaAccumulator(dimensions...)
+
+			// Randomize the order of the inputs each time the test is run. Also make
+			// a copy so that we can test on the copy for each dimension without
+			// mutating the underlying test case slice.
+			rLoad := make([]float64, len(tc.replicasByLoad))
+			want := make([]float64, len(tc.replicasByLoad))
+			copy(want, tc.replicasByLoad)
+			copy(rLoad, tc.replicasByLoad)
+
+			rand.Shuffle(len(rLoad), func(i, j int) {
+				rLoad[i], rLoad[j] = rLoad[j], rLoad[i]
 			})
-		}
-		rr.Update(acc)
 
-		// Make sure we can read off all expected replicas in the correct order.
-		repls := rr.TopLoad()
-		if len(repls) != len(want) {
-			t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, tc.replicasByQPS)
-			continue
-		}
-		for i := range want {
-			if repls[i].RangeUsageInfo().QueriesPerSecond != want[i] {
-				t.Errorf("got %f for %d'th element; want %f (input: %v)", repls[i].RangeUsageInfo().QueriesPerSecond, i, want, tc.replicasByQPS)
-				break
+			for i, replLoad := range rLoad {
+				acc.AddReplica(candidateReplica{
+					Replica: &Replica{RangeID: roachpb.RangeID(i)},
+					usage: allocator.RangeUsageInfo{
+						// We should get the same ordering for both QPS and CPU.
+						QueriesPerSecond:         replLoad,
+						RequestCPUNanosPerSecond: replLoad,
+					},
+				})
+			}
+			rr.Update(acc)
+
+			// Make sure we can read off all expected replicas in the correct order.
+			repls := rr.TopLoad(dimension)
+			if len(repls) != len(want) {
+				t.Errorf("wrong number of replicas in output; got: %v; want: %v", repls, rLoad)
+				continue
+			}
+			for i := range want {
+				if repls[i].RangeUsageInfo().Load().Dim(dimension) != want[i] {
+					t.Errorf("got %f for %d'th element; want %f (input: %v)",
+						repls[i].RangeUsageInfo().Load().Dim(dimension), i, want, rLoad)
+					break
+				}
+			}
+			replsCopy := rr.TopLoad(dimension)
+			if !reflect.DeepEqual(repls, replsCopy) {
+				t.Errorf("got different replicas on second call to TopLoad; first call: %v, second call: %v", repls, replsCopy)
 			}
-		}
-		replsCopy := rr.TopLoad()
-		if !reflect.DeepEqual(repls, replsCopy) {
-			t.Errorf("got different replicas on second call to topQPS; first call: %v, second call: %v", repls, replsCopy)
 		}
 	}
 }
diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go
index ba2c4ae54851..231a3d6cdec1 100644
--- a/pkg/kv/kvserver/store.go
+++ b/pkg/kv/kvserver/store.go
@@ -36,6 +36,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/load"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/closedts/sidetransport"
@@ -2729,7 +2730,10 @@ func (s *Store) Capacity(ctx context.Context, useCached bool) (roachpb.StoreCapa
 	replicaCount := s.metrics.ReplicaCount.Value()
 	bytesPerReplica := make([]float64, 0, replicaCount)
 	writesPerReplica := make([]float64, 0, replicaCount)
-	rankingsAccumulator := NewReplicaAccumulator(s.rebalanceObjManager.Objective().ToDimension())
+	// We wish to track both CPU and QPS, due to the different use cases between
+	// the UI and rebalancing. By default, rebalancing uses CPU while the UI
+	// uses QPS.
+	rankingsAccumulator := NewReplicaAccumulator(load.CPU, load.Queries)
 	rankingsByTenantAccumulator := NewTenantReplicaAccumulator()
 
 	// Query the current L0 sublevels and record the updated maximum to metrics.
@@ -3300,7 +3304,7 @@ type HotReplicaInfo struct {
 // Note that this uses cached information, so it's cheap but may be slightly
 // out of date.
 func (s *Store) HottestReplicas() []HotReplicaInfo {
-	topLoad := s.replRankings.TopLoad()
+	topLoad := s.replRankings.TopLoad(load.Queries)
 	return mapToHotReplicasInfo(topLoad)
 }
diff --git a/pkg/kv/kvserver/store_rebalancer.go b/pkg/kv/kvserver/store_rebalancer.go
index 621963c944cd..2f67ad2c0ddf 100644
--- a/pkg/kv/kvserver/store_rebalancer.go
+++ b/pkg/kv/kvserver/store_rebalancer.go
@@ -275,8 +275,8 @@ func (sr *StoreRebalancer) Start(ctx context.Context, stopper *stop.Stopper) {
 			if !sr.subscribedToSpanConfigs() {
 				continue
 			}
-			hottestRanges := sr.replicaRankings.TopLoad()
 			objective := sr.RebalanceObjective()
+			hottestRanges := sr.replicaRankings.TopLoad(objective.ToDimension())
 			options := sr.scorerOptions(ctx, objective.ToDimension())
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, mode)
 			sr.rebalanceStore(ctx, rctx)
diff --git a/pkg/kv/kvserver/store_rebalancer_test.go b/pkg/kv/kvserver/store_rebalancer_test.go
index d3e68d0c14d4..21b9225e0a34 100644
--- a/pkg/kv/kvserver/store_rebalancer_test.go
+++ b/pkg/kv/kvserver/store_rebalancer_test.go
@@ -500,8 +500,10 @@ type testRange struct {
 	qps, reqCPU float64
 }
 
-func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange, loadDimension load.Dimension) {
-	acc := NewReplicaAccumulator(loadDimension)
+func loadRanges(rr *ReplicaRankings, s *Store, ranges []testRange) {
+	// Track both CPU and QPS by default; the ordering the consumer uses will
+	// depend on the current rebalance objective.
+	acc := NewReplicaAccumulator(load.Queries, load.CPU)
 	for i, r := range ranges {
 		rangeID := roachpb.RangeID(i + 1)
 		repl := &Replica{store: s, RangeID: rangeID}
@@ -787,8 +789,8 @@ func TestChooseLeaseToTransfer(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run("", withQPSCPU(t, objectiveProvider, func(t *testing.T) {
 			lbRebalanceDimension := objectiveProvider.Objective().ToDimension()
-			loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}}, lbRebalanceDimension)
-			hottestRanges := sr.replicaRankings.TopLoad()
+			loadRanges(rr, s, []testRange{{voters: tc.storeIDs, qps: tc.qps, reqCPU: tc.reqCPU}})
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			options.LoadThreshold = allocatorimpl.WithAllDims(0.1)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
@@ -929,10 +931,10 @@ func TestChooseRangeToRebalanceRandom(t *testing.T) {
 			loadRanges(
 				rr, s, []testRange{
 					{voters: voterStores, nonVoters: nonVoterStores, qps: perReplicaQPS, reqCPU: perReplicaReqCPU},
-				}, lbRebalanceDimension,
+				},
 			)
 
-			hottestRanges := sr.replicaRankings.TopLoad()
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 			rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{ReplicaEnforcementLevel: allocatorimpl.IOOverloadThresholdIgnore}
@@ -1270,10 +1272,10 @@ func TestChooseRangeToRebalanceAcrossHeterogeneousZones(t *testing.T) {
 			loadRanges(
 				rr, s, []testRange{
 					{voters: tc.voters, nonVoters: tc.nonVoters, qps: testingQPS, reqCPU: testingReqCPU},
-				}, lbRebalanceDimension,
+				},
 			)
 
-			hottestRanges := sr.replicaRankings.TopLoad()
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, LBRebalancingLeasesAndReplicas)
 			rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1360,10 +1362,9 @@ func TestChooseRangeToRebalanceIgnoresRangeOnBestStores(t *testing.T) {
 				qps:    100,
 				reqCPU: 100 * float64(time.Millisecond)},
 			},
-			lbRebalanceDimension,
 		)
 
-		hottestRanges := sr.replicaRankings.TopLoad()
+		hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 		options := sr.scorerOptions(ctx, lbRebalanceDimension)
 		rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 		rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1528,10 +1529,9 @@ func TestChooseRangeToRebalanceOffHotNodes(t *testing.T) {
 			s.cfg.DefaultSpanConfig.NumReplicas = int32(len(tc.voters))
 
 			loadRanges(rr, s, []testRange{{voters: tc.voters, qps: tc.QPS, reqCPU: tc.reqCPU}},
-				lbRebalanceDimension,
 			)
 
-			hottestRanges := sr.replicaRankings.TopLoad()
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 			rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1621,9 +1621,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
 
 	// Load in a range with replicas on an overfull node, a slightly underfull
 	// node, and a very underfull node.
-	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
+	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 4, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})
 
-	hottestRanges := sr.replicaRankings.TopLoad()
+	hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 	options := sr.scorerOptions(ctx, lbRebalanceDimension)
 	rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 	repl := rctx.hottestRanges[0]
@@ -1638,9 +1638,9 @@ func TestNoLeaseTransferToBehindReplicas(t *testing.T) {
 	// Then do the same, but for replica rebalancing. Make s5 an existing replica
 	// that's behind, and see how a new replica is preferred as the leaseholder
 	// over it.
-	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
+	loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})
 
-	hottestRanges = sr.replicaRankings.TopLoad()
+	hottestRanges = sr.replicaRankings.TopLoad(lbRebalanceDimension)
 	options = sr.scorerOptions(ctx, lbRebalanceDimension)
 	rctx = sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 	rctx.options.IOOverloadOptions = allocatorimpl.IOOverloadOptions{
@@ -1798,9 +1798,9 @@ func TestStoreRebalancerIOOverloadCheck(t *testing.T) {
 
 			// Load in a range with replicas on an overfull node, a slightly underfull
 			// node, and a very underfull node.
-			loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}}, lbRebalanceDimension)
+			loadRanges(rr, s, []testRange{{voters: []roachpb.StoreID{1, 3, 5}, qps: 100, reqCPU: 100 * float64(time.Millisecond)}})
 
-			hottestRanges := sr.replicaRankings.TopLoad()
+			hottestRanges := sr.replicaRankings.TopLoad(lbRebalanceDimension)
 			options := sr.scorerOptions(ctx, lbRebalanceDimension)
 			rctx := sr.NewRebalanceContext(ctx, options, hottestRanges, sr.RebalanceMode())
 			require.Greater(t, len(rctx.hottestRanges), 0)
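
Review note: the heart of this change is that one RRAccumulator now keeps a bounded top-k heap per load dimension, so a single pass over the replicas can serve both the QPS-based UI (HottestReplicas) and the objective-driven store rebalancer. Below is a minimal, self-contained sketch of that pattern. All names in it (replica, accumulator, the value of numTopReplicasToTrack) are simplified stand-ins for illustration, not the real kvserver types.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

type dimension int

const (
	queries dimension = iota
	cpu
)

// replica is a stand-in for CandidateReplica with two load dimensions.
type replica struct{ qps, cpuNanos float64 }

func (r replica) load(d dimension) float64 {
	if d == queries {
		return r.qps
	}
	return r.cpuNanos
}

// topK is a min-heap of the k highest-load replicas for one dimension; the
// coldest tracked replica sits at the root so it can be evicted cheaply.
type topK struct {
	entries []replica
	dim     dimension
}

func (h *topK) Len() int           { return len(h.entries) }
func (h *topK) Less(i, j int) bool { return h.entries[i].load(h.dim) < h.entries[j].load(h.dim) }
func (h *topK) Swap(i, j int)      { h.entries[i], h.entries[j] = h.entries[j], h.entries[i] }
func (h *topK) Push(x interface{}) { h.entries = append(h.entries, x.(replica)) }
func (h *topK) Pop() interface{} {
	last := h.entries[len(h.entries)-1]
	h.entries = h.entries[:len(h.entries)-1]
	return last
}

const numTopReplicasToTrack = 3 // the real constant is larger

// accumulator mirrors RRAccumulator: one bounded heap per tracked dimension.
type accumulator struct{ dims map[dimension]*topK }

func newAccumulator(dims ...dimension) *accumulator {
	a := &accumulator{dims: map[dimension]*topK{}}
	for _, d := range dims {
		a.dims[d] = &topK{dim: d}
	}
	return a
}

// add considers a replica for every dimension's top-k, as AddReplica does.
func (a *accumulator) add(r replica) {
	for _, h := range a.dims {
		if h.Len() < numTopReplicasToTrack {
			heap.Push(h, r)
		} else if r.load(h.dim) > h.entries[0].load(h.dim) {
			heap.Pop(h) // evict the coldest tracked replica
			heap.Push(h, r)
		}
	}
}

// topLoad returns one dimension's tracked replicas, hottest first, mirroring
// what a consumer gets from TopLoad(dimension).
func (a *accumulator) topLoad(d dimension) []replica {
	out := append([]replica(nil), a.dims[d].entries...)
	sort.Slice(out, func(i, j int) bool { return out[i].load(d) > out[j].load(d) })
	return out
}

func main() {
	// One pass over the replicas feeds both rankings.
	acc := newAccumulator(queries, cpu)
	for _, r := range []replica{
		{qps: 5, cpuNanos: 1},
		{qps: 1, cpuNanos: 5},
		{qps: 3, cpuNanos: 3},
		{qps: 4, cpuNanos: 2},
	} {
		acc.add(r)
	}
	fmt.Println(acc.topLoad(queries)) // [{5 1} {4 2} {3 3}]
	fmt.Println(acc.topLoad(cpu))     // [{1 5} {3 3} {4 2}]
}
```

As the example shows, the two orderings can differ per dimension, which is why Store.Capacity now feeds both load.CPU and load.Queries into a single accumulator and each caller of TopLoad picks its own dimension, rather than the UI and the rebalancer sharing one ranking.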