Skip to content

Commit

Permalink
changefeedccl: update parallel consumer metrics
Browse files Browse the repository at this point in the history
Previously, both changefeed.nprocs_flush_nanos and
changefeed.nprocs_consume_event_nanos were counters that monotonically
increased. This was not that useful when determining the average time it
takes to consume or flush an event. Changing them to a histogram fixes
this issue and allows for percentile values like p90, p99.

This change also updates changefeed.nprocs_in_flight_count to sample
values when incrementing inFlight variable. Previously, it was
showing up at 0 in the UI. This change makes it show the actual value.

Fixes: #89654

Release note: None
  • Loading branch information
jayshrivastava committed Oct 10, 2022
1 parent 9e7e704 commit a8319ee
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 8 deletions.
7 changes: 3 additions & 4 deletions pkg/ccl/changefeedccl/event_processing.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ func (c *parallelEventConsumer) ConsumeEvent(ctx context.Context, ev kvevent.Eve
startTime := timeutil.Now().UnixNano()
defer func() {
time := timeutil.Now().UnixNano()
c.metrics.ParallelConsumerConsumeNanos.Inc(time - startTime)
c.metrics.ParallelConsumerConsumeNanos.RecordValue(time - startTime)
}()

bucket := c.getBucketForEvent(ev)
Expand Down Expand Up @@ -486,14 +486,13 @@ func (c *parallelEventConsumer) workerLoop(
func (c *parallelEventConsumer) incInFlight() {
c.mu.Lock()
c.mu.inFlight++
c.metrics.ParallelConsumerInFlightEvents.Update(int64(c.mu.inFlight))
c.mu.Unlock()
c.metrics.ParallelConsumerInFlightEvents.Inc(1)
}

func (c *parallelEventConsumer) decInFlight() {
c.mu.Lock()
c.mu.inFlight--
c.metrics.ParallelConsumerInFlightEvents.Dec(1)
notifyFlush := c.mu.waiting && c.mu.inFlight == 0
c.mu.Unlock()

Expand Down Expand Up @@ -521,7 +520,7 @@ func (c *parallelEventConsumer) Flush(ctx context.Context) error {
startTime := timeutil.Now().UnixNano()
defer func() {
time := timeutil.Now().UnixNano()
c.metrics.ParallelConsumerFlushNanos.Inc(time - startTime)
c.metrics.ParallelConsumerFlushNanos.RecordValue(time - startTime)
}()

needFlush := func() bool {
Expand Down
8 changes: 4 additions & 4 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,8 +576,8 @@ type Metrics struct {
FrontierUpdates *metric.Counter
ThrottleMetrics cdcutils.Metrics
ReplanCount *metric.Counter
ParallelConsumerFlushNanos *metric.Counter
ParallelConsumerConsumeNanos *metric.Counter
ParallelConsumerFlushNanos *metric.Histogram
ParallelConsumerConsumeNanos *metric.Histogram
ParallelConsumerInFlightEvents *metric.Gauge

mu struct {
Expand Down Expand Up @@ -609,8 +609,8 @@ func MakeMetrics(histogramWindow time.Duration) metric.Struct {
FrontierUpdates: metric.NewCounter(metaChangefeedFrontierUpdates),
ThrottleMetrics: cdcutils.MakeMetrics(histogramWindow),
ReplanCount: metric.NewCounter(metaChangefeedReplanCount),
ParallelConsumerFlushNanos: metric.NewCounter(metaChangefeedEventConsumerFlushNanos),
ParallelConsumerConsumeNanos: metric.NewCounter(metaChangefeedEventConsumerConsumeNanos),
ParallelConsumerFlushNanos: metric.NewHistogram(metaChangefeedEventConsumerFlushNanos, histogramWindow, metric.IOLatencyBuckets),
ParallelConsumerConsumeNanos: metric.NewHistogram(metaChangefeedEventConsumerConsumeNanos, histogramWindow, metric.IOLatencyBuckets),
ParallelConsumerInFlightEvents: metric.NewGauge(metaChangefeedEventConsumerInFlightEvents),
}

Expand Down

0 comments on commit a8319ee

Please sign in to comment.