kvserver: decrease trace verbosity for change replicas db operations
When we introduced logging of traces from replicate queue processing
in cockroachdb#86007, it was noted that when collecting traces with verbose
recording enabled, a significant fraction of these traces came from
low-level database operations, particularly during the transaction run
in `execChangeReplicasTxn(..)`. This function performs necessary
bookkeeping of range descriptors when replica movement occurs; however,
the high verbosity of the low-level database operations within the
transaction is not necessary when examining traces from higher-level
operations like replicate queue processing. Hence, this change creates
child tracing spans within `execChangeReplicasTxn` that can be filtered
out prior to logging the traces in the replicate queue processing loop.
This decreases the size of the logged trace by a factor of 10, allowing
the resulting log message to be much more succinct and focused.

Release justification: Low-risk, high benefit observability change.
Release note: None
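
As a rough, self-contained sketch of the filtering idea (simplified types only, not the actual `tracingpb` definitions used by the change): spans whose operation name is on a deny-list are dropped, together with all of their descendants, assuming the recording lists parents before children.

package main

import "fmt"

// span is a simplified stand-in for a recorded tracing span.
type span struct {
	id       int
	parentID int
	op       string
}

// filterSpans drops spans whose operation name is in opsToFilter, along with
// all of their descendants. It relies on parents appearing before children.
func filterSpans(rec []span, opsToFilter ...string) []span {
	excludedOps := make(map[string]struct{}, len(opsToFilter))
	for _, op := range opsToFilter {
		excludedOps[op] = struct{}{}
	}
	excludedIDs := make(map[int]struct{})
	filtered := make([]span, 0, len(rec))
	for _, s := range rec {
		_, byOp := excludedOps[s.op]
		_, byParent := excludedIDs[s.parentID]
		if byOp || byParent {
			// Remember this span so its children are dropped as well.
			excludedIDs[s.id] = struct{}{}
			continue
		}
		filtered = append(filtered, s)
	}
	return filtered
}

func main() {
	rec := []span{
		{id: 1, parentID: 0, op: "process replica"},
		{id: 2, parentID: 1, op: "change-replica-get-desc"},
		{id: 3, parentID: 2, op: "low-level db operation"},
		{id: 4, parentID: 1, op: "change-replica-update-desc"},
		{id: 5, parentID: 1, op: "other work"},
	}
	for _, s := range filterSpans(rec, "change-replica-get-desc", "change-replica-update-desc") {
		fmt.Println("kept:", s.op)
	}
	// Only "process replica" and "other work" are kept.
}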
AlexTalks committed Aug 25, 2022
1 parent 6c45224 commit 28e89ae
Showing 4 changed files with 175 additions and 105 deletions.
5 changes: 4 additions & 1 deletion pkg/kv/kvserver/replica.go
@@ -68,7 +68,7 @@ const (
// the key space being written is starting out empty.
optimizePutThreshold = 10

// Transaction names used for range changes.
// Transaction names and operations used for range changes.
// Note that those names are used by tests to perform request filtering
// in absence of better criteria. If names are changed, tests should be
// updated accordingly to avoid flakiness.
@@ -77,6 +77,9 @@ const (
splitTxnName = "split"
mergeTxnName = "merge"

replicaChangeTxnGetDescOpName = "change-replica-get-desc"
replicaChangeTxnUpdateDescOpName = "change-replica-update-desc"

defaultReplicaRaftMuWarnThreshold = 500 * time.Millisecond
)

230 changes: 127 additions & 103 deletions pkg/kv/kvserver/replica_command.go
@@ -1034,7 +1034,7 @@ func (r *Replica) changeReplicasImpl(
// here.
swaps := getInternalChangesForExplicitPromotionsAndDemotions(targets.voterDemotions, targets.nonVoterPromotions)
if len(swaps) > 0 {
desc, err = execChangeReplicasTxn(ctx, desc, reason, details, swaps, changeReplicasTxnArgs{
desc, err = execChangeReplicasTxn(ctx, r.store.cfg.Tracer(), desc, reason, details, swaps, changeReplicasTxnArgs{
db: r.store.DB(),
liveAndDeadReplicas: r.store.cfg.StorePool.LiveAndDeadReplicas,
logChange: r.store.logChange,
@@ -1114,7 +1114,7 @@ func (r *Replica) changeReplicasImpl(
for _, rem := range removals {
iChgs := []internalReplicationChange{{target: rem, typ: internalChangeTypeRemoveNonVoter}}
var err error
desc, err = execChangeReplicasTxn(ctx, desc, reason, details, iChgs,
desc, err = execChangeReplicasTxn(ctx, r.store.cfg.Tracer(), desc, reason, details, iChgs,
changeReplicasTxnArgs{
db: r.store.DB(),
liveAndDeadReplicas: r.store.cfg.StorePool.LiveAndDeadReplicas,
@@ -1256,7 +1256,7 @@ func (r *Replica) maybeLeaveAtomicChangeReplicas(
// TODO(tbg): reconsider this.
s := r.store
return execChangeReplicasTxn(
ctx, desc, kvserverpb.ReasonUnknown /* unused */, "", nil, /* iChgs */
ctx, s.cfg.Tracer(), desc, kvserverpb.ReasonUnknown /* unused */, "", nil, /* iChgs */
changeReplicasTxnArgs{
db: s.DB(),
liveAndDeadReplicas: s.cfg.StorePool.LiveAndDeadReplicas,
@@ -1271,7 +1271,7 @@ func (r *Replica) TestingRemoveLearner(
ctx context.Context, beforeDesc *roachpb.RangeDescriptor, target roachpb.ReplicationTarget,
) (*roachpb.RangeDescriptor, error) {
desc, err := execChangeReplicasTxn(
ctx, beforeDesc, kvserverpb.ReasonAbandonedLearner, "",
ctx, r.store.cfg.Tracer(), beforeDesc, kvserverpb.ReasonAbandonedLearner, "",
[]internalReplicationChange{{target: target, typ: internalChangeTypeRemoveLearner}},
changeReplicasTxnArgs{
db: r.store.DB(),
@@ -1348,7 +1348,7 @@ func (r *Replica) maybeLeaveAtomicChangeReplicasAndRemoveLearners(
for _, target := range targets {
var err error
desc, err = execChangeReplicasTxn(
ctx, desc, kvserverpb.ReasonAbandonedLearner, "",
ctx, store.cfg.Tracer(), desc, kvserverpb.ReasonAbandonedLearner, "",
[]internalReplicationChange{{target: target, typ: internalChangeTypeRemoveLearner}},
changeReplicasTxnArgs{db: store.DB(),
liveAndDeadReplicas: store.cfg.StorePool.LiveAndDeadReplicas,
@@ -1723,7 +1723,7 @@ func (r *Replica) initializeRaftLearners(
iChgs := []internalReplicationChange{{target: target, typ: iChangeType}}
var err error
desc, err = execChangeReplicasTxn(
ctx, desc, reason, details, iChgs, changeReplicasTxnArgs{
ctx, r.store.cfg.Tracer(), desc, reason, details, iChgs, changeReplicasTxnArgs{
db: r.store.DB(),
liveAndDeadReplicas: r.store.cfg.StorePool.LiveAndDeadReplicas,
logChange: r.store.logChange,
@@ -1876,7 +1876,7 @@ func (r *Replica) execReplicationChangesForVoters(
iChgs = append(iChgs, internalReplicationChange{target: target, typ: typ})
}

desc, err = execChangeReplicasTxn(ctx, desc, reason, details, iChgs, changeReplicasTxnArgs{
desc, err = execChangeReplicasTxn(ctx, r.store.cfg.Tracer(), desc, reason, details, iChgs, changeReplicasTxnArgs{
db: r.store.DB(),
liveAndDeadReplicas: r.store.cfg.StorePool.LiveAndDeadReplicas,
logChange: r.store.logChange,
@@ -1933,7 +1933,7 @@ func (r *Replica) tryRollbackRaftLearner(

rollbackFn := func(ctx context.Context) error {
_, err := execChangeReplicasTxn(
ctx, rangeDesc, reason, details,
ctx, r.store.cfg.Tracer(), rangeDesc, reason, details,
[]internalReplicationChange{{target: target, typ: removeChgType}},
changeReplicasTxnArgs{
db: r.store.DB(),
@@ -2211,6 +2211,7 @@ type changeReplicasTxnArgs struct {
// their expectations.
func execChangeReplicasTxn(
ctx context.Context,
tracer *tracing.Tracer,
referenceDesc *roachpb.RangeDescriptor,
reason kvserverpb.RangeLogEventReason,
details string,
@@ -2264,118 +2265,141 @@ func execChangeReplicasTxn(
if err := args.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
log.Event(ctx, "attempting txn")
txn.SetDebugName(replicaChangeTxnName)
desc, dbDescValue, skip, err := conditionalGetDescValueFromDB(
ctx, txn, referenceDesc.StartKey, false /* forUpdate */, check)
if err != nil {
return err
}
if skip {
// The new descriptor already reflects what we needed to get done.
returnDesc = desc
return nil
}
// Note that we are now using the descriptor from KV, not the one passed
// into this method.
crt, err := prepareChangeReplicasTrigger(ctx, desc, chgs, args.testForceJointConfig)
if err != nil {
return err
}
log.Infof(ctx, "change replicas (add %v remove %v): existing descriptor %s", crt.Added(), crt.Removed(), desc)

// NB: we haven't written any intents yet, so even in the unlikely case in which
// this is held up, we won't block anyone else.
if err := within10s(ctx, func() error {
if args.testAllowDangerousReplicationChanges {
var desc *roachpb.RangeDescriptor
var dbDescValue []byte
var crt *roachpb.ChangeReplicasTrigger
var err error

// The transaction uses child tracing spans so that low-level database
// traces from the operations within can be encapsulated. This way, they
// may be redacted when being logged during higher-level operations, such
// as replicate queue processing.
parentSp := tracing.SpanFromContext(ctx)
{
ctx, sp := tracer.StartSpanCtx(ctx, replicaChangeTxnGetDescOpName, tracing.WithParent(parentSp))
defer sp.Finish()

var skip bool
desc, dbDescValue, skip, err = conditionalGetDescValueFromDB(
ctx, txn, referenceDesc.StartKey, false /* forUpdate */, check)
if err != nil {
return err
}
if skip {
// The new descriptor already reflects what we needed to get done.
returnDesc = desc
return nil
}
// Run (and retry for a bit) a sanity check that the configuration
// resulting from this change is able to meet quorum. It's
// important to do this at this low layer as there are multiple
// entry points that are not generally too careful. For example,
// before the below check existed, the store rebalancer could
// carry out operations that would lead to a loss of quorum.
//
// See:
// https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553
replicas := crt.Desc.Replicas()
// We consider stores marked as "suspect" to be alive for the purposes of
// determining whether the range can achieve quorum since these stores are
// known to be currently live but have failed a liveness heartbeat in the
// recent past.
//
// Note that the allocator will avoid rebalancing to stores that are
// currently marked suspect. See uses of StorePool.getStoreList() in
// allocator.go.
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectAndDrainingStores */)
if !replicas.CanMakeProgress(
func(rDesc roachpb.ReplicaDescriptor) bool {
for _, inner := range liveReplicas {
if inner.ReplicaID == rDesc.ReplicaID {
return true
}
}
return false
}) {
// NB: we use newQuorumError which is recognized by the replicate queue.
return newQuorumError("range %s cannot make progress with proposed changes add=%v del=%v "+
"based on live replicas %v", crt.Desc, crt.Added(), crt.Removed(), liveReplicas)
// Note that we are now using the descriptor from KV, not the one passed
// into this method.
crt, err = prepareChangeReplicasTrigger(ctx, desc, chgs, args.testForceJointConfig)
if err != nil {
return err
}
return nil
}); err != nil {
return err
}

{
b := txn.NewBatch()
log.Infof(ctx, "change replicas (add %v remove %v): existing descriptor %s", crt.Added(), crt.Removed(), desc)

// Important: the range descriptor must be the first thing touched in the transaction
// so the transaction record is co-located with the range being modified.
if err := updateRangeDescriptor(ctx, b, descKey, dbDescValue, crt.Desc); err != nil {
{
ctx, sp := tracer.StartSpanCtx(ctx, replicaChangeTxnUpdateDescOpName, tracing.WithParent(parentSp))
defer sp.Finish()

// NB: we haven't written any intents yet, so even in the unlikely case in which
// this is held up, we won't block anyone else.
if err := within10s(ctx, func() error {
if args.testAllowDangerousReplicationChanges {
return nil
}
// Run (and retry for a bit) a sanity check that the configuration
// resulting from this change is able to meet quorum. It's
// important to do this at this low layer as there are multiple
// entry points that are not generally too careful. For example,
// before the below check existed, the store rebalancer could
// carry out operations that would lead to a loss of quorum.
//
// See:
// https://github.com/cockroachdb/cockroach/issues/54444#issuecomment-707706553
replicas := crt.Desc.Replicas()
// We consider stores marked as "suspect" to be alive for the purposes of
// determining whether the range can achieve quorum since these stores are
// known to be currently live but have failed a liveness heartbeat in the
// recent past.
//
// Note that the allocator will avoid rebalancing to stores that are
// currently marked suspect. See uses of StorePool.getStoreList() in
// allocator.go.
liveReplicas, _ := args.liveAndDeadReplicas(replicas.Descriptors(), true /* includeSuspectAndDrainingStores */)
if !replicas.CanMakeProgress(
func(rDesc roachpb.ReplicaDescriptor) bool {
for _, inner := range liveReplicas {
if inner.ReplicaID == rDesc.ReplicaID {
return true
}
}
return false
}) {
// NB: we use newQuorumError which is recognized by the replicate queue.
return newQuorumError("range %s cannot make progress with proposed changes add=%v del=%v "+
"based on live replicas %v", crt.Desc, crt.Added(), crt.Removed(), liveReplicas)
}
return nil
}); err != nil {
return err
}

// Run transaction up to this point to create txn record early (see #9265).
if err := txn.Run(ctx, b); err != nil {
{
b := txn.NewBatch()

// Important: the range descriptor must be the first thing touched in the transaction
// so the transaction record is co-located with the range being modified.
if err := updateRangeDescriptor(ctx, b, descKey, dbDescValue, crt.Desc); err != nil {
return err
}

// Run transaction up to this point to create txn record early (see #9265).
if err := txn.Run(ctx, b); err != nil {
return err
}
}

// Log replica change into range event log.
err = recordRangeEventsInLog(
ctx, txn, true /* added */, crt.Added(), crt.Desc, reason, details, args.logChange,
)
if err != nil {
return err
}
err = recordRangeEventsInLog(
ctx, txn, false /* added */, crt.Removed(), crt.Desc, reason, details, args.logChange,
)
if err != nil {
return err
}
}

// Log replica change into range event log.
err = recordRangeEventsInLog(
ctx, txn, true /* added */, crt.Added(), crt.Desc, reason, details, args.logChange,
)
if err != nil {
return err
}
err = recordRangeEventsInLog(
ctx, txn, false /* added */, crt.Removed(), crt.Desc, reason, details, args.logChange,
)
if err != nil {
return err
}
// End the transaction manually instead of letting RunTransaction
// loop do it, in order to provide a commit trigger.
b := txn.NewBatch()

// End the transaction manually instead of letting RunTransaction
// loop do it, in order to provide a commit trigger.
b := txn.NewBatch()
// Update range descriptor addressing record(s).
if err := updateRangeAddressing(b, crt.Desc); err != nil {
return err
}

// Update range descriptor addressing record(s).
if err := updateRangeAddressing(b, crt.Desc); err != nil {
return err
}
b.AddRawRequest(&roachpb.EndTxnRequest{
Commit: true,
InternalCommitTrigger: &roachpb.InternalCommitTrigger{
ChangeReplicasTrigger: crt,
},
})
if err := txn.Run(ctx, b); err != nil {
log.Eventf(ctx, "%v", err)
return err
}

b.AddRawRequest(&roachpb.EndTxnRequest{
Commit: true,
InternalCommitTrigger: &roachpb.InternalCommitTrigger{
ChangeReplicasTrigger: crt,
},
})
if err := txn.Run(ctx, b); err != nil {
log.Eventf(ctx, "%v", err)
return err
returnDesc = crt.Desc
return nil
}

returnDesc = crt.Desc
return nil
}); err != nil {
log.Eventf(ctx, "%v", err)
// NB: desc may not be the descriptor we actually compared against, but
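The pattern added to `execChangeReplicasTxn` above can be summarized as: take the current span as the parent, open a named child span around the verbose sub-operation, and finish it when done, so the whole subtree is tagged by its operation name. A hypothetical helper capturing that shape (not part of this commit; it assumes the `pkg/util/tracing` calls visible in the diff — `SpanFromContext`, `StartSpanCtx`, `WithParent`, `Finish` — with the signatures shown there):

package kvserver

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/util/tracing"
)

// runInNamedChildSpan runs fn under a child span named opName so that the
// subtree of traces it produces can later be dropped from a recording by
// matching on opName (see filterTracingSpans in replicate_queue.go).
func runInNamedChildSpan(
	ctx context.Context, tracer *tracing.Tracer, opName string, fn func(context.Context) error,
) error {
	parentSp := tracing.SpanFromContext(ctx)
	ctx, sp := tracer.StartSpanCtx(ctx, opName, tracing.WithParent(parentSp))
	defer sp.Finish()
	return fn(ctx)
}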
36 changes: 35 additions & 1 deletion pkg/kv/kvserver/replicate_queue.go
@@ -744,6 +744,30 @@ func (decommissionPurgatoryError) PurgatoryErrorMarker() {}

var _ PurgatoryError = decommissionPurgatoryError{}

// filterTracingSpans is a utility for processOneChangeWithTracing in order to
// remove spans with Operation names in opNamesToFilter, as well as all of
// their child spans, to exclude overly verbose spans prior to logging.
func filterTracingSpans(rec tracingpb.Recording, opNamesToFilter ...string) tracingpb.Recording {
excludedOpNames := make(map[string]struct{})
excludedSpanIDs := make(map[tracingpb.SpanID]struct{})
for _, opName := range opNamesToFilter {
excludedOpNames[opName] = struct{}{}
}

filteredRecording := make(tracingpb.Recording, 0, rec.Len())
for _, span := range rec {
_, excludedByOpName := excludedOpNames[span.Operation]
_, excludedByParentSpanID := excludedSpanIDs[span.ParentSpanID]
if excludedByOpName || excludedByParentSpanID {
excludedSpanIDs[span.SpanID] = struct{}{}
} else {
filteredRecording = append(filteredRecording, span)
}
}

return filteredRecording
}

// processOneChangeWithTracing executes processOneChange within a tracing span,
// logging the resulting traces to the DEV channel in the case of errors or
// when the configured log traces threshold is exceeded.
@@ -764,10 +788,20 @@ func (rq *replicateQueue) processOneChangeWithTracing(
// traces from a child context into its parent.
{
ctx := repl.AnnotateCtx(rq.AnnotateCtx(context.Background()))
rec := sp.GetConfiguredRecording()
var rec tracingpb.Recording
processDuration := timeutil.Since(processStart)
loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold

loggingNeeded := err != nil || exceededDuration
if loggingNeeded {
rec = sp.GetConfiguredRecording()
// If we have a tracing span from execChangeReplicasTxn, filter it from
// the recording so that we can render the traces to the log without it,
// as the traces from this span (and its children) are highly verbose.
rec = filterTracingSpans(rec, replicaChangeTxnGetDescOpName, replicaChangeTxnUpdateDescOpName)
}

if err != nil {
// TODO(sarkesian): Utilize Allocator log channel once available.
log.Warningf(ctx, "error processing replica: %v\ntrace:\n%s", err, rec)
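To make the intended behavior of `filterTracingSpans` concrete, a hypothetical test sketch (not part of this commit) that assumes `tracingpb.Recording` can be built as a literal slice of recorded spans with the `SpanID`, `ParentSpanID`, and `Operation` fields used above:

package kvserver

import (
	"testing"

	"github.com/cockroachdb/cockroach/pkg/util/tracing/tracingpb"
)

func TestFilterTracingSpansDropsSubtrees(t *testing.T) {
	rec := tracingpb.Recording{
		{SpanID: 1, ParentSpanID: 0, Operation: "process replica"},
		{SpanID: 2, ParentSpanID: 1, Operation: replicaChangeTxnGetDescOpName},
		{SpanID: 3, ParentSpanID: 2, Operation: "conditional get"}, // child of a filtered span
		{SpanID: 4, ParentSpanID: 1, Operation: replicaChangeTxnUpdateDescOpName},
		{SpanID: 5, ParentSpanID: 1, Operation: "unrelated work"},
	}
	filtered := filterTracingSpans(
		rec, replicaChangeTxnGetDescOpName, replicaChangeTxnUpdateDescOpName,
	)
	// Only the root span and "unrelated work" should remain: the named spans
	// and their descendants are dropped (parents appear before children).
	if len(filtered) != 2 {
		t.Fatalf("expected 2 spans after filtering, got %d", len(filtered))
	}
}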