Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

packaging alertmanager sets and add a tests. #1894

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 23 additions & 29 deletions cmd/thanos/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/prometheus/prometheus/storage/tsdb"
"github.com/prometheus/prometheus/util/strutil"
"github.com/thanos-io/thanos/pkg/alert"
"github.com/thanos-io/thanos/pkg/alertmanager"
"github.com/thanos-io/thanos/pkg/block/metadata"
"github.com/thanos-io/thanos/pkg/component"
"github.com/thanos-io/thanos/pkg/discovery/cache"
Expand Down Expand Up @@ -336,8 +337,9 @@ func runRule(

// Run rule evaluation and alert notifications.
var (
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
alertmgrs = alertmanager.NewAlertmanagerSet(logger, alertmgrURLs, dns.ResolverType(dnsSDResolver))
alertQ = alert.NewQueue(logger, reg, 10000, 100, labelsTSDBToProm(lset), alertExcludeLabels)
ruleMgr = thanosrule.NewManager(dataDir)
)
{
notify := func(ctx context.Context, expr string, alerts ...*rules.Alert) {
Expand Down Expand Up @@ -400,33 +402,8 @@ func runRule(
}
// Discover and resolve Alertmanager addresses.
{
for i := range alertmgrs {
am := alertmgrs[i]
ctx, cancel := context.WithCancel(context.Background())
g.Add(func() error {
am.Discover(ctx)
return nil
}, func(error) {
cancel()
})

g.Add(func() error {
return runutil.Repeat(alertmgrsDNSSDInterval, ctx.Done(), func() error {
am.Resolve(ctx)
return nil
})
}, func(error) {
cancel()
})
}
}
// Run the alert sender.
{
clients := make([]alert.AlertmanagerClient, len(alertmgrs))
for i := range alertmgrs {
clients[i] = alertmgrs[i]
}
sdr := alert.NewSender(logger, reg, clients)
// TODO(bwplotka): https://github.com/thanos-io/thanos/issues/660.
sdr := alert.NewSender(logger, reg, alertmgrs.Get, nil, alertmgrsTimeout)
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
Expand All @@ -443,6 +420,23 @@ func runRule(
cancel()
})
}

{
ctx, cancel := context.WithCancel(context.Background())

g.Add(func() error {
return runutil.Repeat(30*time.Second, ctx.Done(), func() error {
if err := alertmgrs.Update(ctx); err != nil {
level.Error(logger).Log("msg", "refreshing alertmanagers failed", "err", err)
alertMngrAddrResolutionErrors.Inc()
}
return nil
})
}, func(error) {
cancel()
})
}

// Run File Service Discovery and update the query addresses when the files are modified.
if fileSD != nil {
var fileSDUpdates chan []*targetgroup.Group
Expand Down
127 changes: 127 additions & 0 deletions pkg/alertmanager/alertmanager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package alertmanager

import (
"context"
"net"
"net/url"
"strconv"
"strings"
"sync"

"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/discovery/dns"
)

// defaultAlertmanagerPort is the port appended to an Alertmanager host that
// is resolved through a DNS A lookup and does not specify a port itself.
const defaultAlertmanagerPort = 9093

// AlertManager provides the Alertmanager replica URLs to push firing alerts
// to. Ruler claims success if the push to at least one of the discovered
// Alertmanagers succeeds.
//
// The scheme of a configured address should not be empty, e.g. `http` might
// be used. The scheme may be prefixed with 'dns+' or 'dnssrv+' to detect
// Alertmanager IPs through the respective DNS lookups. The port defaults to
// 9093 or the SRV record's value. The URL path is used as a prefix for the
// regular Alertmanager API path.
type AlertManager interface {
	// Get returns the addresses of the configured Alertmanagers.
	Get() []*url.URL

	// Update parses the raw URLs and refreshes the addresses returned by Get.
	Update(ctx context.Context) error
}

// alertmanagerSet keeps a list of raw Alertmanager addresses together with
// the URLs they were resolved to by the last successful Update.
type alertmanagerSet struct {
	// resolver performs the DNS lookup selected by an address's scheme prefix.
	resolver dns.Resolver
	// addrs holds the raw addresses; they are parsed again on every Update.
	addrs []string
	// mtx guards current.
	mtx sync.Mutex
	// current is the URL list stored by the last successful Update.
	current []*url.URL
}

// NewAlertmanagerSet returns a set for the given raw Alertmanager addresses.
// DNS lookups are performed with a resolver built from dnsSDResolver. The
// returned set has no resolved URLs until Update has been called.
func NewAlertmanagerSet(logger log.Logger, addrs []string, dnsSDResolver dns.ResolverType) *alertmanagerSet {
	set := new(alertmanagerSet)
	set.addrs = addrs
	set.resolver = dns.NewResolver(dnsSDResolver.ToResolver(logger))
	return set
}

// Get returns the URL list stored by the most recent successful Update.
// The read is performed while holding the set's mutex.
func (s *alertmanagerSet) Get() []*url.URL {
	s.mtx.Lock()
	urls := s.current
	s.mtx.Unlock()

	return urls
}

// Update parses every configured address, resolves it through DNS when its
// scheme carries a lookup prefix, and stores the resulting URLs as the new
// current list.
//
// The first parse or resolution failure aborts the update and is returned;
// in that case the previously stored list is left untouched.
func (s *alertmanagerSet) Update(ctx context.Context) error {
	var resolved []*url.URL

	for _, raw := range s.addrs {
		kind, base, err := parseAlertmanagerAddress(raw)
		if err != nil {
			return errors.Wrapf(err, "parse URL %q", raw)
		}

		// Without a lookup prefix the host is used exactly as configured.
		targets := []string{base.Host}
		if kind != "" {
			lookup := base.Host
			if kind == dns.A {
				// The host could be missing a port. Append the defaultAlertmanagerPort.
				if _, _, splitErr := net.SplitHostPort(lookup); splitErr != nil {
					lookup += ":" + strconv.Itoa(defaultAlertmanagerPort)
				}
			}
			if targets, err = s.resolver.Resolve(ctx, lookup, kind); err != nil {
				return errors.Wrap(err, "alertmanager resolve")
			}
		}

		// Every resolved host inherits scheme, path and credentials of the
		// address it was derived from.
		for _, target := range targets {
			resolved = append(resolved, &url.URL{
				Scheme: base.Scheme,
				Host:   target,
				Path:   base.Path,
				User:   base.User,
			})
		}
	}

	s.mtx.Lock()
	s.current = resolved
	s.mtx.Unlock()

	return nil
}

// parseAlertmanagerAddress parses addr as a URL and separates an optional
// DNS lookup prefix from its scheme. Everything before the last '+' of the
// scheme becomes the returned query type; the remainder is kept as the URL
// scheme and must be `http` or `https`.
//
// On failure the returned query type is empty and the URL is nil.
func parseAlertmanagerAddress(addr string) (qType dns.QType, parsedUrl *url.URL, err error) {
	u, parseErr := url.Parse(addr)
	if parseErr != nil {
		return "", nil, parseErr
	}

	// The Scheme might contain DNS resolver type separated by + so we split it apart.
	if sep := strings.LastIndex(u.Scheme, "+"); sep >= 0 {
		qType = dns.QType(u.Scheme[:sep])
		u.Scheme = u.Scheme[sep+1:]
	}

	if u.Scheme == "" {
		return "", nil, errors.New("The scheme should not be empty, e.g `http` or `https`")
	}
	if u.Scheme != "http" && u.Scheme != "https" {
		return "", nil, errors.New("Scheme should be `http` or `https`")
	}

	return qType, u, nil
}
104 changes: 104 additions & 0 deletions pkg/alertmanager/alertmanager_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package alertmanager

import (
"context"
"net/url"
"testing"

"github.com/pkg/errors"
"github.com/thanos-io/thanos/pkg/discovery/dns"
"github.com/thanos-io/thanos/pkg/testutil"
)

// TestParseAlertmanagerAddress verifies that addresses without a supported
// scheme are rejected and that an optional DNS lookup prefix is split off the
// scheme of valid addresses.
func TestParseAlertmanagerAddress(t *testing.T) {
	type want struct {
		hasErr bool
		qtype  dns.QType
		url    *url.URL
	}

	// parsed builds the URL every accepted input is expected to produce.
	parsed := func() *url.URL {
		return &url.URL{Scheme: "http", Host: "alertmanager:9093"}
	}

	cases := []struct {
		addr string
		want want
	}{
		// Missing or unsupported scheme.
		{addr: "alertmanager", want: want{hasErr: true}},
		{addr: "alertmanager:9093", want: want{hasErr: true}},
		{addr: "tcp://alertmanager:9093", want: want{hasErr: true}},

		// Accepted addresses, with and without a lookup prefix.
		{addr: "http://alertmanager:9093", want: want{qtype: dns.QType(""), url: parsed()}},
		{addr: "dns+http://alertmanager:9093", want: want{qtype: dns.QType("dns"), url: parsed()}},
		{addr: "dnssrv+http://alertmanager:9093", want: want{qtype: dns.QType("dnssrv"), url: parsed()}},
		{addr: "dnssrvnoa+http://alertmanager:9093", want: want{qtype: dns.QType("dnssrvnoa"), url: parsed()}},
	}

	for _, tc := range cases {
		qtype, u, err := parseAlertmanagerAddress(tc.addr)

		switch {
		case tc.want.hasErr:
			testutil.NotOk(t, err)
		default:
			testutil.Ok(t, err)
		}

		// Rejected inputs must yield the zero query type and a nil URL.
		testutil.Equals(t, tc.want.qtype, qtype)
		testutil.Equals(t, tc.want.url, u)
	}
}

// mockResolver is a test double for the DNS resolver that answers lookups
// from a fixed table instead of performing real DNS queries.
type mockResolver struct {
	// resultIPs maps a looked-up name to the addresses Resolve returns for it.
	resultIPs map[string][]string
	// err, when non-nil, is returned by every Resolve call.
	err error
}

// Resolve returns the canned addresses registered for name. A configured
// error takes precedence over any lookup; names missing from the table
// produce an error. The context and query type are ignored.
func (m mockResolver) Resolve(ctx context.Context, name string, qtype dns.QType) ([]string, error) {
	if m.err != nil {
		return nil, m.err
	}

	ips, found := m.resultIPs[name]
	if !found {
		return nil, errors.Errorf("mockResolver not found response for name: %s", name)
	}
	return ips, nil
}

func TestRuleAlertmanagerResolveWithPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:19093": {"1.1.1.1:9300"},
},
}

am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com:19093"}}

ctx := context.TODO()
err := am.Update(ctx)
testutil.Ok(t, err)

expected := []*url.URL{
{
Scheme: "http",
Host: "1.1.1.1:9300",
},
}
gotURLs := am.Get()
testutil.Equals(t, expected, gotURLs)
}

func TestRuleAlertmanagerResolveWithoutPort(t *testing.T) {
mockResolver := mockResolver{
resultIPs: map[string][]string{
"alertmanager.com:9093": {"1.1.1.1:9300"},
},
}
am := alertmanagerSet{resolver: mockResolver, addrs: []string{"dns+http://alertmanager.com"}}

ctx := context.TODO()
err := am.Update(ctx)
testutil.Ok(t, err)

expected := []*url.URL{
{
Scheme: "http",
Host: "1.1.1.1:9300",
},
}
gotURLs := am.Get()
testutil.Equals(t, expected, gotURLs)
}