diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go
index 43b07ee05bbb..176e9cde1e68 100644
--- a/pkg/storage/allocator.go
+++ b/pkg/storage/allocator.go
@@ -20,6 +20,7 @@ package storage
 
 import (
 	"fmt"
+	"math"
 	"math/rand"
 
 	"golang.org/x/net/context"
@@ -235,9 +236,10 @@ func (a *Allocator) AllocateTarget(
 }
 
 // RemoveTarget returns a suitable replica to remove from the provided replica
-// set. It attempts to consider which of the provided replicas would be the best
-// candidate for removal. It also will exclude any replica that belongs to the
-// range lease holder's store ID.
+// set. It first attempts to randomly select a target from the set of stores
+// that have more than the average number of replicas. Failing that, it falls
+// back to selecting a random target from any of the existing replicas. It
+// will also exclude any replica that lives on the store with ID leaseStoreID.
 //
 // TODO(mrtracy): removeTarget eventually needs to accept the attributes from
 // the zone config associated with the provided replicas. This will allow it to
@@ -251,12 +253,12 @@ func (a Allocator) RemoveTarget(
 	}
 
 	// Retrieve store descriptors for the provided replicas from the StorePool.
-	var descriptors []roachpb.StoreDescriptor
+	descriptors := make([]roachpb.StoreDescriptor, 0, len(existing))
 	for _, exist := range existing {
 		if exist.StoreID == leaseStoreID {
 			continue
 		}
 		if desc, ok := a.storePool.getStoreDescriptor(exist.StoreID); ok {
 			descriptors = append(descriptors, desc)
 		}
 	}
@@ -330,6 +332,84 @@ func (a Allocator) RebalanceTarget(
 	return a.improve(sl, existingNodes)
 }
 
+// TransferLeaseTarget returns a suitable replica from the provided list to
+// transfer the range lease to. It excludes the current lease holder replica.
+func (a *Allocator) TransferLeaseTarget(
+	constraints config.Constraints,
+	existing []roachpb.ReplicaDescriptor,
+	leaseStoreID roachpb.StoreID,
+	rangeID roachpb.RangeID,
+	checkTransferLeaseSource bool,
+) roachpb.ReplicaDescriptor {
+	if !a.options.AllowRebalance {
+		return roachpb.ReplicaDescriptor{}
+	}
+
+	sl, _, _ := a.storePool.getStoreList(rangeID)
+	sl = sl.filter(constraints)
+
+	source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
+	if !ok {
+		return roachpb.ReplicaDescriptor{}
+	}
+	if checkTransferLeaseSource && !shouldTransferLease(sl, source) {
+		return roachpb.ReplicaDescriptor{}
+	}
+
+	candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing))
+	for _, repl := range existing {
+		if leaseStoreID == repl.StoreID {
+			continue
+		}
+		storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
+		if !ok {
+			continue
+		}
+		// Only consider stores that are comfortably below the mean lease
+		// count; the 0.5 buffer avoids thrashing a lease back and forth.
+		if float64(storeDesc.Capacity.LeaseCount) < sl.candidateLeases.mean-0.5 {
+			candidates = append(candidates, repl)
+		}
+	}
+	if len(candidates) == 0 {
+		return roachpb.ReplicaDescriptor{}
+	}
+	a.randGen.Lock()
+	defer a.randGen.Unlock()
+	return candidates[a.randGen.Intn(len(candidates))]
+}
+
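To make the candidate filter concrete, here is a minimal, self-contained sketch of the mean-0.5 cutoff used above. It is illustrative only: filterLeaseCandidates is a hypothetical helper, and the 10/20/30 lease counts mirror the test fixtures below rather than anything the allocator computes.

package main

import "fmt"

// filterLeaseCandidates mirrors the cutoff used by TransferLeaseTarget: a
// store is a candidate only if its lease count sits below the mean by more
// than half a lease, which keeps a transfer from immediately bouncing back.
func filterLeaseCandidates(leaseCounts map[int]int32) []int {
	var sum int32
	for _, c := range leaseCounts {
		sum += c
	}
	mean := float64(sum) / float64(len(leaseCounts))
	var candidates []int
	for storeID, c := range leaseCounts {
		if float64(c) < mean-0.5 {
			candidates = append(candidates, storeID)
		}
	}
	return candidates
}

func main() {
	// Hypothetical stores: store i holds 10*i leases, as in the tests below.
	counts := map[int]int32{1: 10, 2: 20, 3: 30}
	fmt.Println(filterLeaseCandidates(counts)) // [1]: only 10 < 19.5
}

Applied to this fixture, only store 1 (10 < 19.5) survives the filter, which is why the tests below expect store 1 as the transfer target.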
+// ShouldTransferLease returns true if the specified store is overfull in terms
+// of leases with respect to the other stores matching the specified
+// constraints.
+func (a *Allocator) ShouldTransferLease(
+	constraints config.Constraints, leaseStoreID roachpb.StoreID, rangeID roachpb.RangeID,
+) bool {
+	if !a.options.AllowRebalance {
+		return false
+	}
+
+	source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
+	if !ok {
+		return false
+	}
+	sl, _, _ := a.storePool.getStoreList(rangeID)
+	sl = sl.filter(constraints)
+	if log.V(3) {
+		log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl)
+	}
+	return shouldTransferLease(sl, source)
+}
+
+func shouldTransferLease(sl StoreList, source roachpb.StoreDescriptor) bool {
+	// Allow lease transfer if we're above the overfull threshold, which is
+	// mean*(1+rebalanceThreshold), but never lower than mean+5 so that small
+	// imbalances on small clusters don't trigger transfers.
+	overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + rebalanceThreshold)))
+	minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5))
+	if overfullLeaseThreshold < minOverfullThreshold {
+		overfullLeaseThreshold = minOverfullThreshold
+	}
+	return source.Capacity.LeaseCount > overfullLeaseThreshold
+}
+
 // selectGood attempts to select a store from the supplied store list that it
 // considers to be 'Good' relative to the other stores in the list. Any nodes
 // in the supplied 'exclude' list will be disqualified from selection. Returns
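The threshold arithmetic is easiest to see with numbers. For a three-store cluster holding 10, 20 and 30 leases the mean is 20: mean*(1+rebalanceThreshold) lands near 21 (give or take floating-point rounding), while the mean+5 floor rounds up to 25, so 25 wins and only the 30-lease store counts as overfull. A runnable sketch, assuming a rebalanceThreshold of 0.05 (overfull is a hypothetical stand-in for shouldTransferLease):

package main

import (
	"fmt"
	"math"
)

// overfull mirrors shouldTransferLease's threshold: the greater of
// mean*(1+threshold) and mean+5, so a store must hold a real surplus of
// leases before it is considered a transfer source.
func overfull(mean, rebalanceThreshold float64, leaseCount int32) bool {
	overfullLeaseThreshold := int32(math.Ceil(mean * (1 + rebalanceThreshold)))
	minOverfullThreshold := int32(math.Ceil(mean + 5))
	if overfullLeaseThreshold < minOverfullThreshold {
		overfullLeaseThreshold = minOverfullThreshold
	}
	return leaseCount > overfullLeaseThreshold
}

func main() {
	const mean = 20.0 // three stores holding 10, 20 and 30 leases
	for _, leases := range []int32{10, 20, 30} {
		// The mean+5 floor yields a threshold of 25, which dominates
		// mean*1.05, so only the 30-lease store is overfull.
		fmt.Printf("%d leases -> overfull=%t\n", leases, overfull(mean, 0.05, leases))
	}
}

This is exactly the arithmetic behind the test expectations below: stores 1 and 2 are never transfer sources, store 3 always is.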
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go
index 82ba633e26e9..ff3ace2176a0 100644
--- a/pkg/storage/allocator_test.go
+++ b/pkg/storage/allocator_test.go
@@ -682,6 +682,96 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 	}
 }
 
+func TestAllocatorTransferLeaseTarget(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	stopper, g, _, a, _ := createTestAllocator(true)
+	defer stopper.Stop()
+
+	// 3 stores where the lease count for each store is equal to 10x the store
+	// ID.
+	var stores []*roachpb.StoreDescriptor
+	for i := 1; i <= 3; i++ {
+		stores = append(stores, &roachpb.StoreDescriptor{
+			StoreID:  roachpb.StoreID(i),
+			Node:     roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
+			Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
+		})
+	}
+	sg := gossiputil.NewStoreGossiper(g)
+	sg.GossipStores(stores, t)
+
+	existing := []roachpb.ReplicaDescriptor{
+		{StoreID: 1},
+		{StoreID: 2},
+		{StoreID: 3},
+	}
+
+	// TODO(peter): Add test cases for non-empty constraints.
+	testCases := []struct {
+		existing    []roachpb.ReplicaDescriptor
+		leaseholder roachpb.StoreID
+		check       bool
+		expected    roachpb.StoreID
+	}{
+		// No existing lease holder, nothing to do.
+		{existing: existing, leaseholder: 0, check: true, expected: 0},
+		// Store 1 is not a lease transfer source.
+		{existing: existing, leaseholder: 1, check: true, expected: 0},
+		{existing: existing, leaseholder: 1, check: false, expected: 0},
+		// Store 2 is not a lease transfer source.
+		{existing: existing, leaseholder: 2, check: true, expected: 0},
+		{existing: existing, leaseholder: 2, check: false, expected: 1},
+		// Store 3 is a lease transfer source.
+		{existing: existing, leaseholder: 3, check: true, expected: 1},
+		{existing: existing, leaseholder: 3, check: false, expected: 1},
+	}
+	for i, c := range testCases {
+		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+			target := a.TransferLeaseTarget(config.Constraints{},
+				c.existing, c.leaseholder, 0, c.check)
+			if c.expected != target.StoreID {
+				t.Fatalf("expected %d, but found %d", c.expected, target.StoreID)
+			}
+		})
+	}
+}
+
+func TestAllocatorShouldTransferLease(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	stopper, g, _, a, _ := createTestAllocator(true)
+	defer stopper.Stop()
+
+	// 3 stores where the lease count for each store is equal to 10x the store
+	// ID.
+	var stores []*roachpb.StoreDescriptor
+	for i := 1; i <= 3; i++ {
+		stores = append(stores, &roachpb.StoreDescriptor{
+			StoreID:  roachpb.StoreID(i),
+			Node:     roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
+			Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
+		})
+	}
+	sg := gossiputil.NewStoreGossiper(g)
+	sg.GossipStores(stores, t)
+
+	testCases := []struct {
+		leaseholder roachpb.StoreID
+		expected    bool
+	}{
+		{leaseholder: 1, expected: false},
+		{leaseholder: 2, expected: false},
+		{leaseholder: 3, expected: true},
+	}
+	for i, c := range testCases {
+		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+			result := a.ShouldTransferLease(config.Constraints{}, c.leaseholder, 0)
+			if c.expected != result {
+				t.Fatalf("expected %v, but found %v", c.expected, result)
+			}
+		})
+	}
+}
+
 // TestAllocatorRemoveTarget verifies that the replica chosen by RemoveTarget is
 // the one with the lowest capacity.
 func TestAllocatorRemoveTarget(t *testing.T) {
@@ -760,9 +850,9 @@ func TestAllocatorRemoveTarget(t *testing.T) {
 
 	var counts [4]int
 	for i := 0; i < 100; i++ {
-		// Exclude store 1 as a removal target. We should see stores 2 and 3
-		// randomly selected as the removal target.
-		targetRepl, err := a.RemoveTarget(replicas, stores[0].StoreID)
+		// Stores 2 and 3 are overfull, so we should see them randomly selected
+		// as the removal target.
+		targetRepl, err := a.RemoveTarget(replicas, 0)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index 0d1792c806b8..d5a8ffe7a1f5 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -119,6 +119,13 @@ func (rq *replicateQueue) shouldQueue(
 	if lease, _ := repl.getLease(); lease != nil {
 		leaseStoreID = lease.Replica.StoreID
 	}
+	if rq.allocator.ShouldTransferLease(
+		zone.Constraints, leaseStoreID, desc.RangeID) {
+		if log.V(2) {
+			log.Infof(ctx, "%s lease transfer needed, enqueuing", repl)
+		}
+		return true, 0
+	}
 	target := rq.allocator.RebalanceTarget(
 		zone.Constraints,
 		desc.Replicas,
@@ -178,19 +185,42 @@ func (rq *replicateQueue) process(
 		}
 	case AllocatorRemove:
 		log.Event(ctx, "removing a replica")
-		// We require the lease in order to process replicas, so
-		// repl.store.StoreID() corresponds to the lease-holder's store ID.
-		removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, repl.store.StoreID())
-		if err != nil {
-			return err
+		// If the lease holder (our local store) is an overfull store (in
+		// terms of leases), allow transferring the lease away: pass a zero
+		// store ID so that the lease holder is not excluded as a removal
+		// target.
+		leaseHolderStoreID := repl.store.StoreID()
+		if rq.allocator.ShouldTransferLease(zone.Constraints, leaseHolderStoreID, desc.RangeID) {
+			leaseHolderStoreID = 0
 		}
-		log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
-		if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
+		removeReplica, err := rq.allocator.RemoveTarget(desc.Replicas, leaseHolderStoreID)
+		if err != nil {
 			return err
 		}
-		// Do not requeue if we removed ourselves.
 		if removeReplica.StoreID == repl.store.StoreID() {
-			return nil
+			// The local replica was selected as the removal target, but that
+			// replica is the lease holder, so transfer the lease instead. We
+			// don't check that the current store has too many leases in this
+			// case under the assumption that replica balance is a greater
+			// concern. Also note that the AllocatorRemove action takes
+			// precedence over AllocatorNoop (rebalancing), which is where
+			// lease transfer would otherwise occur. We need to be able to
+			// transfer leases in AllocatorRemove in order to get out of
+			// situations where this store is overfull and yet holds all the
+			// leases.
+			target := rq.allocator.TransferLeaseTarget(
+				zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
+				false /* checkTransferLeaseSource */)
+			if target != (roachpb.ReplicaDescriptor{}) {
+				log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
+				if err := repl.AdminTransferLease(target.StoreID); err != nil {
+					return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
+				}
+				// Do not requeue as we transferred our lease away.
+				return nil
+			}
+		} else {
+			log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
+			if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
+				return err
+			}
 		}
 	case AllocatorRemoveDead:
 		log.Event(ctx, "removing a dead replica")
@@ -206,12 +236,24 @@ func (rq *replicateQueue) process(
 			return err
 		}
 	case AllocatorNoop:
-		log.Event(ctx, "considering a rebalance")
 		// The Noop case will result if this replica was queued in order to
 		// rebalance. Attempt to find a rebalancing target.
-		//
+		log.Event(ctx, "considering a rebalance")
+
 		// We require the lease in order to process replicas, so
 		// repl.store.StoreID() corresponds to the lease-holder's store ID.
+		target := rq.allocator.TransferLeaseTarget(
+			zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
+			true /* checkTransferLeaseSource */)
+		if target.StoreID != 0 {
+			log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
+			if err := repl.AdminTransferLease(target.StoreID); err != nil {
+				return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
+			}
+			// Do not requeue as we transferred our lease away.
+			return nil
+		}
+
 		rebalanceStore := rq.allocator.RebalanceTarget(
 			zone.Constraints,
 			desc.Replicas,
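Reduced to its inputs, the AllocatorRemove branch above follows a small decision rule. The sketch below is a hypothetical distillation, not code from the patch: decide and its integer store IDs stand in for RemoveTarget and TransferLeaseTarget, with 0 meaning no target was found.

package main

import "fmt"

// decide sketches the AllocatorRemove path: the removal target is computed
// first; if it turns out to be the lease holder itself, the lease is
// transferred away (without checking lease balance, since replica balance is
// the greater concern) so that another store can perform the removal later.
func decide(removalTarget, leaseHolder, transferTarget int) string {
	if removalTarget != leaseHolder {
		return fmt.Sprintf("remove replica on s%d", removalTarget)
	}
	if transferTarget != 0 {
		return fmt.Sprintf("transfer lease to s%d", transferTarget)
	}
	return "no-op: lease holder selected but no transfer target"
}

func main() {
	fmt.Println(decide(2, 1, 0)) // remove replica on s2
	fmt.Println(decide(1, 1, 3)) // transfer lease to s3
	fmt.Println(decide(1, 1, 0)) // no-op
}

The design point worth noting is that removal is preferred whenever the target is remote; the lease transfer exists only to unblock the case where the store slated for removal happens to hold the lease.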
diff --git a/pkg/storage/replicate_queue_test.go b/pkg/storage/replicate_queue_test.go
index e82b365b5f41..3fe2a1779fb1 100644
--- a/pkg/storage/replicate_queue_test.go
+++ b/pkg/storage/replicate_queue_test.go
@@ -17,6 +17,7 @@
 package storage_test
 
 import (
+	"math"
 	"os"
 	"testing"
 
@@ -47,15 +48,8 @@ func TestReplicateQueueRebalance(t *testing.T) {
 		}
 	}()
 
-	// TODO(peter): Bump this to 10 nodes. Doing so is flaky until we have lease
-	// rebalancing because store 1 can hold on to too many replicas. Consider:
-	//
-	//   [15 4 2 3 3 5 5 0 5 5]
-	//
-	// Store 1 is holding all of the leases so we can't rebalance away from
-	// it. Every other store has within the ceil(average-replicas) threshold. So
-	// there are no rebalancing opportunities for store 8.
-	tc := testcluster.StartTestCluster(t, 5,
+	const numNodes = 10
+	tc := testcluster.StartTestCluster(t, numNodes,
 		base.TestClusterArgs{ReplicationMode: base.ReplicationAuto},
 	)
 	defer tc.Stopper().Stop()
@@ -94,12 +88,14 @@ func TestReplicateQueueRebalance(t *testing.T) {
 		return counts
 	}
 
+	const numRanges = 15
+	const numReplicas = numRanges * 3
+	const minThreshold = 0.9
+	minReplicas := int(math.Floor(minThreshold * (float64(numReplicas) / numNodes)))
 	util.SucceedsSoon(t, func() error {
 		counts := countReplicas()
 		for _, c := range counts {
-			// TODO(peter): This is a weak check for rebalancing. When lease
-			// rebalancing is in place we can make this somewhat more robust.
-			if c == 0 {
+			if c < minReplicas {
 				err := errors.Errorf("not balanced: %d", counts)
 				log.Info(context.Background(), err)
 				return err
diff --git a/pkg/storage/simulation/range.go b/pkg/storage/simulation/range.go
index 94be037c5948..0e79162429af 100644
--- a/pkg/storage/simulation/range.go
+++ b/pkg/storage/simulation/range.go
@@ -131,7 +131,7 @@ func (r *Range) getAllocateTarget() (roachpb.StoreID, error) {
 
 func (r *Range) getRemoveTarget() (roachpb.StoreID, error) {
-	// Pass in an invalid store ID since we don't consider range leases as part
-	// of the simulator.
-	removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas, roachpb.StoreID(-1))
+	// Pass in a zero store ID since we don't consider range leases as part
+	// of the simulator.
+	removeStore, err := r.allocator.RemoveTarget(r.desc.Replicas, 0)
 	if err != nil {
 		return 0, err
 	}
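For the record, the bound the rebalance test now enforces works out as follows: 15 ranges times 3 replicas is 45 replicas spread over 10 nodes, an average of 4.5 per store, and 90% of that floors to 4. A minimal worked example repeating only the constants already in the test:

package main

import (
	"fmt"
	"math"
)

func main() {
	// 15 ranges x 3 replicas across 10 nodes averages 4.5 replicas per
	// store; requiring 90% of that average means every store must end up
	// holding at least floor(4.05) = 4 replicas once rebalancing settles.
	const numNodes = 10
	const numRanges = 15
	const numReplicas = numRanges * 3
	const minThreshold = 0.9
	minReplicas := int(math.Floor(minThreshold * (float64(numReplicas) / numNodes)))
	fmt.Println(minReplicas) // 4
}

Note the float64 conversion in the division: dividing the untyped integer constants directly would truncate 4.5 down to 4 before the 90% factor is applied, silently weakening the bound to 3.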