From fa84803c3f45323ea4361b94b9fe771f356d48ad Mon Sep 17 00:00:00 2001 From: Ivan Babrou Date: Mon, 22 Apr 2019 23:55:38 -0700 Subject: [PATCH 1/2] Switch from counter to a gauge for partitions held Counters cannot be decremented in Prometheus: ``` panic: counter cannot decrease in value goroutine 895 [running]: github.com/jaegertracing/jaeger/vendor/github.com/prometheus/client_golang/prometheus.(*counter).Add(0xc000790600, 0xbff0000000000000) /home/travis/gopath/src/github.com/jaegertracing/jaeger/vendor/github.com/prometheus/client_golang/prometheus/counter.go:71 +0xa3 github.com/jaegertracing/jaeger/vendor/github.com/uber/jaeger-lib/metrics/prometheus.(*counter).Inc(0xc0006b42a0, 0xffffffffffffffff) /home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:124 +0x893 created by github.com/jaegertracing/jaeger/cmd/ingester/app/consumer.(*Consumer).Start.func1 /home/travis/gopath/src/github.com/jaegertracing/jaeger/cmd/ingester/app/consumer/consumer.go:87 +0xbd ``` Gauges can be, although we have to keep an extra variable around to hold the count. In the Prometheus Go library itself that is not necessary, as the Gauge type provides `Inc` and `Dec`, but Jaeger's wrapper does not expose them. Fixes #1200. 
Signed-off-by: Ivan Babrou --- cmd/ingester/app/consumer/consumer.go | 29 +++++++++++-------- cmd/ingester/app/consumer/consumer_metrics.go | 4 +-- cmd/ingester/app/consumer/consumer_test.go | 14 ++++----- 3 files changed, 26 insertions(+), 21 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index 9513dad4d04..c27f2db9350 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -46,9 +46,10 @@ type Consumer struct { deadlockDetector deadlockDetector - partitionIDToState map[int32]*consumerState - partitionMapLock sync.Mutex - partitionsHeld metrics.Counter + partitionIDToState map[int32]*consumerState + partitionMapLock sync.Mutex + partitionsHeld int64 + partitionsHeldGauge metrics.Gauge } type consumerState struct { @@ -60,13 +61,13 @@ type consumerState struct { func New(params Params) (*Consumer, error) { deadlockDetector := newDeadlockDetector(params.MetricsFactory, params.Logger, params.DeadlockCheckInterval) return &Consumer{ - metricsFactory: params.MetricsFactory, - logger: params.Logger, - internalConsumer: params.InternalConsumer, - processorFactory: params.ProcessorFactory, - deadlockDetector: deadlockDetector, - partitionIDToState: make(map[int32]*consumerState), - partitionsHeld: partitionsHeld(params.MetricsFactory), + metricsFactory: params.MetricsFactory, + logger: params.Logger, + internalConsumer: params.InternalConsumer, + processorFactory: params.ProcessorFactory, + deadlockDetector: deadlockDetector, + partitionIDToState: make(map[int32]*consumerState), + partitionsHeldGauge: partitionsHeldGauge(params.MetricsFactory), }, nil } @@ -109,8 +110,12 @@ func (c *Consumer) Close() error { func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition())) - c.partitionsHeld.Inc(1) - defer c.partitionsHeld.Inc(-1) + c.partitionsHeld++ + c.partitionsHeldGauge.Update(c.partitionsHeld) 
+ defer func() { + c.partitionsHeld-- + c.partitionsHeldGauge.Update(c.partitionsHeld) + }() c.partitionMapLock.Lock() wg := &c.partitionIDToState[pc.Partition()].wg c.partitionMapLock.Unlock() diff --git a/cmd/ingester/app/consumer/consumer_metrics.go b/cmd/ingester/app/consumer/consumer_metrics.go index 2653dd12581..bea13be346d 100644 --- a/cmd/ingester/app/consumer/consumer_metrics.go +++ b/cmd/ingester/app/consumer/consumer_metrics.go @@ -61,6 +61,6 @@ func (c *Consumer) partitionMetrics(partition int32) partitionMetrics { startCounter: f.Counter(metrics.Options{Name: "partition-start", Tags: nil})} } -func partitionsHeld(metricsFactory metrics.Factory) metrics.Counter { - return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Counter(metrics.Options{Name: "partitions-held", Tags: nil}) +func partitionsHeldGauge(metricsFactory metrics.Factory) metrics.Gauge { + return metricsFactory.Namespace(metrics.NSOptions{Name: consumerNamespace, Tags: nil}).Gauge(metrics.Options{Name: "partitions-held", Tags: nil}) } diff --git a/cmd/ingester/app/consumer/consumer_test.go b/cmd/ingester/app/consumer/consumer_test.go index 449d69c4c19..263ba45ac5a 100644 --- a/cmd/ingester/app/consumer/consumer_test.go +++ b/cmd/ingester/app/consumer/consumer_test.go @@ -90,12 +90,12 @@ func newConsumer( logger, _ := zap.NewDevelopment() return &Consumer{ - metricsFactory: metricsFactory, - logger: logger, - internalConsumer: consumer, - partitionIDToState: make(map[int32]*consumerState), - partitionsHeld: partitionsHeld(metricsFactory), - deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second), + metricsFactory: metricsFactory, + logger: logger, + internalConsumer: consumer, + partitionIDToState: make(map[int32]*consumerState), + partitionsHeldGauge: partitionsHeldGauge(metricsFactory), + deadlockDetector: newDeadlockDetector(metricsFactory, logger, time.Second), processorFactory: ProcessorFactory{ topic: topic, @@ -153,7 +153,7 @@ func 
TestSaramaConsumerWrapper_start_Messages(t *testing.T) { mc.YieldMessage(msg) isProcessed.Wait() - localFactory.AssertCounterMetrics(t, metricstest.ExpectedMetric{ + localFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{ Name: "sarama-consumer.partitions-held", Value: 1, }) From 258204028c94ef6b141a4fcfec2f3263ee95887c Mon Sep 17 00:00:00 2001 From: Ivan Babrou Date: Tue, 23 Apr 2019 08:27:17 -0700 Subject: [PATCH 2/2] Protect partitionsHeld in consumer by lock Signed-off-by: Ivan Babrou --- cmd/ingester/app/consumer/consumer.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/cmd/ingester/app/consumer/consumer.go b/cmd/ingester/app/consumer/consumer.go index c27f2db9350..64bfa4ec37d 100644 --- a/cmd/ingester/app/consumer/consumer.go +++ b/cmd/ingester/app/consumer/consumer.go @@ -110,17 +110,19 @@ func (c *Consumer) Close() error { func (c *Consumer) handleMessages(pc sc.PartitionConsumer) { c.logger.Info("Starting message handler", zap.Int32("partition", pc.Partition())) + c.partitionMapLock.Lock() c.partitionsHeld++ c.partitionsHeldGauge.Update(c.partitionsHeld) + wg := &c.partitionIDToState[pc.Partition()].wg + c.partitionMapLock.Unlock() defer func() { + c.closePartition(pc) + wg.Done() + c.partitionMapLock.Lock() c.partitionsHeld-- c.partitionsHeldGauge.Update(c.partitionsHeld) + c.partitionMapLock.Unlock() }() - c.partitionMapLock.Lock() - wg := &c.partitionIDToState[pc.Partition()].wg - c.partitionMapLock.Unlock() - defer wg.Done() - defer c.closePartition(pc) msgMetrics := c.newMsgMetrics(pc.Partition())