From bd02d7effb488b42fe4642b830af901f546dfae4 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Thu, 4 Apr 2019 08:49:58 -0700 Subject: [PATCH 1/2] changefeedccl: consider closedts interval in slow log threshold This was currently only tied to the cluster setting that controls poller responsiveness, but we've switched to rangefeed by default. These two cluster setting values represent the target responsiveness of poller and range feed. The cluster setting for switching between poller and rangefeed is only checked at changefeed start/resume, so instead of switching on it here, just add them. Also add 1 second in case both these settings are set really low (as they are in unit tests). Closes #34690 Release note: None --- .../changefeedccl/changefeed_processors.go | 32 +++++++++++-------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeed_processors.go b/pkg/ccl/changefeedccl/changefeed_processors.go index 9091ecdb4db0..f8507a924e4f 100644 --- a/pkg/ccl/changefeedccl/changefeed_processors.go +++ b/pkg/ccl/changefeedccl/changefeed_processors.go @@ -10,6 +10,7 @@ package changefeedccl import ( "context" + "fmt" "math" "time" @@ -21,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/distsqlrun" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlbase" + "github.com/cockroachdb/cockroach/pkg/storage/closedts" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" @@ -545,29 +547,31 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error { } } - // Potentially log the most behind span in the frontier for debugging. - slownessThreshold := 10 * changefeedPollInterval.Get(&cf.flowCtx.Settings.SV) + // Potentially log the most behind span in the frontier for debugging. These + // two cluster setting values represent the target responsiveness of poller + // and range feed. The cluster setting for switching between poller and + // rangefeed is only checked at changefeed start/resume, so instead of + // switching on it here, just add them. Also add 1 second in case both these + // settings are set really low (as they are in unit tests). + pollInterval := changefeedPollInterval.Get(&cf.flowCtx.Settings.SV) + closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Settings.SV) + slownessThreshold := time.Second + 10*(pollInterval+closedtsInterval) frontier := cf.sf.Frontier() now := timeutil.Now() if resolvedBehind := now.Sub(frontier.GoTime()); resolvedBehind > slownessThreshold { + description := `sinkless feed` + if cf.spec.JobID != 0 { + description = fmt.Sprintf("job %d", cf.spec.JobID) + } if frontierChanged { - if cf.spec.JobID != 0 { - log.Infof(cf.Ctx, "job %d new resolved timestamp %s is behind by %s", - cf.spec.JobID, frontier, resolvedBehind) - } else { - log.Infof(cf.Ctx, "sinkless feed new resolved timestamp %s is behind by %s", - frontier, resolvedBehind) - } + log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s", + description, frontier, resolvedBehind) } const slowSpanMaxFrequency = 10 * time.Second if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency { cf.lastSlowSpanLog = now s := cf.sf.peekFrontierSpan() - if cf.spec.JobID != 0 { - log.Infof(cf.Ctx, "job %d span %s is behind by %s", cf.spec.JobID, s, resolvedBehind) - } else { - log.Infof(cf.Ctx, "sinkless feed span %s is behind by %s", s, resolvedBehind) - } + log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind) } } From 75924f8572e44304b1505f121ae481392ae80319 Mon Sep 17 00:00:00 2001 From: Daniel Harrison Date: Thu, 4 Apr 2019 09:00:43 -0700 Subject: [PATCH 2/2] storage: add kv.closed_timestamp.max_behind_nanos metric Largest latency between realtime and replica max closed timestamp Will help debug things like #35142 Release note: None --- pkg/storage/metrics.go | 14 ++++++++++++++ pkg/storage/store.go | 9 +++++++++ 2 files changed, 23 insertions(+) diff --git a/pkg/storage/metrics.go b/pkg/storage/metrics.go index 1e585710bfbb..0a309c4004e8 100644 --- a/pkg/storage/metrics.go +++ b/pkg/storage/metrics.go @@ -946,6 +946,14 @@ var ( Measurement: "Encryption At Rest", Unit: metric.Unit_CONST, } + + // Closed timestamp metrics. + metaClosedTimestampMaxBehindNanos = metric.Metadata{ + Name: "kv.closed_timestamp.max_behind_nanos", + Help: "Largest latency between realtime and replica max closed timestamp", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } ) // StoreMetrics is the set of metrics for a given store. @@ -1150,6 +1158,9 @@ type StoreMetrics struct { // RangeFeed counts. RangeFeedMetrics *rangefeed.Metrics + // Closed timestamp metrics. + ClosedTimestampMaxBehindNanos *metric.Gauge + // Stats for efficient merges. mu struct { syncutil.Mutex @@ -1350,6 +1361,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics { // RangeFeed counters. RangeFeedMetrics: rangefeed.NewMetrics(), + + // Closed timestamp metrics. + ClosedTimestampMaxBehindNanos: metric.NewGauge(metaClosedTimestampMaxBehindNanos), } sm.raftRcvdMessages[raftpb.MsgProp] = sm.RaftRcvdMsgProp diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 83c5d755c944..f05c83164b74 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -4083,6 +4083,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { } clusterNodes := s.ClusterNodeCount() + var minMaxClosedTS hlc.Timestamp newStoreReplicaVisitor(s).Visit(func(rep *Replica) bool { metrics := rep.Metrics(ctx, timestamp, livenessMap, clusterNodes) if metrics.Leader { @@ -4123,6 +4124,9 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { if wps, dur := rep.writeStats.avgQPS(); dur >= MinStatsDuration { averageWritesPerSecond += wps } + if mc := rep.maxClosed(ctx); minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS) { + minMaxClosedTS = mc + } return true // more }) @@ -4142,6 +4146,11 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error { s.metrics.OverReplicatedRangeCount.Update(overreplicatedRangeCount) s.metrics.RaftLogFollowerBehindCount.Update(behindCount) + if !minMaxClosedTS.IsEmpty() { + nanos := timeutil.Since(minMaxClosedTS.GoTime()).Nanoseconds() + s.metrics.ClosedTimestampMaxBehindNanos.Update(nanos) + } + return nil }