diff --git a/ci/jenkins/test-mc.sh b/ci/jenkins/test-mc.sh
index f421adeeb8c..4f8ad311a5d 100755
--- a/ci/jenkins/test-mc.sh
+++ b/ci/jenkins/test-mc.sh
@@ -261,11 +261,18 @@ function run_multicluster_e2e {
     docker tag "${DOCKER_REGISTRY}/antrea/nginx:latest" "nginx:latest"
     docker save nginx:latest -o "${WORKDIR}"/nginx.tar
 
+    docker pull "${DOCKER_REGISTRY}/antrea/agnhost:2.26"
+    docker tag "${DOCKER_REGISTRY}/antrea/agnhost:2.26" "agnhost:2.26"
+    docker save agnhost:2.26 -o "${WORKDIR}"/agnhost.tar
+
     for kubeconfig in ${membercluter_kubeconfigs[@]}
     do
         kubectl get nodes -o wide --no-headers=true "${kubeconfig}"| awk '{print $6}' | while read IP; do
             rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/nginx.tar jenkins@["${IP}"]:"${WORKDIR}"/nginx.tar
             ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "${CLEAN_STALE_IMAGES}; docker load -i ${WORKDIR}/nginx.tar" || true
+
+            rsync -avr --progress --inplace -e "ssh -o StrictHostKeyChecking=no" "${WORKDIR}"/agnhost.tar jenkins@["${IP}"]:"${WORKDIR}"/agnhost.tar
+            ssh -o StrictHostKeyChecking=no -n jenkins@"${IP}" "docker load -i ${WORKDIR}/agnhost.tar" || true
        done
    done
diff --git a/multicluster/test/e2e/framework.go b/multicluster/test/e2e/framework.go
index 6d7ae8ea59d..1e7065a3ba1 100644
--- a/multicluster/test/e2e/framework.go
+++ b/multicluster/test/e2e/framework.go
@@ -15,21 +15,29 @@ package e2e
 
 import (
+	"bytes"
 	"context"
 	"fmt"
 	"math/rand"
 	"os"
+	"strings"
 	"time"
 
+	log "github.com/sirupsen/logrus"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/kubernetes/scheme"
 	restclient "k8s.io/client-go/rest"
 	"k8s.io/client-go/tools/clientcmd"
+	"k8s.io/client-go/tools/remotecommand"
 
+	crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
+	crdclientset "antrea.io/antrea/pkg/client/clientset/versioned"
+	antreae2e "antrea.io/antrea/test/e2e"
 	"antrea.io/antrea/test/e2e/providers"
 )
 
@@ -52,7 +60,8 @@ const (
 	nameSuffixLength int = 8
 
-	nginxImage = "nginx:latest"
+	nginxImage   = "nginx:latest"
+	agnhostImage = "agnhost:2.26"
 )
 
 var provider providers.ProviderInterface
 
@@ -69,6 +78,7 @@ var testOptions TestOptions
 type TestData struct {
 	kubeconfigs map[string]*restclient.Config
 	clients     map[string]kubernetes.Interface
+	crdClients  map[string]crdclientset.Interface
 	clusters    []string
 
 	logsDirForTestCase string
 }
 
 var testData *TestData
 
 func (data *TestData) createClients() error {
 	data.clients = make(map[string]kubernetes.Interface)
 	data.kubeconfigs = make(map[string]*restclient.Config)
+	data.crdClients = make(map[string]crdclientset.Interface)
 
 	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
 	configOverrides := &clientcmd.ConfigOverrides{}
@@ -98,12 +109,16 @@ func (data *TestData) createClients() error {
 		}
 		clusterClient, err := kubernetes.NewForConfig(kubeConfig)
 		if err != nil {
-			return fmt.Errorf("error when creating kubernetes client of cluster %s: %v", cluster, err)
+			return fmt.Errorf("error when creating Kubernetes client of cluster %s: %v", cluster, err)
+		}
+		crdClient, err := crdclientset.NewForConfig(kubeConfig)
+		if err != nil {
+			return fmt.Errorf("error when creating CRD client of cluster %s: %v", cluster, err)
 		}
 		data.kubeconfigs[cluster] = kubeConfig
 		data.clients[cluster] = clusterClient
+		data.crdClients[cluster] = crdClient
 	}
-
 	return nil
 }
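Note (illustrative, not part of the patch): for reviewers unfamiliar with the generated Antrea clientset wired in above, the sketch below shows how such a CRD client is typically built from a kubeconfig and used. The kubeconfig path and namespace are hypothetical; the constructor is the same one called in createClients(), and List is the standard generated-clientset accessor.

package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/tools/clientcmd"

	crdclientset "antrea.io/antrea/pkg/client/clientset/versioned"
)

func main() {
	// Build a *rest.Config from a kubeconfig file (path is hypothetical).
	kubeConfig, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	// Same constructor as in createClients() above.
	crdClient, err := crdclientset.NewForConfig(kubeConfig)
	if err != nil {
		panic(err)
	}
	// The generated clientset exposes Antrea CRDs by group/version.
	anps, err := crdClient.CrdV1alpha1().NetworkPolicies("default").List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		panic(err)
	}
	fmt.Printf("found %d Antrea NetworkPolicies in namespace default\n", len(anps.Items))
}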
@@ -166,6 +181,10 @@ func (data *TestData) getClientOfCluster(clusterName string) kubernetes.Interfac
 	return data.clients[clusterName]
 }
 
+func (data *TestData) getCRDClientOfCluster(clusterName string) crdclientset.Interface {
+	return data.crdClients[clusterName]
+}
+
 type PodCondition func(*corev1.Pod) (bool, error)
 
 // podWaitFor polls the K8s apiserver until the specified Pod is found (in the test Namespace) and
@@ -318,6 +337,100 @@ func createPod(client kubernetes.Interface, name string, namespace string, ctrNa
 	return err
 }
 
+func (data *TestData) probe(
+	cluster string,
+	podNamespace string,
+	podName string,
+	containerName string,
+	dstAddr string,
+	dstName string,
+	port int32,
+	protocol corev1.Protocol,
+) antreae2e.PodConnectivityMark {
+	protocolStr := map[corev1.Protocol]string{
+		corev1.ProtocolTCP:  "tcp",
+		corev1.ProtocolUDP:  "udp",
+		corev1.ProtocolSCTP: "sctp",
+	}
+	// There seems to be an issue when running Antrea in Kind where tunnel traffic is dropped at
+	// first. This causes the first test to fail consistently. To avoid this issue until it is
+	// resolved, we try to connect 3 times.
+	// See https://github.com/antrea-io/antrea/issues/467.
+	cmd := []string{
+		"/bin/sh",
+		"-c",
+		fmt.Sprintf("for i in $(seq 1 3); do /agnhost connect %s:%d --timeout=1s --protocol=%s; done;", dstAddr, port, protocolStr[protocol]),
+	}
+	log.Tracef("Running: kubectl exec %s -c %s -n %s -- %s", podName, containerName, podNamespace, strings.Join(cmd, " "))
+	stdout, stderr, err := data.runCommandFromPod(cluster, podNamespace, podName, containerName, cmd)
+	if err != nil {
+		// Log this error at trace level since it may be an expected failure.
+		log.Tracef("%s -> %s: error when running command: err - %v /// stdout - %s /// stderr - %s", podName, dstName, err, stdout, stderr)
+		// If err != nil and stderr == "", it means this probe failed because of the
+		// command instead of connectivity. For example, the container name doesn't exist.
+		if stderr == "" {
+			return antreae2e.Error
+		}
+		return decideProbeResult(stderr, 3)
+	}
+	return antreae2e.Connected
+}
+
+// decideProbeResult uses the probe stderr to decide the connectivity.
+func decideProbeResult(stderr string, probeNum int) antreae2e.PodConnectivityMark {
+	countConnected := probeNum - strings.Count(stderr, "\n")
+	countDropped := strings.Count(stderr, "TIMEOUT")
+	// For our UDP rejection cases, agnhost will return:
+	//   For IPv4: 'UNKNOWN: read udp [src]->[dst]: read: no route to host'
+	//   For IPv6: 'UNKNOWN: read udp [src]->[dst]: read: permission denied'
+	// To avoid incorrect identification, we use 'no route to host' and
+	// 'permission denied' instead of 'UNKNOWN' as the key strings.
+	// For the rejection cases of other protocols, agnhost returns 'REFUSED'.
+	countRejected := strings.Count(stderr, "REFUSED") + strings.Count(stderr, "no route to host") + strings.Count(stderr, "permission denied")
+
+	if countRejected == 0 && countConnected > 0 {
+		return antreae2e.Connected
+	}
+	if countConnected == 0 && countRejected > 0 {
+		return antreae2e.Rejected
+	}
+	if countDropped == probeNum {
+		return antreae2e.Dropped
+	}
+	return antreae2e.Error
+}
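Note (illustrative, not part of the patch): the classification above relies on agnhost connect writing one line to stderr per failed probe and nothing on success. The sketch below shows how sample stderr values map to connectivity marks, assuming it sits in the same package as decideProbeResult; the stderr strings are simplified stand-ins for real agnhost output.

package e2e

import (
	"fmt"

	antreae2e "antrea.io/antrea/test/e2e"
)

// exampleDecideProbeResult illustrates the mapping from agnhost stderr to
// connectivity marks for 3 probe attempts.
func exampleDecideProbeResult() {
	// Three timeouts (one stderr line per failed probe): Dropped.
	fmt.Println(decideProbeResult("TIMEOUT\nTIMEOUT\nTIMEOUT\n", 3) == antreae2e.Dropped) // true
	// Three TCP resets: Rejected.
	fmt.Println(decideProbeResult("REFUSED\nREFUSED\nREFUSED\n", 3) == antreae2e.Rejected) // true
	// Empty stderr, i.e. all three probes succeeded: Connected.
	fmt.Println(decideProbeResult("", 3) == antreae2e.Connected) // true
}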
+// runCommandFromPod runs the provided command in the specified container of the given Pod and
+// returns the contents of stdout and stderr as strings. An error either indicates that the
+// command couldn't be run or that the command returned a non-zero error code.
+func (data *TestData) runCommandFromPod(cluster, podNamespace, podName, containerName string, cmd []string) (stdout string, stderr string, err error) {
+	request := data.clients[cluster].CoreV1().RESTClient().Post().
+		Namespace(podNamespace).
+		Resource("pods").
+		Name(podName).
+		SubResource("exec").
+		Param("container", containerName).
+		VersionedParams(&corev1.PodExecOptions{
+			Command: cmd,
+			Stdin:   false,
+			Stdout:  true,
+			Stderr:  true,
+			TTY:     false,
+		}, scheme.ParameterCodec)
+	exec, err := remotecommand.NewSPDYExecutor(data.kubeconfigs[cluster], "POST", request.URL())
+	if err != nil {
+		return "", "", err
+	}
+	var stdoutB, stderrB bytes.Buffer
+	if err := exec.Stream(remotecommand.StreamOptions{
+		Stdout: &stdoutB,
+		Stderr: &stderrB,
+	}); err != nil {
+		return stdoutB.String(), stderrB.String(), err
+	}
+	return stdoutB.String(), stderrB.String(), nil
+}
+
 func createService(client kubernetes.Interface, serviceName string, namespace string, port int32, targetPort int32,
 	protocol corev1.Protocol, selector map[string]string, affinity bool, nodeLocalExternal bool,
 	serviceType corev1.ServiceType, ipFamily *corev1.IPFamily, annotation map[string]string) (*corev1.Service, error) {
@@ -382,3 +495,32 @@ func podWaitFor(client kubernetes.Interface, timeout time.Duration, name string,
 	}
 	return client.CoreV1().Pods(namespace).Get(context.TODO(), name, metav1.GetOptions{})
 }
+
+// createOrUpdateANP is a convenience function for updating/creating Antrea NetworkPolicies.
+func createOrUpdateANP(crdClient crdclientset.Interface, anp *crdv1alpha1.NetworkPolicy) (*crdv1alpha1.NetworkPolicy, error) {
+	log.Infof("Creating/updating Antrea NetworkPolicy %s/%s", anp.Namespace, anp.Name)
+	anpReturned, err := crdClient.CrdV1alpha1().NetworkPolicies(anp.Namespace).Get(context.TODO(), anp.Name, metav1.GetOptions{})
+	if err != nil {
+		log.Debugf("Creating Antrea NetworkPolicy %s", anp.Name)
+		anp, err = crdClient.CrdV1alpha1().NetworkPolicies(anp.Namespace).Create(context.TODO(), anp, metav1.CreateOptions{})
+		if err != nil {
+			log.Debugf("Unable to create Antrea NetworkPolicy: %s", err)
+		}
+		return anp, err
+	} else if anpReturned.Name != "" {
+		log.Debugf("Antrea NetworkPolicy with name %s already exists, updating", anp.Name)
+		anp, err = crdClient.CrdV1alpha1().NetworkPolicies(anp.Namespace).Update(context.TODO(), anp, metav1.UpdateOptions{})
+		return anp, err
+	}
+	return nil, fmt.Errorf("error occurred in creating/updating Antrea NetworkPolicy %s", anp.Name)
+}
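Note (illustrative, not part of the patch): a minimal sketch of calling createOrUpdateANP with a hand-built policy literal instead of the spec builder used in service_test.go below. The function, namespace, and policy names are hypothetical, and the caller is assumed to live in the same package as the helper.

package e2e

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
	crdclientset "antrea.io/antrea/pkg/client/clientset/versioned"
)

// applyExampleANP is a hypothetical caller of createOrUpdateANP that builds the
// policy literal rather than going through AntreaNetworkPolicySpecBuilder.
func applyExampleANP(crdClient crdclientset.Interface) error {
	anp := &crdv1alpha1.NetworkPolicy{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: "antrea-multicluster-test", // hypothetical namespace
			Name:      "example-drop-policy",      // hypothetical name
		},
		Spec: crdv1alpha1.NetworkPolicySpec{
			Priority: 1.0,
			AppliedTo: []crdv1alpha1.NetworkPolicyPeer{
				{PodSelector: &metav1.LabelSelector{MatchLabels: map[string]string{"app": "client"}}},
			},
		},
	}
	// createOrUpdateANP (added above) creates the policy if it is absent and
	// updates it otherwise.
	_, err := createOrUpdateANP(crdClient, anp)
	return err
}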
+
+// deleteANP is a convenience function for deleting an ANP by name and Namespace.
+func deleteANP(crdClient crdclientset.Interface, ns, name string) error {
+	log.Infof("Deleting Antrea NetworkPolicy '%s/%s'", ns, name)
+	err := crdClient.CrdV1alpha1().NetworkPolicies(ns).Delete(context.TODO(), name, metav1.DeleteOptions{})
+	if err != nil {
+		return fmt.Errorf("unable to delete Antrea NetworkPolicy %s: %v", name, err)
+	}
+	return nil
+}
diff --git a/multicluster/test/e2e/service_test.go b/multicluster/test/e2e/service_test.go
index 74277f5759c..6deca699da4 100644
--- a/multicluster/test/e2e/service_test.go
+++ b/multicluster/test/e2e/service_test.go
@@ -16,10 +16,15 @@ package e2e
 
 import (
 	"fmt"
+	"strconv"
 	"testing"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
+
+	crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
+	antreae2e "antrea.io/antrea/test/e2e"
+	e2euttils "antrea.io/antrea/test/e2e/utils"
 )
 
 func TestConnectivity(t *testing.T) {
@@ -42,8 +47,10 @@ func testServiceExport(t *testing.T, data *TestData) {
 // we create an nginx Pod in one cluster (east), and try to curl it from another cluster (west).
 // If we get status code 200, it means that the resource is exported by the east cluster
 // and imported by the west cluster.
+// TODO(yang): reorg test function contents
 func (data *TestData) testServiceExport(t *testing.T) {
 	podName := randName("test-nginx-")
+	clientPodName := "test-service-client"
 
 	if err := createPodWrapper(t, data, westCluster, multiClusterTestNamespace, podName, nginxImage, "nginx", nil, nil, nil, nil, false, nil); err != nil {
 		t.Fatalf("Error when creating nginx Pod in west cluster: %v", err)
@@ -94,6 +101,38 @@ func (data *TestData) testServiceExport(t *testing.T) {
 	if err := data.probeFromCluster(westCluster, westIP); err != nil {
 		t.Fatalf("Error when probing Service from %s", westCluster)
 	}
+
+	if err := data.createPod(eastCluster, clientPodName, multiClusterTestNamespace, "client", agnhostImage,
+		[]string{"sleep", strconv.Itoa(3600)}, nil, nil, nil, false, nil); err != nil {
+		t.Fatalf("Error when creating client Pod in east cluster: %v", err)
+	}
+	defer deletePodWrapper(t, data, eastCluster, multiClusterTestNamespace, clientPodName)
+	_, err = data.podWaitFor(defaultTimeout, eastCluster, clientPodName, multiClusterTestNamespace, func(pod *corev1.Pod) (bool, error) {
+		return pod.Status.Phase == corev1.PodRunning, nil
+	})
+	if err != nil {
+		t.Fatalf("Error when waiting for Pod '%s' in east cluster: %v", clientPodName, err)
+	}
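Note (illustrative, not part of the patch): the wait above only checks the Pod phase. If the client container defined a readiness probe, a stricter PodCondition could be passed to the same helper; the sketch below reuses the identifiers from the surrounding test (data, defaultTimeout, eastCluster, clientPodName, multiClusterTestNamespace).

// Hypothetical stricter wait: require the Running phase and the Ready condition,
// instead of checking only the phase as above.
_, err = data.podWaitFor(defaultTimeout, eastCluster, clientPodName, multiClusterTestNamespace, func(pod *corev1.Pod) (bool, error) {
	if pod.Status.Phase != corev1.PodRunning {
		return false, nil
	}
	for _, c := range pod.Status.Conditions {
		if c.Type == corev1.PodReady {
			return c.Status == corev1.ConditionTrue, nil
		}
	}
	return false, nil
})
if err != nil {
	t.Fatalf("Error when waiting for Pod '%s' to become ready: %v", clientPodName, err)
}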
+
+	anpBuilder := &e2euttils.AntreaNetworkPolicySpecBuilder{}
+	anpBuilder = anpBuilder.SetName(multiClusterTestNamespace, "block-west-exported-service").
+		SetPriority(1.0).
+		SetAppliedToGroup([]e2euttils.ANPAppliedToSpec{{PodSelector: map[string]string{"app": "client"}}}).
+		AddToServicesRule([]crdv1alpha1.ServiceReference{{
+			Name:      fmt.Sprintf("antrea-mc-%s", westClusterTestService),
+			Namespace: multiClusterTestNamespace},
+		}, "", nil, crdv1alpha1.RuleActionDrop)
+	if _, err := createOrUpdateANP(data.getCRDClientOfCluster(eastCluster), anpBuilder.Get()); err != nil {
+		t.Fatalf("Error creating ANP %s: %v", anpBuilder.Name, err)
+	}
+	defer deleteANP(data.getCRDClientOfCluster(eastCluster), multiClusterTestNamespace, anpBuilder.Name)
+
+	connectivity := data.probe(eastCluster, multiClusterTestNamespace, clientPodName, "client", westIP, "westClusterServiceIP", 80, corev1.ProtocolTCP)
+	if connectivity == antreae2e.Error {
+		t.Errorf("Failure -- could not complete probe: %v", err)
+	} else if connectivity != antreae2e.Dropped {
+		t.Errorf("Failure -- wrong result from probing exported Service after applying the ToServices Antrea NetworkPolicy. Expected: %v, Actual: %v", antreae2e.Dropped, connectivity)
+	}
 }
 
 func (data *TestData) deployServiceExport(clusterName string) error {
diff --git a/test/e2e/utils/anpspecbuilder.go b/test/e2e/utils/anpspecbuilder.go
index 13af754f55e..f33feb2bf7d 100644
--- a/test/e2e/utils/anpspecbuilder.go
+++ b/test/e2e/utils/anpspecbuilder.go
@@ -207,6 +207,23 @@ func (b *AntreaNetworkPolicySpecBuilder) AddEgress(protoc v1.Protocol,
 	return b
 }
 
+func (b *AntreaNetworkPolicySpecBuilder) AddToServicesRule(svcRefs []crdv1alpha1.ServiceReference,
+	name string, ruleAppliedToSpecs []ANPAppliedToSpec, action crdv1alpha1.RuleAction) *AntreaNetworkPolicySpecBuilder {
+	var appliedTos []crdv1alpha1.NetworkPolicyPeer
+	for _, at := range ruleAppliedToSpecs {
+		appliedTos = append(appliedTos, b.GetAppliedToPeer(at.PodSelector, at.PodSelectorMatchExp))
+	}
+	newRule := crdv1alpha1.Rule{
+		To:         make([]crdv1alpha1.NetworkPolicyPeer, 0),
+		ToServices: svcRefs,
+		Action:     &action,
+		Name:       name,
+		AppliedTo:  appliedTos,
+	}
+	b.Spec.Egress = append(b.Spec.Egress, newRule)
+	return b
+}
+
 func (b *AntreaNetworkPolicySpecBuilder) AddEgressLogging() *AntreaNetworkPolicySpecBuilder {
 	for i, e := range b.Spec.Egress {
 		e.EnableLogging = true
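Note (illustrative, not part of the patch): a standalone sketch of the new AddToServicesRule builder method, mirroring its use in service_test.go above; the namespace, policy, and Service names are hypothetical.

package main

import (
	"fmt"

	crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
	utils "antrea.io/antrea/test/e2e/utils"
)

func main() {
	builder := &utils.AntreaNetworkPolicySpecBuilder{}
	builder = builder.SetName("test-ns", "drop-egress-to-svc").
		SetPriority(1.0).
		SetAppliedToGroup([]utils.ANPAppliedToSpec{{PodSelector: map[string]string{"app": "client"}}}).
		// Drop egress from the selected Pods to the named Service; the rule name
		// is left empty and no per-rule appliedTo is set, as in the test above.
		AddToServicesRule([]crdv1alpha1.ServiceReference{{
			Name:      "my-exported-svc",
			Namespace: "test-ns",
		}}, "", nil, crdv1alpha1.RuleActionDrop)
	anp := builder.Get()
	fmt.Printf("built ANP %s/%s with %d egress rule(s)\n", anp.Namespace, anp.Name, len(anp.Spec.Egress))
}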