Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use same approach to metrics as courier #387

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
_ "github.com/nyaruka/mailroom/core/tasks/incidents"
_ "github.com/nyaruka/mailroom/core/tasks/interrupts"
_ "github.com/nyaruka/mailroom/core/tasks/ivr"
_ "github.com/nyaruka/mailroom/core/tasks/metrics"
_ "github.com/nyaruka/mailroom/core/tasks/msgs"
_ "github.com/nyaruka/mailroom/core/tasks/schedules"
_ "github.com/nyaruka/mailroom/core/tasks/starts"
Expand Down
6 changes: 2 additions & 4 deletions core/tasks/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/runtime"
"github.com/nyaruka/mailroom/utils/crons"
Expand Down Expand Up @@ -68,7 +66,7 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime)
elapsed := time.Since(started)
elapsedSeconds := elapsed.Seconds()

rt.CW.Queue(cwatch.Datum("CronTaskDuration", elapsedSeconds, types.StandardUnitSeconds, cwatch.Dimension("TaskName", name)))
rt.Stats.RecordCronTask(name, elapsed)

rc := rt.RP.Get()
defer rc.Close()
Expand All @@ -90,7 +88,7 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime)
for k, v := range results {
logResults = append(logResults, k, v)
}
log = log.With("elapsed", elapsedSeconds, slog.Group("results", logResults...))
log = log.With("elapsed", elapsed, slog.Group("results", logResults...))

// if cron too longer than a minute, log as error
if elapsed > time.Minute {
Expand Down
9 changes: 2 additions & 7 deletions core/tasks/handler/handle_contact_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/core/models"
Expand Down Expand Up @@ -99,11 +97,8 @@ func (t *HandleContactEventTask) Perform(ctx context.Context, rt *runtime.Runtim

err = performHandlerTask(ctx, rt, oa, t.ContactID, ctask)

// send metrics for processing time and lag from queue time
rt.CW.Queue(
cwatch.Datum("HandlerTaskDuration", float64(time.Since(start))/float64(time.Second), types.StandardUnitSeconds),
cwatch.Datum("HandlerTaskLatency", float64(time.Since(taskPayload.QueuedOn))/float64(time.Second), types.StandardUnitSeconds),
)
// record metrics
rt.Stats.RecordHandlerTask(time.Since(start), time.Since(taskPayload.QueuedOn))

// if we get an error processing an event, requeue it for later and return our error
if err != nil {
Expand Down
93 changes: 0 additions & 93 deletions core/tasks/metrics/cron.go

This file was deleted.

96 changes: 91 additions & 5 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/appleboy/go-fcm"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/elastic/go-elasticsearch/v8"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/aws/cwatch"
Expand All @@ -34,6 +35,11 @@ type Mailroom struct {
throttledForeman *Foreman

webserver *web.Server

// both sqlx and redis provide wait stats which are cummulative that we need to convert into increments by
// tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

// NewMailroom creates and returns a new mailroom instance
Expand Down Expand Up @@ -140,8 +146,6 @@ func (mr *Mailroom) Start() error {
log.Info("cloudwatch ok")
}

mr.rt.CW.StartQueue(time.Second * 3)

// init our foremen and start it
mr.handlerForeman.Start()
mr.batchForeman.Start()
Expand All @@ -153,11 +157,76 @@ func (mr *Mailroom) Start() error {

tasks.StartCrons(mr.rt, mr.wg, mr.quit)

mr.startMetricsReporter(time.Minute)

log.Info("mailroom started", "domain", c.Domain)

return nil
}

func (mr *Mailroom) startMetricsReporter(interval time.Duration) {
mr.wg.Add(1)

report := func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
count, err := mr.reportMetrics(ctx)
cancel()
if err != nil {
slog.Error("error reporting metrics", "error", err)
} else {
slog.Info("sent metrics to cloudwatch", "count", count)
}
}

go func() {
defer func() {
slog.Info("metrics reporter exiting")
mr.wg.Done()
}()

for {
select {
case <-mr.quit:
report()
return
case <-time.After(interval): // TODO align to half minute marks for queue sizes?
report()
}
}
}()
}

func (mr *Mailroom) reportMetrics(ctx context.Context) (int, error) {
metrics := mr.rt.Stats.Extract().ToMetrics()

handlerSize, batchSize, throttledSize := getQueueSizes(mr.rt)

// calculate DB and redis stats
dbStats := mr.rt.DB.Stats()
redisStats := mr.rt.RP.Stats()
dbWaitDurationInPeriod := dbStats.WaitDuration - mr.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - mr.redisWaitDuration
mr.dbWaitDuration = dbStats.WaitDuration
mr.redisWaitDuration = redisStats.WaitDuration

hostDim := cwatch.Dimension("Host", mr.rt.Config.InstanceID)
metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), types.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Second), types.StandardUnitSeconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), types.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Second), types.StandardUnitSeconds, hostDim),
cwatch.Datum("QueuedTasks", float64(handlerSize), types.StandardUnitCount, cwatch.Dimension("QueueName", "handler")),
cwatch.Datum("QueuedTasks", float64(batchSize), types.StandardUnitCount, cwatch.Dimension("QueueName", "batch")),
cwatch.Datum("QueuedTasks", float64(throttledSize), types.StandardUnitCount, cwatch.Dimension("QueueName", "throttled")),
)

if err := mr.rt.CW.Send(ctx, metrics...); err != nil {
return 0, fmt.Errorf("error sending metrics: %w", err)
}

return len(metrics), nil
}

// Stop stops the mailroom service
func (mr *Mailroom) Stop() error {
log := slog.With("comp", "mailroom")
Expand All @@ -174,9 +243,6 @@ func (mr *Mailroom) Stop() error {

mr.wg.Wait()

// now that all tasks are finished, stop services they depend on
mr.rt.CW.StopQueue()

log.Info("mailroom stopped")
return nil
}
Expand All @@ -199,3 +265,23 @@ func openAndCheckDBConnection(url string, maxOpenConns int) (*sql.DB, *sqlx.DB,

return db.DB, db, err
}

func getQueueSizes(rt *runtime.Runtime) (int, int, int) {
rc := rt.RP.Get()
defer rc.Close()

handler, err := tasks.HandlerQueue.Size(rc)
if err != nil {
slog.Error("error calculating handler queue size", "error", err)
}
batch, err := tasks.BatchQueue.Size(rc)
if err != nil {
slog.Error("error calculating batch queue size", "error", err)
}
throttled, err := tasks.ThrottledQueue.Size(rc)
if err != nil {
slog.Error("error calculating throttled queue size", "error", err)
}

return handler, batch, throttled
}
2 changes: 1 addition & 1 deletion runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func NewDefaultConfig() *Config {
S3AttachmentsBucket: "temba-attachments",
S3SessionsBucket: "temba-sessions",

CloudwatchNamespace: "Temba",
CloudwatchNamespace: "Temba/Mailroom",
DeploymentID: "dev",
InstanceID: hostname,

Expand Down
1 change: 1 addition & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type Runtime struct {
Dynamo *dynamo.Service
S3 *s3x.Service
ES *elasticsearch.TypedClient
Stats *StatsCollector
CW *cwatch.Service
FCM FCMClient
Config *Config
Expand Down
88 changes: 88 additions & 0 deletions runtime/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package runtime

import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/gocommon/aws/cwatch"
)

type Stats struct {
HandlerTaskCount int // number of contact tasks handled
HandlerTaskDuration time.Duration // total time spent handling contact tasks
HandlerTaskLatency time.Duration // total time spent queuing and handling contact tasks

CronTaskCount map[string]int // number of cron tasks run by type
CronTaskDuration map[string]time.Duration // total time spent running cron tasks
}

func newStats() *Stats {
return &Stats{
CronTaskCount: make(map[string]int),
CronTaskDuration: make(map[string]time.Duration),
}
}

func (s *Stats) ToMetrics() []types.MetricDatum {
metrics := make([]types.MetricDatum, 0, 20)

// convert handler task timings to averages
avgHandlerTaskDuration, avgHandlerTaskLatency := time.Duration(0), time.Duration(0)
if s.HandlerTaskCount > 0 {
avgHandlerTaskDuration = s.HandlerTaskDuration / time.Duration(s.HandlerTaskCount)
avgHandlerTaskLatency = s.HandlerTaskLatency / time.Duration(s.HandlerTaskCount)
}

metrics = append(metrics,
cwatch.Datum("HandlerTaskCount", float64(s.HandlerTaskCount), types.StandardUnitCount),
cwatch.Datum("HandlerTaskDuration", float64(avgHandlerTaskDuration/time.Second), types.StandardUnitCount),
cwatch.Datum("HandlerTaskLatency", float64(avgHandlerTaskLatency/time.Second), types.StandardUnitCount),
)

for name, count := range s.CronTaskCount {
avgTime := s.CronTaskDuration[name] / time.Duration(count)

metrics = append(metrics,
cwatch.Datum("CronTaskCount", float64(count), types.StandardUnitCount, cwatch.Dimension("TaskName", name)),
cwatch.Datum("CronTaskDuration", float64(avgTime/time.Second), types.StandardUnitSeconds, cwatch.Dimension("TaskName", name)),
)
}

return metrics
}

// StatsCollector provides threadsafe stats collection
type StatsCollector struct {
mutex sync.Mutex
stats *Stats
}

// NewStatsCollector creates a new stats collector
func NewStatsCollector() *StatsCollector {
return &StatsCollector{stats: newStats()}
}

func (c *StatsCollector) RecordHandlerTask(d, l time.Duration) {
c.mutex.Lock()
c.stats.HandlerTaskCount++
c.stats.HandlerTaskDuration += d
c.stats.HandlerTaskLatency += l
c.mutex.Unlock()
}

func (c *StatsCollector) RecordCronTask(name string, d time.Duration) {
c.mutex.Lock()
c.stats.CronTaskCount[name]++
c.stats.CronTaskDuration[name] += d
c.mutex.Unlock()
}

// Extract returns the stats for the period since the last call
func (c *StatsCollector) Extract() *Stats {
c.mutex.Lock()
defer c.mutex.Unlock()
s := c.stats
c.stats = newStats()
return s
}
Loading
Loading