From 497c53dbd14c8fccafabf297e4cf46a2a6f34bad Mon Sep 17 00:00:00 2001 From: Brian Conway Date: Tue, 12 Sep 2023 22:07:38 -0500 Subject: [PATCH] Refactor eventer service for legibility - Remove pkg/metric use of `init()`. - All test variations pass. --- build/deploy/atlas/docker-compose.yml | 16 +-- internal/atlas-eventer/eventer/event.go | 144 +++++++++++++----------- pkg/metric/default.go | 5 +- pkg/metric/init.go | 5 - pkg/metric/statsd_test.go | 26 ++--- 5 files changed, 100 insertions(+), 96 deletions(-) delete mode 100644 pkg/metric/init.go diff --git a/build/deploy/atlas/docker-compose.yml b/build/deploy/atlas/docker-compose.yml index 57157e75..d2846aad 100644 --- a/build/deploy/atlas/docker-compose.yml +++ b/build/deploy/atlas/docker-compose.yml @@ -1,7 +1,7 @@ version: "3" services: atlas-api: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-api restart: on-failure ports: @@ -22,7 +22,7 @@ services: - API_LORA_DEV_PROF_ID=00000000-0000-0000-0000-000000000000 atlas-mqtt-ingestor: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-mqtt-ingestor restart: on-failure depends_on: @@ -34,7 +34,7 @@ services: - MQTT_INGEST_NSQ_PUB_ADDR=nsqd:4150 atlas-lora-ingestor: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-lora-ingestor restart: on-failure depends_on: @@ -47,7 +47,7 @@ services: - LORA_INGEST_NSQ_PUB_ADDR=nsqd:4150 atlas-decoder: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-decoder restart: on-failure depends_on: @@ -60,7 +60,7 @@ services: - DECODER_NSQ_LOOKUP_ADDRS=nsqlookupd:4161 atlas-validator: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-validator restart: on-failure depends_on: @@ -74,7 +74,7 @@ services: - VALIDATOR_NSQ_LOOKUP_ADDRS=nsqlookupd:4161 atlas-accumulator: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-accumulator restart: on-failure environment: @@ -84,7 +84,7 @@ services: - ACCUMULATOR_NSQ_LOOKUP_ADDRS=nsqlookupd:4161 atlas-eventer: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-eventer restart: on-failure depends_on: @@ -96,7 +96,7 @@ services: - EVENTER_NSQ_LOOKUP_ADDRS=nsqlookupd:4161 atlas-alerter: - image: ghcr.io/thingspect/atlas:2a02e705 + image: ghcr.io/thingspect/atlas:8ef215b5 command: atlas-alerter restart: on-failure environment: diff --git a/internal/atlas-eventer/eventer/event.go b/internal/atlas-eventer/eventer/event.go index 51f90ce1..42577281 100644 --- a/internal/atlas-eventer/eventer/event.go +++ b/internal/atlas-eventer/eventer/event.go @@ -21,6 +21,7 @@ import ( // and builds messages for publishing. func (ev *Eventer) eventMessages() { alog.Info("eventMessages starting processor") + ctx := context.Background() var processCount int for msg := range ev.vOutSub.C() { @@ -49,8 +50,8 @@ func (ev *Eventer) eventMessages() { // Retrieve rules. Only active rules with matching tags and attributes // will be returned. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - rules, err := ev.ruleDAO.ListByTags(ctx, vOut.Device.OrgId, + dCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + rules, err := ev.ruleDAO.ListByTags(dCtx, vOut.Device.OrgId, vOut.Point.Attr, vOut.Device.Tags) cancel() if err != nil { @@ -64,71 +65,8 @@ func (ev *Eventer) eventMessages() { // Evaluate, event, and optionally publish EventerOut messages. for _, r := range rules { - res, err := rule.Eval(vOut.Point, r.Expr) - if err != nil { - metric.Incr("error", map[string]string{"func": "eval"}) - logger.Errorf("eventMessages rule.Eval: %v", err) - - continue - } - metric.Incr("evaluated", map[string]string{ - "result": strconv.FormatBool(res), - }) - - if res { - event := &api.Event{ - OrgId: vOut.Device.OrgId, - UniqId: vOut.Device.UniqId, - RuleId: r.Id, - CreatedAt: vOut.Point.Ts, - TraceId: vOut.Point.TraceId, - } - - ctx, cancel := context.WithTimeout(context.Background(), - 5*time.Second) - err := ev.evDAO.Create(ctx, event) - cancel() - // Use a duplicate event as a tombstone to protect against - // failure mid-loop and support fast-forward. Do not attempt to - // coordinate event success with publish failures. - if errors.Is(err, dao.ErrAlreadyExists) { - metric.Incr("duplicate", nil) - logger.Infof("eventMessages duplicate ev.evDAO.Create: "+ - "%v", err) - - continue - } - if err != nil { - metric.Incr("error", map[string]string{"func": "create"}) - logger.Errorf("eventMessages ev.evDAO.Create: %v", err) - - continue - } - - eOut := &message.EventerOut{ - Point: vOut.Point, - Device: vOut.Device, - Rule: r, - } - bEOut, err := proto.Marshal(eOut) - if err != nil { - metric.Incr("error", map[string]string{"func": "marshal"}) - logger.Errorf("eventMessages proto.Marshal: %v", err) - - continue - } - - if err = ev.evQueue.Publish(ev.eOutPubTopic, - bEOut); err != nil { - metric.Incr("error", map[string]string{"func": "publish"}) - logger.Errorf("eventMessages ev.evQueue.Publish: %v", err) - - continue - } - - metric.Incr("published", nil) - logger.Debugf("eventMessages published: %+v", eOut) - } + ev.evalRules(alog.NewContext(ctx, &alog.CtxLogger{Logger: logger}), + vOut, r) } msg.Ack() @@ -140,3 +78,75 @@ func (ev *Eventer) eventMessages() { } } } + +// evalRules evaluates rules, generates events, and optionally publishes +// EventerOut messages. +func (ev *Eventer) evalRules( + ctx context.Context, vOut *message.ValidatorOut, r *api.Rule, +) { + logger := alog.FromContext(ctx) + + res, err := rule.Eval(vOut.Point, r.Expr) + if err != nil { + metric.Incr("error", map[string]string{"func": "eval"}) + logger.Errorf("eventMessages rule.Eval: %v", err) + + return + } + metric.Incr("evaluated", map[string]string{ + "result": strconv.FormatBool(res), + }) + + if res { + event := &api.Event{ + OrgId: vOut.Device.OrgId, + UniqId: vOut.Device.UniqId, + RuleId: r.Id, + CreatedAt: vOut.Point.Ts, + TraceId: vOut.Point.TraceId, + } + + dCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + err := ev.evDAO.Create(dCtx, event) + cancel() + // Use a duplicate event as a tombstone to protect against failure + // mid-loop and support fast-forward. Do not attempt to coordinate event + // success with publish failures. + if errors.Is(err, dao.ErrAlreadyExists) { + metric.Incr("duplicate", nil) + logger.Infof("eventMessages duplicate ev.evDAO.Create: %v", err) + + return + } + if err != nil { + metric.Incr("error", map[string]string{"func": "create"}) + logger.Errorf("eventMessages ev.evDAO.Create: %v", err) + + return + } + + eOut := &message.EventerOut{ + Point: vOut.Point, + Device: vOut.Device, + Rule: r, + } + bEOut, err := proto.Marshal(eOut) + if err != nil { + metric.Incr("error", map[string]string{"func": "marshal"}) + logger.Errorf("eventMessages proto.Marshal: %v", err) + + return + } + + if err = ev.evQueue.Publish(ev.eOutPubTopic, + bEOut); err != nil { + metric.Incr("error", map[string]string{"func": "publish"}) + logger.Errorf("eventMessages ev.evQueue.Publish: %v", err) + + return + } + + metric.Incr("published", nil) + logger.Debugf("eventMessages published: %+v", eOut) + } +} diff --git a/pkg/metric/default.go b/pkg/metric/default.go index fb2f55da..e3eda4fe 100644 --- a/pkg/metric/default.go +++ b/pkg/metric/default.go @@ -7,7 +7,7 @@ import ( // Since metricer is global and may be replaced, locking is required. var ( - metricer Metricer + metricer Metricer = &noOpMetric{} metricerMu sync.Mutex ) @@ -23,9 +23,8 @@ func getDefault() Metricer { // setDefault sets a new default metricer. func setDefault(m Metricer) { metricerMu.Lock() - defer metricerMu.Unlock() - metricer = m + metricerMu.Unlock() } // Incr increments a statsd count metric by 1. diff --git a/pkg/metric/init.go b/pkg/metric/init.go deleted file mode 100644 index 680c221a..00000000 --- a/pkg/metric/init.go +++ /dev/null @@ -1,5 +0,0 @@ -package metric - -func init() { - setDefault(&noOpMetric{}) -} diff --git a/pkg/metric/statsd_test.go b/pkg/metric/statsd_test.go index ff1c660f..edfa0a5d 100644 --- a/pkg/metric/statsd_test.go +++ b/pkg/metric/statsd_test.go @@ -14,12 +14,12 @@ import ( func TestStatsD(t *testing.T) { t.Parallel() - metricer := statsD{ + metStats := statsD{ client: statsd.NewClient("127.0.0.1:8125", statsd.TagStyle(statsd.TagFormatGraphite), statsd.MetricPrefix("teststatsd.")), } - t.Logf("metricer: %#v", metricer) + t.Logf("metStats: %#v", metStats) for i := 0; i < 5; i++ { lTest := i @@ -27,12 +27,12 @@ func TestStatsD(t *testing.T) { t.Run(fmt.Sprintf("Can send %v", lTest), func(t *testing.T) { t.Parallel() - metricer.Incr(random.String(10), nil) - metricer.Count(random.String(10), random.Intn(99), + metStats.Incr(random.String(10), nil) + metStats.Count(random.String(10), random.Intn(99), map[string]string{random.String(10): random.String(10)}) - metricer.Set(random.String(10), random.Intn(99), + metStats.Set(random.String(10), random.Intn(99), map[string]string{random.String(10): random.String(10)}) - metricer.Timing(random.String(10), + metStats.Timing(random.String(10), time.Duration(random.Intn(99))*time.Millisecond, nil) }) } @@ -47,12 +47,12 @@ func TestSetStatsD(t *testing.T) { t.Run(fmt.Sprintf("Can send %v", lTest), func(t *testing.T) { t.Parallel() - metricer.Incr(random.String(10), nil) - metricer.Count(random.String(10), random.Intn(99), + Incr(random.String(10), nil) + Count(random.String(10), random.Intn(99), map[string]string{random.String(10): random.String(10)}) - metricer.Set(random.String(10), random.Intn(99), + Set(random.String(10), random.Intn(99), map[string]string{random.String(10): random.String(10)}) - metricer.Timing(random.String(10), + Timing(random.String(10), time.Duration(random.Intn(99))*time.Millisecond, nil) }) } @@ -69,10 +69,10 @@ func TestNewStatsDNoAddr(t *testing.T) { t.Run(fmt.Sprintf("Can send %v", lTest), func(t *testing.T) { t.Parallel() - metricer.Incr(random.String(10), nil) - metricer.Count(random.String(10), random.Intn(99), + Incr(random.String(10), nil) + Count(random.String(10), random.Intn(99), map[string]string{random.String(10): random.String(10)}) - metricer.Timing(random.String(10), + Timing(random.String(10), time.Duration(random.Intn(99))*time.Millisecond, nil) }) }