From 1e57036bc5b05ba00dbc4b434f00cbe140a7761d Mon Sep 17 00:00:00 2001
From: Alex Sarkesian
Date: Thu, 15 Dec 2022 17:31:28 -0500
Subject: [PATCH] server: evaluate decommission pre-checks

This adds support for the evaluation of the decommission readiness of a
node (or set of nodes), by simulating their liveness to have the
DECOMMISSIONING status and utilizing the allocator to ensure that we are
able to perform any actions needed to repair the range. This supports a
"strict" mode, in which case we expect all ranges to only need replacement
or removal due to the decommissioning status, or a more permissive
"non-strict" mode, which allows for other actions needed, as long as they
do not encounter errors in finding a suitable allocation target. The
non-strict mode allows us to permit situations where a range may have more
than one action needed to repair it, such as a range that needs to reach
its replication factor before the decommissioning replica can be replaced,
or a range that needs to finalize an atomic replication change.

Depends on #94024.

Part of #91568

Release note: None
---
 pkg/server/BUILD.bazel          |   1 +
 pkg/server/decommission.go      | 185 ++++++++++++++++++++++++++++++
 pkg/server/decommission_test.go | 197 ++++++++++++++++++++++++++++++++
 3 files changed, 383 insertions(+)
 create mode 100644 pkg/server/decommission_test.go

diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel
index 5ba452010169..2c31ce3456e6 100644
--- a/pkg/server/BUILD.bazel
+++ b/pkg/server/BUILD.bazel
@@ -100,6 +100,7 @@ go_library(
         "//pkg/kv/kvclient/rangestats",
         "//pkg/kv/kvprober",
         "//pkg/kv/kvserver",
+        "//pkg/kv/kvserver/allocator/allocatorimpl",
         "//pkg/kv/kvserver/allocator/storepool",
         "//pkg/kv/kvserver/closedts/ctpb",
         "//pkg/kv/kvserver/closedts/sidetransport",
diff --git a/pkg/server/decommission.go b/pkg/server/decommission.go
index 797891849232..e6347ebdbd78 100644
--- a/pkg/server/decommission.go
+++ b/pkg/server/decommission.go
@@ -17,6 +17,8 @@ import (
 
 	"github.com/cockroachdb/cockroach/pkg/keys"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness"
 	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
@@ -24,8 +26,10 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
 	"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
+	"github.com/cockroachdb/cockroach/pkg/util/rangedesc"
 	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"google.golang.org/grpc/codes"
 	grpcstatus "google.golang.org/grpc/status"
@@ -39,6 +43,25 @@ type decommissioningNodeMap struct {
 	nodes map[roachpb.NodeID]interface{}
 }
 
+// decommissionRangeCheckResult is the result of evaluating the allocator action
+// and target for a single range that has an extant replica on a node targeted
+// for decommission.
+type decommissionRangeCheckResult struct {
+	desc         *roachpb.RangeDescriptor
+	action       allocatorimpl.AllocatorAction
+	tracingSpans tracingpb.Recording
+	err          error
+}
+
+// decommissionPreCheckResult is the result of checking the readiness
+// of a node or set of nodes to be decommissioned.
+type decommissionPreCheckResult struct {
+	rangesChecked  int
+	replicasByNode map[roachpb.NodeID][]roachpb.ReplicaIdent
+	actionCounts   map[allocatorimpl.AllocatorAction]int
+	rangesNotReady []decommissionRangeCheckResult
+}
+
 // makeOnNodeDecommissioningCallback returns a callback that enqueues the
 // decommissioning node's ranges into the `stores`' replicateQueues for
 // rebalancing.
@@ -131,6 +154,168 @@ func getPingCheckDecommissionFn(
 	}
 }
 
+// DecommissionPreCheck is used to evaluate whether nodes are ready for
+// decommission, prior to starting the Decommission(..) process. This is
+// evaluated by checking that any replicas on the given nodes can be replaced
+// or removed, given the current state of the cluster as well as the
+// configuration. If strictReadiness is true, all replicas are expected to
+// need only replace or remove actions. If maxErrors > 0, range checks will
+// stop once maxErrors is reached.
+func (s *Server) DecommissionPreCheck(
+	ctx context.Context,
+	nodeIDs []roachpb.NodeID,
+	strictReadiness bool,
+	collectTraces bool,
+	maxErrors int,
+) (decommissionPreCheckResult, error) {
+	var rangesChecked int
+	decommissionCheckNodeIDs := make(map[roachpb.NodeID]livenesspb.NodeLivenessStatus)
+	replicasByNode := make(map[roachpb.NodeID][]roachpb.ReplicaIdent)
+	actionCounts := make(map[allocatorimpl.AllocatorAction]int)
+	var rangeErrors []decommissionRangeCheckResult
+	const pageSize = 10000
+
+	for _, nodeID := range nodeIDs {
+		decommissionCheckNodeIDs[nodeID] = livenesspb.NodeLivenessStatus_DECOMMISSIONING
+	}
+
+	// Counters need to be reset on any transaction retries during the scan
+	// through range descriptors.
+	initCounters := func() {
+		rangesChecked = 0
+		for action := range actionCounts {
+			actionCounts[action] = 0
+		}
+		rangeErrors = rangeErrors[:0]
+		for nid := range replicasByNode {
+			replicasByNode[nid] = replicasByNode[nid][:0]
+		}
+	}
+
+	// Only check using the first store on this node, as they should all give
+	// identical results.
+	var evalStore *kvserver.Store
+	err := s.node.stores.VisitStores(func(s *kvserver.Store) error {
+		if evalStore == nil {
+			evalStore = s
+		}
+		return nil
+	})
+	if err == nil && evalStore == nil {
+		err = errors.Errorf("n%d has no initialized store", s.NodeID())
+	}
+	if err != nil {
+		return decommissionPreCheckResult{}, err
+	}
+
+	// Define our node liveness overrides to simulate that the nodeIDs for which
+	// we are checking decommission readiness are in the DECOMMISSIONING state.
+	// All other nodeIDs should use their actual liveness status.
+	existingStorePool := evalStore.GetStoreConfig().StorePool
+	overrideNodeLivenessFn := storepool.OverrideNodeLivenessFunc(
+		decommissionCheckNodeIDs, existingStorePool.NodeLivenessFn,
+	)
+	overrideStorePool := storepool.NewOverrideStorePool(existingStorePool, overrideNodeLivenessFn)
+
+	// Define our replica filter to only look at the replicas on the checked nodes.
+	predHasDecommissioningReplica := func(rDesc roachpb.ReplicaDescriptor) bool {
+		_, ok := decommissionCheckNodeIDs[rDesc.NodeID]
+		return ok
+	}
+
+	// Iterate through all range descriptors using the rangedesc.Scanner, which
+	// will perform the requisite meta1/meta2 lookups, including retries.
+	rangeDescScanner := rangedesc.NewScanner(s.db)
+	err = rangeDescScanner.Scan(ctx, pageSize, initCounters, keys.EverythingSpan, func(descriptors ...roachpb.RangeDescriptor) error {
+		for _, desc := range descriptors {
+			// Track replicas by node for recording purposes.
+			// Skip checks if this range doesn't exist on a potentially decommissioning node.
+			replicasToMove := desc.Replicas().FilterToDescriptors(predHasDecommissioningReplica)
+			if len(replicasToMove) == 0 {
+				continue
+			}
+			for _, rDesc := range replicasToMove {
+				rIdent := roachpb.ReplicaIdent{
+					RangeID: desc.RangeID,
+					Replica: rDesc,
+				}
+				replicasByNode[rDesc.NodeID] = append(replicasByNode[rDesc.NodeID], rIdent)
+			}
+
+			if maxErrors > 0 && len(rangeErrors) >= maxErrors {
+				// TODO(sarkesian): Consider adding a per-range descriptor iterator to
+				// rangedesc.Scanner, which will correctly stop iteration on the
+				// function returning iterutil.StopIteration().
+				continue
+			}
+
+			action, _, recording, rErr := evalStore.AllocatorCheckRange(ctx, &desc, overrideStorePool)
+			rangesChecked += 1
+			actionCounts[action] += 1
+
+			if passed, checkResult := evaluateRangeCheckResult(strictReadiness, collectTraces,
+				&desc, action, recording, rErr,
+			); !passed {
+				rangeErrors = append(rangeErrors, checkResult)
+			}
+		}
+
+		return nil
+	})
+
+	return decommissionPreCheckResult{
+		rangesChecked:  rangesChecked,
+		replicasByNode: replicasByNode,
+		actionCounts:   actionCounts,
+		rangesNotReady: rangeErrors,
+	}, err
+}
+
+// evaluateRangeCheckResult returns whether the range passes the decommission
+// readiness checks (depending on whether strict readiness is required), along
+// with the encapsulated range check result, with errors populated as needed.
+func evaluateRangeCheckResult(
+	strictReadiness bool,
+	collectTraces bool,
+	desc *roachpb.RangeDescriptor,
+	action allocatorimpl.AllocatorAction,
+	recording tracingpb.Recording,
+	rErr error,
+) (passed bool, _ decommissionRangeCheckResult) {
+	checkResult := decommissionRangeCheckResult{
+		desc:   desc,
+		action: action,
+		err:    rErr,
+	}
+
+	if collectTraces {
+		checkResult.tracingSpans = recording
+	}
+
+	if rErr != nil {
+		return false, checkResult
+	}
+
+	if action == allocatorimpl.AllocatorRangeUnavailable ||
+		action == allocatorimpl.AllocatorNoop ||
+		action == allocatorimpl.AllocatorConsiderRebalance {
+		checkResult.err = errors.Errorf("range r%d requires unexpected allocation action: %s",
+			desc.RangeID, action,
+		)
+		return false, checkResult
+	}
+
+	if strictReadiness && !(action.Replace() || action.Remove()) {
+		checkResult.err = errors.Errorf(
+			"range r%d needs repair beyond replacing/removing the decommissioning replica: %s",
+			desc.RangeID, action,
+		)
+		return false, checkResult
+	}
+
+	return true, checkResult
+}
+
 // Decommission idempotently sets the decommissioning flag for specified nodes.
 // The error return is a gRPC error.
 func (s *Server) Decommission(
diff --git a/pkg/server/decommission_test.go b/pkg/server/decommission_test.go
new file mode 100644
index 000000000000..13ff33fedd8d
--- /dev/null
+++ b/pkg/server/decommission_test.go
@@ -0,0 +1,197 @@
+// Copyright 2023 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package server
+
+import (
+	"context"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/base"
+	"github.com/cockroachdb/cockroach/pkg/keys"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/security/username"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
+	"github.com/cockroachdb/cockroach/pkg/testutils/skip"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/errors"
+	"github.com/stretchr/testify/require"
+)
+
+func TestDecommissionPreCheckEvaluation(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	defer log.Scope(t).Close(t)
+	skip.UnderRace(t) // can't handle 7-node clusters
+
+	tsArgs := func(attrs ...string) base.TestServerArgs {
+		return base.TestServerArgs{
+			Locality: roachpb.Locality{
+				Tiers: []roachpb.Tier{
+					{
+						Key:   "region",
+						Value: "a",
+					},
+				},
+			},
+			StoreSpecs: []base.StoreSpec{
+				{InMemory: true, Attributes: roachpb.Attributes{Attrs: attrs}},
+			},
+		}
+	}
+
+	// Set up test cluster.
+	ctx := context.Background()
+	tc := serverutils.StartNewTestCluster(t, 7, base.TestClusterArgs{
+		ReplicationMode: base.ReplicationManual,
+		ServerArgsPerNode: map[int]base.TestServerArgs{
+			0: tsArgs("ns1", "origin"),
+			1: tsArgs("ns2", "west"),
+			2: tsArgs("ns3", "central"),
+			3: tsArgs("ns4", "central"),
+			4: tsArgs("ns5", "east"),
+			5: tsArgs("ns6", "east"),
+			6: tsArgs("ns7", "east"),
+		},
+	})
+	defer tc.Stopper().Stop(ctx)
+
+	// Evaluate decommission readiness of several nodes given the replicas that
+	// exist on these nodes.
+	firstSvr := tc.Server(0).(*TestServer)
+	db := tc.ServerConn(0)
+	runQueries := func(queries ...string) {
+		for _, q := range queries {
+			if _, err := db.Exec(q); err != nil {
+				t.Fatalf("error executing '%s': %s", q, err)
+			}
+		}
+	}
+	// Create database and tables.
+	ac := firstSvr.AmbientCtx()
+	ctx, span := ac.AnnotateCtxWithSpan(context.Background(), "test")
+	defer span.Finish()
+	setupQueries := []string{
+		"CREATE DATABASE test",
+		"CREATE TABLE test.tblA (val STRING)",
+		"CREATE TABLE test.tblB (val STRING)",
+		"INSERT INTO test.tblA VALUES ('testvalA')",
+		"INSERT INTO test.tblB VALUES ('testvalB')",
+	}
+	runQueries(setupQueries...)
+	alterQueries := []string{
+		"ALTER TABLE test.tblA CONFIGURE ZONE USING num_replicas = 3, constraints = '{+west: 1, +central: 1, +east: 1}', " +
+			"range_max_bytes = 500000, range_min_bytes = 100",
+		"ALTER TABLE test.tblB CONFIGURE ZONE USING num_replicas = 3, constraints = '{+east}', " +
+			"range_max_bytes = 500000, range_min_bytes = 100",
+	}
+	runQueries(alterQueries...)
+	tblAID, err := firstSvr.admin.queryTableID(ctx, username.RootUserName(), "test", "tblA")
+	require.NoError(t, err)
+	tblBID, err := firstSvr.admin.queryTableID(ctx, username.RootUserName(), "test", "tblB")
+	require.NoError(t, err)
+	startKeyTblA := keys.TODOSQLCodec.TablePrefix(uint32(tblAID))
+	startKeyTblB := keys.TODOSQLCodec.TablePrefix(uint32(tblBID))
+
+	// Split off ranges for tblA and tblB.
+	_, rDescA, err := firstSvr.SplitRange(startKeyTblA)
+	require.NoError(t, err)
+	_, rDescB, err := firstSvr.SplitRange(startKeyTblB)
+	require.NoError(t, err)
+
+	// Ensure all nodes have the correct span configs for tblA and tblB.
+	waitForSpanConfig(t, tc, rDescA.StartKey, 500000)
+	waitForSpanConfig(t, tc, rDescB.StartKey, 500000)
+
+	// Transfer tblA to [west, central, east] and tblB to [east].
+	tc.AddVotersOrFatal(t, startKeyTblA, tc.Target(1), tc.Target(2), tc.Target(4))
+	tc.TransferRangeLeaseOrFatal(t, rDescA, tc.Target(1))
+	tc.RemoveVotersOrFatal(t, startKeyTblA, tc.Target(0))
+	tc.AddVotersOrFatal(t, startKeyTblB, tc.Target(4), tc.Target(5), tc.Target(6))
+	tc.TransferRangeLeaseOrFatal(t, rDescB, tc.Target(4))
+	tc.RemoveVotersOrFatal(t, startKeyTblB, tc.Target(0))
+
+	// Validate range distribution.
+	rDescA = tc.LookupRangeOrFatal(t, startKeyTblA)
+	rDescB = tc.LookupRangeOrFatal(t, startKeyTblB)
+	for _, desc := range []roachpb.RangeDescriptor{rDescA, rDescB} {
+		require.Lenf(t, desc.Replicas().VoterAndNonVoterDescriptors(), 3, "expected 3 replicas, have %v", desc)
+	}
+
+	require.True(t, hasReplicaOnServers(tc, &rDescA, 1, 2, 4))
+	require.True(t, hasReplicaOnServers(tc, &rDescB, 4, 5, 6))
+
+	// Evaluate n5 decommission check.
+	decommissioningNodeIDs := []roachpb.NodeID{tc.Server(4).NodeID()}
+	result, err := firstSvr.DecommissionPreCheck(ctx, decommissioningNodeIDs, true, true, 0)
+	require.NoError(t, err)
+	require.Equal(t, 2, result.rangesChecked, "unexpected number of ranges checked")
+	require.Equalf(t, 2, result.actionCounts[allocatorimpl.AllocatorReplaceDecommissioningVoter],
+		"unexpected allocator actions, got %v", result.actionCounts)
+	require.Lenf(t, result.rangesNotReady, 1, "unexpected number of unready ranges")
+
+	// Validate error on tblB's range as it requires 3 replicas in "east".
+	unreadyResult := result.rangesNotReady[0]
+	require.Equalf(t, rDescB.StartKey, unreadyResult.desc.StartKey,
+		"expected tblB's range to be unready, got %s", unreadyResult.desc,
+	)
+	require.Errorf(t, unreadyResult.err, "expected error on %s", unreadyResult.desc)
+	require.NotEmptyf(t, unreadyResult.tracingSpans, "expected tracing spans on %s", unreadyResult.desc)
+	var allocatorError allocator.AllocationError
+	require.ErrorAsf(t, unreadyResult.err, &allocatorError, "expected allocator error on %s", unreadyResult.desc)
+
+	// Evaluate n3 decommission check (not required to satisfy constraints).
+	decommissioningNodeIDs = []roachpb.NodeID{tc.Server(2).NodeID()}
+	result, err = firstSvr.DecommissionPreCheck(ctx, decommissioningNodeIDs, true, true, 0)
+	require.NoError(t, err)
+	require.Equal(t, 1, result.rangesChecked, "unexpected number of ranges checked")
+	require.Equalf(t, 1, result.actionCounts[allocatorimpl.AllocatorReplaceDecommissioningVoter],
+		"unexpected allocator actions, got %v", result.actionCounts)
+	require.Lenf(t, result.rangesNotReady, 0, "unexpected number of unready ranges")
+}
+
+// hasReplicaOnServers returns true if the range has a replica on each of the given servers.
+func hasReplicaOnServers(
+	tc serverutils.TestClusterInterface, desc *roachpb.RangeDescriptor, serverIdxs ...int,
+) bool {
+	hasAll := true
+	for _, idx := range serverIdxs {
+		hasAll = hasAll && desc.Replicas().HasReplicaOnNode(tc.Server(idx).NodeID())
+	}
+	return hasAll
+}
+
+// waitForSpanConfig waits until all servers in the test cluster have a span
+// config for the key with the expected max bytes for the range.
+func waitForSpanConfig(
+	t *testing.T, tc serverutils.TestClusterInterface, key roachpb.RKey, exp int64,
+) {
+	testutils.SucceedsSoon(t, func() error {
+		for i := 0; i < tc.NumServers(); i++ {
+			s := tc.Server(i)
+			store, err := s.GetStores().(*kvserver.Stores).GetStore(s.GetFirstStoreID())
+			if err != nil {
+				return errors.Wrapf(err, "missing store on server %d", i)
+			}
+			conf, err := store.GetStoreConfig().SpanConfigSubscriber.GetSpanConfigForKey(context.Background(), key)
+			if err != nil {
+				return errors.Wrapf(err, "missing span config for %s on server %d", key, i)
+			}
+			if conf.RangeMaxBytes != exp {
+				return errors.Errorf("expected %d max bytes, got %d", exp, conf.RangeMaxBytes)
+			}
+		}
+		return nil
+	})
+}
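
For reviewers evaluating the new API, a minimal usage sketch follows. It is illustrative only and not part of the patch: the wrapper name logDecommissionReadiness and the maxErrors value of 10 are assumptions, while the DecommissionPreCheck signature and result fields are taken from the change above. The snippet presumes it lives in pkg/server, where the context, roachpb, log, and errors packages are already imported.

// logDecommissionReadiness is a hypothetical helper that runs the pre-check in
// strict mode with trace collection enabled, stopping after ten failed range
// checks, and reports any ranges that are not yet ready for decommission.
func logDecommissionReadiness(ctx context.Context, s *Server, nodeIDs []roachpb.NodeID) error {
	result, err := s.DecommissionPreCheck(
		ctx, nodeIDs, true /* strictReadiness */, true /* collectTraces */, 10 /* maxErrors */)
	if err != nil {
		return err
	}
	log.Infof(ctx, "checked %d ranges; allocator actions: %v",
		result.rangesChecked, result.actionCounts)
	for _, unready := range result.rangesNotReady {
		// Each entry carries the range descriptor, the allocator action that was
		// computed, and the error explaining why the range is not yet ready.
		log.Warningf(ctx, "r%d not ready for decommission (action %s): %v",
			unready.desc.RangeID, unready.action, unready.err)
	}
	if n := len(result.rangesNotReady); n > 0 {
		return errors.Errorf("%d ranges not ready for decommission", n)
	}
	return nil
}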