Skip to content

Commit

Permalink
Switch from counter to a gauge for partitions held
Browse files Browse the repository at this point in the history
Counters cannot be decremented in Prometheus:

```
panic: counter cannot decrease in value

goroutine 895 [running]:
github.com/jaegertracing/jaeger/vendor/github.com/prometheus/client_golang/prometheus.(*counter).Add(0xc000790600, 0xbff0000000000000)
    /home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/prometheus/client_golang/prometheus/counter.go:71 +0xa3
github.com/jaegertracing/jaeger/vendor/github.com/uber/jaeger-lib/metrics/prometheus.(*counter).Inc(0xc0006b42a0, 0xffffffffffffffff)
    /home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/uber/jaeger-lib/metrics/prometheus/factory.go:183 +0x46
github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).handleMessages(0xc0004c4300, 0xf08c60, 0xc00054e630)
    /home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:124 +0x893
created by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Start.func1
    /home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:87 +0xbd
```

Gauges can, even though we have to keep an extra variable
around to keep count. In Prometheus Go library itself that
is not necessary as Gauge type provides `Inc` and `Dec`,
but Jaeger's wrapper does not have those exposed.

Fixes #1200.

Signed-off-by: Ivan Babrou <[email protected]>
  • Loading branch information
bobrik committed Apr 23, 2019
1 parent 5b8c1f4 commit 5e32ed2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 20 deletions.
29 changes: 17 additions & 12 deletions cmd/ingester/app/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,10 @@ type Consumer struct {

deadlockDetector deadlockDetector

partitionIDToState map[int32]*consumerState
partitionMapLock sync.Mutex
partitionsHeld metrics.Counter
partitionIDToState map[int32]*consumerState
partitionMapLock sync.Mutex
partitionsHeld int64
partitionsHeldGauge metrics.Gauge
}

type consumerState struct {
Expand All @@ -60,13 +61,13 @@ type consumerState struct {
func New(params Params) (*Consumer, error) {
deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval)
return &Consumer{
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeld: partitionsHeld(params.MetricsFactory),
metricsFactory: params.MetricsFactory,
logger: params.Logger,
internalConsumer: params.InternalConsumer,
processorFactory: params.ProcessorFactory,
deadlockDetector: deadlockDetector,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory),
}, nil
}

Expand Down Expand Up @@ -109,8 +110,12 @@ func (c *Consumer) Close() error {

func (c *Consumer) handleMessages(pc sc.PartitionConsumer) {
c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition()))
c.partitionsHeld.Inc(1)
defer c.partitionsHeld.Inc(-1)
c.partitionsHeld++
c.partitionsHeldGauge.Update(c.partitionsHeld)
defer func() {
c.partitionsHeld--
c.partitionsHeldGauge.Update(c.partitionsHeld)
}()
c.partitionMapLock.Lock()
wg := &c.partitionIDToState[pc.Partition()].wg
c.partitionMapLock.Unlock()
Expand Down
4 changes: 2 additions & 2 deletions cmd/ingester/app/consumer/consumer_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,6 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics {
startCounter: f.Counter(metrics.Options{Name: "partition-start", Tags: nil})}
}

func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter {
return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Counter(metrics.Options{Name: "partitions-held", Tags: nil})
func partitionsHeldGauge(metricsFactory metrics.Factory) metrics.Gauge {
return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Gauge(metrics.Options{Name: "partitions-held", Tags: nil})
}
12 changes: 6 additions & 6 deletions cmd/ingester/app/consumer/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,12 +90,12 @@ func newConsumer(

logger, _ := zap.NewDevelopment()
return &Consumer{
metricsFactory: metricsFactory,
logger: logger,
internalConsumer: consumer,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeld: partitionsHeld(metricsFactory),
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),
metricsFactory: metricsFactory,
logger: logger,
internalConsumer: consumer,
partitionIDToState: make(map[int32]*consumerState),
partitionsHeldGauge: partitionsHeldGauge(metricsFactory),
deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second),

processorFactory: ProcessorFactory{
topic: topic,
Expand Down

0 comments on commit 5e32ed2

Please sign in to comment.