Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1023 from weaveworks/980-sync-for-manual-chart-re…
Browse files Browse the repository at this point in the history
…leases

Sync for manual Chart releases
  • Loading branch information
tamarakaufler authored Apr 13, 2018
2 parents a8f8de9 + 05aa675 commit 6f427cb
Show file tree
Hide file tree
Showing 10 changed files with 616 additions and 254 deletions.
109 changes: 27 additions & 82 deletions cmd/helm-operator/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package main

import (
"context"
"sync"
"syscall"
"time"
Expand All @@ -21,12 +20,11 @@ import (
"github.com/weaveworks/flux/integrations/helm/git"
"github.com/weaveworks/flux/integrations/helm/operator"
"github.com/weaveworks/flux/integrations/helm/release"
"github.com/weaveworks/flux/integrations/helm/releasesync"
"github.com/weaveworks/flux/ssh"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"

k8shelm "k8s.io/helm/pkg/helm"

gitssh "gopkg.in/src-d/go-git.v4/plumbing/transport/ssh"
)

Expand Down Expand Up @@ -116,7 +114,6 @@ func main() {
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
// ----------------------------------------------------------------------

// SHUTDOWN ----------------------------------------------------------------------------
errc := make(chan error)
Expand All @@ -136,27 +133,9 @@ func main() {
close(shutdown)
shutdownWg.Wait()
}()
// ----------------------------------------------------------------------

mainLogger := log.With(logger, "component", "helm-operator")

// GIT REPO CONFIG ----------------------------------------------------------------------
mainLogger.Log("info", "Setting up git repo configs")

gitRemoteConfigFhr, err := git.NewGitRemoteConfig(*gitURL, *gitBranch, *gitChartsPath)
if err != nil {
mainLogger.Log("err", err)
os.Exit(1)
}
fmt.Printf("%#v", gitRemoteConfigFhr)
gitRemoteConfigCh, err := git.NewGitRemoteConfig(*gitURL, *gitBranch, *gitChartsPath)
if err != nil {
mainLogger.Log("err", err)
os.Exit(1)
}
fmt.Printf("%#v", gitRemoteConfigCh)
mainLogger.Log("info", "Finished setting up git repo configs")

// CLUSTER ACCESS -----------------------------------------------------------------------
cfg, err := clientcmd.BuildConfigFromFlags(*master, *kubeconfig)
if err != nil {
Expand All @@ -170,7 +149,7 @@ func main() {
os.Exit(1)
}

// CUSTOM RESOURCES ----------------------------------------------------------------------
// CUSTOM RESOURCES CLIENT --------------------------------------------------------------
ifClient, err := clientset.NewForConfig(cfg)
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Error building integrations clientset: %v", err))
Expand All @@ -179,28 +158,14 @@ func main() {
}

// HELM ---------------------------------------------------------------------------------
var helmClient *k8shelm.Client
for {
helmClient, err = fluxhelm.NewClient(kubeClient, fluxhelm.TillerOptions{IP: *tillerIP, Port: *tillerPort, Namespace: *tillerNamespace})
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Error creating helm client: %v", err))
time.Sleep(20 * time.Second)
continue
}
mainLogger.Log("info", "Set up Helm client")
break
}
//---------------------------------------------------------------------------------------

// GIT REPO CLONING ---------------------------------------------------------------------
mainLogger.Log("info", "Starting to clone repo ...")
helmClient := fluxhelm.ClientSetup(log.With(logger, "component", "helm"), kubeClient, fluxhelm.TillerOptions{IP: *tillerIP, Port: *tillerPort, Namespace: *tillerNamespace})

// GIT REPO SETUP ---------------------------------------------------------------------
var gitAuth *gitssh.PublicKeys
for {
gitAuth, err = git.GetRepoAuth(*k8sSecretVolumeMountPath, *k8sSecretDataKey)
if err != nil {
mainLogger.Log("error", fmt.Sprintf("Failed to set up git authorization : %#v", err))
//errc <- fmt.Errorf("Failed to create Checkout [%#v]: %v", gitRemoteConfigFhr, err)
time.Sleep(20 * time.Second)
continue
}
Expand All @@ -209,70 +174,50 @@ func main() {
}
}

// Chart releases sync due to Custom Resources changes -------------------------------
checkoutFhr := git.NewCheckout(log.With(logger, "component", "git"), gitRemoteConfigFhr, gitAuth)
defer checkoutFhr.Cleanup()

// If cloning not immediately possible, we wait until it is -----------------------------
for {
mainLogger.Log("info", "Cloning repo ...")
ctx, cancel := context.WithTimeout(context.Background(), git.DefaultCloneTimeout)
err = checkoutFhr.Clone(ctx, git.FhrsChangesClone)
cancel()
if err == nil {
break
}
mainLogger.Log("error", fmt.Sprintf("Failed to clone git repo [%s, %s, %s]: %v", gitRemoteConfigFhr.URL, gitRemoteConfigFhr.Path, gitRemoteConfigFhr.Branch, err))
time.Sleep(10 * time.Second)
gitRemoteConfig, err := git.NewGitRemoteConfig(*gitURL, *gitBranch, *gitChartsPath)
if err != nil {
mainLogger.Log("err", err)
os.Exit(1)
}
gitLogger := log.With(logger, "component", "git")

// Chart releases sync due to Custom Resources changes -------------------------------
mainLogger.Log("info", "Starting to clone repo ...")
checkout := git.RepoSetup(gitLogger, gitAuth, gitRemoteConfig, git.ChangesClone)
defer checkout.Cleanup()
mainLogger.Log("info", "Repo cloned")

// Chart releases sync due to pure Charts changes ------------------------------------
checkoutCh := git.NewCheckout(log.With(logger, "component", "git"), gitRemoteConfigCh, gitAuth)
defer checkoutCh.Cleanup()
// release instance is needed during the sync of Charts changes and during the sync of FluxHelRelease changes
rel := release.New(log.With(logger, "component", "release"), helmClient, checkout)
relsync := releasesync.New(log.With(logger, "component", "releasesync"), rel)

// If cloning not immediately possible, we wait until it is -----------------------------
for {
mainLogger.Log("info", "Cloning repo ...")
ctx, cancel := context.WithTimeout(context.Background(), git.DefaultCloneTimeout)
err = checkoutCh.Clone(ctx, git.ChartsChangesClone)
cancel()
if err == nil {
break
}
mainLogger.Log("error", fmt.Sprintf("Failed to clone git repo [%s, %s, %s]: %v", gitRemoteConfigCh.URL, gitRemoteConfigCh.Branch, gitRemoteConfigCh.Path, err))
time.Sleep(10 * time.Second)
}
mainLogger.Log("info", "Repo cloned")
// CHARTS CHANGES SYNC ------------------------------------------------------------------
chartSync := chartsync.New(log.With(logger, "component", "chartsync"),
chartsync.Polling{Interval: *chartsSyncInterval, Timeout: *chartsSyncTimeout},
chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient},
rel, *relsync)
chartSync.Run(shutdown, errc, shutdownWg)

//=======================================================================================
// OPERATOR - CUSTOM RESOURCE CHANGE SYNC -----------------------------------------------
// CUSTOM RESOURCES CACHING SETUP -------------------------------------------------------
// SharedInformerFactory sets up informer, that maps resource type to a cache shared informer.
// operator attaches event handler to the informer and syncs the informer cache
ifInformerFactory := ifinformers.NewSharedInformerFactory(ifClient, 30*time.Second)
// Obtain reference to shared index informers for the FluxHelmRelease
// Reference to shared index informers for the FluxHelmRelease
fhrInformer := ifInformerFactory.Helm().V1alpha().FluxHelmReleases()

rel := release.New(log.With(logger, "component", "release"), helmClient, checkoutFhr, checkoutCh)

// CHARTS CHANGES SYNC -----------------------------------------------------------------------------
chartSync := chartsync.New(log.With(logger, "component", "chartsync"), *chartsSyncInterval, *chartsSyncTimeout, *kubeClient, *ifClient, fhrInformer, rel)
chartSync.Run(shutdown, errc, shutdownWg)
//---------------------------------------------------------------------------------------

// OPERATOR - CUSTOM RRESOURCES CHANGE SYNC ----------------------------------------------
opr := operator.New(log.With(logger, "component", "operator"), kubeClient, fhrInformer, rel)
// Starts handling k8s events related to the given resource kind
go ifInformerFactory.Start(shutdown)

if err = opr.Run(*queueWorkerCount, shutdown, shutdownWg); err != nil {
msg := fmt.Sprintf("Failure to run controller: %s", err.Error())
logger.Log("error", msg)
errc <- fmt.Errorf(ErrOperatorFailure, err)
}
//=======================================================================================
}

// Helper functions
// Helper functions -----------------------------------------------------------------------
func optionalVar(fs *pflag.FlagSet, value ssh.OptionalValue, name, usage string) ssh.OptionalValue {
fs.Var(value, name, usage)
return value
Expand Down
Loading

0 comments on commit 6f427cb

Please sign in to comment.