backport-2.1: storage: make load-based replica rebalancing decisions at the store level #29663

Merged: 5 commits, Sep 6, 2018
4 changes: 2 additions & 2 deletions docs/generated/settings/settings.html
@@ -17,9 +17,9 @@
<tr><td><code>jobs.registry.leniency</code></td><td>duration</td><td><code>1m0s</code></td><td>the amount of time to defer any attempts to reschedule a job</td></tr>
<tr><td><code>kv.allocator.lease_rebalancing_aggressiveness</code></td><td>float</td><td><code>1</code></td><td>set greater than 1.0 to rebalance leases toward load more aggressively, or between 0 and 1.0 to be more conservative about rebalancing leases</td></tr>
<tr><td><code>kv.allocator.load_based_lease_rebalancing.enabled</code></td><td>boolean</td><td><code>true</code></td><td>set to enable rebalancing of range leases based on load and latency</td></tr>
+<tr><td><code>kv.allocator.load_based_rebalancing</code></td><td>enumeration</td><td><code>1</code></td><td>whether to rebalance based on the distribution of QPS across stores [off = 0, leases = 1, leases and replicas = 2]</td></tr>
+<tr><td><code>kv.allocator.qps_rebalance_threshold</code></td><td>float</td><td><code>0.25</code></td><td>minimum fraction away from the mean a store's QPS (such as queries per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.allocator.range_rebalance_threshold</code></td><td>float</td><td><code>0.05</code></td><td>minimum fraction away from the mean a store's range count can be before it is considered overfull or underfull</td></tr>
-<tr><td><code>kv.allocator.stat_based_rebalancing.enabled</code></td><td>boolean</td><td><code>false</code></td><td>set to enable rebalancing range replicas and leases to more evenly distribute read and write load across the stores in a cluster</td></tr>
-<tr><td><code>kv.allocator.stat_rebalance_threshold</code></td><td>float</td><td><code>0.2</code></td><td>minimum fraction away from the mean a store's stats (like disk usage or writes per second) can be before it is considered overfull or underfull</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_export_requests</code></td><td>integer</td><td><code>5</code></td><td>number of export requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.concurrent_import_requests</code></td><td>integer</td><td><code>1</code></td><td>number of import requests a store will handle concurrently before queuing</td></tr>
<tr><td><code>kv.bulk_io_write.max_rate</code></td><td>byte size</td><td><code>8.0 EiB</code></td><td>the rate limit (bytes/sec) to use for writes to disk on behalf of bulk io ops</td></tr>
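A note on the two settings added above: a store counts as overfull or underfull once its QPS strays more than kv.allocator.qps_rebalance_threshold (default 0.25) away from the mean across stores, and kv.allocator.load_based_rebalancing selects what reacts to that signal (nothing, leases only, or leases and replicas). The following is a minimal Go sketch of the threshold idea, illustrative only, not the allocator's actual code:

package main

import "fmt"

// qpsRebalanceThreshold mirrors the new cluster setting's default (0.25).
const qpsRebalanceThreshold = 0.25

// classify reports whether a store's QPS is overfull, underfull, or balanced
// relative to the mean QPS across stores, using the threshold fraction.
// This is an illustrative sketch, not the allocator's exact logic.
func classify(storeQPS, meanQPS float64) string {
	overfull := meanQPS * (1 + qpsRebalanceThreshold)
	underfull := meanQPS * (1 - qpsRebalanceThreshold)
	switch {
	case storeQPS > overfull:
		return "overfull"
	case storeQPS < underfull:
		return "underfull"
	default:
		return "balanced"
	}
}

func main() {
	fmt.Println(classify(1300, 1000)) // overfull: 30% above the mean
	fmt.Println(classify(800, 1000))  // balanced: 20% below the mean, within 25%
}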
20 changes: 15 additions & 5 deletions pkg/cmd/roachtest/rebalance_load.go
@@ -107,20 +107,30 @@ func registerRebalanceLoad(r *registry) {
}
}

-minutes := 2 * time.Minute
-numNodes := 4 // the last node is just used to generate load
concurrency := 128

r.Add(testSpec{
Name: `rebalance-leases-by-load`,
-Nodes: nodes(numNodes),
-Stable: false, // TODO(a-robinson): Promote to stable
+Nodes: nodes(4), // the last node is just used to generate load
+Stable: false, // TODO(a-robinson): Promote to stable
Run: func(ctx context.Context, t *test, c *cluster) {
if local {
concurrency = 32
fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
}
-rebalanceLoadRun(ctx, t, c, minutes, concurrency)
+rebalanceLoadRun(ctx, t, c, 2*time.Minute, concurrency)
},
})
+r.Add(testSpec{
+Name: `rebalance-replicas-by-load`,
+Nodes: nodes(7), // the last node is just used to generate load
+Stable: false, // TODO(a-robinson): Promote to stable
+Run: func(ctx context.Context, t *test, c *cluster) {
+if local {
+concurrency = 32
+fmt.Printf("lowering concurrency to %d in local testing\n", concurrency)
+}
+rebalanceLoadRun(ctx, t, c, 5*time.Minute, concurrency)
+},
+})
}
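A side note on the registration above: both Run closures capture the same concurrency variable declared once in registerRebalanceLoad, so the local-mode override mutates shared state (harmless here, since both tests lower it to the same value). A self-contained sketch of that capture behavior, with made-up names rather than roachtest code:

package main

import "fmt"

func main() {
	// Like the roachtest above: a variable declared once in the
	// registering function is captured by every test closure.
	concurrency := 128

	run1 := func(local bool) {
		if local {
			concurrency = 32 // mutates the shared variable
		}
		fmt.Println("test 1 concurrency:", concurrency)
	}
	run2 := func(local bool) {
		if local {
			concurrency = 32
		}
		fmt.Println("test 2 concurrency:", concurrency)
	}

	run1(true)  // test 1 concurrency: 32
	run2(false) // test 2 concurrency: 32, still lowered by run1
}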
5 changes: 4 additions & 1 deletion pkg/settings/registry.go
@@ -34,10 +34,13 @@ var Registry = make(map[string]Setting)
// When a setting is removed, it should be added to this list so that we cannot
// accidentally reuse its name, potentially mis-handling older values.
var retiredSettings = map[string]struct{}{
-//removed as of 2.0.
+// removed as of 2.0.
"kv.gc.batch_size": {},
"kv.transaction.max_intents": {},
"diagnostics.reporting.report_metrics": {},
+// removed as of 2.1.
+"kv.allocator.stat_based_rebalancing.enabled": {},
+"kv.allocator.stat_rebalance_threshold": {},
}

// Register adds a setting to the registry.
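The retiredSettings entries keep the two removed names from ever being registered again, for the reason the comment at the top of the hunk gives. Register itself is not part of this diff; the following is only a plausible sketch of the guard such a map implies, not CockroachDB's actual implementation:

package settings

import "fmt"

// retiredSettings holds names that must never be reused; a sketch
// mirroring the map shown in the diff above.
var retiredSettings = map[string]struct{}{
	"kv.allocator.stat_based_rebalancing.enabled": {},
	"kv.allocator.stat_rebalance_threshold":       {},
}

var registry = map[string]interface{}{}

// register refuses both retired and duplicate names. This is an
// illustration of the pattern; the real Register is not shown here.
func register(name string, s interface{}) {
	if _, retired := retiredSettings[name]; retired {
		panic(fmt.Sprintf("setting %q was retired; names cannot be reused", name))
	}
	if _, dup := registry[name]; dup {
		panic(fmt.Sprintf("setting %q was already registered", name))
	}
	registry[name] = s
}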
91 changes: 40 additions & 51 deletions pkg/storage/allocator.go
@@ -90,10 +90,6 @@ var leaseRebalancingAggressiveness = settings.RegisterNonNegativeFloatSetting(
1.0,
)

-func statsBasedRebalancingEnabled(st *cluster.Settings, disableStatsBasedRebalance bool) bool {
-return EnableStatsBasedRebalancing.Get(&st.SV) && st.Version.IsActive(cluster.VersionStatsBasedRebalancing) && !disableStatsBasedRebalance
-}

// AllocatorAction enumerates the various replication adjustments that may be
// recommended by the allocator.
type AllocatorAction int
@@ -247,10 +243,7 @@ func MakeAllocator(
// supplied range, as governed by the supplied zone configuration. It
// returns the required action that should be taken and a priority.
func (a *Allocator) ComputeAction(
-ctx context.Context,
-zone config.ZoneConfig,
-rangeInfo RangeInfo,
-disableStatsBasedRebalancing bool,
+ctx context.Context, zone config.ZoneConfig, rangeInfo RangeInfo,
) (AllocatorAction, float64) {
if a.storePool == nil {
// Do nothing if storePool is nil for some unittests.
@@ -341,10 +334,8 @@
}

type decisionDetails struct {
-Target string
-Existing string `json:",omitempty"`
-RangeBytes int64 `json:",omitempty"`
-RangeWritesPerSecond float64 `json:",omitempty"`
+Target string
+Existing string `json:",omitempty"`
}
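With the RangeBytes and RangeWritesPerSecond fields gone, the marshaled decision details reduce to the target and, when set, the existing candidates. For example (the "s3" target string is a placeholder; real values come from compactString):

package main

import (
	"encoding/json"
	"fmt"
)

// Mirrors the trimmed-down struct in the diff above.
type decisionDetails struct {
	Target   string
	Existing string `json:",omitempty"`
}

func main() {
	b, _ := json.Marshal(decisionDetails{Target: "s3"})
	fmt.Println(string(b)) // {"Target":"s3"}; Existing is omitted when empty
}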

// AllocateTarget returns a suitable store for a new allocation with the
@@ -357,42 +348,54 @@ func (a *Allocator) AllocateTarget(
zone config.ZoneConfig,
existing []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
-disableStatsBasedRebalancing bool,
) (*roachpb.StoreDescriptor, string, error) {
sl, aliveStoreCount, throttledStoreCount := a.storePool.getStoreList(rangeInfo.Desc.RangeID, storeFilterThrottled)

+target, details := a.allocateTargetFromList(
+ctx, sl, zone, existing, rangeInfo, a.scorerOptions())
+
+if target != nil {
+return target, details, nil
+}
+
+// When there are throttled stores that do match, we shouldn't send
+// the replica to purgatory.
+if throttledStoreCount > 0 {
+return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
+}
+return nil, "", &allocatorError{
+constraints: zone.Constraints,
+existingReplicas: len(existing),
+aliveStores: aliveStoreCount,
+throttledStores: throttledStoreCount,
+}
+}
+
+func (a *Allocator) allocateTargetFromList(
+ctx context.Context,
+sl StoreList,
+zone config.ZoneConfig,
+existing []roachpb.ReplicaDescriptor,
+rangeInfo RangeInfo,
+options scorerOptions,
+) (*roachpb.StoreDescriptor, string) {
analyzedConstraints := analyzeConstraints(
ctx, a.storePool.getStoreDescriptor, existing, zone)
-options := a.scorerOptions(disableStatsBasedRebalancing)
candidates := allocateCandidates(
sl, analyzedConstraints, existing, rangeInfo, a.storePool.getLocalities(existing), options,
)
log.VEventf(ctx, 3, "allocate candidates: %s", candidates)
if target := candidates.selectGood(a.randGen); target != nil {
log.VEventf(ctx, 3, "add target: %s", target)
details := decisionDetails{Target: target.compactString(options)}
-if options.statsBasedRebalancingEnabled {
-details.RangeBytes = rangeInfo.LogicalBytes
-details.RangeWritesPerSecond = rangeInfo.WritesPerSecond
-}
detailsBytes, err := json.Marshal(details)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing allocate target: %s", err)
}
-return &target.store, string(detailsBytes), nil
+return &target.store, string(detailsBytes)
}

-// When there are throttled stores that do match, we shouldn't send
-// the replica to purgatory.
-if throttledStoreCount > 0 {
-return nil, "", errors.Errorf("%d matching stores are currently throttled", throttledStoreCount)
-}
-return nil, "", &allocatorError{
-constraints: zone.Constraints,
-existingReplicas: len(existing),
-aliveStores: aliveStoreCount,
-throttledStores: throttledStoreCount,
-}
+return nil, ""
}
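The refactor above separates picking a candidate (allocateTargetFromList, which simply returns a store or nil) from classifying the failure (AllocateTarget, which maps nil to either a transient throttle error or a purgatory-worthy allocatorError). A stripped-down sketch of that control flow, using hypothetical stand-in types rather than the real ones:

package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins; the shape mirrors the refactored AllocateTarget.
type store struct{ id int }

// pickFromList plays the role of allocateTargetFromList: a store or nil.
func pickFromList(stores []store) *store {
	if len(stores) == 0 {
		return nil
	}
	return &stores[0]
}

var errThrottled = errors.New("matching stores are currently throttled")

// allocate converts a nil pick into the right class of error: transient
// (throttled stores exist, so retry soon) versus persistent (purgatory).
func allocate(stores []store, throttledCount int) (*store, error) {
	if target := pickFromList(stores); target != nil {
		return target, nil
	}
	if throttledCount > 0 {
		return nil, errThrottled // don't send the replica to purgatory
	}
	return nil, errors.New("no allocation target found") // purgatory case
}

func main() {
	_, err := allocate(nil, 2)
	fmt.Println(err) // matching stores are currently throttled
}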

func (a Allocator) simulateRemoveTarget(
@@ -401,7 +404,6 @@
zone config.ZoneConfig,
candidates []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
-disableStatsBasedRebalancing bool,
) (roachpb.ReplicaDescriptor, string, error) {
// Update statistics first
// TODO(a-robinson): This could theoretically interfere with decisions made by other goroutines,
@@ -413,7 +415,7 @@
a.storePool.updateLocalStoreAfterRebalance(targetStore, rangeInfo, roachpb.REMOVE_REPLICA)
}()
log.VEventf(ctx, 3, "simulating which replica would be removed after adding s%d", targetStore)
-return a.RemoveTarget(ctx, zone, candidates, rangeInfo, disableStatsBasedRebalancing)
+return a.RemoveTarget(ctx, zone, candidates, rangeInfo)
}
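simulateRemoveTarget relies on a simulate-then-undo pattern: temporarily apply the hypothetical replica addition to the store pool's stats, ask RemoveTarget to decide against the updated state, and restore the stats on the way out via defer. A minimal illustration of the pattern with hypothetical types, not the storePool API:

package main

import "fmt"

// counts is a toy stand-in for per-store replica statistics.
type counts map[string]int

// simulateBestRemoval temporarily pretends the new replica exists, decides
// which store a removal should come from, then undoes the change via defer.
func simulateBestRemoval(c counts, added string) string {
	c[added]++                    // pretend the new replica already exists
	defer func() { c[added]-- }() // restore state before returning

	// Decide against the simulated state: here, simply the store with
	// the most replicas.
	var worst string
	for store, n := range c {
		if worst == "" || n > c[worst] {
			worst = store
		}
	}
	return worst
}

func main() {
	c := counts{"s1": 3, "s2": 5, "s3": 4}
	fmt.Println(simulateBestRemoval(c, "s1")) // s2 (decided with s1 bumped to 4)
}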

// RemoveTarget returns a suitable replica to remove from the provided replica
@@ -426,7 +428,6 @@ func (a Allocator) RemoveTarget(
zone config.ZoneConfig,
candidates []roachpb.ReplicaDescriptor,
rangeInfo RangeInfo,
-disableStatsBasedRebalancing bool,
) (roachpb.ReplicaDescriptor, string, error) {
if len(candidates) == 0 {
return roachpb.ReplicaDescriptor{}, "", errors.Errorf("must supply at least one candidate replica to allocator.RemoveTarget()")
@@ -441,7 +442,7 @@

analyzedConstraints := analyzeConstraints(
ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone)
-options := a.scorerOptions(disableStatsBasedRebalancing)
+options := a.scorerOptions()
rankedCandidates := removeCandidates(
sl,
analyzedConstraints,
@@ -455,10 +456,6 @@
if exist.StoreID == bad.store.StoreID {
log.VEventf(ctx, 3, "remove target: %s", bad)
details := decisionDetails{Target: bad.compactString(options)}
-if options.statsBasedRebalancingEnabled {
-details.RangeBytes = rangeInfo.LogicalBytes
-details.RangeWritesPerSecond = rangeInfo.WritesPerSecond
-}
detailsBytes, err := json.Marshal(details)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing remove target: %s", err)
@@ -495,7 +492,6 @@ func (a Allocator) RebalanceTarget(
raftStatus *raft.Status,
rangeInfo RangeInfo,
filter storeFilter,
-disableStatsBasedRebalancing bool,
) (*roachpb.StoreDescriptor, string) {
sl, _, _ := a.storePool.getStoreList(rangeInfo.Desc.RangeID, filter)

@@ -529,7 +525,7 @@

analyzedConstraints := analyzeConstraints(
ctx, a.storePool.getStoreDescriptor, rangeInfo.Desc.Replicas, zone)
-options := a.scorerOptions(disableStatsBasedRebalancing)
+options := a.scorerOptions()
results := rebalanceCandidates(
ctx,
sl,
@@ -585,8 +581,7 @@
target.store.StoreID,
zone,
replicaCandidates,
-rangeInfo,
-disableStatsBasedRebalancing)
+rangeInfo)
if err != nil {
log.Warningf(ctx, "simulating RemoveTarget failed: %s", err)
return nil, ""
Expand All @@ -606,10 +601,6 @@ func (a Allocator) RebalanceTarget(
Target: target.compactString(options),
Existing: existingCandidates.compactString(options),
}
-if options.statsBasedRebalancingEnabled {
-details.RangeBytes = rangeInfo.LogicalBytes
-details.RangeWritesPerSecond = rangeInfo.WritesPerSecond
-}
detailsBytes, err := json.Marshal(details)
if err != nil {
log.Warningf(ctx, "failed to marshal details for choosing rebalance target: %s", err)
@@ -618,12 +609,10 @@
return &target.store, string(detailsBytes)
}

-func (a *Allocator) scorerOptions(disableStatsBasedRebalancing bool) scorerOptions {
+func (a *Allocator) scorerOptions() scorerOptions {
return scorerOptions{
-deterministic: a.storePool.deterministic,
-statsBasedRebalancingEnabled: statsBasedRebalancingEnabled(a.storePool.st, disableStatsBasedRebalancing),
-rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV),
-statRebalanceThreshold: statRebalanceThreshold.Get(&a.storePool.st.SV),
+deterministic: a.storePool.deterministic,
+rangeRebalanceThreshold: rangeRebalanceThreshold.Get(&a.storePool.st.SV),
}
}

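After this change scorerOptions carries only determinism and the range-count threshold. That is inferred from the composite literal above; the struct definition itself lives outside this diff. Below is a sketch of how such a fractional threshold around the mean typically translates into overfull/underfull bounds, illustrative rather than allocator_scorer.go's exact logic:

package main

import "fmt"

// scorerOptions reconstructed from the composite literal in the hunk
// above; the real definition is not part of this diff.
type scorerOptions struct {
	deterministic           bool
	rangeRebalanceThreshold float64 // kv.allocator.range_rebalance_threshold
}

// rangeCountBounds returns the range counts outside of which a store would
// be considered underfull or overfull, applying the threshold as a fraction
// of the mean. A sketch of the idea, not the allocator's exact code.
func rangeCountBounds(meanRanges float64, opts scorerOptions) (underfull, overfull float64) {
	return meanRanges * (1 - opts.rangeRebalanceThreshold),
		meanRanges * (1 + opts.rangeRebalanceThreshold)
}

func main() {
	opts := scorerOptions{rangeRebalanceThreshold: 0.05} // default from settings.html above
	lo, hi := rangeCountBounds(1000, opts)
	fmt.Printf("underfull below %.0f, overfull above %.0f ranges\n", lo, hi) // 950 / 1050
}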