Skip to content

Commit

Permalink
chore: observability cleanup (#3706)
Browse files Browse the repository at this point in the history
Move metrics to correct packages. There is still some more work required
(e.g. pubsub, peer-to-peer metrics) but this improves things.
  • Loading branch information
stuartwdouglas authored Dec 11, 2024
1 parent 39de359 commit a51794a
Show file tree
Hide file tree
Showing 12 changed files with 142 additions and 155 deletions.
22 changes: 11 additions & 11 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type AsyncCallMetrics struct {
queueDepth metric.Int64Gauge
}

func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
func initAsyncCallMetrics() *AsyncCallMetrics {
result := &AsyncCallMetrics{
created: noop.Int64Counter{},
acquired: noop.Int64Counter{},
Expand All @@ -53,40 +53,40 @@ func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
signalName := fmt.Sprintf("%s.created", asyncCallMeterName)
if result.created, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that an async call was created")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.acquired", asyncCallMeterName)
if result.acquired, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries acquiring an async call")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.executed", asyncCallMeterName)
if result.executed, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries executing an async call")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.completed", asyncCallMeterName)
if result.completed, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries completing an async call")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.ms_to_complete", asyncCallMeterName)
if result.msToComplete, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"),
metric.WithDescription("duration in ms to complete an async call, from the earliest time it was scheduled to execute")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.queue_depth", asyncCallMeterName)
if result.queueDepth, err = meter.Int64Gauge(signalName, metric.WithUnit("1"),
metric.WithDescription("number of async calls queued up")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

return result, nil
return result
}

func (m *AsyncCallMetrics) Created(ctx context.Context, verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, remainingAttempts int64, maybeErr error) {
Expand Down Expand Up @@ -125,7 +125,7 @@ func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, cat
}

func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool, queueDepth int64, maybeErr error) {
msToComplete := timeSinceMS(scheduledAt)
msToComplete := observability.TimeSinceMS(scheduledAt)

attrs := extractRefAttrs(verb, catchVerb, origin, isCatching)
attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
Expand Down Expand Up @@ -160,11 +160,11 @@ func RetrieveTraceContextFromContext(ctx context.Context) ([]byte, error) {
}

func extractAsyncCallAttrs(verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool) []attribute.KeyValue {
return append(extractRefAttrs(verb, catchVerb, origin, isCatching), attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, asyncLogBucket(timeSinceMS(scheduledAt))))
return append(extractRefAttrs(verb, catchVerb, origin, isCatching), attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, asyncLogBucket(observability.TimeSinceMS(scheduledAt))))
}

func asyncLogBucket(msToComplete int64) string {
return logBucket(4, msToComplete, optional.Some(4), optional.Some(6))
return observability.LogBucket(4, msToComplete, optional.Some(4), optional.Some(6))
}

func extractRefAttrs(verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, isCatching bool) []attribute.KeyValue {
Expand Down
12 changes: 6 additions & 6 deletions backend/controller/observability/calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type CallMetrics struct {
callTracer trace.Tracer
}

func initCallMetrics() (*CallMetrics, error) {
func initCallMetrics() *CallMetrics {
provider := otel.GetTracerProvider()
result := &CallMetrics{
requests: noop.Int64Counter{},
Expand All @@ -44,16 +44,16 @@ func initCallMetrics() (*CallMetrics, error) {
signalName := fmt.Sprintf("%s.requests", callMeterName)
if result.requests, err = meter.Int64Counter(signalName, metric.WithUnit("1"),
metric.WithDescription("the number of times that the FTL controller receives a verb call request")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.ms_to_complete", callMeterName)
if result.msToComplete, err = meter.Int64Histogram(signalName, metric.WithUnit("ms"),
metric.WithDescription("duration in ms to complete a verb call")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

return result, nil
return result
}

func (m *CallMetrics) BeginSpan(ctx context.Context, verb *schemapb.Ref) (context.Context, trace.Span) {
Expand All @@ -74,9 +74,9 @@ func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime
attrs = append(attrs, attribute.String(callFailureModeAttr, failureMode))
}

msToComplete := timeSinceMS(startTime)
msToComplete := observability.TimeSinceMS(startTime)
m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

attrs = append(attrs, attribute.String(callRunTimeBucketAttr, logBucket(4, msToComplete, optional.Some(3), optional.Some(7))))
attrs = append(attrs, attribute.String(callRunTimeBucketAttr, observability.LogBucket(4, msToComplete, optional.Some(3), optional.Some(7))))
m.requests.Add(ctx, 1, metric.WithAttributes(attrs...))
}
12 changes: 6 additions & 6 deletions backend/controller/observability/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type DeploymentMetrics struct {
replicasRemoved metric.Int64Counter
}

func initDeploymentMetrics() (*DeploymentMetrics, error) {
func initDeploymentMetrics() *DeploymentMetrics {
result := &DeploymentMetrics{
reconciliationFailures: noop.Int64Counter{},
reconciliationsActive: noop.Int64UpDownCounter{},
Expand All @@ -38,31 +38,31 @@ func initDeploymentMetrics() (*DeploymentMetrics, error) {
if result.reconciliationFailures, err = meter.Int64Counter(
signalName,
metric.WithDescription("the number of failed runner deployment reconciliation tasks")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.reconciliations.active", deploymentMeterName)
if result.reconciliationsActive, err = meter.Int64UpDownCounter(
signalName,
metric.WithDescription("the number of active deployment reconciliation tasks")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.replicas.added", deploymentMeterName)
if result.replicasAdded, err = meter.Int64Counter(
signalName,
metric.WithDescription("the number of runner replicas added by the deployment reconciliation tasks")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

signalName = fmt.Sprintf("%s.replicas.removed", deploymentMeterName)
if result.replicasRemoved, err = meter.Int64Counter(
signalName,
metric.WithDescription("the number of runner replicas removed by the deployment reconciliation tasks")); err != nil {
return nil, wrapErr(signalName, err)
observability.FatalError(signalName, err)
}

return result, nil
return result
}

func (m *DeploymentMetrics) ReconciliationFailure(ctx context.Context, module string, key string) {
Expand Down
86 changes: 4 additions & 82 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
@@ -1,95 +1,17 @@
package observability

import (
"errors"
"fmt"
"math"
"time"

"github.com/alecthomas/types/optional"
)

var (
AsyncCalls *AsyncCallMetrics
Calls *CallMetrics
Deployment *DeploymentMetrics
Ingress *IngressMetrics
PubSub *PubSubMetrics
Controller *ControllerTracing
Timeline *TimelineMetrics
)

func init() {
var errs error
var err error

AsyncCalls, err = initAsyncCallMetrics()
errs = errors.Join(errs, err)
Calls, err = initCallMetrics()
errs = errors.Join(errs, err)
Deployment, err = initDeploymentMetrics()
errs = errors.Join(errs, err)
Ingress, err = initIngressMetrics()
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
AsyncCalls = initAsyncCallMetrics()
Calls = initCallMetrics()
Deployment = initDeploymentMetrics()
PubSub = initPubSubMetrics()
Controller = initControllerTracing()
Timeline, err = initTimelineMetrics()
errs = errors.Join(errs, err)

if errs != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
}
}

func wrapErr(signalName string, err error) error {
return fmt.Errorf("failed to create %q signal: %w", signalName, err)
}

func timeSinceMS(start time.Time) int64 {
return time.Since(start).Milliseconds()
}

// logBucket returns a string bucket label for a given positive number bucketed into
// powers of some arbitrary base. For base 8, for example, we would have buckets:
//
// <1, [1-8), [8-64), [64-512), etc.
//
// The buckets are then demarcated by `min` and `max`, such that any `num` < `base`^`min`
// will be bucketed together into the min bucket, and similarly, any `num` >= `base`^`max`
// will go in the `max` bucket. This constrains output cardinality by chopping the long
// tails on both ends of the normal distribution and lumping them all into terminal
// buckets. When `min` and `max` are not provided, the effective `min` is 0, and there is
// no max.
//
// Go only supports a few bases with math.Log*, so this func performs a change of base:
// log_b(x) = log_a(x) / log_a(b)
func logBucket(base int, num int64, optMin, optMax optional.Option[int]) string {
if num < 1 {
return "<1"
}
b := float64(base)

// Check max
maxBucket, ok := optMax.Get()
if ok {
maxThreshold := int64(math.Pow(b, float64(maxBucket)))
if num >= maxThreshold {
return fmt.Sprintf(">=%d", maxThreshold)
}
}

// Check min
minBucket, ok := optMin.Get()
if ok {
minThreshold := int64(math.Pow(b, float64(minBucket)))
if num < minThreshold {
return fmt.Sprintf("<%d", minThreshold)
}
}

logB := math.Log(float64(num)) / math.Log(b)
bucketExpLo := math.Floor(logB)

return fmt.Sprintf("[%d,%d)", int(math.Pow(b, bucketExpLo)), int(math.Pow(b, bucketExpLo+1)))
}
10 changes: 5 additions & 5 deletions backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type PubSubMetrics struct {
sinkCalled metric.Int64Counter
}

func initPubSubMetrics() (*PubSubMetrics, error) {
func initPubSubMetrics() *PubSubMetrics {
result := &PubSubMetrics{
published: noop.Int64Counter{},
propagationFailed: noop.Int64Counter{},
Expand All @@ -50,26 +50,26 @@ func initPubSubMetrics() (*PubSubMetrics, error) {
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that an event is published to a topic")); err != nil {
return nil, wrapErr(counterName, err)
observability.FatalError(counterName, err)
}

counterName = fmt.Sprintf("%s.propagation.failed", pubsubMeterName)
if result.propagationFailed, err = meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that subscriptions fail to progress")); err != nil {
return nil, wrapErr(counterName, err)
observability.FatalError(counterName, err)
}

counterName = fmt.Sprintf("%s.sink.called", pubsubMeterName)
if result.sinkCalled, err = meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that a pubsub event has been enqueued to asynchronously send to a subscriber")); err != nil {
return nil, wrapErr(counterName, err)
observability.FatalError(counterName, err)
}

return result, nil
return result
}

func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller string, maybeErr error) {
Expand Down
Loading

0 comments on commit a51794a

Please sign in to comment.