Skip to content

Commit

Permalink
Merge pull request #810 from spencerhance/l7-ilb-backends-gc
Browse files Browse the repository at this point in the history
Refactor Backend GC
  • Loading branch information
k8s-ci-robot authored Aug 13, 2019
2 parents 909fa74 + 462b73c commit 69c12a6
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 50 deletions.
11 changes: 3 additions & 8 deletions pkg/backends/backends.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package backends

import (
"fmt"
"k8s.io/ingress-gce/pkg/flags"
"net/http"

"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta"
Expand Down Expand Up @@ -195,17 +194,13 @@ func (b *Backends) Health(name string, version meta.Version, scope meta.KeyType)
}

// List lists all backends managed by this controller.
func (b *Backends) List() ([]*composite.BackendService, error) {
func (b *Backends) List(key *meta.Key, version meta.Version) ([]*composite.BackendService, error) {
// TODO: for consistency with the rest of this sub-package this method
// should return a list of backend ports.
var backends []*composite.BackendService
var err error
if flags.F.EnableL7Ilb {
backends, err = composite.ListAllBackendServices(b.cloud)
} else {
// TODO: (shance) this needs to be changed to not take a key
backends, err = composite.ListBackendServices(b.cloud, meta.GlobalKey(""), meta.VersionGA)
}

backends, err = composite.ListBackendServices(b.cloud, key, version)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/backends/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type Pool interface {
// Get the health of a BackendService given its name.
Health(name string, version meta.Version, scope meta.KeyType) (string, error)
// Get a list of BackendService names that are managed by this pool.
List() ([]*composite.BackendService, error)
List(key *meta.Key, version meta.Version) ([]*composite.BackendService, error)
}

// Syncer is an interface to sync Kubernetes services to GCE BackendServices.
Expand Down
38 changes: 35 additions & 3 deletions pkg/backends/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/backends/features"
"k8s.io/ingress-gce/pkg/composite"
"k8s.io/ingress-gce/pkg/flags"
"k8s.io/ingress-gce/pkg/healthchecks"
lbfeatures "k8s.io/ingress-gce/pkg/loadbalancers/features"
"k8s.io/ingress-gce/pkg/utils"
"k8s.io/klog"
"k8s.io/legacy-cloud-providers/gce"
Expand Down Expand Up @@ -68,7 +70,6 @@ func (s *backendSyncer) Sync(svcPorts []utils.ServicePort) error {
}
}
return nil

}

// ensureBackendService will update or create a BackendService for the given port.
Expand Down Expand Up @@ -143,11 +144,42 @@ func (s *backendSyncer) GC(svcPorts []utils.ServicePort) error {
return err
}

backends, err := s.backendPool.List()
// Only GC L7 ILB backends if it's enabled
if flags.F.EnableL7Ilb {
// TODO(shance): Refactor out empty key field
key, err := composite.CreateKey(s.cloud, "", meta.Regional)
if err != nil {
return fmt.Errorf("error creating l7 ilb key: %v", err)
}
ilbBackends, err := s.backendPool.List(key, lbfeatures.L7ILBVersions().BackendService)
if err != nil {
return fmt.Errorf("error listing regional backends: %v", err)
}
err = s.gc(ilbBackends, knownPorts)
if err != nil {
return fmt.Errorf("error GCing regional Backends: %v", err)
}
}

// Requires an empty name field until it is refactored out
key, err := composite.CreateKey(s.cloud, "", meta.Global)
if err != nil {
return fmt.Errorf("error creating l7 ilb key: %v", err)
}
backends, err := s.backendPool.List(key, meta.VersionGA)
if err != nil {
return fmt.Errorf("error listing backends: %v", err)
}
err = s.gc(backends, knownPorts)
if err != nil {
return fmt.Errorf("error getting the names of controller-managed backends: %v", err)
return fmt.Errorf("error GCing Backends: %v", err)
}

return nil
}

// gc deletes the provided backends
func (s *backendSyncer) gc(backends []*composite.BackendService, knownPorts sets.String) error {
for _, be := range backends {
var key *meta.Key
name := be.Name
Expand Down
179 changes: 147 additions & 32 deletions pkg/backends/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package backends
import (
"context"
"fmt"
"k8s.io/ingress-gce/pkg/composite"
"net/http"
"reflect"
"testing"
Expand All @@ -39,6 +40,78 @@ import (
"k8s.io/legacy-cloud-providers/gce"
)

// portset helps keep track of service ports during GC tests
type portset struct {
// all represents the set all of service ports in the test
all map[utils.ServicePort]bool
// existing represents what should exist in GCE
existing map[utils.ServicePort]bool
}

func newPortset(ports []utils.ServicePort) *portset {
ps := portset{all: map[utils.ServicePort]bool{}, existing: map[utils.ServicePort]bool{}}
for _, sp := range ports {
ps.all[sp] = true
}
return &ps
}

func (p *portset) existingPorts() []utils.ServicePort {
var result []utils.ServicePort
for sp, _ := range p.existing {
result = append(result, sp)
}
return result
}

// Add to 'existing' from all
func (p *portset) add(ports []utils.ServicePort) error {
for _, sp := range ports {
// Sanity check
if found := p.all[sp]; !found {
return fmt.Errorf("%+v not found in p.all", sp)
}
p.existing[sp] = true
}
return nil
}

// Delete from 'existing'
func (p *portset) del(ports []utils.ServicePort) error {
for _, sp := range ports {
found := p.existing[sp]
if !found {
return fmt.Errorf("%+v not found in p.existing", sp)
}
delete(p.existing, sp)
}
return nil
}

// check() iterates through all and checks that the ports in 'existing' exist in gce, and that those
// that are not in 'existing' do not exist
func (p *portset) check(fakeGCE *gce.Cloud) error {
for sp, _ := range p.all {
_, found := p.existing[sp]
beName := sp.BackendName(defaultNamer)
key, err := composite.CreateKey(fakeGCE, beName, features.ScopeFromServicePort(&sp))
if err != nil {
return fmt.Errorf("Error creating key for backend service %s: %v", beName, err)
}

if found {
if _, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp)); err != nil {
return fmt.Errorf("backend for port %+v should exist, but got: %v", sp.NodePort, err)
}
} else {
if bs, err := composite.GetBackendService(fakeGCE, key, features.VersionFromServicePort(&sp)); !utils.IsHTTPErrorCode(err, http.StatusNotFound) {
return fmt.Errorf("backend for port %+v should not exist, but got %v", sp, bs)
}
}
}
return nil
}

var (
defaultNamer = utils.NewNamer("uid1", "fw1")
defaultBackendSvc = types.NamespacedName{Namespace: "system", Name: "default"}
Expand All @@ -65,6 +138,7 @@ func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer {
backendPool: fakeBackendPool,
healthChecker: fakeHealthChecks,
namer: defaultNamer,
cloud: fakeGCE,
}

probes := map[utils.ServicePort]*api_v1.Probe{{NodePort: 443, Protocol: annotations.ProtocolHTTPS}: existingProbe}
Expand All @@ -76,6 +150,7 @@ func newTestSyncer(fakeGCE *gce.Cloud) *backendSyncer {
(fakeGCE.Compute().(*cloud.MockGCE)).MockBackendServices.UpdateHook = mock.UpdateBackendServiceHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockHealthChecks.UpdateHook = mock.UpdateHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaHealthChecks.UpdateHook = mock.UpdateAlphaHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockAlphaRegionHealthChecks.UpdateHook = mock.UpdateAlphaRegionHealthCheckHook
(fakeGCE.Compute().(*cloud.MockGCE)).MockBetaHealthChecks.UpdateHook = mock.UpdateBetaHealthCheckHook

return syncer
Expand Down Expand Up @@ -212,6 +287,7 @@ func TestSyncUpdateHTTP2(t *testing.T) {
}
}

// Test GC with both ELB and ILBs
func TestGC(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE)
Expand All @@ -221,49 +297,88 @@ func TestGC(t *testing.T) {
{NodePort: 82, Protocol: annotations.ProtocolHTTPS},
{NodePort: 83, Protocol: annotations.ProtocolHTTP},
}
ps := newPortset(svcNodePorts)
if err := ps.add(svcNodePorts); err != nil {
t.Fatal(err)
}

if err := syncer.Sync(svcNodePorts); err != nil {
t.Fatalf("Expected syncer to add backends with error, err: %v", err)
if err := syncer.Sync(ps.existingPorts()); err != nil {
t.Fatalf("syncer.Sync(%+v) = %v, want nil ", ps.existingPorts(), err)
}
// Check that all backends were created.
for _, sp := range svcNodePorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err != nil {
t.Fatalf("Expected to find backend for port %v, err: %v", sp.NodePort, err)
}

if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

// Run a no-op GC (i.e nothing is actually cleaned up)
if err := syncer.GC(svcNodePorts); err != nil {
t.Fatalf("Expected backend pool to GC, err: %v", err)
if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}
// Ensure that no backends were actually deleted
for _, sp := range svcNodePorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err != nil {
t.Fatalf("Expected to find backend for port %v, err: %v", sp.NodePort, err)
}

// Check that nothing was deleted
if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

deletedPorts := []utils.ServicePort{svcNodePorts[1], svcNodePorts[2]}
svcNodePorts = []utils.ServicePort{svcNodePorts[0]}
if err := syncer.GC(svcNodePorts); err != nil {
t.Fatalf("Expected backend pool to GC, err: %v", err)
if err := ps.del([]utils.ServicePort{svcNodePorts[1], svcNodePorts[2]}); err != nil {
t.Fatal(err)
}

// Ensure that 2 out of the 3 backends were deleted
for _, sp := range deletedPorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err == nil {
t.Fatalf("Expected to not find backend for port %v", sp.NodePort)
}
if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}

// Ensure that the 1 remaining backend exists
for _, sp := range svcNodePorts {
beName := sp.BackendName(defaultNamer)
if _, err := fakeGCE.GetGlobalBackendService(beName); err != nil {
t.Fatalf("Expected to find backend for port %v, err: %v", sp.NodePort, err)
}
if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}
}

// Test GC with both ELB and ILBs
func TestGCMixed(t *testing.T) {
fakeGCE := gce.NewFakeGCECloud(gce.DefaultTestClusterValues())
syncer := newTestSyncer(fakeGCE)

svcNodePorts := []utils.ServicePort{
{NodePort: 81, Protocol: annotations.ProtocolHTTP},
{NodePort: 82, Protocol: annotations.ProtocolHTTPS},
{NodePort: 83, Protocol: annotations.ProtocolHTTP},
{NodePort: 84, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true},
{NodePort: 85, Protocol: annotations.ProtocolHTTPS, NEGEnabled: true, L7ILBEnabled: true},
{NodePort: 86, Protocol: annotations.ProtocolHTTP, NEGEnabled: true, L7ILBEnabled: true},
}
ps := newPortset(svcNodePorts)
if err := ps.add(svcNodePorts); err != nil {
t.Fatal(err)
}

if err := syncer.Sync(ps.existingPorts()); err != nil {
t.Fatalf("syncer.Sync(%+v) = %v, want nil ", ps.existingPorts(), err)
}

if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

// Run a no-op GC (i.e nothing is actually cleaned up)
if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}

// Check that nothing was deleted
if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}

if err := ps.del([]utils.ServicePort{svcNodePorts[1], svcNodePorts[2]}); err != nil {
t.Fatal(err)
}

if err := syncer.GC(ps.existingPorts()); err != nil {
t.Fatalf("syncer.GC(%+v) = %v, want nil", ps.existingPorts(), err)
}

if err := ps.check(fakeGCE); err != nil {
t.Fatal(err)
}
}

Expand Down
21 changes: 15 additions & 6 deletions pkg/healthchecks/healthchecks.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,17 +208,16 @@ func (h *HealthChecks) updateILB(oldHC, newHC *HealthCheck) error {
// special case ILB to avoid mucking with stable HC code
cloud := h.cloud.(*gce.Cloud)

compositeType, err := composite.ToHealthCheck(newHC)
mergedHC := mergeHealthcheck(oldHC, newHC).ToAlphaComputeHealthCheck()
compositeType, err := composite.ToHealthCheck(mergedHC)
if err != nil {
return fmt.Errorf("Error converting newHC to composite: %v", err)
}
key, err := composite.CreateKey(cloud, newHC.Name, features.L7ILBScope())
key, err := composite.CreateKey(cloud, mergedHC.Name, features.L7ILBScope())

// Update fields
compositeType.Version = features.L7ILBVersions().HealthCheck
compositeType.Region = key.Region
compositeType.HttpHealthCheck.Port = 0
compositeType.HttpHealthCheck.PortSpecification = oldHC.HttpHealthCheck.PortSpecification

return composite.UpdateHealthCheck(cloud, key, compositeType)
}
Expand Down Expand Up @@ -310,11 +309,21 @@ func (h *HealthChecks) getILB(name string) (*HealthCheck, error) {
if err != nil {
return nil, err
}
alphaHC, err := hc.ToAlpha()
gceHC, err := hc.ToAlpha()
if err != nil {
return nil, err
}
return NewHealthCheck(alphaHC)

newHC, err := NewHealthCheck(gceHC)
if err != nil {
return nil, err
}

// Update fields for future update() calls
newHC.forILB = true
newHC.ForNEG = true

return newHC, nil
}

// Get returns the health check by port
Expand Down
1 change: 1 addition & 0 deletions pkg/loadbalancers/loadbalancers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,7 @@ func TestIdenticalHostnameCerts(t *testing.T) {
UrlMap: gceUrlMap,
Ingress: newIngress(),
}

// Sync multiple times to make sure ordering is preserved
for i := 0; i < 10; i++ {
if _, err := j.pool.Ensure(lbInfo); err != nil {
Expand Down

0 comments on commit 69c12a6

Please sign in to comment.