Skip to content

Commit

Permalink
Replace snapshotter in pkg/backends with a GCE cloud lister implement…
Browse files Browse the repository at this point in the history
…ation that is solely used for garbage collection
  • Loading branch information
rramkumar1 committed Oct 18, 2018
1 parent 338b6e6 commit 315742e
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 53 deletions.
44 changes: 14 additions & 30 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,22 @@ package backends
import (
"fmt"
"net/http"
"time"

"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/cloudlist"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/storage"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)

// Backends handles CRUD operations for backends.
type Backends struct {
cloud *gce.GCECloud
snapshotter storage.Snapshotter
namer *utils.Namer
cloud *gce.GCECloud
gceLister cloudlist.Lister
namer *utils.Namer
}

// Backends is a Pool.
Expand All @@ -41,28 +40,19 @@ var _ Pool = (*Backends)(nil)
// NewPool returns a new backend pool.
// - cloud: implements BackendServices
// - namer: procudes names for backends.
// - resyncWithCloud: if true, periodically syncs with cloud resources.
func NewPool(
cloud *gce.GCECloud,
namer *utils.Namer,
resyncWithCloud bool) *Backends {

func NewPool(cloud *gce.GCECloud, namer *utils.Namer) *Backends {
backendPool := &Backends{
cloud: cloud,
namer: namer,
}
if !resyncWithCloud {
backendPool.snapshotter = storage.NewInMemoryPool()
return backendPool
}
keyFunc := func(i interface{}) (string, error) {
bs := i.(*compute.BackendService)
if !namer.NameBelongsToCluster(bs.Name) {
return "", fmt.Errorf("unrecognized name %v", bs.Name)
}
return bs.Name, nil
}
backendPool.snapshotter = storage.NewCloudListingPool("backends", keyFunc, backendPool, 30*time.Second)
backendPool.gceLister = cloudlist.NewGCELister("backends", keyFunc, backendPool)
return backendPool
}

Expand Down Expand Up @@ -99,7 +89,6 @@ func (b *Backends) Create(sp utils.ServicePort, hcLink string) (*composite.Backe
if err := composite.CreateBackendService(be, b.cloud); err != nil {
return nil, err
}
b.snapshotter.Add(name, be)
// Note: We need to perform a GCE call to re-fetch the object we just created
// so that the "Fingerprint" field is filled in. This is needed to update the
// object without error.
Expand All @@ -113,7 +102,6 @@ func (b *Backends) Update(be *composite.BackendService) error {
if err := composite.UpdateBackendService(be, b.cloud); err != nil {
return err
}
b.snapshotter.Add(be.Name, be)
return nil
}

Expand All @@ -133,7 +121,6 @@ func (b *Backends) Get(name string, version meta.Version) (*composite.BackendSer
return nil, err
}
}
b.snapshotter.Add(name, be)
return be, nil
}

Expand All @@ -143,9 +130,6 @@ func (b *Backends) Delete(name string) (err error) {
if utils.IsHTTPErrorCode(err, http.StatusNotFound) {
err = nil
}
if err == nil {
b.snapshotter.Delete(name)
}
}()

glog.V(2).Infof("Deleting backend service %v", name)
Expand Down Expand Up @@ -174,17 +158,17 @@ func (b *Backends) Health(name string) string {
return hs.HealthStatus[0].HealthState
}

// GetLocalSnapshot implements Pool.
func (b *Backends) GetLocalSnapshot() []string {
pool := b.snapshotter.Snapshot()
var keys []string
for name := range pool {
keys = append(keys, name)
// GetManagedBackends implements Pool.
func (b *Backends) GetManagedBackends() ([]string, error) {
names, err := b.gceLister.List()
if err != nil {
return []string{}, fmt.Errorf("error listing controller-managed backend services from cloud: %v", err)
}
return keys
return names, nil
}

// List lists all backends.
// List lists all backends. Note that this function will return all backends, even ones that
// are not managed by this controller.
func (b *Backends) List() ([]interface{}, error) {
// TODO: for consistency with the rest of this sub-package this method
// should return a list of backend ports.
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/ig_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const defaultZone = "zone-a"

func newTestIGLinker(fakeGCE *gce.GCECloud, fakeInstancePool instances.NodePool) *instanceGroupLinker {
fakeInstancePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}})
fakeBackendPool := NewPool(fakeGCE, defaultNamer, false)
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

// Add standard hooks for mocking update calls. Each test can set a different update hook if it chooses to.
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook
Expand Down
10 changes: 5 additions & 5 deletions pkg/backends/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ type Jig struct {
pool Pool
}

func newTestJig(fakeGCE *gce.GCECloud, resyncWithCloud bool) *Jig {
func newTestJig(fakeGCE *gce.GCECloud) *Jig {
fakeHealthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
fakeHealthChecks := healthchecks.NewHealthChecker(fakeHealthCheckProvider, "/", "/healthz", defaultNamer, defaultBackendSvc)
fakeBackendPool := NewPool(fakeGCE, defaultNamer, false)
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer)
fakeInstancePool := instances.NewNodePool(fakeIGs, defaultNamer)
Expand All @@ -58,13 +58,13 @@ func newTestJig(fakeGCE *gce.GCECloud, resyncWithCloud bool) *Jig {
fakeInstancePool: fakeInstancePool,
linker: NewInstanceGroupLinker(fakeInstancePool, fakeBackendPool, defaultNamer),
syncer: NewBackendSyncer(fakeBackendPool, fakeHealthChecks, defaultNamer, false),
pool: NewPool(fakeGCE, defaultNamer, resyncWithCloud),
pool: fakeBackendPool,
}
}

func TestBackendInstanceGroupClobbering(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
jig := newTestJig(fakeGCE, false)
jig := newTestJig(fakeGCE)

sp := utils.ServicePort{NodePort: 80}
_, err := jig.fakeInstancePool.EnsureInstanceGroupsAndPorts(defaultNamer.InstanceGroup(), []int64{sp.NodePort})
Expand Down Expand Up @@ -136,7 +136,7 @@ func TestBackendInstanceGroupClobbering(t *testing.T) {

func TestSyncChaosMonkey(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
jig := newTestJig(fakeGCE, false)
jig := newTestJig(fakeGCE)

sp := utils.ServicePort{NodePort: 8080, Protocol: annotations.ProtocolHTTP}

Expand Down
4 changes: 2 additions & 2 deletions pkg/backends/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ type Pool interface {
Delete(name string) error
// Get the health of a BackendService given its name.
Health(name string) string
// Get a list of BackendService names currently in this pool.
GetLocalSnapshot() []string
// Get a list of BackendService names that are managed by this pool.
GetManagedBackends() ([]string, error)
}

// Syncer is an interface to sync Kubernetes services to GCE BackendServices.
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/neg_linker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

func newTestNEGLinker(fakeNEG negtypes.NetworkEndpointGroupCloud, fakeGCE *gce.GCECloud) *negLinker {
fakeBackendPool := NewPool(fakeGCE, defaultNamer, false)
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

// Add standard hooks for mocking update calls. Each test can set a update different hook if it chooses to.
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaBackendServices.UpdateHook = mock.UpdateAlphaBackendServiceHook
Expand Down
8 changes: 7 additions & 1 deletion pkg/backends/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package backends

import (
"fmt"
"net/http"
"strings"

Expand Down Expand Up @@ -137,7 +138,12 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error {
name := sp.BackendName(s.namer)
knownPorts.Insert(name)
}
backendNames := s.backendPool.GetLocalSnapshot()

backendNames, err := s.backendPool.GetManagedBackends()
if err != nil {
return fmt.Errorf("error getting the names of managed backends: %v", err)
}

for _, name := range backendNames {
if knownPorts.Has(name) {
continue
Expand Down
24 changes: 12 additions & 12 deletions pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ var (
}
)

func newTestSyncer(fakeGCE *gce.GCECloud, poolSyncWithCloud bool) *backendSyncer {
func newTestSyncer(fakeGCE *gce.GCECloud) *backendSyncer {
fakeHealthCheckProvider := healthchecks.NewFakeHealthCheckProvider()
fakeHealthChecks := healthchecks.NewHealthChecker(fakeHealthCheckProvider, "/", "/healthz", defaultNamer, defaultBackendSvc)

fakeBackendPool := NewPool(fakeGCE, defaultNamer, poolSyncWithCloud)
fakeBackendPool := NewPool(fakeGCE, defaultNamer)

syncer := &backendSyncer{
backendPool: fakeBackendPool,
Expand All @@ -82,7 +82,7 @@ func newTestSyncer(fakeGCE *gce.GCECloud, poolSyncWithCloud bool) *backendSyncer

func TestSync(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

testCases := []utils.ServicePort{
{NodePort: 80, Protocol: annotations.ProtocolHTTP},
Expand Down Expand Up @@ -125,7 +125,7 @@ func TestSync(t *testing.T) {

func TestSyncUpdateHTTPS(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

p := utils.ServicePort{NodePort: 3000, Protocol: annotations.ProtocolHTTP}
syncer.Sync([]utils.ServicePort{p})
Expand Down Expand Up @@ -169,7 +169,7 @@ func TestSyncUpdateHTTPS(t *testing.T) {

func TestSyncUpdateHTTP2(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

p := utils.ServicePort{NodePort: 3000, Protocol: annotations.ProtocolHTTP}
syncer.Sync([]utils.ServicePort{p})
Expand Down Expand Up @@ -213,7 +213,7 @@ func TestSyncUpdateHTTP2(t *testing.T) {

func TestGC(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

svcNodePorts := []utils.ServicePort{
{NodePort: 81, Protocol: annotations.ProtocolHTTP},
Expand Down Expand Up @@ -339,7 +339,7 @@ func TestSyncQuota(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

bsCreated := 0
quota := len(tc.newPorts)
Expand Down Expand Up @@ -390,7 +390,7 @@ func TestSyncNEG(t *testing.T) {
// Convert a BackendPool from non-NEG to NEG.
// Expect the old BackendServices to be GC'ed
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

svcPort := utils.ServicePort{NodePort: 81, Protocol: annotations.ProtocolHTTP}
if err := syncer.Sync([]utils.ServicePort{svcPort}); err != nil {
Expand Down Expand Up @@ -438,7 +438,7 @@ func TestSyncNEG(t *testing.T) {

func TestShutdown(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

// Sync a backend and verify that it doesn't exist after Shutdown()
syncer.Sync([]utils.ServicePort{{NodePort: 80}})
Expand Down Expand Up @@ -476,7 +476,7 @@ func TestApplyProbeSettingsToHC(t *testing.T) {

func TestEnsureBackendServiceProtocol(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

svcPorts := []utils.ServicePort{
{NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}},
Expand Down Expand Up @@ -520,7 +520,7 @@ func TestEnsureBackendServiceProtocol(t *testing.T) {

func TestEnsureBackendServiceDescription(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

svcPorts := []utils.ServicePort{
{NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}},
Expand Down Expand Up @@ -558,7 +558,7 @@ func TestEnsureBackendServiceDescription(t *testing.T) {

func TestEnsureBackendServiceHealthCheckLink(t *testing.T) {
fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE, false)
syncer := newTestSyncer(fakeGCE)

p := utils.ServicePort{NodePort: 80, Protocol: annotations.ProtocolHTTP, ID: utils.ServicePortID{Port: intstr.FromInt(1)}}
syncer.Sync([]utils.ServicePort{p})
Expand Down
78 changes: 78 additions & 0 deletions pkg/cloudlist/lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cloudlist

import (
"fmt"

"github.com/golang/glog"
)

// Lister is an interface to list the names of one specific object from the cloud.
type Lister interface {
// List returns the names of one specific object from the cloud.
List() ([]string, error)
}

type keyFunc func(interface{}) (string, error)

type objectLister interface {
List() ([]interface{}, error)
}

// GCECLister lists the names of a single GCE object.
type GCELister struct {
// name is used to distinguish different listers in the logs.
name string
// An interface that lists objects of a specific type from GCE.
objectLister objectLister
// A function capable of producing a key for a given object.
keyGetter keyFunc
}

// List implements Lister.
func (c *GCELister) List() ([]string, error) {
glog.V(4).Infof("GCELister %q is pulling objects", c.name)

items, err := c.objectLister.List()
if err != nil {
return []string{}, fmt.Errorf("Failed to list %q: %v", c.name, err)
}

objectNames := make([]string, 0)
for i := range items {
key, err := c.keyGetter(items[i])
if err != nil {
glog.V(5).Infof("GCELister %q failed to extract key from object %+v: %v", c.name, i, err)
continue
}
objectNames = append(objectNames, key)
}

return objectNames, nil
}

// NewGCELister lists the names of a specific object from GCE.
func NewGCELister(name string, k keyFunc, objectLister objectLister) Lister {
gceLister := &GCELister{
name: name,
objectLister: objectLister,
keyGetter: k,
}
glog.V(4).Infof("Starting GCELister %q", gceLister.name)
return gceLister
}
2 changes: 1 addition & 1 deletion pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewLoadBalancerController(
})
healthChecker := healthchecks.NewHealthChecker(ctx.Cloud, ctx.HealthCheckPath, ctx.DefaultBackendHealthCheckPath, ctx.ClusterNamer, ctx.DefaultBackendSvcPortID.Service)
instancePool := instances.NewNodePool(ctx.Cloud, ctx.ClusterNamer)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer, true)
backendPool := backends.NewPool(ctx.Cloud, ctx.ClusterNamer)
lbc := LoadBalancerController{
ctx: ctx,
ingLister: utils.StoreToIngressLister{Store: ctx.IngressInformer.GetStore()},
Expand Down

0 comments on commit 315742e

Please sign in to comment.