Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add auto scaling to ftl serve #533

Merged
merged 2 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions backend/common/bind/bind_allocator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package bind

import (
"fmt"
"net"
"net/url"
"strconv"

"github.com/alecthomas/atomic"
"github.com/alecthomas/errors"
)

// BindAllocator hands out bind URLs that share a base URL's scheme and
// hostname but each carry a different TCP port. See Next.
type BindAllocator struct {
// baseURL is the template copied for every URL returned by Next; only the
// port part of its Host is replaced.
baseURL *url.URL
// port holds the most recently tried port. It starts one below the base
// URL's port so that the first call to Next tries the base port itself.
port atomic.Int32
}

// NewBindAllocator builds a BindAllocator whose first allocated port is the
// port found in url.Host.
//
// It returns an error if url.Host cannot be split into host and port, or if
// the port is not a decimal integer.
func NewBindAllocator(url *url.URL) (*BindAllocator, error) {
	_, rawPort, splitErr := net.SplitHostPort(url.Host)
	if splitErr != nil {
		return nil, errors.WithStack(splitErr)
	}

	basePort, convErr := strconv.Atoi(rawPort)
	if convErr != nil {
		return nil, errors.WithStack(convErr)
	}

	// Start one below the base port: Next increments before it probes.
	allocator := &BindAllocator{baseURL: url}
	allocator.port = atomic.NewInt32(int32(basePort) - 1) //nolint:gosec
	return allocator, nil
}

// Next returns a copy of the base URL whose port is the next TCP port that
// could be bound on the base URL's hostname.
//
// Each candidate port is probed by briefly opening and closing a TCP listener
// on it. The port is not reserved: another process may still grab it between
// this call returning and the caller binding to it.
//
// NOTE(review): the loop has no upper bound, so if no port can be bound (for
// example once the counter passes 65535) this call does not return.
func (b *BindAllocator) Next() *url.URL {
	// The hostname and its parsed IP do not change between attempts.
	host := b.baseURL.Hostname()
	ip := net.ParseIP(host)

	for {
		// Use the value returned by Add rather than a separate Load: with
		// concurrent callers a later Load may observe another caller's
		// increment, so the probed port and the returned port could differ or
		// two callers could be handed the same port.
		port := int(b.port.Add(1))

		l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: ip, Port: port})
		if err != nil {
			continue
		}
		// Best-effort close: the listener only existed to probe the port.
		_ = l.Close()

		newURL := *b.baseURL
		newURL.Host = net.JoinHostPort(host, fmt.Sprintf("%d", port))
		return &newURL
	}
}
44 changes: 38 additions & 6 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/TBD54566975/ftl/backend/common/sha256"
"github.com/TBD54566975/ftl/backend/common/slices"
"github.com/TBD54566975/ftl/backend/controller/internal/dal"
"github.com/TBD54566975/ftl/backend/controller/scaling"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/console"
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
Expand Down Expand Up @@ -66,7 +67,7 @@ func (c *Config) SetDefaults() {
}

// Start the Controller. Blocks until the context is cancelled.
func Start(ctx context.Context, config Config) error {
func Start(ctx context.Context, config Config, runnerScaling scaling.RunnerScaling) error {
config.SetDefaults()

logger := log.FromContext(ctx)
Expand All @@ -87,7 +88,7 @@ func Start(ctx context.Context, config Config) error {
return errors.WithStack(err)
}

svc, err := New(ctx, dal, config)
svc, err := New(ctx, dal, config, runnerScaling)
if err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -120,12 +121,13 @@ type Service struct {
// Map from endpoint to client.
clients *ttlcache.Cache[string, clients]

routesMu sync.RWMutex
routes map[string][]dal.Route
config Config
routesMu sync.RWMutex
routes map[string][]dal.Route
config Config
runnerScaling scaling.RunnerScaling
}

func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
key := config.Key
if config.Key.ULID() == (ulid.ULID{}) {
key = model.NewControllerKey()
Expand All @@ -138,6 +140,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
clients: ttlcache.New[string, clients](ttlcache.WithTTL[string, clients](time.Minute)),
routes: map[string][]dal.Route{},
config: config,
runnerScaling: runnerScaling,
}

go runWithRetries(ctx, time.Second*1, time.Second*2, svc.syncRoutes)
Expand All @@ -146,6 +149,7 @@ func New(ctx context.Context, db *dal.DAL, config Config) (*Service, error) {
go runWithRetries(ctx, config.RunnerTimeout, time.Second*10, svc.reapStaleRunners)
go runWithRetries(ctx, config.DeploymentReservationTimeout, time.Second*20, svc.releaseExpiredReservations)
go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileDeployments)
go runWithRetries(ctx, time.Second*1, time.Second*5, svc.reconcileRunners)
return svc, nil
}

Expand Down Expand Up @@ -792,6 +796,34 @@ func (s *Service) reconcileDeployments(ctx context.Context) error {
return nil
}

// reconcileRunners tells the configured RunnerScaling how many runners are
// wanted in total.
//
// The target is the sum of MinReplicas over all active deployments. The keys
// of currently idle runners are passed along so the scaling implementation
// can choose which runners to terminate when scaling down.
func (s *Service) reconcileRunners(ctx context.Context) error {
	// Upper bound on the number of idle runners fetched per reconciliation.
	const maxIdleRunners = 16

	activeDeployments, err := s.dal.GetActiveDeployments(ctx)
	if err != nil {
		return errors.Wrap(err, "failed to get active deployments")
	}

	totalRunners := 0
	for _, deployment := range activeDeployments {
		totalRunners += deployment.MinReplicas
	}

	// It's possible that idle runners will get terminated here, but they will
	// get recreated in the next reconciliation cycle.
	idleRunners, err := s.dal.GetIdleRunners(ctx, maxIdleRunners, model.Labels{})
	if err != nil {
		return errors.WithStack(err)
	}

	idleRunnerKeys := slices.Map(idleRunners, func(r dal.Runner) model.RunnerKey { return r.Key })

	if err := s.runnerScaling.SetReplicas(ctx, totalRunners, idleRunnerKeys); err != nil {
		return errors.WithStack(err)
	}

	return nil
}

func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentName) (bool, error) {
runners, err := s.dal.GetRunnersForDeployment(ctx, key)
if err != nil {
Expand Down
20 changes: 20 additions & 0 deletions backend/controller/scaling/k8s_scaling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package scaling

import (
"context"

"github.com/TBD54566975/ftl/backend/common/model"
)

// Compile-time check that *K8sScaling satisfies RunnerScaling.
var _ RunnerScaling = (*K8sScaling)(nil)

// K8sScaling is a RunnerScaling implementation with no state. Its SetReplicas
// method does nothing.
type K8sScaling struct {
}

// NewK8sScaling returns a new, empty K8sScaling.
func NewK8sScaling() *K8sScaling {
	return new(K8sScaling)
}

// SetReplicas implements RunnerScaling. It ignores all of its arguments and
// always returns nil.
// NOTE(review): presumably runner scaling is left to Kubernetes itself in this
// mode — confirm.
func (k *K8sScaling) SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error {
return nil
}
130 changes: 130 additions & 0 deletions backend/controller/scaling/local_scaling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package scaling

import (
"context"
"encoding/binary"
"fmt"
"net/url"
"os"
"path/filepath"
"sync"

"github.com/alecthomas/errors"
"github.com/alecthomas/kong"

"github.com/TBD54566975/ftl/backend/common/bind"
"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/model"
"github.com/TBD54566975/ftl/backend/runner"
)

// Compile-time check that *LocalScaling satisfies RunnerScaling.
var _ RunnerScaling = (*LocalScaling)(nil)

// LocalScaling is a RunnerScaling implementation that starts runners as
// goroutines inside the current process and stops them by cancelling their
// contexts.
type LocalScaling struct {
// lock is held by SetReplicas for its whole duration, which covers every
// access to runners.
lock sync.Mutex
// cacheDir is the user cache directory under which each runner's
// deployment directory is placed.
cacheDir string
// runners maps the key of every runner started here to the function that
// cancels its context.
runners map[model.RunnerKey]context.CancelFunc

// portAllocator supplies the bind URL for each new runner.
portAllocator *bind.BindAllocator
// controllerAddresses are the controller endpoints handed to new runners,
// selected by the current runner count modulo the number of addresses.
controllerAddresses []*url.URL
}

// NewLocalScaling creates a LocalScaling with no runners. New runners will
// take their bind URLs from portAllocator and their controller endpoint from
// controllerAddresses.
//
// It returns an error if the user cache directory cannot be determined.
func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL) (*LocalScaling, error) {
	userCache, err := os.UserCacheDir()
	if err != nil {
		return nil, errors.WithStack(err)
	}

	// The mutex needs no explicit initialisation; its zero value is unlocked.
	ls := &LocalScaling{
		cacheDir:            userCache,
		runners:             make(map[model.RunnerKey]context.CancelFunc),
		portAllocator:       portAllocator,
		controllerAddresses: controllerAddresses,
	}
	return ls, nil
}

// SetReplicas implements RunnerScaling by starting or stopping in-process
// runners until the number tracked matches replicas.
//
// Scaling down only ever stops runners listed in idleRunners, taking them from
// the end of the slice. If there are fewer idle runners than the excess, the
// remaining runners are left running. An idle runner that was not started by
// this LocalScaling makes the call fail.
//
// Scaling up starts each new runner in its own goroutine with a context
// derived from ctx. Every new runner is given the lowest slot number not used
// by a tracked runner; the slot determines both its name (and so its
// deployment directory) and its key.
func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error {
	l.lock.Lock()
	defer l.lock.Unlock()

	logger := log.FromContext(ctx)

	replicasToAdd := replicas - len(l.runners)

	if replicasToAdd <= 0 {
		replicasToRemove := -replicasToAdd

		for i := 0; i < replicasToRemove; i++ {
			if len(idleRunners) == 0 {
				return nil
			}
			runnerToRemove := idleRunners[len(idleRunners)-1]
			idleRunners = idleRunners[:len(idleRunners)-1]

			if err := l.remove(ctx, runnerToRemove); err != nil {
				return errors.WithStack(err)
			}
		}

		return nil
	}

	// Without this guard the modulo below would panic with a division by zero.
	if len(l.controllerAddresses) == 0 {
		return errors.Errorf("no controller addresses configured for local runners")
	}

	logger.Infof("Adding %d replicas", replicasToAdd)
	for added := 0; added < replicasToAdd; added++ {
		// Choose the slot from the set of tracked runners rather than from
		// the loop index or len(l.runners): after a removal those would reuse
		// the name or key of a runner that is still tracked, overwriting its
		// cancel function.
		slot, key, err := l.freeRunnerSlot()
		if err != nil {
			return err
		}

		controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]
		config := runner.Config{
			Bind:               l.portAllocator.Next(),
			ControllerEndpoint: controllerEndpoint,
		}

		name := fmt.Sprintf("runner%d", slot)
		if err := kong.ApplyDefaults(&config, kong.Vars{
			"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", name, "deployments"),
			"language":      "go,kotlin",
		}); err != nil {
			return errors.WithStack(err)
		}

		// Set the key after defaults have been applied, as before.
		config.Key = key

		runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name))

		runnerCtx, cancel := context.WithCancel(runnerCtx)
		l.runners[config.Key] = cancel

		go func() {
			logger.Infof("Starting runner: %s", config.Key)
			err := runner.Start(runnerCtx, config)
			if err != nil {
				logger.Errorf(err, "Error starting runner: %s", err)
			}
		}()
	}

	return nil
}

// freeRunnerSlot returns the lowest zero-based slot whose runner key is not in
// l.runners, together with that key. The caller must hold l.lock.
func (l *LocalScaling) freeRunnerSlot() (int, model.RunnerKey, error) {
	for slot := 0; ; slot++ {
		// Create a readable ULID for the runner; slot 0 maps to runner number 1.
		var ulid [16]byte
		binary.BigEndian.PutUint32(ulid[10:], uint32(slot+1))

		var key model.RunnerKey
		if err := key.Scan(fmt.Sprintf("%025X", ulid)); err != nil {
			return 0, key, errors.WithStack(err)
		}
		if _, taken := l.runners[key]; !taken {
			return slot, key, nil
		}
	}
}

func (l *LocalScaling) remove(ctx context.Context, runner model.RunnerKey) error {
log := log.FromContext(ctx)
alecthomas marked this conversation as resolved.
Show resolved Hide resolved
log.Infof("Removing runner: %s", runner)

cancel, ok := l.runners[runner]
if !ok {
return errors.Errorf("runner %s not found", runner)
}

cancel()
delete(l.runners, runner)

return nil
}
11 changes: 11 additions & 0 deletions backend/controller/scaling/scaling.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package scaling

import (
"context"

"github.com/TBD54566975/ftl/backend/common/model"
)

// RunnerScaling adjusts how many runners are running.
type RunnerScaling interface {
// SetReplicas requests that replicas runners exist in total. idleRunners
// lists the keys of runners that are currently idle.
// NOTE(review): implementations appear free to treat idleRunners as the
// only candidates for termination, or to ignore the request entirely —
// confirm the intended contract.
SetReplicas(ctx context.Context, replicas int, idleRunners []model.RunnerKey) error
}
4 changes: 3 additions & 1 deletion cmd/ftl-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/TBD54566975/ftl/backend/common/log"
"github.com/TBD54566975/ftl/backend/common/observability"
"github.com/TBD54566975/ftl/backend/controller"
"github.com/TBD54566975/ftl/backend/controller/scaling"
)

var version = "dev"
Expand All @@ -38,6 +39,7 @@ func main() {
ctx := log.ContextWithLogger(context.Background(), log.Configure(os.Stderr, cli.LogConfig))
err = observability.Init(ctx, "ftl-controller", version, cli.ObservabilityConfig)
kctx.FatalIfErrorf(err, "failed to initialize observability")
err = controller.Start(ctx, cli.ControllerConfig)

err = controller.Start(ctx, cli.ControllerConfig, scaling.NewK8sScaling())
kctx.FatalIfErrorf(err)
}
Loading