Skip to content

Commit

Permalink
Convert all remaining metrics to cloudwatch
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 13, 2024
1 parent 8403f1b commit d6bc88d
Show file tree
Hide file tree
Showing 8 changed files with 156 additions and 126 deletions.
2 changes: 1 addition & 1 deletion cmd/mailroom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (

_ "github.com/nyaruka/mailroom/core/handlers"
_ "github.com/nyaruka/mailroom/core/hooks"
_ "github.com/nyaruka/mailroom/core/tasks/analytics"
_ "github.com/nyaruka/mailroom/core/tasks/campaigns"
_ "github.com/nyaruka/mailroom/core/tasks/contacts"
_ "github.com/nyaruka/mailroom/core/tasks/expirations"
Expand All @@ -28,6 +27,7 @@ import (
_ "github.com/nyaruka/mailroom/core/tasks/incidents"
_ "github.com/nyaruka/mailroom/core/tasks/interrupts"
_ "github.com/nyaruka/mailroom/core/tasks/ivr"
_ "github.com/nyaruka/mailroom/core/tasks/metrics"
_ "github.com/nyaruka/mailroom/core/tasks/msgs"
_ "github.com/nyaruka/mailroom/core/tasks/schedules"
_ "github.com/nyaruka/mailroom/core/tasks/starts"
Expand Down
89 changes: 0 additions & 89 deletions core/tasks/analytics/cron.go

This file was deleted.

7 changes: 0 additions & 7 deletions core/tasks/campaigns/fire_campaign_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/goflow/assets"
"github.com/nyaruka/goflow/flows"
"github.com/nyaruka/goflow/flows/triggers"
Expand Down Expand Up @@ -118,8 +117,6 @@ func (t *FireCampaignEventTask) Perform(ctx context.Context, rt *runtime.Runtime

// FireCampaignEvents tries to handle the given event fires, returning those that were handled (i.e. skipped, fired or deleted)
func FireCampaignEvents(ctx context.Context, rt *runtime.Runtime, oa *models.OrgAssets, fires []*models.EventFire, flowUUID assets.FlowUUID, campaign *triggers.CampaignReference, eventUUID triggers.CampaignEventUUID) ([]*models.EventFire, error) {
start := time.Now()

// get the capmaign event object
dbEvent := oa.CampaignEventByID(fires[0].EventID)
if dbEvent == nil {
Expand Down Expand Up @@ -232,9 +229,5 @@ func FireCampaignEvents(ctx context.Context, rt *runtime.Runtime, oa *models.Org
slog.Error("error starting flow for campaign event", "error", err, "event", eventUUID)
}

// log both our total and average
analytics.Gauge("mr.campaign_event_elapsed", float64(time.Since(start))/float64(time.Second))
analytics.Gauge("mr.campaign_event_count", float64(len(handled)))

return handled, nil
}
4 changes: 2 additions & 2 deletions core/tasks/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type Cron interface {
Run(context.Context, *runtime.Runtime) (map[string]any, error)

// AllInstances returns whether cron runs on all instances - i.e. locking is instance specific. This is for crons
// like analytics which report instance specific stats. Other crons are synchronized across all instances.
// like metrics which report instance specific stats. Other crons are synchronized across all instances.
AllInstances() bool
}

Expand Down Expand Up @@ -69,7 +69,7 @@ func recordCronExecution(name string, r func(context.Context, *runtime.Runtime)
elapsedSeconds := elapsed.Seconds()

rt.CW.Queue(types.MetricDatum{
MetricName: aws.String("CronTime"),
MetricName: aws.String("CronTaskDuration"),
Dimensions: []types.Dimension{{Name: aws.String("TaskName"), Value: aws.String(name)}},
Value: aws.Float64(elapsedSeconds),
Unit: types.StandardUnitSeconds,
Expand Down
18 changes: 12 additions & 6 deletions core/tasks/handler/handle_contact_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import (
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/gomodule/redigo/redis"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/mailroom/core/models"
Expand Down Expand Up @@ -98,11 +99,16 @@ func (t *HandleContactEventTask) Perform(ctx context.Context, rt *runtime.Runtim

err = performHandlerTask(ctx, rt, oa, t.ContactID, ctask)

// log our processing time to librato
analytics.Gauge(fmt.Sprintf("mr.%s_elapsed", taskPayload.Type), float64(time.Since(start))/float64(time.Second))

// and total latency for this task since it was queued
analytics.Gauge(fmt.Sprintf("mr.%s_latency", taskPayload.Type), float64(time.Since(taskPayload.QueuedOn))/float64(time.Second))
// send metrics for processing time and lag from queue time
rt.CW.Queue(types.MetricDatum{
MetricName: aws.String("HandlerTaskDuration"),
Value: aws.Float64(float64(time.Since(start)) / float64(time.Second)),
Unit: types.StandardUnitSeconds,
}, types.MetricDatum{
MetricName: aws.String("HandlerTaskLatency"),
Value: aws.Float64(float64(time.Since(taskPayload.QueuedOn)) / float64(time.Second)),
Unit: types.StandardUnitSeconds,
})

// if we get an error processing an event, requeue it for later and return our error
if err != nil {
Expand Down
132 changes: 132 additions & 0 deletions core/tasks/metrics/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package analytics

import (
"context"
"log/slog"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/mailroom/core/tasks"
"github.com/nyaruka/mailroom/runtime"
)

func init() {
tasks.RegisterCron("metrics", &metricsCron{})
}

// calculates a bunch of stats every minute and both logs them and sends them to cloudwatch
type metricsCron struct {
// both sqlx and redis provide wait stats which are cummulative that we need to make into increments
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

func (c *metricsCron) Next(last time.Time) time.Time {
return tasks.CronNext(last, time.Minute)
}

func (c *metricsCron) AllInstances() bool {
return true
}

func (c *metricsCron) Run(ctx context.Context, rt *runtime.Runtime) (map[string]any, error) {
// TODO replace with offset passed to tasks.CronNext
// We wait 15 seconds since we fire at the top of the minute, the same as expirations.
// That way any metrics related to the size of our queue are a bit more accurate (all expirations can
// usually be handled in 15 seconds). Something more complicated would take into account the age of
// the items in our queues.
time.Sleep(time.Second * 15)

handlerSize, batchSize, throttledSize := getQueueSizes(rt)

// get our DB and redis stats
dbStats := rt.DB.Stats()
redisStats := rt.RP.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - c.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - c.redisWaitDuration

c.dbWaitDuration = dbStats.WaitDuration
c.redisWaitDuration = redisStats.WaitDuration

dims := []types.Dimension{
{Name: aws.String("Host"), Value: aws.String(rt.Config.InstanceID)},
{Name: aws.String("App"), Value: aws.String("mailroom")},
}

rt.CW.Queue(types.MetricDatum{
MetricName: aws.String("DBConnectionsInUse"),
Dimensions: dims,
Value: aws.Float64(float64(dbStats.InUse)),
Unit: types.StandardUnitCount,
}, types.MetricDatum{
MetricName: aws.String("DBConnectionWaitDuration"),
Dimensions: dims,
Value: aws.Float64(float64(dbWaitDurationInPeriod / time.Second)),
Unit: types.StandardUnitSeconds,
}, types.MetricDatum{
MetricName: aws.String("RedisConnectionsInUse"),
Dimensions: dims,
Value: aws.Float64(float64(redisStats.ActiveCount)),
Unit: types.StandardUnitCount,
}, types.MetricDatum{
MetricName: aws.String("RedisConnectionsWaitDuration"),
Dimensions: dims,
Value: aws.Float64(float64(redisWaitDurationInPeriod / time.Second)),
Unit: types.StandardUnitSeconds,
})

rt.CW.Queue(types.MetricDatum{
MetricName: aws.String("QueuedTasks"),
Dimensions: []types.Dimension{
{Name: aws.String("QueueName"), Value: aws.String("handler")},
},
Value: aws.Float64(float64(handlerSize)),
Unit: types.StandardUnitCount,
}, types.MetricDatum{
MetricName: aws.String("QueuedTasks"),
Dimensions: []types.Dimension{
{Name: aws.String("QueueName"), Value: aws.String("batch")},
},
Value: aws.Float64(float64(batchSize)),
Unit: types.StandardUnitCount,
}, types.MetricDatum{
MetricName: aws.String("QueuedTasks"),
Dimensions: []types.Dimension{
{Name: aws.String("QueueName"), Value: aws.String("throttled")},
},
Value: aws.Float64(float64(throttledSize)),
Unit: types.StandardUnitCount,
})

return map[string]any{
"db_inuse": dbStats.InUse,
"db_wait": dbWaitDurationInPeriod,
"redis_inuse": redisStats.ActiveCount,
"redis_wait": redisWaitDurationInPeriod,
"handler_size": handlerSize,
"batch_size": batchSize,
"throttled_size": throttledSize,
}, nil
}

func getQueueSizes(rt *runtime.Runtime) (int, int, int) {
rc := rt.RP.Get()
defer rc.Close()

handler, err := tasks.HandlerQueue.Size(rc)
if err != nil {
slog.Error("error calculating handler queue size", "error", err)
}
batch, err := tasks.BatchQueue.Size(rc)
if err != nil {
slog.Error("error calculating batch queue size", "error", err)
}
throttled, err := tasks.ThrottledQueue.Size(rc)
if err != nil {
slog.Error("error calculating throttled queue size", "error", err)
}

return handler, batch, throttled
}
9 changes: 0 additions & 9 deletions mailroom.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/appleboy/go-fcm"
"github.com/elastic/go-elasticsearch/v8"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/aws/dynamo"
"github.com/nyaruka/gocommon/aws/s3x"
Expand Down Expand Up @@ -133,13 +132,6 @@ func (mr *Mailroom) Start() error {
log.Info("elastic ok")
}

// if we have a librato token, configure it
if c.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(c.LibratoUsername, c.LibratoToken, c.InstanceID, time.Second, mr.wg))
}

analytics.Start()

// configure and start cloudwatch
mr.rt.CW, err = cwatch.NewService(c.AWSAccessKeyID, c.AWSSecretAccessKey, c.AWSRegion, c.CloudwatchNamespace, c.DeploymentID)
if err != nil {
Expand Down Expand Up @@ -176,7 +168,6 @@ func (mr *Mailroom) Stop() error {
mr.throttledForeman.Stop()

mr.rt.CW.StopQueue()
analytics.Stop()

close(mr.quit)
mr.cancel()
Expand Down
21 changes: 9 additions & 12 deletions runtime/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,19 +69,16 @@ type Config struct {
S3SessionsBucket string `help:"S3 bucket to write flow sessions to"`
S3Minio bool `help:"S3 is actually Minio or other compatible service"`

CourierAuthToken string `help:"the authentication token used for requests to Courier"`
LibratoUsername string `help:"the username that will be used to authenticate to Librato"`
LibratoToken string `help:"the token that will be used to authenticate to Librato"`

CloudwatchNamespace string `help:"the namespace to use for cloudwatch metrics"`
DeploymentID string `help:"the deployment identifier to use for metrics"`
InstanceID string `help:"the instance identifier to use for metrics"`

CourierAuthToken string `help:"the authentication token used for requests to Courier"`
AndroidCredentialsFile string `help:"path to JSON file with FCM service account credentials used to sync Android relayers"`

InstanceID string `help:"the unique identifier of this instance, defaults to hostname"`
LogLevel slog.Level `help:"the logging level courier should use"`
UUIDSeed int `help:"seed to use for UUID generation in a testing environment"`
Version string `help:"the version of this mailroom install"`
LogLevel slog.Level `help:"the logging level courier should use"`
UUIDSeed int `help:"seed to use for UUID generation in a testing environment"`
Version string `help:"the version of this mailroom install"`
}

// NewDefaultConfig returns a new default configuration object
Expand Down Expand Up @@ -133,11 +130,11 @@ func NewDefaultConfig() *Config {

CloudwatchNamespace: "Temba",
DeploymentID: "dev",
InstanceID: hostname,

InstanceID: hostname,
LogLevel: slog.LevelWarn,
UUIDSeed: 0,
Version: "Dev",
LogLevel: slog.LevelWarn,
UUIDSeed: 0,
Version: "Dev",
}
}

Expand Down

0 comments on commit d6bc88d

Please sign in to comment.