diff --git a/pkg/kv/kvserver/queue.go b/pkg/kv/kvserver/queue.go
index c7b0497b12b0..2b0221c70f00 100644
--- a/pkg/kv/kvserver/queue.go
+++ b/pkg/kv/kvserver/queue.go
@@ -78,8 +78,29 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Dura
 //
 // The parameter controls which rate(s) to use.
 func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
+	return makeRateLimitedTimeoutFuncBySlowdownMultiplier(permittedRangeScanSlowdown, nil, queueGuaranteedProcessingTimeBudget, rateSettings...)
+}
+
+// makeRateLimitedTimeoutFuncBySlowdownMultiplier is a helper that provides the
+// internal implementation of makeRateLimitedTimeoutFunc. It allows the
+// slowdown multiplier to be specified either as a constant or via a setting,
+// and accepts an optional minimum timeout setting (pass nil for no minimum).
+// A multiplier supplied via slowdownMultiplierSetting takes precedence over
+// the constant value when the setting is non-nil.
+func makeRateLimitedTimeoutFuncBySlowdownMultiplier(
+	slowdownMultiplier int64,
+	slowdownMultiplierSetting *settings.IntSetting,
+	minimumTimeoutSetting *settings.DurationSetting,
+	rateSettings ...*settings.ByteSizeSetting,
+) queueProcessTimeoutFunc {
 	return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
-		minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
+		minimumTimeout := time.Duration(0)
+		if minimumTimeoutSetting != nil {
+			minimumTimeout = minimumTimeoutSetting.Get(&cs.SV)
+		}
+		if slowdownMultiplierSetting != nil {
+			slowdownMultiplier = slowdownMultiplierSetting.Get(&cs.SV)
+		}
 		// NB: In production code this type assertion will always succeed.
 		// Some tests set up a fake implementation of replicaInQueue in which
 		// case we fall back to the configured minimum timeout.
@@ -95,7 +116,7 @@ func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queue
 			}
 		}
 		estimatedDuration := time.Duration(repl.GetMVCCStats().Total()/minSnapshotRate) * time.Second
-		timeout := estimatedDuration * permittedRangeScanSlowdown
+		timeout := estimatedDuration * time.Duration(slowdownMultiplier)
 		if timeout < minimumTimeout {
 			timeout = minimumTimeout
 		}
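
The timeout produced by the new helper is simply the range's total byte size divided by the slowest configured snapshot rate, scaled by the slowdown multiplier and floored at the minimum timeout (zero when no minimum setting is supplied). The standalone Go sketch below restates that arithmetic outside the queue machinery; the function name and example values are illustrative only and are not part of this patch.

package main

import (
	"fmt"
	"time"
)

// rateLimitedTimeout is a hypothetical, self-contained restatement of the
// calculation performed inside makeRateLimitedTimeoutFuncBySlowdownMultiplier:
// estimate the processing time from the range size and the slowest snapshot
// rate, scale it by the slowdown multiplier, and floor it at a minimum.
func rateLimitedTimeout(
	rangeSizeBytes int64, // corresponds to repl.GetMVCCStats().Total()
	snapshotRatesBytesPerSec []int64, // values of the byte-size rate settings
	slowdownMultiplier int64,
	minimumTimeout time.Duration,
) time.Duration {
	minRate := snapshotRatesBytesPerSec[0]
	for _, r := range snapshotRatesBytesPerSec[1:] {
		if r < minRate {
			minRate = r
		}
	}
	estimated := time.Duration(rangeSizeBytes/minRate) * time.Second
	timeout := estimated * time.Duration(slowdownMultiplier)
	if timeout < minimumTimeout {
		timeout = minimumTimeout
	}
	return timeout
}

func main() {
	// A 512 MiB range processed at a 32 MiB/s snapshot rate gives a 16s
	// estimate; a 10x slowdown allowance yields a 160s timeout, so the
	// 1-minute floor does not apply here.
	fmt.Println(rateLimitedTimeout(512<<20, []int64{32 << 20, 64 << 20}, 10, time.Minute))
}
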
diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go
index f4178dedbe81..c9de0e85b7d7 100644
--- a/pkg/kv/kvserver/replicate_queue.go
+++ b/pkg/kv/kvserver/replicate_queue.go
@@ -34,6 +34,8 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/util/metric"
 	"github.com/cockroachdb/cockroach/pkg/util/retry"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing"
+	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
 	"github.com/cockroachdb/errors"
 	"go.etcd.io/etcd/raft/v3"
 )
@@ -71,6 +73,21 @@ var MinLeaseTransferInterval = settings.RegisterDurationSetting(
 	settings.NonNegativeDuration,
 )
 
+// SlowReplicateProcessThreshold is a multiplier applied to the estimated time
+// needed to process a replica via the replicate queue, derived from the
+// current snapshot rate settings. If processing a replica takes longer than
+// the estimated time multiplied by this value, the recorded traces for the
+// processing operation are logged.
+var SlowReplicateProcessThreshold = settings.RegisterIntSetting(
+	settings.SystemOnly,
+	"kv.replicate.queue_process_logging_threshold",
+	"multiplier of estimated replica processing duration, "+
+		"given snapshot rate settings, above which to output traces to logs "+
+		"(set to 0 to disable).",
+	0,
+	settings.NonNegativeInt,
+)
+
 var (
 	metaReplicateQueueAddReplicaCount = metric.Metadata{
 		Name: "queue.replicate.addreplica",
@@ -364,17 +381,19 @@ type replicateQueue struct {
 	purgCh <-chan time.Time
 	// updateCh is signalled every time there is an update to the cluster's store
 	// descriptors.
-	updateCh          chan time.Time
-	lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines
+	updateCh               chan time.Time
+	lastLeaseTransfer      atomic.Value // read and written by scanner & queue goroutines
+	logTracesThresholdFunc queueProcessTimeoutFunc
 }
 
 // newReplicateQueue returns a new instance of replicateQueue.
 func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replicateQueue {
 	rq := &replicateQueue{
-		metrics:   makeReplicateQueueMetrics(),
-		allocator: allocator,
-		purgCh:    time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
-		updateCh:  make(chan time.Time, 1),
+		metrics:                makeReplicateQueueMetrics(),
+		allocator:              allocator,
+		purgCh:                 time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
+		updateCh:               make(chan time.Time, 1),
+		logTracesThresholdFunc: makeRateLimitedTimeoutFuncBySlowdownMultiplier(0, SlowReplicateProcessThreshold, nil, rebalanceSnapshotRate, recoverySnapshotRate),
 	}
 	store.metrics.registry.AddMetricStruct(&rq.metrics)
 	rq.baseQueue = newBaseQueue(
@@ -515,7 +534,7 @@ func (rq *replicateQueue) process(
 	// usually signaling that a rebalancing reservation could not be made with the
 	// selected target.
 	for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
-		requeue, err := rq.processOneChange(
+		requeue, err := rq.processOneChangeWithTracing(
 			ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
 		)
 		if isSnapshotError(err) {
@@ -591,6 +610,43 @@ func (decommissionPurgatoryError) PurgatoryErrorMarker() {}
 
 var _ PurgatoryError = decommissionPurgatoryError{}
 
+// processOneChangeWithTracing executes processOneChange within a verbose
+// recording span. If the operation returns an error or takes longer than the
+// configured threshold, the recorded traces are output to the logs along with
+// any associated error.
+func (rq *replicateQueue) processOneChangeWithTracing(
+	ctx context.Context,
+	repl *Replica,
+	canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
+	scatter, dryRun bool,
+) (requeue bool, err error) {
+	tCtx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica", tracing.WithRecording(tracingpb.RecordingVerbose))
+	processStart := timeutil.Now()
+	defer func() {
+		processDuration := timeutil.Since(processStart)
+		loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
+		exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold
+		if err != nil {
+			rec := sp.FinishAndGetConfiguredRecording()
+			log.Warningf(ctx, "error processing replica: %v", err)
+			if rec != nil {
+				log.Warningf(ctx, "trace:\n%s", rec)
+			}
+		} else if exceededDuration {
+			rec := sp.FinishAndGetConfiguredRecording()
+			log.Infof(ctx, "processing replica took %s, exceeding threshold of %s", processDuration, loggingThreshold)
+			if rec != nil {
+				log.Infof(ctx, "trace:\n%s", rec)
+			}
+		} else {
+			sp.Finish()
+		}
+	}()
+
+	requeue, err = rq.processOneChange(tCtx, repl, canTransferLeaseFrom, scatter, dryRun)
+	return requeue, err
+}
+
 func (rq *replicateQueue) processOneChange(
 	ctx context.Context,
 	repl *Replica,
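
At its core, processOneChangeWithTracing relies on a deferred closure over the named return value err: the closure observes both the error result and the elapsed time, compares the latter against the threshold derived from SlowReplicateProcessThreshold, and only then decides whether to emit the captured recording. Below is a stripped-down, standard-library-only sketch of that control flow, with a plain string standing in for the verbose span recording; the names are hypothetical and this is not the CockroachDB tracing API.

package main

import (
	"errors"
	"fmt"
	"log"
	"time"
)

// runWithSlowOrErrorLogging mimics the shape of processOneChangeWithTracing:
// the deferred closure reads the named return value err and the elapsed time,
// and emits the captured detail only on failure or when a positive threshold
// is exceeded. In the real patch the detail is a verbose tracing recording.
func runWithSlowOrErrorLogging(threshold time.Duration, op func() (string, error)) (err error) {
	var detail string
	start := time.Now()
	defer func() {
		elapsed := time.Since(start)
		exceeded := threshold > 0 && elapsed > threshold
		switch {
		case err != nil:
			log.Printf("operation failed after %s: %v\ndetail:\n%s", elapsed, err, detail)
		case exceeded:
			log.Printf("operation took %s, exceeding threshold of %s\ndetail:\n%s", elapsed, threshold, detail)
		}
		// On a fast, successful run nothing is logged and the detail is dropped.
	}()
	detail, err = op()
	return err
}

func main() {
	// The error path always logs, regardless of duration.
	_ = runWithSlowOrErrorLogging(time.Second, func() (string, error) {
		return "step 1 ok; step 2 rejected", errors.New("boom")
	})
	// The success path logs only when the threshold is exceeded.
	err := runWithSlowOrErrorLogging(50*time.Millisecond, func() (string, error) {
		time.Sleep(100 * time.Millisecond)
		return "slow but successful", nil
	})
	fmt.Println("second run error:", err)
}

Note that with the setting's default of 0 the slow-processing branch is effectively disabled, since exceededDuration requires a positive threshold; only the error path logs traces until kv.replicate.queue_process_logging_threshold is set to a positive multiplier.
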
diff --git a/pkg/kv/kvserver/replicate_queue_test.go b/pkg/kv/kvserver/replicate_queue_test.go
index b2fd78760175..9d2f56db712e 100644
--- a/pkg/kv/kvserver/replicate_queue_test.go
+++ b/pkg/kv/kvserver/replicate_queue_test.go
@@ -592,6 +592,88 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
 	})
 }
 
+// TestReplicateQueueTracingOnError tests that an error or slowdown in
+// processing a replica results in traces being logged.
+func TestReplicateQueueTracingOnError(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	s := log.ScopeWithoutShowLogs(t)
+	defer s.Close(t)
+
+	// NB: This test injects a fake failure during replica rebalancing, and we use
+	// this `rejectSnapshots` variable as a flag to activate or deactivate that
+	// injected failure.
+	var rejectSnapshots int64
+	ctx := context.Background()
+	tc := testcluster.StartTestCluster(
+		t, 4, base.TestClusterArgs{
+			ReplicationMode: base.ReplicationManual,
+			ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
+				ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
+					if atomic.LoadInt64(&rejectSnapshots) == 1 {
+						return errors.Newf("boom")
+					}
+					return nil
+				},
+			}}},
+		},
+	)
+	defer tc.Stopper().Stop(ctx)
+
+	// Add a replica to the second and third nodes, and then decommission the
+	// second node. Since there are only 4 nodes in the cluster, the
+	// decommissioning replica must be rebalanced to the fourth node.
+	const decomNodeIdx = 1
+	const decomNodeID = 2
+	scratchKey := tc.ScratchRange(t)
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
+	tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
+	adminSrv := tc.Server(decomNodeIdx)
+	conn, err := adminSrv.RPCContext().GRPCDialNode(
+		adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
+	require.NoError(t, err)
+	adminClient := serverpb.NewAdminClient(conn)
+	_, err = adminClient.Decommission(
+		ctx, &serverpb.DecommissionRequest{
+			NodeIDs:          []roachpb.NodeID{decomNodeID},
+			TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
+		},
+	)
+	require.NoError(t, err)
+
+	// Activate the above testing knob to start rejecting future rebalances and
+	// then attempt to rebalance the decommissioning replica away. We expect a
+	// purgatory error to be returned here.
+	atomic.StoreInt64(&rejectSnapshots, 1)
+	store := tc.GetFirstStoreFromServer(t, 0)
+	repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
+	require.NoError(t, err)
+	recording, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue(
+		ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
+	)
+	require.NoError(t, enqueueErr)
+	require.Error(t, processErr, "expected processing error")
+	processRecSpan, foundSpan := recording.FindSpan("process replica")
+	require.True(t, foundSpan)
+	foundParent := false
+	foundErr := false
+	foundTrace := false
+	for _, recSpan := range recording {
+		if recSpan.SpanID == processRecSpan.ParentSpanID {
+			foundParent = true
+			for _, logMsg := range recSpan.Logs {
+				if matched, matchErr := regexp.MatchString(`error processing replica:.*boom`, logMsg.Msg().StripMarkers()); matchErr == nil && matched {
+					foundErr = true
+				}
+				if matched, matchErr := regexp.MatchString(`trace:.*`, logMsg.Msg().StripMarkers()); matchErr == nil && matched {
+					foundTrace = true
+				}
+			}
+			break
+		}
+	}
+	require.True(t, foundParent && foundErr && foundTrace)
+}
+
 // TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
 // decommissioning replica puts it in the replicate queue purgatory.
 func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {
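
The verification loop at the end of the test walks the recording to find the span named "process replica" and then inspects the log messages on its parent span for the error and trace output. The sketch below reproduces that walk over a minimal, hypothetical span structure (the real test operates on a tracing recording with SpanID, ParentSpanID, and Logs fields); the parent operation name and sample messages are invented for illustration.

package main

import (
	"fmt"
	"regexp"
)

// recordedSpan is a hypothetical stand-in for the recorded-span structure the
// test walks; only the fields needed to mirror the verification loop exist.
type recordedSpan struct {
	spanID, parentSpanID int
	operation            string
	logs                 []string
}

// parentLogsOf locates the span with the given operation name and returns the
// log messages recorded on its parent span, mirroring the loop at the end of
// TestReplicateQueueTracingOnError.
func parentLogsOf(recording []recordedSpan, operation string) []string {
	parentID, found := 0, false
	for _, sp := range recording {
		if sp.operation == operation {
			parentID, found = sp.parentSpanID, true
			break
		}
	}
	if !found {
		return nil
	}
	for _, sp := range recording {
		if sp.spanID == parentID {
			return sp.logs
		}
	}
	return nil
}

func main() {
	// Invented recording: span 2 ("process replica") is a child of span 1,
	// and the wrapper logged the error and the trace on the parent.
	recording := []recordedSpan{
		{spanID: 1, parentSpanID: 0, operation: "replicate queue processing", logs: []string{
			"error processing replica: boom",
			"trace:\n<recorded spans>",
		}},
		{spanID: 2, parentSpanID: 1, operation: "process replica"},
	}
	errRE := regexp.MustCompile(`error processing replica:.*boom`)
	traceRE := regexp.MustCompile(`trace:.*`)
	foundErr, foundTrace := false, false
	for _, msg := range parentLogsOf(recording, "process replica") {
		foundErr = foundErr || errRE.MatchString(msg)
		foundTrace = foundTrace || traceRE.MatchString(msg)
	}
	fmt.Println("found error log:", foundErr, "found trace log:", foundTrace)
}
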