[prototype] storage: Extend new allocator to also move range replicas
With this update, TPC-C 10k on 30 nodes went from overloaded to running at
peak efficiency over the course of about 4 hours (for comparison, the
manual partitioning approach also takes many hours to move all the
replicas). This is without having to run the replica scatter from
cockroachdb#26438.

A 5-minute run, taken so the result doesn't include all the rebalancing
time, shows:

_elapsed_______tpmC____efc__avg(ms)__p50(ms)__p90(ms)__p95(ms)__p99(ms)_pMax(ms)
  290.9s   124799.1  97.0%    548.6    486.5    872.4   1140.9   2281.7  10200.5

I think it may still have a small bug: at one point early on, one of the
replicas from the warehouse table on the node doing the relocating thought
it had 16-17k QPS, which wasn't corroborated by any other metric in the
system. Restarting the node fixed it, though. I'm not too concerned about
the bug, since I assume I just made a code mistake, not that anything about
the approach fundamentally leads to a random SQL table replica getting tens
of thousands of QPS.

Range 1 is also back to getting a ton of QPS (~3k) even though I raised the
range cache size from 1M to 50M. Slow query traces show a lot of range
lookups, far more than I'd expect given that ranges weren't moving around
at the time of the traces.

Release note: None
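
As a rough orientation for the diff below: the loop first tries to shed load from the hottest store via lease transfers to stores that already hold a replica, and only then falls back to relocating whole replica sets. Here is a minimal, self-contained Go sketch of that two-phase decision, using simplified types and hypothetical numbers; the real code works against serverpb range stats and issues AdminTransferLease and storage.TestingRelocateRange calls.

```go
package main

import (
	"fmt"
	"math"
)

// hotRange is a simplified stand-in for the per-range status the prototype
// reads from the raft debug endpoint: an ID, its QPS, and the stores
// holding its replicas.
type hotRange struct {
	id       int
	qps      float64
	replicas []int // store IDs
}

func main() {
	// Hypothetical cluster state: per-store QPS, with s1 as the hottest store.
	qpsPerStore := map[int]float64{1: 5200, 2: 3900, 3: 3700, 4: 3200}
	const hottestStore = 1
	hottest := []hotRange{
		{id: 101, qps: 900, replicas: []int{1, 2, 3}},
		{id: 102, qps: 400, replicas: []int{1, 3, 4}},
	}

	var avg float64
	for _, q := range qpsPerStore {
		avg += q
	}
	avg /= float64(len(qpsPerStore)) // 4000 here

	// Same shape of bound as the commit: a threshold fraction above the mean,
	// with a +100 absolute floor so small clusters don't rebalance endlessly.
	const threshold = 0.15
	upperBound := math.Max(avg*(1+threshold), avg+100) // 4600 here
	fmt.Printf("avg=%.0f upperBound=%.0f\n", avg, upperBound)

	// Phase 1: prefer the cheap fix. Move a lease to a store that already
	// has a replica and would stay under the average after absorbing it.
	for _, r := range hottest {
		for _, s := range r.replicas {
			if s != hottestStore && qpsPerStore[s]+r.qps < avg {
				fmt.Printf("transfer lease of r%d to s%d\n", r.id, s)
				qpsPerStore[s] += r.qps
				qpsPerStore[hottestStore] -= r.qps
				return
			}
		}
	}

	// Phase 2: no acceptable lease target, so fall back to relocating the
	// range onto stores under upperBound. The real code builds a 3-store
	// target set and puts the least-loaded target first, since the relocate
	// call transfers the lease to the first target.
	fmt.Println("no lease target under the average; would relocate replicas")
}
```

In the sketch, s4 can absorb r102's ~400 QPS while staying under the ~4000 QPS average, so a lease transfer suffices and no replicas move.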
a-robinson committed Jun 14, 2018
1 parent 566c3b5 commit 853ac61
Showing 2 changed files with 135 additions and 41 deletions.
170 changes: 129 additions & 41 deletions pkg/server/prototype_allocator.go
@@ -23,13 +23,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util/log"
)

var testQPSThreshold = settings.RegisterNonNegativeFloatSetting(
"server.test_qps_threshold",
"the maximum fraction a store's qps can differ from the average before store-level rebalancing kicks in",
0.25,
0.15,
)

func (s *Server) RunStoreLevelAllocator(ctx context.Context) {
@@ -70,13 +71,15 @@ func (s *Server) RunStoreLevelAllocator(ctx context.Context) {
avgQPS += qps
}
avgQPS /= float64(len(qpsPerStore))
upperBound := math.Max(avgQPS*1.15, avgQPS+100)
upperBound := math.Max(avgQPS*(1+testQPSThreshold.Get(&s.st.SV)), avgQPS+100)
log.Infof(ctx, "avgQPS: %f, upperBound: %f", avgQPS, upperBound)

consideringLeases := true

// TODO: Also consider trying to move work to under-utilized stores even
// if there aren't any outliers at the top end.
topLevelLoop:
for {
for iter := 0; iter < 64; iter++ {
// Try to lessen the load on the hottest store.
hottestStore, hottestQPS := findHottestStore(qpsPerStore)
log.Infof(ctx, "hottestStore: s%d, hottestQPS: %f", hottestStore, hottestQPS)
@@ -85,61 +88,137 @@ }
}

hottestRanges := hottestRangesByStore[hottestStore]
var rangeIDs []roachpb.RangeID
if len(hottestRanges) == 0 {
log.Warningf(ctx, "no more hot ranges for s%d to move", hottestStore)
}

if consideringLeases {
var rangeIDs []roachpb.RangeID
for i := range hottestRanges {
rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
}
log.Infof(ctx, "hottest rangeIDs: %v", rangeIDs)

// First check if there are any leases we can reasonably move.
for i, r := range hottestRanges {
qps := qps(r)
log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
for j := range r.Nodes {
storeID := r.Nodes[j].Range.SourceStoreID
// Transfer the lease if we can move it to a store that will still be
// under the average per-store QPS.
if qpsPerStore[storeID]+qps < avgQPS {
// Attempt to transfer the lease, and make sure we don't do
// anything else to the range this go-round.
hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
log.Infof(ctx, "transferring lease for r%d (qps=%f) to s%d (qps=%f)", r.RangeID, qps, storeID, qpsPerStore[storeID])
if err := s.db.AdminTransferLease(ctx, r.Nodes[j].Range.State.ReplicaState.Desc.StartKey, storeID); err != nil {
log.Errorf(ctx, "error transferring lease for r%d to s%d: %s", r.RangeID, storeID, err)
continue topLevelLoop
}
qpsPerStore[storeID] += qps
qpsPerStore[hottestStore] -= qps
continue topLevelLoop
}
}
}
}

// If that didn't work out, then resort to rebalancing replicas.
if consideringLeases {
log.Infof(ctx, "failed to find a store to transfer a lease to; beginning to consider replica rebalances")
consideringLeases = false
}

hottestRanges = hottestRangesByStore[hottestStore]
var remainingRangeIDs []roachpb.RangeID
for i := range hottestRanges {
rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
remainingRangeIDs = append(remainingRangeIDs, hottestRanges[i].RangeID)
}
log.Infof(ctx, "hottest rangeIDs: %v", rangeIDs)
log.Infof(ctx, "hottest remaining rangeIDs: %v", remainingRangeIDs)

// First check if there are any leases we can reasonably move.
for i, r := range hottestRanges {
qps := qps(r)
log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)

// Pick out the stores that we want the range on, keeping existing
// replicas around if they aren't on overfull stores.
const desiredReplicas = 3
targets := make([]roachpb.ReplicationTarget, 0, desiredReplicas)
for j := range r.Nodes {
storeID := r.Nodes[j].Range.SourceStoreID
// Transfer the lease if we can move it to a store that will still be
// under the average per-store QPS.
if qpsPerStore[storeID]+qps < avgQPS {
// Attempt to transfer the lease, and make sure we don't do
// anything else to the range this go-round.
hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
log.Infof(ctx, "transferring lease for r%d (qps=%f) to s%d (qps=%f)", r.RangeID, qps, storeID, qpsPerStore[storeID])
if err := s.db.AdminTransferLease(ctx, r.Nodes[j].Range.State.ReplicaState.Desc.StartKey, storeID); err != nil {
log.Errorf(ctx, "error transferring lease for r%d to s%d: %s", r.RangeID, storeID, err)
continue topLevelLoop
}
qpsPerStore[storeID] += qps
qpsPerStore[hottestStore] -= qps
continue topLevelLoop
if qpsPerStore[storeID] < upperBound {
targets = append(targets, roachpb.ReplicationTarget{
NodeID: r.Nodes[j].Range.SourceNodeID,
StoreID: storeID,
})
}
}
}

log.Infof(ctx, "failed to find a store to transfer a lease to")
break topLevelLoop

/*
// If that didn't work out, then resort to rebalancing replicas.
log.Infof(ctx, "failed to find a store to transfer a lease to; beginning to consider replica rebalances")
// Then pick out which new stores to add the remaining replicas to.
for storeID, candidateQPS := range qpsPerStore {
if len(targets) >= desiredReplicas {
break
}
if candidateQPS+qps < avgQPS && !existingTarget(targets, storeID) {
desc, found := s.storePool.GetStoreDescriptor(storeID)
if !found {
log.Errorf(ctx, "couldn't find store descriptor for s%d", storeID)
}
targets = append(targets, roachpb.ReplicationTarget{
NodeID: desc.Node.NodeID,
StoreID: storeID,
})
}
}

hottestRanges := hottestRangesByStore[hottestStore]
var rangeIDs []roachpb.RangeID
for i := range hottestRanges {
rangeIDs = append(rangeIDs, hottestRanges[i].RangeID)
// If we still don't have enough targets, let them go up to the upper bound.
for storeID, candidateQPS := range qpsPerStore {
if len(targets) >= desiredReplicas {
break
}
if candidateQPS+qps < upperBound && !existingTarget(targets, storeID) {
desc, found := s.storePool.GetStoreDescriptor(storeID)
if !found {
log.Errorf(ctx, "couldn't find store descriptor for s%d", storeID)
}
targets = append(targets, roachpb.ReplicationTarget{
NodeID: desc.Node.NodeID,
StoreID: storeID,
})
}
}
log.Infof(ctx, "hottest remaining rangeIDs: %v", rangeIDs)

for i, r := range hottestRanges {
qps := qps(r)
log.Infof(ctx, "considering r%d, qps=%f", r.RangeID, qps)
if len(targets) < desiredReplicas {
continue
}

for j := range r.Nodes {
// Pick the replica with the least QPS to be leaseholder;
// TestingRelocateRange transfers the lease to the first provided
// target.
newLeaseIdx := 0
for j := 1; j < len(targets); j++ {
if qpsPerStore[targets[j].StoreID] < qpsPerStore[targets[newLeaseIdx].StoreID] {
newLeaseIdx = j
}
}
targets[0], targets[newLeaseIdx] = targets[newLeaseIdx], targets[0]

// Attempt to relocate the range, and make sure we don't do
// anything else to the range this go-round.
hottestRangesByStore[hottestStore] = append(hottestRangesByStore[hottestStore][:i], hottestRangesByStore[hottestStore][i+1:]...)
log.Infof(ctx, "relocating range r%d from %v to %v; new leaseholder qps = %f", r.RangeID, r.Nodes[0].Range.State.ReplicaState.Desc, targets, qpsPerStore[targets[0].StoreID])
if err := storage.TestingRelocateRange(
ctx, s.db, *r.Nodes[0].Range.State.ReplicaState.Desc, targets,
); err != nil {
log.Errorf(ctx, "error relocating range r%d to %v: %s", r.RangeID, targets, err)
continue topLevelLoop
}

// TODO
//storage.TestingRelocateRange(ctx, s.db, rangeDesc, targets)
*/
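// Debit the estimated QPS from the hottest store and credit it to the new
// leaseholder's store so later iterations work from the updated estimates.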
qpsPerStore[hottestStore] -= qps
qpsPerStore[targets[0].StoreID] += qps
continue topLevelLoop
}
}
}
}
@@ -168,6 +247,15 @@ func findColdestStore(qpsPerStore map[roachpb.StoreID]float64) (roachpb.StoreID,
return storeID, qps
}

func existingTarget(targets []roachpb.ReplicationTarget, newStore roachpb.StoreID) bool {
for _, target := range targets {
if newStore == target.StoreID {
return true
}
}
return false
}

func processResponse(
resp *serverpb.RaftDebugResponse,
) (map[roachpb.StoreID]float64, map[roachpb.StoreID][]*serverpb.RaftRangeStatus) {
@@ -184,7 +272,7 @@ func processResponse(
hottestRangeQueues[lease] = pq
}
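// Maintain a bounded min-heap per leaseholder store so that only its
// hottest ranges are tracked (this commit raises the bound from 32 to 64).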
heap.Push(pq, &r)
if pq.Len() > 32 {
if pq.Len() > 64 {
heap.Pop(pq)
}
}
6 changes: 6 additions & 0 deletions pkg/storage/store_pool.go
@@ -372,6 +372,12 @@ func (sp *StorePool) getStoreDetailLocked(storeID roachpb.StoreID) *storeDetail
return detail
}

// GetStoreDescriptor returns the latest store descriptor for the given
// storeID.
func (sp *StorePool) GetStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) {
return sp.getStoreDescriptor(storeID)
}

// getStoreDescriptor returns the latest store descriptor for the given
// storeID.
func (sp *StorePool) getStoreDescriptor(storeID roachpb.StoreID) (roachpb.StoreDescriptor, bool) {
