Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl, kv: Add few metrics #77711

Merged
merged 2 commits into from
Mar 14, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 26 additions & 9 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@ const defaultSLIScope = "default"
// indicators, combined with a limited number of per-changefeed indicators.
type AggMetrics struct {
EmittedMessages *aggmetric.AggCounter
MessageSize *aggmetric.AggHistogram
EmittedBytes *aggmetric.AggCounter
FlushedBytes *aggmetric.AggCounter
BatchHistNanos *aggmetric.AggHistogram
@@ -68,6 +69,7 @@ func (a *AggMetrics) MetricStruct() {}
// sliMetrics holds all SLI related metrics aggregated into AggMetrics.
type sliMetrics struct {
EmittedMessages *aggmetric.Counter
MessageSize *aggmetric.Histogram
EmittedBytes *aggmetric.Counter
FlushedBytes *aggmetric.Counter
BatchHistNanos *aggmetric.Histogram
@@ -85,16 +87,23 @@ type sliMetrics struct {
// does not compress the data it emits.
const sinkDoesNotCompress = -1

type recordEmittedMessagesCallback func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int)
type recordOneMessageCallback func(mvcc hlc.Timestamp, bytes int, compressedBytes int)

func (m *sliMetrics) recordEmittedMessages() recordEmittedMessagesCallback {
func (m *sliMetrics) recordOneMessage() recordOneMessageCallback {
if m == nil {
return func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int) {}
return func(mvcc hlc.Timestamp, bytes int, compressedBytes int) {}
}

start := timeutil.Now()
return func(numMessages int, mvcc hlc.Timestamp, bytes int, compressedBytes int) {
m.recordEmittedBatch(start, numMessages, mvcc, bytes, compressedBytes)
return func(mvcc hlc.Timestamp, bytes int, compressedBytes int) {
m.MessageSize.RecordValue(int64(bytes))
m.recordEmittedBatch(start, 1, mvcc, bytes, compressedBytes)
}
}

func (m *sliMetrics) recordMessageSize(sz int64) {
if m != nil {
m.MessageSize.RecordValue(sz)
}
}

@@ -309,16 +318,23 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Changefeeds",
Unit: metric.Unit_COUNT,
}

metaMessageSize := metric.Metadata{
Name: "changefeed.message_size_hist",
Help: "Message size histogram",
Measurement: "Bytes",
Unit: metric.Unit_BYTES,
}
// NB: When adding new histograms, use sigFigs = 1. Older histograms
// retain 2 significant figures.
b := aggmetric.MakeBuilder("scope")
a := &AggMetrics{
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),
MessageSize: b.Histogram(metaMessageSize,
histogramWindow, 10<<20 /* 10MB max message size */, 1),
EmittedBytes: b.Counter(metaChangefeedEmittedBytes),
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),

BatchHistNanos: b.Histogram(metaChangefeedBatchHistNanos,
histogramWindow, changefeedBatchHistMaxLatency.Nanoseconds(), 1),
@@ -376,6 +392,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {

sm := &sliMetrics{
EmittedMessages: a.EmittedMessages.AddChild(scope),
MessageSize: a.MessageSize.AddChild(scope),
EmittedBytes: a.EmittedBytes.AddChild(scope),
FlushedBytes: a.FlushedBytes.AddChild(scope),
BatchHistNanos: a.BatchHistNanos.AddChild(scope),
4 changes: 2 additions & 2 deletions pkg/ccl/changefeedccl/sink.go
Original file line number Diff line number Diff line change
@@ -322,7 +322,7 @@ func (s *bufferSink) EmitRow(
r kvevent.Alloc,
) error {
defer r.Release(ctx)
defer s.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress)
defer s.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress)

if s.closed {
return errors.New(`cannot EmitRow on a closed sink`)
@@ -417,7 +417,7 @@ func (n *nullSink) EmitRow(
r kvevent.Alloc,
) error {
defer r.Release(ctx)
defer n.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress)
defer n.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress)
if err := n.pace(ctx); err != nil {
return err
}
21 changes: 12 additions & 9 deletions pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
"path/filepath"
"strings"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
@@ -32,6 +33,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/google/btree"
)
@@ -59,13 +61,13 @@ func cloudStorageFormatTime(ts hlc.Timestamp) string {

type cloudStorageSinkFile struct {
cloudStorageSinkKey
codec io.WriteCloser
rawSize int
numMessages int
buf bytes.Buffer
alloc kvevent.Alloc
oldestMVCC hlc.Timestamp
recordMetrics recordEmittedMessagesCallback
created time.Time
codec io.WriteCloser
rawSize int
numMessages int
buf bytes.Buffer
alloc kvevent.Alloc
oldestMVCC hlc.Timestamp
}

var _ io.Writer = &cloudStorageSinkFile{}
@@ -422,8 +424,8 @@ func (s *cloudStorageSink) getOrCreateFile(
return f
}
f := &cloudStorageSinkFile{
created: timeutil.Now(),
cloudStorageSinkKey: key,
recordMetrics: s.metrics.recordEmittedMessages(),
oldestMVCC: eventMVCC,
}
switch s.compression {
@@ -446,6 +448,7 @@ func (s *cloudStorageSink) EmitRow(
return errors.New(`cannot EmitRow on a closed sink`)
}

s.metrics.recordMessageSize(int64(len(key) + len(value)))
file := s.getOrCreateFile(topic, mvcc)
file.alloc.Merge(&alloc)

@@ -586,7 +589,7 @@ func (s *cloudStorageSink) flushFile(ctx context.Context, file *cloudStorageSink
if err := cloud.WriteFile(ctx, s.es, filepath.Join(s.dataFilePartition, filename), bytes.NewReader(file.buf.Bytes())); err != nil {
return err
}
file.recordMetrics(file.numMessages, file.oldestMVCC, file.rawSize, compressedBytes)
s.metrics.recordEmittedBatch(file.created, file.numMessages, file.oldestMVCC, file.rawSize, compressedBytes)

return nil
}
6 changes: 3 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
@@ -205,7 +205,7 @@ func (s *kafkaSink) Close() error {

type messageMetadata struct {
alloc kvevent.Alloc
updateMetrics recordEmittedMessagesCallback
updateMetrics recordOneMessageCallback
mvcc hlc.Timestamp
}

@@ -226,7 +226,7 @@ func (s *kafkaSink) EmitRow(
Topic: topic,
Key: sarama.ByteEncoder(key),
Value: sarama.ByteEncoder(value),
Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: s.metrics.recordEmittedMessages()},
Metadata: messageMetadata{alloc: alloc, mvcc: mvcc, updateMetrics: s.metrics.recordOneMessage()},
}
return s.emitMessage(ctx, msg)
}
@@ -369,7 +369,7 @@ func (s *kafkaSink) workerLoop() {

if m, ok := ackMsg.Metadata.(messageMetadata); ok {
if ackError == nil {
m.updateMetrics(1, m.mvcc, ackMsg.Key.Length()+ackMsg.Value.Length(), sinkDoesNotCompress)
m.updateMetrics(m.mvcc, ackMsg.Key.Length()+ackMsg.Value.Length(), sinkDoesNotCompress)
}
m.alloc.Release(s.ctx)
}
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/sink_sql.go
Original file line number Diff line number Diff line change
@@ -132,7 +132,7 @@ func (s *sqlSink) EmitRow(
alloc kvevent.Alloc,
) error {
defer alloc.Release(ctx)
defer s.metrics.recordEmittedMessages()(1, mvcc, len(key)+len(value), sinkDoesNotCompress)
defer s.metrics.recordOneMessage()(mvcc, len(key)+len(value), sinkDoesNotCompress)

topic := s.targetNames[topicDescr.GetID()]
if _, ok := s.topics[topic]; !ok {
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/sink_webhook.go
Original file line number Diff line number Diff line change
@@ -677,6 +677,7 @@ func (s *webhookSink) EmitRow(
emitTime: timeutil.Now(),
mvcc: mvcc,
}}:
s.metrics.recordMessageSize(int64(len(key) + len(value)))
}
return nil
}
30 changes: 30 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
@@ -139,6 +139,30 @@ errors as 'roachpb.InternalErrType'.
Measurement: "Errors",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedTotalRanges = metric.Metadata{
Name: "distsender.rangefeed.total_ranges",
Help: `Number of ranges executing rangefeed

This counts the number of ranges with an active rangefeed.
`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedCatchupRanges = metric.Metadata{
Name: "distsender.rangefeed.catchup_ranges",
Help: `Number of ranges in catchup mode

This counts the number of ranges with an active rangefeed that are performing catchup scan.
`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
metaDistSenderRangefeedErrorCatchupRanges = metric.Metadata{
Name: "distsender.rangefeed.error_catchup_ranges",
Help: `Number of ranges in catchup mode which experienced an error`,
Measurement: "Ranges",
Unit: metric.Unit_COUNT,
}
)

// CanSendToFollower is used by the DistSender to determine if it needs to look
@@ -202,6 +226,9 @@ type DistSenderMetrics struct {
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
RangefeedRanges *metric.Gauge
RangefeedCatchupRanges *metric.Gauge
RangefeedErrorCatchup *metric.Counter
MethodCounts [roachpb.NumMethods]*metric.Counter
ErrCounts [roachpb.NumErrors]*metric.Counter
}
@@ -219,6 +246,9 @@ func makeDistSenderMetrics() DistSenderMetrics {
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges),
RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges),
RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges),
}
for i := range m.MethodCounts {
method := roachpb.Method(i).String()
26 changes: 26 additions & 0 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
@@ -269,7 +269,9 @@ func (ds *DistSender) partialRangeFeed(
},
}
rr.ranges.Store(active, nil)
ds.metrics.RangefeedRanges.Inc(1)
defer rr.ranges.Delete(active)
defer ds.metrics.RangefeedRanges.Dec(1)

// Start a retry loop for sending the batch to the range.
for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); {
@@ -389,6 +391,15 @@ func (ds *DistSender) singleRangeFeed(
}
defer transport.Release()

// checkpointSeen keeps track of whether we've seen a range checkpoint.
// Initially set to true to avoid erroneously decrementing the counter.
checkpointSeen := true
defer func() {
if !checkpointSeen {
ds.metrics.RangefeedCatchupRanges.Dec(1)
}
}()

for {
if transport.IsExhausted() {
return args.Timestamp, newSendError(
@@ -411,6 +422,13 @@ func (ds *DistSender) singleRangeFeed(
}
continue
}

// Indicate this range is going to perform a catchup scan.
// The counter is decremented when the range receives a checkpoint event,
// or when this function terminates.
ds.metrics.RangefeedCatchupRanges.Inc(1)
checkpointSeen = false

for {
event, err := stream.Recv()
if err == io.EOF {
@@ -422,10 +440,18 @@ func (ds *DistSender) singleRangeFeed(
switch t := event.GetValue().(type) {
case *roachpb.RangeFeedCheckpoint:
if t.Span.Contains(args.Span) {
// If we see the first non-empty checkpoint, we know we're done with the catchup scan.
if !t.ResolvedTS.IsEmpty() && !checkpointSeen {
checkpointSeen = true
ds.metrics.RangefeedCatchupRanges.Dec(1)
}
args.Timestamp.Forward(t.ResolvedTS)
}
case *roachpb.RangeFeedError:
log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError())
if !checkpointSeen {
ds.metrics.RangefeedErrorCatchup.Inc(1)
}
return args.Timestamp, t.Error.GoError()
}
onRangeEvent(args.Replica.NodeID, desc.RangeID, event)
13 changes: 13 additions & 0 deletions pkg/ts/catalog/chart_catalog.go
Original file line number Diff line number Diff line change
@@ -277,6 +277,19 @@ var charts = []sectionDescription{
"distsender.rangelookups",
},
},
{
Title: "Rangefeed",
Metrics: []string{
"distsender.rangefeed.total_ranges",
"distsender.rangefeed.catchup_ranges",
},
},
{
Title: "Rangefeed Errors",
Metrics: []string{
"distsender.rangefeed.error_catchup_ranges",
},
},
{
Title: "RPCs",
Metrics: []string{