Skip to content
This repository was archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1254 from weaveworks/rewrite/run-operator-events-…
Browse files Browse the repository at this point in the history
…via-chartsync

Rewrite operator to use chartsync
  • Loading branch information
squaremo authored Aug 20, 2018
2 parents 1b3eec0 + e481080 commit 2967072
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 123 deletions.
8 changes: 5 additions & 3 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"context"
"flag"
"fmt"
"os"
"os/signal"
Expand Down Expand Up @@ -111,7 +112,9 @@ func init() {
}

func main() {

// Stop glog complaining
flag.CommandLine.Parse([]string{"-logtostderr"})
// Now do our own
fs.Parse(os.Args)

if *versionFlag {
Expand Down Expand Up @@ -233,8 +236,7 @@ func main() {
// Reference to shared index informers for the FluxHelmRelease
fhrInformer := ifInformerFactory.Helm().V1alpha2().FluxHelmReleases()

opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs,
kubeClient, fhrInformer, rel, repoConfig)
opr := operator.New(log.With(logger, "component", "operator"), *logReleaseDiffs, kubeClient, fhrInformer, chartSync, repoConfig)
// Starts handling k8s events related to the given resource kind
go ifInformerFactory.Start(shutdown)

Expand Down
161 changes: 90 additions & 71 deletions integrations/helm/chartsync/chartsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ type ChartChangeSync struct {
release *release.Release
config helmop.RepoConfig
logDiffs bool

mu sync.RWMutex
clone *git.Export
}

func New(logger log.Logger, polling Polling, clients Clients, release *release.Release, config helmop.RepoConfig, logReleaseDiffs bool) *ChartChangeSync {
Expand Down Expand Up @@ -108,11 +111,17 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn

ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
currentRevision, err := chs.config.Repo.Revision(ctx, chs.config.Branch)
if err == nil {
chs.mu.Lock()
chs.clone, err = chs.config.Repo.Export(ctx, currentRevision)
chs.mu.Unlock()
}
cancel()
if err != nil {
errc <- err
return
}
defer chs.clone.Clean()

ticker := time.NewTicker(chs.Polling.Interval)
defer ticker.Stop()
Expand All @@ -133,11 +142,22 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
continue
}

// Sync changes to charts in the git repo
ctx, cancel = context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
newClone, err := chs.config.Repo.Export(ctx, head)
cancel()
if err != nil {
chs.logger.Log("warning", "failure to clone git repo", "error", err)
continue
}
chs.mu.Lock()
chs.clone.Clean()
chs.clone = newClone
chs.mu.Unlock()

chs.logger.Log("info", fmt.Sprint("Start of chartsync"))
err = chs.ApplyChartChanges(currentRevision, head)
err = chs.applyChartChanges(currentRevision, head)
if err != nil {
chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %#v", err))
chs.logger.Log("error", fmt.Sprintf("Failure to do chart sync: %s", err))
}
currentRevision = head
chs.logger.Log("info", fmt.Sprint("End of chartsync"))
Expand All @@ -146,7 +166,7 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
// Re-release any chart releases that have apparently
// changed in the cluster.
chs.logger.Log("info", fmt.Sprint("Start of releasesync"))
err = chs.ReapplyReleaseDefs(currentRevision)
err = chs.reapplyReleaseDefs()
if err != nil {
chs.logger.Log("error", fmt.Sprintf("Failure to do manual release sync: %s", err))
}
Expand All @@ -160,10 +180,20 @@ func (chs *ChartChangeSync) Run(stopCh <-chan struct{}, errc chan error, wg *syn
}()
}

// ReconcileReleaseDef asks the ChartChangeSync to examine the release
// associated with a FluxHelmRelease, compared to what is in the git
// repo, and install or upgrade the release if necessary. This may
// block indefinitely, so the caller provides a context which it can
// cancel if it gets tired of waiting. Returns an error if the context
// timed out or was canceled before the operation was started.
func (chs *ChartChangeSync) ReconcileReleaseDef(fhr ifv1.FluxHelmRelease) {
chs.reconcileReleaseDef(fhr)
}

// ApplyChartChanges looks at the FluxHelmRelease resources in the
// cluster, figures out which refer to charts that have changed since
// the last commit, then re-releases those that have.
func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error {
func (chs *ChartChangeSync) applyChartChanges(prevRef, head string) error {
resources, err := chs.getCustomResources()
if err != nil {
return fmt.Errorf("Failure getting FHR custom resources: %s", err.Error())
Expand All @@ -175,14 +205,6 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error {
// changed or not changed.
chartHasChanged := map[string]bool{}

// Lazily clone the repo if and when it turns out we need it
var clone *git.Export
defer func() {
if clone != nil {
clone.Clean()
}
}()

for _, fhr := range resources {
chartPath := filepath.Join(chs.config.ChartsPath, fhr.Spec.ChartGitPath)
changed, ok := chartHasChanged[chartPath]
Expand All @@ -197,90 +219,87 @@ func (chs *ChartChangeSync) ApplyChartChanges(prevRef, head string) error {
chartHasChanged[chartPath] = changed
}
if changed {
if clone == nil {
clone, err = chs.exportAtRef(head)
if err != nil {
return fmt.Errorf("failed to clone repo at %s: %s", head, err.Error())
}
}

rlsName := release.GetReleaseName(fhr)
opts := release.InstallOptions{DryRun: false}
if _, err = chs.release.Install(clone.Dir(), rlsName, fhr, release.UpgradeAction, opts); err != nil {
chs.mu.RLock()
if _, err = chs.release.Install(chs.clone.Dir(), rlsName, fhr, release.UpgradeAction, opts); err != nil {
// NB in this step, failure to release is considered non-fatal, i.e,. we move on to the next rather than giving up entirely.
chs.logger.Log("warning", "failure to release chart with changes in git", "error", err, "chart", chartPath, "release", rlsName)
}
chs.mu.RUnlock()
}
}

return nil
}

func (chs *ChartChangeSync) ReapplyReleaseDefs(ref string) error {
var clone *git.Export
defer func() {
if clone != nil {
clone.Clean()
}
}()
// reconcileReleaseDef looks up the helm release associated with a
// FluxHelmRelease resource, and either installs, upgrades, or does
// nothing, depending on the state (or absence) of the release.
func (chs *ChartChangeSync) reconcileReleaseDef(fhr ifv1.FluxHelmRelease) {
releaseName := release.GetReleaseName(fhr)

resources, err := chs.getCustomResources()
if err != nil {
return fmt.Errorf("failed to get FluxHelmRelease resources from the API server: %s", err.Error())
}
// There's no exact way in the Helm API to test whether a release
// exists or not. Instead, try to fetch it, and treat an error as
// not existing (and possibly fail further below, if it meant
// something else).
rel, _ := chs.release.GetDeployedRelease(releaseName)

for _, fhr := range resources {
releaseName := release.GetReleaseName(fhr)
rel, err := chs.release.GetDeployedRelease(releaseName)
chs.mu.RLock()
defer chs.mu.RUnlock()

opts := release.InstallOptions{DryRun: false}
if rel == nil {
_, err := chs.release.Install(chs.clone.Dir(), releaseName, fhr, release.InstallAction, opts)
if err != nil {
return fmt.Errorf("failed to get release %q: %s", releaseName, err)
chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
}
return
}

// At this point, one way or another, we are going to need a clone of the repo.
if clone == nil {
clone, err = chs.exportAtRef(ref)
if err != nil {
return err
}
changed, err := chs.shouldUpgrade(chs.clone.Dir(), rel, fhr)
if err != nil {
chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
return
}
if changed {
_, err := chs.release.Install(chs.clone.Dir(), releaseName, fhr, release.UpgradeAction, opts)
if err != nil {
chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
}
}
}

opts := release.InstallOptions{DryRun: false}
if rel == nil {
_, err := chs.release.Install(clone.Dir(), releaseName, fhr, release.InstallAction, opts)
if err != nil {
chs.logger.Log("warning", "Failed to install chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
}
continue
}
// reapplyReleaseDefs goes through the resource definitions and
// reconciles them with Helm releases. This is a "backstop" for the
// other sync processes, to cover the case of a release being changed
// out-of-band (e.g., by someone using `helm upgrade`).
func (chs *ChartChangeSync) reapplyReleaseDefs() error {
resources, err := chs.getCustomResources()
if err != nil {
return fmt.Errorf("failed to get FluxHelmRelease resources from the API server: %s", err.Error())
}

changed, err := chs.shouldUpgrade(clone.Dir(), rel, fhr)
if err != nil {
chs.logger.Log("warning", "Unable to determine if release has changed", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
continue
}
if changed {
_, err := chs.release.Install(clone.Dir(), releaseName, fhr, release.UpgradeAction, opts)
if err != nil {
chs.logger.Log("warning", "Failed to upgrade chart", "namespace", fhr.Namespace, "name", fhr.Name, "error", err)
}
}
for _, fhr := range resources {
chs.reconcileReleaseDef(fhr)
}
return nil
}

//---

func (chs *ChartChangeSync) exportAtRef(ref string) (*git.Export, error) {
ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
clone, err := chs.config.Repo.Export(ctx, ref)
cancel()
// DeleteRelease deletes the helm release associated with a
// FluxHelmRelease. This exists mainly so that the operator code can
// call it when it is handling a resource deletion.
func (chs *ChartChangeSync) DeleteRelease(fhr ifv1.FluxHelmRelease) {
name := release.GetReleaseName(fhr)
err := chs.release.Delete(name)
if err != nil {
return nil, fmt.Errorf("error cloning repo at ref %s for chart releases: %s", ref, err.Error())
chs.logger.Log("warning", "Chart release not deleted", "release", name, "error", err)
}
return clone, nil
}

// GetNamespaces gets current kubernetes cluster namespaces
// ---

// getNamespaces gets current kubernetes cluster namespaces
func (chs *ChartChangeSync) getNamespaces() ([]string, error) {
var ns []string
nso, err := chs.kubeClient.CoreV1().Namespaces().List(metav1.ListOptions{})
Expand All @@ -295,7 +314,7 @@ func (chs *ChartChangeSync) getNamespaces() ([]string, error) {
return ns, nil
}

// getCustomResources assembles all custom resources
// getCustomResources assembles all custom resources in all namespaces
func (chs *ChartChangeSync) getCustomResources() ([]ifv1.FluxHelmRelease, error) {
namespaces, err := chs.getNamespaces()
if err != nil {
Expand Down
49 changes: 7 additions & 42 deletions integrations/helm/operator/operator.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package operator

import (
"context"
"errors"
"fmt"
"sync"
Expand All @@ -26,7 +25,7 @@ import (
fhrv1 "github.com/weaveworks/flux/integrations/client/informers/externalversions/helm.integrations.flux.weave.works/v1alpha2"
iflister "github.com/weaveworks/flux/integrations/client/listers/helm.integrations.flux.weave.works/v1alpha2"
helmop "github.com/weaveworks/flux/integrations/helm"
chartrelease "github.com/weaveworks/flux/integrations/helm/release"
"github.com/weaveworks/flux/integrations/helm/chartsync"
)

const (
Expand Down Expand Up @@ -58,8 +57,8 @@ type Controller struct {
fhrLister iflister.FluxHelmReleaseLister
fhrSynced cache.InformerSynced

release *chartrelease.Release
config helmop.RepoConfig
sync *chartsync.ChartChangeSync
config helmop.RepoConfig

// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
Expand All @@ -79,7 +78,7 @@ func New(
logReleaseDiffs bool,
kubeclientset kubernetes.Interface,
fhrInformer fhrv1.FluxHelmReleaseInformer,
release *chartrelease.Release,
sync *chartsync.ChartChangeSync,
config helmop.RepoConfig) *Controller {

// Add helm-operator types to the default Kubernetes Scheme so Events can be
Expand All @@ -95,9 +94,9 @@ func New(
logDiffs: logReleaseDiffs,
fhrLister: fhrInformer.Lister(),
fhrSynced: fhrInformer.Informer().HasSynced,
release: release,
releaseWorkqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ChartRelease"),
recorder: recorder,
sync: sync,
config: config,
}

Expand Down Expand Up @@ -253,36 +252,7 @@ func (c *Controller) syncHandler(key string) error {
return err
}

var syncType chartrelease.Action

releaseName := chartrelease.GetReleaseName(*fhr)
ok, err := c.release.Exists(releaseName)
if ok {
if err != nil {
c.logger.Log("error", fmt.Sprintf("Failure to do Chart release [%s]: %#v", releaseName, err))
return err
}
syncType = chartrelease.UpgradeAction
}
if !ok {
syncType = chartrelease.InstallAction
}

// Chart installation of the appropriate type
ctx, cancel := context.WithTimeout(context.Background(), helmop.GitOperationTimeout)
clone, err := c.config.Repo.Export(ctx, c.config.Branch)
cancel()
if err != nil {
return fmt.Errorf("Failure to clone repo: %s", err.Error())
}
defer clone.Clean()

opts := chartrelease.InstallOptions{DryRun: false}
_, err = c.release.Install(clone.Dir(), releaseName, *fhr, syncType, opts)
if err != nil {
return err
}

c.sync.ReconcileReleaseDef(*fhr)
c.recorder.Event(fhr, corev1.EventTypeNormal, ChartSynced, MessageChartSynced)
return nil
}
Expand Down Expand Up @@ -345,10 +315,5 @@ func (c *Controller) enqueueUpateJob(old, new interface{}) {
func (c *Controller) deleteRelease(fhr ifv1.FluxHelmRelease) {
c.logger.Log("info", "DELETING release")
c.logger.Log("info", "Custom Resource driven release deletion")
name := chartrelease.GetReleaseName(fhr)
err := c.release.Delete(name)
if err != nil {
c.logger.Log("error", fmt.Sprintf("Chart release [%s] not deleted: %#v", name, err))
}
return
c.sync.DeleteRelease(fhr)
}
7 changes: 0 additions & 7 deletions integrations/helm/release/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,6 @@ func (r *Release) GetDeployedRelease(name string) (*hapi_release.Release, error)
return nil, nil
}

// Exists detects if a particular Chart release exists
// release name must match regex ^(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])+$
func (r *Release) Exists(name string) (bool, error) {
rls, err := r.GetDeployedRelease(name)
return rls != nil, err
}

func (r *Release) canDelete(name string) (bool, error) {
rls, err := r.HelmClient.ReleaseStatus(name)

Expand Down

0 comments on commit 2967072

Please sign in to comment.