Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Use exactly one clone of the git repo for syncing
Browse files Browse the repository at this point in the history
If we just clone the repo at the branch HEAD whenever the operator
wants to apply a change, we may end up undoing, or redoing, work,
since the ChartChangeSync only advances the revision it looks at when
it's ready to.

To avoid that, pass the desired reconcilations to the ChartChangeSync
and have it do the work, at the revision it's currently working from.

The mechanism is to send the resource in question on an unbuffered
channel, and receive it in the Run loop of ChartChangeSync. This has a
couple of flaws:

 - the event handlers have to wait for the Run loop to get around to
   doing the work (or time out and retry);

 - there's not much point in running more than one worker to process
   events, since they all get serialised through the ChartChangeSync
   anyway (not quite true -- deletes are done straight away because
   they don't need to consult the git repo)

But it is good enough for now.
  • Loading branch information
squaremo committed Aug 17, 2018
1 parent 99c04ad commit 20ce84a
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 56 deletions.
96 changes: 48 additions & 48 deletions integrations/helm/chartsync/chartsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ type ChartChangeSync struct {
release *release.Release
config helmop.RepoConfig
logDiffs bool
reconcile chan ifv1.FluxHelmRelease
}

func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config helmop.RepoConfig, logReleaseDiffs bool) *ChartChangeSync {
Expand All @@ -90,6 +91,7 @@ func New(logger log.Logger, polling Polling, clients Clients, release *release.R
release: release,
config: config,
logDiffs: logReleaseDiffs,
reconcile: make(chan ifv1.FluxHelmRelease),
}
}

Expand All @@ -108,17 +110,26 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn

ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
currentRevision, err := chs.config.Repo.Revision(ctx, chs.config.Branch)
var currentClone *git.Export
if err == nil {
currentClone, err = chs.config.Repo.Export(ctx, currentRevision)
}
cancel()
if err != nil {
errc <- err
return
}

// From now, currentClone shall not revert to nil
defer currentClone.Clean()

ticker := time.NewTicker(chs.Polling.Interval)
defer ticker.Stop()

for {
select {
case fhr := <-chs.reconcile:
chs.reconcileReleaseDef(fhr, currentClone)
case <-chs.config.Repo.C:
ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
head, err := chs.config.Repo.Revision(ctx, chs.config.Branch)
Expand All @@ -133,11 +144,20 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
continue
}

// Sync changes to charts in the git repo
ctx, cancel = context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
newClone, err := chs.config.Repo.Export(ctx, head)
cancel()
if err != nil {
chs.logger.Log("warning", "failure to clone git repo", "error", err)
continue
}
currentClone.Clean()
currentClone = newClone

chs.logger.Log("info", fmt.Sprint("Start of chartsync"))
err = chs.ApplyChartChanges(currentRevision, head)
err = chs.applyChartChanges(currentRevision, head, currentClone)
if err != nil {
chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %#v", err))
chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %s", err))
}
currentRevision = head
chs.logger.Log("info", fmt.Sprint("End of chartsync"))
Expand All @@ -146,7 +166,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
// Re-release any chart releases that have apparently
// changed in the cluster.
chs.logger.Log("info", fmt.Sprint("Start of releasesync"))
err = chs.ReapplyReleaseDefs(currentRevision)
err = chs.reapplyReleaseDefs(currentClone)
if err != nil {
chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %s", err))
}
Expand All @@ -160,10 +180,25 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
}()
}

// ReconcileReleaseDef asks the ChartChangeSync to examine the release
// associated with a FluxHelmRelease, compared to what is in the git
// repo, and install or upgrade the release if necessary. This may
// block indefinitely, so the caller provides a context which it can
// cancel if it gets tired of waiting. Returns an error if the context
// timed out or was canceled before the operation was started.
func (chs *ChartChangeSync) ReconcileReleaseDef(ctx context.Context, fhr ifv1.FluxHelmRelease) error {
select {
case chs.reconcile <- fhr:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// ApplyChartChanges looks at the FluxHelmRelease resources in the
// cluster, figures out which refer to charts that have changed since
// the last commit, then re-releases those that have.
func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error {
func (chs *ChartChangeSync) applyChartChanges(prevRef, head string, clone *git.Export) error {
resources, err := chs.getCustomResources()
if err != nil {
return fmt.Errorf("Failure getting FHR custom resources: %s", err.Error())
Expand All @@ -175,14 +210,6 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error {
// changed or not changed.
chartHasChanged := map[string]bool{}

// Lazily clone the repo if and when it turns out we need it
var clone *git.Export
defer func() {
if clone != nil {
clone.Clean()
}
}()

for _, fhr := range resources {
chartPath := filepath.Join(chs.config.ChartsPath, fhr.Spec.ChartGitPath)
changed, ok := chartHasChanged[chartPath]
Expand All @@ -197,13 +224,6 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error {
chartHasChanged[chartPath] = changed
}
if changed {
if clone == nil {
clone, err = chs.exportAtRef(head)
if err != nil {
return fmt.Errorf("failed to clone repo at %s: %s", head, err.Error())
}
}

rlsName := release.GetReleaseName(fhr)
opts := release.InstallOptions{DryRun: false}
if _, err = chs.release.Install(clone.Dir(), rlsName, fhr, release.UpgradeAction, opts); err != nil {
Expand All @@ -216,10 +236,10 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error {
return nil
}

// ReconcileReleaseDef looks up the helm release associated with a
// reconcileReleaseDef looks up the helm release associated with a
// FluxHelmRelease resource, and either installs, upgrades, or does
// nothing, depending on the state (or absence) of the release.
func (chs *ChartChangeSync) ReconcileReleaseDef(fhr ifv1.FluxHelmRelease, clone *git.Export) {
func (chs *ChartChangeSync) reconcileReleaseDef(fhr ifv1.FluxHelmRelease, clone *git.Export) {
releaseName := release.GetReleaseName(fhr)
rel, err := chs.release.GetDeployedRelease(releaseName)
if err != nil {
Expand Down Expand Up @@ -249,28 +269,18 @@ func (chs *ChartChangeSync) ReconcileReleaseDef(fhr ifv1.FluxHelmRelease, clone
}
}

// ReapplyReleaseDefs goes through the resource definitions and
// reapplyReleaseDefs goes through the resource definitions and
// reconciles them with Helm releases. This is a "backstop" for the
// other sync processes, to cover the case of a release being changed
// out-of-band (e.g., by someone using `helm upgrade`).
func (chs *ChartChangeSync) ReapplyReleaseDefs(ref string) error {
var clone *git.Export
// At this point, one way or another, we are going to need a clone of the repo.
clone, err := chs.exportAtRef(ref)
if err != nil {
return err
}
defer func() {
clone.Clean()
}()

func (chs *ChartChangeSync) reapplyReleaseDefs(clone *git.Export) error {
resources, err := chs.getCustomResources()
if err != nil {
return fmt.Errorf("failed to get FluxHelmRelease resources from the API server: %s", err.Error())
}

for _, fhr := range resources {
chs.ReconcileReleaseDef(fhr, clone)
chs.reconcileReleaseDef(fhr, clone)
}
return nil
}
Expand All @@ -286,19 +296,9 @@ func (chs *ChartChangeSync) DeleteRelease(fhr ifv1.FluxHelmRelease) {
}
}

//---

func (chs *ChartChangeSync) exportAtRef(ref string) (*git.Export, error) {
ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
clone, err := chs.config.Repo.Export(ctx, ref)
cancel()
if err != nil {
return nil, fmt.Errorf("error cloning repo at ref %s for chart releases: %s", ref, err.Error())
}
return clone, nil
}
// ---

// GetNamespaces gets current kubernetes cluster namespaces
// getNamespaces gets current kubernetes cluster namespaces
func (chs *ChartChangeSync) getNamespaces() ([]string, error) {
var ns []string
nso, err := chs.kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{})
Expand All @@ -313,7 +313,7 @@ func (chs *ChartChangeSync) getNamespaces() ([]string, error) {
return ns, nil
}

// getCustomResources assembles all custom resources
// getCustomResources assembles all custom resources in all namespaces
func (chs *ChartChangeSync) getCustomResources() ([]ifv1.FluxHelmRelease, error) {
namespaces, err := chs.getNamespaces()
if err != nil {
Expand Down
11 changes: 3 additions & 8 deletions integrations/helm/operator/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,17 +253,12 @@ func (c *Controller) syncHandler(key string) error {
return err
}

// Chart installation of the appropriate type
ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
clone, err := c.config.Repo.Export(ctx, c.config.Branch)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = c.sync.ReconcileReleaseDef(ctx, *fhr)
cancel()
if err != nil {
return fmt.Errorf("Failure to clone repo: %s", err.Error())
return err
}
defer clone.Clean()

c.sync.ReconcileReleaseDef(*fhr, clone)

c.recorder.Event(fhr, corev1.EventTypeNormal, ChartSynced, MessageChartSynced)
return nil
}
Expand Down

0 comments on commit 20ce84a

Please sign in to comment.