Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: otel metrics for ingress #2310

Merged
merged 2 commits into from
Aug 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,22 +336,27 @@ func New(ctx context.Context, conn *sql.DB, config Config, runnerScaling scaling
}

// ServeHTTP handles inbound HTTP ingress requests.
//
// It looks up the ingress routes for the request method and the active schema,
// then hands the request to ingress.Handle. Every early-exit path writes an
// HTTP error response and then reports the request through
// observability.Ingress.Request with a failure-mode string. The start time is
// captured before any lookup and is passed to every metric call and on to
// ingress.Handle.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	routes, err := s.dal.GetIngressRoutes(r.Context(), r.Method)
	if err != nil {
		if errors.Is(err, dalerrs.ErrNotFound) {
			http.NotFound(w, r)
			observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal"))
			return
		}
		http.Error(w, err.Error(), http.StatusInternalServerError)
		observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("failed to resolve route from dal"))
		return
	}
	sch, err := s.dal.GetActiveSchema(r.Context())
	if err != nil {
		http.Error(w, err.Error(), http.StatusInternalServerError)
		observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("could not get active schema"))
		return
	}
	requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", r.Method, r.URL.Path))
	// Handle takes the request start time as its first argument. It must be
	// invoked exactly once: a second call would dispatch the verb again and
	// write to the response a second time.
	ingress.Handle(start, sch, requestKey, routes, w, r, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
Expand Down
21 changes: 19 additions & 2 deletions backend/controller/ingress/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@ import (
"encoding/json"
"errors"
"net/http"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/observability"
dalerrs "github.com/TBD54566975/ftl/backend/dal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
Expand All @@ -20,6 +22,7 @@ import (

// Handle HTTP ingress routes.
func Handle(
startTime time.Time,
sch *schema.Schema,
requestKey model.RequestKey,
routes []dal.IngressRoute,
Expand All @@ -33,24 +36,29 @@ func Handle(
if err != nil {
if errors.Is(err, dalerrs.ErrNotFound) {
http.NotFound(w, r)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), startTime, optional.Some("route not found"))
return
}
logger.Errorf(err, "failed to resolve route for %s %s", r.Method, r.URL.Path)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), startTime, optional.Some("failed to resolve route"))
return
}

verbRef := &schemapb.Ref{Module: route.Module, Name: route.Verb}

body, err := BuildRequestBody(route, r, sch)
if err != nil {
// Only log at debug, as this is a client side error
logger.Debugf("bad request: %s", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("bad request"))
return
}

creq := connect.NewRequest(&ftlv1.CallRequest{
Metadata: &ftlv1.Metadata{},
Verb: &schemapb.Ref{Module: route.Module, Name: route.Verb},
Verb: verbRef,
Body: body,
})

Expand All @@ -60,8 +68,10 @@ func Handle(
if connectErr := new(connect.Error); errors.As(err, &connectErr) {
httpCode := connectCodeToHTTP(connectErr.Code())
http.Error(w, http.StatusText(httpCode), httpCode)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: connect error"))
} else {
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("failed to call verb: internal server error"))
}
return
}
Expand All @@ -72,6 +82,7 @@ func Handle(
if err != nil {
logger.Errorf(err, "could not resolve schema type for verb %s", route.Verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not resolve schema type for verb"))
return
}
var responseBody []byte
Expand All @@ -81,6 +92,7 @@ func Handle(
if err := json.Unmarshal(msg.Body, &response); err != nil {
logger.Errorf(err, "could not unmarhal response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not unmarhal response for verb"))
return
}

Expand All @@ -89,6 +101,7 @@ func Handle(
if err != nil {
logger.Errorf(err, "could not create response for verb %s", verb)
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not create response for verb"))
return
}

Expand All @@ -105,12 +118,16 @@ func Handle(
responseBody = msg.Body
}
_, err = w.Write(responseBody)
if err != nil {
if err == nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.None[string]())
} else {
logger.Errorf(err, "Could not write response body")
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("could not write response body"))
}

case *ftlv1.CallResponse_Error_:
http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.Some(verbRef), startTime, optional.Some("call response: internal server error"))
}
}

Expand Down
3 changes: 2 additions & 1 deletion backend/controller/ingress/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http/httptest"
"net/url"
"testing"
"time"

"connectrpc.com/connect"
"github.com/alecthomas/assert/v2"
Expand Down Expand Up @@ -99,7 +100,7 @@ func TestIngress(t *testing.T) {
req := httptest.NewRequest(test.method, test.path, bytes.NewBuffer(test.payload)).WithContext(ctx)
req.URL.RawQuery = test.query.Encode()
reqKey := model.NewRequestKey(model.OriginIngress, "test")
ingress.Handle(sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
ingress.Handle(time.Now(), sch, reqKey, routes, rec, req, func(ctx context.Context, r *connect.Request[ftlv1.CallRequest], requestKey optional.Option[model.RequestKey], parentRequestKey optional.Option[model.RequestKey], requestSource string) (*connect.Response[ftlv1.CallResponse], error) {
body, err := encoding.Marshal(response)
assert.NoError(t, err)
return connect.NewResponse(&ftlv1.CallResponse{Response: &ftlv1.CallResponse_Body{Body: body}}), nil
Expand Down
76 changes: 76 additions & 0 deletions backend/controller/observability/ingress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package observability

import (
"context"
"fmt"
"time"

"github.com/alecthomas/types/optional"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/noop"

schemapb "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/observability"
)

const (
ingressMeterName = "ftl.ingress"
ingressMethodAttr = "ftl.ingress.method"
ingressPathAttr = "ftl.ingress.path"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious if this is too high of a cardinality to log

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Depending on what info is contained in the paths, we could potentially chop the path and keep just the top level prefix, or otherwise parse out only the data that we find useful

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh huh if the path is just the module name, I guess it doesn't even matter if we do anything special here, because it won't add any cardinality beyond the verb ref. We can just leave this as is - it's probably easiest to understand this way

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The path is entirely user defined, i.e. //ftl:ingress GET /http/echo -> https://<domain:port>/http/echo

But you're right, it's equivalent to adding a verb ref, and it's pretty useful; I'll leave it in.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ahh got it, that makes sense. Sounds good!

ingressVerbRefAttr = "ftl.ingress.verb.ref"
ingressFailureModeAttr = "ftl.ingress.failure_mode"
ingressRunTimeBucketAttr = "ftl.ingress.run_time_ms.bucket"
)

// IngressMetrics groups the OpenTelemetry instruments used to report on
// ingress requests.
type IngressMetrics struct {
// requests counts ingress requests; one is added per call to Request.
requests metric.Int64Counter
// msToComplete records the time taken to complete an ingress request, in
// milliseconds.
msToComplete metric.Int64Histogram
}

// initIngressMetrics creates the ingress instruments on the ingress meter.
//
// The struct starts out holding no-op instruments, which are swapped for the
// real counter and histogram as each one is created. If creating either
// instrument fails, nil is returned along with the error produced by wrapErr
// for the name of the failing signal.
func initIngressMetrics() (*IngressMetrics, error) {
	metrics := &IngressMetrics{
		requests:     noop.Int64Counter{},
		msToComplete: noop.Int64Histogram{},
	}
	meter := otel.Meter(ingressMeterName)

	// Request counter.
	requestsName := fmt.Sprintf("%s.requests", ingressMeterName)
	requests, err := meter.Int64Counter(
		requestsName,
		metric.WithUnit("1"),
		metric.WithDescription("the number of ingress requests that the FTL controller receives"),
	)
	if err != nil {
		return nil, wrapErr(requestsName, err)
	}
	metrics.requests = requests

	// Completion-time histogram.
	durationName := fmt.Sprintf("%s.ms_to_complete", ingressMeterName)
	duration, err := meter.Int64Histogram(
		durationName,
		metric.WithUnit("ms"),
		metric.WithDescription("duration in ms to complete an ingress request"),
	)
	if err != nil {
		return nil, wrapErr(durationName, err)
	}
	metrics.msToComplete = duration

	return metrics, nil
}

// Request reports one ingress request to both ingress instruments.
//
// The HTTP method and path are always attached as attributes. When verb is
// set, the module name and the verb reference string are attached too; when
// failureMode is set, it is attached as the failure-mode attribute.
//
// The elapsed time since startTime is recorded on the histogram with those
// attributes. The request counter is then incremented by one with the same
// attributes plus a run-time bucket attribute derived from the elapsed time.
func (m *IngressMetrics) Request(ctx context.Context, method string, path string, verb optional.Option[*schemapb.Ref], startTime time.Time, failureMode optional.Option[string]) {
	attrs := []attribute.KeyValue{
		attribute.String(ingressMethodAttr, method),
		attribute.String(ingressPathAttr, path),
	}

	if ref, hasVerb := verb.Get(); hasVerb {
		moduleAttr := attribute.String(observability.ModuleNameAttribute, ref.Module)
		verbAttr := attribute.String(ingressVerbRefAttr, schema.RefFromProto(ref).String())
		attrs = append(attrs, moduleAttr, verbAttr)
	}

	if reason, failed := failureMode.Get(); failed {
		attrs = append(attrs, attribute.String(ingressFailureModeAttr, reason))
	}

	elapsedMS := timeSinceMS(startTime)

	// The histogram is recorded before the bucket attribute is appended, so
	// only the counter carries the run-time bucket.
	m.msToComplete.Record(ctx, elapsedMS, metric.WithAttributes(attrs...))

	bucketAttr := attribute.String(ingressRunTimeBucketAttr, logBucket(2, elapsedMS))
	attrs = append(attrs, bucketAttr)
	m.requests.Add(ctx, 1, metric.WithAttributes(attrs...))
}
3 changes: 3 additions & 0 deletions backend/controller/observability/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var (
Calls *CallMetrics
Deployment *DeploymentMetrics
FSM *FSMMetrics
Ingress *IngressMetrics
PubSub *PubSubMetrics
Cron *CronMetrics
)
Expand All @@ -28,6 +29,8 @@ func init() {
errs = errors.Join(errs, err)
FSM, err = initFSMMetrics()
errs = errors.Join(errs, err)
Ingress, err = initIngressMetrics()
errs = errors.Join(errs, err)
PubSub, err = initPubSubMetrics()
errs = errors.Join(errs, err)
Cron, err = initCronMetrics()
Expand Down