From 3502ca2ef2dffd9f19c3bd7d7277324433ac43ad Mon Sep 17 00:00:00 2001 From: Wenyi Date: Thu, 25 May 2023 19:54:03 -0400 Subject: [PATCH 1/5] kvclient: add x-region, x-zone metrics to DistSender Previously, there were no metrics to observe cross-region, cross-zone traffic in batch requests / responses at DistSender. To improve this issue, this commit introduces six new distsender metrics - ``` "distsender.batch_requests.replica_addressed.bytes" "distsender.batch_responses.replica_addressed.bytes" "distsender.batch_requests.cross_region.bytes" "distsender.batch_responses.cross_region.bytes" "distsender.batch_requests.cross_zone.bytes" "distsender.batch_responses.cross_zone.bytes" ``` The first two metrics track the total byte count of batch requests processed and batch responses received at DistSender. Additionally, there are four metrics to track the aggregate counts processed and received across different regions and zones. Note that these metrics only track the sender node and not the receiver node as DistSender resides on the gateway node receiving SQL queries. Part of: https://github.com/cockroachdb/cockroach/issues/103983 Release note (ops change): Six new metrics - "distsender.batch_requests.replica_addressed.bytes", "distsender.batch_responses.replica_addressed.bytes", "distsender.batch_requests.cross_region.bytes", "distsender.batch_responses.cross_region.bytes", "distsender.batch_requests.cross_zone.bytes", "distsender.batch_responses.cross_zone.bytes"- are now added to DistSender metrics. For accurate metrics, follow these assumptions: - Configure region and zone tier keys consistently across nodes. - Within a node locality, ensure unique region and zone tier keys. - Maintain consistent configuration of region and zone tiers across nodes. --- pkg/kv/kvclient/kvcoord/dist_sender.go | 260 +++++++++++++++++--- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 197 +++++++++++++++ pkg/kv/kvclient/kvcoord/testing_knobs.go | 13 + 3 files changed, 434 insertions(+), 36 deletions(-) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 6e2964b0971b..04cbc59fdd71 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -13,6 +13,7 @@ package kvcoord import ( "context" "fmt" + "reflect" "runtime" "runtime/pprof" "strings" @@ -63,6 +64,54 @@ var ( Measurement: "Partial Batches", Unit: metric.Unit_COUNT, } + metaDistSenderReplicaAddressedBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.replica_addressed.bytes", + Help: `Total byte count of replica-addressed batch requests processed`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderReplicaAddressedBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.replica_addressed.bytes", + Help: `Total byte count of replica-addressed batch responses received`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossRegionBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.cross_region.bytes", + Help: `Total byte count of replica-addressed batch requests processed cross + region when region tiers are configured`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossRegionBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.cross_region.bytes", + Help: `Total byte count of replica-addressed batch responses received cross + region when region tiers are configured`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossZoneBatchRequestBytes = metric.Metadata{ + Name: "distsender.batch_requests.cross_zone.bytes", + Help: `Total byte count of replica-addressed batch requests processed cross + zone within the same region when region and zone tiers are configured. + However, if the region tiers are not configured, this count may also include + batch data sent between different regions. Ensuring consistent configuration + of region and zone tiers across nodes helps to accurately monitor the data + transmitted.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } + metaDistSenderCrossZoneBatchResponseBytes = metric.Metadata{ + Name: "distsender.batch_responses.cross_zone.bytes", + Help: `Total byte count of replica-addressed batch responses received cross + zone within the same region when region and zone tiers are configured. + However, if the region tiers are not configured, this count may also include + batch data received between different regions. Ensuring consistent + configuration of region and zone tiers across nodes helps to accurately + monitor the data transmitted.`, + Measurement: "Bytes", + Unit: metric.Unit_BYTES, + } metaDistSenderAsyncSentCount = metric.Metadata{ Name: "distsender.batches.async.sent", Help: "Number of partial batches sent asynchronously", @@ -241,44 +290,56 @@ func max(a, b int64) int64 { // DistSenderMetrics is the set of metrics for a given distributed sender. type DistSenderMetrics struct { - BatchCount *metric.Counter - PartialBatchCount *metric.Counter - AsyncSentCount *metric.Counter - AsyncThrottledCount *metric.Counter - SentCount *metric.Counter - LocalSentCount *metric.Counter - NextReplicaErrCount *metric.Counter - NotLeaseHolderErrCount *metric.Counter - InLeaseTransferBackoffs *metric.Counter - RangeLookups *metric.Counter - SlowRPCs *metric.Gauge - RangefeedRanges *metric.Gauge - RangefeedCatchupRanges *metric.Gauge - RangefeedErrorCatchup *metric.Counter - RangefeedRestartRanges *metric.Counter - RangefeedRestartStuck *metric.Counter - MethodCounts [kvpb.NumMethods]*metric.Counter - ErrCounts [kvpb.NumErrors]*metric.Counter + BatchCount *metric.Counter + PartialBatchCount *metric.Counter + ReplicaAddressedBatchRequestBytes *metric.Counter + ReplicaAddressedBatchResponseBytes *metric.Counter + CrossRegionBatchRequestBytes *metric.Counter + CrossRegionBatchResponseBytes *metric.Counter + CrossZoneBatchRequestBytes *metric.Counter + CrossZoneBatchResponseBytes *metric.Counter + AsyncSentCount *metric.Counter + AsyncThrottledCount *metric.Counter + SentCount *metric.Counter + LocalSentCount *metric.Counter + NextReplicaErrCount *metric.Counter + NotLeaseHolderErrCount *metric.Counter + InLeaseTransferBackoffs *metric.Counter + RangeLookups *metric.Counter + SlowRPCs *metric.Gauge + RangefeedRanges *metric.Gauge + RangefeedCatchupRanges *metric.Gauge + RangefeedErrorCatchup *metric.Counter + RangefeedRestartRanges *metric.Counter + RangefeedRestartStuck *metric.Counter + MethodCounts [kvpb.NumMethods]*metric.Counter + ErrCounts [kvpb.NumErrors]*metric.Counter } func makeDistSenderMetrics() DistSenderMetrics { m := DistSenderMetrics{ - BatchCount: metric.NewCounter(metaDistSenderBatchCount), - PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), - AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), - AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), - SentCount: metric.NewCounter(metaTransportSentCount), - LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), - NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), - NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), - InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), - RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), - SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), - RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), - RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + BatchCount: metric.NewCounter(metaDistSenderBatchCount), + PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount), + AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount), + AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount), + SentCount: metric.NewCounter(metaTransportSentCount), + LocalSentCount: metric.NewCounter(metaTransportLocalSentCount), + ReplicaAddressedBatchRequestBytes: metric.NewCounter(metaDistSenderReplicaAddressedBatchRequestBytes), + ReplicaAddressedBatchResponseBytes: metric.NewCounter(metaDistSenderReplicaAddressedBatchResponseBytes), + CrossRegionBatchRequestBytes: metric.NewCounter(metaDistSenderCrossRegionBatchRequestBytes), + CrossRegionBatchResponseBytes: metric.NewCounter(metaDistSenderCrossRegionBatchResponseBytes), + CrossZoneBatchRequestBytes: metric.NewCounter(metaDistSenderCrossZoneBatchRequestBytes), + CrossZoneBatchResponseBytes: metric.NewCounter(metaDistSenderCrossZoneBatchResponseBytes), + NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount), + NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount), + InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), + RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), + SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), + RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), + RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), + RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), + RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), + RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), } for i := range m.MethodCounts { method := kvpb.Method(i).String() @@ -297,6 +358,43 @@ func makeDistSenderMetrics() DistSenderMetrics { return m } +// getDistSenderCounterMetrics fetches the count of each specified DisSender +// metric from the `metricNames` parameter and returns the result as a map. The +// keys in the map represent the metric metadata names, while the corresponding +// values indicate the count of each metric. If any of the specified metric +// cannot be found or is not a counter, the function will return an error. +// +// Assumption: 1. The metricNames parameter should consist of string literals +// that match the metadata names used for metric counters. 2. Each metric name +// provided in `metricNames` must exist, unique and be a counter type. +func (dm *DistSenderMetrics) getDistSenderCounterMetrics( + metricsName []string, +) (map[string]int64, error) { + metricCountMap := make(map[string]int64) + getFirstDistSenderMetric := func(metricName string) int64 { + metricsStruct := reflect.ValueOf(*dm) + for i := 0; i < metricsStruct.NumField(); i++ { + field := metricsStruct.Field(i) + switch t := field.Interface().(type) { + case *metric.Counter: + if t.Name == metricName { + return t.Count() + } + } + } + return -1 + } + + for _, metricName := range metricsName { + count := getFirstDistSenderMetric(metricName) + if count == -1 { + return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName) + } + metricCountMap[metricName] = count + } + return metricCountMap, nil +} + // FirstRangeProvider is capable of providing DistSender with the descriptor of // the first range in the cluster and notifying the DistSender when the first // range in the cluster has changed. @@ -367,6 +465,14 @@ type DistSender struct { onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual + // batch request byte count to the test. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchRequestInterceptor intercepts DistSender.Send() to pass the actual + // batch response byte count to the test. + BatchResponseInterceptor func(br *kvpb.BatchResponse) + // locality is the description of the topography of the server on which the // DistSender is running. It is used to estimate the latency to other nodes // in the absence of a latency function. @@ -520,8 +626,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender { ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency } - if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil { - ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch + if cfg.TestingKnobs.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor + } + + if cfg.TestingKnobs.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor } return ds @@ -2178,7 +2288,16 @@ func (ds *DistSender) sendToReplicas( ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested || (desc.Generation == 0 && routing.LeaseSeq() == 0), } + + if ds.BatchRequestInterceptor != nil { + ds.BatchRequestInterceptor(ba) + } + shouldIncCrossRegion, shouldIncCrossZone := ds.checkAndUpdateBatchRequestMetrics(ctx, ba) br, err = transport.SendNext(ctx, ba) + if ds.BatchResponseInterceptor != nil { + ds.BatchResponseInterceptor(br) + } + ds.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone) ds.maybeIncrementErrCounters(br, err) if err != nil { @@ -2434,6 +2553,75 @@ func (ds *DistSender) sendToReplicas( } } +// isCrossRegionCrossZoneBatch returns (bool, bool) - indicating if the given +// batch request is cross-region and cross-zone respectively. +func (ds *DistSender) isCrossRegionCrossZoneBatch( + ctx context.Context, ba *kvpb.BatchRequest, +) (bool, bool) { + gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID) + if err != nil { + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + return false, false + } + destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID) + if err != nil { + log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err) + return false, false + } + isCrossRegion, regionErr, isCrossZone, zoneErr := gatewayNodeDesc.Locality.IsCrossRegionCrossZone(destinationNodeDesc.Locality) + if regionErr != nil { + log.VEventf(ctx, 2, "%v", regionErr) + } + if zoneErr != nil { + log.VEventf(ctx, 2, "%v", zoneErr) + } + return isCrossRegion, isCrossZone +} + +// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a +// more meaningful way. Cross-region metrics monitor activities across different +// regions. Cross-zone metrics monitor cross-zone activities within the same +// region or in cases where region tiers are not configured. The check result is +// returned here to avoid redundant check for metrics updates after receiving +// batch responses. +func (ds *DistSender) checkAndUpdateBatchRequestMetrics( + ctx context.Context, ba *kvpb.BatchRequest, +) (shouldIncCrossRegion bool, shouldIncCrossZone bool) { + ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size())) + isCrossRegion, isCrossZone := ds.isCrossRegionCrossZoneBatch(ctx, ba) + if isCrossRegion { + if !isCrossZone { + log.VEventf(ctx, 2, "unexpected: cross region but same zone") + } else { + ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size())) + shouldIncCrossRegion = true + } + } else { + if isCrossZone { + ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size())) + shouldIncCrossZone = true + } + } + return shouldIncCrossRegion, shouldIncCrossZone +} + +// checkAndUpdateBatchResponseMetrics updates the batch response metrics based +// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These +// parameters are determined during the initial check for batch requests. The +// underlying assumption is that if requests were cross-region or cross-zone, +// the response should be as well. +func (ds *DistSender) checkAndUpdateBatchResponseMetrics( + br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool, +) { + ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size())) + if shouldIncCrossRegion { + ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size())) + } + if shouldIncCrossZone { + ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size())) + } +} + // getCostControllerConfig returns the config for the tenant cost model. This // returns nil if no KV interceptors are associated with the DistSender, or the // KV interceptor is not a multitenant.TenantSideCostController. diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index b49f6bc18271..e8319fabc1a3 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -5651,6 +5651,203 @@ func TestDistSenderRPCMetrics(t *testing.T) { require.Equal(t, ds.metrics.ErrCounts[kvpb.ConditionFailedErrType].Count(), int64(1)) } +// getMapsDiff returns the difference between the values of corresponding +// metrics in two maps. Assumption: beforeMap and afterMap contain the same set +// of keys. +func getMapsDiff(beforeMap map[string]int64, afterMap map[string]int64) map[string]int64 { + diffMap := make(map[string]int64) + for metricName, beforeValue := range beforeMap { + if v, ok := afterMap[metricName]; ok { + diffMap[metricName] = v - beforeValue + } + } + return diffMap +} + +// TestDistSenderBatchMetrics verifies that the DistSender.Send() +// correctly updates the cross-region, cross-zone byte count metrics. +func TestDistSenderBatchMetrics(t *testing.T) { + defer leaktest.AfterTest(t)() + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + + // The initial setup ensures the correct setup for three nodes (with different + // localities), single-range, three replicas (on different nodes). + clock := hlc.NewClockForTesting(nil) + rpcContext := rpc.NewInsecureTestingContext(ctx, clock, stopper) + rangeDesc := testUserRangeDescriptor3Replicas + replicas := rangeDesc.InternalReplicas + + // The servers localities are configured so that the first batch request sent + // from server0 to server0 is same-region, same-zone. The second batch request + // sent from server0 to server1 is cross-region. The second batch request sent + // from server0 to server2 is cross-zone within the same region. + const numNodes = 3 + serverLocality := [numNodes]roachpb.Locality{ + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-west"}, {Key: "az", Value: "us-west-1"}}}, + {Tiers: []roachpb.Tier{{Key: "region", Value: "us-east"}, {Key: "az", Value: "us-east-2"}}}, + } + + nodes := make([]roachpb.NodeDescriptor, 3) + for i := 0; i < numNodes; i++ { + nodes[i] = roachpb.NodeDescriptor{ + NodeID: roachpb.NodeID(i + 1 /* 0 is not a valid NodeID */), + Address: util.UnresolvedAddr{}, + Locality: serverLocality[i], + } + } + ns := &mockNodeStore{nodes: nodes} + + var transportFn = func(_ context.Context, ba *kvpb.BatchRequest) (*kvpb.BatchResponse, error) { + return ba.CreateReply(), nil + } + interceptedBatchRequestBytes, interceptedBatchResponseBytes := int64(-1), int64(-1) + cfg := DistSenderConfig{ + AmbientCtx: log.MakeTestingAmbientCtxWithNewTracer(), + Clock: clock, + NodeDescs: ns, + RPCContext: rpcContext, + RangeDescriptorDB: mockRangeDescriptorDBForDescs(rangeDesc), + TestingKnobs: ClientTestingKnobs{ + TransportFactory: adaptSimpleTransport(transportFn), + BatchRequestInterceptor: func(ba *kvpb.BatchRequest) { + interceptedBatchRequestBytes = int64(ba.Size()) + }, + BatchResponseInterceptor: func(br *kvpb.BatchResponse) { + interceptedBatchResponseBytes = int64(br.Size()) + }, + }, + Settings: cluster.MakeTestingClusterSettings(), + } + + distSender := NewDistSender(cfg) + metricsNames := []string{ + "distsender.batch_requests.replica_addressed.bytes", + "distsender.batch_responses.replica_addressed.bytes", + "distsender.batch_requests.cross_region.bytes", + "distsender.batch_responses.cross_region.bytes", + "distsender.batch_requests.cross_zone.bytes", + "distsender.batch_responses.cross_zone.bytes"} + + getExpectedDelta := func( + isCrossRegion bool, isCrossZone bool, interceptedRequest int64, interceptedResponse int64, + ) map[string]int64 { + ternaryOp := func(b bool, num int64) (res int64) { + if b { + res = num + } + return res + } + + expectedDelta := make(map[string]int64) + expectedDelta[metricsNames[0]] = interceptedRequest + expectedDelta[metricsNames[1]] = interceptedResponse + expectedDelta[metricsNames[2]] = ternaryOp(isCrossRegion, interceptedRequest) + expectedDelta[metricsNames[3]] = ternaryOp(isCrossRegion, interceptedResponse) + expectedDelta[metricsNames[4]] = ternaryOp(isCrossZone, interceptedRequest) + expectedDelta[metricsNames[5]] = ternaryOp(isCrossZone, interceptedResponse) + return expectedDelta + } + + sameRegionSameZoneRequest := int64(0) + sameRegionSameZoneResponse := int64(0) + + for _, tc := range []struct { + toReplica int + isCrossRegion bool + isCrossZone bool + }{ + // First test sets replica[0] as leaseholder, enforcing a within-region, + // within-zone batch request / response. + {toReplica: 0, isCrossRegion: false, isCrossZone: false}, + // Second test sets replica[1] as leaseholder, enforcing a cross-region, + // batch request / response. Note that although the request is cross-zone, + // the cross-zone metrics is not expected to increment. + {toReplica: 1, isCrossRegion: true, isCrossZone: false}, + // Third test sets replica[2] as leaseholder, enforcing a within-region, + // cross-zone batch request / response. Cross-zone metrics is only expected + // to increment when it is cross-zone, same-region activities. + {toReplica: 2, isCrossRegion: false, isCrossZone: true}, + } { + t.Run(fmt.Sprintf("isCrossRegion:%t-isCrossZone:%t", tc.isCrossRegion, tc.isCrossZone), func(t *testing.T) { + beforeMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + if err != nil { + t.Fatal(err) + } + + ba := &kvpb.BatchRequest{} + if tc.toReplica == 0 { + // Send a different request type for the first request to avoid having + // the same byte count for three requests and coincidental correct + // results. + get := &kvpb.GetRequest{} + get.Key = rangeDesc.StartKey.AsRawKey() + ba.Add(get) + } else { + put := &kvpb.PutRequest{} + put.Key = rangeDesc.StartKey.AsRawKey() + ba.Add(put) + } + + ba.Header = kvpb.Header{ + // DistSender is set to be at the server0. + GatewayNodeID: 1, + } + distSender.rangeCache.Insert(ctx, roachpb.RangeInfo{ + Desc: rangeDesc, + Lease: roachpb.Lease{ + Replica: replicas[tc.toReplica], + }, + }) + + if _, err := distSender.Send(ctx, ba); err != nil { + t.Fatal(err) + } + + require.NotEqual(t, interceptedBatchRequestBytes, int64(-1), + "expected bytes not set correctly") + require.NotEqual(t, interceptedBatchResponseBytes, int64(-1), + "expected bytes not set correctly") + if tc.toReplica == 0 { + // Record the first batch request and response that was sent same + // region, same zone for future testing. + sameRegionSameZoneRequest = interceptedBatchRequestBytes + sameRegionSameZoneResponse = interceptedBatchResponseBytes + } + + expected := getExpectedDelta(tc.isCrossRegion, tc.isCrossZone, + interceptedBatchRequestBytes, interceptedBatchResponseBytes) + afterMetrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + diffMetrics := getMapsDiff(beforeMetrics, afterMetrics) + if err != nil { + t.Error(err) + } + require.Equal(t, expected, diffMetrics) + }) + t.Run("SameRegionSameZone", func(t *testing.T) { + // Since the region and zone tiers are all configured in this test, we + // expect that the byte count of batch requests sent within the same + // region and same zone should equal to the total byte count of requests + // minus the combined byte count of cross-region and cross-zone requests + // metrics. Similar expectation for batch responses. + metrics, err := distSender.metrics.getDistSenderCounterMetrics(metricsNames) + if err != nil { + t.Error(err) + } + totalRequest := metrics["distsender.batch_requests.replica_addressed.bytes"] + totalResponse := metrics["distsender.batch_responses.replica_addressed.bytes"] + crossRegionRequest := metrics["distsender.batch_requests.cross_region.bytes"] + crossRegionResponse := metrics["distsender.batch_responses.cross_region.bytes"] + crossZoneRequest := metrics["distsender.batch_requests.cross_zone.bytes"] + crossZoneResponse := metrics["distsender.batch_responses.cross_zone.bytes"] + require.Equal(t, sameRegionSameZoneRequest, totalRequest-crossRegionRequest-crossZoneRequest) + require.Equal(t, sameRegionSameZoneResponse, totalResponse-crossRegionResponse-crossZoneResponse) + }) + } +} + // TestDistSenderNLHEFromUninitializedReplicaDoesNotCauseUnboundedBackoff // ensures that a NLHE from an uninitialized replica, which points to a replica // that isn't part of the range, doesn't result in the dist sender getting diff --git a/pkg/kv/kvclient/kvcoord/testing_knobs.go b/pkg/kv/kvclient/kvcoord/testing_knobs.go index 58bb2efafe1d..dc7b88927ad7 100644 --- a/pkg/kv/kvclient/kvcoord/testing_knobs.go +++ b/pkg/kv/kvclient/kvcoord/testing_knobs.go @@ -61,6 +61,19 @@ type ClientTestingKnobs struct { // error which, if non-nil, becomes the result of the batch. Otherwise, execution // continues. OnRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error + + // Currently, BatchRequestInterceptor and BatchResponseInterceptor only + // intercepts DistSender.Send() to pass the actual batch request and response + // byte count to the test. However, it can be easily extended to validate + // other properties of batch requests / response if required. + + // BatchRequestInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchRequest properties. + BatchRequestInterceptor func(ba *kvpb.BatchRequest) + + // BatchResponseInterceptor is designed to intercept calls to DistSender + // function calls to validate BatchResponse properties. + BatchResponseInterceptor func(br *kvpb.BatchResponse) } var _ base.ModuleTestingKnobs = &ClientTestingKnobs{} From 2593e4b96be1ec1eda9a8a28e9c2c6ae4fdc990e Mon Sep 17 00:00:00 2001 From: Rafi Shamim Date: Mon, 5 Jun 2023 18:34:26 -0400 Subject: [PATCH 2/5] sql: CREATEROLE now includes ability to grant non-admin roles This matches the PostgreSQL behavior. Release note (security update): Users who have the CREATEROLE role option can now grant and revoke role membership in any non-admin role. This change also removes the sql.auth.createrole_allows_grant_role_membership.enabled cluster setting, which was added in v23.1. Now, the cluster setting is effectively always true. --- .../testdata/benchmark_expectations | 2 +- pkg/settings/registry.go | 5 +- pkg/sql/grant_role.go | 24 ++-- .../testdata/logic_test/grant_in_txn | 2 +- .../logictest/testdata/logic_test/grant_role | 127 ++++++++++++++++++ pkg/sql/logictest/testdata/logic_test/role | 37 ++--- pkg/sql/revoke_role.go | 25 ++-- 7 files changed, 183 insertions(+), 39 deletions(-) diff --git a/pkg/bench/rttanalysis/testdata/benchmark_expectations b/pkg/bench/rttanalysis/testdata/benchmark_expectations index bb315356df15..42a9b1c2f2ff 100644 --- a/pkg/bench/rttanalysis/testdata/benchmark_expectations +++ b/pkg/bench/rttanalysis/testdata/benchmark_expectations @@ -91,7 +91,7 @@ exp,benchmark 17,Revoke/revoke_all_on_2_tables 21,Revoke/revoke_all_on_3_tables 14,RevokeRole/revoke_1_role -16,RevokeRole/revoke_2_roles +18,RevokeRole/revoke_2_roles 10,ShowGrants/grant_2_roles 11,ShowGrants/grant_3_roles 12,ShowGrants/grant_4_roles diff --git a/pkg/settings/registry.go b/pkg/settings/registry.go index 34facc5c94c7..5af4a3a9f776 100644 --- a/pkg/settings/registry.go +++ b/pkg/settings/registry.go @@ -164,7 +164,10 @@ var retiredSettings = map[string]struct{}{ // renamed. "spanconfig.host_coalesce_adjacent.enabled": {}, "sql.defaults.experimental_stream_replication.enabled": {}, - "sql.log.unstructured_entries.enabled": {}, + + // removed as of 23.2. + "sql.log.unstructured_entries.enabled": {}, + "sql.auth.createrole_allows_grant_role_membership.enabled": {}, } // sqlDefaultSettings is the list of "grandfathered" existing sql.defaults diff --git a/pkg/sql/grant_role.go b/pkg/sql/grant_role.go index 137ebb11d929..f76d1965f441 100644 --- a/pkg/sql/grant_role.go +++ b/pkg/sql/grant_role.go @@ -54,7 +54,7 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) defer span.Finish() - hasAdminRole, err := p.HasAdminRole(ctx) + hasCreateRolePriv, err := p.HasRoleOption(ctx, roleoption.CREATEROLE) if err != nil { return nil, err } @@ -63,6 +63,7 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR if err != nil { return nil, err } + grantingRoleHasAdminOptionOnAdmin := allRoles[username.AdminRoleName()] inputRoles, err := decodeusername.FromNameList(n.Roles) if err != nil { @@ -76,19 +77,24 @@ func (p *planner) GrantRoleNode(ctx context.Context, n *tree.GrantRole) (*GrantR } for _, r := range inputRoles { - // If the user is an admin, don't check if the user is allowed to add/drop - // roles in the role. However, if the role being modified is the admin role, then - // make sure the user is an admin with the admin option. - if hasAdminRole && !r.IsAdminRole() { + // If the current user has CREATEROLE, and the role being granted is not an + // admin there is no need to check if the user is allowed to grant/revoke + // membership in the role. However, if the role being granted is an admin, + // then make sure the current user also has the admin option for that role. + grantedRoleIsAdmin, err := p.UserHasAdminRole(ctx, r) + if err != nil { + return nil, err + } + if hasCreateRolePriv && !grantedRoleIsAdmin { continue } - if isAdmin, ok := allRoles[r]; !ok || !isAdmin { - if r.IsAdminRole() { + if hasAdminOption := allRoles[r]; !hasAdminOption && !grantingRoleHasAdminOptionOnAdmin { + if grantedRoleIsAdmin { return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a role admin for role %s", p.User(), r) + "%s must have admin option on role %q", p.User(), r) } return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a superuser or role admin for role %s", p.User(), r) + "%s must have CREATEROLE or have admin option on role %q", p.User(), r) } } diff --git a/pkg/sql/logictest/testdata/logic_test/grant_in_txn b/pkg/sql/logictest/testdata/logic_test/grant_in_txn index 2931fc27709b..559bfd49e030 100644 --- a/pkg/sql/logictest/testdata/logic_test/grant_in_txn +++ b/pkg/sql/logictest/testdata/logic_test/grant_in_txn @@ -100,7 +100,7 @@ SAVEPOINT before_invalid_grant # This grant should fail as testuser no longer has right to this grant # via role_foo. -statement error testuser is not a superuser or role admin for role role_bar +statement error testuser must have CREATEROLE or have admin option on role \"role_bar\" GRANT role_bar TO testuser; statement ok diff --git a/pkg/sql/logictest/testdata/logic_test/grant_role b/pkg/sql/logictest/testdata/logic_test/grant_role index 47b70bec8701..f6e1103237a0 100644 --- a/pkg/sql/logictest/testdata/logic_test/grant_role +++ b/pkg/sql/logictest/testdata/logic_test/grant_role @@ -32,3 +32,130 @@ GRANT testuser TO public statement error pgcode 42704 role/user \"public\" does not exist REVOKE testuser FROM public + +# CREATEROLE should allow a user to GRANT/REVOKE on a role even if they do not +# have the admin option for that role. +subtest grant_with_createrole + +statement ok +CREATE USER grantor WITH CREATEROLE; +CREATE ROLE transitiveadmin; +GRANT admin TO transitiveadmin + +statement ok +SET ROLE grantor + +statement ok +CREATE ROLE parent1; +CREATE ROLE child1; +GRANT parent1 TO child1 + +# Verify that CREATEROLE is not sufficient to give admin to other users. +statement error grantor must have admin option on role \"admin\" +GRANT admin TO child2 + +# It also shouldn't allow anyone to get admin transitively. +statement error grantor must have admin option on role \"transitiveadmin\" +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE + +query TTB colnames +SHOW GRANTS ON ROLE parent1 +---- +role_name member is_admin +parent1 child1 false + +statement ok +SET ROLE grantor; +REVOKE parent1 FROM child1; +RESET ROLE + +# Without CREATEROLE, the admin option is required to grant a role. +subtest grant_with_admin_option + +statement ok +CREATE ROLE parent2; +CREATE ROLE child2; +GRANT parent2 TO grantor WITH ADMIN OPTION; +ALTER USER grantor WITH NOCREATEROLE + +statement ok +SET ROLE grantor + +statement ok +GRANT parent2 TO child2 + +statement ok +RESET ROLE + +query TTB colnames,rowsort +SHOW GRANTS ON ROLE parent2 +---- +role_name member is_admin +parent2 child2 false +parent2 grantor true + +statement ok +SET ROLE grantor; +REVOKE parent2 FROM child2; +RESET ROLE + +statement ok +GRANT admin TO grantor; +SET ROLE grantor + +# Verify that testuser can only grant an admin role if it has the admin option +# on that role. +statement error grantor must have admin option on role \"transitiveadmin\" +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +GRANT transitiveadmin TO grantor; +SET ROLE grantor + +statement error grantor must have admin option on role \"transitiveadmin\" +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +GRANT transitiveadmin TO grantor WITH ADMIN OPTION; +SET ROLE grantor + +# Now that grantor has the admin option on transitiveadmin, it can grant the role. +statement ok +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +REVOKE transitiveadmin FROM grantor; +REVOKE transitiveadmin FROM child2; +GRANT admin TO grantor WITH ADMIN OPTION + +# If grantor has the admin option on admin, it also can grant transitiveadmin. +statement ok +GRANT transitiveadmin TO child2 + +statement ok +RESET ROLE; +REVOKE admin FROM grantor; +REVOKE transitiveadmin FROM child2 + +# Without CREATEROLE or the admin option, then an error should occur during +# granting. +subtest grant_no_privilege + +statement ok +CREATE ROLE parent3; +CREATE ROLE child3 + +statement ok +SET ROLE grantor + +statement error grantor must have CREATEROLE or have admin option on role \"parent3\" +GRANT parent3 TO child3 + +statement ok +RESET ROLE diff --git a/pkg/sql/logictest/testdata/logic_test/role b/pkg/sql/logictest/testdata/logic_test/role index 8ba1e2aa64d9..5d759bdfcd2b 100644 --- a/pkg/sql/logictest/testdata/logic_test/role +++ b/pkg/sql/logictest/testdata/logic_test/role @@ -149,7 +149,7 @@ GRANT unknownrole TO testuser # Test role "grant" and WITH ADMIN option. user testuser -statement error pq: testuser is not a superuser or role admin for role testrole +statement error testuser must have CREATEROLE or have admin option on role "testrole" GRANT testrole TO testuser2 user root @@ -173,7 +173,7 @@ testrole testuser false user testuser -statement error pq: testuser is not a superuser or role admin for role testrole +statement error testuser must have CREATEROLE or have admin option on role "testrole" GRANT testrole TO testuser2 user root @@ -428,13 +428,13 @@ rolec roled testuser -statement error pq: testuser is not a superuser or role admin for role roled +statement error testuser must have CREATEROLE or have admin option on role "roled" GRANT roled TO rolee -statement error pq: testuser is not a superuser or role admin for role rolec +statement error testuser must have CREATEROLE or have admin option on role "rolec" GRANT rolec TO rolee -statement error pq: testuser is not a superuser or role admin for role roleb +statement error testuser must have CREATEROLE or have admin option on role "roleb" GRANT roleb TO rolee statement ok @@ -617,7 +617,7 @@ testuser statement ok REVOKE ADMIN OPTION FOR rolea FROM testuser -statement error pq: testuser is not a superuser or role admin for role rolea +statement error testuser must have CREATEROLE or have admin option on role "rolea" REVOKE ADMIN OPTION FOR rolea FROM root statement ok @@ -678,7 +678,7 @@ CREATE DATABASE db2 statement error user testuser does not have DROP privilege on database db1 DROP DATABASE db1 -statement error testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" GRANT admin TO testuser user root @@ -762,13 +762,13 @@ statement ok SELECT * FROM db2.foo # We may be in the 'newgroup', but we don't have the admin option. -statement error testuser is not a superuser or role admin for role newgroup +statement error testuser must have CREATEROLE or have admin option on role "newgroup" GRANT newgroup TO testuser2 -statement error testuser is not a superuser or role admin for role newgroup +statement error testuser must have CREATEROLE or have admin option on role "newgroup" REVOKE newgroup FROM testuser -statement error testuser is not a superuser or role admin for role newgroup +statement error testuser must have CREATEROLE or have admin option on role "newgroup" GRANT newgroup TO testuser WITH ADMIN OPTION # Regression for #31784 @@ -781,10 +781,10 @@ GRANT admin TO testuser user testuser -statement error pq: testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" GRANT admin TO user1 -statement error pq: testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" REVOKE admin FROM user1 user root @@ -990,14 +990,14 @@ CREATE ROLE IF NOT EXISTS roleg statement ok CREATE ROLE IF NOT EXISTS roleg -# Need Admin option to GRANT role, CREATEROLE should not give GRANT role privilege for other roles statement ok CREATE USER testuser3 -statement error pq: testuser is not a role admin for role admin +statement error testuser must have admin option on role "admin" GRANT admin to testuser3 -statement error pq: testuser is not a superuser or role admin for role roleg +# CREATEROLE should give GRANT role privilege for other roles. +statement ok GRANT roleg to testuser3 user root @@ -1117,11 +1117,12 @@ CREATE ROLE thisshouldntwork LOGIN LOGIN statement ok DROP ROLE parentrole -query TTB colnames +query TTB colnames,rowsort SHOW GRANTS ON ROLE ---- -role_name member is_admin -admin root true +role_name member is_admin +admin root true +roleg testuser3 false query TTB colnames SHOW GRANTS ON ROLE admin diff --git a/pkg/sql/revoke_role.go b/pkg/sql/revoke_role.go index 0a6ba0a32765..47ac48c080d2 100644 --- a/pkg/sql/revoke_role.go +++ b/pkg/sql/revoke_role.go @@ -17,6 +17,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/decodeusername" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" + "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" @@ -43,7 +44,7 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo ctx, span := tracing.ChildSpan(ctx, n.StatementTag()) defer span.Finish() - hasAdminRole, err := p.HasAdminRole(ctx) + hasCreateRolePriv, err := p.HasRoleOption(ctx, roleoption.CREATEROLE) if err != nil { return nil, err } @@ -52,6 +53,7 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo if err != nil { return nil, err } + revokingRoleHasAdminOptionOnAdmin := allRoles[username.AdminRoleName()] inputRoles, err := decodeusername.FromNameList(n.Roles) if err != nil { @@ -65,19 +67,24 @@ func (p *planner) RevokeRoleNode(ctx context.Context, n *tree.RevokeRole) (*Revo } for _, r := range inputRoles { - // If the user is an admin, don't check if the user is allowed to add/drop - // roles in the role. However, if the role being modified is the admin role, then - // make sure the user is an admin with the admin option. - if hasAdminRole && !r.IsAdminRole() { + // If the current user has CREATEROLE, and the role being revoked is not an + // admin there is no need to check if the user is allowed to grant/revoke + // membership in the role. However, if the role being revoked is an admin, + // then make sure the current user also has the admin option for that role. + revokedRoleIsAdmin, err := p.UserHasAdminRole(ctx, r) + if err != nil { + return nil, err + } + if hasCreateRolePriv && !revokedRoleIsAdmin { continue } - if isAdmin, ok := allRoles[r]; !ok || !isAdmin { - if r.IsAdminRole() { + if hasAdminOption := allRoles[r]; !hasAdminOption && !revokingRoleHasAdminOptionOnAdmin { + if revokedRoleIsAdmin { return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a role admin for role %s", p.User(), r) + "%s must have admin option on role %q", p.User(), r) } return nil, pgerror.Newf(pgcode.InsufficientPrivilege, - "%s is not a superuser or role admin for role %s", p.User(), r) + "%s must have CREATEROLE or have admin option on role %q", p.User(), r) } } From 16720482e1c0dd12a9dcd860eb2f82d4e37b84ac Mon Sep 17 00:00:00 2001 From: David Hartunian Date: Mon, 22 May 2023 11:09:56 -0400 Subject: [PATCH 3/5] log: add headers, compression config to http servers This commit adds support for custom headers and gzip compression on requests made to `http-server` outputs in CRDB's logging configuration. Custom headers enable the inclusion of API keys for 3rd party sinks and gzip compression reduces network resource consumption. Resolves #103477 Release note (ops change): `http-defaults` and `http-servers` sections of the log config will now accept a `headers` field containing a map of key value string pairs which will comprise custom HTTP headers appended to every request. Additionally a `compression` value is support which can be set to `gzip` or `none` to select a compression method for the HTTP requesst body. By default `gzip` is selected. This is a change from previous functionality that did not compress by default. --- docs/generated/logsinks.md | 2 + pkg/cli/log_flags_test.go | 1 + pkg/util/log/BUILD.bazel | 1 + pkg/util/log/http_sink.go | 34 ++++++- pkg/util/log/http_sink_test.go | 65 ++++++++++++- pkg/util/log/logconfig/config.go | 10 ++ pkg/util/log/logconfig/testdata/validate | 119 +++++++++++++++++++++++ pkg/util/log/logconfig/validate.go | 4 + pkg/util/log/testdata/config | 1 + 9 files changed, 234 insertions(+), 3 deletions(-) diff --git a/docs/generated/logsinks.md b/docs/generated/logsinks.md index dacacbaad196..220fe712d397 100644 --- a/docs/generated/logsinks.md +++ b/docs/generated/logsinks.md @@ -220,6 +220,8 @@ Type-specific configuration options: | `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. | | `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. | | `disable-keep-alives` | causes the logging sink to re-establish a new connection for every outgoing log message. This option is intended for testing only and can cause excessive network overhead in production systems. Inherited from `http-defaults.disable-keep-alives` if not specified. | +| `headers` | a list of headers to attach to each HTTP request Inherited from `http-defaults.headers` if not specified. | +| `compression` | can be "none" or "gzip" to enable gzip compression. Set to "gzip" by default. Inherited from `http-defaults.compression` if not specified. | Configuration options shared across all sink types: diff --git a/pkg/cli/log_flags_test.go b/pkg/cli/log_flags_test.go index ce0bf9b19bd4..8ee7b032ac77 100644 --- a/pkg/cli/log_flags_test.go +++ b/pkg/cli/log_flags_test.go @@ -53,6 +53,7 @@ func TestSetupLogging(t *testing.T) { `unsafe-tls: false, ` + `timeout: 0s, ` + `disable-keep-alives: false, ` + + `compression: gzip, ` + `filter: INFO, ` + `format: json-compact, ` + `redactable: true, ` + diff --git a/pkg/util/log/BUILD.bazel b/pkg/util/log/BUILD.bazel index 86645315379c..aa9cf04e6365 100644 --- a/pkg/util/log/BUILD.bazel +++ b/pkg/util/log/BUILD.bazel @@ -62,6 +62,7 @@ go_library( "//pkg/util/encoding/encodingtype", "//pkg/util/envutil", "//pkg/util/fileutil", + "//pkg/util/httputil", "//pkg/util/jsonbytes", "//pkg/util/log/channel", "//pkg/util/log/logconfig", diff --git a/pkg/util/log/http_sink.go b/pkg/util/log/http_sink.go index df906a0fb3fc..ac8b71e890a4 100644 --- a/pkg/util/log/http_sink.go +++ b/pkg/util/log/http_sink.go @@ -12,12 +12,14 @@ package log import ( "bytes" + "compress/gzip" "crypto/tls" "fmt" "net/http" "net/url" "github.com/cockroachdb/cockroach/pkg/cli/exit" + "github.com/cockroachdb/cockroach/pkg/util/httputil" "github.com/cockroachdb/cockroach/pkg/util/log/logconfig" "github.com/cockroachdb/errors" ) @@ -94,7 +96,37 @@ func (hs *httpSink) output(b []byte, opt sinkOutputOptions) (err error) { } func doPost(hs *httpSink, b []byte) (*http.Response, error) { - resp, err := hs.client.Post(hs.address, hs.contentType, bytes.NewReader(b)) + var buf = bytes.Buffer{} + var req *http.Request + + if *hs.config.Compression == logconfig.GzipCompression { + g := gzip.NewWriter(&buf) + _, err := g.Write(b) + if err != nil { + return nil, err + } + err = g.Close() + if err != nil { + return nil, err + } + } else { + buf.Write(b) + } + + req, err := http.NewRequest(http.MethodPost, hs.address, &buf) + if err != nil { + return nil, err + } + + if *hs.config.Compression == logconfig.GzipCompression { + req.Header.Add(httputil.ContentEncodingHeader, httputil.GzipEncoding) + } + + for k, v := range hs.config.Headers { + req.Header.Add(k, v) + } + req.Header.Add(httputil.ContentTypeHeader, hs.contentType) + resp, err := hs.client.Do(req) if err != nil { return nil, err } diff --git a/pkg/util/log/http_sink_test.go b/pkg/util/log/http_sink_test.go index f9e473824e96..8debeac07128 100644 --- a/pkg/util/log/http_sink_test.go +++ b/pkg/util/log/http_sink_test.go @@ -11,6 +11,7 @@ package log import ( + "bytes" "context" "io" "net" @@ -179,8 +180,9 @@ func TestMessageReceived(t *testing.T) { timeout := 5 * time.Second tb := true defaults := logconfig.HTTPDefaults{ - Address: &address, - Timeout: &timeout, + Address: &address, + Timeout: &timeout, + Compression: &logconfig.NoneCompression, // We need to disable keepalives otherwise the HTTP server in the // test will let an async goroutine run waiting for more requests. @@ -293,3 +295,62 @@ func TestHTTPSinkContentTypePlainText(t *testing.T) { testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) } + +func TestHTTPSinkHeadersAndCompression(t *testing.T) { + defer leaktest.AfterTest(t)() + + address := "http://localhost" // testBase appends the port + timeout := 5 * time.Second + tb := true + format := "json" + expectedContentType := "application/json" + expectedContentEncoding := logconfig.GzipCompression + defaults := logconfig.HTTPDefaults{ + Address: &address, + Timeout: &timeout, + + // We need to disable keepalives otherwise the HTTP server in the + // test will let an async goroutine run waiting for more requests. + DisableKeepAlives: &tb, + CommonSinkConfig: logconfig.CommonSinkConfig{ + Format: &format, + Buffering: disabledBufferingCfg, + }, + + Compression: &logconfig.GzipCompression, + Headers: map[string]string{"X-CRDB-TEST": "secret-value"}, + } + + testFn := func(header http.Header, body string) error { + t.Log(body) + contentType := header.Get("Content-Type") + if contentType != expectedContentType { + return errors.Newf("mismatched content type: expected %s, got %s", expectedContentType, contentType) + } + contentEncoding := header.Get("Content-Encoding") + if contentEncoding != expectedContentEncoding { + return errors.Newf("mismatched content encoding: expected %s, got %s", expectedContentEncoding, contentEncoding) + } + + var isGzipped = func(dat []byte) bool { + gzipPrefix := []byte("\x1F\x8B\x08") + return bytes.HasPrefix(dat, gzipPrefix) + } + + if !isGzipped([]byte(body)) { + return errors.New("expected gzipped body") + } + for k, v := range header { + if k == "X-Crdb-Test" { + for _, vv := range v { + if vv == "secret-value" { + return nil + } + } + } + } + return errors.New("expected to find special header in request") + } + + testBase(t, defaults, testFn, false /* hangServer */, time.Duration(0)) +} diff --git a/pkg/util/log/logconfig/config.go b/pkg/util/log/logconfig/config.go index c934b77bc6c4..92a2e4bc5a5a 100644 --- a/pkg/util/log/logconfig/config.go +++ b/pkg/util/log/logconfig/config.go @@ -476,6 +476,9 @@ type FileSinkConfig struct { prefix string } +var GzipCompression = "gzip" +var NoneCompression = "none" + // HTTPDefaults refresents the configuration defaults for HTTP sinks. type HTTPDefaults struct { // Address is the network address of the http server. The @@ -502,6 +505,13 @@ type HTTPDefaults struct { // overhead in production systems. DisableKeepAlives *bool `yaml:"disable-keep-alives,omitempty"` + // Headers is a list of headers to attach to each HTTP request + Headers map[string]string `yaml:",omitempty,flow"` + + // Compression can be "none" or "gzip" to enable gzip compression. + // Set to "gzip" by default. + Compression *string `yaml:",omitempty"` + CommonSinkConfig `yaml:",inline"` } diff --git a/pkg/util/log/logconfig/testdata/validate b/pkg/util/log/logconfig/testdata/validate index 48581a2d9106..aa2724499016 100644 --- a/pkg/util/log/logconfig/testdata/validate +++ b/pkg/util/log/logconfig/testdata/validate @@ -543,3 +543,122 @@ capture-stray-errors: enable: true dir: /default-dir max-group-size: 100MiB + +# Check that each component of buffering struct propagates to http-sinks +# Ensure servers have gzip compression on by default and headers if set +yaml +http-defaults: + buffering: + max-staleness: 15s + flush-trigger-size: 10KiB + max-buffer-size: 2MiB +sinks: + http-servers: + a: + address: a + channels: STORAGE + headers: {X-CRDB-HEADER: header-value-a} + buffering: + max-staleness: 10s + b: + address: b + channels: OPS + headers: {X-CRDB-HEADER: header-value-b, X-ANOTHER-HEADER: zz-yy-bb} + buffering: + flush-trigger-size: 5.0KiB + c: + address: c + channels: HEALTH + buffering: + max-buffer-size: 3MiB + d: + address: d + channels: SESSIONS + buffering: NONE +---- +sinks: + file-groups: + default: + channels: {INFO: all} + filter: INFO + http-servers: + a: + channels: {INFO: [STORAGE]} + address: a + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + headers: {X-CRDB-HEADER: header-value-a} + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: + max-staleness: 10s + flush-trigger-size: 10KiB + max-buffer-size: 2.0MiB + format: newline + b: + channels: {INFO: [OPS]} + address: b + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + headers: {X-ANOTHER-HEADER: zz-yy-bb, X-CRDB-HEADER: header-value-b} + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: + max-staleness: 15s + flush-trigger-size: 5.0KiB + max-buffer-size: 2.0MiB + format: newline + c: + channels: {INFO: [HEALTH]} + address: c + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: + max-staleness: 15s + flush-trigger-size: 10KiB + max-buffer-size: 3.0MiB + format: newline + d: + channels: {INFO: [SESSIONS]} + address: d + method: POST + unsafe-tls: false + timeout: 0s + disable-keep-alives: false + compression: gzip + filter: INFO + format: json-compact + redact: false + redactable: true + exit-on-error: false + auditable: false + buffering: NONE + stderr: + filter: NONE +capture-stray-errors: + enable: true + dir: /default-dir + max-group-size: 100MiB diff --git a/pkg/util/log/logconfig/validate.go b/pkg/util/log/logconfig/validate.go index 150df6323cf7..71a205ec23bd 100644 --- a/pkg/util/log/logconfig/validate.go +++ b/pkg/util/log/logconfig/validate.go @@ -112,6 +112,7 @@ func (c *Config) Validate(defaultLogDir *string) (resErr error) { DisableKeepAlives: &bf, Method: func() *HTTPSinkMethod { m := HTTPSinkMethod(http.MethodPost); return &m }(), Timeout: &zeroDuration, + Compression: &GzipCompression, } propagateCommonDefaults(&baseFileDefaults.CommonSinkConfig, baseCommonSinkConfig) @@ -454,6 +455,9 @@ func (c *Config) validateHTTPSinkConfig(hsc *HTTPSinkConfig) error { if hsc.Address == nil || len(*hsc.Address) == 0 { return errors.New("address cannot be empty") } + if *hsc.Compression != GzipCompression && *hsc.Compression != NoneCompression { + return errors.New("compression must be 'gzip' or 'none'") + } return c.ValidateCommonSinkConfig(hsc.CommonSinkConfig) } diff --git a/pkg/util/log/testdata/config b/pkg/util/log/testdata/config index 61405cb63cf6..0a27abb5d4e4 100644 --- a/pkg/util/log/testdata/config +++ b/pkg/util/log/testdata/config @@ -119,6 +119,7 @@ sinks: unsafe-tls: false timeout: 0s disable-keep-alives: false + compression: gzip filter: INFO format: json-compact redact: false From bae5045069c9f39cf3365dd87538fceb1a38228c Mon Sep 17 00:00:00 2001 From: Eric Harmeling Date: Wed, 31 May 2023 17:44:52 -0400 Subject: [PATCH 4/5] metrics: fix windowed histogram merging approach Fixes #103814. Fixes #98266. This commit updates the windowed histogram merging approach to add the previous window's histogram bucket counts and sample count to those of the current one. As a result of this change, the histograms will no longer report under-sampled quantile values, and timeseries metrics-derived charts (e.g., the quantile-based SQL service latency charts on the DB console's Metrics page) will more accurately display metrics. Release note (bug fix): Updated the histogram window merge calculation to more accurately interpolate quantile values. This change will result in smoother, more accurate Metrics charts on the DB Console. Co-authored-by: Aaditya Sondhi <20070511+aadityasondhi@users.noreply.github.com> --- pkg/util/metric/hdrhistogram.go | 31 +++---- pkg/util/metric/metric.go | 101 +++++++++++++++------ pkg/util/metric/metric_test.go | 151 +++++++++++++++++++++++++++++++- 3 files changed, 238 insertions(+), 45 deletions(-) diff --git a/pkg/util/metric/hdrhistogram.go b/pkg/util/metric/hdrhistogram.go index b6b562403e2a..65175eef0751 100644 --- a/pkg/util/metric/hdrhistogram.go +++ b/pkg/util/metric/hdrhistogram.go @@ -18,14 +18,9 @@ import ( prometheusgo "github.com/prometheus/client_model/go" ) -const ( - // HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher - // values will be recorded as this value instead. - HdrHistogramMaxLatency = 10 * time.Second - - // The number of histograms to keep in rolling window. - hdrHistogramHistWrapNum = 2 // TestSampleInterval is passed to histograms during tests which don't -) +// HdrHistogramMaxLatency is the maximum value tracked in latency histograms. Higher +// values will be recorded as this value instead. +const HdrHistogramMaxLatency = 10 * time.Second // A HdrHistogram collects observed values by keeping bucketed counts. For // convenience, internally two sets of buckets are kept: A cumulative set (i.e. @@ -64,12 +59,12 @@ func NewHdrHistogram( Metadata: metadata, maxVal: maxVal, } - wHist := hdrhistogram.NewWindowed(hdrHistogramHistWrapNum, 0, maxVal, sigFigs) + wHist := hdrhistogram.NewWindowed(WindowedHistogramWrapNum, 0, maxVal, sigFigs) h.mu.cumulative = hdrhistogram.New(0, maxVal, sigFigs) h.mu.sliding = wHist h.mu.tickHelper = &tickHelper{ nextT: now(), - tickInterval: duration / hdrHistogramHistWrapNum, + tickInterval: duration / WindowedHistogramWrapNum, onTick: func() { wHist.Rotate() }, @@ -171,15 +166,19 @@ func (h *HdrHistogram) ToPrometheusMetric() *prometheusgo.Metric { // TotalWindowed implements the WindowedHistogram interface. func (h *HdrHistogram) TotalWindowed() (int64, float64) { - pHist := h.ToPrometheusMetricWindowed().Histogram - return int64(pHist.GetSampleCount()), pHist.GetSampleSum() + h.mu.Lock() + defer h.mu.Unlock() + hist := h.mu.sliding.Merge() + totalSum := float64(hist.TotalCount()) * hist.Mean() + return hist.TotalCount(), totalSum } func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { hist := &prometheusgo.Histogram{} maybeTick(h.mu.tickHelper) - bars := h.mu.sliding.Current.Distribution() + mergedHist := h.mu.sliding.Merge() + bars := mergedHist.Distribution() hist.Bucket = make([]*prometheusgo.Bucket, 0, len(bars)) var cumCount uint64 @@ -202,7 +201,6 @@ func (h *HdrHistogram) toPrometheusMetricWindowedLocked() *prometheusgo.Metric { } hist.SampleCount = &cumCount hist.SampleSum = &sum // can do better here; we approximate in the loop - return &prometheusgo.Metric{ Histogram: hist, } @@ -233,13 +231,12 @@ func (h *HdrHistogram) ValueAtQuantileWindowed(q float64) float64 { func (h *HdrHistogram) Mean() float64 { h.mu.Lock() defer h.mu.Unlock() - return h.mu.cumulative.Mean() } func (h *HdrHistogram) MeanWindowed() float64 { h.mu.Lock() defer h.mu.Unlock() - - return h.mu.sliding.Current.Mean() + hist := h.mu.sliding.Merge() + return hist.Mean() } diff --git a/pkg/util/metric/metric.go b/pkg/util/metric/metric.go index 9674739c3109..e3508c2ebdb3 100644 --- a/pkg/util/metric/metric.go +++ b/pkg/util/metric/metric.go @@ -27,9 +27,14 @@ import ( "github.com/rcrowley/go-metrics" ) -// TestSampleInterval is passed to histograms during tests which don't -// want to concern themselves with supplying a "correct" interval. -const TestSampleInterval = time.Duration(math.MaxInt64) +const ( + // TestSampleInterval is passed to histograms during tests which don't + // want to concern themselves with supplying a "correct" interval. + TestSampleInterval = time.Duration(math.MaxInt64) + // WindowedHistogramWrapNum is the number of histograms to keep in rolling + // window. + WindowedHistogramWrapNum = 2 +) // Iterable provides a method for synchronized access to interior objects. type Iterable interface { @@ -97,10 +102,12 @@ type WindowedHistogram interface { Total() (int64, float64) // MeanWindowed returns the average of the samples in the current window. MeanWindowed() float64 - // Mean returns the average of the sample in teh cumulative histogram. + // Mean returns the average of the sample in the cumulative histogram. Mean() float64 // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the windowed histogram. + // Methods implementing this interface should the merge buckets, sums, + // and counts of previous and current windows. ValueAtQuantileWindowed(q float64) float64 } @@ -224,7 +231,10 @@ const ( type HistogramOptions struct { // Metadata is the metric Metadata associated with the histogram. Metadata Metadata - // Duration is the histogram's window duration. + // Duration is the total duration of all windows in the histogram. + // The individual window duration is equal to the + // Duration/WindowedHistogramWrapNum (i.e., the number of windows + // in the histogram). Duration time.Duration // MaxVal is only relevant to the HdrHistogram, and represents the // highest trackable value in the resulting histogram buckets. @@ -256,7 +266,7 @@ func NewHistogram(opt HistogramOptions) IHistogram { // NewHistogram is a prometheus-backed histogram. Depending on the value of // opts.Buckets, this is suitable for recording any kind of quantity. Common // sensible choices are {IO,Network}LatencyBuckets. -func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64) *Histogram { +func newHistogram(meta Metadata, duration time.Duration, buckets []float64) *Histogram { // TODO(obs-inf): prometheus supports labeled histograms but they require more // plumbing and don't fit into the PrometheusObservable interface any more. opts := prometheus.HistogramOpts{ @@ -268,8 +278,11 @@ func newHistogram(meta Metadata, windowDuration time.Duration, buckets []float64 cum: cum, } h.windowed.tickHelper = &tickHelper{ - nextT: now(), - tickInterval: windowDuration, + nextT: now(), + // We want to divide the total window duration by the number of windows + // because we need to rotate the windows at uniformly distributed + // intervals within a histogram's total duration. + tickInterval: duration / WindowedHistogramWrapNum, onTick: func() { h.windowed.prev = h.windowed.cur h.windowed.cur = prometheus.NewHistogram(opts) @@ -293,16 +306,13 @@ type Histogram struct { Metadata cum prometheus.Histogram - // TODO(obs-inf): the way we implement windowed histograms is not great. If - // the windowed histogram is pulled right after a tick, it will be mostly - // empty. We could add a third bucket and represent the merged view of the two - // most recent buckets to avoid that. Or we could "just" double the rotation - // interval (so that the histogram really collects for 20s when we expect to - // persist the contents every 10s). Really it would make more sense to - // explicitly rotate the histogram atomically with collecting its contents, - // but that is now how we have set it up right now. It should be doable - // though, since there is only one consumer of windowed histograms - our - // internal timeseries system. + // TODO(obs-inf): the way we implement windowed histograms is not great. + // We could "just" double the rotation interval (so that the histogram really + // collects for 20s when we expect to persist the contents every 10s). + // Really it would make more sense to explicitly rotate the histogram + // atomically with collecting its contents, but that is now how we have set + // it up right now. It should be doable though, since there is only one + // consumer of windowed histograms - our internal timeseries system. windowed struct { // prometheus.Histogram is thread safe, so we only // need an RLock to record into it. But write lock @@ -368,15 +378,23 @@ func (h *Histogram) ToPrometheusMetric() *prometheusgo.Metric { return m } -// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the right type. +// ToPrometheusMetricWindowed returns a filled-in prometheus metric of the +// right type. func (h *Histogram) ToPrometheusMetricWindowed() *prometheusgo.Metric { h.windowed.Lock() defer h.windowed.Unlock() - m := &prometheusgo.Metric{} - if err := h.windowed.cur.Write(m); err != nil { + cur := &prometheusgo.Metric{} + prev := &prometheusgo.Metric{} + if err := h.windowed.cur.Write(cur); err != nil { panic(err) } - return m + if h.windowed.prev != nil { + if err := h.windowed.prev.Write(prev); err != nil { + panic(err) + } + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + } + return cur } // GetMetadata returns the metric's metadata including the Prometheus @@ -428,7 +446,8 @@ func (h *Histogram) MeanWindowed() float64 { // 2. Since the prometheus client library ensures buckets are in a strictly // increasing order at creation, we do not sort them. func (h *Histogram) ValueAtQuantileWindowed(q float64) float64 { - return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, q) + return ValueAtQuantileWindowed(h.ToPrometheusMetricWindowed().Histogram, + q) } var _ PrometheusExportable = (*ManualWindowHistogram)(nil) @@ -592,11 +611,25 @@ func (mwh *ManualWindowHistogram) ToPrometheusMetric() *prometheusgo.Metric { return m } +// ToPrometheusMetricWindowedLocked returns a filled-in prometheus metric of the +// right type. +func (mwh *ManualWindowHistogram) ToPrometheusMetricWindowedLocked() *prometheusgo.Metric { + cur := &prometheusgo.Metric{} + if err := mwh.mu.cum.Write(cur); err != nil { + panic(err) + } + if mwh.mu.prev != nil { + MergeWindowedHistogram(cur.Histogram, mwh.mu.prev) + } + return cur +} + // TotalWindowed implements the WindowedHistogram interface. func (mwh *ManualWindowHistogram) TotalWindowed() (int64, float64) { mwh.mu.RLock() defer mwh.mu.RUnlock() - return int64(mwh.mu.cur.GetSampleCount()), mwh.mu.cur.GetSampleSum() + pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram + return int64(pHist.GetSampleCount()), pHist.GetSampleSum() } // Total implements the WindowedHistogram interface. @@ -608,7 +641,8 @@ func (mwh *ManualWindowHistogram) Total() (int64, float64) { func (mwh *ManualWindowHistogram) MeanWindowed() float64 { mwh.mu.RLock() defer mwh.mu.RUnlock() - return mwh.mu.cur.GetSampleSum() / float64(mwh.mu.cur.GetSampleCount()) + pHist := mwh.ToPrometheusMetricWindowedLocked().Histogram + return pHist.GetSampleSum() / float64(pHist.GetSampleCount()) } func (mwh *ManualWindowHistogram) Mean() float64 { @@ -626,7 +660,7 @@ func (mwh *ManualWindowHistogram) ValueAtQuantileWindowed(q float64) float64 { if mwh.mu.cur == nil { return 0 } - return ValueAtQuantileWindowed(mwh.mu.cur, q) + return ValueAtQuantileWindowed(mwh.ToPrometheusMetricWindowedLocked().Histogram, q) } // A Counter holds a single mutable atomic value. @@ -881,6 +915,21 @@ func (g *GaugeFloat64) GetMetadata() Metadata { return baseMetadata } +// MergeWindowedHistogram adds the bucket counts, sample count, and sample sum +// from the previous windowed histogram to those of the current windowed +// histogram. +// NB: Buckets on each histogram must be the same +func MergeWindowedHistogram(cur *prometheusgo.Histogram, prev *prometheusgo.Histogram) { + for i, bucket := range cur.Bucket { + count := *bucket.CumulativeCount + *prev.Bucket[i].CumulativeCount + *bucket.CumulativeCount = count + } + sampleCount := *cur.SampleCount + *prev.SampleCount + *cur.SampleCount = sampleCount + sampleSum := *cur.SampleSum + *prev.SampleSum + *cur.SampleSum = sampleSum +} + // ValueAtQuantileWindowed takes a quantile value [0,100] and returns the // interpolated value at that quantile for the given histogram. func ValueAtQuantileWindowed(histogram *prometheusgo.Histogram, q float64) float64 { diff --git a/pkg/util/metric/metric_test.go b/pkg/util/metric/metric_test.go index b69c87584e2d..a0f9cde324f3 100644 --- a/pkg/util/metric/metric_test.go +++ b/pkg/util/metric/metric_test.go @@ -15,6 +15,7 @@ import ( "encoding/json" "math" "reflect" + "sort" "sync" "testing" "time" @@ -285,11 +286,10 @@ func TestNewHistogramRotate(t *testing.T) { // But cumulative histogram has history (if i > 0). count, _ := h.Total() require.EqualValues(t, i, count) - // Add a measurement and verify it's there. { h.RecordValue(12345) - f := float64(12345) + f := float64(12345) + sum _, wSum := h.TotalWindowed() require.Equal(t, wSum, f) } @@ -298,3 +298,150 @@ func TestNewHistogramRotate(t *testing.T) { // Go to beginning. } } + +func TestHistogramWindowed(t *testing.T) { + defer TestingSetNow(nil)() + setNow(0) + + duration := 10 * time.Second + + h := NewHistogram(HistogramOptions{ + Mode: HistogramModePrometheus, + Metadata: Metadata{}, + Duration: duration, + Buckets: IOLatencyBuckets, + }) + + measurements := []int64{200000000, 0, 4000000, 5000000, 10000000, 20000000, + 25000000, 30000000, 40000000, 90000000} + + // Sort the measurements so we can calculate the expected quantile values + // for the first windowed histogram after the measurements have been recorded. + sortedMeasurements := make([]int64, len(measurements)) + copy(sortedMeasurements, measurements) + sort.Slice(sortedMeasurements, func(i, j int) bool { + return sortedMeasurements[i] < sortedMeasurements[j] + }) + + // Calculate the expected quantile values as the lowest bucket values that are + // greater than each measurement. + count := 0 + j := 0 + var expQuantileValues []float64 + for i := range IOLatencyBuckets { + if j < len(sortedMeasurements) && IOLatencyBuckets[i] > float64( + sortedMeasurements[j]) { + count += 1 + j += 1 + expQuantileValues = append(expQuantileValues, IOLatencyBuckets[i]) + } + } + + w := 2 + var expHist []prometheusgo.Histogram + var expSum float64 + var expCount uint64 + for i := 0; i < w; i++ { + h.Inspect(func(interface{}) {}) // trigger ticking + if i == 0 { + // If there is no previous window, we should be unable to calculate mean + // or quantile without any observations. + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(99.99)) + if !math.IsNaN(h.MeanWindowed()) { + t.Fatalf("mean should be undefined with no observations") + } + // Record all measurements on first iteration. + for _, m := range measurements { + h.RecordValue(m) + expCount += 1 + expSum += float64(m) + } + // Because we have 10 observations, we expect quantiles to correspond + // to observation indices (e.g., the 8th expected quantile value is equal + // to the value interpolated at the 80th percentile). + require.Equal(t, 0.0, h.ValueAtQuantileWindowed(0)) + require.Equal(t, expQuantileValues[0], h.ValueAtQuantileWindowed(10)) + require.Equal(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50)) + require.Equal(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80)) + require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99)) + } else { + // The SampleSum and SampleCount values in the current window before any + // observations should be equal to those of the previous window, after all + // observations (the quantile values will also be the same). + expSum = *expHist[i-1].SampleSum + expCount = *expHist[i-1].SampleCount + + // After recording a few higher-value observations in the second window, + // the quantile values will shift in the direction of the observations. + for _, m := range sortedMeasurements[len(sortedMeasurements)-3 : len( + sortedMeasurements)-1] { + h.RecordValue(m) + expCount += 1 + expSum += float64(m) + } + require.Less(t, expQuantileValues[4], h.ValueAtQuantileWindowed(50)) + require.Less(t, expQuantileValues[7], h.ValueAtQuantileWindowed(80)) + require.Equal(t, expQuantileValues[9], h.ValueAtQuantileWindowed(99.99)) + } + + // In all cases, the windowed mean should be equal to the expected sum/count + require.Equal(t, expSum/float64(expCount), h.MeanWindowed()) + + expHist = append(expHist, prometheusgo.Histogram{ + SampleCount: &expCount, + SampleSum: &expSum, + }) + + // Increment Now time to trigger tick on the following iteration. + setNow(time.Duration(i+1) * (duration / 2)) + } +} + +func TestMergeWindowedHistogram(t *testing.T) { + measurements := []int64{4000000, 90000000} + opts := prometheus.HistogramOpts{ + Buckets: IOLatencyBuckets, + } + + prevWindow := prometheus.NewHistogram(opts) + curWindow := prometheus.NewHistogram(opts) + + cur := &prometheusgo.Metric{} + prev := &prometheusgo.Metric{} + + prevWindow.Observe(float64(measurements[0])) + require.NoError(t, prevWindow.Write(prev)) + require.NoError(t, curWindow.Write(cur)) + + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + // Merging a non-empty previous histogram into an empty current histogram + // should result in the current histogram containing the same sample sum, + // sample count, and per-bucket cumulative count values as the previous + // histogram. + require.Equal(t, uint64(1), *cur.Histogram.SampleCount) + require.Equal(t, float64(measurements[0]), *cur.Histogram.SampleSum) + for _, bucket := range cur.Histogram.Bucket { + if *bucket.UpperBound > float64(measurements[0]) { + require.Equal(t, uint64(1), *bucket.CumulativeCount) + } + } + + curWindow.Observe(float64(measurements[1])) + require.NoError(t, curWindow.Write(cur)) + + MergeWindowedHistogram(cur.Histogram, prev.Histogram) + // Merging a non-empty previous histogram with a non-empty current histogram + // should result in the current histogram containing its original sample sum, + // sample count, and per-bucket cumulative count values, + // plus those of the previous histogram. + require.Equal(t, uint64(2), *cur.Histogram.SampleCount) + require.Equal(t, float64(measurements[0]+measurements[1]), + *cur.Histogram.SampleSum) + for _, bucket := range cur.Histogram.Bucket { + if *bucket.UpperBound > float64(measurements[1]) { + require.Equal(t, uint64(2), *bucket.CumulativeCount) + } else if *bucket.UpperBound > float64(measurements[0]) { + require.Equal(t, uint64(1), *bucket.CumulativeCount) + } + } +} From 716c7c5ce08f5bdbb91f41eb5be566c799b087e7 Mon Sep 17 00:00:00 2001 From: Tobias Grieger Date: Tue, 13 Jun 2023 21:23:35 +0200 Subject: [PATCH 5/5] kvserver: prevent split at invalid tenant prefix keys Closes https://github.com/cockroachdb/cockroach/issues/104796. Epic: None Release note (bug fix): prevents invalid splits that can crash (and prevent restarts) of nodes that hold a replica for the right-hand side. --- pkg/kv/kvserver/BUILD.bazel | 1 + pkg/kv/kvserver/client_invalidsplit_test.go | 49 +++++++++++++++++++++ pkg/kv/kvserver/replica_command.go | 3 ++ 3 files changed, 53 insertions(+) create mode 100644 pkg/kv/kvserver/client_invalidsplit_test.go diff --git a/pkg/kv/kvserver/BUILD.bazel b/pkg/kv/kvserver/BUILD.bazel index 55c034eaa9c0..0ccb4bcd58ee 100644 --- a/pkg/kv/kvserver/BUILD.bazel +++ b/pkg/kv/kvserver/BUILD.bazel @@ -247,6 +247,7 @@ go_test( "batch_spanset_test.go", "below_raft_protos_test.go", "client_atomic_membership_change_test.go", + "client_invalidsplit_test.go", "client_lease_test.go", "client_merge_test.go", "client_metrics_test.go", diff --git a/pkg/kv/kvserver/client_invalidsplit_test.go b/pkg/kv/kvserver/client_invalidsplit_test.go new file mode 100644 index 000000000000..4355aec8835f --- /dev/null +++ b/pkg/kv/kvserver/client_invalidsplit_test.go @@ -0,0 +1,49 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package kvserver_test + +import ( + "context" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/stretchr/testify/require" +) + +func TestSplitAtInvalidTenantPrefix(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + // badKey is the tenant prefix followed by a "large" byte that indicates + // that it should be followed by a separate uvarint encoded key (which is + // not there). + // + // See: https://github.com/cockroachdb/cockroach/issues/104796 + var badKey = append([]byte{'\xfe'}, '\xfd') + _, _, err := keys.DecodeTenantPrefix(badKey) + t.Log(err) + require.Error(t, err) + + ctx := context.Background() + + tc := testcluster.StartTestCluster(t, 1, base.TestClusterArgs{ + ReplicationMode: base.ReplicationManual, + }) + defer tc.Stopper().Stop(ctx) + + _, _, err = tc.SplitRange(badKey) + require.Error(t, err) + require.Contains(t, err.Error(), `checking for valid tenantID`) +} diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4bd39652ce57..8ba1d7f34ff9 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -391,6 +391,9 @@ func (r *Replica) adminSplitWithDescriptor( if !storage.IsValidSplitKey(foundSplitKey) { return reply, errors.Errorf("cannot split range at key %s", splitKey) } + if _, _, err := keys.DecodeTenantPrefixE(splitKey.AsRawKey()); err != nil { + return reply, errors.Wrapf(err, "checking for valid tenantID") + } } // If the range starts at the splitKey, we treat the AdminSplit