Add toService E2E test for exported Service (antrea-io#3140)
Signed-off-by: Yang Ding <[email protected]>
Dyanngg authored and luolanzone committed Jan 16, 2022
1 parent fbc8d99 commit 69ea772
Showing 4 changed files with 208 additions and 3 deletions.
7 changes: 7 additions & 0 deletions ci/jenkins/test-mc.sh
@@ -261,11 +261,18 @@ function run_multicluster_e2e {
docker tag "${DOCKER_REGISTRY}/antrea/nginx:latest" "nginx:latest"
docker save nginx:latest -o "${WORKDIR}"/nginx.tar

docker pull "${DOCKER_REGISTRY}/antrea/agnhost:2.26"
docker tag "${DOCKER_REGISTRY}/antrea/agnhost:2.26" "agnhost:2.26"
docker save agnhost:2.26 -o "${WORKDIR}"/agnhost.tar

for kubeconfig in ${membercluter_kubeconfigs[@]}
do
kubectl get nodes -o wide --no-headers=true "${kubeconfig}" | awk '{print $6}' | while read IP; do
rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/nginx.tar jenkins@["${IP}"]:"${WORKDIR}"/nginx.tar
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/nginx.tar" || true

rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/agnhost.tar jenkins@["${IP}"]:"${WORKDIR}"/agnhost.tar
ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "docker load -i ${WORKDIR}/agnhost.tar" || true
done

done
148 changes: 145 additions & 3 deletions multicluster/test/e2e/framework.go
@@ -15,21 +15,29 @@
package e2e

import (
"bytes"
"context"
"fmt"
"math/rand"
"os"
"strings"
"time"

log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"

crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
crdclientset "antrea.io/antrea/pkg/client/clientset/versioned"
antreae2e "antrea.io/antrea/test/e2e"
"antrea.io/antrea/test/e2e/providers"
)

@@ -52,7 +60,8 @@ const (

nameSuffixLength int = 8

nginxImage = "nginx:latest"
nginxImage = "nginx:latest"
agnhostImage = "agnhost:2.26"
)

var provider providers.ProviderInterface
@@ -69,6 +78,7 @@ var testOptions TestOptions
type TestData struct {
kubeconfigs map[string]*restclient.Config
clients map[string]kubernetes.Interface
crdClients map[string]crdclientset.Interface
clusters []string

logsDirForTestCase string
@@ -79,6 +89,7 @@ var testData *TestData
func (data *TestData) createClients() error {
data.clients = make(map[string]kubernetes.Interface)
data.kubeconfigs = make(map[string]*restclient.Config)
data.crdClients = make(map[string]crdclientset.Interface)
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{}

@@ -98,12 +109,16 @@ func (data *TestData) createClients() error {
}
clusterClient, err := kubernetes.NewForConfig(kubeConfig)
if err != nil {
return fmt.Errorf("error when creating kubernetes client of cluster %s: %v", cluster, err)
return fmt.Errorf("error when creating Kubernetes client of cluster %s: %v", cluster, err)
}
crdClient, err := crdclientset.NewForConfig(kubeConfig)
if err != nil {
return fmt.Errorf("error when creating crd client of cluster %s: %v", cluster, err)
}
data.kubeconfigs[cluster] = kubeConfig
data.clients[cluster] = clusterClient
data.crdClients[cluster] = crdClient
}

return nil
}

@@ -166,6 +181,10 @@ func (data *TestData) getClientOfCluster(clusterName string) kubernetes.Interface
return data.clients[clusterName]
}

func (data *TestData) getCRDClientOfCluster(clusterName string) crdclientset.Interface {
return data.crdClients[clusterName]
}

type PodCondition func(*corev1.Pod) (bool, error)

// podWaitFor polls the K8s apiserver until the specified Pod is found (in the test Namespace) and
@@ -318,6 +337,100 @@ func createPod(client kubernetes.Interface, name string, namespace string, ctrName
return err
}

func (data *TestData) probe(
cluster string,
podNamespace string,
podName string,
containerName string,
dstAddr string,
dstName string,
port int32,
protocol corev1.Protocol,
) antreae2e.PodConnectivityMark {
protocolStr := map[corev1.Protocol]string{
corev1.ProtocolTCP: "tcp",
corev1.ProtocolUDP: "udp",
corev1.ProtocolSCTP: "sctp",
}
// There seems to be an issue when running Antrea in Kind where tunnel traffic is dropped at
// first. This causes the first test that is run to fail consistently. To avoid this issue
// until it is resolved, we try to connect 3 times.
// See https://github.com/antrea-io/antrea/issues/467.
cmd := []string{
"/bin/sh",
"-c",
fmt.Sprintf("for i in $(seq 1 3); do /agnhost connect %s:%d --timeout=1s --protocol=%s; done;", dstAddr, port, protocolStr[protocol]),
}
log.Tracef("Running: kubectl exec %s -c %s -n %s -- %s", podName, containerName, podNamespace, strings.Join(cmd, " "))
stdout, stderr, err := data.runCommandFromPod(cluster, podNamespace, podName, containerName, cmd)
if err != nil {
// Log this error at trace level since it may be an expected failure.
log.Tracef("%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", podName, dstName, err, stdout, stderr)
// If err != nil and stderr == "", it means this probe failed because of
// the command itself rather than connectivity; for example, the container name doesn't exist.
if stderr == "" {
return antreae2e.Error
}
return decideProbeResult(stderr, 3)
}
return antreae2e.Connected
}

// decideProbeResult uses the probe stderr to decide the connectivity.
func decideProbeResult(stderr string, probeNum int) antreae2e.PodConnectivityMark {
countConnected := probeNum - strings.Count(stderr, "\n")
countDropped := strings.Count(stderr, "TIMEOUT")
// For our UDP rejection cases, agnhost will return:
// For IPv4: 'UNKNOWN: read udp [src]->[dst]: read: no route to host'
// For IPv6: 'UNKNOWN: read udp [src]->[dst]: read: permission denied'
// To avoid incorrect identification, we use 'no route to host' and
// 'permission denied' as the key strings, instead of 'UNKNOWN'.
// For rejection cases with other protocols, agnhost will return 'REFUSED'.
countRejected := strings.Count(stderr, "REFUSED") + strings.Count(stderr, "no route to host") + strings.Count(stderr, "permission denied")

if countRejected == 0 && countConnected > 0 {
return antreae2e.Connected
}
if countConnected == 0 && countRejected > 0 {
return antreae2e.Rejected
}
if countDropped == probeNum {
return antreae2e.Dropped
}
return antreae2e.Error
}
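
To make the classification above concrete, here is a minimal, hypothetical table-driven test (not part of this commit) for decideProbeResult. It only assumes the antreae2e connectivity marks already imported by this file and the three-probe count used by probe:

package e2e

import (
	"testing"

	antreae2e "antrea.io/antrea/test/e2e"
)

// Hypothetical sketch: feed canned agnhost stderr into decideProbeResult and check the mark.
func TestDecideProbeResult(t *testing.T) {
	tests := []struct {
		name   string
		stderr string
		want   antreae2e.PodConnectivityMark
	}{
		{"all probes connect", "", antreae2e.Connected},                               // no stderr lines at all
		{"all probes time out", "TIMEOUT\nTIMEOUT\nTIMEOUT\n", antreae2e.Dropped},     // 3 TIMEOUT lines
		{"all probes are refused", "REFUSED\nREFUSED\nREFUSED\n", antreae2e.Rejected}, // 3 REFUSED lines
	}
	for _, tc := range tests {
		if got := decideProbeResult(tc.stderr, 3); got != tc.want {
			t.Errorf("%s: decideProbeResult(%q, 3) = %v, want %v", tc.name, tc.stderr, got, tc.want)
		}
	}
}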

// runCommandFromPod runs the provided command in the specified Container of the given Pod and
// returns the contents of stdout and stderr as strings. An error indicates either that the command
// couldn't be run or that the command returned a non-zero exit code.
func (data *TestData) runCommandFromPod(cluster, podNamespace, podName, containerName string, cmd []string) (stdout string, stderr string, err error) {
request := data.clients[cluster].CoreV1().RESTClient().Post().
Namespace(podNamespace).
Resource("pods").
Name(podName).
SubResource("exec").
Param("container", containerName).
VersionedParams(&corev1.PodExecOptions{
Command: cmd,
Stdin: false,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(data.kubeconfigs[cluster], "POST", request.URL())
if err != nil {
return "", "", err
}
var stdoutB, stderrB bytes.Buffer
if err := exec.Stream(remotecommand.StreamOptions{
Stdout: &stdoutB,
Stderr: &stderrB,
}); err != nil {
return stdoutB.String(), stderrB.String(), err
}
return stdoutB.String(), stderrB.String(), nil
}
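
As a usage sketch (illustrative only, assuming the package-level testData, eastCluster, and multiClusterTestNamespace defined in this framework, plus a placeholder ClusterIP and Pod name), the exec-based probe boils down to:

func exampleExecProbe() {
	// Hypothetical example; the address, Pod and container names are placeholders.
	cmd := []string{"/bin/sh", "-c", "/agnhost connect 10.96.12.34:80 --timeout=1s --protocol=tcp"}
	stdout, stderr, err := testData.runCommandFromPod(eastCluster, multiClusterTestNamespace, "test-service-client", "client", cmd)
	if err != nil {
		// agnhost writes connection failures to stderr, so a non-empty stderr here
		// usually indicates a connectivity problem rather than a broken command.
		log.Infof("probe failed: %v (stdout: %q, stderr: %q)", err, stdout, stderr)
		return
	}
	log.Infof("probe succeeded: %q", stdout)
}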

func createService(client kubernetes.Interface, serviceName string, namespace string, port int32, targetPort int32,
protocol corev1.Protocol, selector map[string]string, affinity bool, nodeLocalExternal bool, serviceType corev1.ServiceType,
ipFamily *corev1.IPFamily, annotation map[string]string) (*corev1.Service, error) {
@@ -382,3 +495,32 @@ func podWaitFor(client kubernetes.Interface, timeout time.Duration, name string,
}
return client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
}

// createOrUpdateANP is a convenience function for creating/updating Antrea NetworkPolicies.
func createOrUpdateANP(crdClient crdclientset.Interface, anp *crdv1alpha1.NetworkPolicy) (*crdv1alpha1.NetworkPolicy, error) {
log.Infof("Creating/updating Antrea NetworkPolicy %s/%s", anp.Namespace, anp.Name)
cnpReturned, err := crdClient.CrdV1alpha1().NetworkPolicies(anp.Namespace).Get(context.TODO(), anp.Name, metav1.GetOptions{})
if err != nil {
log.Debugf("Creating Antrea NetworkPolicy %s", anp.Name)
anp, err = crdClient.CrdV1alpha1().NetworkPolicies(anp.Namespace).Create(context.TODO(), anp, metav1.CreateOptions{})
if err != nil {
log.Debugf("Unable to create Antrea NetworkPolicy: %s", err)
}
return anp, err
} else if cnpReturned.Name != "" {
log.Debugf("Antrea NetworkPolicy with name %s already exists, updating", anp.Name)
anp, err = crdClient.CrdV1alpha1().NetworkPolicies(anp.Namespace).Update(context.TODO(), anp, metav1.UpdateOptions{})
return anp, err
}
return nil, fmt.Errorf("error occurred in creating/updating Antrea NetworkPolicy %s", anp.Name)
}

// deleteANP is a convenience function for deleting ANP by name and Namespace.
func deleteANP(crdClient crdclientset.Interface, ns, name string) error {
log.Infof("Deleting Antrea NetworkPolicy '%s/%s'", ns, name)
err := crdClient.CrdV1alpha1().NetworkPolicies(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil {
return fmt.Errorf("unable to delete Antrea NetworkPolicy %s: %v", name, err)
}
return nil
}
39 changes: 39 additions & 0 deletions multicluster/test/e2e/service_test.go
@@ -16,10 +16,15 @@ package e2e

import (
"fmt"
"strconv"
"testing"
"time"

corev1 "k8s.io/api/core/v1"

crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
antreae2e "antrea.io/antrea/test/e2e"
e2euttils "antrea.io/antrea/test/e2e/utils"
)

func TestConnectivity(t *testing.T) {
@@ -42,8 +47,10 @@ func testServiceExport(t *testing.T, data *TestData) {
// we create an nginx Pod in one cluster (east), and try to curl it from another cluster (west).
// If we get status code 200, it means that the resource is exported by the east cluster
// and imported by the west cluster.
// TODO(yang): reorg test function contents
func (data *TestData) testServiceExport(t *testing.T) {
podName := randName("test-nginx-")
clientPodName := "test-service-client"

if err := createPodWrapper(t, data, westCluster, multiClusterTestNamespace, podName, nginxImage, "nginx", nil, nil, nil, nil, false, nil); err != nil {
t.Fatalf("Error when creating nginx Pod in west cluster: %v", err)
@@ -94,6 +101,38 @@ func (data *TestData) testServiceExport(t *testing.T) {
if err := data.probeFromCluster(westCluster, westIP); err != nil {
t.Fatalf("Error when probe service from %s", westCluster)
}

if err := data.createPod(eastCluster, clientPodName, multiClusterTestNamespace, "client", agnhostImage,
[]string{"sleep", strconv.Itoa(3600)}, nil, nil, nil, false, nil); err != nil {
t.Fatalf("Error when creating client Pod in east cluster: %v", err)
}
defer deletePodWrapper(t, data, eastCluster, multiClusterTestNamespace, clientPodName)
_, err = data.podWaitFor(defaultTimeout, eastCluster, clientPodName, multiClusterTestNamespace, func(pod *corev1.Pod) (bool, error) {
return pod.Status.Phase == corev1.PodRunning, nil
})
if err != nil {
t.Fatalf("Error when waiting for Pod '%s' in east cluster: %v", clientPodName, err)
}

anpBuilder := &e2euttils.AntreaNetworkPolicySpecBuilder{}
anpBuilder = anpBuilder.SetName(multiClusterTestNamespace, "block-west-exported-service").
SetPriority(1.0).
SetAppliedToGroup([]e2euttils.ANPAppliedToSpec{{PodSelector: map[string]string{"app": "client"}}}).
AddToServicesRule([]crdv1alpha1.ServiceReference{{
Name: fmt.Sprintf("antrea-mc-%s", westClusterTestService),
Namespace: multiClusterTestNamespace},
}, "", nil, crdv1alpha1.RuleActionDrop)
if _, err := createOrUpdateANP(data.getCRDClientOfCluster(eastCluster), anpBuilder.Get()); err != nil {
t.Fatalf("Error creating ANP %s: %v", anpBuilder.Name, err)
}
defer deleteANP(data.getCRDClientOfCluster(eastCluster), multiClusterTestNamespace, anpBuilder.Name)

connectivity := data.probe(eastCluster, multiClusterTestNamespace, clientPodName, "client", westIP, "westClusterServiceIP", 80, corev1.ProtocolTCP)
if connectivity == antreae2e.Error {
t.Errorf("Failure -- could not complete probe: %v", err)
} else if connectivity != antreae2e.Dropped {
t.Errorf("Failure -- wrong result from probing exported Service after applying toService AntreaNetworkPolicy. Expected: %v, Actual: %v", antreae2e.Dropped, connectivity)
}
}
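
A natural follow-up, sketched below but not part of this commit, would delete the policy and verify that connectivity to the exported Service recovers. If appended at the end of testServiceExport, it reuses only helpers introduced in this change (the deferred deleteANP would then simply return a NotFound error, which is ignored):

// Hypothetical extension of testServiceExport: remove the ANP and expect Connected again.
if err := deleteANP(data.getCRDClientOfCluster(eastCluster), multiClusterTestNamespace, anpBuilder.Name); err != nil {
	t.Fatalf("Error when deleting Antrea NetworkPolicy: %v", err)
}
// A short wait may be needed for the agent to remove the corresponding flows.
time.Sleep(2 * time.Second)
connectivity = data.probe(eastCluster, multiClusterTestNamespace, clientPodName, "client", westIP, "westClusterServiceIP", 80, corev1.ProtocolTCP)
if connectivity != antreae2e.Connected {
	t.Errorf("Failure -- expected %v after deleting the ANP, got %v", antreae2e.Connected, connectivity)
}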

func (data *TestData) deployServiceExport(clusterName string) error {
17 changes: 17 additions & 0 deletions test/e2e/utils/anpspecbuilder.go
@@ -207,6 +207,23 @@ func (b *AntreaNetworkPolicySpecBuilder) AddEgress(protoc v1.Protocol,
return b
}

func (b *AntreaNetworkPolicySpecBuilder) AddToServicesRule(svcRefs []crdv1alpha1.ServiceReference,
name string, ruleAppliedToSpecs []ANPAppliedToSpec, action crdv1alpha1.RuleAction) *AntreaNetworkPolicySpecBuilder {
var appliedTos []crdv1alpha1.NetworkPolicyPeer
for _, at := range ruleAppliedToSpecs {
appliedTos = append(appliedTos, b.GetAppliedToPeer(at.PodSelector, at.PodSelectorMatchExp))
}
newRule := crdv1alpha1.Rule{
To: make([]crdv1alpha1.NetworkPolicyPeer, 0),
ToServices: svcRefs,
Action: &action,
Name: name,
AppliedTo: appliedTos,
}
b.Spec.Egress = append(b.Spec.Egress, newRule)
return b
}
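
For reference, a minimal sketch of how this new builder method composes with the existing setters (mirroring its use in service_test.go; the namespace, policy, and Service names below are illustrative). The rule's To field is left as an empty slice since a toServices egress rule is not combined with To peers or Ports:

// Hypothetical usage within this package; names are placeholders.
func exampleToServicesANP() *crdv1alpha1.NetworkPolicy {
	builder := &AntreaNetworkPolicySpecBuilder{}
	return builder.SetName("test-ns", "drop-to-exported-svc").
		SetPriority(1.0).
		SetAppliedToGroup([]ANPAppliedToSpec{{PodSelector: map[string]string{"app": "client"}}}).
		AddToServicesRule([]crdv1alpha1.ServiceReference{{
			Name:      "antrea-mc-nginx",
			Namespace: "test-ns",
		}}, "drop-to-svc", nil, crdv1alpha1.RuleActionDrop).
		Get()
}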

func (b *AntreaNetworkPolicySpecBuilder) AddEgressLogging() *AntreaNetworkPolicySpecBuilder {
for i, e := range b.Spec.Egress {
e.EnableLogging = true
