From e46ac1bab8bcac9fc32d2a93728756a3fc3469ee Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 29 Mar 2018 15:59:33 +0200 Subject: [PATCH 1/6] inhibit: update inhibition cache when alerts resolve Signed-off-by: Simon Pasquier --- inhibit/inhibit.go | 23 +++++++---- test/acceptance/inhibit_test.go | 72 +++++++++++++++++++++++++++++++++ 2 files changed, 87 insertions(+), 8 deletions(-) diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index b1c6192051..c7cb18b4bc 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -43,6 +43,9 @@ type Inhibitor struct { // NewInhibitor returns a new Inhibitor. func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker, logger log.Logger) *Inhibitor { + if logger == nil { + logger = log.NewNopLogger() + } ih := &Inhibitor{ alerts: ap, marker: mk, @@ -81,14 +84,9 @@ func (ih *Inhibitor) run(ctx context.Context) { level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err) continue } - if a.Resolved() { - // As alerts can also time out without an update, we never - // handle new resolved alerts but invalidate the cache on read. - continue - } - // Populate the inhibition rules' cache. + // Update the inhibition rules' cache. for _, r := range ih.rules { - if r.SourceMatchers.Match(a.Labels) { + if r.exists(a) || r.SourceMatchers.Match(a.Labels) { r.set(a) } } @@ -145,7 +143,7 @@ func (ih *Inhibitor) Mutes(lset model.LabelSet) bool { for _, r := range ih.rules { // Only inhibit if target matchers match but source matchers don't. if inhibitedByFP, eq := r.hasEqual(lset); !r.SourceMatchers.Match(lset) && r.TargetMatchers.Match(lset) && eq { - ih.marker.SetInhibited(fp, fmt.Sprintf("%d", inhibitedByFP)) + ih.marker.SetInhibited(fp, fmt.Sprintf("%s", inhibitedByFP.String())) return true } } @@ -217,6 +215,15 @@ func (r *InhibitRule) set(a *types.Alert) { r.scache[a.Fingerprint()] = a } +// exists returns true if the alert is present in the source cache. +func (r *InhibitRule) exists(a *types.Alert) bool { + r.mtx.Lock() + defer r.mtx.Unlock() + + _, ok := r.scache[a.Fingerprint()] + return ok +} + // hasEqual checks whether the source cache contains alerts matching // the equal labels for the given label set. func (r *InhibitRule) hasEqual(lset model.LabelSet) (model.Fingerprint, bool) { diff --git a/test/acceptance/inhibit_test.go b/test/acceptance/inhibit_test.go index 1e8cacdf5e..43d317c67c 100644 --- a/test/acceptance/inhibit_test.go +++ b/test/acceptance/inhibit_test.go @@ -24,6 +24,10 @@ import ( func TestInhibiting(t *testing.T) { t.Parallel() + // This integration test checks that alerts can be inhibited and that an + // inhibited alert will be notified again as soon as the inhibiting alert + // gets resolved. + conf := ` route: receiver: "default" @@ -64,6 +68,10 @@ inhibit_rules: // second batch of notifications. am.Push(At(2.2), Alert("alertname", "JobDown", "job", "testjob", "zone", "aa")) + // InstanceDown in zone aa should fire again in the third batch of + // notifications once JobDown in zone aa gets resolved. + am.Push(At(3.6), Alert("alertname", "JobDown", "job", "testjob", "zone", "aa").Active(2.2, 3.6)) + co.Want(Between(2, 2.5), Alert("alertname", "test1", "job", "testjob", "zone", "aa").Active(1), Alert("alertname", "InstanceDown", "job", "testjob", "zone", "aa").Active(1), @@ -76,5 +84,69 @@ inhibit_rules: Alert("alertname", "JobDown", "job", "testjob", "zone", "aa").Active(2.2), ) + co.Want(Between(4, 4.5), + Alert("alertname", "test1", "job", "testjob", "zone", "aa").Active(1), + Alert("alertname", "InstanceDown", "job", "testjob", "zone", "aa").Active(1), + Alert("alertname", "InstanceDown", "job", "testjob", "zone", "ab").Active(1), + Alert("alertname", "JobDown", "job", "testjob", "zone", "aa").Active(2.2, 3.6), + ) + + at.Run() +} + +func TestAlwaysInhibiting(t *testing.T) { + t.Parallel() + + // This integration test checks that when inhibited and inhibiting alerts + // gets resolved at the same time, the final notification contains both + // alerts. + + conf := ` +route: + receiver: "default" + group_by: [] + group_wait: 1s + group_interval: 1s + repeat_interval: 1s + +receivers: +- name: "default" + webhook_configs: + - url: 'http://%s' + +inhibit_rules: +- source_match: + alertname: JobDown + target_match: + alertname: InstanceDown + equal: + - job + - zone +` + + at := NewAcceptanceTest(t, &AcceptanceOpts{ + Tolerance: 150 * time.Millisecond, + }) + + co := at.Collector("webhook") + wh := NewWebhook(co) + + am := at.Alertmanager(fmt.Sprintf(conf, wh.Address())) + + am.Push(At(1), Alert("alertname", "InstanceDown", "job", "testjob", "zone", "aa")) + am.Push(At(1), Alert("alertname", "JobDown", "job", "testjob", "zone", "aa")) + + am.Push(At(2.6), Alert("alertname", "JobDown", "job", "testjob", "zone", "aa").Active(1, 2.6)) + am.Push(At(2.6), Alert("alertname", "InstanceDown", "job", "testjob", "zone", "aa").Active(1, 2.6)) + + co.Want(Between(2, 2.5), + Alert("alertname", "JobDown", "job", "testjob", "zone", "aa").Active(1), + ) + + co.Want(Between(3, 3.5), + Alert("alertname", "InstanceDown", "job", "testjob", "zone", "aa").Active(1, 2.6), + Alert("alertname", "JobDown", "job", "testjob", "zone", "aa").Active(1, 2.6), + ) + at.Run() } From 39fbc512145f6c8ae044aa950270a55373cfa7ed Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 5 Apr 2018 11:06:56 +0200 Subject: [PATCH 2/6] inhibit: remove unnecessary fmt.Sprintf Signed-off-by: Simon Pasquier --- inhibit/inhibit.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index c7cb18b4bc..40da664f46 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -15,7 +15,6 @@ package inhibit import ( "context" - "fmt" "sync" "time" @@ -143,7 +142,7 @@ func (ih *Inhibitor) Mutes(lset model.LabelSet) bool { for _, r := range ih.rules { // Only inhibit if target matchers match but source matchers don't. if inhibitedByFP, eq := r.hasEqual(lset); !r.SourceMatchers.Match(lset) && r.TargetMatchers.Match(lset) && eq { - ih.marker.SetInhibited(fp, fmt.Sprintf("%s", inhibitedByFP.String())) + ih.marker.SetInhibited(fp, inhibitedByFP.String()) return true } } From f4846006e6ff61e51ff0bf8ed3168cdfca956e11 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Thu, 5 Apr 2018 16:58:16 +0200 Subject: [PATCH 3/6] inhibit: add unit tests Signed-off-by: Simon Pasquier --- inhibit/inhibit_test.go | 156 +++++++++++++++++++++++++++++++++++++++- 1 file changed, 155 insertions(+), 1 deletion(-) diff --git a/inhibit/inhibit_test.go b/inhibit/inhibit_test.go index 9695c998af..a1ca1740cf 100644 --- a/inhibit/inhibit_test.go +++ b/inhibit/inhibit_test.go @@ -19,12 +19,16 @@ import ( "time" "github.com/kylelemons/godebug/pretty" + "github.com/prometheus/common/model" + "github.com/prometheus/alertmanager/config" + "github.com/prometheus/alertmanager/provider" "github.com/prometheus/alertmanager/types" - "github.com/prometheus/common/model" ) func TestInhibitRuleHasEqual(t *testing.T) { + t.Parallel() + now := time.Now() cases := []struct { initial map[model.Fingerprint]*types.Alert @@ -135,6 +139,8 @@ func TestInhibitRuleHasEqual(t *testing.T) { } func TestInhibitRuleMatches(t *testing.T) { + t.Parallel() + // Simple inhibut rule cr := config.InhibitRule{ SourceMatch: map[string]string{"s": "1"}, @@ -226,3 +232,151 @@ func TestInhibitRuleGC(t *testing.T) { t.Errorf(pretty.Compare(r.scache, after)) } } + +type fakeAlerts struct { + alerts []*types.Alert + finished chan struct{} +} + +func newFakeAlerts(alerts []*types.Alert) *fakeAlerts { + return &fakeAlerts{ + alerts: alerts, + finished: make(chan struct{}), + } +} + +func (f *fakeAlerts) GetPending() provider.AlertIterator { return nil } +func (f *fakeAlerts) Get(model.Fingerprint) (*types.Alert, error) { return nil, nil } +func (f *fakeAlerts) Put(...*types.Alert) error { return nil } +func (f *fakeAlerts) Subscribe() provider.AlertIterator { + ch := make(chan *types.Alert) + done := make(chan struct{}) + go func() { + for _, a := range f.alerts { + ch <- a + } + // Send another (meaningless) alert to make sure that the inhibitor has + // processed everything. + ch <- &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{}, + StartsAt: time.Now(), + }, + } + close(f.finished) + <-done + }() + return provider.NewAlertIterator(ch, done, nil) +} + +func TestInhibit(t *testing.T) { + t.Parallel() + + now := time.Now() + inhibitRule := func() *config.InhibitRule { + return &config.InhibitRule{ + SourceMatch: map[string]string{"s": "1"}, + TargetMatch: map[string]string{"t": "1"}, + Equal: model.LabelNames{"e"}, + } + } + // alertOne is muted by alertTwo when it is active. + alertOne := func() *types.Alert { + return &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"t": "1", "e": "f"}, + StartsAt: now.Add(-time.Minute), + EndsAt: now.Add(time.Hour), + }, + } + } + alertTwo := func(resolved bool) *types.Alert { + var end time.Time + if resolved { + end = now.Add(-time.Second) + } else { + end = now.Add(time.Hour) + } + return &types.Alert{ + Alert: model.Alert{ + Labels: model.LabelSet{"s": "1", "e": "f"}, + StartsAt: now.Add(-time.Minute), + EndsAt: end, + }, + } + } + + type exp struct { + lbls model.LabelSet + muted bool + } + for i, tc := range []struct { + alerts []*types.Alert + expected []exp + }{ + { + // alertOne shouldn't be muted since alertTwo hasn't fired. + alerts: []*types.Alert{alertOne()}, + expected: []exp{ + { + lbls: model.LabelSet{"t": "1", "e": "f"}, + muted: false, + }, + }, + }, + { + // alertOne should be muted by alertTwo which is active. + alerts: []*types.Alert{alertOne(), alertTwo(false)}, + expected: []exp{ + { + lbls: model.LabelSet{"t": "1", "e": "f"}, + muted: true, + }, + { + lbls: model.LabelSet{"s": "1", "e": "f"}, + muted: false, + }, + }, + }, + { + // alertOne shouldn't be muted since alertTwo is resolved. + alerts: []*types.Alert{alertOne(), alertTwo(false), alertTwo(true)}, + expected: []exp{ + { + lbls: model.LabelSet{"t": "1", "e": "f"}, + muted: false, + }, + { + lbls: model.LabelSet{"s": "1", "e": "f"}, + muted: false, + }, + }, + }, + } { + ap := newFakeAlerts(tc.alerts) + mk := types.NewMarker() + inhibitor := NewInhibitor(ap, []*config.InhibitRule{inhibitRule()}, mk, nil) + + go func() { + for ap.finished != nil { + select { + case <-ap.finished: + ap.finished = nil + default: + } + } + inhibitor.Stop() + }() + inhibitor.Run() + + for _, expected := range tc.expected { + if inhibitor.Mutes(expected.lbls) != expected.muted { + mute := "unmuted" + if expected.muted { + mute = "muted" + } + t.Errorf("tc: %d, expected alert with labels %q to be %s", i, expected.lbls, mute) + } + } + } +} From d118263c745591eadad3e416122e8261dd445df0 Mon Sep 17 00:00:00 2001 From: Simon Pasquier Date: Tue, 10 Apr 2018 14:05:35 +0200 Subject: [PATCH 4/6] inhibit: use NopLogger in tests Signed-off-by: Simon Pasquier --- inhibit/inhibit.go | 3 --- inhibit/inhibit_test.go | 7 +++++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index 40da664f46..faf2b81c8d 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -42,9 +42,6 @@ type Inhibitor struct { // NewInhibitor returns a new Inhibitor. func NewInhibitor(ap provider.Alerts, rs []*config.InhibitRule, mk types.Marker, logger log.Logger) *Inhibitor { - if logger == nil { - logger = log.NewNopLogger() - } ih := &Inhibitor{ alerts: ap, marker: mk, diff --git a/inhibit/inhibit_test.go b/inhibit/inhibit_test.go index a1ca1740cf..d269c06f23 100644 --- a/inhibit/inhibit_test.go +++ b/inhibit/inhibit_test.go @@ -18,6 +18,7 @@ import ( "testing" "time" + "github.com/go-kit/kit/log" "github.com/kylelemons/godebug/pretty" "github.com/prometheus/common/model" @@ -26,6 +27,8 @@ import ( "github.com/prometheus/alertmanager/types" ) +var nopLogger = log.NewNopLogger() + func TestInhibitRuleHasEqual(t *testing.T) { t.Parallel() @@ -148,7 +151,7 @@ func TestInhibitRuleMatches(t *testing.T) { Equal: model.LabelNames{"e"}, } m := types.NewMarker() - ih := NewInhibitor(nil, []*config.InhibitRule{&cr}, m, nil) + ih := NewInhibitor(nil, []*config.InhibitRule{&cr}, m, nopLogger) ir := ih.rules[0] now := time.Now() // Active alert that matches the source filter @@ -355,7 +358,7 @@ func TestInhibit(t *testing.T) { } { ap := newFakeAlerts(tc.alerts) mk := types.NewMarker() - inhibitor := NewInhibitor(ap, []*config.InhibitRule{inhibitRule()}, mk, nil) + inhibitor := NewInhibitor(ap, []*config.InhibitRule{inhibitRule()}, mk, nopLogger) go func() { for ap.finished != nil { From a0317db8c402104361e045426e63b021792d555b Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Tue, 17 Apr 2018 18:10:46 +0200 Subject: [PATCH 5/6] Update old alert with result of merge with new On ingest, alerts with matching fingerprints are merged if the new alert's start and end times overlap with the old alert's. The merge creates a new alert, which is then updated in the internal alert store. The original alert is not updated (because merge creates a copy), so it is never marked as resolved in the inhibitor's reference to it. The code within the inhibitor relies on skipping over resolved alerts, but because the old alert is never updated it is never marked as resolved. Thus it continues to inhibit other alerts until it is cleaned up by the internal GC. This commit updates the struct of the old alert with the result of the merge with the new alert. An alternative would be to always update the inhibitor's internal cache of alerts regardless of an alert's resolve status. Signed-off-by: stuart nelson --- inhibit/inhibit.go | 14 ++++---------- provider/mem/mem.go | 6 ++++++ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index faf2b81c8d..8563e1844f 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -80,9 +80,12 @@ func (ih *Inhibitor) run(ctx context.Context) { level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err) continue } + if a.Resolved() { + continue + } // Update the inhibition rules' cache. for _, r := range ih.rules { - if r.exists(a) || r.SourceMatchers.Match(a.Labels) { + if r.SourceMatchers.Match(a.Labels) { r.set(a) } } @@ -211,15 +214,6 @@ func (r *InhibitRule) set(a *types.Alert) { r.scache[a.Fingerprint()] = a } -// exists returns true if the alert is present in the source cache. -func (r *InhibitRule) exists(a *types.Alert) bool { - r.mtx.Lock() - defer r.mtx.Unlock() - - _, ok := r.scache[a.Fingerprint()] - return ok -} - // hasEqual checks whether the source cache contains alerts matching // the equal labels for the given label set. func (r *InhibitRule) hasEqual(lset model.LabelSet) (model.Fingerprint, bool) { diff --git a/provider/mem/mem.go b/provider/mem/mem.go index 23c91e224e..54c8142e7e 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -180,6 +180,12 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) || (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) { alert = old.Merge(alert) + // Merge returns a new alert. In order to + // update old, we have to set the struct it + // points to to equal the newly merged alert. + // This is necessary as old may be stored in + // the inhibitor's rules cache. + *old = *alert } } From bb5b73ddf0ccf0f59c7a4845ad57da6716804a5c Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Tue, 17 Apr 2018 18:32:03 +0200 Subject: [PATCH 6/6] Update inhibitor cache even if alert is resolved This seems like a better choice than the previous commit. I think it is more sane to have the inhibitor update its own cache, rather than having one of its pointers updated externally. Signed-off-by: stuart nelson --- inhibit/inhibit.go | 3 --- provider/mem/mem.go | 6 ------ 2 files changed, 9 deletions(-) diff --git a/inhibit/inhibit.go b/inhibit/inhibit.go index 8563e1844f..80575ec73e 100644 --- a/inhibit/inhibit.go +++ b/inhibit/inhibit.go @@ -80,9 +80,6 @@ func (ih *Inhibitor) run(ctx context.Context) { level.Error(ih.logger).Log("msg", "Error iterating alerts", "err", err) continue } - if a.Resolved() { - continue - } // Update the inhibition rules' cache. for _, r := range ih.rules { if r.SourceMatchers.Match(a.Labels) { diff --git a/provider/mem/mem.go b/provider/mem/mem.go index 54c8142e7e..23c91e224e 100644 --- a/provider/mem/mem.go +++ b/provider/mem/mem.go @@ -180,12 +180,6 @@ func (a *Alerts) Put(alerts ...*types.Alert) error { if (alert.EndsAt.After(old.StartsAt) && alert.EndsAt.Before(old.EndsAt)) || (alert.StartsAt.After(old.StartsAt) && alert.StartsAt.Before(old.EndsAt)) { alert = old.Merge(alert) - // Merge returns a new alert. In order to - // update old, we have to set the struct it - // points to to equal the newly merged alert. - // This is necessary as old may be stored in - // the inhibitor's rules cache. - *old = *alert } }