Skip to content

Commit

Permalink
server: add x-region, x-zone metrics to Node
Browse files Browse the repository at this point in the history
Previously, there were no metrics to observe cross-region, cross-zone traffic in
batch requests / responses processed at receiver nodes.

To address this, this commit adds six new node metrics:
```
"batch_requests.bytes"
"batch_responses.bytes"
"batch_requests.cross_region.bytes"
"batch_responses.cross_region.bytes"
"batch_requests.cross_zone.bytes"
"batch_responses.cross_zone.bytes"
```
The first two metrics track the total byte count of batch requests processed and
batch responses received at the node. The remaining four metrics track the
aggregate byte counts processed and received across different regions and zones.
Note that these metrics are recorded only at the receiver node: the node here is
the destination range node, not the gateway node.

Part of: #103983

Release note (ops change): Six new metrics -
"batch_requests.bytes",
"batch_responses.bytes",
"batch_requests.cross_region.bytes",
"batch_responses.cross_region.bytes",
"batch_requests.cross_zone.bytes",
"batch_responses.cross_zone.bytes" - are now added to Node metrics.

For accurate metrics, ensure the following assumptions hold:
- Configure region and zone tier keys consistently across nodes.
- Within a node locality, ensure unique region and zone tier keys.
- Maintain consistent configuration of region and zone tiers across nodes.
  • Loading branch information
wenyihu6 committed Jun 13, 2023
1 parent 5d630ae commit a1bea39
Show file tree
Hide file tree
Showing 3 changed files with 385 additions and 6 deletions.
10 changes: 10 additions & 0 deletions pkg/kv/kvserver/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,16 @@ type StoreTestingKnobs struct {
// no-op write, and the ForcedError field will be set.
TestingPostApplyFilter kvserverbase.ReplicaApplyFilter

// TestingBatchRequestFilter intercepts Node.Batch() to pass the actual batch
// request byte count to the test. A boolean value is returned here to filter
// out changes in node metrics that are irrelevant to the test.
TestingBatchRequestFilter func(*kvpb.BatchRequest) bool

// TestingBatchResponseFilter intercepts Node.Batch() to pass the actual batch
// response byte count to the test. A boolean value is returned here to filter
// out changes in node metrics that are irrelevant to the test.
TestingBatchResponseFilter func(*kvpb.BatchResponse) bool

// TestingResponseErrorEvent is called when an error is returned applying
// a command.
TestingResponseErrorEvent func(context.Context, *kvpb.BatchRequest, error)
Expand Down
206 changes: 200 additions & 6 deletions pkg/server/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"context"
"fmt"
"net"
"reflect"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -142,6 +143,54 @@ This metric is thus not an indicator of KV health.`,
Measurement: "Batches",
Unit: metric.Unit_COUNT,
}

// metaBatchRequestsBytes describes the "batch_requests.bytes" metric: the
// total byte count of batch requests processed.
metaBatchRequestsBytes = metric.Metadata{
Name: "batch_requests.bytes",
Help: `Total byte count of batch requests processed`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// metaBatchResponsesBytes describes the "batch_responses.bytes" metric: the
// total byte count of batch responses.
metaBatchResponsesBytes = metric.Metadata{
Name: "batch_responses.bytes",
Help: `Total byte count of batch responses received`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// metaCrossRegionBatchRequest describes the
// "batch_requests.cross_region.bytes" metric: the byte count of batch
// requests that crossed a region boundary.
metaCrossRegionBatchRequest = metric.Metadata{
Name: "batch_requests.cross_region.bytes",
Help: `Total byte count of batch requests processed cross region when region
tiers are configured`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// metaCrossRegionBatchResponse describes the
// "batch_responses.cross_region.bytes" metric: the byte count of batch
// responses that crossed a region boundary.
metaCrossRegionBatchResponse = metric.Metadata{
Name: "batch_responses.cross_region.bytes",
Help: `Total byte count of batch responses received cross region when region
tiers are configured`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// metaCrossZoneBatchRequest describes the "batch_requests.cross_zone.bytes"
// metric: the byte count of batch requests that crossed a zone boundary. See
// the Help text for the caveat that applies when region tiers are missing.
metaCrossZoneBatchRequest = metric.Metadata{
Name: "batch_requests.cross_zone.bytes",
Help: `Total byte count of batch requests processed cross zone within
the same region when region and zone tiers are configured. However, if the
region tiers are not configured, this count may also include batch data sent
between different regions. Ensuring consistent configuration of region and
zone tiers across nodes helps to accurately monitor the data transmitted.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// metaCrossZoneBatchResponse describes the "batch_responses.cross_zone.bytes"
// metric: the byte count of batch responses that crossed a zone boundary. See
// the Help text for the caveat that applies when region tiers are missing.
metaCrossZoneBatchResponse = metric.Metadata{
Name: "batch_responses.cross_zone.bytes",
Help: `Total byte count of batch responses received cross zone within the
same region when region and zone tiers are configured. However, if the
region tiers are not configured, this count may also include batch data
received between different regions. Ensuring consistent configuration of
region and zone tiers across nodes helps to accurately monitor the data
transmitted.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
)

// Cluster settings.
Expand Down Expand Up @@ -182,8 +231,14 @@ type nodeMetrics struct {
Err *metric.Counter
DiskStalls *metric.Counter

BatchCount *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
BatchCount *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
BatchRequestsBytes *metric.Counter
BatchResponsesBytes *metric.Counter
CrossRegionBatchRequestBytes *metric.Counter
CrossRegionBatchResponseBytes *metric.Counter
CrossZoneBatchRequestBytes *metric.Counter
CrossZoneBatchResponseBytes *metric.Counter
}

func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMetrics {
Expand All @@ -194,10 +249,16 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) nodeMe
Duration: histogramWindow,
Buckets: metric.IOLatencyBuckets,
}),
Success: metric.NewCounter(metaExecSuccess),
Err: metric.NewCounter(metaExecError),
DiskStalls: metric.NewCounter(metaDiskStalls),
BatchCount: metric.NewCounter(metaInternalBatchRPCCount),
Success: metric.NewCounter(metaExecSuccess),
Err: metric.NewCounter(metaExecError),
DiskStalls: metric.NewCounter(metaDiskStalls),
BatchCount: metric.NewCounter(metaInternalBatchRPCCount),
BatchRequestsBytes: metric.NewCounter(metaBatchRequestsBytes),
BatchResponsesBytes: metric.NewCounter(metaBatchResponsesBytes),
CrossRegionBatchRequestBytes: metric.NewCounter(metaCrossRegionBatchRequest),
CrossRegionBatchResponseBytes: metric.NewCounter(metaCrossRegionBatchResponse),
CrossZoneBatchRequestBytes: metric.NewCounter(metaCrossZoneBatchRequest),
CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse),
}

for i := range nm.MethodCounts {
Expand All @@ -224,6 +285,41 @@ func (nm nodeMetrics) callComplete(d time.Duration, pErr *kvpb.Error) {
nm.Latency.RecordValue(d.Nanoseconds())
}

// getNodeCounterMetrics fetches the current count of each node metric named in
// the `metricsName` parameter and returns the result as a map. The keys in the
// map are the metric metadata names, and the corresponding values are the
// counts of those metrics. If any of the specified metrics cannot be found
// among the *metric.Counter fields of nodeMetrics, an error is returned
// together with an empty map.
//
// Assumptions:
//  1. Each entry of `metricsName` matches the metadata name of a counter held
//     directly in a nodeMetrics field (counters nested in arrays, such as
//     MethodCounts, are not searched).
//  2. Each metric name provided in `metricsName` exists, is unique and refers
//     to a counter type. If several counter fields share a name, the first
//     one in field order is used.
func (nm nodeMetrics) getNodeCounterMetrics(metricsName []string) (map[string]int64, error) {
	// Walk the struct via reflection exactly once and index the counters by
	// name, rather than re-scanning every field for every requested metric.
	metricsStruct := reflect.ValueOf(nm)
	countersByName := make(map[string]*metric.Counter, metricsStruct.NumField())
	for i := 0; i < metricsStruct.NumField(); i++ {
		counter, ok := metricsStruct.Field(i).Interface().(*metric.Counter)
		if !ok || counter == nil {
			// Not a counter field, or a counter that was never initialized;
			// skipping the latter avoids a nil dereference on counter.Name.
			continue
		}
		// Keep the first counter seen for a given name.
		if _, seen := countersByName[counter.Name]; !seen {
			countersByName[counter.Name] = counter
		}
	}

	metricCountMap := make(map[string]int64, len(metricsName))
	for _, metricName := range metricsName {
		// Presence is tracked explicitly instead of through a sentinel count,
		// so no counter value can be mistaken for "not found".
		counter, found := countersByName[metricName]
		if !found {
			return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName)
		}
		metricCountMap[metricName] = counter.Count()
	}
	return metricCountMap, nil
}

// A Node manages a map of stores (by store ID) for which it serves
// traffic. A node is the top-level data structure. There is one node
// instance per process. A node accepts incoming RPCs and services
Expand Down Expand Up @@ -1229,6 +1325,86 @@ func (n *Node) batchInternal(
return br, nil
}

// isCrossRegionCrossZoneBatch compares this node's locality with the locality
// of the batch request's gateway node (looked up through gossip) and returns
// two booleans: whether the batch is cross-region, and whether it is
// cross-zone. Both results are false when gossip is not configured or when the
// gateway node descriptor cannot be looked up. Errors reported by the locality
// comparison are only logged as verbose events.
func (n *Node) isCrossRegionCrossZoneBatch(
	ctx context.Context, ba *kvpb.BatchRequest,
) (bool, bool) {
	g := n.storeCfg.Gossip
	if g == nil {
		log.VEventf(ctx, 2, "gossip is not configured")
		return false, false
	}

	gatewayDesc, lookupErr := g.GetNodeDescriptor(ba.GatewayNodeID)
	if lookupErr != nil {
		log.VEventf(ctx, 2,
			"failed to perform look up for node descriptor %+v", lookupErr)
		return false, false
	}

	crossRegion, regionErr, crossZone, zoneErr :=
		n.Descriptor.Locality.IsCrossRegionCrossZone(gatewayDesc.Locality)
	// The comparison errors are informational only; the booleans are returned
	// as reported regardless.
	if regionErr != nil {
		log.VEventf(ctx, 2, "%v", regionErr)
	}
	if zoneErr != nil {
		log.VEventf(ctx, 2, "%v", zoneErr)
	}
	return crossRegion, crossZone
}

// checkAndUpdateBatchRequestMetrics records the byte size of the given batch
// request in the node's batch request metrics. When shouldIncrement is false
// nothing is recorded and (false, false) is returned. Otherwise the total
// request byte counter is always incremented, and additionally:
//   - the cross-region counter is incremented when the batch is both
//     cross-region and cross-zone;
//   - the cross-zone counter is incremented when the batch is cross-zone but
//     not cross-region (same region, or region tiers not configured);
//   - a batch reported as cross-region but same-zone is unexpected and is only
//     logged.
//
// The returned booleans state which of the cross-region / cross-zone counters
// was incremented, so that the caller can update the matching response metrics
// without repeating the locality check.
func (n *Node) checkAndUpdateBatchRequestMetrics(
	ctx context.Context, ba *kvpb.BatchRequest, shouldIncrement bool,
) (shouldIncCrossRegion bool, shouldIncCrossZone bool) {
	if !shouldIncrement {
		return false, false
	}

	// Compute the request size once and reuse it for every counter instead of
	// calling Size() again per counter (presumably it re-walks the message on
	// each call; verify against the generated protobuf code).
	requestBytes := int64(ba.Size())
	n.metrics.BatchRequestsBytes.Inc(requestBytes)

	isCrossRegion, isCrossZone := n.isCrossRegionCrossZoneBatch(ctx, ba)
	switch {
	case isCrossRegion && !isCrossZone:
		log.VEventf(ctx, 2, "unexpected: cross region but same zone")
	case isCrossRegion:
		n.metrics.CrossRegionBatchRequestBytes.Inc(requestBytes)
		shouldIncCrossRegion = true
	case isCrossZone:
		n.metrics.CrossZoneBatchRequestBytes.Inc(requestBytes)
		shouldIncCrossZone = true
	}
	return shouldIncCrossRegion, shouldIncCrossZone
}

// checkAndUpdateBatchResponseMetrics records the byte size of the given batch
// response in the node's batch response metrics. When shouldIncrement is false
// nothing is recorded. Otherwise the total response byte counter is always
// incremented, and the cross-region and cross-zone response counters are
// incremented when shouldIncCrossRegion and shouldIncCrossZone are set,
// respectively. Those two flags are determined during the initial check of the
// batch request; the underlying assumption is that if the request was
// cross-region or cross-zone, the response is as well.
func (n *Node) checkAndUpdateBatchResponseMetrics(
	br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool, shouldIncrement bool,
) {
	if !shouldIncrement {
		return
	}

	// Compute the response size once and reuse it for every counter instead of
	// calling Size() up to three times.
	responseBytes := int64(br.Size())
	n.metrics.BatchResponsesBytes.Inc(responseBytes)
	if shouldIncCrossRegion {
		n.metrics.CrossRegionBatchResponseBytes.Inc(responseBytes)
	}
	if shouldIncCrossZone {
		n.metrics.CrossZoneBatchResponseBytes.Inc(responseBytes)
	}
}

// incrementBatchCounters increments counters to track the batch and composite
// request methods.
func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) {
Expand All @@ -1243,6 +1419,15 @@ func (n *Node) incrementBatchCounters(ba *kvpb.BatchRequest) {
func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
n.incrementBatchCounters(args)

shouldIncrement := true
if fn := n.storeCfg.TestingKnobs.TestingBatchRequestFilter; fn != nil {
// ShouldIncrement is always set to true in the production environment. The
// testing knob is used here to filter out metrics changes caused by batch
// requests that are irrelevant to our tests.
shouldIncrement = fn(args)
}
shouldIncCrossRegion, shouldIncCrossZone := n.checkAndUpdateBatchRequestMetrics(ctx, args, shouldIncrement)

// NB: Node.Batch is called directly for "local" calls. We don't want to
// carry the associated log tags forward as doing so makes adding additional
// log tags more expensive and makes local calls differ from remote calls.
Expand Down Expand Up @@ -1291,6 +1476,15 @@ func (n *Node) Batch(ctx context.Context, args *kvpb.BatchRequest) (*kvpb.BatchR
}
br.Error = kvpb.NewError(err)
}

shouldIncrement = true
if fn := n.storeCfg.TestingKnobs.TestingBatchResponseFilter; fn != nil {
// ShouldIncrement is always set to true in the production environment. The
// testing knob is used here to filter out metrics changes caused by batch
// requests that are irrelevant to our tests.
shouldIncrement = fn(br)
}
n.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone, shouldIncrement)
if buildutil.CrdbTestBuild && br.Error != nil && n.testingErrorEvent != nil {
n.testingErrorEvent(ctx, args, errors.DecodeError(ctx, br.Error.EncodedError))
}
Expand Down
Loading

0 comments on commit a1bea39

Please sign in to comment.