diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 17984d9d6860..bbd4db00ca59 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -100,6 +100,7 @@ go_library( "//pkg/kv/kvclient/rangestats", "//pkg/kv/kvprober", "//pkg/kv/kvserver", + "//pkg/kv/kvserver/allocator/allocatorimpl", "//pkg/kv/kvserver/allocator/storepool", "//pkg/kv/kvserver/closedts/ctpb", "//pkg/kv/kvserver/closedts/sidetransport", diff --git a/pkg/server/decommission.go b/pkg/server/decommission.go index 797891849232..702fb7f90e00 100644 --- a/pkg/server/decommission.go +++ b/pkg/server/decommission.go @@ -17,6 +17,8 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/allocatorimpl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/allocator/storepool" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" "github.com/cockroachdb/cockroach/pkg/roachpb" @@ -24,8 +26,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/log/logpb" + "github.com/cockroachdb/cockroach/pkg/util/rangedesc" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb" "github.com/cockroachdb/errors" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" @@ -39,6 +43,25 @@ type decommissioningNodeMap struct { nodes map[roachpb.NodeID]interface{} } +// decommissionRangeCheckResult is the result of evaluating the allocator action +// and target for a single range that has an extant replica on a node targeted +// for decommission. +type decommissionRangeCheckResult struct { + desc *roachpb.RangeDescriptor + action allocatorimpl.AllocatorAction + tracingSpans tracingpb.Recording + err error +} + +// decommissionPreCheckResult is the result of checking the readiness +// of a node or set of nodes to be decommissioned. +type decommissionPreCheckResult struct { + rangesChecked int + replicasByNode map[roachpb.NodeID][]roachpb.ReplicaIdent + actionCounts map[allocatorimpl.AllocatorAction]int + rangesNotReady []decommissionRangeCheckResult +} + // makeOnNodeDecommissioningCallback returns a callback that enqueues the // decommissioning node's ranges into the `stores`' replicateQueues for // rebalancing. @@ -131,6 +154,172 @@ func getPingCheckDecommissionFn( } } +// DecommissionPreCheck is used to evaluate if nodes are ready for decommission, +// prior to starting the Decommission(..) process. This is evaluated by checking +// that any replicas on the given nodes are able to be replaced or removed, +// following the current state of the cluster as well as the configuration. +// If strictReadiness is true, all replicas are expected to need only replace +// or remove actions. If maxErrors >0, range checks will stop once maxError is +// reached. +func (s *Server) DecommissionPreCheck( + ctx context.Context, + nodeIDs []roachpb.NodeID, + strictReadiness bool, + collectTraces bool, + maxErrors int, +) (decommissionPreCheckResult, error) { + var rangesChecked int + decommissionCheckNodeIDs := make(map[roachpb.NodeID]struct{}) + replicasByNode := make(map[roachpb.NodeID][]roachpb.ReplicaIdent) + actionCounts := make(map[allocatorimpl.AllocatorAction]int) + var rangeErrors []decommissionRangeCheckResult + const pageSize = 10000 + + for _, nodeID := range nodeIDs { + decommissionCheckNodeIDs[nodeID] = struct{}{} + } + + // Counters need to be reset on any transaction retries during the scan + // through range descriptors. + initCounters := func() { + rangesChecked = 0 + for action := range actionCounts { + actionCounts[action] = 0 + } + rangeErrors = rangeErrors[:0] + for nid := range replicasByNode { + replicasByNode[nid] = replicasByNode[nid][:0] + } + } + + // Only check using the first store on this node, as they should all give + // identical results. + var evalStore *kvserver.Store + err := s.node.stores.VisitStores(func(s *kvserver.Store) error { + if evalStore == nil { + evalStore = s + } + return nil + }) + if err == nil && evalStore == nil { + err = errors.Errorf("n%d has no initialized store", s.NodeID()) + } + if err != nil { + return decommissionPreCheckResult{}, err + } + + // Define our node liveness override function to simulate that the nodeIDs + // for which we are checking decommission readiness are in the DECOMMISSIONING + // state. All other nodeIDs should use their actual liveness status. + var livenessFn storepool.NodeLivenessFunc = func(nid roachpb.NodeID, now time.Time, timeUntilStoreDead time.Duration) livenesspb.NodeLivenessStatus { + if _, ok := decommissionCheckNodeIDs[nid]; ok { + return livenesspb.NodeLivenessStatus_DECOMMISSIONING + } + + // By returning unknown, we signal to the store to look up the true liveness. + return livenesspb.NodeLivenessStatus_UNKNOWN + } + + // Define our replica filter to only look at the replicas on the checked nodes. + predHasDecommissioningReplica := func(rDesc roachpb.ReplicaDescriptor) bool { + _, ok := decommissionCheckNodeIDs[rDesc.NodeID] + return ok + } + + // Iterate through all range descriptors using the rangedesc.Scanner, which + // will perform the requisite meta1/meta2 lookups, including retries. + rangeDescScanner := rangedesc.NewScanner(s.db) + err = rangeDescScanner.Scan(ctx, pageSize, initCounters, keys.EverythingSpan, func(descriptors ...roachpb.RangeDescriptor) error { + for _, desc := range descriptors { + // Track replicas by node for recording purposes. + // Skip checks if this range doesn't exist on a potentially decommissioning node. + replicasToMove := desc.Replicas().FilterToDescriptors(predHasDecommissioningReplica) + if len(replicasToMove) == 0 { + continue + } + for _, rDesc := range replicasToMove { + rIdent := roachpb.ReplicaIdent{ + RangeID: desc.RangeID, + Replica: rDesc, + } + replicasByNode[rDesc.NodeID] = append(replicasByNode[rDesc.NodeID], rIdent) + } + + if maxErrors > 0 && len(rangeErrors) >= maxErrors { + // TODO(sarkesian): Consider adding a per-range descriptor iterator to + // rangedesc.Scanner, which will correctly stop iteration on the + // function returning iterutil.StopIteration(). + continue + } + + action, _, recording, rErr := evalStore.AllocatorCheckRange(ctx, &desc, livenessFn) + rangesChecked += 1 + actionCounts[action] += 1 + + if passed, checkResult := evaluateRangeCheckResult(strictReadiness, collectTraces, + &desc, action, recording, rErr, + ); !passed { + rangeErrors = append(rangeErrors, checkResult) + } + } + + return nil + }) + + return decommissionPreCheckResult{ + rangesChecked: rangesChecked, + replicasByNode: replicasByNode, + actionCounts: actionCounts, + rangesNotReady: rangeErrors, + }, err +} + +// evaluateRangeCheckResult returns true or false if the range has passed +// decommissioning checks (based on if we are testing strict readiness or not), +// as well as the encapsulated range check result with errors defined as needed. +func evaluateRangeCheckResult( + strictReadiness bool, + collectTraces bool, + desc *roachpb.RangeDescriptor, + action allocatorimpl.AllocatorAction, + recording tracingpb.Recording, + rErr error, +) (passed bool, _ decommissionRangeCheckResult) { + checkResult := decommissionRangeCheckResult{ + desc: desc, + action: action, + } + + if collectTraces { + checkResult.tracingSpans = recording + } + + if rErr != nil { + return false, checkResult + } + + if action == allocatorimpl.AllocatorRangeUnavailable || + action == allocatorimpl.AllocatorNoop || + action == allocatorimpl.AllocatorConsiderRebalance { + checkResult.err = errors.Errorf("range r%d requires unexpected allocation action: %s", + desc.RangeID, action, + ) + return false, checkResult + } + + if strictReadiness && !(action == allocatorimpl.AllocatorReplaceDecommissioningVoter || + action == allocatorimpl.AllocatorReplaceDecommissioningNonVoter || + action == allocatorimpl.AllocatorRemoveDecommissioningVoter || + action == allocatorimpl.AllocatorRemoveDecommissioningNonVoter) { + checkResult.err = errors.Errorf( + "range r%d needs repair beyond replacing/removing the decommissioning replica", + ) + return false, checkResult + } + + return true, checkResult +} + // Decommission idempotently sets the decommissioning flag for specified nodes. // The error return is a gRPC error. func (s *Server) Decommission(