diff --git a/go-runtime/ftl/ftltest/ftltest.go b/go-runtime/ftl/ftltest/ftltest.go index 833314335..3b3fd94b1 100644 --- a/go-runtime/ftl/ftltest/ftltest.go +++ b/go-runtime/ftl/ftltest/ftltest.go @@ -53,7 +53,7 @@ func Context(options ...Option) context.Context { } builder := modulecontext.NewBuilder(name).AddConfigs(state.configs).AddSecrets(state.secrets).AddDatabases(state.databases) - builder = builder.UpdateForTesting(state.mockVerbs, state.allowDirectVerbBehavior) + builder = builder.UpdateForTesting(state.mockVerbs, state.allowDirectVerbBehavior, newFakeLeaseClient()) return builder.Build().ApplyToContext(ctx) } diff --git a/go-runtime/ftl/ftltest/leases.go b/go-runtime/ftl/ftltest/leases.go new file mode 100644 index 000000000..f4966ce4b --- /dev/null +++ b/go-runtime/ftl/ftltest/leases.go @@ -0,0 +1,77 @@ +package ftltest + +import ( + "context" + "fmt" + "strings" + "sync" + "time" + + "github.com/TBD54566975/ftl/go-runtime/ftl" + "github.com/TBD54566975/ftl/internal/modulecontext" +) + +// fakeLeaseClient is a simple in-memory lease client for testing. +// +// It does not include any checks on module names, as it assumes that all leases are within the module being tested +type fakeLeaseClient struct { + lock sync.Mutex + deadlines map[string]time.Time +} + +var _ modulecontext.LeaseClient = &fakeLeaseClient{} + +func newFakeLeaseClient() *fakeLeaseClient { + return &fakeLeaseClient{ + deadlines: make(map[string]time.Time), + } +} + +func keyForKeys(keys []string) string { + return strings.Join(keys, "\n") +} + +func (c *fakeLeaseClient) Acquire(ctx context.Context, module string, key []string, ttl time.Duration) error { + c.lock.Lock() + defer c.lock.Unlock() + + k := keyForKeys(key) + if deadline, ok := c.deadlines[k]; ok { + if time.Now().Before(deadline) { + return ftl.ErrLeaseHeld + } + } + + c.deadlines[k] = time.Now().Add(ttl) + return nil +} + +func (c *fakeLeaseClient) Heartbeat(ctx context.Context, module string, key []string, ttl time.Duration) error { + c.lock.Lock() + defer c.lock.Unlock() + + k := keyForKeys(key) + if deadline, ok := c.deadlines[k]; ok { + if !time.Now().Before(deadline) { + return fmt.Errorf("could not heartbeat expired lease") + } + c.deadlines[k] = time.Now().Add(ttl) + return nil + } + return fmt.Errorf("could not heartbeat lease: no active lease found") +} + +func (c *fakeLeaseClient) Release(ctx context.Context, key []string) error { + c.lock.Lock() + defer c.lock.Unlock() + + k := keyForKeys(key) + if deadline, ok := c.deadlines[k]; ok { + if !time.Now().Before(deadline) { + return fmt.Errorf("could not release lease after timeout") + } + delete(c.deadlines, k) + return nil + } + return fmt.Errorf("could not release lease: no active lease found") +} diff --git a/go-runtime/ftl/ftltest/leases_test.go b/go-runtime/ftl/ftltest/leases_test.go new file mode 100644 index 000000000..5e1b711e4 --- /dev/null +++ b/go-runtime/ftl/ftltest/leases_test.go @@ -0,0 +1,63 @@ +package ftltest + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/TBD54566975/ftl/go-runtime/ftl" + "github.com/alecthomas/assert/v2" +) + +var ( + keys1 = []string{"one", "1"} + keys2 = []string{"two", "2"} + module = "test" +) + +func TestDoubleAcquireLease(t *testing.T) { + ctx := context.Background() + client := newFakeLeaseClient() + + // Acquire a lease, and immediately try to acquire it again. + err := client.Acquire(ctx, module, keys1, 1*time.Second) + assert.NoError(t, err) + err = client.Acquire(ctx, module, keys1, 1*time.Second) + assert.True(t, errors.Is(err, ftl.ErrLeaseHeld), "expected lease to already be held") +} + +func TestAcquireTwoDifferentLeases(t *testing.T) { + ctx := context.Background() + client := newFakeLeaseClient() + + err := client.Acquire(ctx, module, keys1, 1*time.Second) + assert.NoError(t, err) + err = client.Acquire(ctx, module, keys2, 1*time.Second) + assert.NoError(t, err) +} + +func TestExpiry(t *testing.T) { + ctx := context.Background() + client := newFakeLeaseClient() + + err := client.Acquire(ctx, module, keys1, 500*time.Millisecond) + assert.NoError(t, err) + time.Sleep(250 * time.Millisecond) + err = client.Heartbeat(ctx, module, keys1, 500*time.Millisecond) + assert.NoError(t, err) + time.Sleep(250 * time.Millisecond) + err = client.Heartbeat(ctx, module, keys1, 500*time.Millisecond) + assert.NoError(t, err) + + // wait longer than ttl + time.Sleep(1 * time.Second) + err = client.Heartbeat(ctx, module, keys1, 500*time.Millisecond) + assert.Error(t, err, "expected error for heartbeating expired lease") + err = client.Release(ctx, keys1) + assert.Error(t, err, "expected error for heartbeating expired lease") + + // try and acquire again + err = client.Acquire(ctx, module, keys1, 1*time.Second) + assert.NoError(t, err) +} diff --git a/go-runtime/ftl/leases.go b/go-runtime/ftl/leases.go index b698857ee..d0c7720f1 100644 --- a/go-runtime/ftl/leases.go +++ b/go-runtime/ftl/leases.go @@ -16,6 +16,7 @@ import ( "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1/ftlv1connect" "github.com/TBD54566975/ftl/go-runtime/ftl/reflection" "github.com/TBD54566975/ftl/internal/log" + "github.com/TBD54566975/ftl/internal/modulecontext" "github.com/TBD54566975/ftl/internal/rpc" ) @@ -24,7 +25,7 @@ import ( var ErrLeaseHeld = fmt.Errorf("lease already held") type LeaseHandle struct { - stream *connect.BidiStreamForClient[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse] + client modulecontext.LeaseClient key []string errMu *sync.Mutex err error @@ -44,11 +45,9 @@ func (l LeaseHandle) Err() error { func (l LeaseHandle) Release() error { l.errMu.Lock() defer l.errMu.Unlock() - if err := l.stream.CloseRequest(); err != nil { - return fmt.Errorf("close lease: %w", err) - } - if err := l.stream.CloseResponse(); err != nil { - return fmt.Errorf("close lease: %w", err) + err := l.client.Release(context.Background(), l.key) + if err != nil { + return err } return l.err } @@ -65,44 +64,31 @@ func (l LeaseHandle) Release() error { // [lease]: https://hackmd.io/@ftl/Sym_GKEb0 func Lease(ctx context.Context, ttl time.Duration, key ...string) (LeaseHandle, error) { logger := log.FromContext(ctx).Scope("lease(" + strings.Join(key, "/")) - client := rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx) - stream := client.AcquireLease(ctx) + client := newClient(ctx) module := reflection.Module() logger.Tracef("Acquiring lease") - req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)} - if err := stream.Send(req); err != nil { - if connect.CodeOf(err) == connect.CodeResourceExhausted { - return LeaseHandle{}, ErrLeaseHeld - } - logger.Warnf("Lease acquisition failed: %s", err) - return LeaseHandle{}, fmt.Errorf("lease acquisition failed: %w", err) - } - // Wait for response. - _, err := stream.Receive() + err := client.Acquire(ctx, module, key, ttl) if err != nil { - if connect.CodeOf(err) == connect.CodeResourceExhausted { + if errors.Is(err, ErrLeaseHeld) { return LeaseHandle{}, ErrLeaseHeld } - return LeaseHandle{}, fmt.Errorf("lease acquisition failed: %w", err) + logger.Warnf("Lease acquisition failed: %s", err) + return LeaseHandle{}, err } - lease := LeaseHandle{key: key, errMu: &sync.Mutex{}, stream: stream} + lease := LeaseHandle{key: key, errMu: &sync.Mutex{}, client: client} // Heartbeat the lease. go func() { for { logger.Tracef("Heartbeating lease") - req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)} - err := stream.Send(req) + err := client.Heartbeat(ctx, module, key, ttl) if err == nil { time.Sleep(ttl / 2) continue } - if errors.Is(err, io.EOF) { - err = nil - } else { - logger.Warnf("Lease heartbeat terminated: %s", err) - } + logger.Warnf("Lease heartbeat terminated: %s", err) + // Notify the handle. lease.errMu.Lock() lease.err = err @@ -112,3 +98,65 @@ func Lease(ctx context.Context, ttl time.Duration, key ...string) (LeaseHandle, }() return lease, nil } + +// newClient creates a new lease client +// +// It allows module context to override the client with a mock if appropriate +func newClient(ctx context.Context) modulecontext.LeaseClient { + moduleCtx := modulecontext.FromContext(ctx) + if mock, ok := moduleCtx.MockLeaseClient().Get(); ok { + return mock + } + return &leaseClient{} +} + +// leaseClient is a client that coordinates leases with the controller +// +// This is used in all non-unit tests environements +type leaseClient struct { + stream *connect.BidiStreamForClient[ftlv1.AcquireLeaseRequest, ftlv1.AcquireLeaseResponse] +} + +var _ modulecontext.LeaseClient = &leaseClient{} + +func (c *leaseClient) Acquire(ctx context.Context, module string, key []string, ttl time.Duration) error { + c.stream = rpc.ClientFromContext[ftlv1connect.VerbServiceClient](ctx).AcquireLease(ctx) + req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)} + if err := c.stream.Send(req); err != nil { + if connect.CodeOf(err) == connect.CodeResourceExhausted { + return ErrLeaseHeld + } + return fmt.Errorf("lease acquisition failed: %w", err) + } + // Wait for response. + _, err := c.stream.Receive() + if err == nil { + return nil + } + if connect.CodeOf(err) == connect.CodeResourceExhausted { + return ErrLeaseHeld + } + return fmt.Errorf("lease acquisition failed: %w", err) +} + +func (c *leaseClient) Heartbeat(ctx context.Context, module string, key []string, ttl time.Duration) error { + req := &ftlv1.AcquireLeaseRequest{Key: key, Module: module, Ttl: durationpb.New(ttl)} + err := c.stream.Send(req) + if err == nil { + return nil + } + if errors.Is(err, io.EOF) { + return nil + } + return err +} + +func (c *leaseClient) Release(ctx context.Context, key []string) error { + if err := c.stream.CloseRequest(); err != nil { + return fmt.Errorf("close lease: %w", err) + } + if err := c.stream.CloseResponse(); err != nil { + return fmt.Errorf("close lease: %w", err) + } + return nil +} diff --git a/integration/integration_test.go b/integration/integration_test.go index ab6eaa191..fed2c1e9f 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -274,16 +274,23 @@ func TestModuleUnitTests(t *testing.T) { func TestLease(t *testing.T) { run(t, "", copyModule("leases"), + build("leases"), + // checks if leases work in a unit test environment + testModule("leases"), deploy("leases"), + // checks if it leases work with a real controller func(t testing.TB, ic testContext) error { // Start a lease. wg := errgroup.Group{} wg.Go(func() error { infof("Acquiring lease") - _, err := ic.verbs.Call(ic, connect.NewRequest(&ftlv1.CallRequest{ + resp, err := ic.verbs.Call(ic, connect.NewRequest(&ftlv1.CallRequest{ Verb: &schemapb.Ref{Module: "leases", Name: "acquire"}, Body: []byte("{}"), })) + if respErr := resp.Msg.GetError(); respErr != nil { + return fmt.Errorf("received error on first call: %v", respErr) + } return err }) diff --git a/integration/testdata/go/leases/go.mod b/integration/testdata/go/leases/go.mod index 529b4e66e..329d677f6 100644 --- a/integration/testdata/go/leases/go.mod +++ b/integration/testdata/go/leases/go.mod @@ -10,7 +10,10 @@ require ( connectrpc.com/connect v1.16.1 // indirect connectrpc.com/grpcreflect v1.2.0 // indirect connectrpc.com/otelconnect v0.7.0 // indirect + github.com/BurntSushi/toml v1.3.2 // indirect + github.com/TBD54566975/scaffolder v0.8.0 // indirect github.com/alecthomas/concurrency v0.0.2 // indirect + github.com/alecthomas/kong v0.9.0 // indirect github.com/alecthomas/participle/v2 v2.1.1 // indirect github.com/alecthomas/types v0.15.0 // indirect github.com/alessio/shellescape v1.4.2 // indirect diff --git a/integration/testdata/go/leases/go.sum b/integration/testdata/go/leases/go.sum index a13ddbc3e..ceae861c7 100644 --- a/integration/testdata/go/leases/go.sum +++ b/integration/testdata/go/leases/go.sum @@ -4,10 +4,16 @@ connectrpc.com/grpcreflect v1.2.0 h1:Q6og1S7HinmtbEuBvARLNwYmTbhEGRpHDhqrPNlmK+U connectrpc.com/grpcreflect v1.2.0/go.mod h1:nwSOKmE8nU5u/CidgHtPYk1PFI3U9ignz7iDMxOYkSY= connectrpc.com/otelconnect v0.7.0 h1:ZH55ZZtcJOTKWWLy3qmL4Pam4RzRWBJFOqTPyAqCXkY= connectrpc.com/otelconnect v0.7.0/go.mod h1:Bt2ivBymHZHqxvo4HkJ0EwHuUzQN6k2l0oH+mp/8nwc= +github.com/BurntSushi/toml v1.3.2 h1:o7IhLm0Msx3BaB+n3Ag7L8EVlByGnpq14C4YWiu/gL8= +github.com/BurntSushi/toml v1.3.2/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/TBD54566975/scaffolder v0.8.0 h1:DWl1K3dWcLsOPAYGQGPQXtffrml6XCB0tF05JdpMqZU= +github.com/TBD54566975/scaffolder v0.8.0/go.mod h1:Ab/jbQ4q8EloYL0nbkdh2DVvkGc4nxr1OcIbdMpTxxg= github.com/alecthomas/assert/v2 v2.9.0 h1:ZcLG8ccMEtlMLkLW4gwGpBWBb0N8MUCmsy1lYBVd1xQ= github.com/alecthomas/assert/v2 v2.9.0/go.mod h1:Bze95FyfUr7x34QZrjL+XP+0qgp/zg8yS+TtBj1WA3k= github.com/alecthomas/concurrency v0.0.2 h1:Q3kGPtLbleMbH9lHX5OBFvJygfyFw29bXZKBg+IEVuo= github.com/alecthomas/concurrency v0.0.2/go.mod h1:GmuQb/iHX7mbNtPlC/WDzEFxDMB0HYFer2Qda9QTs7w= +github.com/alecthomas/kong v0.9.0 h1:G5diXxc85KvoV2f0ZRVuMsi45IrBgx9zDNGNj165aPA= +github.com/alecthomas/kong v0.9.0/go.mod h1:Y47y5gKfHp1hDc7CH7OeXgLIpp+Q2m1Ni0L5s3bI8Os= github.com/alecthomas/participle/v2 v2.1.1 h1:hrjKESvSqGHzRb4yW1ciisFJ4p3MGYih6icjJvbsmV8= github.com/alecthomas/participle/v2 v2.1.1/go.mod h1:Y1+hAs8DHPmc3YUFzqllV+eSQ9ljPTk0ZkPMtEdAx2c= github.com/alecthomas/repr v0.4.0 h1:GhI2A8MACjfegCPVq9f1FLvIBS+DrQ2KQBFZP1iFzXc= diff --git a/integration/testdata/go/leases/leases.go b/integration/testdata/go/leases/leases.go index d6b4903f9..b344233a4 100644 --- a/integration/testdata/go/leases/leases.go +++ b/integration/testdata/go/leases/leases.go @@ -8,6 +8,8 @@ import ( "github.com/TBD54566975/ftl/go-runtime/ftl" ) +// Acquire acquires a lease and waits 5s before releasing it. +// //ftl:verb func Acquire(ctx context.Context) error { logger := ftl.LoggerFromContext(ctx) diff --git a/integration/testdata/go/leases/leases_test.go b/integration/testdata/go/leases/leases_test.go new file mode 100644 index 000000000..6a92bc6cd --- /dev/null +++ b/integration/testdata/go/leases/leases_test.go @@ -0,0 +1,28 @@ +package leases + +import ( + "testing" + "time" + + "github.com/TBD54566975/ftl/go-runtime/ftl/ftltest" + "github.com/alecthomas/assert/v2" + "golang.org/x/sync/errgroup" +) + +func TestLease(t *testing.T) { + ctx := ftltest.Context( + ftltest.WithProjectFiles(), + ) + // test that we can acquire a lease in a test environment + wg := errgroup.Group{} + wg.Go(func() error { + return Acquire(ctx) + }) + + // test that we get an error acquiring another lease at the same time + time.Sleep(1 * time.Second) + err := Acquire(ctx) + assert.Error(t, err, "expected error for acquiring lease while another is held") + + assert.NoError(t, wg.Wait(), "expected no error acquiring the initial lease") +} diff --git a/internal/modulecontext/module_context.go b/internal/modulecontext/module_context.go index f1298855e..3e7cf7c4d 100644 --- a/internal/modulecontext/module_context.go +++ b/internal/modulecontext/module_context.go @@ -6,6 +6,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/alecthomas/types/optional" _ "github.com/jackc/pgx/v5/stdlib" // SQL driver @@ -31,6 +32,7 @@ type ModuleContext struct { isTesting bool mockVerbs map[schema.RefKey]Verb allowDirectVerbBehavior bool + leaseClient optional.Option[LeaseClient] } // Builder is used to build a ModuleContext @@ -74,12 +76,13 @@ func (b *Builder) AddDatabases(databases map[string]Database) *Builder { } // UpdateForTesting marks the builder as part of a test environment and adds mock verbs and flags for other test features. -func (b *Builder) UpdateForTesting(mockVerbs map[schema.RefKey]Verb, allowDirectVerbBehavior bool) *Builder { +func (b *Builder) UpdateForTesting(mockVerbs map[schema.RefKey]Verb, allowDirectVerbBehavior bool, leaseClient LeaseClient) *Builder { b.isTesting = true for name, verb := range mockVerbs { b.mockVerbs[name] = verb } b.allowDirectVerbBehavior = allowDirectVerbBehavior + b.leaseClient = optional.Some[LeaseClient](leaseClient) return b } @@ -141,6 +144,19 @@ func (m ModuleContext) GetDatabase(name string, dbType DBType) (*sql.DB, error) return db.db, nil } +// LeaseClient is the interface for acquiring, heartbeating and releasing leases +type LeaseClient interface { + // Returns ResourceExhausted if the lease is held. + Acquire(ctx context.Context, module string, key []string, ttl time.Duration) error + Heartbeat(ctx context.Context, module string, key []string, ttl time.Duration) error + Release(ctx context.Context, key []string) error +} + +// MockLeaseClient provides a mock lease client when testing +func (m ModuleContext) MockLeaseClient() optional.Option[LeaseClient] { + return m.leaseClient +} + // BehaviorForVerb returns what to do to execute a verb // // This allows module context to dictate behavior based on testing options