Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add ingress events to timeline #2735

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ deploy/
reflex.conf
/logs/
/node_modules
package-lock.json
*.tsbuildinfo
generated_ftl_module.go
*.zip
Expand Down
32 changes: 32 additions & 0 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,38 @@ func eventDALToProto(event timeline.TimelineEvent) *pbconsole.Event {
},
}

case *timeline.IngressEvent:
var requestKey *string
if r, ok := event.RequestKey.Get(); ok {
rstr := r.String()
requestKey = &rstr
}

return &pbconsole.Event{
TimeStamp: timestamppb.New(event.Time),
Id: event.ID,
Entry: &pbconsole.Event_Ingress{
Ingress: &pbconsole.IngressEvent{
DeploymentKey: event.DeploymentKey.String(),
RequestKey: requestKey,
VerbRef: &schemapb.Ref{
Module: event.Verb.Module,
Name: event.Verb.Name,
},
Method: event.Method,
Path: event.Path,
StatusCode: int32(event.StatusCode),
TimeStamp: timestamppb.New(event.Time),
Duration: durationpb.New(event.Duration),
Request: string(event.Request),
RequestHeader: string(event.RequestHeader),
Response: string(event.Response),
ResponseHeader: string(event.ResponseHeader),
Error: event.Error.Ptr(),
},
},
}

default:
panic(fmt.Errorf("unknown event type %T", event))
}
Expand Down
12 changes: 8 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
ingress.Handle(start, sch, requestKey, routes, w, r, s.callWithRequest)
ingress.Handle(start, sch, requestKey, routes, w, r, s.timeline, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down Expand Up @@ -459,10 +459,14 @@ func (s *Service) StreamDeploymentLogs(ctx context.Context, stream *connect.Clie
requestKey = optional.Some(rkey)
}

err = s.timeline.RecordLog(ctx, &timeline.Log{
err = s.timeline.InsertLogEvent(ctx, &timeline.Log{
DeploymentKey: deploymentKey,
RequestKey: requestKey,
Msg: msg,
Time: msg.TimeStamp.AsTime(),
Level: msg.LogLevel,
Attributes: msg.Attributes,
Message: msg.Message,
Error: optional.Ptr(msg.Error),
})

if err != nil {
Expand Down Expand Up @@ -1052,7 +1056,7 @@ func (s *Service) callWithRequest(
callResponse = either.RightOf[*ftlv1.CallResponse](err)
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("verb call failed"))
}
s.timeline.RecordCall(ctx, &timeline.Call{
s.timeline.InsertCallEvent(ctx, &timeline.Call{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
ParentRequestKey: parentKey,
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/deployment_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (d *deploymentLogsSink) processLogs(ctx context.Context) {
errorStr = optional.Some(entry.Error.Error())
}

err = d.timeline.InsertLogEvent(ctx, &timeline.LogEvent{
err = d.timeline.InsertLogEvent(ctx, &timeline.Log{
RequestKey: request,
DeploymentKey: deployment,
Time: entry.Time,
Expand Down
51 changes: 48 additions & 3 deletions backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/observability"
"github.com/TBD54566975/ftl/backend/controller/timeline"
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand All @@ -29,10 +32,12 @@ func Handle(
routes []dal.IngressRoute,
w http.ResponseWriter,
r *http.Request,
timelineService *timeline.Service,
call func(context.Context, *connect.Request[ftlv1.CallRequest], optional.Option[model.RequestKey], optional.Option[model.RequestKey], string) (*connect.Response[ftlv1.CallResponse], error),
) {
logger := log.FromContext(r.Context()).Scope(fmt.Sprintf("ingress:%s:%s", r.Method, r.URL.Path))
logger.Debugf("Start ingress request")

route, err := GetIngressRoute(routes, r.Method, r.URL.Path)
if err != nil {
if errors.Is(err, libdal.ErrNotFound) {
Expand All @@ -48,12 +53,22 @@ func Handle(

verbRef := &schemapb.Ref{Module: route.Module, Name: route.Verb}

ingressEvent := timeline.Ingress{
DeploymentKey: route.Deployment,
RequestKey: requestKey,
StartTime: startTime,
Verb: &schema.Ref{Name: route.Verb, Module: route.Module},
Request: r,
Response: &http.Response{Header: make(http.Header)},
}

body, err := BuildRequestBody(route, r, sch)
if err != nil {
// Only log at debug, as this is a client side error
logger.Debugf("bad request: %s", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusBadRequest, err.Error())
return
}

Expand All @@ -65,14 +80,16 @@ func Handle(

resp, err := call(r.Context(), creq, optional.Some(requestKey), optional.None[model.RequestKey](), r.RemoteAddr)
if err != nil {
logger.Errorf(err, "failed to call verb %s", route.Verb)
logger.Errorf(err, "failed to call verb")
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
httpCode := connectCodeToHTTP(connectErr.Code())
http.Error(w, http.StatusText(httpCode), httpCode)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: connect error"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusInternalServerError, connectErr.Error())
} else {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: internal server error"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusInternalServerError, err.Error())
}
return
}
Expand All @@ -84,6 +101,7 @@ func Handle(
logger.Errorf(err, "could not resolve schema type for verb %s", route.Verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not resolve schema type for verb"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusInternalServerError, err.Error())
return
}
var responseBody []byte
Expand All @@ -94,6 +112,7 @@ func Handle(
logger.Errorf(err, "could not unmarhal response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not unmarhal response for verb"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusInternalServerError, err.Error())
return
}

Expand All @@ -103,35 +122,61 @@ func Handle(
logger.Errorf(err, "could not create response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not create response for verb"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusInternalServerError, err.Error())
return
}

for k, v := range responseHeaders {
w.Header()[k] = v
ingressEvent.Response.Header.Set(k, v[0])
}

statusCode := http.StatusOK

// Override with status from verb if provided
if response.Status != 0 {
w.WriteHeader(response.Status)
statusCode = response.Status
w.WriteHeader(statusCode)
}

ingressEvent.Response.StatusCode = statusCode
} else {
w.WriteHeader(http.StatusOK)
ingressEvent.Response.StatusCode = http.StatusOK
w.Header().Set("Content-Type", "application/json; charset=utf-8")
ingressEvent.Response.Header.Set("Content-Type", "application/json; charset=utf-8")
responseBody = msg.Body
}
_, err = w.Write(responseBody)
if err == nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]())
ingressEvent.Response.Body = io.NopCloser(strings.NewReader(string(responseBody)))
timelineService.InsertHTTPIngress(r.Context(), &ingressEvent)
} else {
logger.Errorf(err, "Could not write response body")
logger.Errorf(err, "could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusInternalServerError, err.Error())
}

case *ftlv1.CallResponse_Error_:
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("call response: internal server error"))
recordIngressErrorEvent(r.Context(), timelineService, &ingressEvent, http.StatusInternalServerError, msg.Error.Message)
}
}

func recordIngressErrorEvent(
ctx context.Context,
timelineService *timeline.Service,
ingressEvent *timeline.Ingress,
statusCode int,
errorMsg string,
) {
ingressEvent.Response.StatusCode = statusCode
ingressEvent.Error = optional.Some(errorMsg)
timelineService.InsertHTTPIngress(ctx, ingressEvent)
}

// Copied from the Apache-licensed connect-go source.
func connectCodeToHTTP(code connect.Code) int {
switch code {
Expand Down
10 changes: 9 additions & 1 deletion backend/controller/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ import (
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/controller/timeline"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/go-runtime/encoding"
Expand Down Expand Up @@ -68,6 +71,11 @@ func TestIngress(t *testing.T) {
}

ctx := log.ContextWithNewDefaultLogger(context.Background())
conn := sqltest.OpenForTesting(ctx, t)
encryption, err := encryption.New(ctx, conn, encryption.NewBuilder())
assert.NoError(t, err)

timelineSrv := timeline.New(ctx, conn, encryption)

for _, test := range []struct {
name string
Expand Down Expand Up @@ -100,7 +108,7 @@ func TestIngress(t *testing.T) {
req := httptest.NewRequest(test.method, test.path, bytes.NewBuffer(test.payload)).WithContext(ctx)
req.URL.RawQuery = test.query.Encode()
reqKey := model.NewRequestKey(model.OriginIngress, "test")
ingress.Handle(time.Now(), sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
ingress.Handle(time.Now(), sch, reqKey, routes, rec, req, timelineSrv, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
body, err := encoding.Marshal(response)
assert.NoError(t, err)
return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: body}}), nil
Expand Down
Loading
Loading