diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go
index 4d88227c95..e00af2d742 100644
--- a/backend/controller/cronjobs/cronjobs.go
+++ b/backend/controller/cronjobs/cronjobs.go
@@ -18,6 +18,7 @@ import (
 
 	"github.com/TBD54566975/ftl/backend/controller/cronjobs/dal"
 	parentdal "github.com/TBD54566975/ftl/backend/controller/dal"
+	"github.com/TBD54566975/ftl/backend/controller/observability"
 	"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
 	ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
 	schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
@@ -202,6 +203,7 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {
 	requestJSON, err := json.Marshal(requestBody)
 	if err != nil {
 		logger.Errorf(err, "could not build body for cron job: %v", job.Key)
+		observability.Cron.JobFailedStart(ctx, job)
 		return
 	}
 
@@ -214,10 +216,17 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) {
 
 	callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout)
 	defer cancel()
+
+	observability.Cron.JobStarted(ctx, job)
 	_, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource)
+
+	// Record execution success/failure metric now and leave post job-execution-action observability to logging
 	if err != nil {
 		logger.Errorf(err, "failed to execute cron job %v", job.Key)
+		observability.Cron.JobFailed(ctx, job)
 		// Do not return, continue to end the job and schedule the next execution
+	} else {
+		observability.Cron.JobSuccess(ctx, job)
 	}
 
 	schedule, err := cron.Parse(job.Schedule)
@@ -274,6 +283,7 @@ func (s *Service) killOldJobs(ctx context.Context) (time.Duration, error) {
 			continue
 		}
 		logger.Warnf("Killed stale cron job %s", stale.Key)
+		observability.Cron.JobKilled(ctx, stale)
 		updatedJobs = append(updatedJobs, updated)
 	}
 
diff --git a/backend/controller/observability/async_calls.go b/backend/controller/observability/async_calls.go
index 2b955dec01..d763a88aa7 100644
--- a/backend/controller/observability/async_calls.go
+++ b/backend/controller/observability/async_calls.go
@@ -91,7 +91,7 @@ func wrapErr(signalName string, err error) error {
 
 func (m *AsyncCallMetrics) Created(ctx context.Context, verb schema.RefKey, origin string, remainingAttempts int64, maybeErr error) {
 	attrs := extractRefAttrs(verb, origin)
-	attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
+	attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
 	attrs = append(attrs, attribute.Int64(asyncCallRemainingAttemptsAttr, remainingAttempts))
 
 	m.created.Add(ctx, 1, metric.WithAttributes(attrs...))
@@ -103,7 +103,7 @@ func (m *AsyncCallMetrics) RecordQueueDepth(ctx context.Context, queueDepth int6
 
 func (m *AsyncCallMetrics) Acquired(ctx context.Context, verb schema.RefKey, origin string, scheduledAt time.Time, maybeErr error) {
 	attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
-	attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
+	attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
 	m.acquired.Add(ctx, 1, metric.WithAttributes(attrs...))
 }
 
@@ -111,7 +111,7 @@ func (m *AsyncCallMetrics) Executed(ctx context.Context, verb schema.RefKey, ori
 	attrs := extractAsyncCallAttrs(verb, origin, scheduledAt)
 
 	failureMode, ok := maybeFailureMode.Get()
-	attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok))
+	attrs = append(attrs, observability.SuccessOrFailureStatusAttr(!ok))
 	if ok {
 		attrs = append(attrs, attribute.String(asyncCallExecFailureModeAttr, failureMode))
 	}
@@ -123,7 +123,7 @@ func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, or
 	msToComplete := timeSinceMS(scheduledAt)
 
 	attrs := extractRefAttrs(verb, origin)
-	attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil))
+	attrs = append(attrs, observability.SuccessOrFailureStatusAttr(maybeErr == nil))
 	m.msToComplete.Record(ctx, msToComplete, metric.WithAttributes(attrs...))
 
 	attrs = append(attrs, attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, logBucket(8, msToComplete)))
diff --git a/backend/controller/observability/calls.go b/backend/controller/observability/calls.go
index 6051e8e72e..30118a24a2 100644
--- a/backend/controller/observability/calls.go
+++ b/backend/controller/observability/calls.go
@@ -59,7 +59,7 @@ func (m *CallMetrics) Request(ctx context.Context, verb *schemapb.Ref, startTime
 	}
 
 	failureMode, ok := maybeFailureMode.Get()
-	attrs = append(attrs, attribute.Bool(observability.StatusSucceededAttribute, !ok))
+	attrs = append(attrs, observability.SuccessOrFailureStatusAttr(!ok))
 	if ok {
 		attrs = append(attrs, attribute.String(callFailureModeAttr, failureMode))
 	}
diff --git a/backend/controller/observability/cron.go b/backend/controller/observability/cron.go
new file mode 100644
index 0000000000..de4fefd973
--- /dev/null
+++ b/backend/controller/observability/cron.go
@@ -0,0 +1,124 @@
+package observability
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/alecthomas/types/optional"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+
+	"github.com/TBD54566975/ftl/internal/model"
+	"github.com/TBD54566975/ftl/internal/observability"
+)
+
+const (
+	cronMeterName            = "ftl.cron"
+	cronJobFullNameAttribute = "ftl.cron.job.full_name"
+
+	cronJobKilledStatus      = "killed"
+	cronJobFailedStartStatus = "failed_start"
+)
+
+// CronMetrics holds the instruments used to record cron job activity: an
+// up/down counter of active jobs, a counter of completed jobs and a latency histogram.
+type CronMetrics struct {
+	jobsActive    metric.Int64UpDownCounter
+	jobsCompleted metric.Int64Counter
+	jobLatency    metric.Int64Histogram
+}
+
+// initCronMetrics creates the cron instruments on the cron meter. An
+// instrument that fails to initialise is replaced by the value returned from
+// the matching handle*Error helper and the failure is accumulated into the
+// returned error.
+func initCronMetrics() (*CronMetrics, error) {
+	result := &CronMetrics{}
+
+	var errs error
+	var err error
+
+	meter := otel.Meter(cronMeterName)
+
+	counter := fmt.Sprintf("%s.jobs.completed", cronMeterName)
+	if result.jobsCompleted, err = meter.Int64Counter(
+		counter,
+		metric.WithDescription("the number of cron jobs completed; successful or otherwise")); err != nil {
+		result.jobsCompleted, errs = handleInt64CounterError(counter, err, errs)
+	}
+
+	counter = fmt.Sprintf("%s.jobs.active", cronMeterName)
+	if result.jobsActive, err = meter.Int64UpDownCounter(
+		counter,
+		metric.WithDescription("the number of actively executing cron jobs")); err != nil {
+		result.jobsActive, errs = handleInt64UpDownCounterError(counter, err, errs)
+	}
+
+	counter = fmt.Sprintf("%s.job.latency", cronMeterName)
+	if result.jobLatency, err = meter.Int64Histogram(
+		counter,
+		metric.WithDescription("the latency between the scheduled execution time and the completion of a cron job"),
+		metric.WithUnit("ms")); err != nil {
+		result.jobLatency, errs = handleInt64HistogramCounterError(counter, err, errs)
+	}
+
+	return result, errs
+}
+
+// JobStarted increments the active-jobs counter for job.
+func (m *CronMetrics) JobStarted(ctx context.Context, job model.CronJob) {
+	m.jobsActive.Add(ctx, 1, cronAttributes(job, optional.None[string]()))
+}
+
+// JobSuccess records the completion of job with the success status.
+func (m *CronMetrics) JobSuccess(ctx context.Context, job model.CronJob) {
+	m.jobCompleted(ctx, job, observability.SuccessStatus)
+}
+
+// JobKilled records the completion of job with the "killed" status.
+func (m *CronMetrics) JobKilled(ctx context.Context, job model.CronJob) {
+	m.jobCompleted(ctx, job, cronJobKilledStatus)
+}
+
+// JobFailedStart records a job that failed before JobStarted was called. It
+// records latency and completion with the "failed_start" status but, unlike
+// jobCompleted, leaves the active-jobs counter untouched.
+func (m *CronMetrics) JobFailedStart(ctx context.Context, job model.CronJob) {
+	completionAttributes := cronAttributes(job, optional.Some(cronJobFailedStartStatus))
+
+	elapsed := timeSinceMS(job.NextExecution)
+	m.jobLatency.Record(ctx, elapsed, completionAttributes)
+	m.jobsCompleted.Add(ctx, 1, completionAttributes)
+}
+
+// JobFailed records the completion of job with the failure status.
+func (m *CronMetrics) JobFailed(ctx context.Context, job model.CronJob) {
+	m.jobCompleted(ctx, job, observability.FailureStatus)
+}
+
+// jobCompleted decrements the active-jobs counter, then records the time
+// elapsed since job.NextExecution and one completion, both tagged with status.
+func (m *CronMetrics) jobCompleted(ctx context.Context, job model.CronJob, status string) {
+	elapsed := timeSinceMS(job.NextExecution)
+
+	m.jobsActive.Add(ctx, -1, cronAttributes(job, optional.None[string]()))
+
+	completionAttributes := cronAttributes(job, optional.Some(status))
+	m.jobLatency.Record(ctx, elapsed, completionAttributes)
+	m.jobsCompleted.Add(ctx, 1, completionAttributes)
+}
+
+// cronAttributes builds the attribute set for job: module name, full job key
+// and deployment key, plus the outcome status when maybeStatus is set.
+func cronAttributes(job model.CronJob, maybeStatus optional.Option[string]) metric.MeasurementOption {
+	attributes := []attribute.KeyValue{
+		attribute.String(observability.ModuleNameAttribute, job.Key.Payload.Module),
+		attribute.String(cronJobFullNameAttribute, job.Key.String()),
+		attribute.String(observability.RunnerDeploymentKeyAttribute, job.DeploymentKey.String()),
+	}
+	if status, ok := maybeStatus.Get(); ok {
+		attributes = append(attributes, attribute.String(observability.OutcomeStatusNameAttribute, status))
+	}
+	return metric.WithAttributes(attributes...)
+}
diff --git a/backend/controller/observability/observability.go b/backend/controller/observability/observability.go
index dc044c93fa..ccb2f687a0 100644
--- a/backend/controller/observability/observability.go
+++ b/backend/controller/observability/observability.go
@@ -16,6 +16,7 @@ var (
 	Deployment *DeploymentMetrics
 	FSM        *FSMMetrics
 	PubSub     *PubSubMetrics
+	Cron       *CronMetrics
 )
 
 func init() {
@@ -32,6 +33,8 @@ func init() {
 	errs = errors.Join(errs, err)
 	PubSub, err = initPubSubMetrics()
 	errs = errors.Join(errs, err)
+	Cron, err = initCronMetrics()
+	errs = errors.Join(errs, err)
 
 	if err != nil {
 		panic(fmt.Errorf("could not initialize controller metrics: %w", errs))
@@ -48,6 +51,11 @@ func handleInt64UpDownCounterError(counter string, err error, errs error) (metri
 	return noop.Int64UpDownCounter{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
 }
 
+//nolint:unparam
+func handleInt64HistogramCounterError(counter string, err error, errs error) (metric.Int64Histogram, error) {
+	return noop.Int64Histogram{}, errors.Join(errs, fmt.Errorf("%q counter init failed; falling back to noop: %w", counter, err))
+}
+
 func timeSinceMS(start time.Time) int64 {
 	return time.Since(start).Milliseconds()
 }
diff --git a/backend/controller/observability/pubsub.go b/backend/controller/observability/pubsub.go
index 42f2a51045..32088f0395 100644
--- a/backend/controller/observability/pubsub.go
+++ b/backend/controller/observability/pubsub.go
@@ -83,7 +83,7 @@ func (m *PubSubMetrics) Published(ctx context.Context, module, topic, caller str
 		attribute.String(observability.ModuleNameAttribute, module),
 		attribute.String(pubsubTopicRefAttr, schema.RefKey{Module: module, Name: topic}.String()),
 		attribute.String(pubsubCallerVerbRefAttr, schema.RefKey{Module: module, Name: caller}.String()),
-		attribute.Bool(observability.StatusSucceededAttribute, maybeErr == nil),
+		observability.SuccessOrFailureStatusAttr(maybeErr == nil),
 	}
 
 	m.published.Add(ctx, 1, metric.WithAttributes(attrs...))
diff --git a/internal/observability/attributes.go b/internal/observability/attributes.go
index 44f01c858f..504eb0e123 100644
--- a/internal/observability/attributes.go
+++ b/internal/observability/attributes.go
@@ -1,7 +1,21 @@
 package observability
 
+import "go.opentelemetry.io/otel/attribute"
+
 const (
 	ModuleNameAttribute          = "ftl.module.name"
-	StatusSucceededAttribute     = "ftl.status.succeeded"
+	OutcomeStatusNameAttribute   = "ftl.outcome.status"
 	RunnerDeploymentKeyAttribute = "ftl.deployment.key"
+
+	SuccessStatus = "success"
+	FailureStatus = "failure"
 )
+
+// SuccessOrFailureStatusAttr returns the outcome status attribute set to
+// SuccessStatus when succeeded is true and FailureStatus otherwise.
+func SuccessOrFailureStatusAttr(succeeded bool) attribute.KeyValue {
+	if succeeded {
+		return attribute.String(OutcomeStatusNameAttribute, SuccessStatus)
+	}
+	return attribute.String(OutcomeStatusNameAttribute, FailureStatus)
+}