diff --git a/backend/controller/call_events.go b/backend/controller/call_events.go index ebf3a172c7..609e9a3657 100644 --- a/backend/controller/call_events.go +++ b/backend/controller/call_events.go @@ -14,14 +14,15 @@ import ( ) type Call struct { - deploymentKey model.DeploymentKey - requestKey model.RequestKey - startTime time.Time - destVerb *schema.Ref - callers []*schema.Ref - request *ftlv1.CallRequest - response optional.Option[*ftlv1.CallResponse] - callError optional.Option[error] + deploymentKey model.DeploymentKey + requestKey model.RequestKey + parentRequestKey optional.Option[model.RequestKey] + startTime time.Time + destVerb *schema.Ref + callers []*schema.Ref + request *ftlv1.CallRequest + response optional.Option[*ftlv1.CallResponse] + callError optional.Option[error] } func (s *Service) recordCall(ctx context.Context, call *Call) { @@ -46,16 +47,17 @@ func (s *Service) recordCall(ctx context.Context, call *Call) { } err := s.dal.InsertCallEvent(ctx, &dal.CallEvent{ - Time: call.startTime, - DeploymentKey: call.deploymentKey, - RequestKey: optional.Some(call.requestKey), - Duration: time.Since(call.startTime), - SourceVerb: sourceVerb, - DestVerb: *call.destVerb, - Request: call.request.GetBody(), - Response: responseBody, - Error: errorStr, - Stack: stack, + Time: call.startTime, + DeploymentKey: call.deploymentKey, + RequestKey: optional.Some(call.requestKey), + ParentRequestKey: call.parentRequestKey, + Duration: time.Since(call.startTime), + SourceVerb: sourceVerb, + DestVerb: *call.destVerb, + Request: call.request.GetBody(), + Response: responseBody, + Error: errorStr, + Stack: stack, }) if err != nil { logger.Errorf(err, "failed to record call") diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 0223a8155a..28399102a9 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -856,7 +856,7 @@ func (s *Service) AcquireLease(ctx context.Context, stream *connect.BidiStream[f } func (s *Service) Call(ctx context.Context, req *connect.Request[ftlv1.CallRequest]) (*connect.Response[ftlv1.CallResponse], error) { - return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), "") + return s.callWithRequest(ctx, req, optional.None[model.RequestKey](), optional.None[model.RequestKey](), "") } func (s *Service) SendFSMEvent(ctx context.Context, req *connect.Request[ftlv1.SendFSMEventRequest]) (resp *connect.Response[ftlv1.SendFSMEventResponse], err error) { @@ -962,6 +962,7 @@ func (s *Service) callWithRequest( ctx context.Context, req *connect.Request[ftlv1.CallRequest], key optional.Option[model.RequestKey], + parentKey optional.Option[model.RequestKey], sourceAddress string, ) (*connect.Response[ftlv1.CallResponse], error) { start := time.Now() @@ -1049,6 +1050,9 @@ func (s *Service) callWithRequest( } } + if pk, ok := parentKey.Get(); ok { + ctx = rpc.WithParentRequestKey(ctx, pk) + } ctx = rpc.WithRequestKey(ctx, requestKey) ctx = rpc.WithVerbs(ctx, append(callers, verbRef)) headers.AddCaller(req.Header(), schema.RefFromProto(req.Msg.Verb)) @@ -1064,14 +1068,15 @@ func (s *Service) callWithRequest( observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed")) } s.recordCall(ctx, &Call{ - deploymentKey: route.Deployment, - requestKey: requestKey, - startTime: start, - destVerb: verbRef, - callers: callers, - callError: optional.Nil(err), - request: req.Msg, - response: maybeResponse, + deploymentKey: route.Deployment, + requestKey: requestKey, + parentRequestKey: parentKey, + startTime: start, + destVerb: verbRef, + callers: callers, + callError: optional.Nil(err), + request: req.Msg, + response: maybeResponse, }) return resp, err } @@ -1384,6 +1389,23 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration return 0, err } + // Extract the otel context from the call + ctx, err = observability.ExtractTraceContextToContext(ctx, call.TraceContext) + if err != nil { + observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, err) + return 0, fmt.Errorf("failed to extract trace context: %w", err) + } + + // Extract the request key from the call and attach it as the parent request key + parentRequestKey := optional.None[model.RequestKey]() + if prk, ok := call.ParentRequestKey.Get(); ok { + if rk, err := model.ParseRequestKey(prk); err == nil { + parentRequestKey = optional.Some(rk) + } else { + logger.Tracef("Ignoring invalid request key: %s", prk) + } + } + observability.AsyncCalls.Acquired(ctx, call.Verb, call.CatchVerb, call.Origin.String(), call.ScheduledAt, call.Catching, nil) defer func() { @@ -1417,7 +1439,7 @@ func (s *Service) executeAsyncCalls(ctx context.Context) (interval time.Duration Verb: call.Verb.ToProto(), Body: call.Request, } - resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String()) + resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), parentRequestKey, s.config.Advertise.String()) var callResult either.Either[[]byte, string] if err != nil { logger.Warnf("Async call could not be called: %v", err) @@ -1475,7 +1497,7 @@ func (s *Service) catchAsyncCall(ctx context.Context, logger *log.Logger, call * Verb: catchVerb.ToProto(), Body: body, } - resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), s.config.Advertise.String()) + resp, err := s.callWithRequest(ctx, connect.NewRequest(req), optional.None[model.RequestKey](), optional.None[model.RequestKey](), s.config.Advertise.String()) var catchResult either.Either[[]byte, string] if err != nil { // Could not call catch verb diff --git a/backend/controller/cronjobs/cronjobs.go b/backend/controller/cronjobs/cronjobs.go index e00af2d742..1f555a1f79 100644 --- a/backend/controller/cronjobs/cronjobs.go +++ b/backend/controller/cronjobs/cronjobs.go @@ -80,7 +80,7 @@ type Scheduler interface { Parallel(retry backoff.Backoff, job scheduledtask.Job) } -type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) +type ExecuteCallFunc func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error) type Service struct { config Config @@ -216,9 +216,8 @@ func (s *Service) executeJob(ctx context.Context, job model.CronJob) { callCtx, cancel := context.WithTimeout(ctx, s.config.Timeout) defer cancel() - observability.Cron.JobStarted(ctx, job) - _, err = s.call(callCtx, req, optional.Some(requestKey), s.requestSource) + _, err = s.call(callCtx, req, optional.Some(requestKey), optional.None[model.RequestKey](), s.requestSource) // Record execution success/failure metric now and leave post job-execution-action observability to logging if err != nil { diff --git a/backend/controller/cronjobs/cronjobs_test.go b/backend/controller/cronjobs/cronjobs_test.go index c77d86bc93..bba46f5a42 100644 --- a/backend/controller/cronjobs/cronjobs_test.go +++ b/backend/controller/cronjobs/cronjobs_test.go @@ -70,7 +70,7 @@ func TestHashRing(t *testing.T) { err = mockDal.ReplaceDeployment(ctx, deploymentKey, 1) assert.NoError(t, err) - controllers := newControllers(ctx, 20, mockDal, func() clock.Clock { return clock.NewMock() }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + controllers := newControllers(ctx, 20, mockDal, func() clock.Clock { return clock.NewMock() }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], p optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { return &connect.Response[ftlv1.CallResponse]{}, nil }) diff --git a/backend/controller/cronjobs/cronjobs_utils_test.go b/backend/controller/cronjobs/cronjobs_utils_test.go index 572d9e2395..5e9d64bd59 100644 --- a/backend/controller/cronjobs/cronjobs_utils_test.go +++ b/backend/controller/cronjobs/cronjobs_utils_test.go @@ -233,7 +233,7 @@ func testServiceWithDal(ctx context.Context, t *testing.T, dal DAL, parentDAL Pa err = parentDAL.ReplaceDeployment(ctx, deploymentKey, 1) assert.NoError(t, err) - _ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { + _ = newControllers(ctx, 5, dal, func() clock.Clock { return clk }, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], o optional.Option[model.RequestKey], p optional.Option[model.RequestKey], s string) (*connect.Response[ftlv1.CallResponse], error) { verbRef := schema.RefFromProto(r.Msg.Verb) verbCallCountLock.Lock() diff --git a/backend/controller/cronjobs/sql/models.go b/backend/controller/cronjobs/sql/models.go index e93784de75..e67f71b017 100644 --- a/backend/controller/cronjobs/sql/models.go +++ b/backend/controller/cronjobs/sql/models.go @@ -384,6 +384,8 @@ type AsyncCall struct { MaxBackoff time.Duration CatchVerb optional.Option[schema.RefKey] Catching bool + ParentRequestKey optional.Option[string] + TraceContext []byte } type Controller struct { @@ -426,16 +428,17 @@ type DeploymentArtefact struct { } type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload json.RawMessage + ParentRequestID optional.Option[string] } type FsmInstance struct { @@ -520,12 +523,14 @@ type Topic struct { } type TopicEvent struct { - ID int64 - CreatedAt time.Time - Key model.TopicEventKey - TopicID int64 - Payload []byte - Caller optional.Option[string] + ID int64 + CreatedAt time.Time + Key model.TopicEventKey + TopicID int64 + Payload []byte + Caller optional.Option[string] + RequestKey optional.Option[string] + TraceContext []byte } type TopicSubscriber struct { diff --git a/backend/controller/dal/async_calls.go b/backend/controller/dal/async_calls.go index 12b1ede0ae..02c576f932 100644 --- a/backend/controller/dal/async_calls.go +++ b/backend/controller/dal/async_calls.go @@ -71,14 +71,16 @@ func ParseAsyncOrigin(origin string) (AsyncOrigin, error) { } type AsyncCall struct { - *Lease // May be nil - ID int64 - Origin AsyncOrigin - Verb schema.RefKey - CatchVerb optional.Option[schema.RefKey] - Request json.RawMessage - ScheduledAt time.Time - QueueDepth int64 + *Lease // May be nil + ID int64 + Origin AsyncOrigin + Verb schema.RefKey + CatchVerb optional.Option[schema.RefKey] + Request json.RawMessage + ScheduledAt time.Time + QueueDepth int64 + ParentRequestKey optional.Option[string] + TraceContext []byte Error optional.Option[string] @@ -128,6 +130,8 @@ func (d *DAL) AcquireAsyncCall(ctx context.Context) (call *AsyncCall, err error) Lease: lease, ScheduledAt: row.ScheduledAt, QueueDepth: row.QueueDepth, + ParentRequestKey: row.ParentRequestKey, + TraceContext: row.TraceContext, RemainingAttempts: row.RemainingAttempts, Error: row.Error, Backoff: row.Backoff, diff --git a/backend/controller/dal/dal.go b/backend/controller/dal/dal.go index 83dd3c80c4..1ea5f6daf0 100644 --- a/backend/controller/dal/dal.go +++ b/backend/controller/dal/dal.go @@ -1142,6 +1142,10 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { if rn, ok := call.RequestKey.Get(); ok { requestKey = optional.Some(rn.String()) } + var parentRequestKey optional.Option[string] + if pr, ok := call.ParentRequestKey.Get(); ok { + parentRequestKey = optional.Some(pr.String()) + } payload, err := d.encryptors.Logs.EncryptJSON(map[string]any{ "duration_ms": call.Duration.Milliseconds(), "request": call.Request, @@ -1153,14 +1157,15 @@ func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { return fmt.Errorf("failed to encrypt call payload: %w", err) } return dalerrs.TranslatePGError(d.db.InsertCallEvent(ctx, sql.InsertCallEventParams{ - DeploymentKey: call.DeploymentKey, - RequestKey: requestKey, - TimeStamp: call.Time, - SourceModule: sourceModule, - SourceVerb: sourceVerb, - DestModule: call.DestVerb.Module, - DestVerb: call.DestVerb.Name, - Payload: payload, + DeploymentKey: call.DeploymentKey, + RequestKey: requestKey, + ParentRequestKey: parentRequestKey, + TimeStamp: call.Time, + SourceModule: sourceModule, + SourceVerb: sourceVerb, + DestModule: call.DestVerb.Module, + DestVerb: call.DestVerb.Name, + Payload: payload, })) } diff --git a/backend/controller/dal/events.go b/backend/controller/dal/events.go index a4301f3814..7250e047ac 100644 --- a/backend/controller/dal/events.go +++ b/backend/controller/dal/events.go @@ -51,17 +51,18 @@ func (e *LogEvent) GetID() int64 { return e.ID } func (e *LogEvent) event() {} type CallEvent struct { - ID int64 - DeploymentKey model.DeploymentKey - RequestKey optional.Option[model.RequestKey] - Time time.Time - SourceVerb optional.Option[schema.Ref] - DestVerb schema.Ref - Duration time.Duration - Request json.RawMessage - Response json.RawMessage - Error optional.Option[string] - Stack optional.Option[string] + ID int64 + DeploymentKey model.DeploymentKey + RequestKey optional.Option[model.RequestKey] + ParentRequestKey optional.Option[model.RequestKey] + Time time.Time + SourceVerb optional.Option[schema.Ref] + DestVerb schema.Ref + Duration time.Duration + Request json.RawMessage + Response json.RawMessage + Error optional.Option[string] + Stack optional.Option[string] } func (e *CallEvent) GetID() int64 { return e.ID } diff --git a/backend/controller/dal/pubsub.go b/backend/controller/dal/pubsub.go index 335ce91e20..699799daee 100644 --- a/backend/controller/dal/pubsub.go +++ b/backend/controller/dal/pubsub.go @@ -15,6 +15,7 @@ import ( "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" + "github.com/TBD54566975/ftl/internal/rpc" "github.com/TBD54566975/ftl/internal/slices" ) @@ -23,12 +24,32 @@ func (d *DAL) PublishEventForTopic(ctx context.Context, module, topic, caller st if err != nil { return fmt.Errorf("failed to encrypt payload: %w", err) } + + // Store the current otel context with the event + jsonOc, err := observability.RetrieveTraceContextFromContext(ctx) + if err != nil { + return fmt.Errorf("failed to retrieve trace context: %w", err) + } + + // Store the request key that initiated this publish, this will eventually + // become the parent request key of the subscriber call + requestKey := "" + if rk, err := rpc.RequestKeyFromContext(ctx); err == nil { + if rk, ok := rk.Get(); ok { + requestKey = rk.String() + } + } else { + return fmt.Errorf("failed to get request key: %w", err) + } + err = d.db.PublishEventForTopic(ctx, sql.PublishEventForTopicParams{ - Key: model.NewTopicEventKey(module, topic), - Module: module, - Topic: topic, - Caller: caller, - Payload: encryptedPayload, + Key: model.NewTopicEventKey(module, topic), + Module: module, + Topic: topic, + Caller: caller, + Payload: encryptedPayload, + RequestKey: requestKey, + TraceContext: jsonOc, }) observability.PubSub.Published(ctx, module, topic, caller, err) if err != nil { @@ -111,6 +132,8 @@ func (d *DAL) ProgressSubscriptions(ctx context.Context, eventConsumptionDelay t RemainingAttempts: subscriber.RetryAttempts, Backoff: subscriber.Backoff, MaxBackoff: subscriber.MaxBackoff, + ParentRequestKey: nextCursor.RequestKey, + TraceContext: nextCursor.TraceContext, CatchVerb: subscriber.CatchVerb, }) observability.AsyncCalls.Created(ctx, subscriber.Sink, subscriber.CatchVerb, origin.String(), int64(subscriber.RetryAttempts), err) diff --git a/backend/controller/ingress/handler.go b/backend/controller/ingress/handler.go index ba3c84caa4..3201bb9767 100644 --- a/backend/controller/ingress/handler.go +++ b/backend/controller/ingress/handler.go @@ -25,7 +25,7 @@ func Handle( routes []dal.IngressRoute, w http.ResponseWriter, r *http.Request, - call func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error), + call func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error), ) { logger := log.FromContext(r.Context()) logger.Debugf("%s %s", r.Method, r.URL.Path) @@ -54,7 +54,7 @@ func Handle( Body: body, }) - resp, err := call(r.Context(), creq, optional.Some(requestKey), r.RemoteAddr) + resp, err := call(r.Context(), creq, optional.Some(requestKey), optional.None[model.RequestKey](), r.RemoteAddr) if err != nil { logger.Errorf(err, "failed to call verb %s", route.Verb) if connectErr := new(connect.Error); errors.As(err, &connectErr) { diff --git a/backend/controller/ingress/handler_test.go b/backend/controller/ingress/handler_test.go index 72af762c4f..527992c04d 100644 --- a/backend/controller/ingress/handler_test.go +++ b/backend/controller/ingress/handler_test.go @@ -99,7 +99,7 @@ func TestIngress(t *testing.T) { req := httptest.NewRequest(test.method, test.path, bytes.NewBuffer(test.payload)).WithContext(ctx) req.URL.RawQuery = test.query.Encode() reqKey := model.NewRequestKey(model.OriginIngress, "test") - ingress.Handle(sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) { + ingress.Handle(sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) { body, err := encoding.Marshal(response) assert.NoError(t, err) return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: body}}), nil diff --git a/backend/controller/observability/async_calls.go b/backend/controller/observability/async_calls.go index 9d6a57b66f..d2943be77c 100644 --- a/backend/controller/observability/async_calls.go +++ b/backend/controller/observability/async_calls.go @@ -2,6 +2,7 @@ package observability import ( "context" + "encoding/json" "fmt" "time" @@ -10,6 +11,7 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/propagation" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/observability" @@ -130,6 +132,28 @@ func (m *AsyncCallMetrics) Completed(ctx context.Context, verb schema.RefKey, ca m.queueDepth.Record(ctx, queueDepth) } +func ExtractTraceContextToContext(ctx context.Context, traceContext []byte) (context.Context, error) { + if len(traceContext) == 0 { + return ctx, nil + } + var oc propagation.MapCarrier + err := json.Unmarshal(traceContext, &oc) + if err != nil { + return ctx, fmt.Errorf("failed to unmarshal otel context: %w", err) + } + return otel.GetTextMapPropagator().Extract(ctx, oc), nil +} + +func RetrieveTraceContextFromContext(ctx context.Context) ([]byte, error) { + oc := propagation.MapCarrier(make(map[string]string)) + otel.GetTextMapPropagator().Inject(ctx, oc) + jsonOc, err := json.Marshal(oc) + if err != nil { + return jsonOc, fmt.Errorf("failed to marshal otel context: %w", err) + } + return jsonOc, nil +} + func extractAsyncCallAttrs(verb schema.RefKey, catchVerb optional.Option[schema.RefKey], origin string, scheduledAt time.Time, isCatching bool) []attribute.KeyValue { return append(extractRefAttrs(verb, catchVerb, origin, isCatching), attribute.String(asyncCallTimeSinceScheduledAtBucketAttr, logBucket(8, timeSinceMS(scheduledAt)))) } diff --git a/backend/controller/sql/models.go b/backend/controller/sql/models.go index e93784de75..e67f71b017 100644 --- a/backend/controller/sql/models.go +++ b/backend/controller/sql/models.go @@ -384,6 +384,8 @@ type AsyncCall struct { MaxBackoff time.Duration CatchVerb optional.Option[schema.RefKey] Catching bool + ParentRequestKey optional.Option[string] + TraceContext []byte } type Controller struct { @@ -426,16 +428,17 @@ type DeploymentArtefact struct { } type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload json.RawMessage + ParentRequestID optional.Option[string] } type FsmInstance struct { @@ -520,12 +523,14 @@ type Topic struct { } type TopicEvent struct { - ID int64 - CreatedAt time.Time - Key model.TopicEventKey - TopicID int64 - Payload []byte - Caller optional.Option[string] + ID int64 + CreatedAt time.Time + Key model.TopicEventKey + TopicID int64 + Payload []byte + Caller optional.Option[string] + RequestKey optional.Option[string] + TraceContext []byte } type TopicSubscriber struct { diff --git a/backend/controller/sql/queries.sql b/backend/controller/sql/queries.sql index 24a985584b..031a46e964 100644 --- a/backend/controller/sql/queries.sql +++ b/backend/controller/sql/queries.sql @@ -337,6 +337,7 @@ VALUES ( INSERT INTO events ( deployment_id, request_id, + parent_request_id, time_stamp, type, custom_key_1, @@ -351,6 +352,10 @@ VALUES ( WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT) END), + (CASE + WHEN sqlc.narg('parent_request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('parent_request_key')::TEXT) + END), sqlc.arg('time_stamp')::TIMESTAMPTZ, 'call', sqlc.narg('source_module')::TEXT, @@ -419,10 +424,10 @@ WHERE d.min_replicas > 0; -- name: InsertEvent :exec -INSERT INTO events (deployment_id, request_id, type, +INSERT INTO events (deployment_id, request_id, parent_request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id; -- name: NewLease :one @@ -471,7 +476,9 @@ INSERT INTO async_calls ( remaining_attempts, backoff, max_backoff, - catch_verb + catch_verb, + parent_request_key, + trace_context ) VALUES ( @verb, @@ -480,7 +487,9 @@ VALUES ( @remaining_attempts, @backoff::interval, @max_backoff::interval, - @catch_verb + @catch_verb, + @parent_request_key, + @trace_context::jsonb ) RETURNING id; @@ -525,6 +534,8 @@ RETURNING error, backoff, max_backoff, + parent_request_key, + trace_context, catching; -- name: SucceedAsyncCall :one @@ -743,7 +754,9 @@ INSERT INTO topic_events ( "key", topic_id, caller, - payload + payload, + request_key, + trace_context ) VALUES ( sqlc.arg('key')::topic_event_key, @@ -755,7 +768,9 @@ VALUES ( AND topics.name = sqlc.arg('topic')::TEXT ), sqlc.arg('caller')::TEXT, - sqlc.arg('payload') + sqlc.arg('payload'), + sqlc.arg('request_key')::TEXT, + sqlc.arg('trace_context')::jsonb ); -- name: GetSubscriptionsNeedingUpdate :many @@ -788,6 +803,8 @@ SELECT events."key" as event, events.payload, events.created_at, events.caller, + events.request_key, + events.trace_context, NOW() - events.created_at >= sqlc.arg('consumption_delay')::interval AS ready FROM topics LEFT JOIN topic_events as events ON events.topic_id = topics.id diff --git a/backend/controller/sql/queries.sql.go b/backend/controller/sql/queries.sql.go index 673a7f2425..a7815e1184 100644 --- a/backend/controller/sql/queries.sql.go +++ b/backend/controller/sql/queries.sql.go @@ -51,6 +51,8 @@ RETURNING error, backoff, max_backoff, + parent_request_key, + trace_context, catching ` @@ -68,6 +70,8 @@ type AcquireAsyncCallRow struct { Error optional.Option[string] Backoff time.Duration MaxBackoff time.Duration + ParentRequestKey optional.Option[string] + TraceContext []byte Catching bool } @@ -90,6 +94,8 @@ func (q *Queries) AcquireAsyncCall(ctx context.Context, ttl time.Duration) (Acqu &i.Error, &i.Backoff, &i.MaxBackoff, + &i.ParentRequestKey, + &i.TraceContext, &i.Catching, ) return i, err @@ -132,7 +138,7 @@ func (q *Queries) AsyncCallQueueDepth(ctx context.Context) (int64, error) { const beginConsumingTopicEvent = `-- name: BeginConsumingTopicEvent :exec WITH event AS ( - SELECT id, created_at, key, topic_id, payload, caller + SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context FROM topic_events WHERE "key" = $2::topic_event_key ) @@ -186,7 +192,9 @@ INSERT INTO async_calls ( remaining_attempts, backoff, max_backoff, - catch_verb + catch_verb, + parent_request_key, + trace_context ) VALUES ( $1, @@ -195,7 +203,9 @@ VALUES ( $4, $5::interval, $6::interval, - $7 + $7, + $8, + $9::jsonb ) RETURNING id ` @@ -208,6 +218,8 @@ type CreateAsyncCallParams struct { Backoff time.Duration MaxBackoff time.Duration CatchVerb optional.Option[schema.RefKey] + ParentRequestKey optional.Option[string] + TraceContext []byte } func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams) (int64, error) { @@ -219,6 +231,8 @@ func (q *Queries) CreateAsyncCall(ctx context.Context, arg CreateAsyncCallParams arg.Backoff, arg.MaxBackoff, arg.CatchVerb, + arg.ParentRequestKey, + arg.TraceContext, ) var id int64 err := row.Scan(&id) @@ -502,7 +516,7 @@ WITH updated AS ( SET state = 'error'::async_call_state, error = $7::TEXT WHERE id = $8::BIGINT - RETURNING id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching + RETURNING id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context ) INSERT INTO async_calls ( verb, @@ -1348,6 +1362,8 @@ SELECT events."key" as event, events.payload, events.created_at, events.caller, + events.request_key, + events.trace_context, NOW() - events.created_at >= $1::interval AS ready FROM topics LEFT JOIN topic_events as events ON events.topic_id = topics.id @@ -1358,11 +1374,13 @@ LIMIT 1 ` type GetNextEventForSubscriptionRow struct { - Event optional.Option[model.TopicEventKey] - Payload []byte - CreatedAt optional.Option[time.Time] - Caller optional.Option[string] - Ready bool + Event optional.Option[model.TopicEventKey] + Payload []byte + CreatedAt optional.Option[time.Time] + Caller optional.Option[string] + RequestKey optional.Option[string] + TraceContext []byte + Ready bool } func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDelay time.Duration, topic model.TopicKey, cursor optional.Option[model.TopicEventKey]) (GetNextEventForSubscriptionRow, error) { @@ -1373,6 +1391,8 @@ func (q *Queries) GetNextEventForSubscription(ctx context.Context, consumptionDe &i.Payload, &i.CreatedAt, &i.Caller, + &i.RequestKey, + &i.TraceContext, &i.Ready, ) return i, err @@ -1812,7 +1832,7 @@ func (q *Queries) GetTopic(ctx context.Context, dollar_1 int64) (Topic, error) { } const getTopicEvent = `-- name: GetTopicEvent :one -SELECT id, created_at, key, topic_id, payload, caller +SELECT id, created_at, key, topic_id, payload, caller, request_key, trace_context FROM topic_events WHERE id = $1::BIGINT ` @@ -1827,6 +1847,8 @@ func (q *Queries) GetTopicEvent(ctx context.Context, dollar_1 int64) (TopicEvent &i.TopicID, &i.Payload, &i.Caller, + &i.RequestKey, + &i.TraceContext, ) return i, err } @@ -1835,6 +1857,7 @@ const insertCallEvent = `-- name: InsertCallEvent :exec INSERT INTO events ( deployment_id, request_id, + parent_request_id, time_stamp, type, custom_key_1, @@ -1849,31 +1872,37 @@ VALUES ( WHEN $2::TEXT IS NULL THEN NULL ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT) END), - $3::TIMESTAMPTZ, + (CASE + WHEN $3::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $3::TEXT) + END), + $4::TIMESTAMPTZ, 'call', - $4::TEXT, $5::TEXT, $6::TEXT, $7::TEXT, - $8 + $8::TEXT, + $9 ) ` type InsertCallEventParams struct { - DeploymentKey model.DeploymentKey - RequestKey optional.Option[string] - TimeStamp time.Time - SourceModule optional.Option[string] - SourceVerb optional.Option[string] - DestModule string - DestVerb string - Payload json.RawMessage + DeploymentKey model.DeploymentKey + RequestKey optional.Option[string] + ParentRequestKey optional.Option[string] + TimeStamp time.Time + SourceModule optional.Option[string] + SourceVerb optional.Option[string] + DestModule string + DestVerb string + Payload json.RawMessage } func (q *Queries) InsertCallEvent(ctx context.Context, arg InsertCallEventParams) error { _, err := q.db.Exec(ctx, insertCallEvent, arg.DeploymentKey, arg.RequestKey, + arg.ParentRequestKey, arg.TimeStamp, arg.SourceModule, arg.SourceVerb, @@ -1961,28 +1990,30 @@ func (q *Queries) InsertDeploymentUpdatedEvent(ctx context.Context, arg InsertDe } const insertEvent = `-- name: InsertEvent :exec -INSERT INTO events (deployment_id, request_id, type, +INSERT INTO events (deployment_id, request_id, parent_request_id, type, custom_key_1, custom_key_2, custom_key_3, custom_key_4, payload) -VALUES ($1, $2, $3, $4, $5, $6, $7, $8) +VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) RETURNING id ` type InsertEventParams struct { - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage + DeploymentID int64 + RequestID optional.Option[int64] + ParentRequestID optional.Option[string] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload json.RawMessage } func (q *Queries) InsertEvent(ctx context.Context, arg InsertEventParams) error { _, err := q.db.Exec(ctx, insertEvent, arg.DeploymentID, arg.RequestID, + arg.ParentRequestID, arg.Type, arg.CustomKey1, arg.CustomKey2, @@ -2129,7 +2160,7 @@ func (q *Queries) KillStaleRunners(ctx context.Context, timeout time.Duration) ( } const loadAsyncCall = `-- name: LoadAsyncCall :one -SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching +SELECT id, created_at, lease_id, verb, state, origin, scheduled_at, request, response, error, remaining_attempts, backoff, max_backoff, catch_verb, catching, parent_request_key, trace_context FROM async_calls WHERE id = $1 ` @@ -2153,6 +2184,8 @@ func (q *Queries) LoadAsyncCall(ctx context.Context, id int64) (AsyncCall, error &i.MaxBackoff, &i.CatchVerb, &i.Catching, + &i.ParentRequestKey, + &i.TraceContext, ) return i, err } @@ -2185,7 +2218,9 @@ INSERT INTO topic_events ( "key", topic_id, caller, - payload + payload, + request_key, + trace_context ) VALUES ( $1::topic_event_key, @@ -2197,16 +2232,20 @@ VALUES ( AND topics.name = $3::TEXT ), $4::TEXT, - $5 + $5, + $6::TEXT, + $7::jsonb ) ` type PublishEventForTopicParams struct { - Key model.TopicEventKey - Module string - Topic string - Caller string - Payload []byte + Key model.TopicEventKey + Module string + Topic string + Caller string + Payload []byte + RequestKey string + TraceContext []byte } func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForTopicParams) error { @@ -2216,6 +2255,8 @@ func (q *Queries) PublishEventForTopic(ctx context.Context, arg PublishEventForT arg.Topic, arg.Caller, arg.Payload, + arg.RequestKey, + arg.TraceContext, ) return err } diff --git a/backend/controller/sql/schema/20240807174508_pubsub_otel_context.sql b/backend/controller/sql/schema/20240807174508_pubsub_otel_context.sql new file mode 100644 index 0000000000..14fa897d1e --- /dev/null +++ b/backend/controller/sql/schema/20240807174508_pubsub_otel_context.sql @@ -0,0 +1,15 @@ +-- migrate:up + +ALTER TABLE topic_events + ADD COLUMN request_key TEXT, + ADD COLUMN trace_context JSONB; + +ALTER TABLE async_calls + ADD COLUMN parent_request_key TEXT, + ADD COLUMN trace_context JSONB; + +ALTER TABLE events + ADD COLUMN parent_request_id TEXT; + +-- migrate:down + diff --git a/common/configuration/sql/models.go b/common/configuration/sql/models.go index e93784de75..e67f71b017 100644 --- a/common/configuration/sql/models.go +++ b/common/configuration/sql/models.go @@ -384,6 +384,8 @@ type AsyncCall struct { MaxBackoff time.Duration CatchVerb optional.Option[schema.RefKey] Catching bool + ParentRequestKey optional.Option[string] + TraceContext []byte } type Controller struct { @@ -426,16 +428,17 @@ type DeploymentArtefact struct { } type Event struct { - ID int64 - TimeStamp time.Time - DeploymentID int64 - RequestID optional.Option[int64] - Type EventType - CustomKey1 optional.Option[string] - CustomKey2 optional.Option[string] - CustomKey3 optional.Option[string] - CustomKey4 optional.Option[string] - Payload json.RawMessage + ID int64 + TimeStamp time.Time + DeploymentID int64 + RequestID optional.Option[int64] + Type EventType + CustomKey1 optional.Option[string] + CustomKey2 optional.Option[string] + CustomKey3 optional.Option[string] + CustomKey4 optional.Option[string] + Payload json.RawMessage + ParentRequestID optional.Option[string] } type FsmInstance struct { @@ -520,12 +523,14 @@ type Topic struct { } type TopicEvent struct { - ID int64 - CreatedAt time.Time - Key model.TopicEventKey - TopicID int64 - Payload []byte - Caller optional.Option[string] + ID int64 + CreatedAt time.Time + Key model.TopicEventKey + TopicID int64 + Payload []byte + Caller optional.Option[string] + RequestKey optional.Option[string] + TraceContext []byte } type TopicSubscriber struct { diff --git a/internal/rpc/context.go b/internal/rpc/context.go index 34f7164328..3b10eab8f2 100644 --- a/internal/rpc/context.go +++ b/internal/rpc/context.go @@ -21,6 +21,7 @@ import ( type ftlDirectRoutingKey struct{} type ftlVerbKey struct{} type requestIDKey struct{} +type parentRequestIDKey struct{} // WithDirectRouting ensures any hops in Verb routing do not redirect. // @@ -63,6 +64,24 @@ func IsDirectRouted(ctx context.Context) bool { // TODO: Return an Option here instead of a bool. func RequestKeyFromContext(ctx context.Context) (optional.Option[model.RequestKey], error) { value := ctx.Value(requestIDKey{}) + return requestKeyFromContextValue(value) +} + +// WithRequestKey adds the request key to the context. +func WithRequestKey(ctx context.Context, key model.RequestKey) context.Context { + return context.WithValue(ctx, requestIDKey{}, key.String()) +} + +func ParentRequestKeyFromContext(ctx context.Context) (optional.Option[model.RequestKey], error) { + value := ctx.Value(parentRequestIDKey{}) + return requestKeyFromContextValue(value) +} + +func WithParentRequestKey(ctx context.Context, key model.RequestKey) context.Context { + return context.WithValue(ctx, parentRequestIDKey{}, key.String()) +} + +func requestKeyFromContextValue(value any) (optional.Option[model.RequestKey], error) { keyStr, ok := value.(string) if !ok { return optional.None[model.RequestKey](), nil @@ -74,11 +93,6 @@ func RequestKeyFromContext(ctx context.Context) (optional.Option[model.RequestKe return optional.Some(key), nil } -// WithRequestKey adds the request key to the context. -func WithRequestKey(ctx context.Context, key model.RequestKey) context.Context { - return context.WithValue(ctx, requestIDKey{}, key.String()) -} - func DefaultClientOptions(level log.Level) []connect.ClientOption { interceptors := []connect.Interceptor{ PanicInterceptor(), @@ -253,6 +267,11 @@ func propagateHeaders(ctx context.Context, isClient bool, header http.Header) (c } else if key, ok := key.Get(); ok { headers.SetRequestKey(header, key) } + if key, err := ParentRequestKeyFromContext(ctx); err != nil { + return nil, fmt.Errorf("invalid parent request key in context: %w", err) + } else if key, ok := key.Get(); ok { + headers.SetParentRequestKey(header, key) + } } else { if headers.IsDirectRouted(header) { ctx = WithDirectRouting(ctx) @@ -267,6 +286,11 @@ func propagateHeaders(ctx context.Context, isClient bool, header http.Header) (c } else if ok { ctx = WithRequestKey(ctx, key) } + if key, ok, err := headers.GetParentRequestKey(header); err != nil { + return nil, fmt.Errorf("invalid parent request key in header: %w", err) + } else if ok { + ctx = WithParentRequestKey(ctx, key) + } } return ctx, nil } diff --git a/internal/rpc/headers/headers.go b/internal/rpc/headers/headers.go index f084b778c1..e785613e4a 100644 --- a/internal/rpc/headers/headers.go +++ b/internal/rpc/headers/headers.go @@ -19,6 +19,9 @@ const ( VerbHeader = "Ftl-Verb" // RequestIDHeader is the header used to pass the inbound request ID. RequestIDHeader = "Ftl-Request-Id" + // ParentRequestIDHeader is the header used to pass the parent request ID, + // i.e. the publisher or fsm call that initiated this call. + ParentRequestIDHeader = "Ftl-Parent-Request-Id" ) func IsDirectRouted(header http.Header) bool { @@ -33,11 +36,24 @@ func SetRequestKey(header http.Header, key model.RequestKey) { header.Set(RequestIDHeader, key.String()) } +func SetParentRequestKey(header http.Header, key model.RequestKey) { + header.Set(ParentRequestIDHeader, key.String()) +} + // GetRequestKey from an incoming request. // // Will return ("", false, nil) if no request key is present. func GetRequestKey(header http.Header) (model.RequestKey, bool, error) { keyStr := header.Get(RequestIDHeader) + return getRequestKeyFromKeyStr(keyStr) +} + +func GetParentRequestKey(header http.Header) (model.RequestKey, bool, error) { + keyStr := header.Get(ParentRequestIDHeader) + return getRequestKeyFromKeyStr(keyStr) +} + +func getRequestKeyFromKeyStr(keyStr string) (model.RequestKey, bool, error) { if keyStr == "" { return model.RequestKey{}, false, nil } diff --git a/internal/rpc/otel_interceptor.go b/internal/rpc/otel_interceptor.go index 71cb97934d..d570e16991 100644 --- a/internal/rpc/otel_interceptor.go +++ b/internal/rpc/otel_interceptor.go @@ -11,10 +11,11 @@ import ( ) const ( - otelFtlRequestKeyAttr = attribute.Key("ftl.request_key") - otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain") - otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref") - otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module") + otelFtlRequestKeyAttr = attribute.Key("ftl.request_key") + otelFtlParentRequestKeyAttr = attribute.Key("ftl.parent_request_key") + otelFtlVerbChainAttr = attribute.Key("ftl.verb_chain") + otelFtlVerbRefAttr = attribute.Key("ftl.verb.ref") + otelFtlVerbModuleAttr = attribute.Key("ftl.verb.module") ) func CustomOtelInterceptor() connect.Interceptor { @@ -33,6 +34,13 @@ func getAttributes(ctx context.Context) []attribute.KeyValue { if key, ok := requestKey.Get(); ok { attributes = append(attributes, otelFtlRequestKeyAttr.String(key.String())) } + parentRequestKey, err := ParentRequestKeyFromContext(ctx) + if err != nil { + logger.Warnf("failed to get parent request key: %s", err) + } + if key, ok := parentRequestKey.Get(); ok { + attributes = append(attributes, otelFtlParentRequestKeyAttr.String(key.String())) + } if verb, ok := VerbFromContext(ctx); ok { attributes = append(attributes, otelFtlVerbRefAttr.String(verb.String())) attributes = append(attributes, otelFtlVerbModuleAttr.String(verb.Module))