Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: introduce cron job metrics #2256

Merged
merged 12 commits into from
Aug 7, 2024
10 changes: 10 additions & 0 deletions backend/controller/cronjobs/cronjobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand Down Expand Up @@ -202,6 +203,7 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {
requestJSON, err := json.Marshal(requestBody)
if err != nil {
logger.Errorf(err, "could not build body for cron job: %v", job.Key)
observability.Cron.JobFailedStart(ctx, job)
return
}

Expand All @@ -214,10 +216,17 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {

callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
defer cancel()

observability.Cron.JobStarted(ctx, job)
_, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource)

// Record execution success/failure metric now and leave post job-execution-action observability to logging
if err != nil {
logger.Errorf(err, "failed to execute cron job %v", job.Key)
observability.Cron.JobFailed(ctx, job)
// Do not return, continue to end the job and schedule the next execution
} else {
observability.Cron.JobSuccess(ctx, job)
}

schedule, err := cron.Parse(job.Schedule)
Expand Down Expand Up @@ -274,6 +283,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
continue
}
logger.Warnf("Killed stale cron job %s", stale.Key)
observability.Cron.JobKilled(ctx, stale)
updatedJobs = append(updatedJobs, updated)
}

Expand Down
8 changes: 4 additions & 4 deletions backend/controller/observability/async_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func wrapErr(signalName string, err error) error {

func (m *AsyncCallMetrics) Created(ctx context.Context, verb schema.RefKey, origin string, remainingAttempts int64, maybeErr error) {
attrs := extractRefAttrs(verb, origin)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs = append(attrs, observability.SuccessOrFailureStatus(maybeErr == nil))
attrs = append(attrs, attribute.Int64(asyncCallRemainingAttemptsAttr, remainingAttempts))

m.created.Add(ctx, 1, metric.WithAttributes(attrs...))
Expand All @@ -103,15 +103,15 @@ func (m *AsyncCallMetrics) RecordQueueDepth(ctx context.Context, queueDepth int6

func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs = append(attrs, observability.SuccessOrFailureStatus(maybeErr == nil))
m.acquired.Add(ctx, 1, metric.WithAttributes(attrs...))
}

func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeFailureMode optional.Option[string]) {
attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)

failureMode, ok := maybeFailureMode.Get()
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok))
attrs = append(attrs, observability.SuccessOrFailureStatus(!ok))
if ok {
attrs = append(attrs, attribute.String(asyncCallExecFailureModeAttr, failureMode))
}
Expand All @@ -123,7 +123,7 @@ func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, or
msToComplete := timeSinceMS(scheduledAt)

attrs := extractRefAttrs(verb, origin)
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
attrs = append(attrs, observability.SuccessOrFailureStatus(maybeErr == nil))
m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))

attrs = append(attrs, attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, logBucket(8, msToComplete)))
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/observability/calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime
}

failureMode, ok := maybeFailureMode.Get()
attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok))
attrs = append(attrs, observability.SuccessOrFailureStatus(!ok))
if ok {
attrs = append(attrs, attribute.String(callFailureModeAttr, failureMode))
}
Expand Down
107 changes: 107 additions & 0 deletions backend/controller/observability/cron.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package observability

import (
"context"
"fmt"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"

"github.com/TBD54566975/ftl/internal/model"
"github.com/TBD54566975/ftl/internal/observability"
)

const (
cronMeterName = "ftl.cron"
cronJobFullNameAttribute = "ftl.cron.job.full_name"

cronJobKilledStatus = "killed"
cronJobFailedStartStatus = "failed_start"
)

type CronMetrics struct {
jobsActive metric.Int64UpDownCounter
jobsCompleted metric.Int64Counter
jobLatency metric.Int64Histogram
}

func initCronMetrics() (*CronMetrics, error) {
result := &CronMetrics{}

var errs error
var err error

meter := otel.Meter(deploymentMeterName)

counter := fmt.Sprintf("%s.jobs.completed", cronMeterName)
if result.jobsCompleted, err = meter.Int64Counter(
counter,
metric.WithDescription("the number of cron jobs completed; successful or otherwise")); err != nil {
result.jobsCompleted, errs = handleInt64CounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.jobs.active", cronMeterName)
if result.jobsActive, err = meter.Int64UpDownCounter(
counter,
metric.WithDescription("the number of actively executing cron jobs")); err != nil {
result.jobsActive, errs = handleInt64UpDownCounterError(counter, err, errs)
}

counter = fmt.Sprintf("%s.job.latency", cronMeterName)
if result.jobLatency, err = meter.Int64Histogram(
counter,
metric.WithDescription("the latency between the scheduled execution time of a cron job"),
metric.WithUnit("ms")); err != nil {
result.jobLatency, errs = handleInt64HistogramCounterError(counter, err, errs)
}

return result, errs
}

func (m *CronMetrics) JobStarted(ctx context.Context, job model.CronJob) {
m.jobsActive.Add(ctx, 1, cronAttributes(job, optional.None[string]()))
}

func (m *CronMetrics) JobSuccess(ctx context.Context, job model.CronJob) {
m.jobCompleted(ctx, job, observability.SuccessStatus)
}

func (m *CronMetrics) JobKilled(ctx context.Context, job model.CronJob) {
m.jobCompleted(ctx, job, cronJobKilledStatus)
}

func (m *CronMetrics) JobFailedStart(ctx context.Context, job model.CronJob) {
completionAttributes := cronAttributes(job, optional.Some(cronJobFailedStartStatus))

elapsed := timeSinceMS(job.NextExecution)
m.jobLatency.Record(ctx, elapsed, completionAttributes)
m.jobsCompleted.Add(ctx, 1, completionAttributes)
}

func (m *CronMetrics) JobFailed(ctx context.Context, job model.CronJob) {
m.jobCompleted(ctx, job, observability.FailureStatus)
}

func (m *CronMetrics) jobCompleted(ctx context.Context, job model.CronJob, status string) {
elapsed := timeSinceMS(job.NextExecution)

m.jobsActive.Add(ctx, -1, cronAttributes(job, optional.None[string]()))

completionAttributes := cronAttributes(job, optional.Some(status))
m.jobLatency.Record(ctx, elapsed, completionAttributes)
m.jobsCompleted.Add(ctx, 1, completionAttributes)
}

func cronAttributes(job model.CronJob, maybeStatus optional.Option[string]) metric.MeasurementOption {
attributes := []attribute.KeyValue{
attribute.String(observability.ModuleNameAttribute, job.Key.Payload.Module),
attribute.String(cronJobFullNameAttribute, job.Key.String()),
attribute.String(observability.RunnerDeploymentKeyAttribute, job.DeploymentKey.String()),
}
if status, ok := maybeStatus.Get(); ok {
attributes = append(attributes, attribute.String(observability.OutcomeStatusNameAttribute, status))
}
return metric.WithAttributes(attributes...)
}
8 changes: 8 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ var (
Deployment *DeploymentMetrics
FSM *FSMMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
)

func init() {
Expand All @@ -32,6 +33,8 @@ func init() {
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
errs = errors.Join(errs, err)

if err != nil {
panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
Expand All @@ -48,6 +51,11 @@ func handleInt64UpDownCounterError(counter string, err error, errs error) (metri
return noop.Int64UpDownCounter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

//nolint:unparam
func handleInt64HistogramCounterError(counter string, err error, errs error) (metric.Int64Histogram, error) {
return noop.Int64Histogram{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
}

func timeSinceMS(start time.Time) int64 {
return time.Since(start).Milliseconds()
}
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/observability/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller str
attribute.String(observability.ModuleNameAttribute, module),
attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: module, Name: topic}.String()),
attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()),
attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil),
observability.SuccessOrFailureStatus(maybeErr == nil),
}

m.published.Add(ctx, 1, metric.WithAttributes(attrs...))
Expand Down
14 changes: 13 additions & 1 deletion internal/observability/attributes.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,19 @@
package observability

import "go.opentelemetry.io/otel/attribute"

const (
ModuleNameAttribute = "ftl.module.name"
StatusSucceededAttribute = "ftl.status.succeeded"
OutcomeStatusNameAttribute = "ftl.outcome.status"
RunnerDeploymentKeyAttribute = "ftl.deployment.key"

SuccessStatus = "success"
FailureStatus = "failure"
)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about reprising the pattern you came up with ages ago and adding a helper function here that just returns attribute.String(observability.OutcomeStatusNameAttribute, observability.SuccessOrFailureStatus(succeeded))? Since so much of the code that uses this now is basically just duplicating that line

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sgtm, applied

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

micro-nit: could we rename SuccessOrFailureStatus to SuccessOrFailureStatusAttr since that's what it returns?

func SuccessOrFailureStatus(succeeded bool) attribute.KeyValue {
if succeeded {
return attribute.String(OutcomeStatusNameAttribute, SuccessStatus)
}
return attribute.String(OutcomeStatusNameAttribute, FailureStatus)
}