Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor metrics so that everything is sent from Heartbeat in the backend #818

Merged
merged 2 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 6 additions & 9 deletions backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"strings"

"github.com/gomodule/redigo/redis"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/urns"
)
Expand Down Expand Up @@ -68,7 +67,7 @@ type Backend interface {
// WriteChannelLog writes the passed in channel log to our backend
WriteChannelLog(context.Context, *ChannelLog) error

// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call MarkOutgoingMsgComplete with the
// PopNextOutgoingMsg returns the next message that needs to be sent, callers should call OnSendComplete with the
// returned message when they have dealt with the message (regardless of whether it was sent or not)
PopNextOutgoingMsg(context.Context) (MsgOut, error)

Expand All @@ -80,10 +79,11 @@ type Backend interface {
// a message is being forced in being resent by a user
ClearMsgSent(context.Context, MsgID) error

// MarkOutgoingMsgComplete marks the passed in message as having been processed. Note this should be called even in the case
// of errors during sending as it will manage the number of active workers per channel. The status parameter can be
// used to determine any sort of deduping of msg sends
MarkOutgoingMsgComplete(context.Context, MsgOut, StatusUpdate)
// OnSendComplete is called when the sender has finished trying to send a message
OnSendComplete(context.Context, MsgOut, StatusUpdate, *ChannelLog)

// OnReceiveComplete is called when the server has finished handling an incoming request
OnReceiveComplete(context.Context, Channel, []Event, *ChannelLog)

// SaveAttachment saves an attachment to backend storage
SaveAttachment(context.Context, Channel, string, []byte, string) (string, error)
Expand All @@ -106,9 +106,6 @@ type Backend interface {

// RedisPool returns the redisPool for this backend
RedisPool() *redis.Pool

// CloudWatch return the CloudWatch service for this backend
CloudWatch() *cwatch.Service
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cloudwatch is now completely internal to the RP backend.. which seems right

}

// Media is a resolved media object that can be used as a message attachment
Expand Down
76 changes: 35 additions & 41 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,6 @@
courier.RegisterBackend("rapidpro", newBackend)
}

type stats struct {
// both sqlx and redis provide wait stats which are cummulative that we need to convert into increments by
// tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

type backend struct {
config *courier.Config

Expand Down Expand Up @@ -94,7 +87,12 @@
// tracking of external ids of messages we've sent in case we need one before its status update has been written
sentExternalIDs *redisx.IntervalHash

stats stats
stats *StatsCollector

// both sqlx and redis provide cumulative wait stats, which we convert into per-period increments by
// tracking their previous values
dbWaitDuration time.Duration
redisWaitDuration time.Duration
}

// NewBackend creates a new RapidPro backend
Expand Down Expand Up @@ -131,6 +129,8 @@
receivedExternalIDs: redisx.NewIntervalHash("seen-external-ids", time.Hour*24, 2), // 24 - 48 hours
sentIDs: redisx.NewIntervalSet("sent-ids", time.Hour, 2), // 1 - 2 hours
sentExternalIDs: redisx.NewIntervalHash("sent-external-ids", time.Hour, 2), // 1 - 2 hours

stats: NewStatsCollector(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to call Heartbeat when the backend stops, or in backend cleanup, so we are sure all the stats at that time are sent? It seems the stats from the last minute that have not yet been sent will be lost.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wouldn't worry about losing some metrics here and there.. but I do want to move the heartbeat stuff completely inside the backend in another pr and could think about sending final metrics when stopping courier.

}
}

Expand Down Expand Up @@ -194,7 +194,6 @@
if err != nil {
return err
}
b.cw.StartQueue(time.Second * 3)

// check attachment bucket access
if err := b.s3.Test(ctx, b.config.S3AttachmentsBucket); err != nil {
Expand Down Expand Up @@ -253,8 +252,6 @@
// wait for our threads to exit
b.waitGroup.Wait()

// stop cloudwatch service
b.cw.StopQueue()
return nil
}

Expand Down Expand Up @@ -464,8 +461,8 @@
return b.sentIDs.Rem(rc, id.String())
}

// MarkOutgoingMsgComplete marks the passed in message as having completed processing, freeing up a worker for that channel
func (b *backend) MarkOutgoingMsgComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate) {
// OnSendComplete is called when the sender has finished trying to send a message
func (b *backend) OnSendComplete(ctx context.Context, msg courier.MsgOut, status courier.StatusUpdate, clog *courier.ChannelLog) {
rc := b.rp.Get()
defer rc.Close()

Expand All @@ -489,6 +486,13 @@
slog.Error("unable to update session timeout", "error", err, "session_id", dbMsg.SessionID_)
}
}

b.stats.RecordOutgoing(msg.Channel().ChannelType(), wasSuccess, clog.Elapsed)
}

// OnReceiveComplete is called when the server has finished handling an incoming request
func (b *backend) OnReceiveComplete(ctx context.Context, ch courier.Channel, events []courier.Event, clog *courier.ChannelLog) {
b.stats.RecordIncoming(ch.ChannelType(), events, clog.Elapsed)

Check warning on line 495 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L494-L495

Added lines #L494 - L495 were not covered by tests
}

// WriteMsg writes the passed in message to our store
Expand Down Expand Up @@ -737,11 +741,12 @@
return health.String()
}

// Heartbeat is called every minute, we send our collected stats, queue depths and connection pool metrics to CloudWatch
func (b *backend) Heartbeat() error {
rc := b.rp.Get()
defer rc.Close()

metrics := b.stats.Extract().ToMetrics()

active, err := redis.Strings(rc.Do("ZRANGE", fmt.Sprintf("%s:active", msgQueueName), "0", "-1"))
if err != nil {
return fmt.Errorf("error getting active queues: %w", err)
Expand Down Expand Up @@ -770,38 +775,32 @@
bulkSize += count
}

// get our DB and redis stats
// calculate DB and redis pool metrics
dbStats := b.db.Stats()
redisStats := b.rp.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - b.stats.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.stats.redisWaitDuration

b.stats.dbWaitDuration = dbStats.WaitDuration
b.stats.redisWaitDuration = redisStats.WaitDuration
dbWaitDurationInPeriod := dbStats.WaitDuration - b.dbWaitDuration
redisWaitDurationInPeriod := redisStats.WaitDuration - b.redisWaitDuration
b.dbWaitDuration = dbStats.WaitDuration
b.redisWaitDuration = redisStats.WaitDuration

hostDim := cwatch.Dimension("Host", b.config.InstanceID)

b.CloudWatch().Queue(
metrics = append(metrics,
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're using seconds everywhere

cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
)

b.CloudWatch().Queue(
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Second), cwtypes.StandardUnitSeconds, hostDim),
cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
)

slog.Info("current metrics",
"db_inuse", dbStats.InUse,
"db_wait", dbWaitDurationInPeriod,
"redis_inuse", redisStats.ActiveCount,
"redis_wait", redisWaitDurationInPeriod,
"priority_size", prioritySize,
"bulk_size", bulkSize,
)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
if err := b.cw.Send(ctx, metrics...); err != nil {
slog.Error("error sending metrics", "error", err)

Check warning on line 798 in backends/rapidpro/backend.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/backend.go#L798

Added line #L798 was not covered by tests
} else {
slog.Info("sent metrics to cloudwatch", "metrics", len(metrics))
}
cancel()

return nil
}

Expand Down Expand Up @@ -878,8 +877,3 @@
func (b *backend) RedisPool() *redis.Pool {
return b.rp
}

// CloudWatch return the cloudwatch service
func (b *backend) CloudWatch() *cwatch.Service {
return b.cw
}
2 changes: 1 addition & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -900,7 +900,7 @@ func (ts *BackendTestSuite) TestOutgoingQueue() {
ts.Equal(msg.Text(), "test message")

// mark this message as dealt with
ts.b.MarkOutgoingMsgComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog))
ts.b.OnSendComplete(ctx, msg, ts.b.NewStatusUpdate(msg.Channel(), msg.ID(), courier.MsgStatusWired, clog), clog)

// this message should now be marked as sent
sent, err := ts.b.WasMsgSent(ctx, msg.ID())
Expand Down
6 changes: 1 addition & 5 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,8 @@ import (
"time"
"unicode/utf8"

cwtypes "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
"github.com/nyaruka/gocommon/dbutil"
"github.com/nyaruka/gocommon/urns"
"github.com/nyaruka/gocommon/uuids"
Expand Down Expand Up @@ -218,9 +216,7 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// store this URN on our contact
contact.URNID_ = contactURN.ID

// report that we created a new contact
b.cw.Queue(cwatch.Datum("ContactCreated", float64(1), cwtypes.StandardUnitCount))
b.stats.RecordContactCreated()

// and return it
return contact, nil
}
139 changes: 139 additions & 0 deletions backends/rapidpro/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
package rapidpro

import (
"sync"
"time"

"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
"github.com/nyaruka/courier"
"github.com/nyaruka/gocommon/aws/cwatch"
)

// CountByType holds a counter for each channel type.
type CountByType map[courier.ChannelType]int

// metrics builds one CloudWatch count datum per channel type under the given
// metric name, attaching the channel type as the ChannelType dimension.
func (c CountByType) metrics(name string) []types.MetricDatum {
	data := make([]types.MetricDatum, len(c))
	i := 0
	for channelType, n := range c {
		dim := cwatch.Dimension("ChannelType", string(channelType))
		data[i] = cwatch.Datum(name, float64(n), types.StandardUnitCount, dim)
		i++
	}
	return data
}

// DurationByType holds an accumulated duration for each channel type.
type DurationByType map[courier.ChannelType]time.Duration

// Stats holds the counts and durations accumulated between two extractions
// from a StatsCollector. Durations are running totals, they are converted to
// per-request averages when the stats are turned into metrics.
type Stats struct {
IncomingRequests CountByType // number of handler requests
IncomingMessages CountByType // number of messages received
IncomingStatuses CountByType // number of status updates received
IncomingEvents CountByType // number of other events received
IncomingIgnored CountByType // number of requests ignored
IncomingDuration DurationByType // total time spent handling requests

OutgoingSends CountByType // number of sends that succeeded
OutgoingErrors CountByType // number of sends that errored
OutgoingDuration DurationByType // total time spent sending messages

ContactsCreated int // number of contacts created
}

// newStats returns an empty Stats with every per-channel-type map allocated,
// so that callers can increment entries without checking for nil maps.
func newStats() *Stats {
	s := new(Stats)

	s.IncomingRequests = CountByType{}
	s.IncomingMessages = CountByType{}
	s.IncomingStatuses = CountByType{}
	s.IncomingEvents = CountByType{}
	s.IncomingIgnored = CountByType{}
	s.IncomingDuration = DurationByType{}

	s.OutgoingSends = CountByType{}
	s.OutgoingErrors = CountByType{}
	s.OutgoingDuration = DurationByType{}

	// ContactsCreated starts at its zero value
	return s
}

// ToMetrics converts the stats into CloudWatch metric data.
//
// Counts are emitted per channel type using the ChannelType dimension.
// Durations are emitted as the average time per incoming request, or per
// send attempt (successes plus errors) for outgoing, and are expressed in
// seconds. ContactsCreated is emitted as a single count without dimensions.
func (s *Stats) ToMetrics() []types.MetricDatum {
	metrics := make([]types.MetricDatum, 0, 20)
	metrics = append(metrics, s.IncomingRequests.metrics("IncomingRequests")...)
	metrics = append(metrics, s.IncomingMessages.metrics("IncomingMessages")...)
	metrics = append(metrics, s.IncomingStatuses.metrics("IncomingStatuses")...)
	metrics = append(metrics, s.IncomingEvents.metrics("IncomingEvents")...)
	metrics = append(metrics, s.IncomingIgnored.metrics("IncomingIgnored")...)

	for typ, d := range s.IncomingDuration { // convert to averages
		reqs := s.IncomingRequests[typ]
		if reqs <= 0 {
			// a total with no matching request count would yield NaN/Inf, which can't be reported
			continue
		}
		// time.Duration is in nanoseconds, so convert to seconds before averaging
		avgSecs := d.Seconds() / float64(reqs)
		metrics = append(metrics, cwatch.Datum("IncomingDuration", avgSecs, types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
	}

	metrics = append(metrics, s.OutgoingSends.metrics("OutgoingSends")...)
	metrics = append(metrics, s.OutgoingErrors.metrics("OutgoingErrors")...)

	for typ, d := range s.OutgoingDuration { // convert to averages
		attempts := s.OutgoingSends[typ] + s.OutgoingErrors[typ]
		if attempts <= 0 {
			// a total with no matching attempt count would yield NaN/Inf, which can't be reported
			continue
		}
		// time.Duration is in nanoseconds, so convert to seconds before averaging
		avgSecs := d.Seconds() / float64(attempts)
		metrics = append(metrics, cwatch.Datum("OutgoingDuration", avgSecs, types.StandardUnitSeconds, cwatch.Dimension("ChannelType", string(typ))))
	}

	metrics = append(metrics, cwatch.Datum("ContactsCreated", float64(s.ContactsCreated), types.StandardUnitCount))
	return metrics
}

// StatsCollector provides threadsafe stats collection
type StatsCollector struct {
mutex sync.Mutex // guards stats
stats *Stats // stats accumulated since the last call to Extract
}

// NewStatsCollector returns a collector that starts with an empty set of stats.
func NewStatsCollector() *StatsCollector {
	collector := new(StatsCollector)
	collector.stats = newStats()
	return collector
}

func (c *StatsCollector) RecordIncoming(typ courier.ChannelType, evts []courier.Event, d time.Duration) {
c.mutex.Lock()
c.stats.IncomingRequests[typ]++

for _, e := range evts {
switch e.(type) {
case courier.MsgIn:
c.stats.IncomingMessages[typ]++
case courier.StatusUpdate:
c.stats.IncomingStatuses[typ]++
case courier.ChannelEvent:
c.stats.IncomingEvents[typ]++

Check warning on line 104 in backends/rapidpro/stats.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/stats.go#L98-L104

Added lines #L98 - L104 were not covered by tests
}
}
if len(evts) == 0 {
c.stats.IncomingIgnored[typ]++
}

c.stats.IncomingDuration[typ] += d
c.mutex.Unlock()
}

func (c *StatsCollector) RecordOutgoing(typ courier.ChannelType, success bool, d time.Duration) {
c.mutex.Lock()
if success {
c.stats.OutgoingSends[typ]++
} else {
c.stats.OutgoingErrors[typ]++
}

Check warning on line 121 in backends/rapidpro/stats.go

View check run for this annotation

Codecov / codecov/patch

backends/rapidpro/stats.go#L120-L121

Added lines #L120 - L121 were not covered by tests
c.stats.OutgoingDuration[typ] += d
c.mutex.Unlock()
}

// RecordContactCreated increments the count of contacts created in the
// current period.
func (c *StatsCollector) RecordContactCreated() {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	c.stats.ContactsCreated += 1
}

// Extract hands back everything recorded since the previous call and leaves
// the collector holding a fresh, empty set of stats.
func (c *StatsCollector) Extract() *Stats {
	replacement := newStats()

	c.mutex.Lock()
	extracted := c.stats
	c.stats = replacement
	c.mutex.Unlock()

	return extracted
}
Loading
Loading