Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: add parallel io metrics #115785

Merged
merged 1 commit into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,10 @@
<tr><td>APPLICATION</td><td>changefeed.nprocs_consume_event_nanos</td><td>Total time spent waiting to add an event to the parallel consumer</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.nprocs_flush_nanos</td><td>Total time spent idle waiting for the parallel consumer to flush</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.nprocs_in_flight_count</td><td>Number of buffered events in the parallel consumer</td><td>Count of Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_queue_nanos</td><td>Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_in_flight_keys</td><td>The number of keys currently in-flight which may contend with batches pending to be emitted</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_pending_rows</td><td>Number of rows which are blocked from being sent due to conflicting in-flight keys</td><td>Keys</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_queue_nanos</td><td>Time that outgoing requests to the sink spend waiting in a queue due to in-flight requests with conflicting keys</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.parallel_io_result_queue_nanos</td><td>Time that incoming results from the sink spend waiting in parallel io emitter before they are acknowledged by the changefeed</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.queue_time_nanos</td><td>Time KV event spent waiting to be processed</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.running</td><td>Number of currently running changefeeds, including sinkless</td><td>Changefeeds</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.schema_registry.registrations</td><td>Number of registration attempts with the schema registry</td><td>Registrations</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
Expand Down
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/batching_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ func (sb *sinkBatch) Keys() intsets.Fast {
return sb.keys
}

// NumMessages implements the IORequest interface.
func (sb *sinkBatch) NumMessages() int {
return sb.numMessages
}

func (sb *sinkBatch) isEmpty() bool {
return sb.numMessages == 0
}
Expand Down
83 changes: 83 additions & 0 deletions pkg/ccl/changefeedccl/changefeed_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9123,3 +9123,86 @@ func TestBatchSizeMetric(t *testing.T) {
}
cdcTest(t, testFn)
}

// TestParallelIOMetrics tests parallel io metrics.
func TestParallelIOMetrics(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
registry := s.Server.JobRegistry().(*jobs.Registry)
metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics

// Add delay so queuing occurs, which results in the below metrics being
// nonzero.
defer testingEnableQueuingDelay()()

db := sqlutils.MakeSQLRunner(s.DB)
db.Exec(t, `SET CLUSTER SETTING changefeed.new_pubsub_sink_enabled = true`)
db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
db.Exec(t, `
CREATE TABLE foo (a INT PRIMARY KEY);
`)

// Keep writing data to the same key to ensure contention.
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

g := ctxgroup.WithContext(ctx)
done := make(chan struct{})
g.Go(func() error {
for {
select {
case <-ctx.Done():
return nil
case <-done:
return nil
default:
_, err := s.DB.Exec(`UPSERT INTO foo (a) SELECT * FROM generate_series(1, 10)`)
if err != nil {
return err
}
}
}
})
// Set the frequency to 1s. The default frequency at the time of writing is
foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config=" +
"'{\"Flush\": {\"Frequency\": \"100ms\"}}'")
require.NoError(t, err)

testutils.SucceedsSoon(t, func() error {
numSamples, sum := metrics.ParallelIOPendingQueueNanos.TotalWindowed()
if numSamples <= 0 && sum <= 0.0 {
return errors.Newf("waiting for queue nanos: %d %f", numSamples, sum)
}
return nil
})
testutils.SucceedsSoon(t, func() error {
pendingKeys := metrics.ParallelIOPendingRows.Value()
if pendingKeys <= 0 {
return errors.Newf("waiting for pending keys: %d", pendingKeys)
}
return nil
})
testutils.SucceedsSoon(t, func() error {
for i := 0; i < 50; i++ {
inFlightKeys := metrics.ParallelIOInFlightKeys.Value()
if inFlightKeys > 0 {
return nil
}
}
return errors.New("waiting for in-flight keys")
})
testutils.SucceedsSoon(t, func() error {
numSamples, sum := metrics.ParallelIOResultQueueNanos.TotalWindowed()
if numSamples <= 0 && sum <= 0.0 {
return errors.Newf("waiting for result queue nanos: %d %f", numSamples, sum)
}
return nil
})
close(done)
require.NoError(t, g.Wait())
require.NoError(t, foo.Close())
}
cdcTest(t, testFn, feedTestForceSink("pubsub"))
}
Loading