diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 94438868ab..5a9cde37de 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -99,7 +99,7 @@ func (manager *syncerManager) EnsureSyncers(namespace, name string, newPorts neg // Service/Ingress config changes can cause readinessGate to be turn on or off for the same service port. // By removing the duplicate ports in removes and adds, this prevents disruption of NEG syncer due to the config changes // Hence, Existing NEG syncer for the service port will always work - filterPort(adds, removes) + removeCommonPorts(adds, removes) manager.svcPortMap[key] = newPorts klog.V(3).Infof("EnsureSyncer %v/%v: syncing %v ports, removing %v ports, adding %v ports", namespace, name, newPorts, removes, adds) @@ -342,20 +342,24 @@ func getServiceKey(namespace, name string) serviceKey { } } -// filterPort removes duplicate ports in p1 and p2 if the corresponding port info has the same target port and neg name. +// removeCommonPorts removes duplicate ports in p1 and p2 if the corresponding port info has the same target port and neg name. // This function effectively removes duplicate port with different readiness gate flag if the rest of the field in port info is the same. -func filterPort(p1, p2 negtypes.PortInfoMap) { +func removeCommonPorts(p1, p2 negtypes.PortInfoMap) { for port, portInfo1 := range p1 { - if portInfo2, ok := p2[port]; ok { - if portInfo1.TargetPort != portInfo2.TargetPort { - continue - } - if portInfo1.NegName != portInfo2.NegName { - continue - } + portInfo2, ok := p2[port] + if !ok { + continue + } - delete(p1, port) - delete(p2, port) + if portInfo1.TargetPort != portInfo2.TargetPort { + continue + } + if portInfo1.NegName != portInfo2.NegName { + continue } + + delete(p1, port) + delete(p2, port) + } } diff --git a/pkg/neg/manager_test.go b/pkg/neg/manager_test.go index 6336fd2e7f..57de583a9f 100644 --- a/pkg/neg/manager_test.go +++ b/pkg/neg/manager_test.go @@ -36,6 +36,7 @@ import ( negtypes "k8s.io/ingress-gce/pkg/neg/types" "k8s.io/ingress-gce/pkg/utils" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce" + "reflect" ) const ( @@ -533,6 +534,54 @@ func TestReadinessGateEnabled(t *testing.T) { } } +func TestFilterCommonPorts(t *testing.T) { + t.Parallel() + namer := utils.NewNamer(ClusterID, "") + + for _, tc := range []struct { + desc string + p1 negtypes.PortInfoMap + p2 negtypes.PortInfoMap + expectP1 negtypes.PortInfoMap + expectP2 negtypes.PortInfoMap + }{ + { + desc: "empty input 1", + p1: negtypes.PortInfoMap{}, + p2: negtypes.PortInfoMap{}, + expectP1: negtypes.PortInfoMap{}, + expectP2: negtypes.PortInfoMap{}, + }, + { + desc: "empty input 2", + p1: negtypes.NewPortInfoMap(namespace1, name1, types.SvcPortMap{port1: targetPort1, port2: targetPort2}, namer, false), + p2: negtypes.PortInfoMap{}, + expectP1: negtypes.NewPortInfoMap(namespace1, name1, types.SvcPortMap{port1: targetPort1, port2: targetPort2}, namer, false), + expectP2: negtypes.PortInfoMap{}, + }, + { + desc: "empty input 3", + p1: negtypes.PortInfoMap{}, + p2: negtypes.NewPortInfoMap(namespace1, name1, types.SvcPortMap{port1: targetPort1, port2: targetPort2}, namer, true), + expectP1: negtypes.PortInfoMap{}, + expectP2: negtypes.NewPortInfoMap(namespace1, name1, types.SvcPortMap{port1: targetPort1, port2: targetPort2}, namer, true), + }, + } { + t.Run(tc.desc, func(t *testing.T) { + removeCommonPorts(tc.p1, tc.p2) + + if !reflect.DeepEqual(tc.p1, tc.expectP1) { + t.Errorf("Expect p1 to be %v, but got %v", tc.expectP1, tc.p1) + } + + if !reflect.DeepEqual(tc.p2, tc.expectP2) { + t.Errorf("Expect p2 to be %v, but got %v", tc.expectP2, tc.p2) + } + }) + + } +} + // populateSyncerManager for testing func populateSyncerManager(manager *syncerManager, kubeClient kubernetes.Interface) { namer := manager.namer diff --git a/pkg/neg/readiness/poller_test.go b/pkg/neg/readiness/poller_test.go index 33568dfb56..ff3ce4069a 100644 --- a/pkg/neg/readiness/poller_test.go +++ b/pkg/neg/readiness/poller_test.go @@ -17,8 +17,13 @@ limitations under the License. package readiness import ( + "github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/meta" + "google.golang.org/api/compute/v1" + "k8s.io/apimachinery/pkg/types" negtypes "k8s.io/ingress-gce/pkg/neg/types" + "k8s.io/ingress-gce/pkg/utils" "net" + "strconv" "testing" ) @@ -28,6 +33,8 @@ func newFakePoller() *poller { } func TestPollerEndpointRegistrationAndScanForWork(t *testing.T) { + t.Parallel() + poller := newFakePoller() podLister := poller.podLister fakeLookup := poller.lookup.(*fakeLookUp) @@ -296,3 +303,96 @@ func TestPollerEndpointRegistrationAndScanForWork(t *testing.T) { } } } + +func TestPoll(t *testing.T) { + t.Parallel() + + poller := newFakePoller() + negCloud := poller.negCloud + namer := utils.NewNamer("clusteruid", "") + + ns := "ns" + podName := "pod1" + negName := namer.NEG(ns, "svc", int32(80)) + zone := "us-central1-b" + key := negMeta{ + SyncerKey: negtypes.NegSyncerKey{}, + Name: negName, + Zone: zone, + } + ip := "10.1.2.3" + port := int64(80) + instance := "k8s-node-xxxxxx" + + // mark polling to true + poller.pollMap[key] = &pollTarget{ + endpointMap: negtypes.EndpointPodMap{ + negtypes.NetworkEndpoint{IP: ip, Port: strconv.FormatInt(port, 10), Node: instance}: types.NamespacedName{Namespace: ns, Name: podName}, + }, + polling: true, + } + + retry, err := poller.Poll(key) + if err != nil { + t.Errorf("Does not expect err, but got %v", err) + } + if retry != true { + t.Errorf("Expect retry = true, but got %v", retry) + } + + // unmark polling + poller.pollMap[key].polling = false + retry, err = poller.Poll(key) + // expect NEG not exist error + if err == nil { + t.Errorf("Expect err, but got %v", err) + } + if retry != true { + t.Errorf("Expect retry = true, but got %v", retry) + } + + // create NEG, but with no endpoint + negCloud.CreateNetworkEndpointGroup(&compute.NetworkEndpointGroup{Name: negName, Zone: zone}, zone) + retry, err = poller.Poll(key) + if err != nil { + t.Errorf("Does not expect err, but got %v", err) + } + if retry != true { + t.Errorf("Expect retry = true, but got %v", retry) + } + + // add NE to the NEG, but NE not healthy + ne := &compute.NetworkEndpoint{ + IpAddress: ip, + Port: port, + Instance: instance, + } + negCloud.AttachNetworkEndpoints(negName, zone, []*compute.NetworkEndpoint{ne}) + retry, err = poller.Poll(key) + if err != nil { + t.Errorf("Does not expect err, but got %v", err) + } + if retry != true { + t.Errorf("Expect retry = true, but got %v", retry) + } + + // add NE with healthy status + negtypes.GetNetworkEndpointStore(negCloud).AddNetworkEndpointHealthStatus(*meta.ZonalKey(negName, zone), negtypes.NetworkEndpointEntry{ + NetworkEndpoint: ne, + Healths: []*compute.HealthStatusForNetworkEndpoint{ + { + BackendService: &compute.BackendServiceReference{ + BackendService: negName, + }, + HealthState: healthyState, + }, + }, + }) + retry, err = poller.Poll(key) + if err != nil { + t.Errorf("Does not expect err, but got %v", err) + } + if retry != false { + t.Errorf("Expect retry = false, but got %v", retry) + } +} diff --git a/pkg/neg/types/mock.go b/pkg/neg/types/mock.go index 3b72ce4d1a..d9eb670eba 100644 --- a/pkg/neg/types/mock.go +++ b/pkg/neg/types/mock.go @@ -36,6 +36,23 @@ type NetworkEndpointEntry struct { type NetworkEndpointStore map[meta.Key][]NetworkEndpointEntry +func (s NetworkEndpointStore) AddNetworkEndpointHealthStatus(key meta.Key, entry NetworkEndpointEntry) { + v, ok := s[key] + if !ok { + v = []NetworkEndpointEntry{} + } + v = append(v, entry) + s[key] = v +} + +// GetNetworkEndpointStore is a helper function to access the NetworkEndpointStore of the mock NEG cloud +func GetNetworkEndpointStore(negCloud NetworkEndpointGroupCloud) NetworkEndpointStore { + adapter := negCloud.(*cloudProviderAdapter) + mockedCloud := adapter.c.(*cloud.MockGCE) + ret := mockedCloud.MockNetworkEndpointGroups.X.(NetworkEndpointStore) + return ret +} + func MockNetworkEndpointAPIs(fakeGCE *gce.Cloud) { m := (fakeGCE.Compute().(*cloud.MockGCE)) m.MockNetworkEndpointGroups.X = NetworkEndpointStore{}