From a3ce2eddde75c44ed75bdb53542a15c763558a9e Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Wed, 28 Feb 2018 16:50:14 +0000 Subject: [PATCH 1/3] Use resource.Resource throughout Before we had an interface representing resources, they were generally passed around as a pair of (id string, definition []byte). We can cut down on the number of representations of resources by just using `resource.Resource`. This includes returning synchronisation errors with the whole resource, rather than simply by name. Doing so means we will be able to better surface sync problems with individual resources. --- cluster/kubernetes/kubernetes.go | 62 +++++++++++++-------------- cluster/kubernetes/kubernetes_test.go | 34 +++++++++++---- cluster/kubernetes/release.go | 15 ++++--- cluster/kubernetes/update.go | 2 +- cluster/sync.go | 32 +++++++------- sync/sync.go | 6 +-- sync/sync_test.go | 24 ++++------- 7 files changed, 90 insertions(+), 85 deletions(-) diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index ec8dae909..d53201890 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -23,6 +23,7 @@ import ( "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/image" "github.com/weaveworks/flux/registry" + "github.com/weaveworks/flux/resource" "github.com/weaveworks/flux/ssh" ) @@ -40,8 +41,10 @@ type extendedClient struct { v1beta1batch.CronJobsGetter } +// --- internal types for keeping track of syncing + type apiObject struct { - bytes []byte + resource.Resource Kind string `yaml:"kind"` Metadata struct { Name string `yaml:"name"` @@ -49,6 +52,12 @@ type apiObject struct { } `yaml:"metadata"` } +// A convenience for getting an minimal object from some bytes. +func parseObj(def []byte) (*apiObject, error) { + obj := apiObject{} + return &obj, yaml.Unmarshal(def, &obj) +} + func (o *apiObject) hasNamespace() bool { return o.Metadata.Namespace != "" } @@ -85,32 +94,27 @@ func isAddon(obj namespacedLabeled) bool { // --- /add ons type changeSet struct { - nsObjs map[string][]obj - noNsObjs map[string][]obj + nsObjs map[string][]*apiObject + noNsObjs map[string][]*apiObject } func makeChangeSet() changeSet { return changeSet{ - nsObjs: make(map[string][]obj), - noNsObjs: make(map[string][]obj), + nsObjs: make(map[string][]*apiObject), + noNsObjs: make(map[string][]*apiObject), } } -func (c *changeSet) stage(cmd, id string, o *apiObject) { +func (c *changeSet) stage(cmd string, o *apiObject) { if o.hasNamespace() { - c.nsObjs[cmd] = append(c.nsObjs[cmd], obj{id, o}) + c.nsObjs[cmd] = append(c.nsObjs[cmd], o) } else { - c.noNsObjs[cmd] = append(c.noNsObjs[cmd], obj{id, o}) + c.noNsObjs[cmd] = append(c.noNsObjs[cmd], o) } } -type obj struct { - id string - *apiObject -} - type Applier interface { - apply(log.Logger, changeSet, cluster.SyncError) + apply(log.Logger, changeSet) cluster.SyncErrors } // Cluster is a handle to a Kubernetes API server. @@ -217,25 +221,25 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error { logger := log.With(c.logger, "method", "Sync") cs := makeChangeSet() - errs := cluster.SyncError{} + var errs cluster.SyncErrors for _, action := range spec.Actions { stages := []struct { - b []byte + res resource.Resource cmd string }{ {action.Delete, "delete"}, {action.Apply, "apply"}, } for _, stage := range stages { - if len(stage.b) == 0 { + if stage.res == nil { continue } - obj, err := definitionObj(stage.b) - id := action.ResourceID + obj, err := parseObj(stage.res.Bytes()) if err == nil { - cs.stage(stage.cmd, id, obj) + obj.Resource = stage.res + cs.stage(stage.cmd, obj) } else { - errs[id] = err + errs = append(errs, cluster.SyncError{Resource: stage.res, Error: err}) break } } @@ -243,8 +247,12 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error { c.mu.Lock() defer c.mu.Unlock() - c.applier.apply(logger, cs, errs) - if len(errs) != 0 { + if applyErrs := c.applier.apply(logger, cs); len(applyErrs) > 0 { + errs = append(errs, applyErrs...) + } + + // If `nil`, errs is a cluster.SyncError(nil) rather than error(nil) + if errs != nil { return errs } return nil @@ -408,11 +416,3 @@ func (c *Cluster) ImagesToFetch() registry.ImageCreds { return allImageCreds } - -// --- end cluster.Cluster - -// A convenience for getting an minimal object from some bytes. -func definitionObj(bytes []byte) (*apiObject, error) { - obj := apiObject{bytes: bytes} - return &obj, yaml.Unmarshal(bytes, &obj) -} diff --git a/cluster/kubernetes/kubernetes_test.go b/cluster/kubernetes/kubernetes_test.go index 5519356d1..4ed2c0961 100644 --- a/cluster/kubernetes/kubernetes_test.go +++ b/cluster/kubernetes/kubernetes_test.go @@ -5,24 +5,41 @@ import ( "github.com/go-kit/kit/log" + "github.com/weaveworks/flux" "github.com/weaveworks/flux/cluster" + "github.com/weaveworks/flux/policy" ) type mockApplier struct { commandRun bool } -func (m *mockApplier) apply(_ log.Logger, c changeSet, _ cluster.SyncError) { +func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncErrors { if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 { m.commandRun = true } + return nil } -func deploymentDef(name string) []byte { - return []byte(`--- -kind: Deployment -metadata: - name: ` + name) +type rsc struct { + id string + bytes []byte +} + +func (r rsc) ResourceID() flux.ResourceID { + return flux.MustParseResourceID(r.id) +} + +func (r rsc) Bytes() []byte { + return r.bytes +} + +func (r rsc) Policy() policy.Set { + return nil +} + +func (r rsc) Source() string { + return "test" } // --- @@ -39,7 +56,7 @@ func setup(t *testing.T) (*Cluster, *mockApplier) { func TestSyncNop(t *testing.T) { kube, mock := setup(t) if err := kube.Sync(cluster.SyncDef{}); err != nil { - t.Error(err) + t.Errorf("%#v", err) } if mock.commandRun { t.Error("expected no commands run") @@ -51,8 +68,7 @@ func TestSyncMalformed(t *testing.T) { err := kube.Sync(cluster.SyncDef{ Actions: []cluster.SyncAction{ cluster.SyncAction{ - ResourceID: "foobar", - Apply: []byte("garbage"), + Apply: rsc{"id", []byte("garbage")}, }, }, }) diff --git a/cluster/kubernetes/release.go b/cluster/kubernetes/release.go index 8978fa840..7ca0a64f8 100644 --- a/cluster/kubernetes/release.go +++ b/cluster/kubernetes/release.go @@ -52,8 +52,8 @@ func (c *Kubectl) connectArgs() []string { return args } -func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError) { - f := func(m map[string][]obj, cmd string, args ...string) { +func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncErrors) { + f := func(m map[string][]*apiObject, cmd string, args ...string) { objs := m[cmd] if len(objs) == 0 { return @@ -62,9 +62,9 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError) args = append(args, cmd) if err := c.doCommand(logger, makeMultidoc(objs), args...); err != nil { for _, obj := range objs { - r := bytes.NewReader(obj.bytes) + r := bytes.NewReader(obj.Bytes()) if err := c.doCommand(logger, r, args...); err != nil { - errs[obj.id] = err + errs = append(errs, cluster.SyncError{obj.Resource, err}) } } } @@ -79,7 +79,7 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet, errs cluster.SyncError) // first, so we run the commands the other way round. f(cs.noNsObjs, "apply", "--namespace", "default") f(cs.nsObjs, "apply") - + return errs } func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) error { @@ -101,10 +101,11 @@ func (c *Kubectl) doCommand(logger log.Logger, r io.Reader, args ...string) erro return err } -func makeMultidoc(objs []obj) *bytes.Buffer { +func makeMultidoc(objs []*apiObject) *bytes.Buffer { buf := &bytes.Buffer{} for _, obj := range objs { - buf.WriteString("\n---\n" + string(obj.bytes)) + buf.WriteString("\n---\n") + buf.Write(obj.Bytes()) } return buf } diff --git a/cluster/kubernetes/update.go b/cluster/kubernetes/update.go index 5556a393e..24cfc85fb 100644 --- a/cluster/kubernetes/update.go +++ b/cluster/kubernetes/update.go @@ -20,7 +20,7 @@ import ( // the source to learn about them. func updatePodController(def []byte, container string, newImageID image.Ref) ([]byte, error) { // Sanity check - obj, err := definitionObj(def) + obj, err := parseObj(def) if err != nil { return nil, err } diff --git a/cluster/sync.go b/cluster/sync.go index 0993df2aa..017e3c8f8 100644 --- a/cluster/sync.go +++ b/cluster/sync.go @@ -2,24 +2,17 @@ package cluster import ( "strings" + + "github.com/weaveworks/flux/resource" ) // Definitions for use in synchronising a cluster with a git repo. -// Yep, resources are defined by opaque bytes. It's up to the cluster -// at the other end to do the right thing. -type ResourceDef []byte - -// The action(s) to take on a particular resource. -// This should just be done in order, i.e.,: -// 1. delete if something in Delete -// 2. apply if something in Apply +// SyncAction represents either the deletion or application (create or +// update) of a resource. type SyncAction struct { - // The ID is just a handle for labeling any error. No other - // meaning is attached to it. - ResourceID string - Delete ResourceDef - Apply ResourceDef + Delete resource.Resource // ) one of these + Apply resource.Resource // ) } type SyncDef struct { @@ -27,12 +20,17 @@ type SyncDef struct { Actions []SyncAction } -type SyncError map[string]error +type SyncError struct { + resource.Resource + Error error +} + +type SyncErrors []SyncError -func (err SyncError) Error() string { +func (err SyncErrors) Error() string { var errs []string - for id, e := range err { - errs = append(errs, id+": "+e.Error()) + for _, e := range err { + errs = append(errs, e.ResourceID().String()+": "+e.Error.Error()) } return strings.Join(errs, "; ") } diff --git a/sync/sync.go b/sync/sync.go index 1435d6a71..937169091 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -55,8 +55,7 @@ func prepareSyncDelete(logger log.Logger, repoResources map[string]resource.Reso } if _, ok := repoResources[id]; !ok { sync.Actions = append(sync.Actions, cluster.SyncAction{ - ResourceID: id, - Delete: res.Bytes(), + Delete: res, }) } } @@ -73,7 +72,6 @@ func prepareSyncApply(logger log.Logger, clusterResources map[string]resource.Re } } sync.Actions = append(sync.Actions, cluster.SyncAction{ - ResourceID: id, - Apply: res.Bytes(), + Apply: res, }) } diff --git a/sync/sync_test.go b/sync/sync_test.go index ee4168b12..183002da3 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -69,14 +69,12 @@ func TestPrepareSyncDelete(t *testing.T) { var tests = []struct { msg string repoRes map[string]resource.Resource - id string res resource.Resource expected *cluster.SyncDef }{ { msg: "No repo resources provided during sync delete", repoRes: map[string]resource.Resource{}, - id: "res7", res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), expected: &cluster.SyncDef{}, }, @@ -90,7 +88,6 @@ func TestPrepareSyncDelete(t *testing.T) { "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), }, - id: "res7", res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), expected: &cluster.SyncDef{}, }, @@ -104,16 +101,15 @@ func TestPrepareSyncDelete(t *testing.T) { "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), }, - id: "res7", res: mockResourceWithoutIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{ResourceID: "res7", Delete: cluster.ResourceDef{}, Apply: cluster.ResourceDef(nil)}}}, + expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{Delete: mockResourceWithoutIgnorePolicy("service", "ns1", "s2")}}}, }, } logger := log.NewNopLogger() for _, sc := range tests { sync := &cluster.SyncDef{} - prepareSyncDelete(logger, sc.repoRes, sc.id, sc.res, sync) + prepareSyncDelete(logger, sc.repoRes, sc.res.ResourceID().String(), sc.res, sync) if !reflect.DeepEqual(sc.expected, sync) { t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expected, sync) @@ -125,14 +121,12 @@ func TestPrepareSyncApply(t *testing.T) { var tests = []struct { msg string clusRes map[string]resource.Resource - id string res resource.Resource expected *cluster.SyncDef }{ { msg: "No repo resources provided during sync apply", clusRes: map[string]resource.Resource{}, - id: "res1", res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), expected: &cluster.SyncDef{}, }, @@ -146,7 +140,6 @@ func TestPrepareSyncApply(t *testing.T) { "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), }, - id: "res7", res: mockResourceWithIgnorePolicy("service", "ns1", "s2"), expected: &cluster.SyncDef{}, }, @@ -160,16 +153,15 @@ func TestPrepareSyncApply(t *testing.T) { "res5": mockResourceWithoutIgnorePolicy("deployment", "ns2", "d2"), "res6": mockResourceWithoutIgnorePolicy("service", "ns3", "s1"), }, - id: "res7", res: mockResourceWithoutIgnorePolicy("service", "ns1", "s2"), - expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{ResourceID: "res7", Apply: cluster.ResourceDef{}, Delete: cluster.ResourceDef(nil)}}}, + expected: &cluster.SyncDef{Actions: []cluster.SyncAction{cluster.SyncAction{Apply: mockResourceWithoutIgnorePolicy("service", "ns1", "s2")}}}, }, } logger := log.NewNopLogger() for _, sc := range tests { sync := &cluster.SyncDef{} - prepareSyncApply(logger, sc.clusRes, sc.id, sc.res, sync) + prepareSyncApply(logger, sc.clusRes, sc.res.ResourceID().String(), sc.res, sync) if !reflect.DeepEqual(sc.expected, sync) { t.Errorf("%s: expected %+v, got %+v\n", sc.msg, sc.expected, sync) @@ -211,12 +203,12 @@ func (p *syncCluster) Sync(def cluster.SyncDef) error { println("=== Syncing ===") for _, action := range def.Actions { if action.Delete != nil { - println("Deleting " + action.ResourceID) - delete(p.resources, action.ResourceID) + println("Deleting " + action.Delete.ResourceID().String()) + delete(p.resources, action.Delete.ResourceID().String()) } if action.Apply != nil { - println("Applying " + action.ResourceID) - p.resources[action.ResourceID] = action.Apply + println("Applying " + action.Apply.ResourceID().String()) + p.resources[action.Apply.ResourceID().String()] = action.Apply.Bytes() } } println("=== Done syncing ===") From 4c92f1d2f0a6e0e9e50e5de802fc1f0cef4eac34 Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Wed, 28 Feb 2018 23:44:20 +0000 Subject: [PATCH 2/3] Report sync errors including source file Include an error report in the sync notification, with resources v. errors (with the latter coming from `kubectl` most likely). The most useful bit of information when a resource fails to sync -- more useful than the error from kubectl, even -- is the file that had a problem. Include that in the notification. Secondarily: to avoid having a long, tmpfile path in messages, make the source of resources relative to the repo. --- cluster/kubernetes/files.go | 7 ++++--- cluster/kubernetes/kubernetes.go | 6 +++--- cluster/kubernetes/kubernetes_test.go | 2 +- cluster/kubernetes/manifests.go | 4 ++-- cluster/kubernetes/release.go | 4 ++-- cluster/kubernetes/resource/load.go | 11 ++++++++--- cluster/kubernetes/resource/load_test.go | 2 +- cluster/manifests.go | 7 +++++-- cluster/mock.go | 6 +++--- cluster/sync.go | 10 +++++----- daemon/daemon_test.go | 2 +- daemon/loop.go | 25 ++++++++++++++---------- event/event.go | 2 ++ git/working.go | 7 ++++++- sync/sync_test.go | 6 +++--- 15 files changed, 61 insertions(+), 40 deletions(-) diff --git a/cluster/kubernetes/files.go b/cluster/kubernetes/files.go index 02cdda83a..8f9a91725 100644 --- a/cluster/kubernetes/files.go +++ b/cluster/kubernetes/files.go @@ -1,10 +1,11 @@ package kubernetes import ( + "path/filepath" + "github.com/pkg/errors" "github.com/weaveworks/flux" - "github.com/weaveworks/flux/cluster/kubernetes/resource" ) @@ -13,7 +14,7 @@ import ( // specified namespace and name) to the paths of resource definition // files. func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]string, error) { - objects, err := resource.Load(path) + objects, err := resource.Load(path, path) if err != nil { return nil, errors.Wrap(err, "loading resources") } @@ -23,7 +24,7 @@ func (c *Manifests) FindDefinedServices(path string) (map[flux.ResourceID][]stri id := obj.ResourceID() _, kind, _ := id.Components() if _, ok := resourceKinds[kind]; ok { - result[id] = append(result[id], obj.Source()) + result[id] = append(result[id], filepath.Join(path, obj.Source())) } } return result, nil diff --git a/cluster/kubernetes/kubernetes.go b/cluster/kubernetes/kubernetes.go index d53201890..4c8ef18da 100644 --- a/cluster/kubernetes/kubernetes.go +++ b/cluster/kubernetes/kubernetes.go @@ -114,7 +114,7 @@ func (c *changeSet) stage(cmd string, o *apiObject) { } type Applier interface { - apply(log.Logger, changeSet) cluster.SyncErrors + apply(log.Logger, changeSet) cluster.SyncError } // Cluster is a handle to a Kubernetes API server. @@ -221,7 +221,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error { logger := log.With(c.logger, "method", "Sync") cs := makeChangeSet() - var errs cluster.SyncErrors + var errs cluster.SyncError for _, action := range spec.Actions { stages := []struct { res resource.Resource @@ -239,7 +239,7 @@ func (c *Cluster) Sync(spec cluster.SyncDef) error { obj.Resource = stage.res cs.stage(stage.cmd, obj) } else { - errs = append(errs, cluster.SyncError{Resource: stage.res, Error: err}) + errs = append(errs, cluster.ResourceError{Resource: stage.res, Error: err}) break } } diff --git a/cluster/kubernetes/kubernetes_test.go b/cluster/kubernetes/kubernetes_test.go index 4ed2c0961..b3c1d3565 100644 --- a/cluster/kubernetes/kubernetes_test.go +++ b/cluster/kubernetes/kubernetes_test.go @@ -14,7 +14,7 @@ type mockApplier struct { commandRun bool } -func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncErrors { +func (m *mockApplier) apply(_ log.Logger, c changeSet) cluster.SyncError { if len(c.nsObjs) != 0 || len(c.noNsObjs) != 0 { m.commandRun = true } diff --git a/cluster/kubernetes/manifests.go b/cluster/kubernetes/manifests.go index a3efdc561..3edb7bbc3 100644 --- a/cluster/kubernetes/manifests.go +++ b/cluster/kubernetes/manifests.go @@ -11,8 +11,8 @@ type Manifests struct { // FindDefinedServices implementation in files.go -func (c *Manifests) LoadManifests(paths ...string) (map[string]resource.Resource, error) { - return kresource.Load(paths...) +func (c *Manifests) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) { + return kresource.Load(base, first, rest...) } func (c *Manifests) ParseManifests(allDefs []byte) (map[string]resource.Resource, error) { diff --git a/cluster/kubernetes/release.go b/cluster/kubernetes/release.go index 7ca0a64f8..16ca6790d 100644 --- a/cluster/kubernetes/release.go +++ b/cluster/kubernetes/release.go @@ -52,7 +52,7 @@ func (c *Kubectl) connectArgs() []string { return args } -func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncErrors) { +func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError) { f := func(m map[string][]*apiObject, cmd string, args ...string) { objs := m[cmd] if len(objs) == 0 { @@ -64,7 +64,7 @@ func (c *Kubectl) apply(logger log.Logger, cs changeSet) (errs cluster.SyncError for _, obj := range objs { r := bytes.NewReader(obj.Bytes()) if err := c.doCommand(logger, r, args...); err != nil { - errs = append(errs, cluster.SyncError{obj.Resource, err}) + errs = append(errs, cluster.ResourceError{obj.Resource, err}) } } } diff --git a/cluster/kubernetes/resource/load.go b/cluster/kubernetes/resource/load.go index 9fded341a..bf23add03 100644 --- a/cluster/kubernetes/resource/load.go +++ b/cluster/kubernetes/resource/load.go @@ -15,7 +15,8 @@ import ( // Load takes paths to directories or files, and creates an object set // based on the file(s) therein. Resources are named according to the // file content, rather than the file name of directory structure. -func Load(roots ...string) (map[string]resource.Resource, error) { +func Load(base, atLeastOne string, more ...string) (map[string]resource.Resource, error) { + roots := append([]string{atLeastOne}, more...) objs := map[string]resource.Resource{} for _, root := range roots { err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { @@ -32,13 +33,17 @@ func Load(roots ...string) (map[string]resource.Resource, error) { if err != nil { return errors.Wrapf(err, "reading file at %q", path) } - docsInFile, err := ParseMultidoc(bytes, path) + source, err := filepath.Rel(base, path) + if err != nil { + return errors.Wrapf(err, "finding relative path for %q", path) + } + docsInFile, err := ParseMultidoc(bytes, source) if err != nil { return errors.Wrapf(err, "parsing file at %q", path) } for id, obj := range docsInFile { if alreadyDefined, ok := objs[id]; ok { - return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), path) + return fmt.Errorf(`resource '%s' defined more than once (in %s and %s)`, id, alreadyDefined.Source(), source) } objs[id] = obj } diff --git a/cluster/kubernetes/resource/load_test.go b/cluster/kubernetes/resource/load_test.go index 0a57a507f..22ee749fa 100644 --- a/cluster/kubernetes/resource/load_test.go +++ b/cluster/kubernetes/resource/load_test.go @@ -132,7 +132,7 @@ func TestLoadSome(t *testing.T) { if err := testfiles.WriteTestFiles(dir); err != nil { t.Fatal(err) } - objs, err := Load(dir) + objs, err := Load(dir, dir) if err != nil { t.Error(err) } diff --git a/cluster/manifests.go b/cluster/manifests.go index 6d6cbcb94..0f85a704a 100644 --- a/cluster/manifests.go +++ b/cluster/manifests.go @@ -20,8 +20,11 @@ type Manifests interface { // Update the definitions in a manifests bytes according to the // spec given. UpdateDefinition(def []byte, container string, newImageID image.Ref) ([]byte, error) - // Load all the resource manifests under the path given - LoadManifests(paths ...string) (map[string]resource.Resource, error) + // Load all the resource manifests under the path given. `baseDir` + // is used to relativise the paths, which are supplied as absolute + // paths to directories or files; at least one path must be + // supplied. + LoadManifests(baseDir, first string, rest ...string) (map[string]resource.Resource, error) // Parse the manifests given in an exported blob ParseManifests([]byte) (map[string]resource.Resource, error) // UpdatePolicies modifies a manifest to apply the policy update specified diff --git a/cluster/mock.go b/cluster/mock.go index 78a180cc2..5f14f16ea 100644 --- a/cluster/mock.go +++ b/cluster/mock.go @@ -18,7 +18,7 @@ type Mock struct { PublicSSHKeyFunc func(regenerate bool) (ssh.PublicKey, error) FindDefinedServicesFunc func(path string) (map[flux.ResourceID][]string, error) UpdateDefinitionFunc func(def []byte, container string, newImageID image.Ref) ([]byte, error) - LoadManifestsFunc func(paths ...string) (map[string]resource.Resource, error) + LoadManifestsFunc func(base, first string, rest ...string) (map[string]resource.Resource, error) ParseManifestsFunc func([]byte) (map[string]resource.Resource, error) UpdateManifestFunc func(path, resourceID string, f func(def []byte) ([]byte, error)) error UpdatePoliciesFunc func([]byte, policy.Update) ([]byte, error) @@ -57,8 +57,8 @@ func (m *Mock) UpdateDefinition(def []byte, container string, newImageID image.R return m.UpdateDefinitionFunc(def, container, newImageID) } -func (m *Mock) LoadManifests(paths ...string) (map[string]resource.Resource, error) { - return m.LoadManifestsFunc(paths...) +func (m *Mock) LoadManifests(base, first string, rest ...string) (map[string]resource.Resource, error) { + return m.LoadManifestsFunc(base, first, rest...) } func (m *Mock) ParseManifests(def []byte) (map[string]resource.Resource, error) { diff --git a/cluster/sync.go b/cluster/sync.go index 017e3c8f8..9859ee00f 100644 --- a/cluster/sync.go +++ b/cluster/sync.go @@ -11,8 +11,8 @@ import ( // SyncAction represents either the deletion or application (create or // update) of a resource. type SyncAction struct { - Delete resource.Resource // ) one of these - Apply resource.Resource // ) + Delete resource.Resource // ) one of these + Apply resource.Resource // ) } type SyncDef struct { @@ -20,14 +20,14 @@ type SyncDef struct { Actions []SyncAction } -type SyncError struct { +type ResourceError struct { resource.Resource Error error } -type SyncErrors []SyncError +type SyncError []ResourceError -func (err SyncErrors) Error() string { +func (err SyncError) Error() string { var errs []string for _, e := range err { errs = append(errs, e.ResourceID().String()+": "+e.Error.Error()) diff --git a/daemon/daemon_test.go b/daemon/daemon_test.go index a987f0cc6..ca6ab0007 100644 --- a/daemon/daemon_test.go +++ b/daemon/daemon_test.go @@ -291,7 +291,7 @@ func TestDaemon_PolicyUpdate(t *testing.T) { return false } defer co.Clean() - m, err := d.Manifests.LoadManifests(co.ManifestDir()) + m, err := d.Manifests.LoadManifests(co.Dir(), co.ManifestDir()) if err != nil { t.Fatalf("Error: %s", err.Error()) } diff --git a/daemon/loop.go b/daemon/loop.go index 2e7a4db3a..44023e553 100644 --- a/daemon/loop.go +++ b/daemon/loop.go @@ -13,6 +13,7 @@ import ( "context" "github.com/weaveworks/flux" + "github.com/weaveworks/flux/cluster" "github.com/weaveworks/flux/event" "github.com/weaveworks/flux/git" fluxmetrics "github.com/weaveworks/flux/metrics" @@ -187,21 +188,24 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) { } // Get a map of all resources defined in the repo - allResources, err := d.Manifests.LoadManifests(working.ManifestDir()) + allResources, err := d.Manifests.LoadManifests(working.Dir(), working.ManifestDir()) if err != nil { return errors.Wrap(err, "loading resources from repo") } + var syncErrors map[string]string // TODO supply deletes argument from somewhere (command-line?) if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil { logger.Log("err", err) - // TODO(michael): we should distinguish between "fully mostly - // succeeded" and "failed utterly", since we want to abandon - // this and not move the tag (and send a SyncFail event - // upstream?), if the latter. For now, it's presumed that any - // error returned is at worst a minor, partial failure (e.g., - // a small number of resources failed to sync, for unimportant - // reasons) + switch syncerr := err.(type) { + case cluster.SyncError: + syncErrors = map[string]string{} + for _, e := range syncerr { + syncErrors[fmt.Sprintf("%s (%s)", e.ResourceID(), e.Source())] = e.Error.Error() + } + default: + return err + } } // update notes and emit events for applied commits @@ -232,9 +236,9 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) { } else { ctx, cancel := context.WithTimeout(ctx, gitOpTimeout) changedFiles, err := working.ChangedFiles(ctx, oldTagRev) - if err == nil { + if err == nil && len(changedFiles) > 0 { // We had some changed files, we're syncing a diff - changedResources, err = d.Manifests.LoadManifests(changedFiles...) + changedResources, err = d.Manifests.LoadManifests(working.Dir(), changedFiles[0], changedFiles[1:]...) } cancel() if err != nil { @@ -361,6 +365,7 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) { Commits: cs, InitialSync: initialSync, Includes: includes, + Errors: syncErrors, }, }); err != nil { logger.Log("err", err) diff --git a/event/event.go b/event/event.go index 6cd26ee9e..ee2a6f44d 100644 --- a/event/event.go +++ b/event/event.go @@ -204,6 +204,8 @@ type SyncEventMetadata struct { // policy changes, and "other" (meaning things we didn't commit // ourselves) Includes map[string]bool `json:"includes,omitempty"` + // Per-resource errors + Errors map[string]string `json:"errors,omitempty"` // `true` if we have no record of having synced before InitialSync bool `json:"initialSync,omitempty"` } diff --git a/git/working.go b/git/working.go index 94e95e37f..b81370eb8 100644 --- a/git/working.go +++ b/git/working.go @@ -84,7 +84,12 @@ func (c *Checkout) Clean() { } } -// ManifestDir returns a path to where the files are +// Dir returns the path to the repo +func (c *Checkout) Dir() string { + return c.dir +} + +// ManifestDir returns the path to the manifests files func (c *Checkout) ManifestDir() string { return filepath.Join(c.dir, c.config.Path) } diff --git a/sync/sync_test.go b/sync/sync_test.go index 183002da3..37eb99bcc 100644 --- a/sync/sync_test.go +++ b/sync/sync_test.go @@ -34,7 +34,7 @@ func TestSync(t *testing.T) { manifests := &kubernetes.Manifests{} var clus cluster.Cluster = &syncCluster{mockCluster, map[string][]byte{}} - resources, err := manifests.LoadManifests(checkout.ManifestDir()) + resources, err := manifests.LoadManifests(checkout.Dir(), checkout.ManifestDir()) if err != nil { t.Fatal(err) } @@ -55,7 +55,7 @@ func TestSync(t *testing.T) { break } - resources, err = manifests.LoadManifests(checkout.ManifestDir()) + resources, err = manifests.LoadManifests(checkout.Dir(), checkout.ManifestDir()) if err != nil { t.Fatal(err) } @@ -244,7 +244,7 @@ func checkClusterMatchesFiles(t *testing.T, m cluster.Manifests, c cluster.Clust if err != nil { t.Fatal(err) } - files, err := m.LoadManifests(dir) + files, err := m.LoadManifests(dir, dir) if err != nil { t.Fatal(err) } From cdca84275ffe6d47507f8ab5ca78cef9fae74b4e Mon Sep 17 00:00:00 2001 From: Michael Bridgen Date: Tue, 6 Mar 2018 17:54:32 +0000 Subject: [PATCH 3/3] Supply more structured info in sync errors Instead of using a map, which forces us to give a glommed-together string as the key (so that it can be JSONed), use a slice of structs with the unglommed data. --- daemon/loop.go | 9 ++++++--- event/event.go | 8 +++++++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/daemon/loop.go b/daemon/loop.go index 44023e553..a298ce77f 100644 --- a/daemon/loop.go +++ b/daemon/loop.go @@ -193,15 +193,18 @@ func (d *Daemon) doSync(logger log.Logger) (retErr error) { return errors.Wrap(err, "loading resources from repo") } - var syncErrors map[string]string + var syncErrors []event.ResourceError // TODO supply deletes argument from somewhere (command-line?) if err := fluxsync.Sync(d.Manifests, allResources, d.Cluster, false, logger); err != nil { logger.Log("err", err) switch syncerr := err.(type) { case cluster.SyncError: - syncErrors = map[string]string{} for _, e := range syncerr { - syncErrors[fmt.Sprintf("%s (%s)", e.ResourceID(), e.Source())] = e.Error.Error() + syncErrors = append(syncErrors, event.ResourceError{ + ID: e.ResourceID(), + Path: e.Source(), + Error: e.Error.Error(), + }) } default: return err diff --git a/event/event.go b/event/event.go index ee2a6f44d..ff031c1ac 100644 --- a/event/event.go +++ b/event/event.go @@ -195,6 +195,12 @@ type Commit struct { Message string `json:"message"` } +type ResourceError struct { + ID flux.ResourceID + Path string + Error string +} + // SyncEventMetadata is the metadata for when new a commit is synced to the cluster type SyncEventMetadata struct { // for parsing old events; Commits is now used in preference @@ -205,7 +211,7 @@ type SyncEventMetadata struct { // ourselves) Includes map[string]bool `json:"includes,omitempty"` // Per-resource errors - Errors map[string]string `json:"errors,omitempty"` + Errors []ResourceError `json:"errors,omitempty"` // `true` if we have no record of having synced before InitialSync bool `json:"initialSync,omitempty"` }