From 18445497be8a0736746ba4d93c5c3df4a9eeb701 Mon Sep 17 00:00:00 2001
From: Peter Mattis
Date: Thu, 3 Nov 2016 21:25:27 -0400
Subject: [PATCH] storage: add Allocator.TransferLease{Source,Target}

Before (`allocsim -n 4 -w 4`):

_elapsed__ops/sec___errors_replicas_________1_________2_________3_________4
    5m0s   1472.5        0     2124   704/687     492/0     522/0     406/0

After:

_elapsed__ops/sec___errors_replicas_________1_________2_________3_________4
    5m0s   1506.7        0     2157   520/183   548/180   547/166   542/180

I do see a little bit of thrashing from the lease-transfer heuristics,
though it settles down relatively quickly, so I'm not sure it is worth
addressing yet.

Depends on #10420
---
 pkg/storage/allocator.go            | 98 ++++++++++++++++++++++++++---
 pkg/storage/allocator_test.go       | 98 ++++++++++++++++++++++++++++-
 pkg/storage/replicate_queue.go      | 75 +++++++++++++++++-----
 pkg/storage/replicate_queue_test.go | 30 ++++-----
 pkg/storage/simulation/range.go     |  2 +-
 5 files changed, 262 insertions(+), 41 deletions(-)

diff --git a/pkg/storage/allocator.go b/pkg/storage/allocator.go
index c349c8d7646a..6b71970dc3c5 100644
--- a/pkg/storage/allocator.go
+++ b/pkg/storage/allocator.go
@@ -268,9 +268,10 @@ func (a *Allocator) AllocateTarget(
 }
 
 // RemoveTarget returns a suitable replica to remove from the provided replica
-// set. It attempts to consider which of the provided replicas would be the best
-// candidate for removal. It also will exclude any replica that belongs to the
-// range lease holder's store ID.
+// set. It first attempts to randomly select a target from the set of stores
+// that have greater than the average number of replicas. Failing that, it
+// falls back to selecting a random target from any of the existing
+// replicas. It will also exclude any replica that lives on leaseStoreID.
 //
 // TODO(mrtracy): removeTarget eventually needs to accept the attributes from
 // the zone config associated with the provided replicas. This will allow it to
@@ -333,12 +334,12 @@ func (a Allocator) RemoveTarget(
 	}
 
 	// Retrieve store descriptors for the provided replicas from the StorePool.
-	var descriptors []roachpb.StoreDescriptor
+	descriptors := make([]roachpb.StoreDescriptor, 0, len(existing))
 	for _, exist := range existing {
-		if exist.StoreID == leaseStoreID {
-			continue
-		}
 		if desc, ok := a.storePool.getStoreDescriptor(exist.StoreID); ok {
+			if exist.StoreID == leaseStoreID {
+				continue
+			}
 			descriptors = append(descriptors, desc)
 		}
 	}
@@ -483,6 +484,89 @@ func (a Allocator) RebalanceTarget(
 	return a.improve(sl, existingNodes), nil
 }
 
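A zero `StoreID` acts as a sentinel in the reworked `RemoveTarget` above: real store IDs start at 1, so passing 0 (as the simulator and the `RemoveTarget` test now do elsewhere in this patch) disables the leaseholder exclusion entirely. A minimal sketch of that filtering pattern, with plain ints standing in for the `roachpb` descriptors:

```go
package main

import "fmt"

// removalCandidates mirrors the exclusion loop in RemoveTarget, with plain
// ints standing in for the roachpb descriptors. A leaseStoreID of 0 matches
// no real store, so nothing is excluded.
func removalCandidates(existing []int, leaseStoreID int) []int {
	candidates := make([]int, 0, len(existing))
	for _, storeID := range existing {
		if storeID == leaseStoreID {
			continue
		}
		candidates = append(candidates, storeID)
	}
	return candidates
}

func main() {
	fmt.Println(removalCandidates([]int{1, 2, 3}, 1)) // [2 3]
	fmt.Println(removalCandidates([]int{1, 2, 3}, 0)) // [1 2 3] (no exclusion)
}
```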
+// TransferLeaseTarget returns a suitable replica from the provided list to
+// transfer the range lease to. It excludes the current lease holder replica.
+func (a *Allocator) TransferLeaseTarget(
+	constraints config.Constraints,
+	existing []roachpb.ReplicaDescriptor,
+	leaseStoreID roachpb.StoreID,
+	rangeID roachpb.RangeID,
+	checkTransferLeaseSource bool,
+) roachpb.ReplicaDescriptor {
+	if !a.options.AllowRebalance {
+		return roachpb.ReplicaDescriptor{}
+	}
+
+	sl, _, _ := a.storePool.getStoreList(rangeID)
+	sl = sl.filter(constraints)
+
+	source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
+	if !ok {
+		return roachpb.ReplicaDescriptor{}
+	}
+	if checkTransferLeaseSource && !shouldTransferLease(sl, source) {
+		return roachpb.ReplicaDescriptor{}
+	}
+
+	candidates := make([]roachpb.ReplicaDescriptor, 0, len(existing)-1)
+	for _, repl := range existing {
+		if leaseStoreID == repl.StoreID {
+			continue
+		}
+		storeDesc, ok := a.storePool.getStoreDescriptor(repl.StoreID)
+		if !ok {
+			continue
+		}
+		if float64(storeDesc.Capacity.LeaseCount) < sl.candidateLeases.mean-0.5 {
+			candidates = append(candidates, repl)
+		}
+	}
+	if len(candidates) == 0 {
+		return roachpb.ReplicaDescriptor{}
+	}
+	a.randGen.Lock()
+	defer a.randGen.Unlock()
+	return candidates[a.randGen.Intn(len(candidates))]
+}
+
+// ShouldTransferLease returns true if the specified store is overfull in terms
+// of leases with respect to the other stores matching the specified
+// constraints.
+func (a *Allocator) ShouldTransferLease(
+	constraints config.Constraints, leaseStoreID roachpb.StoreID, rangeID roachpb.RangeID,
+) bool {
+	if !a.options.AllowRebalance {
+		return false
+	}
+
+	source, ok := a.storePool.getStoreDescriptor(leaseStoreID)
+	if !ok {
+		return false
+	}
+	sl, _, _ := a.storePool.getStoreList(rangeID)
+	sl = sl.filter(constraints)
+	if log.V(3) {
+		log.Infof(context.TODO(), "transfer-lease-source (lease-holder=%d):\n%s", leaseStoreID, sl)
+	}
+	return shouldTransferLease(sl, source)
+}
+
+var enableLeaseRebalancing = envutil.EnvOrDefaultBool("COCKROACH_ENABLE_LEASE_REBALANCING", true)
+
+func shouldTransferLease(sl StoreList, source roachpb.StoreDescriptor) bool {
+	if !enableLeaseRebalancing {
+		return false
+	}
+	// Allow lease transfer if we're above the overfull threshold, which is
+	// mean*(1+rebalanceThreshold).
+	overfullLeaseThreshold := int32(math.Ceil(sl.candidateLeases.mean * (1 + rebalanceThreshold)))
+	minOverfullThreshold := int32(math.Ceil(sl.candidateLeases.mean + 5))
+	if overfullLeaseThreshold < minOverfullThreshold {
+		overfullLeaseThreshold = minOverfullThreshold
+	}
+	return source.Capacity.LeaseCount > overfullLeaseThreshold
+}
+
 // selectGood attempts to select a store from the supplied store list that it
 // considers to be 'Good' relative to the other stores in the list. Any nodes
 // in the supplied 'exclude' list will be disqualified from selection. Returns
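The `shouldTransferLease` helper above gates everything on an overfull threshold of max(ceil(mean*(1+rebalanceThreshold)), ceil(mean+5)). A standalone sketch of that arithmetic; the 0.05 value for `rebalanceThreshold` is an assumption for illustration, not taken from this patch:

```go
package main

import (
	"fmt"
	"math"
)

// rebalanceThreshold mirrors the allocator's fractional threshold; the exact
// value (0.05 here) is assumed for illustration.
const rebalanceThreshold = 0.05

// overfull reports whether a store holding leaseCount leases exceeds the
// overfull threshold for a pool whose per-store mean lease count is mean.
func overfull(leaseCount int32, mean float64) bool {
	overfullThreshold := int32(math.Ceil(mean * (1 + rebalanceThreshold)))
	if floor := int32(math.Ceil(mean + 5)); overfullThreshold < floor {
		overfullThreshold = floor
	}
	return leaseCount > overfullThreshold
}

func main() {
	// With stores holding 10, 20, and 30 leases the mean is 20, so the
	// threshold is max(ceil(21), ceil(25)) = 25: only the store holding 30
	// leases should shed its lease.
	for _, leases := range []int32{10, 20, 30} {
		fmt.Printf("leases=%d overfull=%t\n", leases, overfull(leases, 20))
	}
}
```

The mean+5 floor keeps small clusters from thrashing: with only a handful of leases per store, the fractional threshold alone would trigger transfers on tiny imbalances.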
diff --git a/pkg/storage/allocator_test.go b/pkg/storage/allocator_test.go
index c49da63f20ba..96e092356602 100644
--- a/pkg/storage/allocator_test.go
+++ b/pkg/storage/allocator_test.go
@@ -935,6 +935,102 @@ func TestAllocatorRebalanceByCount(t *testing.T) {
 	})
 }
 
+func TestAllocatorTransferLeaseTarget(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	stopper, g, _, a, _ := createTestAllocator(
+		/* deterministic */ true,
+		/* useRuleSolver */ false,
+	)
+	defer stopper.Stop()
+
+	// 3 stores where the lease count for each store is equal to 10x the store
+	// ID.
+	var stores []*roachpb.StoreDescriptor
+	for i := 1; i <= 3; i++ {
+		stores = append(stores, &roachpb.StoreDescriptor{
+			StoreID:  roachpb.StoreID(i),
+			Node:     roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
+			Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
+		})
+	}
+	sg := gossiputil.NewStoreGossiper(g)
+	sg.GossipStores(stores, t)
+
+	existing := []roachpb.ReplicaDescriptor{
+		{StoreID: 1},
+		{StoreID: 2},
+		{StoreID: 3},
+	}
+
+	// TODO(peter): Add test cases for non-empty constraints.
+	testCases := []struct {
+		existing    []roachpb.ReplicaDescriptor
+		leaseholder roachpb.StoreID
+		check       bool
+		expected    roachpb.StoreID
+	}{
+		// No existing lease holder, nothing to do.
+		{existing: existing, leaseholder: 0, check: true, expected: 0},
+		// Store 1 is not a lease transfer source.
+		{existing: existing, leaseholder: 1, check: true, expected: 0},
+		{existing: existing, leaseholder: 1, check: false, expected: 0},
+		// Store 2 is not a lease transfer source.
+		{existing: existing, leaseholder: 2, check: true, expected: 0},
+		{existing: existing, leaseholder: 2, check: false, expected: 1},
+		// Store 3 is a lease transfer source.
+		{existing: existing, leaseholder: 3, check: true, expected: 1},
+		{existing: existing, leaseholder: 3, check: false, expected: 1},
+	}
+	for i, c := range testCases {
+		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+			target := a.TransferLeaseTarget(config.Constraints{},
+				c.existing, c.leaseholder, 0, c.check)
+			if c.expected != target.StoreID {
+				t.Fatalf("expected %d, but found %d", c.expected, target.StoreID)
+			}
+		})
+	}
+}
+
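The expectations above follow from the candidate filter in `TransferLeaseTarget`: only stores whose lease count is below `mean - 0.5` may receive the lease. A toy version of that filter with simplified types (plain ints instead of the `roachpb` descriptors):

```go
package main

import "fmt"

// candidateTargets applies the patch's target filter: only stores whose
// lease count is below mean-0.5 are eligible to receive a transferred lease.
func candidateTargets(leaseCounts map[int]int32, leaseholder int, mean float64) []int {
	var out []int
	for store, leases := range leaseCounts {
		if store == leaseholder {
			continue
		}
		if float64(leases) < mean-0.5 {
			out = append(out, store)
		}
	}
	return out
}

func main() {
	counts := map[int]int32{1: 10, 2: 20, 3: 30}
	// With a mean of 20, only store 1 (10 < 19.5) qualifies, which is why the
	// test expects transfers from store 3 to land on store 1.
	fmt.Println(candidateTargets(counts, 3, 20))
}
```

Note that disabling `checkTransferLeaseSource` skips only the source-side check; the target filter still applies, which is why leaseholder 1 yields no target even with `check: false`.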
+func TestAllocatorShouldTransferLease(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	stopper, g, _, a, _ := createTestAllocator(
+		/* deterministic */ true,
+		/* useRuleSolver */ false,
+	)
+	defer stopper.Stop()
+
+	// 3 stores where the lease count for each store is equal to 10x the store
+	// ID.
+	var stores []*roachpb.StoreDescriptor
+	for i := 1; i <= 3; i++ {
+		stores = append(stores, &roachpb.StoreDescriptor{
+			StoreID:  roachpb.StoreID(i),
+			Node:     roachpb.NodeDescriptor{NodeID: roachpb.NodeID(i)},
+			Capacity: roachpb.StoreCapacity{LeaseCount: int32(10 * i)},
+		})
+	}
+	sg := gossiputil.NewStoreGossiper(g)
+	sg.GossipStores(stores, t)
+
+	testCases := []struct {
+		leaseholder roachpb.StoreID
+		expected    bool
+	}{
+		{leaseholder: 1, expected: false},
+		{leaseholder: 2, expected: false},
+		{leaseholder: 3, expected: true},
+	}
+	for i, c := range testCases {
+		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+			result := a.ShouldTransferLease(config.Constraints{}, c.leaseholder, 0)
+			if c.expected != result {
+				t.Fatalf("expected %v, but found %v", c.expected, result)
+			}
+		})
+	}
+}
+
 // TestAllocatorRemoveTarget verifies that the replica chosen by RemoveTarget is
 // the one with the lowest capacity.
 func TestAllocatorRemoveTarget(t *testing.T) {
@@ -1021,7 +1117,7 @@
 	for i := 0; i < 100; i++ {
 		// Exclude store 1 as a removal target. We should see stores 2 and 3
 		// randomly selected as the removal target.
-		targetRepl, err := a.RemoveTarget(config.Constraints{}, replicas, stores[0].StoreID)
+		targetRepl, err := a.RemoveTarget(config.Constraints{}, replicas, 0)
 		if err != nil {
 			t.Fatal(err)
 		}
diff --git a/pkg/storage/replicate_queue.go b/pkg/storage/replicate_queue.go
index 63420bc95de0..09d57c8af5bb 100644
--- a/pkg/storage/replicate_queue.go
+++ b/pkg/storage/replicate_queue.go
@@ -114,11 +114,23 @@ func (rq *replicateQueue) shouldQueue(
 		}
 		return true, priority
 	}
-	// See if there is a rebalancing opportunity present.
-	leaseStoreID := repl.store.StoreID()
-	if lease, _ := repl.getLease(); lease != nil {
+
+	// If we hold a valid lease, check whether we should transfer it.
+	var leaseStoreID roachpb.StoreID
+	if lease, _ := repl.getLease(); lease != nil && lease.Covers(now) {
 		leaseStoreID = lease.Replica.StoreID
+		if rq.allocator.ShouldTransferLease(
+			zone.Constraints, leaseStoreID, desc.RangeID) {
+			if log.V(2) {
+				log.Infof(ctx, "%s lease transfer needed, enqueuing", repl)
+			}
+			return true, 0
+		}
 	}
+
+	// Check for a rebalancing opportunity. Note that leaseStoreID will be 0 if
+	// the range doesn't currently have a lease, which allows the current
+	// replica to be considered a rebalancing source.
 	target, err := rq.allocator.RebalanceTarget(
 		zone.Constraints,
 		desc.Replicas,
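The new `shouldQueue` logic checks for a needed lease transfer before looking for a rebalance target, so lease imbalance alone can enqueue a replica. A minimal sketch of that decision order, using booleans in place of the allocator calls and a string in place of the numeric priority:

```go
package main

import "fmt"

// shouldQueue sketches the decision order added in this patch: a valid lease
// is consulted first, so lease imbalance can enqueue a replica even when
// replica counts are already balanced.
func shouldQueue(holdsValidLease, shouldTransferLease, hasRebalanceTarget bool) (bool, string) {
	if holdsValidLease && shouldTransferLease {
		return true, "lease transfer needed"
	}
	if hasRebalanceTarget {
		return true, "rebalance target found"
	}
	return false, "nothing to do"
}

func main() {
	for _, c := range []struct{ lease, transfer, rebalance bool }{
		{true, true, false},   // overfull leaseholder: enqueue for lease transfer
		{true, false, true},   // leases balanced, replicas not: enqueue to rebalance
		{false, false, false}, // nothing to do
	} {
		ok, why := shouldQueue(c.lease, c.transfer, c.rebalance)
		fmt.Printf("queue=%t (%s)\n", ok, why)
	}
}
```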
@@ -182,23 +194,46 @@ func (rq *replicateQueue) process(
 		}
 	case AllocatorRemove:
 		log.Event(ctx, "removing a replica")
-		// We require the lease in order to process replicas, so
-		// repl.store.StoreID() corresponds to the lease-holder's store ID.
+		// If the lease holder (our local store) is an overfull store (in terms
+		// of leases), allow transferring the lease away.
+		leaseHolderStoreID := repl.store.StoreID()
+		if rq.allocator.ShouldTransferLease(zone.Constraints, leaseHolderStoreID, desc.RangeID) {
+			leaseHolderStoreID = 0
+		}
 		removeReplica, err := rq.allocator.RemoveTarget(
 			zone.Constraints,
 			desc.Replicas,
-			repl.store.StoreID(),
+			leaseHolderStoreID,
 		)
 		if err != nil {
 			return err
 		}
-		log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
-		if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
-			return err
-		}
-		// Do not requeue if we removed ourselves.
 		if removeReplica.StoreID == repl.store.StoreID() {
-			return nil
+			// The local replica was selected as the removal target, but that
+			// replica is the leaseholder, so transfer the lease instead. We don't
+			// check that the current store has too many leases in this case under
+			// the assumption that replica balance is a greater concern. Also note
+			// that the AllocatorRemove action takes precedence over AllocatorNoop
+			// (rebalancing), which is where lease transfer would otherwise occur.
+			// We need to be able to transfer leases in AllocatorRemove in order to
+			// get out of situations where this store is overfull and yet holds all
+			// the leases.
+			target := rq.allocator.TransferLeaseTarget(
+				zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
+				false /* checkTransferLeaseSource */)
+			if target != (roachpb.ReplicaDescriptor{}) {
+				log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
+				if err := repl.AdminTransferLease(target.StoreID); err != nil {
+					return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
+				}
+				// Do not requeue as we transferred our lease away.
+				return nil
+			}
+		} else {
+			log.VEventf(ctx, 1, "removing replica %+v due to over-replication", removeReplica)
+			if err = repl.ChangeReplicas(ctx, roachpb.REMOVE_REPLICA, removeReplica, desc); err != nil {
+				return err
+			}
 		}
 	case AllocatorRemoveDead:
 		log.Event(ctx, "removing a dead replica")
@@ -214,12 +249,24 @@
 			return err
 		}
 	case AllocatorNoop:
-		log.Event(ctx, "considering a rebalance")
 		// The Noop case will result if this replica was queued in order to
 		// rebalance. Attempt to find a rebalancing target.
-		//
+		log.Event(ctx, "considering a rebalance")
+
 		// We require the lease in order to process replicas, so
 		// repl.store.StoreID() corresponds to the lease-holder's store ID.
+		target := rq.allocator.TransferLeaseTarget(
+			zone.Constraints, desc.Replicas, repl.store.StoreID(), desc.RangeID,
+			true /* checkTransferLeaseSource */)
+		if target.StoreID != 0 {
+			log.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
+			if err := repl.AdminTransferLease(target.StoreID); err != nil {
+				return errors.Wrapf(err, "%s: unable to transfer lease to s%d", repl, target.StoreID)
+			}
+			// Do not requeue as we transferred our lease away.
+			return nil
+		}
+
 		rebalanceStore, err := rq.allocator.RebalanceTarget(
 			zone.Constraints,
 			desc.Replicas,
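The rewritten `TestReplicateQueueRebalance` below replaces the old weak `c == 0` check with a per-store minimum replica count. A sketch of that arithmetic; the value returned by `server.ExpectedInitialRangeCount()` is assumed to be 10 here purely for illustration:

```go
package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		numNodes      = 5
		newRanges     = 5
		initialRanges = 10 // assumed stand-in for server.ExpectedInitialRangeCount()
		minThreshold  = 0.9
	)
	numRanges := newRanges + initialRanges
	numReplicas := numRanges * 3 // three-way replication
	minReplicas := int(math.Floor(minThreshold * (float64(numReplicas) / numNodes)))
	// 15 ranges * 3 replicas = 45 replicas; 45/5 = 9 per node on average,
	// so each store must hold at least floor(0.9*9) = 8 replicas.
	fmt.Println(minReplicas) // 8
}
```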
diff --git a/pkg/storage/replicate_queue_test.go b/pkg/storage/replicate_queue_test.go
index e82b365b5f41..5efa0e928275 100644
--- a/pkg/storage/replicate_queue_test.go
+++ b/pkg/storage/replicate_queue_test.go
@@ -17,6 +17,7 @@
 package storage_test
 
 import (
+	"math"
 	"os"
 	"testing"
 
@@ -24,6 +25,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/base"
 	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/server"
 	"github.com/cockroachdb/cockroach/pkg/storage"
 	"github.com/cockroachdb/cockroach/pkg/testutils"
 	"github.com/cockroachdb/cockroach/pkg/testutils/testcluster"
@@ -47,25 +49,14 @@ func TestReplicateQueueRebalance(t *testing.T) {
 		}
 	}()
 
-	// TODO(peter): Bump this to 10 nodes. Doing so is flaky until we have lease
-	// rebalancing because store 1 can hold on to too many replicas. Consider:
-	//
-	//   [15 4 2 3 3 5 5 0 5 5]
-	//
-	// Store 1 is holding all of the leases so we can't rebalance away from
-	// it. Every other store has within the ceil(average-replicas) threshold. So
-	// there are no rebalancing opportunities for store 8.
-	tc := testcluster.StartTestCluster(t, 5,
+	const numNodes = 5
+	tc := testcluster.StartTestCluster(t, numNodes,
 		base.TestClusterArgs{ReplicationMode: base.ReplicationAuto},
 	)
 	defer tc.Stopper().Stop()
 
-	// Create a handful of ranges. Along with the initial ranges in the cluster,
-	// this will result in 15 total ranges and 45 total replicas. Spread across
-	// the 10 nodes in the cluster the average is 4.5 replicas per node. Note
-	// that we don't expect to achieve that perfect balance as rebalancing
-	// targets a threshold around the average.
-	for i := 0; i < 10; i++ {
+	const newRanges = 5
+	for i := 0; i < newRanges; i++ {
 		tableID := keys.MaxReservedDescID + i + 1
 		splitKey := keys.MakeRowSentinelKey(keys.MakeTablePrefix(uint32(tableID)))
 		for {
@@ -94,12 +85,15 @@ func TestReplicateQueueRebalance(t *testing.T) {
 		return counts
 	}
 
+	numRanges := newRanges + server.ExpectedInitialRangeCount()
+	numReplicas := numRanges * 3
+	const minThreshold = 0.9
+	minReplicas := int(math.Floor(minThreshold * (float64(numReplicas) / numNodes)))
+
 	util.SucceedsSoon(t, func() error {
 		counts := countReplicas()
 		for _, c := range counts {
-			// TODO(peter): This is a weak check for rebalancing. When lease
-			// rebalancing is in place we can make this somewhat more robust.
-			if c == 0 {
+			if c < minReplicas {
 				err := errors.Errorf("not balanced: %d", counts)
 				log.Info(context.Background(), err)
 				return err
diff --git a/pkg/storage/simulation/range.go b/pkg/storage/simulation/range.go
index 0effcf37163c..b8668b719fb3 100644
--- a/pkg/storage/simulation/range.go
+++ b/pkg/storage/simulation/range.go
@@ -131,7 +131,7 @@ func (r *Range) getAllocateTarget() (roachpb.StoreID, error) {
 func (r *Range) getRemoveTarget() (roachpb.StoreID, error) {
 	// Pass in an invalid store ID since we don't consider range leases as part
 	// of the simulator.
-	removeStore, err := r.allocator.RemoveTarget(r.zone.Constraints, r.desc.Replicas, roachpb.StoreID(-1))
+	removeStore, err := r.allocator.RemoveTarget(r.zone.Constraints, r.desc.Replicas, 0)
 	if err != nil {
 		return 0, err
 	}
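For intuition about why the heuristic settles (per the commit message, after some initial thrashing), here is a toy convergence loop. The starting lease counts loosely mirror the "before" allocsim run above, where store 1 held all 687 leases, and the 0.05 threshold is again an assumption for illustration:

```go
package main

import (
	"fmt"
	"math"
)

// A toy convergence check for the heuristic in this patch: repeatedly move
// one lease from an overfull store to an underfull one and stop when no
// store is overfull.
func main() {
	leases := []float64{687, 0, 0, 0}
	mean := func() float64 {
		var sum float64
		for _, l := range leases {
			sum += l
		}
		return sum / float64(len(leases))
	}
	for steps := 0; ; steps++ {
		m := mean()
		// Overfull threshold: max(ceil(mean*1.05), ceil(mean+5)).
		overfull := math.Max(math.Ceil(m*1.05), math.Ceil(m+5))
		src, dst := -1, -1
		for i, l := range leases {
			if l > overfull && (src == -1 || l > leases[src]) {
				src = i // most loaded overfull store
			}
			if l < m-0.5 && (dst == -1 || l < leases[dst]) {
				dst = i // least loaded eligible target
			}
		}
		if src == -1 || dst == -1 {
			fmt.Printf("settled after %d transfers: %v\n", steps, leases)
			return
		}
		leases[src]--
		leases[dst]++
	}
}
```

With these inputs the loop stops once store 1 drops to the overfull threshold, leaving a distribution similar in shape to the "after" run, which is the behavior the commit message reports from allocsim.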