From 74c046ba61826e4ade67b5002d89660546f5fc9d Mon Sep 17 00:00:00 2001 From: Wenyi Date: Thu, 25 May 2023 19:54:03 -0400 Subject: [PATCH] kvclient: add cross-region byte count metrics to DistSender MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, there were no metrics to observe cross-region traffic in batch requests / responses. This limitation becomes problematic when trying to evaluate cross-region traffic managed by nodes. To improve this issue, this commit adds two new metrics to DistSenderMetrics – CrossRegionBatchRequestBytes and CrossRegionBatchResponseBytes. These metrics track the byte count for batch requests sent and responses received in cross-region batches within DistSender. DistSender resides on the gateway node that receives SQL queries, so DistSender metrics are updated when DistSender sends cross-region batches to another node. Note: These metrics require nodes’ localities to include a tier with the key “region”. If a node does not have this key but participates in cross-region batch activities, metrics will remain unchanged, and an error message will be logged. The region of each node is determined by using the locality field and the “region” tier value. Unfortunately, the “region” key is hard coded here. Ideally, we would prefer a more flexible approach to determine node locality. Part of: https://github.com/cockroachdb/cockroach/issues/103983 Release note (ops change): Two new metrics - CrossRegionBatchRequestBytes, CrossRegionBatchResponseBytes - are now added to DistSender metrics. Note that these metrics require nodes’ localities to include a “region” tier key. If a node lacks this key but is involved in cross-region batch activities, an error message will be logged. --- pkg/kv/kvclient/kvcoord/dist_sender.go | 152 +++++++++++++++----- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 118 +++++++++++++++ pkg/kv/kvclient/kvcoord/testing_knobs.go | 13 ++ 3 files changed, 247 insertions(+), 36 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 6e2964b0971b..8ef562a8eacd 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -180,6 +180,18 @@ This counts the number of ranges with an active rangefeed that are performing ca Measurement: "Count", Unit: metric.Unit_COUNT, } + metaDistSenderCrossRegionBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.cross_region", + Help: `Total byte count of cross-region batch requests sent`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossRegionBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.cross_region", + Help: `Total byte count of cross-region batch responses received`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } ) // CanSendToFollower is used by the DistSender to determine if it needs to look @@ -241,44 +253,48 @@ func max(a, b int64) int64 { // DistSenderMetrics is the set of metrics for a given distributed sender. type DistSenderMetrics struct { - BatchCount *metric.Counter - PartialBatchCount *metric.Counter - AsyncSentCount *metric.Counter - AsyncThrottledCount *metric.Counter - SentCount *metric.Counter - LocalSentCount *metric.Counter - NextReplicaErrCount *metric.Counter - NotLeaseHolderErrCount *metric.Counter - InLeaseTransferBackoffs *metric.Counter - RangeLookups *metric.Counter - SlowRPCs *metric.Gauge - RangefeedRanges *metric.Gauge - RangefeedCatchupRanges *metric.Gauge - RangefeedErrorCatchup *metric.Counter - RangefeedRestartRanges *metric.Counter - RangefeedRestartStuck *metric.Counter - MethodCounts [kvpb.NumMethods]*metric.Counter - ErrCounts [kvpb.NumErrors]*metric.Counter + BatchCount *metric.Counter + PartialBatchCount *metric.Counter + CrossRegionBatchRequestBytes *metric.Counter + CrossRegionBatchResponseBytes *metric.Counter + AsyncSentCount *metric.Counter + AsyncThrottledCount *metric.Counter + SentCount *metric.Counter + LocalSentCount *metric.Counter + NextReplicaErrCount *metric.Counter + NotLeaseHolderErrCount *metric.Counter + InLeaseTransferBackoffs *metric.Counter + RangeLookups *metric.Counter + SlowRPCs *metric.Gauge + RangefeedRanges *metric.Gauge + RangefeedCatchupRanges *metric.Gauge + RangefeedErrorCatchup *metric.Counter + RangefeedRestartRanges *metric.Counter + RangefeedRestartStuck *metric.Counter + MethodCounts [kvpb.NumMethods]*metric.Counter + ErrCounts [kvpb.NumErrors]*metric.Counter } func makeDistSenderMetrics() DistSenderMetrics { m := DistSenderMetrics{ - BatchCount: metric.NewCounter(metaDistSenderBatchCount), - PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), - AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), - AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), - SentCount: metric.NewCounter(metaTransportSentCount), - LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), - NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), - NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), - InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), - RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), - SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), - RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), - RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + BatchCount: metric.NewCounter(metaDistSenderBatchCount), + PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), + CrossRegionBatchRequestBytes: metric.NewCounter(metaDistSenderCrossRegionBatchRequestBytes), + CrossRegionBatchResponseBytes: metric.NewCounter(metaDistSenderCrossRegionBatchResponseBytes), + AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), + AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), + SentCount: metric.NewCounter(metaTransportSentCount), + LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), + NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), + NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), + InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), + RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), + SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), + RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), + RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), + RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), + RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), + RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), } for i := range m.MethodCounts { method := kvpb.Method(i).String() @@ -367,6 +383,12 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + // BatchRequestInterceptor intercepts DistSender.Send() to validate BatchRequest properties. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchResponseInterceptor intercepts DistSender.Send() to validate BatchResponse properties. + BatchResponseInterceptor func(br *kvpb.BatchResponse) + // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -520,8 +542,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency } - if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil { - ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch + if cfg.TestingKnobs.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor + } + + if cfg.TestingKnobs.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor } return ds @@ -2178,9 +2204,27 @@ func (ds *DistSender) sendToReplicas( ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested || (desc.Generation == 0 && routing.LeaseSeq() == 0), } + + if ds.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor(ba) + } + + isCrossRegion, err := ds.maybeIncrementCrossRegionBatchMetrics(ba) + if err != nil { + log.Eventf(ctx, "%v", err) + } + br, err = transport.SendNext(ctx, ba) + + if ds.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor(br) + } ds.maybeIncrementErrCounters(br, err) + if isCrossRegion { + ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) + } + if err != nil { if grpcutil.IsAuthError(err) { // Authentication or authorization error. Propagate. @@ -2434,6 +2478,42 @@ func (ds *DistSender) sendToReplicas( } } +// maybeIncrementCrossRegionBatchMetrics updates DistSender metrics for +// cross-region batch requests. It takes a BatchRequest parameter to extract +// information about the gatewayNodeID, destinationNodeID, and the byte count of +// batch request. It returns (bool, error) indicating whether the batch request +// is cross-region and if any errors occurred during the process. +// +// DistSender first tries to obtain the localities of the nodes. If DistSender +// is unable to obtain the node descriptor for gateway or destination node, or +// if the locality of any node does not have a “region” key, the function +// returns (false, error). If no errors occured, it checks if the gateway and +// destination nodes are in different regions. If they are, it updates the +// cross-region metrics and returns (true, nil); otherwise, it returns (false, +// nil). +// +// isCrossRegion is returned here to avoid redundant checks for cross-region +// after receiving batch responses. +func (ds *DistSender) maybeIncrementCrossRegionBatchMetrics(ba *kvpb.BatchRequest) (bool, error) { + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) + if err != nil { + return false, err + } + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + if err != nil { + return false, err + } + isCrossRegion, err := gatewayNodeDesc.Locality.IsCrossRegion(destinationNodeDesc.Locality) + if err != nil { + return false, err + } + if isCrossRegion { + ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) + return true, nil + } + return false, nil +} + // getCostControllerConfig returns the config for the tenant cost model. This // returns nil if no KV interceptors are associated with the DistSender, or the // KV interceptor is not a multitenant.TenantSideCostController. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index b49f6bc18271..dc715412a7ba 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5651,6 +5651,124 @@ func TestDistSenderRPCMetrics(t *testing.T) { require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1)) } +// TestDistSenderCrossRegionBatchMetrics verifies that the DistSender.Send() +// correctly updates the cross-region byte count metrics for batches requests +// sent and batch responses received. +func TestDistSenderCrossRegionBatchMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + // The initial setup ensures the correct setup for three nodes (with different + // localities), single-range, three replicas (on different nodes), gossip, and + // DistSender. + clock := hlc.NewClockForTesting(nil) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + rangeDesc := testUserRangeDescriptor3Replicas + replicas := rangeDesc.InternalReplicas + + makeNewNodeWithLocality := func(nodeID roachpb.NodeID) roachpb.NodeDescriptor { + return roachpb.NodeDescriptor{ + NodeID: nodeID, + Address: util.UnresolvedAddr{}, + Locality: roachpb.Locality{ + Tiers: []roachpb.Tier{ + {Key: "region", Value: fmt.Sprintf("us-east-%v", nodeID)}, + {Key: "zone", Value: fmt.Sprintf("us-east-%va", nodeID)}, + {Key: "az", Value: "a"}, + }, + }, + } + } + + ns := &mockNodeStore{nodes: []roachpb.NodeDescriptor{ + makeNewNodeWithLocality(1), + makeNewNodeWithLocality(2), + makeNewNodeWithLocality(3), + }} + + var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { + return ba.CreateReply(), nil + } + expectedRequestBytes := -1 + expectedResponseBytes := -1 + cfg := DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: ns, + RPCContext: rpcContext, + RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(transportFn), + BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { + expectedRequestBytes = ba.Size() + }, + BatchResponseInterceptor: func(br *kvpb.BatchResponse) { + expectedResponseBytes = br.Size() + }, + }, + Settings: cluster.MakeTestingClusterSettings(), + } + + ba := &kvpb.BatchRequest{} + get := &kvpb.GetRequest{} + get.Key = rangeDesc.StartKey.AsRawKey() + ba.Add(get) + + ba.Header = kvpb.Header{ + GatewayNodeID: 1, + } + + // In the given setup, gateway is set to be on Node 1 which is where + // replicas[0] resides. The first test sets replica[1] as the leaseholder for + // the range, enforcing a cross-region batch request / response. It is + // expected that the metrics will be updated to reflect this cross-region + // scenario. + dsFirst := NewDistSender(cfg) + dsFirst.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: rangeDesc, + Lease: roachpb.Lease{ + Replica: replicas[1], + }, + }) + if _, err := dsFirst.Send(ctx, ba); err != nil { + t.Fatal(err) + } + if expectedRequestBytes == -1 || expectedResponseBytes == -1 { + t.Errorf("expected batch bytes not set correctly") + } + + requestBytesMetrics := dsFirst.metrics.CrossRegionBatchRequestBytes.Count() + responseBytesMetrics := dsFirst.metrics.CrossRegionBatchResponseBytes.Count() + require.Equal(t, int64(expectedRequestBytes), requestBytesMetrics, + fmt.Sprintf("expected cross-region bytes sent: %v but got %v", expectedRequestBytes, requestBytesMetrics)) + require.Equal(t, int64(expectedResponseBytes), responseBytesMetrics, + fmt.Sprintf("expected cross-region bytes received: %v but got %v", expectedResponseBytes, responseBytesMetrics)) + + // The second test sets replica[0] as the leaseholder for the range, enforcing + // a within-same-region batch request / response. In this case, the metrics + // are expected to remain unchanged as no cross-region activities are + // involved. + dsSec := NewDistSender(cfg) + dsSec.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: rangeDesc, + Lease: roachpb.Lease{ + Replica: replicas[0], + }, + }) + if _, err := dsSec.Send(ctx, ba); err != nil { + t.Fatal(err) + } + + requestBytesMetrics = dsSec.metrics.CrossRegionBatchRequestBytes.Count() + responseBytesMetrics = dsSec.metrics.CrossRegionBatchResponseBytes.Count() + require.Equal(t, int64(0), requestBytesMetrics, + fmt.Sprintf("expected cross-region bytes sent: 0 but got %v", requestBytesMetrics)) + require.Equal(t, int64(0), responseBytesMetrics, + fmt.Sprintf("expected cross-region bytes received: 0 but got %v", responseBytesMetrics)) +} + // TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff // ensures that a NLHE from an uninitialized replica, which points to a replica // that isn't part of the range, doesn't result in the dist sender getting diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 58bb2efafe1d..dc7b88927ad7 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,6 +61,19 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + + // Currently, BatchRequestInterceptor and BatchResponseInterceptor only + // intercepts DistSender.Send() to pass the actual batch request and response + // byte count to the test. However, it can be easily extended to validate + // other properties of batch requests / response if required. + + // BatchRequestInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchRequest properties. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchResponseInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchResponse properties. + BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{}