kvserver: log traces from replicate queue on errors or slow processing
While we previously had some logging from the replicate queue as a
result of the standard queue logging, this change adds logging to the
replicate queue when there are errors in processing a replica, or when
processing a replica exceeds a threshold, configured as a multiplier of
the estimated time to process the replica based on the snapshot rate
settings. When there are errors or the duration threshold is exceeded,
any error messages are logged along with the collected tracing spans
from the operation. By default, the logging threshold is disabled, but
the multiplier can be set via the setting
`kv.replicate.queue_process_logging_threshold`.

Release note (ops change): Added logging on replicate queue processing
in the presence of errors or when the duration exceeds the configured
threshold.
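
For illustration only (not part of the commit): a minimal sketch of the threshold arithmetic described above, using hypothetical numbers (a 512 MiB range, a 32 MiB/s minimum snapshot rate, and a multiplier of 5). The actual computation lives in makeRateLimitedTimeoutFuncBySlowdownMultiplier below; an operator would presumably enable the logging by setting the multiplier, for example via `SET CLUSTER SETTING kv.replicate.queue_process_logging_threshold = 5` (the default of 0 leaves it disabled).

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical inputs; the real code derives these from the replica's
	// MVCC stats and the recovery/rebalance snapshot rate settings.
	const (
		rangeSizeBytes     = 512 << 20 // 512 MiB range
		minSnapshotRate    = 32 << 20  // 32 MiB/s
		slowdownMultiplier = 5         // kv.replicate.queue_process_logging_threshold
	)

	estimated := time.Duration(rangeSizeBytes/minSnapshotRate) * time.Second
	threshold := estimated * time.Duration(slowdownMultiplier)
	fmt.Println(estimated, threshold) // 16s 1m20s
}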
AlexTalks committed Aug 12, 2022
1 parent 7c18668 commit 0fb0c80
Showing 3 changed files with 168 additions and 9 deletions.
25 changes: 23 additions & 2 deletions pkg/kv/kvserver/queue.go
@@ -78,8 +78,29 @@ func defaultProcessTimeoutFunc(cs *cluster.Settings, _ replicaInQueue) time.Duration
//
// The parameter controls which rate(s) to use.
func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc {
return makeRateLimitedTimeoutFuncBySlowdownMultiplier(permittedRangeScanSlowdown, nil, queueGuaranteedProcessingTimeBudget, rateSettings...)
}

// makeRateLimitedTimeoutFuncBySlowdownMultiplier is a helper function that
// provides the internal implementation of makeRateLimitedTimeoutFunc, allowing
// an explicit slowdownMultiplier to be specified via a constant value or a
// setting, as well as a minimum timeout setting if applicable (i.e. non-zero).
// A multiplier set by slowdownMultiplierSetting will take precedence if
// provided, otherwise nil can be passed in.
func makeRateLimitedTimeoutFuncBySlowdownMultiplier(
slowdownMultiplier int64,
slowdownMultiplierSetting *settings.IntSetting,
minimumTimeoutSetting *settings.DurationSetting,
rateSettings ...*settings.ByteSizeSetting,
) queueProcessTimeoutFunc {
return func(cs *cluster.Settings, r replicaInQueue) time.Duration {
minimumTimeout := queueGuaranteedProcessingTimeBudget.Get(&cs.SV)
minimumTimeout := time.Duration(0)
if minimumTimeoutSetting != nil {
minimumTimeout = minimumTimeoutSetting.Get(&cs.SV)
}
if slowdownMultiplierSetting != nil {
slowdownMultiplier = slowdownMultiplierSetting.Get(&cs.SV)
}
// NB: In production code this type assertion will always succeed.
// Some tests set up a fake implementation of replicaInQueue in which
// case we fall back to the configured minimum timeout.
@@ -95,7 +116,7 @@ func makeRateLimitedTimeoutFunc(rateSettings ...*settings.ByteSizeSetting) queueProcessTimeoutFunc
}
}
estimatedDuration := time.Duration(repl.GetMVCCStats().Total()/minSnapshotRate) * time.Second
timeout := estimatedDuration * permittedRangeScanSlowdown
timeout := estimatedDuration * time.Duration(slowdownMultiplier)
if timeout < minimumTimeout {
timeout = minimumTimeout
}
70 changes: 63 additions & 7 deletions pkg/kv/kvserver/replicate_queue.go
@@ -34,6 +34,8 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
"github.com/cockroachdb/errors"
"go.etcd.io/etcd/raft/v3"
)
@@ -71,6 +73,21 @@ var MinLeaseTransferInterval = settings.RegisterDurationSetting(
settings.NonNegativeDuration,
)

// SlowReplicateProcessThreshold is a multiplier of the estimated time to
// process a replica via the replicate queue, based on the current snapshot
// rate settings, such that if processing a replica takes longer than this value
// multiplied by the estimated time, recorded traces for the processing
// operation will be logged.
var SlowReplicateProcessThreshold = settings.RegisterIntSetting(
settings.SystemOnly,
"kv.replicate.queue_process_logging_threshold",
"multiplier of estimated replica processing duration, "+
"given snapshot rate settings, above which to output traces to logs "+
"(set to 0 to disable).",
0,
settings.NonNegativeInt,
)

var (
metaReplicateQueueAddReplicaCount = metric.Metadata{
Name: "queue.replicate.addreplica",
@@ -364,17 +381,19 @@ type replicateQueue struct {
purgCh <-chan time.Time
// updateCh is signalled every time there is an update to the cluster's store
// descriptors.
updateCh chan time.Time
lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines
updateCh chan time.Time
lastLeaseTransfer atomic.Value // read and written by scanner & queue goroutines
logTracesThresholdFunc queueProcessTimeoutFunc
}

// newReplicateQueue returns a new instance of replicateQueue.
func newReplicateQueue(store *Store, allocator allocatorimpl.Allocator) *replicateQueue {
rq := &replicateQueue{
metrics: makeReplicateQueueMetrics(),
allocator: allocator,
purgCh: time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
updateCh: make(chan time.Time, 1),
metrics: makeReplicateQueueMetrics(),
allocator: allocator,
purgCh: time.NewTicker(replicateQueuePurgatoryCheckInterval).C,
updateCh: make(chan time.Time, 1),
logTracesThresholdFunc: makeRateLimitedTimeoutFuncBySlowdownMultiplier(0, SlowReplicateProcessThreshold, nil, rebalanceSnapshotRate, recoverySnapshotRate),
}
store.metrics.registry.AddMetricStruct(&rq.metrics)
rq.baseQueue = newBaseQueue(
@@ -515,7 +534,7 @@ func (rq *replicateQueue) process(
// usually signaling that a rebalancing reservation could not be made with the
// selected target.
for r := retry.StartWithCtx(ctx, retryOpts); r.Next(); {
requeue, err := rq.processOneChange(
requeue, err := rq.processOneChangeWithTracing(
ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
)
if isSnapshotError(err) {
@@ -591,6 +610,43 @@ func (decommissionPurgatoryError) PurgatoryErrorMarker() {}

var _ PurgatoryError = decommissionPurgatoryError{}

// processOneChangeWithTracing executes processOneChange within a recording
// span, such that if we encounter an error or processing takes longer than the
// configured threshold, the recorded traces from the operation will be output
// to the logs, along with any associated errors.
func (rq *replicateQueue) processOneChangeWithTracing(
ctx context.Context,
repl *Replica,
canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool,
scatter, dryRun bool,
) (requeue bool, err error) {
tCtx, sp := tracing.EnsureChildSpan(ctx, rq.Tracer, "process replica", tracing.WithRecording(tracingpb.RecordingVerbose))
processStart := timeutil.Now()
defer func() {
processDuration := timeutil.Since(processStart)
loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold
if err != nil {
rec := sp.FinishAndGetConfiguredRecording()
log.Warningf(ctx, "error processing replica: %v", err)
if rec != nil {
log.Warningf(ctx, "trace:\n%s", rec)
}
} else if exceededDuration {
rec := sp.FinishAndGetConfiguredRecording()
log.Infof(ctx, "processing replica took %s, exceeding threshold of %s", processDuration, loggingThreshold)
if rec != nil {
log.Infof(ctx, "trace:\n%s", rec)
}
} else {
sp.Finish()
}
}()

requeue, err = rq.processOneChange(tCtx, repl, canTransferLeaseFrom, scatter, dryRun)
return requeue, err
}

func (rq *replicateQueue) processOneChange(
ctx context.Context,
repl *Replica,
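Not part of the commit, but a distilled view of the pattern processOneChangeWithTracing applies: run a function under a verbose recording span and surface the recording only when the function fails or runs longer than a threshold. This is a minimal sketch; the helper name runTraced, its signature, and its placement in package kvserver are invented for illustration, while the tracing calls (EnsureChildSpan, WithRecording, FinishAndGetConfiguredRecording) are the ones used in the diff above.

package kvserver

import (
	"context"
	"time"

	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
	"github.com/cockroachdb/cockroach/pkg/util/tracing"
	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

// runTraced runs fn under a verbose recording span. It returns the recording
// only if fn returned an error or ran longer than threshold; a threshold <= 0
// disables the slowness check, mirroring the disabled-by-default setting.
// Illustrative sketch only; not part of this commit.
func runTraced(
	ctx context.Context,
	tr *tracing.Tracer,
	name string,
	threshold time.Duration,
	fn func(ctx context.Context) error,
) (tracingpb.Recording, error) {
	ctx, sp := tracing.EnsureChildSpan(ctx, tr, name, tracing.WithRecording(tracingpb.RecordingVerbose))
	start := timeutil.Now()
	err := fn(ctx)
	if err != nil || (threshold > 0 && timeutil.Since(start) > threshold) {
		// Finish the span and hand the collected recording to the caller,
		// who can log or inspect it.
		return sp.FinishAndGetConfiguredRecording(), err
	}
	sp.Finish()
	return nil, err
}

The commit's processOneChangeWithTracing does the equivalent inline: it derives the threshold from logTracesThresholdFunc and writes the recording to the logs via log.Warningf or log.Infof rather than returning it.
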
82 changes: 82 additions & 0 deletions pkg/kv/kvserver/replicate_queue_test.go
@@ -592,6 +592,88 @@ func TestReplicateQueueDecommissioningNonVoters(t *testing.T) {
})
}

// TestReplicateQueueTracingOnError tests that an error or slowdown in
// processing a replica results in traces being logged.
func TestReplicateQueueTracingOnError(t *testing.T) {
defer leaktest.AfterTest(t)()
s := log.ScopeWithoutShowLogs(t)
defer s.Close(t)

// NB: This test injects a fake failure during replica rebalancing, and we use
// this `rejectSnapshots` variable as a flag to activate or deactivate that
// injected failure.
var rejectSnapshots int64
ctx := context.Background()
tc := testcluster.StartTestCluster(
t, 4, base.TestClusterArgs{
ReplicationMode: base.ReplicationManual,
ServerArgs: base.TestServerArgs{Knobs: base.TestingKnobs{Store: &kvserver.StoreTestingKnobs{
ReceiveSnapshot: func(_ *kvserverpb.SnapshotRequest_Header) error {
if atomic.LoadInt64(&rejectSnapshots) == 1 {
return errors.Newf("boom")
}
return nil
},
}}},
},
)
defer tc.Stopper().Stop(ctx)

// Add a replica to the second and third nodes, and then decommission the
// second node. Since there are only 4 nodes in the cluster, the
// decommissioning replica must be rebalanced to the fourth node.
const decomNodeIdx = 1
const decomNodeID = 2
scratchKey := tc.ScratchRange(t)
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx))
tc.AddVotersOrFatal(t, scratchKey, tc.Target(decomNodeIdx+1))
adminSrv := tc.Server(decomNodeIdx)
conn, err := adminSrv.RPCContext().GRPCDialNode(
adminSrv.RPCAddr(), adminSrv.NodeID(), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
adminClient := serverpb.NewAdminClient(conn)
_, err = adminClient.Decommission(
ctx, &serverpb.DecommissionRequest{
NodeIDs: []roachpb.NodeID{decomNodeID},
TargetMembership: livenesspb.MembershipStatus_DECOMMISSIONING,
},
)
require.NoError(t, err)

// Activate the above testing knob to start rejecting future rebalances and
// then attempt to rebalance the decommissioning replica away. We expect a
// purgatory error to be returned here.
atomic.StoreInt64(&rejectSnapshots, 1)
store := tc.GetFirstStoreFromServer(t, 0)
repl, err := store.GetReplica(tc.LookupRangeOrFatal(t, scratchKey).RangeID)
require.NoError(t, err)
recording, processErr, enqueueErr := tc.GetFirstStoreFromServer(t, 0).Enqueue(
ctx, "replicate", repl, true /* skipShouldQueue */, false, /* async */
)
require.NoError(t, enqueueErr)
require.Error(t, processErr, "expected processing error")
processRecSpan, foundSpan := recording.FindSpan("process replica")
require.True(t, foundSpan)
foundParent := false
foundErr := false
foundTrace := false
for _, recSpan := range recording {
if recSpan.SpanID == processRecSpan.ParentSpanID {
foundParent = true
for _, logMsg := range recSpan.Logs {
if matched, matchErr := regexp.MatchString(`error processing replica:.*boom`, logMsg.Msg().StripMarkers()); matchErr == nil && matched {
foundErr = true
}
if matched, matchErr := regexp.MatchString(`trace:.*`, logMsg.Msg().StripMarkers()); matchErr == nil && matched {
foundTrace = true
}
}
break
}
}
require.True(t, foundParent && foundErr && foundTrace)
}

// TestReplicateQueueDecommissionPurgatoryError tests that failure to move a
// decommissioning replica puts it in the replicate queue purgatory.
func TestReplicateQueueDecommissionPurgatoryError(t *testing.T) {