Skip to content
This repository was archived by the owner on May 29, 2024. It is now read-only.

Publish SNS event on new alert #200

Merged
merged 19 commits into from
Feb 22, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.env.template
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ P0_PAGERDUTY_ALERT_EVENTS_URL=
P1_PAGERDUTY_INTEGRATION_KEY=
P1_PAGERDUTY_ALERT_EVENTS_URL=

SNS_TOPIC_ARN=

# Metrics configurations
METRICS_HOST=localhost
METRICS_PORT=7300
Expand Down
3 changes: 3 additions & 0 deletions e2e/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,9 @@ func DefaultTestConfig() *config.Config {
AlertConfig: &alert.Config{
PagerdutyAlertEventsURL: "",
RoutingCfgPath: "",
SNSConfig: &client.SNSConfig{
TopicArn: "e2e-test-arn",
},
},
EngineConfig: &engine.Config{
WorkerCount: workerCount,
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/VictoriaMetrics/fastcache v1.10.0 // indirect
github.com/ajg/form v1.5.1 // indirect
github.com/aws/aws-sdk-go v1.50.3 // indirect
github.com/benbjohnson/clock v1.3.5 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.7.0 // indirect
Expand Down Expand Up @@ -104,6 +105,7 @@ require (
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.5 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
Expand Down
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKS
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
github.com/aws/aws-sdk-go v1.50.3 h1:NnXC/ukOakZbBwQcwAzkAXYEB4SbWboP9TFx9vvhIrE=
github.com/aws/aws-sdk-go v1.50.3/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aymerick/raymond v2.0.3-0.20180322193309-b565731e1464+incompatible/go.mod h1:osfaiScAUVup+UC9Nfq76eWqDhXlp+4UYaA8uhTBO6g=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
Expand Down Expand Up @@ -437,6 +439,9 @@ github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD
github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc=
github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ=
github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4=
github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ=
Expand Down
37 changes: 31 additions & 6 deletions internal/alert/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Config struct {
RoutingCfgPath string
PagerdutyAlertEventsURL string
RoutingParams *core.AlertRoutingParams
SNSConfig *client.SNSConfig
}

// alertManager ... Alert manager implementation
Expand All @@ -39,28 +40,30 @@ type alertManager struct {
interpolator *Interpolator
cdHandler CoolDownHandler
cm RoutingDirectory
sns client.SNSClient

logger *zap.Logger
metrics metrics.Metricer
alertTransit chan core.Alert
}

// NewManager ... Instantiates a new alert manager
func NewManager(ctx context.Context, cfg *Config, cm RoutingDirectory) Manager {
func NewManager(ctx context.Context, cfg *Config, cm RoutingDirectory, sns client.SNSClient) Manager {
// NOTE - Consider constructing dependencies in higher level
// abstraction and passing them in

ctx, cancel := context.WithCancel(ctx)

// NOTE - Consider adding support for additional sns configurations
am := &alertManager{
ctx: ctx,
cdHandler: NewCoolDownHandler(),
cfg: cfg,
cm: cm,

ctx: ctx,
cdHandler: NewCoolDownHandler(),
cfg: cfg,
cm: cm,
cancel: cancel,
interpolator: new(Interpolator),
store: NewStore(),
sns: sns,
alertTransit: make(chan core.Alert),
metrics: metrics.WithContext(ctx),
logger: logging.WithContext(ctx),
Expand Down Expand Up @@ -142,6 +145,24 @@ func (am *alertManager) handlePagerDutyPost(alert core.Alert) error {
return nil
}

// handleSNSPublish ... Publishes an alert to SNS through the manager's SNS client.
// The message body is rendered with the interpolator's Slack template using the
// policy message; the alert's path ID and severity travel as the dedup key and
// severity of the event.
// Returns an error when no SNS client is available, when the client call fails,
// or when the client reports a non-success status.
func (am *alertManager) handleSNSPublish(alert core.Alert, policy *core.AlertPolicy) error {
	// The client constructor may hand back nil (e.g. session creation failed);
	// calling a method on a nil interface would panic the alerting goroutine.
	if am.sns == nil {
		return fmt.Errorf("sns client is not initialized")
	}

	event := &client.AlertEventTrigger{
		Message:  am.interpolator.SlackMessage(alert, policy.Msg),
		DedupKey: alert.PathID,
		Severity: alert.Sev,
	}

	resp, err := am.sns.PostEvent(am.ctx, event)
	if err != nil {
		return fmt.Errorf("client %s could not post to sns: %w", am.sns.GetName(), err)
	}

	// Guard against implementations that return (nil, nil).
	if resp == nil {
		return fmt.Errorf("client %s returned an empty sns response", am.sns.GetName())
	}

	if resp.Status != core.SuccessStatus {
		return fmt.Errorf("client %s could not post to sns: %s", am.sns.GetName(), resp.Message)
	}
	return nil
}

// EventLoop ... Event loop for alert manager subsystem
func (am *alertManager) EventLoop() error {
ticker := time.NewTicker(time.Second * 1)
Expand Down Expand Up @@ -202,6 +223,10 @@ func (am *alertManager) HandleAlert(alert core.Alert, policy *core.AlertPolicy)
if err := am.handlePagerDutyPost(alert); err != nil {
am.logger.Error("could not post to pagerduty", zap.Error(err))
}

if err := am.handleSNSPublish(alert, policy); err != nil {
am.logger.Error("could not publish to sns", zap.Error(err))
}
}

// Shutdown ... Shuts down the alert manager subsystem
Expand Down
33 changes: 28 additions & 5 deletions internal/alert/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func TestEventLoop(t *testing.T) {
description: "Test low sev alert sends to slack",
test: func(t *testing.T) {
cm := alert.NewRoutingDirectory(cfg.AlertConfig)
am := alert.NewManager(ctx, cfg.AlertConfig, cm)
sns := mocks.NewMockSNSClient(c)

am := alert.NewManager(ctx, cfg.AlertConfig, cm, sns)

go func() {
_ = am.EventLoop()
Expand Down Expand Up @@ -76,6 +78,12 @@ func TestEventLoop(t *testing.T) {
}, nil).Times(1)
}

sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return(
&client.AlertAPIResponse{
Message: "test",
Status: core.SuccessStatus,
}, nil).AnyTimes()

ingress <- alert
time.Sleep(1 * time.Second)
id := core.NewUUID()
Expand All @@ -93,7 +101,8 @@ func TestEventLoop(t *testing.T) {
description: "Test medium sev alert sends to just PagerDuty",
test: func(t *testing.T) {
cm := alert.NewRoutingDirectory(cfg.AlertConfig)
am := alert.NewManager(ctx, cfg.AlertConfig, cm)
sns := mocks.NewMockSNSClient(c)
am := alert.NewManager(ctx, cfg.AlertConfig, cm, sns)

go func() {
_ = am.EventLoop()
Expand Down Expand Up @@ -130,6 +139,12 @@ func TestEventLoop(t *testing.T) {
}, nil).Times(1)
}

sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return(
&client.AlertAPIResponse{
Message: "test",
Status: core.SuccessStatus,
}, nil).AnyTimes()

ingress <- alert
time.Sleep(1 * time.Second)
id := core.UUID{}
Expand All @@ -147,7 +162,8 @@ func TestEventLoop(t *testing.T) {
description: "Test high sev alert sends to both slack and PagerDuty",
test: func(t *testing.T) {
cm := alert.NewRoutingDirectory(cfg.AlertConfig)
am := alert.NewManager(ctx, cfg.AlertConfig, cm)
sns := mocks.NewMockSNSClient(c)
am := alert.NewManager(ctx, cfg.AlertConfig, cm, sns)

go func() {
_ = am.EventLoop()
Expand Down Expand Up @@ -181,7 +197,7 @@ func TestEventLoop(t *testing.T) {
&client.AlertAPIResponse{
Message: "test",
Status: core.SuccessStatus,
}, nil).Times(1)
}, nil)
}

for _, cli := range cm.GetSlackClients(core.HIGH) {
Expand All @@ -191,8 +207,15 @@ func TestEventLoop(t *testing.T) {
&client.AlertAPIResponse{
Message: "test",
Status: core.SuccessStatus,
}, nil).Times(1)
}, nil)
}

sns.EXPECT().PostEvent(gomock.Any(), gomock.Any()).Return(
&client.AlertAPIResponse{
Message: "test",
Status: core.SuccessStatus,
}, nil).AnyTimes()

ingress <- alert
time.Sleep(1 * time.Second)
id := core.UUID{}
Expand Down
3 changes: 2 additions & 1 deletion internal/app/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,9 @@ func InitializeAlerting(ctx context.Context, cfg *config.Config) (alert.Manager,
}

clientMap := alert.NewRoutingDirectory(cfg.AlertConfig)
snsClient := client.NewSNSClient(cfg.AlertConfig.SNSConfig, "pessimism")

return alert.NewManager(ctx, cfg.AlertConfig, clientMap), nil
return alert.NewManager(ctx, cfg.AlertConfig, clientMap, snsClient), nil
}

// InitializeETL ... Performs dependency injection to build etl struct
Expand Down
95 changes: 95 additions & 0 deletions internal/client/sns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
//go:generate mockgen -package mocks --destination=../mocks/mock_sns.go . SNSClient

package client

import (
"context"
"os"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/base-org/pessimism/internal/core"
"github.com/base-org/pessimism/internal/logging"

"go.uber.org/zap"
)

// SNSClient ... An interface for SNS clients to implement.
// It embeds AlertClient and adds no methods of its own; the distinct type
// exists so a dedicated mock can be generated (see the go:generate directive
// at the top of this file).
type SNSClient interface {
AlertClient
}

// SNSConfig ... Configuration for SNS client
type SNSConfig struct {
// TopicArn is the ARN of the SNS topic that events are published to.
// An empty value only triggers a warning at client construction.
TopicArn string
}

// snsClient ... SNSClient implementation backed by the AWS SDK SNS service client
type snsClient struct {
// svc is the AWS SNS service handle used to publish messages
svc *sns.SNS
// name is the identifier returned by GetName
name string
// topicArn is the destination topic passed on every publish
topicArn string
}

// NewSNSClient ... Initializer
// Builds an SNS client that publishes to cfg.TopicArn and reports `name` from
// GetName. A nil cfg is treated as an empty configuration. A missing topic ARN
// only produces a warning here.
// NOTE - Returns nil when the AWS session cannot be created; callers must
// check for a nil client before using it.
func NewSNSClient(cfg *SNSConfig, name string) SNSClient {
	// Tolerate a missing config instead of panicking on a nil dereference.
	var topicArn string
	if cfg != nil {
		topicArn = cfg.TopicArn
	}

	if topicArn == "" {
		logging.NoContext().Warn("No SNS topic ARN provided")
	}

	logging.NoContext().Debug("AWS Region", zap.String("region", os.Getenv("AWS_REGION")))

	// Initialize a session that the SDK will use to load configuration,
	// credentials, and region. AWS_REGION, AWS_SECRET_ACCESS_KEY, and AWS_ACCESS_KEY_ID should be set in the
	// environment's runtime
	sess, err := session.NewSession()
	if err != nil {
		logging.NoContext().Error("Failed to create SNS session", zap.Error(err))
		return nil
	}

	return &snsClient{
		svc:      sns.New(sess),
		topicArn: topicArn,
		name:     name,
	}
}

// PostEvent ... Posts an event to an SNS topic ARN
// The event message becomes the SNS message body; severity and dedup key are
// attached as message attributes. The supplied context is forwarded to the SDK
// so an in-flight publish is abandoned when the caller cancels.
// On failure both a failure-status response (carrying the error text) and the
// error are returned; on success the response message holds the SNS message ID.
func (sc snsClient) PostEvent(ctx context.Context, event *AlertEventTrigger) (*AlertAPIResponse, error) {
	// The SDK rejects a nil context, so fall back to a background one.
	if ctx == nil {
		ctx = context.Background()
	}

	// Publish a message to the topic
	result, err := sc.svc.PublishWithContext(ctx, &sns.PublishInput{
		MessageAttributes: getAttributesFromEvent(event),
		Message:           &event.Message,
		TopicArn:          &sc.topicArn,
	})
	if err != nil {
		return &AlertAPIResponse{
			Status:  core.FailureStatus,
			Message: err.Error(),
		}, err
	}

	// MessageId is a pointer in the SDK output; avoid dereferencing nil.
	var messageID string
	if result != nil && result.MessageId != nil {
		messageID = *result.MessageId
	}

	return &AlertAPIResponse{
		Status:  core.SuccessStatus,
		Message: messageID,
	}, nil
}

// getAttributesFromEvent ... Builds the SNS message attributes for an alert event.
// Two string-typed attributes are produced: "severity" and "dedup_key", taken
// from the String() form of the event's Severity and DedupKey.
func getAttributesFromEvent(event *AlertEventTrigger) map[string]*sns.MessageAttributeValue {
	// stringAttr wraps a plain value as a "String" typed SNS attribute.
	stringAttr := func(value string) *sns.MessageAttributeValue {
		return &sns.MessageAttributeValue{
			DataType:    aws.String("String"),
			StringValue: aws.String(value),
		}
	}

	attrs := make(map[string]*sns.MessageAttributeValue, 2)
	attrs["severity"] = stringAttr(event.Severity.String())
	attrs["dedup_key"] = stringAttr(event.DedupKey.String())

	return attrs
}

// GetName ... Returns the name this client was constructed with
func (sc snsClient) GetName() string {
	clientName := sc.name
	return clientName
}
1 change: 1 addition & 0 deletions internal/client/sns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
package client
3 changes: 3 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func NewConfig(fileName core.FilePath) *Config {
RoutingCfgPath: getEnvStrWithDefault("ALERT_ROUTE_CFG_PATH", "alerts-routing.yaml"),
PagerdutyAlertEventsURL: getEnvStrWithDefault("PAGERDUTY_ALERT_EVENTS_URL", ""),
RoutingParams: nil, // This is populated after the config is created (see IngestAlertConfig)
SNSConfig: &client.SNSConfig{
TopicArn: getEnvStrWithDefault("SNS_TOPIC_ARN", ""),
},
},

ClientConfig: &client.Config{
Expand Down
3 changes: 2 additions & 1 deletion internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,8 +247,9 @@ func (m *Metrics) RecordAlertGenerated(alert core.Alert, dest core.AlertDestinat
net := alert.PathID.Network().String()
h := alert.HT.String()
sev := alert.Sev.String()
path := alert.PathID.String()

m.AlertsGenerated.WithLabelValues(net, h, sev, dest.String(), clientName).Inc()
m.AlertsGenerated.WithLabelValues(net, h, path, sev, dest.String(), clientName).Inc()
}

func (m *Metrics) RecordNodeError(n core.Network) {
Expand Down
Loading
Loading