Skip to content

Commit

Permalink
feat: async call otel metrics (#2209)
Browse files Browse the repository at this point in the history
These are just the basic metrics. Queue depth up-down counters should
come next

```
ScopeMetrics #3
ScopeMetrics SchemaURL: 
InstrumentationScope ftl.async_call 

Metric #0
Descriptor:
     -> Name: ftl.async_call.acquired
     -> Description: the number of times that the controller tries acquiring an async call
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.async_call.origin: Str(sub:echo.sub)
     -> ftl.async_call.time_since_scheduled_at_ms: Int(24)
     -> ftl.async_call.verb.ref: Str(echo.echoSinkOne)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-07-31 00:33:53.897947 +0000 UTC
Timestamp: 2024-07-31 00:34:08.898478 +0000 UTC
Value: 1

Metric #1
Descriptor:
     -> Name: ftl.async_call.executed
     -> Description: the number of times that the controller tries executing an async call
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.async_call.origin: Str(sub:echo.sub)
     -> ftl.async_call.time_since_scheduled_at_ms: Int(41)
     -> ftl.async_call.verb.ref: Str(echo.echoSinkOne)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-07-31 00:33:53.897952 +0000 UTC
Timestamp: 2024-07-31 00:34:08.898485 +0000 UTC
Value: 1

Metric #2
Descriptor:
     -> Name: ftl.async_call.completed
     -> Description: the number of times that the controller tries completing an async call
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.async_call.origin: Str(sub:echo.sub)
     -> ftl.async_call.time_since_scheduled_at_ms: Int(46)
     -> ftl.async_call.verb.ref: Str(echo.echoSinkOne)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(true)
StartTimestamp: 2024-07-31 00:33:53.897956 +0000 UTC
Timestamp: 2024-07-31 00:34:08.898487 +0000 UTC
Value: 1
```

When a call fails, it looks like:
```
Metric #1
Descriptor:
     -> Name: ftl.async_call.executed
     -> Description: the number of times that the controller tries executing an async call
     -> Unit: 1
     -> DataType: Sum
     -> IsMonotonic: true
     -> AggregationTemporality: Cumulative

NumberDataPoints #0
Data point attributes:
     -> ftl.async_call.execution.failure_mode: Str(async call failed)
     -> ftl.async_call.origin: Str(sub:echo.sub)
     -> ftl.async_call.time_since_scheduled_at_ms: Int(16)
     -> ftl.async_call.verb.ref: Str(echo.echoSinkOne)
     -> ftl.module.name: Str(echo)
     -> ftl.status.succeeded: Bool(false)
StartTimestamp: 2024-07-31 00:41:26.013892 +0000 UTC
Timestamp: 2024-07-31 00:41:41.014946 +0000 UTC
Value: 1
```

Issue: #2194

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
deniseli and github-actions[bot] authored Jul 31, 2024
1 parent d6b6e17 commit 323c61b
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 2 deletions.
8 changes: 8 additions & 0 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/pubsub"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scaling/localscaling"
Expand Down Expand Up @@ -1335,8 +1336,10 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
logger.Tracef("No async calls to execute")
return time.Second * 2, nil
} else if err != nil {
observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err)
return 0, err
}
observability.AsyncCalls.Acquired(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil)
defer call.Release() //nolint:errcheck

logger = logger.Scope(fmt.Sprintf("%s:%s", call.Origin, call.Verb))
Expand All @@ -1351,15 +1354,18 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
failed := false
if err != nil {
logger.Warnf("Async call could not be called: %v", err)
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call could not be called"))
callResult = either.RightOf[[]byte](err.Error())
failed = true
} else if perr := resp.Msg.GetError(); perr != nil {
logger.Warnf("Async call failed: %s", perr.Message)
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.Some("async call failed"))
callResult = either.RightOf[[]byte](perr.Message)
failed = true
} else {
logger.Debugf("Async call succeeded")
callResult = either.LeftOf[string](resp.Msg.GetBody())
observability.AsyncCalls.Executed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, optional.None[string]())
}
err = s.dal.CompleteAsyncCall(ctx, call, callResult, func(tx *dal.Tx) error {
if failed && call.RemainingAttempts > 0 {
Expand All @@ -1379,8 +1385,10 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (time.Duration, error)
}
})
if err != nil {
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, err)
return 0, fmt.Errorf("failed to complete async call: %w", err)
}
observability.AsyncCalls.Completed(ctx, call.Verb, call.Origin.String(), call.ScheduledAt, nil)
go func() {
// Post-commit notification based on origin
switch origin := call.Origin.(type) {
Expand Down
102 changes: 102 additions & 0 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package observability

import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
)

const (
asyncCallMeterName = "ftl.async_call"
asyncCallOriginAttr = "ftl.async_call.origin"
asyncCallVerbRefAttr = "ftl.async_call.verb.ref"
asyncCallTimeSinceScheduledAtAttr = "ftl.async_call.time_since_scheduled_at_ms"
asyncCallExecFailureModeAttr = "ftl.async_call.execution.failure_mode"
)

type AsyncCallMetrics struct {
meter metric.Meter
acquired metric.Int64Counter
executed metric.Int64Counter
completed metric.Int64Counter
}

func initAsyncCallMetrics() (*AsyncCallMetrics, error) {
result := &AsyncCallMetrics{}
var errs error
var err error

result.meter = otel.Meter(asyncCallMeterName)

counterName := fmt.Sprintf("%s.acquired", asyncCallMeterName)
if result.acquired, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries acquiring an async call")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.acquired = noop.Int64Counter{}
}

counterName = fmt.Sprintf("%s.executed", asyncCallMeterName)
if result.executed, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries executing an async call")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.executed = noop.Int64Counter{}
}

counterName = fmt.Sprintf("%s.completed", asyncCallMeterName)
if result.completed, err = result.meter.Int64Counter(
counterName,
metric.WithUnit("1"),
metric.WithDescription("the number of times that the controller tries completing an async call")); err != nil {
errs = handleInitCounterError(errs, err, counterName)
result.completed = noop.Int64Counter{}
}

return result, errs
}

func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
m.acquired.Add(ctx, 1, metric.WithAttributes(extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)...))
}

func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeFailureMode optional.Option[string]) {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)

failureMode, ok := maybeFailureMode.Get()
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok))
if ok {
attrs = append(attrs, attribute.String(asyncCallExecFailureModeAttr, failureMode))
}

m.executed.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
m.completed.Add(ctx, 1, metric.WithAttributes(extractAsyncCallAndMaybeErrAttrs(verb, origin, scheduledAt, maybeErr)...))
}

func extractAsyncCallAndMaybeErrAttrs(verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) []attribute.KeyValue {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
return append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
}

func extractAsyncCallAttrs(verb schema.RefKey, origin string, scheduledAt time.Time) []attribute.KeyValue {
return []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, verb.Module),
attribute.String(asyncCallVerbRefAttr, verb.String()),
attribute.String(asyncCallOriginAttr, origin),
attribute.Int64(asyncCallTimeSinceScheduledAtAttr, time.Since(scheduledAt).Milliseconds()),
}
}
7 changes: 5 additions & 2 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,17 @@ import (
)

var (
FSM *FSMMetrics
PubSub *PubSubMetrics
AsyncCalls *AsyncCallMetrics
FSM *FSMMetrics
PubSub *PubSubMetrics
)

func init() {
var errs error
var err error

AsyncCalls, err = initAsyncCallMetrics()
errs = errors.Join(errs, err)
FSM, err = initFSMMetrics()
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
Expand Down

0 comments on commit 323c61b

Please sign in to comment.