Skip to content

Commit

Permalink
feat: move to single entry route tables
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Sep 17, 2024
1 parent 4820efc commit 717e7bd
Show file tree
Hide file tree
Showing 13 changed files with 418 additions and 600 deletions.
68 changes: 34 additions & 34 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"fmt"
"hash"
"io"
"math/rand"
"net/http"
"net/url"
"sort"
Expand Down Expand Up @@ -216,7 +215,7 @@ type Service struct {
// Complete schema synchronised from the database.
schema atomic.Value[*schema.Schema]

routes atomic.Value[map[string][]dal.Route]
routes atomic.Value[map[string]Route]
config Config

increaseReplicaFailures map[string]int
Expand Down Expand Up @@ -255,7 +254,7 @@ func New(ctx context.Context, conn *sql.DB, config Config, devel bool, runnerSca
increaseReplicaFailures: map[string]int{},
runnerScaling: runnerScaling,
}
svc.routes.Store(map[string][]dal.Route{})
svc.routes.Store(map[string]Route{})
svc.schema.Store(&schema.Schema{})

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, encryption, conn)
Expand Down Expand Up @@ -377,17 +376,12 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
return nil, fmt.Errorf("could not get status: %w", err)
}
sroutes := s.routes.Load()
routes := slices.FlatMap(maps.Values(sroutes), func(routes []dal.Route) (out []*ftlv1.StatusResponse_Route) {
out = make([]*ftlv1.StatusResponse_Route, len(routes))
for i, route := range routes {
out[i] = &ftlv1.StatusResponse_Route{
Module: route.Module,
Runner: route.Runner.String(),
Deployment: route.Deployment.String(),
Endpoint: route.Endpoint,
}
routes := slices.Map(maps.Values(sroutes), func(route Route) (out *ftlv1.StatusResponse_Route) {
return &ftlv1.StatusResponse_Route{
Module: route.Module,
Deployment: route.Deployment.String(),
Endpoint: route.Endpoint,
}
return out
})
replicas := map[string]int32{}
protoRunners, err := slices.MapErr(status.Runners, func(r dal.Runner) (*ftlv1.StatusResponse_Runner, error) {
Expand Down Expand Up @@ -1013,13 +1007,12 @@ func (s *Service) callWithRequest(
}

module := verbRef.Module
routes, ok := s.routes.Load()[module]
route, ok := s.routes.Load()[module]
if !ok {
observability.Calls.Request(ctx, req.Msg.Verb, start, optional.Some("no routes for module"))
return nil, connect.NewError(connect.CodeNotFound, fmt.Errorf("no routes for module %q", module))
}
route := routes[rand.Intn(len(routes))] //nolint:gosec
client := s.clientsForRunner(route.Runner, route.Endpoint)
client := s.clientsForEndpoint(route.Endpoint)

callers, err := headers.GetCallers(req.Header())
if err != nil {
Expand Down Expand Up @@ -1210,16 +1203,16 @@ func (s *Service) getDeployment(ctx context.Context, key string) (*model.Deploym
return deployment, nil
}

// Return or create the RunnerService and VerbService clients for a Runner.
func (s *Service) clientsForRunner(key model.RunnerKey, endpoint string) clients {
clientItem := s.clients.Get(key.String())
// Return or create the RunnerService and VerbService clients for an endpoint.
func (s *Service) clientsForEndpoint(endpoint string) clients {
clientItem := s.clients.Get(endpoint)
if clientItem != nil {
return clientItem.Value()
}
client := clients{
verb: rpc.Dial(ftlv1connect.NewVerbServiceClient, endpoint, log.Error),
}
s.clients.Set(key.String(), client, time.Minute)
s.clients.Set(endpoint, client, time.Minute)
return client
}

Expand Down Expand Up @@ -1759,26 +1752,23 @@ func (s *Service) getDeploymentLogger(ctx context.Context, deploymentKey model.D
// Periodically sync the routing table from the DB.
func (s *Service) syncRoutes(ctx context.Context) (time.Duration, error) {
logger := log.FromContext(ctx)
routes, err := s.dal.GetRoutingTable(ctx, nil)
deployments, err := s.dal.GetActiveDeployments(ctx)
if errors.Is(err, libdal.ErrNotFound) {
routes = map[string][]dal.Route{}
deployments = []dal.Deployment{}
} else if err != nil {
return 0, err
}
// TODO: This currently keeps a route table entry per runner, even in situations where the load balancing
// is being routed through a service. Long term we don't really want this; however, as this will all need to
// be changed when we get rolling deployments in place, we can leave this for now.
for k, v := range routes {
for i := range v {
deployment, err := s.runnerScaling.GetEndpointForDeployment(k, v[i].Deployment.String())
if err != nil {
logger.Errorf(err, "Failed to get updated endpoint for deployment %s", v[i].Deployment.String())
continue
}
v[i].Endpoint = deployment.String()
newRoutes := map[string]Route{}
for _, v := range deployments {
optUri, err := s.runnerScaling.GetEndpointForDeployment(ctx, v.Module, v.Key.String())
if err != nil {
logger.Errorf(err, "Failed to get updated endpoint for deployment %s", v.Key.String())
continue
} else if uri, ok := optUri.Get(); ok {
newRoutes[v.Module] = Route{Module: v.Module, Deployment: v.Key, Endpoint: uri.String()}
}
}
s.routes.Store(routes)
s.routes.Store(newRoutes)
return time.Second, nil
}

Expand Down Expand Up @@ -1882,3 +1872,13 @@ func makeBackoff(min, max time.Duration) backoff.Backoff {
Factor: 2,
}
}

// Route is the single routing-table entry kept for a module: the deployment
// that currently serves it and the endpoint calls are sent to.
type Route struct {
	// Module is the name of the module this route serves.
	Module string
	// Deployment is the key of the deployment backing the module.
	Deployment model.DeploymentKey
	// Endpoint is the address used to dial the deployment.
	Endpoint string
}

// String renders the route as "<deployment> -> <endpoint>".
//
// The format string previously carried a third "(%s)" verb left over from the
// removed Runner field, which produced "%!s(MISSING)" in the output.
func (r Route) String() string {
	return fmt.Sprintf("%s -> %s", r.Deployment, r.Endpoint)
}
53 changes: 0 additions & 53 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ type Status struct {
Runners []Runner
Deployments []Deployment
IngressRoutes []IngressRouteEntry
Routes []Route
}

// A Reservation of a Runner.
Expand All @@ -160,19 +159,6 @@ type Reservation interface {
Rollback(ctx context.Context) error
}

// Route is one routing-table entry: a runner, the deployment it is attached
// to, and the endpoint it can be reached on, for a given module.
type Route struct {
// Module is the name of the module the route belongs to.
Module string
// Runner is the key of the runner behind this route.
Runner model.RunnerKey
// Deployment is the key of the deployment the runner is serving.
Deployment model.DeploymentKey
// Endpoint is the runner's endpoint address.
Endpoint string
}

// String renders the route as "<deployment> -> <runner> (<endpoint>)".
func (r Route) String() string {
return fmt.Sprintf("%s -> %s (%s)", r.Deployment, r.Runner, r.Endpoint)
}

// notification is an empty marker method.
// NOTE(review): presumably it lets Route satisfy a notification interface
// declared elsewhere in this package — confirm against that interface.
func (r Route) notification() {}

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *DAL {
var d *DAL
d = &DAL{
Expand Down Expand Up @@ -235,10 +221,6 @@ func (d *DAL) GetStatus(ctx context.Context) (Status, error) {
if err != nil {
return Status{}, fmt.Errorf("could not get ingress routes: %w", libdal.TranslatePGError(err))
}
routes, err := d.db.GetRoutingTable(ctx, nil)
if err != nil {
return Status{}, fmt.Errorf("could not get routing table: %w", libdal.TranslatePGError(err))
}
statusDeployments, err := slices.MapErr(deployments, func(in dalsql.GetActiveDeploymentsRow) (Deployment, error) {
labels := model.Labels{}
err = json.Unmarshal(in.Deployment.Labels, &labels)
Expand Down Expand Up @@ -286,14 +268,6 @@ func (d *DAL) GetStatus(ctx context.Context) (Status, error) {
Path: in.Path,
}
}),
Routes: slices.Map(routes, func(row dalsql.GetRoutingTableRow) Route {
return Route{
Module: row.ModuleName.MustGet(),
Runner: row.RunnerKey,
Deployment: row.DeploymentKey.MustGet(),
Endpoint: row.Endpoint,
}
}),
}, nil
}

Expand Down Expand Up @@ -805,33 +779,6 @@ func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error) {
})
}

// GetRoutingTable looks up the runner endpoints for the requested modules.
// Passing an empty modules slice selects the routes of every module.
//
// The result is keyed by module name, each key holding all routes found for
// that module. When the query yields no rows the returned error wraps
// libdal.ErrNotFound.
func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string][]Route, error) {
	rows, err := d.db.GetRoutingTable(ctx, modules)
	if err != nil {
		return nil, libdal.TranslatePGError(err)
	}
	if len(rows) == 0 {
		return nil, fmt.Errorf("no routes found: %w", libdal.ErrNotFound)
	}

	table := make(map[string][]Route, len(rows))
	for _, row := range rows {
		// The query guarantees both optional columns are set; sqlc just
		// cannot infer that, hence the MustGet calls.
		module := row.ModuleName.MustGet()
		entry := Route{
			Module:     module,
			Deployment: row.DeploymentKey.MustGet(),
			Runner:     row.RunnerKey,
			Endpoint:   row.Endpoint,
		}
		table[module] = append(table[module], entry)
	}
	return table, nil
}

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error) {
row, err := d.db.GetRunner(ctx, runnerKey)
if err != nil {
Expand Down
23 changes: 0 additions & 23 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,6 @@ func TestDAL(t *testing.T) {
assert.NoError(t, err)
})

expectedRunner := Runner{
Key: runnerID,
Labels: labels,
Endpoint: "http://localhost:8080",
Deployment: deploymentKey,
}

t.Run("SetDeploymentReplicas", func(t *testing.T) {
err := dal.SetDeploymentReplicas(ctx, deploymentKey, 1)
assert.NoError(t, err)
Expand Down Expand Up @@ -151,17 +144,6 @@ func TestDAL(t *testing.T) {
assert.NoError(t, err)
})

t.Run("GetRoutingTable", func(t *testing.T) {
routes, err := dal.GetRoutingTable(ctx, []string{deployment.Module})
assert.NoError(t, err)
assert.Equal(t, []Route{{
Module: "test",
Runner: expectedRunner.Key,
Deployment: deploymentKey,
Endpoint: expectedRunner.Endpoint,
}}, routes[deployment.Module])
})

t.Run("UpdateRunnerInvalidDeployment", func(t *testing.T) {
err := dal.UpsertRunner(ctx, Runner{
Key: runnerID,
Expand All @@ -173,11 +155,6 @@ func TestDAL(t *testing.T) {
assert.IsError(t, err, libdal.ErrConstraint)
})

t.Run("GetRoutingTable", func(t *testing.T) {
_, err := dal.GetRoutingTable(ctx, []string{"non-existent"})
assert.IsError(t, err, libdal.ErrNotFound)
})

t.Run("DeregisterRunner", func(t *testing.T) {
err = dal.DeregisterRunner(ctx, runnerID)
assert.NoError(t, err)
Expand Down
3 changes: 0 additions & 3 deletions backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 0 additions & 14 deletions backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -171,20 +171,6 @@ FROM runners r
INNER JOIN deployments d on d.id = r.deployment_id
WHERE r.key = sqlc.arg('key')::runner_key;

-- name: GetRoutingTable :many
SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key
FROM runners r
LEFT JOIN deployments d on r.deployment_id = d.id
WHERE (COALESCE(cardinality(sqlc.arg('modules')::TEXT[]), 0) = 0
OR module_name = ANY (sqlc.arg('modules')::TEXT[]));

-- name: GetRouteForRunner :one
-- Retrieve routing information for a runner.
SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key
FROM runners r
LEFT JOIN deployments d on r.deployment_id = d.id
WHERE r.key = sqlc.arg('key')::runner_key;

-- name: GetRunnersForDeployment :many
SELECT *
FROM runners r
Expand Down
70 changes: 0 additions & 70 deletions backend/controller/dal/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 717e7bd

Please sign in to comment.