Skip to content

Commit

Permalink
refactor(storage/fs): change fs.FSSource into fs.SnapshotSource (#2325)
Browse files Browse the repository at this point in the history
* refactor(storage/fs): change fs.FSSource into fs.SnapshotSource

* refactor(storage/fs): export new SnapshotFromFiles function

* chore(storage/fs): remove unecessary error return argument

* fix(test/fs): use valid configuration file during subscribe test
  • Loading branch information
GeorgeMac authored Nov 1, 2023
1 parent 85e5f3a commit 3504651
Show file tree
Hide file tree
Showing 11 changed files with 175 additions and 237 deletions.
2 changes: 1 addition & 1 deletion go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ cloud.google.com/go/compute v1.18.0/go.mod h1:1X7yHxec2Ga+Ss6jPyjxRxpu2uu7PLgsOV
cloud.google.com/go/compute v1.19.3/go.mod h1:qxvISKp/gYnXkSAD1ppcSOveRAmzxicEv/JlizULFrI=
cloud.google.com/go/compute v1.20.1/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute v1.21.0/go.mod h1:4tCnrn48xsqlwSAiLf1HXMQk8CONslYbdiEZc9FEIbM=
cloud.google.com/go/compute v1.23.2/go.mod h1:JJ0atRC0J/oWYiiVBmsSsrRnh92DhZPG4hFDcR04Rns=
cloud.google.com/go/contactcenterinsights v1.11.0/go.mod h1:hutBdImE4XNZ1NV4vbPJKSFOnQruhC5Lj9bZqWMTKiU=
cloud.google.com/go/contactcenterinsights v1.11.2/go.mod h1:A9PIR5ov5cRcd28KlDbmmXE8Aay+Gccer2h4wzkYFso=
cloud.google.com/go/container v1.26.0/go.mod h1:YJCmRet6+6jnYYRS000T6k0D0xUXQgBSaJ7VwI8FBj4=
Expand Down Expand Up @@ -331,6 +330,7 @@ github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY=
github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go v1.34.0 h1:brux2dRrlwCF5JhTL7MUT3WUwo9zfDHZZp3+g3Mvlmo=
github.com/aws/aws-sdk-go v1.34.0/go.mod h1:5zCpMtNQVjRREroY7sYe8lOMRSxkhG6MZveU8YkpAk0=
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.33/go.mod h1:84XgODVR8uRhmOnUkKGUZKqIMxmjmLOR8Uyp7G/TPwc=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
Expand Down
23 changes: 15 additions & 8 deletions internal/storage/fs/git/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/go-git/go-git/v5/storage/memory"
"go.flipt.io/flipt/internal/containers"
"go.flipt.io/flipt/internal/gitfs"
storagefs "go.flipt.io/flipt/internal/storage/fs"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -92,19 +93,25 @@ func NewSource(logger *zap.Logger, url string, opts ...containers.Option[Source]
return source, nil
}

// Get builds a new fs.FS based on the configure Git remote and reference.
func (s *Source) Get() (fs.FS, error) {
// Get builds a new store snapshot based on the configure Git remote and reference.
func (s *Source) Get() (_ *storagefs.StoreSnapshot, err error) {
var fs fs.FS
if s.hash != plumbing.ZeroHash {
return gitfs.NewFromRepoHash(s.logger, s.repo, s.hash)
fs, err = gitfs.NewFromRepoHash(s.logger, s.repo, s.hash)
} else {
fs, err = gitfs.NewFromRepo(s.logger, s.repo, gitfs.WithReference(plumbing.NewRemoteReferenceName("origin", s.ref)))
}
if err != nil {
return nil, err
}

return gitfs.NewFromRepo(s.logger, s.repo, gitfs.WithReference(plumbing.NewRemoteReferenceName("origin", s.ref)))
return storagefs.SnapshotFromFS(s.logger, fs)
}

// Subscribe feeds gitfs implementations of fs.FS onto the provided channel.
// It blocks until the provided context is cancelled (it will be called in a goroutine).
// It closes the provided channel before it returns.
func (s *Source) Subscribe(ctx context.Context, ch chan<- fs.FS) {
func (s *Source) Subscribe(ctx context.Context, ch chan<- *storagefs.StoreSnapshot) {
defer close(ch)

// NOTE: theres is no point subscribing to updates for a git Hash
Expand Down Expand Up @@ -140,13 +147,13 @@ func (s *Source) Subscribe(ctx context.Context, ch chan<- fs.FS) {
continue
}

fs, err := s.Get()
snap, err := s.Get()
if err != nil {
s.logger.Error("failed creating gitfs", zap.Error(err))
s.logger.Error("failed creating snapshot from fs", zap.Error(err))
continue
}

ch <- fs
ch <- snap

s.logger.Debug("finished fetching from remote")
}
Expand Down
36 changes: 16 additions & 20 deletions internal/storage/fs/git/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package git

import (
"context"
"io"
"io/fs"
"os"
"testing"
"time"
Expand All @@ -17,6 +15,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.flipt.io/flipt/internal/containers"
storagefs "go.flipt.io/flipt/internal/storage/fs"
"go.uber.org/zap/zaptest"
)

Expand All @@ -32,16 +31,11 @@ func Test_SourceGet(t *testing.T) {
return
}

fs, err := source.Get()
snap, err := source.Get()
require.NoError(t, err)

fi, err := fs.Open("features.yml")
_, err = snap.GetNamespace(context.TODO(), "production")
require.NoError(t, err)

data, err := io.ReadAll(fi)
require.NoError(t, err)

assert.Equal(t, []byte("namespace: production\n"), data)
}

func Test_SourceSubscribe_Hash(t *testing.T) {
Expand All @@ -56,7 +50,7 @@ func Test_SourceSubscribe_Hash(t *testing.T) {
return
}

ch := make(chan fs.FS)
ch := make(chan *storagefs.StoreSnapshot)
source.Subscribe(context.Background(), ch)

_, closed := <-ch
Expand All @@ -76,7 +70,7 @@ func Test_SourceSubscribe(t *testing.T) {
require.NoError(t, err)

// start subscription
ch := make(chan fs.FS)
ch := make(chan *storagefs.StoreSnapshot)
go source.Subscribe(ctx, ch)

// pull repo
Expand All @@ -102,7 +96,8 @@ func Test_SourceSubscribe(t *testing.T) {

updated := []byte(`namespace: production
flags:
- key: foo`)
- key: foo
name: Foo`)

_, err = fi.Write(updated)
require.NoError(t, err)
Expand All @@ -122,18 +117,19 @@ flags:
}))

// assert matching state
fs := <-ch
require.NoError(t, err)

t.Log("received new FS")
var snap *storagefs.StoreSnapshot
select {
case snap = <-ch:
case <-time.After(time.Minute):
t.Fatal("timed out waiting for snapshot")
}

found, err := fs.Open("features.yml")
require.NoError(t, err)

data, err := io.ReadAll(found)
require.NoError(t, err)
t.Log("received new snapshot")

assert.Equal(t, string(updated), string(data))
_, err = snap.GetFlag(ctx, "production", "foo")
require.NoError(t, err)

// ensure closed
cancel()
Expand Down
13 changes: 7 additions & 6 deletions internal/storage/fs/local/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package local

import (
"context"
"io/fs"
"os"
"time"

"go.flipt.io/flipt/internal/containers"
storagefs "go.flipt.io/flipt/internal/storage/fs"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -41,13 +41,13 @@ func WithPollInterval(tick time.Duration) containers.Option[Source] {
}

// Get returns an fs.FS for the local filesystem.
func (s *Source) Get() (fs.FS, error) {
return os.DirFS(s.dir), nil
func (s *Source) Get() (*storagefs.StoreSnapshot, error) {
return storagefs.SnapshotFromFS(s.logger, os.DirFS(s.dir))
}

// Subscribe feeds local fs.FS implementations onto the provided channel.
// It blocks until the provided context is cancelled.
func (s *Source) Subscribe(ctx context.Context, ch chan<- fs.FS) {
func (s *Source) Subscribe(ctx context.Context, ch chan<- *storagefs.StoreSnapshot) {
defer close(ch)

ticker := time.NewTicker(s.interval)
Expand All @@ -56,14 +56,15 @@ func (s *Source) Subscribe(ctx context.Context, ch chan<- fs.FS) {
case <-ctx.Done():
return
case <-ticker.C:
fs, err := s.Get()
snap, err := s.Get()
if err != nil {
s.logger.Error("error getting file system from directory", zap.Error(err))
continue
}

s.logger.Debug("updating local store snapshot")
ch <- fs

ch <- snap
}
}
}
Expand Down
27 changes: 12 additions & 15 deletions internal/storage/fs/local/source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package local

import (
"context"
"io/fs"
"os"
"path/filepath"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
storagefs "go.flipt.io/flipt/internal/storage/fs"
"go.uber.org/zap"
)

Expand All @@ -20,13 +21,11 @@ func Test_SourceGet(t *testing.T) {
s, err := NewSource(zap.NewNop(), "testdata", WithPollInterval(5*time.Second))
assert.NoError(t, err)

tfs, err := s.Get()
snap, err := s.Get()
assert.NoError(t, err)

file, err := tfs.Open("features.yml")
assert.NoError(t, err)

assert.NotNil(t, file)
_, err = snap.GetNamespace(context.TODO(), "production")
require.NoError(t, err)
}

func Test_SourceSubscribe(t *testing.T) {
Expand All @@ -48,21 +47,19 @@ func Test_SourceSubscribe(t *testing.T) {

ctx, cancel := context.WithCancel(context.Background())

fsCh := make(chan fs.FS)
go s.Subscribe(ctx, fsCh)
ch := make(chan *storagefs.StoreSnapshot)
go s.Subscribe(ctx, ch)

// Create event
_, err = os.Create(ftc)
assert.NoError(t, err)
// change the filesystem contents
assert.NoError(t, os.WriteFile(ftc, []byte(`{"namespace":"staging"}`), os.ModePerm))

select {
case f := <-fsCh:
file, err := f.Open("a.features.yml")
case snap := <-ch:
_, err := snap.GetNamespace(ctx, "staging")
assert.NoError(t, err)
assert.NotNil(t, file)
cancel()

_, open := <-fsCh
_, open := <-ch
assert.False(t, open, "expected channel to be closed after cancel")
case <-time.After(10 * time.Second):
t.Fatal("event not caught")
Expand Down
24 changes: 15 additions & 9 deletions internal/storage/fs/s3/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package s3
import (
"context"
"fmt"
"io/fs"
"time"

"github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -12,10 +11,11 @@ import (

"go.flipt.io/flipt/internal/containers"
"go.flipt.io/flipt/internal/s3fs"
storagefs "go.flipt.io/flipt/internal/storage/fs"
"go.uber.org/zap"
)

// Source represents an implementation of an fs.FSSource
// Source represents an implementation of an fs.SnapshotSource
// This implementation is backed by an S3 bucket
type Source struct {
logger *zap.Logger
Expand Down Expand Up @@ -95,14 +95,19 @@ func WithPollInterval(tick time.Duration) containers.Option[Source] {
}
}

// Get returns an fs.FS for the local filesystem.
func (s *Source) Get() (fs.FS, error) {
return s3fs.New(s.logger, s.s3, s.bucket, s.prefix)
// Get returns a *sourcefs.StoreSnapshot for the local filesystem.
func (s *Source) Get() (*storagefs.StoreSnapshot, error) {
fs, err := s3fs.New(s.logger, s.s3, s.bucket, s.prefix)
if err != nil {
return nil, err
}

return storagefs.SnapshotFromFS(s.logger, fs)
}

// Subscribe feeds local fs.FS implementations onto the provided channel.
// Subscribe feeds S3 populated *StoreSnapshot instances onto the provided channel.
// It blocks until the provided context is cancelled.
func (s *Source) Subscribe(ctx context.Context, ch chan<- fs.FS) {
func (s *Source) Subscribe(ctx context.Context, ch chan<- *storagefs.StoreSnapshot) {
defer close(ch)

ticker := time.NewTicker(s.interval)
Expand All @@ -111,14 +116,15 @@ func (s *Source) Subscribe(ctx context.Context, ch chan<- fs.FS) {
case <-ctx.Done():
return
case <-ticker.C:
fs, err := s.Get()
snap, err := s.Get()
if err != nil {
s.logger.Error("error getting file system from directory", zap.Error(err))
continue
}

s.logger.Debug("updating local store snapshot")
ch <- fs

ch <- snap
}
}
}
Expand Down
Loading

0 comments on commit 3504651

Please sign in to comment.