87159: kvserver: log replicate queue processing errors via KvDistribution channel r=AlexTalks a=AlexTalks

Logging of replicate queue processing errors, along with the collected traces,
was introduced in #86007, and a new logging channel `KvDistribution` was
introduced in #85688. This change moves those error logs to the new
`KvDistribution` channel and modifies the output so that the traces are only
logged when verbosity for the module is >=1.
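As a rough illustration of the gating described above (a standalone plain-Go sketch; the `moduleVerbosity` helper and the `KVDIST_VERBOSITY` variable are invented for the example and are not the CockroachDB `util/log` API):

```go
package main

import (
	"fmt"
	"log"
	"os"
	"strconv"
)

// moduleVerbosity stands in for a per-module verbosity setting (similar in
// spirit to vmodule); the environment variable is purely illustrative.
func moduleVerbosity() int {
	v, _ := strconv.Atoi(os.Getenv("KVDIST_VERBOSITY"))
	return v
}

func main() {
	processErr := fmt.Errorf("change replicas failed: snapshot declined")
	collectedTrace := "<trace spans collected while processing the replica>"

	// The error is always logged; the (potentially very large) trace is only
	// attached when the module verbosity is >= 1.
	var traceOutput string
	if moduleVerbosity() >= 1 {
		traceOutput = fmt.Sprintf("\ntrace:\n%s", collectedTrace)
	}
	log.Printf("error processing replica: %v%s", processErr, traceOutput)
}
```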

Release justification: Low risk observability change.
Release note: None

87256: roachtest: loq_recovery fix incorrect error handling r=aliher1911 a=aliher1911

Previously, if the attempt to set snapshot rates failed, the whole test
failed. That is incorrect: any unhealthy outcome should be reported as a
histogram value while the test itself succeeds.

Release justification: roachtest fix
Release note: None

Fixes #85721

87283: allocator: increase log verbosity r=lidorcarmel a=lidorcarmel

Now that distribution logs go to a separate log file (#85688), we can
enable more logging by default (without setting vmodule).

This PR enables most log messages in the store rebalancer, and some in the
replicate queue. The rationale is that the store rebalancer iterates over
128 replicas every minute, so per-replica logging is affordable. We roughly
expect 200 bytes per log message x 128 replicas x 10 messages = ~250K per
minute, or ~15MB per hour. The replicate queue avoids per-replica logging
(e.g. in functions such as shouldEnqueue); instead, it logs when an action
such as adding a replica is executed.
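For reference, the estimate above works out as follows (a throwaway calculation; the 200-byte and 10-message figures are the rough assumptions stated in this message, not measurements):

```go
package main

import "fmt"

func main() {
	const (
		bytesPerMessage    = 200 // assumed average log message size
		replicasPerMinute  = 128 // replicas visited by the store rebalancer per minute
		messagesPerReplica = 10  // assumed messages emitted per replica
	)
	perMinute := bytesPerMessage * replicasPerMinute * messagesPerReplica // bytes/minute
	perHour := perMinute * 60
	fmt.Printf("~%d KB/minute, ~%.1f MB/hour\n", perMinute/1000, float64(perHour)/1e6)
}
```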

In a roachtest running drain and decommission while adding a new node, we
see only a few MBs per hour in the kv-distribution log file.

Release justification: low risk, will help debugging.
Release note: None

87286: rowexec: make sure to close generator builtin with error in Start r=yuzefovich a=yuzefovich

Previously, if an error was returned by the `Start` method of the generator
builtin, we would never close it, which could leak resources. This is now
fixed.
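The shape of the fix can be sketched as follows (illustrative only; the `generator` interface and names here are invented and are not the actual `rowexec` types):

```go
package main

import (
	"errors"
	"fmt"
)

// generator mirrors the shape of a value with Start/Close lifecycle methods.
type generator interface {
	Start() error
	Close()
}

type failingGen struct{ closed bool }

func (g *failingGen) Start() error { return errors.New("boom") }
func (g *failingGen) Close()       { g.closed = true }

// startOrClose captures the idea of the fix: if Start fails, still Close the
// generator so any resources it already acquired are released.
func startOrClose(g generator) error {
	if err := g.Start(); err != nil {
		g.Close()
		return err
	}
	return nil
}

func main() {
	g := &failingGen{}
	err := startOrClose(g)
	fmt.Println("err:", err, "closed:", g.closed) // err: boom closed: true
}
```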

Fixes: #87248.

Release justification: bug fix.

Release note: None

Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Oleg Afanasyev <[email protected]>
Co-authored-by: Lidor Carmel <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
5 people committed Sep 1, 2022
5 parents 6e7de35 + 40feae5 + d52dcf5 + c8b50df + 0917ce3 commit d470252
Showing 6 changed files with 61 additions and 63 deletions.
6 changes: 5 additions & 1 deletion pkg/cmd/roachtest/tests/loss_of_quorum_recovery.go
@@ -293,7 +293,11 @@ func runRecoverLossOfQuorum(ctx context.Context, t test.Test, c cluster.Cluster,
t.L().Printf("workload finished")

t.L().Printf("ensuring nodes are decommissioned")
require.NoError(t, setSnapshotRate(db, 512))
if err := setSnapshotRate(db, 512); err != nil {
// Set snapshot executes SQL query against cluster, if query failed then
// cluster is not healthy after recovery and that means restart failed.
return &recoveryImpossibleError{testOutcome: restartFailed}
}
if err := contextutil.RunWithTimeout(ctx, "decommission-removed-nodes", 5*time.Minute,
func(ctx context.Context) error {
decommissionCmd := fmt.Sprintf(
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/allocator/allocatorimpl/allocator.go
@@ -2204,7 +2204,7 @@ func loadBasedLeaseRebalanceScore(
underfullScore := underfullLeaseThreshold - remoteStore.Capacity.LeaseCount
totalScore := overfullScore + underfullScore

log.KvDistribution.VEventf(ctx, 1,
log.KvDistribution.Infof(ctx,
"node: %d, sourceWeight: %.2f, remoteWeight: %.2f, remoteLatency: %v, "+
"rebalanceThreshold: %.2f, meanLeases: %.2f, sourceLeaseCount: %d, overfullThreshold: %d, "+
"remoteLeaseCount: %d, underfullThreshold: %d, totalScore: %d",
66 changes: 32 additions & 34 deletions pkg/kv/kvserver/replicate_queue.go
@@ -678,16 +678,16 @@ func (rq *replicateQueue) process(
ctx, repl, rq.canTransferLeaseFrom, false /* scatter */, false, /* dryRun */
)
if isSnapshotError(err) {
// If ChangeReplicas failed because the snapshot failed, we log the
// error but then return success indicating we should retry the
// operation. The most likely causes of the snapshot failing are a
// declined reservation or the remote node being unavailable. In either
// case we don't want to wait another scanner cycle before reconsidering
// the range.
// If ChangeReplicas failed because the snapshot failed, we attempt to
// retry the operation. The most likely causes of the snapshot failing
// are a declined reservation (i.e. snapshot queue too long, or timeout
// while waiting in queue) or the remote node being unavailable. In
// either case we don't want to wait another scanner cycle before
// reconsidering the range.
// NB: The reason we are retrying snapshot failures immediately is that
// the recipient node will be "blocked" by a snapshot send failure for a
// few seconds. By retrying immediately we will choose the "second best"
// node to send to.
// few seconds. By retrying immediately we will choose another equally
// "good" target store chosen by the allocator.
// TODO(baptist): This is probably suboptimal behavior. In the case where
// there is only one option for a recipient, we will block the entire
// replicate queue until we are able to send this through. Also even if
@@ -799,23 +799,23 @@ func (rq *replicateQueue) processOneChangeWithTracing(
loggingThreshold := rq.logTracesThresholdFunc(rq.store.cfg.Settings, repl)
exceededDuration := loggingThreshold > time.Duration(0) && processDuration > loggingThreshold

loggingNeeded := err != nil || exceededDuration
if loggingNeeded {
var traceOutput string
traceLoggingNeeded := err != nil || exceededDuration
if traceLoggingNeeded {
// If we have tracing spans from execChangeReplicasTxn, filter it from
// the recording so that we can render the traces to the log without it,
// as the traces from this span (and its children) are highly verbose.
rec = filterTracingSpans(sp.GetConfiguredRecording(),
replicaChangeTxnGetDescOpName, replicaChangeTxnUpdateDescOpName,
)
traceOutput = fmt.Sprintf("\ntrace:\n%s", rec)
}

if err != nil {
// TODO(sarkesian): Utilize Allocator log channel once available.
log.Warningf(ctx, "error processing replica: %v\ntrace:\n%s", err, rec)
log.KvDistribution.Warningf(ctx, "error processing replica: %v%s", err, traceOutput)
} else if exceededDuration {
// TODO(sarkesian): Utilize Allocator log channel once available.
log.Infof(ctx, "processing replica took %s, exceeding threshold of %s\ntrace:\n%s",
processDuration, loggingThreshold, rec)
log.KvDistribution.Infof(ctx, "processing replica took %s, exceeding threshold of %s%s",
processDuration, loggingThreshold, traceOutput)
}
}

@@ -1142,14 +1142,14 @@ func (rq *replicateQueue) addOrReplaceVoters(
ops = roachpb.MakeReplicationChanges(roachpb.ADD_VOTER, newVoter)
}
if removeIdx < 0 {
log.KvDistribution.VEventf(ctx, 1, "adding voter %+v: %s",
log.KvDistribution.Infof(ctx, "adding voter %+v: %s",
newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters))
} else {
if !dryRun {
rq.metrics.trackRemoveMetric(allocatorimpl.VoterTarget, replicaStatus)
}
removeVoter := existingVoters[removeIdx]
log.KvDistribution.VEventf(ctx, 1, "replacing voter %s with %+v: %s",
log.KvDistribution.Infof(ctx, "replacing voter %s with %+v: %s",
removeVoter, newVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters))
// NB: We may have performed a promotion of a non-voter above, but we will
// not perform a demotion here and instead just remove the existing replica
@@ -1206,14 +1206,14 @@ func (rq *replicateQueue) addOrReplaceNonVoters(

ops := roachpb.MakeReplicationChanges(roachpb.ADD_NON_VOTER, newNonVoter)
if removeIdx < 0 {
log.KvDistribution.VEventf(ctx, 1, "adding non-voter %+v: %s",
log.KvDistribution.Infof(ctx, "adding non-voter %+v: %s",
newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters))
} else {
if !dryRun {
rq.metrics.trackRemoveMetric(allocatorimpl.NonVoterTarget, replicaStatus)
}
removeNonVoter := existingNonVoters[removeIdx]
log.KvDistribution.VEventf(ctx, 1, "replacing non-voter %s with %+v: %s",
log.KvDistribution.Infof(ctx, "replacing non-voter %s with %+v: %s",
removeNonVoter, newNonVoter, rangeRaftProgress(repl.RaftStatus(), existingNonVoters))
ops = append(ops,
roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, roachpb.ReplicationTarget{
@@ -1403,7 +1403,7 @@ func (rq *replicateQueue) removeVoter(
rq.metrics.trackRemoveMetric(allocatorimpl.VoterTarget, allocatorimpl.Alive)
}

log.KvDistribution.VEventf(ctx, 1, "removing voting replica %+v due to over-replication: %s",
log.KvDistribution.Infof(ctx, "removing voting replica %+v due to over-replication: %s",
removeVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters))
desc := repl.Desc()
// TODO(aayush): Directly removing the voter here is a bit of a missed
@@ -1449,7 +1449,7 @@ func (rq *replicateQueue) removeNonVoter(
if !dryRun {
rq.metrics.trackRemoveMetric(allocatorimpl.NonVoterTarget, allocatorimpl.Alive)
}
log.KvDistribution.VEventf(ctx, 1, "removing non-voting replica %+v due to over-replication: %s",
log.KvDistribution.Infof(ctx, "removing non-voting replica %+v due to over-replication: %s",
removeNonVoter, rangeRaftProgress(repl.RaftStatus(), existingVoters))
target := roachpb.ReplicationTarget{
NodeID: removeNonVoter.NodeID,
@@ -1491,7 +1491,7 @@ func (rq *replicateQueue) removeDecommissioning(
}

if len(decommissioningReplicas) == 0 {
log.KvDistribution.VEventf(ctx, 1, "range of %[1]ss %[2]s was identified as having decommissioning %[1]ss, "+
log.KvDistribution.Infof(ctx, "range of %[1]ss %[2]s was identified as having decommissioning %[1]ss, "+
"but no decommissioning %[1]ss were found", targetType, repl)
return true, nil
}
@@ -1511,7 +1511,7 @@
if !dryRun {
rq.metrics.trackRemoveMetric(targetType, allocatorimpl.Decommissioning)
}
log.KvDistribution.VEventf(ctx, 1, "removing decommissioning %s %+v from store", targetType, decommissioningReplica)
log.KvDistribution.Infof(ctx, "removing decommissioning %s %+v from store", targetType, decommissioningReplica)
target := roachpb.ReplicationTarget{
NodeID: decommissioningReplica.NodeID,
StoreID: decommissioningReplica.StoreID,
@@ -1540,9 +1540,8 @@ func (rq *replicateQueue) removeDead(
) (requeue bool, _ error) {
desc := repl.Desc()
if len(deadReplicas) == 0 {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
1,
"range of %[1]s %[2]s was identified as having dead %[1]ss, but no dead %[1]ss were found",
targetType,
repl,
@@ -1553,7 +1552,7 @@
if !dryRun {
rq.metrics.trackRemoveMetric(targetType, allocatorimpl.Dead)
}
log.KvDistribution.VEventf(ctx, 1, "removing dead %s %+v from store", targetType, deadReplica)
log.KvDistribution.Infof(ctx, "removing dead %s %+v from store", targetType, deadReplica)
target := roachpb.ReplicationTarget{
NodeID: deadReplica.NodeID,
StoreID: deadReplica.StoreID,
@@ -1609,7 +1608,7 @@ func (rq *replicateQueue) considerRebalance(
if !ok {
// If there was nothing to do for the set of voting replicas on this
// range, attempt to rebalance non-voters.
log.KvDistribution.VEventf(ctx, 1, "no suitable rebalance target for voters")
log.KvDistribution.Infof(ctx, "no suitable rebalance target for voters")
addTarget, removeTarget, details, ok = rq.allocator.RebalanceNonVoter(
ctx,
conf,
@@ -1629,12 +1628,12 @@
lhBeingRemoved := removeTarget.StoreID == repl.store.StoreID()

if !ok {
log.KvDistribution.VEventf(ctx, 1, "no suitable rebalance target for non-voters")
log.KvDistribution.Infof(ctx, "no suitable rebalance target for non-voters")
} else if !lhRemovalAllowed {
if done, err := rq.maybeTransferLeaseAway(
ctx, repl, removeTarget.StoreID, dryRun, canTransferLeaseFrom,
); err != nil {
log.KvDistribution.VEventf(ctx, 1, "want to remove self, but failed to transfer lease away: %s", err)
log.KvDistribution.Infof(ctx, "want to remove self, but failed to transfer lease away: %s", err)
ok = false
} else if done {
// Lease is now elsewhere, so we're not in charge any more.
@@ -1656,8 +1655,7 @@
rq.metrics.NonVoterPromotionsCount.Inc(1)
}
}
log.KvDistribution.VEventf(ctx,
1,
log.KvDistribution.Infof(ctx,
"rebalancing %s %+v to %+v: %s",
rebalanceTargetType,
removeTarget,
@@ -1724,7 +1722,7 @@ func replicationChangesForRebalance(
chgs = []roachpb.ReplicationChange{
{ChangeType: roachpb.ADD_VOTER, Target: addTarget},
}
log.KvDistribution.VEventf(ctx, 1, "can't swap replica due to lease; falling back to add")
log.KvDistribution.Infof(ctx, "can't swap replica due to lease; falling back to add")
return chgs, false, err
}

@@ -1810,7 +1808,7 @@ func (rq *replicateQueue) shedLease(
}

if opts.DryRun {
log.KvDistribution.VEventf(ctx, 1, "transferring lease to s%d", target.StoreID)
log.KvDistribution.Infof(ctx, "transferring lease to s%d", target.StoreID)
return allocator.NoTransferDryRun, nil
}

@@ -1859,7 +1857,7 @@ func (rq *replicateQueue) TransferLease(
ctx context.Context, rlm ReplicaLeaseMover, source, target roachpb.StoreID, rangeQPS float64,
) error {
rq.metrics.TransferLeaseCount.Inc(1)
log.KvDistribution.VEventf(ctx, 1, "transferring lease to s%d", target)
log.KvDistribution.Infof(ctx, "transferring lease to s%d", target)
if err := rlm.AdminTransferLease(ctx, target); err != nil {
return errors.Wrapf(err, "%s: unable to transfer lease to s%d", rlm, target)
}
41 changes: 15 additions & 26 deletions pkg/kv/kvserver/store_rebalancer.go
@@ -312,9 +312,8 @@ func (sr *StoreRebalancer) rebalanceStore(
}

descBeforeRebalance := replWithStats.repl.Desc()
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
1,
"rebalancing r%d (%.2f qps) to better balance load: voters from %v to %v; non-voters from %v to %v",
replWithStats.repl.RangeID,
replWithStats.qps,
@@ -408,7 +407,7 @@ func (sr *StoreRebalancer) chooseLeaseToTransfer(
}

desc, conf := replWithStats.repl.DescAndSpanConfig()
log.KvDistribution.VEventf(ctx, 3, "considering lease transfer for r%d with %.2f qps",
log.KvDistribution.Infof(ctx, "considering lease transfer for r%d with %.2f qps",
desc.RangeID, replWithStats.qps)

// Check all the other voting replicas in order of increasing qps.
@@ -436,9 +435,8 @@
)

if candidate == (roachpb.ReplicaDescriptor{}) {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"could not find a better lease transfer target for r%d; considering replica rebalance instead",
desc.RangeID,
)
@@ -455,17 +453,16 @@
candidates,
replWithStats.repl.loadStats.batchRequests,
) {
log.KvDistribution.VEventf(
ctx, 3, "r%d is on s%d due to follow-the-workload; considering replica rebalance instead",
log.KvDistribution.Infof(
ctx, "r%d is on s%d due to follow-the-workload; considering replica rebalance instead",
desc.RangeID, localDesc.StoreID,
)
considerForRebalance = append(considerForRebalance, replWithStats)
continue
}
if targetStore, ok := storeMap[candidate.StoreID]; ok {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
1,
"transferring lease for r%d (qps=%.2f) to store s%d (qps=%.2f) from local store s%d (qps=%.2f)",
desc.RangeID,
replWithStats.qps,
@@ -529,9 +526,8 @@ func (sr *StoreRebalancer) chooseRangeToRebalance(
numDesiredVoters := allocatorimpl.GetNeededVoters(conf.GetNumVoters(), clusterNodes)
numDesiredNonVoters := allocatorimpl.GetNeededNonVoters(numDesiredVoters, int(conf.GetNumNonVoters()), clusterNodes)
if expected, actual := numDesiredVoters, len(rangeDesc.Replicas().VoterDescriptors()); expected != actual {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"r%d is either over or under replicated (expected %d voters, found %d); ignoring",
rangeDesc.RangeID,
expected,
@@ -540,9 +536,8 @@
continue
}
if expected, actual := numDesiredNonVoters, len(rangeDesc.Replicas().NonVoterDescriptors()); expected != actual {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"r%d is either over or under replicated (expected %d non-voters, found %d); ignoring",
rangeDesc.RangeID,
expected,
@@ -576,9 +571,8 @@
continue
}

log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"considering replica rebalance for r%d with %.2f qps",
replWithStats.repl.GetRangeID(),
replWithStats.qps,
@@ -594,7 +588,7 @@
// Bail if there are no stores that are better for the existing replicas.
// If the range needs a lease transfer to enable better load distribution,
// it will be handled by the logic in `chooseLeaseToTransfer()`.
log.KvDistribution.VEventf(ctx, 3, "could not find rebalance opportunities for r%d", replWithStats.repl.RangeID)
log.KvDistribution.Infof(ctx, "could not find rebalance opportunities for r%d", replWithStats.repl.RangeID)
continue
}

@@ -626,9 +620,8 @@
// being on dead stores, ignore this rebalance option. The lease for
// this range post-rebalance would have no suitable location.
if len(validTargets) == 0 {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"could not find rebalance opportunities for r%d, no replica found to hold lease",
replWithStats.repl.RangeID,
)
@@ -694,9 +687,8 @@ func (sr *StoreRebalancer) getRebalanceTargetsBasedOnQPS(
options,
)
if !shouldRebalance {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"no more rebalancing opportunities for r%d voters that improve QPS balance",
rbCtx.rangeDesc.RangeID,
)
@@ -705,9 +697,8 @@
// Record the fact that we found at least one rebalance opportunity.
foundRebalance = true
}
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"rebalancing voter (qps=%.2f) for r%d on %v to %v in order to improve QPS balance",
rbCtx.replWithStats.qps,
rbCtx.rangeDesc.RangeID,
@@ -758,9 +749,8 @@
options,
)
if !shouldRebalance {
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"no more rebalancing opportunities for r%d non-voters that improve QPS balance",
rbCtx.rangeDesc.RangeID,
)
@@ -769,9 +759,8 @@
// Record the fact that we found at least one rebalance opportunity.
foundRebalance = true
}
log.KvDistribution.VEventf(
log.KvDistribution.Infof(
ctx,
3,
"rebalancing non-voter (qps=%.2f) for r%d on %v to %v in order to improve QPS balance",
rbCtx.replWithStats.qps,
rbCtx.rangeDesc.RangeID,
5 changes: 5 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/sql_keys
@@ -66,3 +66,8 @@ query II
SELECT count(key), count(DISTINCT key) FROM crdb_internal.scan(crdb_internal.table_span($tableid))
----
4096 4096

# Regression test for not closing the generator builtin if it encounters an
# error in Start() (#87248).
statement error failed to verify keys for Scan
SELECT crdb_internal.scan('\xff':::BYTES, '\x3f5918':::BYTES);