Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[lockrenewer] Add prom metric for lock renewal timeouts #244

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions v2/lockrenewer.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ func (plr *peekLockRenewer) renewMessageLock(ctx context.Context, message *azser
if !errors.Is(renewErr, context.DeadlineExceeded) {
return renewErr
}
// renewErr is context.DeadlineExceeded, increment metric and retry
plr.metrics.IncMessageLockRenewalTimeoutCount(message)
}
// lock is expired or message context is done
if ctx.Err() != nil {
Expand Down
37 changes: 30 additions & 7 deletions v2/metrics/processor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,14 @@ func NewRegistry() *Registry {
Help: "total number of message lock renewal",
Subsystem: subsystem,
}, []string{messageTypeLabel, successLabel}),
MessageLockRenewalTimeoutCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_lock_renewal_timeout_total",
Help: "total number of message lock renewal calls that timed out",
Subsystem: subsystem,
}, []string{messageTypeLabel}),
MessageDeadlineReachedCount: prom.NewCounterVec(prom.CounterOpts{
Name: "message_deadline_reached_total",
Help: "total number of message lock renewal",
Help: "total number of message lock renewal that reached deadline",
Subsystem: subsystem,
}, []string{messageTypeLabel}),
HealthCheckCount: prom.NewCounterVec(prom.CounterOpts{
Expand Down Expand Up @@ -76,19 +81,21 @@ func (m *Registry) Init(reg prom.Registerer) {
m.MessageReceivedCount,
m.MessageHandledCount,
m.MessageLockRenewedCount,
m.MessageLockRenewalTimeoutCount,
m.MessageDeadlineReachedCount,
m.HealthCheckCount,
m.ConcurrentMessageCount)
}

// Registry provides the prometheus metrics for the message processor
type Registry struct {
MessageReceivedCount *prom.CounterVec
MessageHandledCount *prom.CounterVec
MessageLockRenewedCount *prom.CounterVec
MessageDeadlineReachedCount *prom.CounterVec
HealthCheckCount *prom.CounterVec
ConcurrentMessageCount *prom.GaugeVec
MessageReceivedCount *prom.CounterVec
MessageHandledCount *prom.CounterVec
MessageLockRenewedCount *prom.CounterVec
MessageLockRenewalTimeoutCount *prom.CounterVec
MessageDeadlineReachedCount *prom.CounterVec
HealthCheckCount *prom.CounterVec
ConcurrentMessageCount *prom.GaugeVec
}

// Recorder allows to initialize the metric registry and increase/decrease the registered metrics at runtime.
Expand All @@ -97,6 +104,7 @@ type Recorder interface {
IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedFailure(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewedSuccess(msg *azservicebus.ReceivedMessage)
IncMessageLockRenewalTimeoutCount(msg *azservicebus.ReceivedMessage)
IncMessageHandled(receiverName string, msg *azservicebus.ReceivedMessage)
IncMessageReceived(receiverName string, count float64)
IncHealthCheckSuccessCount(namespace, entity, subscription string)
Expand Down Expand Up @@ -141,6 +149,12 @@ func (m *Registry) DecConcurrentMessageCount(receiverName string, msg *azservice
m.ConcurrentMessageCount.With(labels).Dec()
}

// IncMessageDeadlineReachedCount increases the message deadline reached counter
func (m *Registry) IncMessageLockRenewalTimeoutCount(msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
m.MessageLockRenewalTimeoutCount.With(labels).Inc()
}

// IncMessageDeadlineReachedCount increases the message deadline reached counter
func (m *Registry) IncMessageDeadlineReachedCount(msg *azservicebus.ReceivedMessage) {
labels := getMessageTypeLabel(msg)
Expand Down Expand Up @@ -201,6 +215,15 @@ func (i *Informer) GetMessageLockRenewedFailureCount() (float64, error) {
return total, nil
}

// GetMessageLockRenewalTimeoutCount retrieves the current value of the MessageLockRenewalTimeoutCount metric
func (i *Informer) GetMessageLockRenewalTimeoutCount() (float64, error) {
var total float64
common.Collect(i.registry.MessageLockRenewalTimeoutCount, func(m *dto.Metric) {
total += m.GetCounter().GetValue()
})
return total, nil
}

// GetHealthCheckSuccessCount retrieves the current value of the HealthCheckSuccessCount metric
func (i *Informer) GetHealthCheckSuccessCount(namespace, entity, subscription string) (float64, error) {
var total float64
Expand Down
12 changes: 11 additions & 1 deletion v2/metrics/processor/types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRegistry_Init(t *testing.T) {
fRegistry := &fakeRegistry{}
g.Expect(func() { r.Init(prometheus.NewRegistry()) }).ToNot(Panic())
g.Expect(func() { r.Init(fRegistry) }).ToNot(Panic())
g.Expect(fRegistry.collectors).To(HaveLen(6))
g.Expect(fRegistry.collectors).To(HaveLen(7))
Metric.IncMessageReceived("testReceiverName", 10)
}

Expand Down Expand Up @@ -68,18 +68,28 @@ func TestLockRenewalMetrics(t *testing.T) {
count, err := informer.GetMessageLockRenewedFailureCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))
count, err = informer.GetMessageLockRenewalTimeoutCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))

// after init, count 0
g.Expect(func() { r.Init(registerer) }).ToNot(Panic())
count, err = informer.GetMessageLockRenewedFailureCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))
count, err = informer.GetMessageLockRenewalTimeoutCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(0)))

// count incremented
r.IncMessageLockRenewedFailure(tc.msg)
count, err = informer.GetMessageLockRenewedFailureCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(1)))
r.IncMessageLockRenewalTimeoutCount(tc.msg)
count, err = informer.GetMessageLockRenewalTimeoutCount()
g.Expect(err).ToNot(HaveOccurred())
g.Expect(count).To(Equal(float64(1)))

// count failure only
r.IncMessageLockRenewedSuccess(tc.msg)
Expand Down
2 changes: 1 addition & 1 deletion v2/metrics/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ func TestRegister(t *testing.T) {
g := NewWithT(t)
reg := &fakeRegistry{}
g.Expect(func() { Register(reg) }).ToNot(Panic())
g.Expect(reg.collectors).To(HaveLen(8))
g.Expect(reg.collectors).To(HaveLen(9))
}
Loading