Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Include individual resource sync errors in Sync events #970

Merged
merged 3 commits into from
Mar 12, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions cluster/kubernetes/files.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package kubernetes

import (
"path/filepath"

"github.com/pkg/errors"

"github.com/weaveworks/flux"

"github.com/weaveworks/flux/cluster/kubernetes/resource"
)

Expand All @@ -13,7 +14,7 @@ import (
// specified namespace and name) to the paths of resource definition
// files.
func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]string, error) {
objects, err := resource.Load(path)
objects, err := resource.Load(path, path)
if err != nil {
return nil, errors.Wrap(err, "loading resources")
}
Expand All @@ -23,7 +24,7 @@ func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]stri
id := obj.ResourceID()
_, kind, _ := id.Components()
if _, ok := resourceKinds[kind]; ok {
result[id] = append(result[id], obj.Source())
result[id] = append(result[id], filepath.Join(path, obj.Source()))
}
}
return result, nil
Expand Down
62 changes: 31 additions & 31 deletions cluster/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/image"
"github.com/weaveworks/flux/registry"
"github.com/weaveworks/flux/resource"
"github.com/weaveworks/flux/ssh"
)

Expand All @@ -40,15 +41,23 @@ type extendedClient struct {
v1beta1batch.CronJobsGetter
}

// --- internal types for keeping track of syncing

type apiObject struct {
bytes []byte
resource.Resource
Kind string `yaml:"kind"`
Metadata struct {
Name string `yaml:"name"`
Namespace string `yaml:"namespace"`
} `yaml:"metadata"`
}

// A convenience for getting an minimal object from some bytes.
func parseObj(def []byte) (*apiObject, error) {
obj := apiObject{}
return &obj, yaml.Unmarshal(def, &obj)
}

func (o *apiObject) hasNamespace() bool {
return o.Metadata.Namespace != ""
}
Expand Down Expand Up @@ -85,32 +94,27 @@ func isAddon(obj namespacedLabeled) bool {
// --- /add ons

type changeSet struct {
nsObjs map[string][]obj
noNsObjs map[string][]obj
nsObjs map[string][]*apiObject
noNsObjs map[string][]*apiObject
}

func makeChangeSet() changeSet {
return changeSet{
nsObjs: make(map[string][]obj),
noNsObjs: make(map[string][]obj),
nsObjs: make(map[string][]*apiObject),
noNsObjs: make(map[string][]*apiObject),
}
}

func (c *changeSet) stage(cmd, id string, o *apiObject) {
func (c *changeSet) stage(cmd string, o *apiObject) {
if o.hasNamespace() {
c.nsObjs[cmd] = append(c.nsObjs[cmd], obj{id, o})
c.nsObjs[cmd] = append(c.nsObjs[cmd], o)
} else {
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], obj{id, o})
c.noNsObjs[cmd] = append(c.noNsObjs[cmd], o)
}
}

type obj struct {
id string
*apiObject
}

type Applier interface {
apply(log.Logger, changeSet, cluster.SyncError)
apply(log.Logger, changeSet) cluster.SyncError
}

// Cluster is a handle to a Kubernetes API server.
Expand Down Expand Up @@ -217,34 +221,38 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error {
logger := log.With(c.logger, "method", "Sync")

cs := makeChangeSet()
errs := cluster.SyncError{}
var errs cluster.SyncError
for _, action := range spec.Actions {
stages := []struct {
b []byte
res resource.Resource
cmd string
}{
{action.Delete, "delete"},
{action.Apply, "apply"},
}
for _, stage := range stages {
if len(stage.b) == 0 {
if stage.res == nil {
continue
}
obj, err := definitionObj(stage.b)
id := action.ResourceID
obj, err := parseObj(stage.res.Bytes())
if err == nil {
cs.stage(stage.cmd, id, obj)
obj.Resource = stage.res
cs.stage(stage.cmd, obj)
} else {
errs[id] = err
errs = append(errs, cluster.ResourceError{Resource: stage.res, Error: err})
break
}
}
}

c.mu.Lock()
defer c.mu.Unlock()
c.applier.apply(logger, cs, errs)
if len(errs) != 0 {
if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 {
errs = append(errs, applyErrs...)
}

// If `nil`, errs is a cluster.SyncError(nil) rather than error(nil)
if errs != nil {
return errs
}
return nil
Expand Down Expand Up @@ -408,11 +416,3 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds {

return allImageCreds
}

// --- end cluster.Cluster

// A convenience for getting an minimal object from some bytes.
func definitionObj(bytes []byte) (*apiObject, error) {
obj := apiObject{bytes: bytes}
return &obj, yaml.Unmarshal(bytes, &obj)
}
34 changes: 25 additions & 9 deletions cluster/kubernetes/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,41 @@ import (

"github.com/go-kit/kit/log"

"github.com/weaveworks/flux"
"github.com/weaveworks/flux/cluster"
"github.com/weaveworks/flux/policy"
)

type mockApplier struct {
commandRun bool
}

func (m *mockApplier) apply(_ log.Logger, c changeSet, _ cluster.SyncError) {
func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError {
if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 {
m.commandRun = true
}
return nil
}

func deploymentDef(name string) []byte {
return []byte(`---
kind: Deployment
metadata:
name: ` + name)
type rsc struct {
id string
bytes []byte
}

func (r rsc) ResourceID() flux.ResourceID {
return flux.MustParseResourceID(r.id)
}

func (r rsc) Bytes() []byte {
return r.bytes
}

func (r rsc) Policy() policy.Set {
return nil
}

func (r rsc) Source() string {
return "test"
}

// ---
Expand All @@ -39,7 +56,7 @@ func setup(t *testing.T) (*Cluster, *mockApplier) {
func TestSyncNop(t *testing.T) {
kube, mock := setup(t)
if err := kube.Sync(cluster.SyncDef{}); err != nil {
t.Error(err)
t.Errorf("%#v", err)
}
if mock.commandRun {
t.Error("expected no commands run")
Expand All @@ -51,8 +68,7 @@ func TestSyncMalformed(t *testing.T) {
err := kube.Sync(cluster.SyncDef{
Actions: []cluster.SyncAction{
cluster.SyncAction{
ResourceID: "foobar",
Apply: []byte("garbage"),
Apply: rsc{"id", []byte("garbage")},
},
},
})
Expand Down
4 changes: 2 additions & 2 deletions cluster/kubernetes/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ type Manifests struct {

// FindDefinedServices implementation in files.go

func (c *Manifests) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
return kresource.Load(paths...)
func (c *Manifests) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
return kresource.Load(base, first, rest...)
}

func (c *Manifests) ParseManifests(allDefs []byte) (map[string]resource.Resource, error) {
Expand Down
15 changes: 8 additions & 7 deletions cluster/kubernetes/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ func (c *Kubectl) connectArgs() []string {
return args
}

func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError) {
f := func(m map[string][]obj, cmd string, args ...string) {
func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) {
f := func(m map[string][]*apiObject, cmd string, args ...string) {
objs := m[cmd]
if len(objs) == 0 {
return
Expand All @@ -62,9 +62,9 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError)
args = append(args, cmd)
if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil {
for _, obj := range objs {
r := bytes.NewReader(obj.bytes)
r := bytes.NewReader(obj.Bytes())
if err := c.doCommand(logger, r, args...); err != nil {
errs[obj.id] = err
errs = append(errs, cluster.ResourceError{obj.Resource, err})
}
}
}
Expand All @@ -79,7 +79,7 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError)
// first, so we run the commands the other way round.
f(cs.noNsObjs, "apply", "--namespace", "default")
f(cs.nsObjs, "apply")

return errs
}

func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) error {
Expand All @@ -101,10 +101,11 @@ func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) erro
return err
}

func makeMultidoc(objs []obj) *bytes.Buffer {
func makeMultidoc(objs []*apiObject) *bytes.Buffer {
buf := &bytes.Buffer{}
for _, obj := range objs {
buf.WriteString("\n---\n" + string(obj.bytes))
buf.WriteString("\n---\n")
buf.Write(obj.Bytes())
}
return buf
}
Expand Down
11 changes: 8 additions & 3 deletions cluster/kubernetes/resource/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
// Load takes paths to directories or files, and creates an object set
// based on the file(s) therein. Resources are named according to the
// file content, rather than the file name of directory structure.
func Load(roots ...string) (map[string]resource.Resource, error) {
func Load(base, atLeastOne string, more ...string) (map[string]resource.Resource, error) {
roots := append([]string{atLeastOne}, more...)
objs := map[string]resource.Resource{}
for _, root := range roots {
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
Expand All @@ -32,13 +33,17 @@ func Load(roots ...string) (map[string]resource.Resource, error) {
if err != nil {
return errors.Wrapf(err, "reading file at %q", path)
}
docsInFile, err := ParseMultidoc(bytes, path)
source, err := filepath.Rel(base, path)
if err != nil {
return errors.Wrapf(err, "finding relative path for %q", path)
}
docsInFile, err := ParseMultidoc(bytes, source)
if err != nil {
return errors.Wrapf(err, "parsing file at %q", path)
}
for id, obj := range docsInFile {
if alreadyDefined, ok := objs[id]; ok {
return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), path)
return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), source)
}
objs[id] = obj
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/resource/load_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func TestLoadSome(t *testing.T) {
if err := testfiles.WriteTestFiles(dir); err != nil {
t.Fatal(err)
}
objs, err := Load(dir)
objs, err := Load(dir, dir)
if err != nil {
t.Error(err)
}
Expand Down
2 changes: 1 addition & 1 deletion cluster/kubernetes/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
// the source to learn about them.
func updatePodController(def []byte, container string, newImageID image.Ref) ([]byte, error) {
// Sanity check
obj, err := definitionObj(def)
obj, err := parseObj(def)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions cluster/manifests.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ type Manifests interface {
// Update the definitions in a manifests bytes according to the
// spec given.
UpdateDefinition(def []byte, container string, newImageID image.Ref) ([]byte, error)
// Load all the resource manifests under the path given
LoadManifests(paths ...string) (map[string]resource.Resource, error)
// Load all the resource manifests under the path given. `baseDir`
// is used to relativise the paths, which are supplied as absolute
// paths to directories or files; at least one path must be
// supplied.
LoadManifests(baseDir, first string, rest ...string) (map[string]resource.Resource, error)
// Parse the manifests given in an exported blob
ParseManifests([]byte) (map[string]resource.Resource, error)
// UpdatePolicies modifies a manifest to apply the policy update specified
Expand Down
6 changes: 3 additions & 3 deletions cluster/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Mock struct {
PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error)
FindDefinedServicesFunc func(path string) (map[flux.ResourceID][]string, error)
UpdateDefinitionFunc func(def []byte, container string, newImageID image.Ref) ([]byte, error)
LoadManifestsFunc func(paths ...string) (map[string]resource.Resource, error)
LoadManifestsFunc func(base, first string, rest ...string) (map[string]resource.Resource, error)
ParseManifestsFunc func([]byte) (map[string]resource.Resource, error)
UpdateManifestFunc func(path, resourceID string, f func(def []byte) ([]byte, error)) error
UpdatePoliciesFunc func([]byte, policy.Update) ([]byte, error)
Expand Down Expand Up @@ -57,8 +57,8 @@ func (m *Mock) UpdateDefinition(def []byte, container string, newImageID image.R
return m.UpdateDefinitionFunc(def, container, newImageID)
}

func (m *Mock) LoadManifests(paths ...string) (map[string]resource.Resource, error) {
return m.LoadManifestsFunc(paths...)
func (m *Mock) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) {
return m.LoadManifestsFunc(base, first, rest...)
}

func (m *Mock) ParseManifests(def []byte) (map[string]resource.Resource, error) {
Expand Down
Loading