Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Let syncTag.SetRevision also update git revision
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco committed May 7, 2019
1 parent 0579f26 commit 3aeae2b
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 37 deletions.
36 changes: 29 additions & 7 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,21 +166,43 @@ type lastKnownSyncTag struct {
warnedAboutChange bool
}

func (s *lastKnownSyncTag) Revision() string {
return s.revision
}

func (s *lastKnownSyncTag) SetRevision(oldRev, newRev string) {
// SetRevision updates the sync tag revision in git _and_ the
// in-memory revision, if it has changed. In addition, it validates
// if the in-memory revision matches the old revision from git before
// making the update, to notify a user about multiple Flux daemons
// using the same tag.
func (s *lastKnownSyncTag) SetRevision(ctx context.Context, working *git.Checkout, timeout time.Duration,
oldRev, newRev string) (bool, error) {
// Check if something other than the current instance of fluxd
// changed the sync tag. This is likely caused by another instance
// using the same tag. Having multiple instances fight for the same
// tag can lead to fluxd missing manifest changes.
if s.revision != "" && oldRev != s.revision && !s.warnedAboutChange {
s.logger.Log("warning",
"detected external change in git sync tag; the sync tag should not be shared by fluxd instances")
"detected external change in git sync tag; the sync tag should not be shared by fluxd instances",
"tag", s.syncTag)
s.warnedAboutChange = true
}

s.logger.Log("tag", s.syncTag, "old", oldRev, "new", newRev)
// Did it actually change?
if s.revision == newRev {
return false, nil
}

// Update the sync tag revision in git
tagAction := git.TagAction{
Revision: newRev,
Message: "Sync pointer",
}
ctx, cancel := context.WithTimeout(ctx, timeout)
if err := working.MoveSyncTagAndPush(ctx, tagAction); err != nil {
return false, err
}
cancel()

// Update in-memory revision
s.revision = newRev

s.logger.Log("tag", s.syncTag, "old", oldRev, "new", newRev)
return true, nil
}
40 changes: 10 additions & 30 deletions daemon/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
"github.com/weaveworks/flux/update"
)

type SyncTag interface {
Revision() string
SetRevision(oldRev, newRev string)
type syncTag interface {
SetRevision(ctx context.Context, working *git.Checkout, timeout time.Duration, oldRev, newRev string) (bool, error)
}

type eventLogger interface {
Expand All @@ -34,7 +33,7 @@ type changeSet struct {
}

// Sync starts the synchronization of the cluster with git.
func (d *Daemon) Sync(ctx context.Context, started time.Time, revision string, syncTag SyncTag) error {
func (d *Daemon) Sync(ctx context.Context, started time.Time, revision string, syncTag syncTag) error {
// Checkout a working clone used for this sync
ctxt, cancel := context.WithTimeout(ctx, d.GitTimeout)
working, err := d.Repo.Clone(ctxt, d.GitConfig)
Expand All @@ -45,10 +44,8 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, revision string, s
defer working.Clean()

// Ensure we are syncing the given revision
if headRev, err := working.HeadRevision(ctx); err != nil {
if err := working.Checkout(ctx, revision); err != nil {
return err
} else if headRev != revision {
err = working.Checkout(ctx, revision)
}

// Retrieve change set of commits we need to sync
Expand Down Expand Up @@ -96,17 +93,14 @@ func (d *Daemon) Sync(ctx context.Context, started time.Time, revision string, s
}

// Move sync tag
if c.newTagRev != c.oldTagRev {
if err := moveSyncTag(ctx, c, d.GitTimeout, working); err != nil {
return err
}
syncTag.SetRevision(c.oldTagRev, c.newTagRev)
if err := refresh(ctx, d.GitTimeout, d.Repo); err != nil {
return err
}
if ok, err := syncTag.SetRevision(ctx, working, d.GitTimeout, c.oldTagRev, c.newTagRev); err != nil {
return err
} else if !ok {
return nil
}

return nil
err = refresh(ctx, d.GitTimeout, d.Repo)
return err
}

// getChangeSet returns the change set of commits for this sync,
Expand Down Expand Up @@ -350,20 +344,6 @@ func logCommitEvent(el eventLogger, c changeSet, serviceIDs flux.ResourceIDSet,
return nil
}

// moveSyncTag moves the sync tag to the revision we just synced.
func moveSyncTag(ctx context.Context, c changeSet, timeout time.Duration, working *git.Checkout) error {
tagAction := git.TagAction{
Revision: c.newTagRev,
Message: "Sync pointer",
}
ctx, cancel := context.WithTimeout(ctx, timeout)
if err := working.MoveSyncTagAndPush(ctx, tagAction); err != nil {
return err
}
cancel()
return nil
}

// refresh refreshes the repository, notifying the daemon we have a new
// sync head.
func refresh(ctx context.Context, timeout time.Duration, repo *git.Repo) error {
Expand Down

0 comments on commit 3aeae2b

Please sign in to comment.