Skip to content

Commit

Permalink
Simplify metrics reporting
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Dec 17, 2024
1 parent a9ed817 commit 82a458b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 47 deletions.
32 changes: 11 additions & 21 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,7 @@ type stats struct {
// both sqlx and redis provide wait stats which are cummulative that we need to convert into increments by
// tracking their previous values
dbWaitDuration time.Duration
dbWaitCount int64
redisWaitDuration time.Duration
redisWaitCount int64
}

type backend struct {
Expand Down Expand Up @@ -777,41 +775,33 @@ func (b *backend) Heartbeat() error {
redisStats := b.rp.Stats()

dbWaitDurationInPeriod := dbStats.WaitDuration - b.stats.dbWaitDuration
dbWaitCountInPeriod := dbStats.WaitCount - b.stats.dbWaitCount
redisWaitDurationInPeriod := redisStats.WaitDuration - b.stats.redisWaitDuration
redisWaitCountInPeriod := redisStats.WaitCount - b.stats.redisWaitCount

b.stats.dbWaitDuration = dbStats.WaitDuration
b.stats.dbWaitCount = dbStats.WaitCount
b.stats.redisWaitDuration = redisStats.WaitDuration
b.stats.redisWaitCount = redisStats.WaitCount

hostDim := cwatch.Dimension("Host", b.config.InstanceID)
appDim := cwatch.Dimension("App", "courier")

b.CloudWatch().Queue(
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim, appDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim, appDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim, appDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim, appDim),
cwatch.Datum("DBConnectionsInUse", float64(dbStats.InUse), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("DBConnectionWaitDuration", float64(dbWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
cwatch.Datum("RedisConnectionsInUse", float64(redisStats.ActiveCount), cwtypes.StandardUnitCount, hostDim),
cwatch.Datum("RedisConnectionsWaitDuration", float64(redisWaitDurationInPeriod/time.Millisecond), cwtypes.StandardUnitMilliseconds, hostDim),
)

b.CloudWatch().Queue(
cwatch.Datum("QueuedMsgs", float64(bulkSize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "bulk")),
cwatch.Datum("QueuedMsgs", float64(prioritySize), cwtypes.StandardUnitCount, cwatch.Dimension("QueueName", "priority")),
)

slog.Info("current metrics", "db_busy", dbStats.InUse,
"db_idle", dbStats.Idle,
"db_wait_time", dbWaitDurationInPeriod,
"db_wait_count", dbWaitCountInPeriod,
"redis_active", redisStats.ActiveCount,
"redis_idle", redisStats.IdleCount,
"redis_wait_time", redisWaitDurationInPeriod,
"redis_wait_count", redisWaitCountInPeriod,
slog.Info("current metrics",
"db_inuse", dbStats.InUse,
"db_wait", dbWaitDurationInPeriod,
"redis_inuse", redisStats.ActiveCount,
"redis_wait", redisWaitDurationInPeriod,
"priority_size", prioritySize,
"bulk_size", bulkSize)

"bulk_size", bulkSize,
)
return nil
}

Expand Down
1 change: 0 additions & 1 deletion backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,7 +767,6 @@ func (ts *BackendTestSuite) TestHealth() {
}

func (ts *BackendTestSuite) TestHeartbeat() {
// TODO make metrics abstraction layer so we can test what we report
ts.NoError(ts.b.Heartbeat())
}

Expand Down
11 changes: 2 additions & 9 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,15 +218,8 @@ func contactForURN(ctx context.Context, b *backend, org OrgID, channel *Channel,
// store this URN on our contact
contact.URNID_ = contactURN.ID

// log that we created a new contact to librato
b.cw.Queue(
cwatch.Datum(
"NewContact",
float64(1),
cwtypes.StandardUnitCount,
cwatch.Dimension("ChannelType", string(channel.ChannelType())),
),
)
// report that we created a new contact
b.cw.Queue(cwatch.Datum("ContactCreated", float64(1), cwtypes.StandardUnitCount))

// and return it
return contact, nil
Expand Down
23 changes: 7 additions & 16 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,39 +306,30 @@ func (s *server) channelHandleWrapper(handler ChannelHandler, handlerFunc Channe
}

if channel != nil {
// if we have a channel but no events were created, we still log this to metrics
cw := s.Backend().CloudWatch()
channelTypeDim := cwatch.Dimension("ChannelType", string(channel.ChannelType()))

// if we have a channel but no events were created, we still log this to metrics
if len(events) == 0 {
if hErr != nil {
s.Backend().CloudWatch().Queue(
cwatch.Datum("ChannelError", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
cw.Queue(cwatch.Datum("ChannelError", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim))
} else {
s.Backend().CloudWatch().Queue(
cwatch.Datum("ChannelIgnored", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
cw.Queue(cwatch.Datum("ChannelIgnored", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim))
}
}

for _, event := range events {
switch e := event.(type) {
case MsgIn:
clog.SetAttached(true)
s.Backend().CloudWatch().Queue(
cwatch.Datum("MsgReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
cw.Queue(cwatch.Datum("MsgReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim))
LogMsgReceived(r, e)
case StatusUpdate:
clog.SetAttached(true)
s.Backend().CloudWatch().Queue(
cwatch.Datum("MsgStatus", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
cw.Queue(cwatch.Datum("MsgStatus", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim))
LogMsgStatusReceived(r, e)
case ChannelEvent:
s.Backend().CloudWatch().Queue(
cwatch.Datum("EventReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim),
)
cw.Queue(cwatch.Datum("EventReceive", float64(secondDuration), types.StandardUnitSeconds, channelTypeDim))
LogChannelEventReceived(r, e)
}
}
Expand Down

0 comments on commit 82a458b

Please sign in to comment.