From 753bf4566d58f641e1c40264893a3b8de9a28ad9 Mon Sep 17 00:00:00 2001 From: Brian Conway Date: Mon, 18 Sep 2023 13:39:30 -0500 Subject: [PATCH] Refactor alerter service for legibility - All test variations pass. --- internal/atlas-alerter/alerter/alert.go | 258 ++++++++++++------------ 1 file changed, 129 insertions(+), 129 deletions(-) diff --git a/internal/atlas-alerter/alerter/alert.go b/internal/atlas-alerter/alerter/alert.go index 5fe7c2c1..04419dd3 100644 --- a/internal/atlas-alerter/alerter/alert.go +++ b/internal/atlas-alerter/alerter/alert.go @@ -22,6 +22,7 @@ const ErrUnknownAlarm consterr.Error = "unknown alarm type" // stores the results. func (ale *Alerter) alertMessages() { alog.Info("alertMessages starting processor") + ctx := context.Background() var processCount int for msg := range ale.eOutSub.C() { @@ -50,8 +51,8 @@ func (ale *Alerter) alertMessages() { WithField("devID", eOut.Device.Id) // Retrieve org. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - org, err := ale.orgDAO.Read(ctx, eOut.Device.OrgId) + dCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + org, err := ale.orgDAO.Read(dCtx, eOut.Device.OrgId) cancel() if err != nil { msg.Requeue() @@ -63,9 +64,9 @@ func (ale *Alerter) alertMessages() { logger.Debugf("alertMessages org: %+v", org) // Retrieve alarms by rule ID. Alarms may be disabled. - ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second) - alarms, _, err := ale.alarmDAO.List(ctx, eOut.Device.OrgId, time.Time{}, - "", 0, eOut.Rule.Id) + dCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + alarms, _, err := ale.alarmDAO.List(dCtx, eOut.Device.OrgId, + time.Time{}, "", 0, eOut.Rule.Id) cancel() if err != nil { msg.Requeue() @@ -77,140 +78,139 @@ func (ale *Alerter) alertMessages() { logger.Debugf("alertMessages alarms: %+v", alarms) // Validate, retrieve users, process and send alerts, and store results. 
- // Unconditionally acknowledge a message after processing, as there are - // no guarantees of alarms or users being assigned to an event. for _, a := range alarms { - // Validate alarm. - if a.Status != api.Status_ACTIVE { - continue - } + ale.evalAlarms(alog.NewContext( + ctx, &alog.CtxLogger{Logger: logger}), eOut, org, a) + } - // Retrieve users. Only active users with matching tags will be - // returned. - ctx, cancel := context.WithTimeout(context.Background(), - 5*time.Second) - users, err := ale.userDAO.ListByTags(ctx, eOut.Device.OrgId, - a.UserTags) - cancel() - if err != nil { - metric.Incr("error", map[string]string{"func": "listbytags"}) - logger.Errorf("alertMessages ale.userDAO.ListByTags: %v", err) - - continue - } - if len(users) == 0 { - continue - } + msg.Ack() + metric.Incr("processed", nil) + + processCount++ + if processCount%100 == 0 { + alog.Infof("alertMessages processed %v messages", processCount) + } + } +} - // Generate alert subject and body. - subj, err := template.Generate(eOut.Point, eOut.Rule, eOut.Device, - a.SubjectTemplate) - if err != nil { - metric.Incr("error", map[string]string{"func": "gensubject"}) - logger.Errorf("alertMessages subject template.Generate: %v", - err) +// evalAlarms validates an alarm, retrieves users, processes and sends alerts, +// and stores results. It does not acknowledge the message: the caller does so +// unconditionally, as alarms or users may not be assigned to an event. +func (ale *Alerter) evalAlarms( + ctx context.Context, eOut *message.EventerOut, org *api.Org, a *api.Alarm, +) { + logger := alog.FromContext(ctx) + + // Validate alarm. + if a.Status != api.Status_ACTIVE { + return + } - continue - } + // Retrieve users. Only active users with matching tags will be returned. 
+ dCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + users, err := ale.userDAO.ListByTags(dCtx, eOut.Device.OrgId, a.UserTags) + cancel() + if err != nil { + metric.Incr("error", map[string]string{"func": "listbytags"}) + logger.Errorf("alertMessages ale.userDAO.ListByTags: %v", err) - body, err := template.Generate(eOut.Point, eOut.Rule, eOut.Device, - a.BodyTemplate) - if err != nil { - metric.Incr("error", map[string]string{"func": "genbody"}) - logger.Errorf("alertMessages body template.Generate: %v", err) + return + } + if len(users) == 0 { + return + } - continue - } + // Generate alert subject and body. + subj, err := template.Generate(eOut.Point, eOut.Rule, eOut.Device, + a.SubjectTemplate) + if err != nil { + metric.Incr("error", map[string]string{"func": "gensubject"}) + logger.Errorf("alertMessages subject template.Generate: %v", err) - // Process alerts. - for _, user := range users { - // Check cache for existing repeat interval. - key := repeatKey(eOut.Device.OrgId, eOut.Device.Id, a.Id, - user.Id) - ctx, cancel := context.WithTimeout(context.Background(), - 5*time.Second) - ok, err := ale.cache.SetIfNotExistTTL(ctx, key, 1, - time.Duration(a.RepeatInterval)*time.Minute) - cancel() - if err != nil { - metric.Incr("error", map[string]string{ - "func": "setifnotexist", - }) - logger.Errorf("alertMessages ale.cache.SetIfNotExistTTL: "+ - "%v", err) - - continue - } - if !ok { - metric.Incr("repeat", nil) - - continue - } - - // Send alert. 
- ctx, cancel = context.WithTimeout(context.Background(), - time.Minute) - switch a.Type { - case api.AlarmType_APP: - err = ale.notify.App(ctx, user.AppKey, subj, body) - case api.AlarmType_SMS: - err = ale.notify.SMS(ctx, user.Phone, subj, body) - case api.AlarmType_EMAIL: - err = ale.notify.Email(ctx, org.DisplayName, org.Email, - user.Email, subj, body) - case api.AlarmType_ALARM_TYPE_UNSPECIFIED: - fallthrough - default: - err = ErrUnknownAlarm - } - cancel() - - alert := &api.Alert{ - OrgId: eOut.Device.OrgId, - UniqId: eOut.Device.UniqId, - AlarmId: a.Id, - UserId: user.Id, - TraceId: eOut.Point.TraceId, - } - - if err != nil { - alert.Status = api.AlertStatus_ERROR - alert.Error = err.Error() - metric.Incr("error", map[string]string{"func": "notify"}) - logger.Errorf("alertMessages ale.notify a, err: %+v, %v", a, - err.Error()) - } else { - alert.Status = api.AlertStatus_SENT - metric.Incr("sent", map[string]string{ - "type": a.Type.String(), - }) - logger.Debugf("alertMessages sent user, msg: %+v, %v", user, - subj+" - "+body) - } - - // Store alert. - ctx, cancel = context.WithTimeout(context.Background(), - 5*time.Second) - err = ale.aleDAO.Create(ctx, alert) - cancel() - if err != nil { - metric.Incr("error", map[string]string{"func": "create"}) - logger.Errorf("alertMessages ale.aleDAO.Create: %v", err) - - continue - } - - metric.Incr("created", nil) - logger.Debugf("alertMessages created: %+v", alert) - } + return + } + + body, err := template.Generate(eOut.Point, eOut.Rule, eOut.Device, + a.BodyTemplate) + if err != nil { + metric.Incr("error", map[string]string{"func": "genbody"}) + logger.Errorf("alertMessages body template.Generate: %v", err) + + return + } + + // Process alerts. + for _, user := range users { + // Check cache for existing repeat interval. 
+ key := repeatKey(eOut.Device.OrgId, eOut.Device.Id, a.Id, user.Id) + cCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + ok, err := ale.cache.SetIfNotExistTTL(cCtx, key, 1, + time.Duration(a.RepeatInterval)*time.Minute) + cancel() + if err != nil { + metric.Incr("error", map[string]string{"func": "setifnotexist"}) + logger.Errorf("alertMessages ale.cache.SetIfNotExistTTL: %v", err) + + continue } + if !ok { + metric.Incr("repeat", nil) - msg.Ack() - metric.Incr("processed", nil) + continue + } - processCount++ - if processCount%100 == 0 { - alog.Infof("alertMessages processed %v messages", processCount) + // Send alert. + nCtx, cancel := context.WithTimeout(ctx, time.Minute) + switch a.Type { + case api.AlarmType_APP: + err = ale.notify.App(nCtx, user.AppKey, subj, body) + case api.AlarmType_SMS: + err = ale.notify.SMS(nCtx, user.Phone, subj, body) + case api.AlarmType_EMAIL: + err = ale.notify.Email(nCtx, org.DisplayName, org.Email, user.Email, + subj, body) + case api.AlarmType_ALARM_TYPE_UNSPECIFIED: + fallthrough + default: + err = ErrUnknownAlarm + } + cancel() + + alert := &api.Alert{ + OrgId: eOut.Device.OrgId, + UniqId: eOut.Device.UniqId, + AlarmId: a.Id, + UserId: user.Id, + TraceId: eOut.Point.TraceId, + } + + if err != nil { + alert.Status = api.AlertStatus_ERROR + alert.Error = err.Error() + metric.Incr("error", map[string]string{"func": "notify"}) + logger.Errorf("alertMessages ale.notify a, err: %+v, %v", a, + err.Error()) + } else { + alert.Status = api.AlertStatus_SENT + metric.Incr("sent", map[string]string{ + "type": a.Type.String(), + }) + logger.Debugf("alertMessages sent user, msg: %+v, %v", user, + subj+" - "+body) + } + + // Store alert. 
+ dCtx, cancel = context.WithTimeout(ctx, 5*time.Second) + err = ale.aleDAO.Create(dCtx, alert) + cancel() + if err != nil { + metric.Incr("error", map[string]string{"func": "create"}) + logger.Errorf("alertMessages ale.aleDAO.Create: %v", err) + + continue } + + metric.Incr("created", nil) + logger.Debugf("alertMessages created: %+v", alert) } }