Skip to content

Commit

Permalink
feat: add auto scaling to ftl serve
Browse files Browse the repository at this point in the history
  • Loading branch information
wesbillman committed Nov 1, 2023
1 parent 9846932 commit bde28d4
Show file tree
Hide file tree
Showing 6 changed files with 203 additions and 65 deletions.
38 changes: 32 additions & 6 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/TBD54566975/ftl/backend/common/sha256"
"github.com/TBD54566975/ftl/backend/common/slices"
"github.com/TBD54566975/ftl/backend/controller/internal/dal"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/console"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -87,7 +88,7 @@ func Start(ctx context.Context, config Config) error {
return errors.WithStack(err)
}

svc, err := New(ctx, dal, config)
svc, err := New(ctx, dal, config, runnerScaling)
if err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -120,12 +121,13 @@ type Service struct {
// Map from endpoint to client.
clients *ttlcache.Cache[string, clients]

routesMu sync.RWMutex
routes map[string][]dal.Route
config Config
routesMu sync.RWMutex
routes map[string][]dal.Route
config Config
runnerScaling scaling.RunnerScaling
}

func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
key := config.Key
if config.Key.ULID() == (ulid.ULID{}) {
key = model.NewControllerKey()
Expand All @@ -138,6 +140,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
clients: ttlcache.New[string, clients](ttlcache.WithTTL[string, clients](time.Minute)),
routes: map[string][]dal.Route{},
config: config,
runnerScaling: runnerScaling,
}

go runWithRetries(ctx, time.Second*1, time.Second*2, svc.syncRoutes)
Expand All @@ -146,6 +149,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
go runWithRetries(ctx, config.RunnerTimeout, time.Second*10, svc.reapStaleRunners)
go runWithRetries(ctx, config.DeploymentReservationTimeout, time.Second*20, svc.releaseExpiredReservations)
go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileDeployments)
go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileRunners)
return svc, nil
}

Expand Down Expand Up @@ -792,6 +796,28 @@ func (s *Service) reconcileDeployments(ctx context.Context) error {
return nil
}

func (s *Service) reconcileRunners(ctx context.Context) error {
log := log.FromContext(ctx)
log.Infof("Reconciling runners")

activeDeployments, err := s.dal.GetActiveDeployments(ctx)
if err != nil {
return errors.Wrap(err, "failed to get deployments needing reconciliation")
}

totalRunners := 0
for _, deployment := range activeDeployments {
totalRunners += deployment.MinReplicas
}

err = s.runnerScaling.SetReplicas(ctx, totalRunners)
if err != nil {
return errors.WithStack(err)
}

return nil
}

func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentName) (bool, error) {
runners, err := s.dal.GetRunnersForDeployment(ctx, key)
if err != nil {
Expand Down
24 changes: 24 additions & 0 deletions backend/controller/scaling/k8s_scaling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package scaling

import (
"context"

"github.com/TBD54566975/ftl/backend/common/model"
)

var _ RunnerScaling = (*K8sScaling)(nil)

type K8sScaling struct {
}

func NewK8sScaling() *K8sScaling {
return &K8sScaling{}
}

func (k *K8sScaling) SetReplicas(ctx context.Context, replicas int) error {
return nil
}

func (*K8sScaling) Remove(ctx context.Context, runner model.RunnerKey) error {
return nil
}
115 changes: 115 additions & 0 deletions backend/controller/scaling/local_scaling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package scaling

import (
"context"
"encoding/binary"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"

"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/runner"
"github.com/alecthomas/errors"
"github.com/alecthomas/kong"
)

var _ RunnerScaling = (*LocalScaling)(nil)

type LocalScaling struct {
cacheDir string
runners []model.RunnerKey

nextBind *url.URL
controllerAddresses []*url.URL
}

func NewLocalScaling(nextBind *url.URL, controllerAddresses []*url.URL) (*LocalScaling, error) {
cacheDir, err := os.UserCacheDir()
if err != nil {
return nil, errors.WithStack(err)
}
return &LocalScaling{
cacheDir: cacheDir,
nextBind: nextBind,
controllerAddresses: controllerAddresses,
}, nil
}

func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int) error {
logger := log.FromContext(ctx)
logger.Infof("SetReplicas: %d", replicas)

replicasToAdd := replicas - len(l.runners)

logger.Infof("Requested replicas %d", replicas)
if replicasToAdd <= 0 {
return nil
}

logger.Infof("Adding %d replicas", replicasToAdd)
for i := 0; i < replicasToAdd; i++ {
i := i

var err error
l.nextBind, err = IncrementPort(l.nextBind)
if err != nil {
return errors.WithStack(err)
}

controllerEndpoint := l.controllerAddresses[i%len(l.controllerAddresses)]
config := runner.Config{
Bind: l.nextBind,
ControllerEndpoint: controllerEndpoint,
}

name := fmt.Sprintf("runner%d", i)
if err := kong.ApplyDefaults(&config, kong.Vars{
"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", name, "deployments"),
"language": "go,kotlin",
}); err != nil {
return errors.WithStack(err)
}

// Create a readable ULID for the runner.
var ulid [16]byte
binary.BigEndian.PutUint32(ulid[10:], uint32(len(l.runners)+1))
ulidStr := fmt.Sprintf("%025X", ulid)
err = config.Key.Scan(ulidStr)
if err != nil {
return errors.WithStack(err)
}

runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name))
l.runners = append(l.runners, config.Key)

go func() {
err := runner.Start(runnerCtx, config)
if err != nil {
logger.Errorf(err, "Error starting runner: %s", err)
}
}()
}

return nil
}

func (l *LocalScaling) Remove(ctx context.Context, runner model.RunnerKey) error {
log := log.FromContext(ctx)
log.Infof("Remove: %s", runner)
return nil
}

func IncrementPort(baseURL *url.URL) (*url.URL, error) {
newURL := *baseURL

newPort, err := strconv.Atoi(newURL.Port())
if err != nil {
return nil, errors.WithStack(err)
}

newURL.Host = fmt.Sprintf("%s:%d", baseURL.Hostname(), newPort+1)
return &newURL, nil
}
12 changes: 12 additions & 0 deletions backend/controller/scaling/scaling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package scaling

import (
"context"

"github.com/TBD54566975/ftl/backend/common/model"
)

type RunnerScaling interface {
SetReplicas(ctx context.Context, replicas int) error
Remove(ctx context.Context, runner model.RunnerKey) error
}
4 changes: 3 additions & 1 deletion cmd/ftl-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/observability"
"github.com/TBD54566975/ftl/backend/controller"
"github.com/TBD54566975/ftl/backend/controller/scaling"
)

var version = "dev"
Expand All @@ -38,6 +39,7 @@ func main() {
ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, cli.LogConfig))
err = observability.Init(ctx, "ftl-controller", version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")
err = controller.Start(ctx, cli.ControllerConfig)

err = controller.Start(ctx, cli.ControllerConfig, scaling.NewK8sScaling())
kctx.FatalIfErrorf(err)
}
75 changes: 17 additions & 58 deletions cmd/ftl/cmd_serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,8 @@ package main

import (
"context"
"encoding/binary"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"

Expand All @@ -18,13 +14,13 @@ import (
"github.com/TBD54566975/ftl/backend/common/exec"
"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/controller"
"github.com/TBD54566975/ftl/backend/runner"
"github.com/TBD54566975/ftl/backend/controller/scaling"
)

type serveCmd struct {
Bind *url.URL `help:"Starting endpoint to bind to and advertise to. Each controller and runner will increment the port by 1" default:"http://localhost:8892"`
Controllers int `short:"c" help:"Number of controllers to start." default:"1"`
Runners int `short:"r" help:"Number of runners to start." default:"10"`
Runners int `short:"r" help:"Number of runners to start." default:"0"`
}

const ftlContainerName = "ftl-db"
Expand All @@ -39,10 +35,22 @@ func (s *serveCmd) Run(ctx context.Context) error {

logger.Infof("Starting %d controller(s) and %d runner(s)", s.Controllers, s.Runners)

if err != nil {
return errors.WithStack(err)
}

wg, ctx := errgroup.WithContext(ctx)

controllerAddresses := make([]*url.URL, 0, s.Controllers)
nextBind := s.Bind
for i := 0; i < s.Controllers; i++ {
controllerAddresses = append(controllerAddresses, nextBind)
}

runnerScaling, err := scaling.NewLocalScaling(controllerAddresses[len(controllerAddresses)-1], controllerAddresses)
if err != nil {
return errors.WithStack(err)
}

for i := 0; i < s.Controllers; i++ {
i := i
Expand All @@ -59,58 +67,21 @@ func (s *serveCmd) Run(ctx context.Context) error {
controllerCtx := log.ContextWithLogger(ctx, logger.Scope(scope))

wg.Go(func() error {
return errors.Wrapf(controller.Start(controllerCtx, config), "controller%d failed", i)
return errors.Wrapf(controller.Start(controllerCtx, config, runnerScaling), "controller%d failed", i)
})

var err error
nextBind, err = incrementPort(nextBind)
nextBind, err = scaling.IncrementPort(nextBind)
if err != nil {
return errors.WithStack(err)
}
}

cacheDir, err := os.UserCacheDir()
err = runnerScaling.SetReplicas(ctx, s.Runners)
if err != nil {
return errors.WithStack(err)
}

for i := 0; i < s.Runners; i++ {
i := i
controllerEndpoint := controllerAddresses[i%len(controllerAddresses)]
config := runner.Config{
Bind: nextBind,
ControllerEndpoint: controllerEndpoint,
}

name := fmt.Sprintf("runner%d", i)
if err := kong.ApplyDefaults(&config, kong.Vars{
"deploymentdir": filepath.Join(cacheDir, "ftl-runner", name, "deployments"),
"language": "go,kotlin",
}); err != nil {
return errors.WithStack(err)
}

// Create a readable ULID for the runner.
var ulid [16]byte
binary.BigEndian.PutUint32(ulid[10:], uint32(i))
ulidStr := fmt.Sprintf("%025X", ulid)
err := config.Key.Scan(ulidStr)
if err != nil {
return errors.WithStack(err)
}

runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name))

wg.Go(func() error {
return errors.Wrapf(runner.Start(runnerCtx, config), "runner%d failed", i)
})

nextBind, err = incrementPort(nextBind)
if err != nil {
return errors.WithStack(err)
}
}

if err := wg.Wait(); err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -182,18 +153,6 @@ func setupDB(ctx context.Context) (string, error) {
return dsn, nil
}

func incrementPort(baseURL *url.URL) (*url.URL, error) {
newURL := *baseURL

newPort, err := strconv.Atoi(newURL.Port())
if err != nil {
return nil, errors.WithStack(err)
}

newURL.Host = fmt.Sprintf("%s:%d", baseURL.Hostname(), newPort+1)
return &newURL, nil
}

func pollContainerHealth(ctx context.Context, containerName string, timeout time.Duration) error {
logger := log.FromContext(ctx)
logger.Infof("Waiting for %s to be healthy", containerName)
Expand Down

0 comments on commit bde28d4

Please sign in to comment.