Skip to content

Commit

Permalink
feat: add leases and switch scheduledtask to use them (#1351)
Browse files Browse the repository at this point in the history
Leases provide limited exclusive access to a resource for a period of
time with automatic renewal.

Note: Currently each lease renewal occurs in a dedicated goroutine, via
an UPDATE issued every two seconds. An easy future optimisation to
reduce the number of updates is to aggregate all UPDATES into a single
statement run at the same frequency.

See [design doc](https://hackmd.io/@ftl/Sym_GKEb0).
  • Loading branch information
alecthomas authored Apr 29, 2024
1 parent f8ce99d commit 4edee31
Show file tree
Hide file tree
Showing 19 changed files with 612 additions and 116 deletions.
6 changes: 3 additions & 3 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ build-generate:
@mk internal/log/log_level_string.go : internal/log/api.go -- go generate -x ./internal/log

# Build command-line tools
build +tools: build-protos build-sqlc build-zips build-frontend
build +tools: build-protos build-zips build-frontend
#!/bin/bash
shopt -s extglob
for tool in $@; do mk "{{RELEASE}}/$tool" : !(build) -- go build -o "{{RELEASE}}/$tool" -tags release -ldflags "-X github.com/TBD54566975/ftl.Version={{VERSION}} -X github.com/TBD54566975/ftl.Timestamp={{TIMESTAMP}}" "./cmd/$tool"; done
Expand All @@ -56,8 +56,8 @@ init-db:
dbmate --migrations-dir backend/controller/sql/schema up

# Regenerate SQLC code (requires init-db to be run first)
build-sqlc:
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema -- sqlc generate
build-sqlc: init-db
@mk backend/controller/sql/{db.go,models.go,querier.go,queries.sql.go} : backend/controller/sql/queries.sql backend/controller/sql/schema sqlc.yaml -- sqlc generate

# Build the ZIP files that are embedded in the FTL release binaries
build-zips: build-kt-runtime
Expand Down
82 changes: 39 additions & 43 deletions backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,34 +168,14 @@ type Service struct {
increaseReplicaFailures map[string]int
}

// jobConfig pairs a scheduled job with the backoff settings used to run it.
type jobConfig struct {
// job is the scheduled task to run.
job scheduledtask.Job
// backoff is the backoff returned by getBackoff when not in devel mode.
backoff backoff.Backoff
// develBackoff, when set, is returned by getBackoff in devel mode instead
// of the one-second default.
develBackoff optional.Option[backoff.Backoff]
}

// getBackoff selects the backoff to use for this job.
//
// Outside devel mode the configured backoff is returned unchanged. In devel
// mode the devel-specific backoff is preferred; when none was configured a
// fixed one-second backoff (Min == Max == 1s) is used instead.
func (j jobConfig) getBackoff(devel bool) backoff.Backoff {
	if !devel {
		return j.backoff
	}
	if override, ok := j.develBackoff.Get(); ok {
		return override
	}
	// In devel with no explicit override: use 1s for both min and max.
	return backoff.Backoff{Min: time.Second, Max: time.Second}
}

func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.RunnerScaling) (*Service, error) {
key := config.Key
if config.Key.IsZero() {
key = model.NewControllerKey(config.Bind.Hostname(), config.Bind.Port())
}
config.SetDefaults()
svc := &Service{
tasks: scheduledtask.New(ctx, key),
tasks: scheduledtask.New(ctx, key, db),
dal: db,
key: key,
deploymentLogsSink: newDeploymentLogsSink(ctx, db),
Expand All @@ -208,32 +188,31 @@ func New(ctx context.Context, db *dal.DAL, config Config, runnerScaling scaling.

cronSvc := cronjobs.New(ctx, key, svc.config.Advertise.Host, cronjobs.Config{Timeout: config.CronJobTimeout}, db, svc.tasks, svc.callWithRequest)
svc.cronJobs = cronSvc
svc.controllerListListeners = append(svc.controllerListListeners, svc.tasks, cronSvc)
svc.controllerListListeners = append(svc.controllerListListeners, cronSvc)

parallelJobs := []jobConfig{
{svc.syncRoutes, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()},
{svc.heartbeatController, backoff.Backoff{Min: time.Second * 3, Max: time.Second * 3}, optional.Some[backoff.Backoff](backoff.Backoff{Min: time.Second * 2, Max: time.Second * 2})},
{svc.updateControllersList, backoff.Backoff{Min: time.Second * 5, Max: time.Second * 5}, optional.None[backoff.Backoff]()},
// This should only run on one controller, but because dead controllers
// might be selected by the hash ring, we have to run it on all controllers.
// We should use a DB lock at some point.
{svc.reapStaleControllers, backoff.Backoff{Min: time.Second * 20, Max: time.Second * 20}, optional.None[backoff.Backoff]()},
// Use min, max backoff if we are running in production, otherwise use develBackoff if available.
maybeDevelBackoff := func(min, max time.Duration, develBackoff ...backoff.Backoff) backoff.Backoff {
if len(develBackoff) > 1 {
panic("too many devel backoffs")
}
if _, devel := runnerScaling.(*localscaling.LocalScaling); devel && len(develBackoff) == 1 {
return develBackoff[0]
}
return makeBackoff(min, max)
}

singletonJobs := []jobConfig{
{svc.reapStaleRunners, backoff.Backoff{Min: time.Second, Max: time.Second * 10}, optional.None[backoff.Backoff]()},
{svc.releaseExpiredReservations, backoff.Backoff{Min: time.Second, Max: time.Second * 20}, optional.None[backoff.Backoff]()},
{svc.reconcileDeployments, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()},
{svc.reconcileRunners, backoff.Backoff{Min: time.Second, Max: time.Second * 5}, optional.None[backoff.Backoff]()},
}
// Parallel tasks.
svc.tasks.Parallel(maybeDevelBackoff(time.Second, time.Second*5), svc.syncRoutes)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*3, time.Second*5, makeBackoff(time.Second*2, time.Second*2)), svc.heartbeatController)
svc.tasks.Parallel(maybeDevelBackoff(time.Second*5, time.Second*5), svc.updateControllersList)

_, devel := runnerScaling.(*localscaling.LocalScaling)
for _, j := range parallelJobs {
svc.tasks.Parallel(j.getBackoff(devel), j.job)
}
for _, j := range singletonJobs {
svc.tasks.Singleton(j.getBackoff(devel), j.job)
}
// Singleton tasks use leases to only run on a single controller.
svc.tasks.Singleton(maybeDevelBackoff(time.Second*20, time.Second*20), svc.reapStaleControllers)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*10), svc.reapStaleRunners)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*20), svc.releaseExpiredReservations)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.reconcileDeployments)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.reconcileRunners)
svc.tasks.Singleton(maybeDevelBackoff(time.Second, time.Second*5), svc.expireStaleLeases)
return svc, nil
}

Expand Down Expand Up @@ -1006,6 +985,14 @@ func (s *Service) reconcileRunners(ctx context.Context) (time.Duration, error) {
return time.Second, nil
}

// expireStaleLeases asks the DAL to expire leases and wraps any failure with
// context.
//
// On success it returns one second; on failure it returns a zero duration
// together with the wrapped error.
// NOTE(review): the returned duration is presumably the delay before the
// scheduler invokes this job again — confirm against scheduledtask.
func (s *Service) expireStaleLeases(ctx context.Context) (time.Duration, error) {
	if err := s.dal.ExpireLeases(ctx); err != nil {
		return 0, fmt.Errorf("failed to expire leases: %w", err)
	}
	return time.Second, nil
}

func (s *Service) terminateRandomRunner(ctx context.Context, key model.DeploymentKey) (bool, error) {
runners, err := s.dal.GetRunnersForDeployment(ctx, key)
if err != nil {
Expand Down Expand Up @@ -1312,3 +1299,12 @@ func ingressPathString(path []*schemapb.IngressPathComponent) string {
}
return "/" + strings.Join(pathString, "/")
}

// makeBackoff builds a backoff bounded by min and max, with jitter enabled
// and a growth factor of 2.
func makeBackoff(min, max time.Duration) backoff.Backoff {
	var bo backoff.Backoff
	bo.Min = min
	bo.Max = max
	bo.Jitter = true
	bo.Factor = 2
	return bo
}
100 changes: 100 additions & 0 deletions backend/controller/dal/lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package dal

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/google/uuid"

	"github.com/TBD54566975/ftl/backend/controller/leases"
	"github.com/TBD54566975/ftl/backend/controller/sql"
	"github.com/TBD54566975/ftl/internal/log"
)

// leaseRenewalInterval is how long Lease.renew waits between renewals. It is
// also used as the timeout for each individual renewal call.
const leaseRenewalInterval = time.Second * 2

// Compile-time check that *DAL satisfies leases.Leaser.
var _ leases.Leaser = (*DAL)(nil)

// Lease represents a lease that is held by a controller.
//
// A Lease is created by [DAL.AcquireLease], which also starts the renew
// goroutine. That goroutine keeps the lease alive until Release is called, a
// renewal fails, or the context it was started with is cancelled.
//
// A Lease contains a sync.Once and must not be copied; use it via pointer.
type Lease struct {
	// idempotencyKey is the value returned by NewLease; it is passed back to
	// the database on every renew and release.
	idempotencyKey uuid.UUID
	// context is set to nil by AcquireLease.
	context any
	key     leases.Key
	db      *sql.DB
	ttl     time.Duration
	// errch carries the terminal error (if any) from renew to Release. It is
	// closed when renew exits.
	errch chan error
	// release is closed by Release to ask renew to stop.
	release chan bool
	// releaseOnce guards close(release) so Release can be called repeatedly.
	releaseOnce sync.Once
	leak        bool // For testing.
}

// String returns "<key>:<idempotencyKey>".
func (l *Lease) String() string {
	return fmt.Sprintf("%s:%s", l.key, l.idempotencyKey)
}

// renew periodically renews the lease until it is released, a renewal fails,
// or ctx is cancelled.
//
// It is the only sender on errch and closes it on exit, so Release never
// blocks forever. At most one value is sent, which fits in the channel's
// one-element buffer, so renew can exit even if Release is never called.
func (l *Lease) renew(ctx context.Context) {
	defer close(l.errch)
	logger := log.FromContext(ctx).Scope("lease(" + l.key.String() + ")")
	logger.Debugf("Acquired lease %s", l.key)
	for {
		select {
		case <-time.After(leaseRenewalInterval):
			logger.Tracef("Renewing lease %s", l.key)
			renewCtx, cancel := context.WithTimeout(ctx, leaseRenewalInterval)
			_, err := l.db.RenewLease(renewCtx, l.ttl, l.idempotencyKey, l.key)
			cancel()

			if err != nil {
				logger.Errorf(err, "Failed to renew lease %s", l.key)
				l.errch <- translatePGError(err)
				return
			}

		case <-l.release:
			if l.leak { // For testing.
				return
			}
			logger.Debugf("Releasing lease %s", l.key)
			_, err := l.db.ReleaseLease(ctx, l.idempotencyKey, l.key)
			l.errch <- translatePGError(err)
			return

		case <-ctx.Done():
			// The owning context is gone, so any further renewal would fail.
			// Stop now rather than waiting for the next tick and issuing a
			// doomed query. The database row is left in place for
			// ExpireLeases to remove once its TTL has passed.
			logger.Debugf("Context done, no longer renewing lease %s", l.key)
			l.errch <- ctx.Err()
			return
		}
	}
}

// Release stops renewal, releases the lease, and waits for the renewal
// goroutine to exit.
//
// It returns the error produced by the release query, or the error that
// already terminated renewal (a failed renewal or context cancellation).
// Release is safe to call more than once and from multiple goroutines: only
// one receive observes the terminal error, and every other call returns nil
// once the renewal goroutine has exited.
func (l *Lease) Release() error {
	l.releaseOnce.Do(func() { close(l.release) })
	return <-l.errch
}

// AcquireLease creates a lease on key and starts a goroutine that keeps
// renewing it.
//
// ttl must be at least five seconds, otherwise an error is returned before
// touching the database. The initial expiry passed to the database is the
// local time plus ttl. Errors from creating the lease are passed through
// translatePGError.
//
// The renewal goroutine is started with ctx, so ctx must remain valid for as
// long as the lease is meant to be held.
func (d *DAL) AcquireLease(ctx context.Context, key leases.Key, ttl time.Duration) (leases.Lease, error) {
	const minTTL = 5 * time.Second
	if ttl < minTTL {
		return nil, fmt.Errorf("lease TTL must be at least 5 seconds")
	}

	expiry := time.Now().Add(ttl)
	token, err := d.db.NewLease(ctx, key, expiry)
	if err != nil {
		return nil, translatePGError(err)
	}

	held := &Lease{
		idempotencyKey: token,
		key:            key,
		db:             d.db,
		ttl:            ttl,
		errch:          make(chan error, 1),
		release:        make(chan bool),
	}
	go held.renew(ctx)
	return held, nil
}

// ExpireLeases expires (deletes) all leases that have expired.
//
// A warning is logged when at least one lease was expired. Query errors are
// passed through translatePGError; the count is only read once the query is
// known to have succeeded.
func (d *DAL) ExpireLeases(ctx context.Context) error {
	count, err := d.db.ExpireLeases(ctx)
	if err != nil {
		return translatePGError(err)
	}
	// TODO: Return and log the actual lease keys?
	if count > 0 {
		log.FromContext(ctx).Warnf("Expired %d leases", count)
	}
	return nil
}
94 changes: 94 additions & 0 deletions backend/controller/dal/lease_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package dal

import (
"context"
"errors"
"testing"
"time"

"github.com/alecthomas/assert/v2"
"github.com/google/uuid"

"github.com/TBD54566975/ftl/backend/controller/leases"
"github.com/TBD54566975/ftl/backend/controller/sql"
"github.com/TBD54566975/ftl/backend/controller/sql/sqltest"
"github.com/TBD54566975/ftl/internal/log"
)

// leaseExists reports whether the leases table holds at least one row
// matching both idempotencyKey and key.
//
// An ErrNotFound from the query is treated as "does not exist"; any other
// error fails the test.
func leaseExists(t *testing.T, conn sql.DBI, idempotencyKey uuid.UUID, key leases.Key) bool {
	t.Helper()

	const query = "SELECT COUNT(*) FROM leases WHERE idempotency_key = $1 AND key = $2"

	var matches int
	row := conn.QueryRow(context.Background(), query, idempotencyKey, key)
	err := translatePGError(row.Scan(&matches))
	if errors.Is(err, ErrNotFound) {
		return false
	}
	assert.NoError(t, err)
	return matches > 0
}

// TestLease walks a lease through its life cycle: TTL validation, acquire,
// conflicting acquire, holding the lease past its TTL, and release.
func TestLease(t *testing.T) {
	ctx := log.ContextWithNewDefaultLogger(context.Background())
	conn := sqltest.OpenForTesting(ctx, t)
	store, err := New(ctx, conn)
	assert.NoError(t, err)

	// A TTL below the five second minimum is rejected.
	_, err = store.AcquireLease(ctx, leases.SystemKey("test"), time.Second*1)
	assert.Error(t, err)

	acquired, err := store.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5)
	assert.NoError(t, err)

	held := acquired.(*Lease) //nolint:forcetypeassert

	// Try to acquire the same lease again, which should fail.
	_, err = store.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5)
	assert.IsError(t, err, ErrConflict)

	// Wait longer than the TTL; the row must still be present afterwards.
	time.Sleep(time.Second * 6)

	assert.True(t, leaseExists(t, conn, held.idempotencyKey, held.key))

	assert.NoError(t, held.Release())

	assert.False(t, leaseExists(t, conn, held.idempotencyKey, held.key))
}

// TestExpireLeases checks that ExpireLeases leaves a live lease alone, removes
// a lease that is no longer being renewed once its TTL has passed, and that
// the key can then be acquired again.
func TestExpireLeases(t *testing.T) {
	ctx := log.ContextWithNewDefaultLogger(context.Background())
	conn := sqltest.OpenForTesting(ctx, t)
	store, err := New(ctx, conn)
	assert.NoError(t, err)

	acquired, err := store.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5)
	assert.NoError(t, err)

	held := acquired.(*Lease) //nolint:forcetypeassert

	// A freshly acquired lease must survive an expiry pass.
	assert.NoError(t, store.ExpireLeases(ctx))

	assert.True(t, leaseExists(t, conn, held.idempotencyKey, held.key))

	// Pretend that the lease expired: with leak set, Release stops renewal
	// without deleting the row.
	held.leak = true
	assert.NoError(t, held.Release())

	assert.True(t, leaseExists(t, conn, held.idempotencyKey, held.key))

	// Let the TTL run out with nothing renewing the lease.
	time.Sleep(time.Second * 6)

	assert.NoError(t, store.ExpireLeases(ctx))

	assert.False(t, leaseExists(t, conn, held.idempotencyKey, held.key))

	// The key is free again, so it can be re-acquired and released normally.
	reacquired, err := store.AcquireLease(ctx, leases.SystemKey("test"), time.Second*5)
	assert.NoError(t, err)

	assert.NoError(t, reacquired.Release())
}
40 changes: 40 additions & 0 deletions backend/controller/leases/fake_lease.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package leases

import (
"context"
"time"

"github.com/puzpuzpuz/xsync/v3"
)

// NewFakeLeaser returns a FakeLeaser with an empty set of held leases.
func NewFakeLeaser() *FakeLeaser {
	held := xsync.NewMapOf[string, struct{}]()
	return &FakeLeaser{leases: held}
}

// Compile-time check that *FakeLeaser satisfies Leaser.
var _ Leaser = (*FakeLeaser)(nil)

// FakeLeaser is a fake implementation of the [Leaser] interface.
//
// It records held leases in an in-memory map and does not use a database.
type FakeLeaser struct {
// leases is the set of currently held leases, keyed by Key.String().
leases *xsync.MapOf[string, struct{}]
}

// AcquireLease records key as held and returns a FakeLease for it, or
// ErrConflict if the key is already held.
//
// ctx and ttl are ignored: a fake lease stays held until it is released.
func (f *FakeLeaser) AcquireLease(ctx context.Context, key Key, ttl time.Duration) (Lease, error) {
	_, alreadyHeld := f.leases.LoadOrStore(key.String(), struct{}{})
	if alreadyHeld {
		return nil, ErrConflict
	}
	lease := &FakeLease{key: key, leaser: f}
	return lease, nil
}

// FakeLease is the lease handle returned by FakeLeaser.AcquireLease.
type FakeLease struct {
// leaser is the FakeLeaser that issued this lease; Release removes the key
// from its map.
leaser *FakeLeaser
// key identifies the leased resource.
key Key
}

// Release removes this lease's key from the issuing FakeLeaser so that it can
// be acquired again. It always returns nil.
func (f *FakeLease) Release() error {
	name := f.key.String()
	f.leaser.leases.Delete(name)
	return nil
}

// String returns the string form of the leased key.
func (f *FakeLease) String() string {
	return f.key.String()
}
Loading

0 comments on commit 4edee31

Please sign in to comment.