Skip to content

Commit

Permalink
storage: add Allocator.TransferLease{Source,Target}
Browse files Browse the repository at this point in the history
Before (`allocsim -n 4 -w 4`):

_elapsed__ops/sec___errors_replicas_________1_________2_________3_________4
    5m0s   1472.5        0     2124   704/687     492/0     522/0     406/0

After:

_elapsed__ops/sec___errors_replicas_________1_________2_________3_________4
    5m0s   1506.7        0     2157   520/183   548/180   547/166   542/180

I do see a little bit of thrashiness with the lease-transfer heuristics,
though it settles down relatively quickly so I'm not sure if it is worth
addressing yet.

Depends on cockroachdb#10420
  • Loading branch information
petermattis committed Nov 8, 2016
1 parent 24dac05 commit e109d2e
Show file tree
Hide file tree
Showing 5 changed files with 242 additions and 34 deletions.
94 changes: 87 additions & 7 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package storage

import (
"fmt"
"math"
"math/rand"

"golang.org/x/net/context"
Expand Down Expand Up @@ -235,9 +236,10 @@ func (a *Allocator) AllocateTarget(
}

// RemoveTarget returns a suitable replica to remove from the provided replica
// set. It attempts to consider which of the provided replicas would be the best
// candidate for removal. It also will exclude any replica that belongs to the
// range lease holder's store ID.
// set. It first attempts to randomly select a target from the set of stores
// that have greater than the average number of replicas. Failing that, it
// falls back to selecting a random target from any of the existing
// replicas. It also will exclude any replica that lives on leaseStoreID.
//
// TODO(mrtracy): removeTarget eventually needs to accept the attributes from
// the zone config associated with the provided replicas. This will allow it to
Expand All @@ -251,12 +253,12 @@ func (a Allocator) RemoveTarget(
}

// Retrieve store descriptors for the provided replicas from the StorePool.
var descriptors []roachpb.StoreDescriptor
descriptors := make([]roachpb.StoreDescriptor, 0, len(existing))
for _, exist := range existing {
if exist.StoreID == leaseStoreID {
continue
}
if desc, ok := a.storePool.getStoreDescriptor(exist.StoreID); ok {
if exist.StoreID == leaseStoreID {
continue
}
descriptors = append(descriptors, desc)
}
}
Expand Down Expand Up @@ -330,6 +332,84 @@ func (a Allocator) RebalanceTarget(
return a.improve(sl, existingNodes)
}

// TransferLeaseTarget returns a suitable replica to transfer the range lease
// to from the provided list. It excludes the current lease holder replica.
func (a *Allocator) TransferLeaseTarget(
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
rangeID roachpb.RangeID,
checkTransferLeaseSource bool,
) roachpb.ReplicaDescriptor {
if !a.options.AllowRebalance {
return roachpb.ReplicaDescriptor{}
}

sl, _, _ := a.storePool.getStoreList(rangeID)
sl = sl.filter(constraints)

source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
if !ok {
return roachpb.ReplicaDescriptor{}
}
if checkTransferLeaseSource && !shouldTransferLease(sl, source) {
return roachpb.ReplicaDescriptor{}
}

candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing)-1)
for _, repl := range existing {
if leaseStoreID == repl.StoreID {
continue
}
storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
if !ok {
continue
}
if float64(storeDesc.Capacity.LeaseCount) < sl.candidateLeases.mean-0.5 {
candidates = append(candidates, repl)
}
}
if len(candidates) == 0 {
return roachpb.ReplicaDescriptor{}
}
a.randGen.Lock()
defer a.randGen.Unlock()
return candidates[a.randGen.Intn(len(candidates))]
}

// ShouldTransferLease returns true if the specified store is overfull in terms
// of leases with respect to the other stores matching the specified
// attributes.
func (a *Allocator) ShouldTransferLease(
constraints config.Constraints, leaseStoreID roachpb.StoreID, rangeID roachpb.RangeID,
) bool {
if !a.options.AllowRebalance {
return false
}

source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
if !ok {
return false
}
sl, _, _ := a.storePool.getStoreList(rangeID)
sl = sl.filter(constraints)
if log.V(3) {
log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl)
}
return shouldTransferLease(sl, source)
}

func shouldTransferLease(sl StoreList, candidate roachpb.StoreDescriptor) bool {
// Allow lease transfer if we're above the overfull threshold, which is
// mean*(1+rebalanceThreshold).
overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + rebalanceThreshold)))
minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5))
if overfullLeaseThreshold < minOverfullThreshold {
overfullLeaseThreshold = minOverfullThreshold
}
return candidate.Capacity.LeaseCount > overfullLeaseThreshold
}

// selectGood attempts to select a store from the supplied store list that it
// considers to be 'Good' relative to the other stores in the list. Any nodes
// in the supplied 'exclude' list will be disqualified from selection. Returns
Expand Down
96 changes: 93 additions & 3 deletions pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,96 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
}
}

func TestAllocatorTransferLeaseTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator(true)
defer stopper.Stop()

// 3 stores where the lease count for each store is equal to 10x the store
// ID.
var stores []*roachpb.StoreDescriptor
for i := 1; i <= 3; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

existing := []roachpb.ReplicaDescriptor{
{StoreID: 1},
{StoreID: 2},
{StoreID: 3},
}

// TODO(peter): Add test cases for non-empty constraints.
testCases := []struct {
existing []roachpb.ReplicaDescriptor
leaseholder roachpb.StoreID
check bool
expected roachpb.StoreID
}{
// No existing lease holder, nothing to do.
{existing: existing, leaseholder: 0, check: true, expected: 0},
// Store 1 is not a lease transfer source.
{existing: existing, leaseholder: 1, check: true, expected: 0},
{existing: existing, leaseholder: 1, check: false, expected: 0},
// Store 2 is not a lease transfer source.
{existing: existing, leaseholder: 2, check: true, expected: 0},
{existing: existing, leaseholder: 2, check: false, expected: 1},
// Store 3 is a lease transfer source.
{existing: existing, leaseholder: 3, check: true, expected: 1},
{existing: existing, leaseholder: 3, check: false, expected: 1},
}
for i, c := range testCases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
target := a.TransferLeaseTarget(config.Constraints{},
c.existing, c.leaseholder, 0, c.check)
if c.expected != target.StoreID {
t.Fatalf("expected %d, but found %d", c.expected, target.StoreID)
}
})
}
}

func TestAllocatorShouldTransferLease(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator(true)
defer stopper.Stop()

// 3 stores where the lease count for each store is equal to 10x the store
// ID.
var stores []*roachpb.StoreDescriptor
for i := 1; i <= 3; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

testCases := []struct {
leaseholder roachpb.StoreID
expected bool
}{
{leaseholder: 1, expected: false},
{leaseholder: 2, expected: false},
{leaseholder: 3, expected: true},
}
for i, c := range testCases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
result := a.ShouldTransferLease(config.Constraints{}, c.leaseholder, 0)
if c.expected != result {
t.Fatalf("expected %v, but found %v", c.expected, result)
}
})
}
}

// TestAllocatorRemoveTarget verifies that the replica chosen by RemoveTarget is
// the one with the lowest capacity.
func TestAllocatorRemoveTarget(t *testing.T) {
Expand Down Expand Up @@ -760,9 +850,9 @@ func TestAllocatorRemoveTarget(t *testing.T) {

var counts [4]int
for i := 0; i < 100; i++ {
// Exclude store 1 as a removal target. We should see stores 2 and 3
// randomly selected as the removal target.
targetRepl, err := a.RemoveTarget(replicas, stores[0].StoreID)
// Stores 2 and 3 are overfull, so we should see them randomly selected as
// the removal target.
targetRepl, err := a.RemoveTarget(replicas, 0)
if err != nil {
t.Fatal(err)
}
Expand Down
64 changes: 53 additions & 11 deletions pkg/storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@ func (rq *replicateQueue) shouldQueue(
if lease, _ := repl.getLease(); lease != nil {
leaseStoreID = lease.Replica.StoreID
}
if rq.allocator.ShouldTransferLease(
zone.Constraints, leaseStoreID, desc.RangeID) {
if log.V(2) {
log.Infof(ctx, "%s lease transfer needed, enqueuing", repl)
}
return true, 0
}
target := rq.allocator.RebalanceTarget(
zone.Constraints,
desc.Replicas,
Expand Down Expand Up @@ -178,19 +185,42 @@ func (rq *replicateQueue) process(
}
case AllocatorRemove:
log.Event(ctx, "removing a replica")
// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, repl.store.StoreID())
if err != nil {
return err
// If the lease holder (our local store) is an overfull store (in terms of
// leases) allow transferring the lease away.
leaseHolderStoreID := repl.store.StoreID()
if rq.allocator.ShouldTransferLease(zone.Constraints, leaseHolderStoreID, desc.RangeID) {
leaseHolderStoreID = 0
}
log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, leaseHolderStoreID)
if err != nil {
return err
}
// Do not requeue if we removed ourselves.
if removeReplica.StoreID == repl.store.StoreID() {
return nil
// The local replica was selected as the removal target, but that replica
// is the leaseholder, so transfer the lease instead. We don't check that
// the current store has too many leases in this case under the
// assumption that replica balance is a greater concern. Also note that
// AllocatorRemove action takes preference over AllocatorNoop
// (rebalancing) which is where lease transfer would otherwise occur. We
// need to be able to transfer leases in AllocatorRemove in order to get
// out of situations where this store is overfull and yet holds all the
// leases.
target := rq.allocator.TransferLeaseTarget(
zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
false /* checkTransferLeaseSource */)
if target != (roachpb.ReplicaDescriptor{}) {
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
if err := repl.AdminTransferLease(target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
}
// Do not requeue as we transferred our lease away.
return nil
}
} else {
log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
return err
}
}
case AllocatorRemoveDead:
log.Event(ctx, "removing a dead replica")
Expand All @@ -206,12 +236,24 @@ func (rq *replicateQueue) process(
return err
}
case AllocatorNoop:
log.Event(ctx, "considering a rebalance")
// The Noop case will result if this replica was queued in order to
// rebalance. Attempt to find a rebalancing target.
//
log.Event(ctx, "considering a rebalance")

// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
target := rq.allocator.TransferLeaseTarget(
zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
true /* checkTransferLeaseSource */)
if target.StoreID != 0 {
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
if err := repl.AdminTransferLease(target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
}
// Do not requeue as we transferred our lease away.
return nil
}

rebalanceStore := rq.allocator.RebalanceTarget(
zone.Constraints,
desc.Replicas,
Expand Down
20 changes: 8 additions & 12 deletions pkg/storage/replicate_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package storage_test

import (
"math"
"os"
"testing"

Expand Down Expand Up @@ -47,15 +48,8 @@ func TestReplicateQueueRebalance(t *testing.T) {
}
}()

// TODO(peter): Bump this to 10 nodes. Doing so is flaky until we have lease
// rebalancing because store 1 can hold on to too many replicas. Consider:
//
// [15 4 2 3 3 5 5 0 5 5]
//
// Store 1 is holding all of the leases so we can't rebalance away from
// it. Every other store has within the ceil(average-replicas) threshold. So
// there are no rebalancing opportunities for store 8.
tc := testcluster.StartTestCluster(t, 5,
const numNodes = 10
tc := testcluster.StartTestCluster(t, numNodes,
base.TestClusterArgs{ReplicationMode: base.ReplicationAuto},
)
defer tc.Stopper().Stop()
Expand Down Expand Up @@ -94,12 +88,14 @@ func TestReplicateQueueRebalance(t *testing.T) {
return counts
}

const numRanges = 15
const numReplicas = numRanges * 3
const minThreshold = 0.9
minReplicas := int(math.Floor(minThreshold * (numReplicas / numNodes)))
util.SucceedsSoon(t, func() error {
counts := countReplicas()
for _, c := range counts {
// TODO(peter): This is a weak check for rebalancing. When lease
// rebalancing is in place we can make this somewhat more robust.
if c == 0 {
if c < minReplicas {
err := errors.Errorf("not balanced: %d", counts)
log.Info(context.Background(), err)
return err
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/simulation/range.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (r *Range) getAllocateTarget() (roachpb.StoreID, error) {
func (r *Range) getRemoveTarget() (roachpb.StoreID, error) {
// Pass in an invalid store ID since we don't consider range leases as part
// of the simulator.
removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas, roachpb.StoreID(-1))
removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas, 0)
if err != nil {
return 0, err
}
Expand Down

0 comments on commit e109d2e

Please sign in to comment.