From 853b3c3e43335ed4f56c383cfa719e3957694e28 Mon Sep 17 00:00:00 2001 From: Peter Mattis Date: Thu, 3 Nov 2016 21:25:27 -0400 Subject: [PATCH] storage: add Allocator.TransferLease{Source,Target} Before (`allocsim -n 4 -w 4`): _elapsed__ops/sec___errors_replicas_________1_________2_________3_________4 5m0s 1448.8 0 1686 559/554 383/0 331/1 413/2 After: _elapsed__ops/sec___errors_replicas_________1_________2_________3_________4 5m0s 1413.7 0 1658 406/132 422/141 410/136 420/138 I do see a little bit of thrashiness with the lease-transfer heuristics, though it settles down relatively quickly so I'm not sure if it is worth addressing yet. Depends on #10420 --- pkg/storage/allocator.go | 90 +++++++++++++++++++++++-- pkg/storage/allocator_test.go | 113 ++++++++++++++++++++++++++------ pkg/storage/replicate_queue.go | 63 ++++++++++++++---- pkg/storage/simulation/range.go | 2 +- 4 files changed, 227 insertions(+), 41 deletions(-) diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go index 43b07ee05bbb..bf7fa9eb1d89 100644 --- a/pkg/storage/allocator.go +++ b/pkg/storage/allocator.go @@ -20,6 +20,7 @@ package storage import ( "fmt" + "math" "math/rand" "golang.org/x/net/context" @@ -235,16 +236,16 @@ func (a *Allocator) AllocateTarget( } // RemoveTarget returns a suitable replica to remove from the provided replica -// set. It attempts to consider which of the provided replicas would be the best -// candidate for removal. It also will exclude any replica that belongs to the -// range lease holder's store ID. +// set. It first attempts to randomly select a target from the set of stores +// that have greater than the average number of replicas. Failing that, it +// falls back to selecting a random target from any of the existing replicas. // // TODO(mrtracy): removeTarget eventually needs to accept the attributes from // the zone config associated with the provided replicas. This will allow it to // make correct decisions in the case of ranges with heterogeneous replica // requirements (i.e. multiple data centers). func (a Allocator) RemoveTarget( - existing []roachpb.ReplicaDescriptor, leaseStoreID roachpb.StoreID, + existing []roachpb.ReplicaDescriptor, ) (roachpb.ReplicaDescriptor, error) { if len(existing) == 0 { return roachpb.ReplicaDescriptor{}, errors.Errorf("must supply at least one replica to allocator.RemoveTarget()") @@ -253,9 +254,6 @@ func (a Allocator) RemoveTarget( // Retrieve store descriptors for the provided replicas from the StorePool. var descriptors []roachpb.StoreDescriptor for _, exist := range existing { - if exist.StoreID == leaseStoreID { - continue - } if desc, ok := a.storePool.getStoreDescriptor(exist.StoreID); ok { descriptors = append(descriptors, desc) } @@ -330,6 +328,84 @@ func (a Allocator) RebalanceTarget( return a.improve(sl, existingNodes) } +// TransferLeaseTarget returns a suitable replica to transfer the range lease +// to from the provided list. It excludes the current lease holder replica. +func (a *Allocator) TransferLeaseTarget( + constraints config.Constraints, + existing []roachpb.ReplicaDescriptor, + leaseStoreID roachpb.StoreID, + rangeID roachpb.RangeID, + checkTransferLeaseSource bool, +) roachpb.ReplicaDescriptor { + if !a.options.AllowRebalance { + return roachpb.ReplicaDescriptor{} + } + + sl, _, _ := a.storePool.getStoreList(rangeID) + sl = sl.filter(constraints) + + source, ok := a.storePool.getStoreDescriptor(leaseStoreID) + if !ok { + return roachpb.ReplicaDescriptor{} + } + if checkTransferLeaseSource && !shouldTransferLease(sl, source) { + return roachpb.ReplicaDescriptor{} + } + + candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing)-1) + for _, repl := range existing { + if leaseStoreID == repl.StoreID { + continue + } + storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID) + if !ok { + continue + } + if float64(storeDesc.Capacity.LeaseCount) < sl.candidateLeases.mean-0.5 { + candidates = append(candidates, repl) + } + } + if len(candidates) == 0 { + return roachpb.ReplicaDescriptor{} + } + a.randGen.Lock() + defer a.randGen.Unlock() + return candidates[a.randGen.Intn(len(candidates))] +} + +// ShouldTransferLease returns true if the specified store is overfull in terms +// of leases with respect to the other stores matching the specified +// attributes. +func (a *Allocator) ShouldTransferLease( + constraints config.Constraints, leaseStoreID roachpb.StoreID, rangeID roachpb.RangeID, +) bool { + if !a.options.AllowRebalance { + return false + } + + source, ok := a.storePool.getStoreDescriptor(leaseStoreID) + if !ok { + return false + } + sl, _, _ := a.storePool.getStoreList(rangeID) + sl = sl.filter(constraints) + if log.V(3) { + log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl) + } + return shouldTransferLease(sl, source) +} + +func shouldTransferLease(sl StoreList, candidate roachpb.StoreDescriptor) bool { + // Allow lease transfer if we're above the overfull threshold, which is + // mean*(1+rebalanceThreshold). + overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + RebalanceThreshold))) + minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5)) + if overfullLeaseThreshold < minOverfullThreshold { + overfullLeaseThreshold = minOverfullThreshold + } + return candidate.Capacity.LeaseCount > overfullLeaseThreshold +} + // selectGood attempts to select a store from the supplied store list that it // considers to be 'Good' relative to the other stores in the list. Any nodes // in the supplied 'exclude' list will be disqualified from selection. Returns diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go index 98316c6afaa0..997bd0d0e5c2 100644 --- a/pkg/storage/allocator_test.go +++ b/pkg/storage/allocator_test.go @@ -682,6 +682,95 @@ func TestAllocatorRebalanceByCount(t *testing.T) { } } +func TestAllocatorTransferLeaseTarget(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, g, _, a, _ := createTestAllocator(true) + defer stopper.Stop() + + // 3 stores where the lease count for each store is equal to 10x the store + // ID. + var stores []*roachpb.StoreDescriptor + for i := 1; i <= 3; i++ { + stores = append(stores, &roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, + Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)}, + }) + } + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(stores, t) + + existing := []roachpb.ReplicaDescriptor{ + {StoreID: 1}, + {StoreID: 2}, + {StoreID: 3}, + } + + testCases := []struct { + existing []roachpb.ReplicaDescriptor + leaseholder roachpb.StoreID + check bool + expected roachpb.StoreID + }{ + // No existing lease holder, nothing to do. + {existing: existing, leaseholder: 0, check: true, expected: 0}, + // Store 1 is not a lease transfer source. + {existing: existing, leaseholder: 1, check: true, expected: 0}, + {existing: existing, leaseholder: 1, check: false, expected: 0}, + // Store 2 is not a lease transfer source. + {existing: existing, leaseholder: 2, check: true, expected: 0}, + {existing: existing, leaseholder: 2, check: false, expected: 1}, + // Store 3 is a lease transfer source. + {existing: existing, leaseholder: 3, check: true, expected: 1}, + {existing: existing, leaseholder: 3, check: false, expected: 1}, + } + for i, c := range testCases { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + target := a.TransferLeaseTarget(config.Constraints{}, + c.existing, c.leaseholder, 0, c.check) + if c.expected != target.StoreID { + t.Fatalf("%d: expected %d, but found %d", i, c.expected, target.StoreID) + } + }) + } +} + +func TestAllocatorShouldTransferLease(t *testing.T) { + defer leaktest.AfterTest(t)() + stopper, g, _, a, _ := createTestAllocator(true) + defer stopper.Stop() + + // 3 stores where the lease count for each store is equal to 10x the store + // ID. + var stores []*roachpb.StoreDescriptor + for i := 1; i <= 3; i++ { + stores = append(stores, &roachpb.StoreDescriptor{ + StoreID: roachpb.StoreID(i), + Node: roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)}, + Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)}, + }) + } + sg := gossiputil.NewStoreGossiper(g) + sg.GossipStores(stores, t) + + testCases := []struct { + leaseholder roachpb.StoreID + expected bool + }{ + {leaseholder: 1, expected: false}, + {leaseholder: 2, expected: false}, + {leaseholder: 3, expected: true}, + } + for i, c := range testCases { + t.Run(fmt.Sprintf("%d", i), func(t *testing.T) { + result := a.ShouldTransferLease(config.Constraints{}, c.leaseholder, 0) + if c.expected != result { + t.Fatalf("%d: expected %v, but found %v", i, c.expected, result) + } + }) + } +} + // TestAllocatorRemoveTarget verifies that the replica chosen by RemoveTarget is // the one with the lowest capacity. func TestAllocatorRemoveTarget(t *testing.T) { @@ -740,29 +829,11 @@ func TestAllocatorRemoveTarget(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(stores, t) - // Exclude store 2 as a removal target so that only store 3 is a candidate. - targetRepl, err := a.RemoveTarget(replicas, stores[1].StoreID) - if err != nil { - t.Fatal(err) - } - if a, e := targetRepl, replicas[2]; a != e { - t.Fatalf("RemoveTarget did not select expected replica; expected %v, got %v", e, a) - } - - // Now exclude store 3 so that only store 2 is a candidate. - targetRepl, err = a.RemoveTarget(replicas, stores[2].StoreID) - if err != nil { - t.Fatal(err) - } - if a, e := targetRepl, replicas[1]; a != e { - t.Fatalf("RemoveTarget did not select expected replica; expected %v, got %v", e, a) - } - var counts [4]int for i := 0; i < 100; i++ { - // Exclude store 1 as a removal target. We should see stores 2 and 3 - // randomly selected as the removal target. - targetRepl, err := a.RemoveTarget(replicas, stores[0].StoreID) + // Stores 2 and 3 are overfull, so we should see them randomly selected as + // the removal target. + targetRepl, err := a.RemoveTarget(replicas) if err != nil { t.Fatal(err) } diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go index 0d1792c806b8..93ac5d85bbf5 100644 --- a/pkg/storage/replicate_queue.go +++ b/pkg/storage/replicate_queue.go @@ -119,6 +119,13 @@ func (rq *replicateQueue) shouldQueue( if lease, _ := repl.getLease(); lease != nil { leaseStoreID = lease.Replica.StoreID } + if rq.allocator.ShouldTransferLease( + zone.Constraints, leaseStoreID, desc.RangeID) { + if log.V(2) { + log.Infof(ctx, "%s lease transfer needed, enqueuing", repl) + } + return true, 0 + } target := rq.allocator.RebalanceTarget( zone.Constraints, desc.Replicas, @@ -178,19 +185,39 @@ func (rq *replicateQueue) process( } case AllocatorRemove: log.Event(ctx, "removing a replica") - // We require the lease in order to process replicas, so - // repl.store.StoreID() corresponds to the lease-holder's store ID. - removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, repl.store.StoreID()) + removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas) if err != nil { return err } - log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica) - if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil { - return err - } - // Do not requeue if we removed ourselves. - if removeReplica.StoreID == repl.store.StoreID() { - return nil + // We require the lease in order to process replicas, so + // repl.store.StoreID() corresponds to the lease-holder's store ID. + leaseholderStoreID := repl.store.StoreID() + if removeReplica.StoreID == leaseholderStoreID { + // The local replica was selected as the removal target, but that replica + // is the leaseholder, so transfer the lease instead. We don't check that + // the current store has too many leases in this case under the + // assumption that replica balance is a greater concern. Also note that + // AllocatorRemove action takes preference over AllocatorNoop + // (rebalancing) which is where lease transfer would otherwise occur. We + // need to be able to transfer leases in AllocatorRemove in order to get + // out of situations where this store is overfull and yet holds all the + // leases. + target := rq.allocator.TransferLeaseTarget( + zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID, + false /* check-transfer-lease-source */) + if target != (roachpb.ReplicaDescriptor{}) { + log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID) + if err := repl.AdminTransferLease(target.StoreID); err != nil { + return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID) + } + // Do not requeue as we transferred our lease away. + return nil + } + } else { + log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica) + if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil { + return err + } } case AllocatorRemoveDead: log.Event(ctx, "removing a dead replica") @@ -206,12 +233,24 @@ func (rq *replicateQueue) process( return err } case AllocatorNoop: - log.Event(ctx, "considering a rebalance") // The Noop case will result if this replica was queued in order to // rebalance. Attempt to find a rebalancing target. - // + log.Event(ctx, "considering a rebalance") + // We require the lease in order to process replicas, so // repl.store.StoreID() corresponds to the lease-holder's store ID. + target := rq.allocator.TransferLeaseTarget( + zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID, + true /* check-transfer-lease-source */) + if target.StoreID != 0 { + log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID) + if err := repl.AdminTransferLease(target.StoreID); err != nil { + return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID) + } + // Do not requeue as we transferred our lease away. + return nil + } + rebalanceStore := rq.allocator.RebalanceTarget( zone.Constraints, desc.Replicas, diff --git a/pkg/storage/simulation/range.go b/pkg/storage/simulation/range.go index 94be037c5948..fac939316a61 100644 --- a/pkg/storage/simulation/range.go +++ b/pkg/storage/simulation/range.go @@ -131,7 +131,7 @@ func (r *Range) getAllocateTarget() (roachpb.StoreID, error) { func (r *Range) getRemoveTarget() (roachpb.StoreID, error) { // Pass in an invalid store ID since we don't consider range leases as part // of the simulator. - removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas, roachpb.StoreID(-1)) + removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas) if err != nil { return 0, err }