From 2a78584f184bc465f5e5aeb7865f7f718373a941 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Tue, 16 Jan 2018 10:30:50 -0800 Subject: [PATCH 01/10] Improve logging changes in firewalls --- pkg/firewalls/firewalls.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/firewalls/firewalls.go b/pkg/firewalls/firewalls.go index 19cbb2dedc..8d1c210c90 100644 --- a/pkg/firewalls/firewalls.go +++ b/pkg/firewalls/firewalls.go @@ -54,6 +54,7 @@ func NewFirewallPool(cloud Firewall, namer *utils.Namer) SingleFirewallPool { // Sync sync firewall rules with the cloud. func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error { + glog.V(4).Infof("Sync(%v, %v)", nodePorts, nodeNames) if len(nodePorts) == 0 { return fr.Shutdown() } @@ -99,7 +100,7 @@ func (fr *FirewallRules) Sync(nodePorts []int64, nodeNames []string) error { // Shutdown shuts down this firewall rules manager. func (fr *FirewallRules) Shutdown() error { name := fr.namer.FirewallRule() - glog.Infof("Deleting firewall %v", name) + glog.V(0).Infof("Deleting firewall %v", name) return fr.deleteFirewall(name) } From f670858c8ce479ba77ffa12cbedfb04913b8380d Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Mon, 8 Jan 2018 17:35:17 -0800 Subject: [PATCH 02/10] Move node specific sync out into its own controller Also remove more state from LBC. --- cmd/glbc/main.go | 2 +- pkg/context/context.go | 13 +++-- pkg/controller/controller.go | 62 +++++++-------------- pkg/controller/node.go | 72 +++++++++++++++++++++++++ pkg/controller/translator/translator.go | 3 +- pkg/controller/utils.go | 18 +++++++ 6 files changed, 121 insertions(+), 49 deletions(-) create mode 100644 pkg/controller/node.go diff --git a/cmd/glbc/main.go b/cmd/glbc/main.go index d6ecf675c8..76a7ee4c2b 100644 --- a/cmd/glbc/main.go +++ b/cmd/glbc/main.go @@ -28,7 +28,7 @@ import ( "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/controller" - neg "k8s.io/ingress-gce/pkg/networkendpointgroup" + neg "k8s.io/ingress-gce/pkg/neg" "k8s.io/ingress-gce/cmd/glbc/app" "k8s.io/ingress-gce/pkg/flags" diff --git a/pkg/context/context.go b/pkg/context/context.go index 038036aa64..bde24bb613 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -36,14 +36,17 @@ type ControllerContext struct { // NewControllerContext returns a new shared set of informers. 
func NewControllerContext(kubeClient kubernetes.Interface, namespace string, resyncPeriod time.Duration, enableEndpointsInformer bool) *ControllerContext { + newIndexer := func() cache.Indexers { + return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} + } context := &ControllerContext{ - IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), - NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}), + IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, newIndexer()), + ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, newIndexer()), + PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, newIndexer()), + NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, newIndexer()), } if enableEndpointsInformer { - context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, newIndexer()) } return context } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 59d3c19bca..52e170b833 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -43,6 +43,7 @@ import ( ) const ( + // DefaultFirewallName is the default firewall name. DefaultFirewallName = "" // Frequency to poll on local stores to sync. storeSyncPollPeriod = 5 * time.Second @@ -58,17 +59,17 @@ var ( // from the loadbalancer, via loadBalancerConfig. type LoadBalancerController struct { client kubernetes.Interface + ctx *context.ControllerContext ingLister StoreToIngressLister nodeLister cache.Indexer - svcLister cache.Indexer - podLister cache.Indexer + nodes *NodeController // endpoint lister is needed when translating service target port to real endpoint target ports. endpointLister StoreToEndpointLister + // TODO: Watch secrets CloudClusterManager *ClusterManager recorder record.EventRecorder - nodeQueue utils.TaskQueue ingQueue utils.TaskQueue Translator *translator.GCE stopCh chan struct{} @@ -105,13 +106,12 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru apiv1.EventSource{Component: "loadbalancer-controller"}), negEnabled: negEnabled, } - lbc.nodeQueue = utils.NewPeriodicTaskQueue("nodes", lbc.syncNodes) lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync) lbc.hasSynced = hasSyncedFromContext(ctx, negEnabled) lbc.ingLister.Store = ctx.IngressInformer.GetStore() - lbc.svcLister = ctx.ServiceInformer.GetIndexer() - lbc.podLister = ctx.PodInformer.GetIndexer() lbc.nodeLister = ctx.NodeInformer.GetIndexer() + lbc.nodes = NewNodeController(ctx, clusterManager) + if negEnabled { lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer() } @@ -159,15 +159,18 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru // Ingress deletes matter, service deletes don't. 
}) - // node event handler - ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: lbc.nodeQueue.Enqueue, - DeleteFunc: lbc.nodeQueue.Enqueue, - // Nodes are updated every 10s and we don't care, so no update handler. - }) - - lbc.Translator = translator.New(lbc.recorder, lbc.CloudClusterManager, lbc.svcLister, lbc.nodeLister, lbc.podLister, lbc.endpointLister, lbc.negEnabled) + var endpointIndexer cache.Indexer + if ctx.EndpointInformer != nil { + endpointIndexer = ctx.EndpointInformer.GetIndexer() + } + lbc.Translator = translator.New(lbc.recorder, lbc.CloudClusterManager, + ctx.ServiceInformer.GetIndexer(), + ctx.NodeInformer.GetIndexer(), + ctx.PodInformer.GetIndexer(), + endpointIndexer, + negEnabled) lbc.tlsLoader = &tls.TLSCertsFromSecretsLoader{Client: lbc.client} + glog.V(3).Infof("Created new loadbalancer controller") return &lbc, nil @@ -193,7 +196,8 @@ func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) { func (lbc *LoadBalancerController) Run() { glog.Infof("Starting loadbalancer controller") go lbc.ingQueue.Run(time.Second, lbc.stopCh) - go lbc.nodeQueue.Run(time.Second, lbc.stopCh) + lbc.nodes.Run(lbc.stopCh) + <-lbc.stopCh glog.Infof("Shutting down Loadbalancer Controller") } @@ -210,7 +214,7 @@ func (lbc *LoadBalancerController) Stop(deleteAll bool) error { close(lbc.stopCh) glog.Infof("Shutting down controller queues.") lbc.ingQueue.Shutdown() - lbc.nodeQueue.Shutdown() + lbc.nodes.Shutdown() lbc.shutdown = true } @@ -418,32 +422,6 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ingList extensions.IngressList) return lbs, nil } -// syncNodes manages the syncing of kubernetes nodes to gce instance groups. -// The instancegroups are referenced by loadbalancer backends. -func (lbc *LoadBalancerController) syncNodes(key string) error { - nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister)) - if err != nil { - return err - } - return lbc.CloudClusterManager.instancePool.Sync(nodeNames) -} - -// getReadyNodeNames returns names of schedulable, ready nodes from the node lister. -func getReadyNodeNames(lister listers.NodeLister) ([]string, error) { - nodeNames := []string{} - nodes, err := lister.ListWithPredicate(utils.NodeIsReady) - if err != nil { - return nodeNames, err - } - for _, n := range nodes { - if n.Spec.Unschedulable { - continue - } - nodeNames = append(nodeNames, n.Name) - } - return nodeNames, nil -} - func hasSyncedFromContext(ctx *context.ControllerContext, negEnabled bool) func() bool { // Wait for all resources to be sync'd to avoid performing actions while // the controller is still initializing state. diff --git a/pkg/controller/node.go b/pkg/controller/node.go new file mode 100644 index 0000000000..cb74f4ac75 --- /dev/null +++ b/pkg/controller/node.go @@ -0,0 +1,72 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package controller + +import ( + "time" + + listers "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/utils" +) + +const ( + nodeResyncPeriod = 1 * time.Second +) + +// NodeController synchronizes the state of the nodes to the unmanaged instance +// groups. +type NodeController struct { + lister cache.Indexer + queue utils.TaskQueue + cm *ClusterManager +} + +// NewNodeController returns a new node update controller. +func NewNodeController(ctx *context.ControllerContext, cm *ClusterManager) *NodeController { + c := &NodeController{ + lister: ctx.NodeInformer.GetIndexer(), + cm: cm, + } + c.queue = utils.NewPeriodicTaskQueue("nodes", c.sync) + + ctx.NodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: c.queue.Enqueue, + DeleteFunc: c.queue.Enqueue, + }) + + return c +} + +// Run starts a goroutine to process updates for the controller. +func (c *NodeController) Run(stopCh chan struct{}) { + go c.queue.Run(nodeResyncPeriod, stopCh) +} + +// Shutdown shuts down the node update queue for the controller. +func (c *NodeController) Shutdown() { + c.queue.Shutdown() +} + +func (c *NodeController) sync(key string) error { + nodeNames, err := getReadyNodeNames(listers.NewNodeLister(c.lister)) + if err != nil { + return err + } + return c.cm.instancePool.Sync(nodeNames) +} diff --git a/pkg/controller/translator/translator.go b/pkg/controller/translator/translator.go index 05ee41323d..d8a00e3f35 100644 --- a/pkg/controller/translator/translator.go +++ b/pkg/controller/translator/translator.go @@ -342,9 +342,10 @@ func (t *GCE) GatherFirewallPorts(svcPorts []backends.ServicePort, includeDefaul if includeDefaultBackend { svcPorts = append(svcPorts, *t.bi.DefaultBackendNodePort()) } + portMap := map[int64]bool{} for _, p := range svcPorts { - if p.NEGEnabled { + if t.negEnabled && p.NEGEnabled { // For NEG backend, need to open firewall to all endpoint target ports // TODO(mixia): refactor firewall syncing into a separate go routine with different trigger. // With NEG, endpoint changes may cause firewall ports to be different if user specifies inconsistent backends. diff --git a/pkg/controller/utils.go b/pkg/controller/utils.go index 018bbe3f5f..1e034f4463 100644 --- a/pkg/controller/utils.go +++ b/pkg/controller/utils.go @@ -28,11 +28,13 @@ import ( api_v1 "k8s.io/api/core/v1" extensions "k8s.io/api/extensions/v1beta1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" "k8s.io/ingress-gce/pkg/flags" + "k8s.io/ingress-gce/pkg/utils" ) // isGCEIngress returns true if the Ingress matches the class managed by this @@ -206,3 +208,19 @@ func uniq(nodePorts []backends.ServicePort) []backends.ServicePort { } return nodePorts } + +// getReadyNodeNames returns names of schedulable, ready nodes from the node lister.
+func getReadyNodeNames(lister listers.NodeLister) ([]string, error) { + nodeNames := []string{} + nodes, err := lister.ListWithPredicate(utils.NodeIsReady) + if err != nil { + return nodeNames, err + } + for _, n := range nodes { + if n.Spec.Unschedulable { + continue + } + nodeNames = append(nodeNames, n.Name) + } + return nodeNames, nil +} From 7546cab27e0b2bef0980194d7f1d3f602a5ad41d Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Tue, 9 Jan 2018 23:49:27 -0800 Subject: [PATCH 03/10] Rename "networkendpointgroup" to "neg" --- pkg/backends/backends_test.go | 8 ++++---- pkg/controller/fakes.go | 4 ++-- pkg/loadbalancers/loadbalancers_test.go | 4 ++-- pkg/{networkendpointgroup => neg}/controller.go | 4 ++-- pkg/{networkendpointgroup => neg}/controller_test.go | 2 +- pkg/{networkendpointgroup => neg}/fakes.go | 2 +- pkg/{networkendpointgroup => neg}/interfaces.go | 2 +- pkg/{networkendpointgroup => neg}/manager.go | 2 +- pkg/{networkendpointgroup => neg}/manager_test.go | 2 +- pkg/{networkendpointgroup => neg}/syncer.go | 2 +- pkg/{networkendpointgroup => neg}/syncer_test.go | 2 +- 11 files changed, 17 insertions(+), 17 deletions(-) rename pkg/{networkendpointgroup => neg}/controller.go (99%) rename pkg/{networkendpointgroup => neg}/controller_test.go (99%) rename pkg/{networkendpointgroup => neg}/fakes.go (99%) rename pkg/{networkendpointgroup => neg}/interfaces.go (99%) rename pkg/{networkendpointgroup => neg}/manager.go (99%) rename pkg/{networkendpointgroup => neg}/manager_test.go (99%) rename pkg/{networkendpointgroup => neg}/syncer.go (99%) rename pkg/{networkendpointgroup => neg}/syncer_test.go (99%) diff --git a/pkg/backends/backends_test.go b/pkg/backends/backends_test.go index 9aa56e18c7..4d6d308073 100644 --- a/pkg/backends/backends_test.go +++ b/pkg/backends/backends_test.go @@ -33,7 +33,7 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/healthchecks" "k8s.io/ingress-gce/pkg/instances" - "k8s.io/ingress-gce/pkg/networkendpointgroup" + "k8s.io/ingress-gce/pkg/neg" "k8s.io/ingress-gce/pkg/storage" "k8s.io/ingress-gce/pkg/utils" ) @@ -58,7 +58,7 @@ var ( ) func newTestJig(f BackendServices, fakeIGs instances.InstanceGroups, syncWithCloud bool) (*Backends, healthchecks.HealthCheckProvider) { - negGetter := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") + negGetter := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") nodePool := instances.NewNodePool(fakeIGs, defaultNamer) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) healthCheckProvider := healthchecks.NewFakeHealthCheckProvider() @@ -333,7 +333,7 @@ func TestBackendPoolSync(t *testing.T) { func TestBackendPoolDeleteLegacyHealthChecks(t *testing.T) { f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer) - negGetter := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") + negGetter := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") nodePool := instances.NewNodePool(fakeIGs, defaultNamer) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) hcp := healthchecks.NewFakeHealthCheckProvider() @@ -507,7 +507,7 @@ func TestLinkBackendServiceToNEG(t *testing.T) { namespace, name, port := "ns", "name", "port" f := NewFakeBackendServices(noOpErrFunc) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), defaultNamer) - fakeNEG := 
networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") + fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") nodePool := instances.NewNodePool(fakeIGs, defaultNamer) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) hcp := healthchecks.NewFakeHealthCheckProvider() diff --git a/pkg/controller/fakes.go b/pkg/controller/fakes.go index 9032aaeb5d..cb79027dc6 100644 --- a/pkg/controller/fakes.go +++ b/pkg/controller/fakes.go @@ -27,7 +27,7 @@ import ( "k8s.io/ingress-gce/pkg/healthchecks" "k8s.io/ingress-gce/pkg/instances" "k8s.io/ingress-gce/pkg/loadbalancers" - "k8s.io/ingress-gce/pkg/networkendpointgroup" + "k8s.io/ingress-gce/pkg/neg" "k8s.io/ingress-gce/pkg/utils" ) @@ -52,7 +52,7 @@ func NewFakeClusterManager(clusterName, firewallName string) *fakeClusterManager fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil }) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), namer) fakeHCP := healthchecks.NewFakeHealthCheckProvider() - fakeNEG := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnet", "test-network") + fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnet", "test-network") nodePool := instances.NewNodePool(fakeIGs, namer) nodePool.Init(&instances.FakeZoneLister{Zones: []string{"zone-a"}}) diff --git a/pkg/loadbalancers/loadbalancers_test.go b/pkg/loadbalancers/loadbalancers_test.go index d648a77e09..ff5450b3a9 100644 --- a/pkg/loadbalancers/loadbalancers_test.go +++ b/pkg/loadbalancers/loadbalancers_test.go @@ -27,7 +27,7 @@ import ( "k8s.io/ingress-gce/pkg/backends" "k8s.io/ingress-gce/pkg/healthchecks" "k8s.io/ingress-gce/pkg/instances" - "k8s.io/ingress-gce/pkg/networkendpointgroup" + "k8s.io/ingress-gce/pkg/neg" "k8s.io/ingress-gce/pkg/utils" ) @@ -43,7 +43,7 @@ func newFakeLoadBalancerPool(f LoadBalancers, t *testing.T, namer *utils.Namer) fakeBackends := backends.NewFakeBackendServices(func(op int, be *compute.BackendService) error { return nil }) fakeIGs := instances.NewFakeInstanceGroups(sets.NewString(), namer) fakeHCP := healthchecks.NewFakeHealthCheckProvider() - fakeNEG := networkendpointgroup.NewFakeNetworkEndpointGroupCloud("test-subnet", "test-network") + fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnet", "test-network") healthChecker := healthchecks.NewHealthChecker(fakeHCP, "/", namer) nodePool := instances.NewNodePool(fakeIGs, namer) nodePool.Init(&instances.FakeZoneLister{Zones: []string{defaultZone}}) diff --git a/pkg/networkendpointgroup/controller.go b/pkg/neg/controller.go similarity index 99% rename from pkg/networkendpointgroup/controller.go rename to pkg/neg/controller.go index a535dbb2b8..cd23e127d9 100644 --- a/pkg/networkendpointgroup/controller.go +++ b/pkg/neg/controller.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package networkendpointgroup +package neg import ( "fmt" @@ -77,7 +77,7 @@ func NewController( Interface: kubeClient.Core().Events(""), }) recorder := eventBroadcaster.NewRecorder(scheme.Scheme, - apiv1.EventSource{Component: "networkendpointgroup-controller"}) + apiv1.EventSource{Component: "neg-controller"}) manager := newSyncerManager(namer, recorder, diff --git a/pkg/networkendpointgroup/controller_test.go b/pkg/neg/controller_test.go similarity index 99% rename from pkg/networkendpointgroup/controller_test.go rename to pkg/neg/controller_test.go index b26830245e..0db5050669 100644 --- a/pkg/networkendpointgroup/controller_test.go +++ b/pkg/neg/controller_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package networkendpointgroup +package neg import ( "testing" diff --git a/pkg/networkendpointgroup/fakes.go b/pkg/neg/fakes.go similarity index 99% rename from pkg/networkendpointgroup/fakes.go rename to pkg/neg/fakes.go index f1d6d20b44..27ee717c3d 100644 --- a/pkg/networkendpointgroup/fakes.go +++ b/pkg/neg/fakes.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package networkendpointgroup +package neg import ( "fmt" diff --git a/pkg/networkendpointgroup/interfaces.go b/pkg/neg/interfaces.go similarity index 99% rename from pkg/networkendpointgroup/interfaces.go rename to pkg/neg/interfaces.go index 177ce9e58b..6e43d6729d 100644 --- a/pkg/networkendpointgroup/interfaces.go +++ b/pkg/neg/interfaces.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package networkendpointgroup +package neg import ( computealpha "google.golang.org/api/compute/v0.alpha" diff --git a/pkg/networkendpointgroup/manager.go b/pkg/neg/manager.go similarity index 99% rename from pkg/networkendpointgroup/manager.go rename to pkg/neg/manager.go index 92e80a4ccf..d2d7f18e6a 100644 --- a/pkg/networkendpointgroup/manager.go +++ b/pkg/neg/manager.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package networkendpointgroup +package neg import ( "fmt" diff --git a/pkg/networkendpointgroup/manager_test.go b/pkg/neg/manager_test.go similarity index 99% rename from pkg/networkendpointgroup/manager_test.go rename to pkg/neg/manager_test.go index 3151cdd35d..e488552af6 100644 --- a/pkg/networkendpointgroup/manager_test.go +++ b/pkg/neg/manager_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package networkendpointgroup +package neg import ( "testing" diff --git a/pkg/networkendpointgroup/syncer.go b/pkg/neg/syncer.go similarity index 99% rename from pkg/networkendpointgroup/syncer.go rename to pkg/neg/syncer.go index 2b5afcc684..c6058240ac 100644 --- a/pkg/networkendpointgroup/syncer.go +++ b/pkg/neg/syncer.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
*/ -package networkendpointgroup +package neg import ( "fmt" diff --git a/pkg/networkendpointgroup/syncer_test.go b/pkg/neg/syncer_test.go similarity index 99% rename from pkg/networkendpointgroup/syncer_test.go rename to pkg/neg/syncer_test.go index f294e71c87..3e32afa0b2 100644 --- a/pkg/networkendpointgroup/syncer_test.go +++ b/pkg/neg/syncer_test.go @@ -1,4 +1,4 @@ -package networkendpointgroup +package neg import ( "reflect" From 31c3f3d2a1d011a584cd519f056558bb5ed9031a Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Wed, 10 Jan 2018 11:47:27 -0800 Subject: [PATCH 04/10] Move HasSynced to be internal to the controller context --- pkg/context/context.go | 20 ++++++++++++++++++++ pkg/controller/controller.go | 31 +++---------------------------- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/pkg/context/context.go b/pkg/context/context.go index bde24bb613..9dc57beedf 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -51,6 +51,26 @@ func NewControllerContext(kubeClient kubernetes.Interface, namespace string, res return context } +// HasSynced returns true if all relevant informers has been synced. +func (ctx *ControllerContext) HasSynced() bool { + + funcs := []func() bool{ + ctx.IngressInformer.HasSynced, + ctx.ServiceInformer.HasSynced, + ctx.PodInformer.HasSynced, + ctx.NodeInformer.HasSynced, + } + if ctx.EndpointInformer != nil { + funcs = append(funcs, ctx.EndpointInformer.HasSynced) + } + for _, f := range funcs { + if !f() { + return false + } + } + return true +} + // Start all of the informers. func (ctx *ControllerContext) Start(stopCh chan struct{}) { go ctx.IngressInformer.Run(stopCh) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 52e170b833..b97b8d7fca 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -107,7 +107,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru negEnabled: negEnabled, } lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync) - lbc.hasSynced = hasSyncedFromContext(ctx, negEnabled) + lbc.hasSynced = ctx.HasSynced lbc.ingLister.Store = ctx.IngressInformer.GetStore() lbc.nodeLister = ctx.NodeInformer.GetIndexer() lbc.nodes = NewNodeController(ctx, clusterManager) @@ -303,6 +303,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if !ingExists { return syncError } + ing := *obj.(*extensions.Ingress) if isGCEMultiClusterIngress(&ing) { // Add instance group names as annotation on the ingress. @@ -312,10 +313,7 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if err = setInstanceGroupsAnnotation(ing.Annotations, igs); err != nil { return err } - if err = updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations); err != nil { - return err - } - return nil + return updateAnnotations(lbc.client, ing.Name, ing.Namespace, ing.Annotations) } if lbc.negEnabled { @@ -422,29 +420,6 @@ func (lbc *LoadBalancerController) toRuntimeInfo(ingList extensions.IngressList) return lbs, nil } -func hasSyncedFromContext(ctx *context.ControllerContext, negEnabled bool) func() bool { - // Wait for all resources to be sync'd to avoid performing actions while - // the controller is still initializing state. - var funcs []func() bool - funcs = append(funcs, []func() bool{ - ctx.IngressInformer.HasSynced, - ctx.ServiceInformer.HasSynced, - ctx.PodInformer.HasSynced, - ctx.NodeInformer.HasSynced, - }...) 
- if negEnabled { - funcs = append(funcs, ctx.EndpointInformer.HasSynced) - } - return func() bool { - for _, f := range funcs { - if !f() { - return false - } - } - return true - } -} - func updateAnnotations(client kubernetes.Interface, name, namespace string, annotations map[string]string) error { ingClient := client.Extensions().Ingresses(namespace) currIng, err := ingClient.Get(name, metav1.GetOptions{}) From eb5b097d1b4699271897a1b612ba085b675c3bc0 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Thu, 11 Jan 2018 23:41:05 -0800 Subject: [PATCH 05/10] Split off L7s and L7 into their own files --- pkg/loadbalancers/{loadbalancers.go => l7.go} | 180 --------------- pkg/loadbalancers/l7s.go | 210 ++++++++++++++++++ 2 files changed, 210 insertions(+), 180 deletions(-) rename pkg/loadbalancers/{loadbalancers.go => l7.go} (84%) create mode 100644 pkg/loadbalancers/l7s.go diff --git a/pkg/loadbalancers/loadbalancers.go b/pkg/loadbalancers/l7.go similarity index 84% rename from pkg/loadbalancers/loadbalancers.go rename to pkg/loadbalancers/l7.go index 751c5b54de..8909751524 100644 --- a/pkg/loadbalancers/loadbalancers.go +++ b/pkg/loadbalancers/l7.go @@ -22,7 +22,6 @@ import ( "encoding/json" "fmt" "net/http" - "reflect" "strings" "github.com/golang/glog" @@ -32,12 +31,10 @@ import ( "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" - "k8s.io/ingress-gce/pkg/storage" "k8s.io/ingress-gce/pkg/utils" ) const ( - // The gce api uses the name of a path rule to match a host rule. hostRulePrefix = "host" @@ -53,183 +50,6 @@ const ( httpsDefaultPortRange = "443-443" ) -// L7s implements LoadBalancerPool. -type L7s struct { - cloud LoadBalancers - snapshotter storage.Snapshotter - // TODO: Remove this field and always ask the BackendPool using the NodePort. - glbcDefaultBackend *compute.BackendService - defaultBackendPool backends.BackendPool - defaultBackendNodePort backends.ServicePort - namer *utils.Namer -} - -// GLBCDefaultBackend returns the BackendService used when no path -// rules match. -func (l *L7s) GLBCDefaultBackend() *compute.BackendService { - return l.glbcDefaultBackend -} - -// Namer returns the namer associated with the L7s. -func (l *L7s) Namer() *utils.Namer { - return l.namer -} - -// NewLoadBalancerPool returns a new loadbalancer pool. -// - cloud: implements LoadBalancers. Used to sync L7 loadbalancer resources -// with the cloud. -// - defaultBackendPool: a BackendPool used to manage the GCE BackendService for -// the default backend. -// - defaultBackendNodePort: The nodePort of the Kubernetes service representing -// the default backend. -func NewLoadBalancerPool( - cloud LoadBalancers, - defaultBackendPool backends.BackendPool, - defaultBackendNodePort backends.ServicePort, namer *utils.Namer) LoadBalancerPool { - return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer} -} - -func (l *L7s) create(ri *L7RuntimeInfo) (*L7, error) { - if l.glbcDefaultBackend == nil { - glog.Warningf("Creating l7 without a default backend") - } - return &L7{ - runtimeInfo: ri, - Name: l.namer.LoadBalancer(ri.Name), - cloud: l.cloud, - glbcDefaultBackend: l.glbcDefaultBackend, - namer: l.namer, - sslCert: nil, - }, nil -} - -// Get returns the loadbalancer by name. 
-func (l *L7s) Get(name string) (*L7, error) { - name = l.namer.LoadBalancer(name) - lb, exists := l.snapshotter.Get(name) - if !exists { - return nil, fmt.Errorf("loadbalancer %v not in pool", name) - } - return lb.(*L7), nil -} - -// Add gets or creates a loadbalancer. -// If the loadbalancer already exists, it checks that its edges are valid. -func (l *L7s) Add(ri *L7RuntimeInfo) (err error) { - name := l.namer.LoadBalancer(ri.Name) - - lb, _ := l.Get(name) - if lb == nil { - glog.Infof("Creating l7 %v", name) - lb, err = l.create(ri) - if err != nil { - return err - } - } else { - if !reflect.DeepEqual(lb.runtimeInfo, ri) { - glog.Infof("LB %v runtime info changed, old %+v new %+v", lb.Name, lb.runtimeInfo, ri) - lb.runtimeInfo = ri - } - } - // Add the lb to the pool, in case we create an UrlMap but run out - // of quota in creating the ForwardingRule we still need to cleanup - // the UrlMap during GC. - defer l.snapshotter.Add(name, lb) - - // Why edge hop for the create? - // The loadbalancer is a fictitious resource, it doesn't exist in gce. To - // make it exist we need to create a collection of gce resources, done - // through the edge hop. - if err := lb.edgeHop(); err != nil { - return err - } - - return nil -} - -// Delete deletes a loadbalancer by name. -func (l *L7s) Delete(name string) error { - name = l.namer.LoadBalancer(name) - lb, err := l.Get(name) - if err != nil { - return err - } - glog.Infof("Deleting lb %v", name) - if err := lb.Cleanup(); err != nil { - return err - } - l.snapshotter.Delete(name) - return nil -} - -// Sync loadbalancers with the given runtime info from the controller. -func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { - glog.V(3).Infof("Syncing loadbalancers %v", lbs) - - if len(lbs) != 0 { - // Lazily create a default backend so we don't tax users who don't care - // about Ingress by consuming 1 of their 3 GCE BackendServices. This - // BackendService is GC'd when there are no more Ingresses. - if err := l.defaultBackendPool.Ensure([]backends.ServicePort{l.defaultBackendNodePort}, nil); err != nil { - return err - } - defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port) - if err != nil { - return err - } - l.glbcDefaultBackend = defaultBackend - } - // create new loadbalancers, validate existing - for _, ri := range lbs { - if err := l.Add(ri); err != nil { - return err - } - } - return nil -} - -// GC garbage collects loadbalancers not in the input list. -func (l *L7s) GC(names []string) error { - knownLoadBalancers := sets.NewString() - for _, n := range names { - knownLoadBalancers.Insert(l.namer.LoadBalancer(n)) - } - pool := l.snapshotter.Snapshot() - - // Delete unknown loadbalancers - for name := range pool { - if knownLoadBalancers.Has(name) { - continue - } - glog.V(3).Infof("GCing loadbalancer %v", name) - if err := l.Delete(name); err != nil { - return err - } - } - // Tear down the default backend when there are no more loadbalancers. - // This needs to happen after we've deleted all url-maps that might be - // using it. - if len(names) == 0 { - if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort.Port); err != nil { - return err - } - l.glbcDefaultBackend = nil - } - return nil -} - -// Shutdown logs whether or not the pool is empty. 
-func (l *L7s) Shutdown() error { - if err := l.GC([]string{}); err != nil { - return err - } - if err := l.defaultBackendPool.Shutdown(); err != nil { - return err - } - glog.Infof("Loadbalancer pool shutdown.") - return nil -} - // TLSCerts encapsulates .pem encoded TLS information. type TLSCerts struct { // Key is private key. diff --git a/pkg/loadbalancers/l7s.go b/pkg/loadbalancers/l7s.go new file mode 100644 index 0000000000..f1a8fc4b35 --- /dev/null +++ b/pkg/loadbalancers/l7s.go @@ -0,0 +1,210 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package loadbalancers + +import ( + "fmt" + "reflect" + + "github.com/golang/glog" + + compute "google.golang.org/api/compute/v1" + "k8s.io/apimachinery/pkg/util/sets" + + "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/storage" + "k8s.io/ingress-gce/pkg/utils" +) + +// L7s implements LoadBalancerPool. +type L7s struct { + cloud LoadBalancers + snapshotter storage.Snapshotter + // TODO: Remove this field and always ask the BackendPool using the NodePort. + glbcDefaultBackend *compute.BackendService + defaultBackendPool backends.BackendPool + defaultBackendNodePort backends.ServicePort + namer *utils.Namer +} + +// GLBCDefaultBackend returns the BackendService used when no path +// rules match. +func (l *L7s) GLBCDefaultBackend() *compute.BackendService { + return l.glbcDefaultBackend +} + +// Namer returns the namer associated with the L7s. +func (l *L7s) Namer() *utils.Namer { + return l.namer +} + +// NewLoadBalancerPool returns a new loadbalancer pool. +// - cloud: implements LoadBalancers. Used to sync L7 loadbalancer resources +// with the cloud. +// - defaultBackendPool: a BackendPool used to manage the GCE BackendService for +// the default backend. +// - defaultBackendNodePort: The nodePort of the Kubernetes service representing +// the default backend. +func NewLoadBalancerPool( + cloud LoadBalancers, + defaultBackendPool backends.BackendPool, + defaultBackendNodePort backends.ServicePort, namer *utils.Namer) LoadBalancerPool { + return &L7s{cloud, storage.NewInMemoryPool(), nil, defaultBackendPool, defaultBackendNodePort, namer} +} + +func (l *L7s) create(ri *L7RuntimeInfo) (*L7, error) { + if l.glbcDefaultBackend == nil { + glog.Warningf("Creating l7 without a default backend") + } + return &L7{ + runtimeInfo: ri, + Name: l.namer.LoadBalancer(ri.Name), + cloud: l.cloud, + glbcDefaultBackend: l.glbcDefaultBackend, + namer: l.namer, + sslCert: nil, + }, nil +} + +// Get returns the loadbalancer by name. +func (l *L7s) Get(name string) (*L7, error) { + name = l.namer.LoadBalancer(name) + lb, exists := l.snapshotter.Get(name) + if !exists { + return nil, fmt.Errorf("loadbalancer %v not in pool", name) + } + return lb.(*L7), nil +} + +// Add gets or creates a loadbalancer. +// If the loadbalancer already exists, it checks that its edges are valid. 
+func (l *L7s) Add(ri *L7RuntimeInfo) (err error) { + name := l.namer.LoadBalancer(ri.Name) + + lb, _ := l.Get(name) + if lb == nil { + glog.Infof("Creating l7 %v", name) + lb, err = l.create(ri) + if err != nil { + return err + } + } else { + if !reflect.DeepEqual(lb.runtimeInfo, ri) { + glog.Infof("LB %v runtime info changed, old %+v new %+v", lb.Name, lb.runtimeInfo, ri) + lb.runtimeInfo = ri + } + } + // Add the lb to the pool, in case we create an UrlMap but run out + // of quota in creating the ForwardingRule we still need to cleanup + // the UrlMap during GC. + defer l.snapshotter.Add(name, lb) + + // Why edge hop for the create? + // The loadbalancer is a fictitious resource, it doesn't exist in gce. To + // make it exist we need to create a collection of gce resources, done + // through the edge hop. + if err := lb.edgeHop(); err != nil { + return err + } + + return nil +} + +// Delete deletes a loadbalancer by name. +func (l *L7s) Delete(name string) error { + name = l.namer.LoadBalancer(name) + lb, err := l.Get(name) + if err != nil { + return err + } + glog.Infof("Deleting lb %v", name) + if err := lb.Cleanup(); err != nil { + return err + } + l.snapshotter.Delete(name) + return nil +} + +// Sync loadbalancers with the given runtime info from the controller. +func (l *L7s) Sync(lbs []*L7RuntimeInfo) error { + glog.V(3).Infof("Syncing loadbalancers %v", lbs) + + if len(lbs) != 0 { + // Lazily create a default backend so we don't tax users who don't care + // about Ingress by consuming 1 of their 3 GCE BackendServices. This + // BackendService is GC'd when there are no more Ingresses. + if err := l.defaultBackendPool.Ensure([]backends.ServicePort{l.defaultBackendNodePort}, nil); err != nil { + return err + } + defaultBackend, err := l.defaultBackendPool.Get(l.defaultBackendNodePort.Port) + if err != nil { + return err + } + l.glbcDefaultBackend = defaultBackend + } + // create new loadbalancers, validate existing + for _, ri := range lbs { + if err := l.Add(ri); err != nil { + return err + } + } + return nil +} + +// GC garbage collects loadbalancers not in the input list. +func (l *L7s) GC(names []string) error { + glog.V(4).Infof("GC(%v)", names) + + knownLoadBalancers := sets.NewString() + for _, n := range names { + knownLoadBalancers.Insert(l.namer.LoadBalancer(n)) + } + pool := l.snapshotter.Snapshot() + + // Delete unknown loadbalancers + for name := range pool { + if knownLoadBalancers.Has(name) { + continue + } + glog.V(2).Infof("GCing loadbalancer %v", name) + if err := l.Delete(name); err != nil { + return err + } + } + // Tear down the default backend when there are no more loadbalancers. + // This needs to happen after we've deleted all url-maps that might be + // using it. + if len(names) == 0 { + if err := l.defaultBackendPool.Delete(l.defaultBackendNodePort.Port); err != nil { + return err + } + l.glbcDefaultBackend = nil + } + return nil +} + +// Shutdown logs whether or not the pool is empty. 
+func (l *L7s) Shutdown() error { + if err := l.GC([]string{}); err != nil { + return err + } + if err := l.defaultBackendPool.Shutdown(); err != nil { + return err + } + glog.Infof("Loadbalancer pool shutdown.") + return nil +} From ae56addce99e311a683fbda87f95f60ab077c9dd Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Fri, 12 Jan 2018 17:34:50 -0800 Subject: [PATCH 06/10] Move translator specific unit test into translator --- pkg/controller/controller_test.go | 4 +- pkg/controller/translator/translator_test.go | 317 +++++++++++++++++++ pkg/controller/utils_test.go | 247 --------------- 3 files changed, 319 insertions(+), 249 deletions(-) create mode 100644 pkg/controller/translator/translator_test.go diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index c3cc11cc4b..35cbcb6e86 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -176,7 +176,7 @@ func newPortManager(st, end int, namer *utils.Namer) *nodePortManager { // a nodePortManager is supplied, it also adds all backends to the service store // with a nodePort acquired through it. func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePortManager) { - lbc.ingLister.Store.Add(ing) + lbc.ctx.IngressInformer.GetIndexer().Add(ing) if pm == nil { return } @@ -197,7 +197,7 @@ func addIngress(lbc *LoadBalancerController, ing *extensions.Ingress, pm *nodePo } svcPort.NodePort = int32(pm.getNodePort(path.Backend.ServiceName)) svc.Spec.Ports = []api_v1.ServicePort{svcPort} - lbc.svcLister.Add(svc) + lbc.ctx.ServiceInformer.GetIndexer().Add(svc) } } } diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go new file mode 100644 index 0000000000..0a607935e1 --- /dev/null +++ b/pkg/controller/translator/translator_test.go @@ -0,0 +1,317 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package translator + +import ( + "fmt" + "testing" + "time" + + "github.com/golang/glog" + compute "google.golang.org/api/compute/v1" + + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/record" + + "k8s.io/ingress-gce/pkg/annotations" + "k8s.io/ingress-gce/pkg/backends" + "k8s.io/ingress-gce/pkg/context" +) + +var ( + firstPodCreationTime = time.Date(2006, 01, 02, 15, 04, 05, 0, time.UTC) +) + +type fakeBackendInfo struct { +} + +func (bi *fakeBackendInfo) BackendServiceForPort(port int64) (*compute.BackendService, error) { + panic(fmt.Errorf("should not be used")) + return nil, nil +} + +func (bi *fakeBackendInfo) DefaultBackendNodePort() *backends.ServicePort { + return &backends.ServicePort{Port: 30000, Protocol: annotations.ProtocolHTTP} +} + +func gceForTest(negEnabled bool) *GCE { + client := fake.NewSimpleClientset() + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(glog.Infof) + broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ + Interface: client.Core().Events(""), + }) + + ctx := context.NewControllerContext(client, apiv1.NamespaceAll, 1*time.Second, negEnabled) + gce := &GCE{ + recorder: broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"}), + bi: &fakeBackendInfo{}, + svcLister: ctx.ServiceInformer.GetIndexer(), + nodeLister: ctx.NodeInformer.GetIndexer(), + podLister: ctx.PodInformer.GetIndexer(), + negEnabled: negEnabled, + } + if ctx.EndpointInformer != nil { + gce.endpointLister = ctx.EndpointInformer.GetIndexer() + } + return gce +} + +func TestGetProbe(t *testing.T) { + translator := gceForTest(false) + nodePortToHealthCheck := map[backends.ServicePort]string{ + {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", + {Port: 3002, Protocol: annotations.ProtocolHTTPS}: "/foo", + } + for _, svc := range makeServices(nodePortToHealthCheck, apiv1.NamespaceDefault) { + translator.svcLister.Add(svc) + } + for _, pod := range makePods(nodePortToHealthCheck, apiv1.NamespaceDefault) { + translator.podLister.Add(pod) + } + + for p, exp := range nodePortToHealthCheck { + got, err := translator.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) + } + } +} + +func TestGetProbeNamedPort(t *testing.T) { + translator := gceForTest(false) + nodePortToHealthCheck := map[backends.ServicePort]string{ + {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", + } + for _, svc := range makeServices(nodePortToHealthCheck, apiv1.NamespaceDefault) { + translator.svcLister.Add(svc) + } + for _, pod := range makePods(nodePortToHealthCheck, apiv1.NamespaceDefault) { + pod.Spec.Containers[0].Ports[0].Name = "test" + pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"} + translator.podLister.Add(pod) + } + for p, exp := range nodePortToHealthCheck { + got, err := translator.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, 
getProbePath(got), exp) + } + } +} + +func TestGetProbeCrossNamespace(t *testing.T) { + translator := gceForTest(false) + + firstPod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + // labels match those added by "addPods", but ns and health check + // path is different. If this pod was created in the same ns, it + // would become the health check. + Labels: map[string]string{"app-3001": "test"}, + Name: fmt.Sprintf("test-pod-new-ns"), + Namespace: "new-ns", + CreationTimestamp: metav1.NewTime(firstPodCreationTime.Add(-time.Duration(time.Hour))), + }, + Spec: apiv1.PodSpec{ + Containers: []apiv1.Container{ + { + Ports: []apiv1.ContainerPort{{ContainerPort: 80}}, + ReadinessProbe: &apiv1.Probe{ + Handler: apiv1.Handler{ + HTTPGet: &apiv1.HTTPGetAction{ + Scheme: apiv1.URISchemeHTTP, + Path: "/badpath", + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, + }, + }, + }, + }, + } + translator.podLister.Add(firstPod) + nodePortToHealthCheck := map[backends.ServicePort]string{ + {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", + } + for _, svc := range makeServices(nodePortToHealthCheck, apiv1.NamespaceDefault) { + translator.svcLister.Add(svc) + } + for _, pod := range makePods(nodePortToHealthCheck, apiv1.NamespaceDefault) { + pod.Spec.Containers[0].Ports[0].Name = "test" + pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"} + translator.podLister.Add(pod) + } + + for p, exp := range nodePortToHealthCheck { + got, err := translator.GetProbe(p) + if err != nil || got == nil { + t.Errorf("Failed to get probe for node port %v: %v", p, err) + } else if getProbePath(got) != exp { + t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) + } + } +} + +func makePods(nodePortToHealthCheck map[backends.ServicePort]string, ns string) []*apiv1.Pod { + delay := 1 * time.Minute + + var pods []*apiv1.Pod + for np, u := range nodePortToHealthCheck { + pod := &apiv1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{fmt.Sprintf("app-%d", np.Port): "test"}, + Name: fmt.Sprintf("%d", np.Port), + Namespace: ns, + CreationTimestamp: metav1.NewTime(firstPodCreationTime.Add(delay)), + }, + Spec: apiv1.PodSpec{ + Containers: []apiv1.Container{ + { + Ports: []apiv1.ContainerPort{{Name: "test", ContainerPort: 80}}, + ReadinessProbe: &apiv1.Probe{ + Handler: apiv1.Handler{ + HTTPGet: &apiv1.HTTPGetAction{ + Scheme: apiv1.URIScheme(string(np.Protocol)), + Path: u, + Port: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, + }, + }, + }, + }, + } + pods = append(pods, pod) + delay = time.Duration(2) * delay + } + return pods +} + +func makeServices(nodePortToHealthCheck map[backends.ServicePort]string, ns string) []*apiv1.Service { + var services []*apiv1.Service + for np, _ := range nodePortToHealthCheck { + svc := &apiv1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%d", np.Port), + Namespace: ns, + }, + Spec: apiv1.ServiceSpec{ + Selector: map[string]string{fmt.Sprintf("app-%d", np.Port): "test"}, + Ports: []apiv1.ServicePort{ + { + NodePort: int32(np.Port), + TargetPort: intstr.IntOrString{ + Type: intstr.Int, + IntVal: 80, + }, + }, + }, + }, + } + services = append(services, svc) + } + return services +} + +func getProbePath(p *apiv1.Probe) string { + return p.Handler.HTTPGet.Path +} + +func TestGatherFirewallPorts(t *testing.T) { + translator := gceForTest(true) + + ep1 := "ep1" + ep2 := "ep2" + + svcPorts := []backends.ServicePort{ 
+ {Port: int64(30001)}, + {Port: int64(30002)}, + { + SvcName: types.NamespacedName{"ns", ep1}, + Port: int64(30003), + NEGEnabled: true, + SvcTargetPort: "80", + }, + { + SvcName: types.NamespacedName{"ns", ep2}, + Port: int64(30004), + NEGEnabled: true, + SvcTargetPort: "named-port", + }, + } + + translator.endpointLister.Add(newDefaultEndpoint(ep1)) + translator.endpointLister.Add(newDefaultEndpoint(ep2)) + + res := translator.GatherFirewallPorts(svcPorts, true) + expect := map[int64]bool{ + int64(30000): true, + int64(30001): true, + int64(30002): true, + int64(80): true, + int64(8080): true, + int64(8081): true, + } + if len(res) != len(expect) { + t.Errorf("got firewall ports == %v, want %v", res, expect) + } + for _, p := range res { + if _, ok := expect[p]; !ok { + t.Errorf("firewall port %v is missing, (got %v, want %v)", p, res, expect) + } + } +} + +func newDefaultEndpoint(name string) *apiv1.Endpoints { + return &apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "ns"}, + Subsets: []apiv1.EndpointSubset{ + { + Ports: []apiv1.EndpointPort{ + {Name: "", Port: int32(80), Protocol: apiv1.ProtocolTCP}, + {Name: "named-port", Port: int32(8080), Protocol: apiv1.ProtocolTCP}, + }, + }, + { + Ports: []apiv1.EndpointPort{ + {Name: "named-port", Port: int32(80), Protocol: apiv1.ProtocolTCP}, + }, + }, + { + Ports: []apiv1.EndpointPort{ + {Name: "named-port", Port: int32(8081), Protocol: apiv1.ProtocolTCP}, + }, + }, + }, + } +} diff --git a/pkg/controller/utils_test.go b/pkg/controller/utils_test.go index b918c092bf..2a5de6715d 100644 --- a/pkg/controller/utils_test.go +++ b/pkg/controller/utils_test.go @@ -17,7 +17,6 @@ limitations under the License. package controller import ( - "fmt" "testing" "time" @@ -25,12 +24,9 @@ import ( api_v1 "k8s.io/api/core/v1" meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-gce/pkg/annotations" - "k8s.io/ingress-gce/pkg/backends" "k8s.io/ingress-gce/pkg/flags" ) @@ -92,152 +88,6 @@ func TestInstancesAddedToZones(t *testing.T) { } } -func TestProbeGetter(t *testing.T) { - cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) - lbc := newLoadBalancerController(t, cm) - - nodePortToHealthCheck := map[backends.ServicePort]string{ - {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", - {Port: 3002, Protocol: annotations.ProtocolHTTPS}: "/foo", - } - addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) - for p, exp := range nodePortToHealthCheck { - got, err := lbc.Translator.GetProbe(p) - if err != nil || got == nil { - t.Errorf("Failed to get probe for node port %v: %v", p, err) - } else if getProbePath(got) != exp { - t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) - } - } -} - -func TestProbeGetterNamedPort(t *testing.T) { - cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) - lbc := newLoadBalancerController(t, cm) - nodePortToHealthCheck := map[backends.ServicePort]string{ - {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", - } - addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) - for _, p := range lbc.podLister.List() { - pod := p.(*api_v1.Pod) - pod.Spec.Containers[0].Ports[0].Name = "test" - pod.Spec.Containers[0].ReadinessProbe.Handler.HTTPGet.Port = intstr.IntOrString{Type: intstr.String, StrVal: "test"} - } - for p, exp := range nodePortToHealthCheck { - got, err := 
lbc.Translator.GetProbe(p) - if err != nil || got == nil { - t.Errorf("Failed to get probe for node port %v: %v", p, err) - } else if getProbePath(got) != exp { - t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) - } - } - -} - -func TestProbeGetterCrossNamespace(t *testing.T) { - cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) - lbc := newLoadBalancerController(t, cm) - - firstPod := &api_v1.Pod{ - ObjectMeta: meta_v1.ObjectMeta{ - // labels match those added by "addPods", but ns and health check - // path is different. If this pod was created in the same ns, it - // would become the health check. - Labels: map[string]string{"app-3001": "test"}, - Name: fmt.Sprintf("test-pod-new-ns"), - Namespace: "new-ns", - CreationTimestamp: meta_v1.NewTime(firstPodCreationTime.Add(-time.Duration(time.Hour))), - }, - Spec: api_v1.PodSpec{ - Containers: []api_v1.Container{ - { - Ports: []api_v1.ContainerPort{{ContainerPort: 80}}, - ReadinessProbe: &api_v1.Probe{ - Handler: api_v1.Handler{ - HTTPGet: &api_v1.HTTPGetAction{ - Scheme: api_v1.URISchemeHTTP, - Path: "/badpath", - Port: intstr.IntOrString{ - Type: intstr.Int, - IntVal: 80, - }, - }, - }, - }, - }, - }, - }, - } - lbc.podLister.Add(firstPod) - nodePortToHealthCheck := map[backends.ServicePort]string{ - {Port: 3001, Protocol: annotations.ProtocolHTTP}: "/healthz", - } - addPods(lbc, nodePortToHealthCheck, api_v1.NamespaceDefault) - - for p, exp := range nodePortToHealthCheck { - got, err := lbc.Translator.GetProbe(p) - if err != nil || got == nil { - t.Errorf("Failed to get probe for node port %v: %v", p, err) - } else if getProbePath(got) != exp { - t.Errorf("Wrong path for node port %v, got %v expected %v", p, getProbePath(got), exp) - } - } -} - -func addPods(lbc *LoadBalancerController, nodePortToHealthCheck map[backends.ServicePort]string, ns string) { - delay := time.Minute - for np, u := range nodePortToHealthCheck { - l := map[string]string{fmt.Sprintf("app-%d", np.Port): "test"} - svc := &api_v1.Service{ - Spec: api_v1.ServiceSpec{ - Selector: l, - Ports: []api_v1.ServicePort{ - { - NodePort: int32(np.Port), - TargetPort: intstr.IntOrString{ - Type: intstr.Int, - IntVal: 80, - }, - }, - }, - }, - } - svc.Name = fmt.Sprintf("%d", np.Port) - svc.Namespace = ns - lbc.svcLister.Add(svc) - - pod := &api_v1.Pod{ - ObjectMeta: meta_v1.ObjectMeta{ - Labels: l, - Name: fmt.Sprintf("%d", np.Port), - Namespace: ns, - CreationTimestamp: meta_v1.NewTime(firstPodCreationTime.Add(delay)), - }, - Spec: api_v1.PodSpec{ - Containers: []api_v1.Container{ - { - Ports: []api_v1.ContainerPort{{Name: "test", ContainerPort: 80}}, - ReadinessProbe: &api_v1.Probe{ - Handler: api_v1.Handler{ - HTTPGet: &api_v1.HTTPGetAction{ - Scheme: api_v1.URIScheme(string(np.Protocol)), - Path: u, - Port: intstr.IntOrString{ - Type: intstr.Int, - IntVal: 80, - }, - }, - }, - }, - }, - }, - }, - } - lbc.podLister.Add(pod) - delay = 2 * delay - } -} - func addNodes(lbc *LoadBalancerController, zoneToNode map[string][]string) { for zone, nodes := range zoneToNode { for _, node := range nodes { @@ -303,100 +153,3 @@ func TestAddInstanceGroupsAnnotation(t *testing.T) { } } } - -func TestGatherFirewallPorts(t *testing.T) { - cm := NewFakeClusterManager(flags.DefaultClusterUID, DefaultFirewallName) - lbc := newLoadBalancerController(t, cm) - lbc.CloudClusterManager.defaultBackendNodePort.Port = int64(30000) - - ep1 := "ep1" - ep2 := "ep2" - - svcPorts := []backends.ServicePort{ - {Port: int64(30001)}, - {Port: 
int64(30002)}, - { - SvcName: types.NamespacedName{ - "ns", - ep1, - }, - Port: int64(30003), - NEGEnabled: true, - SvcTargetPort: "80", - }, - { - SvcName: types.NamespacedName{ - "ns", - ep2, - }, - Port: int64(30004), - NEGEnabled: true, - SvcTargetPort: "named-port", - }, - } - - lbc.endpointLister.Add(newDefaultEndpoint(ep1)) - lbc.endpointLister.Add(newDefaultEndpoint(ep2)) - - res := lbc.Translator.GatherFirewallPorts(svcPorts, true) - expect := map[int64]bool{ - int64(30000): true, - int64(30001): true, - int64(30002): true, - int64(80): true, - int64(8080): true, - int64(8081): true, - } - if len(res) != len(expect) { - t.Errorf("got firewall ports == %v, want %v", res, expect) - } - - for _, p := range res { - if _, ok := expect[p]; !ok { - t.Errorf("firewall port %v is missing, (got %v, want %v)", p, res, expect) - } - } -} - -func newDefaultEndpoint(name string) *api_v1.Endpoints { - return &api_v1.Endpoints{ - ObjectMeta: meta_v1.ObjectMeta{ - Name: name, - Namespace: "ns", - }, - Subsets: []api_v1.EndpointSubset{ - { - Ports: []api_v1.EndpointPort{ - { - Name: "", - Port: int32(80), - Protocol: api_v1.ProtocolTCP, - }, - { - Name: "named-port", - Port: int32(8080), - Protocol: api_v1.ProtocolTCP, - }, - }, - }, - { - Ports: []api_v1.EndpointPort{ - { - Name: "named-port", - Port: int32(80), - Protocol: api_v1.ProtocolTCP, - }, - }, - }, - { - Ports: []api_v1.EndpointPort{ - { - Name: "named-port", - Port: int32(8081), - Protocol: api_v1.ProtocolTCP, - }, - }, - }, - }, - } -} From c9a3f8fcde7e03e370a382d8e9085bdd5c950443 Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Sun, 14 Jan 2018 14:38:23 -0800 Subject: [PATCH 07/10] Cleanup unit test to be more readable --- pkg/controller/translator/translator_test.go | 48 +++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go index 0a607935e1..faec8765b6 100644 --- a/pkg/controller/translator/translator_test.go +++ b/pkg/controller/translator/translator_test.go @@ -47,7 +47,6 @@ type fakeBackendInfo struct { func (bi *fakeBackendInfo) BackendServiceForPort(port int64) (*compute.BackendService, error) { panic(fmt.Errorf("should not be used")) - return nil, nil } func (bi *fakeBackendInfo) DefaultBackendNodePort() *backends.ServicePort { @@ -219,7 +218,7 @@ func makePods(nodePortToHealthCheck map[backends.ServicePort]string, ns string) func makeServices(nodePortToHealthCheck map[backends.ServicePort]string, ns string) []*apiv1.Service { var services []*apiv1.Service - for np, _ := range nodePortToHealthCheck { + for np := range nodePortToHealthCheck { svc := &apiv1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("%d", np.Port), @@ -273,21 +272,28 @@ func TestGatherFirewallPorts(t *testing.T) { translator.endpointLister.Add(newDefaultEndpoint(ep1)) translator.endpointLister.Add(newDefaultEndpoint(ep2)) - res := translator.GatherFirewallPorts(svcPorts, true) - expect := map[int64]bool{ - int64(30000): true, - int64(30001): true, - int64(30002): true, - int64(80): true, - int64(8080): true, - int64(8081): true, - } - if len(res) != len(expect) { - t.Errorf("got firewall ports == %v, want %v", res, expect) - } - for _, p := range res { - if _, ok := expect[p]; !ok { - t.Errorf("firewall port %v is missing, (got %v, want %v)", p, res, expect) + for _, tc := range []struct { + defaultBackend bool + want []int64 + }{ + { + defaultBackend: false, + want: []int64{80, 8080, 8081, 30001, 30002}, + }, + { + defaultBackend: 
true, + want: []int64{80, 8080, 8081, 30000, 30001, 30002}, + }, + } { + + res := translator.GatherFirewallPorts(svcPorts, tc.defaultBackend) + if len(res) != len(tc.want) { + t.Errorf("got firewall ports == %v, want %v", res, tc.want) + } + for _, p := range res { + if _, ok := int64ToMap(tc.want)[p]; !ok { + t.Errorf("firewall port %v is missing, (got %v, want %v)", p, res, tc.want) + } } } } @@ -315,3 +321,11 @@ func newDefaultEndpoint(name string) *apiv1.Endpoints { }, } } + +func int64ToMap(l []int64) map[int64]bool { + ret := map[int64]bool{} + for _, i := range l { + ret[i] = true + } + return ret +} From e140cb145b41e7b09bb342e41260f6a8ca11ea3e Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Thu, 11 Jan 2018 23:46:38 -0800 Subject: [PATCH 08/10] Only sync the Ingress that was changed - If the Ingress was deleted, only perform a GC. - Only update the Ingress that was changed. --- pkg/controller/cluster_manager.go | 2 ++ pkg/controller/controller.go | 43 ++++++++++++++++++++----------- pkg/controller/controller_test.go | 17 ++++++++---- 3 files changed, 42 insertions(+), 20 deletions(-) diff --git a/pkg/controller/cluster_manager.go b/pkg/controller/cluster_manager.go index a9ccc8404f..49d76ffce0 100644 --- a/pkg/controller/cluster_manager.go +++ b/pkg/controller/cluster_manager.go @@ -105,6 +105,8 @@ func (c *ClusterManager) shutdown() error { // If in performing the checkpoint the cluster manager runs out of quota, a // googleapi 403 is returned. func (c *ClusterManager) Checkpoint(lbs []*loadbalancers.L7RuntimeInfo, nodeNames []string, backendServicePorts []backends.ServicePort, namedPorts []backends.ServicePort, firewallPorts []int64) ([]*compute.InstanceGroup, error) { + glog.V(4).Infof("Checkpoint %q, len(lbs)=%v, len(nodeNames)=%v, lne(backendServicePorts)=%v, len(namedPorts)=%v, len(firewallPorts)=%v", len(lbs), len(nodeNames), len(backendServicePorts), len(namedPorts), len(firewallPorts)) + if len(namedPorts) != 0 { // Add the default backend node port to the list of named ports for instance groups. namedPorts = append(namedPorts, c.defaultBackendNodePort) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index b97b8d7fca..5adbf20b35 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -93,24 +93,24 @@ type LoadBalancerController struct { // required for L7 loadbalancing. // - resyncPeriod: Watchers relist from the Kubernetes API server this often. 
func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan struct{}, ctx *context.ControllerContext, clusterManager *ClusterManager, negEnabled bool) (*LoadBalancerController, error) { - eventBroadcaster := record.NewBroadcaster() - eventBroadcaster.StartLogging(glog.Infof) - eventBroadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(glog.Infof) + broadcaster.StartRecordingToSink(&unversionedcore.EventSinkImpl{ Interface: kubeClient.Core().Events(""), }) lbc := LoadBalancerController{ client: kubeClient, + ctx: ctx, + ingLister: StoreToIngressLister{Store: ctx.IngressInformer.GetStore()}, + nodeLister: ctx.NodeInformer.GetIndexer(), + nodes: NewNodeController(ctx, clusterManager), CloudClusterManager: clusterManager, + recorder: broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"}), stopCh: stopCh, - recorder: eventBroadcaster.NewRecorder(scheme.Scheme, - apiv1.EventSource{Component: "loadbalancer-controller"}), - negEnabled: negEnabled, + hasSynced: ctx.HasSynced, + negEnabled: negEnabled, } lbc.ingQueue = utils.NewPeriodicTaskQueue("ingresses", lbc.sync) - lbc.hasSynced = ctx.HasSynced - lbc.ingLister.Store = ctx.IngressInformer.GetStore() - lbc.nodeLister = ctx.NodeInformer.GetIndexer() - lbc.nodes = NewNodeController(ctx, clusterManager) if negEnabled { lbc.endpointLister.Indexer = ctx.EndpointInformer.GetIndexer() @@ -245,20 +245,27 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { allNodePorts := lbc.Translator.ToNodePorts(&allIngresses) gceNodePorts := lbc.Translator.ToNodePorts(&gceIngresses) - lbNames := lbc.ingLister.Store.ListKeys() - lbs, err := lbc.toRuntimeInfo(gceIngresses) - if err != nil { - return err - } nodeNames, err := getReadyNodeNames(listers.NewNodeLister(lbc.nodeLister)) if err != nil { return err } + lbNames := lbc.ingLister.Store.ListKeys() + obj, ingExists, err := lbc.ingLister.Store.GetByKey(key) if err != nil { return err } + if !ingExists { + glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key) + return lbc.CloudClusterManager.GC(lbNames, allNodePorts) + } + + ingressObj, ok := obj.(*extensions.Ingress) + if !ok { + return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj) + } + // This performs a 2 phase checkpoint with the cloud: // * Phase 1 creates/verifies resources are as expected. At the end of a // successful checkpoint we know that existing L7s are WAI, and the L7 @@ -280,6 +287,12 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { // Record any errors during sync and throw a single error at the end. This // allows us to free up associated cloud resources ASAP. + lbs, err := lbc.toRuntimeInfo(extensions.IngressList{ + Items: []extensions.Ingress{*ingressObj}, + }) + if err != nil { + return err + } igs, err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, gceNodePorts, allNodePorts, lbc.Translator.GatherFirewallPorts(gceNodePorts, len(lbs) > 0)) if err != nil { // TODO: Implement proper backoff for the queue. diff --git a/pkg/controller/controller_test.go b/pkg/controller/controller_test.go index 35cbcb6e86..a8573dfcfc 100644 --- a/pkg/controller/controller_test.go +++ b/pkg/controller/controller_test.go @@ -98,10 +98,14 @@ func toIngressRules(hostRules map[string]utils.FakeIngressRuleValueMap) []extens // newIngress returns a new Ingress with the given path map. 
func newIngress(hostRules map[string]utils.FakeIngressRuleValueMap) *extensions.Ingress { - return &extensions.Ingress{ + ret := &extensions.Ingress{ + TypeMeta: meta_v1.TypeMeta{ + Kind: "Ingress", + APIVersion: "extensions/v1beta1", + }, ObjectMeta: meta_v1.ObjectMeta{ Name: fmt.Sprintf("%v", uuid.NewUUID()), - Namespace: "", + Namespace: "default", }, Spec: extensions.IngressSpec{ Backend: &extensions.IngressBackend{ @@ -118,6 +122,8 @@ func newIngress(hostRules map[string]utils.FakeIngressRuleValueMap) *extensions. }, }, } + ret.SelfLink = fmt.Sprintf("%s/%s", ret.Namespace, ret.Name) + return ret } // validIngress returns a valid Ingress. @@ -274,6 +280,7 @@ func TestLbCreateDelete(t *testing.T) { t.Fatalf("Found backend %+v for port %v", be, port) } } + lbc.ingLister.Store.Delete(ings[1]) lbc.sync(getKey(ings[1], t)) @@ -285,15 +292,15 @@ func TestLbCreateDelete(t *testing.T) { } } if len(cm.fakeLbs.Fw) != 0 || len(cm.fakeLbs.Um) != 0 || len(cm.fakeLbs.Tp) != 0 { - t.Fatalf("Loadbalancer leaked resources") + t.Errorf("Loadbalancer leaked resources") } for _, lbName := range []string{getKey(ings[0], t), getKey(ings[1], t)} { if l7, err := cm.l7Pool.Get(lbName); err == nil { - t.Fatalf("Found unexpected loadbalandcer %+v: %v", l7, err) + t.Fatalf("Got loadbalancer %+v: %v, want none", l7, err) } } if firewallRule, err := cm.firewallPool.(*firewalls.FirewallRules).GetFirewall(firewallName); err == nil { - t.Fatalf("Found unexpected firewall rule %v", firewallRule) + t.Errorf("Got firewall rule %+v, want none", firewallRule) } } From 7aa65190923c8f4cdd91d6a795463d855819f62a Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Sat, 13 Jan 2018 10:42:19 -0800 Subject: [PATCH 09/10] Make event recorders have the right namespaces --- pkg/context/context.go | 30 ++++++++++++++++++++ pkg/controller/controller.go | 22 +++++++------- pkg/controller/translator/translator.go | 24 +++++++++++----- pkg/controller/translator/translator_test.go | 3 +- 4 files changed, 58 insertions(+), 21 deletions(-) diff --git a/pkg/context/context.go b/pkg/context/context.go index 9dc57beedf..8b451d2d03 100644 --- a/pkg/context/context.go +++ b/pkg/context/context.go @@ -19,19 +19,30 @@ package context import ( "time" + "github.com/golang/glog" + + apiv1 "k8s.io/api/core/v1" informerv1 "k8s.io/client-go/informers/core/v1" informerv1beta1 "k8s.io/client-go/informers/extensions/v1beta1" "k8s.io/client-go/kubernetes" + scheme "k8s.io/client-go/kubernetes/scheme" + corev1 "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" ) // ControllerContext holds type ControllerContext struct { + kubeClient kubernetes.Interface + IngressInformer cache.SharedIndexInformer ServiceInformer cache.SharedIndexInformer PodInformer cache.SharedIndexInformer NodeInformer cache.SharedIndexInformer EndpointInformer cache.SharedIndexInformer + + // Map of namespace => record.EventRecorder. + recorders map[string]record.EventRecorder } // NewControllerContext returns a new shared set of informers. 
@@ -40,14 +51,17 @@ func NewControllerContext(kubeClient kubernetes.Interface, namespace string, res return cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc} } context := &ControllerContext{ + kubeClient: kubeClient, IngressInformer: informerv1beta1.NewIngressInformer(kubeClient, namespace, resyncPeriod, newIndexer()), ServiceInformer: informerv1.NewServiceInformer(kubeClient, namespace, resyncPeriod, newIndexer()), PodInformer: informerv1.NewPodInformer(kubeClient, namespace, resyncPeriod, newIndexer()), NodeInformer: informerv1.NewNodeInformer(kubeClient, resyncPeriod, newIndexer()), + recorders: map[string]record.EventRecorder{}, } if enableEndpointsInformer { context.EndpointInformer = informerv1.NewEndpointsInformer(kubeClient, namespace, resyncPeriod, newIndexer()) } + return context } @@ -71,6 +85,22 @@ func (ctx *ControllerContext) HasSynced() bool { return true } +func (ctx *ControllerContext) Recorder(ns string) record.EventRecorder { + if rec, ok := ctx.recorders[ns]; ok { + return rec + } + + broadcaster := record.NewBroadcaster() + broadcaster.StartLogging(glog.Infof) + broadcaster.StartRecordingToSink(&corev1.EventSinkImpl{ + Interface: ctx.kubeClient.Core().Events(ns), + }) + rec := broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"}) + ctx.recorders[ns] = rec + + return rec +} + // Start all of the informers. func (ctx *ControllerContext) Start(stopCh chan struct{}) { go ctx.IngressInformer.Run(stopCh) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 5adbf20b35..889519cc96 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -28,11 +28,11 @@ import ( extensions "k8s.io/api/extensions/v1beta1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" - scheme "k8s.io/client-go/kubernetes/scheme" unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" "k8s.io/ingress-gce/pkg/controller/translator" @@ -69,7 +69,6 @@ type LoadBalancerController struct { // TODO: Watch secrets CloudClusterManager *ClusterManager - recorder record.EventRecorder ingQueue utils.TaskQueue Translator *translator.GCE stopCh chan struct{} @@ -105,7 +104,6 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru nodeLister: ctx.NodeInformer.GetIndexer(), nodes: NewNodeController(ctx, clusterManager), CloudClusterManager: clusterManager, - recorder: broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"}), stopCh: stopCh, hasSynced: ctx.HasSynced, negEnabled: negEnabled, @@ -124,7 +122,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, annotations.IngressClassKey) return } - lbc.recorder.Eventf(addIng, apiv1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)) + lbc.ctx.Recorder(addIng.Namespace).Eventf(addIng, apiv1.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)) lbc.ingQueue.Enqueue(obj) }, DeleteFunc: func(obj interface{}) { @@ -163,7 +161,7 @@ func NewLoadBalancerController(kubeClient kubernetes.Interface, stopCh chan stru if ctx.EndpointInformer != nil { endpointIndexer = ctx.EndpointInformer.GetIndexer() } - lbc.Translator = translator.New(lbc.recorder, 
lbc.CloudClusterManager, + lbc.Translator = translator.New(lbc.ctx, lbc.CloudClusterManager, ctx.ServiceInformer.GetIndexer(), ctx.NodeInformer.GetIndexer(), ctx.PodInformer.GetIndexer(), @@ -298,14 +296,14 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { // TODO: Implement proper backoff for the queue. const eventMsg = "GCE" if fwErr, ok := err.(*firewalls.FirewallSyncError); ok { - if ingExists { - lbc.recorder.Eventf(obj.(*extensions.Ingress), apiv1.EventTypeNormal, eventMsg, fwErr.Message) + if ingObj, ok := obj.(*extensions.Ingress); ok && ingExists { + lbc.ctx.Recorder(ingObj.Namespace).Eventf(ingObj, apiv1.EventTypeNormal, eventMsg, fwErr.Message) } else { glog.Warningf("Received firewallSyncError but don't have an ingress for raising an event: %v", fwErr.Message) } } else { - if ingExists { - lbc.recorder.Eventf(obj.(*extensions.Ingress), apiv1.EventTypeWarning, eventMsg, err.Error()) + if ingObj, ok := obj.(*extensions.Ingress); ok && ingExists { + lbc.ctx.Recorder(ingObj.Namespace).Eventf(ingObj, apiv1.EventTypeWarning, eventMsg, err.Error()) } else { err = fmt.Errorf("%v, error: %v", eventMsg, err) } @@ -355,10 +353,10 @@ func (lbc *LoadBalancerController) sync(key string) (err error) { if urlMap, err := lbc.Translator.ToURLMap(&ing); err != nil { syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err) } else if err := l7.UpdateUrlMap(urlMap); err != nil { - lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) + lbc.ctx.Recorder(ing.Namespace).Eventf(&ing, apiv1.EventTypeWarning, "UrlMap", err.Error()) syncError = fmt.Errorf("%v, update url map error: %v", syncError, err) } else if err := lbc.updateIngressStatus(l7, ing); err != nil { - lbc.recorder.Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error()) + lbc.ctx.Recorder(ing.Namespace).Eventf(&ing, apiv1.EventTypeWarning, "Status", err.Error()) syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err) } return syncError @@ -391,7 +389,7 @@ func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing if _, err := ingClient.UpdateStatus(currIng); err != nil { return err } - lbc.recorder.Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip) + lbc.ctx.Recorder(ing.Namespace).Eventf(currIng, apiv1.EventTypeNormal, "CREATE", "ip: %v", ip) } } annotations := loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool) diff --git a/pkg/controller/translator/translator.go b/pkg/controller/translator/translator.go index d8a00e3f35..d827458bce 100644 --- a/pkg/controller/translator/translator.go +++ b/pkg/controller/translator/translator.go @@ -35,6 +35,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" listers "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/backends" @@ -43,14 +44,20 @@ import ( "k8s.io/ingress-gce/pkg/utils" ) +// BackendInfo is an interface to return information about the backends. type BackendInfo interface { BackendServiceForPort(port int64) (*compute.BackendService, error) DefaultBackendNodePort() *backends.ServicePort } -func New(recorder record.EventRecorder, bi BackendInfo, svcLister cache.Indexer, nodeLister cache.Indexer, podLister cache.Indexer, endpointLister cache.Indexer, negEnabled bool) *GCE { +type recorderSource interface { + Recorder(ns string) record.EventRecorder +} + +// New returns a new ControllerContext. 
+func New(recorders recorderSource, bi BackendInfo, svcLister cache.Indexer, nodeLister cache.Indexer, podLister cache.Indexer, endpointLister cache.Indexer, negEnabled bool) *GCE { return &GCE{ - recorder, + recorders, bi, svcLister, nodeLister, @@ -62,7 +69,8 @@ func New(recorder record.EventRecorder, bi BackendInfo, svcLister cache.Indexer, // GCE helps with kubernetes -> gce api conversion. type GCE struct { - recorder record.EventRecorder + recorders recorderSource + bi BackendInfo svcLister cache.Indexer nodeLister cache.Indexer @@ -87,7 +95,7 @@ func (t *GCE) ToURLMap(ing *extensions.Ingress) (utils.GCEURLMap, error) { // to all other services under the assumption that the user will // modify nodeport. if _, ok := err.(errors.ErrNodePortNotFound); ok { - t.recorder.Eventf(ing, api_v1.EventTypeWarning, "Service", err.(errors.ErrNodePortNotFound).Error()) + t.recorders.Recorder(ing.Namespace).Eventf(ing, api_v1.EventTypeWarning, "Service", err.(errors.ErrNodePortNotFound).Error()) continue } @@ -121,12 +129,14 @@ func (t *GCE) ToURLMap(ing *extensions.Ingress) (utils.GCEURLMap, error) { if _, ok := err.(errors.ErrNodePortNotFound); ok { msg = fmt.Sprintf("couldn't find nodeport for %v/%v", ing.Namespace, ing.Spec.Backend.ServiceName) } - t.recorder.Eventf(ing, api_v1.EventTypeWarning, "Service", fmt.Sprintf("failed to identify user specified default backend, %v, using system default", msg)) + msg = fmt.Sprintf("failed to identify user specified default backend, %v, using system default", msg) + t.recorders.Recorder(ing.Namespace).Eventf(ing, api_v1.EventTypeWarning, "Service", msg) } else if defaultBackend != nil { - t.recorder.Eventf(ing, api_v1.EventTypeNormal, "Service", fmt.Sprintf("default backend set to %v:%v", ing.Spec.Backend.ServiceName, defaultBackend.Port)) + msg := fmt.Sprintf("default backend set to %v:%v", ing.Spec.Backend.ServiceName, defaultBackend.Port) + t.recorders.Recorder(ing.Namespace).Eventf(ing, api_v1.EventTypeNormal, "Service", msg) } } else { - t.recorder.Eventf(ing, api_v1.EventTypeNormal, "Service", "no user specified default backend, using system default") + t.recorders.Recorder(ing.Namespace).Eventf(ing, api_v1.EventTypeNormal, "Service", "no user specified default backend, using system default") } hostPathBackend.PutDefaultBackend(defaultBackend) return hostPathBackend, nil diff --git a/pkg/controller/translator/translator_test.go b/pkg/controller/translator/translator_test.go index faec8765b6..1835707d2b 100644 --- a/pkg/controller/translator/translator_test.go +++ b/pkg/controller/translator/translator_test.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/client-go/kubernetes/fake" - "k8s.io/client-go/kubernetes/scheme" unversionedcore "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/client-go/tools/record" @@ -63,7 +62,7 @@ func gceForTest(negEnabled bool) *GCE { ctx := context.NewControllerContext(client, apiv1.NamespaceAll, 1*time.Second, negEnabled) gce := &GCE{ - recorder: broadcaster.NewRecorder(scheme.Scheme, apiv1.EventSource{Component: "loadbalancer-controller"}), + recorders: ctx, bi: &fakeBackendInfo{}, svcLister: ctx.ServiceInformer.GetIndexer(), nodeLister: ctx.NodeInformer.GetIndexer(), From 4a831f3515d1d34778b81c30a5b6ba7b406fa0ca Mon Sep 17 00:00:00 2001 From: Bowei Du Date: Sat, 13 Jan 2018 11:08:11 -0800 Subject: [PATCH 10/10] Remove firewall if there are no more L7 lbs --- pkg/controller/cluster_manager.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) 
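Reviewer note (commentary placed between the diffstat and the diff, so git am still applies the patch unchanged): the hunk below keeps two separate len(lbNames) == 0 checks and ignores the return value of firewallPool.Shutdown(). For comparison, a minimal sketch of how the tail of ClusterManager.GC could fold the two branches together and propagate both cleanup errors is shown here. It is illustrative only, not part of the patch, and it assumes firewallPool.Shutdown() returns an error just as FirewallRules.Shutdown does.

	// Illustrative sketch only (not applied by this patch): a single guard for
	// the "no L7 load balancers remain" case, with both cleanup errors returned.
	// Assumes firewallPool.Shutdown() returns an error, like FirewallRules.Shutdown.
	if len(lbNames) == 0 {
		igName := c.ClusterNamer.InstanceGroup()
		glog.Infof("Deleting instance group %v", igName)
		if err := c.instancePool.DeleteInstanceGroup(igName); err != nil {
			return err
		}
		// With no L7s left, the shared firewall rule is no longer needed either.
		if err := c.firewallPool.Shutdown(); err != nil {
			return err
		}
	}
	return nil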
diff --git a/pkg/controller/cluster_manager.go b/pkg/controller/cluster_manager.go
index 49d76ffce0..24770063a3 100644
--- a/pkg/controller/cluster_manager.go
+++ b/pkg/controller/cluster_manager.go
@@ -174,15 +174,17 @@ func (c *ClusterManager) GC(lbNames []string, nodePorts []backends.ServicePort)
 	}
 
 	// TODO(ingress#120): Move this to the backend pool so it mirrors creation
-	var igErr error
 	if len(lbNames) == 0 {
 		igName := c.ClusterNamer.InstanceGroup()
 		glog.Infof("Deleting instance group %v", igName)
-		igErr = c.instancePool.DeleteInstanceGroup(igName)
+		if err := c.instancePool.DeleteInstanceGroup(igName); err != nil {
+			return err
+		}
 	}
-	if igErr != nil {
-		return igErr
+	if len(lbNames) == 0 {
+		c.firewallPool.Shutdown()
 	}
+	return nil
 }
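Series note (trailing commentary after the final hunk; ignored when the patches are applied): patch 08 changes sync() so that a deleted Ingress only triggers garbage collection and only the Ingress that changed is converted to runtime info and checkpointed. The sketch below summarizes that control flow for readers skimming the series. It is a simplification that uses only identifiers appearing in the diffs above; error events, node-port gathering, and the Checkpoint call are elided, and the method is named syncSketch to make clear it is not the exact code.

// syncSketch is an illustrative outline of the post-series sync() flow.
func (lbc *LoadBalancerController) syncSketch(key string) error {
	obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
	if err != nil {
		return err
	}
	if !ingExists {
		// The Ingress was deleted: only garbage-collect orphaned cloud resources.
		glog.V(2).Infof("Ingress %q no longer exists, triggering GC", key)
		return lbc.CloudClusterManager.GC(lbc.ingLister.Store.ListKeys(), nil /* node ports elided */)
	}
	ing, ok := obj.(*extensions.Ingress)
	if !ok {
		return fmt.Errorf("invalid object (not of type Ingress), type was %T", obj)
	}
	// Only the changed Ingress is turned into L7 runtime info; shared resources
	// (instance groups, firewall) are still reconciled against all known LBs.
	_, err = lbc.toRuntimeInfo(extensions.IngressList{Items: []extensions.Ingress{*ing}})
	return err
}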