Skip to content

Commit

Permalink
feat: keys include host-port-suffix (#1071)
Browse files Browse the repository at this point in the history
Fixes #1047
Fixes #1045

Changes `ControllerKey` and `RunnerKey` to have the format:
- `<r/c>-<hostname>-<port>-<4bytehex>` for passing around
- `<hostname>-<port>-<4bytehex>` in the db
- non-production environments use shorter keys:
`r-<incrementing-digits>` and `<incrementing-digits>` in the db

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
matt2e and github-actions[bot] authored Mar 13, 2024
1 parent a0cedb7 commit e564ab1
Show file tree
Hide file tree
Showing 29 changed files with 239 additions and 192 deletions.
8 changes: 4 additions & 4 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/jellydator/ttlcache/v3"
"github.com/jpillora/backoff"
"github.com/oklog/ulid/v2"
"golang.org/x/exp/maps"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"
Expand Down Expand Up @@ -55,7 +54,7 @@ type Config struct {
ConsoleURL *url.URL `help:"The public URL of the console (for CORS)." env:"FTL_CONTROLLER_CONSOLE_URL"`
AllowOrigins []*url.URL `help:"Allow CORS requests to ingress endpoints from these origins." env:"FTL_CONTROLLER_ALLOW_ORIGIN"`
ContentTime time.Time `help:"Time to use for console resource timestamps." default:"${timestamp=1970-01-01T00:00:00Z}"`
Key model.ControllerKey `help:"Controller key (auto)." placeholder:"C<ULID>" default:"C00000000000000000000000000"`
Key model.ControllerKey `help:"Controller key (auto)."`
DSN string `help:"DAL DSN." default:"postgres://localhost:54320/ftl?sslmode=disable&user=postgres&password=secret" env:"FTL_CONTROLLER_DSN"`
RunnerTimeout time.Duration `help:"Runner heartbeat timeout." default:"10s"`
DeploymentReservationTimeout time.Duration `help:"Deployment reservation timeout." default:"120s"`
Expand Down Expand Up @@ -152,9 +151,10 @@ type Service struct {
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
var zero model.ControllerKey
key := config.Key
if config.Key.ULID() == (ulid.ULID{}) {
key = model.NewControllerKey()
if config.Key == zero {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
}
config.SetDefaults()
svc := &Service{
Expand Down
37 changes: 20 additions & 17 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,9 @@ func runnerFromDB(row sql.GetRunnerRow) Runner {
if err := json.Unmarshal(row.Labels, &attrs); err != nil {
return Runner{}
}

return Runner{
Key: model.RunnerKey(row.RunnerKey),
Key: row.RunnerKey,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Deployment: deployment,
Expand Down Expand Up @@ -308,8 +309,9 @@ func (d *DAL) GetStatus(
if err := json.Unmarshal(in.Labels, &attrs); err != nil {
return Runner{}, fmt.Errorf("invalid attributes JSON for runner %s: %w", in.RunnerKey, err)
}

return Runner{
Key: model.RunnerKey(in.RunnerKey),
Key: in.RunnerKey,
Endpoint: in.Endpoint,
State: RunnerState(in.State),
Deployment: deployment,
Expand All @@ -335,7 +337,7 @@ func (d *DAL) GetStatus(
Routes: slices.Map(routes, func(row sql.GetRoutingTableRow) Route {
return Route{
Module: row.ModuleName.MustGet(),
Runner: model.RunnerKey(row.RunnerKey),
Runner: row.RunnerKey,
Deployment: row.DeploymentName,
Endpoint: row.Endpoint,
}
Expand All @@ -354,8 +356,9 @@ func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.Depl
if err := json.Unmarshal(row.Labels, &attrs); err != nil {
return nil, fmt.Errorf("invalid attributes JSON for runner %d: %w", row.ID, err)
}

runners = append(runners, Runner{
Key: model.RunnerKey(row.Key),
Key: row.Key,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Deployment: optional.Some(deployment),
Expand Down Expand Up @@ -502,7 +505,7 @@ func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error {
return fmt.Errorf("%s: %w", "failed to JSON encode runner labels", err)
}
deploymentID, err := d.db.UpsertRunner(ctx, sql.UpsertRunnerParams{
Key: sql.Key(runner.Key),
Key: runner.Key,
Endpoint: runner.Endpoint,
State: sql.RunnerState(runner.State),
DeploymentName: pgDeploymentName,
Expand All @@ -511,9 +514,6 @@ func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error {
if err != nil {
return translatePGError(err)
}
if err != nil {
return translatePGError(err)
}
if runner.Deployment.Ok() && !deploymentID.Ok() {
return fmt.Errorf("deployment %s not found", runner.Deployment)
}
Expand All @@ -534,7 +534,7 @@ func (d *DAL) KillStaleControllers(ctx context.Context, age time.Duration) (int6

// DeregisterRunner deregisters the given runner.
func (d *DAL) DeregisterRunner(ctx context.Context, key model.RunnerKey) error {
count, err := d.db.DeregisterRunner(ctx, sql.Key(key))
count, err := d.db.DeregisterRunner(ctx, key)
if err != nil {
return translatePGError(err)
}
Expand Down Expand Up @@ -574,11 +574,12 @@ func (d *DAL) ReserveRunnerForDeployment(ctx context.Context, deployment model.D
cancel()
return nil, fmt.Errorf("failed to JSON decode labels for runner %d: %w", runner.ID, err)
}

return &postgresClaim{
cancel: cancel,
tx: tx,
runner: Runner{
Key: model.RunnerKey(runner.Key),
Key: runner.Key,
Endpoint: runner.Endpoint,
State: RunnerState(runner.State),
Deployment: optional.Some(deployment),
Expand Down Expand Up @@ -628,7 +629,7 @@ func (d *DAL) SetDeploymentReplicas(ctx context.Context, key model.DeploymentNam
}

err = tx.InsertDeploymentUpdatedEvent(ctx, sql.InsertDeploymentUpdatedEventParams{
DeploymentName: key.String(),
DeploymentName: string(key),
MinReplicas: int32(minReplicas),
PrevMinReplicas: deployment.MinReplicas,
})
Expand Down Expand Up @@ -766,8 +767,9 @@ func (d *DAL) GetProcessList(ctx context.Context) ([]Process, error) {
if err := json.Unmarshal(row.RunnerLabels, &labels); err != nil {
return Process{}, fmt.Errorf("invalid labels JSON for runner %s: %w", row.RunnerKey, err)
}

runner = optional.Some(ProcessRunner{
Key: model.RunnerKey(row.RunnerKey.MustGet()),
Key: row.RunnerKey.MustGet(),
Endpoint: endpoint,
Labels: labels,
})
Expand Down Expand Up @@ -812,8 +814,9 @@ func (d *DAL) GetIdleRunners(ctx context.Context, limit int, labels model.Labels
if err != nil {
return Runner{}, fmt.Errorf("%s: %w", "could not unmarshal labels", err)
}

return Runner{
Key: model.RunnerKey(row.Key),
Key: row.Key,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Labels: labels,
Expand All @@ -840,23 +843,23 @@ func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string
out[moduleName] = append(out[moduleName], Route{
Module: moduleName,
Deployment: route.DeploymentName,
Runner: model.RunnerKey(route.RunnerKey),
Runner: route.RunnerKey,
Endpoint: route.Endpoint,
})
}
return out, nil
}

func (d *DAL) GetRunnerState(ctx context.Context, runnerKey model.RunnerKey) (RunnerState, error) {
state, err := d.db.GetRunnerState(ctx, sql.Key(runnerKey))
state, err := d.db.GetRunnerState(ctx, runnerKey)
if err != nil {
return "", translatePGError(err)
}
return RunnerState(state), nil
}

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error) {
row, err := d.db.GetRunner(ctx, sql.Key(runnerKey))
row, err := d.db.GetRunner(ctx, runnerKey)
if err != nil {
return Runner{}, translatePGError(err)
}
Expand Down Expand Up @@ -927,7 +930,7 @@ func (d *DAL) GetIngressRoutes(ctx context.Context, method string) ([]IngressRou
}
return slices.Map(routes, func(row sql.GetIngressRoutesRow) IngressRoute {
return IngressRoute{
Runner: model.RunnerKey(row.RunnerKey),
Runner: row.RunnerKey,
Deployment: row.DeploymentName,
Endpoint: row.Endpoint,
Path: row.Path,
Expand Down
6 changes: 3 additions & 3 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestDAL(t *testing.T) {
assert.Equal(t, []sha256.SHA256{misshingSHA}, missing)
})

runnerID := model.NewRunnerKey()
runnerID := model.NewRunnerKey("localhost", "8080")
labels := map[string]any{"languages": []any{"go"}}

t.Run("RegisterRunner", func(t *testing.T) {
Expand All @@ -101,7 +101,7 @@ func TestDAL(t *testing.T) {

t.Run("RegisterRunnerFailsOnDuplicate", func(t *testing.T) {
err = dal.UpsertRunner(ctx, Runner{
Key: model.NewRunnerKey(),
Key: model.NewRunnerKey("localhost", "8080"),
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateIdle,
Expand Down Expand Up @@ -333,7 +333,7 @@ func TestDAL(t *testing.T) {
})

t.Run("DeregisterRunnerFailsOnMissing", func(t *testing.T) {
err = dal.DeregisterRunner(ctx, model.NewRunnerKey())
err = dal.DeregisterRunner(ctx, model.NewRunnerKey("localhost", "8080"))
assert.IsError(t, err, ErrNotFound)
})
}
Expand Down
20 changes: 13 additions & 7 deletions backend/controller/scaling/localscaling/local_scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type LocalScaling struct {

portAllocator *bind.BindAllocator
controllerAddresses []*url.URL

prevRunnerSuffix int
}

func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*url.URL) (*LocalScaling, error) {
Expand All @@ -40,6 +42,7 @@ func NewLocalScaling(portAllocator *bind.BindAllocator, controllerAddresses []*u
runners: map[model.RunnerKey]context.CancelFunc{},
portAllocator: portAllocator,
controllerAddresses: controllerAddresses,
prevRunnerSuffix: -1,
}, nil
}

Expand Down Expand Up @@ -72,25 +75,28 @@ func (l *LocalScaling) SetReplicas(ctx context.Context, replicas int, idleRunner

logger.Debugf("Adding %d replicas", replicasToAdd)
for i := 0; i < replicasToAdd; i++ {
i := i

controllerEndpoint := l.controllerAddresses[len(l.runners)%len(l.controllerAddresses)]

bind := l.portAllocator.Next()
keySuffix := l.prevRunnerSuffix + 1
l.prevRunnerSuffix = keySuffix

config := runner.Config{
Bind: l.portAllocator.Next(),
Bind: bind,
ControllerEndpoint: controllerEndpoint,
TemplateDir: templateDir(ctx),
Key: model.NewRunnerKey(),
Key: model.NewLocalRunnerKey(keySuffix),
}

name := fmt.Sprintf("runner%d", i)
simpleName := fmt.Sprintf("runner%d", keySuffix)
if err := kong.ApplyDefaults(&config, kong.Vars{
"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", name, "deployments"),
"deploymentdir": filepath.Join(l.cacheDir, "ftl-runner", simpleName, "deployments"),
"language": "go,kotlin",
}); err != nil {
return err
}

runnerCtx := log.ContextWithLogger(ctx, logger.Scope(name))
runnerCtx := log.ContextWithLogger(ctx, logger.Scope(simpleName))

runnerCtx, cancel := context.WithCancel(runnerCtx)
l.runners[config.Key] = cancel
Expand Down
8 changes: 4 additions & 4 deletions backend/controller/scheduledtask/scheduledtask_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ func TestCron(t *testing.T) {
}

controllers := []*controller{
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey()}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8080")}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8081")}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8082")}},
{controller: dal.Controller{Key: model.NewControllerKey("localhost", "8083")}},
}

clock := clock.NewMock()
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions backend/controller/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions backend/controller/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ WITH matches AS (
UPDATE runners
SET state = 'dead',
deployment_id = NULL
WHERE key = $1
WHERE key = sqlc.arg('key')::runner_key
RETURNING 1)
SELECT COUNT(*)
FROM matches;
Expand Down Expand Up @@ -222,7 +222,7 @@ RETURNING runners.*;
-- name: GetRunnerState :one
SELECT state
FROM runners
WHERE key = $1;
WHERE key = sqlc.arg('key')::runner_key;

-- name: GetRunner :one
SELECT DISTINCT ON (r.key) r.key AS runner_key,
Expand All @@ -236,7 +236,7 @@ SELECT DISTINCT ON (r.key) r.key AS runner_key
THEN d.name END, NULL) AS deployment_name
FROM runners r
LEFT JOIN deployments d on d.id = r.deployment_id OR r.deployment_id IS NULL
WHERE r.key = $1;
WHERE r.key = sqlc.arg('key')::runner_key;

-- name: GetRoutingTable :many
SELECT endpoint, r.key AS runner_key, r.module_name, d.name deployment_name
Expand All @@ -251,7 +251,7 @@ WHERE state = 'assigned'
SELECT endpoint, r.key AS runner_key, r.module_name, d.name deployment_name, r.state
FROM runners r
LEFT JOIN deployments d on r.deployment_id = d.id
WHERE r.key = $1;
WHERE r.key = sqlc.arg('key')::runner_key;

-- name: GetRunnersForDeployment :many
SELECT *
Expand Down
Loading

0 comments on commit e564ab1

Please sign in to comment.