Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[release-1.21] Add events to deploy controller #3615

Merged
merged 3 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 98 additions & 39 deletions pkg/deploy/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,32 @@ import (

errors2 "github.com/pkg/errors"
"github.com/rancher/k3s/pkg/agent/util"
v12 "github.com/rancher/k3s/pkg/apis/k3s.cattle.io/v1"
v1 "github.com/rancher/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
apisv1 "github.com/rancher/k3s/pkg/apis/k3s.cattle.io/v1"
controllersv1 "github.com/rancher/k3s/pkg/generated/controllers/k3s.cattle.io/v1"
"github.com/rancher/wrangler/pkg/apply"
"github.com/rancher/wrangler/pkg/merr"
"github.com/rancher/wrangler/pkg/objectset"
"github.com/rancher/wrangler/pkg/schemes"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
yamlDecoder "k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)

const (
ControllerName = "deploy"
ns = "kube-system"
startKey = "_start_"
)

func WatchFiles(ctx context.Context, apply apply.Apply, addons v1.AddonController, disables map[string]bool, bases ...string) error {
// WatchFiles sets up an OnChange callback to start a periodic goroutine to watch files for changes once the controller has started up.
func WatchFiles(ctx context.Context, client kubernetes.Interface, apply apply.Apply, addons controllersv1.AddonController, disables map[string]bool, bases ...string) error {
w := &watcher{
apply: apply,
addonCache: addons.Cache(),
Expand All @@ -46,10 +51,10 @@ func WatchFiles(ctx context.Context, apply apply.Apply, addons v1.AddonControlle
modTime: map[string]time.Time{},
}

addons.Enqueue("", startKey)
addons.OnChange(ctx, "addon-start", func(key string, _ *v12.Addon) (*v12.Addon, error) {
addons.Enqueue(metav1.NamespaceNone, startKey)
addons.OnChange(ctx, "addon-start", func(key string, _ *apisv1.Addon) (*apisv1.Addon, error) {
if key == startKey {
go w.start(ctx)
go w.start(ctx, client)
}
return nil, nil
})
Expand All @@ -59,14 +64,22 @@ func WatchFiles(ctx context.Context, apply apply.Apply, addons v1.AddonControlle

type watcher struct {
apply apply.Apply
addonCache v1.AddonCache
addons v1.AddonClient
addonCache controllersv1.AddonCache
addons controllersv1.AddonClient
bases []string
disables map[string]bool
modTime map[string]time.Time
recorder record.EventRecorder
}

func (w *watcher) start(ctx context.Context) {
// start calls listFiles at regular intervals to trigger application of manifests that have changed on disk.
func (w *watcher) start(ctx context.Context, client kubernetes.Interface) {
nodeName := os.Getenv("NODE_NAME")
broadcaster := record.NewBroadcaster()
broadcaster.StartLogging(logrus.Infof)
broadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: client.CoreV1().Events(metav1.NamespaceSystem)})
w.recorder = broadcaster.NewRecorder(schemes.All, corev1.EventSource{Component: ControllerName, Host: nodeName})

force := true
for {
if err := w.listFiles(force); err == nil {
Expand All @@ -82,6 +95,7 @@ func (w *watcher) start(ctx context.Context) {
}
}

// listFiles calls listFilesIn on a list of paths.
func (w *watcher) listFiles(force bool) error {
var errs []error
for _, base := range w.bases {
Expand All @@ -92,6 +106,8 @@ func (w *watcher) listFiles(force bool) error {
return merr.NewErrors(errs...)
}

// listFilesIn recursively processes all files within a path, and checks them against the disable and skip lists. Files found that
// are not on either list are loaded as Addons and applied to the cluster.
func (w *watcher) listFilesIn(base string, force bool) error {
files := map[string]os.FileInfo{}
if err := filepath.Walk(base, func(path string, info os.FileInfo, err error) error {
Expand All @@ -104,6 +120,9 @@ func (w *watcher) listFilesIn(base string, force bool) error {
return err
}

// Make a map of .skip files - these are used later to indicate that a given file should be ignored
// For example, 'addon.yaml.skip' will cause 'addon.yaml' to be ignored completely - unless it is also
// disabled, since disable processing happens first.
skips := map[string]bool{}
keys := make([]string, len(files))
keyIndex := 0
Expand All @@ -118,13 +137,15 @@ func (w *watcher) listFilesIn(base string, force bool) error {

var errs []error
for _, path := range keys {
if shouldDisableService(base, path, w.disables) {
// Disabled files are not just skipped, but actively deleted from the filesystem
if shouldDisableFile(base, path, w.disables) {
if err := w.delete(path); err != nil {
errs = append(errs, errors2.Wrapf(err, "failed to delete %s", path))
}
continue
}
if skipFile(files[path].Name(), skips) {
// Skipped files are just ignored
if shouldSkipFile(files[path].Name(), skips) {
continue
}
modTime := files[path].ModTime()
Expand All @@ -141,15 +162,31 @@ func (w *watcher) listFilesIn(base string, force bool) error {
return merr.NewErrors(errs...)
}

// deploy loads yaml from a manifest on disk, creates an AddOn resource to track its application, and then applies
// all resources contained within to the cluster.
func (w *watcher) deploy(path string, compareChecksum bool) error {
content, err := ioutil.ReadFile(path)
name := basename(path)
addon, err := w.getOrCreateAddon(name)
if err != nil {
return err
}

name := name(path)
addon, err := w.addon(name)
addon.Spec.Source = path
addon.Status.GVKs = nil

// Create the new Addon now so that we can use it to report Events when parsing/applying the manifest
// Events need the UID and ObjectRevision set to function properly
if addon.UID == "" {
newAddon, err := w.addons.Create(&addon)
if err != nil {
return err
}
addon = *newAddon
}

content, err := ioutil.ReadFile(path)
if err != nil {
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
return err
}

Expand All @@ -159,53 +196,60 @@ func (w *watcher) deploy(path string, compareChecksum bool) error {
return nil
}

// Attempt to parse the YAML/JSON into objects. Failure at this point would be due to bad file content - not YAML/JSON,
// YAML/JSON that can't be converted to Kubernetes objects, etc.
objectSet, err := objectSet(content)
if err != nil {
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
return err
}

// Attempt to apply the changes. Failure at this point would be due to more complicated issues - invalid changes to
// existing objects, rejected by validating webhooks, etc.
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "ApplyingManifest", "Applying manifest at %q", path)
if err := w.apply.WithOwner(&addon).Apply(objectSet); err != nil {
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ApplyManifestFailed", "Applying manifest at %q failed: %v", path, err)
return err
}

addon.Spec.Source = path
addon.Spec.Checksum = checksum
addon.Status.GVKs = nil

if addon.UID == "" {
_, err := w.addons.Create(&addon)
return err
}

// Emit event, Update Addon checksum and modtime only if apply was successful
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "AppliedManifest", "Applied manifest at %q", path)
_, err = w.addons.Update(&addon)
return err
}

// delete completely removes both a manifest, and any resources that it did or would have created. The manifest is
// parsed, and any resources it specified are deleted. Finally, the file itself is removed from disk.
func (w *watcher) delete(path string) error {
name := name(path)
addon, err := w.addon(name)
name := basename(path)
addon, err := w.getOrCreateAddon(name)
if err != nil {
return err
}

// ensure that the addon is completely removed before deleting the objectSet,
// so return when err == nil, otherwise pods may get stuck terminating
if err := w.addons.Delete(addon.Namespace, addon.Name, &metav1.DeleteOptions{}); err == nil || !errors.IsNotFound(err) {
return err
}

content, err := ioutil.ReadFile(path)
if err != nil {
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ReadManifestFailed", "Read manifest at %q failed: %v", path, err)
return err
}

objectSet, err := objectSet(content)
if err != nil {
w.recorder.Eventf(&addon, corev1.EventTypeWarning, "ParseManifestFailed", "Parse manifest at %q failed: %v", path, err)
return err
}
var gvk []schema.GroupVersionKind
for k := range objectSet.ObjectsByGVK() {
gvk = append(gvk, k)
}

// ensure that the addon is completely removed before deleting the objectSet,
// so return when err == nil, otherwise pods may get stuck terminating
w.recorder.Eventf(&addon, corev1.EventTypeNormal, "DeletingManifest", "Deleting manifest at %q", path)
if err := w.addons.Delete(addon.Namespace, addon.Name, &metav1.DeleteOptions{}); err == nil || !errors.IsNotFound(err) {
return err
}

// apply an empty set with owner & gvk data to delete
if err := w.apply.WithOwner(&addon).WithGVK(gvk...).Apply(nil); err != nil {
return err
Expand All @@ -214,16 +258,19 @@ func (w *watcher) delete(path string) error {
return os.Remove(path)
}

func (w *watcher) addon(name string) (v12.Addon, error) {
addon, err := w.addonCache.Get(ns, name)
// getOrCreateAddon attempts to get an Addon by name from the addon namespace, and creates a new one
// if it cannot be found.
func (w *watcher) getOrCreateAddon(name string) (apisv1.Addon, error) {
addon, err := w.addonCache.Get(metav1.NamespaceSystem, name)
if errors.IsNotFound(err) {
addon = v12.NewAddon(ns, name, v12.Addon{})
addon = apisv1.NewAddon(metav1.NamespaceSystem, name, apisv1.Addon{})
} else if err != nil {
return v12.Addon{}, err
return apisv1.Addon{}, err
}
return *addon, nil
}

// objectSet returns a new ObjectSet containing all resources from a given yaml chunk
func objectSet(content []byte) (*objectset.ObjectSet, error) {
objs, err := yamlToObjects(bytes.NewBuffer(content))
if err != nil {
Expand All @@ -235,16 +282,19 @@ func objectSet(content []byte) (*objectset.ObjectSet, error) {
return os, nil
}

func name(path string) string {
// basename returns a file's basename by returning everything before the first period
func basename(path string) string {
name := filepath.Base(path)
return strings.SplitN(name, ".", 2)[0]
}

// checksum returns the hex-encoded SHA256 sum of a byte slice
func checksum(bytes []byte) string {
d := sha256.Sum256(bytes)
return hex.EncodeToString(d[:])
}

// isEmptyYaml returns true if a chunk of YAML contains nothing but whitespace, comments, or document separators
func isEmptyYaml(yaml []byte) bool {
isEmpty := true
lines := bytes.Split(yaml, []byte("\n"))
Expand All @@ -257,6 +307,7 @@ func isEmptyYaml(yaml []byte) bool {
return isEmpty
}

// yamlToObjects returns an object slice yielded from documents in a chunk of YAML
func yamlToObjects(in io.Reader) ([]runtime.Object, error) {
var result []runtime.Object
reader := yamlDecoder.NewYAMLReader(bufio.NewReaderSize(in, 4096))
Expand All @@ -282,6 +333,7 @@ func yamlToObjects(in io.Reader) ([]runtime.Object, error) {
return result, nil
}

// Returns one or more objects from a single YAML document
func toObjects(bytes []byte) ([]runtime.Object, error) {
bytes, err := yamlDecoder.ToJSON(bytes)
if err != nil {
Expand All @@ -305,7 +357,9 @@ func toObjects(bytes []byte) ([]runtime.Object, error) {
return []runtime.Object{obj}, nil
}

func skipFile(fileName string, skips map[string]bool) bool {
// Returns true if a file should be skipped. Skips anything from the provided skip map,
// anything that is a dotfile, and anything that does not have a json/yaml/yml extension.
func shouldSkipFile(fileName string, skips map[string]bool) bool {
switch {
case strings.HasPrefix(fileName, "."):
return true
Expand All @@ -318,7 +372,11 @@ func skipFile(fileName string, skips map[string]bool) bool {
}
}

func shouldDisableService(base, fileName string, disables map[string]bool) bool {
// Returns true if a file should be disabled, by checking the file basename against a disables map.
// only json/yaml files are checked.
func shouldDisableFile(base, fileName string, disables map[string]bool) bool {
// Check to see if the file is in a subdirectory that is in the disables map.
// If a file is nested several levels deep, checks 'parent1', 'parent1/parent2', 'parent1/parent2/parent3', etc.
relFile := strings.TrimPrefix(fileName, base)
namePath := strings.Split(relFile, string(os.PathSeparator))
for i := 1; i < len(namePath); i++ {
Expand All @@ -330,6 +388,7 @@ func shouldDisableService(base, fileName string, disables map[string]bool) bool
if !util.HasSuffixI(fileName, ".yaml", ".yml", ".json") {
return false
}
// Check the basename against the disables map
baseFile := filepath.Base(fileName)
suffix := filepath.Ext(baseFile)
baseName := strings.TrimSuffix(baseFile, suffix)
Expand Down
18 changes: 14 additions & 4 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error {
servicelb.DefaultLBImage = config.ControlConfig.SystemDefaultRegistry + "/" + servicelb.DefaultLBImage
}

helm.Register(ctx, sc.Apply,
helm.Register(ctx,
sc.Apply,
sc.Helm.Helm().V1().HelmChart(),
sc.Helm.Helm().V1().HelmChartConfig(),
sc.Batch.Batch().V1().Job(),
Expand All @@ -204,7 +205,8 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error {
sc.Core.Core().V1().Pod(),
sc.Core.Core().V1().Service(),
sc.Core.Core().V1().Endpoints(),
!config.DisableServiceLB, config.Rootless); err != nil {
!config.DisableServiceLB,
config.Rootless); err != nil {
return err
}

Expand All @@ -213,7 +215,10 @@ func coreControllers(ctx context.Context, sc *Context, config *Config) error {
}

if config.Rootless {
return rootlessports.Register(ctx, sc.Core.Core().V1().Service(), !config.DisableServiceLB, config.ControlConfig.HTTPSPort)
return rootlessports.Register(ctx,
sc.Core.Core().V1().Service(),
!config.DisableServiceLB,
config.ControlConfig.HTTPSPort)
}

return nil
Expand Down Expand Up @@ -242,7 +247,12 @@ func stageFiles(ctx context.Context, sc *Context, controlConfig *config.Control)
return err
}

return deploy.WatchFiles(ctx, sc.Apply, sc.K3s.K3s().V1().Addon(), controlConfig.Disables, dataDir)
return deploy.WatchFiles(ctx,
sc.K8s,
sc.Apply,
sc.K3s.K3s().V1().Addon(),
controlConfig.Disables,
dataDir)
}

// registryTemplate behaves like the system_default_registry template in Rancher helm charts,
Expand Down