diff --git a/connectivity/check/context.go b/connectivity/check/context.go index 8e7301280e..bf1b93da7e 100644 --- a/connectivity/check/context.go +++ b/connectivity/check/context.go @@ -302,6 +302,9 @@ func (ct *ConnectivityTest) SetupAndValidate(ctx context.Context) error { if err := ct.validateDeployment(ctx); err != nil { return err } + if err := ct.patchEchoServicesWithExternalIPs(ctx); err != nil { + return err + } if ct.params.Hubble { if err := ct.enableHubbleClient(ctx); err != nil { return fmt.Errorf("unable to create hubble client: %s", err) @@ -760,6 +763,13 @@ func (ct *ConnectivityTest) PingCommand(peer TestPeer, ipFam IPFamily) []string return cmd } +func (ct *ConnectivityTest) DigCommand(peer TestPeer, ipFam IPFamily) []string { + cmd := []string{"dig", "+time=2", "kubernetes"} + + cmd = append(cmd, fmt.Sprintf("@%s", peer.Address(ipFam))) + return cmd +} + func (ct *ConnectivityTest) RandomClientPod() *Pod { for _, p := range ct.clientPods { return &p diff --git a/connectivity/check/deployment.go b/connectivity/check/deployment.go index 1189aa7065..0ddefa07e8 100644 --- a/connectivity/check/deployment.go +++ b/connectivity/check/deployment.go @@ -17,9 +17,11 @@ import ( networkingv1 "k8s.io/api/networking/v1" k8sErrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" "github.com/cilium/cilium-cli/defaults" + "github.com/cilium/cilium-cli/internal/utils" "github.com/cilium/cilium-cli/k8s" ) @@ -236,6 +238,7 @@ type daemonSetParameters struct { Labels map[string]string HostNetwork bool Tolerations []corev1.Toleration + Capabilities []corev1.Capability } func newDaemonSet(p daemonSetParameters) *appsv1.DaemonSet { @@ -266,7 +269,7 @@ func newDaemonSet(p daemonSetParameters) *appsv1.DaemonSet { ReadinessProbe: p.ReadinessProbe, SecurityContext: &corev1.SecurityContext{ Capabilities: &corev1.Capabilities{ - Add: 
[]corev1.Capability{"NET_RAW"},
+							Add: append([]corev1.Capability{"NET_RAW"}, p.Capabilities...),
 					},
 				},
 			},
@@ -814,6 +817,7 @@ func (ct *ConnectivityTest) deploy(ctx context.Context) error {
 			Tolerations: []corev1.Toleration{
 				{Operator: corev1.TolerationOpExists},
 			},
+			Capabilities: []corev1.Capability{"NET_ADMIN"},
 		})
 		_, err = ct.clients.src.CreateDaemonSet(ctx, ct.params.TestNamespace, ds, metav1.CreateOptions{})
 		if err != nil {
@@ -1202,6 +1206,106 @@ func (ct *ConnectivityTest) validateDeployment(ctx context.Context) error {
 	return nil
 }
 
+// patchEchoServicesWithExternalIPs patches the echo services (echo-same-node
+// and echo-other-node) external IPs to the IP of the node running those pods.
+//
+// This needs to happen after validate as we need to wait for the pods to be
+// scheduled on the nodes first, in order to retrieve the pods' host IPs.
+//
+func (ct *ConnectivityTest) patchEchoServicesWithExternalIPs(ctx context.Context) error {
+	patchedServices := []*corev1.Service{}
+
+	echoSamePodHostIPs := []string{}
+	for _, client := range ct.clients.clients() {
+		echoPods, err := client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "name=" + "echo-same-node"})
+		if err != nil {
+			return fmt.Errorf("unable to list echo pods: %w", err)
+		}
+
+		for _, echoPod := range echoPods.Items {
+			echoSamePodHostIPs = append(echoSamePodHostIPs, echoPod.Status.HostIP)
+		}
+
+	}
+
+	for _, client := range ct.clients.clients() {
+		// Join with `","` so each host IP becomes its own JSON array element
+		// (a plain "," would yield the single invalid entry "ip1,ip2").
+		patch := fmt.Sprintf(`{"spec":{"externalIPs":["%s"], "externalTrafficPolicy": "Local"}}`, strings.Join(echoSamePodHostIPs, "\",\""))
+
+		service, err := client.PatchService(ctx, ct.params.TestNamespace, echoSameNodeDeploymentName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
+		if err != nil {
+			return err
+		}
+
+		s := ct.echoServices[service.Name]
+		s.Service = service.DeepCopy()
+		ct.echoServices[service.Name] = s
+
+		patchedServices = append(patchedServices, service)
+	}
+
+	echoOtherPodHostIPs := 
[]string{}
+	for _, client := range ct.clients.clients() {
+		echoPods, err := client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "name=" + "echo-other-node"})
+		if err != nil {
+			return fmt.Errorf("unable to list echo pods: %w", err)
+		}
+
+		for _, echoPod := range echoPods.Items {
+			echoOtherPodHostIPs = append(echoOtherPodHostIPs, echoPod.Status.HostIP)
+		}
+	}
+
+	for _, client := range ct.clients.clients() {
+		// Join with `","` so each host IP becomes its own JSON array element
+		// (a plain "," would yield the single invalid entry "ip1,ip2").
+		patch := fmt.Sprintf(`{"spec":{"externalIPs":["%s"], "externalTrafficPolicy": "Local"}}`, strings.Join(echoOtherPodHostIPs, "\",\""))
+
+		service, err := client.PatchService(ctx, ct.params.TestNamespace, echoOtherNodeDeploymentName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
+		if err != nil {
+			return err
+		}
+
+		s := ct.echoServices[service.Name]
+		s.Service = service.DeepCopy()
+		ct.echoServices[service.Name] = s
+
+		patchedServices = append(patchedServices, service)
+	}
+
+	ensureFrontend := func() error {
+		for _, client := range ct.clients.clients() {
+			for _, ciliumPod := range ct.CiliumPods() {
+				for _, service := range patchedServices {
+					for _, ip := range service.Spec.ExternalIPs {
+						cmd := []string{"sh", "-c",
+							fmt.Sprintf("cilium bpf lb list --frontends | grep %s:%d", ip, service.Spec.Ports[0].Port)}
+
+						if out, err := client.ExecInPod(ctx, ciliumPod.Pod.Namespace, ciliumPod.Pod.Name, defaults.AgentContainerName, cmd); err != nil {
+							fmt.Println(out.String())
+							return err
+						}
+					}
+				}
+			}
+		}
+
+		return nil
+	}
+
+	w := utils.NewWaitObserver(ctx, utils.WaitParameters{Timeout: 10 * time.Second})
+	defer w.Cancel()
+	for {
+		if err := ensureFrontend(); err != nil {
+			if err := w.Retry(err); err != nil {
+				return fmt.Errorf("failed to ensure external IPs are exposed: %w", err)
+			}
+
+			continue
+		}
+		break
+	}
+
+	return nil
+}
+
 // Validate that srcPod can query the DNS server on dstPod successfully
 func (ct *ConnectivityTest) waitForPodDNS(ctx context.Context, srcPod, dstPod Pod) error {
ct.Logf("⌛ [%s] Waiting for pod %s to reach DNS server on %s pod...", ct.client.ClusterName(), srcPod.Name(), dstPod.Name()) diff --git a/connectivity/check/peer.go b/connectivity/check/peer.go index 981d4dee78..c88a641108 100644 --- a/connectivity/check/peer.go +++ b/connectivity/check/peer.go @@ -199,6 +199,46 @@ func (s Service) Labels() map[string]string { return newMap } +func (s Service) ToExternalIPService() ExternalIPService { + return ExternalIPService{ + Service: s, + } +} + +// ExternalIPService TODO +// It implements interface TestPeer. +type ExternalIPService struct { + Service Service +} + +func (s ExternalIPService) Name() string { + return s.Service.Name() +} + +func (s ExternalIPService) Scheme() string { + return s.Service.Scheme() +} + +func (s ExternalIPService) Path() string { + return s.Service.Path() +} + +func (s ExternalIPService) Address(family IPFamily) string { + return s.Service.Service.Spec.ExternalIPs[0] +} + +func (s ExternalIPService) Port() uint32 { + return uint32(s.Service.Service.Spec.Ports[0].NodePort) +} + +func (s ExternalIPService) HasLabel(name, value string) bool { + return s.Service.HasLabel(name, value) +} + +func (s ExternalIPService) Labels() map[string]string { + return s.Service.Labels() +} + // ExternalWorkload is an external workload acting as a peer in a // connectivity test. It implements interface TestPeer. 
type ExternalWorkload struct {
diff --git a/connectivity/manifests/egress-gateway-policy.yaml b/connectivity/manifests/egress-gateway-policy.yaml
index 392856073e..6cc6f45a14 100644
--- a/connectivity/manifests/egress-gateway-policy.yaml
+++ b/connectivity/manifests/egress-gateway-policy.yaml
@@ -14,3 +14,19 @@ spec:
     nodeSelector:
       matchLabels:
         kubernetes.io/hostname: NODE_NAME_PLACEHOLDER
+---
+apiVersion: cilium.io/v2
+kind: CiliumEgressGatewayPolicy
+metadata:
+  name: cegp-sample-echo-service
+spec:
+  selectors:
+  - podSelector:
+      matchLabels:
+        kind: echo
+  destinationCIDRs:
+  - 0.0.0.0/0
+  egressGateway:
+    nodeSelector:
+      matchLabels:
+        kubernetes.io/hostname: NODE_NAME_PLACEHOLDER
diff --git a/connectivity/tests/egressgateway.go b/connectivity/tests/egressgateway.go
index 43e8d3a588..2db873e6d0 100644
--- a/connectivity/tests/egressgateway.go
+++ b/connectivity/tests/egressgateway.go
@@ -14,6 +14,8 @@ import (
 	"github.com/cilium/cilium-cli/connectivity/check"
 	"github.com/cilium/cilium-cli/defaults"
 	"github.com/cilium/cilium-cli/internal/utils"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 )
 
 // EgressGateway is a test case which, given the cegp-sample
@@ -48,10 +50,37 @@ func (s *egressGateway) Run(ctx context.Context, t *check.Test) {
 
 	s.waitForBpfPolicyEntries(ctx, t)
 
+	// Ping hosts (pod to host connectivity)
 	i := 0
 	for _, client := range ct.ClientPods() {
 		client := client
 
+		for _, dst := range ct.HostNetNSPodsByNode() {
+			t.NewAction(s, fmt.Sprintf("ping-%d", i), &client, &dst, check.IPFamilyV4).Run(func(a *check.Action) {
+				a.ExecInPod(ctx, ct.PingCommand(dst, check.IPFamilyV4))
+			})
+			i++
+		}
+	}
+
+	// DNS query (pod to service connectivity)
+	i = 0
+	for _, client := range ct.ClientPods() {
+		kubeDnsService, err := ct.K8sClient().GetService(ctx, "kube-system", "kube-dns", metav1.GetOptions{})
+		if err != nil {
+			// Include the lookup error so the failure is diagnosable.
+			t.Fatalf("Cannot get kube-dns service: %s", err)
+		}
+		kubeDnsServicePeer := check.Service{Service: kubeDnsService}
+
+		
t.NewAction(s, fmt.Sprintf("dig-%d", i), &client, kubeDnsServicePeer, check.IPFamilyV4).Run(func(a *check.Action) {
+			a.ExecInPod(ctx, ct.DigCommand(kubeDnsServicePeer, check.IPFamilyV4))
+		})
+		i++
+	}
+
+	// Traffic matching an egress gateway policy should leave the cluster masqueraded with the egress IP (pod to external service)
+	i = 0
+	for _, client := range ct.ClientPods() {
 		for _, externalEcho := range ct.ExternalEchoPods() {
 			t.NewAction(s, fmt.Sprintf("curl-%d", i), &client, externalEcho, check.IPFamilyV4).Run(func(a *check.Action) {
 				a.ExecInPod(ctx, ct.CurlClientIPCommand(externalEcho, check.IPFamilyV4))
@@ -64,6 +93,68 @@ func (s *egressGateway) Run(ctx context.Context, t *check.Test) {
 			i++
 		}
 	}
+
+	// When connecting from outside the cluster to a nodeport service whose pods are selected by an egress policy,
+	// the reply traffic should not be SNATed with the egress IP
+	i = 0
+	for _, client := range ct.ExternalEchoPods() {
+		for _, echo := range ct.EchoServices() {
+			// convert the service to a ServiceExternalIP as we want to access it through its external IP
+			echo := echo.ToExternalIPService()
+
+			t.NewAction(s, fmt.Sprintf("curl-%d", i), &client, echo, check.IPFamilyV4).Run(func(a *check.Action) {
+				a.ExecInPod(ctx, ct.CurlClientIPCommand(echo, check.IPFamilyV4))
+			})
+			i++
+		}
+	}
+
+	if status, ok := ct.Feature(check.FeatureTunnel); ok && !status.Enabled {
+		// When connecting from outside the cluster directly to a pod which is selected by an egress policy, the
+		// reply traffic should not be SNATed with the egress IP (only connections originating from these pods
+		// should go through egress gateway).
+		//
+		// This test is executed only when Cilium is running in direct routing mode, since we can simply add a
+		// route on the node outside the cluster to direct pod's traffic to the node where the pod is running
+		// (while in tunneling mode we would need the external node to send the traffic over the tunnel)
+
+		for _, echoPod := range ct.EchoPods() {
+			targetPodHostIP := echoPod.Pod.Status.HostIP
+			targetPodIP := echoPod.Pod.Status.PodIP
+
+			for _, externalNode := range ct.NodesWithoutCilium() {
+				for node, hostNetNSPod := range ct.HostNetNSPodsByNode() {
+					if node != externalNode {
+						continue
+					}
+
+					cmd := []string{"ip", "route", "add", targetPodIP, "via", targetPodHostIP}
+					_, err := hostNetNSPod.K8sClient.ExecInPod(ctx, hostNetNSPod.Pod.Namespace, hostNetNSPod.Pod.Name, "", cmd)
+					if err != nil {
+						// %v, not %w: Fatalf is Sprintf-based and does not support error wrapping.
+						t.Fatalf("failed to add ip route: %v", err)
+					}
+
+					defer func(hostNetNSPod check.Pod) {
+						cmd = []string{"ip", "route", "del", targetPodIP, "via", targetPodHostIP}
+						_, err = hostNetNSPod.K8sClient.ExecInPod(ctx, hostNetNSPod.Pod.Namespace, hostNetNSPod.Pod.Name, "", cmd)
+						if err != nil {
+							// %v, not %w: Fatalf is Sprintf-based and does not support error wrapping.
+							t.Fatalf("failed to delete ip route: %v", err)
+						}
+					}(hostNetNSPod)
+				}
+			}
+		}
+
+		i = 0
+		for _, client := range ct.ExternalEchoPods() {
+			for _, echo := range ct.EchoPods() {
+				t.NewAction(s, fmt.Sprintf("curl-%d", i), &client, echo, check.IPFamilyV4).Run(func(a *check.Action) {
+					a.ExecInPod(ctx, ct.CurlClientIPCommand(echo, check.IPFamilyV4))
+				})
+				i++
+			}
+		}
+	}
 }
 
 // getGatewayNodeInternalIP returns the k8s internal IP of the node acting as
@@ -139,6 +230,16 @@ func (s *egressGateway) waitForBpfPolicyEntries(ctx context.Context, t *check.Te
 		})
 	}
 
+	for _, echo := range ct.EchoPods() {
+		targetEntries = append(targetEntries,
+			bpfEgressGatewayPolicyEntry{
+				SourceIP:  echo.Pod.Status.PodIP,
+				DestCIDR:  "0.0.0.0/0",
+				EgressIP:  egressIP,
+				GatewayIP: gatewayNodeInternalIP.String(),
+			})
+	}
+
 	cmd := strings.Split("cilium bpf egress list -o json", " ")
 	stdout, err := 
ciliumPod.K8sClient.ExecInPod(ctx, ciliumPod.Pod.Namespace, ciliumPod.Pod.Name, defaults.AgentContainerName, cmd) if err != nil { diff --git a/k8s/client.go b/k8s/client.go index 71f97b59d4..40eee4bf02 100644 --- a/k8s/client.go +++ b/k8s/client.go @@ -253,6 +253,10 @@ func (c *Client) GetService(ctx context.Context, namespace, name string, opts me return c.Clientset.CoreV1().Services(namespace).Get(ctx, name, opts) } +func (c *Client) PatchService(ctx context.Context, namespace, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions) (*corev1.Service, error) { + return c.Clientset.CoreV1().Services(namespace).Patch(ctx, name, pt, data, opts) +} + func (c *Client) CreateEndpoints(ctx context.Context, namespace string, ep *corev1.Endpoints, opts metav1.CreateOptions) (*corev1.Endpoints, error) { return c.Clientset.CoreV1().Endpoints(namespace).Create(ctx, ep, opts) }