Skip to content

Commit

Permalink
pkg/util/metric: optionally reintroduce legacy hdrhistogram model
Browse files Browse the repository at this point in the history
Addresses cockroachdb#95833

This patch reeintroduces the old HdrHistogram model to optionally be
enabled in favor of the new Prometheus model, gated behind
an environment variable called `COCKROACH_ENABLE_HDR_HISTOGRAMS`,
allowing users a means to "fall back" to the old model in the
event that the new model does not adequately serve their needs
(think of this as an "insurance policy" to protect against
this from happening again with no real mitigation - ideally,
this environment variable should never have to be used).

Note: some histograms were introduced *after* the new
Prometheus histograms were added to CockroachDB. In this
case, we use the `ForceUsePrometheus` option in the
`HistogramOptions` struct to ignore the value of the env
var, since there never was a time where these specific
histograms used the HdrHistogram model.

Release note (ops change): Histogram metrics can now optionally
use the legacy HdrHistogram model by setting the environment var
`COCKROACH_ENABLE_HDR_HISTOGRAMS=true` on CockroachDB nodes.
**Note that this is not recommended** unless users are having
difficulties with the newer Prometheus-backed histogram model.
Enabling can cause performance issues with timeseries databases
like Prometheus, as processing and storing the increased number
of buckets is taxing on both CPU and storage. Note that the
HdrHistogram model is slated for full deprecation in upcoming
releases.
  • Loading branch information
dhartunian authored and abarganier committed Feb 6, 2023
1 parent 30cd22e commit 44d0ffa
Show file tree
Hide file tree
Showing 41 changed files with 1,133 additions and 437 deletions.
74 changes: 59 additions & 15 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ import (
var enableSLIMetrics = envutil.EnvOrDefaultBool(
"COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS", false)

const (
changefeedCheckpointHistMaxLatency = 30 * time.Second
changefeedBatchHistMaxLatency = 30 * time.Second
changefeedFlushHistMaxLatency = 1 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
)

// max length for the scope name.
const maxSLIScopeNameLen = 128

Expand Down Expand Up @@ -482,18 +490,48 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
a := &AggMetrics{
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
MessageSize: b.Histogram(metaMessageSize, histogramWindow, metric.DataSize16MBBuckets),
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
MessageSize: b.Histogram(metric.HistogramOptions{
Metadata: metaMessageSize,
Duration: histogramWindow,
MaxVal: 10 << 20, /* 10MB max message size */
SigFigs: 1,
Buckets: metric.DataSize16MBBuckets,
}),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),
SizeBasedFlushes: b.Counter(metaSizeBasedFlushes),

BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
FlushHistNanos: b.Histogram(metaChangefeedFlushHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
CommitLatency: b.Histogram(metaCommitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
AdmitLatency: b.Histogram(metaAdmitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
BatchHistNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedBatchHistNanos,
Duration: histogramWindow,
MaxVal: changefeedBatchHistMaxLatency.Nanoseconds(),
SigFigs: 1,
Buckets: metric.BatchProcessLatencyBuckets,
}),
FlushHistNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedFlushHistNanos,
Duration: histogramWindow,
MaxVal: changefeedFlushHistMaxLatency.Nanoseconds(),
SigFigs: 2,
Buckets: metric.BatchProcessLatencyBuckets,
}),
CommitLatency: b.Histogram(metric.HistogramOptions{
Metadata: metaCommitLatency,
Duration: histogramWindow,
MaxVal: commitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
Buckets: metric.BatchProcessLatencyBuckets,
}),
AdmitLatency: b.Histogram(metric.HistogramOptions{
Metadata: metaAdmitLatency,
Duration: histogramWindow,
MaxVal: admitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
Buckets: metric.BatchProcessLatencyBuckets,
}),
BackfillCount: b.Gauge(metaChangefeedBackfillCount),
BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges),
RunningCount: b.Gauge(metaChangefeedRunning),
Expand Down Expand Up @@ -574,7 +612,7 @@ type Metrics struct {
Failures *metric.Counter
ResolvedMessages *metric.Counter
QueueTimeNanos *metric.Counter
CheckpointHistNanos *metric.Histogram
CheckpointHistNanos metric.IHistogram
FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics
ReplanCount *metric.Counter
Expand All @@ -601,13 +639,19 @@ func (m *Metrics) getSLIMetrics(scope string) (*sliMetrics, error) {
// MakeMetrics makes the metrics for changefeed monitoring.
func MakeMetrics(histogramWindow time.Duration) metric.Struct {
m := &Metrics{
AggMetrics: newAggregateMetrics(histogramWindow),
KVFeedMetrics: kvevent.MakeMetrics(histogramWindow),
SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow, metric.IOLatencyBuckets),
AggMetrics: newAggregateMetrics(histogramWindow),
KVFeedMetrics: kvevent.MakeMetrics(histogramWindow),
SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
ResolvedMessages: metric.NewCounter(metaChangefeedForwardedResolvedMessages),
Failures: metric.NewCounter(metaChangefeedFailures),
QueueTimeNanos: metric.NewCounter(metaEventQueueTime),
CheckpointHistNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaChangefeedCheckpointHistNanos,
Duration: histogramWindow,
MaxVal: changefeedCheckpointHistMaxLatency.Nanoseconds(),
SigFigs: 2,
Buckets: metric.IOLatencyBuckets,
}),
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type connector struct {

// DialTenantLatency tracks how long it takes to retrieve the address for
// a tenant and set up a tcp connection to the address.
DialTenantLatency *metric.Histogram
DialTenantLatency metric.IHistogram

// DialTenantRetries counts how often dialing a tenant is retried.
DialTenantRetries *metric.Counter
Expand Down
27 changes: 18 additions & 9 deletions pkg/ccl/sqlproxyccl/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {

c := &connector{
TenantID: roachpb.MakeTenantID(42),
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePrometheus,
Metadata: metaDialTenantLatency,
Duration: time.Millisecond,
Buckets: metric.NetworkLatencyBuckets,
}),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
dc := &testTenantDirectoryCache{}
Expand Down Expand Up @@ -459,9 +462,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {
defer cancel()

c := &connector{
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaDialTenantLatency,
Duration: time.Millisecond,
Buckets: metric.NetworkLatencyBuckets,
}),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
c.testingKnobs.lookupAddr = func(ctx context.Context) (string, error) {
Expand Down Expand Up @@ -490,9 +496,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {
var reportFailureFnCount int
c := &connector{
TenantID: roachpb.MakeTenantID(42),
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
),
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaDialTenantLatency,
Duration: time.Millisecond,
Buckets: metric.NetworkLatencyBuckets,
}),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
}
c.DirectoryCache = &testTenantDirectoryCache{
Expand Down
61 changes: 38 additions & 23 deletions pkg/ccl/sqlproxyccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,19 @@ type metrics struct {
RoutingErrCount *metric.Counter
RefusedConnCount *metric.Counter
SuccessfulConnCount *metric.Counter
ConnectionLatency *metric.Histogram
ConnectionLatency metric.IHistogram
AuthFailedCount *metric.Counter
ExpiredClientConnCount *metric.Counter

DialTenantLatency *metric.Histogram
DialTenantLatency metric.IHistogram
DialTenantRetries *metric.Counter

ConnMigrationSuccessCount *metric.Counter
ConnMigrationErrorFatalCount *metric.Counter
ConnMigrationErrorRecoverableCount *metric.Counter
ConnMigrationAttemptedCount *metric.Counter
ConnMigrationAttemptedLatency *metric.Histogram
ConnMigrationTransferResponseMessageSize *metric.Histogram
ConnMigrationAttemptedLatency metric.IHistogram
ConnMigrationTransferResponseMessageSize metric.IHistogram

QueryCancelReceivedPGWire *metric.Counter
QueryCancelReceivedHTTP *metric.Counter
Expand All @@ -49,6 +49,16 @@ func (metrics) MetricStruct() {}

var _ metric.Struct = metrics{}

const (
// maxExpectedTransferResponseMessageSize corresponds to maximum expected
// response message size for the SHOW TRANSFER STATE query. We choose 16MB
// here to match the defaultMaxReadBufferSize used for ingesting SQL
// statements in the SQL server (see pkg/sql/pgwire/pgwirebase/encoding.go).
//
// This will be used to tune sql.session_transfer.max_session_size.
maxExpectedTransferResponseMessageSize = 1 << 24 // 16MB
)

var (
metaCurConnCount = metric.Metadata{
Name: "proxy.sql.conns",
Expand Down Expand Up @@ -213,35 +223,40 @@ func makeProxyMetrics() metrics {
RoutingErrCount: metric.NewCounter(metaRoutingErrCount),
RefusedConnCount: metric.NewCounter(metaRefusedConnCount),
SuccessfulConnCount: metric.NewCounter(metaSuccessfulConnCount),
ConnectionLatency: metric.NewHistogram(
metaConnMigrationAttemptedCount,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
),
ConnectionLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaConnMigrationAttemptedCount,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.NetworkLatencyBuckets,
}),
AuthFailedCount: metric.NewCounter(metaAuthFailedCount),
ExpiredClientConnCount: metric.NewCounter(metaExpiredClientConnCount),
// Connector metrics.
DialTenantLatency: metric.NewHistogram(
metaDialTenantLatency,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaDialTenantLatency,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.NetworkLatencyBuckets},
),
DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
// Connection migration metrics.
ConnMigrationSuccessCount: metric.NewCounter(metaConnMigrationSuccessCount),
ConnMigrationErrorFatalCount: metric.NewCounter(metaConnMigrationErrorFatalCount),
ConnMigrationErrorRecoverableCount: metric.NewCounter(metaConnMigrationErrorRecoverableCount),
ConnMigrationAttemptedCount: metric.NewCounter(metaConnMigrationAttemptedCount),
ConnMigrationAttemptedLatency: metric.NewHistogram(
metaConnMigrationAttemptedLatency,
base.DefaultHistogramWindowInterval(),
metric.NetworkLatencyBuckets,
),
ConnMigrationTransferResponseMessageSize: metric.NewHistogram(
metaConnMigrationTransferResponseMessageSize,
base.DefaultHistogramWindowInterval(),
metric.DataSize16MBBuckets,
),
ConnMigrationAttemptedLatency: metric.NewHistogram(metric.HistogramOptions{
Mode: metric.HistogramModePreferHdrLatency,
Metadata: metaConnMigrationAttemptedLatency,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.NetworkLatencyBuckets,
}),
ConnMigrationTransferResponseMessageSize: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaConnMigrationTransferResponseMessageSize,
Duration: base.DefaultHistogramWindowInterval(),
Buckets: metric.DataSize16MBBuckets,
MaxVal: maxExpectedTransferResponseMessageSize,
SigFigs: 1,
}),
QueryCancelReceivedPGWire: metric.NewCounter(metaQueryCancelReceivedPGWire),
QueryCancelReceivedHTTP: metric.NewCounter(metaQueryCancelReceivedHTTP),
QueryCancelIgnored: metric.NewCounter(metaQueryCancelIgnored),
Expand Down
39 changes: 30 additions & 9 deletions pkg/ccl/streamingccl/streamingest/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
)

const (
streamingFlushHistMaxLatency = 1 * time.Minute
streamingAdmitLatencyMaxValue = 3 * time.Minute
streamingCommitLatencyMaxValue = 10 * time.Minute
)

var (
metaStreamingEventsIngested = metric.Metadata{
Name: "streaming.events_ingested",
Expand Down Expand Up @@ -107,9 +113,9 @@ type Metrics struct {
Flushes *metric.Counter
JobProgressUpdates *metric.Counter
ResolvedEvents *metric.Counter
FlushHistNanos *metric.Histogram
CommitLatency *metric.Histogram
AdmitLatency *metric.Histogram
FlushHistNanos metric.IHistogram
CommitLatency metric.IHistogram
AdmitLatency metric.IHistogram
RunningCount *metric.Gauge
EarliestDataCheckpointSpan *metric.Gauge
LatestDataCheckpointSpan *metric.Gauge
Expand All @@ -128,12 +134,27 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
Flushes: metric.NewCounter(metaStreamingFlushes),
ResolvedEvents: metric.NewCounter(metaStreamingResolvedEventsIngested),
JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
FlushHistNanos: metric.NewHistogram(metaStreamingFlushHistNanos,
histogramWindow, metric.BatchProcessLatencyBuckets),
CommitLatency: metric.NewHistogram(metaStreamingCommitLatency,
histogramWindow, metric.BatchProcessLatencyBuckets),
AdmitLatency: metric.NewHistogram(metaStreamingAdmitLatency,
histogramWindow, metric.BatchProcessLatencyBuckets),
FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaStreamingFlushHistNanos,
Duration: histogramWindow,
Buckets: metric.BatchProcessLatencyBuckets,
MaxVal: streamingFlushHistMaxLatency.Nanoseconds(),
SigFigs: 1,
}),
CommitLatency: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaStreamingCommitLatency,
Duration: histogramWindow,
Buckets: metric.BatchProcessLatencyBuckets,
MaxVal: streamingCommitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
}),
AdmitLatency: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaStreamingAdmitLatency,
Duration: histogramWindow,
Buckets: metric.BatchProcessLatencyBuckets,
MaxVal: streamingAdmitLatencyMaxValue.Nanoseconds(),
SigFigs: 1,
}),
RunningCount: metric.NewGauge(metaStreamsRunning),
EarliestDataCheckpointSpan: metric.NewGauge(metaEarliestDataCheckpointSpan),
LatestDataCheckpointSpan: metric.NewGauge(metaLatestDataCheckpointSpan),
Expand Down
14 changes: 12 additions & 2 deletions pkg/kv/bulk/bulk_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// Metrics contains pointers to the metrics for
// monitoring bulk operations.
type Metrics struct {
MaxBytesHist *metric.Histogram
MaxBytesHist metric.IHistogram
CurBytesCount *metric.Gauge
}

Expand All @@ -44,10 +44,20 @@ var (
}
)

// See pkg/sql/mem_metrics.go
// log10int64times1000 = log10(math.MaxInt64) * 1000, rounded up somewhat
const log10int64times1000 = 19 * 1000

// MakeBulkMetrics instantiates the metrics holder for bulk operation monitoring.
func MakeBulkMetrics(histogramWindow time.Duration) Metrics {
return Metrics{
MaxBytesHist: metric.NewHistogram(metaMemMaxBytes, histogramWindow, metric.MemoryUsage64MBBuckets),
MaxBytesHist: metric.NewHistogram(metric.HistogramOptions{
Metadata: metaMemMaxBytes,
Duration: histogramWindow,
MaxVal: log10int64times1000,
SigFigs: 3,
Buckets: metric.MemoryUsage64MBBuckets,
}),
CurBytesCount: metric.NewGauge(metaMemCurBytes),
}
}
Expand Down
Loading

0 comments on commit 44d0ffa

Please sign in to comment.