Skip to content

Commit

Permalink
feat: allow leases in module tests (#1562)
Browse files Browse the repository at this point in the history
- split up the code in `ftl/leases.go` between:
- the business logic, which drives the acquisition and heartbeating of
the lease
- the communication code, now called `leaseClient`, which streams
requests and responses with the controller
- `ftl/leases.go` chooses which client it uses by asking the
ModuleContext for a mock lease client, and uses the standard one if no
mock is provided.
- `ftltest.MockLeaseClient` implements the `LeaseClient` protocol.
`ftltest` provides this mock to the `modulecontext` when building the
context

It's odd that the `LeaseClient` protocol is in `modulecontext`, LMK if
you have a better way of arranging things without making import
cycles...
  • Loading branch information
matt2e authored May 23, 2024
1 parent e30eae3 commit dc09486
Show file tree
Hide file tree
Showing 10 changed files with 281 additions and 31 deletions.
2 changes: 1 addition & 1 deletion go-runtime/ftl/ftltest/ftltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func Context(options ...Option) context.Context {
}

builder := modulecontext.NewBuilder(name).AddConfigs(state.configs).AddSecrets(state.secrets).AddDatabases(state.databases)
builder = builder.UpdateForTesting(state.mockVerbs, state.allowDirectVerbBehavior)
builder = builder.UpdateForTesting(state.mockVerbs, state.allowDirectVerbBehavior, newFakeLeaseClient())
return builder.Build().ApplyToContext(ctx)
}

Expand Down
77 changes: 77 additions & 0 deletions go-runtime/ftl/ftltest/leases.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package ftltest

import (
"context"
"fmt"
"strings"
"sync"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl"
"github.com/TBD54566975/ftl/internal/modulecontext"
)

// fakeLeaseClient is a simple in-memory lease client for testing.
//
// It does not include any checks on module names, as it assumes that all leases are within the module being tested
type fakeLeaseClient struct {
lock sync.Mutex
deadlines map[string]time.Time
}

var _ modulecontext.LeaseClient = &fakeLeaseClient{}

func newFakeLeaseClient() *fakeLeaseClient {
return &fakeLeaseClient{
deadlines: make(map[string]time.Time),
}
}

func keyForKeys(keys []string) string {
return strings.Join(keys, "\n")
}

func (c *fakeLeaseClient) Acquire(ctx context.Context, module string, key []string, ttl time.Duration) error {
c.lock.Lock()
defer c.lock.Unlock()

k := keyForKeys(key)
if deadline, ok := c.deadlines[k]; ok {
if time.Now().Before(deadline) {
return ftl.ErrLeaseHeld
}
}

c.deadlines[k] = time.Now().Add(ttl)
return nil
}

func (c *fakeLeaseClient) Heartbeat(ctx context.Context, module string, key []string, ttl time.Duration) error {
c.lock.Lock()
defer c.lock.Unlock()

k := keyForKeys(key)
if deadline, ok := c.deadlines[k]; ok {
if !time.Now().Before(deadline) {
return fmt.Errorf("could not heartbeat expired lease")
}
c.deadlines[k] = time.Now().Add(ttl)
return nil
}
return fmt.Errorf("could not heartbeat lease: no active lease found")
}

func (c *fakeLeaseClient) Release(ctx context.Context, key []string) error {
c.lock.Lock()
defer c.lock.Unlock()

k := keyForKeys(key)
if deadline, ok := c.deadlines[k]; ok {
if !time.Now().Before(deadline) {
return fmt.Errorf("could not release lease after timeout")
}
delete(c.deadlines, k)
return nil
}
return fmt.Errorf("could not release lease: no active lease found")
}
63 changes: 63 additions & 0 deletions go-runtime/ftl/ftltest/leases_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package ftltest

import (
"context"
"errors"
"testing"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl"
"github.com/alecthomas/assert/v2"
)

var (
keys1 = []string{"one", "1"}
keys2 = []string{"two", "2"}
module = "test"
)

func TestDoubleAcquireLease(t *testing.T) {
ctx := context.Background()
client := newFakeLeaseClient()

// Acquire a lease, and immediately try to acquire it again.
err := client.Acquire(ctx, module, keys1, 1*time.Second)
assert.NoError(t, err)
err = client.Acquire(ctx, module, keys1, 1*time.Second)
assert.True(t, errors.Is(err, ftl.ErrLeaseHeld), "expected lease to already be held")
}

func TestAcquireTwoDifferentLeases(t *testing.T) {
ctx := context.Background()
client := newFakeLeaseClient()

err := client.Acquire(ctx, module, keys1, 1*time.Second)
assert.NoError(t, err)
err = client.Acquire(ctx, module, keys2, 1*time.Second)
assert.NoError(t, err)
}

func TestExpiry(t *testing.T) {
ctx := context.Background()
client := newFakeLeaseClient()

err := client.Acquire(ctx, module, keys1, 500*time.Millisecond)
assert.NoError(t, err)
time.Sleep(250 * time.Millisecond)
err = client.Heartbeat(ctx, module, keys1, 500*time.Millisecond)
assert.NoError(t, err)
time.Sleep(250 * time.Millisecond)
err = client.Heartbeat(ctx, module, keys1, 500*time.Millisecond)
assert.NoError(t, err)

// wait longer than ttl
time.Sleep(1 * time.Second)
err = client.Heartbeat(ctx, module, keys1, 500*time.Millisecond)
assert.Error(t, err, "expected error for heartbeating expired lease")
err = client.Release(ctx, keys1)
assert.Error(t, err, "expected error for heartbeating expired lease")

// try and acquire again
err = client.Acquire(ctx, module, keys1, 1*time.Second)
assert.NoError(t, err)
}
104 changes: 76 additions & 28 deletions go-runtime/ftl/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect"
"github.com/TBD54566975/ftl/go-runtime/ftl/reflection"
"github.com/TBD54566975/ftl/internal/log"
"github.com/TBD54566975/ftl/internal/modulecontext"
"github.com/TBD54566975/ftl/internal/rpc"
)

Expand All @@ -24,7 +25,7 @@ import (
var ErrLeaseHeld = fmt.Errorf("lease already held")

type LeaseHandle struct {
stream *connect.BidiStreamForClient[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]
client modulecontext.LeaseClient
key []string
errMu *sync.Mutex
err error
Expand All @@ -44,11 +45,9 @@ func (l LeaseHandle) Err() error {
func (l LeaseHandle) Release() error {
l.errMu.Lock()
defer l.errMu.Unlock()
if err := l.stream.CloseRequest(); err != nil {
return fmt.Errorf("close lease: %w", err)
}
if err := l.stream.CloseResponse(); err != nil {
return fmt.Errorf("close lease: %w", err)
err := l.client.Release(context.Background(), l.key)
if err != nil {
return err
}
return l.err
}
Expand All @@ -65,44 +64,31 @@ func (l LeaseHandle) Release() error {
// [lease]: https://hackmd.io/@ftl/Sym_GKEb0
func Lease(ctx context.Context, ttl time.Duration, key ...string) (LeaseHandle, error) {
logger := log.FromContext(ctx).Scope("lease(" + strings.Join(key, "/"))
client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx)
stream := client.AcquireLease(ctx)
client := newClient(ctx)

module := reflection.Module()
logger.Tracef("Acquiring lease")
req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)}
if err := stream.Send(req); err != nil {
if connect.CodeOf(err) == connect.CodeResourceExhausted {
return LeaseHandle{}, ErrLeaseHeld
}
logger.Warnf("Lease acquisition failed: %s", err)
return LeaseHandle{}, fmt.Errorf("lease acquisition failed: %w", err)
}
// Wait for response.
_, err := stream.Receive()
err := client.Acquire(ctx, module, key, ttl)
if err != nil {
if connect.CodeOf(err) == connect.CodeResourceExhausted {
if errors.Is(err, ErrLeaseHeld) {
return LeaseHandle{}, ErrLeaseHeld
}
return LeaseHandle{}, fmt.Errorf("lease acquisition failed: %w", err)
logger.Warnf("Lease acquisition failed: %s", err)
return LeaseHandle{}, err
}

lease := LeaseHandle{key: key, errMu: &sync.Mutex{}, stream: stream}
lease := LeaseHandle{key: key, errMu: &sync.Mutex{}, client: client}
// Heartbeat the lease.
go func() {
for {
logger.Tracef("Heartbeating lease")
req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)}
err := stream.Send(req)
err := client.Heartbeat(ctx, module, key, ttl)
if err == nil {
time.Sleep(ttl / 2)
continue
}
if errors.Is(err, io.EOF) {
err = nil
} else {
logger.Warnf("Lease heartbeat terminated: %s", err)
}
logger.Warnf("Lease heartbeat terminated: %s", err)

// Notify the handle.
lease.errMu.Lock()
lease.err = err
Expand All @@ -112,3 +98,65 @@ func Lease(ctx context.Context, ttl time.Duration, key ...string) (LeaseHandle,
}()
return lease, nil
}

// newClient creates a new lease client
//
// It allows module context to override the client with a mock if appropriate
func newClient(ctx context.Context) modulecontext.LeaseClient {
moduleCtx := modulecontext.FromContext(ctx)
if mock, ok := moduleCtx.MockLeaseClient().Get(); ok {
return mock
}
return &leaseClient{}
}

// leaseClient is a client that coordinates leases with the controller
//
// This is used in all non-unit tests environements
type leaseClient struct {
stream *connect.BidiStreamForClient[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse]
}

var _ modulecontext.LeaseClient = &leaseClient{}

func (c *leaseClient) Acquire(ctx context.Context, module string, key []string, ttl time.Duration) error {
c.stream = rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx).AcquireLease(ctx)
req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)}
if err := c.stream.Send(req); err != nil {
if connect.CodeOf(err) == connect.CodeResourceExhausted {
return ErrLeaseHeld
}
return fmt.Errorf("lease acquisition failed: %w", err)
}
// Wait for response.
_, err := c.stream.Receive()
if err == nil {
return nil
}
if connect.CodeOf(err) == connect.CodeResourceExhausted {
return ErrLeaseHeld
}
return fmt.Errorf("lease acquisition failed: %w", err)
}

func (c *leaseClient) Heartbeat(ctx context.Context, module string, key []string, ttl time.Duration) error {
req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)}
err := c.stream.Send(req)
if err == nil {
return nil
}
if errors.Is(err, io.EOF) {
return nil
}
return err
}

func (c *leaseClient) Release(ctx context.Context, key []string) error {
if err := c.stream.CloseRequest(); err != nil {
return fmt.Errorf("close lease: %w", err)
}
if err := c.stream.CloseResponse(); err != nil {
return fmt.Errorf("close lease: %w", err)
}
return nil
}
9 changes: 8 additions & 1 deletion integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,16 +274,23 @@ func TestModuleUnitTests(t *testing.T) {
func TestLease(t *testing.T) {
run(t, "",
copyModule("leases"),
build("leases"),
// checks if leases work in a unit test environment
testModule("leases"),
deploy("leases"),
// checks if it leases work with a real controller
func(t testing.TB, ic testContext) error {
// Start a lease.
wg := errgroup.Group{}
wg.Go(func() error {
infof("Acquiring lease")
_, err := ic.verbs.Call(ic, connect.NewRequest(&ftlv1.CallRequest{
resp, err := ic.verbs.Call(ic, connect.NewRequest(&ftlv1.CallRequest{
Verb: &schemapb.Ref{Module: "leases", Name: "acquire"},
Body: []byte("{}"),
}))
if respErr := resp.Msg.GetError(); respErr != nil {
return fmt.Errorf("received error on first call: %v", respErr)
}
return err
})

Expand Down
3 changes: 3 additions & 0 deletions integration/testdata/go/leases/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@ require (
connectrpc.com/connect v1.16.1 // indirect
connectrpc.com/grpcreflect v1.2.0 // indirect
connectrpc.com/otelconnect v0.7.0 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/TBD54566975/scaffolder v0.8.0 // indirect
github.com/alecthomas/concurrency v0.0.2 // indirect
github.com/alecthomas/kong v0.9.0 // indirect
github.com/alecthomas/participle/v2 v2.1.1 // indirect
github.com/alecthomas/types v0.15.0 // indirect
github.com/alessio/shellescape v1.4.2 // indirect
Expand Down
6 changes: 6 additions & 0 deletions integration/testdata/go/leases/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions integration/testdata/go/leases/leases.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"github.com/TBD54566975/ftl/go-runtime/ftl"
)

// Acquire acquires a lease and waits 5s before releasing it.
//
//ftl:verb
func Acquire(ctx context.Context) error {
logger := ftl.LoggerFromContext(ctx)
Expand Down
28 changes: 28 additions & 0 deletions integration/testdata/go/leases/leases_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package leases

import (
"testing"
"time"

"github.com/TBD54566975/ftl/go-runtime/ftl/ftltest"
"github.com/alecthomas/assert/v2"
"golang.org/x/sync/errgroup"
)

func TestLease(t *testing.T) {
ctx := ftltest.Context(
ftltest.WithProjectFiles(),
)
// test that we can acquire a lease in a test environment
wg := errgroup.Group{}
wg.Go(func() error {
return Acquire(ctx)
})

// test that we get an error acquiring another lease at the same time
time.Sleep(1 * time.Second)
err := Acquire(ctx)
assert.Error(t, err, "expected error for acquiring lease while another is held")

assert.NoError(t, wg.Wait(), "expected no error acquiring the initial lease")
}
Loading

0 comments on commit dc09486

Please sign in to comment.