Skip to content

Commit

Permalink
WIP: connectivity: add full egress gateway test suite
Browse files Browse the repository at this point in the history
Signed-off-by: Gilberto Bertin <[email protected]>
  • Loading branch information
jibi committed May 25, 2023
1 parent 3237c9b commit 3bde22b
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 1 deletion.
10 changes: 10 additions & 0 deletions connectivity/check/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,9 @@ func (ct *ConnectivityTest) SetupAndValidate(ctx context.Context) error {
if err := ct.validateDeployment(ctx); err != nil {
return err
}
if err := ct.patchEchoServicesWithExternalIPs(ctx); err != nil {
return err
}
if ct.params.Hubble {
if err := ct.enableHubbleClient(ctx); err != nil {
return fmt.Errorf("unable to create hubble client: %s", err)
Expand Down Expand Up @@ -760,6 +763,13 @@ func (ct *ConnectivityTest) PingCommand(peer TestPeer, ipFam IPFamily) []string
return cmd
}

func (ct *ConnectivityTest) DigCommand(peer TestPeer, ipFam IPFamily) []string {
cmd := []string{"dig", "+time=2", "kubernetes"}

cmd = append(cmd, fmt.Sprintf("@%s", peer.Address(ipFam)))
return cmd
}

func (ct *ConnectivityTest) RandomClientPod() *Pod {
for _, p := range ct.clientPods {
return &p
Expand Down
111 changes: 110 additions & 1 deletion connectivity/check/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@ import (
networkingv1 "k8s.io/api/networking/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/cilium/cilium-cli/defaults"
"github.com/cilium/cilium-cli/internal/utils"
"github.com/cilium/cilium-cli/k8s"
)

Expand Down Expand Up @@ -236,6 +238,7 @@ type daemonSetParameters struct {
Labels map[string]string
HostNetwork bool
Tolerations []corev1.Toleration
Capabilities []corev1.Capability
}

func newDaemonSet(p daemonSetParameters) *appsv1.DaemonSet {
Expand Down Expand Up @@ -266,7 +269,7 @@ func newDaemonSet(p daemonSetParameters) *appsv1.DaemonSet {
ReadinessProbe: p.ReadinessProbe,
SecurityContext: &corev1.SecurityContext{
Capabilities: &corev1.Capabilities{
Add: []corev1.Capability{"NET_RAW"},
Add: append([]corev1.Capability{"NET_RAW"}, p.Capabilities...),
},
},
},
Expand Down Expand Up @@ -814,6 +817,7 @@ func (ct *ConnectivityTest) deploy(ctx context.Context) error {
Tolerations: []corev1.Toleration{
{Operator: corev1.TolerationOpExists},
},
Capabilities: []corev1.Capability{"NET_ADMIN"},
})
_, err = ct.clients.src.CreateDaemonSet(ctx, ct.params.TestNamespace, ds, metav1.CreateOptions{})
if err != nil {
Expand Down Expand Up @@ -1202,6 +1206,111 @@ func (ct *ConnectivityTest) validateDeployment(ctx context.Context) error {
return nil
}

// patchEchoServicesWithExternalIPs patches the echo services (echo-same-node
// and echo-other-node) external IPs to the IP of the node running those pods.
//
// This needs to happen after validate as we need to wait for the pods to be
// scheduled on the nodes first, in order to retrieve the pods' host IPs.

func (ct *ConnectivityTest) patchEchoServicesWithExternalIPs(ctx context.Context) error {
patchedServices := []*corev1.Service{}

echoSamePodHostIPs := []string{}
for _, client := range ct.clients.clients() {
echoPods, err := client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "name=" + "echo-same-node"})
if err != nil {
return fmt.Errorf("unable to list echo pods: %w", err)
}

for _, echoPod := range echoPods.Items {
echoSamePodHostIPs = append(echoSamePodHostIPs, echoPod.Status.HostIP)
}

}

for _, client := range ct.clients.clients() {
patch := fmt.Sprintf(`{"spec":{"externalIPs":["%s"], "externalTrafficPolicy": "Local"}}`, strings.Join(echoSamePodHostIPs, ","))

service, err := client.PatchService(ctx, ct.params.TestNamespace, echoSameNodeDeploymentName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
if err != nil {
return err
}

fmt.Println("Patched service:", service.Spec.ExternalIPs)

s := ct.echoServices[service.Name]
s.Service = service.DeepCopy()
ct.echoServices[service.Name] = s

patchedServices = append(patchedServices, service)
}

echoOtherPodHostIPs := []string{}
for _, client := range ct.clients.clients() {
echoPods, err := client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "name=" + "echo-other-node"})
if err != nil {
return fmt.Errorf("unable to list echo pods: %w", err)
}

for _, echoPod := range echoPods.Items {
echoOtherPodHostIPs = append(echoOtherPodHostIPs, echoPod.Status.HostIP)
}
}

for _, client := range ct.clients.clients() {
patch := fmt.Sprintf(`{"spec":{"externalIPs":["%s"], "externalTrafficPolicy": "Local"}}`, strings.Join(echoOtherPodHostIPs, ","))

service, err := client.PatchService(ctx, ct.params.TestNamespace, echoOtherNodeDeploymentName, types.StrategicMergePatchType, []byte(patch), metav1.PatchOptions{})
if err != nil {
return err
}

fmt.Println("Patched service:", service.Spec.ExternalIPs)

s := ct.echoServices[service.Name]
s.Service = service.DeepCopy()
ct.echoServices[service.Name] = s

patchedServices = append(patchedServices, service)
}

ensureFrontend := func() error {
for _, client := range ct.clients.clients() {
for _, ciliumPod := range ct.CiliumPods() {
for _, service := range patchedServices {
for _, ip := range service.Spec.ExternalIPs {
cmd := []string{"sh", "-c",
fmt.Sprintf("cilium bpf lb list --frontends | grep %s:%d", ip, service.Spec.Ports[0].Port)}

fmt.Println(strings.Join(cmd, " "))
if out, err := client.ExecInPod(ctx, ciliumPod.Pod.Namespace, ciliumPod.Pod.Name, defaults.AgentContainerName, cmd); err != nil {
fmt.Println(out.String())
return err
}
}
}
}
}

return nil
}

w := utils.NewWaitObserver(ctx, utils.WaitParameters{Timeout: 10 * time.Second})
defer w.Cancel()
for {
if err := ensureFrontend(); err != nil {
if err := w.Retry(err); err != nil {
return fmt.Errorf("Failed to ensure external IPs are exposed: %w", err)
}

continue
}
break
}

return nil
}

// Validate that srcPod can query the DNS server on dstPod successfully
func (ct *ConnectivityTest) waitForPodDNS(ctx context.Context, srcPod, dstPod Pod) error {
ct.Logf("⌛ [%s] Waiting for pod %s to reach DNS server on %s pod...", ct.client.ClusterName(), srcPod.Name(), dstPod.Name())
Expand Down
40 changes: 40 additions & 0 deletions connectivity/check/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,46 @@ func (s Service) Labels() map[string]string {
return newMap
}

func (s Service) ToExternalIPService() ExternalIPService {
return ExternalIPService{
Service: s,
}
}

// ExternalIPService TODO
// It implements interface TestPeer.
type ExternalIPService struct {
Service Service
}

func (s ExternalIPService) Name() string {
return s.Service.Name()
}

func (s ExternalIPService) Scheme() string {
return s.Service.Scheme()
}

func (s ExternalIPService) Path() string {
return s.Service.Path()
}

func (s ExternalIPService) Address(_ IPFamily) string {
return s.Service.Service.Spec.ExternalIPs[0]
}

func (s ExternalIPService) Port() uint32 {
return uint32(s.Service.Service.Spec.Ports[0].NodePort)
}

func (s ExternalIPService) HasLabel(name, value string) bool {
return s.Service.HasLabel(name, value)
}

func (s ExternalIPService) Labels() map[string]string {
return s.Service.Labels()
}

// ExternalWorkload is an external workload acting as a peer in a
// connectivity test. It implements interface TestPeer.
type ExternalWorkload struct {
Expand Down
16 changes: 16 additions & 0 deletions connectivity/check/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,22 @@ func (t *Test) WithCiliumEgressGatewayPolicy(policy string) *Test {

// Set the egress gateway node
pl[i].Spec.EgressGateway.NodeSelector.MatchLabels["kubernetes.io/hostname"] = egressGatewayNode

// Set the excluded CIDR
/*
if len(pl[i].Spec.ExcludedCIDRs) != 0 {
excludedCIDRs := []v2.IPv4CIDR{}
for _, nodeWithoutCilium := range t.NodesWithoutCilium() {
fmt.Println("node without cilium:", nodeWithoutCilium)
for lol := range t.Context().Nodes() {
fmt.Println("nodes:", lol)
}
excludedCIDRs = append(excludedCIDRs, v2.IPv4CIDR(fmt.Sprintf("%s/32", t.Context().Nodes()[nodeWithoutCilium].Status.Addresses[0].Address)))
}
pl[i].Spec.ExcludedCIDRs = excludedCIDRs
}
*/
}

if err := t.addCEGPs(pl...); err != nil {
Expand Down
18 changes: 18 additions & 0 deletions connectivity/manifests/egress-gateway-policy-excluded-cidrs.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: cilium.io/v2
kind: CiliumEgressGatewayPolicy
metadata:
name: cegp-sample-excluded-cidrs
spec:
selectors:
- podSelector:
matchLabels:
io.kubernetes.pod.namespace: cilium-test
kind: client
destinationCIDRs:
- 0.0.0.0/0
# excludedCIDRs:
# - NODE_WITHOUT_CILIUM_PLACEHOLDER/32
egressGateway:
nodeSelector:
matchLabels:
kubernetes.io/hostname: NODE_NAME_PLACEHOLDER
16 changes: 16 additions & 0 deletions connectivity/manifests/egress-gateway-policy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,19 @@ spec:
nodeSelector:
matchLabels:
kubernetes.io/hostname: NODE_NAME_PLACEHOLDER
---
apiVersion: cilium.io/v2
kind: CiliumEgressGatewayPolicy
metadata:
name: cegp-sample-echo-service
spec:
selectors:
- podSelector:
matchLabels:
kind: echo
destinationCIDRs:
- 0.0.0.0/0
egressGateway:
nodeSelector:
matchLabels:
kubernetes.io/hostname: NODE_NAME_PLACEHOLDER
15 changes: 15 additions & 0 deletions connectivity/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/cilium/cilium/pkg/versioncheck"

"github.com/cilium/cilium-cli/connectivity/check"
"github.com/cilium/cilium-cli/connectivity/manifests/template"
"github.com/cilium/cilium-cli/connectivity/tests"
Expand Down Expand Up @@ -157,6 +159,9 @@ var (

//go:embed manifests/egress-gateway-policy.yaml
egressGatewayPolicyYAML string

//go:embed manifests/egress-gateway-policy-excluded-cidrs.yaml
egressGatewayPolicyExcludedCIDRsYAML string
)

var (
Expand Down Expand Up @@ -233,6 +238,16 @@ func Run(ctx context.Context, ct *check.ConnectivityTest, addExtraTests func(*ch
tests.EgressGateway(),
)

if versioncheck.MustCompile(">=1.14.0")(ct.CiliumVersion) {
ct.NewTest("egress-gateway-excluded-cidrs").
WithCiliumEgressGatewayPolicy(egressGatewayPolicyExcludedCIDRsYAML).
WithFeatureRequirements(check.RequireFeatureEnabled(check.FeatureEgressGateway),
check.RequireFeatureEnabled(check.FeatureNodeWithoutCilium)).
WithScenarios(
tests.EgressGateway(),
)
}

return ct.Run(ctx)
}

Expand Down
Loading

0 comments on commit 3bde22b

Please sign in to comment.