From 60acb0ba9f33131b179e535dd89ca5b37e9c1bae Mon Sep 17 00:00:00 2001
From: Matt Toohey <mtoohey@block.xyz>
Date: Thu, 18 Jul 2024 11:14:44 +1000
Subject: [PATCH] fix: flaky asm tests fixed by wrapping async provider (#2102)

fixes #2092
- The mock clock was causing the same race conditions as this issue:
  https://github.com/TBD54566975/ftl/issues/1368#issuecomment-2087944187
- Instead, we now wrap the ASM provider with a manual sync provider.
- This wrapper lets us trigger syncs from tests and block until each
  sync completes.
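
A minimal sketch of how a test drives syncs now (assumes ctx, t, asm and
the router from the existing setUp helper in asm_test.go; only the
wrapper API used below is added by this patch):

    manualSync := NewManualSyncProvider[Secrets](asm)

    // The manager only sees the wrapper, which suppresses the normal
    // sync schedule by reporting a very long SyncInterval.
    sm, err := New(ctx, router, []Provider[Secrets]{manualSync})
    assert.NoError(t, err)

    // Trigger a sync explicitly and block until it has completed.
    assert.NoError(t, manualSync.SyncAndWait())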
---
 common/configuration/asm_test.go          | 69 +++++++-----------
 common/configuration/cache.go             | 27 ++++---
 common/configuration/manager.go           |  9 +--
 common/configuration/manual_sync_utils.go | 86 +++++++++++++++++++++++
 4 files changed, 128 insertions(+), 63 deletions(-)
 create mode 100644 common/configuration/manual_sync_utils.go

diff --git a/common/configuration/asm_test.go b/common/configuration/asm_test.go
index 19ce5a9580..489b9b0f01 100644
--- a/common/configuration/asm_test.go
+++ b/common/configuration/asm_test.go
@@ -11,14 +11,12 @@ import (
 	"path"
 	"sort"
 	"testing"
-	"time"
 
 	"connectrpc.com/connect"
 	"github.com/TBD54566975/ftl/backend/controller/leases"
 	ftlv1 "github.com/TBD54566975/ftl/backend/protos/xyz/block/ftl/v1"
 	"github.com/TBD54566975/ftl/internal/log"
 	"github.com/TBD54566975/ftl/internal/slices"
-	"github.com/benbjohnson/clock"
 
 	"github.com/alecthomas/assert/v2"
 	"github.com/alecthomas/types/optional"
@@ -30,11 +28,9 @@ import (
 	"github.com/aws/aws-sdk-go-v2/service/secretsmanager/types"
 )
 
-func setUp(ctx context.Context, t *testing.T, router optional.Option[Router[Secrets]]) (*Manager[Secrets], *ASM, *asmLeader, *secretsmanager.Client, *clock.Mock, *leases.FakeLeaser) {
+func setUp(ctx context.Context, t *testing.T, router optional.Option[Router[Secrets]]) (*Manager[Secrets], *ASM, *asmLeader, *secretsmanager.Client, *ManualSyncProvider[Secrets], *leases.FakeLeaser) {
 	t.Helper()
 
-	mockClock := clock.NewMock()
-
 	if _, ok := router.Get(); !ok {
 		dir := t.TempDir()
 		projectPath := path.Join(dir, "ftl-project.toml")
@@ -51,14 +47,16 @@ func setUp(ctx context.Context, t *testing.T, router optional.Option[Router[Secr
 	})
 	leaser := leases.NewFakeLeaser()
 	asm := NewASM(ctx, externalClient, URL("http://localhost:1234"), leaser)
+	manualSyncProvider := NewManualSyncProvider[Secrets](asm)
 
-	sm := newForTesting(ctx, router.MustGet(), []Provider[Secrets]{asm}, mockClock)
+	sm, err := New(ctx, router.MustGet(), []Provider[Secrets]{manualSyncProvider})
+	assert.NoError(t, err)
 
 	leaderOrFollower, err := asm.coordinator.Get()
 	assert.NoError(t, err)
 	leader, ok := leaderOrFollower.(*asmLeader)
 	assert.True(t, ok, "expected test to get an asm leader not a follower")
-	return sm, asm, leader, externalClient, mockClock, leaser
+	return sm, asm, leader, externalClient, manualSyncProvider, leaser
 }
 
 func waitForUpdatesToProcess(c *cache[Secrets]) {
@@ -172,23 +170,17 @@ func TestASMPagination(t *testing.T) {
 // TestLeaderSync sets and gets values via the leader, as well as directly with ASM
 func TestLeaderSync(t *testing.T) {
 	ctx := log.ContextWithNewDefaultLogger(context.Background())
-	sm, _, _, externalClient, clock, _ := setUp(ctx, t, None[Router[Secrets]]())
-	testClientSync(ctx, t, sm, externalClient, true, func(percentage float64) {
-		clock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0))
-		if percentage == 1.0 {
-			time.Sleep(time.Second * 4)
-		}
-	})
+	sm, _, _, externalClient, manualSync, _ := setUp(ctx, t, None[Router[Secrets]]())
+	testClientSync(ctx, t, sm, externalClient, true, []*ManualSyncProvider[Secrets]{manualSync})
 }
 
 // TestFollowerSync tests setting and getting values from a follower to the leader to ASM, and vice versa
 func TestFollowerSync(t *testing.T) {
 	ctx := log.ContextWithNewDefaultLogger(context.Background())
-	leaderManager, _, _, externalClient, leaderClock, leaser := setUp(ctx, t, None[Router[Secrets]]())
+	leaderManager, _, _, externalClient, leaderManualSync, leaser := setUp(ctx, t, None[Router[Secrets]]())
 
 	// fakeRPCClient connects the follower to the leader
 	fakeRPCClient := &fakeAdminClient{sm: leaderManager}
-	followerClock := clock.NewMock()
 	follower := newASMFollower(fakeRPCClient, "fake")
 
 	followerASM := newASMForTesting(ctx, externalClient, URL("http://localhost:1235"), leaser, optional.Some[asmClient](follower))
@@ -197,22 +189,12 @@ func TestFollowerSync(t *testing.T) {
 	_, ok := asmClient.(*asmFollower)
 	assert.True(t, ok, "expected test to get an asm follower not a leader")
 
-	sm := newForTesting(ctx, leaderManager.router, []Provider[Secrets]{followerASM}, followerClock)
-	assert.NoError(t, err)
+	followerManualSync := NewManualSyncProvider(followerASM)
 
-	testClientSync(ctx, t, sm, externalClient, false, func(percentage float64) {
-		// sync leader
-		leaderClock.Add(time.Second * (time.Duration(asmLeaderSyncInterval.Seconds()*percentage) + 1.0))
-		if percentage == 1.0 {
-			time.Sleep(time.Second * 4)
-		}
+	sm, err := New(ctx, leaderManager.router, []Provider[Secrets]{followerManualSync})
+	assert.NoError(t, err)
 
-		// then sync follower
-		followerClock.Add(time.Second * (time.Duration(asmFollowerSyncInterval.Seconds()*percentage) + 1.0))
-		if percentage == 1.0 {
-			time.Sleep(time.Second * 4)
-		}
-	})
+	testClientSync(ctx, t, sm, externalClient, false, []*ManualSyncProvider[Secrets]{leaderManualSync, followerManualSync})
 }
 
 // testClientSync uses a Manager and a secretsmanager.Client to test setting and getting secrets
@@ -221,19 +203,14 @@ func testClientSync(ctx context.Context,
 	sm *Manager[Secrets],
 	externalClient *secretsmanager.Client,
 	isLeader bool,
-	progressByIntervalPercentage func(percentage float64)) {
+	manualSyncProviders []*ManualSyncProvider[Secrets]) {
 	t.Helper()
 
-	// advance clock to half way between syncs
-	progressByIntervalPercentage(0.5)
-
-	// wait for initial load
-	err := sm.cache.providers["asm"].waitForInitialSync()
-	assert.NoError(t, err)
+	waitForManualSync(t, manualSyncProviders)
 
 	// write a secret via asmClient
 	clientRef := Ref{Module: Some("sync"), Name: "set-by-client"}
-	err = sm.Set(ctx, "asm", clientRef, "client-first")
+	err := sm.Set(ctx, "asm", clientRef, "client-first")
 	assert.NoError(t, err)
 	waitForUpdatesToProcess(sm.cache)
 	value, err := sm.getData(ctx, clientRef)
@@ -270,8 +247,7 @@ func testClientSync(ctx context.Context,
 	assert.NoError(t, err, "failed to load secret via asm")
 	assert.Equal(t, value, jsonBytes(t, "sm-client-second"), "unexpected secret value")
 
-	// give client a change to sync
-	progressByIntervalPercentage(1.0)
+	waitForManualSync(t, manualSyncProviders)
 
 	// confirm that all secrets are up to date
 	list, err := sm.List(ctx)
@@ -316,8 +292,7 @@ func testClientSync(ctx context.Context,
 	})
 	assert.NoError(t, err)
 
-	// give client a change to sync
-	progressByIntervalPercentage(1.0)
+	waitForManualSync(t, manualSyncProviders)
 
 	_, err = sm.getData(ctx, smRef)
 	assert.Error(t, err, "expected to fail because secret was deleted")
@@ -325,6 +300,16 @@ func testClientSync(ctx context.Context,
 	assert.Error(t, err, "expected to fail because secret was deleted")
 }
 
+// waitForManualSync syncs each provider in order
+func waitForManualSync[R Role](t *testing.T, providers []*ManualSyncProvider[R]) {
+	t.Helper()
+
+	for _, provider := range providers {
+		err := provider.SyncAndWait()
+		assert.NoError(t, err)
+	}
+}
+
 func storeUnobfuscatedValueInASM(ctx context.Context, sm *Manager[Secrets], externalClient *secretsmanager.Client, ref Ref, value []byte, isNew bool) error {
 	obfuscator := Secrets{}.obfuscator()
 	obfuscatedValue, err := obfuscator.Obfuscate(value)
diff --git a/common/configuration/cache.go b/common/configuration/cache.go
index 8e8173f1df..8bfb5e6bf6 100644
--- a/common/configuration/cache.go
+++ b/common/configuration/cache.go
@@ -11,7 +11,6 @@ import (
 	"github.com/TBD54566975/ftl/internal/slices"
 	"github.com/alecthomas/types/optional"
 	"github.com/alecthomas/types/pubsub"
-	"github.com/benbjohnson/clock"
 	"github.com/puzpuzpuz/xsync/v3"
 )
 
@@ -48,7 +47,7 @@ type cache[R Role] struct {
 	topicWaitGroup *sync.WaitGroup
 }
 
-func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R], listProvider listProvider, clock clock.Clock) *cache[R] {
+func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R], listProvider listProvider) *cache[R] {
 	cacheProviders := make(map[string]*cacheProvider[R], len(providers))
 	for _, provider := range providers {
 		cacheProviders[provider.Key()] = &cacheProvider[R]{
@@ -64,7 +63,7 @@ func newCache[R Role](ctx context.Context, providers []AsynchronousProvider[R],
 		topic:          pubsub.New[updateCacheEvent](),
 		topicWaitGroup: &sync.WaitGroup{},
 	}
-	go cache.sync(ctx, clock)
+	go cache.sync(ctx)
 
 	return cache
 }
@@ -121,7 +120,7 @@ func (c *cache[R]) deletedValue(ref Ref, pkey string) {
 // Errors returned by a provider cause retries with exponential backoff.
 //
 // Events are processed when all providers are not being synced
-func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
+func (c *cache[R]) sync(ctx context.Context) {
 	if len(c.providers) == 0 {
 		// nothing to sync
 		return
@@ -134,7 +133,7 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
 	defer c.topic.Unsubscribe(events)
 
 	// start syncing immediately
-	next := clock.Now()
+	next := time.Now()
 
 	for {
 		select {
@@ -145,12 +144,12 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
 			c.processEvent(e)
 
 		// Can not calculate next sync date for each provider as sync intervals can change (eg when follower becomes leader)
-		case <-clock.After(next.Sub(clock.Now())):
+		case <-time.After(time.Until(next)):
 			wg := &sync.WaitGroup{}
 
 			providersToSync := []*cacheProvider[R]{}
 			for _, cp := range c.providers {
-				if cp.needsSync(clock) {
+				if cp.needsSync() {
 					providersToSync = append(providersToSync, cp)
 				}
 			}
@@ -168,12 +167,12 @@ func (c *cache[R]) sync(ctx context.Context, clock clock.Clock) {
 				})
 				wg.Add(1)
 				go func(cp *cacheProvider[R]) {
-					cp.sync(ctx, entriesForProvider, clock)
+					cp.sync(ctx, entriesForProvider)
 					wg.Done()
 				}(cp)
 			}
 			wg.Wait()
-			next = clock.Now().Add(time.Second)
+			next = time.Now().Add(time.Second)
 		}
 	}
 }
@@ -211,22 +210,22 @@ func (c *cacheProvider[R]) waitForInitialSync() error {
 }
 
 // needsSync returns true if the provider needs to be synced.
-func (c *cacheProvider[R]) needsSync(clock clock.Clock) bool {
+func (c *cacheProvider[R]) needsSync() bool {
 	lastSyncAttempt, ok := c.lastSyncAttempt.Get()
 	if !ok {
 		return true
 	}
 	if currentBackoff, ok := c.currentBackoff.Get(); ok {
-		return clock.Now().After(lastSyncAttempt.Add(currentBackoff))
+		return time.Now().After(lastSyncAttempt.Add(currentBackoff))
 	}
-	return clock.Now().After(lastSyncAttempt.Add(c.provider.SyncInterval()))
+	return time.Now().After(lastSyncAttempt.Add(c.provider.SyncInterval()))
 }
 
 // sync executes sync on the provider and updates the cacheProvider sync state
-func (c *cacheProvider[R]) sync(ctx context.Context, entries []Entry, clock clock.Clock) {
+func (c *cacheProvider[R]) sync(ctx context.Context, entries []Entry) {
 	logger := log.FromContext(ctx)
 
-	c.lastSyncAttempt = optional.Some(clock.Now())
+	c.lastSyncAttempt = optional.Some(time.Now())
 	err := c.provider.Sync(ctx, entries, c.values)
 	if err != nil {
 		logger.Errorf(err, "Error syncing %s", c.provider.Key())
diff --git a/common/configuration/manager.go b/common/configuration/manager.go
index 30b099fa3f..e493fd1176 100644
--- a/common/configuration/manager.go
+++ b/common/configuration/manager.go
@@ -11,7 +11,6 @@ import (
 	"strings"
 
 	"github.com/alecthomas/types/optional"
-	"github.com/benbjohnson/clock"
 )
 
 // Role of [Manager], either Secrets or Configuration.
@@ -65,10 +64,6 @@ func NewDefaultConfigurationManagerFromConfig(ctx context.Context, config string
 
 // New configuration manager.
 func New[R Role](ctx context.Context, router Router[R], providers []Provider[R]) (*Manager[R], error) {
-	return newForTesting(ctx, router, providers, clock.New()), nil
-}
-
-func newForTesting[R Role](ctx context.Context, router Router[R], providers []Provider[R], clock clock.Clock) *Manager[R] {
 	m := &Manager[R]{
 		providers: map[string]Provider[R]{},
 	}
@@ -86,9 +81,9 @@ func newForTesting[R Role](ctx context.Context, router Router[R], providers []Pr
 			asyncProviders = append(asyncProviders, sp)
 		}
 	}
-	m.cache = newCache[R](ctx, asyncProviders, m, clock)
+	m.cache = newCache[R](ctx, asyncProviders, m)
 
-	return m
+	return m, nil
 }
 
 func ProviderKeyForAccessor(accessor *url.URL) string {
diff --git a/common/configuration/manual_sync_utils.go b/common/configuration/manual_sync_utils.go
new file mode 100644
index 0000000000..34acdce9a2
--- /dev/null
+++ b/common/configuration/manual_sync_utils.go
@@ -0,0 +1,86 @@
+//go:build !release
+
+package configuration
+
+import (
+	"context"
+	"net/url"
+	"time"
+
+	"github.com/alecthomas/atomic"
+
+	"github.com/alecthomas/types/optional"
+	"github.com/puzpuzpuz/xsync/v3"
+)
+
+type manualSyncBlock struct {
+	sync chan optional.Option[error]
+}
+
+// ManualSyncProvider prevents normal syncs by returning a very high sync interval.
+// When SyncAndWait() is called, it starts returning a 0 sync interval and then blocks until a sync completes.
+// See why we didn't use mock clocks to schedule syncs here: https://github.com/TBD54566975/ftl/issues/2092
+type ManualSyncProvider[R Role] struct {
+	syncRequested atomic.Value[optional.Option[manualSyncBlock]]
+
+	provider AsynchronousProvider[R]
+}
+
+var _ AsynchronousProvider[Secrets] = &ManualSyncProvider[Secrets]{}
+
+func NewManualSyncProvider[R Role](provider AsynchronousProvider[R]) *ManualSyncProvider[R] {
+	return &ManualSyncProvider[R]{
+		provider: provider,
+	}
+}
+
+func (a *ManualSyncProvider[R]) SyncAndWait() error {
+	block := manualSyncBlock{
+		sync: make(chan optional.Option[error]),
+	}
+	a.syncRequested.Store(optional.Some(block))
+	err := <-block.sync
+	if err, hasErr := err.Get(); hasErr {
+		return err //nolint:wrapcheck
+	}
+	return nil
+}
+
+func (a *ManualSyncProvider[R]) Role() R {
+	return a.provider.Role()
+}
+
+func (a *ManualSyncProvider[R]) Key() string {
+	return a.provider.Key()
+}
+
+func (a *ManualSyncProvider[R]) Store(ctx context.Context, ref Ref, value []byte) (*url.URL, error) {
+	return a.provider.Store(ctx, ref, value) //nolint:wrapcheck
+}
+
+func (a *ManualSyncProvider[R]) Delete(ctx context.Context, ref Ref) error {
+	return a.provider.Delete(ctx, ref) //nolint:wrapcheck
+}
+
+func (a *ManualSyncProvider[R]) SyncInterval() time.Duration {
+	if _, ok := a.syncRequested.Load().Get(); ok {
+		// sync now
+		return 0
+	}
+	// prevent sync
+	return time.Hour * 24 * 365
+}
+
+func (a *ManualSyncProvider[R]) Sync(ctx context.Context, entries []Entry, values *xsync.MapOf[Ref, SyncedValue]) error {
+	err := a.provider.Sync(ctx, entries, values)
+
+	if block, ok := a.syncRequested.Load().Get(); ok {
+		a.syncRequested.Store(optional.None[manualSyncBlock]())
+		if err == nil {
+			block.sync <- optional.None[error]()
+		} else {
+			block.sync <- optional.Some(err)
+		}
+	}
+	return err //nolint:wrapcheck
+}