Skip to content

Commit

Permalink
kvclient: add cross-region byte count metrics to DistSender
Browse files Browse the repository at this point in the history
Previously, there were no metrics to observe cross-region traffic in batch
requests / responses. This limitation becomes problematic when trying to
evaluate cross-region traffic managed by nodes.

To improve this issue, this commit adds two new metrics to DistSenderMetrics –
CrossRegionBatchRequestBytes and CrossRegionBatchResponseBytes. These metrics
track the byte count for batch requests sent and responses received in
cross-region batches within DistSender. DistSender resides on the gateway node
that receives SQL queries, so DistSender metrics are updated when DistSender
sends cross-region batches to another node.

Note: These metrics require nodes’ localities to include a tier with the key
“region”. If a node does not have this key but participates in cross-region
batch activities, metrics will remain unchanged, and an error message will be
logged. The region of each node is determined by using the locality field and
the “region” tier value. Unfortunately, the “region” key is hard coded here.
Ideally, we would prefer a more flexible approach to determine node locality.

Part of: cockroachdb#103983

Release note (ops change): Two new metrics - CrossRegionBatchRequestBytes,
CrossRegionBatchResponseBytes - are now added to DistSender metrics. Note that
these metrics require nodes’ localities to include a “region” tier key. If a
node lacks this key but is involved in cross-region batch activities, an error
message will be logged.
  • Loading branch information
wenyihu6 committed Jun 6, 2023
1 parent 530e2ef commit 74c046b
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 36 deletions.
152 changes: 116 additions & 36 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,18 @@ This counts the number of ranges with an active rangefeed that are performing ca
Measurement: "Count",
Unit: metric.Unit_COUNT,
}
metaDistSenderCrossRegionBatchRequestBytes = metric.Metadata{
Name: "distsender.batch_requests.cross_region",
Help: `Total byte count of cross-region batch requests sent`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderCrossRegionBatchResponseBytes = metric.Metadata{
Name: "distsender.batch_responses.cross_region",
Help: `Total byte count of cross-region batch responses received`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
)

// CanSendToFollower is used by the DistSender to determine if it needs to look
Expand Down Expand Up @@ -241,44 +253,48 @@ func max(a, b int64) int64 {

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
CrossRegionBatchRequestBytes *metric.Counter
CrossRegionBatchResponseBytes *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
m := DistSenderMetrics{
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
CrossRegionBatchRequestBytes: metric.NewCounter(metaDistSenderCrossRegionBatchRequestBytes),
CrossRegionBatchResponseBytes: metric.NewCounter(metaDistSenderCrossRegionBatchResponseBytes),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
}
for i := range m.MethodCounts {
method := kvpb.Method(i).String()
Expand Down Expand Up @@ -367,6 +383,12 @@ type DistSender struct {

onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// BatchRequestInterceptor intercepts DistSender.Send() to validate BatchRequest properties.
BatchRequestInterceptor func(ba *kvpb.BatchRequest)

// BatchResponseInterceptor intercepts DistSender.Send() to validate BatchResponse properties.
BatchResponseInterceptor func(br *kvpb.BatchResponse)

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
Expand Down Expand Up @@ -520,8 +542,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
}

if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
if cfg.TestingKnobs.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor
}

if cfg.TestingKnobs.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor
}

return ds
Expand Down Expand Up @@ -2178,9 +2204,27 @@ func (ds *DistSender) sendToReplicas(
ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested ||
(desc.Generation == 0 && routing.LeaseSeq() == 0),
}

if ds.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor(ba)
}

isCrossRegion, err := ds.maybeIncrementCrossRegionBatchMetrics(ba)
if err != nil {
log.Eventf(ctx, "%v", err)
}

br, err = transport.SendNext(ctx, ba)

if ds.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor(br)
}
ds.maybeIncrementErrCounters(br, err)

if isCrossRegion {
ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size()))
}

if err != nil {
if grpcutil.IsAuthError(err) {
// Authentication or authorization error. Propagate.
Expand Down Expand Up @@ -2434,6 +2478,42 @@ func (ds *DistSender) sendToReplicas(
}
}

// maybeIncrementCrossRegionBatchMetrics updates DistSender metrics for
// cross-region batch requests. It takes a BatchRequest parameter to extract
// information about the gatewayNodeID, destinationNodeID, and the byte count of
// batch request. It returns (bool, error) indicating whether the batch request
// is cross-region and if any errors occurred during the process.
//
// DistSender first tries to obtain the localities of the nodes. If DistSender
// is unable to obtain the node descriptor for gateway or destination node, or
// if the locality of any node does not have a “region” key, the function
// returns (false, error). If no errors occured, it checks if the gateway and
// destination nodes are in different regions. If they are, it updates the
// cross-region metrics and returns (true, nil); otherwise, it returns (false,
// nil).
//
// isCrossRegion is returned here to avoid redundant checks for cross-region
// after receiving batch responses.
func (ds *DistSender) maybeIncrementCrossRegionBatchMetrics(ba *kvpb.BatchRequest) (bool, error) {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID)
if err != nil {
return false, err
}
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID)
if err != nil {
return false, err
}
isCrossRegion, err := gatewayNodeDesc.Locality.IsCrossRegion(destinationNodeDesc.Locality)
if err != nil {
return false, err
}
if isCrossRegion {
ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size()))
return true, nil
}
return false, nil
}

// getCostControllerConfig returns the config for the tenant cost model. This
// returns nil if no KV interceptors are associated with the DistSender, or the
// KV interceptor is not a multitenant.TenantSideCostController.
Expand Down
118 changes: 118 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5651,6 +5651,124 @@ func TestDistSenderRPCMetrics(t *testing.T) {
require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1))
}

// TestDistSenderCrossRegionBatchMetrics verifies that the DistSender.Send()
// correctly updates the cross-region byte count metrics for batches requests
// sent and batch responses received.
func TestDistSenderCrossRegionBatchMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)

// The initial setup ensures the correct setup for three nodes (with different
// localities), single-range, three replicas (on different nodes), gossip, and
// DistSender.
clock := hlc.NewClockForTesting(nil)
rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper)
rangeDesc := testUserRangeDescriptor3Replicas
replicas := rangeDesc.InternalReplicas

makeNewNodeWithLocality := func(nodeID roachpb.NodeID) roachpb.NodeDescriptor {
return roachpb.NodeDescriptor{
NodeID: nodeID,
Address: util.UnresolvedAddr{},
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{
{Key: "region", Value: fmt.Sprintf("us-east-%v", nodeID)},
{Key: "zone", Value: fmt.Sprintf("us-east-%va", nodeID)},
{Key: "az", Value: "a"},
},
},
}
}

ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{
makeNewNodeWithLocality(1),
makeNewNodeWithLocality(2),
makeNewNodeWithLocality(3),
}}

var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) {
return ba.CreateReply(), nil
}
expectedRequestBytes := -1
expectedResponseBytes := -1
cfg := DistSenderConfig{
AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(),
Clock: clock,
NodeDescs: ns,
RPCContext: rpcContext,
RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc),
TestingKnobs: ClientTestingKnobs{
TransportFactory: adaptSimpleTransport(transportFn),
BatchRequestInterceptor: func(ba *kvpb.BatchRequest) {
expectedRequestBytes = ba.Size()
},
BatchResponseInterceptor: func(br *kvpb.BatchResponse) {
expectedResponseBytes = br.Size()
},
},
Settings: cluster.MakeTestingClusterSettings(),
}

ba := &kvpb.BatchRequest{}
get := &kvpb.GetRequest{}
get.Key = rangeDesc.StartKey.AsRawKey()
ba.Add(get)

ba.Header = kvpb.Header{
GatewayNodeID: 1,
}

// In the given setup, gateway is set to be on Node 1 which is where
// replicas[0] resides. The first test sets replica[1] as the leaseholder for
// the range, enforcing a cross-region batch request / response. It is
// expected that the metrics will be updated to reflect this cross-region
// scenario.
dsFirst := NewDistSender(cfg)
dsFirst.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: rangeDesc,
Lease: roachpb.Lease{
Replica: replicas[1],
},
})
if _, err := dsFirst.Send(ctx, ba); err != nil {
t.Fatal(err)
}
if expectedRequestBytes == -1 || expectedResponseBytes == -1 {
t.Errorf("expected batch bytes not set correctly")
}

requestBytesMetrics := dsFirst.metrics.CrossRegionBatchRequestBytes.Count()
responseBytesMetrics := dsFirst.metrics.CrossRegionBatchResponseBytes.Count()
require.Equal(t, int64(expectedRequestBytes), requestBytesMetrics,
fmt.Sprintf("expected cross-region bytes sent: %v but got %v", expectedRequestBytes, requestBytesMetrics))
require.Equal(t, int64(expectedResponseBytes), responseBytesMetrics,
fmt.Sprintf("expected cross-region bytes received: %v but got %v", expectedResponseBytes, responseBytesMetrics))

// The second test sets replica[0] as the leaseholder for the range, enforcing
// a within-same-region batch request / response. In this case, the metrics
// are expected to remain unchanged as no cross-region activities are
// involved.
dsSec := NewDistSender(cfg)
dsSec.rangeCache.Insert(ctx, roachpb.RangeInfo{
Desc: rangeDesc,
Lease: roachpb.Lease{
Replica: replicas[0],
},
})
if _, err := dsSec.Send(ctx, ba); err != nil {
t.Fatal(err)
}

requestBytesMetrics = dsSec.metrics.CrossRegionBatchRequestBytes.Count()
responseBytesMetrics = dsSec.metrics.CrossRegionBatchResponseBytes.Count()
require.Equal(t, int64(0), requestBytesMetrics,
fmt.Sprintf("expected cross-region bytes sent: 0 but got %v", requestBytesMetrics))
require.Equal(t, int64(0), responseBytesMetrics,
fmt.Sprintf("expected cross-region bytes received: 0 but got %v", responseBytesMetrics))
}

// TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff
// ensures that a NLHE from an uninitialized replica, which points to a replica
// that isn't part of the range, doesn't result in the dist sender getting
Expand Down
13 changes: 13 additions & 0 deletions pkg/kv/kvclient/kvcoord/testing_knobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ type ClientTestingKnobs struct {
// error which, if non-nil, becomes the result of the batch. Otherwise, execution
// continues.
OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// Currently, BatchRequestInterceptor and BatchResponseInterceptor only
// intercepts DistSender.Send() to pass the actual batch request and response
// byte count to the test. However, it can be easily extended to validate
// other properties of batch requests / response if required.

// BatchRequestInterceptor is designed to intercept calls to DistSender
// function calls to validate BatchRequest properties.
BatchRequestInterceptor func(ba *kvpb.BatchRequest)

// BatchResponseInterceptor is designed to intercept calls to DistSender
// function calls to validate BatchResponse properties.
BatchResponseInterceptor func(br *kvpb.BatchResponse)
}

var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}
Expand Down

0 comments on commit 74c046b

Please sign in to comment.