Skip to content
This repository has been archived by the owner on Nov 1, 2022. It is now read-only.

Commit

Permalink
Merge pull request #869 from weaveworks/issue/868-cannot-read-cronjob
Browse files Browse the repository at this point in the history
Separate filtering into pre- and post- phases
  • Loading branch information
squaremo authored Dec 29, 2017
2 parents 1d034df + d1ca57f commit 687e1f7
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 112 deletions.
84 changes: 39 additions & 45 deletions release/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,94 +59,88 @@ func (rc *ReleaseContext) WriteUpdates(updates []*update.ControllerUpdate) error
// ---

// SelectServices finds the services that exist both in the definition
// files and the running platform.
//
// `ServiceFilter`s can be provided to filter the found services.
// Be careful about the ordering of the filters. Filters that are earlier
// in the slice will have higher priority (they are run first).
func (rc *ReleaseContext) SelectServices(results update.Result, filters ...update.ControllerFilter) ([]*update.ControllerUpdate, error) {
defined, err := rc.FindDefinedServices()
// files and the running platform. `ControllerFilter`s can be provided
// to filter the controllers so found, either before (`prefilters`) or
// after (`postfilters`) consulting the cluster.
func (rc *ReleaseContext) SelectServices(results update.Result, prefilters, postfilters []update.ControllerFilter) ([]*update.ControllerUpdate, error) {

// Start with all the controllers that are defined in the repo.
allDefined, err := rc.FindDefinedServices()
if err != nil {
return nil, err
}

var ids []flux.ResourceID
definedMap := map[flux.ResourceID]*update.ControllerUpdate{}
for _, s := range defined {
ids = append(ids, s.ResourceID)
definedMap[s.ResourceID] = s
// Apply prefilters to select the controllers that we'll ask the
// cluster about.
var toAskClusterAbout []flux.ResourceID
for _, s := range allDefined {
res := s.Filter(prefilters...)
if res.Error == "" {
// Give these a default value, in case we don't find them
// in the cluster.
results[s.ResourceID] = update.ControllerResult{
Status: update.ReleaseStatusSkipped,
Error: update.NotInCluster,
}
toAskClusterAbout = append(toAskClusterAbout, s.ResourceID)
} else {
results[s.ResourceID] = res
}
}

// Correlate with services in running system.
services, err := rc.cluster.SomeControllers(ids)
// Ask the cluster about those that we're still interested in
definedAndRunning, err := rc.cluster.SomeControllers(toAskClusterAbout)
if err != nil {
return nil, err
}

var forPostFiltering []*update.ControllerUpdate
// Compare defined vs running
var updates []*update.ControllerUpdate
for _, s := range services {
update, ok := definedMap[s.ID]
for _, s := range definedAndRunning {
update, ok := allDefined[s.ID]
if !ok {
// Found running service, but not defined...
continue
// A contradiction: we asked only about defined
// controllers, and got a controller that is not
// defined.
return nil, fmt.Errorf("controller %s was requested and is running, but is not defined", s.ID)
}
update.Controller = s
updates = append(updates, update)
delete(definedMap, s.ID)
forPostFiltering = append(forPostFiltering, update)
}

// Filter both updates ...
var filteredUpdates []*update.ControllerUpdate
for _, s := range updates {
fr := s.Filter(filters...)
for _, s := range forPostFiltering {
fr := s.Filter(postfilters...)
results[s.ResourceID] = fr
if fr.Status == update.ReleaseStatusSuccess || fr.Status == "" {
filteredUpdates = append(filteredUpdates, s)
}
}

// ... and missing services
filteredDefined := map[flux.ResourceID]*update.ControllerUpdate{}
for k, s := range definedMap {
fr := s.Filter(filters...)
results[s.ResourceID] = fr
if fr.Status != update.ReleaseStatusIgnored {
filteredDefined[k] = s
}
}

// Mark anything left over as skipped
for id, _ := range filteredDefined {
results[id] = update.ControllerResult{
Status: update.ReleaseStatusSkipped,
Error: update.NotInCluster,
}
}
return filteredUpdates, nil
}

func (rc *ReleaseContext) FindDefinedServices() ([]*update.ControllerUpdate, error) {
func (rc *ReleaseContext) FindDefinedServices() (map[flux.ResourceID]*update.ControllerUpdate, error) {
rc.repo.RLock()
defer rc.repo.RUnlock()
services, err := rc.manifests.FindDefinedServices(rc.repo.ManifestDir())
if err != nil {
return nil, err
}

var defined []*update.ControllerUpdate
var defined = map[flux.ResourceID]*update.ControllerUpdate{}
for id, paths := range services {
switch len(paths) {
case 1:
def, err := ioutil.ReadFile(paths[0])
if err != nil {
return nil, err
}
defined = append(defined, &update.ControllerUpdate{
defined[id] = &update.ControllerUpdate{
ResourceID: id,
ManifestPath: paths[0],
ManifestBytes: def,
})
}
default:
return nil, fmt.Errorf("multiple resource files found for service %s: %s", id, strings.Join(paths, ", "))
}
Expand Down
65 changes: 32 additions & 33 deletions release/releaser_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package release

import (
"encoding/json"
"reflect"
"testing"
"time"
Expand Down Expand Up @@ -94,7 +95,7 @@ var (
CreatedAt: timeNow,
},
{
ID: canonSidecarRef,
ID: newSidecarRef,
CreatedAt: timeNow,
},
{
Expand All @@ -106,23 +107,31 @@ var (
mockManifests = &kubernetes.Manifests{}
)

func setup(t *testing.T) (*git.Checkout, func()) {
return gittest.Checkout(t)
}

func Test_FilterLogic(t *testing.T) {
mockCluster := &cluster.Mock{
func mockCluster(running ...cluster.Controller) *cluster.Mock {
return &cluster.Mock{
AllServicesFunc: func(string) ([]cluster.Controller, error) {
return allSvcs, nil
return running, nil
},
SomeServicesFunc: func([]flux.ResourceID) ([]cluster.Controller, error) {
return []cluster.Controller{
hwSvc,
lockedSvc,
}, nil
SomeServicesFunc: func(ids []flux.ResourceID) ([]cluster.Controller, error) {
var res []cluster.Controller
for _, id := range ids {
for _, svc := range running {
if id == svc.ID {
res = append(res, svc)
}
}
}
return res, nil
},
}
}

func setup(t *testing.T) (*git.Checkout, func()) {
return gittest.Checkout(t)
}

func Test_FilterLogic(t *testing.T) {
cluster := mockCluster(hwSvc, lockedSvc) // no testsvc in cluster, but it _is_ in repo
notInRepoService := "default:deployment/notInRepo"
notInRepoSpec, _ := update.ParseResourceSpec(notInRepoService)
for _, tst := range []struct {
Expand All @@ -132,7 +141,7 @@ func Test_FilterLogic(t *testing.T) {
}{
// ignored if: excluded OR not included OR not correct image.
{
Name: "not included",
Name: "include specific service",
Spec: update.ReleaseSpec{
ServiceSpecs: []update.ResourceSpec{hwSvcSpec},
ImageSpec: update.ImageSpecLatest,
Expand Down Expand Up @@ -165,7 +174,7 @@ func Test_FilterLogic(t *testing.T) {
},
},
}, {
Name: "excluded",
Name: "exclude specific service",
Spec: update.ReleaseSpec{
ServiceSpecs: []update.ResourceSpec{update.ResourceSpecAll},
ImageSpec: update.ImageSpecLatest,
Expand Down Expand Up @@ -198,7 +207,7 @@ func Test_FilterLogic(t *testing.T) {
},
},
}, {
Name: "not image",
Name: "update specific image",
Spec: update.ReleaseSpec{
ServiceSpecs: []update.ResourceSpec{update.ResourceSpecAll},
ImageSpec: update.ImageSpecFromRef(newHwRef),
Expand All @@ -221,7 +230,7 @@ func Test_FilterLogic(t *testing.T) {
Error: update.DifferentImage,
},
flux.MustParseResourceID("default:deployment/test-service"): update.ControllerResult{
Status: update.ReleaseStatusIgnored,
Status: update.ReleaseStatusSkipped,
Error: update.NotInCluster,
},
},
Expand Down Expand Up @@ -327,7 +336,7 @@ func Test_FilterLogic(t *testing.T) {
checkout, cleanup := setup(t)
defer cleanup()
testRelease(t, tst.Name, &ReleaseContext{
cluster: mockCluster,
cluster: cluster,
manifests: mockManifests,
registry: mockRegistry,
repo: checkout,
Expand All @@ -336,19 +345,7 @@ func Test_FilterLogic(t *testing.T) {
}

func Test_ImageStatus(t *testing.T) {
mockCluster := &cluster.Mock{
AllServicesFunc: func(string) ([]cluster.Controller, error) {
return allSvcs, nil
},
SomeServicesFunc: func([]flux.ResourceID) ([]cluster.Controller, error) {
return []cluster.Controller{
hwSvc,
lockedSvc,
testSvc,
}, nil
},
}

cluster := mockCluster(hwSvc, lockedSvc, testSvc)
upToDateRegistry := &registryMock.Registry{
Images: []image.Info{
{
Expand Down Expand Up @@ -417,7 +414,7 @@ func Test_ImageStatus(t *testing.T) {
checkout, cleanup := setup(t)
defer cleanup()
ctx := &ReleaseContext{
cluster: mockCluster,
cluster: cluster,
manifests: mockManifests,
repo: checkout,
registry: upToDateRegistry,
Expand All @@ -432,6 +429,8 @@ func testRelease(t *testing.T, name string, ctx *ReleaseContext, spec update.Rel
t.Fatal(err)
}
if !reflect.DeepEqual(expected, results) {
t.Errorf("%s - expected:\n%#v, got:\n%#v", name, expected, results)
exp, _ := json.Marshal(expected)
got, _ := json.Marshal(results)
t.Errorf("%s\n--- expected ---\n%s\n--- got ---\n%s\n", name, string(exp), string(got))
}
}
13 changes: 3 additions & 10 deletions update/automated.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,12 @@ func (a *Automated) Add(service flux.ResourceID, container cluster.Container, im
}

func (a *Automated) CalculateRelease(rc ReleaseContext, logger log.Logger) ([]*ControllerUpdate, Result, error) {
filters, err := a.filters(rc)
if err != nil {
return nil, nil, err
prefilters := []ControllerFilter{
&IncludeFilter{a.serviceIDs()},
}

result := Result{}
updates, err := rc.SelectServices(result, filters...)
updates, err := rc.SelectServices(result, prefilters, nil)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -73,12 +72,6 @@ func (a *Automated) Images() []image.Ref {
return images
}

func (a *Automated) filters(rc ReleaseContext) ([]ControllerFilter, error) {
return []ControllerFilter{
&IncludeFilter{a.serviceIDs()},
}, nil
}

func (a *Automated) markSkipped(results Result) {
for _, v := range a.serviceIDs() {
if _, ok := results[v]; !ok {
Expand Down
45 changes: 21 additions & 24 deletions update/release.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func ParseReleaseKind(s string) (ReleaseKind, error) {
const UserAutomated = "<automated>"

type ReleaseContext interface {
SelectServices(Result, ...ControllerFilter) ([]*ControllerUpdate, error)
SelectServices(Result, []ControllerFilter, []ControllerFilter) ([]*ControllerUpdate, error)
ServicesWithPolicies() (policy.ResourceMap, error)
Registry() registry.Registry
Manifests() cluster.Manifests
Expand Down Expand Up @@ -109,30 +109,17 @@ func (s ReleaseSpec) CommitMessage() string {
// repo. Fill in the release results along the way.
func (s ReleaseSpec) selectServices(rc ReleaseContext, results Result) ([]*ControllerUpdate, error) {
// Build list of filters
filtList, err := s.filters(rc)
prefilters, postfilters, err := s.filters(rc)
if err != nil {
return nil, err
}
// Find and filter services
return rc.SelectServices(
results,
filtList...,
)
return rc.SelectServices(results, prefilters, postfilters)
}

// filters converts a ReleaseSpec (and Lock config) into ServiceFilters
func (s ReleaseSpec) filters(rc ReleaseContext) ([]ControllerFilter, error) {
// Image filter
var filtList []ControllerFilter
if s.ImageSpec != ImageSpecLatest {
id, err := image.ParseRef(s.ImageSpec.String())
if err != nil {
return nil, err
}
filtList = append(filtList, &SpecificImageFilter{id})
}
func (s ReleaseSpec) filters(rc ReleaseContext) ([]ControllerFilter, []ControllerFilter, error) {
var prefilters, postfilters []ControllerFilter

// Service filter
ids := []flux.ResourceID{}
for _, s := range s.ServiceSpecs {
if s == ResourceSpecAll {
Expand All @@ -142,27 +129,37 @@ func (s ReleaseSpec) filters(rc ReleaseContext) ([]ControllerFilter, error) {
}
id, err := flux.ParseResourceID(string(s))
if err != nil {
return nil, err
return nil, nil, err
}
ids = append(ids, id)
}
if len(ids) > 0 {
filtList = append(filtList, &IncludeFilter{ids})
prefilters = append(prefilters, &IncludeFilter{ids})
}

// Exclude filter
if len(s.Excludes) > 0 {
filtList = append(filtList, &ExcludeFilter{s.Excludes})
prefilters = append(prefilters, &ExcludeFilter{s.Excludes})
}

// Image filter
if s.ImageSpec != ImageSpecLatest {
id, err := image.ParseRef(s.ImageSpec.String())
if err != nil {
return nil, nil, err
}
postfilters = append(postfilters, &SpecificImageFilter{id})
}

// Locked filter
services, err := rc.ServicesWithPolicies()
if err != nil {
return nil, err
return nil, nil, err
}
lockedSet := services.OnlyWithPolicy(policy.Locked)
filtList = append(filtList, &LockedFilter{lockedSet.ToSlice()})
return filtList, nil
postfilters = append(postfilters, &LockedFilter{lockedSet.ToSlice()})

return prefilters, postfilters, nil
}

func (s ReleaseSpec) markSkipped(results Result) {
Expand Down

0 comments on commit 687e1f7

Please sign in to comment.