Skip to content

Commit

Permalink
storage: add Allocator.TransferLease{Source,Target}
Browse files Browse the repository at this point in the history
Before (`allocsim -n 4 -w 4`):

_elapsed__ops/sec___errors_replicas_________1_________2_________3_________4
    5m0s   1472.5        0     2124   704/687     492/0     522/0     406/0

After:

_elapsed__ops/sec___errors_replicas_________1_________2_________3_________4
    5m0s   1506.7        0     2157   520/183   548/180   547/166   542/180

I do see a little bit of thrashiness with the lease-transfer heuristics,
though it settles down relatively quickly so I'm not sure if it is worth
addressing yet.

Depends on cockroachdb#10420
  • Loading branch information
petermattis committed Nov 14, 2016
1 parent e17cad3 commit 1844549
Show file tree
Hide file tree
Showing 5 changed files with 262 additions and 41 deletions.
98 changes: 91 additions & 7 deletions pkg/storage/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,10 @@ func (a *Allocator) AllocateTarget(
}

// RemoveTarget returns a suitable replica to remove from the provided replica
// set. It attempts to consider which of the provided replicas would be the best
// candidate for removal. It also will exclude any replica that belongs to the
// range lease holder's store ID.
// set. It first attempts to randomly select a target from the set of stores
// that have greater than the average number of replicas. Failing that, it
// falls back to selecting a random target from any of the existing
// replicas. It also will exclude any replica that lives on leaseStoreID.
//
// TODO(mrtracy): removeTarget eventually needs to accept the attributes from
// the zone config associated with the provided replicas. This will allow it to
Expand Down Expand Up @@ -333,12 +334,12 @@ func (a Allocator) RemoveTarget(
}

// Retrieve store descriptors for the provided replicas from the StorePool.
var descriptors []roachpb.StoreDescriptor
descriptors := make([]roachpb.StoreDescriptor, 0, len(existing))
for _, exist := range existing {
if exist.StoreID == leaseStoreID {
continue
}
if desc, ok := a.storePool.getStoreDescriptor(exist.StoreID); ok {
if exist.StoreID == leaseStoreID {
continue
}
descriptors = append(descriptors, desc)
}
}
Expand Down Expand Up @@ -483,6 +484,89 @@ func (a Allocator) RebalanceTarget(
return a.improve(sl, existingNodes), nil
}

// TransferLeaseTarget returns a suitable replica to transfer the range lease
// to from the provided list. It excludes the current lease holder replica.
func (a *Allocator) TransferLeaseTarget(
constraints config.Constraints,
existing []roachpb.ReplicaDescriptor,
leaseStoreID roachpb.StoreID,
rangeID roachpb.RangeID,
checkTransferLeaseSource bool,
) roachpb.ReplicaDescriptor {
if !a.options.AllowRebalance {
return roachpb.ReplicaDescriptor{}
}

sl, _, _ := a.storePool.getStoreList(rangeID)
sl = sl.filter(constraints)

source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
if !ok {
return roachpb.ReplicaDescriptor{}
}
if checkTransferLeaseSource && !shouldTransferLease(sl, source) {
return roachpb.ReplicaDescriptor{}
}

candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing)-1)
for _, repl := range existing {
if leaseStoreID == repl.StoreID {
continue
}
storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
if !ok {
continue
}
if float64(storeDesc.Capacity.LeaseCount) < sl.candidateLeases.mean-0.5 {
candidates = append(candidates, repl)
}
}
if len(candidates) == 0 {
return roachpb.ReplicaDescriptor{}
}
a.randGen.Lock()
defer a.randGen.Unlock()
return candidates[a.randGen.Intn(len(candidates))]
}

// ShouldTransferLease returns true if the specified store is overfull in terms
// of leases with respect to the other stores matching the specified
// attributes.
func (a *Allocator) ShouldTransferLease(
constraints config.Constraints, leaseStoreID roachpb.StoreID, rangeID roachpb.RangeID,
) bool {
if !a.options.AllowRebalance {
return false
}

source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
if !ok {
return false
}
sl, _, _ := a.storePool.getStoreList(rangeID)
sl = sl.filter(constraints)
if log.V(3) {
log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl)
}
return shouldTransferLease(sl, source)
}

var enableLeaseRebalancing = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_LEASE_REBALANCING", true)

func shouldTransferLease(sl StoreList, source roachpb.StoreDescriptor) bool {
if !enableLeaseRebalancing {
return false
}
// Allow lease transfer if we're above the overfull threshold, which is
// mean*(1+rebalanceThreshold).
overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + rebalanceThreshold)))
minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5))
if overfullLeaseThreshold < minOverfullThreshold {
overfullLeaseThreshold = minOverfullThreshold
}
return source.Capacity.LeaseCount > overfullLeaseThreshold
}

// selectGood attempts to select a store from the supplied store list that it
// considers to be 'Good' relative to the other stores in the list. Any nodes
// in the supplied 'exclude' list will be disqualified from selection. Returns
Expand Down
98 changes: 97 additions & 1 deletion pkg/storage/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,102 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
})
}

func TestAllocatorTransferLeaseTarget(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator(
/* deterministic */ true,
/* useRuleSolver */ false,
)
defer stopper.Stop()

// 3 stores where the lease count for each store is equal to 10x the store
// ID.
var stores []*roachpb.StoreDescriptor
for i := 1; i <= 3; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

existing := []roachpb.ReplicaDescriptor{
{StoreID: 1},
{StoreID: 2},
{StoreID: 3},
}

// TODO(peter): Add test cases for non-empty constraints.
testCases := []struct {
existing []roachpb.ReplicaDescriptor
leaseholder roachpb.StoreID
check bool
expected roachpb.StoreID
}{
// No existing lease holder, nothing to do.
{existing: existing, leaseholder: 0, check: true, expected: 0},
// Store 1 is not a lease transfer source.
{existing: existing, leaseholder: 1, check: true, expected: 0},
{existing: existing, leaseholder: 1, check: false, expected: 0},
// Store 2 is not a lease transfer source.
{existing: existing, leaseholder: 2, check: true, expected: 0},
{existing: existing, leaseholder: 2, check: false, expected: 1},
// Store 3 is a lease transfer source.
{existing: existing, leaseholder: 3, check: true, expected: 1},
{existing: existing, leaseholder: 3, check: false, expected: 1},
}
for i, c := range testCases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
target := a.TransferLeaseTarget(config.Constraints{},
c.existing, c.leaseholder, 0, c.check)
if c.expected != target.StoreID {
t.Fatalf("expected %d, but found %d", c.expected, target.StoreID)
}
})
}
}

func TestAllocatorShouldTransferLease(t *testing.T) {
defer leaktest.AfterTest(t)()
stopper, g, _, a, _ := createTestAllocator(
/* deterministic */ true,
/* useRuleSolver */ false,
)
defer stopper.Stop()

// 3 stores where the lease count for each store is equal to 10x the store
// ID.
var stores []*roachpb.StoreDescriptor
for i := 1; i <= 3; i++ {
stores = append(stores, &roachpb.StoreDescriptor{
StoreID: roachpb.StoreID(i),
Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
})
}
sg := gossiputil.NewStoreGossiper(g)
sg.GossipStores(stores, t)

testCases := []struct {
leaseholder roachpb.StoreID
expected bool
}{
{leaseholder: 1, expected: false},
{leaseholder: 2, expected: false},
{leaseholder: 3, expected: true},
}
for i, c := range testCases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
result := a.ShouldTransferLease(config.Constraints{}, c.leaseholder, 0)
if c.expected != result {
t.Fatalf("expected %v, but found %v", c.expected, result)
}
})
}
}

// TestAllocatorRemoveTarget verifies that the replica chosen by RemoveTarget is
// the one with the lowest capacity.
func TestAllocatorRemoveTarget(t *testing.T) {
Expand Down Expand Up @@ -1021,7 +1117,7 @@ func TestAllocatorRemoveTarget(t *testing.T) {
for i := 0; i < 100; i++ {
// Exclude store 1 as a removal target. We should see stores 2 and 3
// randomly selected as the removal target.
targetRepl, err := a.RemoveTarget(config.Constraints{}, replicas, stores[0].StoreID)
targetRepl, err := a.RemoveTarget(config.Constraints{}, replicas, 0)
if err != nil {
t.Fatal(err)
}
Expand Down
75 changes: 61 additions & 14 deletions pkg/storage/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,23 @@ func (rq *replicateQueue) shouldQueue(
}
return true, priority
}
// See if there is a rebalancing opportunity present.
leaseStoreID := repl.store.StoreID()
if lease, _ := repl.getLease(); lease != nil {

// If we hold the lease, check to see if we should transfer it.
var leaseStoreID roachpb.StoreID
if lease, _ := repl.getLease(); lease != nil && lease.Covers(now) {
leaseStoreID = lease.Replica.StoreID
if rq.allocator.ShouldTransferLease(
zone.Constraints, leaseStoreID, desc.RangeID) {
if log.V(2) {
log.Infof(ctx, "%s lease transfer needed, enqueuing", repl)
}
return true, 0
}
}

// Check for a rebalancing opportunity. Note that leaseStoreID will be 0 if
// the range doesn't currently have a lease which will allow the current
// replica to be considered a rebalancing source.
target, err := rq.allocator.RebalanceTarget(
zone.Constraints,
desc.Replicas,
Expand Down Expand Up @@ -182,23 +194,46 @@ func (rq *replicateQueue) process(
}
case AllocatorRemove:
log.Event(ctx, "removing a replica")
// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
// If the lease holder (our local store) is an overfull store (in terms of
// leases) allow transferring the lease away.
leaseHolderStoreID := repl.store.StoreID()
if rq.allocator.ShouldTransferLease(zone.Constraints, leaseHolderStoreID, desc.RangeID) {
leaseHolderStoreID = 0
}
removeReplica, err := rq.allocator.RemoveTarget(
zone.Constraints,
desc.Replicas,
repl.store.StoreID(),
leaseHolderStoreID,
)
if err != nil {
return err
}
log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
return err
}
// Do not requeue if we removed ourselves.
if removeReplica.StoreID == repl.store.StoreID() {
return nil
// The local replica was selected as the removal target, but that replica
// is the leaseholder, so transfer the lease instead. We don't check that
// the current store has too many leases in this case under the
// assumption that replica balance is a greater concern. Also note that
// AllocatorRemove action takes preference over AllocatorNoop
// (rebalancing) which is where lease transfer would otherwise occur. We
// need to be able to transfer leases in AllocatorRemove in order to get
// out of situations where this store is overfull and yet holds all the
// leases.
target := rq.allocator.TransferLeaseTarget(
zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
false /* checkTransferLeaseSource */)
if target != (roachpb.ReplicaDescriptor{}) {
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
if err := repl.AdminTransferLease(target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
}
// Do not requeue as we transferred our lease away.
return nil
}
} else {
log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
return err
}
}
case AllocatorRemoveDead:
log.Event(ctx, "removing a dead replica")
Expand All @@ -214,12 +249,24 @@ func (rq *replicateQueue) process(
return err
}
case AllocatorNoop:
log.Event(ctx, "considering a rebalance")
// The Noop case will result if this replica was queued in order to
// rebalance. Attempt to find a rebalancing target.
//
log.Event(ctx, "considering a rebalance")

// We require the lease in order to process replicas, so
// repl.store.StoreID() corresponds to the lease-holder's store ID.
target := rq.allocator.TransferLeaseTarget(
zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
true /* checkTransferLeaseSource */)
if target.StoreID != 0 {
log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
if err := repl.AdminTransferLease(target.StoreID); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
}
// Do not requeue as we transferred our lease away.
return nil
}

rebalanceStore, err := rq.allocator.RebalanceTarget(
zone.Constraints,
desc.Replicas,
Expand Down
Loading

0 comments on commit 1844549

Please sign in to comment.