Skip to content

Commit

Permalink
fix: ingress publishes events to timeline (#3659)
Browse files Browse the repository at this point in the history
closes #3655
  • Loading branch information
matt2e authored Dec 6, 2024
1 parent 0451306 commit fd9c52b
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
20 changes: 10 additions & 10 deletions backend/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
logger.Debugf("bad request: %s", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusBadRequest, err.Error())
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusBadRequest, err.Error())
return
}
ingressEvent.RequestBody = body
Expand All @@ -77,11 +77,11 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
httpCode := connectCodeToHTTP(connectErr.Code())
http.Error(w, http.StatusText(httpCode), httpCode)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: connect error"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusInternalServerError, connectErr.Error())
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, connectErr.Error())
} else {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: internal server error"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusInternalServerError, err.Error())
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error())
}
return
}
Expand All @@ -93,7 +93,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
logger.Errorf(err, "could not resolve schema type for verb %s", route.verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not resolve schema type for verb"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusInternalServerError, err.Error())
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error())
return
}
var responseBody []byte
Expand All @@ -104,7 +104,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
logger.Errorf(err, "could not unmarhal response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not unmarhal response for verb"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusInternalServerError, err.Error())
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error())
return
}
rawBody = response.Body
Expand All @@ -114,7 +114,7 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
logger.Errorf(err, "could not create response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not create response for verb"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusInternalServerError, err.Error())
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error())
return
}

Expand Down Expand Up @@ -147,25 +147,25 @@ func handleHTTP(startTime time.Time, sch *schema.Schema, requestKey model.Reques
} else {
logger.Errorf(err, "could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusInternalServerError, err.Error())
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, err.Error())
}

case *ftlv1.CallResponse_Error_:
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("call response: internal server error"))
recordIngressErrorEvent(r.Context(), &ingressEvent, http.StatusInternalServerError, msg.Error.Message)
recordIngressErrorEvent(r.Context(), ingressEvent, http.StatusInternalServerError, msg.Error.Message)
}
}

func recordIngressErrorEvent(
ctx context.Context,
ingressEvent *timeline.Ingress,
ingressEvent timeline.Ingress,
statusCode int,
errorMsg string,
) {
ingressEvent.ResponseStatus = statusCode
ingressEvent.Error = optional.Some(errorMsg)
// TODO: record event in timeline, one it has been split out from the controller
timeline.Publish(ctx, ingressEvent)
}

// Copied from the Apache-licensed connect-go source.
Expand Down
2 changes: 2 additions & 0 deletions charts/ftl/templates/http-ingress-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ spec:
{{- if .Values.ingress.env }}
{{- toYaml .Values.ingress.env | nindent 12 }}
{{- end }}
- name: FTL_TIMELINE_ENDPOINT
value: "http://{{ .Values.timeline.service.name }}:{{ .Values.timeline.service.port }}"

ports:
{{- range .Values.ingress.ports }}
Expand Down
5 changes: 5 additions & 0 deletions cmd/ftl-http-ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/TBD54566975/ftl"
"github.com/TBD54566975/ftl/backend/ingress"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/timeline/v1/timelinev1connect"
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
_ "github.com/TBD54566975/ftl/internal/automaxprocs" // Set GOMAXPROCS to match Linux container CPU quota.
"github.com/TBD54566975/ftl/internal/log"
Expand All @@ -28,6 +29,7 @@ var cli struct {
HTTPIngressConfig ingress.Config `embed:""`
ConfigFlag string `name:"config" short:"C" help:"Path to FTL project cf file." env:"FTL_CONFIG" placeholder:"FILE"`
SchemaServerEndpoint *url.URL `name:"ftl-endpoint" help:"Controller endpoint." env:"FTL_ENDPOINT" default:"http://127.0.0.1:8892"`
TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"`
}

func main() {
Expand All @@ -45,6 +47,9 @@ func main() {
err = observability.Init(ctx, false, "", "ftl-http-ingress", ftl.Version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")

timelineClient := rpc.Dial(timelinev1connect.NewTimelineServiceClient, cli.TimelineEndpoint.String(), log.Error)
ctx = rpc.ContextWithClient(ctx, timelineClient)

schemaClient := rpc.Dial(ftlv1connect.NewSchemaServiceClient, cli.SchemaServerEndpoint.String(), log.Error)
eventSource := schemaeventsource.New(ctx, schemaClient)
routeManager := routing.NewVerbRouter(ctx, schemaeventsource.New(ctx, schemaClient))
Expand Down

0 comments on commit fd9c52b

Please sign in to comment.