diff --git a/pkg/neg/syncers/endpoints_calculator.go b/pkg/neg/syncers/endpoints_calculator.go index e25861f996..07cf392476 100644 --- a/pkg/neg/syncers/endpoints_calculator.go +++ b/pkg/neg/syncers/endpoints_calculator.go @@ -127,7 +127,7 @@ func (l *LocalL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(eds []typ return subsetMap, podMap, err } -func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error { +func (l *LocalL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, endpointsExcludedInCalculation int) error { // this should be a no-op for now return nil } @@ -197,7 +197,7 @@ func (l *ClusterL4ILBEndpointsCalculator) CalculateEndpointsDegradedMode(eps []t return subsetMap, podMap, err } -func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error { +func (l *ClusterL4ILBEndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, endpointsExcludedInCalculation int) error { // this should be a no-op for now return nil } @@ -244,7 +244,7 @@ func (l *L7EndpointsCalculator) CalculateEndpoints(eds []types.EndpointsData, _ if err == nil { // If current calculation ends up in error, we trigger and emit metrics in degraded mode. l.syncMetricsCollector.UpdateSyncerEPMetrics(l.syncerKey, result.EPCount, result.EPSCount) } - return result.NetworkEndpointSet, result.EndpointPodMap, result.EPCount[negtypes.Duplicate], err + return result.NetworkEndpointSet, result.EndpointPodMap, result.EPCount[negtypes.Duplicate] + result.EPCount[negtypes.NodeInNonDefaultSubnet], err } // CalculateEndpoints determines the endpoints in the NEGs based on the current service endpoints and the current NEGs. 
@@ -266,12 +266,12 @@ func nodeMapToString(nodeMap map[string][]*v1.Node) string { // // For L7 Endpoint Calculator, it returns error if one of the two checks fails: // 1. The endpoint count from endpointData doesn't equal to the one from endpointPodMap: -// endpiontPodMap removes the duplicated endpoints, and dupCount stores the number of duplicated it removed +// endpointPodMap removes the duplicated endpoints, and endpointsExcludedInCalculation stores the number of endpoints excluded from it (duplicates and endpoints on nodes in non-default subnets) // and we compare the endpoint counts with duplicates // 2. The endpoint count from endpointData or the one from endpointPodMap is 0 -func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, dupCount int) error { +func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.EndpointsData, endpointPodMap types.EndpointPodMap, endpointsExcludedInCalculation int) error { // Endpoint count from EndpointPodMap - countFromPodMap := len(endpointPodMap) + dupCount + countFromPodMap := len(endpointPodMap) + endpointsExcludedInCalculation if countFromPodMap == 0 { l.logger.Info("Detected endpoint count from endpointPodMap going to zero", "endpointPodMap", endpointPodMap) return fmt.Errorf("%w: Detect endpoint count goes to zero", types.ErrEPCalculationCountZero) @@ -287,7 +287,7 @@ func (l *L7EndpointsCalculator) ValidateEndpoints(endpointData []types.Endpoints } if countFromEndpointData != countFromPodMap { - l.logger.Info("Detected error when comparing endpoint counts", "countFromEndpointData", countFromEndpointData, "countFromPodMap", countFromPodMap, "endpointData", endpointData, "endpointPodMap", endpointPodMap, "dupCount", dupCount) + l.logger.Info("Detected error when comparing endpoint counts", "countFromEndpointData", countFromEndpointData, "countFromPodMap", countFromPodMap, "endpointData", endpointData, "endpointPodMap", endpointPodMap, "endpointsExcludedInCalculation", endpointsExcludedInCalculation) 
return fmt.Errorf("%w: Detect endpoint mismatch, count from endpoint slice=%d, count after calculation=%d", types.ErrEPCountsDiffer, countFromEndpointData, countFromPodMap) } return nil diff --git a/pkg/neg/syncers/endpoints_calculator_test.go b/pkg/neg/syncers/endpoints_calculator_test.go index ecb9e1d21a..98771a08d6 100644 --- a/pkg/neg/syncers/endpoints_calculator_test.go +++ b/pkg/neg/syncers/endpoints_calculator_test.go @@ -301,6 +301,10 @@ func TestValidateEndpoints(t *testing.T) { instance1 := negtypes.TestInstance1 instance2 := negtypes.TestInstance2 duplicatePodName := "pod2-duplicate" + noPodCIDRInstance := negtypes.TestNoPodCIDRInstance + noPodCIDRPod := negtypes.TestNoPodCIDRPod + nonDefaultSubnetInstance := negtypes.TestNonDefaultSubnetInstance + nonDefaultSubnetPod := negtypes.TestNonDefaultSubnetPod svcPort := negtypes.NegSyncerKey{ Namespace: testServiceNamespace, Name: testServiceName, @@ -338,11 +342,16 @@ func TestValidateEndpoints(t *testing.T) { }, }, }) + nodeLister := testContext.NodeInformer.GetIndexer() serviceLister := testContext.ServiceInformer.GetIndexer() - zonegetter.PopulateFakeNodeInformer(testContext.NodeInformer, false) + zonegetter.PopulateFakeNodeInformer(testContext.NodeInformer, true) zoneGetter := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, false) L7EndpointsCalculator := NewL7EndpointsCalculator(zoneGetter, podLister, nodeLister, serviceLister, svcPort, klog.TODO(), testContext.EnableDualStackNEG, metricscollector.FakeSyncerMetrics()) + + zoneGetterMSC := zonegetter.NewFakeZoneGetter(testContext.NodeInformer, defaultTestSubnetURL, true) + L7EndpointsCalculatorMSC := NewL7EndpointsCalculator(zoneGetterMSC, podLister, nodeLister, serviceLister, svcPort, klog.TODO(), testContext.EnableDualStackNEG, metricscollector.FakeSyncerMetrics()) + L7EndpointsCalculatorMSC.enableMultiSubnetCluster = true L4LocalEndpointCalculator := NewLocalL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), 
zoneGetter, fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace), klog.TODO(), &network.NetworkInfo{}) L4ClusterEndpointCalculator := NewClusterL4ILBEndpointsCalculator(listers.NewNodeLister(nodeLister), zoneGetter, fmt.Sprintf("%s/%s", testServiceName, testServiceNamespace), klog.TODO(), &network.NetworkInfo{}) @@ -390,6 +399,7 @@ func TestValidateEndpoints(t *testing.T) { testCases := []struct { desc string ec negtypes.NetworkEndpointsCalculator + ecMSC negtypes.NetworkEndpointsCalculator testEndpointSlices []*discovery.EndpointSlice currentMap map[string]negtypes.NetworkEndpointSet // Use mutation to inject error into that we cannot trigger currently. @@ -399,15 +409,16 @@ func TestValidateEndpoints(t *testing.T) { { desc: "ValidateEndpoints for L4 local endpoint calculator", // we are adding this test to make sure the test is updated when the functionality is added ec: L4LocalEndpointCalculator, + ecMSC: L4LocalEndpointCalculator, testEndpointSlices: nil, // for now it is a no-op mutation: noopMutation, currentMap: nil, expect: nil, }, { - desc: "ValidateEndpoints for L4 cluster endpoint calculator", // we are adding this test to make sure the test is updated when the functionality is added ec: L4ClusterEndpointCalculator, + ecMSC: L4LocalEndpointCalculator, testEndpointSlices: nil, // for now it is a no-op mutation: noopMutation, currentMap: nil, @@ -416,14 +427,16 @@ func TestValidateEndpoints(t *testing.T) { { desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts equal, endpointData has no duplicated endpoints", ec: L7EndpointsCalculator, + ecMSC: L7EndpointsCalculatorMSC, testEndpointSlices: l7TestEPS, mutation: noopMutation, currentMap: nil, expect: nil, }, { - desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts equal, endpointData has one duplicated endpoint", - ec: L7EndpointsCalculator, + desc: "ValidateEndpoints for L7 Endpoint Calculator. 
Endpoint counts equal, endpointData has one duplicated endpoint", + ec: L7EndpointsCalculator, + ecMSC: L7EndpointsCalculatorMSC, testEndpointSlices: []*discovery.EndpointSlice{ { ObjectMeta: metav1.ObjectMeta{ @@ -476,6 +489,7 @@ func TestValidateEndpoints(t *testing.T) { { desc: "ValidateEndpoints for L7 Endpoint Calculator. Endpoint counts not equal", ec: L7EndpointsCalculator, + ecMSC: L7EndpointsCalculatorMSC, testEndpointSlices: l7TestEPS, mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) { // Add one additional endpoint in endpointData @@ -488,6 +502,7 @@ func TestValidateEndpoints(t *testing.T) { { desc: "ValidateEndpoints for L7 Endpoint Calculator. EndpointData has zero endpoint", ec: L7EndpointsCalculator, + ecMSC: L7EndpointsCalculatorMSC, testEndpointSlices: l7TestEPS, mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) { for i := range endpointData { @@ -501,6 +516,7 @@ func TestValidateEndpoints(t *testing.T) { { desc: "ValidateEndpoints for L7 Endpoint Calculator. EndpointPodMap has zero endpoint", ec: L7EndpointsCalculator, + ecMSC: L7EndpointsCalculatorMSC, testEndpointSlices: l7TestEPS, mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) { endpointPodMap = negtypes.EndpointPodMap{} @@ -512,6 +528,7 @@ func TestValidateEndpoints(t *testing.T) { { desc: "ValidateEndpoints for L7 Endpoint Calculator. 
EndpointData and endpointPodMap both have zero endpoint", ec: L7EndpointsCalculator, + ecMSC: L7EndpointsCalculatorMSC, testEndpointSlices: l7TestEPS, mutation: func(endpointData []negtypes.EndpointsData, endpointPodMap negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) { for i := range endpointData { @@ -536,6 +553,209 @@ func TestValidateEndpoints(t *testing.T) { if got := tc.ec.ValidateEndpoints(endpointData, endpointPodMap, endpointsExcludedInCalculation); !errors.Is(got, tc.expect) { t.Errorf("ValidateEndpoints() = %v, expected %v", got, tc.expect) } + + // Run tests with multi-subnet cluster enabled. + endpointData = negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices) + _, endpointPodMap, endpointsExcludedInCalculation, err = tc.ecMSC.CalculateEndpoints(endpointData, tc.currentMap) + if err != nil { + t.Errorf("With multi-subnet cluster enabled, received error when calculating endpoint: %v", err) + } + endpointData, endpointPodMap = tc.mutation(endpointData, endpointPodMap) + if got := tc.ecMSC.ValidateEndpoints(endpointData, endpointPodMap, endpointsExcludedInCalculation); !errors.Is(got, tc.expect) { + t.Errorf("With multi-subnet cluster enabled, ValidateEndpoints() = %v, expected %v", got, tc.expect) + } + }) + } + + // Add noPodCIDRPod that corresponds to noPodCIDRInstance. 
+ podLister.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: noPodCIDRPod, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + discovery.LabelManagedBy: managedByEPSControllerValue, + }, + }, + Spec: v1.PodSpec{ + NodeName: noPodCIDRInstance, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "10.101.3.1", + PodIPs: []v1.PodIP{ + {IP: "10.101.3.1"}, + }, + }, + }) + podLister.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: nonDefaultSubnetPod, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + discovery.LabelManagedBy: managedByEPSControllerValue, + }, + }, + Spec: v1.PodSpec{ + NodeName: nonDefaultSubnetInstance, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "10.200.1.1", + PodIPs: []v1.PodIP{ + {IP: "10.200.1.1"}, + }, + }, + }) + + podLister.Add(&v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: testServiceNamespace, + Name: nonDefaultSubnetPod, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + discovery.LabelManagedBy: managedByEPSControllerValue, + }, + }, + Spec: v1.PodSpec{ + NodeName: nonDefaultSubnetInstance, + }, + Status: v1.PodStatus{ + Phase: v1.PodRunning, + PodIP: "10.200.1.1", + PodIPs: []v1.PodIP{ + {IP: "10.200.1.1"}, + }, + }, + }) + + mscTestCases := []struct { + desc string + ecMSC negtypes.NetworkEndpointsCalculator + testEndpointSlices []*discovery.EndpointSlice + currentMap map[string]negtypes.NetworkEndpointSet + // Use mutation to inject error into that we cannot trigger currently. + mutation func([]negtypes.EndpointsData, negtypes.EndpointPodMap) ([]negtypes.EndpointsData, negtypes.EndpointPodMap) + expect error + }{ + { + desc: "ValidateEndpoints for L7 Endpoint Calculator. 
Endpoint counts equal, endpointData has an endpoint corresponds to node without PodCIDR", + ecMSC: L7EndpointsCalculator, + testEndpointSlices: []*discovery.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName, + Namespace: testServiceNamespace, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + }, + }, + AddressType: "IPv4", + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.100.1.1"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod1", + }, + }, + { + Addresses: []string{"10.100.1.2"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod2", + }, + }, + { + Addresses: []string{"10.101.1.3"}, + NodeName: &noPodCIDRInstance, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: noPodCIDRPod, + }, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &emptyNamedPort, + Port: &port80, + Protocol: &protocolTCP, + }, + }, + }, + }, + mutation: noopMutation, + currentMap: nil, + expect: nil, + }, + { + desc: "ValidateEndpoints for L7 Endpoint Calculator. 
Endpoint counts equal, endpointData has non-default subnet endpoint", + ecMSC: L7EndpointsCalculatorMSC, + testEndpointSlices: []*discovery.EndpointSlice{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: testServiceName, + Namespace: testServiceNamespace, + Labels: map[string]string{ + discovery.LabelServiceName: testServiceName, + }, + }, + AddressType: "IPv4", + Endpoints: []discovery.Endpoint{ + { + Addresses: []string{"10.100.1.1"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod1", + }, + }, + { + Addresses: []string{"10.100.1.2"}, + NodeName: &instance1, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: "pod2", + }, + }, + { + Addresses: []string{"10.200.1.1"}, + NodeName: &nonDefaultSubnetInstance, + TargetRef: &v1.ObjectReference{ + Namespace: testServiceNamespace, + Name: nonDefaultSubnetPod, + }, + }, + }, + Ports: []discovery.EndpointPort{ + { + Name: &emptyNamedPort, + Port: &port80, + Protocol: &protocolTCP, + }, + }, + }, + }, + mutation: noopMutation, + currentMap: nil, + expect: nil, + }, + } + + for _, tc := range mscTestCases { + t.Run(tc.desc, func(t *testing.T) { + endpointData := negtypes.EndpointsDataFromEndpointSlices(tc.testEndpointSlices) + _, endpointPodMap, endpointsExcludedInCalculation, err := tc.ecMSC.CalculateEndpoints(endpointData, tc.currentMap) + if err != nil { + t.Errorf("With multi-subnet cluster enabled, received error when calculating endpoint: %v", err) + } + endpointData, endpointPodMap = tc.mutation(endpointData, endpointPodMap) + if got := tc.ecMSC.ValidateEndpoints(endpointData, endpointPodMap, endpointsExcludedInCalculation); !errors.Is(got, tc.expect) { + t.Errorf("With multi-subnet cluster enabled, ValidateEndpoints() = %v, expected %v", got, tc.expect) + } }) } } diff --git a/pkg/neg/syncers/transaction.go b/pkg/neg/syncers/transaction.go index 101a1fec15..31737683f6 100644 --- a/pkg/neg/syncers/transaction.go +++ 
b/pkg/neg/syncers/transaction.go @@ -379,12 +379,12 @@ func (s *transactionSyncer) getEndpointsCalculation( endpointsData []negtypes.EndpointsData, currentMap map[string]negtypes.NetworkEndpointSet, ) (map[string]negtypes.NetworkEndpointSet, negtypes.EndpointPodMap, error) { - targetMap, endpointPodMap, dupCount, err := s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) + targetMap, endpointPodMap, endpointsExcludedInCalculation, err := s.endpointsCalculator.CalculateEndpoints(endpointsData, currentMap) if err != nil { return nil, nil, err } if s.enableDegradedMode { - err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, dupCount) + err = s.endpointsCalculator.ValidateEndpoints(endpointsData, endpointPodMap, endpointsExcludedInCalculation) if err != nil { return nil, nil, err } diff --git a/pkg/neg/syncers/utils_test.go b/pkg/neg/syncers/utils_test.go index 4dedf2265a..4efd46c9cf 100644 --- a/pkg/neg/syncers/utils_test.go +++ b/pkg/neg/syncers/utils_test.go @@ -2325,7 +2325,7 @@ func TestValidateEndpointFields(t *testing.T) { } } -func TestValidateEndpointFieldsWithMultipleSubnets(t *testing.T) { +func TestValidateEndpointFieldsMultipleSubnets(t *testing.T) { t.Parallel() emptyNamedPort := "" diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index 89bee0ce8e..36be81e4eb 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -85,5 +85,5 @@ type NetworkEndpointsCalculator interface { // Mode indicates the mode that the EndpointsCalculator is operating in. Mode() EndpointsCalculatorMode // ValidateEndpoints validates the NEG endpoint information is correct - ValidateEndpoints(endpointData []EndpointsData, endpointPodMap EndpointPodMap, dupCount int) error + ValidateEndpoints(endpointData []EndpointsData, endpointPodMap EndpointPodMap, endpointsExcludedInCalculation int) error }