diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 9513dad4d048..c27f2db93502 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -46,9 +46,10 @@ type Consumer struct { deadlockDetector deadlockDetector - partitionIDToState map[int32]*consumerState - partitionMapLock sync.Mutex - partitionsHeld metrics.Counter + partitionIDToState map[int32]*consumerState + partitionMapLock sync.Mutex + partitionsHeld int64 + partitionsHeldGauge metrics.Gauge } type consumerState struct { @@ -60,13 +61,13 @@ type consumerState struct { func New(params Params) (*Consumer, error) { deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval) return &Consumer{ - metricsFactory: params.MetricsFactory, - logger: params.Logger, - internalConsumer: params.InternalConsumer, - processorFactory: params.ProcessorFactory, - deadlockDetector: deadlockDetector, - partitionIDToState: make(map[int32]*consumerState), - partitionsHeld: partitionsHeld(params.MetricsFactory), + metricsFactory: params.MetricsFactory, + logger: params.Logger, + internalConsumer: params.InternalConsumer, + processorFactory: params.ProcessorFactory, + deadlockDetector: deadlockDetector, + partitionIDToState: make(map[int32]*consumerState), + partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory), }, nil } @@ -109,8 +110,12 @@ func (c *Consumer) Close() error { func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition())) - c.partitionsHeld.Inc(1) - defer c.partitionsHeld.Inc(-1) + c.partitionsHeld++ + c.partitionsHeldGauge.Update(c.partitionsHeld) + defer func() { + c.partitionsHeld-- + c.partitionsHeldGauge.Update(c.partitionsHeld) + }() c.partitionMapLock.Lock() wg := &c.partitionIDToState[pc.Partition()].wg c.partitionMapLock.Unlock() diff --git a/cmd/ingester/app/consumer/consumer_metrics.go b/cmd/ingester/app/consumer/consumer_metrics.go index 2653dd125815..bea13be346d3 100644 --- a/cmd/ingester/app/consumer/consumer_metrics.go +++ b/cmd/ingester/app/consumer/consumer_metrics.go @@ -61,6 +61,6 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics { startCounter: f.Counter(metrics.Options{Name: "partition-start", Tags: nil})} } -func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter { - return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Counter(metrics.Options{Name: "partitions-held", Tags: nil}) +func partitionsHeldGauge(metricsFactory metrics.Factory) metrics.Gauge { + return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Gauge(metrics.Options{Name: "partitions-held", Tags: nil}) } diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 449d69c4c198..048524ba15a6 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -90,12 +90,12 @@ func newConsumer( logger, _ := zap.NewDevelopment() return &Consumer{ - metricsFactory: metricsFactory, - logger: logger, - internalConsumer: consumer, - partitionIDToState: make(map[int32]*consumerState), - partitionsHeld: partitionsHeld(metricsFactory), - deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second), + metricsFactory: metricsFactory, + logger: logger, + internalConsumer: consumer, + partitionIDToState: make(map[int32]*consumerState), + partitionsHeldGauge: partitionsHeldGauge(metricsFactory), + deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second), processorFactory: ProcessorFactory{ topic: topic,