Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for ingress path params #652

Merged
merged 2 commits into from
Dec 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 31 additions & 24 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package controller

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -38,6 +37,7 @@ import (
"github.com/TBD54566975/ftl/backend/common/sha256"
"github.com/TBD54566975/ftl/backend/common/slices"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/schema"
frontend "github.com/TBD54566975/ftl/frontend"
Expand Down Expand Up @@ -165,7 +165,7 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.
func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
logger := log.FromContext(r.Context())
logger.Infof("%s %s", r.Method, r.URL.Path)
routes, err := s.dal.GetIngressRoutes(r.Context(), r.Method, r.URL.Path)
routes, err := s.dal.GetIngressRoutes(r.Context(), r.Method)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
http.NotFound(w, r)
Expand All @@ -174,37 +174,44 @@ func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you also want the if errors.Is(err, dal.ErrNotFound) check here too.

return
}
route := routes[rand.Intn(len(routes))] //nolint:gosec
var body []byte
switch r.Method {
case http.MethodPost, http.MethodPut:
body, err = io.ReadAll(r.Body)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}

default:
// TODO: Transcode query parameters into JSON.
payload := map[string]string{}
for key, value := range r.URL.Query() {
payload[key] = value[len(value)-1]
}
body, err = json.Marshal(payload)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
route, err := ingress.GetIngressRoute(routes, r.Method, r.URL.Path)
if err != nil {
if errors.Is(err, dal.ErrNotFound) {
http.NotFound(w, r)
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
requestName, err := s.dal.CreateIngressRequest(r.Context(), fmt.Sprintf("%s %s", r.Method, r.URL.Path), r.RemoteAddr)

deployments, err := s.dal.GetActiveDeployments(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sch := &schema.Schema{
Modules: slices.Map(deployments, func(d dal.Deployment) *schema.Module {
return d.Schema
}),
}

body, err := ingress.ValidateAndExtractBody(route, r, sch)
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

creq := connect.NewRequest(&ftlv1.CallRequest{
Verb: &schemapb.VerbRef{Module: route.Module, Name: route.Verb},
Body: body,
Metadata: &ftlv1.Metadata{},
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I also might be able to add it to the Metadata here or something.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we'll combine the path parameters with query parameters, and map them onto the request structure. That's the approach gRPC uses, and I think it makes sense.

Copy link
Collaborator Author

@wesbillman wesbillman Nov 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

coolio! Maybe we can chat today on this one. Want to make sure we have request validation in the right place. For example, if we add a RequestParams arg that has either the path params or query params here, when we create the actual verb request, we'll need a way to know if we should use the RequestParams for the <verb>Request.

It seems like we currently put the query params into the body of the payload after we json.Marshal it. So maybe the path parameters will always be in the RequestParams struct and query params will always end up in the body? I'm guessing we can't specify query params in the ingress annotation, yeah?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Off the top of my head, I think it should be in the schema validator.

Verb: &schemapb.VerbRef{Module: route.Module, Name: route.Verb},
Body: body,
})

requestName, err := s.dal.CreateIngressRequest(r.Context(), fmt.Sprintf("%s %s", r.Method, r.URL.Path), r.RemoteAddr)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
headers.SetRequestName(creq.Header(), requestName)
resp, err := s.Call(r.Context(), creq)
if err != nil {
Expand Down
24 changes: 14 additions & 10 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ var (
)

type IngressRoute struct {
Runner model.RunnerKey
Endpoint string
Module string
Verb string
Runner model.RunnerKey
Deployment model.DeploymentName
Endpoint string
Path string
Module string
Verb string
}

type IngressRouteEntry struct {
Expand Down Expand Up @@ -948,8 +950,8 @@ func (d *DAL) CreateIngressRequest(ctx context.Context, route, addr string) (mod
return name, err
}

func (d *DAL) GetIngressRoutes(ctx context.Context, method string, path string) ([]IngressRoute, error) {
routes, err := d.db.GetIngressRoutes(ctx, method, path)
func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRoute, error) {
routes, err := d.db.GetIngressRoutes(ctx, method)
if err != nil {
return nil, translatePGError(err)
}
Expand All @@ -958,10 +960,12 @@ func (d *DAL) GetIngressRoutes(ctx context.Context, method string, path string)
}
return slices.Map(routes, func(row sql.GetIngressRoutesRow) IngressRoute {
return IngressRoute{
Runner: model.RunnerKey(row.RunnerKey),
Endpoint: row.Endpoint,
Module: row.Module,
Verb: row.Verb,
Runner: model.RunnerKey(row.RunnerKey),
Deployment: row.DeploymentName,
Endpoint: row.Endpoint,
Path: row.Path,
Module: row.Module,
Verb: row.Verb,
}
}), nil
}
Expand Down
234 changes: 234 additions & 0 deletions backend/controller/ingress/ingress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package ingress

import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"reflect"
"strconv"
"strings"

"github.com/TBD54566975/ftl/backend/common/slices"
"github.com/TBD54566975/ftl/backend/controller/dal"
"github.com/TBD54566975/ftl/backend/schema"
)

type path []string

func (p path) String() string {
return strings.TrimLeft(strings.Join(p, ""), ".")
}

func GetIngressRoute(routes []dal.IngressRoute, method string, path string) (*dal.IngressRoute, error) {
var matchedRoutes = slices.Filter(routes, func(route dal.IngressRoute) bool {
return matchSegments(route.Path, path, func(segment, value string) {})
})

if len(matchedRoutes) == 0 {
return nil, dal.ErrNotFound
}

// TODO: add load balancing at some point
route := matchedRoutes[rand.Intn(len(matchedRoutes))] //nolint:gosec
return &route, nil
}

func matchSegments(pattern, urlPath string, onMatch func(segment, value string)) bool {
patternSegments := strings.Split(strings.Trim(pattern, "/"), "/")
urlSegments := strings.Split(strings.Trim(urlPath, "/"), "/")

if len(patternSegments) != len(urlSegments) {
return false
}

for i, segment := range patternSegments {
if segment == "" && urlSegments[i] == "" {
continue // Skip empty segments
}

if strings.HasPrefix(segment, "{") && strings.HasSuffix(segment, "}") {
key := strings.Trim(segment, "{}") // Dynamic segment
onMatch(key, urlSegments[i])
} else if segment != urlSegments[i] {
return false
}
}
return true
}

// ValidateAndExtractBody validates the request body against the schema and extracts the request body as a JSON blob.
func ValidateAndExtractBody(route *dal.IngressRoute, r *http.Request, sch *schema.Schema) ([]byte, error) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A doc comment here would be good.

requestMap, err := buildRequestMap(route, r)
if err != nil {
return nil, err
}

verb := sch.ResolveVerbRef(&schema.VerbRef{Name: route.Verb, Module: route.Module})
if verb == nil {
return nil, fmt.Errorf("unknown verb %s", route.Verb)
}

dataRef := verb.Request
if dataRef.Module == "" {
dataRef.Module = route.Module
}

err = validateRequestMap(dataRef, []string{dataRef.String()}, requestMap, sch)
if err != nil {
return nil, err
}

body, err := json.Marshal(requestMap)
if err != nil {
return nil, err
}

return body, nil
}

func buildRequestMap(route *dal.IngressRoute, r *http.Request) (map[string]any, error) {
requestMap := map[string]any{}
matchSegments(route.Path, r.URL.Path, func(segment, value string) {
requestMap[segment] = value
})

switch r.Method {
case http.MethodPost, http.MethodPut:
var bodyMap map[string]any
err := json.NewDecoder(r.Body).Decode(&bodyMap)
if err != nil {
return nil, fmt.Errorf("HTTP request body is not valid JSON: %w", err)
}

// Merge bodyMap into params
for k, v := range bodyMap {
requestMap[k] = v
}
default:
// TODO: Support query params correctly for map and array
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO!!!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This pained me 😂

for key, value := range r.URL.Query() {
requestMap[key] = value[len(value)-1]
}
}

return requestMap, nil
}

func validateRequestMap(dataRef *schema.DataRef, path path, request map[string]any, sch *schema.Schema) error {
data := sch.ResolveDataRef(dataRef)
if data == nil {
return fmt.Errorf("unknown data %v", dataRef)
}

var errs []error
for _, field := range data.Fields {
fieldPath := append(path, "."+field.Name) //nolint:gocritic

_, isOptional := field.Type.(*schema.Optional)
value, haveValue := request[field.Name]
if !isOptional && !haveValue {
errs = append(errs, fmt.Errorf("%s is required", fieldPath))
continue
}

if haveValue {
err := validateValue(field.Type, fieldPath, value, sch)
if err != nil {
errs = append(errs, err)
}
}

}

return errors.Join(errs...)
}

func validateValue(fieldType schema.Type, path path, value any, sch *schema.Schema) error {
var typeMatches bool
switch fieldType := fieldType.(type) {
case *schema.Int:
switch value := value.(type) {
case float64:
typeMatches = true
case string:
if _, err := strconv.ParseFloat(value, 64); err == nil {
typeMatches = true
}
}
case *schema.Float:
switch value := value.(type) {
case float64:
typeMatches = true
case string:
if _, err := strconv.ParseFloat(value, 64); err == nil {
typeMatches = true
}
}
case *schema.String:
_, typeMatches = value.(string)
case *schema.Bool:
switch value := value.(type) {
case bool:
typeMatches = true
case string:
if _, err := strconv.ParseBool(value); err == nil {
typeMatches = true
}
}
case *schema.Array:
rv := reflect.ValueOf(value)
if rv.Kind() != reflect.Slice {
return fmt.Errorf("%s is not a slice", path)
}
elementType := fieldType.Element
for i := 0; i < rv.Len(); i++ {
elemPath := append(path, fmt.Sprintf("[%d]", i)) //nolint:gocritic
elem := rv.Index(i).Interface()
if err := validateValue(elementType, elemPath, elem, sch); err != nil {
return err
}
}
typeMatches = true
case *schema.Map:
rv := reflect.ValueOf(value)
if rv.Kind() != reflect.Map {
return fmt.Errorf("%s is not a map", path)
}
keyType := fieldType.Key
valueType := fieldType.Value
for _, key := range rv.MapKeys() {
elemPath := append(path, fmt.Sprintf("[%q]", key)) //nolint:gocritic
elem := rv.MapIndex(key).Interface()
if err := validateValue(keyType, elemPath, key.Interface(), sch); err != nil {
return err
}
if err := validateValue(valueType, elemPath, elem, sch); err != nil {
return err
}
}
typeMatches = true
case *schema.DataRef:
if valueMap, ok := value.(map[string]any); ok {
if err := validateRequestMap(fieldType, path, valueMap, sch); err != nil {
return err
}
typeMatches = true
}
case *schema.Optional:
if value == nil {
typeMatches = true
} else {
return validateValue(fieldType.Type, path, value, sch)
}

default:
return fmt.Errorf("%s has unsupported type %T", path, fieldType)
}

if !typeMatches {
return fmt.Errorf("%s has wrong type, expected %s found %T", path, fieldType, value)
}
return nil
}
Loading