diff --git a/cmd/thanos/rule.go b/cmd/thanos/rule.go index 395a16ecc8..5ef1a1bdc7 100644 --- a/cmd/thanos/rule.go +++ b/cmd/thanos/rule.go @@ -31,6 +31,7 @@ import ( "github.com/prometheus/prometheus/storage/tsdb" "github.com/prometheus/prometheus/util/strutil" "github.com/thanos-io/thanos/pkg/alert" + "github.com/thanos-io/thanos/pkg/alertmanager" "github.com/thanos-io/thanos/pkg/block/metadata" "github.com/thanos-io/thanos/pkg/component" "github.com/thanos-io/thanos/pkg/discovery/cache" @@ -336,8 +337,9 @@ func runRule( // Run rule evaluation and alert notifications. var ( - alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) - ruleMgr = thanosrule.NewManager(dataDir) + alertmgrs = alertmanager.NewAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver)) + alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels) + ruleMgr = thanosrule.NewManager(dataDir) ) { notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) { @@ -400,33 +402,8 @@ func runRule( } // Discover and resolve Alertmanager addresses. { - for i := range alertmgrs { - am := alertmgrs[i] - ctx, cancel := context.WithCancel(context.Background()) - g.Add(func() error { - am.Discover(ctx) - return nil - }, func(error) { - cancel() - }) - - g.Add(func() error { - return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error { - am.Resolve(ctx) - return nil - }) - }, func(error) { - cancel() - }) - } - } - // Run the alert sender. - { - clients := make([]alert.AlertmanagerClient, len(alertmgrs)) - for i := range alertmgrs { - clients[i] = alertmgrs[i] - } - sdr := alert.NewSender(logger, reg, clients) + // TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660. + sdr := alert.NewSender(logger, reg, alertmgrs.Get, nil, alertmgrsTimeout) ctx, cancel := context.WithCancel(context.Background()) g.Add(func() error { @@ -443,6 +420,23 @@ func runRule( cancel() }) } + + { + ctx, cancel := context.WithCancel(context.Background()) + + g.Add(func() error { + return runutil.Repeat(30*time.Second, ctx.Done(), func() error { + if err := alertmgrs.Update(ctx); err != nil { + level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err) + alertMngrAddrResolutionErrors.Inc() + } + return nil + }) + }, func(error) { + cancel() + }) + } + // Run File Service Discovery and update the query addresses when the files are modified. if fileSD != nil { var fileSDUpdates chan []*targetgroup.Group diff --git a/pkg/alertmanager/alertmanager.go b/pkg/alertmanager/alertmanager.go new file mode 100644 index 0000000000..6769cd4fed --- /dev/null +++ b/pkg/alertmanager/alertmanager.go @@ -0,0 +1,127 @@ +package alertmanager + +import ( + "context" + "net" + "net/url" + "strconv" + "strings" + "sync" + + "github.com/go-kit/kit/log" + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/discovery/dns" +) + +const ( + defaultAlertmanagerPort = 9093 +) + +// Alertmanager replica URLs to push firing alerts. Ruler claims success if +// push to at least one alertmanager from discovered succeeds. The scheme +//should not be empty e.g `http` might be used. The scheme may be prefixed +//with 'dns+' or 'dnssrv+' to detect Alertmanager IPs through respective +//DNS lookups. The port defaults to 9093 or the SRV record's value. +//The URL path is used as a prefix for the regular Alertmanager API path. +type AlertManager interface { + // Gets the address of the configured alertmanager + Get() []*url.URL + + // Update and parse the raw url + Update(ctx context.Context) error +} + +type alertmanagerSet struct { + resolver dns.Resolver + addrs []string + mtx sync.Mutex + current []*url.URL +} + +func NewAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet { + return &alertmanagerSet{ + resolver: dns.NewResolver(dnsSDResolver.ToResolver(logger)), + addrs: addrs, + } +} + +// Gets the address of the configured alertmanager +func (s *alertmanagerSet) Get() []*url.URL { + s.mtx.Lock() + defer s.mtx.Unlock() + + return s.current +} + +// Update and parse the raw url +func (s *alertmanagerSet) Update(ctx context.Context) error { + var result []*url.URL + for _, addr := range s.addrs { + var ( + qtype dns.QType + resolvedHosts []string + ) + + qtype, u, err := parseAlertmanagerAddress(addr) + if err != nil { + return errors.Wrapf(err, "parse URL %q", addr) + } + + // Get only the host and resolve it if needed. + host := u.Host + if qtype != "" { + if qtype == dns.A { + _, _, err = net.SplitHostPort(host) + if err != nil { + // The host could be missing a port. Append the defaultAlertmanagerPort. + host = host + ":" + strconv.Itoa(defaultAlertmanagerPort) + } + } + resolvedHosts, err = s.resolver.Resolve(ctx, host, qtype) + if err != nil { + return errors.Wrap(err, "alertmanager resolve") + } + } else { + resolvedHosts = []string{host} + } + + for _, host := range resolvedHosts { + result = append(result, &url.URL{ + Scheme: u.Scheme, + Host: host, + Path: u.Path, + User: u.User, + }) + } + } + + s.mtx.Lock() + s.current = result + s.mtx.Unlock() + + return nil +} + +func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) { + qType = "" + parsedUrl, err = url.Parse(addr) + if err != nil { + return qType, nil, err + } + + // The Scheme might contain DNS resolver type separated by + so we split it a part. + if schemeParts := strings.Split(parsedUrl.Scheme, "+"); len(schemeParts) > 1 { + parsedUrl.Scheme = schemeParts[len(schemeParts)-1] + qType = dns.QType(strings.Join(schemeParts[:len(schemeParts)-1], "+")) + } + + switch parsedUrl.Scheme { + case "http", "https": + case "": + return "", nil, errors.New("The scheme should not be empty, e.g `http` or `https`") + default: + return "", nil, errors.New("Scheme should be `http` or `https`") + } + + return qType, parsedUrl, err +} diff --git a/pkg/alertmanager/alertmanager_test.go b/pkg/alertmanager/alertmanager_test.go new file mode 100644 index 0000000000..160a3bcead --- /dev/null +++ b/pkg/alertmanager/alertmanager_test.go @@ -0,0 +1,104 @@ +package alertmanager + +import ( + "context" + "net/url" + "testing" + + "github.com/pkg/errors" + "github.com/thanos-io/thanos/pkg/discovery/dns" + "github.com/thanos-io/thanos/pkg/testutil" +) + +func TestParseAlertmanagerAddress(t *testing.T) { + type expected struct { + hasErr bool + qtype dns.QType + url *url.URL + } + tests := []struct { + addr string + expected expected + }{ + // no schema or no support schema + {"alertmanager", expected{hasErr: true}}, + {"alertmanager:9093", expected{hasErr: true}}, + {"tcp://alertmanager:9093", expected{hasErr: true}}, + + // correct cases + {"http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType(""), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + {"dns+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dns"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + {"dnssrv+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dnssrv"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + {"dnssrvnoa+http://alertmanager:9093", expected{hasErr: false, qtype: dns.QType("dnssrvnoa"), url: &url.URL{Scheme: "http", Host: "alertmanager:9093"}}}, + } + for _, tt := range tests { + gotQType, gotParsedUrl, err := parseAlertmanagerAddress(tt.addr) + if tt.expected.hasErr { + testutil.NotOk(t, err) + } else { + testutil.Ok(t, err) + } + testutil.Equals(t, tt.expected.qtype, gotQType) + testutil.Equals(t, tt.expected.url, gotParsedUrl) + } +} + +type mockResolver struct { + resultIPs map[string][]string + err error +} + +func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) { + if m.err != nil { + return nil, m.err + } + if res, ok := m.resultIPs[name]; ok { + return res, nil + } + return nil, errors.Errorf("mockResolver not found response for name: %s", name) +} + +func TestRuleAlertmanagerResolveWithPort(t *testing.T) { + mockResolver := mockResolver{ + resultIPs: map[string][]string{ + "alertmanager.com:19093": {"1.1.1.1:9300"}, + }, + } + + am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}} + + ctx := context.TODO() + err := am.Update(ctx) + testutil.Ok(t, err) + + expected := []*url.URL{ + { + Scheme: "http", + Host: "1.1.1.1:9300", + }, + } + gotURLs := am.Get() + testutil.Equals(t, expected, gotURLs) +} + +func TestRuleAlertmanagerResolveWithoutPort(t *testing.T) { + mockResolver := mockResolver{ + resultIPs: map[string][]string{ + "alertmanager.com:9093": {"1.1.1.1:9300"}, + }, + } + am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}} + + ctx := context.TODO() + err := am.Update(ctx) + testutil.Ok(t, err) + + expected := []*url.URL{ + { + Scheme: "http", + Host: "1.1.1.1:9300", + }, + } + gotURLs := am.Get() + testutil.Equals(t, expected, gotURLs) +}