Refactor StackApplier #1815

Merged · 1 commit · Jun 14, 2022
Changes from all commits
pkg/applier/applier.go — 4 additions, 1 deletion
```diff
@@ -36,6 +36,9 @@ import (
 	"github.com/k0sproject/k0s/pkg/kubernetes"
 )
 
+// manifestFilePattern is the glob pattern that all applicable manifest files need to match.
+const manifestFilePattern = "*.yaml"
+
 // Applier manages all the "static" manifests and applies them on the k8s API
 type Applier struct {
 	Name string
@@ -110,7 +113,7 @@ func (a *Applier) Apply(ctx context.Context) error {
 	if err != nil {
 		return err
 	}
-	files, err := filepath.Glob(path.Join(a.Dir, "*.yaml"))
+	files, err := filepath.Glob(path.Join(a.Dir, manifestFilePattern))
 	if err != nil {
 		return err
 	}
```
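After this change, the same pattern drives both sides of the applier: `filepath.Glob` selects the manifests to apply here, and `filepath.Match` (in stackapplier.go, further down) filters the fsnotify events that may trigger an apply. A minimal standalone sketch of how the two calls agree on the pattern (the directory and file names are illustrative, not from the PR):

```go
package main

import (
	"fmt"
	"path/filepath"
)

// Mirrors the constant introduced in pkg/applier/applier.go.
const manifestFilePattern = "*.yaml"

func main() {
	dir := "/var/lib/k0s/manifests/helm" // example stack directory

	// Applier.Apply: enumerate all manifest files in the stack.
	files, err := filepath.Glob(filepath.Join(dir, manifestFilePattern))
	fmt.Println(files, err)

	// StackApplier.triggersApply: check a single fsnotify event name.
	match, _ := filepath.Match(manifestFilePattern, filepath.Base(dir+"/chart.yaml"))
	fmt.Println(match) // true: a change to chart.yaml triggers an apply
}
```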
pkg/applier/manager.go — 44 additions, 33 deletions
```diff
@@ -19,6 +19,7 @@ import (
 	"context"
 	"fmt"
 	"path"
+	"time"
 
 	"github.com/k0sproject/k0s/internal/pkg/dir"
 	"github.com/k0sproject/k0s/pkg/component"
@@ -40,21 +41,26 @@ type Manager struct {
 	bundlePath    string
 	cancelWatcher context.CancelFunc
 	log           *logrus.Entry
-	stacks        map[string]*StackApplier
+	stacks        map[string]stack
 
 	LeaderElector controller.LeaderElector
 }
 
 var _ component.Component = (*Manager)(nil)
 
+type stack = struct {
+	context.CancelFunc
+	*StackApplier
+}
+
 // Init initializes the Manager
 func (m *Manager) Init(ctx context.Context) error {
 	err := dir.Init(m.K0sVars.ManifestsDir, constant.ManifestsDirMode)
 	if err != nil {
 		return fmt.Errorf("failed to create manifest bundle dir %s: %w", m.K0sVars.ManifestsDir, err)
 	}
 	m.log = logrus.WithField("component", "applier-manager")
-	m.stacks = make(map[string]*StackApplier)
+	m.stacks = make(map[string]stack)
 	m.bundlePath = m.K0sVars.ManifestsDir
 
 	m.applier = NewApplier(m.K0sVars.ManifestsDir, m.KubeClientFactory)
@@ -113,10 +119,7 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 	}
 
 	for _, dir := range dirs {
-		if err := m.createStack(ctx, path.Join(m.bundlePath, dir)); err != nil {
-			log.WithError(err).Error("failed to create stack")
-			return err
-		}
+		m.createStack(ctx, path.Join(m.bundlePath, dir))
 	}
 
 	for {
@@ -134,12 +137,10 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 			switch event.Op {
 			case fsnotify.Create:
 				if dir.IsDirectory(event.Name) {
-					if err := m.createStack(ctx, event.Name); err != nil {
-						return err
-					}
+					m.createStack(ctx, event.Name)
 				}
 			case fsnotify.Remove:
-				_ = m.removeStack(ctx, event.Name)
+				m.removeStack(ctx, event.Name)
 			}
 		case <-ctx.Done():
 			log.Info("manifest watcher done")
@@ -148,44 +149,54 @@ func (m *Manager) runWatchers(ctx context.Context) error {
 	}
 }
 
-func (m *Manager) createStack(ctx context.Context, name string) error {
+func (m *Manager) createStack(ctx context.Context, name string) {
 	// safeguard in case the fswatcher would trigger an event for an already existing stack
 	if _, ok := m.stacks[name]; ok {
-		return nil
+		return
 	}
-	m.log.WithField("stack", name).Info("registering new stack")
-	sa, err := NewStackApplier(ctx, name, m.KubeClientFactory)
-	if err != nil {
-		return err
-	}
 
-	go func() {
-		_ = sa.Start()
-	}()
+	stackCtx, cancelStack := context.WithCancel(ctx)
+	stack := stack{cancelStack, NewStackApplier(name, m.KubeClientFactory)}
+	m.stacks[name] = stack
 
-	m.stacks[name] = sa
-	return nil
+	go func() {
+		log := m.log.WithField("stack", name)
+		for {
+			log.Info("Running stack")
+			if err := stack.Run(stackCtx); err != nil {
+				log.WithError(err).Error("Failed to run stack")
+			}
+
+			select {
+			case <-time.After(10 * time.Second):
+				continue
+			case <-stackCtx.Done():
+				log.Info("Stack done")
+				return
+			}
+		}
+	}()
 }
 
-func (m *Manager) removeStack(ctx context.Context, name string) error {
-	sa, ok := m.stacks[name]
+func (m *Manager) removeStack(ctx context.Context, name string) {
+	stack, ok := m.stacks[name]
 	if !ok {
 		m.log.
 			WithField("path", name).
 			Debug("attempted to remove non-existent stack, probably not a directory")
-		return nil
+		return
 	}
-	sa.Stop()
-	err := sa.DeleteStack(ctx)
-	if err != nil {
-		m.log.WithField("stack", name).WithError(err).Warn("failed to stop and delete a stack applier")
-		return err
-	}
-	m.log.WithField("stack", name).Info("stack deleted succesfully")
 
-	return nil
+	delete(m.stacks, name)
+	stack.CancelFunc()
+
+	log := m.log.WithField("stack", name)
+	if err := stack.DeleteStack(ctx); err != nil {
+		log.WithError(err).Error("Failed to delete stack")
+		return
+	}
+
+	log.Info("Stack deleted successfully")
 }
 
 // Health-check interface
```
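A note on the new `stack` type: it is an alias for an anonymous struct that embeds both the per-stack `context.CancelFunc` and the `*StackApplier`, so the Manager can invoke the promoted `CancelFunc()`, `Run(...)`, and `DeleteStack(...)` on a single value. A minimal standalone sketch of the same embedding pattern (the `worker` and `handle` names are illustrative, not from the PR):

```go
package main

import (
	"context"
	"fmt"
)

type worker struct{ name string }

// Run blocks until the context is cancelled.
func (w *worker) Run(ctx context.Context) error {
	<-ctx.Done()
	return ctx.Err()
}

// handle bundles a cancel function with the worker it controls;
// both embedded members have their methods and fields promoted.
type handle = struct {
	context.CancelFunc
	*worker
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	h := handle{cancel, &worker{name: "stack-a"}}

	done := make(chan error, 1)
	go func() { done <- h.Run(ctx) }() // Run is promoted from *worker

	h.CancelFunc() // promoted field: cancels the worker's context
	fmt.Println("worker stopped:", <-done)
}
```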
pkg/applier/stackapplier.go — 87 additions, 63 deletions
```diff
@@ -17,95 +17,95 @@ package applier
 
 import (
 	"context"
+	"fmt"
+	"path/filepath"
+	"sync"
 	"time"
 
-	"k8s.io/client-go/util/retry"
-
+	"github.com/avast/retry-go"
 	"github.com/k0sproject/k0s/pkg/debounce"
 	"github.com/k0sproject/k0s/pkg/kubernetes"
 
 	"github.com/fsnotify/fsnotify"
 	"github.com/sirupsen/logrus"
 )
 
-// StackApplier handles each directory as a Stack and watches for changes
+// StackApplier applies a stack whenever the files on disk change.
 type StackApplier struct {
-	Path string
-
-	fsWatcher *fsnotify.Watcher
-	applier   Applier
-	log       *logrus.Entry
+	log  logrus.FieldLogger
+	path string
 
-	ctx    context.Context
-	cancel context.CancelFunc
+	doApply, doDelete func(context.Context) error
 }
 
 // NewStackApplier crates new stack applier to manage a stack
-func NewStackApplier(ctx context.Context, path string, kubeClientFactory kubernetes.ClientFactoryInterface) (*StackApplier, error) {
-	watcher, err := fsnotify.NewWatcher()
-	if err != nil {
-		return nil, err
-	}
-	err = watcher.Add(path)
-	if err != nil {
-		return nil, err
-	}
+func NewStackApplier(path string, kubeClientFactory kubernetes.ClientFactoryInterface) *StackApplier {
+	var mu sync.Mutex
 	applier := NewApplier(path, kubeClientFactory)
-	log := logrus.WithField("component", "applier-"+applier.Name)
-	log.WithField("path", path).Debug("created stack applier")
 
-	sa := &StackApplier{
-		Path:      path,
-		fsWatcher: watcher,
-		applier:   applier,
-		log:       log,
-	}
-
-	sa.ctx, sa.cancel = context.WithCancel(ctx)
-
-	return sa, nil
+	return &StackApplier{
+		log:  logrus.WithField("component", "applier-"+applier.Name),
+		path: path,
+
+		doApply: func(ctx context.Context) error {
+			mu.Lock()
+			defer mu.Unlock()
+			return applier.Apply(ctx)
+		},
+
+		doDelete: func(ctx context.Context) error {
+			mu.Lock()
+			defer mu.Unlock()
+			return applier.Delete(ctx)
+		},
+	}
 }
```
Review thread on the doApply/doDelete delegates:

Contributor:
Any actual profit from making this configurable? Asking because delegates, IMO, make the code harder to follow during debugging.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The intent wasn't to make it configurable, but to synchronize access to the Applier via a Mutex. Alternatively, we can put the Mutex and the Applier into the StackApplier struct, allowing the Applier to be called without locking the Mutex and making it less obvious which parts of the struct are actually guarded by that Mutex.

Using the function pointers was basically a shorthand for something like this:

type StackApplier struct {
	// ...
	applier applier
}

type applier interface {
	Apply(context.Context) error
	Delete(context.Context) error
}

type syncApplier struct {
	sync.Mutex
	applier applier
}

func (a *syncApplier) Apply(ctx context.Context) error {
	a.Lock()
	defer a.Unlock()
	return a.applier.Apply(ctx)
}

func (a *syncApplier) Delete(ctx context.Context) error {
	a.Lock()
	defer a.Unlock()
	return a.applier.Delete(ctx)
}

func NewStackApplier(path string, kubeClientFactory kubernetes.ClientFactoryInterface) *StackApplier {
	applier := NewApplier(path, kubeClientFactory)
	return &StackApplier{
		// ...
		applier: &syncApplier{applier: &applier},
	}
}

```

(In theory, a unit test could leverage the doApply/doDelete functions as well, but that was not the goal.)
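For illustration, a hypothetical test along those lines (not part of this PR) could stub the delegates to observe the retry behavior without touching the Kubernetes API:

```go
package applier

import (
	"context"
	"errors"
	"testing"

	"github.com/sirupsen/logrus"
)

func TestApplyRetriesUntilSuccess(t *testing.T) {
	calls := 0
	sa := &StackApplier{
		log:  logrus.New(),
		path: t.TempDir(),
		// Stubbed delegate: fail once, then succeed.
		doApply: func(context.Context) error {
			calls++
			if calls < 2 {
				return errors.New("transient error")
			}
			return nil
		},
		doDelete: func(context.Context) error { return nil },
	}

	sa.apply(context.Background())

	if calls != 2 {
		t.Fatalf("expected 2 apply attempts, got %d", calls)
	}
}
```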


Continuing the stackapplier.go diff:

```diff
-// Start both the initial apply and also the watch for a single stack
-func (s *StackApplier) Start() error {
+// Run executes the initial apply and watches the stack for updates.
+func (s *StackApplier) Run(ctx context.Context) error {
+	if ctx.Err() != nil {
+		return nil // The context is already done.
+	}
+
+	watcher, err := fsnotify.NewWatcher()
+	if err != nil {
+		return fmt.Errorf("failed to create watcher: %w", err)
+	}
+	defer watcher.Close()
+
+	debounceCtx, cancelDebouncer := context.WithCancel(ctx)
+	defer cancelDebouncer()
+
 	debouncer := debounce.Debouncer[fsnotify.Event]{
-		Input:   s.fsWatcher.Events,
-		Timeout: 1 * time.Second,
-		Filter:  s.triggersApply,
-		Callback: func(fsnotify.Event) {
-			s.log.Debug("Debouncer triggering, applying...")
-			err := retry.OnError(retry.DefaultRetry, func(err error) bool {
-				return true
-			}, func() error {
-				return s.applier.Apply(s.ctx)
-			})
-			if err != nil {
-				s.log.WithError(err).Error("Failed to apply manifests")
-			}
-		},
+		Input:    watcher.Events,
+		Timeout:  1 * time.Second,
+		Filter:   s.triggersApply,
+		Callback: func(fsnotify.Event) { s.apply(debounceCtx) },
 	}
 
 	// Send an artificial event to ensure that an initial apply will happen.
-	go func() { s.fsWatcher.Events <- fsnotify.Event{} }()
+	go func() { watcher.Events <- fsnotify.Event{} }()
 
-	_ = debouncer.Run(s.ctx)
-	return nil
-}
+	// Consume and log any errors.
+	go func() {
+		for {
+			err, ok := <-watcher.Errors
+			if !ok {
+				return
+			}
+			s.log.WithError(err).Error("Error while watching stack")
+		}
+	}()
 
-// Stop stops the stack applier.
-func (s *StackApplier) Stop() {
-	s.log.WithField("stack", s.Path).Info("Stopping stack")
-	s.cancel()
-}
+	err = watcher.Add(s.path)
+	if err != nil {
+		return fmt.Errorf("failed to watch %q: %w", s.path, err)
+	}
 
-// DeleteStack deletes the associated stack
-func (s *StackApplier) DeleteStack(ctx context.Context) error {
-	return s.applier.Delete(ctx)
-}
+	_ = debouncer.Run(debounceCtx)
+	return nil
+}
 
 // Health-check interface
 func (s *StackApplier) Healthy() error { return nil }
 
 func (*StackApplier) triggersApply(event fsnotify.Event) bool {
 	// always let the initial apply happen
 	if event == (fsnotify.Event{}) {
@@ -117,5 +117,29 @@ func (*StackApplier) triggersApply(event fsnotify.Event) bool {
 		return false
 	}
 
-	return true
+	// Only consider events on manifest files
+	match, _ := filepath.Match(manifestFilePattern, filepath.Base(event.Name))
+	return match
 }
+
+func (s *StackApplier) apply(ctx context.Context) {
+	s.log.Info("Applying manifests")
+
+	err := retry.Do(
+		func() error { return s.doApply(ctx) },
+		retry.OnRetry(func(attempt uint, err error) {
+			s.log.WithError(err).Warnf("Retrying after backoff, attempt #%d", attempt)
+		}),
+		retry.Context(ctx),
+		retry.LastErrorOnly(true),
+	)
+
+	if err != nil {
+		s.log.WithError(err).Error("Failed to apply manifests")
+	}
+}
+
+// DeleteStack deletes the associated stack
+func (s *StackApplier) DeleteStack(ctx context.Context) error {
+	return s.doDelete(ctx)
+}
```
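For reference, `retry.Do` from avast/retry-go (which replaces client-go's `retry.OnError` in this PR) retries the wrapped function with backoff; `retry.Context` stops retrying once the context is done, and `retry.LastErrorOnly(true)` collapses the aggregated error list into just the final error. A small standalone sketch of those semantics (the attempt counts and messages are illustrative; the canonical import path is used here instead of the repo's mirrored one):

```go
package main

import (
	"context"
	"errors"
	"fmt"

	"github.com/avast/retry-go"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	attempts := 0
	err := retry.Do(
		func() error {
			attempts++
			if attempts < 3 {
				return errors.New("not ready yet")
			}
			return nil
		},
		retry.OnRetry(func(n uint, err error) {
			fmt.Printf("attempt #%d failed: %v\n", n, err)
		}),
		retry.Context(ctx),        // abort retries when ctx is done
		retry.LastErrorOnly(true), // surface only the final error
	)

	fmt.Printf("finished after %d attempts, err=%v\n", attempts, err)
}
```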