diff --git a/integrations/helm/chartsync/chartsync.go b/integrations/helm/chartsync/chartsync.go index 5781210b9..1b16954a3 100644 --- a/integrations/helm/chartsync/chartsync.go +++ b/integrations/helm/chartsync/chartsync.go @@ -73,6 +73,7 @@ type ChartChangeSync struct { ifClient ifclientset.Clientset release *release.Release config helmop.RepoConfig + reconcile chan ifv1.FluxHelmRelease } func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config helmop.RepoConfig) *ChartChangeSync { @@ -83,6 +84,7 @@ func New(logger log.Logger, polling Polling, clients Clients, release *release.R ifClient: clients.IfClient, release: release, config: config, + reconcile: make(chan ifv1.FluxHelmRelease), } } @@ -101,17 +103,26 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) currentRevision, err := chs.config.Repo.Revision(ctx, chs.config.Branch) + var currentClone *git.Export + if err == nil { + currentClone, err = chs.config.Repo.Export(ctx, currentRevision) + } cancel() if err != nil { errc <- err return } + // From now, currentClone shall not revert to nil + defer currentClone.Clean() + ticker := time.NewTicker(chs.Polling.Interval) defer ticker.Stop() for { select { + case fhr := <-chs.reconcile: + chs.reconcileReleaseDef(fhr, currentClone) case <-chs.config.Repo.C: ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) head, err := chs.config.Repo.Revision(ctx, chs.config.Branch) @@ -126,11 +137,20 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn continue } - // Sync changes to charts in the git repo + ctx, cancel = context.WithTimeout(context.Background(), helmop.GitOperationTimeout) + newClone, err := chs.config.Repo.Export(ctx, head) + cancel() + if err != nil { + chs.logger.Log("warning", "failure to clone git repo", "error", err) + continue + } + currentClone.Clean() + currentClone = newClone + chs.logger.Log("info", fmt.Sprint("Start of chartsync")) - err = chs.ApplyChartChanges(currentRevision, head) + err = chs.applyChartChanges(currentRevision, head, currentClone) if err != nil { - chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %#v", err)) + chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %s", err)) } currentRevision = head chs.logger.Log("info", fmt.Sprint("End of chartsync")) @@ -139,7 +159,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn // Re-release any chart releases that have apparently // changed in the cluster. chs.logger.Log("info", fmt.Sprint("Start of releasesync")) - err = chs.ReapplyReleaseDefs(currentRevision) + err = chs.reapplyReleaseDefs(currentClone) if err != nil { chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %s", err)) } @@ -153,10 +173,25 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn }() } +// ReconcileReleaseDef asks the ChartChangeSync to examine the release +// associated with a FluxHelmRelease, compared to what is in the git +// repo, and install or upgrade the release if necessary. This may +// block indefinitely, so the caller provides a context which it can +// cancel if it gets tired of waiting. Returns an error if the context +// timed out or was canceled before the operation was started. +func (chs *ChartChangeSync) ReconcileReleaseDef(ctx context.Context, fhr ifv1.FluxHelmRelease) error { + select { + case chs.reconcile <- fhr: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + // ApplyChartChanges looks at the FluxHelmRelease resources in the // cluster, figures out which refer to charts that have changed since // the last commit, then re-releases those that have. -func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error { +func (chs *ChartChangeSync) applyChartChanges(prevRef, head string, clone *git.Export) error { resources, err := chs.getCustomResources() if err != nil { return fmt.Errorf("Failure getting FHR custom resources: %s", err.Error()) @@ -168,14 +203,6 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error { // changed or not changed. chartHasChanged := map[string]bool{} - // Lazily clone the repo if and when it turns out we need it - var clone *git.Export - defer func() { - if clone != nil { - clone.Clean() - } - }() - for _, fhr := range resources { chartPath := filepath.Join(chs.config.ChartsPath, fhr.Spec.ChartGitPath) changed, ok := chartHasChanged[chartPath] @@ -190,13 +217,6 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error { chartHasChanged[chartPath] = changed } if changed { - if clone == nil { - clone, err = chs.exportAtRef(head) - if err != nil { - return fmt.Errorf("failed to clone repo at %s: %s", head, err.Error()) - } - } - rlsName := release.GetReleaseName(fhr) opts := release.InstallOptions{DryRun: false} if _, err = chs.release.Install(clone.Dir(), rlsName, fhr, release.UpgradeAction, opts); err != nil { @@ -209,10 +229,10 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error { return nil } -// ReconcileReleaseDef looks up the helm release associated with a +// reconcileReleaseDef looks up the helm release associated with a // FluxHelmRelease resource, and either installs, upgrades, or does // nothing, depending on the state (or absence) of the release. -func (chs *ChartChangeSync) ReconcileReleaseDef(fhr ifv1.FluxHelmRelease, clone *git.Export) { +func (chs *ChartChangeSync) reconcileReleaseDef(fhr ifv1.FluxHelmRelease, clone *git.Export) { releaseName := release.GetReleaseName(fhr) rel, err := chs.release.GetDeployedRelease(releaseName) if err != nil { @@ -242,28 +262,18 @@ func (chs *ChartChangeSync) ReconcileReleaseDef(fhr ifv1.FluxHelmRelease, clone } } -// ReapplyReleaseDefs goes through the resource definitions and +// reapplyReleaseDefs goes through the resource definitions and // reconciles them with Helm releases. This is a "backstop" for the // other sync processes, to cover the case of a release being changed // out-of-band (e.g., by someone using `helm upgrade`). -func (chs *ChartChangeSync) ReapplyReleaseDefs(ref string) error { - var clone *git.Export - // At this point, one way or another, we are going to need a clone of the repo. - clone, err := chs.exportAtRef(ref) - if err != nil { - return err - } - defer func() { - clone.Clean() - }() - +func (chs *ChartChangeSync) reapplyReleaseDefs(clone *git.Export) error { resources, err := chs.getCustomResources() if err != nil { return fmt.Errorf("failed to get FluxHelmRelease resources from the API server: %s", err.Error()) } for _, fhr := range resources { - chs.ReconcileReleaseDef(fhr, clone) + chs.reconcileReleaseDef(fhr, clone) } return nil } @@ -279,19 +289,9 @@ func (chs *ChartChangeSync) DeleteRelease(fhr ifv1.FluxHelmRelease) { } } -//--- - -func (chs *ChartChangeSync) exportAtRef(ref string) (*git.Export, error) { - ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - clone, err := chs.config.Repo.Export(ctx, ref) - cancel() - if err != nil { - return nil, fmt.Errorf("error cloning repo at ref %s for chart releases: %s", ref, err.Error()) - } - return clone, nil -} +// --- -// GetNamespaces gets current kubernetes cluster namespaces +// getNamespaces gets current kubernetes cluster namespaces func (chs *ChartChangeSync) getNamespaces() ([]string, error) { var ns []string nso, err := chs.kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{}) @@ -306,7 +306,7 @@ func (chs *ChartChangeSync) getNamespaces() ([]string, error) { return ns, nil } -// getCustomResources assembles all custom resources +// getCustomResources assembles all custom resources in all namespaces func (chs *ChartChangeSync) getCustomResources() ([]ifv1.FluxHelmRelease, error) { namespaces, err := chs.getNamespaces() if err != nil { diff --git a/integrations/helm/operator/operator.go b/integrations/helm/operator/operator.go index 20000c427..05bdc1041 100644 --- a/integrations/helm/operator/operator.go +++ b/integrations/helm/operator/operator.go @@ -249,17 +249,12 @@ func (c *Controller) syncHandler(key string) error { return err } - // Chart installation of the appropriate type - ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout) - clone, err := c.config.Repo.Export(ctx, c.config.Branch) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + err = c.sync.ReconcileReleaseDef(ctx, *fhr) cancel() if err != nil { - return fmt.Errorf("Failure to clone repo: %s", err.Error()) + return err } - defer clone.Clean() - - c.sync.ReconcileReleaseDef(*fhr, clone) - c.recorder.Event(fhr, corev1.EventTypeNormal, ChartSynced, MessageChartSynced) return nil }