Skip to content

Commit

Permalink
Merge #85844 #85890
Browse files Browse the repository at this point in the history
85844: kvserver: track replicate queue metrics by allocator action r=AlexTalks a=AlexTalks

While previously we had metrics within the replicate queue which tracked
the number of particular actions processed by the queue based on a
unique set of categories, this change adds new metrics for tracking the
successes/errors of a replica being processed by the replicate queue,
using the allocator action as a method of categorizing these actions.
With this categorization, we are able to track success and error counts
during rebalancing, upreplicating when we have a dead node, or
decommissioning. The categorization makes no distinction between actions
relating to voter replicas vs non-voter replicas, so they are aggregated
across these two types.

Release note (ops change): added new metrics:
```
queue.replicate.addreplica.(success|error)
queue.replicate.removereplica.(success|error)
queue.replicate.replacedeadreplica.(success|error)
queue.replicate.removedeadreplica.(success|error)
queue.replicate.replacedecommissioningreplica.(success|error)
queue.replicate.removedecommissioningreplica.(success|error)
```

85890: builtins: add trunc(decimal, int) builtin r=ZhouXing19 a=rafiss

fixes #85620

Release note (sql change): Added the trunc(decimal, int) builtin
function, which truncates the given decimal value to the specified
number of decimal places. A negative value can be used for the scale
parameter, which will truncate to the left of the decimal point.

Example:
```
> select trunc(541.234, 2), trunc(541.234, 0), trunc(541.234, -1);

  trunc  | trunc | trunc
---------+-------+---------
  541.23 |   541 | 5.4E+2
```

Co-authored-by: Alex Sarkesian <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
  • Loading branch information
3 people committed Aug 13, 2022
3 parents 0dd438d + 62b5e8b + 0268864 commit 158ebe6
Show file tree
Hide file tree
Showing 10 changed files with 309 additions and 12 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -871,6 +871,8 @@ available replica will error.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="trunc"></a><code>trunc(val: <a href="decimal.html">decimal</a>) &rarr; <a href="decimal.html">decimal</a></code></td><td><span class="funcdesc"><p>Truncates the decimal values of <code>val</code>.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="trunc"></a><code>trunc(val: <a href="decimal.html">decimal</a>, scale: <a href="int.html">int</a>) &rarr; <a href="decimal.html">decimal</a></code></td><td><span class="funcdesc"><p>Truncate <code>val</code> to <code>scale</code> decimal places</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="trunc"></a><code>trunc(val: <a href="float.html">float</a>) &rarr; <a href="float.html">float</a></code></td><td><span class="funcdesc"><p>Truncates the decimal values of <code>val</code>.</p>
</span></td><td>Immutable</td></tr></tbody>
</table>
Expand Down
182 changes: 174 additions & 8 deletions pkg/kv/kvserver/replicate_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,78 @@ var (
Measurement: "Demotions of Voters to Non Voters",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueAddReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.addreplica.success",
Help: "Number of successful replica additions processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueAddReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.addreplica.error",
Help: "Number of failed replica additions processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveReplicaSuccessCount = metric.Metadata{
Name: "queue.replicate.removereplica.success",
Help: "Number of successful replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
metaReplicateQueueRemoveReplicaErrorCount = metric.Metadata{
Name: "queue.replicate.removereplica.error",
Help: "Number of failed replica removals processed by the replicate queue",
Measurement: "Replicas",
Unit: metric.Unit_COUNT,
}
// Metadata for the success/error counters covering replacement and removal
// of dead and decommissioning replicas by the replicate queue.
metaReplicateQueueReplaceDeadReplicaSuccessCount = metric.Metadata{
	Name:        "queue.replicate.replacedeadreplica.success",
	Help:        "Number of successful dead replica replacements processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
metaReplicateQueueReplaceDeadReplicaErrorCount = metric.Metadata{
	Name:        "queue.replicate.replacedeadreplica.error",
	Help:        "Number of failed dead replica replacements processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
metaReplicateQueueReplaceDecommissioningReplicaSuccessCount = metric.Metadata{
	Name:        "queue.replicate.replacedecommissioningreplica.success",
	Help:        "Number of successful decommissioning replica replacements processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
metaReplicateQueueReplaceDecommissioningReplicaErrorCount = metric.Metadata{
	Name:        "queue.replicate.replacedecommissioningreplica.error",
	Help:        "Number of failed decommissioning replica replacements processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
metaReplicateQueueRemoveDecommissioningReplicaSuccessCount = metric.Metadata{
	Name:        "queue.replicate.removedecommissioningreplica.success",
	Help:        "Number of successful decommissioning replica removals processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
metaReplicateQueueRemoveDecommissioningReplicaErrorCount = metric.Metadata{
	Name:        "queue.replicate.removedecommissioningreplica.error",
	Help:        "Number of failed decommissioning replica removals processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
metaReplicateQueueRemoveDeadReplicaSuccessCount = metric.Metadata{
	Name:        "queue.replicate.removedeadreplica.success",
	Help:        "Number of successful dead replica removals processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
metaReplicateQueueRemoveDeadReplicaErrorCount = metric.Metadata{
	Name:        "queue.replicate.removedeadreplica.error",
	Help:        "Number of failed dead replica removals processed by the replicate queue",
	Measurement: "Replicas",
	Unit:        metric.Unit_COUNT,
}
)

// quorumError indicates a retryable error condition which sends replicas being
Expand Down Expand Up @@ -228,6 +300,23 @@ type ReplicateQueueMetrics struct {
TransferLeaseCount *metric.Counter
NonVoterPromotionsCount *metric.Counter
VoterDemotionsCount *metric.Counter

// Success/error counts by allocator action.
RemoveReplicaSuccessCount *metric.Counter
RemoveReplicaErrorCount *metric.Counter
AddReplicaSuccessCount *metric.Counter
AddReplicaErrorCount *metric.Counter
ReplaceDeadReplicaSuccessCount *metric.Counter
ReplaceDeadReplicaErrorCount *metric.Counter
RemoveDeadReplicaSuccessCount *metric.Counter
RemoveDeadReplicaErrorCount *metric.Counter
ReplaceDecommissioningReplicaSuccessCount *metric.Counter
ReplaceDecommissioningReplicaErrorCount *metric.Counter
RemoveDecommissioningReplicaSuccessCount *metric.Counter
RemoveDecommissioningReplicaErrorCount *metric.Counter
// TODO(sarkesian): Consider adding metrics for AllocatorRemoveLearner,
// AllocatorConsiderRebalance, and AllocatorFinalizeAtomicReplicationChange
// allocator actions.
}

func makeReplicateQueueMetrics() ReplicateQueueMetrics {
Expand All @@ -251,6 +340,19 @@ func makeReplicateQueueMetrics() ReplicateQueueMetrics {
TransferLeaseCount: metric.NewCounter(metaReplicateQueueTransferLeaseCount),
NonVoterPromotionsCount: metric.NewCounter(metaReplicateQueueNonVoterPromotionsCount),
VoterDemotionsCount: metric.NewCounter(metaReplicateQueueVoterDemotionsCount),

RemoveReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveReplicaSuccessCount),
RemoveReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveReplicaErrorCount),
AddReplicaSuccessCount: metric.NewCounter(metaReplicateQueueAddReplicaSuccessCount),
AddReplicaErrorCount: metric.NewCounter(metaReplicateQueueAddReplicaErrorCount),
ReplaceDeadReplicaSuccessCount: metric.NewCounter(metaReplicateQueueReplaceDeadReplicaSuccessCount),
ReplaceDeadReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDeadReplicaErrorCount),
RemoveDeadReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaSuccessCount),
RemoveDeadReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDeadReplicaErrorCount),
ReplaceDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaSuccessCount),
ReplaceDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueReplaceDecommissioningReplicaErrorCount),
RemoveDecommissioningReplicaSuccessCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaSuccessCount),
RemoveDecommissioningReplicaErrorCount: metric.NewCounter(metaReplicateQueueRemoveDecommissioningReplicaErrorCount),
}
}

Expand Down Expand Up @@ -353,6 +455,50 @@ func (metrics *ReplicateQueueMetrics) trackRebalanceReplicaCount(
}
}

// trackResultByAllocatorAction increases the corresponding success/error count
// metric for processing a particular allocator action through the replicate
// queue. A nil err counts as a success; any non-nil err counts as an error.
//
// Voter and non-voter variants of an action share the same pair of counters.
// Nothing is recorded when dryRun is set. The function panics if called with
// an action that has no associated counters.
func (metrics *ReplicateQueueMetrics) trackResultByAllocatorAction(
	action allocatorimpl.AllocatorAction, err error, dryRun bool,
) {
	if dryRun {
		return
	}
	switch action {
	case allocatorimpl.AllocatorRemoveVoter, allocatorimpl.AllocatorRemoveNonVoter:
		if err == nil {
			metrics.RemoveReplicaSuccessCount.Inc(1)
		} else {
			metrics.RemoveReplicaErrorCount.Inc(1)
		}
	case allocatorimpl.AllocatorAddVoter, allocatorimpl.AllocatorAddNonVoter:
		if err == nil {
			metrics.AddReplicaSuccessCount.Inc(1)
		} else {
			metrics.AddReplicaErrorCount.Inc(1)
		}
	case allocatorimpl.AllocatorReplaceDeadVoter, allocatorimpl.AllocatorReplaceDeadNonVoter:
		if err == nil {
			metrics.ReplaceDeadReplicaSuccessCount.Inc(1)
		} else {
			metrics.ReplaceDeadReplicaErrorCount.Inc(1)
		}
	case allocatorimpl.AllocatorReplaceDecommissioningVoter, allocatorimpl.AllocatorReplaceDecommissioningNonVoter:
		if err == nil {
			metrics.ReplaceDecommissioningReplicaSuccessCount.Inc(1)
		} else {
			metrics.ReplaceDecommissioningReplicaErrorCount.Inc(1)
		}
	case allocatorimpl.AllocatorRemoveDecommissioningVoter, allocatorimpl.AllocatorRemoveDecommissioningNonVoter:
		if err == nil {
			metrics.RemoveDecommissioningReplicaSuccessCount.Inc(1)
		} else {
			metrics.RemoveDecommissioningReplicaErrorCount.Inc(1)
		}
	// Dead replica removals are tracked by the replicate queue as well;
	// without this case they would fall through to the panic below.
	case allocatorimpl.AllocatorRemoveDeadVoter, allocatorimpl.AllocatorRemoveDeadNonVoter:
		if err == nil {
			metrics.RemoveDeadReplicaSuccessCount.Inc(1)
		} else {
			metrics.RemoveDeadReplicaErrorCount.Inc(1)
		}
	default:
		panic(fmt.Sprintf("unsupported AllocatorAction: %v", action))
	}
}

// replicateQueue manages a queue of replicas which may need to add an
// additional replica to their range.
type replicateQueue struct {
Expand Down Expand Up @@ -641,19 +787,27 @@ func (rq *replicateQueue) processOneChange(

// Add replicas.
case allocatorimpl.AllocatorAddVoter:
return rq.addOrReplaceVoters(
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorAddNonVoter:
return rq.addOrReplaceNonVoters(
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun,
)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

// Remove replicas.
case allocatorimpl.AllocatorRemoveVoter:
return rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
requeue, err := rq.removeVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorRemoveNonVoter:
return rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
requeue, err := rq.removeNonVoter(ctx, repl, voterReplicas, nonVoterReplicas, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

// Replace dead replicas.
case allocatorimpl.AllocatorReplaceDeadVoter:
Expand All @@ -667,8 +821,10 @@ func (rq *replicateQueue) processOneChange(
"dead voter %v unexpectedly not found in %v",
deadVoterReplicas[0], voterReplicas)
}
return rq.addOrReplaceVoters(
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorReplaceDeadNonVoter:
if len(deadNonVoterReplicas) == 0 {
// Nothing to do.
Expand All @@ -680,8 +836,10 @@ func (rq *replicateQueue) processOneChange(
"dead non-voter %v unexpectedly not found in %v",
deadNonVoterReplicas[0], nonVoterReplicas)
}
return rq.addOrReplaceNonVoters(
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

// Replace decommissioning replicas.
case allocatorimpl.AllocatorReplaceDecommissioningVoter:
Expand All @@ -698,6 +856,7 @@ func (rq *replicateQueue) processOneChange(
}
requeue, err := rq.addOrReplaceVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
Expand All @@ -715,6 +874,7 @@ func (rq *replicateQueue) processOneChange(
}
requeue, err := rq.addOrReplaceNonVoters(
ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
Expand All @@ -727,12 +887,14 @@ func (rq *replicateQueue) processOneChange(
// AllocatorReplaceDecommissioning{Non}Voter above.
case allocatorimpl.AllocatorRemoveDecommissioningVoter:
requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.VoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
return requeue, nil
case allocatorimpl.AllocatorRemoveDecommissioningNonVoter:
requeue, err := rq.removeDecommissioning(ctx, repl, allocatorimpl.NonVoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
if err != nil {
return requeue, decommissionPurgatoryError{err}
}
Expand All @@ -744,9 +906,13 @@ func (rq *replicateQueue) processOneChange(
// over-replicated and has dead replicas; in the common case we'll hit
// AllocatorReplaceDead{Non}Voter above.
case allocatorimpl.AllocatorRemoveDeadVoter:
return rq.removeDead(ctx, repl, deadVoterReplicas, allocatorimpl.VoterTarget, dryRun)
requeue, err := rq.removeDead(ctx, repl, deadVoterReplicas, allocatorimpl.VoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err
case allocatorimpl.AllocatorRemoveDeadNonVoter:
return rq.removeDead(ctx, repl, deadNonVoterReplicas, allocatorimpl.NonVoterTarget, dryRun)
requeue, err := rq.removeDead(ctx, repl, deadNonVoterReplicas, allocatorimpl.NonVoterTarget, dryRun)
rq.metrics.trackResultByAllocatorAction(action, err, dryRun)
return requeue, err

case allocatorimpl.AllocatorConsiderRebalance:
return rq.considerRebalance(
Expand Down
Loading

0 comments on commit 158ebe6

Please sign in to comment.