diff --git a/pkg/server/prototype_allocator.go b/pkg/server/prototype_allocator.go
index a65214c659b1..a27bfed92eed 100644
--- a/pkg/server/prototype_allocator.go
+++ b/pkg/server/prototype_allocator.go
@@ -23,13 +23,14 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
 	"github.com/cockroachdb/cockroach/pkg/settings"
+	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 )
 
 var testQPSThreshold = settings.RegisterNonNegativeFloatSetting(
 	"server.test_qps_threshold",
 	"the maximum fraction a store's qps can differ from the average before store-level rebalancing kicks in",
-	0.25,
+	0.15,
 )
 
 func (s *Server) RunStoreLevelAllocator(ctx context.Context) {
@@ -70,13 +71,15 @@ func (s *Server) RunStoreLevelAllocator(ctx context.Context) {
 			avgQPS += qps
 		}
 		avgQPS /= float64(len(qpsPerStore))
-		upperBound := math.Max(avgQPS*1.15, avgQPS+100)
+		upperBound := math.Max(avgQPS*(1+testQPSThreshold.Get(&s.st.SV)), avgQPS+100)
 		log.Infof(ctx, "avgQPS: %f, upperBound: %f", avgQPS, upperBound)
 
+		consideringLeases := true
+
 		// TODO: Also consider trying to move work to under-utilized stores even
 		// if there aren't any outliers at the top end.
 	topLevelLoop:
-		for {
+		for iter := 0; iter < 64; iter++ {
 			// Try to lessen the load on the hottest store.
 			hottestStore, hottestQPS := findHottestStore(qpsPerStore)
 			log.Infof(ctx, "hottestStore: s%d, hottestQPS: %f", hottestStore, hottestQPS)
@@ -85,61 +88,137 @@ func (s *Server) RunStoreLevelAllocator(ctx context.Context) {
 			}
 			hottestRanges := hottestRangesByStore[hottestStore]
-			var rangeIDs []roachpb.RangeID
+			if len(hottestRanges) == 0 {
+				log.Warningf(ctx, "no more hot ranges for s%d to move", hottestStore)
+			}
+
+			if consideringLeases {
+				var rangeIDs []roachpb.RangeID
+				for i := range hottestRanges {
+					rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
+				}
+				log.Infof(ctx, "hottest rangeIDs: %v", rangeIDs)
+
+				// First check if there are any leases we can reasonably move.
+				for i, r := range hottestRanges {
+					qps := qps(r)
+					log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
+					for j := range r.Nodes {
+						storeID := r.Nodes[j].Range.SourceStoreID
+						// Transfer the lease if we can move it to a store that will still be
+						// under the average per-store QPS.
+						if qpsPerStore[storeID]+qps < avgQPS {
+							// Attempt to transfer the lease, and make sure we don't do
+							// anything else to the range this go-round.
+							hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
+							log.Infof(ctx, "transferring lease for r%d (qps=%f) to s%d (qps=%f)", r.RangeID, qps, storeID, qpsPerStore[storeID])
+							if err := s.db.AdminTransferLease(ctx, r.Nodes[j].Range.State.ReplicaState.Desc.StartKey, storeID); err != nil {
+								log.Errorf(ctx, "error transferring lease for r%d to s%d: %s", r.RangeID, storeID, err)
+								continue topLevelLoop
+							}
+							qpsPerStore[storeID] += qps
+							qpsPerStore[hottestStore] -= qps
+							continue topLevelLoop
+						}
+					}
+				}
+			}
+
+			// If that didn't work out, then resort to rebalancing replicas.
+			if consideringLeases {
+				log.Infof(ctx, "failed to find a store to transfer a lease to; beginning to consider replica rebalances")
+				consideringLeases = false
+			}
+
+			hottestRanges = hottestRangesByStore[hottestStore]
+			var remainingRangeIDs []roachpb.RangeID
 			for i := range hottestRanges {
-				rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
+				remainingRangeIDs = append(remainingRangeIDs, hottestRanges[i].RangeID)
 			}
-			log.Infof(ctx, "hottest rangeIDs: %v", rangeIDs)
+			log.Infof(ctx, "hottest remaining rangeIDs: %v", remainingRangeIDs)
 
-			// First check if there are any leases we can reasonably move.
 			for i, r := range hottestRanges {
 				qps := qps(r)
 				log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
+
+				// Pick out the stores that we want the range on, keeping existing
+				// replicas around if they aren't on overfull stores.
+				const desiredReplicas = 3
+				targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas)
 				for j := range r.Nodes {
 					storeID := r.Nodes[j].Range.SourceStoreID
-					// Transfer the lease if we can move it to a store that will still be
-					// under the average per-store QPS.
-					if qpsPerStore[storeID]+qps < avgQPS {
-						// Attempt to transfer the lease, and make sure we don't do
-						// anything else to the range this go-round.
-						hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
-						log.Infof(ctx, "transferring lease for r%d (qps=%f) to s%d (qps=%f)", r.RangeID, qps, storeID, qpsPerStore[storeID])
-						if err := s.db.AdminTransferLease(ctx, r.Nodes[j].Range.State.ReplicaState.Desc.StartKey, storeID); err != nil {
-							log.Errorf(ctx, "error transferring lease for r%d to s%d: %s", r.RangeID, storeID, err)
-							continue topLevelLoop
-						}
-						qpsPerStore[storeID] += qps
-						qpsPerStore[hottestStore] -= qps
-						continue topLevelLoop
+					if qpsPerStore[storeID] < upperBound {
+						targets = append(targets, roachpb.ReplicationTarget{
+							NodeID:  r.Nodes[j].Range.SourceNodeID,
+							StoreID: storeID,
+						})
 					}
 				}
-			}
-
-			log.Infof(ctx, "failed to find a store to transfer a lease to")
-			break topLevelLoop
-
-			/*
-				// If that didn't work out, then resort to rebalancing replicas.
-				log.Infof(ctx, "failed to find a store to transfer a lease to; beginning to consider replica rebalances")
 
+				// Then pick out which new stores to add the remaining replicas to.
+				for storeID, candidateQPS := range qpsPerStore {
+					if len(targets) >= desiredReplicas {
+						break
+					}
+					if candidateQPS+qps < avgQPS && !existingTarget(targets, storeID) {
+						desc, found := s.storePool.GetStoreDescriptor(storeID)
+						if !found {
+							log.Errorf(ctx, "couldn't find store descriptor for s%d", storeID)
+						}
+						targets = append(targets, roachpb.ReplicationTarget{
+							NodeID:  desc.Node.NodeID,
+							StoreID: storeID,
+						})
+					}
+				}
 
-				hottestRanges := hottestRangesByStore[hottestStore]
-				var rangeIDs []roachpb.RangeID
-				for i := range hottestRanges {
-					rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
+				// If we still don't have enough targets, let them go up to the upper bound.
+				for storeID, candidateQPS := range qpsPerStore {
+					if len(targets) >= desiredReplicas {
+						break
+					}
+					if candidateQPS+qps < upperBound && !existingTarget(targets, storeID) {
+						desc, found := s.storePool.GetStoreDescriptor(storeID)
+						if !found {
+							log.Errorf(ctx, "couldn't find store descriptor for s%d", storeID)
+						}
+						targets = append(targets, roachpb.ReplicationTarget{
+							NodeID:  desc.Node.NodeID,
+							StoreID: storeID,
+						})
+					}
 				}
-				log.Infof(ctx, "hottest remaining rangeIDs: %v", rangeIDs)
 
-				for i, r := range hottestRanges {
-					qps := qps(r)
-					log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
+				if len(targets) < desiredReplicas {
+					continue
+				}
 
-					for j := range r.Nodes {
+				// Pick the replica with the least QPS to be leaseholder;
+				// TestingRelocateRange transfers the lease to the first provided
+				// target.
+				newLeaseIdx := 0
+				for j := 1; j < len(targets); j++ {
+					if qpsPerStore[targets[j].StoreID] < qpsPerStore[targets[newLeaseIdx].StoreID] {
+						newLeaseIdx = j
 					}
 				}
+				targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0]
+
+				// Attempt to relocate the range, and make sure we don't do
+				// anything else to the range this go-round.
+				hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
+				log.Infof(ctx, "relocating range r%d from %v to %v; new leaseholder qps = %f",
+					r.RangeID, r.Nodes[0].Range.State.ReplicaState.Desc, targets, qpsPerStore[targets[0].StoreID])
+				if err := storage.TestingRelocateRange(
+					ctx, s.db, *r.Nodes[0].Range.State.ReplicaState.Desc, targets,
+				); err != nil {
+					log.Errorf(ctx, "error relocating range r%d to %v: %s", r.RangeID, targets, err)
+					continue topLevelLoop
+				}
 
-				// TODO
-				//storage.TestingRelocateRange(ctx, s.db, rangeDesc, targets)
-			*/
+				qpsPerStore[hottestStore] -= qps
+				qpsPerStore[targets[0].StoreID] += qps
+				continue topLevelLoop
+			}
 		}
 	}
 }
@@ -168,6 +247,15 @@ func findColdestStore(qpsPerStore map[roachpb.StoreID]float64) (roachpb.StoreID,
 	return storeID, qps
 }
 
+func existingTarget(targets []roachpb.ReplicationTarget, newStore roachpb.StoreID) bool {
+	for _, target := range targets {
+		if newStore == target.StoreID {
+			return true
+		}
+	}
+	return false
+}
+
 func processResponse(
 	resp *serverpb.RaftDebugResponse,
 ) (map[roachpb.StoreID]float64, map[roachpb.StoreID][]*serverpb.RaftRangeStatus) {
@@ -184,7 +272,7 @@ func processResponse(
 			hottestRangeQueues[lease] = pq
 		}
 		heap.Push(pq, &r)
-		if pq.Len() > 32 {
+		if pq.Len() > 64 {
 			heap.Pop(pq)
 		}
 	}
diff --git a/pkg/storage/store_pool.go b/pkg/storage/store_pool.go
index 974e9a42176e..0ddc767ac740 100644
--- a/pkg/storage/store_pool.go
+++ b/pkg/storage/store_pool.go
@@ -372,6 +372,12 @@ func (sp *StorePool) getStoreDetailLocked(storeID roachpb.StoreID) *storeDetail
 	return detail
 }
 
+// GetStoreDescriptor returns the latest store descriptor for the given
+// storeID.
+func (sp *StorePool) GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) {
+	return sp.getStoreDescriptor(storeID)
+}
+
 // getStoreDescriptor returns the latest store descriptor for the given
 // storeID.
 func (sp *StorePool) getStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) {
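
For readers who want the rebalancing heuristic at a glance, the following is a minimal, self-contained Go sketch of the target-selection logic the patch adds: keep existing replicas whose stores sit below the QPS upper bound, fill the remaining slots from stores that would stay under the average (and, failing that, under the upper bound) after absorbing the range's QPS, then move the coldest target to the front so it becomes the leaseholder. StoreID, ReplicationTarget, pickTargets, and containsStore are simplified stand-ins invented for this illustration; they are not the roachpb definitions or any part of the patch.

package main

import "fmt"

// StoreID and ReplicationTarget are simplified stand-ins for the roachpb
// types used by the patch.
type StoreID int

type ReplicationTarget struct {
	StoreID StoreID
}

const desiredReplicas = 3

// pickTargets mirrors the patch's heuristic: existing replicas are kept if
// their store is below upperBound; remaining slots are filled by stores that
// would stay under avgQPS (and, failing that, under upperBound) after
// absorbing rangeQPS. The coldest target is swapped to index 0, echoing how
// the patch orders targets so the first one becomes the new leaseholder.
// Note that Go's map iteration order is random, so ties between candidate
// stores may resolve differently from run to run (as in the patch itself).
func pickTargets(
	existing []StoreID, qpsPerStore map[StoreID]float64, rangeQPS, avgQPS, upperBound float64,
) []ReplicationTarget {
	targets := make([]ReplicationTarget, 0, desiredReplicas)
	for _, s := range existing {
		if qpsPerStore[s] < upperBound {
			targets = append(targets, ReplicationTarget{StoreID: s})
		}
	}
	for _, bound := range []float64{avgQPS, upperBound} {
		for s, q := range qpsPerStore {
			if len(targets) >= desiredReplicas {
				break
			}
			if q+rangeQPS < bound && !containsStore(targets, s) {
				targets = append(targets, ReplicationTarget{StoreID: s})
			}
		}
	}
	// Without a full complement of targets, the patch skips the range.
	if len(targets) < desiredReplicas {
		return nil
	}
	// Move the least-loaded target to the front; it becomes the leaseholder.
	coldest := 0
	for i := 1; i < len(targets); i++ {
		if qpsPerStore[targets[i].StoreID] < qpsPerStore[targets[coldest].StoreID] {
			coldest = i
		}
	}
	targets[0], targets[coldest] = targets[coldest], targets[0]
	return targets
}

func containsStore(targets []ReplicationTarget, s StoreID) bool {
	for _, t := range targets {
		if t.StoreID == s {
			return true
		}
	}
	return false
}

func main() {
	// s1 is the hot store; s4 is nearly idle, so it should both fill the open
	// slot and, being coldest, end up first in the returned targets.
	qps := map[StoreID]float64{1: 900, 2: 450, 3: 500, 4: 150}
	fmt.Println(pickTargets([]StoreID{1, 2, 3}, qps, 200, 500, 600))
}

Run against the sample numbers in main (avg 500, upper bound 600, range QPS 200), the overfull s1 is dropped, s4 is the only store that stays under the average, and the result prints [{4} {2} {3}]: s4 would become the new leaseholder.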