From 315742e7bed7312abb9508c4a9aec12722c1d925 Mon Sep 17 00:00:00 2001 From: Rohit Ramkumar Date: Thu, 18 Oct 2018 11:52:13 -0700 Subject: [PATCH] Replace snapshotter in pkg/backends with a GCE cloud lister implementation that is solely used for garbage collection --- pkg/backends/backends.go | 44 ++++++------------ pkg/backends/ig_linker_test.go | 2 +- pkg/backends/integration_test.go | 10 ++-- pkg/backends/interfaces.go | 4 +- pkg/backends/neg_linker_test.go | 2 +- pkg/backends/syncer.go | 8 +++- pkg/backends/syncer_test.go | 24 +++++----- pkg/cloudlist/lister.go | 78 ++++++++++++++++++++++++++++++++ pkg/controller/controller.go | 2 +- 9 files changed, 121 insertions(+), 53 deletions(-) create mode 100644 pkg/cloudlist/lister.go diff --git a/pkg/backends/backends.go b/pkg/backends/backends.go index dc4948c68d..b7c30ebdfe 100644 --- a/pkg/backends/backends.go +++ b/pkg/backends/backends.go @@ -16,13 +16,12 @@ package backends import ( "fmt" "net/http" - "time" "github.com/golang/glog" compute "google.golang.org/api/compute/v1" "k8s.io/ingress-gce/pkg/backends/features" + "k8s.io/ingress-gce/pkg/cloudlist" "k8s.io/ingress-gce/pkg/composite" - "k8s.io/ingress-gce/pkg/storage" "k8s.io/ingress-gce/pkg/utils" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" @@ -30,9 +29,9 @@ import ( // Backends handles CRUD operations for backends. type Backends struct { - cloud *gce.GCECloud - snapshotter storage.Snapshotter - namer *utils.Namer + cloud *gce.GCECloud + gceLister cloudlist.Lister + namer *utils.Namer } // Backends is a Pool. @@ -41,20 +40,11 @@ var _ Pool = (*Backends)(nil) // NewPool returns a new backend pool. // - cloud: implements BackendServices // - namer: procudes names for backends. -// - resyncWithCloud: if true, periodically syncs with cloud resources. -func NewPool( - cloud *gce.GCECloud, - namer *utils.Namer, - resyncWithCloud bool) *Backends { - +func NewPool(cloud *gce.GCECloud, namer *utils.Namer) *Backends { backendPool := &Backends{ cloud: cloud, namer: namer, } - if !resyncWithCloud { - backendPool.snapshotter = storage.NewInMemoryPool() - return backendPool - } keyFunc := func(i interface{}) (string, error) { bs := i.(*compute.BackendService) if !namer.NameBelongsToCluster(bs.Name) { @@ -62,7 +52,7 @@ func NewPool( } return bs.Name, nil } - backendPool.snapshotter = storage.NewCloudListingPool("backends", keyFunc, backendPool, 30*time.Second) + backendPool.gceLister = cloudlist.NewGCELister("backends", keyFunc, backendPool) return backendPool } @@ -99,7 +89,6 @@ func (b *Backends) Create(sp utils.ServicePort, hcLink string) (*composite.Backe if err := composite.CreateBackendService(be, b.cloud); err != nil { return nil, err } - b.snapshotter.Add(name, be) // Note: We need to perform a GCE call to re-fetch the object we just created // so that the "Fingerprint" field is filled in. This is needed to update the // object without error. @@ -113,7 +102,6 @@ func (b *Backends) Update(be *composite.BackendService) error { if err := composite.UpdateBackendService(be, b.cloud); err != nil { return err } - b.snapshotter.Add(be.Name, be) return nil } @@ -133,7 +121,6 @@ func (b *Backends) Get(name string, version meta.Version) (*composite.BackendSer return nil, err } } - b.snapshotter.Add(name, be) return be, nil } @@ -143,9 +130,6 @@ func (b *Backends) Delete(name string) (err error) { if utils.IsHTTPErrorCode(err, http.StatusNotFound) { err = nil } - if err == nil { - b.snapshotter.Delete(name) - } }() glog.V(2).Infof("Deleting backend service %v", name) @@ -174,17 +158,17 @@ func (b *Backends) Health(name string) string { return hs.HealthStatus[0].HealthState } -// GetLocalSnapshot implements Pool. -func (b *Backends) GetLocalSnapshot() []string { - pool := b.snapshotter.Snapshot() - var keys []string - for name := range pool { - keys = append(keys, name) +// GetManagedBackends implements Pool. +func (b *Backends) GetManagedBackends() ([]string, error) { + names, err := b.gceLister.List() + if err != nil { + return []string{}, fmt.Errorf("error listing controller-managed backend services from cloud: %v", err) } - return keys + return names, nil } -// List lists all backends. +// List lists all backends. Note that this function will return all backends, even ones that +// are not managed by this controller. func (b *Backends) List() ([]interface{}, error) { // TODO: for consistency with the rest of this sub-package this method // should return a list of backend ports. diff --git a/pkg/backends/ig_linker_test.go b/pkg/backends/ig_linker_test.go index 0344948472..20d45fc81b 100644 --- a/pkg/backends/ig_linker_test.go +++ b/pkg/backends/ig_linker_test.go @@ -34,7 +34,7 @@ const defaultZone = "zone-a" func newTestIGLinker(fakeGCE *gce.GCECloud, fakeInstancePool instances.NodePool) *instanceGroupLinker { fakeInstancePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) - fakeBackendPool := NewPool(fakeGCE, defaultNamer, false) + fakeBackendPool := NewPool(fakeGCE, defaultNamer) // Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to. (fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook diff --git a/pkg/backends/integration_test.go b/pkg/backends/integration_test.go index 8f4a83bcff..17d47a0dbb 100644 --- a/pkg/backends/integration_test.go +++ b/pkg/backends/integration_test.go @@ -40,10 +40,10 @@ type Jig struct { pool Pool } -func newTestJig(fakeGCE *gce.GCECloud, resyncWithCloud bool) *Jig { +func newTestJig(fakeGCE *gce.GCECloud) *Jig { fakeHealthCheckProvider := healthchecks.NewFakeHealthCheckProvider() fakeHealthChecks := healthchecks.NewHealthChecker(fakeHealthCheckProvider, "/", "/healthz", defaultNamer, defaultBackendSvc) - fakeBackendPool := NewPool(fakeGCE, defaultNamer, false) + fakeBackendPool := NewPool(fakeGCE, defaultNamer) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer) fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer) @@ -58,13 +58,13 @@ func newTestJig(fakeGCE *gce.GCECloud, resyncWithCloud bool) *Jig { fakeInstancePool: fakeInstancePool, linker: NewInstanceGroupLinker(fakeInstancePool, fakeBackendPool, defaultNamer), syncer: NewBackendSyncer(fakeBackendPool, fakeHealthChecks, defaultNamer, false), - pool: NewPool(fakeGCE, defaultNamer, resyncWithCloud), + pool: fakeBackendPool, } } func TestBackendInstanceGroupClobbering(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - jig := newTestJig(fakeGCE, false) + jig := newTestJig(fakeGCE) sp := utils.ServicePort{NodePort: 80} _, err := jig.fakeInstancePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort}) @@ -136,7 +136,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) { func TestSyncChaosMonkey(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - jig := newTestJig(fakeGCE, false) + jig := newTestJig(fakeGCE) sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP} diff --git a/pkg/backends/interfaces.go b/pkg/backends/interfaces.go index 10af4ceb51..8c1515bde6 100644 --- a/pkg/backends/interfaces.go +++ b/pkg/backends/interfaces.go @@ -44,8 +44,8 @@ type Pool interface { Delete(name string) error // Get the health of a BackendService given its name. Health(name string) string - // Get a list of BackendService names currently in this pool. - GetLocalSnapshot() []string + // Get a list of BackendService names that are managed by this pool. + GetManagedBackends() ([]string, error) } // Syncer is an interface to sync Kubernetes services to GCE BackendServices. diff --git a/pkg/backends/neg_linker_test.go b/pkg/backends/neg_linker_test.go index 609387f690..f3cc658f61 100644 --- a/pkg/backends/neg_linker_test.go +++ b/pkg/backends/neg_linker_test.go @@ -28,7 +28,7 @@ import ( ) func newTestNEGLinker(fakeNEG negtypes.NetworkEndpointGroupCloud, fakeGCE *gce.GCECloud) *negLinker { - fakeBackendPool := NewPool(fakeGCE, defaultNamer, false) + fakeBackendPool := NewPool(fakeGCE, defaultNamer) // Add standard hooks for mocking update calls. Each test can set a update different hook if it chooses to. (fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook diff --git a/pkg/backends/syncer.go b/pkg/backends/syncer.go index d94841e469..af85f8c7b8 100644 --- a/pkg/backends/syncer.go +++ b/pkg/backends/syncer.go @@ -14,6 +14,7 @@ limitations under the License. package backends import ( + "fmt" "net/http" "strings" @@ -137,7 +138,12 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error { name := sp.BackendName(s.namer) knownPorts.Insert(name) } - backendNames := s.backendPool.GetLocalSnapshot() + + backendNames, err := s.backendPool.GetManagedBackends() + if err != nil { + return fmt.Errorf("error getting the names of managed backends: %v", err) + } + for _, name := range backendNames { if knownPorts.Has(name) { continue diff --git a/pkg/backends/syncer_test.go b/pkg/backends/syncer_test.go index 9b23e05334..bd96c0fefa 100644 --- a/pkg/backends/syncer_test.go +++ b/pkg/backends/syncer_test.go @@ -56,11 +56,11 @@ var ( } ) -func newTestSyncer(fakeGCE *gce.GCECloud, poolSyncWithCloud bool) *backendSyncer { +func newTestSyncer(fakeGCE *gce.GCECloud) *backendSyncer { fakeHealthCheckProvider := healthchecks.NewFakeHealthCheckProvider() fakeHealthChecks := healthchecks.NewHealthChecker(fakeHealthCheckProvider, "/", "/healthz", defaultNamer, defaultBackendSvc) - fakeBackendPool := NewPool(fakeGCE, defaultNamer, poolSyncWithCloud) + fakeBackendPool := NewPool(fakeGCE, defaultNamer) syncer := &backendSyncer{ backendPool: fakeBackendPool, @@ -82,7 +82,7 @@ func newTestSyncer(fakeGCE *gce.GCECloud, poolSyncWithCloud bool) *backendSyncer func TestSync(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) testCases := []utils.ServicePort{ {NodePort: 80, Protocol: annotations.ProtocolHTTP}, @@ -125,7 +125,7 @@ func TestSync(t *testing.T) { func TestSyncUpdateHTTPS(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) p := utils.ServicePort{NodePort: 3000, Protocol: annotations.ProtocolHTTP} syncer.Sync([]utils.ServicePort{p}) @@ -169,7 +169,7 @@ func TestSyncUpdateHTTPS(t *testing.T) { func TestSyncUpdateHTTP2(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) p := utils.ServicePort{NodePort: 3000, Protocol: annotations.ProtocolHTTP} syncer.Sync([]utils.ServicePort{p}) @@ -213,7 +213,7 @@ func TestSyncUpdateHTTP2(t *testing.T) { func TestGC(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) svcNodePorts := []utils.ServicePort{ {NodePort: 81, Protocol: annotations.ProtocolHTTP}, @@ -339,7 +339,7 @@ func TestSyncQuota(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) bsCreated := 0 quota := len(tc.newPorts) @@ -390,7 +390,7 @@ func TestSyncNEG(t *testing.T) { // Convert a BackendPool from non-NEG to NEG. // Expect the old BackendServices to be GC'ed fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) svcPort := utils.ServicePort{NodePort: 81, Protocol: annotations.ProtocolHTTP} if err := syncer.Sync([]utils.ServicePort{svcPort}); err != nil { @@ -438,7 +438,7 @@ func TestSyncNEG(t *testing.T) { func TestShutdown(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) // Sync a backend and verify that it doesn't exist after Shutdown() syncer.Sync([]utils.ServicePort{{NodePort: 80}}) @@ -476,7 +476,7 @@ func TestApplyProbeSettingsToHC(t *testing.T) { func TestEnsureBackendServiceProtocol(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) svcPorts := []utils.ServicePort{ {NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}}, @@ -520,7 +520,7 @@ func TestEnsureBackendServiceProtocol(t *testing.T) { func TestEnsureBackendServiceDescription(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) svcPorts := []utils.ServicePort{ {NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}}, @@ -558,7 +558,7 @@ func TestEnsureBackendServiceDescription(t *testing.T) { func TestEnsureBackendServiceHealthCheckLink(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - syncer := newTestSyncer(fakeGCE, false) + syncer := newTestSyncer(fakeGCE) p := utils.ServicePort{NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}} syncer.Sync([]utils.ServicePort{p}) diff --git a/pkg/cloudlist/lister.go b/pkg/cloudlist/lister.go new file mode 100644 index 0000000000..6df13b069e --- /dev/null +++ b/pkg/cloudlist/lister.go @@ -0,0 +1,78 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloudlist + +import ( + "fmt" + + "github.com/golang/glog" +) + +// Lister is an interface to list the names of one specific object from the cloud. +type Lister interface { + // List returns the names of one specific object from the cloud. + List() ([]string, error) +} + +type keyFunc func(interface{}) (string, error) + +type objectLister interface { + List() ([]interface{}, error) +} + +// GCECLister lists the names of a single GCE object. +type GCELister struct { + // name is used to distinguish different listers in the logs. + name string + // An interface that lists objects of a specific type from GCE. + objectLister objectLister + // A function capable of producing a key for a given object. + keyGetter keyFunc +} + +// List implements Lister. +func (c *GCELister) List() ([]string, error) { + glog.V(4).Infof("GCELister %q is pulling objects", c.name) + + items, err := c.objectLister.List() + if err != nil { + return []string{}, fmt.Errorf("Failed to list %q: %v", c.name, err) + } + + objectNames := make([]string, 0) + for i := range items { + key, err := c.keyGetter(items[i]) + if err != nil { + glog.V(5).Infof("GCELister %q failed to extract key from object %+v: %v", c.name, i, err) + continue + } + objectNames = append(objectNames, key) + } + + return objectNames, nil +} + +// NewGCELister lists the names of a specific object from GCE. +func NewGCELister(name string, k keyFunc, objectLister objectLister) Lister { + gceLister := &GCELister{ + name: name, + objectLister: objectLister, + keyGetter: k, + } + glog.V(4).Infof("Starting GCELister %q", gceLister.name) + return gceLister +} diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 18fc82b004..676bdd0a5f 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -99,7 +99,7 @@ func NewLoadBalancerController( }) healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendHealthCheckPath, ctx.ClusterNamer, ctx.DefaultBackendSvcPortID.Service) instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer) - backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer, true) + backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer) lbc := LoadBalancerController{ ctx: ctx, ingLister: utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},