From 85c26e17501f6266f9b2dbd6f3c548239123df30 Mon Sep 17 00:00:00 2001 From: Tom Wieczorek Date: Tue, 14 Jun 2022 13:13:11 +0200 Subject: [PATCH] Refactor StackApplier Changes to StackApplier: * Combine Start/Stop methods into a simple Run method that exits when the given context is done. The start method blocked anyways, exiting when the Stop method was called. The context approach is somewhat more straight forward. * Only consider fsnotify events for manifest files. * Also log fsnotify watcher errors in the StackApplier. * Synchronize access to the internal applier, as calls to it might happen concurrently. * Remove the unused Healthy method. Changes to the Manager: * Always remove a stack from internal map, even if its deletion failed. The directory is gone anyways, and a readdition wouldn't work otherwise, because of the old, stopped stack still being in the map. * Retry stack running on failure. Signed-off-by: Tom Wieczorek --- pkg/applier/applier.go | 5 +- pkg/applier/manager.go | 77 ++++++++++-------- pkg/applier/stackapplier.go | 150 +++++++++++++++++++++--------------- 3 files changed, 135 insertions(+), 97 deletions(-) diff --git a/pkg/applier/applier.go b/pkg/applier/applier.go index df8909d630fb..e37f6296f9c7 100644 --- a/pkg/applier/applier.go +++ b/pkg/applier/applier.go @@ -36,6 +36,9 @@ import ( "github.com/k0sproject/k0s/pkg/kubernetes" ) +// manifestFilePattern is the glob pattern that all applicable manifest files need to match. +const manifestFilePattern = "*.yaml" + // Applier manages all the "static" manifests and applies them on the k8s API type Applier struct { Name string @@ -110,7 +113,7 @@ func (a *Applier) Apply(ctx context.Context) error { if err != nil { return err } - files, err := filepath.Glob(path.Join(a.Dir, "*.yaml")) + files, err := filepath.Glob(path.Join(a.Dir, manifestFilePattern)) if err != nil { return err } diff --git a/pkg/applier/manager.go b/pkg/applier/manager.go index eeaa392c66fa..fe55a5be60f2 100644 --- a/pkg/applier/manager.go +++ b/pkg/applier/manager.go @@ -19,6 +19,7 @@ import ( "context" "fmt" "path" + "time" "github.com/k0sproject/k0s/internal/pkg/dir" "github.com/k0sproject/k0s/pkg/component" @@ -40,13 +41,18 @@ type Manager struct { bundlePath string cancelWatcher context.CancelFunc log *logrus.Entry - stacks map[string]*StackApplier + stacks map[string]stack LeaderElector controller.LeaderElector } var _ component.Component = (*Manager)(nil) +type stack = struct { + context.CancelFunc + *StackApplier +} + // Init initializes the Manager func (m *Manager) Init(ctx context.Context) error { err := dir.Init(m.K0sVars.ManifestsDir, constant.ManifestsDirMode) @@ -54,7 +60,7 @@ func (m *Manager) Init(ctx context.Context) error { return fmt.Errorf("failed to create manifest bundle dir %s: %w", m.K0sVars.ManifestsDir, err) } m.log = logrus.WithField("component", "applier-manager") - m.stacks = make(map[string]*StackApplier) + m.stacks = make(map[string]stack) m.bundlePath = m.K0sVars.ManifestsDir m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory) @@ -113,10 +119,7 @@ func (m *Manager) runWatchers(ctx context.Context) error { } for _, dir := range dirs { - if err := m.createStack(ctx, path.Join(m.bundlePath, dir)); err != nil { - log.WithError(err).Error("failed to create stack") - return err - } + m.createStack(ctx, path.Join(m.bundlePath, dir)) } for { @@ -134,12 +137,10 @@ func (m *Manager) runWatchers(ctx context.Context) error { switch event.Op { case fsnotify.Create: if dir.IsDirectory(event.Name) { - if err := m.createStack(ctx, event.Name); err != nil { - return err - } + m.createStack(ctx, event.Name) } case fsnotify.Remove: - _ = m.removeStack(ctx, event.Name) + m.removeStack(ctx, event.Name) } case <-ctx.Done(): log.Info("manifest watcher done") @@ -148,44 +149,54 @@ func (m *Manager) runWatchers(ctx context.Context) error { } } -func (m *Manager) createStack(ctx context.Context, name string) error { +func (m *Manager) createStack(ctx context.Context, name string) { // safeguard in case the fswatcher would trigger an event for an already existing stack if _, ok := m.stacks[name]; ok { - return nil - } - m.log.WithField("stack", name).Info("registering new stack") - sa, err := NewStackApplier(ctx, name, m.KubeClientFactory) - if err != nil { - return err + return } + stackCtx, cancelStack := context.WithCancel(ctx) + stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)} + m.stacks[name] = stack + go func() { - _ = sa.Start() - }() + log := m.log.WithField("stack", name) + for { + log.Info("Running stack") + if err := stack.Run(stackCtx); err != nil { + log.WithError(err).Error("Failed to run stack") + } - m.stacks[name] = sa - return nil + select { + case <-time.After(10 * time.Second): + continue + case <-stackCtx.Done(): + log.Info("Stack done") + return + } + } + }() } -func (m *Manager) removeStack(ctx context.Context, name string) error { - sa, ok := m.stacks[name] - +func (m *Manager) removeStack(ctx context.Context, name string) { + stack, ok := m.stacks[name] if !ok { m.log. WithField("path", name). Debug("attempted to remove non-existent stack, probably not a directory") - return nil - } - sa.Stop() - err := sa.DeleteStack(ctx) - if err != nil { - m.log.WithField("stack", name).WithError(err).Warn("failed to stop and delete a stack applier") - return err + return } - m.log.WithField("stack", name).Info("stack deleted succesfully") + delete(m.stacks, name) + stack.CancelFunc() - return nil + log := m.log.WithField("stack", name) + if err := stack.DeleteStack(ctx); err != nil { + log.WithError(err).Error("Failed to delete stack") + return + } + + log.Info("Stack deleted successfully") } // Health-check interface diff --git a/pkg/applier/stackapplier.go b/pkg/applier/stackapplier.go index 251ade9377d6..e8d3133c0131 100644 --- a/pkg/applier/stackapplier.go +++ b/pkg/applier/stackapplier.go @@ -17,10 +17,12 @@ package applier import ( "context" + "fmt" + "path/filepath" + "sync" "time" - "k8s.io/client-go/util/retry" - + "github.com/avast/retry-go" "github.com/k0sproject/k0s/pkg/debounce" "github.com/k0sproject/k0s/pkg/kubernetes" @@ -28,84 +30,82 @@ import ( "github.com/sirupsen/logrus" ) -// StackApplier handles each directory as a Stack and watches for changes +// StackApplier applies a stack whenever the files on disk change. type StackApplier struct { - Path string - - fsWatcher *fsnotify.Watcher - applier Applier - log *logrus.Entry + log logrus.FieldLogger + path string - ctx context.Context - cancel context.CancelFunc + doApply, doDelete func(context.Context) error } // NewStackApplier crates new stack applier to manage a stack -func NewStackApplier(ctx context.Context, path string, kubeClientFactory kubernetes.ClientFactoryInterface) (*StackApplier, error) { - watcher, err := fsnotify.NewWatcher() - if err != nil { - return nil, err - } - err = watcher.Add(path) - if err != nil { - return nil, err - } +func NewStackApplier(path string, kubeClientFactory kubernetes.ClientFactoryInterface) *StackApplier { + var mu sync.Mutex applier := NewApplier(path, kubeClientFactory) - log := logrus.WithField("component", "applier-"+applier.Name) - log.WithField("path", path).Debug("created stack applier") - - sa := &StackApplier{ - Path: path, - fsWatcher: watcher, - applier: applier, - log: log, - } - sa.ctx, sa.cancel = context.WithCancel(ctx) + return &StackApplier{ + log: logrus.WithField("component", "applier-"+applier.Name), + path: path, - return sa, nil + doApply: func(ctx context.Context) error { + mu.Lock() + defer mu.Unlock() + return applier.Apply(ctx) + }, + + doDelete: func(ctx context.Context) error { + mu.Lock() + defer mu.Unlock() + return applier.Delete(ctx) + }, + } } -// Start both the initial apply and also the watch for a single stack -func (s *StackApplier) Start() error { +// Run executes the initial apply and watches the stack for updates. +func (s *StackApplier) Run(ctx context.Context) error { + if ctx.Err() != nil { + return nil // The context is already done. + } + + watcher, err := fsnotify.NewWatcher() + if err != nil { + return fmt.Errorf("failed to create watcher: %w", err) + } + defer watcher.Close() + + debounceCtx, cancelDebouncer := context.WithCancel(ctx) + defer cancelDebouncer() + debouncer := debounce.Debouncer[fsnotify.Event]{ - Input: s.fsWatcher.Events, - Timeout: 1 * time.Second, - Filter: s.triggersApply, - Callback: func(fsnotify.Event) { - s.log.Debug("Debouncer triggering, applying...") - err := retry.OnError(retry.DefaultRetry, func(err error) bool { - return true - }, func() error { - return s.applier.Apply(s.ctx) - }) - if err != nil { - s.log.WithError(err).Error("Failed to apply manifests") - } - }, + Input: watcher.Events, + Timeout: 1 * time.Second, + Filter: s.triggersApply, + Callback: func(fsnotify.Event) { s.apply(debounceCtx) }, } // Send an artificial event to ensure that an initial apply will happen. - go func() { s.fsWatcher.Events <- fsnotify.Event{} }() - - _ = debouncer.Run(s.ctx) - return nil -} + go func() { watcher.Events <- fsnotify.Event{} }() + + // Consume and log any errors. + go func() { + for { + err, ok := <-watcher.Errors + if !ok { + return + } + s.log.WithError(err).Error("Error while watching stack") + } + }() -// Stop stops the stack applier. -func (s *StackApplier) Stop() { - s.log.WithField("stack", s.Path).Info("Stopping stack") - s.cancel() -} + err = watcher.Add(s.path) + if err != nil { + return fmt.Errorf("failed to watch %q: %w", s.path, err) + } -// DeleteStack deletes the associated stack -func (s *StackApplier) DeleteStack(ctx context.Context) error { - return s.applier.Delete(ctx) + _ = debouncer.Run(debounceCtx) + return nil } -// Health-check interface -func (s *StackApplier) Healthy() error { return nil } - func (*StackApplier) triggersApply(event fsnotify.Event) bool { // always let the initial apply happen if event == (fsnotify.Event{}) { @@ -117,5 +117,29 @@ func (*StackApplier) triggersApply(event fsnotify.Event) bool { return false } - return true + // Only consider events on manifest files + match, _ := filepath.Match(manifestFilePattern, filepath.Base(event.Name)) + return match +} + +func (s *StackApplier) apply(ctx context.Context) { + s.log.Info("Applying manifests") + + err := retry.Do( + func() error { return s.doApply(ctx) }, + retry.OnRetry(func(attempt uint, err error) { + s.log.WithError(err).Warnf("Retrying after backoff, attempt #%d", attempt) + }), + retry.Context(ctx), + retry.LastErrorOnly(true), + ) + + if err != nil { + s.log.WithError(err).Error("Failed to apply manifests") + } +} + +// DeleteStack deletes the associated stack +func (s *StackApplier) DeleteStack(ctx context.Context) error { + return s.doDelete(ctx) }