This repository has been archived by the owner on Nov 1, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Rewrite helm-operator to use weaveworks/flux/git
The go-git package seems to introduce a host of problems, especially with SSH, that are obviated by using the git binary. Given a couple of modest additions to github.com/weaveworks/flux/git, we can use that instead. Namely: read-only repos, and repo "exports" (that is, clones that come with only the ability to look at the files). It also turns out that it's possible to simplify the chartsync and releasesync packages. Given the whole picture in the existing code, it's possible to boild it down to two mechanically simple processes: - chartsync looks for the charts used by FluxHelmRelease resources, and checks whether they've changed since the last commit; - releasesync installs any FluxHelmRelease resources that don't have corresponding releases, and upgrades any that do and differ. (In fact, just the latter would be fine since new commits would result in release diffs; but it's more _eventual_ than having both). Since these are now fairly compact, I've just merged them into the chartsync package. I've removed the releasesync_tests.go here, since it tested internal machinery that no longer exists, but I ought to replace the test coverage forthwith.
- Loading branch information
Showing
11 changed files
with
306 additions
and
1,391 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,35 +1,28 @@ | ||
package main | ||
|
||
import ( | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"net/url" | ||
|
||
"github.com/spf13/pflag" | ||
|
||
"context" | ||
"fmt" | ||
"os" | ||
"os/signal" | ||
"sync" | ||
"syscall" | ||
"time" | ||
|
||
"github.com/go-kit/kit/log" | ||
"github.com/spf13/pflag" | ||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/tools/clientcmd" | ||
|
||
"github.com/weaveworks/flux/git" | ||
clientset "github.com/weaveworks/flux/integrations/client/clientset/versioned" | ||
ifinformers "github.com/weaveworks/flux/integrations/client/informers/externalversions" | ||
fluxhelm "github.com/weaveworks/flux/integrations/helm" | ||
helmop "github.com/weaveworks/flux/integrations/helm" | ||
"github.com/weaveworks/flux/integrations/helm/chartsync" | ||
"github.com/weaveworks/flux/integrations/helm/git" | ||
"github.com/weaveworks/flux/integrations/helm/operator" | ||
"github.com/weaveworks/flux/integrations/helm/release" | ||
"github.com/weaveworks/flux/integrations/helm/releasesync" | ||
"github.com/weaveworks/flux/integrations/helm/status" | ||
"github.com/weaveworks/flux/ssh" | ||
|
||
"k8s.io/client-go/kubernetes" | ||
"k8s.io/client-go/tools/clientcmd" | ||
|
||
gitssh "gopkg.in/src-d/go-git.v4/plumbing/transport/ssh" | ||
) | ||
|
||
var ( | ||
|
@@ -51,40 +44,33 @@ var ( | |
tillerTLSCert *string | ||
tillerTLSCACert *string | ||
|
||
chartsSyncInterval *time.Duration | ||
chartsSyncTimeout *time.Duration | ||
eventHandlerWorkers *uint | ||
|
||
customKubectl *string | ||
gitURL *string | ||
gitBranch *string | ||
//gitConfigPath *string | ||
gitChartsPath *string | ||
chartsSyncInterval *time.Duration | ||
chartsSyncTimeout *time.Duration | ||
|
||
k8sSecretName *string | ||
k8sSecretVolumeMountPath *string | ||
k8sSecretDataKey *string | ||
gitURL *string | ||
gitBranch *string | ||
gitChartsPath *string | ||
gitPollInterval *time.Duration | ||
|
||
queueWorkerCount *int | ||
|
||
name *string | ||
listenAddr *string | ||
gcInterval *time.Duration | ||
|
||
ErrOperatorFailure = "Operator failure: %q" | ||
) | ||
|
||
const ( | ||
defaultGitConfigPath = "releaseconfig" | ||
defaultGitChartsPath = "charts" | ||
|
||
ErrOperatorFailure = "Operator failure: %q" | ||
) | ||
|
||
func init() { | ||
// Flags processing | ||
fs = pflag.NewFlagSet("default", pflag.ExitOnError) | ||
fs.Usage = func() { | ||
fmt.Fprintf(os.Stderr, "DESCRIPTION\n") | ||
fmt.Fprintf(os.Stderr, " helm-operator is a Kubernetes operator for Helm integration into flux.\n") | ||
fmt.Fprintf(os.Stderr, " helm-operator releases Helm charts from git.\n") | ||
fmt.Fprintf(os.Stderr, "\n") | ||
fmt.Fprintf(os.Stderr, "FLAGS\n") | ||
fs.PrintDefaults() | ||
|
@@ -105,17 +91,11 @@ func init() { | |
|
||
chartsSyncInterval = fs.Duration("charts-sync-interval", 3*time.Minute, "Interval at which to check for changed charts") | ||
chartsSyncTimeout = fs.Duration("charts-sync-timeout", 1*time.Minute, "Timeout when checking for changed charts") | ||
eventHandlerWorkers = fs.Uint("event-handler-workers", 2, "Number of workers processing events for Flux-Helm custom resources") | ||
|
||
customKubectl = fs.String("kubernetes-kubectl", "", "Optional, explicit path to kubectl tool") | ||
gitURL = fs.String("git-url", "", "URL of git repo with Helm Charts; e.g., [email protected]:weaveworks/flux-example") | ||
gitBranch = fs.String("git-branch", "master", "branch of git repo") | ||
gitChartsPath = fs.String("git-charts-path", defaultGitChartsPath, "path within git repo to locate Helm Charts (relative path)") | ||
|
||
// k8s-secret backed ssh keyring configuration | ||
// generated by flux daemon | ||
k8sSecretVolumeMountPath = fs.String("k8s-secret-volume-mount-path", "/etc/fluxd/ssh", "Mount location of the k8s secret storing the private SSH key") | ||
k8sSecretDataKey = fs.String("k8s-secret-data-key", "identity", "Data key holding the private SSH key within the k8s secret") | ||
gitPollInterval = fs.Duration("git-poll-interval", 5*time.Minute, "period on which to poll for changes to the git repo") | ||
|
||
queueWorkerCount = fs.Int("queue-worker-count", 2, "Number of workers to process queue with Chart release jobs. Two by default") | ||
} | ||
|
@@ -178,7 +158,6 @@ func main() { | |
IP: *tillerIP, | ||
Port: *tillerPort, | ||
Namespace: *tillerNamespace, | ||
|
||
TLSVerify: *tillerTLSVerify, | ||
TLSEnable: *tillerTLSEnable, | ||
TLSKey: *tillerTLSKey, | ||
|
@@ -191,48 +170,44 @@ func main() { | |
statusUpdater := status.New(ifClient, kubeClient, helmClient) | ||
go statusUpdater.Loop(shutdown, log.With(logger, "component", "annotator")) | ||
|
||
gitURLParsed, err := url.Parse(*gitURL) | ||
if err != nil { | ||
mainLogger.Log("error", fmt.Sprintf("Error parsing -git-url %q: %v", *gitURL, err)) | ||
os.Exit(1) | ||
} | ||
gitRemote := git.Remote{URL: *gitURL} | ||
repo := git.NewRepo(gitRemote, git.PollInterval(*gitPollInterval), git.ReadOnly) | ||
|
||
// GIT REPO SETUP --------------------------------------------------------------------- | ||
var gitAuth *gitssh.PublicKeys | ||
for { | ||
gitAuth, err = git.GetRepoAuth(gitURLParsed.User.Username(), *k8sSecretVolumeMountPath, *k8sSecretDataKey) | ||
// Chart releases sync due to Custom Resources changes ------------------------------- | ||
{ | ||
mainLogger.Log("info", "Attempting to clone repo ...", "url", gitRemote.URL) | ||
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) | ||
err := repo.Ready(ctx) | ||
cancel() | ||
if err != nil { | ||
mainLogger.Log("error", fmt.Sprintf("Failed to set up git authorization : %#v", err)) | ||
time.Sleep(20 * time.Second) | ||
continue | ||
} | ||
if err == nil { | ||
break | ||
mainLogger.Log("error", err) | ||
os.Exit(2) | ||
} | ||
} | ||
mainLogger.Log("info", "Repo cloned", "url", gitRemote.URL) | ||
|
||
gitRemoteConfig, err := git.NewGitRemoteConfig(*gitURL, *gitBranch, *gitChartsPath) | ||
if err != nil { | ||
mainLogger.Log("err", err) | ||
os.Exit(1) | ||
// Start the repo fetching from upstream | ||
shutdownWg.Add(1) | ||
go func() { | ||
errc <- repo.Start(shutdown, shutdownWg) | ||
}() | ||
} | ||
gitLogger := log.With(logger, "component", "git") | ||
|
||
// Chart releases sync due to Custom Resources changes ------------------------------- | ||
mainLogger.Log("info", "Starting to clone repo ...") | ||
checkout := git.RepoSetup(gitLogger, gitAuth, gitRemoteConfig, git.ChangesClone) | ||
defer checkout.Cleanup() | ||
mainLogger.Log("info", "Repo cloned") | ||
|
||
// release instance is needed during the sync of Charts changes and during the sync of FluxHelRelease changes | ||
rel := release.New(log.With(logger, "component", "release"), helmClient, checkout) | ||
relsync := releasesync.New(log.With(logger, "component", "releasesync"), rel) | ||
releaseConfig := release.Config{ | ||
ChartsPath: *gitChartsPath, | ||
} | ||
repoConfig := helmop.RepoConfig{ | ||
Repo: repo, | ||
Branch: *gitBranch, | ||
ChartsPath: *gitChartsPath, | ||
} | ||
|
||
// release instance is needed during the sync of Charts changes and during the sync of FluxHelmRelease changes | ||
rel := release.New(log.With(logger, "component", "release"), helmClient, releaseConfig) | ||
// CHARTS CHANGES SYNC ------------------------------------------------------------------ | ||
chartSync := chartsync.New(log.With(logger, "component", "chartsync"), | ||
chartsync.Polling{Interval: *chartsSyncInterval, Timeout: *chartsSyncTimeout}, | ||
chartsync.Clients{KubeClient: *kubeClient, IfClient: *ifClient}, | ||
rel, *relsync) | ||
rel, repoConfig) | ||
chartSync.Run(shutdown, errc, shutdownWg) | ||
|
||
// OPERATOR - CUSTOM RESOURCE CHANGE SYNC ----------------------------------------------- | ||
|
@@ -243,7 +218,7 @@ func main() { | |
// Reference to shared index informers for the FluxHelmRelease | ||
fhrInformer := ifInformerFactory.Helm().V1alpha2().FluxHelmReleases() | ||
|
||
opr := operator.New(log.With(logger, "component", "operator"), kubeClient, fhrInformer, rel) | ||
opr := operator.New(log.With(logger, "component", "operator"), kubeClient, fhrInformer, rel, repoConfig) | ||
// Starts handling k8s events related to the given resource kind | ||
go ifInformerFactory.Start(shutdown) | ||
|
||
|
@@ -253,9 +228,3 @@ func main() { | |
errc <- fmt.Errorf(ErrOperatorFailure, err) | ||
} | ||
} | ||
|
||
// Helper functions ----------------------------------------------------------------------- | ||
func optionalVar(fs *pflag.FlagSet, value ssh.OptionalValue, name, usage string) ssh.OptionalValue { | ||
fs.Var(value, name, usage) | ||
return value | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.