Skip to content

Commit

Permalink
chore(refactor): add ingress event type and refactor timeline
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Sep 18, 2024
1 parent b805ee8 commit cd4b33f
Show file tree
Hide file tree
Showing 20 changed files with 509 additions and 377 deletions.
44 changes: 21 additions & 23 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/timeline"
timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
pbconsole "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect"
Expand Down Expand Up @@ -242,7 +241,7 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
newQuery := query

if !lastEventTime.IsZero() {
newQuery = append(newQuery, timelinedal.FilterTimeRange(thisRequestTime, lastEventTime))
newQuery = append(newQuery, timeline.FilterTimeRange(thisRequestTime, lastEventTime))
}

events, err := c.timeline.QueryTimeline(ctx, int(req.Msg.Query.Limit), newQuery...)
Expand All @@ -268,11 +267,11 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
}
}

func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFilter, error) {
var query []timelinedal.TimelineFilter
func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter, error) {
var query []timeline.TimelineFilter

if pb.Order == pbconsole.EventsQuery_DESC {
query = append(query, timelinedal.FilterDescending())
query = append(query, timeline.FilterDescending())
}

for _, filter := range pb.Filters {
Expand All @@ -286,7 +285,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil
}
deploymentKeys = append(deploymentKeys, deploymentKey)
}
query = append(query, timelinedal.FilterDeployments(deploymentKeys...))
query = append(query, timeline.FilterDeployments(deploymentKeys...))

case *pbconsole.EventsQuery_Filter_Requests:
requestKeys := make([]model.RequestKey, 0, len(filter.Requests.Requests))
Expand All @@ -297,32 +296,32 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil
}
requestKeys = append(requestKeys, requestKey)
}
query = append(query, timelinedal.FilterRequests(requestKeys...))
query = append(query, timeline.FilterRequests(requestKeys...))

case *pbconsole.EventsQuery_Filter_EventTypes:
eventTypes := make([]timelinedal.EventType, 0, len(filter.EventTypes.EventTypes))
eventTypes := make([]timeline.EventType, 0, len(filter.EventTypes.EventTypes))
for _, eventType := range filter.EventTypes.EventTypes {
switch eventType {
case pbconsole.EventType_EVENT_TYPE_CALL:
eventTypes = append(eventTypes, timelinedal.EventTypeCall)
eventTypes = append(eventTypes, timeline.EventTypeCall)
case pbconsole.EventType_EVENT_TYPE_LOG:
eventTypes = append(eventTypes, timelinedal.EventTypeLog)
eventTypes = append(eventTypes, timeline.EventTypeLog)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_CREATED:
eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentCreated)
eventTypes = append(eventTypes, timeline.EventTypeDeploymentCreated)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_UPDATED:
eventTypes = append(eventTypes, timelinedal.EventTypeDeploymentUpdated)
eventTypes = append(eventTypes, timeline.EventTypeDeploymentUpdated)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
}
query = append(query, timelinedal.FilterTypes(eventTypes...))
query = append(query, timeline.FilterTypes(eventTypes...))

case *pbconsole.EventsQuery_Filter_LogLevel:
level := log.Level(filter.LogLevel.LogLevel)
if level < log.Trace || level > log.Error {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown log level %v", filter.LogLevel.LogLevel))
}
query = append(query, timelinedal.FilterLogLevel(level))
query = append(query, timeline.FilterLogLevel(level))

case *pbconsole.EventsQuery_Filter_Time:
var newerThan, olderThan time.Time
Expand All @@ -332,7 +331,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil
if filter.Time.OlderThan != nil {
olderThan = filter.Time.OlderThan.AsTime()
}
query = append(query, timelinedal.FilterTimeRange(olderThan, newerThan))
query = append(query, timeline.FilterTimeRange(olderThan, newerThan))

case *pbconsole.EventsQuery_Filter_Id:
var lowerThan, higherThan int64
Expand All @@ -342,7 +341,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil
if filter.Id.HigherThan != nil {
higherThan = *filter.Id.HigherThan
}
query = append(query, timelinedal.FilterIDRange(lowerThan, higherThan))
query = append(query, timeline.FilterIDRange(lowerThan, higherThan))
case *pbconsole.EventsQuery_Filter_Call:
var sourceModule optional.Option[string]
if filter.Call.SourceModule != nil {
Expand All @@ -352,7 +351,7 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil
if filter.Call.DestVerb != nil {
destVerb = optional.Some(*filter.Call.DestVerb)
}
query = append(query, timelinedal.FilterCall(sourceModule, filter.Call.DestModule, destVerb))
query = append(query, timeline.FilterCall(sourceModule, filter.Call.DestModule, destVerb))

default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown filter %T", filter))
Expand All @@ -361,9 +360,9 @@ func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timelinedal.TimelineFil
return query, nil
}

func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event {
func eventDALToProto(event timeline.TimelineEvent) *pbconsole.Event {
switch event := event.(type) {
case *timelinedal.CallEvent:
case *timeline.CallEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
rstr := r.String()
Expand Down Expand Up @@ -395,7 +394,7 @@ func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event {
},
}

case *timelinedal.LogEvent:
case *timeline.LogEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
rstr := r.String()
Expand All @@ -413,12 +412,11 @@ func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event {
Attributes: event.Attributes,
Message: event.Message,
Error: event.Error.Ptr(),
Stack: event.Stack.Ptr(),
},
},
}

case *timelinedal.DeploymentCreatedEvent:
case *timeline.DeploymentCreatedEvent:
var replaced *string
if r, ok := event.ReplacedDeployment.Get(); ok {
rstr := r.String()
Expand All @@ -437,7 +435,7 @@ func eventDALToProto(event timelinedal.TimelineEvent) *pbconsole.Event {
},
},
}
case *timelinedal.DeploymentUpdatedEvent:
case *timeline.DeploymentUpdatedEvent:
return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Expand Down
3 changes: 1 addition & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/controller/scheduledtask"
"github.com/TBD54566975/ftl/backend/controller/timeline"
timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal"
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/console/pbconsoleconnect"
Expand Down Expand Up @@ -1836,7 +1835,7 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
return time.Hour, nil
}

removed, err := s.timeline.DeleteOldEvents(ctx, timelinedal.EventTypeCall, *s.config.EventLogRetention)
removed, err := s.timeline.DeleteOldEvents(ctx, timeline.EventTypeCall, *s.config.EventLogRetention)
if err != nil {
return 0, fmt.Errorf("failed to prune call events: %w", err)
}
Expand Down
13 changes: 6 additions & 7 deletions backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/timeline"
timelinedal "github.com/TBD54566975/ftl/backend/controller/timeline/dal"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
)
Expand Down Expand Up @@ -59,11 +58,6 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
}
deployment = dep

var errorStr optional.Option[string]
if entry.Error != nil {
errorStr = optional.Some(entry.Error.Error())
}

var request optional.Option[model.RequestKey]
if reqStr, ok := entry.Attributes["request"]; ok {
req, err := model.ParseRequestKey(reqStr)
Expand All @@ -72,7 +66,12 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
}
}

err = d.timeline.InsertLogEvent(ctx, &timelinedal.LogEvent{
var errorStr optional.Option[string]
if entry.Error != nil {
errorStr = optional.Some(entry.Error.Error())
}

err = d.timeline.InsertLogEvent(ctx, &timeline.LogEvent{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
-- migrate:up

ALTER TYPE event_type ADD VALUE IF NOT EXISTS 'ingress';

-- migrate:down

135 changes: 135 additions & 0 deletions backend/controller/timeline/events_call.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package timeline

import (
"context"
"encoding/json"
"fmt"
"time"

ftlencryption "github.com/TBD54566975/ftl/backend/controller/encryption/api"
"github.com/TBD54566975/ftl/backend/controller/timeline/internal/sql"
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
"github.com/alecthomas/types/either"
"github.com/alecthomas/types/optional"
)

type CallEvent struct {
ID int64
DeploymentKey model.DeploymentKey
RequestKey optional.Option[model.RequestKey]
ParentRequestKey optional.Option[model.RequestKey]
Time time.Time
SourceVerb optional.Option[schema.Ref]
DestVerb schema.Ref
Duration time.Duration
Request json.RawMessage
Response json.RawMessage
Error optional.Option[string]
Stack optional.Option[string]
}

func (e *CallEvent) GetID() int64 { return e.ID }
func (e *CallEvent) event() {}

// The internal JSON payload of a call event.
type eventCallJSON struct {
DurationMS int64 `json:"duration_ms"`
Request json.RawMessage `json:"request"`
Response json.RawMessage `json:"response"`
Error optional.Option[string] `json:"error,omitempty"`
Stack optional.Option[string] `json:"stack,omitempty"`
}

type Call struct {
DeploymentKey model.DeploymentKey
RequestKey model.RequestKey
ParentRequestKey optional.Option[model.RequestKey]
StartTime time.Time
DestVerb *schema.Ref
Callers []*schema.Ref
Request *ftlv1.CallRequest
Response either.Either[*ftlv1.CallResponse, error]
}

func (s *Service) RecordCall(ctx context.Context, call *Call) {
logger := log.FromContext(ctx)
var sourceVerb optional.Option[schema.Ref]
if len(call.Callers) > 0 {
sourceVerb = optional.Some(*call.Callers[0])
}

var errorStr optional.Option[string]
var stack optional.Option[string]
var responseBody []byte

switch response := call.Response.(type) {
case either.Left[*ftlv1.CallResponse, error]:
resp := response.Get()
responseBody = resp.GetBody()
if callError := resp.GetError(); callError != nil {
errorStr = optional.Some(callError.Message)
stack = optional.Ptr(callError.Stack)
}
case either.Right[*ftlv1.CallResponse, error]:
callError := response.Get()
errorStr = optional.Some(callError.Error())
}

err := s.insertCallEvent(ctx, &CallEvent{
Time: call.StartTime,
DeploymentKey: call.DeploymentKey,
RequestKey: optional.Some(call.RequestKey),
ParentRequestKey: call.ParentRequestKey,
Duration: time.Since(call.StartTime),
SourceVerb: sourceVerb,
DestVerb: *call.DestVerb,
Request: call.Request.GetBody(),
Response: responseBody,
Error: errorStr,
Stack: stack,
})
if err != nil {
logger.Errorf(err, "failed to record call")
}
}

func (s *Service) insertCallEvent(ctx context.Context, call *CallEvent) error {
var sourceModule, sourceVerb optional.Option[string]
if sr, ok := call.SourceVerb.Get(); ok {
sourceModule, sourceVerb = optional.Some(sr.Module), optional.Some(sr.Name)
}
var requestKey optional.Option[string]
if rn, ok := call.RequestKey.Get(); ok {
requestKey = optional.Some(rn.String())
}
var parentRequestKey optional.Option[string]
if pr, ok := call.ParentRequestKey.Get(); ok {
parentRequestKey = optional.Some(pr.String())
}
var payload ftlencryption.EncryptedTimelineColumn
err := s.encryption.EncryptJSON(map[string]any{
"duration_ms": call.Duration.Milliseconds(),
"request": call.Request,
"response": call.Response,
"error": call.Error,
"stack": call.Stack,
}, &payload)
if err != nil {
return fmt.Errorf("failed to encrypt call payload: %w", err)
}
return libdal.TranslatePGError(s.db.InsertTimelineCallEvent(ctx, sql.InsertTimelineCallEventParams{
DeploymentKey: call.DeploymentKey,
RequestKey: requestKey,
ParentRequestKey: parentRequestKey,
TimeStamp: call.Time,
SourceModule: sourceModule,
SourceVerb: sourceVerb,
DestModule: call.DestVerb.Module,
DestVerb: call.DestVerb.Name,
Payload: payload,
}))
}
42 changes: 42 additions & 0 deletions backend/controller/timeline/events_deployment.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package timeline

import (
"time"

"github.com/TBD54566975/ftl/internal/model"
"github.com/alecthomas/types/optional"
)

type DeploymentCreatedEvent struct {
ID int64
DeploymentKey model.DeploymentKey
Time time.Time
Language string
ModuleName string
MinReplicas int
ReplacedDeployment optional.Option[model.DeploymentKey]
}

func (e *DeploymentCreatedEvent) GetID() int64 { return e.ID }
func (e *DeploymentCreatedEvent) event() {}

type eventDeploymentUpdatedJSON struct {
MinReplicas int `json:"min_replicas"`
PrevMinReplicas int `json:"prev_min_replicas"`
}

type DeploymentUpdatedEvent struct {
ID int64
DeploymentKey model.DeploymentKey
Time time.Time
MinReplicas int
PrevMinReplicas int
}

func (e *DeploymentUpdatedEvent) GetID() int64 { return e.ID }
func (e *DeploymentUpdatedEvent) event() {}

type eventDeploymentCreatedJSON struct {
MinReplicas int `json:"min_replicas"`
ReplacedDeployment optional.Option[model.DeploymentKey] `json:"replaced,omitempty"`
}
Loading

0 comments on commit cd4b33f

Please sign in to comment.