Merge pull request #96514 from abarganier/backport22.2-96029

abarganier authored Feb 6, 2023
2 parents ecaab51 + b916c5b · commit e7defe0
Showing 43 changed files with 1,374 additions and 498 deletions.
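The recurring change across these files replaces the positional metric.NewHistogram(metadata, window, buckets) constructor with a metric.HistogramOptions struct, and widens *metric.Histogram fields to the metric.IHistogram interface. A before/after sketch of the pattern, assembled from the changefeed hunks below (an illustrative excerpt using identifiers from the diff, not a compilable fragment on its own):

// Before: positional arguments; the histogram had no explicit upper
// bound or precision for its HDR-backed representation.
CheckpointHistNanos: metric.NewHistogram(
	metaChangefeedCheckpointHistNanos, histogramWindow, metric.IOLatencyBuckets,
),

// After: an options struct that also fixes the highest recordable value
// (MaxVal, in nanoseconds) and the significant figures kept (SigFigs).
CheckpointHistNanos: metric.NewHistogram(metric.HistogramOptions{
	Metadata: metaChangefeedCheckpointHistNanos,
	Duration: histogramWindow,
	MaxVal:   changefeedCheckpointHistMaxLatency.Nanoseconds(),
	SigFigs:  2,
	Buckets:  metric.IOLatencyBuckets,
}),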
74 changes: 59 additions & 15 deletions pkg/ccl/changefeedccl/metrics.go
@@ -35,6 +35,14 @@ import (
 var enableSLIMetrics = envutil.EnvOrDefaultBool(
 	"COCKROACH_EXPERIMENTAL_ENABLE_PER_CHANGEFEED_METRICS", false)

+const (
+	changefeedCheckpointHistMaxLatency = 30 * time.Second
+	changefeedBatchHistMaxLatency      = 30 * time.Second
+	changefeedFlushHistMaxLatency      = 1 * time.Minute
+	admitLatencyMaxValue               = 1 * time.Minute
+	commitLatencyMaxValue              = 10 * time.Minute
+)
+
 // max length for the scope name.
 const maxSLIScopeNameLen = 128

@@ -482,18 +490,48 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
 	// retain significant figures of 2.
 	b := aggmetric.MakeBuilder("scope")
 	a := &AggMetrics{
-		ErrorRetries:     b.Counter(metaChangefeedErrorRetries),
-		EmittedMessages:  b.Counter(metaChangefeedEmittedMessages),
-		MessageSize:      b.Histogram(metaMessageSize, histogramWindow, metric.DataSize16MBBuckets),
+		ErrorRetries:    b.Counter(metaChangefeedErrorRetries),
+		EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
+		MessageSize: b.Histogram(metric.HistogramOptions{
+			Metadata: metaMessageSize,
+			Duration: histogramWindow,
+			MaxVal:   10 << 20, /* 10MB max message size */
+			SigFigs:  1,
+			Buckets:  metric.DataSize16MBBuckets,
+		}),
 		EmittedBytes:     b.Counter(metaChangefeedEmittedBytes),
 		FlushedBytes:     b.Counter(metaChangefeedFlushedBytes),
 		Flushes:          b.Counter(metaChangefeedFlushes),
 		SizeBasedFlushes: b.Counter(metaSizeBasedFlushes),

-		BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
-		FlushHistNanos: b.Histogram(metaChangefeedFlushHistNanos, histogramWindow, metric.BatchProcessLatencyBuckets),
-		CommitLatency:  b.Histogram(metaCommitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
-		AdmitLatency:   b.Histogram(metaAdmitLatency, histogramWindow, metric.BatchProcessLatencyBuckets),
+		BatchHistNanos: b.Histogram(metric.HistogramOptions{
+			Metadata: metaChangefeedBatchHistNanos,
+			Duration: histogramWindow,
+			MaxVal:   changefeedBatchHistMaxLatency.Nanoseconds(),
+			SigFigs:  1,
+			Buckets:  metric.BatchProcessLatencyBuckets,
+		}),
+		FlushHistNanos: b.Histogram(metric.HistogramOptions{
+			Metadata: metaChangefeedFlushHistNanos,
+			Duration: histogramWindow,
+			MaxVal:   changefeedFlushHistMaxLatency.Nanoseconds(),
+			SigFigs:  2,
+			Buckets:  metric.BatchProcessLatencyBuckets,
+		}),
+		CommitLatency: b.Histogram(metric.HistogramOptions{
+			Metadata: metaCommitLatency,
+			Duration: histogramWindow,
+			MaxVal:   commitLatencyMaxValue.Nanoseconds(),
+			SigFigs:  1,
+			Buckets:  metric.BatchProcessLatencyBuckets,
+		}),
+		AdmitLatency: b.Histogram(metric.HistogramOptions{
+			Metadata: metaAdmitLatency,
+			Duration: histogramWindow,
+			MaxVal:   admitLatencyMaxValue.Nanoseconds(),
+			SigFigs:  1,
+			Buckets:  metric.BatchProcessLatencyBuckets,
+		}),
 		BackfillCount:         b.Gauge(metaChangefeedBackfillCount),
 		BackfillPendingRanges: b.Gauge(metaChangefeedBackfillPendingRanges),
 		RunningCount:          b.Gauge(metaChangefeedRunning),
@@ -574,7 +612,7 @@ type Metrics struct {
 	Failures            *metric.Counter
 	ResolvedMessages    *metric.Counter
 	QueueTimeNanos      *metric.Counter
-	CheckpointHistNanos *metric.Histogram
+	CheckpointHistNanos metric.IHistogram
 	FrontierUpdates     *metric.Counter
 	ThrottleMetrics     cdcutils.Metrics
 	ReplanCount         *metric.Counter
@@ -601,13 +639,19 @@ func (m *Metrics) getSLIMetrics(scope string) (*sliMetrics, error) {
 // MakeMetrics makes the metrics for changefeed monitoring.
 func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 	m := &Metrics{
-		AggMetrics:          newAggregateMetrics(histogramWindow),
-		KVFeedMetrics:       kvevent.MakeMetrics(histogramWindow),
-		SchemaFeedMetrics:   schemafeed.MakeMetrics(histogramWindow),
-		ResolvedMessages:    metric.NewCounter(metaChangefeedForwardedResolvedMessages),
-		Failures:            metric.NewCounter(metaChangefeedFailures),
-		QueueTimeNanos:      metric.NewCounter(metaEventQueueTime),
-		CheckpointHistNanos: metric.NewHistogram(metaChangefeedCheckpointHistNanos, histogramWindow, metric.IOLatencyBuckets),
+		AggMetrics:        newAggregateMetrics(histogramWindow),
+		KVFeedMetrics:     kvevent.MakeMetrics(histogramWindow),
+		SchemaFeedMetrics: schemafeed.MakeMetrics(histogramWindow),
+		ResolvedMessages:  metric.NewCounter(metaChangefeedForwardedResolvedMessages),
+		Failures:          metric.NewCounter(metaChangefeedFailures),
+		QueueTimeNanos:    metric.NewCounter(metaEventQueueTime),
+		CheckpointHistNanos: metric.NewHistogram(metric.HistogramOptions{
+			Metadata: metaChangefeedCheckpointHistNanos,
+			Duration: histogramWindow,
+			MaxVal:   changefeedCheckpointHistMaxLatency.Nanoseconds(),
+			SigFigs:  2,
+			Buckets:  metric.IOLatencyBuckets,
+		}),
 		FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
 		ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
 		ReplanCount:     metric.NewCounter(metaChangefeedReplanCount),
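The MaxVal/SigFigs pairs above bound the size and precision of an HDR-style histogram: tracking values in [1, MaxVal] at a fixed number of significant figures needs only a fixed, modest number of buckets, which is why each metric now derives MaxVal from an explicit worst-case constant. A self-contained sketch of the same trade-off using the open-source HdrHistogram Go port (an assumption for illustration; the in-tree pkg/util/metric types wrap their own histogram implementation):

package main

import (
	"fmt"
	"time"

	"github.com/HdrHistogram/hdrhistogram-go"
)

func main() {
	// Mirror changefeedCheckpointHistMaxLatency: track 1ns..30s at 2
	// significant figures, as in the CheckpointHistNanos options above.
	maxVal := (30 * time.Second).Nanoseconds()
	h := hdrhistogram.New(1, maxVal, 2)

	_ = h.RecordValue((250 * time.Millisecond).Nanoseconds())
	_ = h.RecordValue((400 * time.Millisecond).Nanoseconds())

	// Values beyond the configured bound are rejected, so picking MaxVal
	// from a named worst-case latency keeps the failure mode explicit.
	if err := h.RecordValue(maxVal + 1); err != nil {
		fmt.Println("out of range:", err)
	}
	fmt.Println("p99:", time.Duration(h.ValueAtQuantile(99)))
}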
2 changes: 1 addition & 1 deletion pkg/ccl/sqlproxyccl/connector.go
@@ -76,7 +76,7 @@ type connector struct {

 	// DialTenantLatency tracks how long it takes to retrieve the address for
 	// a tenant and set up a tcp connection to the address.
-	DialTenantLatency *metric.Histogram
+	DialTenantLatency metric.IHistogram

 	// DialTenantRetries counts how often dialing a tenant is retried.
 	DialTenantRetries *metric.Counter
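Fields like DialTenantLatency move from the concrete *metric.Histogram to the metric.IHistogram interface, which lets the constructor hand back different backing implementations (the Mode values in the tests below pick between them). A toy, self-contained sketch of the shape of that design; the interface and both implementations here are hypothetical stand-ins, not the real pkg/util/metric types:

package main

import "fmt"

// IHistogram is a trimmed, hypothetical stand-in for the interface this
// commit introduces in pkg/util/metric.
type IHistogram interface {
	RecordValue(v int64)
}

// hdrHistogram and promHistogram stand in for the HDR-backed and
// Prometheus-backed implementations a Mode option could select between.
type hdrHistogram struct{ samples []int64 }

func (h *hdrHistogram) RecordValue(v int64) { h.samples = append(h.samples, v) }

type promHistogram struct{ count int64 }

func (h *promHistogram) RecordValue(v int64) { h.count++ }

func main() {
	// A caller like connector.DialTenantLatency now holds the interface,
	// so either backend can be injected without touching call sites.
	for _, h := range []IHistogram{&hdrHistogram{}, &promHistogram{}} {
		h.RecordValue(42)
		fmt.Printf("%T accepted a sample\n", h)
	}
}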
27 changes: 18 additions & 9 deletions pkg/ccl/sqlproxyccl/connector_test.go
@@ -379,9 +379,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {

 	c := &connector{
 		TenantID: roachpb.MakeTenantID(42),
-		DialTenantLatency: metric.NewHistogram(
-			metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
-		),
+		DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
+			Mode:     metric.HistogramModePrometheus,
+			Metadata: metaDialTenantLatency,
+			Duration: time.Millisecond,
+			Buckets:  metric.NetworkLatencyBuckets,
+		}),
 		DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 	}
 	dc := &testTenantDirectoryCache{}
@@ -459,9 +462,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {
 	defer cancel()

 	c := &connector{
-		DialTenantLatency: metric.NewHistogram(
-			metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
-		),
+		DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
+			Mode:     metric.HistogramModePreferHdrLatency,
+			Metadata: metaDialTenantLatency,
+			Duration: time.Millisecond,
+			Buckets:  metric.NetworkLatencyBuckets,
+		}),
 		DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 	}
 	c.testingKnobs.lookupAddr = func(ctx context.Context) (string, error) {
@@ -490,9 +496,12 @@ func TestConnector_dialTenantCluster(t *testing.T) {
 	var reportFailureFnCount int
 	c := &connector{
 		TenantID: roachpb.MakeTenantID(42),
-		DialTenantLatency: metric.NewHistogram(
-			metaDialTenantLatency, time.Millisecond, metric.NetworkLatencyBuckets,
-		),
+		DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
+			Mode:     metric.HistogramModePreferHdrLatency,
+			Metadata: metaDialTenantLatency,
+			Duration: time.Millisecond,
+			Buckets:  metric.NetworkLatencyBuckets,
+		}),
 		DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 	}
 	c.DirectoryCache = &testTenantDirectoryCache{
61 changes: 38 additions & 23 deletions pkg/ccl/sqlproxyccl/metrics.go
@@ -23,19 +23,19 @@ type metrics struct {
 	RoutingErrCount        *metric.Counter
 	RefusedConnCount       *metric.Counter
 	SuccessfulConnCount    *metric.Counter
-	ConnectionLatency      *metric.Histogram
+	ConnectionLatency      metric.IHistogram
 	AuthFailedCount        *metric.Counter
 	ExpiredClientConnCount *metric.Counter

-	DialTenantLatency *metric.Histogram
+	DialTenantLatency metric.IHistogram
 	DialTenantRetries *metric.Counter

 	ConnMigrationSuccessCount                *metric.Counter
 	ConnMigrationErrorFatalCount             *metric.Counter
 	ConnMigrationErrorRecoverableCount       *metric.Counter
 	ConnMigrationAttemptedCount              *metric.Counter
-	ConnMigrationAttemptedLatency            *metric.Histogram
-	ConnMigrationTransferResponseMessageSize *metric.Histogram
+	ConnMigrationAttemptedLatency            metric.IHistogram
+	ConnMigrationTransferResponseMessageSize metric.IHistogram

 	QueryCancelReceivedPGWire *metric.Counter
 	QueryCancelReceivedHTTP   *metric.Counter
@@ -49,6 +49,16 @@ func (metrics) MetricStruct() {}

 var _ metric.Struct = metrics{}

+const (
+	// maxExpectedTransferResponseMessageSize corresponds to maximum expected
+	// response message size for the SHOW TRANSFER STATE query. We choose 16MB
+	// here to match the defaultMaxReadBufferSize used for ingesting SQL
+	// statements in the SQL server (see pkg/sql/pgwire/pgwirebase/encoding.go).
+	//
+	// This will be used to tune sql.session_transfer.max_session_size.
+	maxExpectedTransferResponseMessageSize = 1 << 24 // 16MB
+)
+
 var (
 	metaCurConnCount = metric.Metadata{
 		Name: "proxy.sql.conns",
@@ -213,35 +223,40 @@ func makeProxyMetrics() metrics {
 		RoutingErrCount:     metric.NewCounter(metaRoutingErrCount),
 		RefusedConnCount:    metric.NewCounter(metaRefusedConnCount),
 		SuccessfulConnCount: metric.NewCounter(metaSuccessfulConnCount),
-		ConnectionLatency: metric.NewHistogram(
-			metaConnMigrationAttemptedCount,
-			base.DefaultHistogramWindowInterval(),
-			metric.NetworkLatencyBuckets,
-		),
+		ConnectionLatency: metric.NewHistogram(metric.HistogramOptions{
+			Mode:     metric.HistogramModePreferHdrLatency,
+			Metadata: metaConnMigrationAttemptedCount,
+			Duration: base.DefaultHistogramWindowInterval(),
+			Buckets:  metric.NetworkLatencyBuckets,
+		}),
 		AuthFailedCount:        metric.NewCounter(metaAuthFailedCount),
 		ExpiredClientConnCount: metric.NewCounter(metaExpiredClientConnCount),
 		// Connector metrics.
-		DialTenantLatency: metric.NewHistogram(
-			metaDialTenantLatency,
-			base.DefaultHistogramWindowInterval(),
-			metric.NetworkLatencyBuckets,
+		DialTenantLatency: metric.NewHistogram(metric.HistogramOptions{
+			Mode:     metric.HistogramModePreferHdrLatency,
+			Metadata: metaDialTenantLatency,
+			Duration: base.DefaultHistogramWindowInterval(),
+			Buckets:  metric.NetworkLatencyBuckets},
 		),
 		DialTenantRetries: metric.NewCounter(metaDialTenantRetries),
 		// Connection migration metrics.
 		ConnMigrationSuccessCount:          metric.NewCounter(metaConnMigrationSuccessCount),
 		ConnMigrationErrorFatalCount:       metric.NewCounter(metaConnMigrationErrorFatalCount),
 		ConnMigrationErrorRecoverableCount: metric.NewCounter(metaConnMigrationErrorRecoverableCount),
 		ConnMigrationAttemptedCount:        metric.NewCounter(metaConnMigrationAttemptedCount),
-		ConnMigrationAttemptedLatency: metric.NewHistogram(
-			metaConnMigrationAttemptedLatency,
-			base.DefaultHistogramWindowInterval(),
-			metric.NetworkLatencyBuckets,
-		),
-		ConnMigrationTransferResponseMessageSize: metric.NewHistogram(
-			metaConnMigrationTransferResponseMessageSize,
-			base.DefaultHistogramWindowInterval(),
-			metric.DataSize16MBBuckets,
-		),
+		ConnMigrationAttemptedLatency: metric.NewHistogram(metric.HistogramOptions{
+			Mode:     metric.HistogramModePreferHdrLatency,
+			Metadata: metaConnMigrationAttemptedLatency,
+			Duration: base.DefaultHistogramWindowInterval(),
+			Buckets:  metric.NetworkLatencyBuckets,
+		}),
+		ConnMigrationTransferResponseMessageSize: metric.NewHistogram(metric.HistogramOptions{
+			Metadata: metaConnMigrationTransferResponseMessageSize,
+			Duration: base.DefaultHistogramWindowInterval(),
+			Buckets:  metric.DataSize16MBBuckets,
+			MaxVal:   maxExpectedTransferResponseMessageSize,
+			SigFigs:  1,
+		}),
 		QueryCancelReceivedPGWire: metric.NewCounter(metaQueryCancelReceivedPGWire),
 		QueryCancelReceivedHTTP:   metric.NewCounter(metaQueryCancelReceivedHTTP),
 		QueryCancelIgnored:        metric.NewCounter(metaQueryCancelIgnored),
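The shifted literals used for the size bounds are easy to sanity-check: 1 << 24 is exactly the 16MB that the maxExpectedTransferResponseMessageSize comment claims, and the changefeed's 10 << 20 bound is 10MiB. A quick arithmetic check:

package main

import "fmt"

func main() {
	// maxExpectedTransferResponseMessageSize = 1 << 24 = 2^24 bytes = 16MiB.
	fmt.Println(1<<24, 1<<24 == 16*1024*1024) // 16777216 true

	// The changefeed MessageSize bound: 10 << 20 = 10 * 2^20 bytes = 10MiB.
	fmt.Println(10<<20, 10<<20 == 10*1024*1024) // 10485760 true
}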
39 changes: 30 additions & 9 deletions pkg/ccl/streamingccl/streamingest/metrics.go
@@ -15,6 +15,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric"
)

const (
streamingFlushHistMaxLatency = 1 * time.Minute
streamingAdmitLatencyMaxValue = 3 * time.Minute
streamingCommitLatencyMaxValue = 10 * time.Minute
)

var (
metaStreamingEventsIngested = metric.Metadata{
Name: "streaming.events_ingested",
@@ -107,9 +113,9 @@ type Metrics struct {
 	Flushes                    *metric.Counter
 	JobProgressUpdates         *metric.Counter
 	ResolvedEvents             *metric.Counter
-	FlushHistNanos             *metric.Histogram
-	CommitLatency              *metric.Histogram
-	AdmitLatency               *metric.Histogram
+	FlushHistNanos             metric.IHistogram
+	CommitLatency              metric.IHistogram
+	AdmitLatency               metric.IHistogram
 	RunningCount               *metric.Gauge
 	EarliestDataCheckpointSpan *metric.Gauge
 	LatestDataCheckpointSpan   *metric.Gauge
@@ -128,12 +134,27 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
 		Flushes:            metric.NewCounter(metaStreamingFlushes),
 		ResolvedEvents:     metric.NewCounter(metaStreamingResolvedEventsIngested),
 		JobProgressUpdates: metric.NewCounter(metaJobProgressUpdates),
-		FlushHistNanos: metric.NewHistogram(metaStreamingFlushHistNanos,
-			histogramWindow, metric.BatchProcessLatencyBuckets),
-		CommitLatency: metric.NewHistogram(metaStreamingCommitLatency,
-			histogramWindow, metric.BatchProcessLatencyBuckets),
-		AdmitLatency: metric.NewHistogram(metaStreamingAdmitLatency,
-			histogramWindow, metric.BatchProcessLatencyBuckets),
+		FlushHistNanos: metric.NewHistogram(metric.HistogramOptions{
+			Metadata: metaStreamingFlushHistNanos,
+			Duration: histogramWindow,
+			Buckets:  metric.BatchProcessLatencyBuckets,
+			MaxVal:   streamingFlushHistMaxLatency.Nanoseconds(),
+			SigFigs:  1,
+		}),
+		CommitLatency: metric.NewHistogram(metric.HistogramOptions{
+			Metadata: metaStreamingCommitLatency,
+			Duration: histogramWindow,
+			Buckets:  metric.BatchProcessLatencyBuckets,
+			MaxVal:   streamingCommitLatencyMaxValue.Nanoseconds(),
+			SigFigs:  1,
+		}),
+		AdmitLatency: metric.NewHistogram(metric.HistogramOptions{
+			Metadata: metaStreamingAdmitLatency,
+			Duration: histogramWindow,
+			Buckets:  metric.BatchProcessLatencyBuckets,
+			MaxVal:   streamingAdmitLatencyMaxValue.Nanoseconds(),
+			SigFigs:  1,
+		}),
 		RunningCount:               metric.NewGauge(metaStreamsRunning),
 		EarliestDataCheckpointSpan: metric.NewGauge(metaEarliestDataCheckpointSpan),
 		LatestDataCheckpointSpan:   metric.NewGauge(metaLatestDataCheckpointSpan),
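The new bounds are declared as time.Duration constants and converted with .Nanoseconds() at construction, so the source reads in natural units while the histogram's MaxVal receives a raw int64. A small sketch of the conversion (constants copied from the hunk above):

package main

import (
	"fmt"
	"time"
)

const (
	streamingFlushHistMaxLatency   = 1 * time.Minute
	streamingAdmitLatencyMaxValue  = 3 * time.Minute
	streamingCommitLatencyMaxValue = 10 * time.Minute
)

func main() {
	// MaxVal is a plain int64 count of nanoseconds.
	fmt.Println(streamingFlushHistMaxLatency.Nanoseconds())   // 60000000000
	fmt.Println(streamingAdmitLatencyMaxValue.Nanoseconds())  // 180000000000
	fmt.Println(streamingCommitLatencyMaxValue.Nanoseconds()) // 600000000000
}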
14 changes: 12 additions & 2 deletions pkg/kv/bulk/bulk_metrics.go
@@ -20,7 +20,7 @@ import (
 // Metrics contains pointers to the metrics for
 // monitoring bulk operations.
 type Metrics struct {
-	MaxBytesHist  *metric.Histogram
+	MaxBytesHist  metric.IHistogram
 	CurBytesCount *metric.Gauge
 }

@@ -44,10 +44,20 @@ var (
 	}
 )

+// See pkg/sql/mem_metrics.go
+// log10int64times1000 = log10(math.MaxInt64) * 1000, rounded up somewhat
+const log10int64times1000 = 19 * 1000

+// MakeBulkMetrics instantiates the metrics holder for bulk operation monitoring.
 func MakeBulkMetrics(histogramWindow time.Duration) Metrics {
 	return Metrics{
-		MaxBytesHist:  metric.NewHistogram(metaMemMaxBytes, histogramWindow, metric.MemoryUsage64MBBuckets),
+		MaxBytesHist: metric.NewHistogram(metric.HistogramOptions{
+			Metadata: metaMemMaxBytes,
+			Duration: histogramWindow,
+			MaxVal:   log10int64times1000,
+			SigFigs:  3,
+			Buckets:  metric.MemoryUsage64MBBuckets,
+		}),
 		CurBytesCount: metric.NewGauge(metaMemCurBytes),
 	}
 }
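The log10int64times1000 ceiling is easy to verify: log10(math.MaxInt64) = log10(2^63 - 1) ≈ 18.96, so times 1000 it is ≈ 18965, and 19 * 1000 "rounds up somewhat" exactly as the comment says. The pointer to pkg/sql/mem_metrics.go suggests MaxBytesHist records log-scaled values, making this the largest recordable sample. A quick check:

package main

import (
	"fmt"
	"math"
)

func main() {
	// log10(math.MaxInt64) * 1000, per the comment in bulk_metrics.go.
	exact := math.Log10(float64(math.MaxInt64)) * 1000
	fmt.Printf("exact: %.1f\n", exact) // exact: 18964.9
	fmt.Println("constant:", 19*1000)  // constant: 19000 (rounded up somewhat)
}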