Skip to content

Commit

Permalink
Fix NodePort/LoadBalancer issue when proxyAll is enabled (antrea-io#3295
Browse files Browse the repository at this point in the history
)

When proxyAll is enabled, create a NodePort/LoadBalancer Service whose
externalTrafficPolicy is Cluster, then only an OVS group with all
Endpoints will be installed. If change externalTrafficPolicy of the
Service from Cluster to Local, an OVS group with only local Endpoints
should be also installed since externalTrafficPolicy is Local, but it
is not. This patch fixes the issue that OVS group with only local
Endpoints is not installed when externalTrafficPolicy of Service is
changed from Cluster to Local.

Signed-off-by: Hongliang Liu <[email protected]>
  • Loading branch information
hongliangl authored Feb 15, 2022
1 parent 397750d commit ff87705
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 28 deletions.
41 changes: 27 additions & 14 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,15 @@ func (p *proxier) removeStaleServices() {
if svcInfo.NodeLocalExternal() {
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, true)
}
// Remove Service group which has all Endpoints.
groupID, _ := p.groupCounter.Get(svcPortName, false)
if err := p.ofClient.UninstallServiceGroup(groupID); err != nil {
klog.ErrorS(err, "Failed to remove flows of Service", "Service", svcPortName)
klog.ErrorS(err, "Failed to remove Group of all Endpoints for Service", "Service", svcPortName)
continue
}

Expand Down Expand Up @@ -370,7 +370,7 @@ func (p *proxier) installServices() {
pSvcInfo = installedSvcPort.(*types.ServiceInfo)
needRemoval = serviceIdentityChanged(svcInfo, pSvcInfo) || (svcInfo.SessionAffinityType() != pSvcInfo.SessionAffinityType())
needUpdateService = needRemoval || (svcInfo.StickyMaxAgeSeconds() != pSvcInfo.StickyMaxAgeSeconds())
needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType()
needUpdateEndpoints = pSvcInfo.SessionAffinityType() != svcInfo.SessionAffinityType() || pSvcInfo.NodeLocalExternal() != svcInfo.NodeLocalExternal()
} else { // Need to install.
needUpdateService = true
}
Expand Down Expand Up @@ -452,19 +452,32 @@ func (p *proxier) installServices() {
continue
}

// Install another group when Service externalTrafficPolicy is Local.
if p.proxyAll && svcInfo.NodeLocalExternal() {
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
var localEndpointList []k8sproxy.Endpoint
for _, ed := range endpointUpdateList {
if !ed.GetIsLocal() {
if p.proxyAll {
if svcInfo.NodeLocalExternal() {
// Install another group when Service externalTrafficPolicy is Local.
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
var localEndpointList []k8sproxy.Endpoint
for _, ed := range endpointUpdateList {
if !ed.GetIsLocal() {
continue
}
localEndpointList = append(localEndpointList, ed)
}
if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil {
klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local")
continue
}
localEndpointList = append(localEndpointList, ed)
}
if err = p.ofClient.InstallServiceGroup(groupIDLocal, svcInfo.StickyMaxAgeSeconds() != 0, localEndpointList); err != nil {
klog.ErrorS(err, "Error when installing Group for Service whose externalTrafficPolicy is Local")
continue
} else {
// Uninstall the group with only local Endpoints when Service externalTrafficPolicy is Cluster
// unconditionally. If the group doesn't exist on OVS, then the return value will be nil; if the
// group exists on OVS, and after it is uninstalled successfully, then the return value will be also
// nil.
groupIDLocal, _ := p.groupCounter.Get(svcPortName, true)
if err := p.ofClient.UninstallServiceGroup(groupIDLocal); err != nil {
klog.ErrorS(err, "Failed to remove Group of local Endpoints for Service", "Service", svcPortName)
continue
}
p.groupCounter.Recycle(svcPortName, true)
}
}

Expand Down
8 changes: 6 additions & 2 deletions pkg/agent/proxy/proxier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ func testClusterIP(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool, extraSv
mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP).Times(1)
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)

fp.syncProxyRules()
}
Expand Down Expand Up @@ -292,6 +293,8 @@ func testLoadBalancer(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep
if nodeLocalExternal {
groupID, _ = fp.groupCounter.Get(svcPortName, true)
mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1)
} else {
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
}
if proxyLoadBalancerIPs {
mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeLoadBalancer).Times(1)
Expand Down Expand Up @@ -386,6 +389,8 @@ func testNodePort(t *testing.T, nodePortAddresses []net.IP, svcIP, ep1IP, ep2IP
if nodeLocalExternal {
groupID, _ = fp.groupCounter.Get(svcPortName, true)
mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1)
} else {
mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1)
}

mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), uint16(svcNodePort), bindingProtocol, uint16(0), nodeLocalExternal, corev1.ServiceTypeNodePort).Times(1)
Expand Down Expand Up @@ -624,8 +629,7 @@ func testClusterIPRemoval(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool)
mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1)
mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1)
mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1)
mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1)

mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(2)
fp.syncProxyRules()

fp.serviceChanges.OnServiceUpdate(service, nil)
Expand Down
15 changes: 15 additions & 0 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -1521,12 +1521,27 @@ func (data *TestData) createAgnhostNodePortService(serviceName string, affinity,
return data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeNodePort, ipFamily)
}

func (data *TestData) updateServiceExternalTrafficPolicy(serviceName string, nodeLocalExternal bool) (*corev1.Service, error) {
svc, err := data.clientset.CoreV1().Services(testNamespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
if err != nil {
return svc, err
}
if nodeLocalExternal {
svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeLocal
} else {
svc.Spec.ExternalTrafficPolicy = corev1.ServiceExternalTrafficPolicyTypeCluster
}

return data.clientset.CoreV1().Services(testNamespace).Update(context.TODO(), svc, metav1.UpdateOptions{})
}

// createAgnhostLoadBalancerService creates a LoadBalancer agnhost service with the given name.
func (data *TestData) createAgnhostLoadBalancerService(serviceName string, affinity, nodeLocalExternal bool, ingressIPs []string, ipFamily *corev1.IPFamily) (*corev1.Service, error) {
svc, err := data.createService(serviceName, testNamespace, 8080, 8080, map[string]string{"app": "agnhost"}, affinity, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, ipFamily)
if err != nil {
return svc, err
}

ingress := make([]corev1.LoadBalancerIngress, len(ingressIPs))
for idx, ingressIP := range ingressIPs {
ingress[idx].IP = ingressIP
Expand Down
95 changes: 83 additions & 12 deletions test/e2e/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ func probeClientIPFromPod(data *TestData, pod string, baseUrl string) (string, e
return host, err
}

func reverseStrs(strs []string) []string {
var res []string
for i := len(strs) - 1; i >= 0; i-- {
res = append(res, strs[i])
}
return res
}

func TestProxyLoadBalancerServiceIPv4(t *testing.T) {
skipIfNotIPv4Cluster(t)
testProxyLoadBalancerService(t, false)
Expand Down Expand Up @@ -359,13 +367,6 @@ func nodePortTestCases(t *testing.T, data *TestData, portStrCluster, portStrLoca
clusterUrls = append(clusterUrls, net.JoinHostPort(nodeIP, portStrCluster))
localUrls = append(localUrls, net.JoinHostPort(nodeIP, portStrLocal))
}
reverseStrs := func(strs []string) []string {
var res []string
for i := len(strs) - 1; i >= 0; i-- {
res = append(res, strs[i])
}
return res
}

t.Run("ExternalTrafficPolicy:Cluster/Client:Remote", func(t *testing.T) {
testNodePortClusterFromRemote(t, data, nodes, reverseStrs(clusterUrls))
Expand Down Expand Up @@ -434,11 +435,11 @@ func testNodePortLocalFromRemote(t *testing.T, data *TestData, nodes, urls, expe
for idx, node := range nodes {
hostname, err := probeHostnameFromNode(node, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, hostname, expectedHostnames[idx])
require.Equal(t, expectedHostnames[idx], hostname)

clientIP, err := probeClientIPFromNode(node, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, clientIP, expectedClientIPs[idx])
require.Equal(t, expectedClientIPs[idx], clientIP)
}
}

Expand All @@ -447,7 +448,7 @@ func testNodePortLocalFromNode(t *testing.T, data *TestData, nodes, urls, expect
for idx, node := range nodes {
hostname, err := probeHostnameFromNode(node, urls[idx])
require.NoError(t, err, "Service NodePort whose externalTrafficPolicy is Local should be able to be connected rom Node")
require.Equal(t, hostname, expectedHostnames[idx])
require.Equal(t, expectedHostnames[idx], hostname)
}
}

Expand All @@ -456,11 +457,11 @@ func testNodePortLocalFromPod(t *testing.T, data *TestData, pods, urls, expected
for idx, pod := range pods {
hostname, err := probeHostnameFromPod(data, pod, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, hostname, expectedHostnames[idx])
require.Equal(t, expectedHostnames[idx], hostname)

clientIP, err := probeClientIPFromPod(data, pod, urls[idx])
require.NoError(t, err, errMsg)
require.Equal(t, clientIP, expectedClientIPs[idx])
require.Equal(t, expectedClientIPs[idx], clientIP)
}
}

Expand All @@ -484,6 +485,75 @@ func TestProxyServiceSessionAffinity(t *testing.T) {
}
}

func TestProxyExternalTrafficPolicyIPv4(t *testing.T) {
skipIfNotIPv4Cluster(t)
testProxyExternalTrafficPolicy(t, false)
}

func TestProxyExternalTrafficPolicyIPv6(t *testing.T) {
skipIfNotIPv6Cluster(t)
testProxyExternalTrafficPolicy(t, true)
}

func testProxyExternalTrafficPolicy(t *testing.T, isIPv6 bool) {
skipIfHasWindowsNodes(t)
skipIfNumNodesLessThan(t, 2)
skipIfProxyDisabled(t)
data, err := setupTest(t)
if err != nil {
t.Fatalf("Error when setting up test: %v", err)
}
defer teardownTest(t, data)
skipIfProxyAllDisabled(t, data)

svcName := fmt.Sprintf("nodeport-external-traffic-policy-test-ipv6-%v", isIPv6)
nodes := []string{nodeName(0), nodeName(1)}
nodeIPs := []string{controlPlaneNodeIPv4(), workerNodeIPv4(1)}
ipProtocol := corev1.IPv4Protocol
if isIPv6 {
nodeIPs = []string{controlPlaneNodeIPv6(), workerNodeIPv6(1)}
ipProtocol = corev1.IPv6Protocol
}

// Create agnhost Pods which are not on host network.
var podNames []string
for idx, node := range nodes {
podName := fmt.Sprintf("agnhost-%d-ipv6-%v", idx, isIPv6)
createAgnhostPod(t, data, podName, node, false)
podNames = append(podNames, podName)
}

// Create a NodePort Service whose externalTrafficPolicy is Cluster and backend Pods are created above.
var portStr string
nodePortSvc, err := data.createAgnhostNodePortService(svcName, false, false, &ipProtocol)
require.NoError(t, err)
for _, port := range nodePortSvc.Spec.Ports {
if port.NodePort != 0 {
portStr = fmt.Sprint(port.NodePort)
break
}
}
require.NotEqual(t, "", portStr, "NodePort port number should not be empty")

// Get test NodePort URLs.
var urls []string
for _, nodeIP := range nodeIPs {
urls = append(urls, net.JoinHostPort(nodeIP, portStr))
}

// Hold on to make sure that the Service is realized, then test the NodePort on each Node.
time.Sleep(2 * time.Second)
testNodePortClusterFromRemote(t, data, nodes, reverseStrs(urls))

// Update the NodePort Service's externalTrafficPolicy from Cluster to Local.
_, err = data.updateServiceExternalTrafficPolicy(svcName, true)
require.NoError(t, err)

// Hold on to make sure that the update of Service is realized, then test the NodePort on each Node.
time.Sleep(2 * time.Second)
testNodePortLocalFromRemote(t, data, nodes, reverseStrs(urls), nodeIPs, reverseStrs(podNames))
}

func testProxyServiceSessionAffinity(ipFamily *corev1.IPFamily, ingressIPs []string, data *TestData, t *testing.T) {
nodeName := nodeName(1)
nginx := randName("nginx-")
Expand Down Expand Up @@ -533,6 +603,7 @@ func testProxyServiceSessionAffinity(ipFamily *corev1.IPFamily, ingressIPs []str
}
}
}

func testProxyHairpinCase(t *testing.T, data *TestData) {
if len(clusterInfo.podV4NetworkCIDR) != 0 {
ipFamily := corev1.IPv4Protocol
Expand Down

0 comments on commit ff87705

Please sign in to comment.