Merge pull request #36551 from danhhz/backport19.1-36532
release-19.1: changefeedccl,storage: better debugging for changefeeds that have fallen behind
danhhz authored Apr 4, 2019
2 parents 250d614 + 75924f8 commit b88a6ce
Showing 3 changed files with 41 additions and 14 deletions.
32 changes: 18 additions & 14 deletions pkg/ccl/changefeedccl/changefeed_processors.go
@@ -10,6 +10,7 @@ package changefeedccl
 
 import (
 	"context"
+	"fmt"
 	"math"
 	"time"
 
@@ -21,6 +22,7 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
 	"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
 	"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
+	"github.com/cockroachdb/cockroach/pkg/storage/closedts"
 	"github.com/cockroachdb/cockroach/pkg/util/hlc"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -545,29 +547,31 @@ func (cf *changeFrontier) noteResolvedSpan(d sqlbase.EncDatum) error {
 		}
 	}
 
-	// Potentially log the most behind span in the frontier for debugging.
-	slownessThreshold := 10 * changefeedPollInterval.Get(&cf.flowCtx.Settings.SV)
+	// Potentially log the most behind span in the frontier for debugging. These
+	// two cluster setting values represent the target responsiveness of poller
+	// and range feed. The cluster setting for switching between poller and
+	// rangefeed is only checked at changefeed start/resume, so instead of
+	// switching on it here, just add them. Also add 1 second in case both these
+	// settings are set really low (as they are in unit tests).
+	pollInterval := changefeedPollInterval.Get(&cf.flowCtx.Settings.SV)
+	closedtsInterval := closedts.TargetDuration.Get(&cf.flowCtx.Settings.SV)
+	slownessThreshold := time.Second + 10*(pollInterval+closedtsInterval)
 	frontier := cf.sf.Frontier()
 	now := timeutil.Now()
 	if resolvedBehind := now.Sub(frontier.GoTime()); resolvedBehind > slownessThreshold {
+		description := `sinkless feed`
+		if cf.spec.JobID != 0 {
+			description = fmt.Sprintf("job %d", cf.spec.JobID)
+		}
 		if frontierChanged {
-			if cf.spec.JobID != 0 {
-				log.Infof(cf.Ctx, "job %d new resolved timestamp %s is behind by %s",
-					cf.spec.JobID, frontier, resolvedBehind)
-			} else {
-				log.Infof(cf.Ctx, "sinkless feed new resolved timestamp %s is behind by %s",
-					frontier, resolvedBehind)
-			}
+			log.Infof(cf.Ctx, "%s new resolved timestamp %s is behind by %s",
+				description, frontier, resolvedBehind)
 		}
 		const slowSpanMaxFrequency = 10 * time.Second
 		if now.Sub(cf.lastSlowSpanLog) > slowSpanMaxFrequency {
			cf.lastSlowSpanLog = now
 			s := cf.sf.peekFrontierSpan()
-			if cf.spec.JobID != 0 {
-				log.Infof(cf.Ctx, "job %d span %s is behind by %s", cf.spec.JobID, s, resolvedBehind)
-			} else {
-				log.Infof(cf.Ctx, "sinkless feed span %s is behind by %s", s, resolvedBehind)
-			}
+			log.Infof(cf.Ctx, "%s span %s is behind by %s", description, s, resolvedBehind)
 		}
 	}

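The new threshold check is simple arithmetic plus a rate limit on the per-span message. Below is a minimal standalone Go sketch of that logic, assuming plain time.Duration values in place of the changefeedPollInterval and closedts.TargetDuration cluster settings; the free function slownessThreshold and the slowSpanLogger type are illustrative, not part of the package.

// A minimal sketch, not the changefeedccl implementation.
package main

import (
	"fmt"
	"time"
)

// slownessThreshold mirrors the formula in the diff: one second of slack plus
// ten times the combined poller / closed-timestamp target intervals.
func slownessThreshold(pollInterval, closedtsInterval time.Duration) time.Duration {
	return time.Second + 10*(pollInterval+closedtsInterval)
}

// slowSpanLogger rate-limits the "span is behind" message, analogous to the
// slowSpanMaxFrequency check above. (Illustrative type.)
type slowSpanLogger struct {
	lastLog time.Time
	minGap  time.Duration
}

func (l *slowSpanLogger) maybeLog(now time.Time, msg string) {
	if now.Sub(l.lastLog) < l.minGap {
		return
	}
	l.lastLog = now
	fmt.Println(msg)
}

func main() {
	threshold := slownessThreshold(time.Second, 30*time.Second) // hypothetical setting values
	resolvedBehind := 6 * time.Minute                           // hypothetical resolved-timestamp lag
	logger := slowSpanLogger{minGap: 10 * time.Second}
	if resolvedBehind > threshold {
		logger.maybeLog(time.Now(), fmt.Sprintf("span is behind by %s", resolvedBehind))
	}
}

With the sample values in main, the threshold works out to 1s + 10*(1s+30s) = 311s, so the hypothetical 6-minute lag triggers the message.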
14 changes: 14 additions & 0 deletions pkg/storage/metrics.go
@@ -946,6 +946,14 @@ var (
 		Measurement: "Encryption At Rest",
 		Unit:        metric.Unit_CONST,
 	}
+
+	// Closed timestamp metrics.
+	metaClosedTimestampMaxBehindNanos = metric.Metadata{
+		Name:        "kv.closed_timestamp.max_behind_nanos",
+		Help:        "Largest latency between realtime and replica max closed timestamp",
+		Measurement: "Nanoseconds",
+		Unit:        metric.Unit_NANOSECONDS,
+	}
 )
 
 // StoreMetrics is the set of metrics for a given store.
@@ -1150,6 +1158,9 @@ type StoreMetrics struct {
 	// RangeFeed counts.
 	RangeFeedMetrics *rangefeed.Metrics
+
+	// Closed timestamp metrics.
+	ClosedTimestampMaxBehindNanos *metric.Gauge
 
 	// Stats for efficient merges.
 	mu struct {
 		syncutil.Mutex
@@ -1350,6 +1361,9 @@ func newStoreMetrics(histogramWindow time.Duration) *StoreMetrics {
 
 		// RangeFeed counters.
 		RangeFeedMetrics: rangefeed.NewMetrics(),
+
+		// Closed timestamp metrics.
+		ClosedTimestampMaxBehindNanos: metric.NewGauge(metaClosedTimestampMaxBehindNanos),
 	}
 
 	sm.raftRcvdMessages[raftpb.MsgProp] = sm.RaftRcvdMsgProp
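The gauge added here is a point-in-time value: each metrics pass overwrites it rather than incrementing a counter. A rough sketch of that semantic follows, with a hand-rolled atomic value standing in for *metric.Gauge; the gauge type and the 45-second sample lag are illustrative assumptions.

// A rough sketch of the gauge semantics, not the CockroachDB metric package.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// gauge records a single absolute value; Update overwrites it, which is how
// ClosedTimestampMaxBehindNanos is used in store.go below.
type gauge struct{ v int64 }

func (g *gauge) Update(v int64) { atomic.StoreInt64(&g.v, v) }
func (g *gauge) Value() int64   { return atomic.LoadInt64(&g.v) }

func main() {
	closedTimestampMaxBehindNanos := &gauge{}
	// Each pass over the store's replicas overwrites the previous reading.
	closedTimestampMaxBehindNanos.Update((45 * time.Second).Nanoseconds()) // hypothetical 45s lag
	fmt.Printf("kv.closed_timestamp.max_behind_nanos = %d\n", closedTimestampMaxBehindNanos.Value())
}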
9 changes: 9 additions & 0 deletions pkg/storage/store.go
@@ -4088,6 +4088,7 @@ func (s *Store) updateReplicationGauges(ctx context.Context) error {
 	}
 	clusterNodes := s.ClusterNodeCount()
 
+	var minMaxClosedTS hlc.Timestamp
 	newStoreReplicaVisitor(s).Visit(func(rep *Replica) bool {
 		metrics := rep.Metrics(ctx, timestamp, livenessMap, clusterNodes)
 		if metrics.Leader {
@@ -4128,6 +4129,9 @@
 		if wps, dur := rep.writeStats.avgQPS(); dur >= MinStatsDuration {
 			averageWritesPerSecond += wps
 		}
+		if mc := rep.maxClosed(ctx); minMaxClosedTS.IsEmpty() || mc.Less(minMaxClosedTS) {
+			minMaxClosedTS = mc
+		}
 		return true // more
 	})
 
@@ -4147,6 +4151,11 @@
 	s.metrics.OverReplicatedRangeCount.Update(overreplicatedRangeCount)
 	s.metrics.RaftLogFollowerBehindCount.Update(behindCount)
 
+	if !minMaxClosedTS.IsEmpty() {
+		nanos := timeutil.Since(minMaxClosedTS.GoTime()).Nanoseconds()
+		s.metrics.ClosedTimestampMaxBehindNanos.Update(nanos)
+	}
+
 	return nil
 }
 
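The value fed to the gauge is the wall-clock lag of the oldest per-replica max closed timestamp, i.e. the minimum of rep.maxClosed(ctx) over all replicas visited. A simplified sketch of that aggregation follows, assuming plain time.Time values in place of hlc.Timestamp; minBehindNanos is an illustrative helper, not a function in the package.

// A simplified sketch of the aggregation in updateReplicationGauges.
package main

import (
	"fmt"
	"time"
)

// minBehindNanos finds the oldest (minimum) per-replica max closed timestamp
// and returns how far behind wall-clock time it is, in nanoseconds.
func minBehindNanos(now time.Time, replicaMaxClosed []time.Time) int64 {
	var oldest time.Time
	for _, ts := range replicaMaxClosed {
		if oldest.IsZero() || ts.Before(oldest) {
			oldest = ts
		}
	}
	if oldest.IsZero() {
		return 0 // no replicas: the real code leaves the gauge untouched
	}
	return now.Sub(oldest).Nanoseconds()
}

func main() {
	now := time.Now()
	// Hypothetical per-replica closed timestamps.
	closed := []time.Time{now.Add(-3 * time.Second), now.Add(-45 * time.Second)}
	fmt.Printf("max behind: %dns\n", minBehindNanos(now, closed))
}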
