Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
103729: log: add headers, compression config to http servers r=healthy-pod a=dhartunian

This commit adds support for custom headers and gzip compression on requests made to `http-server` outputs in CRDB's logging configuration. Custom headers enable the inclusion of API keys for 3rd party sinks and gzip compression reduces network resource consumption.

Resolves #103477

Release note (ops change): `http-defaults` and `http-servers` sections of the log config will now accept a `headers` field containing a map of key value string pairs which will comprise custom HTTP headers appended to every request. Additionally a `compression` value is support which can be set to `gzip` or `none` to select a compression method for the HTTP requesst body. By default `gzip` is selected. This is a change from previous functionality that did not compress by default.

103963: kvclient: add x-region, x-zone metrics to DistSender r=healthy-pod a=wenyihu6

Previously, there were no metrics to observe cross-region, cross-zone traffic in
batch requests / responses at DistSender.

To improve this issue, this commit introduces six new distsender metrics -
```
"distsender.batch_requests.replica_addressed.bytes"
"distsender.batch_responses.replica_addressed.bytes"
"distsender.batch_requests.cross_region.bytes"
"distsender.batch_responses.cross_region.bytes"
"distsender.batch_requests.cross_zone.bytes"
"distsender.batch_responses.cross_zone.bytes"
```

The first two metrics track the total byte count of batch requests processed and
batch responses received at DistSender. Additionally, there are four metrics to
track the aggregate counts processed and received across different regions and
zones. Note that these metrics only track the sender node and not the receiver
node as DistSender resides on the gateway node receiving SQL queries.

Part of: #103983

Release note (ops change): Six new metrics -
"distsender.batch_requests.replica_addressed.bytes",
"distsender.batch_responses.replica_addressed.bytes",
"distsender.batch_requests.cross_region.bytes",
"distsender.batch_responses.cross_region.bytes",
"distsender.batch_requests.cross_zone.bytes",
"distsender.batch_responses.cross_zone.bytes"- are now added to DistSender 
metrics.

For accurate metrics, follow these assumptions:
- Configure region and zone tier keys consistently across nodes.
- Within a node locality, ensure unique region and zone tier keys.
- Maintain consistent configuration of region and zone tiers across nodes.

104088: metrics: fix windowed histogram merging approach r=ericharmeling a=ericharmeling

Fixes #103814.
Fixes #98266.

This commit updates the windowed histogram merging approach to add the previous window's histogram bucket counts and sample count to those of the current one. As a result of this change, the histograms will no longer report under-sampled quantile values, and timeseries metrics-derived charts (e.g., the quantile-based SQL service latency charts on the DB console's Metrics page) will more accurately display metrics.

Release note (bug fix): Updated the histogram window merge calculation to more accurately interpolate quantile values. This change will result in smoother, more accurate Metrics charts on the DB Console.

104376: sql: CREATEROLE now includes ability to grant non-admin roles r=rafiss a=rafiss

fixes #104371

This matches the PostgreSQL behavior.

Release note (security update): Users who have the CREATEROLE role
option can now grant and revoke role membership in any non-admin role.

This change also removes the sql.auth.createrole_allows_grant_role_membership.enabled
cluster setting, which was added in v23.1. Now, the cluster setting is
effectively always true.

104802: kvserver: prevent split at invalid tenant prefix keys r=arulajmani a=tbg

Closes #104796.

Epic: None
Release note (bug fix): prevents invalid splits that can crash (and prevent
restarts) of nodes that hold a replica for the right-hand side.


Co-authored-by: David Hartunian <[email protected]>
Co-authored-by: Wenyi <[email protected]>
Co-authored-by: Eric Harmeling <[email protected]>
Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Tobias Grieger <[email protected]>
  • Loading branch information
6 people committed Jun 13, 2023
6 parents dc2584c + 1672048 + 3502ca2 + bae5045 + 2593e4b + 716c7c5 commit 99f92ee
Show file tree
Hide file tree
Showing 25 changed files with 1,142 additions and 123 deletions.
2 changes: 2 additions & 0 deletions docs/generated/logsinks.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,8 @@ Type-specific configuration options:
| `unsafe-tls` | enables certificate authentication to be bypassed. Defaults to false. Inherited from `http-defaults.unsafe-tls` if not specified. |
| `timeout` | the HTTP timeout. Defaults to 0 for no timeout. Inherited from `http-defaults.timeout` if not specified. |
| `disable-keep-alives` | causes the logging sink to re-establish a new connection for every outgoing log message. This option is intended for testing only and can cause excessive network overhead in production systems. Inherited from `http-defaults.disable-keep-alives` if not specified. |
| `headers` | a list of headers to attach to each HTTP request Inherited from `http-defaults.headers` if not specified. |
| `compression` | can be "none" or "gzip" to enable gzip compression. Set to "gzip" by default. Inherited from `http-defaults.compression` if not specified. |


Configuration options shared across all sink types:
Expand Down
2 changes: 1 addition & 1 deletion pkg/bench/rttanalysis/testdata/benchmark_expectations
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ exp,benchmark
17,Revoke/revoke_all_on_2_tables
21,Revoke/revoke_all_on_3_tables
14,RevokeRole/revoke_1_role
16,RevokeRole/revoke_2_roles
18,RevokeRole/revoke_2_roles
10,ShowGrants/grant_2_roles
11,ShowGrants/grant_3_roles
12,ShowGrants/grant_4_roles
Expand Down
1 change: 1 addition & 0 deletions pkg/cli/log_flags_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestSetupLogging(t *testing.T) {
`unsafe-tls: false, ` +
`timeout: 0s, ` +
`disable-keep-alives: false, ` +
`compression: gzip, ` +
`filter: INFO, ` +
`format: json-compact, ` +
`redactable: true, ` +
Expand Down
260 changes: 224 additions & 36 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvcoord
import (
"context"
"fmt"
"reflect"
"runtime"
"runtime/pprof"
"strings"
Expand Down Expand Up @@ -63,6 +64,54 @@ var (
Measurement: "Partial Batches",
Unit: metric.Unit_COUNT,
}
metaDistSenderReplicaAddressedBatchRequestBytes = metric.Metadata{
Name: "distsender.batch_requests.replica_addressed.bytes",
Help: `Total byte count of replica-addressed batch requests processed`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderReplicaAddressedBatchResponseBytes = metric.Metadata{
Name: "distsender.batch_responses.replica_addressed.bytes",
Help: `Total byte count of replica-addressed batch responses received`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderCrossRegionBatchRequestBytes = metric.Metadata{
Name: "distsender.batch_requests.cross_region.bytes",
Help: `Total byte count of replica-addressed batch requests processed cross
region when region tiers are configured`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderCrossRegionBatchResponseBytes = metric.Metadata{
Name: "distsender.batch_responses.cross_region.bytes",
Help: `Total byte count of replica-addressed batch responses received cross
region when region tiers are configured`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderCrossZoneBatchRequestBytes = metric.Metadata{
Name: "distsender.batch_requests.cross_zone.bytes",
Help: `Total byte count of replica-addressed batch requests processed cross
zone within the same region when region and zone tiers are configured.
However, if the region tiers are not configured, this count may also include
batch data sent between different regions. Ensuring consistent configuration
of region and zone tiers across nodes helps to accurately monitor the data
transmitted.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderCrossZoneBatchResponseBytes = metric.Metadata{
Name: "distsender.batch_responses.cross_zone.bytes",
Help: `Total byte count of replica-addressed batch responses received cross
zone within the same region when region and zone tiers are configured.
However, if the region tiers are not configured, this count may also include
batch data received between different regions. Ensuring consistent
configuration of region and zone tiers across nodes helps to accurately
monitor the data transmitted.`,
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
metaDistSenderAsyncSentCount = metric.Metadata{
Name: "distsender.batches.async.sent",
Help: "Number of partial batches sent asynchronously",
Expand Down Expand Up @@ -241,44 +290,56 @@ func max(a, b int64) int64 {

// DistSenderMetrics is the set of metrics for a given distributed sender.
type DistSenderMetrics struct {
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
BatchCount *metric.Counter
PartialBatchCount *metric.Counter
ReplicaAddressedBatchRequestBytes *metric.Counter
ReplicaAddressedBatchResponseBytes *metric.Counter
CrossRegionBatchRequestBytes *metric.Counter
CrossRegionBatchResponseBytes *metric.Counter
CrossZoneBatchRequestBytes *metric.Counter
CrossZoneBatchResponseBytes *metric.Counter
AsyncSentCount *metric.Counter
AsyncThrottledCount *metric.Counter
SentCount *metric.Counter
LocalSentCount *metric.Counter
NextReplicaErrCount *metric.Counter
NotLeaseHolderErrCount *metric.Counter
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
RangefeedRestartRanges *metric.Counter
RangefeedRestartStuck *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
}

func makeDistSenderMetrics() DistSenderMetrics {
m := DistSenderMetrics{
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
BatchCount: metric.NewCounter(metaDistSenderBatchCount),
PartialBatchCount: metric.NewCounter(metaDistSenderPartialBatchCount),
AsyncSentCount: metric.NewCounter(metaDistSenderAsyncSentCount),
AsyncThrottledCount: metric.NewCounter(metaDistSenderAsyncThrottledCount),
SentCount: metric.NewCounter(metaTransportSentCount),
LocalSentCount: metric.NewCounter(metaTransportLocalSentCount),
ReplicaAddressedBatchRequestBytes: metric.NewCounter(metaDistSenderReplicaAddressedBatchRequestBytes),
ReplicaAddressedBatchResponseBytes: metric.NewCounter(metaDistSenderReplicaAddressedBatchResponseBytes),
CrossRegionBatchRequestBytes: metric.NewCounter(metaDistSenderCrossRegionBatchRequestBytes),
CrossRegionBatchResponseBytes: metric.NewCounter(metaDistSenderCrossRegionBatchResponseBytes),
CrossZoneBatchRequestBytes: metric.NewCounter(metaDistSenderCrossZoneBatchRequestBytes),
CrossZoneBatchResponseBytes: metric.NewCounter(metaDistSenderCrossZoneBatchResponseBytes),
NextReplicaErrCount: metric.NewCounter(metaTransportSenderNextReplicaErrCount),
NotLeaseHolderErrCount: metric.NewCounter(metaDistSenderNotLeaseHolderErrCount),
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges),
RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck),
}
for i := range m.MethodCounts {
method := kvpb.Method(i).String()
Expand All @@ -297,6 +358,43 @@ func makeDistSenderMetrics() DistSenderMetrics {
return m
}

// getDistSenderCounterMetrics fetches the count of each specified DisSender
// metric from the `metricNames` parameter and returns the result as a map. The
// keys in the map represent the metric metadata names, while the corresponding
// values indicate the count of each metric. If any of the specified metric
// cannot be found or is not a counter, the function will return an error.
//
// Assumption: 1. The metricNames parameter should consist of string literals
// that match the metadata names used for metric counters. 2. Each metric name
// provided in `metricNames` must exist, unique and be a counter type.
func (dm *DistSenderMetrics) getDistSenderCounterMetrics(
metricsName []string,
) (map[string]int64, error) {
metricCountMap := make(map[string]int64)
getFirstDistSenderMetric := func(metricName string) int64 {
metricsStruct := reflect.ValueOf(*dm)
for i := 0; i < metricsStruct.NumField(); i++ {
field := metricsStruct.Field(i)
switch t := field.Interface().(type) {
case *metric.Counter:
if t.Name == metricName {
return t.Count()
}
}
}
return -1
}

for _, metricName := range metricsName {
count := getFirstDistSenderMetric(metricName)
if count == -1 {
return map[string]int64{}, errors.Errorf("cannot find metric for %s", metricName)
}
metricCountMap[metricName] = count
}
return metricCountMap, nil
}

// FirstRangeProvider is capable of providing DistSender with the descriptor of
// the first range in the cluster and notifying the DistSender when the first
// range in the cluster has changed.
Expand Down Expand Up @@ -367,6 +465,14 @@ type DistSender struct {

onRangeSpanningNonTxnalBatch func(ba *kvpb.BatchRequest) *kvpb.Error

// BatchRequestInterceptor intercepts DistSender.Send() to pass the actual
// batch request byte count to the test.
BatchRequestInterceptor func(ba *kvpb.BatchRequest)

// BatchRequestInterceptor intercepts DistSender.Send() to pass the actual
// batch response byte count to the test.
BatchResponseInterceptor func(br *kvpb.BatchResponse)

// locality is the description of the topography of the server on which the
// DistSender is running. It is used to estimate the latency to other nodes
// in the absence of a latency function.
Expand Down Expand Up @@ -520,8 +626,12 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
ds.latencyFunc = ds.rpcContext.RemoteClocks.Latency
}

if cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch != nil {
ds.onRangeSpanningNonTxnalBatch = cfg.TestingKnobs.OnRangeSpanningNonTxnalBatch
if cfg.TestingKnobs.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor = cfg.TestingKnobs.BatchRequestInterceptor
}

if cfg.TestingKnobs.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor = cfg.TestingKnobs.BatchResponseInterceptor
}

return ds
Expand Down Expand Up @@ -2178,7 +2288,16 @@ func (ds *DistSender) sendToReplicas(
ExplicitlyRequested: ba.ClientRangeInfo.ExplicitlyRequested ||
(desc.Generation == 0 && routing.LeaseSeq() == 0),
}

if ds.BatchRequestInterceptor != nil {
ds.BatchRequestInterceptor(ba)
}
shouldIncCrossRegion, shouldIncCrossZone := ds.checkAndUpdateBatchRequestMetrics(ctx, ba)
br, err = transport.SendNext(ctx, ba)
if ds.BatchResponseInterceptor != nil {
ds.BatchResponseInterceptor(br)
}
ds.checkAndUpdateBatchResponseMetrics(br, shouldIncCrossRegion, shouldIncCrossZone)
ds.maybeIncrementErrCounters(br, err)

if err != nil {
Expand Down Expand Up @@ -2434,6 +2553,75 @@ func (ds *DistSender) sendToReplicas(
}
}

// isCrossRegionCrossZoneBatch returns (bool, bool) - indicating if the given
// batch request is cross-region and cross-zone respectively.
func (ds *DistSender) isCrossRegionCrossZoneBatch(
ctx context.Context, ba *kvpb.BatchRequest,
) (bool, bool) {
gatewayNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.GatewayNodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err)
return false, false
}
destinationNodeDesc, err := ds.nodeDescs.GetNodeDescriptor(ba.Replica.NodeID)
if err != nil {
log.VEventf(ctx, 2, "failed to perform look up for node descriptor %s", err)
return false, false
}
isCrossRegion, regionErr, isCrossZone, zoneErr := gatewayNodeDesc.Locality.IsCrossRegionCrossZone(destinationNodeDesc.Locality)
if regionErr != nil {
log.VEventf(ctx, 2, "%v", regionErr)
}
if zoneErr != nil {
log.VEventf(ctx, 2, "%v", zoneErr)
}
return isCrossRegion, isCrossZone
}

// checkAndUpdateBatchRequestMetrics updates the batch requests metrics in a
// more meaningful way. Cross-region metrics monitor activities across different
// regions. Cross-zone metrics monitor cross-zone activities within the same
// region or in cases where region tiers are not configured. The check result is
// returned here to avoid redundant check for metrics updates after receiving
// batch responses.
func (ds *DistSender) checkAndUpdateBatchRequestMetrics(
ctx context.Context, ba *kvpb.BatchRequest,
) (shouldIncCrossRegion bool, shouldIncCrossZone bool) {
ds.metrics.ReplicaAddressedBatchRequestBytes.Inc(int64(ba.Size()))
isCrossRegion, isCrossZone := ds.isCrossRegionCrossZoneBatch(ctx, ba)
if isCrossRegion {
if !isCrossZone {
log.VEventf(ctx, 2, "unexpected: cross region but same zone")
} else {
ds.metrics.CrossRegionBatchRequestBytes.Inc(int64(ba.Size()))
shouldIncCrossRegion = true
}
} else {
if isCrossZone {
ds.metrics.CrossZoneBatchRequestBytes.Inc(int64(ba.Size()))
shouldIncCrossZone = true
}
}
return shouldIncCrossRegion, shouldIncCrossZone
}

// checkAndUpdateBatchResponseMetrics updates the batch response metrics based
// on the shouldIncCrossRegion and shouldIncCrossZone parameters. These
// parameters are determined during the initial check for batch requests. The
// underlying assumption is that if requests were cross-region or cross-zone,
// the response should be as well.
func (ds *DistSender) checkAndUpdateBatchResponseMetrics(
br *kvpb.BatchResponse, shouldIncCrossRegion bool, shouldIncCrossZone bool,
) {
ds.metrics.ReplicaAddressedBatchResponseBytes.Inc(int64(br.Size()))
if shouldIncCrossRegion {
ds.metrics.CrossRegionBatchResponseBytes.Inc(int64(br.Size()))
}
if shouldIncCrossZone {
ds.metrics.CrossZoneBatchResponseBytes.Inc(int64(br.Size()))
}
}

// getCostControllerConfig returns the config for the tenant cost model. This
// returns nil if no KV interceptors are associated with the DistSender, or the
// KV interceptor is not a multitenant.TenantSideCostController.
Expand Down
Loading

0 comments on commit 99f92ee

Please sign in to comment.