From 9f79b46e41bb7bca973b7a078d31622b8ea3fd05 Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Thu, 14 Dec 2023 15:18:51 +0000 Subject: [PATCH 1/3] refactor(storage): introduce readonly only storage interface refactor(storage/fs): export SyncedStore ability to set snapshot refactor(storage/fs/local): embed synced store directly refactor(storage/fs/s3): embed synced store directly refactor(storage/fs/git): embed synced store directly refactor(storage/fs/oci): embed synced store directly refactor(cmd/grpc): update calls to declarative backend stores refactor(storage/fs): rename files from source to store refactor(storage/fs/git): simplify condition where reference is a revision refactor(storage/fs): remove store abstraction fix(storage/fs/oci): remove errant error check from store test refactor(storage/fs): rename SyncedStore to Store refactor(storage/fs): add SnapshotStore functional transaction interface --- internal/cmd/grpc.go | 155 +------- internal/storage/fs/git/source.go | 188 --------- internal/storage/fs/git/store.go | 201 ++++++++++ .../fs/git/{source_test.go => store_test.go} | 111 +++--- internal/storage/fs/local/source.go | 75 ---- internal/storage/fs/local/source_test.go | 67 ---- internal/storage/fs/local/store.go | 83 ++++ internal/storage/fs/local/store_test.go | 64 +++ internal/storage/fs/oci/source.go | 105 ----- internal/storage/fs/oci/store.go | 98 +++++ .../fs/oci/{source_test.go => store_test.go} | 77 ++-- internal/storage/fs/poll.go | 68 ++++ internal/storage/fs/s3/source.go | 124 ------ internal/storage/fs/s3/source_test.go | 124 ------ internal/storage/fs/s3/store.go | 135 +++++++ internal/storage/fs/s3/store_test.go | 122 ++++++ internal/storage/fs/snapshot.go | 163 ++------ internal/storage/fs/snapshot_test.go | 4 +- internal/storage/fs/store.go | 368 ++++++++++++++---- internal/storage/fs/store/store.go | 164 ++++++++ internal/storage/fs/store_test.go | 237 ++++++++--- internal/storage/fs/sync.go | 221 ----------- internal/storage/fs/sync_test.go | 228 ----------- internal/storage/storage.go | 57 ++- 24 files changed, 1576 insertions(+), 1663 deletions(-) delete mode 100644 internal/storage/fs/git/source.go create mode 100644 internal/storage/fs/git/store.go rename internal/storage/fs/git/{source_test.go => store_test.go} (65%) delete mode 100644 internal/storage/fs/local/source.go delete mode 100644 internal/storage/fs/local/source_test.go create mode 100644 internal/storage/fs/local/store.go create mode 100644 internal/storage/fs/local/store_test.go delete mode 100644 internal/storage/fs/oci/source.go create mode 100644 internal/storage/fs/oci/store.go rename internal/storage/fs/oci/{source_test.go => store_test.go} (70%) create mode 100644 internal/storage/fs/poll.go delete mode 100644 internal/storage/fs/s3/source.go delete mode 100644 internal/storage/fs/s3/source_test.go create mode 100644 internal/storage/fs/s3/store.go create mode 100644 internal/storage/fs/s3/store_test.go create mode 100644 internal/storage/fs/store/store.go delete mode 100644 internal/storage/fs/sync.go delete mode 100644 internal/storage/fs/sync_test.go diff --git a/internal/cmd/grpc.go b/internal/cmd/grpc.go index 2eece34831..265404c722 100644 --- a/internal/cmd/grpc.go +++ b/internal/cmd/grpc.go @@ -8,7 +8,6 @@ import ( "fmt" "net" "net/url" - "os" "strconv" "sync" "time" @@ -20,7 +19,6 @@ import ( "go.flipt.io/flipt/internal/config" "go.flipt.io/flipt/internal/containers" "go.flipt.io/flipt/internal/info" - "go.flipt.io/flipt/internal/oci" fliptserver "go.flipt.io/flipt/internal/server" "go.flipt.io/flipt/internal/server/audit" "go.flipt.io/flipt/internal/server/audit/logfile" @@ -33,8 +31,7 @@ import ( middlewaregrpc "go.flipt.io/flipt/internal/server/middleware/grpc" "go.flipt.io/flipt/internal/storage" storagecache "go.flipt.io/flipt/internal/storage/cache" - "go.flipt.io/flipt/internal/storage/fs" - storageoci "go.flipt.io/flipt/internal/storage/fs/oci" + fsstore "go.flipt.io/flipt/internal/storage/fs/store" fliptsql "go.flipt.io/flipt/internal/storage/sql" "go.flipt.io/flipt/internal/storage/sql/mysql" "go.flipt.io/flipt/internal/storage/sql/postgres" @@ -52,7 +49,6 @@ import ( semconv "go.opentelemetry.io/otel/semconv/v1.4.0" "go.uber.org/zap" "go.uber.org/zap/zapcore" - "golang.org/x/crypto/ssh" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -60,12 +56,6 @@ import ( "google.golang.org/grpc/reflection" "google.golang.org/grpc/status" - "github.com/go-git/go-git/v5/plumbing/transport/http" - gitssh "github.com/go-git/go-git/v5/plumbing/transport/ssh" - "go.flipt.io/flipt/internal/storage/fs/git" - "go.flipt.io/flipt/internal/storage/fs/local" - "go.flipt.io/flipt/internal/storage/fs/s3" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" grpc_ctxtags "github.com/grpc-ecosystem/go-grpc-middleware/tags" @@ -154,119 +144,12 @@ func NewGRPCServer( } logger.Debug("database driver configured", zap.Stringer("driver", driver)) - case config.GitStorageType: - opts := []containers.Option[git.Source]{ - git.WithRef(cfg.Storage.Git.Ref), - git.WithPollInterval(cfg.Storage.Git.PollInterval), - git.WithInsecureTLS(cfg.Storage.Git.InsecureSkipTLS), - } - - if cfg.Storage.Git.CaCertBytes != "" { - opts = append(opts, git.WithCABundle([]byte(cfg.Storage.Git.CaCertBytes))) - } else if cfg.Storage.Git.CaCertPath != "" { - if bytes, err := os.ReadFile(cfg.Storage.Git.CaCertPath); err == nil { - opts = append(opts, git.WithCABundle(bytes)) - } else { - return nil, err - } - } - - auth := cfg.Storage.Git.Authentication - switch { - case auth.BasicAuth != nil: - opts = append(opts, git.WithAuth(&http.BasicAuth{ - Username: auth.BasicAuth.Username, - Password: auth.BasicAuth.Password, - })) - case auth.TokenAuth != nil: - opts = append(opts, git.WithAuth(&http.TokenAuth{ - Token: auth.TokenAuth.AccessToken, - })) - case auth.SSHAuth != nil: - var method *gitssh.PublicKeys - if auth.SSHAuth.PrivateKeyBytes != "" { - method, err = gitssh.NewPublicKeys( - auth.SSHAuth.User, - []byte(auth.SSHAuth.PrivateKeyBytes), - auth.SSHAuth.Password, - ) - } else { - method, err = gitssh.NewPublicKeysFromFile( - auth.SSHAuth.User, - auth.SSHAuth.PrivateKeyPath, - auth.SSHAuth.Password, - ) - } - if err != nil { - return nil, err - } - - // we're protecting against this explicitly so we can disable - // the gosec linting rule - if auth.SSHAuth.InsecureIgnoreHostKey { - // nolint:gosec - method.HostKeyCallback = ssh.InsecureIgnoreHostKey() - } - - opts = append(opts, git.WithAuth(method)) - } - - source, err := git.NewSource(logger, cfg.Storage.Git.Repository, opts...) - if err != nil { - return nil, err - } - - store, err = fs.NewStore(logger, source) - if err != nil { - return nil, err - } - case config.LocalStorageType: - source, err := local.NewSource(logger, cfg.Storage.Local.Path) - if err != nil { - return nil, err - } - - store, err = fs.NewStore(logger, source) - if err != nil { - return nil, err - } - case config.ObjectStorageType: - store, err = NewObjectStore(cfg, logger) - if err != nil { - return nil, err - } - case config.OCIStorageType: - var opts []containers.Option[oci.StoreOptions] - if auth := cfg.Storage.OCI.Authentication; auth != nil { - opts = append(opts, oci.WithCredentials( - auth.Username, - auth.Password, - )) - } - - ocistore, err := oci.NewStore(logger, cfg.Storage.OCI.BundlesDirectory, opts...) - if err != nil { - return nil, err - } - - ref, err := oci.ParseReference(cfg.Storage.OCI.Repository) - if err != nil { - return nil, err - } - - source, err := storageoci.NewSource(logger, ocistore, ref, - storageoci.WithPollInterval(cfg.Storage.OCI.PollInterval), - ) - if err != nil { - return nil, err - } - - store, err = fs.NewStore(logger, source) + default: + // otherwise, attempt to configure a declarative backend store + store, err = fsstore.NewStore(ctx, logger, cfg) if err != nil { return nil, err } - default: - return nil, fmt.Errorf("unexpected storage type: %q", cfg.Storage.Type) } logger.Debug("store enabled", zap.Stringer("store", store)) @@ -507,36 +390,6 @@ func NewGRPCServer( return server, nil } -// NewObjectStore create a new storate.Store from the object config -func NewObjectStore(cfg *config.Config, logger *zap.Logger) (storage.Store, error) { - objectCfg := cfg.Storage.Object - var store storage.Store - // keep this as a case statement in anticipation of - // more object types in the future - // nolint:gocritic - switch objectCfg.Type { - case config.S3ObjectSubStorageType: - opts := []containers.Option[s3.Source]{ - s3.WithPollInterval(objectCfg.S3.PollInterval), - } - if objectCfg.S3.Endpoint != "" { - opts = append(opts, s3.WithEndpoint(objectCfg.S3.Endpoint)) - } - if objectCfg.S3.Region != "" { - opts = append(opts, s3.WithRegion(objectCfg.S3.Region)) - } - source, err := s3.NewSource(logger, objectCfg.S3.Bucket, opts...) - if err != nil { - return nil, err - } - store, err = fs.NewStore(logger, source) - if err != nil { - return nil, err - } - } - return store, nil -} - // Run begins serving gRPC requests. // This methods blocks until Shutdown is called. func (s *GRPCServer) Run() error { diff --git a/internal/storage/fs/git/source.go b/internal/storage/fs/git/source.go deleted file mode 100644 index 7fb52ed345..0000000000 --- a/internal/storage/fs/git/source.go +++ /dev/null @@ -1,188 +0,0 @@ -package git - -import ( - "context" - "errors" - "fmt" - "io/fs" - "time" - - "github.com/go-git/go-git/v5" - "github.com/go-git/go-git/v5/config" - "github.com/go-git/go-git/v5/plumbing" - "github.com/go-git/go-git/v5/plumbing/transport" - "github.com/go-git/go-git/v5/storage/memory" - "go.flipt.io/flipt/internal/containers" - "go.flipt.io/flipt/internal/gitfs" - storagefs "go.flipt.io/flipt/internal/storage/fs" - "go.uber.org/zap" -) - -// Source is an implementation of storage/fs.FSSource -// This implementation is backed by a Git repository and it tracks an upstream reference. -// When subscribing to this source, the upstream reference is tracked -// by polling the upstream on a configurable interval. -type Source struct { - logger *zap.Logger - repo *git.Repository - - url string - ref string - hash plumbing.Hash - interval time.Duration - auth transport.AuthMethod - caBundle []byte - insecureSkipTLS bool -} - -// WithRef configures the target reference to be used when fetching -// and building fs.FS implementations. -// If it is a valid hash, then the fixed SHA value is used. -// Otherwise, it is treated as a reference in the origin upstream. -func WithRef(ref string) containers.Option[Source] { - return func(s *Source) { - if plumbing.IsHash(ref) { - s.hash = plumbing.NewHash(ref) - return - } - - s.ref = ref - } -} - -// WithPollInterval configures the interval in which origin is polled to -// discover any updates to the target reference. -func WithPollInterval(tick time.Duration) containers.Option[Source] { - return func(s *Source) { - s.interval = tick - } -} - -// WithAuth returns an option which configures the auth method used -// by the provided source. -func WithAuth(auth transport.AuthMethod) containers.Option[Source] { - return func(s *Source) { - s.auth = auth - } -} - -// WithInsecureTLS returns an option which configures the insecure TLS -// setting for the provided source. -func WithInsecureTLS(insecureSkipTLS bool) containers.Option[Source] { - return func(s *Source) { - s.insecureSkipTLS = insecureSkipTLS - } -} - -// WithCABundle returns an option which configures the CA Bundle used for -// validating the TLS connection to the provided source. -func WithCABundle(caCertBytes []byte) containers.Option[Source] { - return func(s *Source) { - if caCertBytes != nil { - s.caBundle = caCertBytes - } - } -} - -// NewSource constructs and configures a Source. -// The source uses the connection and credential details provided to build -// fs.FS implementations around a target git repository. -func NewSource(logger *zap.Logger, url string, opts ...containers.Option[Source]) (_ *Source, err error) { - source := &Source{ - logger: logger.With(zap.String("repository", url)), - url: url, - ref: "main", - interval: 30 * time.Second, - } - containers.ApplyAll(source, opts...) - - field := zap.Stringer("ref", plumbing.NewBranchReferenceName(source.ref)) - if source.hash != plumbing.ZeroHash { - field = zap.Stringer("SHA", source.hash) - } - source.logger = source.logger.With(field) - - source.repo, err = git.Clone(memory.NewStorage(), nil, &git.CloneOptions{ - Auth: source.auth, - URL: source.url, - CABundle: source.caBundle, - InsecureSkipTLS: source.insecureSkipTLS, - }) - if err != nil { - return nil, err - } - - return source, nil -} - -// Get builds a new store snapshot based on the configure Git remote and reference. -func (s *Source) Get(context.Context) (_ *storagefs.StoreSnapshot, err error) { - var fs fs.FS - if s.hash != plumbing.ZeroHash { - fs, err = gitfs.NewFromRepoHash(s.logger, s.repo, s.hash) - } else { - fs, err = gitfs.NewFromRepo(s.logger, s.repo, gitfs.WithReference(plumbing.NewRemoteReferenceName("origin", s.ref))) - } - if err != nil { - return nil, err - } - - return storagefs.SnapshotFromFS(s.logger, fs) -} - -// Subscribe feeds gitfs implementations of fs.FS onto the provided channel. -// It blocks until the provided context is cancelled (it will be called in a goroutine). -// It closes the provided channel before it returns. -func (s *Source) Subscribe(ctx context.Context, ch chan<- *storagefs.StoreSnapshot) { - defer close(ch) - - // NOTE: theres is no point subscribing to updates for a git Hash - // as it is atomic and will never change. - if s.hash != plumbing.ZeroHash { - s.logger.Info("skipping subscribe as static SHA has been configured") - return - } - - ticker := time.NewTicker(s.interval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - s.logger.Debug("fetching from remote") - if err := s.repo.Fetch(&git.FetchOptions{ - Auth: s.auth, - RefSpecs: []config.RefSpec{ - config.RefSpec(fmt.Sprintf( - "+%s:%s", - plumbing.NewBranchReferenceName(s.ref), - plumbing.NewRemoteReferenceName("origin", s.ref), - )), - }, - }); err != nil { - if errors.Is(err, git.NoErrAlreadyUpToDate) { - s.logger.Debug("store already up to date") - continue - } - - s.logger.Error("failed fetching remote", zap.Error(err)) - continue - } - - snap, err := s.Get(ctx) - if err != nil { - s.logger.Error("failed creating snapshot from fs", zap.Error(err)) - continue - } - - ch <- snap - - s.logger.Debug("finished fetching from remote") - } - } -} - -// String returns an identifier string for the store type. -func (*Source) String() string { - return "git" -} diff --git a/internal/storage/fs/git/store.go b/internal/storage/fs/git/store.go new file mode 100644 index 0000000000..9b823ddddb --- /dev/null +++ b/internal/storage/fs/git/store.go @@ -0,0 +1,201 @@ +package git + +import ( + "context" + "errors" + "fmt" + "io/fs" + "sync" + + "github.com/go-git/go-git/v5" + "github.com/go-git/go-git/v5/config" + "github.com/go-git/go-git/v5/plumbing" + "github.com/go-git/go-git/v5/plumbing/transport" + "github.com/go-git/go-git/v5/storage/memory" + "go.flipt.io/flipt/internal/containers" + "go.flipt.io/flipt/internal/gitfs" + "go.flipt.io/flipt/internal/storage" + storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.uber.org/zap" +) + +// ensure that the git *Store implements storage.Store +var _ storagefs.SnapshotStore = (*SnapshotStore)(nil) + +// SnapshotStore is an implementation of storage.SnapshotStore +// This implementation is backed by a Git repository and it tracks an upstream reference. +// When subscribing to this source, the upstream reference is tracked +// by polling the upstream on a configurable interval. +type SnapshotStore struct { + logger *zap.Logger + repo *git.Repository + + mu sync.RWMutex + snap storage.ReadOnlyStore + + url string + ref string + hash plumbing.Hash + auth transport.AuthMethod + caBundle []byte + insecureSkipTLS bool + pollOpts []containers.Option[storagefs.Poller] +} + +// WithRef configures the target reference to be used when fetching +// and building fs.FS implementations. +// If it is a valid hash, then the fixed SHA value is used. +// Otherwise, it is treated as a reference in the origin upstream. +func WithRef(ref string) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + if plumbing.IsHash(ref) { + s.hash = plumbing.NewHash(ref) + return + } + + s.ref = ref + } +} + +// WithPollOptions configures the poller used to trigger update procedures +func WithPollOptions(opts ...containers.Option[storagefs.Poller]) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.pollOpts = append(s.pollOpts, opts...) + } +} + +// WithAuth returns an option which configures the auth method used +// by the provided source. +func WithAuth(auth transport.AuthMethod) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.auth = auth + } +} + +// WithInsecureTLS returns an option which configures the insecure TLS +// setting for the provided source. +func WithInsecureTLS(insecureSkipTLS bool) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.insecureSkipTLS = insecureSkipTLS + } +} + +// WithCABundle returns an option which configures the CA Bundle used for +// validating the TLS connection to the provided source. +func WithCABundle(caCertBytes []byte) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + if caCertBytes != nil { + s.caBundle = caCertBytes + } + } +} + +// NewSnapshotStore constructs and configures a Store. +// The store uses the connection and credential details provided to build +// fs.FS implementations around a target git repository. +func NewSnapshotStore(ctx context.Context, logger *zap.Logger, url string, opts ...containers.Option[SnapshotStore]) (_ *SnapshotStore, err error) { + store := &SnapshotStore{ + logger: logger.With(zap.String("repository", url)), + url: url, + ref: "main", + } + containers.ApplyAll(store, opts...) + + field := zap.Stringer("ref", plumbing.NewBranchReferenceName(store.ref)) + if store.hash != plumbing.ZeroHash { + field = zap.Stringer("SHA", store.hash) + } + store.logger = store.logger.With(field) + + store.repo, err = git.Clone(memory.NewStorage(), nil, &git.CloneOptions{ + Auth: store.auth, + URL: store.url, + CABundle: store.caBundle, + InsecureSkipTLS: store.insecureSkipTLS, + }) + if err != nil { + return nil, err + } + + // fetch snapshot at-least once before returning store + // to ensure we have some state to serve + if _, err := store.update(ctx); err != nil { + return nil, err + } + + // if the reference is a static hash then it is immutable + // if we have already fetched it once, there is not point updating again + if store.hash == plumbing.ZeroHash { + go storagefs. + NewPoller(store.logger, store.pollOpts...). + Poll(ctx, store.update) + } + + return store, nil +} + +// String returns an identifier string for the store type. +func (*SnapshotStore) String() string { + return "git" +} + +// View accepts a function which takes a *StoreSnapshot. +// The SnapshotStore will supply a snapshot which is valid +// for the lifetime of the provided function call. +func (s *SnapshotStore) View(fn func(storage.ReadOnlyStore) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + return fn(s.snap) +} + +// update fetches from the remote and given that a the target reference +// HEAD updates to a new revision, it builds a snapshot and updates it +// on the store. +func (s *SnapshotStore) update(ctx context.Context) (bool, error) { + if err := s.repo.Fetch(&git.FetchOptions{ + Auth: s.auth, + RefSpecs: []config.RefSpec{ + config.RefSpec(fmt.Sprintf( + "+%s:%s", + plumbing.NewBranchReferenceName(s.ref), + plumbing.NewRemoteReferenceName("origin", s.ref), + )), + }, + }); err != nil { + if !errors.Is(err, git.NoErrAlreadyUpToDate) { + return false, err + } + + return false, nil + } + + if err := s.get(ctx); err != nil { + return false, err + } + + return true, nil +} + +// get builds a new store snapshot based on the configure Git remote and reference. +func (s *SnapshotStore) get(context.Context) (err error) { + var fs fs.FS + if s.hash != plumbing.ZeroHash { + fs, err = gitfs.NewFromRepoHash(s.logger, s.repo, s.hash) + } else { + fs, err = gitfs.NewFromRepo(s.logger, s.repo, gitfs.WithReference(plumbing.NewRemoteReferenceName("origin", s.ref))) + } + if err != nil { + return err + } + + snap, err := storagefs.SnapshotFromFS(s.logger, fs) + if err != nil { + return err + } + + s.mu.Lock() + s.snap = snap + s.mu.Unlock() + + return nil +} diff --git a/internal/storage/fs/git/source_test.go b/internal/storage/fs/git/store_test.go similarity index 65% rename from internal/storage/fs/git/source_test.go rename to internal/storage/fs/git/store_test.go index 89f66ded6f..ec3dfd63dd 100644 --- a/internal/storage/fs/git/source_test.go +++ b/internal/storage/fs/git/store_test.go @@ -9,74 +9,55 @@ import ( "testing" "time" - "github.com/go-git/go-git/v5/plumbing/transport" - "github.com/go-git/go-billy/v5/memfs" "github.com/go-git/go-git/v5" "github.com/go-git/go-git/v5/plumbing" "github.com/go-git/go-git/v5/plumbing/object" + "github.com/go-git/go-git/v5/plumbing/transport" "github.com/go-git/go-git/v5/plumbing/transport/http" "github.com/go-git/go-git/v5/storage/memory" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.flipt.io/flipt/internal/containers" - storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.flipt.io/flipt/internal/storage" + "go.flipt.io/flipt/internal/storage/fs" "go.uber.org/zap/zaptest" ) var gitRepoURL = os.Getenv("TEST_GIT_REPO_URL") -func Test_SourceString(t *testing.T) { - require.Equal(t, "git", (&Source{}).String()) -} - -func Test_SourceGet(t *testing.T) { - source, skip := testSource(t) - if skip { - return - } - - snap, err := source.Get(context.Background()) - require.NoError(t, err) - - _, err = snap.GetNamespace(context.TODO(), "production") - require.NoError(t, err) +func Test_Store_String(t *testing.T) { + require.Equal(t, "git", (&SnapshotStore{}).String()) } -func Test_SourceSubscribe_Hash(t *testing.T) { +func Test_Store_Subscribe_Hash(t *testing.T) { head := os.Getenv("TEST_GIT_REPO_HEAD") if head == "" { t.Skip("Set non-empty TEST_GIT_REPO_HEAD env var to run this test.") return } - source, skip := testSource(t, WithRef(head)) - if skip { - return - } - - ch := make(chan *storagefs.StoreSnapshot) - source.Subscribe(context.Background(), ch) - - _, closed := <-ch - assert.False(t, closed, "expected channel to be closed") + // this helper will fail if there is a problem with this option + // the only difference in behaviour is that the poll loop + // will silently (intentionally) not run + testStore(t, WithRef(head)) } -func Test_SourceSubscribe(t *testing.T) { - source, skip := testSource(t) +func Test_Store_Subscribe(t *testing.T) { + ch := make(chan struct{}) + store, skip := testStore(t, WithPollOptions( + fs.WithInterval(time.Second), + fs.WithNotify(t, func(modified bool) { + if modified { + close(ch) + } + }), + )) if skip { return } ctx, cancel := context.WithCancel(context.Background()) - - // prime source - _, err := source.Get(context.Background()) - require.NoError(t, err) - - // start subscription - ch := make(chan *storagefs.StoreSnapshot) - go source.Subscribe(ctx, ch) + t.Cleanup(cancel) // pull repo workdir := memfs.New() @@ -121,10 +102,10 @@ flags: RemoteName: "origin", })) - // assert matching state - var snap *storagefs.StoreSnapshot + // wait until the snapshot is updated or + // we timeout select { - case snap = <-ch: + case <-ch: case <-time.After(time.Minute): t.Fatal("timed out waiting for snapshot") } @@ -133,31 +114,27 @@ flags: t.Log("received new snapshot") - _, err = snap.GetFlag(ctx, "production", "foo") - require.NoError(t, err) - - // ensure closed - cancel() - - _, open := <-ch - require.False(t, open, "expected channel to be closed after cancel") + require.NoError(t, store.View(func(s storage.ReadOnlyStore) error { + _, err = s.GetFlag(ctx, "production", "foo") + return err + })) } -func Test_SourceSelfSignedSkipTLS(t *testing.T) { +func Test_Store_SelfSignedSkipTLS(t *testing.T) { ts := httptest.NewTLSServer(nil) defer ts.Close() // This is not a valid Git source, but it still proves the point that a // well-known server with a self-signed certificate will be accepted by Flipt // when configuring the TLS options for the source gitRepoURL = ts.URL - _, err := testSourceWithError(t, WithInsecureTLS(false)) + _, err := testStoreWithError(t, WithInsecureTLS(false)) require.ErrorContains(t, err, "tls: failed to verify certificate: x509: certificate signed by unknown authority") - _, err = testSourceWithError(t, WithInsecureTLS(true)) + _, err = testStoreWithError(t, WithInsecureTLS(true)) // This time, we don't expect a tls validation error anymore require.ErrorIs(t, err, transport.ErrRepositoryNotFound) } -func Test_SourceSelfSignedCABytes(t *testing.T) { +func Test_Store_SelfSignedCABytes(t *testing.T) { ts := httptest.NewTLSServer(nil) defer ts.Close() var buf bytes.Buffer @@ -172,14 +149,14 @@ func Test_SourceSelfSignedCABytes(t *testing.T) { // well-known server with a self-signed certificate will be accepted by Flipt // when configuring the TLS options for the source gitRepoURL = ts.URL - _, err = testSourceWithError(t) + _, err = testStoreWithError(t) require.ErrorContains(t, err, "tls: failed to verify certificate: x509: certificate signed by unknown authority") - _, err = testSourceWithError(t, WithCABundle(buf.Bytes())) + _, err = testStoreWithError(t, WithCABundle(buf.Bytes())) // This time, we don't expect a tls validation error anymore require.ErrorIs(t, err, transport.ErrRepositoryNotFound) } -func testSource(t *testing.T, opts ...containers.Option[Source]) (*Source, bool) { +func testStore(t *testing.T, opts ...containers.Option[SnapshotStore]) (*SnapshotStore, bool) { t.Helper() if gitRepoURL == "" { @@ -187,10 +164,12 @@ func testSource(t *testing.T, opts ...containers.Option[Source]) (*Source, bool) return nil, true } - source, err := NewSource(zaptest.NewLogger(t), gitRepoURL, - append([]containers.Option[Source]{ + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + source, err := NewSnapshotStore(ctx, zaptest.NewLogger(t), gitRepoURL, + append([]containers.Option[SnapshotStore]{ WithRef("main"), - WithPollInterval(5 * time.Second), WithAuth(&http.BasicAuth{ Username: "root", Password: "password", @@ -203,13 +182,15 @@ func testSource(t *testing.T, opts ...containers.Option[Source]) (*Source, bool) return source, false } -func testSourceWithError(t *testing.T, opts ...containers.Option[Source]) (*Source, error) { +func testStoreWithError(t *testing.T, opts ...containers.Option[SnapshotStore]) (*SnapshotStore, error) { t.Helper() - source, err := NewSource(zaptest.NewLogger(t), gitRepoURL, - append([]containers.Option[Source]{ + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + source, err := NewSnapshotStore(ctx, zaptest.NewLogger(t), gitRepoURL, + append([]containers.Option[SnapshotStore]{ WithRef("main"), - WithPollInterval(5 * time.Second), WithAuth(&http.BasicAuth{ Username: "root", Password: "password", diff --git a/internal/storage/fs/local/source.go b/internal/storage/fs/local/source.go deleted file mode 100644 index 3e4aac6b78..0000000000 --- a/internal/storage/fs/local/source.go +++ /dev/null @@ -1,75 +0,0 @@ -package local - -import ( - "context" - "os" - "time" - - "go.flipt.io/flipt/internal/containers" - storagefs "go.flipt.io/flipt/internal/storage/fs" - "go.uber.org/zap" -) - -// Source represents an implementation of an fs.FSSource for local -// updates on a FS. -type Source struct { - logger *zap.Logger - - dir string - interval time.Duration -} - -// NewSource constructs a Source. -func NewSource(logger *zap.Logger, dir string, opts ...containers.Option[Source]) (*Source, error) { - s := &Source{ - logger: logger, - dir: dir, - interval: 10 * time.Second, - } - - containers.ApplyAll(s, opts...) - - return s, nil -} - -// WithPollInterval configures the interval in which we will restore -// the local fs. -func WithPollInterval(tick time.Duration) containers.Option[Source] { - return func(s *Source) { - s.interval = tick - } -} - -// Get returns an fs.FS for the local filesystem. -func (s *Source) Get(context.Context) (*storagefs.StoreSnapshot, error) { - return storagefs.SnapshotFromFS(s.logger, os.DirFS(s.dir)) -} - -// Subscribe feeds local fs.FS implementations onto the provided channel. -// It blocks until the provided context is cancelled. -func (s *Source) Subscribe(ctx context.Context, ch chan<- *storagefs.StoreSnapshot) { - defer close(ch) - - ticker := time.NewTicker(s.interval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - snap, err := s.Get(ctx) - if err != nil { - s.logger.Error("error getting file system from directory", zap.Error(err)) - continue - } - - s.logger.Debug("updating local store snapshot") - - ch <- snap - } - } -} - -// String returns an identifier string for the store type. -func (s *Source) String() string { - return "local" -} diff --git a/internal/storage/fs/local/source_test.go b/internal/storage/fs/local/source_test.go deleted file mode 100644 index 417b63b2e5..0000000000 --- a/internal/storage/fs/local/source_test.go +++ /dev/null @@ -1,67 +0,0 @@ -package local - -import ( - "context" - "os" - "path/filepath" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - storagefs "go.flipt.io/flipt/internal/storage/fs" - "go.uber.org/zap" -) - -func Test_SourceString(t *testing.T) { - assert.Equal(t, "local", (&Source{}).String()) -} - -func Test_SourceGet(t *testing.T) { - s, err := NewSource(zap.NewNop(), "testdata", WithPollInterval(5*time.Second)) - assert.NoError(t, err) - - snap, err := s.Get(context.Background()) - assert.NoError(t, err) - - _, err = snap.GetNamespace(context.TODO(), "production") - require.NoError(t, err) -} - -func Test_SourceSubscribe(t *testing.T) { - s, err := NewSource(zap.NewNop(), "testdata", WithPollInterval(5*time.Second)) - assert.NoError(t, err) - - dir, err := os.Getwd() - assert.NoError(t, err) - - ftc := filepath.Join(dir, "testdata", "a.features.yml") - - defer func() { - _, err := os.Stat(ftc) - if err == nil { - err := os.Remove(ftc) - assert.NoError(t, err) - } - }() - - ctx, cancel := context.WithCancel(context.Background()) - - ch := make(chan *storagefs.StoreSnapshot) - go s.Subscribe(ctx, ch) - - // change the filesystem contents - assert.NoError(t, os.WriteFile(ftc, []byte(`{"namespace":"staging"}`), os.ModePerm)) - - select { - case snap := <-ch: - _, err := snap.GetNamespace(ctx, "staging") - assert.NoError(t, err) - cancel() - - _, open := <-ch - assert.False(t, open, "expected channel to be closed after cancel") - case <-time.After(10 * time.Second): - t.Fatal("event not caught") - } -} diff --git a/internal/storage/fs/local/store.go b/internal/storage/fs/local/store.go new file mode 100644 index 0000000000..62efe2b1b4 --- /dev/null +++ b/internal/storage/fs/local/store.go @@ -0,0 +1,83 @@ +package local + +import ( + "context" + "os" + "sync" + + "go.flipt.io/flipt/internal/containers" + "go.flipt.io/flipt/internal/storage" + storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.uber.org/zap" +) + +var _ storagefs.SnapshotStore = (*SnapshotStore)(nil) + +// SnapshotStore implements storagefs.SnapshotStore which +// is backed by the local filesystem through os.DirFS +type SnapshotStore struct { + logger *zap.Logger + dir string + + mu sync.RWMutex + snap storage.ReadOnlyStore + + pollOpts []containers.Option[storagefs.Poller] +} + +// NewSnapshotStore constructs a new SnapshotStore +func NewSnapshotStore(ctx context.Context, logger *zap.Logger, dir string, opts ...containers.Option[SnapshotStore]) (*SnapshotStore, error) { + s := &SnapshotStore{ + logger: logger, + dir: dir, + } + + containers.ApplyAll(s, opts...) + + // seed initial state an ensure we have state + // before returning + if _, err := s.update(ctx); err != nil { + return nil, err + } + + go storagefs. + NewPoller(logger, s.pollOpts...). + Poll(ctx, s.update) + + return s, nil +} + +// WithPollOptions configures poller options on the store. +func WithPollOptions(opts ...containers.Option[storagefs.Poller]) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.pollOpts = append(s.pollOpts, opts...) + } +} + +// View passes the current snapshot to the provided function +// while holding a read lock. +func (s *SnapshotStore) View(fn func(storage.ReadOnlyStore) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + return fn(s.snap) +} + +// update fetches a new snapshot from the local filesystem +// and updates the current served reference via a write lock +func (s *SnapshotStore) update(context.Context) (bool, error) { + snap, err := storagefs.SnapshotFromFS(s.logger, os.DirFS(s.dir)) + if err != nil { + return false, err + } + + s.mu.Lock() + s.snap = snap + s.mu.Unlock() + + return true, nil +} + +// String returns an identifier string for the store type. +func (s *SnapshotStore) String() string { + return "local" +} diff --git a/internal/storage/fs/local/store_test.go b/internal/storage/fs/local/store_test.go new file mode 100644 index 0000000000..c98de6c543 --- /dev/null +++ b/internal/storage/fs/local/store_test.go @@ -0,0 +1,64 @@ +package local + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "go.flipt.io/flipt/internal/storage" + storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.uber.org/zap" +) + +func Test_Store_String(t *testing.T) { + assert.Equal(t, "local", (&SnapshotStore{}).String()) +} + +func Test_Store(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + var closed bool + ch := make(chan struct{}) + + s, err := NewSnapshotStore(ctx, zap.NewNop(), "testdata", WithPollOptions( + storagefs.WithInterval(1*time.Second), + storagefs.WithNotify(t, func(modified bool) { + if modified && !closed { + closed = true + close(ch) + } + }), + )) + assert.NoError(t, err) + + dir, err := os.Getwd() + assert.NoError(t, err) + + ftc := filepath.Join(dir, "testdata", "a.features.yml") + + defer func() { + _, err := os.Stat(ftc) + if err == nil { + err := os.Remove(ftc) + assert.NoError(t, err) + } + }() + + // change the filesystem contents + assert.NoError(t, os.WriteFile(ftc, []byte(`{"namespace":"staging"}`), os.ModePerm)) + + select { + case <-ch: + case <-time.After(10 * time.Second): + t.Fatal("event not caught") + } + + assert.NoError(t, s.View(func(s storage.ReadOnlyStore) error { + _, err = s.GetNamespace(ctx, "staging") + return err + })) +} diff --git a/internal/storage/fs/oci/source.go b/internal/storage/fs/oci/source.go deleted file mode 100644 index 0a8d6ee99d..0000000000 --- a/internal/storage/fs/oci/source.go +++ /dev/null @@ -1,105 +0,0 @@ -package oci - -import ( - "context" - "time" - - "github.com/opencontainers/go-digest" - "go.flipt.io/flipt/internal/containers" - "go.flipt.io/flipt/internal/oci" - storagefs "go.flipt.io/flipt/internal/storage/fs" - "go.uber.org/zap" -) - -// Source is an implementation fs.SnapshotSource backed by OCI repositories -// It fetches instances of OCI manifests and uses them to build snapshots from their contents -type Source struct { - logger *zap.Logger - interval time.Duration - - store *oci.Store - ref oci.Reference - - curSnap *storagefs.StoreSnapshot - curDigest digest.Digest -} - -// NewSource constructs and configures a Source. -// The source uses the connection and credential details provided to build -// *storagefs.StoreSnapshot implementations around a target git repository. -func NewSource(logger *zap.Logger, store *oci.Store, ref oci.Reference, opts ...containers.Option[Source]) (_ *Source, err error) { - src := &Source{ - logger: logger, - interval: 30 * time.Second, - store: store, - ref: ref, - } - containers.ApplyAll(src, opts...) - - return src, nil -} - -// WithPollInterval configures the interval in which origin is polled to -// discover any updates to the target reference. -func WithPollInterval(tick time.Duration) containers.Option[Source] { - return func(s *Source) { - s.interval = tick - } -} - -func (s *Source) String() string { - return "oci" -} - -// Get builds a single instance of an *storagefs.StoreSnapshot -func (s *Source) Get(context.Context) (*storagefs.StoreSnapshot, error) { - resp, err := s.store.Fetch(context.Background(), s.ref, oci.IfNoMatch(s.curDigest)) - if err != nil { - return nil, err - } - - if resp.Matched { - return s.curSnap, nil - } - - if s.curSnap, err = storagefs.SnapshotFromFiles(s.logger, resp.Files...); err != nil { - return nil, err - } - - s.curDigest = resp.Digest - - return s.curSnap, nil -} - -// Subscribe feeds implementations of *storagefs.StoreSnapshot onto the provided channel. -// It should block until the provided context is cancelled (it will be called in a goroutine). -// It should close the provided channel before it returns. -func (s *Source) Subscribe(ctx context.Context, ch chan<- *storagefs.StoreSnapshot) { - defer close(ch) - - ticker := time.NewTicker(s.interval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - current := s.curDigest - s.logger.Debug("fetching new snapshot", zap.String("current", current.Hex())) - - snap, err := s.Get(ctx) - if err != nil { - s.logger.Error("failed resolving upstream", zap.Error(err)) - continue - } - - if current == s.curDigest { - s.logger.Debug("snapshot already up to date") - continue - } - - ch <- snap - - s.logger.Debug("fetched new reference from remote") - } - } -} diff --git a/internal/storage/fs/oci/store.go b/internal/storage/fs/oci/store.go new file mode 100644 index 0000000000..39d17a58a8 --- /dev/null +++ b/internal/storage/fs/oci/store.go @@ -0,0 +1,98 @@ +package oci + +import ( + "context" + "sync" + + "github.com/opencontainers/go-digest" + "go.flipt.io/flipt/internal/containers" + "go.flipt.io/flipt/internal/oci" + "go.flipt.io/flipt/internal/storage" + storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.uber.org/zap" +) + +var _ storagefs.SnapshotStore = (*SnapshotStore)(nil) + +// SnapshotStore is an implementation storage.SnapshotStore backed by OCI repositories. +// It fetches instances of OCI manifests and uses them to build snapshots from their contents. +type SnapshotStore struct { + logger *zap.Logger + + store *oci.Store + ref oci.Reference + + mu sync.RWMutex + snap storage.ReadOnlyStore + lastDigest digest.Digest + + pollOpts []containers.Option[storagefs.Poller] +} + +// View accepts a function which takes a *StoreSnapshot. +// The SnapshotStore will supply a snapshot which is valid +// for the lifetime of the provided function call. +func (s *SnapshotStore) View(fn func(storage.ReadOnlyStore) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + return fn(s.snap) +} + +// NewSnapshotStore constructs and configures a Store. +// The store uses the connection and credential details provided to build +// *storagefs.StoreSnapshot implementations around a target OCI repository. +func NewSnapshotStore(ctx context.Context, logger *zap.Logger, store *oci.Store, ref oci.Reference, opts ...containers.Option[SnapshotStore]) (_ *SnapshotStore, err error) { + s := &SnapshotStore{ + logger: logger, + store: store, + ref: ref, + } + containers.ApplyAll(s, opts...) + + if _, err := s.update(ctx); err != nil { + return nil, err + } + + go storagefs.NewPoller(logger, s.pollOpts...).Poll(ctx, s.update) + + return s, nil +} + +// WithPollOptions configures the options used periodically invoke the update procedure +func WithPollOptions(opts ...containers.Option[storagefs.Poller]) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.pollOpts = append(s.pollOpts, opts...) + } +} + +func (s *SnapshotStore) String() string { + return "oci" +} + +// update attempts to fetch the latest state for the target OCi repository and tag. +// If the state has not change sinced the last observed image digest it skips +// updating the snapshot and returns false (not modified). +func (s *SnapshotStore) update(ctx context.Context) (bool, error) { + resp, err := s.store.Fetch(context.Background(), s.ref, oci.IfNoMatch(s.lastDigest)) + if err != nil { + return false, err + } + + // return not modified as the last observed digest matched + // the remote digest + if resp.Matched { + return false, nil + } + + snap, err := storagefs.SnapshotFromFiles(s.logger, resp.Files...) + if err != nil { + return false, err + } + + s.mu.Lock() + s.lastDigest = resp.Digest + s.snap = snap + s.mu.Unlock() + + return true, nil +} diff --git a/internal/storage/fs/oci/source_test.go b/internal/storage/fs/oci/store_test.go similarity index 70% rename from internal/storage/fs/oci/source_test.go rename to internal/storage/fs/oci/store_test.go index ade6770bff..03810a03fe 100644 --- a/internal/storage/fs/oci/source_test.go +++ b/internal/storage/fs/oci/store_test.go @@ -10,41 +10,42 @@ import ( "github.com/opencontainers/go-digest" v1 "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.flipt.io/flipt/internal/containers" fliptoci "go.flipt.io/flipt/internal/oci" - storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.flipt.io/flipt/internal/storage" + "go.flipt.io/flipt/internal/storage/fs" "go.uber.org/zap/zaptest" "oras.land/oras-go/v2" "oras.land/oras-go/v2/content/oci" ) func Test_SourceString(t *testing.T) { - require.Equal(t, "oci", (&Source{}).String()) -} - -func Test_SourceGet(t *testing.T) { - source, _ := testSource(t) - - snap, err := source.Get(context.Background()) - require.NoError(t, err) - - _, err = snap.GetNamespace(context.TODO(), "production") - require.NoError(t, err) + require.Equal(t, "oci", (&SnapshotStore{}).String()) } func Test_SourceSubscribe(t *testing.T) { - source, target := testSource(t) + ch := make(chan struct{}) + store, target := testStore(t, WithPollOptions( + fs.WithInterval(time.Second), + fs.WithNotify(t, func(modified bool) { + if modified { + close(ch) + } + }), + )) - ctx, cancel := context.WithCancel(context.Background()) + ctx := context.Background() - // prime source - _, err := source.Get(context.Background()) - require.NoError(t, err) + require.NoError(t, store.View(func(s storage.ReadOnlyStore) error { + _, err := s.GetNamespace(ctx, "production") + require.NoError(t, err) - // start subscription - ch := make(chan *storagefs.StoreSnapshot) - go source.Subscribe(ctx, ch) + _, err = s.GetFlag(ctx, "production", "foo") + require.Error(t, err, "should error as flag should not exist yet") + + return nil + })) updateRepoContents(t, target, layer( @@ -57,34 +58,22 @@ func Test_SourceSubscribe(t *testing.T) { t.Log("waiting for new snapshot") // assert matching state - var snap *storagefs.StoreSnapshot select { - case snap = <-ch: + case <-ch: case <-time.After(time.Minute): t.Fatal("timed out waiting for snapshot") } - require.NoError(t, err) - t.Log("received new snapshot") - _, err = snap.GetFlag(ctx, "production", "foo") - require.NoError(t, err) - - // ensure closed - cancel() - - _, open := <-ch - require.False(t, open, "expected channel to be closed after cancel") - - // fetch again and expected to get the same snapshot - found, err := source.Get(context.Background()) - require.NoError(t, err) - - assert.Equal(t, snap, found) + require.NoError(t, store.View(func(s storage.ReadOnlyStore) error { + _, err := s.GetFlag(ctx, "production", "foo") + require.NoError(t, err) + return nil + })) } -func testSource(t *testing.T) (*Source, oras.Target) { +func testStore(t *testing.T, opts ...containers.Option[SnapshotStore]) (*SnapshotStore, oras.Target) { t.Helper() target, dir, repo := testRepository(t, @@ -97,10 +86,14 @@ func testSource(t *testing.T) (*Source, oras.Target) { ref, err := fliptoci.ParseReference(fmt.Sprintf("flipt://local/%s:latest", repo)) require.NoError(t, err) - source, err := NewSource(zaptest.NewLogger(t), + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + source, err := NewSnapshotStore(ctx, + zaptest.NewLogger(t), store, ref, - WithPollInterval(time.Second)) + opts...) require.NoError(t, err) return source, target diff --git a/internal/storage/fs/poll.go b/internal/storage/fs/poll.go new file mode 100644 index 0000000000..12c9147df6 --- /dev/null +++ b/internal/storage/fs/poll.go @@ -0,0 +1,68 @@ +package fs + +import ( + "context" + "testing" + "time" + + "go.flipt.io/flipt/internal/containers" + "go.uber.org/zap" +) + +type Poller struct { + logger *zap.Logger + + interval time.Duration + notify func(modified bool) +} + +func WithInterval(interval time.Duration) containers.Option[Poller] { + return func(p *Poller) { + p.interval = interval + } +} + +func WithNotify(t *testing.T, n func(modified bool)) containers.Option[Poller] { + t.Helper() + return func(p *Poller) { + p.notify = n + } +} + +func NewPoller(logger *zap.Logger, opts ...containers.Option[Poller]) *Poller { + p := &Poller{ + logger: logger, + interval: 30 * time.Second, + } + containers.ApplyAll(p, opts...) + return p +} + +// Poll is a utility function for a common polling strategy used by lots of declarative +// store implementations. +func (p *Poller) Poll(ctx context.Context, update func(context.Context) (bool, error)) { + ticker := time.NewTicker(p.interval) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + modified, err := update(ctx) + if err != nil { + p.logger.Error("error getting file system from directory", zap.Error(err)) + continue + } + + if p.notify != nil { + p.notify(modified) + } + + if !modified { + p.logger.Debug("skipping snapshot update as it has not been modified") + continue + } + + p.logger.Debug("snapshot updated") + } + } +} diff --git a/internal/storage/fs/s3/source.go b/internal/storage/fs/s3/source.go deleted file mode 100644 index d7d7cdb01f..0000000000 --- a/internal/storage/fs/s3/source.go +++ /dev/null @@ -1,124 +0,0 @@ -package s3 - -import ( - "context" - "time" - - "github.com/aws/aws-sdk-go-v2/config" - "github.com/aws/aws-sdk-go-v2/service/s3" - - "go.flipt.io/flipt/internal/containers" - "go.flipt.io/flipt/internal/s3fs" - storagefs "go.flipt.io/flipt/internal/storage/fs" - "go.uber.org/zap" -) - -// Source represents an implementation of an fs.SnapshotSource -// This implementation is backed by an S3 bucket -type Source struct { - logger *zap.Logger - s3 *s3.Client - - endpoint string - region string - bucket string - prefix string - interval time.Duration -} - -// NewSource constructs a Source. -func NewSource(logger *zap.Logger, bucket string, opts ...containers.Option[Source]) (*Source, error) { - s := &Source{ - logger: logger, - bucket: bucket, - interval: 60 * time.Second, - } - - containers.ApplyAll(s, opts...) - - cfg, err := config.LoadDefaultConfig(context.Background(), - config.WithRegion(s.region)) - if err != nil { - return nil, err - } - - var s3Opts []func(*s3.Options) - if s.endpoint != "" { - s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = &s.endpoint - o.UsePathStyle = true - o.Region = s.region - }) - } - s.s3 = s3.NewFromConfig(cfg, s3Opts...) - - return s, nil -} - -// WithPrefix configures the prefix for s3 -func WithPrefix(prefix string) containers.Option[Source] { - return func(s *Source) { - s.prefix = prefix - } -} - -// WithRegion configures the region for s3 -func WithRegion(region string) containers.Option[Source] { - return func(s *Source) { - s.region = region - } -} - -// WithEndpoint configures the region for s3 -func WithEndpoint(endpoint string) containers.Option[Source] { - return func(s *Source) { - s.endpoint = endpoint - } -} - -// WithPollInterval configures the interval in which we will restore -// the s3 fs. -func WithPollInterval(tick time.Duration) containers.Option[Source] { - return func(s *Source) { - s.interval = tick - } -} - -// Get returns a *sourcefs.StoreSnapshot for the local filesystem. -func (s *Source) Get(context.Context) (*storagefs.StoreSnapshot, error) { - fs, err := s3fs.New(s.logger, s.s3, s.bucket, s.prefix) - if err != nil { - return nil, err - } - - return storagefs.SnapshotFromFS(s.logger, fs) -} - -// Subscribe feeds S3 populated *StoreSnapshot instances onto the provided channel. -// It blocks until the provided context is cancelled. -func (s *Source) Subscribe(ctx context.Context, ch chan<- *storagefs.StoreSnapshot) { - defer close(ch) - - ticker := time.NewTicker(s.interval) - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - snap, err := s.Get(ctx) - if err != nil { - s.logger.Error("error getting file system from directory", zap.Error(err)) - continue - } - - s.logger.Debug("updating local store snapshot") - - ch <- snap - } - } -} - -// String returns an identifier string for the store type. -func (s *Source) String() string { - return "s3" -} diff --git a/internal/storage/fs/s3/source_test.go b/internal/storage/fs/s3/source_test.go deleted file mode 100644 index e383b8c045..0000000000 --- a/internal/storage/fs/s3/source_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package s3 - -import ( - "bytes" - "context" - "os" - "testing" - "time" - - "github.com/aws/aws-sdk-go-v2/service/s3" - "github.com/stretchr/testify/require" - "go.flipt.io/flipt/internal/containers" - storagefs "go.flipt.io/flipt/internal/storage/fs" - "go.uber.org/zap/zaptest" -) - -const testBucket = "testdata" - -var minioURL = os.Getenv("TEST_S3_ENDPOINT") - -func Test_SourceString(t *testing.T) { - require.Equal(t, "s3", (&Source{}).String()) -} - -func Test_SourceGet(t *testing.T) { - source, skip := testSource(t) - if skip { - return - } - - snap, err := source.Get(context.Background()) - require.NoError(t, err) - - _, err = snap.GetNamespace(context.TODO(), "production") - require.NoError(t, err) - - _, err = snap.GetNamespace(context.TODO(), "prefix") - require.NoError(t, err) -} - -func Test_SourceGetPrefix(t *testing.T) { - source, skip := testSource(t, WithPrefix("prefix/")) - if skip { - return - } - - snap, err := source.Get(context.Background()) - require.NoError(t, err) - - _, err = snap.GetNamespace(context.TODO(), "production") - require.Error(t, err, "production namespace should have been skipped") - - _, err = snap.GetNamespace(context.TODO(), "prefix") - require.NoError(t, err, "prefix namespace should be present in snapshot") -} - -func Test_SourceSubscribe(t *testing.T) { - source, skip := testSource(t) - if skip { - return - } - - snap, err := source.Get(context.Background()) - require.NoError(t, err) - - _, err = snap.GetNamespace(context.TODO(), "production") - require.NoError(t, err) - - ctx, cancel := context.WithCancel(context.Background()) - - // start subscription - ch := make(chan *storagefs.StoreSnapshot) - go source.Subscribe(ctx, ch) - - updated := []byte(`namespace: production -flags: - - key: foo - name: Foo`) - - buf := bytes.NewReader(updated) - - s3Client := source.s3 - // update features.yml - path := "features.yml" - _, err = s3Client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: &source.bucket, - Key: &path, - Body: buf, - }) - require.NoError(t, err) - - // assert matching state - snap = <-ch - - t.Log("received new snapshot") - - _, err = snap.GetFlag(context.TODO(), "production", "foo") - require.NoError(t, err) - - cancel() - - _, open := <-ch - require.False(t, open, "expected channel to be closed after cancel") -} - -func testSource(t *testing.T, opts ...containers.Option[Source]) (*Source, bool) { - t.Helper() - - if minioURL == "" { - t.Skip("Set non-empty TEST_S3_ENDPOINT env var to run this test.") - return nil, true - } - - source, err := NewSource(zaptest.NewLogger(t), testBucket, - append([]containers.Option[Source]{ - WithEndpoint(minioURL), - WithPollInterval(5 * time.Second), - }, - opts...)..., - ) - require.NoError(t, err) - - return source, false -} diff --git a/internal/storage/fs/s3/store.go b/internal/storage/fs/s3/store.go new file mode 100644 index 0000000000..2003557683 --- /dev/null +++ b/internal/storage/fs/s3/store.go @@ -0,0 +1,135 @@ +package s3 + +import ( + "context" + "sync" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "go.flipt.io/flipt/internal/containers" + "go.flipt.io/flipt/internal/s3fs" + "go.flipt.io/flipt/internal/storage" + storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.uber.org/zap" +) + +var _ storagefs.SnapshotStore = (*SnapshotStore)(nil) + +// SnapshotStore represents an implementation of storage.SnapshotStore +// This implementation is backed by an S3 bucket +type SnapshotStore struct { + logger *zap.Logger + s3 *s3.Client + + mu sync.RWMutex + snap storage.ReadOnlyStore + + endpoint string + region string + bucket string + prefix string + + pollOpts []containers.Option[storagefs.Poller] +} + +// View accepts a function which takes a *StoreSnapshot. +// The SnapshotStore will supply a snapshot which is valid +// for the lifetime of the provided function call. +func (s *SnapshotStore) View(fn func(storage.ReadOnlyStore) error) error { + s.mu.RLock() + defer s.mu.RUnlock() + return fn(s.snap) +} + +// NewSnapshotStore constructs a Store +func NewSnapshotStore(ctx context.Context, logger *zap.Logger, bucket string, opts ...containers.Option[SnapshotStore]) (*SnapshotStore, error) { + s := &SnapshotStore{ + logger: logger, + bucket: bucket, + pollOpts: []containers.Option[storagefs.Poller]{ + storagefs.WithInterval(60 * time.Second), + }, + } + + containers.ApplyAll(s, opts...) + + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion(s.region)) + if err != nil { + return nil, err + } + + var s3Opts []func(*s3.Options) + if s.endpoint != "" { + s3Opts = append(s3Opts, func(o *s3.Options) { + o.BaseEndpoint = &s.endpoint + o.UsePathStyle = true + o.Region = s.region + }) + } + s.s3 = s3.NewFromConfig(cfg, s3Opts...) + + // fetch snapshot at-least once before returning store + // to ensure we have some state to serve + if _, err := s.update(ctx); err != nil { + return nil, err + } + + go storagefs.NewPoller(s.logger, s.pollOpts...).Poll(ctx, s.update) + + return s, nil +} + +// WithPrefix configures the prefix for s3 +func WithPrefix(prefix string) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.prefix = prefix + } +} + +// WithRegion configures the region for s3 +func WithRegion(region string) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.region = region + } +} + +// WithEndpoint configures the region for s3 +func WithEndpoint(endpoint string) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.endpoint = endpoint + } +} + +// WithPollOptions configures the poller options used when periodically updating snapshot state +func WithPollOptions(opts ...containers.Option[storagefs.Poller]) containers.Option[SnapshotStore] { + return func(s *SnapshotStore) { + s.pollOpts = append(s.pollOpts, opts...) + } +} + +// Update fetches a new snapshot and swaps it out for the current one. +func (s *SnapshotStore) update(context.Context) (bool, error) { + fs, err := s3fs.New(s.logger, s.s3, s.bucket, s.prefix) + if err != nil { + return false, err + } + + snap, err := storagefs.SnapshotFromFS(s.logger, fs) + if err != nil { + return false, err + } + + s.mu.Lock() + s.snap = snap + s.mu.Unlock() + + return true, nil +} + +// String returns an identifier string for the store type. +func (s *SnapshotStore) String() string { + return "s3" +} diff --git a/internal/storage/fs/s3/store_test.go b/internal/storage/fs/s3/store_test.go new file mode 100644 index 0000000000..4c2314b202 --- /dev/null +++ b/internal/storage/fs/s3/store_test.go @@ -0,0 +1,122 @@ +package s3 + +import ( + "bytes" + "context" + "os" + "testing" + "time" + + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/stretchr/testify/require" + "go.flipt.io/flipt/internal/containers" + "go.flipt.io/flipt/internal/storage" + "go.flipt.io/flipt/internal/storage/fs" + "go.uber.org/zap/zaptest" +) + +const testBucket = "testdata" + +var minioURL = os.Getenv("TEST_S3_ENDPOINT") + +func Test_Store_String(t *testing.T) { + require.Equal(t, "s3", (&SnapshotStore{}).String()) +} + +func Test_Store(t *testing.T) { + ch := make(chan struct{}) + store, skip := testStore(t, WithPollOptions( + fs.WithInterval(time.Second), + fs.WithNotify(t, func(modified bool) { + if modified { + close(ch) + } + }), + )) + if skip { + return + } + + // flag shouldn't be present until we update it + require.Error(t, store.View(func(s storage.ReadOnlyStore) error { + _, err := s.GetFlag(context.TODO(), "production", "foo") + return err + }), "flag should not be defined yet") + + updated := []byte(`namespace: production +flags: + - key: foo + name: Foo`) + + buf := bytes.NewReader(updated) + + s3Client := store.s3 + // update features.yml + path := "features.yml" + _, err := s3Client.PutObject(context.TODO(), &s3.PutObjectInput{ + Bucket: &store.bucket, + Key: &path, + Body: buf, + }) + require.NoError(t, err) + + // assert matching state + select { + case <-ch: + case <-time.After(time.Minute): + t.Fatal("timed out waiting for update") + } + + t.Log("received new snapshot") + + require.NoError(t, store.View(func(s storage.ReadOnlyStore) error { + _, err = s.GetNamespace(context.TODO(), "production") + if err != nil { + return err + } + + _, err = s.GetFlag(context.TODO(), "production", "foo") + if err != nil { + return err + } + + _, err = s.GetNamespace(context.TODO(), "prefix") + return err + })) + +} + +func Test_Store_WithPrefix(t *testing.T) { + store, skip := testStore(t, WithPrefix("prefix")) + if skip { + return + } + + // namespace shouldn't exist as it has been filtered out by the prefix + require.Error(t, store.View(func(s storage.ReadOnlyStore) error { + _, err := s.GetNamespace(context.TODO(), "production") + return err + }), "production namespace shouldn't be retrieavable") +} + +func testStore(t *testing.T, opts ...containers.Option[SnapshotStore]) (*SnapshotStore, bool) { + t.Helper() + + if minioURL == "" { + t.Skip("Set non-empty TEST_S3_ENDPOINT env var to run this test.") + return nil, true + } + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + source, err := NewSnapshotStore(ctx, zaptest.NewLogger(t), testBucket, + append([]containers.Option[SnapshotStore]{ + WithEndpoint(minioURL), + }, + opts...)..., + ) + require.NoError(t, err) + + return source, false +} diff --git a/internal/storage/fs/snapshot.go b/internal/storage/fs/snapshot.go index 37f3c2febb..7f058e0650 100644 --- a/internal/storage/fs/snapshot.go +++ b/internal/storage/fs/snapshot.go @@ -29,10 +29,7 @@ const ( defaultNs = "default" ) -var ( - _ storage.Store = (*StoreSnapshot)(nil) - ErrNotImplemented = errors.New("not implemented") -) +var _ storage.ReadOnlyStore = (*Snapshot)(nil) // FliptIndex represents the structure of a well-known file ".flipt.yml" // at the root of an FS. @@ -42,9 +39,9 @@ type FliptIndex struct { Exclude []string `yaml:"exclude,omitempty"` } -// StoreSnapshot contains the structures necessary for serving +// Snapshot contains the structures necessary for serving // flag state to a client. -type StoreSnapshot struct { +type Snapshot struct { ns map[string]*namespace evalDists map[string][]*storage.EvaluationDistribution now *timestamppb.Timestamp @@ -80,7 +77,7 @@ func newNamespace(key, name string, created *timestamppb.Timestamp) *namespace { // SnapshotFromFS is a convenience function for building a snapshot // directly from an implementation of fs.FS using the list state files // function to source the relevant Flipt configuration files. -func SnapshotFromFS(logger *zap.Logger, fs fs.FS) (*StoreSnapshot, error) { +func SnapshotFromFS(logger *zap.Logger, fs fs.FS) (*Snapshot, error) { files, err := listStateFiles(logger, fs) if err != nil { return nil, err @@ -93,7 +90,7 @@ func SnapshotFromFS(logger *zap.Logger, fs fs.FS) (*StoreSnapshot, error) { // SnapshotFromPaths constructs a StoreSnapshot from the provided // slice of paths resolved against the provided fs.FS. -func SnapshotFromPaths(logger *zap.Logger, ffs fs.FS, paths ...string) (*StoreSnapshot, error) { +func SnapshotFromPaths(logger *zap.Logger, ffs fs.FS, paths ...string) (*Snapshot, error) { var files []fs.File for _, file := range paths { fi, err := ffs.Open(file) @@ -109,9 +106,9 @@ func SnapshotFromPaths(logger *zap.Logger, ffs fs.FS, paths ...string) (*StoreSn // SnapshotFromFiles constructs a StoreSnapshot from the provided slice // of fs.File implementations. -func SnapshotFromFiles(logger *zap.Logger, files ...fs.File) (*StoreSnapshot, error) { +func SnapshotFromFiles(logger *zap.Logger, files ...fs.File) (*Snapshot, error) { now := flipt.Now() - s := StoreSnapshot{ + s := Snapshot{ ns: map[string]*namespace{ defaultNs: newNamespace("default", "Default", now), }, @@ -309,7 +306,7 @@ func listStateFiles(logger *zap.Logger, source fs.FS) ([]string, error) { return filenames, nil } -func (ss *StoreSnapshot) addDoc(doc *ext.Document) error { +func (ss *Snapshot) addDoc(doc *ext.Document) error { ns := ss.ns[doc.Namespace] if ns == nil { ns = newNamespace(doc.Namespace, doc.Namespace, ss.now) @@ -593,11 +590,11 @@ func (ss *StoreSnapshot) addDoc(doc *ext.Document) error { return nil } -func (ss StoreSnapshot) String() string { +func (ss Snapshot) String() string { return "snapshot" } -func (ss *StoreSnapshot) GetRule(ctx context.Context, namespaceKey string, id string) (rule *flipt.Rule, _ error) { +func (ss *Snapshot) GetRule(ctx context.Context, namespaceKey string, id string) (rule *flipt.Rule, _ error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return nil, err @@ -612,7 +609,7 @@ func (ss *StoreSnapshot) GetRule(ctx context.Context, namespaceKey string, id st return rule, nil } -func (ss *StoreSnapshot) ListRules(ctx context.Context, namespaceKey string, flagKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Rule], _ error) { +func (ss *Snapshot) ListRules(ctx context.Context, namespaceKey string, flagKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Rule], _ error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return set, err @@ -630,7 +627,7 @@ func (ss *StoreSnapshot) ListRules(ctx context.Context, namespaceKey string, fla }, rules...) } -func (ss *StoreSnapshot) CountRules(ctx context.Context, namespaceKey, flagKey string) (uint64, error) { +func (ss *Snapshot) CountRules(ctx context.Context, namespaceKey, flagKey string) (uint64, error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return 0, err @@ -646,35 +643,7 @@ func (ss *StoreSnapshot) CountRules(ctx context.Context, namespaceKey, flagKey s return count, nil } -func (ss *StoreSnapshot) CreateRule(ctx context.Context, r *flipt.CreateRuleRequest) (*flipt.Rule, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateRule(ctx context.Context, r *flipt.UpdateRuleRequest) (*flipt.Rule, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteRule(ctx context.Context, r *flipt.DeleteRuleRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) OrderRules(ctx context.Context, r *flipt.OrderRulesRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) CreateDistribution(ctx context.Context, r *flipt.CreateDistributionRequest) (*flipt.Distribution, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateDistribution(ctx context.Context, r *flipt.UpdateDistributionRequest) (*flipt.Distribution, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteDistribution(ctx context.Context, r *flipt.DeleteDistributionRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) GetSegment(ctx context.Context, namespaceKey string, key string) (*flipt.Segment, error) { +func (ss *Snapshot) GetSegment(ctx context.Context, namespaceKey string, key string) (*flipt.Segment, error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return nil, err @@ -688,7 +657,7 @@ func (ss *StoreSnapshot) GetSegment(ctx context.Context, namespaceKey string, ke return segment, nil } -func (ss *StoreSnapshot) ListSegments(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Segment], err error) { +func (ss *Snapshot) ListSegments(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Segment], err error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return set, err @@ -704,7 +673,7 @@ func (ss *StoreSnapshot) ListSegments(ctx context.Context, namespaceKey string, }, segments...) } -func (ss *StoreSnapshot) CountSegments(ctx context.Context, namespaceKey string) (uint64, error) { +func (ss *Snapshot) CountSegments(ctx context.Context, namespaceKey string) (uint64, error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return 0, err @@ -713,31 +682,7 @@ func (ss *StoreSnapshot) CountSegments(ctx context.Context, namespaceKey string) return uint64(len(ns.segments)), nil } -func (ss *StoreSnapshot) CreateSegment(ctx context.Context, r *flipt.CreateSegmentRequest) (*flipt.Segment, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateSegment(ctx context.Context, r *flipt.UpdateSegmentRequest) (*flipt.Segment, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteSegment(ctx context.Context, r *flipt.DeleteSegmentRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) CreateConstraint(ctx context.Context, r *flipt.CreateConstraintRequest) (*flipt.Constraint, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateConstraint(ctx context.Context, r *flipt.UpdateConstraintRequest) (*flipt.Constraint, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteConstraint(ctx context.Context, r *flipt.DeleteConstraintRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) GetNamespace(ctx context.Context, key string) (*flipt.Namespace, error) { +func (ss *Snapshot) GetNamespace(ctx context.Context, key string) (*flipt.Namespace, error) { ns, err := ss.getNamespace(key) if err != nil { return nil, err @@ -746,7 +691,7 @@ func (ss *StoreSnapshot) GetNamespace(ctx context.Context, key string) (*flipt.N return ns.resource, nil } -func (ss *StoreSnapshot) ListNamespaces(ctx context.Context, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Namespace], err error) { +func (ss *Snapshot) ListNamespaces(ctx context.Context, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Namespace], err error) { ns := make([]*flipt.Namespace, 0, len(ss.ns)) for _, n := range ss.ns { ns = append(ns, n.resource) @@ -757,23 +702,11 @@ func (ss *StoreSnapshot) ListNamespaces(ctx context.Context, opts ...storage.Que }, ns...) } -func (ss *StoreSnapshot) CountNamespaces(ctx context.Context) (uint64, error) { +func (ss *Snapshot) CountNamespaces(ctx context.Context) (uint64, error) { return uint64(len(ss.ns)), nil } -func (ss *StoreSnapshot) CreateNamespace(ctx context.Context, r *flipt.CreateNamespaceRequest) (*flipt.Namespace, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateNamespace(ctx context.Context, r *flipt.UpdateNamespaceRequest) (*flipt.Namespace, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteNamespace(ctx context.Context, r *flipt.DeleteNamespaceRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) GetFlag(ctx context.Context, namespaceKey string, key string) (*flipt.Flag, error) { +func (ss *Snapshot) GetFlag(ctx context.Context, namespaceKey string, key string) (*flipt.Flag, error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return nil, err @@ -787,7 +720,7 @@ func (ss *StoreSnapshot) GetFlag(ctx context.Context, namespaceKey string, key s return flag, nil } -func (ss *StoreSnapshot) ListFlags(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Flag], err error) { +func (ss *Snapshot) ListFlags(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Flag], err error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return set, err @@ -803,7 +736,7 @@ func (ss *StoreSnapshot) ListFlags(ctx context.Context, namespaceKey string, opt }, flags...) } -func (ss *StoreSnapshot) CountFlags(ctx context.Context, namespaceKey string) (uint64, error) { +func (ss *Snapshot) CountFlags(ctx context.Context, namespaceKey string) (uint64, error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return 0, err @@ -812,31 +745,7 @@ func (ss *StoreSnapshot) CountFlags(ctx context.Context, namespaceKey string) (u return uint64(len(ns.flags)), nil } -func (ss *StoreSnapshot) CreateFlag(ctx context.Context, r *flipt.CreateFlagRequest) (*flipt.Flag, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateFlag(ctx context.Context, r *flipt.UpdateFlagRequest) (*flipt.Flag, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteFlag(ctx context.Context, r *flipt.DeleteFlagRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) CreateVariant(ctx context.Context, r *flipt.CreateVariantRequest) (*flipt.Variant, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateVariant(ctx context.Context, r *flipt.UpdateVariantRequest) (*flipt.Variant, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteVariant(ctx context.Context, r *flipt.DeleteVariantRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) GetEvaluationRules(ctx context.Context, namespaceKey string, flagKey string) ([]*storage.EvaluationRule, error) { +func (ss *Snapshot) GetEvaluationRules(ctx context.Context, namespaceKey string, flagKey string) ([]*storage.EvaluationRule, error) { ns, ok := ss.ns[namespaceKey] if !ok { return nil, errs.ErrNotFoundf("namespaced %q", namespaceKey) @@ -850,7 +759,7 @@ func (ss *StoreSnapshot) GetEvaluationRules(ctx context.Context, namespaceKey st return rules, nil } -func (ss *StoreSnapshot) GetEvaluationDistributions(ctx context.Context, ruleID string) ([]*storage.EvaluationDistribution, error) { +func (ss *Snapshot) GetEvaluationDistributions(ctx context.Context, ruleID string) ([]*storage.EvaluationDistribution, error) { dists, ok := ss.evalDists[ruleID] if !ok { return nil, errs.ErrNotFoundf("rule %q", ruleID) @@ -859,7 +768,7 @@ func (ss *StoreSnapshot) GetEvaluationDistributions(ctx context.Context, ruleID return dists, nil } -func (ss *StoreSnapshot) GetEvaluationRollouts(ctx context.Context, namespaceKey, flagKey string) ([]*storage.EvaluationRollout, error) { +func (ss *Snapshot) GetEvaluationRollouts(ctx context.Context, namespaceKey, flagKey string) ([]*storage.EvaluationRollout, error) { ns, ok := ss.ns[namespaceKey] if !ok { return nil, errs.ErrNotFoundf("namespaced %q", namespaceKey) @@ -873,7 +782,7 @@ func (ss *StoreSnapshot) GetEvaluationRollouts(ctx context.Context, namespaceKey return rollouts, nil } -func (ss *StoreSnapshot) GetRollout(ctx context.Context, namespaceKey, id string) (*flipt.Rollout, error) { +func (ss *Snapshot) GetRollout(ctx context.Context, namespaceKey, id string) (*flipt.Rollout, error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return nil, err @@ -887,7 +796,7 @@ func (ss *StoreSnapshot) GetRollout(ctx context.Context, namespaceKey, id string return rollout, nil } -func (ss *StoreSnapshot) ListRollouts(ctx context.Context, namespaceKey, flagKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Rollout], err error) { +func (ss *Snapshot) ListRollouts(ctx context.Context, namespaceKey, flagKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Rollout], err error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return set, err @@ -905,7 +814,7 @@ func (ss *StoreSnapshot) ListRollouts(ctx context.Context, namespaceKey, flagKey }, rollouts...) } -func (ss *StoreSnapshot) CountRollouts(ctx context.Context, namespaceKey, flagKey string) (uint64, error) { +func (ss *Snapshot) CountRollouts(ctx context.Context, namespaceKey, flagKey string) (uint64, error) { ns, err := ss.getNamespace(namespaceKey) if err != nil { return 0, err @@ -921,22 +830,6 @@ func (ss *StoreSnapshot) CountRollouts(ctx context.Context, namespaceKey, flagKe return count, nil } -func (ss *StoreSnapshot) CreateRollout(ctx context.Context, r *flipt.CreateRolloutRequest) (*flipt.Rollout, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) UpdateRollout(ctx context.Context, r *flipt.UpdateRolloutRequest) (*flipt.Rollout, error) { - return nil, ErrNotImplemented -} - -func (ss *StoreSnapshot) DeleteRollout(ctx context.Context, r *flipt.DeleteRolloutRequest) error { - return ErrNotImplemented -} - -func (ss *StoreSnapshot) OrderRollouts(ctx context.Context, r *flipt.OrderRolloutsRequest) error { - return ErrNotImplemented -} - func findByKey[T interface{ GetKey() string }](key string, ts ...T) (t T, _ bool) { return find(func(t T) bool { return t.GetKey() == key }, ts...) } @@ -1003,7 +896,7 @@ func paginate[T any](params storage.QueryParams, less func(i, j int) bool, items return set, nil } -func (ss *StoreSnapshot) getNamespace(key string) (namespace, error) { +func (ss *Snapshot) getNamespace(key string) (namespace, error) { ns, ok := ss.ns[key] if !ok { return namespace{}, errs.ErrNotFoundf("namespace %q", key) diff --git a/internal/storage/fs/snapshot_test.go b/internal/storage/fs/snapshot_test.go index 1a4334d71b..3b0ae2f1fb 100644 --- a/internal/storage/fs/snapshot_test.go +++ b/internal/storage/fs/snapshot_test.go @@ -113,7 +113,7 @@ func TestFSWithIndex(t *testing.T) { type FSIndexSuite struct { suite.Suite - store storage.Store + store storage.ReadOnlyStore } func (fis *FSIndexSuite) TestCountFlag() { @@ -790,7 +790,7 @@ func (fis *FSIndexSuite) TestCountRules() { type FSWithoutIndexSuite struct { suite.Suite - store storage.Store + store storage.ReadOnlyStore } func TestFSWithoutIndex(t *testing.T) { diff --git a/internal/storage/fs/store.go b/internal/storage/fs/store.go index 925bf628dd..21358bae82 100644 --- a/internal/storage/fs/store.go +++ b/internal/storage/fs/store.go @@ -2,108 +2,336 @@ package fs import ( "context" + "errors" "fmt" "path" - "go.uber.org/zap" + "go.flipt.io/flipt/internal/storage" + "go.flipt.io/flipt/rpc/flipt" ) -// SnapshotSource produces instances of the storage snapshot. -// A single snapshot can be produced via Get or a channel -// may be provided to Subscribe in order to received -// new instances when new state becomes available. -type SnapshotSource interface { - fmt.Stringer +var ( + _ storage.Store = (*Store)(nil) - // Get builds a single instance of a *SnapshotSource - Get(context.Context) (*StoreSnapshot, error) + // ErrNotImplemented is returned when a method has intentionally not been implemented + // This is usually reserved for the store write actions when the store is read-only + // but still needs to implement storage.Store + ErrNotImplemented = errors.New("not implemented") +) - // Subscribe feeds instances of *SnapshotSource onto the provided channel. - // It should block until the provided context is cancelled (it will be called in a goroutine). - // It should close the provided channel before it returns. - Subscribe(context.Context, chan<- *StoreSnapshot) +// SnapshotStore is a type which has a single function View. +// View is a functional transaction interface for reading a snapshot +// during the lifetime of a supplied function. +type SnapshotStore interface { + // View accepts a function which takes a *StoreSnapshot. + // The SnapshotStore will supply a snapshot which is valid + // for the lifetime of the provided function call. + View(func(storage.ReadOnlyStore) error) error + fmt.Stringer } -// Store is an implementation of storage.Store backed by an SnapshotSource. -// The store subscribes to the source for instances of *SnapshotSource with new contents. -// When a new fs is received the contents is fetched and built into a snapshot -// of Flipt feature flag state. +// Store embeds a StoreSnapshot and wraps the Store methods with a read-write mutex +// to synchronize reads with atomic replacements of the embedded snapshot. type Store struct { - *syncedStore + viewer SnapshotStore +} + +func NewStore(viewer SnapshotStore) *Store { + return &Store{viewer: viewer} +} + +func (s *Store) String() string { + return path.Join("declarative", s.viewer.String()) +} + +func (s *Store) GetFlag(ctx context.Context, namespaceKey string, key string) (flag *flipt.Flag, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return flag, s.viewer.View(func(ss storage.ReadOnlyStore) error { + flag, err = ss.GetFlag(ctx, namespaceKey, key) + return err + }) +} + +func (s *Store) ListFlags(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Flag], err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return set, s.viewer.View(func(ss storage.ReadOnlyStore) error { + set, err = ss.ListFlags(ctx, namespaceKey, opts...) + return err + }) +} + +func (s *Store) CountFlags(ctx context.Context, namespaceKey string) (count uint64, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return count, s.viewer.View(func(ss storage.ReadOnlyStore) error { + count, err = ss.CountFlags(ctx, namespaceKey) + return err + }) +} + +func (s *Store) GetRule(ctx context.Context, namespaceKey string, id string) (rule *flipt.Rule, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return rule, s.viewer.View(func(ss storage.ReadOnlyStore) error { + rule, err = ss.GetRule(ctx, namespaceKey, id) + return err + }) +} + +func (s *Store) ListRules(ctx context.Context, namespaceKey string, flagKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Rule], err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return set, s.viewer.View(func(ss storage.ReadOnlyStore) error { + set, err = ss.ListRules(ctx, namespaceKey, flagKey, opts...) + return err + }) +} + +func (s *Store) CountRules(ctx context.Context, namespaceKey, flagKey string) (count uint64, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return count, s.viewer.View(func(ss storage.ReadOnlyStore) error { + count, err = ss.CountRules(ctx, namespaceKey, flagKey) + return err + }) +} + +func (s *Store) GetSegment(ctx context.Context, namespaceKey string, key string) (segment *flipt.Segment, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return segment, s.viewer.View(func(ss storage.ReadOnlyStore) error { + segment, err = ss.GetSegment(ctx, namespaceKey, key) + return err + }) +} + +func (s *Store) ListSegments(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Segment], err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return set, s.viewer.View(func(ss storage.ReadOnlyStore) error { + set, err = ss.ListSegments(ctx, namespaceKey, opts...) + return err + }) +} + +func (s *Store) CountSegments(ctx context.Context, namespaceKey string) (count uint64, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return count, s.viewer.View(func(ss storage.ReadOnlyStore) error { + count, err = ss.CountSegments(ctx, namespaceKey) + return err + }) +} + +func (s *Store) GetEvaluationRules(ctx context.Context, namespaceKey string, flagKey string) (rules []*storage.EvaluationRule, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } + + return rules, s.viewer.View(func(ss storage.ReadOnlyStore) error { + rules, err = ss.GetEvaluationRules(ctx, namespaceKey, flagKey) + return err + }) +} + +func (s *Store) GetEvaluationDistributions(ctx context.Context, ruleID string) (dists []*storage.EvaluationDistribution, err error) { + return dists, s.viewer.View(func(ss storage.ReadOnlyStore) error { + dists, err = ss.GetEvaluationDistributions(ctx, ruleID) + return err + }) +} + +func (s *Store) GetEvaluationRollouts(ctx context.Context, namespaceKey, flagKey string) (rollouts []*storage.EvaluationRollout, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace + } - logger *zap.Logger - source SnapshotSource + return rollouts, s.viewer.View(func(ss storage.ReadOnlyStore) error { + rollouts, err = ss.GetEvaluationRollouts(ctx, namespaceKey, flagKey) + return err + }) +} - // notify is used for test purposes - // it is invoked if defined when a snapshot update finishes - notify func() +func (s *Store) GetNamespace(ctx context.Context, key string) (ns *flipt.Namespace, err error) { + if key == "" { + key = flipt.DefaultNamespace + } - cancel context.CancelFunc - done chan struct{} + return ns, s.viewer.View(func(ss storage.ReadOnlyStore) error { + ns, err = ss.GetNamespace(ctx, key) + return err + }) } -func (l *Store) updateSnapshot(storeSnapshot *StoreSnapshot) { - l.mu.Lock() - l.Store = storeSnapshot - l.mu.Unlock() +func (s *Store) ListNamespaces(ctx context.Context, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Namespace], err error) { + return set, s.viewer.View(func(ss storage.ReadOnlyStore) error { + set, err = ss.ListNamespaces(ctx, opts...) + return err + }) +} - // NOTE: this is really just a trick for unit tests - // It is used to signal that an update occurred - // so we dont have to e.g. sleep to know when - // to check state. - if l.notify != nil { - l.notify() +func (s *Store) CountNamespaces(ctx context.Context) (count uint64, err error) { + return count, s.viewer.View(func(ss storage.ReadOnlyStore) error { + count, err = ss.CountNamespaces(ctx) + return err + }) +} + +func (s *Store) GetRollout(ctx context.Context, namespaceKey, id string) (rollout *flipt.Rollout, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace } + + return rollout, s.viewer.View(func(ss storage.ReadOnlyStore) error { + rollout, err = ss.GetRollout(ctx, namespaceKey, id) + return err + }) } -// NewStore constructs and configure a Store. -// The store creates a background goroutine which feeds a channel of *SnapshotSource. -func NewStore(logger *zap.Logger, source SnapshotSource) (*Store, error) { - store := &Store{ - syncedStore: &syncedStore{}, - logger: logger, - source: source, - done: make(chan struct{}), +func (s *Store) ListRollouts(ctx context.Context, namespaceKey, flagKey string, opts ...storage.QueryOption) (set storage.ResultSet[*flipt.Rollout], err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace } - // get an initial snapshot from source. - f, err := source.Get(context.Background()) - if err != nil { - return nil, err + return set, s.viewer.View(func(ss storage.ReadOnlyStore) error { + set, err = ss.ListRollouts(ctx, namespaceKey, flagKey, opts...) + return err + }) +} + +func (s *Store) CountRollouts(ctx context.Context, namespaceKey, flagKey string) (count uint64, err error) { + if namespaceKey == "" { + namespaceKey = flipt.DefaultNamespace } - store.updateSnapshot(f) + return count, s.viewer.View(func(ss storage.ReadOnlyStore) error { + count, err = ss.CountRollouts(ctx, namespaceKey, flagKey) + return err + }) +} + +// unimplemented write paths below + +func (s *Store) CreateNamespace(ctx context.Context, r *flipt.CreateNamespaceRequest) (*flipt.Namespace, error) { + return nil, ErrNotImplemented +} + +func (s *Store) UpdateNamespace(ctx context.Context, r *flipt.UpdateNamespaceRequest) (*flipt.Namespace, error) { + return nil, ErrNotImplemented +} - var ctx context.Context - ctx, store.cancel = context.WithCancel(context.Background()) +func (s *Store) DeleteNamespace(ctx context.Context, r *flipt.DeleteNamespaceRequest) error { + return ErrNotImplemented +} - ch := make(chan *StoreSnapshot) - go source.Subscribe(ctx, ch) +func (s *Store) CreateFlag(ctx context.Context, r *flipt.CreateFlagRequest) (*flipt.Flag, error) { + return nil, ErrNotImplemented +} - go func() { - defer close(store.done) - for snap := range ch { - logger.Debug("received new snapshot") - store.updateSnapshot(snap) - logger.Debug("updated latest snapshot") - } +func (s *Store) UpdateFlag(ctx context.Context, r *flipt.UpdateFlagRequest) (*flipt.Flag, error) { + return nil, ErrNotImplemented +} - logger.Info("source subscription closed") - }() +func (s *Store) DeleteFlag(ctx context.Context, r *flipt.DeleteFlagRequest) error { + return ErrNotImplemented +} - return store, nil +func (s *Store) CreateVariant(ctx context.Context, r *flipt.CreateVariantRequest) (*flipt.Variant, error) { + return nil, ErrNotImplemented } -// Close cancels the polling routine and waits for the routine to return. -func (l *Store) Close() error { - l.cancel() +func (s *Store) UpdateVariant(ctx context.Context, r *flipt.UpdateVariantRequest) (*flipt.Variant, error) { + return nil, ErrNotImplemented +} + +func (s *Store) DeleteVariant(ctx context.Context, r *flipt.DeleteVariantRequest) error { + return ErrNotImplemented +} + +func (s *Store) CreateSegment(ctx context.Context, r *flipt.CreateSegmentRequest) (*flipt.Segment, error) { + return nil, ErrNotImplemented +} + +func (s *Store) UpdateSegment(ctx context.Context, r *flipt.UpdateSegmentRequest) (*flipt.Segment, error) { + return nil, ErrNotImplemented +} - <-l.done +func (s *Store) DeleteSegment(ctx context.Context, r *flipt.DeleteSegmentRequest) error { + return ErrNotImplemented +} + +func (s *Store) CreateConstraint(ctx context.Context, r *flipt.CreateConstraintRequest) (*flipt.Constraint, error) { + return nil, ErrNotImplemented +} + +func (s *Store) UpdateConstraint(ctx context.Context, r *flipt.UpdateConstraintRequest) (*flipt.Constraint, error) { + return nil, ErrNotImplemented +} + +func (s *Store) DeleteConstraint(ctx context.Context, r *flipt.DeleteConstraintRequest) error { + return ErrNotImplemented +} + +func (s *Store) CreateRule(ctx context.Context, r *flipt.CreateRuleRequest) (*flipt.Rule, error) { + return nil, ErrNotImplemented +} + +func (s *Store) UpdateRule(ctx context.Context, r *flipt.UpdateRuleRequest) (*flipt.Rule, error) { + return nil, ErrNotImplemented +} + +func (s *Store) DeleteRule(ctx context.Context, r *flipt.DeleteRuleRequest) error { + return ErrNotImplemented +} + +func (s *Store) OrderRules(ctx context.Context, r *flipt.OrderRulesRequest) error { + return ErrNotImplemented +} + +func (s *Store) CreateDistribution(ctx context.Context, r *flipt.CreateDistributionRequest) (*flipt.Distribution, error) { + return nil, ErrNotImplemented +} + +func (s *Store) UpdateDistribution(ctx context.Context, r *flipt.UpdateDistributionRequest) (*flipt.Distribution, error) { + return nil, ErrNotImplemented +} + +func (s *Store) DeleteDistribution(ctx context.Context, r *flipt.DeleteDistributionRequest) error { + return ErrNotImplemented +} + +func (s *Store) CreateRollout(ctx context.Context, r *flipt.CreateRolloutRequest) (*flipt.Rollout, error) { + return nil, ErrNotImplemented +} + +func (s *Store) UpdateRollout(ctx context.Context, r *flipt.UpdateRolloutRequest) (*flipt.Rollout, error) { + return nil, ErrNotImplemented +} - return nil +func (s *Store) DeleteRollout(ctx context.Context, r *flipt.DeleteRolloutRequest) error { + return ErrNotImplemented } -// String returns an identifier string for the store type. -func (l *Store) String() string { - return path.Join("filesystem", l.source.String()) +func (s *Store) OrderRollouts(ctx context.Context, r *flipt.OrderRolloutsRequest) error { + return ErrNotImplemented } diff --git a/internal/storage/fs/store/store.go b/internal/storage/fs/store/store.go new file mode 100644 index 0000000000..0afece5d99 --- /dev/null +++ b/internal/storage/fs/store/store.go @@ -0,0 +1,164 @@ +package store + +import ( + "context" + "fmt" + "os" + + "github.com/go-git/go-git/v5/plumbing/transport/http" + gitssh "github.com/go-git/go-git/v5/plumbing/transport/ssh" + "go.flipt.io/flipt/internal/config" + "go.flipt.io/flipt/internal/containers" + "go.flipt.io/flipt/internal/oci" + "go.flipt.io/flipt/internal/storage" + storagefs "go.flipt.io/flipt/internal/storage/fs" + "go.flipt.io/flipt/internal/storage/fs/git" + "go.flipt.io/flipt/internal/storage/fs/local" + storageoci "go.flipt.io/flipt/internal/storage/fs/oci" + "go.flipt.io/flipt/internal/storage/fs/s3" + "go.uber.org/zap" + "golang.org/x/crypto/ssh" +) + +// NewStore is a constructor that handles all the known declarative backend storage types +// Given the provided storage type is know, the relevant backend is configured and returned +func NewStore(ctx context.Context, logger *zap.Logger, cfg *config.Config) (_ storage.Store, err error) { + switch cfg.Storage.Type { + case config.GitStorageType: + opts := []containers.Option[git.SnapshotStore]{ + git.WithRef(cfg.Storage.Git.Ref), + git.WithPollOptions( + storagefs.WithInterval(cfg.Storage.Git.PollInterval), + ), + git.WithInsecureTLS(cfg.Storage.Git.InsecureSkipTLS), + } + + if cfg.Storage.Git.CaCertBytes != "" { + opts = append(opts, git.WithCABundle([]byte(cfg.Storage.Git.CaCertBytes))) + } else if cfg.Storage.Git.CaCertPath != "" { + if bytes, err := os.ReadFile(cfg.Storage.Git.CaCertPath); err == nil { + opts = append(opts, git.WithCABundle(bytes)) + } else { + return nil, err + } + } + + auth := cfg.Storage.Git.Authentication + switch { + case auth.BasicAuth != nil: + opts = append(opts, git.WithAuth(&http.BasicAuth{ + Username: auth.BasicAuth.Username, + Password: auth.BasicAuth.Password, + })) + case auth.TokenAuth != nil: + opts = append(opts, git.WithAuth(&http.TokenAuth{ + Token: auth.TokenAuth.AccessToken, + })) + case auth.SSHAuth != nil: + var method *gitssh.PublicKeys + if auth.SSHAuth.PrivateKeyBytes != "" { + method, err = gitssh.NewPublicKeys( + auth.SSHAuth.User, + []byte(auth.SSHAuth.PrivateKeyBytes), + auth.SSHAuth.Password, + ) + } else { + method, err = gitssh.NewPublicKeysFromFile( + auth.SSHAuth.User, + auth.SSHAuth.PrivateKeyPath, + auth.SSHAuth.Password, + ) + } + if err != nil { + return nil, err + } + + // we're protecting against this explicitly so we can disable + // the gosec linting rule + if auth.SSHAuth.InsecureIgnoreHostKey { + // nolint:gosec + method.HostKeyCallback = ssh.InsecureIgnoreHostKey() + } + + opts = append(opts, git.WithAuth(method)) + } + + snapStore, err := git.NewSnapshotStore(ctx, logger, cfg.Storage.Git.Repository, opts...) + if err != nil { + return nil, err + } + + return storagefs.NewStore(snapStore), nil + case config.LocalStorageType: + snapStore, err := local.NewSnapshotStore(ctx, logger, cfg.Storage.Local.Path) + if err != nil { + return nil, err + } + + return storagefs.NewStore(snapStore), nil + case config.ObjectStorageType: + return newObjectStore(ctx, cfg, logger) + case config.OCIStorageType: + var opts []containers.Option[oci.StoreOptions] + if auth := cfg.Storage.OCI.Authentication; auth != nil { + opts = append(opts, oci.WithCredentials( + auth.Username, + auth.Password, + )) + } + + ocistore, err := oci.NewStore(logger, cfg.Storage.OCI.BundlesDirectory, opts...) + if err != nil { + return nil, err + } + + ref, err := oci.ParseReference(cfg.Storage.OCI.Repository) + if err != nil { + return nil, err + } + + snapStore, err := storageoci.NewSnapshotStore(ctx, logger, ocistore, ref, + storageoci.WithPollOptions( + storagefs.WithInterval(cfg.Storage.OCI.PollInterval), + ), + ) + if err != nil { + return nil, err + } + + return storagefs.NewStore(snapStore), nil + } + + return nil, fmt.Errorf("unexpected storage type: %q", cfg.Storage.Type) +} + +// newObjectStore create a new storate.Store from the object config +func newObjectStore(ctx context.Context, cfg *config.Config, logger *zap.Logger) (store storage.Store, err error) { + objectCfg := cfg.Storage.Object + // keep this as a case statement in anticipation of + // more object types in the future + // nolint:gocritic + switch objectCfg.Type { + case config.S3ObjectSubStorageType: + opts := []containers.Option[s3.SnapshotStore]{ + s3.WithPollOptions( + storagefs.WithInterval(objectCfg.S3.PollInterval), + ), + } + if objectCfg.S3.Endpoint != "" { + opts = append(opts, s3.WithEndpoint(objectCfg.S3.Endpoint)) + } + if objectCfg.S3.Region != "" { + opts = append(opts, s3.WithRegion(objectCfg.S3.Region)) + } + + snapStore, err := s3.NewSnapshotStore(ctx, logger, objectCfg.S3.Bucket, opts...) + if err != nil { + return nil, err + } + + return storagefs.NewStore(snapStore), nil + } + + return nil, fmt.Errorf("unexpected object storage subtype: %q", objectCfg.Type) +} diff --git a/internal/storage/fs/store_test.go b/internal/storage/fs/store_test.go index 41cae2631b..780ab6059b 100644 --- a/internal/storage/fs/store_test.go +++ b/internal/storage/fs/store_test.go @@ -2,89 +2,212 @@ package fs import ( "context" - "io/fs" "testing" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - "go.uber.org/zap/zaptest" + "go.flipt.io/flipt/internal/common" + "go.flipt.io/flipt/internal/storage" + "go.flipt.io/flipt/rpc/flipt" ) -func Test_Store(t *testing.T) { - var ( - logger = zaptest.NewLogger(t) - notify = make(chan struct{}) - source = source{ - get: mustSub(t, testdata, "testdata/valid/explicit_index"), - ch: make(chan *StoreSnapshot), - } - ) +func TestGetFlag(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) - store, err := NewStore(logger, source) + storeMock.On("GetFlag", mock.Anything, flipt.DefaultNamespace, "foo").Return(&flipt.Flag{}, nil) + + _, err := ss.GetFlag(context.TODO(), "", "foo") require.NoError(t, err) +} - // register a function to be called when updates have - // finished - store.notify = func() { - notify <- struct{}{} - } +func TestListFlags(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("ListFlags", mock.Anything, flipt.DefaultNamespace, mock.Anything).Return(storage.ResultSet[*flipt.Flag]{}, nil) + + _, err := ss.ListFlags(context.TODO(), "") + require.NoError(t, err) +} + +func TestCountFlags(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("CountFlags", mock.Anything, flipt.DefaultNamespace).Return(uint64(0), nil) + + _, err := ss.CountFlags(context.TODO(), "") + require.NoError(t, err) +} + +func TestGetRule(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("GetRule", mock.Anything, flipt.DefaultNamespace, "").Return(&flipt.Rule{}, nil) + + _, err := ss.GetRule(context.TODO(), "", "") + require.NoError(t, err) +} + +func TestListRules(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("ListRules", mock.Anything, flipt.DefaultNamespace, "", mock.Anything).Return(storage.ResultSet[*flipt.Rule]{}, nil) - assert.Equal(t, "filesystem/test", store.String()) + _, err := ss.ListRules(context.TODO(), "", "") + require.NoError(t, err) +} + +func TestCountRules(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) - // run FS with index suite against current store - suite.Run(t, &FSIndexSuite{store: store}) + storeMock.On("CountRules", mock.Anything, flipt.DefaultNamespace, "").Return(uint64(0), nil) - // update snapshot by sending fs without index - source.ch <- mustSub(t, testdata, "testdata/valid/implicit_index") + _, err := ss.CountRules(context.TODO(), "", "") + require.NoError(t, err) +} - // wait for update to apply - <-notify +func TestGetSegment(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) - // run FS without index suite against current store - suite.Run(t, &FSWithoutIndexSuite{store: store}) + storeMock.On("GetSegment", mock.Anything, flipt.DefaultNamespace, "").Return(&flipt.Segment{}, nil) - // shutdown store - require.NoError(t, store.Close()) + _, err := ss.GetSegment(context.TODO(), "", "") + require.NoError(t, err) } -type source struct { - get *StoreSnapshot - ch chan *StoreSnapshot +func TestListSegments(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("ListSegments", mock.Anything, flipt.DefaultNamespace, mock.Anything).Return(storage.ResultSet[*flipt.Segment]{}, nil) + + _, err := ss.ListSegments(context.TODO(), "") + require.NoError(t, err) } -func (s source) String() string { - return "test" +func TestCountSegments(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("CountSegments", mock.Anything, flipt.DefaultNamespace).Return(uint64(0), nil) + + _, err := ss.CountSegments(context.TODO(), "") + require.NoError(t, err) } -// Get builds a single instance of an *StoreSnapshot -func (s source) Get(context.Context) (*StoreSnapshot, error) { - return s.get, nil +func TestGetRollout(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("GetRollout", mock.Anything, flipt.DefaultNamespace, "").Return(&flipt.Rollout{}, nil) + + _, err := ss.GetRollout(context.TODO(), "", "") + require.NoError(t, err) } -// Subscribe feeds implementations of *StoreSnapshot onto the provided channel. -// It should block until the provided context is cancelled (it will be called in a goroutine). -// It should close the provided channel before it returns. -func (s source) Subscribe(ctx context.Context, ch chan<- *StoreSnapshot) { - defer close(ch) +func TestListRollouts(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) - for { - select { - case <-ctx.Done(): - return - case snap := <-s.ch: - ch <- snap - } - } + storeMock.On("ListRollouts", mock.Anything, flipt.DefaultNamespace, "", mock.Anything).Return(storage.ResultSet[*flipt.Rollout]{}, nil) + + _, err := ss.ListRollouts(context.TODO(), "", "") + require.NoError(t, err) +} + +func TestCountRollouts(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("CountRollouts", mock.Anything, flipt.DefaultNamespace, "").Return(uint64(0), nil) + + _, err := ss.CountRollouts(context.TODO(), "", "") + require.NoError(t, err) +} + +func TestGetNamespace(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("GetNamespace", mock.Anything, flipt.DefaultNamespace).Return(&flipt.Namespace{}, nil) + + _, err := ss.GetNamespace(context.TODO(), "") + require.NoError(t, err) +} + +func TestListNamespaces(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("ListNamespaces", mock.Anything, mock.Anything).Return(storage.ResultSet[*flipt.Namespace]{}, nil) + + _, err := ss.ListNamespaces(context.TODO()) + require.NoError(t, err) +} + +func TestCountNamespaces(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("CountNamespaces", mock.Anything).Return(uint64(0), nil) + + _, err := ss.CountNamespaces(context.TODO()) + require.NoError(t, err) } -func mustSub(t *testing.T, f fs.FS, dir string) *StoreSnapshot { - t.Helper() - var err error - f, err = fs.Sub(f, dir) +func TestGetEvaluationRules(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("GetEvaluationRules", mock.Anything, flipt.DefaultNamespace, "").Return([]*storage.EvaluationRule{}, nil) + + _, err := ss.GetEvaluationRules(context.TODO(), "", "") + require.NoError(t, err) +} + +func TestGetEvaluationDistributions(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("GetEvaluationDistributions", mock.Anything, "").Return([]*storage.EvaluationDistribution{}, nil) + + _, err := ss.GetEvaluationDistributions(context.TODO(), "") require.NoError(t, err) +} + +func TestGetEvaluationRollouts(t *testing.T) { + storeMock := newSnapshotStoreMock() + ss := NewStore(storeMock) + + storeMock.On("GetEvaluationRollouts", mock.Anything, flipt.DefaultNamespace, "").Return([]*storage.EvaluationRollout{}, nil) - snap, err := SnapshotFromFS(zaptest.NewLogger(t), f) + _, err := ss.GetEvaluationRollouts(context.TODO(), "", "") require.NoError(t, err) - return snap +} + +type snapshotStoreMock struct { + *common.StoreMock +} + +func newSnapshotStoreMock() snapshotStoreMock { + return snapshotStoreMock{ + StoreMock: &common.StoreMock{}, + } +} + +// View accepts a function which takes a *StoreSnapshot. +// The SnapshotStore will supply a snapshot which is valid +// for the lifetime of the provided function call. +func (s snapshotStoreMock) View(fn func(storage.ReadOnlyStore) error) error { + return fn(s.StoreMock) +} + +func (s snapshotStoreMock) String() string { + return "mock" } diff --git a/internal/storage/fs/sync.go b/internal/storage/fs/sync.go deleted file mode 100644 index 459e26466e..0000000000 --- a/internal/storage/fs/sync.go +++ /dev/null @@ -1,221 +0,0 @@ -package fs - -import ( - "context" - "sync" - - "go.flipt.io/flipt/internal/storage" - "go.flipt.io/flipt/rpc/flipt" -) - -var _ storage.Store = (*syncedStore)(nil) - -// syncedStore embeds a storeSnapshot and wraps the Store methods with a read-write mutex -// to synchronize reads with swapping out the storeSnapshot. -type syncedStore struct { - storage.Store - - mu sync.RWMutex -} - -func (s *syncedStore) GetFlag(ctx context.Context, namespaceKey string, key string) (*flipt.Flag, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.GetFlag(ctx, namespaceKey, key) -} - -func (s *syncedStore) ListFlags(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Flag], error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.ListFlags(ctx, namespaceKey, opts...) -} - -func (s *syncedStore) CountFlags(ctx context.Context, namespaceKey string) (uint64, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.CountFlags(ctx, namespaceKey) -} - -func (s *syncedStore) GetRule(ctx context.Context, namespaceKey string, id string) (*flipt.Rule, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.GetRule(ctx, namespaceKey, id) -} - -func (s *syncedStore) ListRules(ctx context.Context, namespaceKey string, flagKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Rule], error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.ListRules(ctx, namespaceKey, flagKey, opts...) -} - -func (s *syncedStore) CountRules(ctx context.Context, namespaceKey, flagKey string) (uint64, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.CountRules(ctx, namespaceKey, flagKey) -} - -func (s *syncedStore) GetSegment(ctx context.Context, namespaceKey string, key string) (*flipt.Segment, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.GetSegment(ctx, namespaceKey, key) -} - -func (s *syncedStore) ListSegments(ctx context.Context, namespaceKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Segment], error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.ListSegments(ctx, namespaceKey, opts...) -} - -func (s *syncedStore) CountSegments(ctx context.Context, namespaceKey string) (uint64, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.CountSegments(ctx, namespaceKey) -} - -func (s *syncedStore) GetEvaluationRules(ctx context.Context, namespaceKey string, flagKey string) ([]*storage.EvaluationRule, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.GetEvaluationRules(ctx, namespaceKey, flagKey) -} - -func (s *syncedStore) GetEvaluationDistributions(ctx context.Context, ruleID string) ([]*storage.EvaluationDistribution, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.Store.GetEvaluationDistributions(ctx, ruleID) -} - -func (s *syncedStore) GetEvaluationRollouts(ctx context.Context, namespaceKey, flagKey string) ([]*storage.EvaluationRollout, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.GetEvaluationRollouts(ctx, namespaceKey, flagKey) -} - -func (s *syncedStore) GetNamespace(ctx context.Context, key string) (*flipt.Namespace, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if key == "" { - key = flipt.DefaultNamespace - } - - return s.Store.GetNamespace(ctx, key) -} - -func (s *syncedStore) ListNamespaces(ctx context.Context, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Namespace], error) { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.Store.ListNamespaces(ctx, opts...) -} - -func (s *syncedStore) CountNamespaces(ctx context.Context) (uint64, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - return s.Store.CountNamespaces(ctx) -} - -func (s *syncedStore) GetRollout(ctx context.Context, namespaceKey, id string) (*flipt.Rollout, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.GetRollout(ctx, namespaceKey, id) -} - -func (s *syncedStore) ListRollouts(ctx context.Context, namespaceKey, flagKey string, opts ...storage.QueryOption) (storage.ResultSet[*flipt.Rollout], error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.ListRollouts(ctx, namespaceKey, flagKey, opts...) -} - -func (s *syncedStore) CountRollouts(ctx context.Context, namespaceKey, flagKey string) (uint64, error) { - s.mu.RLock() - defer s.mu.RUnlock() - - if namespaceKey == "" { - namespaceKey = flipt.DefaultNamespace - } - - return s.Store.CountRollouts(ctx, namespaceKey, flagKey) -} - -func (s *syncedStore) CreateRollout(ctx context.Context, r *flipt.CreateRolloutRequest) (*flipt.Rollout, error) { - return nil, ErrNotImplemented -} - -func (s *syncedStore) UpdateRollout(ctx context.Context, r *flipt.UpdateRolloutRequest) (*flipt.Rollout, error) { - return nil, ErrNotImplemented -} - -func (s *syncedStore) DeleteRollout(ctx context.Context, r *flipt.DeleteRolloutRequest) error { - return ErrNotImplemented -} - -func (s *syncedStore) OrderRollouts(ctx context.Context, r *flipt.OrderRolloutsRequest) error { - return ErrNotImplemented -} diff --git a/internal/storage/fs/sync_test.go b/internal/storage/fs/sync_test.go deleted file mode 100644 index 3ee3617975..0000000000 --- a/internal/storage/fs/sync_test.go +++ /dev/null @@ -1,228 +0,0 @@ -package fs - -import ( - "context" - "testing" - - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" - "go.flipt.io/flipt/internal/common" - "go.flipt.io/flipt/internal/storage" - "go.flipt.io/flipt/rpc/flipt" -) - -func TestGetFlag(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetFlag", mock.Anything, flipt.DefaultNamespace, "foo").Return(&flipt.Flag{}, nil) - - _, err := ss.GetFlag(context.TODO(), "", "foo") - require.NoError(t, err) -} - -func TestListFlags(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("ListFlags", mock.Anything, flipt.DefaultNamespace, mock.Anything).Return(storage.ResultSet[*flipt.Flag]{}, nil) - - _, err := ss.ListFlags(context.TODO(), "") - require.NoError(t, err) -} - -func TestCountFlags(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("CountFlags", mock.Anything, flipt.DefaultNamespace).Return(uint64(0), nil) - - _, err := ss.CountFlags(context.TODO(), "") - require.NoError(t, err) -} - -func TestGetRule(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetRule", mock.Anything, flipt.DefaultNamespace, "").Return(&flipt.Rule{}, nil) - - _, err := ss.GetRule(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestListRules(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("ListRules", mock.Anything, flipt.DefaultNamespace, "", mock.Anything).Return(storage.ResultSet[*flipt.Rule]{}, nil) - - _, err := ss.ListRules(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestCountRules(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("CountRules", mock.Anything, flipt.DefaultNamespace, "").Return(uint64(0), nil) - - _, err := ss.CountRules(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestGetSegment(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetSegment", mock.Anything, flipt.DefaultNamespace, "").Return(&flipt.Segment{}, nil) - - _, err := ss.GetSegment(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestListSegments(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("ListSegments", mock.Anything, flipt.DefaultNamespace, mock.Anything).Return(storage.ResultSet[*flipt.Segment]{}, nil) - - _, err := ss.ListSegments(context.TODO(), "") - require.NoError(t, err) -} - -func TestCountSegments(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("CountSegments", mock.Anything, flipt.DefaultNamespace).Return(uint64(0), nil) - - _, err := ss.CountSegments(context.TODO(), "") - require.NoError(t, err) -} - -func TestGetRollout(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetRollout", mock.Anything, flipt.DefaultNamespace, "").Return(&flipt.Rollout{}, nil) - - _, err := ss.GetRollout(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestListRollouts(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("ListRollouts", mock.Anything, flipt.DefaultNamespace, "", mock.Anything).Return(storage.ResultSet[*flipt.Rollout]{}, nil) - - _, err := ss.ListRollouts(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestCountRollouts(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("CountRollouts", mock.Anything, flipt.DefaultNamespace, "").Return(uint64(0), nil) - - _, err := ss.CountRollouts(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestGetNamespace(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetNamespace", mock.Anything, flipt.DefaultNamespace).Return(&flipt.Namespace{}, nil) - - _, err := ss.GetNamespace(context.TODO(), "") - require.NoError(t, err) -} - -func TestListNamespaces(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("ListNamespaces", mock.Anything, mock.Anything).Return(storage.ResultSet[*flipt.Namespace]{}, nil) - - _, err := ss.ListNamespaces(context.TODO()) - require.NoError(t, err) -} - -func TestCountNamespaces(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("CountNamespaces", mock.Anything).Return(uint64(0), nil) - - _, err := ss.CountNamespaces(context.TODO()) - require.NoError(t, err) -} - -func TestGetEvaluationRules(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetEvaluationRules", mock.Anything, flipt.DefaultNamespace, "").Return([]*storage.EvaluationRule{}, nil) - - _, err := ss.GetEvaluationRules(context.TODO(), "", "") - require.NoError(t, err) -} - -func TestGetEvaluationDistributions(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetEvaluationDistributions", mock.Anything, "").Return([]*storage.EvaluationDistribution{}, nil) - - _, err := ss.GetEvaluationDistributions(context.TODO(), "") - require.NoError(t, err) -} - -func TestGetEvaluationRollouts(t *testing.T) { - storeMock := &common.StoreMock{} - ss := &syncedStore{ - Store: storeMock, - } - - storeMock.On("GetEvaluationRollouts", mock.Anything, flipt.DefaultNamespace, "").Return([]*storage.EvaluationRollout{}, nil) - - _, err := ss.GetEvaluationRollouts(context.TODO(), "", "") - require.NoError(t, err) -} diff --git a/internal/storage/storage.go b/internal/storage/storage.go index 0f7b5441bf..e18d0c7aac 100644 --- a/internal/storage/storage.go +++ b/internal/storage/storage.go @@ -151,6 +151,19 @@ func WithOrder(order Order) QueryOption { } } +// ReadOnlyStore is a storage implementation which only supports +// reading the various types of state configuring within Flipt +type ReadOnlyStore interface { + ReadOnlyNamespaceStore + ReadOnlyFlagStore + ReadOnlySegmentStore + ReadOnlyRuleStore + ReadOnlyRolloutStore + EvaluationStore + fmt.Stringer +} + +// Store supports reading and writing all the resources within Flipt type Store interface { NamespaceStore FlagStore @@ -177,21 +190,31 @@ type EvaluationStore interface { GetEvaluationRollouts(ctx context.Context, namespaceKey, flagKey string) ([]*EvaluationRollout, error) } -// NamespaceStore stores and retrieves namespaces -type NamespaceStore interface { +// ReadOnlyNamespaceStore support retrieval of namespaces only +type ReadOnlyNamespaceStore interface { GetNamespace(ctx context.Context, key string) (*flipt.Namespace, error) ListNamespaces(ctx context.Context, opts ...QueryOption) (ResultSet[*flipt.Namespace], error) CountNamespaces(ctx context.Context) (uint64, error) +} + +// NamespaceStore stores and retrieves namespaces +type NamespaceStore interface { + ReadOnlyNamespaceStore CreateNamespace(ctx context.Context, r *flipt.CreateNamespaceRequest) (*flipt.Namespace, error) UpdateNamespace(ctx context.Context, r *flipt.UpdateNamespaceRequest) (*flipt.Namespace, error) DeleteNamespace(ctx context.Context, r *flipt.DeleteNamespaceRequest) error } -// FlagStore stores and retrieves flags and variants -type FlagStore interface { +// ReadOnlyFlagStore supports retrieval of flags +type ReadOnlyFlagStore interface { GetFlag(ctx context.Context, namespaceKey, key string) (*flipt.Flag, error) ListFlags(ctx context.Context, namespaceKey string, opts ...QueryOption) (ResultSet[*flipt.Flag], error) CountFlags(ctx context.Context, namespaceKey string) (uint64, error) +} + +// FlagStore stores and retrieves flags and variants +type FlagStore interface { + ReadOnlyFlagStore CreateFlag(ctx context.Context, r *flipt.CreateFlagRequest) (*flipt.Flag, error) UpdateFlag(ctx context.Context, r *flipt.UpdateFlagRequest) (*flipt.Flag, error) DeleteFlag(ctx context.Context, r *flipt.DeleteFlagRequest) error @@ -200,11 +223,16 @@ type FlagStore interface { DeleteVariant(ctx context.Context, r *flipt.DeleteVariantRequest) error } -// SegmentStore stores and retrieves segments and constraints -type SegmentStore interface { +// ReadOnlySegmentStore supports retrieval of segments and constraints +type ReadOnlySegmentStore interface { GetSegment(ctx context.Context, namespaceKey, key string) (*flipt.Segment, error) ListSegments(ctx context.Context, namespaceKey string, opts ...QueryOption) (ResultSet[*flipt.Segment], error) CountSegments(ctx context.Context, namespaceKey string) (uint64, error) +} + +// SegmentStore stores and retrieves segments and constraints +type SegmentStore interface { + ReadOnlySegmentStore CreateSegment(ctx context.Context, r *flipt.CreateSegmentRequest) (*flipt.Segment, error) UpdateSegment(ctx context.Context, r *flipt.UpdateSegmentRequest) (*flipt.Segment, error) DeleteSegment(ctx context.Context, r *flipt.DeleteSegmentRequest) error @@ -213,11 +241,16 @@ type SegmentStore interface { DeleteConstraint(ctx context.Context, r *flipt.DeleteConstraintRequest) error } -// RuleStore stores and retrieves rules and distributions -type RuleStore interface { +// ReadOnlyRuleStore supports retrieval of rules and distributions +type ReadOnlyRuleStore interface { GetRule(ctx context.Context, namespaceKey, id string) (*flipt.Rule, error) ListRules(ctx context.Context, namespaceKey, flagKey string, opts ...QueryOption) (ResultSet[*flipt.Rule], error) CountRules(ctx context.Context, namespaceKey, flagKey string) (uint64, error) +} + +// RuleStore stores and retrieves rules and distributions +type RuleStore interface { + ReadOnlyRuleStore CreateRule(ctx context.Context, r *flipt.CreateRuleRequest) (*flipt.Rule, error) UpdateRule(ctx context.Context, r *flipt.UpdateRuleRequest) (*flipt.Rule, error) DeleteRule(ctx context.Context, r *flipt.DeleteRuleRequest) error @@ -227,10 +260,16 @@ type RuleStore interface { DeleteDistribution(ctx context.Context, r *flipt.DeleteDistributionRequest) error } -type RolloutStore interface { +// ReadOnlyRolloutStore supports retrieval of rollouts +type ReadOnlyRolloutStore interface { GetRollout(ctx context.Context, namespaceKey, id string) (*flipt.Rollout, error) ListRollouts(ctx context.Context, namespaceKey, flagKey string, opts ...QueryOption) (ResultSet[*flipt.Rollout], error) CountRollouts(ctx context.Context, namespaceKey, flagKey string) (uint64, error) +} + +// RolloutStore supports storing and retrieving rollouts +type RolloutStore interface { + ReadOnlyRolloutStore CreateRollout(ctx context.Context, r *flipt.CreateRolloutRequest) (*flipt.Rollout, error) UpdateRollout(ctx context.Context, r *flipt.UpdateRolloutRequest) (*flipt.Rollout, error) DeleteRollout(ctx context.Context, r *flipt.DeleteRolloutRequest) error From 0ce99f1ed2b3f34d48208c54f637ce6622a9aa2e Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Fri, 15 Dec 2023 11:27:53 +0000 Subject: [PATCH 2/3] chore(storage/fs): thread context on update --- internal/storage/fs/oci/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/storage/fs/oci/store.go b/internal/storage/fs/oci/store.go index 39d17a58a8..e99cce0930 100644 --- a/internal/storage/fs/oci/store.go +++ b/internal/storage/fs/oci/store.go @@ -73,7 +73,7 @@ func (s *SnapshotStore) String() string { // If the state has not change sinced the last observed image digest it skips // updating the snapshot and returns false (not modified). func (s *SnapshotStore) update(ctx context.Context) (bool, error) { - resp, err := s.store.Fetch(context.Background(), s.ref, oci.IfNoMatch(s.lastDigest)) + resp, err := s.store.Fetch(ctx, s.ref, oci.IfNoMatch(s.lastDigest)) if err != nil { return false, err } From c6e880f2dd7362be347dbe19c0c1fb092c06f298 Mon Sep 17 00:00:00 2001 From: George MacRorie Date: Fri, 15 Dec 2023 13:05:42 +0000 Subject: [PATCH 3/3] fix(storage/fs/git): call get instead of update when initializing --- internal/storage/fs/git/store.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/storage/fs/git/store.go b/internal/storage/fs/git/store.go index 9b823ddddb..cb312cdbd8 100644 --- a/internal/storage/fs/git/store.go +++ b/internal/storage/fs/git/store.go @@ -119,7 +119,7 @@ func NewSnapshotStore(ctx context.Context, logger *zap.Logger, url string, opts // fetch snapshot at-least once before returning store // to ensure we have some state to serve - if _, err := store.update(ctx); err != nil { + if err := store.get(ctx); err != nil { return nil, err }