Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Validate repository state after repository refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
hiddeco committed Mar 13, 2019
1 parent f8331b5 commit 2416f8a
Show file tree
Hide file tree
Showing 5 changed files with 249 additions and 255 deletions.
89 changes: 65 additions & 24 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ func (d *Daemon) sync() jobFunc {
if err != nil {
return result, err
}
if latestVerifiedRev, err := d.LatestValidRevision(ctx, head); err != nil {
return result, err
} else if head != latestVerifiedRev {
return result, fmt.Errorf("unable to sync to invalid HEAD revision (%s) latest verified revision is: %s", head, latestVerifiedRev)
}
result.Revision = head
return result, nil
}
Expand Down Expand Up @@ -418,6 +423,14 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFu
return result, nil
}

if headRev, err := working.HeadRevision(ctx); err != nil {
return result, err
} else if latestVerifiedRev, err := d.LatestValidRevision(ctx, headRev); err != nil {
return result, err
} else if headRev != latestVerifiedRev {
return result, fmt.Errorf("HEAD is not a verified revision; can not update on top of unverified HEAD")
}

commitAuthor := ""
if d.GitConfig.SetAuthor {
commitAuthor = spec.Cause.User
Expand All @@ -426,18 +439,6 @@ func (d *Daemon) updatePolicy(spec update.Spec, updates policy.Updates) updateFu
Author: commitAuthor,
Message: policyCommitMessage(updates, spec.Cause),
}
if d.GitConfig.VerifySignatures {
cs, err := getChangeset(ctx, working, d.Repo, d.GitConfig)
if err != nil {
return result, err
}
if err = verifySyncTagSignature(ctx, working, cs); err != nil {
return result, err
}
if err = verifyCommitSignatures(&cs); err != nil {
return result, err
}
}
if err := working.CommitAndPush(ctx, commitAction, &note{JobID: jobID, Spec: spec}); err != nil {
// On the chance pushing failed because it was not
// possible to fast-forward, ask for a sync so the
Expand Down Expand Up @@ -471,6 +472,14 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) updateFunc {
var revision string

if c.ReleaseKind() == update.ReleaseKindExecute {
if headRev, err := working.HeadRevision(ctx); err != nil {
return zero, err
} else if latestVerifiedRev, err := d.LatestValidRevision(ctx, headRev); err != nil {
return zero, err
} else if headRev != latestVerifiedRev {
return zero, fmt.Errorf("HEAD is not a verified revision; can not update on top of unverified HEAD")
}

commitMsg := spec.Cause.Message
if commitMsg == "" {
commitMsg = c.CommitMessage(result)
Expand All @@ -483,18 +492,6 @@ func (d *Daemon) release(spec update.Spec, c release.Changes) updateFunc {
Author: commitAuthor,
Message: commitMsg,
}
if d.GitConfig.VerifySignatures {
cs, err := getChangeset(ctx, working, d.Repo, d.GitConfig)
if err != nil {
return zero, err
}
if err = verifySyncTagSignature(ctx, working, cs); err != nil {
return zero, err
}
if err = verifyCommitSignatures(&cs); err != nil {
return zero, err
}
}
if err := working.CommitAndPush(ctx, commitAction, &note{JobID: jobID, Spec: spec, Result: result}); err != nil {
// On the chance pushing failed because it was not
// possible to fast-forward, ask the repo to fetch
Expand Down Expand Up @@ -638,6 +635,50 @@ func (d *Daemon) WithClone(ctx context.Context, fn func(*git.Checkout) error) er
return fn(co)
}

// LatestValidRevision returns the latest valid revision for the
// configured branch when the verification of GPG signatures for Git
// is enabled _or_ the HEAD revision of the configured branch when it
// is not. In case verification is enabled and a current revision is
// given it will also validate the tag signature -- as the state of
// the branch can not be trusted when the tag originates from an
// unknown source.
func (d *Daemon) LatestValidRevision(ctx context.Context, currentRevision string) (string, error) {
newRevision, err := d.Repo.Revision(ctx, d.GitConfig.Branch)
if !d.GitConfig.VerifySignatures || err != nil {
return newRevision, err
}

if currentRevision != "" {
err = d.Repo.VerifyTag(ctx, d.GitConfig.SyncTag)
if err != nil {
return currentRevision, errors.Wrap(err, "failed to verify signature of sync tag")
}
}

var commits []git.Commit
if currentRevision == "" {
commits, err = d.Repo.CommitsBefore(ctx, newRevision)
} else {
commits, err = d.Repo.CommitsBetween(ctx, currentRevision, newRevision)
}

if err != nil {
return "", err
}

for i := len(commits) - 1; i >= 0; i-- {
if !commits[i].Signature.Valid() {
d.Logger.Log("err", "invalid GPG signature for commit", "revision", commits[i].Revision, "key", commits[i].Signature.Key)
if i+1 < len(commits) {
return commits[i+1].Revision, nil
}
return "", nil
}
}

return newRevision, nil
}

func (d *Daemon) LogEvent(ev event.Event) error {
if d.EventWriter == nil {
d.Logger.Log("event", ev, "logupstream", "false")
Expand Down
7 changes: 4 additions & 3 deletions daemon/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ func (d *Daemon) Loop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger)
// available.
imagePollTimer := time.NewTimer(d.RegistryPollInterval)

// Keep track of current HEAD, so we can know when to treat a repo
// Keep track of current, verified (if signature verification is
// enabled), HEAD, so we can know when to treat a repo
// mirror notification as a change. Otherwise, we'll just sync
// every timer tick as well as every mirror refresh.
syncHead := ""
Expand Down Expand Up @@ -72,7 +73,7 @@ func (d *Daemon) Loop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger)
default:
}
}
sync, err := d.NewSync(logger)
sync, err := d.NewSync(logger, syncHead)
if err != nil {
logger.Log("err", err)
continue
Expand All @@ -89,7 +90,7 @@ func (d *Daemon) Loop(stop chan struct{}, wg *sync.WaitGroup, logger log.Logger)
d.AskForSync()
case <-d.Repo.C:
ctx, cancel := context.WithTimeout(context.Background(), d.GitConfig.Timeout)
newSyncHead, err := d.Repo.Revision(ctx, d.GitConfig.Branch)
newSyncHead, err := d.LatestValidRevision(ctx, syncHead)
cancel()
if err != nil {
logger.Log("url", d.Repo.Origin().URL, "err", err)
Expand Down
95 changes: 13 additions & 82 deletions daemon/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"crypto/sha256"
"encoding/base64"
"fmt"
"github.com/go-kit/kit/log"
"github.com/pkg/errors"
"strings"
Expand Down Expand Up @@ -47,8 +46,8 @@ type changeset struct {
initialSync bool
}

// NewSync initializes a new sync.
func (d *Daemon) NewSync(logger log.Logger) (Sync, error) {
// NewSync initializes a new sync for the given revision.
func (d *Daemon) NewSync(logger log.Logger, revision string) (Sync, error) {
s := Sync{
logger: logger,
repo: d.Repo,
Expand All @@ -62,8 +61,17 @@ func (d *Daemon) NewSync(logger log.Logger) (Sync, error) {
var err error
ctx, cancel := context.WithTimeout(context.Background(), s.gitConfig.Timeout)
s.working, err = s.repo.Clone(ctx, s.gitConfig)
if err != nil {
return s, err
}
cancel()

if headRev, err := s.working.HeadRevision(context.Background()); err != nil {
return s, err
} else if headRev != revision {
err = s.working.Checkout(context.Background(), revision)
}

return s, err
}

Expand All @@ -77,18 +85,6 @@ func (s *Sync) Run(ctx context.Context, synctag SyncTag) error {
return err
}

if s.gitConfig.VerifySignatures {
if err := verifySyncTagSignature(ctx, s.working, c); err != nil {
return err
}
if err = verifyCommitSignatures(&c); err != nil {
s.logger.Log("err", err)
}
if err := verifyWorkingState(ctx, s.working, &c); err != nil {
return err
}
}

syncSetName := makeSyncLabel(s.repo.Origin(), s.gitConfig)
resources, resourceErrors, err := doSync(s, syncSetName)
if err != nil {
Expand Down Expand Up @@ -123,7 +119,7 @@ func (s *Sync) Run(ctx context.Context, synctag SyncTag) error {
}

if c.newTagRev != c.oldTagRev {
if err := moveSyncTag(ctx, s, c, synctag); err != nil {
if err := moveSyncTag(ctx, s, c); err != nil {
return err
}
synctag.SetRevision(c.oldTagRev, c.newTagRev)
Expand Down Expand Up @@ -162,71 +158,6 @@ func getChangeset(ctx context.Context, working *git.Checkout, repo *git.Repo, gi
return c, err
}

// verifySyncTagSignature verifies the signature of the current sync
// tag, if verification fails _while we are not doing an initial sync_
// it returns an error.
func verifySyncTagSignature(ctx context.Context, working *git.Checkout, c changeset) error {
if c.initialSync {
return nil
}
if err := working.VerifySyncTag(ctx); err != nil {
return errors.Wrap(err, "failed to verify signature of sync tag")
}
return nil
}

// verifyCommitSignatures verifies the signatures of the commits we are
// working with, it does so by looping through the commits in ascending
// order and requesting the validity of each signature. In case of
// failure it mutates the set of the changeset of commits we are
// working with to the ones we have validated and returns an error.
func verifyCommitSignatures(c *changeset) error {
for i := len(c.commits) - 1; i >= 0; i-- {
if !c.commits[i].Signature.Valid() {
err := fmt.Errorf(
"invalid GPG signature for commit %s with key %s",
c.commits[i].Revision,
c.commits[i].Signature.Key,
)
c.commits = c.commits[i+1:]
return err
}
}
return nil
}

// verifyWorkingState verifies if the state of the working git
// repository and newTagRev is still equal to the commit changeset we
// have. This is required when working with GPG signatures as we may
// be working with a mutated commit changeset due to commit signature
// verification errors.
func verifyWorkingState(ctx context.Context, working *git.Checkout, c *changeset) error {
// We have no valid commits, determine what we should do next...
if len(c.commits) == 0 {
// We have no state to reapply either; abort...
if c.initialSync {
return errors.New("unable to sync as no commits with valid GPG signatures were found")
}
// Reapply the old rev as this is the latest valid state we saw
c.newTagRev = c.oldTagRev
return nil
}

// Check if the first commit in the slice still equals the
// newTagRev. If this is not the case we need to checkout
// the working clone to the revision of the commit from the
// slice as otherwise we will be (re)applying unverified
// state on the cluster.
if latestCommitRev := c.commits[len(c.commits)-1].Revision; c.newTagRev != latestCommitRev {
if err := working.Checkout(ctx, latestCommitRev); err != nil {
return err
}
c.newTagRev = latestCommitRev
return nil
}
return nil
}

// doSync runs the actual sync of workloads on the cluster. It returns
// a map with all resources it applied and sync errors it encountered.
func doSync(s *Sync, syncSetName string) (map[string]resource.Resource, []event.ResourceError, error) {
Expand Down Expand Up @@ -435,7 +366,7 @@ func logCommitEvent(s *Sync, c changeset, serviceIDs flux.ResourceIDSet,
}

// moveSyncTag moves the sync tag to the revision we just synced.
func moveSyncTag(ctx context.Context, s *Sync, c changeset, synctag SyncTag) error {
func moveSyncTag(ctx context.Context, s *Sync, c changeset) error {
tagAction := git.TagAction{
Revision: c.newTagRev,
Message: "Sync pointer",
Expand Down
Loading

0 comments on commit 2416f8a

Please sign in to comment.