Skip to content

Commit

Permalink
Use same approach to metrics as courier - record events in stats, con…
Browse files Browse the repository at this point in the history
…vert to metrics and send every minute
  • Loading branch information
rowanseymour committed Dec 18, 2024
1 parent c3a1701 commit 13d6fa1
Show file tree
Hide file tree
Showing 7 changed files with 105 additions and 39 deletions.
6 changes: 2 additions & 4 deletions core/tasks/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/crons"
Expand Down Expand Up @@ -68,7 +66,7 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime)
elapsed := time.Since(started)
elapsedSeconds := elapsed.Seconds()

rt.CW.Queue(cwatch.Datum("CronTaskDuration", elapsedSeconds, types.StandardUnitSeconds, cwatch.Dimension("TaskName", name)))
rt.Stats.RecordCronTask(name, elapsed)

rc := rt.RP.Get()
defer rc.Close()
Expand All @@ -90,7 +88,7 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime)
for k, v := range results {
logResults = append(logResults, k, v)
}
log = log.With("elapsed", elapsedSeconds, slog.Group("results", logResults...))
log = log.With("elapsed", elapsed, slog.Group("results", logResults...))

// if cron took longer than a minute, log as error
if elapsed > time.Minute {
Expand Down
9 changes: 2 additions & 7 deletions core/tasks/handler/handle_contact_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/core/models"
Expand Down Expand Up @@ -99,11 +97,8 @@ func (t *HandleContactEventTask) Perform(ctx context.Context, rt *runtime.Runtim

err = performHandlerTask(ctx, rt, oa, t.ContactID, ctask)

// send metrics for processing time and lag from queue time
rt.CW.Queue(
cwatch.Datum("HandlerTaskDuration", float64(time.Since(start))/float64(time.Second), types.StandardUnitSeconds),
cwatch.Datum("HandlerTaskLatency", float64(time.Since(taskPayload.QueuedOn))/float64(time.Second), types.StandardUnitSeconds),
)
// record metrics
rt.Stats.RecordHandlerTask(time.Since(start), time.Since(taskPayload.QueuedOn))

// if we get an error processing an event, requeue it for later and return our error
if err != nil {
Expand Down
36 changes: 14 additions & 22 deletions core/tasks/metrics/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package analytics

import (
"context"
"fmt"
"log/slog"
"time"

Expand Down Expand Up @@ -33,43 +34,34 @@ func (c *metricsCron) AllInstances() bool {
}

func (c *metricsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) {
metrics := rt.Stats.Extract().ToMetrics()

handlerSize, batchSize, throttledSize := getQueueSizes(rt)

// get our DB and redis stats
// calculate DB and redis stats
dbStats := rt.DB.Stats()
redisStats := rt.RP.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - c.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - c.redisWaitDuration

c.dbWaitDuration = dbStats.WaitDuration
c.redisWaitDuration = redisStats.WaitDuration

hostDim := cwatch.Dimension("Host", rt.Config.InstanceID)
appDim := cwatch.Dimension("App", "mailroom")

rt.CW.Queue(
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), types.StandardUnitCount, hostDim, appDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Second), types.StandardUnitSeconds, hostDim, appDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), types.StandardUnitCount, hostDim, appDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Second), types.StandardUnitSeconds, hostDim, appDim),
)

rt.CW.Queue(
metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), types.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Second), types.StandardUnitSeconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), types.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Second), types.StandardUnitSeconds, hostDim),
cwatch.Datum("QueuedTasks", float64(handlerSize), types.StandardUnitCount, cwatch.Dimension("QueueName", "handler")),
cwatch.Datum("QueuedTasks", float64(batchSize), types.StandardUnitCount, cwatch.Dimension("QueueName", "batch")),
cwatch.Datum("QueuedTasks", float64(throttledSize), types.StandardUnitCount, cwatch.Dimension("QueueName", "throttled")),
)

return map[string]any{
"db_inuse": dbStats.InUse,
"db_wait": dbWaitDurationInPeriod,
"redis_inuse": redisStats.ActiveCount,
"redis_wait": redisWaitDurationInPeriod,
"handler_size": handlerSize,
"batch_size": batchSize,
"throttled_size": throttledSize,
}, nil
if err := rt.CW.Send(ctx, metrics...); err != nil {
return nil, fmt.Errorf("error sending metrics: %w", err)
}

return map[string]any{"metrics": len(metrics)}, nil
}

func getQueueSizes(rt *runtime.Runtime) (int, int, int) {
Expand Down
5 changes: 0 additions & 5 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,6 @@ func (mr *Mailroom) Start() error {
log.Info("cloudwatch ok")
}

mr.rt.CW.StartQueue(time.Second * 3)

// init our foremen and start it
mr.handlerForeman.Start()
mr.batchForeman.Start()
Expand Down Expand Up @@ -174,9 +172,6 @@ func (mr *Mailroom) Stop() error {

mr.wg.Wait()

// now that all tasks are finished, stop services they depend on
mr.rt.CW.StopQueue()

log.Info("mailroom stopped")
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewDefaultConfig() *Config {
S3AttachmentsBucket: "temba-attachments",
S3SessionsBucket: "temba-sessions",

CloudwatchNamespace: "Temba",
CloudwatchNamespace: "Temba/Mailroom",
DeploymentID: "dev",
InstanceID: hostname,

Expand Down
1 change: 1 addition & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Runtime struct {
Dynamo *dynamo.Service
S3 *s3x.Service
ES *elasticsearch.TypedClient
Stats *StatsCollector
CW *cwatch.Service
FCM FCMClient
Config *Config
Expand Down
85 changes: 85 additions & 0 deletions runtime/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package runtime

import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
)

// Stats holds the raw counters accumulated over a single reporting period. Durations are stored
// as running totals; ToMetrics converts them to per-task averages.
type Stats struct {
	HandlerTaskCount    int           // number of contact tasks handled
	HandlerTaskDuration time.Duration // total time spent handling contact tasks
	HandlerTaskLatency  time.Duration // total time spent queuing and handling contact tasks

	CronTaskCount    map[string]int           // number of cron tasks run by type
	CronTaskDuration map[string]time.Duration // total time spent running cron tasks
}

// newStats returns an empty Stats with its per-cron-task maps allocated. The maps must be non-nil
// because assigning to an entry of a nil map panics.
func newStats() *Stats {
	return &Stats{
		CronTaskCount:    make(map[string]int),
		CronTaskDuration: make(map[string]time.Duration),
	}
}

// ToMetrics converts the accumulated stats into CloudWatch metric datums.
//
// Handler task totals are reported as a count plus the average duration and average latency per
// task, both in seconds. Each cron task name in CronTaskCount produces a count datum and an
// average duration datum (in seconds), both carrying a TaskName dimension.
func (s *Stats) ToMetrics() []types.MetricDatum {
	// three handler datums plus two per cron task
	metrics := make([]types.MetricDatum, 0, 3+2*len(s.CronTaskCount))

	// convert handler task timings to averages, leaving them at zero if no tasks were handled
	var avgHandlerTaskDuration, avgHandlerTaskLatency time.Duration
	if s.HandlerTaskCount > 0 {
		avgHandlerTaskDuration = s.HandlerTaskDuration / time.Duration(s.HandlerTaskCount)
		avgHandlerTaskLatency = s.HandlerTaskLatency / time.Duration(s.HandlerTaskCount)
	}

	// Duration.Seconds() keeps the fractional part, whereas dividing by time.Second is an integer
	// division that would report any sub-second average as zero
	metrics = append(metrics,
		cwatch.Datum("HandlerTaskCount", float64(s.HandlerTaskCount), types.StandardUnitCount),
		cwatch.Datum("HandlerTaskDuration", avgHandlerTaskDuration.Seconds(), types.StandardUnitSeconds),
		cwatch.Datum("HandlerTaskLatency", avgHandlerTaskLatency.Seconds(), types.StandardUnitSeconds),
	)

	for name, count := range s.CronTaskCount {
		if count == 0 {
			continue // no runs to average over, and dividing by zero would panic
		}

		avgTime := s.CronTaskDuration[name] / time.Duration(count)

		metrics = append(metrics,
			cwatch.Datum("CronTaskCount", float64(count), types.StandardUnitCount, cwatch.Dimension("TaskName", name)),
			cwatch.Datum("CronTaskDuration", avgTime.Seconds(), types.StandardUnitSeconds, cwatch.Dimension("TaskName", name)),
		)
	}

	return metrics
}

// StatsCollector provides threadsafe stats collection
type StatsCollector struct {
	mutex sync.Mutex // held by the Record* methods and Extract while they touch stats
	stats *Stats     // stats accumulated since the last call to Extract
}

// NewStatsCollector returns a collector that starts out holding a fresh set of stats
func NewStatsCollector() *StatsCollector {
	collector := new(StatsCollector)
	collector.stats = newStats()
	return collector
}

// RecordHandlerTask records one handled contact task, adding d to the total handling duration
// and l to the total latency for the current period.
func (c *StatsCollector) RecordHandlerTask(d, l time.Duration) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	current := c.stats
	current.HandlerTaskCount++
	current.HandlerTaskDuration += d
	current.HandlerTaskLatency += l
}

// RecordCronTask records one run of the cron task with the given name, adding d to that task's
// total duration for the current period.
func (c *StatsCollector) RecordCronTask(name string, d time.Duration) {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// assigning to an entry of a nil map panics, so allocate the per-task maps on first use
	if c.stats.CronTaskCount == nil {
		c.stats.CronTaskCount = make(map[string]int)
	}
	if c.stats.CronTaskDuration == nil {
		c.stats.CronTaskDuration = make(map[string]time.Duration)
	}

	c.stats.CronTaskCount[name]++
	c.stats.CronTaskDuration[name] += d
}

// Extract hands back everything recorded since the previous call and swaps in a fresh, empty
// set of stats for the next period
func (c *StatsCollector) Extract() *Stats {
	fresh := newStats()

	c.mutex.Lock()
	previous := c.stats
	c.stats = fresh
	c.mutex.Unlock()

	return previous
}

0 comments on commit 13d6fa1

Please sign in to comment.