From c94d3a948b0335bf0d787b48bd370a15a673fe1f Mon Sep 17 00:00:00 2001 From: Wes Date: Wed, 18 Sep 2024 12:06:25 -0700 Subject: [PATCH] chore(refactor): add ingress event type and refactor timeline (#2719) This is mostly the refactor and setting up for the new `ingress` events. I'm pushing this up first because it will get quite large once I add `ingress` and a bit more refactoring. :) Actual ingress events will come next. --------- Co-authored-by: github-actions[bot] --- backend/controller/console/console.go | 44 ++-- backend/controller/controller.go | 3 +- backend/controller/deployment_logs.go | 13 +- .../20240917015216_add_ingress_event_type.sql | 6 + backend/controller/timeline/events_call.go | 136 ++++++++++ .../controller/timeline/events_deployment.go | 43 +++ backend/controller/timeline/events_ingress.go | 33 +++ backend/controller/timeline/events_log.go | 82 ++++++ .../timeline/{dal => }/internal/sql/db.go | 0 .../internal/sql/deployment_queries.sql | 0 .../internal/sql/deployment_queries.sql.go | 0 .../timeline/{dal => }/internal/sql/models.go | 1 + .../{dal => }/internal/sql/querier.go | 1 + .../{dal => }/internal/sql/queries.sql | 32 +++ .../{dal => }/internal/sql/queries.sql.go | 60 +++++ .../timeline/{dal/dal.go => query.go} | 245 +++--------------- backend/controller/timeline/timeline.go | 141 +++------- .../{dal/dal_test.go => timeline_test.go} | 38 +-- deployment/base/db-migrate/kustomization.yml | 1 + sqlc.yaml | 8 +- 20 files changed, 512 insertions(+), 375 deletions(-) create mode 100644 backend/controller/sql/schema/20240917015216_add_ingress_event_type.sql create mode 100644 backend/controller/timeline/events_call.go create mode 100644 backend/controller/timeline/events_deployment.go create mode 100644 backend/controller/timeline/events_ingress.go create mode 100644 backend/controller/timeline/events_log.go rename backend/controller/timeline/{dal => }/internal/sql/db.go (100%) rename backend/controller/timeline/{dal => }/internal/sql/deployment_queries.sql (100%) rename backend/controller/timeline/{dal => }/internal/sql/deployment_queries.sql.go (100%) rename backend/controller/timeline/{dal => }/internal/sql/models.go (97%) rename backend/controller/timeline/{dal => }/internal/sql/querier.go (90%) rename backend/controller/timeline/{dal => }/internal/sql/queries.sql (67%) rename backend/controller/timeline/{dal => }/internal/sql/queries.sql.go (72%) rename backend/controller/timeline/{dal/dal.go => query.go} (57%) rename backend/controller/timeline/{dal/dal_test.go => timeline_test.go} (85%) diff --git a/backend/controller/console/console.go b/backend/controller/console/console.go index ce8d9748dc..2c69f4e7c4 100644 --- a/backend/controller/console/console.go +++ b/backend/controller/console/console.go @@ -14,7 +14,6 @@ import ( "github.com/TBD54566975/ftl/backend/controller/dal" "github.com/TBD54566975/ftl/backend/controller/timeline" - timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect" @@ -242,7 +241,7 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ newQuery := query if !lastEventTime.IsZero() { - newQuery = append(newQuery, timelinedal.FilterTimeRange(thisRequestTime, lastEventTime)) + newQuery = append(newQuery, timeline.FilterTimeRange(thisRequestTime, lastEventTime)) } events, err := c.timeline.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...) @@ -268,11 +267,11 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[ } } -func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFilter, error) { - var query []timelinedal.TimelineFilter +func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter, error) { + var query []timeline.TimelineFilter if pb.Order == pbconsole.EventsQuery_DESC { - query = append(query, timelinedal.FilterDescending()) + query = append(query, timeline.FilterDescending()) } for _, filter := range pb.Filters { @@ -286,7 +285,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil } deploymentKeys = append(deploymentKeys, deploymentKey) } - query = append(query, timelinedal.FilterDeployments(deploymentKeys...)) + query = append(query, timeline.FilterDeployments(deploymentKeys...)) case *pbconsole.EventsQuery_Filter_Requests: requestKeys := make([]model.RequestKey, 0, len(filter.Requests.Requests)) @@ -297,32 +296,32 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil } requestKeys = append(requestKeys, requestKey) } - query = append(query, timelinedal.FilterRequests(requestKeys...)) + query = append(query, timeline.FilterRequests(requestKeys...)) case *pbconsole.EventsQuery_Filter_EventTypes: - eventTypes := make([]timelinedal.EventType, 0, len(filter.EventTypes.EventTypes)) + eventTypes := make([]timeline.EventType, 0, len(filter.EventTypes.EventTypes)) for _, eventType := range filter.EventTypes.EventTypes { switch eventType { case pbconsole.EventType_EVENT_TYPE_CALL: - eventTypes = append(eventTypes, timelinedal.EventTypeCall) + eventTypes = append(eventTypes, timeline.EventTypeCall) case pbconsole.EventType_EVENT_TYPE_LOG: - eventTypes = append(eventTypes, timelinedal.EventTypeLog) + eventTypes = append(eventTypes, timeline.EventTypeLog) case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_CREATED: - eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentCreated) + eventTypes = append(eventTypes, timeline.EventTypeDeploymentCreated) case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_UPDATED: - eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentUpdated) + eventTypes = append(eventTypes, timeline.EventTypeDeploymentUpdated) default: return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType)) } } - query = append(query, timelinedal.FilterTypes(eventTypes...)) + query = append(query, timeline.FilterTypes(eventTypes...)) case *pbconsole.EventsQuery_Filter_LogLevel: level := log.Level(filter.LogLevel.LogLevel) if level < log.Trace || level > log.Error { return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown log level %v", filter.LogLevel.LogLevel)) } - query = append(query, timelinedal.FilterLogLevel(level)) + query = append(query, timeline.FilterLogLevel(level)) case *pbconsole.EventsQuery_Filter_Time: var newerThan, olderThan time.Time @@ -332,7 +331,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil if filter.Time.OlderThan != nil { olderThan = filter.Time.OlderThan.AsTime() } - query = append(query, timelinedal.FilterTimeRange(olderThan, newerThan)) + query = append(query, timeline.FilterTimeRange(olderThan, newerThan)) case *pbconsole.EventsQuery_Filter_Id: var lowerThan, higherThan int64 @@ -342,7 +341,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil if filter.Id.HigherThan != nil { higherThan = *filter.Id.HigherThan } - query = append(query, timelinedal.FilterIDRange(lowerThan, higherThan)) + query = append(query, timeline.FilterIDRange(lowerThan, higherThan)) case *pbconsole.EventsQuery_Filter_Call: var sourceModule optional.Option[string] if filter.Call.SourceModule != nil { @@ -352,7 +351,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil if filter.Call.DestVerb != nil { destVerb = optional.Some(*filter.Call.DestVerb) } - query = append(query, timelinedal.FilterCall(sourceModule, filter.Call.DestModule, destVerb)) + query = append(query, timeline.FilterCall(sourceModule, filter.Call.DestModule, destVerb)) default: return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown filter %T", filter)) @@ -361,9 +360,9 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil return query, nil } -func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event { +func eventDALToProto(event timeline.TimelineEvent) *pbconsole.Event { switch event := event.(type) { - case *timelinedal.CallEvent: + case *timeline.CallEvent: var requestKey *string if r, ok := event.RequestKey.Get(); ok { rstr := r.String() @@ -395,7 +394,7 @@ func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event { }, } - case *timelinedal.LogEvent: + case *timeline.LogEvent: var requestKey *string if r, ok := event.RequestKey.Get(); ok { rstr := r.String() @@ -413,12 +412,11 @@ func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event { Attributes: event.Attributes, Message: event.Message, Error: event.Error.Ptr(), - Stack: event.Stack.Ptr(), }, }, } - case *timelinedal.DeploymentCreatedEvent: + case *timeline.DeploymentCreatedEvent: var replaced *string if r, ok := event.ReplacedDeployment.Get(); ok { rstr := r.String() @@ -437,7 +435,7 @@ func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event { }, }, } - case *timelinedal.DeploymentUpdatedEvent: + case *timeline.DeploymentUpdatedEvent: return &pbconsole.Event{ TimeStamp: timestamppb.New(event.Time), Id: event.ID, diff --git a/backend/controller/controller.go b/backend/controller/controller.go index 9f53e0a504..8d2751d61c 100644 --- a/backend/controller/controller.go +++ b/backend/controller/controller.go @@ -46,7 +46,6 @@ import ( "github.com/TBD54566975/ftl/backend/controller/scaling" "github.com/TBD54566975/ftl/backend/controller/scheduledtask" "github.com/TBD54566975/ftl/backend/controller/timeline" - timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal" "github.com/TBD54566975/ftl/backend/libdal" ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect" @@ -1836,7 +1835,7 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) { return time.Hour, nil } - removed, err := s.timeline.DeleteOldEvents(ctx, timelinedal.EventTypeCall, *s.config.EventLogRetention) + removed, err := s.timeline.DeleteOldEvents(ctx, timeline.EventTypeCall, *s.config.EventLogRetention) if err != nil { return 0, fmt.Errorf("failed to prune call events: %w", err) } diff --git a/backend/controller/deployment_logs.go b/backend/controller/deployment_logs.go index 4f47bbf29e..d7919a578f 100644 --- a/backend/controller/deployment_logs.go +++ b/backend/controller/deployment_logs.go @@ -8,7 +8,6 @@ import ( "github.com/alecthomas/types/optional" "github.com/TBD54566975/ftl/backend/controller/timeline" - timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) @@ -59,11 +58,6 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) { } deployment = dep - var errorStr optional.Option[string] - if entry.Error != nil { - errorStr = optional.Some(entry.Error.Error()) - } - var request optional.Option[model.RequestKey] if reqStr, ok := entry.Attributes["request"]; ok { req, err := model.ParseRequestKey(reqStr) @@ -72,7 +66,12 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) { } } - err = d.timeline.InsertLogEvent(ctx, &timelinedal.LogEvent{ + var errorStr optional.Option[string] + if entry.Error != nil { + errorStr = optional.Some(entry.Error.Error()) + } + + err = d.timeline.InsertLogEvent(ctx, &timeline.LogEvent{ RequestKey: request, DeploymentKey: deployment, Time: entry.Time, diff --git a/backend/controller/sql/schema/20240917015216_add_ingress_event_type.sql b/backend/controller/sql/schema/20240917015216_add_ingress_event_type.sql new file mode 100644 index 0000000000..3b38188e3d --- /dev/null +++ b/backend/controller/sql/schema/20240917015216_add_ingress_event_type.sql @@ -0,0 +1,6 @@ +-- migrate:up + +ALTER TYPE event_type ADD VALUE IF NOT EXISTS 'ingress'; + +-- migrate:down + diff --git a/backend/controller/timeline/events_call.go b/backend/controller/timeline/events_call.go new file mode 100644 index 0000000000..93706529a6 --- /dev/null +++ b/backend/controller/timeline/events_call.go @@ -0,0 +1,136 @@ +package timeline + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/alecthomas/types/either" + "github.com/alecthomas/types/optional" + + ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api" + "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" + "github.com/TBD54566975/ftl/backend/libdal" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/backend/schema" + "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/model" +) + +type CallEvent struct { + ID int64 + DeploymentKey model.DeploymentKey + RequestKey optional.Option[model.RequestKey] + ParentRequestKey optional.Option[model.RequestKey] + Time time.Time + SourceVerb optional.Option[schema.Ref] + DestVerb schema.Ref + Duration time.Duration + Request json.RawMessage + Response json.RawMessage + Error optional.Option[string] + Stack optional.Option[string] +} + +func (e *CallEvent) GetID() int64 { return e.ID } +func (e *CallEvent) event() {} + +// The internal JSON payload of a call event. +type eventCallJSON struct { + DurationMS int64 `json:"duration_ms"` + Request json.RawMessage `json:"request"` + Response json.RawMessage `json:"response"` + Error optional.Option[string] `json:"error,omitempty"` + Stack optional.Option[string] `json:"stack,omitempty"` +} + +type Call struct { + DeploymentKey model.DeploymentKey + RequestKey model.RequestKey + ParentRequestKey optional.Option[model.RequestKey] + StartTime time.Time + DestVerb *schema.Ref + Callers []*schema.Ref + Request *ftlv1.CallRequest + Response either.Either[*ftlv1.CallResponse, error] +} + +func (s *Service) RecordCall(ctx context.Context, call *Call) { + logger := log.FromContext(ctx) + var sourceVerb optional.Option[schema.Ref] + if len(call.Callers) > 0 { + sourceVerb = optional.Some(*call.Callers[0]) + } + + var errorStr optional.Option[string] + var stack optional.Option[string] + var responseBody []byte + + switch response := call.Response.(type) { + case either.Left[*ftlv1.CallResponse, error]: + resp := response.Get() + responseBody = resp.GetBody() + if callError := resp.GetError(); callError != nil { + errorStr = optional.Some(callError.Message) + stack = optional.Ptr(callError.Stack) + } + case either.Right[*ftlv1.CallResponse, error]: + callError := response.Get() + errorStr = optional.Some(callError.Error()) + } + + err := s.insertCallEvent(ctx, &CallEvent{ + Time: call.StartTime, + DeploymentKey: call.DeploymentKey, + RequestKey: optional.Some(call.RequestKey), + ParentRequestKey: call.ParentRequestKey, + Duration: time.Since(call.StartTime), + SourceVerb: sourceVerb, + DestVerb: *call.DestVerb, + Request: call.Request.GetBody(), + Response: responseBody, + Error: errorStr, + Stack: stack, + }) + if err != nil { + logger.Errorf(err, "failed to record call") + } +} + +func (s *Service) insertCallEvent(ctx context.Context, call *CallEvent) error { + var sourceModule, sourceVerb optional.Option[string] + if sr, ok := call.SourceVerb.Get(); ok { + sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name) + } + var requestKey optional.Option[string] + if rn, ok := call.RequestKey.Get(); ok { + requestKey = optional.Some(rn.String()) + } + var parentRequestKey optional.Option[string] + if pr, ok := call.ParentRequestKey.Get(); ok { + parentRequestKey = optional.Some(pr.String()) + } + var payload ftlencryption.EncryptedTimelineColumn + err := s.encryption.EncryptJSON(map[string]any{ + "duration_ms": call.Duration.Milliseconds(), + "request": call.Request, + "response": call.Response, + "error": call.Error, + "stack": call.Stack, + }, &payload) + if err != nil { + return fmt.Errorf("failed to encrypt call payload: %w", err) + } + return libdal.TranslatePGError(s.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ + DeploymentKey: call.DeploymentKey, + RequestKey: requestKey, + ParentRequestKey: parentRequestKey, + TimeStamp: call.Time, + SourceModule: sourceModule, + SourceVerb: sourceVerb, + DestModule: call.DestVerb.Module, + DestVerb: call.DestVerb.Name, + Payload: payload, + })) +} diff --git a/backend/controller/timeline/events_deployment.go b/backend/controller/timeline/events_deployment.go new file mode 100644 index 0000000000..723da92858 --- /dev/null +++ b/backend/controller/timeline/events_deployment.go @@ -0,0 +1,43 @@ +package timeline + +import ( + "time" + + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/internal/model" +) + +type DeploymentCreatedEvent struct { + ID int64 + DeploymentKey model.DeploymentKey + Time time.Time + Language string + ModuleName string + MinReplicas int + ReplacedDeployment optional.Option[model.DeploymentKey] +} + +func (e *DeploymentCreatedEvent) GetID() int64 { return e.ID } +func (e *DeploymentCreatedEvent) event() {} + +type eventDeploymentUpdatedJSON struct { + MinReplicas int `json:"min_replicas"` + PrevMinReplicas int `json:"prev_min_replicas"` +} + +type DeploymentUpdatedEvent struct { + ID int64 + DeploymentKey model.DeploymentKey + Time time.Time + MinReplicas int + PrevMinReplicas int +} + +func (e *DeploymentUpdatedEvent) GetID() int64 { return e.ID } +func (e *DeploymentUpdatedEvent) event() {} + +type eventDeploymentCreatedJSON struct { + MinReplicas int `json:"min_replicas"` + ReplacedDeployment optional.Option[model.DeploymentKey] `json:"replaced,omitempty"` +} diff --git a/backend/controller/timeline/events_ingress.go b/backend/controller/timeline/events_ingress.go new file mode 100644 index 0000000000..7419b4b172 --- /dev/null +++ b/backend/controller/timeline/events_ingress.go @@ -0,0 +1,33 @@ +package timeline + +import ( + "context" + "encoding/json" + + "github.com/alecthomas/types/optional" + + "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" + "github.com/TBD54566975/ftl/backend/libdal" + "github.com/TBD54566975/ftl/internal/model" +) + +type IngresEvent struct { + DeploymentKey model.DeploymentKey + RequestKey model.RequestKey +} + +// The internal JSON payload of an ingress event. +type eventIngressJSON struct { + DurationMS int64 `json:"duration_ms"` + Request json.RawMessage `json:"request"` + Response json.RawMessage `json:"response"` + Error optional.Option[string] `json:"error,omitempty"` + Stack optional.Option[string] `json:"stack,omitempty"` +} + +func (s *Service) InsertIngress(ctx context.Context, ingress *IngresEvent) error { + return libdal.TranslatePGError(s.db.InsertTimelineIngressEvent(ctx, sql.InsertTimelineIngressEventParams{ + DeploymentKey: ingress.DeploymentKey, + RequestKey: optional.Some(ingress.RequestKey.String()), + })) +} diff --git a/backend/controller/timeline/events_log.go b/backend/controller/timeline/events_log.go new file mode 100644 index 0000000000..ee5ec0a689 --- /dev/null +++ b/backend/controller/timeline/events_log.go @@ -0,0 +1,82 @@ +package timeline + +import ( + "context" + "fmt" + "time" + + "github.com/alecthomas/types/optional" + + ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api" + "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" + "github.com/TBD54566975/ftl/backend/libdal" + ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" + "github.com/TBD54566975/ftl/internal/model" +) + +type LogEvent struct { + ID int64 + DeploymentKey model.DeploymentKey + RequestKey optional.Option[model.RequestKey] + Time time.Time + Level int32 + Attributes map[string]string + Message string + Error optional.Option[string] +} + +func (e *LogEvent) GetID() int64 { return e.ID } +func (e *LogEvent) event() {} + +type eventLogJSON struct { + Message string `json:"message"` + Attributes map[string]string `json:"attributes"` + Error optional.Option[string] `json:"error,omitempty"` +} + +type Log struct { + DeploymentKey model.DeploymentKey + RequestKey optional.Option[model.RequestKey] + Msg *ftlv1.StreamDeploymentLogsRequest +} + +func (s *Service) RecordLog(ctx context.Context, log *Log) error { + err := s.InsertLogEvent(ctx, &LogEvent{ + RequestKey: log.RequestKey, + DeploymentKey: log.DeploymentKey, + Time: log.Msg.TimeStamp.AsTime(), + Level: log.Msg.LogLevel, + Attributes: log.Msg.Attributes, + Message: log.Msg.Message, + Error: optional.Ptr(log.Msg.Error), + }) + if err != nil { + return fmt.Errorf("failed to insert log event: %w", err) + } + return nil +} + +func (s *Service) InsertLogEvent(ctx context.Context, log *LogEvent) error { + var requestKey optional.Option[string] + if name, ok := log.RequestKey.Get(); ok { + requestKey = optional.Some(name.String()) + } + + payload := map[string]any{ + "message": log.Message, + "attributes": log.Attributes, + "error": log.Error, + } + var encryptedPayload ftlencryption.EncryptedTimelineColumn + err := s.encryption.EncryptJSON(payload, &encryptedPayload) + if err != nil { + return fmt.Errorf("failed to encrypt log payload: %w", err) + } + return libdal.TranslatePGError(s.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ + DeploymentKey: log.DeploymentKey, + RequestKey: requestKey, + TimeStamp: log.Time, + Level: log.Level, + Payload: encryptedPayload, + })) +} diff --git a/backend/controller/timeline/dal/internal/sql/db.go b/backend/controller/timeline/internal/sql/db.go similarity index 100% rename from backend/controller/timeline/dal/internal/sql/db.go rename to backend/controller/timeline/internal/sql/db.go diff --git a/backend/controller/timeline/dal/internal/sql/deployment_queries.sql b/backend/controller/timeline/internal/sql/deployment_queries.sql similarity index 100% rename from backend/controller/timeline/dal/internal/sql/deployment_queries.sql rename to backend/controller/timeline/internal/sql/deployment_queries.sql diff --git a/backend/controller/timeline/dal/internal/sql/deployment_queries.sql.go b/backend/controller/timeline/internal/sql/deployment_queries.sql.go similarity index 100% rename from backend/controller/timeline/dal/internal/sql/deployment_queries.sql.go rename to backend/controller/timeline/internal/sql/deployment_queries.sql.go diff --git a/backend/controller/timeline/dal/internal/sql/models.go b/backend/controller/timeline/internal/sql/models.go similarity index 97% rename from backend/controller/timeline/dal/internal/sql/models.go rename to backend/controller/timeline/internal/sql/models.go index 5571794e2f..ea1201073b 100644 --- a/backend/controller/timeline/dal/internal/sql/models.go +++ b/backend/controller/timeline/internal/sql/models.go @@ -20,6 +20,7 @@ const ( EventTypeLog EventType = "log" EventTypeDeploymentCreated EventType = "deployment_created" EventTypeDeploymentUpdated EventType = "deployment_updated" + EventTypeIngress EventType = "ingress" ) func (e *EventType) Scan(src interface{}) error { diff --git a/backend/controller/timeline/dal/internal/sql/querier.go b/backend/controller/timeline/internal/sql/querier.go similarity index 90% rename from backend/controller/timeline/dal/internal/sql/querier.go rename to backend/controller/timeline/internal/sql/querier.go index 9d0bf33988..c6364fc330 100644 --- a/backend/controller/timeline/dal/internal/sql/querier.go +++ b/backend/controller/timeline/internal/sql/querier.go @@ -17,6 +17,7 @@ type Querier interface { InsertTimelineCallEvent(ctx context.Context, arg InsertTimelineCallEventParams) error InsertTimelineDeploymentCreatedEvent(ctx context.Context, arg InsertTimelineDeploymentCreatedEventParams) error InsertTimelineDeploymentUpdatedEvent(ctx context.Context, arg InsertTimelineDeploymentUpdatedEventParams) error + InsertTimelineIngressEvent(ctx context.Context, arg InsertTimelineIngressEventParams) error InsertTimelineLogEvent(ctx context.Context, arg InsertTimelineLogEventParams) error } diff --git a/backend/controller/timeline/dal/internal/sql/queries.sql b/backend/controller/timeline/internal/sql/queries.sql similarity index 67% rename from backend/controller/timeline/dal/internal/sql/queries.sql rename to backend/controller/timeline/internal/sql/queries.sql index 8e989ef779..33095cc9bd 100644 --- a/backend/controller/timeline/dal/internal/sql/queries.sql +++ b/backend/controller/timeline/internal/sql/queries.sql @@ -53,6 +53,38 @@ VALUES ( sqlc.arg('payload') ); +-- name: InsertTimelineIngressEvent :exec +INSERT INTO timeline ( + deployment_id, + request_id, + parent_request_id, + time_stamp, + type, + custom_key_1, + custom_key_2, + custom_key_3, + custom_key_4, + payload +) +VALUES ( + (SELECT id FROM deployments WHERE deployments.key = sqlc.arg('deployment_key')::deployment_key), + (CASE + WHEN sqlc.narg('request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('request_key')::TEXT) + END), + (CASE + WHEN sqlc.narg('parent_request_key')::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = sqlc.narg('parent_request_key')::TEXT) + END), + sqlc.arg('time_stamp')::TIMESTAMPTZ, + 'ingress', + sqlc.narg('source')::TEXT, + sqlc.narg('destination_module')::TEXT, + sqlc.arg('ingress_type')::TEXT, + sqlc.narg('http_method')::TEXT, + sqlc.arg('payload') +); + -- name: DeleteOldTimelineEvents :one WITH deleted AS ( DELETE FROM timeline diff --git a/backend/controller/timeline/dal/internal/sql/queries.sql.go b/backend/controller/timeline/internal/sql/queries.sql.go similarity index 72% rename from backend/controller/timeline/dal/internal/sql/queries.sql.go rename to backend/controller/timeline/internal/sql/queries.sql.go index c75ac34a78..bd9fd8648b 100644 --- a/backend/controller/timeline/dal/internal/sql/queries.sql.go +++ b/backend/controller/timeline/internal/sql/queries.sql.go @@ -117,6 +117,66 @@ func (q *Queries) InsertTimelineCallEvent(ctx context.Context, arg InsertTimelin return err } +const insertTimelineIngressEvent = `-- name: InsertTimelineIngressEvent :exec +INSERT INTO timeline ( + deployment_id, + request_id, + parent_request_id, + time_stamp, + type, + custom_key_1, + custom_key_2, + custom_key_3, + custom_key_4, + payload +) +VALUES ( + (SELECT id FROM deployments WHERE deployments.key = $1::deployment_key), + (CASE + WHEN $2::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $2::TEXT) + END), + (CASE + WHEN $3::TEXT IS NULL THEN NULL + ELSE (SELECT id FROM requests ir WHERE ir.key = $3::TEXT) + END), + $4::TIMESTAMPTZ, + 'ingress', + $5::TEXT, + $6::TEXT, + $7::TEXT, + $8::TEXT, + $9 +) +` + +type InsertTimelineIngressEventParams struct { + DeploymentKey model.DeploymentKey + RequestKey optional.Option[string] + ParentRequestKey optional.Option[string] + TimeStamp time.Time + Source optional.Option[string] + DestinationModule optional.Option[string] + IngressType string + HttpMethod optional.Option[string] + Payload api.EncryptedTimelineColumn +} + +func (q *Queries) InsertTimelineIngressEvent(ctx context.Context, arg InsertTimelineIngressEventParams) error { + _, err := q.db.ExecContext(ctx, insertTimelineIngressEvent, + arg.DeploymentKey, + arg.RequestKey, + arg.ParentRequestKey, + arg.TimeStamp, + arg.Source, + arg.DestinationModule, + arg.IngressType, + arg.HttpMethod, + arg.Payload, + ) + return err +} + const insertTimelineLogEvent = `-- name: InsertTimelineLogEvent :exec INSERT INTO timeline ( deployment_id, diff --git a/backend/controller/timeline/dal/dal.go b/backend/controller/timeline/query.go similarity index 57% rename from backend/controller/timeline/dal/dal.go rename to backend/controller/timeline/query.go index 816df58f8d..54ed0ec4b6 100644 --- a/backend/controller/timeline/dal/dal.go +++ b/backend/controller/timeline/query.go @@ -1,190 +1,21 @@ -package dal +package timeline import ( "context" stdsql "database/sql" - "encoding/json" "fmt" "strconv" "time" "github.com/alecthomas/types/optional" - "github.com/TBD54566975/ftl/backend/controller/encryption" - ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api" - "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" - "github.com/TBD54566975/ftl/backend/controller/timeline/dal/internal/sql" + "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" "github.com/TBD54566975/ftl/backend/libdal" "github.com/TBD54566975/ftl/backend/schema" "github.com/TBD54566975/ftl/internal/log" "github.com/TBD54566975/ftl/internal/model" ) -type DAL struct { - *libdal.Handle[DAL] - db sql.Querier - encryption *encryption.Service -} - -func New(conn libdal.Connection, encryption *encryption.Service) *DAL { - var d *DAL - d = &DAL{ - db: sql.New(conn), - encryption: encryption, - Handle: libdal.New(conn, func(h *libdal.Handle[DAL]) *DAL { - return &DAL{ - Handle: h, - db: sql.New(h.Connection), - encryption: d.encryption, - } - }), - } - return d -} - -func (d *DAL) InsertLogEvent(ctx context.Context, log *LogEvent) error { - var requestKey optional.Option[string] - if name, ok := log.RequestKey.Get(); ok { - requestKey = optional.Some(name.String()) - } - - payload := map[string]any{ - "message": log.Message, - "attributes": log.Attributes, - "error": log.Error, - "stack": log.Stack, - } - var encryptedPayload ftlencryption.EncryptedTimelineColumn - err := d.encryption.EncryptJSON(payload, &encryptedPayload) - if err != nil { - return fmt.Errorf("failed to encrypt log payload: %w", err) - } - return libdal.TranslatePGError(d.db.InsertTimelineLogEvent(ctx, sql.InsertTimelineLogEventParams{ - DeploymentKey: log.DeploymentKey, - RequestKey: requestKey, - TimeStamp: log.Time, - Level: log.Level, - Payload: encryptedPayload, - })) -} - -func (d *DAL) InsertCallEvent(ctx context.Context, call *CallEvent) error { - var sourceModule, sourceVerb optional.Option[string] - if sr, ok := call.SourceVerb.Get(); ok { - sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name) - } - var requestKey optional.Option[string] - if rn, ok := call.RequestKey.Get(); ok { - requestKey = optional.Some(rn.String()) - } - var parentRequestKey optional.Option[string] - if pr, ok := call.ParentRequestKey.Get(); ok { - parentRequestKey = optional.Some(pr.String()) - } - var payload ftlencryption.EncryptedTimelineColumn - err := d.encryption.EncryptJSON(map[string]any{ - "duration_ms": call.Duration.Milliseconds(), - "request": call.Request, - "response": call.Response, - "error": call.Error, - "stack": call.Stack, - }, &payload) - if err != nil { - return fmt.Errorf("failed to encrypt call payload: %w", err) - } - return libdal.TranslatePGError(d.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{ - DeploymentKey: call.DeploymentKey, - RequestKey: requestKey, - ParentRequestKey: parentRequestKey, - TimeStamp: call.Time, - SourceModule: sourceModule, - SourceVerb: sourceVerb, - DestModule: call.DestVerb.Module, - DestVerb: call.DestVerb.Name, - Payload: payload, - })) -} - -func (d *DAL) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { - count, err := d.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) - return count, libdal.TranslatePGError(err) -} - -type EventType = sql.EventType - -// Supported event types. -const ( - EventTypeLog = sql.EventTypeLog - EventTypeCall = sql.EventTypeCall - EventTypeDeploymentCreated = sql.EventTypeDeploymentCreated - EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated -) - -// TimelineEvent types. -// -//sumtype:decl -type TimelineEvent interface { - GetID() int64 - event() -} - -type LogEvent struct { - ID int64 - DeploymentKey model.DeploymentKey - RequestKey optional.Option[model.RequestKey] - Time time.Time - Level int32 - Attributes map[string]string - Message string - Error optional.Option[string] - Stack optional.Option[string] -} - -func (e *LogEvent) GetID() int64 { return e.ID } -func (e *LogEvent) event() {} - -type CallEvent struct { - ID int64 - DeploymentKey model.DeploymentKey - RequestKey optional.Option[model.RequestKey] - ParentRequestKey optional.Option[model.RequestKey] - Time time.Time - SourceVerb optional.Option[schema.Ref] - DestVerb schema.Ref - Duration time.Duration - Request json.RawMessage - Response json.RawMessage - Error optional.Option[string] - Stack optional.Option[string] -} - -func (e *CallEvent) GetID() int64 { return e.ID } -func (e *CallEvent) event() {} - -type DeploymentCreatedEvent struct { - ID int64 - DeploymentKey model.DeploymentKey - Time time.Time - Language string - ModuleName string - MinReplicas int - ReplacedDeployment optional.Option[model.DeploymentKey] -} - -func (e *DeploymentCreatedEvent) GetID() int64 { return e.ID } -func (e *DeploymentCreatedEvent) event() {} - -type DeploymentUpdatedEvent struct { - ID int64 - DeploymentKey model.DeploymentKey - Time time.Time - MinReplicas int - PrevMinReplicas int -} - -func (e *DeploymentUpdatedEvent) GetID() int64 { return e.ID } -func (e *DeploymentUpdatedEvent) event() {} - type eventFilterCall struct { sourceModule optional.Option[string] destModule string @@ -204,6 +35,12 @@ type eventFilter struct { descending bool } +type eventRow struct { + sql.Timeline + DeploymentKey model.DeploymentKey + RequestKey optional.Option[model.RequestKey] +} + type TimelineFilter func(query *eventFilter) func FilterLogLevel(level log.Level) TimelineFilter { @@ -266,39 +103,7 @@ func FilterDescending() TimelineFilter { } } -// The internal JSON payload of a call event. -type eventCallJSON struct { - DurationMS int64 `json:"duration_ms"` - Request json.RawMessage `json:"request"` - Response json.RawMessage `json:"response"` - Error optional.Option[string] `json:"error,omitempty"` - Stack optional.Option[string] `json:"stack,omitempty"` -} - -type eventLogJSON struct { - Message string `json:"message"` - Attributes map[string]string `json:"attributes"` - Error optional.Option[string] `json:"error,omitempty"` - Stack optional.Option[string] `json:"stack,omitempty"` -} - -type eventDeploymentCreatedJSON struct { - MinReplicas int `json:"min_replicas"` - ReplacedDeployment optional.Option[model.DeploymentKey] `json:"replaced,omitempty"` -} - -type eventDeploymentUpdatedJSON struct { - MinReplicas int `json:"min_replicas"` - PrevMinReplicas int `json:"prev_min_replicas"` -} - -type eventRow struct { - sql.Timeline - DeploymentKey model.DeploymentKey - RequestKey optional.Option[model.RequestKey] -} - -func (d *DAL) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]TimelineEvent, error) { +func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...TimelineFilter) ([]TimelineEvent, error) { if limit < 1 { return nil, fmt.Errorf("limit must be >= 1, got %d", limit) } @@ -353,7 +158,7 @@ func (d *DAL) QueryTimeline(ctx context.Context, limit int, filters ...TimelineF deploymentQuery += ` WHERE key = ANY($1::TEXT[])` deploymentArgs = append(deploymentArgs, filter.deployments) } - rows, err := d.Handle.Connection.QueryContext(ctx, deploymentQuery, deploymentArgs...) + rows, err := s.Handle.Connection.QueryContext(ctx, deploymentQuery, deploymentArgs...) if err != nil { return nil, libdal.TranslatePGError(err) } @@ -409,20 +214,20 @@ func (d *DAL) QueryTimeline(ctx context.Context, limit int, filters ...TimelineF q += fmt.Sprintf(" LIMIT %d", limit) // Issue query. - rows, err = d.Handle.Connection.QueryContext(ctx, q, args...) + rows, err = s.Handle.Connection.QueryContext(ctx, q, args...) if err != nil { return nil, fmt.Errorf("%s: %w", q, libdal.TranslatePGError(err)) } defer rows.Close() - events, err := d.transformRowsToTimelineEvents(deploymentKeys, rows) + events, err := s.transformRowsToTimelineEvents(deploymentKeys, rows) if err != nil { return nil, err } return events, nil } -func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]TimelineEvent, error) { +func (s *Service) transformRowsToTimelineEvents(deploymentKeys map[int64]model.DeploymentKey, rows *stdsql.Rows) ([]TimelineEvent, error) { var out []TimelineEvent for rows.Next() { row := eventRow{} @@ -441,7 +246,7 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo switch row.Type { case sql.EventTypeLog: var jsonPayload eventLogJSON - if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { + if err := s.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt log event: %w", err) } @@ -458,12 +263,11 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo Attributes: jsonPayload.Attributes, Message: jsonPayload.Message, Error: jsonPayload.Error, - Stack: jsonPayload.Stack, }) case sql.EventTypeCall: var jsonPayload eventCallJSON - if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { + if err := s.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { return nil, fmt.Errorf("failed to decrypt call event: %w", err) } var sourceVerb optional.Option[schema.Ref] @@ -488,8 +292,8 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo case sql.EventTypeDeploymentCreated: var jsonPayload eventDeploymentCreatedJSON - if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { - return nil, fmt.Errorf("failed to decrypt call event: %w", err) + if err := s.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { + return nil, fmt.Errorf("failed to decrypt deployment created event: %w", err) } out = append(out, &DeploymentCreatedEvent{ ID: row.ID, @@ -503,8 +307,8 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo case sql.EventTypeDeploymentUpdated: var jsonPayload eventDeploymentUpdatedJSON - if err := d.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { - return nil, fmt.Errorf("failed to decrypt call event: %w", err) + if err := s.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { + return nil, fmt.Errorf("failed to decrypt deployment updated event: %w", err) } out = append(out, &DeploymentUpdatedEvent{ ID: row.ID, @@ -514,6 +318,17 @@ func (d *DAL) transformRowsToTimelineEvents(deploymentKeys map[int64]model.Deplo PrevMinReplicas: jsonPayload.PrevMinReplicas, }) + case sql.EventTypeIngress: + var jsonPayload eventIngressJSON + if err := s.encryption.DecryptJSON(&row.Payload, &jsonPayload); err != nil { + return nil, fmt.Errorf("failed to decrypt ingress event: %w", err) + } + out = append(out, &DeploymentUpdatedEvent{ + ID: row.ID, + DeploymentKey: row.DeploymentKey, + Time: row.TimeStamp, + }) + default: panic("unknown event type: " + row.Type) } diff --git a/backend/controller/timeline/timeline.go b/backend/controller/timeline/timeline.go index 8205453c34..0fa9715082 100644 --- a/backend/controller/timeline/timeline.go +++ b/backend/controller/timeline/timeline.go @@ -2,124 +2,55 @@ package timeline import ( "context" - "fmt" "time" - "github.com/alecthomas/types/either" - "github.com/alecthomas/types/optional" - "github.com/TBD54566975/ftl/backend/controller/encryption" - "github.com/TBD54566975/ftl/backend/controller/timeline/dal" + "github.com/TBD54566975/ftl/backend/controller/sql/sqltypes" + "github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql" "github.com/TBD54566975/ftl/backend/libdal" - ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1" - "github.com/TBD54566975/ftl/backend/schema" - "github.com/TBD54566975/ftl/internal/log" - "github.com/TBD54566975/ftl/internal/model" ) -type Service struct { - dal *dal.DAL -} - -func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *Service { - return &Service{dal: dal.New(conn, encryption)} -} - -type Log struct { - DeploymentKey model.DeploymentKey - RequestKey optional.Option[model.RequestKey] - Msg *ftlv1.StreamDeploymentLogsRequest -} - -func (s *Service) RecordLog(ctx context.Context, log *Log) error { - err := s.dal.InsertLogEvent(ctx, &dal.LogEvent{ - RequestKey: log.RequestKey, - DeploymentKey: log.DeploymentKey, - Time: log.Msg.TimeStamp.AsTime(), - Level: log.Msg.LogLevel, - Attributes: log.Msg.Attributes, - Message: log.Msg.Message, - Error: optional.Ptr(log.Msg.Error), - }) - if err != nil { - return fmt.Errorf("failed to insert log event: %w", err) - } - return nil -} +type EventType = sql.EventType -type Call struct { - DeploymentKey model.DeploymentKey - RequestKey model.RequestKey - ParentRequestKey optional.Option[model.RequestKey] - StartTime time.Time - DestVerb *schema.Ref - Callers []*schema.Ref - Request *ftlv1.CallRequest - Response either.Either[*ftlv1.CallResponse, error] -} - -func (s *Service) RecordCall(ctx context.Context, call *Call) { - logger := log.FromContext(ctx) - var sourceVerb optional.Option[schema.Ref] - if len(call.Callers) > 0 { - sourceVerb = optional.Some(*call.Callers[0]) - } - - var errorStr optional.Option[string] - var stack optional.Option[string] - var responseBody []byte - - switch response := call.Response.(type) { - case either.Left[*ftlv1.CallResponse, error]: - resp := response.Get() - responseBody = resp.GetBody() - if callError := resp.GetError(); callError != nil { - errorStr = optional.Some(callError.Message) - stack = optional.Ptr(callError.Stack) - } - case either.Right[*ftlv1.CallResponse, error]: - callError := response.Get() - errorStr = optional.Some(callError.Error()) - } +// Supported event types. +const ( + EventTypeLog = sql.EventTypeLog + EventTypeCall = sql.EventTypeCall + EventTypeDeploymentCreated = sql.EventTypeDeploymentCreated + EventTypeDeploymentUpdated = sql.EventTypeDeploymentUpdated +) - err := s.dal.InsertCallEvent(ctx, &dal.CallEvent{ - Time: call.StartTime, - DeploymentKey: call.DeploymentKey, - RequestKey: optional.Some(call.RequestKey), - ParentRequestKey: call.ParentRequestKey, - Duration: time.Since(call.StartTime), - SourceVerb: sourceVerb, - DestVerb: *call.DestVerb, - Request: call.Request.GetBody(), - Response: responseBody, - Error: errorStr, - Stack: stack, - }) - if err != nil { - logger.Errorf(err, "failed to record call") - } +// TimelineEvent types. +// +//sumtype:decl +type TimelineEvent interface { + GetID() int64 + event() } -func (s *Service) InsertLogEvent(ctx context.Context, log *dal.LogEvent) error { - err := s.dal.InsertLogEvent(ctx, log) - if err != nil { - return fmt.Errorf("failed to insert log event: %w", err) - } - return nil +type Service struct { + *libdal.Handle[Service] + db sql.Querier + encryption *encryption.Service } -func (s *Service) QueryTimeline(ctx context.Context, limit int, filters ...dal.TimelineFilter) ([]dal.TimelineEvent, error) { - events, err := s.dal.QueryTimeline(ctx, limit, filters...) - if err != nil { - return nil, fmt.Errorf("failed to query timeline: %w", err) +func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *Service { + var s *Service + s = &Service{ + db: sql.New(conn), + encryption: encryption, + Handle: libdal.New(conn, func(h *libdal.Handle[Service]) *Service { + return &Service{ + Handle: h, + db: sql.New(h.Connection), + encryption: s.encryption, + } + }), } - return events, nil + return s } -func (s *Service) DeleteOldEvents(ctx context.Context, eventType dal.EventType, age time.Duration) (int64, error) { - deleted, err := s.dal.DeleteOldEvents(ctx, eventType, age) - if err != nil { - return 0, fmt.Errorf("failed to delete old events: %w", err) - } - return deleted, nil +func (s *Service) DeleteOldEvents(ctx context.Context, eventType EventType, age time.Duration) (int64, error) { + count, err := s.db.DeleteOldTimelineEvents(ctx, sqltypes.Duration(age), eventType) + return count, libdal.TranslatePGError(err) } diff --git a/backend/controller/timeline/dal/dal_test.go b/backend/controller/timeline/timeline_test.go similarity index 85% rename from backend/controller/timeline/dal/dal_test.go rename to backend/controller/timeline/timeline_test.go index 80fc2fb010..7e9d3e6ce8 100644 --- a/backend/controller/timeline/dal/dal_test.go +++ b/backend/controller/timeline/timeline_test.go @@ -1,4 +1,4 @@ -package dal +package timeline import ( "bytes" @@ -25,7 +25,7 @@ func TestTimelineDAL(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - dal := New(conn, encryption) + timeline := New(ctx, conn, encryption) controllerDAL := controllerdal.New(ctx, conn, encryption) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) @@ -73,7 +73,7 @@ func TestTimelineDAL(t *testing.T) { DestVerb: schema.Ref{Module: "time", Name: "time"}, } t.Run("InsertCallEvent", func(t *testing.T) { - err = dal.InsertCallEvent(ctx, callEvent) + err = timeline.insertCallEvent(ctx, callEvent) assert.NoError(t, err) }) @@ -86,7 +86,7 @@ func TestTimelineDAL(t *testing.T) { Message: "A log entry", } t.Run("InsertLogEntry", func(t *testing.T) { - err = dal.InsertLogEvent(ctx, logEvent) + err = timeline.InsertLogEvent(ctx, logEvent) assert.NoError(t, err) }) @@ -97,37 +97,37 @@ func TestTimelineDAL(t *testing.T) { t.Run("QueryEvents", func(t *testing.T) { t.Run("Limit", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1) + events, err := timeline.QueryTimeline(ctx, 1) assert.NoError(t, err) assert.Equal(t, 1, len(events)) }) t.Run("NoFilters", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000) + events, err := timeline.QueryTimeline(ctx, 1000) assert.NoError(t, err) assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) }) t.Run("ByDeployment", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) + events, err := timeline.QueryTimeline(ctx, 1000, FilterDeployments(deploymentKey)) assert.NoError(t, err) assertEventsEqual(t, []TimelineEvent{expectedDeploymentUpdatedEvent, callEvent, logEvent}, events) }) t.Run("ByCall", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) + events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeCall), FilterCall(optional.None[string](), "time", optional.None[string]())) assert.NoError(t, err) assertEventsEqual(t, []TimelineEvent{callEvent}, events) }) t.Run("ByLogLevel", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) + events, err := timeline.QueryTimeline(ctx, 1000, FilterTypes(EventTypeLog), FilterLogLevel(log.Trace)) assert.NoError(t, err) assertEventsEqual(t, []TimelineEvent{logEvent}, events) }) t.Run("ByRequests", func(t *testing.T) { - events, err := dal.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) + events, err := timeline.QueryTimeline(ctx, 1000, FilterRequests(requestKey)) assert.NoError(t, err) assertEventsEqual(t, []TimelineEvent{callEvent, logEvent}, events) }) @@ -135,7 +135,7 @@ func TestTimelineDAL(t *testing.T) { } func normaliseEvents(events []TimelineEvent) []TimelineEvent { - for i := range len(events) { + for i := range events { event := events[i] re := reflect.Indirect(reflect.ValueOf(event)) f := re.FieldByName("Time") @@ -159,7 +159,7 @@ func TestDeleteOldEvents(t *testing.T) { encryption, err := encryption.New(ctx, conn, encryption.NewBuilder()) assert.NoError(t, err) - dal := New(conn, encryption) + timeline := New(ctx, conn, encryption) controllerDAL := controllerdal.New(ctx, conn, encryption) var testContent = bytes.Repeat([]byte("sometestcontentthatislongerthanthereadbuffer"), 100) @@ -192,7 +192,7 @@ func TestDeleteOldEvents(t *testing.T) { DestVerb: schema.Ref{Module: "time", Name: "time"}, } t.Run("InsertCallEvent", func(t *testing.T) { - err = dal.InsertCallEvent(ctx, callEvent) + err = timeline.insertCallEvent(ctx, callEvent) assert.NoError(t, err) }) // hour old event @@ -205,7 +205,7 @@ func TestDeleteOldEvents(t *testing.T) { DestVerb: schema.Ref{Module: "time", Name: "time"}, } t.Run("InsertCallEvent", func(t *testing.T) { - err = dal.InsertCallEvent(ctx, callEvent) + err = timeline.insertCallEvent(ctx, callEvent) assert.NoError(t, err) }) @@ -219,7 +219,7 @@ func TestDeleteOldEvents(t *testing.T) { Message: "A log entry", } t.Run("InsertLogEntry", func(t *testing.T) { - err = dal.InsertLogEvent(ctx, logEvent) + err = timeline.InsertLogEvent(ctx, logEvent) assert.NoError(t, err) }) // hour old event @@ -232,20 +232,20 @@ func TestDeleteOldEvents(t *testing.T) { Message: "A log entry", } t.Run("InsertLogEntry", func(t *testing.T) { - err = dal.InsertLogEvent(ctx, logEvent) + err = timeline.InsertLogEvent(ctx, logEvent) assert.NoError(t, err) }) t.Run("DeleteOldEvents", func(t *testing.T) { - count, err := dal.DeleteOldEvents(ctx, EventTypeCall, 2*24*time.Hour) + count, err := timeline.DeleteOldEvents(ctx, EventTypeCall, 2*24*time.Hour) assert.NoError(t, err) assert.Equal(t, int64(1), count) - count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + count, err = timeline.DeleteOldEvents(ctx, EventTypeLog, time.Minute) assert.NoError(t, err) assert.Equal(t, int64(2), count) - count, err = dal.DeleteOldEvents(ctx, EventTypeLog, time.Minute) + count, err = timeline.DeleteOldEvents(ctx, EventTypeLog, time.Minute) assert.NoError(t, err) assert.Equal(t, int64(0), count) }) diff --git a/deployment/base/db-migrate/kustomization.yml b/deployment/base/db-migrate/kustomization.yml index 3ace98922e..2e713d205e 100644 --- a/deployment/base/db-migrate/kustomization.yml +++ b/deployment/base/db-migrate/kustomization.yml @@ -26,4 +26,5 @@ configMapGenerator: - ./schema/20240913041619_encrypted_topic_events_payload.sql - ./schema/20240916015906_remove_runner_state.sql - ./schema/20240916190209_rename_controller_to_controllers.sql + - ./schema/20240917015216_add_ingress_event_type.sql - ./schema/20240917062716_change_deployments_index.sql diff --git a/sqlc.yaml b/sqlc.yaml index dd555fa9fe..f1b2df3955 100644 --- a/sqlc.yaml +++ b/sqlc.yaml @@ -8,7 +8,7 @@ sql: # FIXME: Until we fully decouple cron from the controller, we need to include the cron queries here - backend/controller/cronjobs/dal/internal/sql/queries.sql # Some of the timeline entries happen within a controller transaction, so we need to include them here - - backend/controller/timeline/dal/internal/sql/deployment_queries.sql + - backend/controller/timeline/internal/sql/deployment_queries.sql schema: "backend/controller/sql/schema" database: uri: postgres://localhost:15432/ftl?sslmode=disable&user=postgres&password=secret @@ -182,12 +182,12 @@ sql: out: "backend/controller/encryption/dal/internal/sql" - <<: *daldir queries: - - backend/controller/timeline/dal/internal/sql/queries.sql - - backend/controller/timeline/dal/internal/sql/deployment_queries.sql + - backend/controller/timeline/internal/sql/queries.sql + - backend/controller/timeline/internal/sql/deployment_queries.sql gen: go: <<: *gengo - out: "backend/controller/timeline/dal/internal/sql" + out: "backend/controller/timeline/internal/sql" rules: - name: postgresql-query-too-costly message: "Query cost estimate is too high"