Skip to content
This repository has been archived by the owner on Dec 16, 2024. It is now read-only.

Commit

Permalink
fix: fix zombie runners if control-plane was down when runners restar…
Browse files Browse the repository at this point in the history
…ted (#33)

Upon restarting, the control-plane previously had no way of knowing if
a registration was from an existing runner or a new one. Added a new
runner-generated UUID key to the runners table to distinguish between
the two cases.
  • Loading branch information
alecthomas authored May 31, 2023
1 parent b6ab744 commit 07bbe63
Show file tree
Hide file tree
Showing 14 changed files with 193 additions and 138 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
/console/src/protos/**/* linguist-generated=true
File renamed without changes.
16 changes: 12 additions & 4 deletions console/src/protos/xyz/block/ftl/v1/ftl_pb.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 27 additions & 10 deletions controlplane/controlplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,19 +76,14 @@ type Service struct {
}

func New(ctx context.Context, dal *dal.DAL, heartbeatTimeout time.Duration, artefactChunkSize int) (*Service, error) {
logger := log.FromContext(ctx)
count, err := dal.DeleteStaleRunners(context.Background(), heartbeatTimeout)
if err != nil {
return nil, errors.Wrap(err, "failed to delete stale runners")
}
logger.Warnf("Deleted %d stale runners", count)
return &Service{
svc := &Service{
dal: dal,
heartbeatTimeout: heartbeatTimeout,
artefactChunkSize: artefactChunkSize,
}, nil
}
go svc.reapStaleRunners(ctx)
return svc, nil
}

// Deploy is not implemented yet.
//
// It reports CodeUnimplemented to the caller instead of panicking, so that a
// client invoking this RPC receives a well-formed error response rather than
// an aborted request.
func (s *Service) Deploy(ctx context.Context, req *connect.Request[ftlv1.DeployRequest]) (*connect.Response[ftlv1.DeployResponse], error) {
	return nil, connect.NewError(connect.CodeUnimplemented, errors.New("deploy is not implemented"))
}
Expand All @@ -110,6 +105,10 @@ func (s *Service) RegisterRunner(ctx context.Context, req *connect.ClientStream[
if endpoint.Scheme != "http" && endpoint.Scheme != "https" {
return nil, connect.NewError(connect.CodeUnavailable, errors.Errorf("invalid endpoint scheme %q", endpoint.Scheme))
}
key, err := uuid.Parse(msg.Key)
if err != nil {
return nil, connect.NewError(connect.CodeFailedPrecondition, errors.Wrap(err, "invalid key"))
}

// Check if we can contact the runner.
client := rpc.Dial(ftlv1connect.NewRunnerServiceClient, endpoint.String())
Expand All @@ -119,7 +118,7 @@ func (s *Service) RegisterRunner(ctx context.Context, req *connect.ClientStream[
return nil, connect.NewError(connect.CodeUnavailable, errors.Wrap(err, "failed to connect to runner"))
}

runnerID, err := s.dal.RegisterRunner(ctx, msg.Language, endpoint)
runnerID, err := s.dal.RegisterRunner(ctx, key, msg.Language, endpoint)
if errors.Is(err, dal.ErrConflict) {
return nil, connect.NewError(connect.CodeAlreadyExists, err)
} else if err != nil {
Expand Down Expand Up @@ -302,3 +301,21 @@ func (s *Service) getDeployment(ctx context.Context, key string) (*dal.Deploymen
}
return deployment, nil
}

// reapStaleRunners periodically deletes runners that the DAL considers stale
// with respect to s.heartbeatTimeout.
//
// The first sweep happens immediately; after each sweep the loop waits for
// s.heartbeatTimeout before sweeping again. It blocks until ctx is cancelled,
// so it is meant to be run in its own goroutine.
//
// The delete itself is issued with context.Background() rather than ctx, so
// cancellation of ctx is only observed between sweeps.
func (s *Service) reapStaleRunners(ctx context.Context) {
	logger := log.FromContext(ctx)
	for {
		deleted, err := s.dal.DeleteStaleRunners(context.Background(), s.heartbeatTimeout)
		switch {
		case err != nil:
			// A failed sweep is not fatal; log it and retry on the next pass.
			logger.Errorf(err, "Failed to delete stale runners")
		case deleted > 0:
			// Stay quiet when there was nothing to reap.
			logger.Warnf("Deleted %d stale runners", deleted)
		}

		select {
		case <-time.After(s.heartbeatTimeout):
			continue
		case <-ctx.Done():
			return
		}
	}
}
6 changes: 6 additions & 0 deletions controlplane/controlplane_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/alecthomas/concurrency"
connect "github.com/bufbuild/connect-go"
grpcreflect "github.com/bufbuild/connect-grpcreflect-go"
"github.com/google/uuid"
"github.com/jpillora/backoff"

"github.com/TBD54566975/ftl/common/log"
Expand All @@ -26,14 +27,17 @@ func TestControlPlaneRegisterRunnerHeartbeatClose(t *testing.T) {

stream := client.RegisterRunner(ctx)
t.Cleanup(func() { _, _ = stream.CloseAndReceive() })
key := uuid.NewString()
err := stream.Send(&ftlv1.RegisterRunnerRequest{
Key: key,
Language: "go",
Endpoint: bind.String(),
})
assert.NoError(t, err)
time.Sleep(time.Millisecond * 100)

err = stream.Send(&ftlv1.RegisterRunnerRequest{
Key: key,
Language: "go",
Endpoint: bind.String(),
})
Expand All @@ -54,8 +58,10 @@ func TestControlPlaneRegisterRunnerHeartbeatClose(t *testing.T) {
func TestControlPlaneRegisterRunnerHeartbeatTimeout(t *testing.T) {
db, client, bind, ctx := startForTesting(t)

key := uuid.NewString()
stream := client.RegisterRunner(ctx)
err := stream.Send(&ftlv1.RegisterRunnerRequest{
Key: key,
Language: "go",
Endpoint: bind.String(),
})
Expand Down
4 changes: 2 additions & 2 deletions controlplane/internal/dal/dal.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,8 +205,8 @@ type RunnerID int64
// RegisterRunner registers a new runner, or updates the language and endpoint of an existing runner with the same key.
//
// It will return ErrConflict if a runner with the same endpoint already exists.
func (d *DAL) RegisterRunner(ctx context.Context, language string, endpoint *url.URL) (RunnerID, error) {
id, err := d.db.RegisterRunner(ctx, language, endpoint.String())
func (d *DAL) RegisterRunner(ctx context.Context, key uuid.UUID, language string, endpoint *url.URL) (RunnerID, error) {
id, err := d.db.RegisterRunner(ctx, key, language, endpoint.String())
if isPGConflict(err) {
return 0, errors.Wrap(ErrConflict, "runner already registered")
}
Expand Down
1 change: 1 addition & 0 deletions controlplane/internal/sql/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion controlplane/internal/sql/querier.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion controlplane/internal/sql/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ FROM artefacts a
WHERE a.id = @id;

-- name: RegisterRunner :one
INSERT INTO runners (language, endpoint) VALUES ($1, $2)
INSERT INTO runners (key, language, endpoint) VALUES ($1, $2, $3)
ON CONFLICT (key) DO UPDATE SET language = $2, endpoint = $3
RETURNING id;

-- name: DeleteStaleRunners :one
Expand Down
16 changes: 10 additions & 6 deletions controlplane/internal/sql/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions controlplane/internal/sql/schema/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ CREATE INDEX deployment_artefacts_deployment_id_idx ON deployment_artefacts (dep
-- Runners are processes that are available to run modules.
CREATE TABLE runners (
id BIGSERIAL PRIMARY KEY NOT NULL,
-- Unique identifier for this runner, generated at startup.
key UUID UNIQUE NOT NULL,
last_seen TIMESTAMP WITHOUT TIME ZONE NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
language VARCHAR(64) NOT NULL,
endpoint VARCHAR(255) UNIQUE NOT NULL,
Expand Down
Loading

0 comments on commit 07bbe63

Please sign in to comment.