From e906757a4bac6a556ed1e2a0cd7428d708d5caed Mon Sep 17 00:00:00 2001 From: James Milligan <75740990+james-milligan@users.noreply.github.com> Date: Fri, 10 Feb 2023 16:17:23 +0000 Subject: [PATCH] refactor: implement delete event type in filepath sync, expand tests (#369) Signed-off-by: James Milligan Signed-off-by: James Milligan <75740990+james-milligan@users.noreply.github.com> Co-authored-by: Skye Gill Co-authored-by: Michael Beemer --- pkg/store/flags.go | 20 +++- pkg/store/flags_test.go | 22 ++++ pkg/sync/file/filepath_sync.go | 63 ++++++----- pkg/sync/file/filepath_sync_test.go | 161 +++++++++++++++++++++++----- pkg/sync/isync.go | 15 +++ 5 files changed, 229 insertions(+), 52 deletions(-) diff --git a/pkg/store/flags.go b/pkg/store/flags.go index 3627c6f36..8d2eea8be 100644 --- a/pkg/store/flags.go +++ b/pkg/store/flags.go @@ -130,10 +130,26 @@ func (f *Flags) Update(logger *logger.Logger, source string, flags map[string]mo // DeleteFlags matching flags from source. func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[string]model.Flag) map[string]interface{} { notifications := map[string]interface{}{} + if len(flags) == 0 { + allFlags := f.GetAll() + for key, flag := range allFlags { + if flag.Source != source { + continue + } + notifications[key] = map[string]interface{}{ + "type": string(model.NotificationDelete), + "source": source, + } + f.Delete(key) + } + } for k := range flags { - _, ok := f.Get(k) + flag, ok := f.Get(k) if ok { + if flag.Source != source { + continue + } notifications[k] = map[string]interface{}{ "type": string(model.NotificationDelete), "source": source, @@ -142,7 +158,7 @@ func (f *Flags) DeleteFlags(logger *logger.Logger, source string, flags map[stri f.Delete(k) } else { logger.Warn( - fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exisit.", + fmt.Sprintf("failed to remove flag, flag with key %s from source %s does not exist.", k, source)) } diff --git a/pkg/store/flags_test.go b/pkg/store/flags_test.go index a3c0741c9..e3ba77828 100644 --- a/pkg/store/flags_test.go +++ b/pkg/store/flags_test.go @@ -344,6 +344,7 @@ func TestFlags_Update(t *testing.T) { func TestFlags_Delete(t *testing.T) { mockLogger := logger.NewLogger(nil, false) mockSource := "source" + mockSource2 := "source2" tests := []struct { name string @@ -358,6 +359,7 @@ func TestFlags_Delete(t *testing.T) { Flags: map[string]model.Flag{ "A": {Source: mockSource}, "B": {Source: mockSource}, + "C": {Source: mockSource2}, }, }, deleteRequest: map[string]model.Flag{ @@ -366,6 +368,7 @@ func TestFlags_Delete(t *testing.T) { expectedState: &Flags{ Flags: map[string]model.Flag{ "B": {Source: mockSource}, + "C": {Source: mockSource2}, }, }, expectedNotificationKeys: []string{"A"}, @@ -376,6 +379,7 @@ func TestFlags_Delete(t *testing.T) { Flags: map[string]model.Flag{ "A": {Source: mockSource}, "B": {Source: mockSource}, + "C": {Source: mockSource2}, }, }, deleteRequest: map[string]model.Flag{ @@ -385,10 +389,28 @@ func TestFlags_Delete(t *testing.T) { Flags: map[string]model.Flag{ "A": {Source: mockSource}, "B": {Source: mockSource}, + "C": {Source: mockSource2}, }, }, expectedNotificationKeys: []string{}, }, + { + name: "Remove all", + storedState: &Flags{ + Flags: map[string]model.Flag{ + "A": {Source: mockSource}, + "B": {Source: mockSource}, + "C": {Source: mockSource2}, + }, + }, + deleteRequest: map[string]model.Flag{}, + expectedState: &Flags{ + Flags: map[string]model.Flag{ + "C": {Source: mockSource2}, + }, + }, + expectedNotificationKeys: []string{"A", "B"}, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/sync/file/filepath_sync.go b/pkg/sync/file/filepath_sync.go index 6fbe31a7d..b7227991a 100644 --- a/pkg/sync/file/filepath_sync.go +++ b/pkg/sync/file/filepath_sync.go @@ -24,6 +24,9 @@ type Sync struct { fileType string } +// default state is used to prevent EOF errors when handling filepath delete events + empty files +const defaultState = "{}" + //nolint:funlen func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { fs.Logger.Info("Starting filepath sync notifier") @@ -38,13 +41,7 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { return err } - // file watcher is ready(and stable), fetch and emit the initial results - fetch, err := fs.fetch(ctx) - if err != nil { - return err - } - - dataSync <- sync.DataSync{FlagData: fetch, Source: fs.URI, Type: sync.ALL} + fs.sendDataSync(ctx, sync.ALL, dataSync) fs.Logger.Info(fmt.Sprintf("watching filepath: %s", fs.URI)) for { @@ -56,24 +53,33 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } fs.Logger.Info(fmt.Sprintf("filepath event: %s %s", event.Name, event.Op.String())) - - // event.Op is a bitmask and some systems may send multiple operations at once - // event.Has(...) checks that the bitmask contains the particular event (among others) - if event.Has(fsnotify.Create) || event.Has(fsnotify.Write) { - fs.sendDataSync(ctx, event, dataSync) - } else if event.Has(fsnotify.Remove) { - // Counterintuively, remove events are the only meanful ones seen in K8s. - // K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation. - // At the point the remove event is fired, we have our new data, so we can send it down the channel. - fs.sendDataSync(ctx, event, dataSync) - + switch { + case event.Has(fsnotify.Create) || event.Has(fsnotify.Write): + fs.sendDataSync(ctx, sync.ALL, dataSync) + case event.Has(fsnotify.Remove): // K8s exposes config maps as symlinks. // Updates cause a remove event, we need to re-add the watcher in this case. err = watcher.Add(fs.URI) if err != nil { + // the watcher could not be re-added, so the file must have been deleted fs.Logger.Error(fmt.Sprintf("error restoring watcher, file may have been deleted: %s", err.Error())) + fs.sendDataSync(ctx, sync.DELETE, dataSync) + continue + } + + // Counterintuitively, remove events are the only meaningful ones seen in K8s. + // K8s handles mounted ConfigMap updates by modifying symbolic links, which is an atomic operation. + // At the point the remove event is fired, we have our new data, so we can send it down the channel. + fs.sendDataSync(ctx, sync.ALL, dataSync) + case event.Has(fsnotify.Chmod): + // on linux the REMOVE event will not fire until all file descriptors are closed, this cannot happen + // while the file is being watched, os.Stat is used here to infer deletion + if _, err := os.Stat(fs.URI); errors.Is(err, os.ErrNotExist) { + fs.Logger.Error(fmt.Sprintf("file has been deleted: %s", err.Error())) + fs.sendDataSync(ctx, sync.DELETE, dataSync) } } + case err, ok := <-watcher.Errors: if !ok { return errors.New("watcher error") @@ -87,14 +93,23 @@ func (fs *Sync) Sync(ctx context.Context, dataSync chan<- sync.DataSync) error { } } -func (fs *Sync) sendDataSync(ctx context.Context, eventType fsnotify.Event, dataSync chan<- sync.DataSync) { - fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, eventType.Op.String())) - msg, err := fs.fetch(ctx) - if err != nil { - fs.Logger.Error(fmt.Sprintf("Error fetching after %s notification: %s", eventType.Op.String(), err.Error())) +func (fs *Sync) sendDataSync(ctx context.Context, syncType sync.Type, dataSync chan<- sync.DataSync) { + fs.Logger.Debug(fmt.Sprintf("Configuration %s: %s", fs.URI, syncType.String())) + + msg := defaultState + if syncType != sync.DELETE { + m, err := fs.fetch(ctx) + if err != nil { + fs.Logger.Error(fmt.Sprintf("Error fetching %s: %s", fs.URI, err.Error())) + } + if m == "" { + fs.Logger.Warn(fmt.Sprintf("file %s is empty", fs.URI)) + } else { + msg = m + } } - dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: sync.ALL} + dataSync <- sync.DataSync{FlagData: msg, Source: fs.URI, Type: syncType} } func (fs *Sync) fetch(_ context.Context) (string, error) { diff --git a/pkg/sync/file/filepath_sync_test.go b/pkg/sync/file/filepath_sync_test.go index 5a4f1007c..950da625b 100644 --- a/pkg/sync/file/filepath_sync_test.go +++ b/pkg/sync/file/filepath_sync_test.go @@ -6,6 +6,7 @@ import ( "log" "os" "testing" + "time" "github.com/open-feature/flagd/pkg/sync" @@ -13,35 +14,131 @@ import ( ) const ( - dirName = "test" + fetchDirName = "test" fetchFileName = "to_fetch.json" fetchFileContents = "fetch me" ) func TestSimpleSync(t *testing.T) { + tests := map[string]struct { + manipulationFuncs []func(t *testing.T) + expectedDataSync []sync.DataSync + }{ + "simple-read": { + manipulationFuncs: []func(t *testing.T){ + func(t *testing.T) { + writeToFile(t, fetchFileContents) + }, + }, + expectedDataSync: []sync.DataSync{ + { + FlagData: fetchFileContents, + Source: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Type: sync.ALL, + }, + }, + }, + "update-event": { + manipulationFuncs: []func(t *testing.T){ + func(t *testing.T) { + writeToFile(t, fetchFileContents) + }, + func(t *testing.T) { + writeToFile(t, "new content") + }, + }, + expectedDataSync: []sync.DataSync{ + { + FlagData: fetchFileContents, + Source: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Type: sync.ALL, + }, + { + FlagData: "new content", + Source: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Type: sync.ALL, + }, + }, + }, + "delete-event": { + manipulationFuncs: []func(t *testing.T){ + func(t *testing.T) { + writeToFile(t, fetchFileContents) + }, + func(t *testing.T) { + deleteFile(t, fetchDirName, fetchFileName) + }, + }, + expectedDataSync: []sync.DataSync{ + { + FlagData: fetchFileContents, + Source: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Type: sync.ALL, + }, + { + FlagData: defaultState, + Source: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Type: sync.DELETE, + }, + }, + }, + "empty-file-use-default": { + manipulationFuncs: []func(t *testing.T){ + func(t *testing.T) { + writeToFile(t, "") + }, + }, + expectedDataSync: []sync.DataSync{ + { + FlagData: defaultState, + Source: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), + Type: sync.ALL, + }, + }, + }, + } + handler := Sync{ - URI: fmt.Sprintf("%s/%s", dirName, fetchFileName), + URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), Logger: logger.NewLogger(nil, false), } - defer t.Cleanup(cleanupFilePath) - setupFilePathFetch(t) - - ctx := context.Background() - dataSyncChan := make(chan sync.DataSync) - - go func() { - err := handler.Sync(ctx, dataSyncChan) - if err != nil { - log.Fatalf("Error start sync: %s", err.Error()) - return - } - }() + for test, tt := range tests { + t.Run(test, func(t *testing.T) { + defer t.Cleanup(cleanupFilePath) + setupDir(t, fetchDirName) + createFile(t, fetchDirName, fetchFileName) - data := <-dataSyncChan + ctx := context.Background() + dataSyncChan := make(chan sync.DataSync, len(tt.expectedDataSync)) - if data.FlagData != fetchFileContents { - t.Errorf("expected content: %s, but received content: %s", fetchFileContents, data.FlagData) + go func() { + err := handler.Sync(ctx, dataSyncChan) + if err != nil { + log.Fatalf("Error start sync: %s", err.Error()) + return + } + }() + + for i, manipulation := range tt.manipulationFuncs { + syncEvent := tt.expectedDataSync[i] + manipulation(t) + select { + case data := <-dataSyncChan: + if data.FlagData != syncEvent.FlagData { + t.Errorf("expected content: %s, but received content: %s", syncEvent.FlagData, data.FlagData) + } + if data.Source != syncEvent.Source { + t.Errorf("expected source: %s, but received source: %s", syncEvent.Source, data.Source) + } + if data.Type != syncEvent.Type { + t.Errorf("expected type: %b, but received type: %b", syncEvent.Type, data.Type) + } + case <-time.After(10 * time.Second): + t.Errorf("event not found, timeout out after 10 seconds") + } + } + }) } } @@ -52,7 +149,7 @@ func TestFilePathSync_Fetch(t *testing.T) { }{ "success": { fpSync: Sync{ - URI: fmt.Sprintf("%s/%s", dirName, fetchFileName), + URI: fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), Logger: logger.NewLogger(nil, false), }, handleResponse: func(t *testing.T, fetched string, err error) { @@ -67,7 +164,7 @@ func TestFilePathSync_Fetch(t *testing.T) { }, "not found": { fpSync: Sync{ - URI: fmt.Sprintf("%s/%s", dirName, "not_found"), + URI: fmt.Sprintf("%s/%s", fetchDirName, "not_found"), Logger: logger.NewLogger(nil, false), }, handleResponse: func(t *testing.T, fetched string, err error) { @@ -80,7 +177,9 @@ func TestFilePathSync_Fetch(t *testing.T) { for name, tt := range tests { t.Run(name, func(t *testing.T) { - setupFilePathFetch(t) + setupDir(t, fetchDirName) + createFile(t, fetchDirName, fetchFileName) + writeToFile(t, fetchFileContents) defer t.Cleanup(cleanupFilePath) data, err := tt.fpSync.fetch(context.Background()) @@ -91,21 +190,31 @@ func TestFilePathSync_Fetch(t *testing.T) { } func cleanupFilePath() { - if err := os.RemoveAll(dirName); err != nil { + if err := os.RemoveAll(fetchDirName); err != nil { log.Fatalf("rmdir: %v", err) } } -func setupFilePathFetch(t *testing.T) { +func deleteFile(t *testing.T, dirName string, fileName string) { + if err := os.Remove(fmt.Sprintf("%s/%s", dirName, fileName)); err != nil { + t.Fatal(err) + } +} + +func setupDir(t *testing.T, dirName string) { if err := os.Mkdir(dirName, os.ModePerm); err != nil { t.Fatal(err) } +} - if _, err := os.Create(fmt.Sprintf("%s/%s", dirName, fetchFileName)); err != nil { +func createFile(t *testing.T, dirName string, fileName string) { + if _, err := os.Create(fmt.Sprintf("%s/%s", dirName, fileName)); err != nil { t.Fatal(err) } +} - file, err := os.OpenFile(fmt.Sprintf("%s/%s", dirName, fetchFileName), os.O_RDWR, 0o644) +func writeToFile(t *testing.T, fileContents string) { + file, err := os.OpenFile(fmt.Sprintf("%s/%s", fetchDirName, fetchFileName), os.O_RDWR, 0o644) if err != nil { t.Fatal(err) } @@ -115,7 +224,7 @@ func setupFilePathFetch(t *testing.T) { } }(file) - _, err = file.WriteAt([]byte(fetchFileContents), 0) + _, err = file.WriteAt([]byte(fileContents), 0) if err != nil { t.Fatal(err) } diff --git a/pkg/sync/isync.go b/pkg/sync/isync.go index f1db9cc3a..86ed7c2d8 100644 --- a/pkg/sync/isync.go +++ b/pkg/sync/isync.go @@ -18,6 +18,21 @@ const ( DELETE ) +func (t Type) String() string { + switch t { + case ALL: + return "ALL" + case ADD: + return "ADD" + case UPDATE: + return "UPDATE" + case DELETE: + return "DELETE" + default: + return "UNKNOWN" + } +} + /* ISync implementations watch for changes in the flag sources (HTTP backend, local file, K8s CRDs ...),fetch the latest value and communicate to the Runtime with DataSync channel