Skip to content

Commit

Permalink
feat: move ingress to it's own service
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Nov 25, 2024
1 parent 23e0322 commit 4293d18
Show file tree
Hide file tree
Showing 46 changed files with 594 additions and 815 deletions.
106 changes: 5 additions & 101 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
Expand All @@ -42,7 +41,6 @@ import (
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
"github.com/TBD54566975/ftl/backend/controller/dsn"
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/ingress"
"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/leases/dbleaser"
"github.com/TBD54566975/ftl/backend/controller/observability"
Expand All @@ -58,8 +56,6 @@ import (
frontend "github.com/TBD54566975/ftl/frontend/console"
"github.com/TBD54566975/ftl/internal/configuration"
cf "github.com/TBD54566975/ftl/internal/configuration/manager"
"github.com/TBD54566975/ftl/internal/cors"
ftlhttp "github.com/TBD54566975/ftl/internal/http"
"github.com/TBD54566975/ftl/internal/log"
ftlmaps "github.com/TBD54566975/ftl/internal/maps"
"github.com/TBD54566975/ftl/internal/model"
Expand All @@ -75,24 +71,14 @@ import (

// CommonConfig between the production controller and development server.
type CommonConfig struct {
AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
AllowHeaders []string `help:"Allow these headers in CORS requests. (Requires AllowOrigins)" env:"FTL_CONTROLLER_ALLOW_HEADERS"`
NoConsole bool `help:"Disable the console."`
IdleRunners int `help:"Number of idle runners to keep around (not supported in production)." default:"3"`
WaitFor []string `help:"Wait for these modules to be deployed before becoming ready." placeholder:"MODULE"`
CronJobTimeout time.Duration `help:"Timeout for cron jobs." default:"5m"`
}

func (c *CommonConfig) Validate() error {
if len(c.AllowHeaders) > 0 && len(c.AllowOrigins) == 0 {
return fmt.Errorf("AllowOrigins must be set when AllowHeaders is used")
}
return nil
}

type Config struct {
Bind *url.URL `help:"Socket to bind to." default:"http://127.0.0.1:8892" env:"FTL_CONTROLLER_BIND"`
IngressBind *url.URL `help:"Socket to bind to for ingress." default:"http://127.0.0.1:8891" env:"FTL_CONTROLLER_INGRESS_BIND"`
Key model.ControllerKey `help:"Controller key (auto)." placeholder:"KEY"`
DSN string `help:"DAL DSN." default:"${dsn}" env:"FTL_CONTROLLER_DSN"`
Advertise *url.URL `help:"Endpoint the Controller should advertise (must be unique across the cluster, defaults to --bind if omitted)." env:"FTL_CONTROLLER_ADVERTISE"`
Expand Down Expand Up @@ -170,21 +156,7 @@ func Start(
admin := admin.NewAdminService(cm, sm, svc.dal)
console := console.NewService(svc.dal, svc.timeline, admin)

ingressHandler := otelhttp.NewHandler(http.Handler(svc), "ftl.ingress")
if len(config.AllowOrigins) > 0 {
ingressHandler = cors.Middleware(
slices.Map(config.AllowOrigins, func(u *url.URL) string { return u.String() }),
config.AllowHeaders,
ingressHandler,
)
}

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
logger.Infof("HTTP ingress server listening on: %s", config.IngressBind)

return ftlhttp.Serve(ctx, config.IngressBind, ingressHandler)
})

g.Go(func() error {
return rpc.Serve(ctx, config.Bind,
Expand Down Expand Up @@ -370,28 +342,6 @@ func New(
return svc, nil
}

func (s *Service) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
method := strings.ToLower(r.Method)
requestKey := model.NewRequestKey(model.OriginIngress, fmt.Sprintf("%s %s", method, r.URL.Path))

sourceAddress := r.RemoteAddr
err := s.dal.CreateRequest(r.Context(), requestKey, sourceAddress)
if err != nil {
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("failed to create request"))
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
return
}

routes := s.schemaState.Load().httpRoutes[r.Method]
if len(routes) == 0 {
http.NotFound(w, r)
observability.Ingress.Request(r.Context(), r.Method, r.URL.Path, optional.None[*schemapb.Ref](), start, optional.Some("route not found in dal"))
return
}
ingress.Handle(start, s.schemaState.Load().schema, requestKey, routes, w, r, s.timeline, s.callWithRequest)
}

func (s *Service) ProcessList(ctx context.Context, req *connect.Request[ftlv1.ProcessListRequest]) (*connect.Response[ftlv1.ProcessListResponse], error) {
processes, err := s.dal.GetProcessList(ctx)
if err != nil {
Expand Down Expand Up @@ -487,15 +437,7 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
}),
Runners: protoRunners,
Deployments: deployments,
IngressRoutes: slices.Map(status.IngressRoutes, func(r dalmodel.IngressRouteEntry) *ftlv1.StatusResponse_IngressRoute {
return &ftlv1.StatusResponse_IngressRoute{
DeploymentKey: r.Deployment.String(),
Verb: &schemapb.Ref{Module: r.Module, Name: r.Verb},
Method: r.Method,
Path: r.Path,
}
}),
Routes: routes,
Routes: routes,
}
return connect.NewResponse(resp), nil
}
Expand Down Expand Up @@ -1131,9 +1073,7 @@ func (s *Service) CreateDeployment(ctx context.Context, req *connect.Request[ftl
}
}

ingressRoutes := extractIngressRoutingEntries(req.Msg)

dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts, ingressRoutes)
dkey, err := s.dal.CreateDeployment(ctx, ms.Runtime.Language, module, artefacts)
if err != nil {
logger.Errorf(err, "Could not create deployment")
return nil, fmt.Errorf("could not create deployment: %w", err)
Expand Down Expand Up @@ -1763,16 +1703,12 @@ func (s *Service) syncRoutesAndSchema(ctx context.Context) (ret time.Duration, e
}
}

httpRoutes, err := s.dal.GetIngressRoutes(ctx)
if err != nil {
return 0, fmt.Errorf("failed to get ingress routes: %w", err)
}
orderedModules := maps.Values(modulesByName)
sort.SliceStable(orderedModules, func(i, j int) bool {
return orderedModules[i].Name < orderedModules[j].Name
})
combined := &schema.Schema{Modules: orderedModules}
s.schemaState.Store(schemaState{schema: combined, routes: newRoutes, httpRoutes: httpRoutes})
s.schemaState.Store(schemaState{schema: combined, routes: newRoutes})
return time.Second, nil
}

Expand All @@ -1796,37 +1732,6 @@ func (s *Service) reapCallEvents(ctx context.Context) (time.Duration, error) {
return *s.config.EventLogRetention / 20, nil
}

func extractIngressRoutingEntries(req *ftlv1.CreateDeploymentRequest) []dal.IngressRoutingEntry {
var ingressRoutes []dal.IngressRoutingEntry
for _, decl := range req.Schema.Decls {
if verb, ok := decl.Value.(*schemapb.Decl_Verb); ok {
for _, metadata := range verb.Verb.Metadata {
if ingress, ok := metadata.Value.(*schemapb.Metadata_Ingress); ok {
ingressRoutes = append(ingressRoutes, dal.IngressRoutingEntry{
Verb: verb.Verb.Name,
Method: ingress.Ingress.Method,
Path: ingressPathString(ingress.Ingress.Path),
})
}
}
}
}
return ingressRoutes
}

func ingressPathString(path []*schemapb.IngressPathComponent) string {
pathString := make([]string, len(path))
for i, p := range path {
switch p.Value.(type) {
case *schemapb.IngressPathComponent_IngressPathLiteral:
pathString[i] = p.GetIngressPathLiteral().Text
case *schemapb.IngressPathComponent_IngressPathParameter:
pathString[i] = fmt.Sprintf("{%s}", p.GetIngressPathParameter().Name)
}
}
return "/" + strings.Join(pathString, "/")
}

func makeBackoff(min, max time.Duration) backoff.Backoff {
return backoff.Backoff{
Min: min,
Expand Down Expand Up @@ -1861,9 +1766,8 @@ type Route struct {
}

type schemaState struct {
schema *schema.Schema
routes map[string]Route
httpRoutes map[string][]dalmodel.IngressRoute
schema *schema.Schema
routes map[string]Route
}

func (r Route) String() string {
Expand Down
52 changes: 1 addition & 51 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,6 @@ func (d *DAL) GetStatus(ctx context.Context) (dalmodel.Status, error) {
if err != nil {
return dalmodel.Status{}, fmt.Errorf("could not get active deployments: %w", libdal.TranslatePGError(err))
}
ingressRoutes, err := d.db.GetActiveIngressRoutes(ctx)
if err != nil {
return dalmodel.Status{}, fmt.Errorf("could not get ingress routes: %w", libdal.TranslatePGError(err))
}
statusDeployments, err := slices.MapErr(deployments, func(in dalsql.GetActiveDeploymentsRow) (dalmodel.Deployment, error) {
labels := model.Labels{}
err = json.Unmarshal(in.Deployment.Labels, &labels)
Expand Down Expand Up @@ -157,15 +153,6 @@ func (d *DAL) GetStatus(ctx context.Context) (dalmodel.Status, error) {
Controllers: controllers,
Deployments: statusDeployments,
Runners: domainRunners,
IngressRoutes: slices.Map(ingressRoutes, func(in dalsql.GetActiveIngressRoutesRow) dalmodel.IngressRouteEntry {
return dalmodel.IngressRouteEntry{
Deployment: in.DeploymentKey,
Module: in.Module,
Verb: in.Verb,
Method: in.Method,
Path: in.Path,
}
}),
}, nil
}

Expand Down Expand Up @@ -196,17 +183,11 @@ func (d *DAL) UpsertModule(ctx context.Context, language, name string) (err erro
return libdal.TranslatePGError(err)
}

type IngressRoutingEntry struct {
Verb string
Method string
Path string
}

// CreateDeployment (possibly) creates a new deployment and associates
// previously created artefacts with it.
//
// If an existing deployment with identical artefacts exists, it is returned.
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact, ingressRoutes []IngressRoutingEntry) (key model.DeploymentKey, err error) {
func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchema *schema.Module, artefacts []dalmodel.DeploymentArtefact) (key model.DeploymentKey, err error) {
logger := log.FromContext(ctx)

// Start the parent transaction
Expand Down Expand Up @@ -290,19 +271,6 @@ func (d *DAL) CreateDeployment(ctx context.Context, language string, moduleSchem
}
}

for _, ingressRoute := range ingressRoutes {
err = tx.db.CreateIngressRoute(ctx, dalsql.CreateIngressRouteParams{
Key: deploymentKey,
Method: ingressRoute.Method,
Path: ingressRoute.Path,
Module: moduleSchema.Name,
Verb: ingressRoute.Verb,
})
if err != nil {
return model.DeploymentKey{}, fmt.Errorf("failed to create ingress route: %w", libdal.TranslatePGError(err))
}
}

return deploymentKey, nil
}

Expand Down Expand Up @@ -690,24 +658,6 @@ func (d *DAL) CreateRequest(ctx context.Context, key model.RequestKey, addr stri
return nil
}

func (d *DAL) GetIngressRoutes(ctx context.Context) (map[string][]dalmodel.IngressRoute, error) {
routes, err := d.db.GetIngressRoutes(ctx)
if err != nil {
return nil, libdal.TranslatePGError(err)
}
return slices.GroupBy(slices.Map(routes, func(row dalsql.GetIngressRoutesRow) dalmodel.IngressRoute {
return dalmodel.IngressRoute{
Runner: row.RunnerKey,
Deployment: row.DeploymentKey,
Endpoint: row.Endpoint,
Path: row.Path,
Module: row.Module,
Verb: row.Verb,
Method: row.Method,
}
}), func(route dalmodel.IngressRoute) string { return route.Method }), nil
}

func (d *DAL) UpsertController(ctx context.Context, key model.ControllerKey, addr string) (int64, error) {
id, err := d.db.UpsertController(ctx, key, addr)
return id, libdal.TranslatePGError(err)
Expand Down
4 changes: 0 additions & 4 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 0 additions & 17 deletions backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -191,23 +191,6 @@ FROM controllers c
WHERE c.state <> 'dead'
ORDER BY c.key;

-- name: CreateIngressRoute :exec
INSERT INTO ingress_routes (deployment_id, module, verb, method, path)
VALUES ((SELECT id FROM deployments WHERE key = sqlc.arg('key')::deployment_key LIMIT 1), $2, $3, $4, $5);

-- name: GetIngressRoutes :many
-- Get the runner endpoints corresponding to the given ingress route.
SELECT r.key AS runner_key, d.key AS deployment_key, endpoint, ir.path, ir.module, ir.verb, ir.method
FROM ingress_routes ir
INNER JOIN runners r ON ir.deployment_id = r.deployment_id
INNER JOIN deployments d ON ir.deployment_id = d.id;

-- name: GetActiveIngressRoutes :many
SELECT d.key AS deployment_key, ir.module, ir.verb, ir.method, ir.path
FROM ingress_routes ir
INNER JOIN deployments d ON ir.deployment_id = d.id
WHERE d.min_replicas > 0;

-- name: SucceedAsyncCall :one
UPDATE async_calls
SET
Expand Down
Loading

0 comments on commit 4293d18

Please sign in to comment.