Skip to content

Commit

Permalink
metrics(ticdc): changefeed metrics add namespace label (#5311)
Browse files Browse the repository at this point in the history
ref #5301
  • Loading branch information
sdojjy authored May 6, 2022
1 parent fbbf359 commit 3e8e246
Show file tree
Hide file tree
Showing 38 changed files with 317 additions and 230 deletions.
4 changes: 2 additions & 2 deletions cdc/entry/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,14 @@ var (
Name: "unmarshal_and_mount",
Help: "Bucketed histogram of processing time (s) of unmarshal and mount in mounter.",
Buckets: prometheus.ExponentialBuckets(0.000001, 10, 10),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
totalRowsCountGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "mounter",
Name: "total_rows_count",
Help: "The total count of rows that are processed by mounter",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
)

// InitMetrics registers all metrics in this file
Expand Down
14 changes: 8 additions & 6 deletions cdc/entry/mounter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,14 @@ func NewMounter(schemaStorage SchemaStorage,
enableOldValue bool,
) Mounter {
return &mounterImpl{
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
metricMountDuration: mountDuration.WithLabelValues(changefeedID.ID),
metricTotalRows: totalRowsCountGauge.WithLabelValues(changefeedID.ID),
tz: tz,
schemaStorage: schemaStorage,
changefeedID: changefeedID,
enableOldValue: enableOldValue,
metricMountDuration: mountDuration.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
metricTotalRows: totalRowsCountGauge.
WithLabelValues(changefeedID.Namespace, changefeedID.ID),
tz: tz,
}
}

Expand Down
3 changes: 2 additions & 1 deletion cdc/kv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1171,7 +1171,8 @@ func (s *eventFeedSession) receiveFromStream(
}()

changefeedID := contextutil.ChangefeedIDFromCtx(ctx)
metricSendEventBatchResolvedSize := batchResolvedEventSize.WithLabelValues(changefeedID.ID)
metricSendEventBatchResolvedSize := batchResolvedEventSize.
WithLabelValues(changefeedID.Namespace, changefeedID.ID)

// always create a new region worker, because `receiveFromStream` is ensured
// to call exactly once from outter code logic
Expand Down
10 changes: 5 additions & 5 deletions cdc/kv/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,14 +57,14 @@ var (
Subsystem: "kvclient",
Name: "pull_event_count",
Help: "event count received by this puller",
}, []string{"type", "changefeed"})
}, []string{"type", "namespace", "changefeed"})
sendEventCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "send_event_count",
Help: "event count sent to event channel by this puller",
}, []string{"type", "changefeed"})
}, []string{"type", "namespace", "changefeed"})
clientChannelSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand All @@ -78,22 +78,22 @@ var (
Subsystem: "kvclient",
Name: "region_token",
Help: "size of region token in kv client",
}, []string{"store", "changefeed"})
}, []string{"store", "namespace", "changefeed"})
cachedRegionSize = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "cached_region",
Help: "cached region that has not requested to TiKV in kv client",
}, []string{"store", "changefeed"})
}, []string{"store", "namespace", "changefeed"})
batchResolvedEventSize = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "kvclient",
Name: "batch_resolved_event_size",
Help: "The number of region in one batch resolved ts event",
Buckets: prometheus.ExponentialBuckets(2, 2, 16),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
grpcPoolStreamGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Expand Down
16 changes: 8 additions & 8 deletions cdc/kv/region_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,21 +179,21 @@ func (w *regionWorker) initMetrics(ctx context.Context) {
metrics.metricReceivedEventSize = eventSize.WithLabelValues("received")
metrics.metricDroppedEventSize = eventSize.WithLabelValues("dropped")
metrics.metricPullEventInitializedCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_INITIALIZED.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_INITIALIZED.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventCommittedCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_COMMITTED.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_COMMITTED.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventCommitCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_COMMIT.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventPrewriteCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_PREWRITE.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_PREWRITE.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricPullEventRollbackCounter = pullEventCounter.
WithLabelValues(cdcpb.Event_ROLLBACK.String(), changefeedID.ID)
WithLabelValues(cdcpb.Event_ROLLBACK.String(), changefeedID.Namespace, changefeedID.ID)
metrics.metricSendEventResolvedCounter = sendEventCounter.
WithLabelValues("native-resolved", changefeedID.ID)
WithLabelValues("native-resolved", changefeedID.Namespace, changefeedID.ID)
metrics.metricSendEventCommitCounter = sendEventCounter.
WithLabelValues("commit", changefeedID.ID)
WithLabelValues("commit", changefeedID.Namespace, changefeedID.ID)
metrics.metricSendEventCommittedCounter = sendEventCounter.
WithLabelValues("committed", changefeedID.ID)
WithLabelValues("committed", changefeedID.Namespace, changefeedID.ID)

w.metrics = metrics
}
Expand Down
8 changes: 5 additions & 3 deletions cdc/kv/token_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (r *sizedRegionRouter) AddRegion(sri singleRegionInfo) {
r.buffer[id] = append(r.buffer[id], sri)
if _, ok := r.metrics.cachedRegions[id]; !ok {
r.metrics.cachedRegions[id] = cachedRegionSize.
WithLabelValues(id, r.metrics.changefeed.ID)
WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID)
}
r.metrics.cachedRegions[id].Inc()
}
Expand All @@ -116,7 +116,8 @@ func (r *sizedRegionRouter) Acquire(id string) {
defer r.lock.Unlock()
r.tokens[id]++
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.changefeed.ID)
r.metrics.tokens[id] = clientRegionTokenSize.
WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID)
}
r.metrics.tokens[id].Inc()
}
Expand All @@ -128,7 +129,8 @@ func (r *sizedRegionRouter) Release(id string) {
defer r.lock.Unlock()
r.tokens[id]--
if _, ok := r.metrics.tokens[id]; !ok {
r.metrics.tokens[id] = clientRegionTokenSize.WithLabelValues(id, r.metrics.changefeed.ID)
r.metrics.tokens[id] = clientRegionTokenSize.
WithLabelValues(id, r.metrics.changefeed.Namespace, r.metrics.changefeed.ID)
}
r.metrics.tokens[id].Dec()
}
Expand Down
24 changes: 12 additions & 12 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,17 +354,17 @@ LOOP:

// init metrics
c.metricsChangefeedBarrierTsGauge = changefeedBarrierTsGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsGauge = changefeedCheckpointTsGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsLagGauge = changefeedCheckpointTsLagGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsGauge = changefeedResolvedTsGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsLagGauge = changefeedResolvedTsLagGauge.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = changefeedTickDuration.
WithLabelValues(c.id.ID)
WithLabelValues(c.id.Namespace, c.id.ID)

// create scheduler
c.scheduler, err = c.newScheduler(ctx, checkpointTs)
Expand Down Expand Up @@ -402,20 +402,20 @@ func (c *changefeed) releaseResources(ctx cdcContext.Context) {
c.wg.Wait()
c.scheduler.Close(ctx)

changefeedCheckpointTsGauge.DeleteLabelValues(c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.ID)
changefeedCheckpointTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedCheckpointTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedCheckpointTsGauge = nil
c.metricsChangefeedCheckpointTsLagGauge = nil

changefeedResolvedTsGauge.DeleteLabelValues(c.id.ID)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.ID)
changefeedResolvedTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
changefeedResolvedTsLagGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedResolvedTsGauge = nil
c.metricsChangefeedResolvedTsLagGauge = nil

changefeedTickDuration.DeleteLabelValues(c.id.ID)
changefeedTickDuration.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedTickDuration = nil

changefeedBarrierTsGauge.DeleteLabelValues(c.id.ID)
changefeedBarrierTsGauge.DeleteLabelValues(c.id.Namespace, c.id.ID)
c.metricsChangefeedBarrierTsGauge = nil

c.initialized = false
Expand Down
16 changes: 8 additions & 8 deletions cdc/owner/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,36 +26,36 @@ var (
Subsystem: "owner",
Name: "barrier_ts",
Help: "barrier ts of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})

changefeedCheckpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts",
Help: "checkpoint ts of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedCheckpointTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "checkpoint_ts_lag",
Help: "checkpoint ts lag of changefeeds in seconds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedResolvedTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts",
Help: "resolved ts of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedResolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "resolved_ts_lag",
Help: "resolved ts lag of changefeeds in seconds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
ownershipCounter = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: "ticdc",
Expand All @@ -69,22 +69,22 @@ var (
Subsystem: "owner",
Name: "maintain_table_num",
Help: "number of replicated tables maintained in owner",
}, []string{"changefeed", "capture", "type"})
}, []string{"namespace", "changefeed", "capture", "type"})
changefeedStatusGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "status",
Help: "The status of changefeeds",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "owner",
Name: "changefeed_tick_duration",
Help: "Bucketed histogram of owner tick changefeed reactor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
changefeedCloseDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down
14 changes: 8 additions & 6 deletions cdc/owner/owner.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {
if conf.Debug != nil && conf.Debug.EnableNewScheduler {
for cfID, cf := range o.changefeeds {
if cf.state != nil && cf.state.Info != nil {
changefeedStatusGauge.WithLabelValues(cfID.ID).
changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID).
Set(float64(cf.state.Info.State.ToInt()))
}

Expand All @@ -374,11 +374,11 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {

for captureID, info := range o.captures {
ownerMaintainTableNumGauge.
WithLabelValues(cfID.ID,
WithLabelValues(cfID.Namespace, cfID.ID,
info.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(totalCounts[captureID]))
ownerMaintainTableNumGauge.
WithLabelValues(cfID.ID,
WithLabelValues(cfID.Namespace, cfID.ID,
info.AdvertiseAddr, maintainTableTypeWip).
Set(float64(pendingCounts[captureID]))
}
Expand All @@ -393,13 +393,15 @@ func (o *ownerImpl) updateMetrics(state *orchestrator.GlobalReactorState) {
continue
}
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID.ID, captureInfo.AdvertiseAddr, maintainTableTypeTotal).
WithLabelValues(changefeedID.Namespace, changefeedID.ID,
captureInfo.AdvertiseAddr, maintainTableTypeTotal).
Set(float64(len(taskStatus.Tables)))
ownerMaintainTableNumGauge.
WithLabelValues(changefeedID.ID, captureInfo.AdvertiseAddr, maintainTableTypeWip).
WithLabelValues(changefeedID.Namespace, changefeedID.ID,
captureInfo.AdvertiseAddr, maintainTableTypeWip).
Set(float64(len(taskStatus.Operation)))
if changefeedState.Info != nil {
changefeedStatusGauge.WithLabelValues(changefeedID.ID).
changefeedStatusGauge.WithLabelValues(changefeedID.Namespace, changefeedID.ID).
Set(float64(changefeedState.Info.State.ToInt()))
}
}
Expand Down
20 changes: 10 additions & 10 deletions cdc/processor/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,71 +24,71 @@ var (
Subsystem: "processor",
Name: "resolved_ts",
Help: "local resolved ts of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
resolvedTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "resolved_ts_lag",
Help: "local resolved ts lag of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
resolvedTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_resolved_table_id",
Help: "ID of the minimum resolved table",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
checkpointTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts",
Help: "global checkpoint ts of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
checkpointTsLagGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "checkpoint_ts_lag",
Help: "global checkpoint ts lag of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
checkpointTsMinTableIDGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "min_checkpoint_table_id",
Help: "ID of the minimum checkpoint table",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
syncTableNumGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "num_of_tables",
Help: "number of synchronized table of processor",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorErrorCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "exit_with_error_count",
Help: "counter for processor exits with error",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorSchemaStorageGcTsGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "schema_storage_gc_ts",
Help: "the TS of the currently maintained oldest snapshot in SchemaStorage",
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorTickDuration = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Namespace: "ticdc",
Subsystem: "processor",
Name: "processor_tick_duration",
Help: "Bucketed histogram of processorManager tick processor time (s).",
Buckets: prometheus.ExponentialBuckets(0.01 /* 10 ms */, 2, 18),
}, []string{"changefeed"})
}, []string{"namespace", "changefeed"})
processorCloseDuration = prometheus.NewHistogram(
prometheus.HistogramOpts{
Namespace: "ticdc",
Expand Down
Loading

0 comments on commit 3e8e246

Please sign in to comment.