From 91701d0da4f56dae6721a71bff0814f92df152c2 Mon Sep 17 00:00:00 2001 From: Alex Sarkesian Date: Tue, 22 Nov 2022 20:13:04 -0500 Subject: [PATCH] kvserver: add support for allocator range check via store This change exposes support via a store for checking the allocator action and upreplication target (if applicable) for any range descriptor. The range does not need to have a replica on the given store, nor is it required to evaluate given the current state of the cluster (i.e. the store's configured `StorePool`), as a node liveness override can be provided in order to evaluate possible future states. Depends on #92176. Part of #91570. Release note: None --- .../allocator/allocatorimpl/allocator.go | 3 +- pkg/kv/kvserver/allocator/base.go | 7 + pkg/kv/kvserver/allocator_impl_test.go | 42 ++++ pkg/kv/kvserver/store.go | 63 ++++++ pkg/kv/kvserver/store_test.go | 209 ++++++++++++++++++ 5 files changed, 323 insertions(+), 1 deletion(-) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index 49bfa33cdb67..97298fda4b46 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -350,7 +350,8 @@ func (ae *allocatorError) Error() string { return b.String() } -func (*allocatorError) PurgatoryErrorMarker() {} +func (*allocatorError) AllocationErrorMarker() {} +func (*allocatorError) PurgatoryErrorMarker() {} // allocatorRand pairs a rand.Rand with a mutex. // NOTE: Allocator is typically only accessed from a single thread (the diff --git a/pkg/kv/kvserver/allocator/base.go b/pkg/kv/kvserver/allocator/base.go index 546a6ea82a9f..52db4fc11961 100644 --- a/pkg/kv/kvserver/allocator/base.go +++ b/pkg/kv/kvserver/allocator/base.go @@ -41,6 +41,13 @@ const ( defaultLoadBasedRebalancingInterval = time.Minute ) +// AllocationError is a simple interface used to indicate a replica processing +// error originating from the allocator. +type AllocationError interface { + error + AllocationErrorMarker() // dummy method for unique interface +} + // MaxCapacityCheck returns true if the store has room for a new replica. func MaxCapacityCheck(store roachpb.StoreDescriptor) bool { return store.Capacity.FractionUsed() < MaxFractionUsedThreshold diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index da650cac28f3..bfa7f02440df 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -61,6 +61,48 @@ var singleStore = []*roachpb.StoreDescriptor{ }, } +var threeStores = []*roachpb.StoreDescriptor{ + { + StoreID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 1, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 2, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, + { + StoreID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"ssd"}}, + Node: roachpb.NodeDescriptor{ + NodeID: 3, + Attrs: roachpb.Attributes{Attrs: []string{"a"}}, + }, + Capacity: roachpb.StoreCapacity{ + Capacity: 200, + Available: 100, + LogicalBytes: 100, + }, + }, +} + var twoDCStores = []*roachpb.StoreDescriptor{ { StoreID: 1, diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index b2c22defe993..8e498296e229 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -3728,6 +3728,69 @@ func (s *Store) AllocatorDryRun(ctx context.Context, repl *Replica) (tracingpb.R return collectAndFinish(), nil } +// AllocatorCheckRange takes a range descriptor and a node liveness override (or +// nil, to use the configured StorePool's), looks up the configuration of +// range, and utilizes the allocator to get the action needed to repair the +// range, as well as any upreplication target if needed, returning along with +// any encountered errors as well as the collected tracing spans. +// +// This functionality is similar to AllocatorDryRun, but operates on the basis +// of a range, evaluating the action and target determined by the allocator. +// The range does not need to have a replica on the store in order to check the +// needed allocator action and target. The liveness override function, if +// provided, may return UNKNOWN to fall back to the actual node liveness. +// +// Assuming the span config is available, a valid allocator action should +// always be returned, even in case of errors. +// +// NB: In the case of removal or rebalance actions, a target cannot be +// evaluated, as a leaseholder is required for evaluation. +func (s *Store) AllocatorCheckRange( + ctx context.Context, + desc *roachpb.RangeDescriptor, + nodeLivenessOverride storepool.NodeLivenessFunc, +) (allocatorimpl.AllocatorAction, roachpb.ReplicationTarget, tracingpb.Recording, error) { + ctx, collectAndFinish := tracing.ContextWithRecordingSpan(ctx, s.cfg.AmbientCtx.Tracer, "allocator check range") + defer collectAndFinish() + + confReader, err := s.GetConfReader(ctx) + if err == nil { + err = s.WaitForSpanConfigSubscription(ctx) + } + if err != nil { + log.Eventf(ctx, "span configs unavailable: %s", err) + return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err + } + + conf, err := confReader.GetSpanConfigForKey(ctx, desc.StartKey) + if err != nil { + log.Eventf(ctx, "error retrieving span config for range %s: %s", desc, err) + return allocatorimpl.AllocatorNoop, roachpb.ReplicationTarget{}, collectAndFinish(), err + } + + var storePool storepool.AllocatorStorePool + if nodeLivenessOverride != nil { + internalNodeLivenessFn := func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + status := nodeLivenessOverride(nid, now, timeUntilStoreDead) + if status == livenesspb.NodeLivenessStatus_UNKNOWN { + return s.cfg.StorePool.NodeLivenessFn(nid, now, timeUntilStoreDead) + } + + return status + } + storePool = storepool.NewOverrideStorePool(s.cfg.StorePool, internalNodeLivenessFn) + } else if s.cfg.StorePool != nil { + storePool = s.cfg.StorePool + } + + action, target, err := s.replicateQueue.CheckRangeAction(ctx, storePool, desc, conf) + if err != nil { + log.Eventf(ctx, "error simulating allocator on range %s: %s", desc, err) + } + + return action, target, collectAndFinish(), err +} + // Enqueue runs the given replica through the requested queue. If `async` is // specified, the replica is enqueued into the requested queue for asynchronous // processing and this method returns nothing. Otherwise, it returns all trace diff --git a/pkg/kv/kvserver/store_test.go b/pkg/kv/kvserver/store_test.go index 55a74f41e6fa..697384c6a683 100644 --- a/pkg/kv/kvserver/store_test.go +++ b/pkg/kv/kvserver/store_test.go @@ -32,11 +32,14 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/batcheval" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverpb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rditer" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/stateloader" @@ -49,6 +52,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/testutils/gossiputil" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/errorutil" @@ -3398,6 +3402,211 @@ func TestSnapshotRateLimit(t *testing.T) { } } +// TestAllocatorCheckRangeUnconfigured tests evaluating the allocation decisions +// for a range with a single replica using the default system configuration and +// no other available allocation targets. +func TestAllocatorCheckRangeUnconfigured(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + tc := testContext{} + tc.Start(ctx, t, stopper) + + s := tc.store + + // Expect allocator error if range has nowhere to upreplicate. + action, _, _, err := s.AllocatorCheckRange(ctx, tc.repl.Desc(), nil /* nodeLivenessOverride */) + require.Error(t, err) + var allocatorError allocator.AllocationError + require.ErrorAs(t, err, &allocatorError) + require.Equal(t, allocatorimpl.AllocatorAddVoter, action) + + // Expect error looking up spanConfig if we can't use the system config span, + // as the spanconfig.KVSubscriber infrastructure is not initialized. + s.cfg.TestingKnobs.MakeSystemConfigSpanUnavailableToQueues = true + action, _, _, err = s.AllocatorCheckRange(ctx, tc.repl.Desc(), nil /* nodeLivenessOverride */) + require.Error(t, err) + require.ErrorIs(t, err, errSysCfgUnavailable) + require.Equal(t, allocatorimpl.AllocatorNoop, action) +} + +// TestAllocatorCheckRange runs a number of tests to check the allocator's +// range repair action and target based on a number of different configured +// stores. +func TestAllocatorCheckRange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + for _, tc := range []struct { + name string + stores []*roachpb.StoreDescriptor + existingReplicas []roachpb.ReplicaDescriptor + livenessOverrides map[roachpb.NodeID]livenesspb.NodeLivenessStatus + expectedAction allocatorimpl.AllocatorAction + expectValidTarget bool + expectedLogMessage string + expectErr bool + expectAllocatorErr bool + expectedErr error + expectedErrStr string + }{ + { + name: "overreplicated", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + {NodeID: 4, StoreID: 4, ReplicaID: 4}, + }, + livenessOverrides: nil, + expectedAction: allocatorimpl.AllocatorRemoveVoter, + expectErr: false, + }, + { + name: "overreplicated but store dead", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + {NodeID: 4, StoreID: 4, ReplicaID: 4}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 3: livenesspb.NodeLivenessStatus_DEAD, + }, + expectedAction: allocatorimpl.AllocatorRemoveDeadVoter, + expectErr: false, + }, + { + name: "decommissioning but underreplicated", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 2: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + expectedAction: allocatorimpl.AllocatorAddVoter, + expectErr: false, + expectValidTarget: true, + }, + { + name: "decommissioning with replacement", + stores: multiRegionStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectErr: false, + expectValidTarget: true, + }, + { + name: "decommissioning without valid replacement", + stores: threeStores, + existingReplicas: []roachpb.ReplicaDescriptor{ + {NodeID: 1, StoreID: 1, ReplicaID: 1}, + {NodeID: 2, StoreID: 2, ReplicaID: 2}, + {NodeID: 3, StoreID: 3, ReplicaID: 3}, + }, + livenessOverrides: map[roachpb.NodeID]livenesspb.NodeLivenessStatus{ + 3: livenesspb.NodeLivenessStatus_DECOMMISSIONING, + }, + expectedAction: allocatorimpl.AllocatorReplaceDecommissioningVoter, + expectAllocatorErr: true, + expectedErrStr: "likely not enough nodes in cluster", + }, + } { + t.Run(tc.name, func(t *testing.T) { + // Setup store pool based on store descriptors and configure test store. + nodesSeen := make(map[roachpb.NodeID]struct{}) + for _, storeDesc := range tc.stores { + nodesSeen[storeDesc.Node.NodeID] = struct{}{} + } + numNodes := len(nodesSeen) + + // Create a test store simulating n1s1, where we have other nodes/stores as + // determined by the test configuration. As we do not start the test store, + // queues will not be running. + stopper, g, sp, _, manual := allocatorimpl.CreateTestAllocator(ctx, numNodes, false /* deterministic */) + defer stopper.Stop(context.Background()) + gossiputil.NewStoreGossiper(g).GossipStores(tc.stores, t) + + clock := hlc.NewClock(manual, time.Nanosecond) + cfg := TestStoreConfig(clock) + cfg.Gossip = g + cfg.StorePool = sp + + s := createTestStoreWithoutStart(ctx, t, stopper, testStoreOpts{createSystemRanges: true}, &cfg) + + desc := &roachpb.RangeDescriptor{ + RangeID: 789, + StartKey: roachpb.RKey("a"), + EndKey: roachpb.RKey("b"), + InternalReplicas: tc.existingReplicas, + } + + var livenessOverride storepool.NodeLivenessFunc + if len(tc.livenessOverrides) > 0 { + livenessOverride = func(nid roachpb.NodeID, _ time.Time, _ time.Duration) livenesspb.NodeLivenessStatus { + if liveness, ok := tc.livenessOverrides[nid]; ok { + return liveness + } + + return livenesspb.NodeLivenessStatus_UNKNOWN + } + } + + // Execute actual allocator range repair check. + action, target, recording, err := s.AllocatorCheckRange(ctx, desc, livenessOverride) + + // Validate expectations from test case. + if tc.expectErr || tc.expectAllocatorErr { + require.Error(t, err) + + if tc.expectedErr != nil { + require.ErrorIs(t, err, tc.expectedErr) + } + if tc.expectAllocatorErr { + var allocatorError allocator.AllocationError + require.ErrorAs(t, err, &allocatorError) + } + if tc.expectedErrStr != "" { + require.ErrorContains(t, err, tc.expectedErrStr) + } + } else { + require.NoError(t, err) + } + + require.Equalf(t, tc.expectedAction, action, + "expected action \"%s\", got action \"%s\"", tc.expectedAction, action, + ) + + if tc.expectValidTarget { + require.NotEqualf(t, roachpb.ReplicationTarget{}, target, "expected valid target") + } + + if tc.expectedLogMessage != "" { + _, ok := recording.FindLogMessage(tc.expectedLogMessage) + require.Truef(t, ok, "expected to find trace \"%s\"", tc.expectedLogMessage) + } + }) + } +} + // TestManuallyEnqueueUninitializedReplica makes sure that uninitialized // replicas cannot be enqueued. func TestManuallyEnqueueUninitializedReplica(t *testing.T) {