Skip to content

Commit

Permalink
Merge pull request #109744 from jayshrivastava/checkpoint-progress-ba…
Browse files Browse the repository at this point in the history
…ckport-release-23.1

release-23.1: changefeedccl: add changefeed_progress and aggregator_progress sli metrics
  • Loading branch information
jayshrivastava authored Aug 31, 2023
2 parents cca0ceb + cf4cbfd commit 1d9ecd7
Show file tree
Hide file tree
Showing 6 changed files with 349 additions and 3 deletions.
37 changes: 34 additions & 3 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type changeAggregator struct {

metrics *Metrics
sliMetrics *sliMetrics
sliMetricsID int64
closeTelemetryRecorder func()
knobs TestingKnobs
}
Expand Down Expand Up @@ -262,6 +263,7 @@ func (ca *changeAggregator) Start(ctx context.Context) {
ca.cancel()
return
}
ca.sliMetricsID = ca.sliMetrics.claimId()

// TODO(jayant): add support for sinkless changefeeds using UUID
recorder := metricsRecorder(ca.sliMetrics)
Expand Down Expand Up @@ -493,6 +495,9 @@ func (ca *changeAggregator) close() {
// Best effort: context is often cancel by now, so we expect to see an error
_ = ca.sink.Close()
}

ca.closeMetrics()

ca.memAcc.Close(ca.Ctx())
if ca.kvFeedMemMon != nil {
ca.kvFeedMemMon.Stop(ca.Ctx())
Expand Down Expand Up @@ -593,6 +598,11 @@ func (ca *changeAggregator) noteResolvedSpan(resolved jobspb.ResolvedSpan) error
return err
}

// The resolved sliMetric data backs the aggregator_progress metric
if advanced {
ca.sliMetrics.setResolved(ca.sliMetricsID, ca.frontier.Frontier())
}

forceFlush := resolved.BoundaryType != jobspb.ResolvedSpan_NONE

checkpointFrontier := advanced &&
Expand Down Expand Up @@ -674,6 +684,12 @@ func (ca *changeAggregator) emitResolved(batch jobspb.ResolvedSpans) error {
return nil
}

// closeMetrics de-registers the aggregator from the sliMetrics registry so that
// it's no longer considered by the aggregator_progress gauge
func (ca *changeAggregator) closeMetrics() {
ca.sliMetrics.closeId(ca.sliMetricsID)
}

// ConsumerClosed is part of the RowSource interface.
func (ca *changeAggregator) ConsumerClosed() {
// The consumer is done, Next() will not be called again.
Expand Down Expand Up @@ -738,9 +754,12 @@ type changeFrontier struct {
// metrics are monitoring counters shared between all changefeeds.
metrics *Metrics
sliMetrics *sliMetrics
// metricsID is used as the unique id of this changefeed in the
// metrics.MaxBehindNanos map.
metricsID int

// sliMetricsID and metricsID uniquely identify the changefeed in the metrics's
// map (a shared struct across all changefeeds on the node) and the sliMetrics's
// map (shared structure between all feeds within the same scope on the node).
metricsID int
sliMetricsID int64

knobs TestingKnobs
}
Expand Down Expand Up @@ -1045,6 +1064,9 @@ func (cf *changeFrontier) Start(ctx context.Context) {
cf.metrics.mu.id++
sli.RunningCount.Inc(1)
cf.metrics.mu.Unlock()

cf.sliMetricsID = cf.sliMetrics.claimId()

// TODO(dan): It's very important that we de-register from the metric because
// if we orphan an entry in there, our monitoring will lie (say the changefeed
// is behind when it may not be). We call this in `close` but that doesn't
Expand Down Expand Up @@ -1084,6 +1106,8 @@ func (cf *changeFrontier) closeMetrics() {
delete(cf.metrics.mu.resolved, cf.metricsID)
cf.metricsID = -1
cf.metrics.mu.Unlock()

cf.sliMetrics.closeId(cf.sliMetricsID)
}

// Next is part of the RowSource interface.
Expand Down Expand Up @@ -1215,6 +1239,13 @@ func (cf *changeFrontier) forwardFrontier(resolved jobspb.ResolvedSpan) error {
// Keeping this after the checkpointJobProgress call will avoid
// some duplicates if a restart happens.
newResolved := cf.frontier.Frontier()

// The feed's checkpoint is tracked in a map which is used to inform the
// checkpoint_progress metric which will return the lowest timestamp across
// all feeds in the scope.
cf.sliMetrics.setCheckpoint(cf.sliMetricsID, newResolved)

// This backs max_behind_nanos which is deprecated in favor of checkpoint_progress
cf.metrics.mu.Lock()
if cf.metricsID != -1 {
cf.metrics.mu.resolved[cf.metricsID] = newResolved
Expand Down
156 changes: 156 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,162 @@ func TestToJSONAsChangefeed(t *testing.T) {
cdcTest(t, testFn)
}

// TestChangefeedProgressMetrics tests the changefeed.aggregator_progress and
// changefeed.checkpoint_progress metrics.
func TestChangefeedProgressMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Verify the aggmetric functional gauges work correctly
t.Run("aggregate functional gauge", func(t *testing.T) {
cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
defaultSLI, err := metrics.getSLIMetrics(defaultSLIScope)
require.NoError(t, err)
sliA, err := metrics.getSLIMetrics("scope_a")
require.NoError(t, err)
sliB, err := metrics.getSLIMetrics("scope_b")
require.NoError(t, err)

defaultSLI.mu.checkpoint[5] = hlc.Timestamp{WallTime: 1}

sliA.mu.checkpoint[1] = hlc.Timestamp{WallTime: 2}
sliA.mu.checkpoint[2] = hlc.Timestamp{WallTime: 5}
sliA.mu.checkpoint[3] = hlc.Timestamp{WallTime: 0} // Zero timestamp should be ignored.

sliB.mu.checkpoint[1] = hlc.Timestamp{WallTime: 4}
sliB.mu.checkpoint[2] = hlc.Timestamp{WallTime: 9}

// Ensure each scope gets the correct value
require.Equal(t, int64(1), defaultSLI.CheckpointProgress.Value())
require.Equal(t, int64(2), sliA.CheckpointProgress.Value())
require.Equal(t, int64(4), sliB.CheckpointProgress.Value())

// Ensure the value progresses upon changefeed progress
defaultSLI.mu.checkpoint[5] = hlc.Timestamp{WallTime: 20}
require.Equal(t, int64(20), defaultSLI.CheckpointProgress.Value())

// Ensure the value updates correctly upon changefeeds completing
delete(sliB.mu.checkpoint, 1)
require.Equal(t, int64(9), sliB.CheckpointProgress.Value())
delete(sliB.mu.checkpoint, 2)
require.Equal(t, int64(0), sliB.CheckpointProgress.Value())

// Ensure the aggregate value is correct after progress / completion
require.Equal(t, int64(2), metrics.AggMetrics.CheckpointProgress.Value())
sliA.mu.checkpoint[1] = hlc.Timestamp{WallTime: 30}
require.Equal(t, int64(5), metrics.AggMetrics.CheckpointProgress.Value())
delete(sliA.mu.checkpoint, 2)
require.Equal(t, int64(20), metrics.AggMetrics.CheckpointProgress.Value())
delete(defaultSLI.mu.checkpoint, 5)
require.Equal(t, int64(30), metrics.AggMetrics.CheckpointProgress.Value())
delete(sliA.mu.checkpoint, 1)
require.Equal(t, int64(0), metrics.AggMetrics.CheckpointProgress.Value())
})
})

// Verify that ids must be registered to have an effect.
t.Run("id registration", func(t *testing.T) {
cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
sliA, err := metrics.getSLIMetrics("scope_a")
require.NoError(t, err)

unregisteredID := int64(999)
id1 := sliA.claimId()
id2 := sliA.claimId()
id3 := sliA.claimId()
sliA.setResolved(unregisteredID, hlc.Timestamp{WallTime: 0})
sliA.setResolved(id1, hlc.Timestamp{WallTime: 1})
sliA.setResolved(id2, hlc.Timestamp{WallTime: 2})
sliA.setResolved(id3, hlc.Timestamp{WallTime: 3})

sliA.setCheckpoint(unregisteredID, hlc.Timestamp{WallTime: 0})
sliA.setCheckpoint(id1, hlc.Timestamp{WallTime: 1})
sliA.setCheckpoint(id2, hlc.Timestamp{WallTime: 2})
sliA.setCheckpoint(id3, hlc.Timestamp{WallTime: 3})

require.Equal(t, int64(1), metrics.AggMetrics.CheckpointProgress.Value())
require.Equal(t, int64(1), metrics.AggMetrics.AggregatorProgress.Value())

sliA.closeId(id1)

require.Equal(t, int64(2), metrics.AggMetrics.CheckpointProgress.Value())
require.Equal(t, int64(2), metrics.AggMetrics.AggregatorProgress.Value())

})
})

// Verify that a changefeed updates the timestamps as it progresses
t.Run("running changefeed", func(t *testing.T) {
cdcTest(t, func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
sqlDB := sqlutils.MakeSQLRunner(s.DB)

sqlDB.Exec(t, `CREATE TABLE foo (a INT PRIMARY KEY)`)
fooA := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='label_a', resolved='100ms'`)

registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics)
sliA, err := metrics.getSLIMetrics("label_a")
require.NoError(t, err)

// Verify that aggregator_progress has recurring updates
var lastTimestamp int64 = 0
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
progress := sliA.AggregatorProgress.Value()
if progress > lastTimestamp {
lastTimestamp = progress
return nil
}
return errors.Newf("waiting for aggregator_progress to advance from %d (value=%d)",
lastTimestamp, progress)
})
}

// Verify that checkpoint_progress has recurring updates
for i := 0; i < 3; i++ {
testutils.SucceedsSoon(t, func() error {
progress := sliA.CheckpointProgress.Value()
if progress > lastTimestamp {
lastTimestamp = progress
return nil
}
return errors.Newf("waiting for checkpoint_progress to advance from %d (value=%d)",
lastTimestamp, progress)
})
}

sliB, err := registry.MetricsStruct().Changefeed.(*Metrics).getSLIMetrics("label_b")
require.Equal(t, int64(0), sliB.AggregatorProgress.Value())
fooB := feed(t, f, `CREATE CHANGEFEED FOR foo WITH metrics_label='label_b', resolved='100ms'`)
defer closeFeed(t, fooB)
require.NoError(t, err)
// Verify that aggregator_progress has recurring updates
testutils.SucceedsSoon(t, func() error {
progress := sliB.AggregatorProgress.Value()
if progress > 0 {
return nil
}
return errors.Newf("waiting for second aggregator_progress to advance (value=%d)", progress)
})

closeFeed(t, fooA)
testutils.SucceedsSoon(t, func() error {
aggregatorProgress := sliA.AggregatorProgress.Value()
checkpointProgress := sliA.CheckpointProgress.Value()
if aggregatorProgress == 0 && checkpointProgress == 0 {
return nil
}
return errors.Newf("waiting for progress metrics to be 0 (ap=%d, cp=%d)",
aggregatorProgress, checkpointProgress)
})
})
})
}

func TestChangefeedIdleness(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down
Loading

0 comments on commit 1d9ecd7

Please sign in to comment.