diff --git a/pkg/backends/neg_linker_test.go b/pkg/backends/neg_linker_test.go index eb5419304b..dd3263402f 100644 --- a/pkg/backends/neg_linker_test.go +++ b/pkg/backends/neg_linker_test.go @@ -20,14 +20,14 @@ import ( computebeta "google.golang.org/api/compute/v0.beta" "k8s.io/apimachinery/pkg/types" "k8s.io/ingress-gce/pkg/annotations" - "k8s.io/ingress-gce/pkg/neg" + negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/mock" ) -func newTestNEGLinker(fakeNEG neg.NetworkEndpointGroupCloud, fakeGCE *gce.GCECloud) *negLinker { +func newTestNEGLinker(fakeNEG negtypes.NetworkEndpointGroupCloud, fakeGCE *gce.GCECloud) *negLinker { fakeBackendPool := NewPool(fakeGCE, defaultNamer, false) // Add standard hooks for mocking update calls. Each test can set a update different hook if it chooses to. @@ -40,7 +40,7 @@ func newTestNEGLinker(fakeNEG neg.NetworkEndpointGroupCloud, fakeGCE *gce.GCEClo func TestLinkBackendServiceToNEG(t *testing.T) { fakeGCE := gce.FakeGCECloud(gce.DefaultTestClusterValues()) - fakeNEG := neg.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") + fakeNEG := negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network") linker := newTestNEGLinker(fakeNEG, fakeGCE) zones := []GroupKey{{"zone1"}, {"zone2"}} diff --git a/pkg/neg/cloudprovideradapter.go b/pkg/neg/cloudprovideradapter.go index a3e5294396..64dd702113 100644 --- a/pkg/neg/cloudprovideradapter.go +++ b/pkg/neg/cloudprovideradapter.go @@ -21,6 +21,7 @@ import ( "strings" computebeta "google.golang.org/api/compute/v0.beta" + negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" @@ -28,7 +29,7 @@ import ( ) // NewAdapter takes a GCECloud and returns a NetworkEndpointGroupCloud. -func NewAdapter(g *gce.GCECloud) NetworkEndpointGroupCloud { +func NewAdapter(g *gce.GCECloud) negtypes.NetworkEndpointGroupCloud { return &cloudProviderAdapter{ c: g.Compute(), networkURL: g.NetworkURL(), diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 822fe0bf4d..3da4a22267 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -36,24 +36,25 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/neg/metrics" negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" ) func init() { // register prometheus metrics - registerMetrics() + metrics.RegisterMetrics() } // Controller is network endpoint group controller. -// It determines whether NEG for a service port is needed, then signals negSyncerManager to sync it. +// It determines whether NEG for a service port is needed, then signals NegSyncerManager to sync it. type Controller struct { - manager negSyncerManager + manager negtypes.NegSyncerManager resyncPeriod time.Duration gcPeriod time.Duration recorder record.EventRecorder - namer networkEndpointGroupNamer - zoneGetter zoneGetter + namer negtypes.NetworkEndpointGroupNamer + zoneGetter negtypes.ZoneGetter ingressSynced cache.InformerSynced serviceSynced cache.InformerSynced @@ -71,10 +72,10 @@ type Controller struct { // NewController returns a network endpoint group controller. func NewController( - cloud NetworkEndpointGroupCloud, + cloud negtypes.NetworkEndpointGroupCloud, ctx *context.ControllerContext, - zoneGetter zoneGetter, - namer networkEndpointGroupNamer, + zoneGetter negtypes.ZoneGetter, + namer negtypes.NetworkEndpointGroupNamer, resyncPeriod time.Duration, gcPeriod time.Duration) *Controller { // init event recorder @@ -202,7 +203,7 @@ func (c *Controller) stop() { func (c *Controller) processEndpoint(obj interface{}) { defer func() { now := c.syncTracker.Track() - lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) + metrics.LastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) }() key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) @@ -235,7 +236,7 @@ func (c *Controller) serviceWorker() { func (c *Controller) processService(key string) error { defer func() { now := c.syncTracker.Track() - lastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) + metrics.LastSyncTimestamp.WithLabelValues().Set(float64(now.UTC().UnixNano())) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -438,16 +439,12 @@ func gatherIngressServiceKeys(ing *extensions.Ingress) sets.String { return set } utils.TraverseIngressBackends(ing, func(id utils.ServicePortID) bool { - set.Insert(serviceKeyFunc(id.Service.Namespace, id.Service.Name)) + set.Insert(utils.ServiceKeyFunc(id.Service.Namespace, id.Service.Name)) return false }) return set } -func serviceKeyFunc(namespace, name string) string { - return fmt.Sprintf("%s/%s", namespace, name) -} - func getIngressServicesFromStore(store cache.Store, svc *apiv1.Service) (ings []extensions.Ingress) { for _, m := range store.List() { ing := *m.(*extensions.Ingress) diff --git a/pkg/neg/controller_test.go b/pkg/neg/controller_test.go index 023f47005a..1a515ec826 100644 --- a/pkg/neg/controller_test.go +++ b/pkg/neg/controller_test.go @@ -39,6 +39,12 @@ import ( "k8s.io/apimachinery/pkg/util/intstr" ) +const ( + testServiceNamespace = "test-ns" + testServiceName = "test-Name" + testNamedPort = "named-Port" +) + var ( defaultBackend = utils.ServicePortID{Service: types.NamespacedName{Name: "default-http-backend", Namespace: "kube-system"}, Port: intstr.FromString("http")} ) @@ -55,9 +61,9 @@ func newTestController(kubeClient kubernetes.Interface) *Controller { } context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig) controller := NewController( - NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), + negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), context, - NewFakeZoneGetter(), + negtypes.NewFakeZoneGetter(), namer, 1*time.Second, 1*time.Second, @@ -95,7 +101,7 @@ func TestNewNonNEGService(t *testing.T) { defer controller.stop() controller.serviceLister.Add(newTestService(controller, false, []int32{})) controller.ingressLister.Add(newTestIngress()) - err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) + err := controller.processService(utils.ServiceKeyFunc(testServiceNamespace, testServiceName)) if err != nil { t.Fatalf("Failed to process service: %v", err) } @@ -154,7 +160,7 @@ func TestNewNEGService(t *testing.T) { t.Run(tc.desc, func(t *testing.T) { controller := newTestController(fake.NewSimpleClientset()) defer controller.stop() - svcKey := serviceKeyFunc(testServiceNamespace, testServiceName) + svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName) controller.serviceLister.Add(newTestService(controller, tc.ingress, tc.svcPorts)) if tc.ingress { @@ -193,7 +199,7 @@ func TestEnableNEGServiceWithIngress(t *testing.T) { controller.serviceLister.Add(newTestService(controller, false, []int32{})) controller.ingressLister.Add(newTestIngress()) svcClient := controller.client.CoreV1().Services(testServiceNamespace) - svcKey := serviceKeyFunc(testServiceNamespace, testServiceName) + svcKey := utils.ServiceKeyFunc(testServiceNamespace, testServiceName) err := controller.processService(svcKey) if err != nil { t.Fatalf("Failed to process service: %v", err) @@ -225,14 +231,14 @@ func TestDisableNEGServiceWithIngress(t *testing.T) { defer controller.stop() controller.serviceLister.Add(newTestService(controller, true, []int32{})) controller.ingressLister.Add(newTestIngress()) - err := controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) + err := controller.processService(utils.ServiceKeyFunc(testServiceNamespace, testServiceName)) if err != nil { t.Fatalf("Failed to process service: %v", err) } validateSyncers(t, controller, 3, false) controller.serviceLister.Update(newTestService(controller, false, []int32{})) - err = controller.processService(serviceKeyFunc(testServiceNamespace, testServiceName)) + err = controller.processService(utils.ServiceKeyFunc(testServiceNamespace, testServiceName)) if err != nil { t.Fatalf("Failed to process service: %v", err) } @@ -436,7 +442,7 @@ func validateServiceStateAnnotation(t *testing.T, svc *apiv1.Service, svcPorts [ } } - zoneGetter := NewFakeZoneGetter() + zoneGetter := negtypes.NewFakeZoneGetter() zones, _ := zoneGetter.ListZones() for _, zone := range zones { if !strings.Contains(v, zone) { diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 5086c16a6a..6d7cae5f7f 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -25,7 +25,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" - "k8s.io/ingress-gce/pkg/neg/types" + negsyncer "k8s.io/ingress-gce/pkg/neg/syncer" + negtypes "k8s.io/ingress-gce/pkg/neg/types" ) type serviceKey struct { @@ -35,10 +36,10 @@ type serviceKey struct { // syncerManager contains all the active syncer goroutines and manage their lifecycle. type syncerManager struct { - namer networkEndpointGroupNamer + namer negtypes.NetworkEndpointGroupNamer recorder record.EventRecorder - cloud NetworkEndpointGroupCloud - zoneGetter zoneGetter + cloud negtypes.NetworkEndpointGroupCloud + zoneGetter negtypes.ZoneGetter serviceLister cache.Indexer endpointLister cache.Indexer @@ -48,13 +49,13 @@ type syncerManager struct { // svcPortMap is the canonical indicator for whether a service needs NEG. // key consists of service namespace and name. Value is a map of ServicePort // Port:TargetPort, which represents ports that require NEG - svcPortMap map[serviceKey]types.PortNameMap + svcPortMap map[serviceKey]negtypes.PortNameMap // syncerMap stores the NEG syncer // key consists of service namespace, name and targetPort. Value is the corresponding syncer. - syncerMap map[servicePort]negSyncer + syncerMap map[negsyncer.NegSyncerKey]negtypes.NegSyncer } -func newSyncerManager(namer networkEndpointGroupNamer, recorder record.EventRecorder, cloud NetworkEndpointGroupCloud, zoneGetter zoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *syncerManager { +func newSyncerManager(namer negtypes.NetworkEndpointGroupNamer, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *syncerManager { return &syncerManager{ namer: namer, recorder: recorder, @@ -62,19 +63,19 @@ func newSyncerManager(namer networkEndpointGroupNamer, recorder record.EventReco zoneGetter: zoneGetter, serviceLister: serviceLister, endpointLister: endpointLister, - svcPortMap: make(map[serviceKey]types.PortNameMap), - syncerMap: make(map[servicePort]negSyncer), + svcPortMap: make(map[serviceKey]negtypes.PortNameMap), + syncerMap: make(map[negsyncer.NegSyncerKey]negtypes.NegSyncer), } } // EnsureSyncer starts and stops syncers based on the input service ports. -func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts types.PortNameMap) error { +func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts negtypes.PortNameMap) error { manager.mu.Lock() defer manager.mu.Unlock() key := getServiceKey(namespace, name) currentPorts, ok := manager.svcPortMap[key] if !ok { - currentPorts = make(types.PortNameMap) + currentPorts = make(negtypes.PortNameMap) } removes := currentPorts.Difference(newPorts) @@ -95,12 +96,12 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts typ for svcPort, targetPort := range adds { syncer, ok := manager.syncerMap[getSyncerKey(namespace, name, svcPort, targetPort)] if !ok { - syncer = newSyncer( - servicePort{ - namespace: namespace, - name: name, - port: svcPort, - targetPort: targetPort, + syncer = negsyncer.NewBatchSyncer( + negsyncer.NegSyncerKey{ + Namespace: namespace, + Name: name, + Port: svcPort, + TargetPort: targetPort, }, manager.namer.NEG(namespace, name, svcPort), manager.recorder, @@ -242,12 +243,12 @@ func (manager *syncerManager) ensureDeleteNetworkEndpointGroup(name, zone string } // getSyncerKey encodes a service namespace, name, service port and targetPort into a string key -func getSyncerKey(namespace, name string, port int32, targetPort string) servicePort { - return servicePort{ - namespace: namespace, - name: name, - port: port, - targetPort: targetPort, +func getSyncerKey(namespace, name string, port int32, targetPort string) negsyncer.NegSyncerKey { + return negsyncer.NegSyncerKey{ + Namespace: namespace, + Name: name, + Port: port, + TargetPort: targetPort, } } diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index e9678b3da3..f78926b2ed 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -22,13 +22,16 @@ import ( compute "google.golang.org/api/compute/v0.beta" apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake" "k8s.io/ingress-gce/pkg/context" + "k8s.io/ingress-gce/pkg/neg/syncer" "k8s.io/ingress-gce/pkg/neg/types" + negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" ) @@ -50,8 +53,8 @@ func NewTestSyncerManager(kubeClient kubernetes.Interface) *syncerManager { manager := newSyncerManager( namer, record.NewFakeRecorder(100), - NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), - NewFakeZoneGetter(), + negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-network"), + negtypes.NewFakeZoneGetter(), context.ServiceInformer.GetIndexer(), context.EndpointInformer.GetIndexer(), ) @@ -64,14 +67,14 @@ func TestEnsureAndStopSyncer(t *testing.T) { name string ports types.PortNameMap stop bool - expect []servicePort // keys of running syncers + expect []syncer.NegSyncerKey // keys of running syncers }{ { "ns1", "n1", types.PortNameMap{1000: "80", 2000: "443"}, false, - []servicePort{ + []syncer.NegSyncerKey{ getSyncerKey("ns1", "n1", 1000, "80"), getSyncerKey("ns1", "n1", 2000, "443"), }, @@ -81,7 +84,7 @@ func TestEnsureAndStopSyncer(t *testing.T) { "n1", types.PortNameMap{3000: "80", 4000: "namedport"}, false, - []servicePort{ + []syncer.NegSyncerKey{ getSyncerKey("ns1", "n1", 3000, "80"), getSyncerKey("ns1", "n1", 4000, "namedport"), }, @@ -91,7 +94,7 @@ func TestEnsureAndStopSyncer(t *testing.T) { "n1", types.PortNameMap{3000: "80"}, false, - []servicePort{ + []syncer.NegSyncerKey{ getSyncerKey("ns1", "n1", 3000, "80"), getSyncerKey("ns1", "n1", 4000, "namedport"), getSyncerKey("ns2", "n1", 3000, "80"), @@ -102,7 +105,7 @@ func TestEnsureAndStopSyncer(t *testing.T) { "n1", types.PortNameMap{}, true, - []servicePort{ + []syncer.NegSyncerKey{ getSyncerKey("ns2", "n1", 3000, "80"), }, }, @@ -194,13 +197,13 @@ func TestGarbageCollectionNEG(t *testing.T) { negName := manager.namer.NEG("test", "test", 80) manager.cloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{ Name: negName, - }, TestZone1) + }, negtypes.TestZone1) if err := manager.GC(); err != nil { t.Fatalf("Failed to GC: %v", err) } - negs, _ := manager.cloud.ListNetworkEndpointGroup(TestZone1) + negs, _ := manager.cloud.ListNetworkEndpointGroup(negtypes.TestZone1) for _, neg := range negs { if neg.Name == negName { t.Errorf("Expect NEG %q to be GCed.", negName) @@ -210,3 +213,83 @@ func TestGarbageCollectionNEG(t *testing.T) { // make sure there is no leaking go routine manager.StopSyncer(testServiceNamespace, testServiceName) } + +func getDefaultEndpoint() *apiv1.Endpoints { + instance1 := negtypes.TestInstance1 + instance2 := negtypes.TestInstance2 + instance3 := negtypes.TestInstance3 + instance4 := negtypes.TestInstance4 + return &apiv1.Endpoints{ + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName, + Namespace: testServiceNamespace, + }, + Subsets: []apiv1.EndpointSubset{ + { + Addresses: []apiv1.EndpointAddress{ + { + IP: "10.100.1.1", + NodeName: &instance1, + }, + { + IP: "10.100.1.2", + NodeName: &instance1, + }, + { + IP: "10.100.2.1", + NodeName: &instance2, + }, + { + IP: "10.100.3.1", + NodeName: &instance3, + }, + }, + Ports: []apiv1.EndpointPort{ + { + Name: "", + Port: int32(80), + Protocol: apiv1.ProtocolTCP, + }, + }, + }, + { + Addresses: []apiv1.EndpointAddress{ + { + IP: "10.100.2.2", + NodeName: &instance2, + }, + { + IP: "10.100.4.1", + NodeName: &instance4, + }, + }, + Ports: []apiv1.EndpointPort{ + { + Name: testNamedPort, + Port: int32(81), + Protocol: apiv1.ProtocolTCP, + }, + }, + }, + { + Addresses: []apiv1.EndpointAddress{ + { + IP: "10.100.3.2", + NodeName: &instance3, + }, + { + IP: "10.100.4.2", + NodeName: &instance4, + }, + }, + Ports: []apiv1.EndpointPort{ + { + Name: testNamedPort, + Port: int32(8081), + Protocol: apiv1.ProtocolTCP, + }, + }, + }, + }, + } +} diff --git a/pkg/neg/metrics.go b/pkg/neg/metrics/metrics.go similarity index 78% rename from pkg/neg/metrics.go rename to pkg/neg/metrics/metrics.go index 6d6976bf74..bd765e8cde 100644 --- a/pkg/neg/metrics.go +++ b/pkg/neg/metrics/metrics.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package neg +package metrics import ( "sync" @@ -32,8 +32,8 @@ const ( resultSuccess = "success" resultError = "error" - attachSync = syncType("attach") - detachSync = syncType("detach") + AttachSync = syncType("attach") + DetachSync = syncType("detach") ) type syncType string @@ -45,7 +45,7 @@ var ( "result", // Result of the sync. } - syncLatency = prometheus.NewHistogramVec( + SyncLatency = prometheus.NewHistogramVec( prometheus.HistogramOpts{ Namespace: metrics.GLBC_NAMESPACE, Subsystem: negControllerSubsystem, @@ -55,7 +55,7 @@ var ( syncMetricsLabels, ) - lastSyncTimestamp = prometheus.NewGaugeVec( + LastSyncTimestamp = prometheus.NewGaugeVec( prometheus.GaugeOpts{ Namespace: metrics.GLBC_NAMESPACE, Subsystem: negControllerSubsystem, @@ -68,18 +68,18 @@ var ( var register sync.Once -func registerMetrics() { +func RegisterMetrics() { register.Do(func() { - prometheus.MustRegister(syncLatency) - prometheus.MustRegister(lastSyncTimestamp) + prometheus.MustRegister(SyncLatency) + prometheus.MustRegister(LastSyncTimestamp) }) } -// observeNegSync publish collected metrics for the sync of NEG -func observeNegSync(negName string, syncType syncType, err error, start time.Time) { +// ObserveNegSync publish collected metrics for the sync of NEG +func ObserveNegSync(negName string, syncType syncType, err error, start time.Time) { result := resultSuccess if err != nil { result = resultError } - syncLatency.WithLabelValues(negName, string(syncType), result).Observe(time.Since(start).Seconds()) + SyncLatency.WithLabelValues(negName, string(syncType), result).Observe(time.Since(start).Seconds()) } diff --git a/pkg/neg/syncer.go b/pkg/neg/syncer/batch.go similarity index 79% rename from pkg/neg/syncer.go rename to pkg/neg/syncer/batch.go index aaf6705c75..5d7c40d478 100644 --- a/pkg/neg/syncer.go +++ b/pkg/neg/syncer/batch.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package neg +package syncer import ( "fmt" @@ -33,6 +33,8 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/ingress-gce/pkg/neg/metrics" + negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" ) @@ -46,25 +48,26 @@ const ( maxRetryDelay = 600 * time.Second ) -// servicePort includes information to uniquely identify a NEG -type servicePort struct { - namespace string - name string - port int32 - targetPort string +// NegSyncerKey includes information to uniquely identify a NEG +type NegSyncerKey struct { + Namespace string + Name string + Port int32 + TargetPort string } -// syncer handles synchorizing NEGs for one service port. It handles sync, resync and retry on error. -type syncer struct { - servicePort +// batchSyncer handles synchorizing NEGs for one service port. It handles sync, resync and retry on error. +// It syncs NEG in batch and waits for all operation to complete before continue to the next batch. +type batchSyncer struct { + NegSyncerKey negName string serviceLister cache.Indexer endpointLister cache.Indexer recorder record.EventRecorder - cloud NetworkEndpointGroupCloud - zoneGetter zoneGetter + cloud negtypes.NetworkEndpointGroupCloud + zoneGetter negtypes.ZoneGetter stateLock sync.Mutex stopped bool @@ -76,10 +79,10 @@ type syncer struct { retryCount int } -func newSyncer(svcPort servicePort, networkEndpointGroupName string, recorder record.EventRecorder, cloud NetworkEndpointGroupCloud, zoneGetter zoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *syncer { - glog.V(2).Infof("New syncer for service %s/%s port %s NEG %q", svcPort.namespace, svcPort.name, svcPort.targetPort, networkEndpointGroupName) - return &syncer{ - servicePort: svcPort, +func NewBatchSyncer(svcPort NegSyncerKey, networkEndpointGroupName string, recorder record.EventRecorder, cloud negtypes.NetworkEndpointGroupCloud, zoneGetter negtypes.ZoneGetter, serviceLister cache.Indexer, endpointLister cache.Indexer) *batchSyncer { + glog.V(2).Infof("New syncer for service %s/%s Port %s NEG %q", svcPort.Namespace, svcPort.Name, svcPort.TargetPort, networkEndpointGroupName) + return &batchSyncer{ + NegSyncerKey: svcPort, negName: networkEndpointGroupName, recorder: recorder, serviceLister: serviceLister, @@ -94,7 +97,7 @@ func newSyncer(svcPort servicePort, networkEndpointGroupName string, recorder re } } -func (s *syncer) init() { +func (s *batchSyncer) init() { s.stateLock.Lock() defer s.stateLock.Unlock() s.stopped = false @@ -102,7 +105,7 @@ func (s *syncer) init() { } // Start starts the syncer go routine if it has not been started. -func (s *syncer) Start() error { +func (s *batchSyncer) Start() error { if !s.IsStopped() { return fmt.Errorf("NEG syncer for %s is already running.", s.formattedName()) } @@ -126,7 +129,7 @@ func (s *syncer) Start() error { retryMesg = "(will retry)" } - if svc := getService(s.serviceLister, s.namespace, s.name); svc != nil { + if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil { s.recorder.Eventf(svc, apiv1.EventTypeWarning, "SyncNetworkEndpointGroupFailed", "Failed to sync NEG %q %s: %v", s.negName, retryMesg, err) } } else { @@ -151,7 +154,7 @@ func (s *syncer) Start() error { } // Stop stops syncer and return only when syncer shutdown completes. -func (s *syncer) Stop() { +func (s *batchSyncer) Stop() { s.stateLock.Lock() defer s.stateLock.Unlock() if !s.stopped { @@ -163,7 +166,7 @@ func (s *syncer) Stop() { } // Sync informs syncer to run sync loop as soon as possible. -func (s *syncer) Sync() bool { +func (s *batchSyncer) Sync() bool { if s.IsStopped() { glog.Warningf("NEG syncer for %s is already stopped.", s.formattedName()) return false @@ -176,31 +179,31 @@ func (s *syncer) Sync() bool { } } -func (s *syncer) IsStopped() bool { +func (s *batchSyncer) IsStopped() bool { s.stateLock.Lock() defer s.stateLock.Unlock() return s.stopped } -func (s *syncer) IsShuttingDown() bool { +func (s *batchSyncer) IsShuttingDown() bool { s.stateLock.Lock() defer s.stateLock.Unlock() return s.shuttingDown } -func (s *syncer) sync() (err error) { +func (s *batchSyncer) sync() (err error) { if s.IsStopped() || s.IsShuttingDown() { glog.V(4).Infof("Skip syncing NEG %q for %s.", s.negName, s.formattedName()) return nil } glog.V(2).Infof("Sync NEG %q for %s.", s.negName, s.formattedName()) start := time.Now() - defer observeNegSync(s.negName, attachSync, err, start) + defer metrics.ObserveNegSync(s.negName, metrics.AttachSync, err, start) ep, exists, err := s.endpointLister.Get( &apiv1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ - Name: s.name, - Namespace: s.namespace, + Name: s.Name, + Namespace: s.Namespace, }, }, ) @@ -209,7 +212,7 @@ func (s *syncer) sync() (err error) { } if !exists { - glog.Warningf("Endpoint %s/%s does not exist. Skipping NEG sync", s.namespace, s.name) + glog.Warningf("Endpoint %s/%s does not exist. Skipping NEG sync", s.Namespace, s.Name) return nil } @@ -230,7 +233,7 @@ func (s *syncer) sync() (err error) { addEndpoints, removeEndpoints := calculateDifference(targetMap, currentMap) if len(addEndpoints) == 0 && len(removeEndpoints) == 0 { - glog.V(4).Infof("No endpoint change for %s/%s, skip syncing NEG. ", s.namespace, s.name) + glog.V(4).Infof("No endpoint change for %s/%s, skip syncing NEG. ", s.Namespace, s.Name) return nil } @@ -238,7 +241,7 @@ func (s *syncer) sync() (err error) { } // ensureNetworkEndpointGroups ensures negs are created in the related zones. -func (s *syncer) ensureNetworkEndpointGroups() error { +func (s *batchSyncer) ensureNetworkEndpointGroups() error { var err error zones, err := s.zoneGetter.ListZones() if err != nil { @@ -265,7 +268,7 @@ func (s *syncer) ensureNetworkEndpointGroups() error { if err != nil { errList = append(errList, err) } else { - if svc := getService(s.serviceLister, s.namespace, s.name); svc != nil { + if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil { s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Delete", "Deleted NEG %q for %s in %q.", s.negName, s.formattedName(), zone) } } @@ -284,7 +287,7 @@ func (s *syncer) ensureNetworkEndpointGroups() error { if err != nil { errList = append(errList, err) } else { - if svc := getService(s.serviceLister, s.namespace, s.name); svc != nil { + if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil { s.recorder.Eventf(svc, apiv1.EventTypeNormal, "Create", "Created NEG %q for %s in %q.", s.negName, s.formattedName(), zone) } } @@ -294,22 +297,22 @@ func (s *syncer) ensureNetworkEndpointGroups() error { } // toZoneNetworkEndpointMap translates addresses in endpoints object into zone and endpoints map -func (s *syncer) toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints) (map[string]sets.String, error) { +func (s *batchSyncer) toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints) (map[string]sets.String, error) { zoneNetworkEndpointMap := map[string]sets.String{} - targetPort, _ := strconv.Atoi(s.targetPort) + targetPort, _ := strconv.Atoi(s.TargetPort) for _, subset := range endpoints.Subsets { matchPort := "" - // service spec allows target port to be a named port. - // support both explicit port and named port. + // service spec allows target Port to be a named Port. + // support both explicit Port and named Port. for _, port := range subset.Ports { if targetPort != 0 { - // targetPort is int + // TargetPort is int if int(port.Port) == targetPort { - matchPort = s.targetPort + matchPort = s.TargetPort } } else { - // targetPort is string - if port.Name == s.targetPort { + // TargetPort is string + if port.Name == s.TargetPort { matchPort = strconv.Itoa(int(port.Port)) } } @@ -318,7 +321,7 @@ func (s *syncer) toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints) (map[strin } } - // subset does not contain target port + // subset does not contain target Port if len(matchPort) == 0 { continue } @@ -337,7 +340,7 @@ func (s *syncer) toZoneNetworkEndpointMap(endpoints *apiv1.Endpoints) (map[strin } // retrieveExistingZoneNetworkEndpointMap lists existing network endpoints in the neg and return the zone and endpoints map -func (s *syncer) retrieveExistingZoneNetworkEndpointMap() (map[string]sets.String, error) { +func (s *batchSyncer) retrieveExistingZoneNetworkEndpointMap() (map[string]sets.String, error) { zones, err := s.zoneGetter.ListZones() if err != nil { return nil, err @@ -375,7 +378,7 @@ func (e *ErrorList) List() []error { } // syncNetworkEndpoints adds and removes endpoints for negs -func (s *syncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]sets.String) error { +func (s *batchSyncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]sets.String) error { var wg sync.WaitGroup errList := &ErrorList{} @@ -411,7 +414,7 @@ func (s *syncer) syncNetworkEndpoints(addEndpoints, removeEndpoints map[string]s } // translate a endpoints set to a batch of network endpoints object -func (s *syncer) toNetworkEndpointBatch(endpoints sets.String) ([]*compute.NetworkEndpoint, error) { +func (s *batchSyncer) toNetworkEndpointBatch(endpoints sets.String) ([]*compute.NetworkEndpoint, error) { var ok bool list := make([]string, int(math.Min(float64(endpoints.Len()), float64(MAX_NETWORK_ENDPOINTS_PER_BATCH)))) for i := range list { @@ -436,25 +439,25 @@ func (s *syncer) toNetworkEndpointBatch(endpoints sets.String) ([]*compute.Netwo return networkEndpointList, nil } -func (s *syncer) attachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) { +func (s *batchSyncer) attachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) { wg.Add(1) glog.V(2).Infof("Attaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone) go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.AttachNetworkEndpoints, "Attach") } -func (s *syncer) detachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) { +func (s *batchSyncer) detachNetworkEndpoints(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList) { wg.Add(1) glog.V(2).Infof("Detaching %d endpoint(s) for %s in NEG %s at %s.", len(networkEndpoints), s.formattedName(), s.negName, zone) go s.operationInternal(wg, zone, networkEndpoints, errList, s.cloud.DetachNetworkEndpoints, "Detach") } -func (s *syncer) operationInternal(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList, syncFunc func(name, zone string, endpoints []*compute.NetworkEndpoint) error, operationName string) { +func (s *batchSyncer) operationInternal(wg *sync.WaitGroup, zone string, networkEndpoints []*compute.NetworkEndpoint, errList *ErrorList, syncFunc func(name, zone string, endpoints []*compute.NetworkEndpoint) error, operationName string) { defer wg.Done() err := syncFunc(s.negName, zone, networkEndpoints) if err != nil { errList.Add(err) } - if svc := getService(s.serviceLister, s.namespace, s.name); svc != nil { + if svc := getService(s.serviceLister, s.Namespace, s.Name); svc != nil { if err == nil { s.recorder.Eventf(svc, apiv1.EventTypeNormal, operationName, "%s %d network endpoint(s) (NEG %q in zone %q)", operationName, len(networkEndpoints), s.negName, zone) } else { @@ -463,7 +466,7 @@ func (s *syncer) operationInternal(wg *sync.WaitGroup, zone string, networkEndpo } } -func (s *syncer) nextRetryDelay() time.Duration { +func (s *batchSyncer) nextRetryDelay() time.Duration { s.retryCount += 1 s.lastRetryDelay *= 2 if s.lastRetryDelay < minRetryDelay { @@ -474,13 +477,13 @@ func (s *syncer) nextRetryDelay() time.Duration { return s.lastRetryDelay } -func (s *syncer) resetRetryDelay() { +func (s *batchSyncer) resetRetryDelay() { s.retryCount = 0 s.lastRetryDelay = time.Duration(0) } -func (s *syncer) formattedName() string { - return fmt.Sprintf("%s/%s-%v/%s", s.namespace, s.name, s.port, s.targetPort) +func (s *batchSyncer) formattedName() string { + return fmt.Sprintf("%s/%s-%v/%s", s.Namespace, s.Name, s.Port, s.TargetPort) } // encodeEndpoint encodes ip and instance into a single string @@ -514,9 +517,9 @@ func calculateDifference(targetMap, currentMap map[string]sets.String) (map[stri return addSet, removeSet } -// getService retrieves service object from serviceLister based on the input namespace and name +// getService retrieves service object from serviceLister based on the input Namespace and Name func getService(serviceLister cache.Indexer, namespace, name string) *apiv1.Service { - service, exists, err := serviceLister.GetByKey(serviceKeyFunc(namespace, name)) + service, exists, err := serviceLister.GetByKey(utils.ServiceKeyFunc(namespace, name)) if exists && err == nil { return service.(*apiv1.Service) } diff --git a/pkg/neg/syncer_test.go b/pkg/neg/syncer/batch_test.go similarity index 71% rename from pkg/neg/syncer_test.go rename to pkg/neg/syncer/batch_test.go index 55db559663..e7fc2372ff 100644 --- a/pkg/neg/syncer_test.go +++ b/pkg/neg/syncer/batch_test.go @@ -1,4 +1,4 @@ -package neg +package syncer import ( "reflect" @@ -7,12 +7,15 @@ import ( apiv1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" backendconfigclient "k8s.io/ingress-gce/pkg/backendconfig/client/clientset/versioned/fake" "k8s.io/ingress-gce/pkg/context" + negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" ) @@ -20,13 +23,18 @@ const ( testNegName = "test-neg-name" testServiceNamespace = "test-ns" testServiceName = "test-name" - testNamedPort = "named-port" + testNamedPort = "named-Port" + clusterID = "clusterid" ) -func NewTestSyncer() *syncer { +var ( + defaultBackend = utils.ServicePortID{Service: types.NamespacedName{Name: "default-http-backend", Namespace: "kube-system"}, Port: intstr.FromString("http")} +) + +func NewTestSyncer() *batchSyncer { kubeClient := fake.NewSimpleClientset() backendConfigClient := backendconfigclient.NewSimpleClientset() - namer := utils.NewNamer(ClusterID, "") + namer := utils.NewNamer(clusterID, "") ctxConfig := context.ControllerContextConfig{ NEGEnabled: true, BackendConfigEnabled: false, @@ -35,18 +43,18 @@ func NewTestSyncer() *syncer { DefaultBackendSvcPortID: defaultBackend, } context := context.NewControllerContext(kubeClient, backendConfigClient, nil, namer, ctxConfig) - svcPort := servicePort{ - namespace: testServiceNamespace, - name: testServiceName, - port: 80, - targetPort: "80", + svcPort := NegSyncerKey{ + Namespace: testServiceNamespace, + Name: testServiceName, + Port: 80, + TargetPort: "80", } - return newSyncer(svcPort, + return NewBatchSyncer(svcPort, testNegName, record.NewFakeRecorder(100), - NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-newtork"), - NewFakeZoneGetter(), + negtypes.NewFakeNetworkEndpointGroupCloud("test-subnetwork", "test-newtork"), + negtypes.NewFakeZoneGetter(), context.ServiceInformer.GetIndexer(), context.EndpointInformer.GetIndexer()) } @@ -104,7 +112,7 @@ func TestEnsureNetworkEndpointGroups(t *testing.T) { } ret, _ := syncer.cloud.AggregatedListNetworkEndpointGroup() - expectZones := []string{TestZone1, TestZone2} + expectZones := []string{negtypes.TestZone1, negtypes.TestZone2} for _, zone := range expectZones { negs, ok := ret[zone] if !ok { @@ -131,21 +139,21 @@ func TestToZoneNetworkEndpointMap(t *testing.T) { { targetPort: "80", expect: map[string]sets.String{ - TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.1.2||instance1||80", "10.100.2.1||instance2||80"), - TestZone2: sets.NewString("10.100.3.1||instance3||80"), + negtypes.TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.1.2||instance1||80", "10.100.2.1||instance2||80"), + negtypes.TestZone2: sets.NewString("10.100.3.1||instance3||80"), }, }, { targetPort: testNamedPort, expect: map[string]sets.String{ - TestZone1: sets.NewString("10.100.2.2||instance2||81"), - TestZone2: sets.NewString("10.100.4.1||instance4||81", "10.100.3.2||instance3||8081", "10.100.4.2||instance4||8081"), + negtypes.TestZone1: sets.NewString("10.100.2.2||instance2||81"), + negtypes.TestZone2: sets.NewString("10.100.4.1||instance4||81", "10.100.3.2||instance3||8081", "10.100.4.2||instance4||8081"), }, }, } for _, tc := range testCases { - syncer.targetPort = tc.targetPort + syncer.TargetPort = tc.targetPort res, _ := syncer.toZoneNetworkEndpointMap(getDefaultEndpoint()) if !reflect.DeepEqual(res, tc.expect) { @@ -176,10 +184,10 @@ func TestCalculateDifference(t *testing.T) { // unchanged { targetSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), }, currentSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), }, addSet: map[string]sets.String{}, removeSet: map[string]sets.String{}, @@ -194,24 +202,24 @@ func TestCalculateDifference(t *testing.T) { // add in one zone { targetSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), }, currentSet: map[string]sets.String{}, addSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), }, removeSet: map[string]sets.String{}, }, // add in 2 zones { targetSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), - TestZone2: sets.NewString("e", "f", "g"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone2: sets.NewString("e", "f", "g"), }, currentSet: map[string]sets.String{}, addSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), - TestZone2: sets.NewString("e", "f", "g"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone2: sets.NewString("e", "f", "g"), }, removeSet: map[string]sets.String{}, }, @@ -219,58 +227,58 @@ func TestCalculateDifference(t *testing.T) { { targetSet: map[string]sets.String{}, currentSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), }, addSet: map[string]sets.String{}, removeSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), }, }, // remove in 2 zones { targetSet: map[string]sets.String{}, currentSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), - TestZone2: sets.NewString("e", "f", "g"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone2: sets.NewString("e", "f", "g"), }, addSet: map[string]sets.String{}, removeSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), - TestZone2: sets.NewString("e", "f", "g"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone2: sets.NewString("e", "f", "g"), }, }, // add and delete in one zone { targetSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), }, currentSet: map[string]sets.String{ - TestZone1: sets.NewString("b", "c", "d"), + negtypes.TestZone1: sets.NewString("b", "c", "d"), }, addSet: map[string]sets.String{ - TestZone1: sets.NewString("a"), + negtypes.TestZone1: sets.NewString("a"), }, removeSet: map[string]sets.String{ - TestZone1: sets.NewString("d"), + negtypes.TestZone1: sets.NewString("d"), }, }, // add and delete in 2 zones { targetSet: map[string]sets.String{ - TestZone1: sets.NewString("a", "b", "c"), - TestZone2: sets.NewString("a", "b", "c"), + negtypes.TestZone1: sets.NewString("a", "b", "c"), + negtypes.TestZone2: sets.NewString("a", "b", "c"), }, currentSet: map[string]sets.String{ - TestZone1: sets.NewString("b", "c", "d"), - TestZone2: sets.NewString("b", "c", "d"), + negtypes.TestZone1: sets.NewString("b", "c", "d"), + negtypes.TestZone2: sets.NewString("b", "c", "d"), }, addSet: map[string]sets.String{ - TestZone1: sets.NewString("a"), - TestZone2: sets.NewString("a"), + negtypes.TestZone1: sets.NewString("a"), + negtypes.TestZone2: sets.NewString("a"), }, removeSet: map[string]sets.String{ - TestZone1: sets.NewString("d"), - TestZone2: sets.NewString("d"), + negtypes.TestZone1: sets.NewString("d"), + negtypes.TestZone2: sets.NewString("d"), }, }, } @@ -301,35 +309,35 @@ func TestSyncNetworkEndpoints(t *testing.T) { }{ { expectSet: map[string]sets.String{ - TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.2.1||instance2||80"), - TestZone2: sets.NewString("10.100.3.1||instance3||80", "10.100.4.1||instance4||80"), + negtypes.TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.2.1||instance2||80"), + negtypes.TestZone2: sets.NewString("10.100.3.1||instance3||80", "10.100.4.1||instance4||80"), }, addSet: map[string]sets.String{ - TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.2.1||instance2||80"), - TestZone2: sets.NewString("10.100.3.1||instance3||80", "10.100.4.1||instance4||80"), + negtypes.TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.2.1||instance2||80"), + negtypes.TestZone2: sets.NewString("10.100.3.1||instance3||80", "10.100.4.1||instance4||80"), }, removeSet: map[string]sets.String{}, }, { expectSet: map[string]sets.String{ - TestZone1: sets.NewString("10.100.1.2||instance1||80"), - TestZone2: sets.NewString(), + negtypes.TestZone1: sets.NewString("10.100.1.2||instance1||80"), + negtypes.TestZone2: sets.NewString(), }, addSet: map[string]sets.String{ - TestZone1: sets.NewString("10.100.1.2||instance1||80"), + negtypes.TestZone1: sets.NewString("10.100.1.2||instance1||80"), }, removeSet: map[string]sets.String{ - TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.2.1||instance2||80"), - TestZone2: sets.NewString("10.100.3.1||instance3||80", "10.100.4.1||instance4||80"), + negtypes.TestZone1: sets.NewString("10.100.1.1||instance1||80", "10.100.2.1||instance2||80"), + negtypes.TestZone2: sets.NewString("10.100.3.1||instance3||80", "10.100.4.1||instance4||80"), }, }, { expectSet: map[string]sets.String{ - TestZone1: sets.NewString("10.100.1.2||instance1||80"), - TestZone2: sets.NewString("10.100.3.2||instance3||80"), + negtypes.TestZone1: sets.NewString("10.100.1.2||instance1||80"), + negtypes.TestZone2: sets.NewString("10.100.3.2||instance3||80"), }, addSet: map[string]sets.String{ - TestZone2: sets.NewString("10.100.3.2||instance3||80"), + negtypes.TestZone2: sets.NewString("10.100.3.2||instance3||80"), }, removeSet: map[string]sets.String{}, }, @@ -343,7 +351,7 @@ func TestSyncNetworkEndpoints(t *testing.T) { } } -func examineNetworkEndpoints(expectSet map[string]sets.String, syncer *syncer, t *testing.T) { +func examineNetworkEndpoints(expectSet map[string]sets.String, syncer *batchSyncer, t *testing.T) { for zone, endpoints := range expectSet { expectEndpoints, err := syncer.toNetworkEndpointBatch(endpoints) if err != nil { @@ -372,10 +380,10 @@ func examineNetworkEndpoints(expectSet map[string]sets.String, syncer *syncer, t } func getDefaultEndpoint() *apiv1.Endpoints { - instance1 := TestInstance1 - instance2 := TestInstance2 - instance3 := TestInstance3 - instance4 := TestInstance4 + instance1 := negtypes.TestInstance1 + instance2 := negtypes.TestInstance2 + instance3 := negtypes.TestInstance3 + instance4 := negtypes.TestInstance4 return &apiv1.Endpoints{ ObjectMeta: metav1.ObjectMeta{ Name: testServiceName, diff --git a/pkg/neg/fakes.go b/pkg/neg/types/fakes.go similarity index 99% rename from pkg/neg/fakes.go rename to pkg/neg/types/fakes.go index 883385b36e..e11262d4d6 100644 --- a/pkg/neg/fakes.go +++ b/pkg/neg/types/fakes.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package neg +package types import ( "fmt" diff --git a/pkg/neg/interfaces.go b/pkg/neg/types/interfaces.go similarity index 81% rename from pkg/neg/interfaces.go rename to pkg/neg/types/interfaces.go index 3dde2cb44e..d063fb0113 100644 --- a/pkg/neg/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -1,5 +1,5 @@ /* -Copyright 2017 The Kubernetes Authors. +Copyright 2018 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -14,14 +14,19 @@ See the License for the specific language governing permissions and limitations under the License. */ -package neg +package types import ( computebeta "google.golang.org/api/compute/v0.beta" - "k8s.io/ingress-gce/pkg/neg/types" ) -// MetworkEndpointGroupCloud is an interface for managing gce network endpoint group. +// ZoneGetter is an interface for retrieve zone related information +type ZoneGetter interface { + ListZones() ([]string, error) + GetZoneForNode(name string) (string, error) +} + +// NetworkEndpointGroupCloud is an interface for managing gce network endpoint group. type NetworkEndpointGroupCloud interface { GetNetworkEndpointGroup(name string, zone string) (*computebeta.NetworkEndpointGroup, error) ListNetworkEndpointGroup(zone string) ([]*computebeta.NetworkEndpointGroup, error) @@ -35,20 +40,14 @@ type NetworkEndpointGroupCloud interface { SubnetworkURL() string } -// networkEndpointGroupNamer is an interface for generating network endpoint group name. -type networkEndpointGroupNamer interface { +// NetworkEndpointGroupNamer is an interface for generating network endpoint group name. +type NetworkEndpointGroupNamer interface { NEG(namespace, name string, port int32) string IsNEG(name string) bool } -// zoneGetter is an interface for retrieve zone related information -type zoneGetter interface { - ListZones() ([]string, error) - GetZoneForNode(name string) (string, error) -} - -// negSyncer is an interface to interact with syncer -type negSyncer interface { +// NegSyncer is an interface to interact with syncer +type NegSyncer interface { // Start starts the syncer. This call is synchronous. It will return after syncer is started. Start() error // Stop stops the syncer. This call is asynchronous. It will not block until syncer is stopped. @@ -61,11 +60,11 @@ type negSyncer interface { IsShuttingDown() bool } -// negSyncerManager is an interface for controllers to manage syncer -type negSyncerManager interface { +// NegSyncerManager is an interface for controllers to manage syncer +type NegSyncerManager interface { // EnsureSyncer ensures corresponding syncers are started and stops any unnecessary syncer // portMap is a map of ServicePort Port to TargetPort - EnsureSyncers(namespace, name string, portMap types.PortNameMap) error + EnsureSyncers(namespace, name string, portMap PortNameMap) error // StopSyncer stops all syncers related to the service. This call is asynchronous. It will not wait for all syncers to stop. StopSyncer(namespace, name string) // Sync signals all syncers related to the service to sync. This call is asynchronous. diff --git a/pkg/neg/types/types.go b/pkg/neg/types/types.go index a80d9f8fbd..39279d76c8 100644 --- a/pkg/neg/types/types.go +++ b/pkg/neg/types/types.go @@ -1,3 +1,19 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package types // PortNameMap is a map of ServicePort:TargetPort. diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index dc87a1764d..ef92f9a001 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -438,3 +438,7 @@ func TraverseIngressBackends(ing *extensions.Ingress, process func(id ServicePor } return } + +func ServiceKeyFunc(namespace, name string) string { + return fmt.Sprintf("%s/%s", namespace, name) +}