Skip to content

Commit

Permalink
feat: add ingress events to timeline
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Sep 19, 2024
1 parent 9f9c28f commit 11024ed
Show file tree
Hide file tree
Showing 22 changed files with 1,393 additions and 9,883 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ deploy/
reflex.conf
/logs/
/node_modules
package-lock.json
*.tsbuildinfo
generated_ftl_module.go
*.zip
Expand Down
32 changes: 32 additions & 0 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,38 @@ func eventDALToProto(event timeline.TimelineEvent) *pbconsole.Event {
},
}

case *timeline.IngressEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
rstr := r.String()
requestKey = &rstr
}

return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_Ingress{
Ingress: &pbconsole.IngressEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
VerbRef: &schemapb.Ref{
Module: event.Verb.Module,
Name: event.Verb.Name,
},
Method: event.Method,
Path: event.Path,
StatusCode: int32(event.StatusCode),
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Request: string(event.Request),
RequestHeader: string(event.RequestHeader),
Response: string(event.Response),
ResponseHeader: string(event.ResponseHeader),
Error: event.Error.Ptr(),
},
},
}

default:
panic(fmt.Errorf("unknown event type %T", event))
}
Expand Down
12 changes: 8 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
ingress.Handle(start, sch, requestKey, routes, w, r, s.callWithRequest)
ingress.Handle(start, sch, requestKey, routes, w, r, s.timeline, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down Expand Up @@ -459,10 +459,14 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

err = s.timeline.RecordLog(ctx, &timeline.Log{
err = s.timeline.InsertLogEvent(ctx, &timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Msg: msg,
Time: msg.TimeStamp.AsTime(),
Level: msg.LogLevel,
Attributes: msg.Attributes,
Message: msg.Message,
Error: optional.Ptr(msg.Error),
})

if err != nil {
Expand Down Expand Up @@ -1052,7 +1056,7 @@ func (s *Service) callWithRequest(
callResponse = either.RightOf[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
}
s.timeline.RecordCall(ctx, &timeline.Call{
s.timeline.InsertCallEvent(ctx, &timeline.Call{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
ParentRequestKey: parentKey,
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
errorStr = optional.Some(entry.Error.Error())
}

err = d.timeline.InsertLogEvent(ctx, &timeline.LogEvent{
err = d.timeline.InsertLogEvent(ctx, &timeline.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Expand Down
73 changes: 61 additions & 12 deletions backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand All @@ -29,10 +32,12 @@ func Handle(
routes []dal.IngressRoute,
w http.ResponseWriter,
r *http.Request,
timelineService *timeline.Service,
call func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error),
) {
logger := log.FromContext(r.Context()).Scope(fmt.Sprintf("ingress:%s:%s", r.Method, r.URL.Path))
logger.Debugf("Start ingress request")

route, err := GetIngressRoute(routes, r.Method, r.URL.Path)
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
Expand All @@ -48,12 +53,24 @@ func Handle(

verbRef := &schemapb.Ref{Module: route.Module, Name: route.Verb}

ingressEvent := timeline.Ingress{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
StartTime: startTime,
Verb: &schema.Ref{Name: route.Verb, Module: route.Module},
Request: r,
Response: &http.Response{Header: make(http.Header)},
}

body, err := BuildRequestBody(route, r, sch)
if err != nil {
// Only log at debug, as this is a client side error
logger.Debugf("bad request: %s", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request"))
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some(err.Error()))
ingressEvent.Response = &http.Response{StatusCode: http.StatusBadRequest}
ingressEvent.Error = optional.Some(err.Error())
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
return
}

Expand All @@ -65,25 +82,33 @@ func Handle(

resp, err := call(r.Context(), creq, optional.Some(requestKey), optional.None[model.RequestKey](), r.RemoteAddr)
if err != nil {
logger.Errorf(err, "failed to call verb %s", route.Verb)
logger.Errorf(err, "failed to call verb")
var errMsg string
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
httpCode := connectCodeToHTTP(connectErr.Code())
http.Error(w, http.StatusText(httpCode), httpCode)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: connect error"))
errMsg = fmt.Sprintf("%s: connect error: %s", err.Error(), connectErr.Message())
} else {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: internal server error"))
errMsg = fmt.Sprintf("%s: internal server error", err.Error())
}
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some(errMsg))
ingressEvent.Response = &http.Response{StatusCode: http.StatusInternalServerError}
ingressEvent.Error = optional.Some(errMsg)
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
return
}
switch msg := resp.Msg.Response.(type) {
case *ftlv1.CallResponse_Body:
verb := &schema.Verb{}
err = sch.ResolveToType(&schema.Ref{Name: route.Verb, Module: route.Module}, verb)
if err != nil {
logger.Errorf(err, "could not resolve schema type for verb %s", route.Verb)
logger.Errorf(err, "could not resolve schema type for verb")
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not resolve schema type for verb"))
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some(err.Error()))
ingressEvent.Response = &http.Response{StatusCode: http.StatusInternalServerError}
ingressEvent.Error = optional.Some(err.Error())
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
return
}
var responseBody []byte
Expand All @@ -93,7 +118,10 @@ func Handle(
if err := json.Unmarshal(msg.Body, &response); err != nil {
logger.Errorf(err, "could not unmarhal response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not unmarhal response for verb"))
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some(err.Error()))
ingressEvent.Response = &http.Response{StatusCode: http.StatusInternalServerError}
ingressEvent.Error = optional.Some(err.Error())
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
return
}

Expand All @@ -102,33 +130,54 @@ func Handle(
if err != nil {
logger.Errorf(err, "could not create response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not create response for verb"))
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some(err.Error()))
ingressEvent.Response = &http.Response{StatusCode: http.StatusInternalServerError}
ingressEvent.Error = optional.Some(err.Error())
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
return
}

for k, v := range responseHeaders {
w.Header()[k] = v
ingressEvent.Response.Header.Set(k, v[0])
}

statusCode := http.StatusOK

// Override with status from verb if provided
if response.Status != 0 {
w.WriteHeader(response.Status)
statusCode = response.Status
w.WriteHeader(statusCode)
}

ingressEvent.Response.StatusCode = statusCode
} else {
w.WriteHeader(http.StatusOK)
ingressEvent.Response.StatusCode = http.StatusOK
w.Header().Set("Content-Type", "application/json; charset=utf-8")
ingressEvent.Response.Header.Set("Content-Type", "application/json; charset=utf-8")
responseBody = msg.Body
}
_, err = w.Write(responseBody)
if err == nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]())
ingressEvent.Response.Body = io.NopCloser(strings.NewReader(string(responseBody)))
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
} else {
logger.Errorf(err, "Could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
logger.Errorf(err, "could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some(err.Error()))
ingressEvent.Response = &http.Response{StatusCode: http.StatusInternalServerError}
ingressEvent.Error = optional.Some(err.Error())
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
}

case *ftlv1.CallResponse_Error_:
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("call response: internal server error"))
errMsg := fmt.Sprintf("call response: internal server error: %s", msg.Error.Message)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some(errMsg))
ingressEvent.Response = &http.Response{StatusCode: http.StatusInternalServerError}
ingressEvent.Error = optional.Some(errMsg)
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
}
}

Expand Down
10 changes: 9 additions & 1 deletion backend/controller/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/go-runtime/encoding"
Expand Down Expand Up @@ -68,6 +71,11 @@ func TestIngress(t *testing.T) {
}

ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)

for _, test := range []struct {
name string
Expand Down Expand Up @@ -100,7 +108,7 @@ func TestIngress(t *testing.T) {
req := httptest.NewRequest(test.method, test.path, bytes.NewBuffer(test.payload)).WithContext(ctx)
req.URL.RawQuery = test.query.Encode()
reqKey := model.NewRequestKey(model.OriginIngress, "test")
ingress.Handle(time.Now(), sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
ingress.Handle(time.Now(), sch, reqKey, routes, rec, req, timelineSrv, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
body, err := encoding.Marshal(response)
assert.NoError(t, err)
return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: body}}), nil
Expand Down
Loading

0 comments on commit 11024ed

Please sign in to comment.