diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index dd5a5b3150ce..f7b384b610a7 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -767,7 +767,10 @@
APPLICATION | changefeed.nprocs_consume_event_nanos | Total time spent waiting to add an event to the parallel consumer | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.nprocs_flush_nanos | Total time spent idle waiting for the parallel consumer to flush | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.nprocs_in_flight_count | Number of buffered events in the parallel consumer | Count of Events | GAUGE | COUNT | AVG | NONE |
-APPLICATION | changefeed.parallel_io_queue_nanos | Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys | Changefeeds | HISTOGRAM | NANOSECONDS | AVG | NONE |
+APPLICATION | changefeed.parallel_io_in_flight_keys | The number of keys currently in-flight which may contend with batches pending to be emitted | Keys | GAUGE | COUNT | AVG | NONE |
+APPLICATION | changefeed.parallel_io_pending_rows | Number of rows which are blocked from being sent due to conflicting in-flight keys | Keys | GAUGE | COUNT | AVG | NONE |
+APPLICATION | changefeed.parallel_io_queue_nanos | Time that outgoing requests to the sink spend waiting in a queue due to in-flight requests with conflicting keys | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
+APPLICATION | changefeed.parallel_io_result_queue_nanos | Time that incoming results from the sink spend waiting in parallel io emitter before they are acknowledged by the changefeed | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.queue_time_nanos | Time KV event spent waiting to be processed | Nanoseconds | COUNTER | NANOSECONDS | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | changefeed.running | Number of currently running changefeeds, including sinkless | Changefeeds | GAUGE | COUNT | AVG | NONE |
APPLICATION | changefeed.schema_registry.registrations | Number of registration attempts with the schema registry | Registrations | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/ccl/changefeedccl/batching_sink.go b/pkg/ccl/changefeedccl/batching_sink.go
index e3bbcf30ab9d..c8537092201b 100644
--- a/pkg/ccl/changefeedccl/batching_sink.go
+++ b/pkg/ccl/changefeedccl/batching_sink.go
@@ -258,6 +258,11 @@ func (sb *sinkBatch) Keys() intsets.Fast {
return sb.keys
}
+// NumMessages implements the IORequest interface.
+func (sb *sinkBatch) NumMessages() int {
+ return sb.numMessages
+}
+
func (sb *sinkBatch) isEmpty() bool {
return sb.numMessages == 0
}
diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go
index a85a02e4329b..3c1b5027bfb3 100644
--- a/pkg/ccl/changefeedccl/changefeed_test.go
+++ b/pkg/ccl/changefeedccl/changefeed_test.go
@@ -9123,3 +9123,86 @@ func TestBatchSizeMetric(t *testing.T) {
}
cdcTest(t, testFn)
}
+
+// TestParallelIOMetrics tests parallel io metrics.
+func TestParallelIOMetrics(t *testing.T) {
+ defer leaktest.AfterTest(t)()
+ defer log.Scope(t).Close(t)
+
+ testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) {
+ registry := s.Server.JobRegistry().(*jobs.Registry)
+ metrics := registry.MetricsStruct().Changefeed.(*Metrics).AggMetrics
+
+ // Add delay so queuing occurs, which results in the below metrics being
+ // nonzero.
+ defer testingEnableQueuingDelay()()
+
+ db := sqlutils.MakeSQLRunner(s.DB)
+ db.Exec(t, `SET CLUSTER SETTING changefeed.new_pubsub_sink_enabled = true`)
+ db.Exec(t, `SET CLUSTER SETTING changefeed.sink_io_workers = 1`)
+ db.Exec(t, `
+ CREATE TABLE foo (a INT PRIMARY KEY);
+ `)
+
+ // Keep writing data to the same key to ensure contention.
+ ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
+ defer cancel()
+
+ g := ctxgroup.WithContext(ctx)
+ done := make(chan struct{})
+ g.Go(func() error {
+ for {
+ select {
+ case <-ctx.Done():
+ return nil
+ case <-done:
+ return nil
+ default:
+ _, err := s.DB.Exec(`UPSERT INTO foo (a) SELECT * FROM generate_series(1, 10)`)
+ if err != nil {
+ return err
+ }
+ }
+ }
+ })
+ // Set the frequency to 1s. The default frequency at the time of writing is
+ foo, err := f.Feed("CREATE CHANGEFEED FOR TABLE foo WITH pubsub_sink_config=" +
+ "'{\"Flush\": {\"Frequency\": \"100ms\"}}'")
+ require.NoError(t, err)
+
+ testutils.SucceedsSoon(t, func() error {
+ numSamples, sum := metrics.ParallelIOPendingQueueNanos.TotalWindowed()
+ if numSamples <= 0 && sum <= 0.0 {
+ return errors.Newf("waiting for queue nanos: %d %f", numSamples, sum)
+ }
+ return nil
+ })
+ testutils.SucceedsSoon(t, func() error {
+ pendingKeys := metrics.ParallelIOPendingRows.Value()
+ if pendingKeys <= 0 {
+ return errors.Newf("waiting for pending keys: %d", pendingKeys)
+ }
+ return nil
+ })
+ testutils.SucceedsSoon(t, func() error {
+ for i := 0; i < 50; i++ {
+ inFlightKeys := metrics.ParallelIOInFlightKeys.Value()
+ if inFlightKeys > 0 {
+ return nil
+ }
+ }
+ return errors.New("waiting for in-flight keys")
+ })
+ testutils.SucceedsSoon(t, func() error {
+ numSamples, sum := metrics.ParallelIOResultQueueNanos.TotalWindowed()
+ if numSamples <= 0 && sum <= 0.0 {
+ return errors.Newf("waiting for result queue nanos: %d %f", numSamples, sum)
+ }
+ return nil
+ })
+ close(done)
+ require.NoError(t, g.Wait())
+ require.NoError(t, foo.Close())
+ }
+ cdcTest(t, testFn, feedTestForceSink("pubsub"))
+}
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 179c0b21d4b1..74aa8eca110b 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -47,32 +47,35 @@ const defaultSLIScope = "default"
// AggMetrics are aggregated metrics keeping track of aggregated changefeed performance
// indicators, combined with a limited number of per-changefeed indicators.
type AggMetrics struct {
- EmittedMessages *aggmetric.AggCounter
- EmittedBatchSizes *aggmetric.AggHistogram
- FilteredMessages *aggmetric.AggCounter
- MessageSize *aggmetric.AggHistogram
- EmittedBytes *aggmetric.AggCounter
- FlushedBytes *aggmetric.AggCounter
- BatchHistNanos *aggmetric.AggHistogram
- Flushes *aggmetric.AggCounter
- FlushHistNanos *aggmetric.AggHistogram
- SizeBasedFlushes *aggmetric.AggCounter
- ParallelIOQueueNanos *aggmetric.AggHistogram
- SinkIOInflight *aggmetric.AggGauge
- CommitLatency *aggmetric.AggHistogram
- BackfillCount *aggmetric.AggGauge
- BackfillPendingRanges *aggmetric.AggGauge
- ErrorRetries *aggmetric.AggCounter
- AdmitLatency *aggmetric.AggHistogram
- RunningCount *aggmetric.AggGauge
- BatchReductionCount *aggmetric.AggGauge
- InternalRetryMessageCount *aggmetric.AggGauge
- SchemaRegistrations *aggmetric.AggCounter
- SchemaRegistryRetries *aggmetric.AggCounter
- AggregatorProgress *aggmetric.AggGauge
- CheckpointProgress *aggmetric.AggGauge
- LaggingRanges *aggmetric.AggGauge
- CloudstorageBufferedBytes *aggmetric.AggGauge
+ EmittedMessages *aggmetric.AggCounter
+ EmittedBatchSizes *aggmetric.AggHistogram
+ FilteredMessages *aggmetric.AggCounter
+ MessageSize *aggmetric.AggHistogram
+ EmittedBytes *aggmetric.AggCounter
+ FlushedBytes *aggmetric.AggCounter
+ BatchHistNanos *aggmetric.AggHistogram
+ Flushes *aggmetric.AggCounter
+ FlushHistNanos *aggmetric.AggHistogram
+ SizeBasedFlushes *aggmetric.AggCounter
+ ParallelIOPendingQueueNanos *aggmetric.AggHistogram
+ ParallelIOPendingRows *aggmetric.AggGauge
+ ParallelIOResultQueueNanos *aggmetric.AggHistogram
+ ParallelIOInFlightKeys *aggmetric.AggGauge
+ SinkIOInflight *aggmetric.AggGauge
+ CommitLatency *aggmetric.AggHistogram
+ BackfillCount *aggmetric.AggGauge
+ BackfillPendingRanges *aggmetric.AggGauge
+ ErrorRetries *aggmetric.AggCounter
+ AdmitLatency *aggmetric.AggHistogram
+ RunningCount *aggmetric.AggGauge
+ BatchReductionCount *aggmetric.AggGauge
+ InternalRetryMessageCount *aggmetric.AggGauge
+ SchemaRegistrations *aggmetric.AggCounter
+ SchemaRegistryRetries *aggmetric.AggCounter
+ AggregatorProgress *aggmetric.AggGauge
+ CheckpointProgress *aggmetric.AggGauge
+ LaggingRanges *aggmetric.AggGauge
+ CloudstorageBufferedBytes *aggmetric.AggGauge
// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
@@ -100,7 +103,7 @@ type metricsRecorder interface {
getBackfillCallback() func() func()
getBackfillRangeCallback() func(int64) (func(), func())
recordSizeBasedFlush()
- recordParallelIOQueueLatency(time.Duration)
+ newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
}
@@ -113,32 +116,35 @@ func (a *AggMetrics) MetricStruct() {}
// sliMetrics holds all SLI related metrics aggregated into AggMetrics.
type sliMetrics struct {
- EmittedMessages *aggmetric.Counter
- EmittedBatchSizes *aggmetric.Histogram
- FilteredMessages *aggmetric.Counter
- MessageSize *aggmetric.Histogram
- EmittedBytes *aggmetric.Counter
- FlushedBytes *aggmetric.Counter
- BatchHistNanos *aggmetric.Histogram
- Flushes *aggmetric.Counter
- FlushHistNanos *aggmetric.Histogram
- SizeBasedFlushes *aggmetric.Counter
- ParallelIOQueueNanos *aggmetric.Histogram
- SinkIOInflight *aggmetric.Gauge
- CommitLatency *aggmetric.Histogram
- ErrorRetries *aggmetric.Counter
- AdmitLatency *aggmetric.Histogram
- BackfillCount *aggmetric.Gauge
- BackfillPendingRanges *aggmetric.Gauge
- RunningCount *aggmetric.Gauge
- BatchReductionCount *aggmetric.Gauge
- InternalRetryMessageCount *aggmetric.Gauge
- SchemaRegistrations *aggmetric.Counter
- SchemaRegistryRetries *aggmetric.Counter
- AggregatorProgress *aggmetric.Gauge
- CheckpointProgress *aggmetric.Gauge
- LaggingRanges *aggmetric.Gauge
- CloudstorageBufferedBytes *aggmetric.Gauge
+ EmittedMessages *aggmetric.Counter
+ EmittedBatchSizes *aggmetric.Histogram
+ FilteredMessages *aggmetric.Counter
+ MessageSize *aggmetric.Histogram
+ EmittedBytes *aggmetric.Counter
+ FlushedBytes *aggmetric.Counter
+ BatchHistNanos *aggmetric.Histogram
+ Flushes *aggmetric.Counter
+ FlushHistNanos *aggmetric.Histogram
+ SizeBasedFlushes *aggmetric.Counter
+ ParallelIOPendingQueueNanos *aggmetric.Histogram
+ ParallelIOPendingRows *aggmetric.Gauge
+ ParallelIOResultQueueNanos *aggmetric.Histogram
+ ParallelIOInFlightKeys *aggmetric.Gauge
+ SinkIOInflight *aggmetric.Gauge
+ CommitLatency *aggmetric.Histogram
+ ErrorRetries *aggmetric.Counter
+ AdmitLatency *aggmetric.Histogram
+ BackfillCount *aggmetric.Gauge
+ BackfillPendingRanges *aggmetric.Gauge
+ RunningCount *aggmetric.Gauge
+ BatchReductionCount *aggmetric.Gauge
+ InternalRetryMessageCount *aggmetric.Gauge
+ SchemaRegistrations *aggmetric.Counter
+ SchemaRegistryRetries *aggmetric.Counter
+ AggregatorProgress *aggmetric.Gauge
+ CheckpointProgress *aggmetric.Gauge
+ LaggingRanges *aggmetric.Gauge
+ CloudstorageBufferedBytes *aggmetric.Gauge
mu struct {
syncutil.Mutex
@@ -319,12 +325,61 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}
-func (m *sliMetrics) recordParallelIOQueueLatency(latency time.Duration) {
- if m == nil {
+type parallelIOMetricsRecorder interface {
+ recordPendingQueuePush(numKeys int64)
+ recordPendingQueuePop(numKeys int64, latency time.Duration)
+ recordResultQueueLatency(latency time.Duration)
+ setInFlightKeys(n int64)
+}
+
+type parallelIOMetricsRecorderImpl struct {
+ pendingQueueNanos *aggmetric.Histogram
+ pendingRows *aggmetric.Gauge
+ resultQueueNanos *aggmetric.Histogram
+ inFlight *aggmetric.Gauge
+}
+
+func (p *parallelIOMetricsRecorderImpl) setInFlightKeys(n int64) {
+ if p == nil {
return
}
- m.ParallelIOQueueNanos.RecordValue(latency.Nanoseconds())
+ p.inFlight.Update(n)
+}
+
+func (p *parallelIOMetricsRecorderImpl) recordResultQueueLatency(latency time.Duration) {
+ if p == nil {
+ return
+ }
+ p.resultQueueNanos.RecordValue(latency.Nanoseconds())
+}
+
+func (p *parallelIOMetricsRecorderImpl) recordPendingQueuePush(n int64) {
+ if p == nil {
+ return
+ }
+ p.pendingRows.Inc(n)
+}
+
+func (p *parallelIOMetricsRecorderImpl) recordPendingQueuePop(n int64, latency time.Duration) {
+ if p == nil {
+ return
+ }
+ p.pendingRows.Dec(n)
+ p.pendingQueueNanos.RecordValue(latency.Nanoseconds())
+}
+
+func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
+ if m == nil {
+ return (*parallelIOMetricsRecorderImpl)(nil)
+ }
+ return ¶llelIOMetricsRecorderImpl{
+ pendingQueueNanos: m.ParallelIOPendingQueueNanos,
+ pendingRows: m.ParallelIOPendingRows,
+ resultQueueNanos: m.ParallelIOResultQueueNanos,
+ inFlight: m.ParallelIOInFlightKeys,
+ }
}
+
func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
@@ -407,14 +462,14 @@ func (w *wrappingCostController) recordSizeBasedFlush() {
w.inner.recordSizeBasedFlush()
}
-func (w *wrappingCostController) recordParallelIOQueueLatency(latency time.Duration) {
- w.inner.recordParallelIOQueueLatency(latency)
-}
-
func (w *wrappingCostController) recordSinkIOInflightChange(delta int64) {
w.inner.recordSinkIOInflightChange(delta)
}
+func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
+ return w.inner.newParallelIOMetricsRecorder()
+}
+
var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
@@ -611,11 +666,31 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Unit: metric.Unit_COUNT,
}
metaChangefeedParallelIOQueueNanos := metric.Metadata{
- Name: "changefeed.parallel_io_queue_nanos",
- Help: "Time spent with outgoing requests to the sink waiting in queue due to inflight requests with conflicting keys",
- Measurement: "Changefeeds",
+ Name: "changefeed.parallel_io_queue_nanos",
+ Help: "Time that outgoing requests to the sink spend waiting in a queue due to" +
+ " in-flight requests with conflicting keys",
+ Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}
+ metaChangefeedParallelIOPendingRows := metric.Metadata{
+ Name: "changefeed.parallel_io_pending_rows",
+ Help: "Number of rows which are blocked from being sent due to conflicting in-flight keys",
+ Measurement: "Keys",
+ Unit: metric.Unit_COUNT,
+ }
+ metaChangefeedParallelIOResultQueueNanos := metric.Metadata{
+ Name: "changefeed.parallel_io_result_queue_nanos",
+ Help: "Time that incoming results from the sink spend waiting in parallel io emitter" +
+ " before they are acknowledged by the changefeed",
+ Measurement: "Nanoseconds",
+ Unit: metric.Unit_NANOSECONDS,
+ }
+ metaChangefeedParallelIOInFlightKeys := metric.Metadata{
+ Name: "changefeed.parallel_io_in_flight_keys",
+ Help: "The number of keys currently in-flight which may contend with batches pending to be emitted",
+ Measurement: "Keys",
+ Unit: metric.Unit_COUNT,
+ }
metaChangefeedSinkIOInflight := metric.Metadata{
Name: "changefeed.sink_io_inflight",
Help: "The number of keys currently inflight as IO requests being sent to the sink",
@@ -682,15 +757,23 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
FlushedBytes: b.Counter(metaChangefeedFlushedBytes),
Flushes: b.Counter(metaChangefeedFlushes),
SizeBasedFlushes: b.Counter(metaSizeBasedFlushes),
- ParallelIOQueueNanos: b.Histogram(metric.HistogramOptions{
+ ParallelIOPendingQueueNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedParallelIOQueueNanos,
Duration: histogramWindow,
MaxVal: changefeedIOQueueMaxLatency.Nanoseconds(),
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
- SinkIOInflight: b.Gauge(metaChangefeedSinkIOInflight),
-
+ ParallelIOPendingRows: b.Gauge(metaChangefeedParallelIOPendingRows),
+ ParallelIOResultQueueNanos: b.Histogram(metric.HistogramOptions{
+ Metadata: metaChangefeedParallelIOResultQueueNanos,
+ Duration: histogramWindow,
+ MaxVal: changefeedIOQueueMaxLatency.Nanoseconds(),
+ SigFigs: 2,
+ BucketConfig: metric.BatchProcessLatencyBuckets,
+ }),
+ ParallelIOInFlightKeys: b.Gauge(metaChangefeedParallelIOInFlightKeys),
+ SinkIOInflight: b.Gauge(metaChangefeedSinkIOInflight),
BatchHistNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedBatchHistNanos,
Duration: histogramWindow,
@@ -768,30 +851,33 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
}
sm := &sliMetrics{
- EmittedMessages: a.EmittedMessages.AddChild(scope),
- EmittedBatchSizes: a.EmittedBatchSizes.AddChild(scope),
- FilteredMessages: a.FilteredMessages.AddChild(scope),
- MessageSize: a.MessageSize.AddChild(scope),
- EmittedBytes: a.EmittedBytes.AddChild(scope),
- FlushedBytes: a.FlushedBytes.AddChild(scope),
- BatchHistNanos: a.BatchHistNanos.AddChild(scope),
- Flushes: a.Flushes.AddChild(scope),
- FlushHistNanos: a.FlushHistNanos.AddChild(scope),
- SizeBasedFlushes: a.SizeBasedFlushes.AddChild(scope),
- ParallelIOQueueNanos: a.ParallelIOQueueNanos.AddChild(scope),
- SinkIOInflight: a.SinkIOInflight.AddChild(scope),
- CommitLatency: a.CommitLatency.AddChild(scope),
- ErrorRetries: a.ErrorRetries.AddChild(scope),
- AdmitLatency: a.AdmitLatency.AddChild(scope),
- BackfillCount: a.BackfillCount.AddChild(scope),
- BackfillPendingRanges: a.BackfillPendingRanges.AddChild(scope),
- RunningCount: a.RunningCount.AddChild(scope),
- BatchReductionCount: a.BatchReductionCount.AddChild(scope),
- InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope),
- SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
- SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
- LaggingRanges: a.LaggingRanges.AddChild(scope),
- CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
+ EmittedMessages: a.EmittedMessages.AddChild(scope),
+ EmittedBatchSizes: a.EmittedBatchSizes.AddChild(scope),
+ FilteredMessages: a.FilteredMessages.AddChild(scope),
+ MessageSize: a.MessageSize.AddChild(scope),
+ EmittedBytes: a.EmittedBytes.AddChild(scope),
+ FlushedBytes: a.FlushedBytes.AddChild(scope),
+ BatchHistNanos: a.BatchHistNanos.AddChild(scope),
+ Flushes: a.Flushes.AddChild(scope),
+ FlushHistNanos: a.FlushHistNanos.AddChild(scope),
+ SizeBasedFlushes: a.SizeBasedFlushes.AddChild(scope),
+ ParallelIOPendingQueueNanos: a.ParallelIOPendingQueueNanos.AddChild(scope),
+ ParallelIOPendingRows: a.ParallelIOPendingRows.AddChild(scope),
+ ParallelIOResultQueueNanos: a.ParallelIOResultQueueNanos.AddChild(scope),
+ ParallelIOInFlightKeys: a.ParallelIOInFlightKeys.AddChild(scope),
+ SinkIOInflight: a.SinkIOInflight.AddChild(scope),
+ CommitLatency: a.CommitLatency.AddChild(scope),
+ ErrorRetries: a.ErrorRetries.AddChild(scope),
+ AdmitLatency: a.AdmitLatency.AddChild(scope),
+ BackfillCount: a.BackfillCount.AddChild(scope),
+ BackfillPendingRanges: a.BackfillPendingRanges.AddChild(scope),
+ RunningCount: a.RunningCount.AddChild(scope),
+ BatchReductionCount: a.BatchReductionCount.AddChild(scope),
+ InternalRetryMessageCount: a.InternalRetryMessageCount.AddChild(scope),
+ SchemaRegistryRetries: a.SchemaRegistryRetries.AddChild(scope),
+ SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
+ LaggingRanges: a.LaggingRanges.AddChild(scope),
+ CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
diff --git a/pkg/ccl/changefeedccl/parallel_io.go b/pkg/ccl/changefeedccl/parallel_io.go
index b986da71af21..bc351bf35575 100644
--- a/pkg/ccl/changefeedccl/parallel_io.go
+++ b/pkg/ccl/changefeedccl/parallel_io.go
@@ -44,6 +44,7 @@ type parallelIO struct {
// sequential ordering of fulfillment must be enforced.
type IORequest interface {
Keys() intsets.Fast
+ NumMessages() int
}
// ioResult stores the full request that was sent as well as an error if even
@@ -51,9 +52,11 @@ type IORequest interface {
type ioResult struct {
request IORequest
err error
+ // Time representing when this result was received from the sink.
+ arrivalTime time.Time
}
-var resultPool sync.Pool = sync.Pool{
+var resultPool = sync.Pool{
New: func() interface{} {
return new(ioResult)
},
@@ -63,6 +66,7 @@ func newIOResult(req IORequest, err error) *ioResult {
res := resultPool.Get().(*ioResult)
res.request = req
res.err = err
+ res.arrivalTime = timeutil.Now()
return res
}
func freeIOResult(e *ioResult) {
@@ -110,6 +114,15 @@ func (p *parallelIO) Close() {
_ = p.wg.Wait()
}
+var testQueuingDelay = 0 * time.Second
+
+var testingEnableQueuingDelay = func() func() {
+ testQueuingDelay = 250 * time.Millisecond
+ return func() {
+ testQueuingDelay = 0 * time.Second
+ }
+}
+
// processIO starts numEmitWorkers worker threads to run the IOHandler on
// non-conflicting IORequests each retrying according to the retryOpts, then:
// - Reads incoming messages from requestCh, sending them to any worker if there
@@ -136,6 +149,14 @@ func (p *parallelIO) Close() {
// keys currently being sent, followed by checking each pending batch's intset.
func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
emitWithRetries := func(ctx context.Context, payload IORequest) error {
+ if testQueuingDelay > 0*time.Second {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(testQueuingDelay):
+ }
+ }
+
initialSend := true
return retry.WithMaxAttempts(ctx, p.retryOpts, p.retryOpts.MaxRetries+1, func() error {
if !initialSend {
@@ -197,12 +218,14 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
// in a Queue to be sent to IO workers once the conflicting requests complete.
var inflight intsets.Fast
var pending []queuedRequest
+ metricsRec := p.metrics.newParallelIOMetricsRecorder()
handleResult := func(res *ioResult) error {
if res.err == nil {
// Clear out the completed keys to check for newly valid pending requests.
- inflight.DifferenceWith(res.request.Keys())
-
+ requestKeys := res.request.Keys()
+ inflight.DifferenceWith(requestKeys)
+ metricsRec.setInFlightKeys(int64(inflight.Len()))
// Check for a pending request that is now able to be sent i.e. is not
// conflicting with any inflight requests or any requests that arrived
// earlier than itself in the pending queue.
@@ -210,8 +233,9 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
for i, pendingReq := range pending {
if !inflight.Intersects(pendingReq.req.Keys()) && !pendingKeys.Intersects(pendingReq.req.Keys()) {
inflight.UnionWith(pendingReq.req.Keys())
+ metricsRec.setInFlightKeys(int64(inflight.Len()))
pending = append(pending[:i], pending[i+1:]...)
- p.metrics.recordParallelIOQueueLatency(timeutil.Since(pendingReq.admitTime))
+ metricsRec.recordPendingQueuePop(int64(pendingReq.req.NumMessages()), timeutil.Since(pendingReq.admitTime))
if err := submitIO(pendingReq.req); err != nil {
return err
}
@@ -222,12 +246,17 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
}
}
+ // Copy the arrival time for the metrics recorder below.
+ // Otherwise, it would be possible for res to be admitted to the
+ // resultCh and freed before we read rec.arrivalTime.
+ arrivalTime := res.arrivalTime
select {
case <-ctx.Done():
return ctx.Err()
case <-p.doneCh:
return nil
case p.resultCh <- res:
+ metricsRec.recordResultQueueLatency(timeutil.Since(arrivalTime))
return nil
}
}
@@ -263,8 +292,11 @@ func (p *parallelIO) processIO(ctx context.Context, numEmitWorkers int) error {
// If a request conflicts with any currently unhandled requests, add it
// to the pending queue to be rechecked for validity later.
pending = append(pending, queuedRequest{req: req, admitTime: timeutil.Now()})
+ metricsRec.recordPendingQueuePush(int64(req.NumMessages()))
} else {
- inflight.UnionWith(req.Keys())
+ newInFlightKeys := req.Keys()
+ inflight.UnionWith(newInFlightKeys)
+ metricsRec.setInFlightKeys(int64(inflight.Len()))
if err := submitIO(req); err != nil {
return err
}
diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go
index dc4959d6edd8..8ca6badcbad5 100644
--- a/pkg/ccl/changefeedccl/telemetry.go
+++ b/pkg/ccl/changefeedccl/telemetry.go
@@ -208,14 +208,14 @@ func (r *telemetryMetricsRecorder) recordSizeBasedFlush() {
r.inner.recordSizeBasedFlush()
}
-func (r *telemetryMetricsRecorder) recordParallelIOQueueLatency(latency time.Duration) {
- r.inner.recordParallelIOQueueLatency(latency)
-}
-
func (r *telemetryMetricsRecorder) recordSinkIOInflightChange(delta int64) {
r.inner.recordSinkIOInflightChange(delta)
}
+func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
+ return r.inner.newParallelIOMetricsRecorder()
+}
+
// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
diff --git a/pkg/ccl/changefeedccl/testfeed_test.go b/pkg/ccl/changefeedccl/testfeed_test.go
index fc7d16beedc7..35a1f8662f71 100644
--- a/pkg/ccl/changefeedccl/testfeed_test.go
+++ b/pkg/ccl/changefeedccl/testfeed_test.go
@@ -2314,6 +2314,7 @@ func (ps *fakePubsubServer) React(req interface{}) (handled bool, ret interface{
if ok {
ps.mu.Lock()
defer ps.mu.Unlock()
+
for _, msg := range publishReq.Messages {
ps.mu.buffer = append(ps.mu.buffer, mockPubsubMessage{data: string(msg.Data), topic: publishReq.Topic})
}