From 523d0e0d64a5dd6bdc2561b92cb5b95ffa15ef8a Mon Sep 17 00:00:00 2001 From: Sean Eagan Date: Fri, 29 May 2020 13:10:00 -0500 Subject: [PATCH] Add locking to release sync process Fixes #437 --- .../lockedfile/internal/filelock/filelock.go | 98 +++++++++ .../internal/filelock/filelock_unix.go | 44 +++++ internal/lockedfile/lockedfile.go | 187 ++++++++++++++++++ internal/lockedfile/lockedfile_filelock.go | 64 ++++++ internal/lockedfile/mutex.go | 67 +++++++ pkg/release/release.go | 30 ++- 6 files changed, 483 insertions(+), 7 deletions(-) create mode 100755 internal/lockedfile/internal/filelock/filelock.go create mode 100755 internal/lockedfile/internal/filelock/filelock_unix.go create mode 100755 internal/lockedfile/lockedfile.go create mode 100755 internal/lockedfile/lockedfile_filelock.go create mode 100755 internal/lockedfile/mutex.go diff --git a/internal/lockedfile/internal/filelock/filelock.go b/internal/lockedfile/internal/filelock/filelock.go new file mode 100755 index 000000000..aba3eed77 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock.go @@ -0,0 +1,98 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package filelock provides a platform-independent API for advisory file +// locking. Calls to functions in this package on platforms that do not support +// advisory locks will return errors for which IsNotSupported returns true. +package filelock + +import ( + "errors" + "os" +) + +// A File provides the minimal set of methods required to lock an open file. +// File implementations must be usable as map keys. +// The usual implementation is *os.File. +type File interface { + // Name returns the name of the file. + Name() string + + // Fd returns a valid file descriptor. + // (If the File is an *os.File, it must not be closed.) + Fd() uintptr + + // Stat returns the FileInfo structure describing file. + Stat() (os.FileInfo, error) +} + +// Lock places an advisory write lock on the file, blocking until it can be +// locked. +// +// If Lock returns nil, no other process will be able to place a read or write +// lock on the file until this process exits, closes f, or calls Unlock on it. +// +// If f's descriptor is already read- or write-locked, the behavior of Lock is +// unspecified. +// +// Closing the file may or may not release the lock promptly. Callers should +// ensure that Unlock is always called when Lock succeeds. +func Lock(f File) error { + return lock(f, writeLock) +} + +// RLock places an advisory read lock on the file, blocking until it can be locked. +// +// If RLock returns nil, no other process will be able to place a write lock on +// the file until this process exits, closes f, or calls Unlock on it. +// +// If f is already read- or write-locked, the behavior of RLock is unspecified. +// +// Closing the file may or may not release the lock promptly. Callers should +// ensure that Unlock is always called if RLock succeeds. +func RLock(f File) error { + return lock(f, readLock) +} + +// Unlock removes an advisory lock placed on f by this process. +// +// The caller must not attempt to unlock a file that is not locked. +func Unlock(f File) error { + return unlock(f) +} + +// String returns the name of the function corresponding to lt +// (Lock, RLock, or Unlock). +func (lt lockType) String() string { + switch lt { + case readLock: + return "RLock" + case writeLock: + return "Lock" + default: + return "Unlock" + } +} + +// IsNotSupported returns a boolean indicating whether the error is known to +// report that a function is not supported (possibly for a specific input). +// It is satisfied by ErrNotSupported as well as some syscall errors. +func IsNotSupported(err error) bool { + return isNotSupported(underlyingError(err)) +} + +var ErrNotSupported = errors.New("operation not supported") + +// underlyingError returns the underlying error for known os error types. +func underlyingError(err error) error { + switch err := err.(type) { + case *os.PathError: + return err.Err + case *os.LinkError: + return err.Err + case *os.SyscallError: + return err.Err + } + return err +} diff --git a/internal/lockedfile/internal/filelock/filelock_unix.go b/internal/lockedfile/internal/filelock/filelock_unix.go new file mode 100755 index 000000000..00c426283 --- /dev/null +++ b/internal/lockedfile/internal/filelock/filelock_unix.go @@ -0,0 +1,44 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build darwin dragonfly freebsd linux netbsd openbsd + +package filelock + +import ( + "os" + "syscall" +) + +type lockType int16 + +const ( + readLock lockType = syscall.LOCK_SH + writeLock lockType = syscall.LOCK_EX +) + +func lock(f File, lt lockType) (err error) { + for { + err = syscall.Flock(int(f.Fd()), int(lt)) + if err != syscall.EINTR { + break + } + } + if err != nil { + return &os.PathError{ + Op: lt.String(), + Path: f.Name(), + Err: err, + } + } + return nil +} + +func unlock(f File) error { + return lock(f, syscall.LOCK_UN) +} + +func isNotSupported(err error) bool { + return err == syscall.ENOSYS || err == syscall.ENOTSUP || err == syscall.EOPNOTSUPP || err == ErrNotSupported +} diff --git a/internal/lockedfile/lockedfile.go b/internal/lockedfile/lockedfile.go new file mode 100755 index 000000000..59b2dba44 --- /dev/null +++ b/internal/lockedfile/lockedfile.go @@ -0,0 +1,187 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package lockedfile creates and manipulates files whose contents should only +// change atomically. +package lockedfile + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "runtime" +) + +// A File is a locked *os.File. +// +// Closing the file releases the lock. +// +// If the program exits while a file is locked, the operating system releases +// the lock but may not do so promptly: callers must ensure that all locked +// files are closed before exiting. +type File struct { + osFile + closed bool +} + +// osFile embeds a *os.File while keeping the pointer itself unexported. +// (When we close a File, it must be the same file descriptor that we opened!) +type osFile struct { + *os.File +} + +// OpenFile is like os.OpenFile, but returns a locked file. +// If flag includes os.O_WRONLY or os.O_RDWR, the file is write-locked; +// otherwise, it is read-locked. +func OpenFile(name string, flag int, perm os.FileMode) (*File, error) { + var ( + f = new(File) + err error + ) + f.osFile.File, err = openFile(name, flag, perm) + if err != nil { + return nil, err + } + + // Although the operating system will drop locks for open files when the go + // command exits, we want to hold locks for as little time as possible, and we + // especially don't want to leave a file locked after we're done with it. Our + // Close method is what releases the locks, so use a finalizer to report + // missing Close calls on a best-effort basis. + runtime.SetFinalizer(f, func(f *File) { + panic(fmt.Sprintf("lockedfile.File %s became unreachable without a call to Close", f.Name())) + }) + + return f, nil +} + +// Open is like os.Open, but returns a read-locked file. +func Open(name string) (*File, error) { + return OpenFile(name, os.O_RDONLY, 0) +} + +// Create is like os.Create, but returns a write-locked file. +func Create(name string) (*File, error) { + return OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0666) +} + +// Edit creates the named file with mode 0666 (before umask), +// but does not truncate existing contents. +// +// If Edit succeeds, methods on the returned File can be used for I/O. +// The associated file descriptor has mode O_RDWR and the file is write-locked. +func Edit(name string) (*File, error) { + return OpenFile(name, os.O_RDWR|os.O_CREATE, 0666) +} + +// Close unlocks and closes the underlying file. +// +// Close may be called multiple times; all calls after the first will return a +// non-nil error. +func (f *File) Close() error { + if f.closed { + return &os.PathError{ + Op: "close", + Path: f.Name(), + Err: os.ErrClosed, + } + } + f.closed = true + + err := closeFile(f.osFile.File) + runtime.SetFinalizer(f, nil) + return err +} + +// Read opens the named file with a read-lock and returns its contents. +func Read(name string) ([]byte, error) { + f, err := Open(name) + if err != nil { + return nil, err + } + defer f.Close() + + return ioutil.ReadAll(f) +} + +// Write opens the named file (creating it with the given permissions if needed), +// then write-locks it and overwrites it with the given content. +func Write(name string, content io.Reader, perm os.FileMode) (err error) { + f, err := OpenFile(name, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, perm) + if err != nil { + return err + } + + _, err = io.Copy(f, content) + if closeErr := f.Close(); err == nil { + err = closeErr + } + return err +} + +// Transform invokes t with the result of reading the named file, with its lock +// still held. +// +// If t returns a nil error, Transform then writes the returned contents back to +// the file, making a best effort to preserve existing contents on error. +// +// t must not modify the slice passed to it. +func Transform(name string, t func([]byte) ([]byte, error)) (err error) { + f, err := Edit(name) + if err != nil { + return err + } + defer f.Close() + + old, err := ioutil.ReadAll(f) + if err != nil { + return err + } + + new, err := t(old) + if err != nil { + return err + } + + if len(new) > len(old) { + // The overall file size is increasing, so write the tail first: if we're + // about to run out of space on the disk, we would rather detect that + // failure before we have overwritten the original contents. + if _, err := f.WriteAt(new[len(old):], int64(len(old))); err != nil { + // Make a best effort to remove the incomplete tail. + f.Truncate(int64(len(old))) + return err + } + } + + // We're about to overwrite the old contents. In case of failure, make a best + // effort to roll back before we close the file. + defer func() { + if err != nil { + if _, err := f.WriteAt(old, 0); err == nil { + f.Truncate(int64(len(old))) + } + } + }() + + if len(new) >= len(old) { + if _, err := f.WriteAt(new[:len(old)], 0); err != nil { + return err + } + } else { + if _, err := f.WriteAt(new, 0); err != nil { + return err + } + // The overall file size is decreasing, so shrink the file to its final size + // after writing. We do this after writing (instead of before) so that if + // the write fails, enough filesystem space will likely still be reserved + // to contain the previous contents. + if err := f.Truncate(int64(len(new))); err != nil { + return err + } + } + + return nil +} diff --git a/internal/lockedfile/lockedfile_filelock.go b/internal/lockedfile/lockedfile_filelock.go new file mode 100755 index 000000000..39f1250d5 --- /dev/null +++ b/internal/lockedfile/lockedfile_filelock.go @@ -0,0 +1,64 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !plan9 + +package lockedfile + +import ( + "os" + + "github.com/fluxcd/helm-operator/internal/lockedfile/internal/filelock" +) + +func openFile(name string, flag int, perm os.FileMode) (*os.File, error) { + // On BSD systems, we could add the O_SHLOCK or O_EXLOCK flag to the OpenFile + // call instead of locking separately, but we have to support separate locking + // calls for Linux and Windows anyway, so it's simpler to use that approach + // consistently. + + f, err := os.OpenFile(name, flag&^os.O_TRUNC, perm) + if err != nil { + return nil, err + } + + switch flag & (os.O_RDONLY | os.O_WRONLY | os.O_RDWR) { + case os.O_WRONLY, os.O_RDWR: + err = filelock.Lock(f) + default: + err = filelock.RLock(f) + } + if err != nil { + f.Close() + return nil, err + } + + if flag&os.O_TRUNC == os.O_TRUNC { + if err := f.Truncate(0); err != nil { + // The documentation for os.O_TRUNC says “if possible, truncate file when + // opened”, but doesn't define “possible” (golang.org/issue/28699). + // We'll treat regular files (and symlinks to regular files) as “possible” + // and ignore errors for the rest. + if fi, statErr := f.Stat(); statErr != nil || fi.Mode().IsRegular() { + filelock.Unlock(f) + f.Close() + return nil, err + } + } + } + + return f, nil +} + +func closeFile(f *os.File) error { + // Since locking syscalls operate on file descriptors, we must unlock the file + // while the descriptor is still valid — that is, before the file is closed — + // and avoid unlocking files that are already closed. + err := filelock.Unlock(f) + + if closeErr := f.Close(); err == nil { + err = closeErr + } + return err +} diff --git a/internal/lockedfile/mutex.go b/internal/lockedfile/mutex.go new file mode 100755 index 000000000..180a36c62 --- /dev/null +++ b/internal/lockedfile/mutex.go @@ -0,0 +1,67 @@ +// Copyright 2018 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package lockedfile + +import ( + "fmt" + "os" + "sync" +) + +// A Mutex provides mutual exclusion within and across processes by locking a +// well-known file. Such a file generally guards some other part of the +// filesystem: for example, a Mutex file in a directory might guard access to +// the entire tree rooted in that directory. +// +// Mutex does not implement sync.Locker: unlike a sync.Mutex, a lockedfile.Mutex +// can fail to lock (e.g. if there is a permission error in the filesystem). +// +// Like a sync.Mutex, a Mutex may be included as a field of a larger struct but +// must not be copied after first use. The Path field must be set before first +// use and must not be change thereafter. +type Mutex struct { + Path string // The path to the well-known lock file. Must be non-empty. + mu sync.Mutex // A redundant mutex. The race detector doesn't know about file locking, so in tests we may need to lock something that it understands. +} + +// MutexAt returns a new Mutex with Path set to the given non-empty path. +func MutexAt(path string) *Mutex { + if path == "" { + panic("lockedfile.MutexAt: path must be non-empty") + } + return &Mutex{Path: path} +} + +func (mu *Mutex) String() string { + return fmt.Sprintf("lockedfile.Mutex(%s)", mu.Path) +} + +// Lock attempts to lock the Mutex. +// +// If successful, Lock returns a non-nil unlock function: it is provided as a +// return-value instead of a separate method to remind the caller to check the +// accompanying error. (See https://golang.org/issue/20803.) +func (mu *Mutex) Lock() (unlock func(), err error) { + if mu.Path == "" { + panic("lockedfile.Mutex: missing Path during Lock") + } + + // We could use either O_RDWR or O_WRONLY here. If we choose O_RDWR and the + // file at mu.Path is write-only, the call to OpenFile will fail with a + // permission error. That's actually what we want: if we add an RLock method + // in the future, it should call OpenFile with O_RDONLY and will require the + // files must be readable, so we should not let the caller make any + // assumptions about Mutex working with write-only files. + f, err := OpenFile(mu.Path, os.O_RDWR|os.O_CREATE, 0666) + if err != nil { + return nil, err + } + mu.mu.Lock() + + return func() { + mu.mu.Unlock() + f.Close() + }, nil +} diff --git a/pkg/release/release.go b/pkg/release/release.go index 0320d1ee1..696e5d7b5 100644 --- a/pkg/release/release.go +++ b/pkg/release/release.go @@ -3,6 +3,8 @@ package release import ( "context" "fmt" + "os" + "path" "path/filepath" "time" @@ -11,6 +13,7 @@ import ( corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "github.com/fluxcd/helm-operator/internal/lockedfile" "github.com/fluxcd/helm-operator/pkg/apis/helm.fluxcd.io/v1" "github.com/fluxcd/helm-operator/pkg/chartsync" v1client "github.com/fluxcd/helm-operator/pkg/client/clientset/versioned/typed/helm.fluxcd.io/v1" @@ -60,18 +63,25 @@ func New(logger log.Logger, helmClients *helm.Clients, coreV1Client corev1client // Sync synchronizes the given HelmRelease with Helm. func (r *Release) Sync(hr *v1.HelmRelease) (err error) { - defer func(start time.Time) { - ObserveRelease(start, err == nil, hr.GetTargetNamespace(), hr.GetReleaseName()) - }(time.Now()) - defer status.SetObservedGeneration(r.hrClient.HelmReleases(hr.Namespace), hr, hr.Generation) - client, ok := r.helmClients.Load(hr.GetHelmVersion(r.config.DefaultHelmVersion)) if !ok { status.SetStatusPhase(r.hrClient.HelmReleases(hr.GetTargetNamespace()), hr, v1.HelmReleasePhaseFailed) return fmt.Errorf("no client found for Helm '%s'", r.config.DefaultHelmVersion) } - logger := releaseLogger(r.logger, client, hr) + + // acquire lock + unlock, err := r.lock(fmt.Sprintf("%s-%s", hr.GetNamespace(), hr.GetName())) + if err != nil { + logger.Log("info", fmt.Sprintf("could not obtain lock: %s", err)) + return nil + } + defer unlock() + defer func(start time.Time) { + ObserveRelease(start, err == nil, hr.GetTargetNamespace(), hr.GetReleaseName()) + }(time.Now()) + defer status.SetObservedGeneration(r.hrClient.HelmReleases(hr.Namespace), hr, hr.Generation) + logger.Log("info", "starting sync run") chart, cleanup, err := r.prepareChart(client, hr) @@ -108,6 +118,12 @@ func (r *Release) Sync(hr *v1.HelmRelease) (err error) { return r.run(logger, client, action, hr, curRel, chart, values) } +func (r *Release) lock(name string) (unlock func(), err error) { + lockFile := path.Join(os.TempDir(), name+".lock") + mutex := lockedfile.MutexAt(lockFile) + return mutex.Lock() +} + // Uninstalls removes the Helm release for the given HelmRelease, // and the git chart source if present. func (r *Release) Uninstall(hr *v1.HelmRelease) error { @@ -322,7 +338,7 @@ next: action = AnnotateAction goto next case AnnotateAction: - if err := annotate(hr, newRel) ; err != nil { + if err := annotate(hr, newRel); err != nil { logger.Log("warning", err, "phase", action) } case UninstallAction: