Skip to content

Commit

Permalink
Merge pull request #227 from thingspect/bconway_refactor_eventer
Browse files Browse the repository at this point in the history
Refactor eventer service for legibility
  • Loading branch information
bconway authored Sep 13, 2023
2 parents e2f6929 + 497c53d commit 1d5cafa
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 96 deletions.
16 changes: 8 additions & 8 deletions build/deploy/atlas/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
version: "3"
services:
atlas-api:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-api
restart: on-failure
ports:
Expand All @@ -22,7 +22,7 @@ services:
- API_LORA_DEV_PROF_ID=00000000-0000-0000-0000-000000000000

atlas-mqtt-ingestor:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-mqtt-ingestor
restart: on-failure
depends_on:
Expand All @@ -34,7 +34,7 @@ services:
- MQTT_INGEST_NSQ_PUB_ADDR=nsqd:4150

atlas-lora-ingestor:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-lora-ingestor
restart: on-failure
depends_on:
Expand All @@ -47,7 +47,7 @@ services:
- LORA_INGEST_NSQ_PUB_ADDR=nsqd:4150

atlas-decoder:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-decoder
restart: on-failure
depends_on:
Expand All @@ -60,7 +60,7 @@ services:
- DECODER_NSQ_LOOKUP_ADDRS=nsqlookupd:4161

atlas-validator:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-validator
restart: on-failure
depends_on:
Expand All @@ -74,7 +74,7 @@ services:
- VALIDATOR_NSQ_LOOKUP_ADDRS=nsqlookupd:4161

atlas-accumulator:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-accumulator
restart: on-failure
environment:
Expand All @@ -84,7 +84,7 @@ services:
- ACCUMULATOR_NSQ_LOOKUP_ADDRS=nsqlookupd:4161

atlas-eventer:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-eventer
restart: on-failure
depends_on:
Expand All @@ -96,7 +96,7 @@ services:
- EVENTER_NSQ_LOOKUP_ADDRS=nsqlookupd:4161

atlas-alerter:
image: ghcr.io/thingspect/atlas:2a02e705
image: ghcr.io/thingspect/atlas:8ef215b5
command: atlas-alerter
restart: on-failure
environment:
Expand Down
144 changes: 77 additions & 67 deletions internal/atlas-eventer/eventer/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
// and builds messages for publishing.
func (ev *Eventer) eventMessages() {
alog.Info("eventMessages starting processor")
ctx := context.Background()

var processCount int
for msg := range ev.vOutSub.C() {
Expand Down Expand Up @@ -49,8 +50,8 @@ func (ev *Eventer) eventMessages() {

// Retrieve rules. Only active rules with matching tags and attributes
// will be returned.
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
rules, err := ev.ruleDAO.ListByTags(ctx, vOut.Device.OrgId,
dCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
rules, err := ev.ruleDAO.ListByTags(dCtx, vOut.Device.OrgId,
vOut.Point.Attr, vOut.Device.Tags)
cancel()
if err != nil {
Expand All @@ -64,71 +65,8 @@ func (ev *Eventer) eventMessages() {

// Evaluate, event, and optionally publish EventerOut messages.
for _, r := range rules {
res, err := rule.Eval(vOut.Point, r.Expr)
if err != nil {
metric.Incr("error", map[string]string{"func": "eval"})
logger.Errorf("eventMessages rule.Eval: %v", err)

continue
}
metric.Incr("evaluated", map[string]string{
"result": strconv.FormatBool(res),
})

if res {
event := &api.Event{
OrgId: vOut.Device.OrgId,
UniqId: vOut.Device.UniqId,
RuleId: r.Id,
CreatedAt: vOut.Point.Ts,
TraceId: vOut.Point.TraceId,
}

ctx, cancel := context.WithTimeout(context.Background(),
5*time.Second)
err := ev.evDAO.Create(ctx, event)
cancel()
// Use a duplicate event as a tombstone to protect against
// failure mid-loop and support fast-forward. Do not attempt to
// coordinate event success with publish failures.
if errors.Is(err, dao.ErrAlreadyExists) {
metric.Incr("duplicate", nil)
logger.Infof("eventMessages duplicate ev.evDAO.Create: "+
"%v", err)

continue
}
if err != nil {
metric.Incr("error", map[string]string{"func": "create"})
logger.Errorf("eventMessages ev.evDAO.Create: %v", err)

continue
}

eOut := &message.EventerOut{
Point: vOut.Point,
Device: vOut.Device,
Rule: r,
}
bEOut, err := proto.Marshal(eOut)
if err != nil {
metric.Incr("error", map[string]string{"func": "marshal"})
logger.Errorf("eventMessages proto.Marshal: %v", err)

continue
}

if err = ev.evQueue.Publish(ev.eOutPubTopic,
bEOut); err != nil {
metric.Incr("error", map[string]string{"func": "publish"})
logger.Errorf("eventMessages ev.evQueue.Publish: %v", err)

continue
}

metric.Incr("published", nil)
logger.Debugf("eventMessages published: %+v", eOut)
}
ev.evalRules(alog.NewContext(ctx, &alog.CtxLogger{Logger: logger}),
vOut, r)
}

msg.Ack()
Expand All @@ -140,3 +78,75 @@ func (ev *Eventer) eventMessages() {
}
}
}

// evalRules evaluates rules, generates events, and optionally publishes
// EventerOut messages.
func (ev *Eventer) evalRules(
ctx context.Context, vOut *message.ValidatorOut, r *api.Rule,
) {
logger := alog.FromContext(ctx)

res, err := rule.Eval(vOut.Point, r.Expr)
if err != nil {
metric.Incr("error", map[string]string{"func": "eval"})
logger.Errorf("eventMessages rule.Eval: %v", err)

return
}
metric.Incr("evaluated", map[string]string{
"result": strconv.FormatBool(res),
})

if res {
event := &api.Event{
OrgId: vOut.Device.OrgId,
UniqId: vOut.Device.UniqId,
RuleId: r.Id,
CreatedAt: vOut.Point.Ts,
TraceId: vOut.Point.TraceId,
}

dCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
err := ev.evDAO.Create(dCtx, event)
cancel()
// Use a duplicate event as a tombstone to protect against failure
// mid-loop and support fast-forward. Do not attempt to coordinate event
// success with publish failures.
if errors.Is(err, dao.ErrAlreadyExists) {
metric.Incr("duplicate", nil)
logger.Infof("eventMessages duplicate ev.evDAO.Create: %v", err)

return
}
if err != nil {
metric.Incr("error", map[string]string{"func": "create"})
logger.Errorf("eventMessages ev.evDAO.Create: %v", err)

return
}

eOut := &message.EventerOut{
Point: vOut.Point,
Device: vOut.Device,
Rule: r,
}
bEOut, err := proto.Marshal(eOut)
if err != nil {
metric.Incr("error", map[string]string{"func": "marshal"})
logger.Errorf("eventMessages proto.Marshal: %v", err)

return
}

if err = ev.evQueue.Publish(ev.eOutPubTopic,
bEOut); err != nil {
metric.Incr("error", map[string]string{"func": "publish"})
logger.Errorf("eventMessages ev.evQueue.Publish: %v", err)

return
}

metric.Incr("published", nil)
logger.Debugf("eventMessages published: %+v", eOut)
}
}
5 changes: 2 additions & 3 deletions pkg/metric/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

// Since metricer is global and may be replaced, locking is required.
var (
metricer Metricer
metricer Metricer = &noOpMetric{}
metricerMu sync.Mutex
)

Expand All @@ -23,9 +23,8 @@ func getDefault() Metricer {
// setDefault sets a new default metricer.
func setDefault(m Metricer) {
metricerMu.Lock()
defer metricerMu.Unlock()

metricer = m
metricerMu.Unlock()
}

// Incr increments a statsd count metric by 1.
Expand Down
5 changes: 0 additions & 5 deletions pkg/metric/init.go

This file was deleted.

26 changes: 13 additions & 13 deletions pkg/metric/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,25 @@ import (
func TestStatsD(t *testing.T) {
t.Parallel()

metricer := statsD{
metStats := statsD{
client: statsd.NewClient("127.0.0.1:8125",
statsd.TagStyle(statsd.TagFormatGraphite),
statsd.MetricPrefix("teststatsd.")),
}
t.Logf("metricer: %#v", metricer)
t.Logf("metStats: %#v", metStats)

for i := 0; i < 5; i++ {
lTest := i

t.Run(fmt.Sprintf("Can send %v", lTest), func(t *testing.T) {
t.Parallel()

metricer.Incr(random.String(10), nil)
metricer.Count(random.String(10), random.Intn(99),
metStats.Incr(random.String(10), nil)
metStats.Count(random.String(10), random.Intn(99),
map[string]string{random.String(10): random.String(10)})
metricer.Set(random.String(10), random.Intn(99),
metStats.Set(random.String(10), random.Intn(99),
map[string]string{random.String(10): random.String(10)})
metricer.Timing(random.String(10),
metStats.Timing(random.String(10),
time.Duration(random.Intn(99))*time.Millisecond, nil)
})
}
Expand All @@ -47,12 +47,12 @@ func TestSetStatsD(t *testing.T) {
t.Run(fmt.Sprintf("Can send %v", lTest), func(t *testing.T) {
t.Parallel()

metricer.Incr(random.String(10), nil)
metricer.Count(random.String(10), random.Intn(99),
Incr(random.String(10), nil)
Count(random.String(10), random.Intn(99),
map[string]string{random.String(10): random.String(10)})
metricer.Set(random.String(10), random.Intn(99),
Set(random.String(10), random.Intn(99),
map[string]string{random.String(10): random.String(10)})
metricer.Timing(random.String(10),
Timing(random.String(10),
time.Duration(random.Intn(99))*time.Millisecond, nil)
})
}
Expand All @@ -69,10 +69,10 @@ func TestNewStatsDNoAddr(t *testing.T) {
t.Run(fmt.Sprintf("Can send %v", lTest), func(t *testing.T) {
t.Parallel()

metricer.Incr(random.String(10), nil)
metricer.Count(random.String(10), random.Intn(99),
Incr(random.String(10), nil)
Count(random.String(10), random.Intn(99),
map[string]string{random.String(10): random.String(10)})
metricer.Timing(random.String(10),
Timing(random.String(10),
time.Duration(random.Intn(99))*time.Millisecond, nil)
})
}
Expand Down

0 comments on commit 1d5cafa

Please sign in to comment.