Skip to content

Commit

Permalink
feat: remove runner state (#2686)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas authored Sep 16, 2024
1 parent 9a33881 commit 1dc1678
Show file tree
Hide file tree
Showing 15 changed files with 740 additions and 1,068 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ jobs:
run: just lint-scripts
proto-breaking:
name: Proto Breaking Change Check
# if: ${{ github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'skip-proto-breaking') }}
if: ${{ github.event_name == 'pull_request' && !contains(github.event.pull_request.labels.*.name, 'skip-proto-breaking') }}
runs-on: ubuntu-latest
steps:
- name: Checkout code
Expand Down
2 changes: 0 additions & 2 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,6 @@ func (s *Service) Status(ctx context.Context, req *connect.Request[ftlv1.StatusR
return &ftlv1.StatusResponse_Runner{
Key: r.Key.String(),
Endpoint: r.Endpoint,
State: r.State.ToProto(),
Deployment: deployment,
Labels: labels,
}, nil
Expand Down Expand Up @@ -589,7 +588,6 @@ func (s *Service) RegisterRunner(ctx context.Context, stream *connect.ClientStre
err = s.dal.UpsertRunner(ctx, dal.Runner{
Key: runnerKey,
Endpoint: msg.Endpoint,
State: dal.RunnerStateFromProto(msg.State),
Deployment: deploymentKey,
Labels: msg.Labels.AsMap(),
})
Expand Down
40 changes: 0 additions & 40 deletions backend/controller/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ func runnerFromDB(row dalsql.GetRunnerRow) Runner {
return Runner{
Key: row.RunnerKey,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Deployment: row.DeploymentKey,
Labels: attrs,
}
Expand All @@ -92,7 +91,6 @@ func runnerFromDB(row dalsql.GetRunnerRow) Runner {
type Runner struct {
Key model.RunnerKey
Endpoint string
State RunnerState
ReservationTimeout optional.Option[time.Duration]
Module optional.Option[string]
Deployment model.DeploymentKey
Expand All @@ -110,23 +108,6 @@ type Reconciliation struct {
RequiredReplicas int
}

type RunnerState string

// Runner states.
const (
RunnerStateNew = RunnerState(dalsql.RunnerStateNew)
RunnerStateAssigned = RunnerState(dalsql.RunnerStateAssigned)
RunnerStateDead = RunnerState(dalsql.RunnerStateDead)
)

func RunnerStateFromProto(state ftlv1.RunnerState) RunnerState {
return RunnerState(strings.ToLower(strings.TrimPrefix(state.String(), "RUNNER_")))
}

func (s RunnerState) ToProto() ftlv1.RunnerState {
return ftlv1.RunnerState(ftlv1.RunnerState_value["RUNNER_"+strings.ToUpper(string(s))])
}

type ControllerState string

// Controller states.
Expand Down Expand Up @@ -192,16 +173,6 @@ func (r Route) String() string {

func (r Route) notification() {}

func WithReservation(ctx context.Context, reservation Reservation, fn func() error) error {
if err := fn(); err != nil {
if rerr := reservation.Rollback(ctx); rerr != nil {
err = errors.Join(err, rerr)
}
return err
}
return reservation.Commit(ctx)
}

func New(ctx context.Context, conn libdal.Connection, encryption *encryption.Service) *DAL {
var d *DAL
d = &DAL{
Expand Down Expand Up @@ -295,7 +266,6 @@ func (d *DAL) GetStatus(ctx context.Context) (Status, error) {
return Runner{
Key: in.RunnerKey,
Endpoint: in.Endpoint,
State: RunnerState(in.State),
Deployment: in.DeploymentKey,
Labels: attrs,
}, nil
Expand Down Expand Up @@ -342,7 +312,6 @@ func (d *DAL) GetRunnersForDeployment(ctx context.Context, deployment model.Depl
runners = append(runners, Runner{
Key: row.Key,
Endpoint: row.Endpoint,
State: RunnerState(row.State),
Deployment: deployment,
Labels: attrs,
})
Expand Down Expand Up @@ -519,7 +488,6 @@ func (d *DAL) UpsertRunner(ctx context.Context, runner Runner) error {
deploymentID, err := d.db.UpsertRunner(ctx, dalsql.UpsertRunnerParams{
Key: runner.Key,
Endpoint: runner.Endpoint,
State: dalsql.RunnerState(runner.State),
DeploymentKey: runner.Deployment,
Labels: attrBytes,
})
Expand Down Expand Up @@ -864,14 +832,6 @@ func (d *DAL) GetRoutingTable(ctx context.Context, modules []string) (map[string
return out, nil
}

func (d *DAL) GetRunnerState(ctx context.Context, runnerKey model.RunnerKey) (RunnerState, error) {
state, err := d.db.GetRunnerState(ctx, runnerKey)
if err != nil {
return "", libdal.TranslatePGError(err)
}
return RunnerState(state), nil
}

func (d *DAL) GetRunner(ctx context.Context, runnerKey model.RunnerKey) (Runner, error) {
row, err := d.db.GetRunner(ctx, runnerKey)
if err != nil {
Expand Down
22 changes: 0 additions & 22 deletions backend/controller/dal/dal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/TBD54566975/ftl/backend/controller/encryption"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/backend/libdal"
ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/backend/schema"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/model"
Expand Down Expand Up @@ -108,24 +107,11 @@ func TestDAL(t *testing.T) {
Key: runnerID,
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateNew,
Deployment: deploymentKey,
})
assert.NoError(t, err)
})

t.Run("RegisterRunnerFailsOnDuplicate", func(t *testing.T) {
err = dal.UpsertRunner(ctx, Runner{
Key: model.NewRunnerKey("localhost", "8080"),
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateNew,
Deployment: deploymentKey,
})
assert.Error(t, err)
assert.IsError(t, err, libdal.ErrConflict)
})

expectedRunner := Runner{
Key: runnerID,
Labels: labels,
Expand All @@ -143,7 +129,6 @@ func TestDAL(t *testing.T) {
Key: runnerID,
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateAssigned,
Deployment: deploymentKey,
})
assert.NoError(t, err)
Expand All @@ -156,7 +141,6 @@ func TestDAL(t *testing.T) {
Key: runnerID,
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateAssigned,
Deployment: deploymentKey,
}}, runners)
})
Expand All @@ -183,7 +167,6 @@ func TestDAL(t *testing.T) {
Key: runnerID,
Labels: labels,
Endpoint: "http://localhost:8080",
State: RunnerStateAssigned,
Deployment: model.NewDeploymentKey("test"),
})
assert.Error(t, err)
Expand Down Expand Up @@ -273,8 +256,3 @@ func artefactContent(t testing.TB, artefacts []*model.Artefact) [][]byte {
}
return result
}

func TestRunnerStateFromProto(t *testing.T) {
state := ftlv1.RunnerState_RUNNER_NEW
assert.Equal(t, RunnerStateNew, RunnerStateFromProto(state))
}
44 changes: 0 additions & 44 deletions backend/controller/dal/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion backend/controller/dal/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 14 additions & 29 deletions backend/controller/dal/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -75,55 +75,49 @@ WITH deployment_rel AS (
WHERE d.key = sqlc.arg('deployment_key')::deployment_key
LIMIT 1)
INSERT
INTO runners (key, endpoint, state, labels, deployment_id, last_seen)
INTO runners (key, endpoint, labels, deployment_id, last_seen)
VALUES ($1,
$2,
$3,
$4,
(SELECT id FROM deployment_rel),
NOW() AT TIME ZONE 'utc')
ON CONFLICT (key) DO UPDATE SET endpoint = $2,
state = $3,
labels = $4,
labels = $3,
last_seen = NOW() AT TIME ZONE 'utc'
RETURNING deployment_id;

-- name: KillStaleRunners :one
WITH matches AS (
UPDATE runners
SET state = 'dead'
WHERE state <> 'dead' AND last_seen < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL
DELETE FROM runners
WHERE last_seen < (NOW() AT TIME ZONE 'utc') - sqlc.arg('timeout')::INTERVAL
RETURNING 1)
SELECT COUNT(*)
FROM matches;

-- name: DeregisterRunner :one
WITH matches AS (
UPDATE runners
SET state = 'dead'
DELETE FROM runners
WHERE key = sqlc.arg('key')::runner_key
RETURNING 1)
SELECT COUNT(*)
FROM matches;

-- name: GetActiveRunners :many
SELECT DISTINCT ON (r.key) r.key AS runner_key,
SELECT DISTINCT ON (r.key) r.key AS runner_key,
r.endpoint,
r.state,
r.labels,
r.last_seen,
r.module_name,
d.key AS deployment_key
FROM runners r
INNER JOIN deployments d on d.id = r.deployment_id
WHERE r.state <> 'dead'
ORDER BY r.key;

-- name: GetActiveDeployments :many
SELECT sqlc.embed(d), m.name AS module_name, m.language, COUNT(r.id) AS replicas
FROM deployments d
JOIN modules m ON d.module_id = m.id
LEFT JOIN runners r ON d.id = r.deployment_id AND r.state = 'assigned'
LEFT JOIN runners r ON d.id = r.deployment_id
WHERE min_replicas > 0
GROUP BY d.id, m.name, m.language;

Expand All @@ -148,7 +142,7 @@ SELECT d.min_replicas,
r.endpoint,
r.labels AS runner_labels
FROM deployments d
LEFT JOIN runners r on d.id = r.deployment_id AND r.state != 'dead'
LEFT JOIN runners r on d.id = r.deployment_id
WHERE d.min_replicas > 0
ORDER BY d.key;

Expand All @@ -166,15 +160,9 @@ WHERE m.name = $1
AND min_replicas > 0
LIMIT 1;

-- name: GetRunnerState :one
SELECT state
FROM runners
WHERE key = sqlc.arg('key')::runner_key;

-- name: GetRunner :one
SELECT DISTINCT ON (r.key) r.key AS runner_key,
r.endpoint,
r.state,
r.labels,
r.last_seen,
r.module_name,
Expand All @@ -187,13 +175,12 @@ WHERE r.key = sqlc.arg('key')::runner_key;
SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key
FROM runners r
LEFT JOIN deployments d on r.deployment_id = d.id
WHERE state = 'assigned'
AND (COALESCE(cardinality(sqlc.arg('modules')::TEXT[]), 0) = 0
WHERE (COALESCE(cardinality(sqlc.arg('modules')::TEXT[]), 0) = 0
OR module_name = ANY (sqlc.arg('modules')::TEXT[]));

-- name: GetRouteForRunner :one
-- Retrieve routing information for a runner.
SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key, r.state
SELECT endpoint, r.key AS runner_key, r.module_name, d.key deployment_key
FROM runners r
LEFT JOIN deployments d on r.deployment_id = d.id
WHERE r.key = sqlc.arg('key')::runner_key;
Expand All @@ -202,8 +189,7 @@ WHERE r.key = sqlc.arg('key')::runner_key;
SELECT *
FROM runners r
INNER JOIN deployments d on r.deployment_id = d.id
WHERE state = 'assigned'
AND d.key = sqlc.arg('key')::deployment_key;
WHERE d.key = sqlc.arg('key')::deployment_key;

-- name: CreateRequest :exec
INSERT INTO requests (origin, "key", source_addr)
Expand Down Expand Up @@ -243,8 +229,7 @@ SELECT r.key AS runner_key, d.key AS deployment_key, endpoint, ir.path, ir.modul
FROM ingress_routes ir
INNER JOIN runners r ON ir.deployment_id = r.deployment_id
INNER JOIN deployments d ON ir.deployment_id = d.id
WHERE r.state = 'assigned'
AND ir.method = $1;
WHERE ir.method = $1;

-- name: GetActiveIngressRoutes :many
SELECT d.key AS deployment_key, ir.module, ir.verb, ir.method, ir.path
Expand Down Expand Up @@ -522,8 +507,8 @@ VALUES (
WITH runner_count AS (
SELECT count(r.deployment_id) as runner_count,
r.deployment_id as deployment
FROM runners r WHERE r.state = 'assigned'
GROUP BY deployment HAVING count(r.deployment_id) > 0
FROM runners r
GROUP BY deployment
)
SELECT
subs.key::subscription_key as key,
Expand Down
Loading

0 comments on commit 1dc1678

Please sign in to comment.