From 63c6d3a360bb6d323af0e8db07969218be4a5836 Mon Sep 17 00:00:00 2001 From: Aditi Ghag Date: Wed, 29 May 2024 16:41:43 -0700 Subject: [PATCH] connectivity: Introduce local redirect policy tests Tests LRP connectivity scenarios with a configured skipRedirectFromBackend flag: - client pod to LRP frontend - LRP backend to LRP frontend Signed-off-by: Aditi Ghag --- connectivity/builder/builder.go | 1 + connectivity/builder/local_redirect_policy.go | 53 +++++ .../manifests/local-redirect-policy.yaml | 21 ++ connectivity/check/context.go | 12 ++ connectivity/check/deployment.go | 87 ++++++++ connectivity/check/peer.go | 58 +++++- connectivity/check/policy.go | 71 ++++++- connectivity/check/test.go | 54 ++++- connectivity/tests/lrp.go | 190 ++++++++++++++++++ k8s/client.go | 15 +- 10 files changed, 549 insertions(+), 13 deletions(-) create mode 100644 connectivity/builder/local_redirect_policy.go create mode 100644 connectivity/builder/manifests/local-redirect-policy.yaml create mode 100644 connectivity/tests/lrp.go diff --git a/connectivity/builder/builder.go b/connectivity/builder/builder.go index c3789d51c4..3cae8ebbf9 100644 --- a/connectivity/builder/builder.go +++ b/connectivity/builder/builder.go @@ -237,6 +237,7 @@ func concurrentTests(connTests []*check.ConnectivityTest) error { podToK8sOnControlplane{}, podToControlplaneHostCidr{}, podToK8sOnControlplaneCidr{}, + localRedirectPolicy{}, } return injectTests(tests, connTests...) } diff --git a/connectivity/builder/local_redirect_policy.go b/connectivity/builder/local_redirect_policy.go new file mode 100644 index 0000000000..48e04325c3 --- /dev/null +++ b/connectivity/builder/local_redirect_policy.go @@ -0,0 +1,53 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package builder + +import ( + _ "embed" + + "github.com/cilium/cilium-cli/connectivity/check" + "github.com/cilium/cilium-cli/connectivity/tests" + "github.com/cilium/cilium-cli/utils/features" +) + +var ( + //go:embed manifests/local-redirect-policy.yaml + localRedirectPolicyYAML string +) + +type localRedirectPolicy struct{} + +func (t localRedirectPolicy) build(ct *check.ConnectivityTest, _ map[string]string) { + lrpFrontendIP := "169.254.169.254" + lrpFrontendIPSkipRedirect := "169.254.169.255" + newTest("local-redirect-policy", ct). + WithCiliumLocalRedirectPolicy(check.CiliumLocalRedirectPolicyParams{ + Policy: localRedirectPolicyYAML, + Name: "lrp-address-matcher", + FrontendIP: lrpFrontendIP, + SkipRedirectFromBackend: false, + }). + WithCiliumLocalRedirectPolicy(check.CiliumLocalRedirectPolicyParams{ + Policy: localRedirectPolicyYAML, + Name: "lrp-address-matcher-skip-redirect-from-backend", + FrontendIP: lrpFrontendIPSkipRedirect, + SkipRedirectFromBackend: true, + }). + WithFeatureRequirements(features.RequireEnabled(features.LocalRedirectPolicy)). + WithFeatureRequirements(features.RequireEnabled(features.KPRSocketLB)). + WithScenarios( + tests.LRP(false), + tests.LRP(true), + ). + WithExpectations(func(a *check.Action) (egress, ingress check.Result) { + if a.Scenario().Name() == "lrp-skip-redirect-from-backend" { + if a.Source().HasLabel("lrp", "backend") && + a.Destination().Address(features.IPFamilyV4) == lrpFrontendIPSkipRedirect { + return check.ResultCurlTimeout, check.ResultNone + } + return check.ResultOK, check.ResultNone + } + return check.ResultOK, check.ResultNone + }) +} diff --git a/connectivity/builder/manifests/local-redirect-policy.yaml b/connectivity/builder/manifests/local-redirect-policy.yaml new file mode 100644 index 0000000000..9910b3827f --- /dev/null +++ b/connectivity/builder/manifests/local-redirect-policy.yaml @@ -0,0 +1,21 @@ +apiVersion: cilium.io/v2 +kind: CiliumLocalRedirectPolicy +metadata: + name: # set by WithCiliumLocalRedirectPolicy() +spec: + redirectFrontend: + addressMatcher: + ip: # set by WithCiliumLocalRedirectPolicy() + toPorts: + - port: "80" + name: "tcp" + protocol: TCP + redirectBackend: + localEndpointSelector: + matchLabels: + lrp: backend + toPorts: + - port: "8080" + name: "tcp-8080" + protocol: TCP + skipRedirectFromBackend: # set by WithCiliumLocalRedirectPolicy() diff --git a/connectivity/check/context.go b/connectivity/check/context.go index 9099bfcf77..b9c55a26bb 100644 --- a/connectivity/check/context.go +++ b/connectivity/check/context.go @@ -71,6 +71,8 @@ type ConnectivityTest struct { ingressService map[string]Service k8sService Service externalWorkloads map[string]ExternalWorkload + lrpClientPods map[string]Pod + lrpBackendPods map[string]Pod hostNetNSPodsByNode map[string]Pod secondaryNetworkNodeIPv4 map[string]string // node name => secondary ip @@ -204,6 +206,8 @@ func NewConnectivityTest(client *k8s.Client, p Parameters, version string, logge echoExternalPods: make(map[string]Pod), clientPods: make(map[string]Pod), clientCPPods: make(map[string]Pod), + lrpClientPods: make(map[string]Pod), + lrpBackendPods: make(map[string]Pod), perfClientPods: []Pod{}, perfServerPod: []Pod{}, PerfResults: []common.PerfSummary{}, @@ -1117,6 +1121,14 @@ func (ct *ConnectivityTest) EchoPods() map[string]Pod { return ct.echoPods } +func (ct *ConnectivityTest) LrpClientPods() map[string]Pod { + return ct.lrpClientPods +} + +func (ct *ConnectivityTest) LrpBackendPods() map[string]Pod { + return ct.lrpBackendPods +} + // EchoServices returns all the non headless services func (ct *ConnectivityTest) EchoServices() map[string]Service { svcs := map[string]Service{} diff --git a/connectivity/check/deployment.go b/connectivity/check/deployment.go index 4e16298f86..4c6e02f778 100644 --- a/connectivity/check/deployment.go +++ b/connectivity/check/deployment.go @@ -53,6 +53,9 @@ const ( kindEchoExternalNodeName = "echo-external-node" kindClientName = "client" kindPerfName = "perf" + lrpBackendDeploymentName = "lrp-backend" + lrpClientDeploymentName = "lrp-client" + kindLrpName = "lrp" hostNetNSDeploymentName = "host-netns" hostNetNSDeploymentNameNonCilium = "host-netns-non-cilium" // runs on non-Cilium test nodes @@ -934,6 +937,63 @@ func (ct *ConnectivityTest) deploy(ctx context.Context) error { } } } + + if ct.Features[features.LocalRedirectPolicy].Enabled { + ct.Logf("✨ [%s] Deploying lrp-client deployment...", ct.clients.src.ClusterName()) + lrpClientDeployment := newDeployment(deploymentParameters{ + Name: lrpClientDeploymentName, + Kind: kindLrpName, + Image: ct.params.CurlImage, + Command: []string{"/usr/bin/pause"}, + Labels: map[string]string{"lrp": "client"}, + Annotations: ct.params.DeploymentAnnotations.Match(lrpClientDeploymentName), + NodeSelector: ct.params.NodeSelector, + }) + _, err = ct.clients.src.CreateServiceAccount(ctx, ct.params.TestNamespace, k8s.NewServiceAccount(lrpClientDeploymentName), metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("unable to create service account %s: %s", lrpClientDeployment, err) + } + _, err = ct.clients.src.CreateDeployment(ctx, ct.params.TestNamespace, lrpClientDeployment, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("unable to create deployment %s: %s", lrpClientDeployment, err) + } + ct.Logf("✨ [%s] Deploying lrp-backend deployment...", ct.clients.src.ClusterName()) + containerPort := 8080 + lrpBackendDeployment := newDeployment(deploymentParameters{ + Name: lrpBackendDeploymentName, + Kind: kindLrpName, + Image: ct.params.JSONMockImage, + NamedPort: "tcp-8080", + Port: containerPort, + ReadinessProbe: newLocalReadinessProbe(containerPort, "/"), + Labels: map[string]string{"lrp": "backend"}, + Annotations: ct.params.DeploymentAnnotations.Match(lrpBackendDeploymentName), + Affinity: &corev1.Affinity{ + PodAffinity: &corev1.PodAffinity{ + RequiredDuringSchedulingIgnoredDuringExecution: []corev1.PodAffinityTerm{ + { + LabelSelector: &metav1.LabelSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + {Key: "name", Operator: metav1.LabelSelectorOpIn, Values: []string{lrpClientDeploymentName}}, + }, + }, + TopologyKey: corev1.LabelHostname, + }, + }, + }, + }, + NodeSelector: ct.params.NodeSelector, + }) + _, err = ct.clients.src.CreateServiceAccount(ctx, ct.params.TestNamespace, k8s.NewServiceAccount(lrpBackendDeploymentName), metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("unable to create service account %s: %s", lrpBackendDeployment, err) + } + _, err = ct.clients.src.CreateDeployment(ctx, ct.params.TestNamespace, lrpBackendDeployment, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("unable to create deployment %s: %s", lrpBackendDeployment, err) + } + } + return nil } @@ -1087,6 +1147,11 @@ func (ct *ConnectivityTest) deploymentList() (srcList []string, dstList []string srcList = append(srcList, echoExternalNodeDeploymentName) } + if ct.Features[features.LocalRedirectPolicy].Enabled { + srcList = append(srcList, lrpClientDeploymentName) + srcList = append(srcList, lrpBackendDeploymentName) + } + return srcList, dstList } @@ -1182,6 +1247,28 @@ func (ct *ConnectivityTest) validateDeployment(ctx context.Context) error { return nil } + if ct.Features[features.LocalRedirectPolicy].Enabled { + lrpPods, err := ct.client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "kind=" + kindLrpName}) + if err != nil { + return fmt.Errorf("unable to list lrp pods: %w", err) + } + for _, lrpPod := range lrpPods.Items { + if v, hasLabel := lrpPod.GetLabels()["lrp"]; hasLabel { + if v == "backend" { + ct.lrpBackendPods[lrpPod.Name] = Pod{ + K8sClient: ct.client, + Pod: lrpPod.DeepCopy(), + } + } else if v == "client" { + ct.lrpClientPods[lrpPod.Name] = Pod{ + K8sClient: ct.client, + Pod: lrpPod.DeepCopy(), + } + } + } + } + } + clientPods, err := ct.client.ListPods(ctx, ct.params.TestNamespace, metav1.ListOptions{LabelSelector: "kind=" + kindClientName}) if err != nil { return fmt.Errorf("unable to list client pods: %s", err) diff --git a/connectivity/check/peer.go b/connectivity/check/peer.go index ee5586a717..a333015bd4 100644 --- a/connectivity/check/peer.go +++ b/connectivity/check/peer.go @@ -9,9 +9,10 @@ import ( "net/url" "strconv" + corev1 "k8s.io/api/core/v1" + "github.com/cilium/cilium/api/v1/flow" ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" - corev1 "k8s.io/api/core/v1" "github.com/cilium/cilium-cli/k8s" "github.com/cilium/cilium-cli/utils/features" @@ -497,6 +498,61 @@ func (he httpEndpoint) FlowFilters() []*flow.FlowFilter { return nil } +type LRPFrontend struct { + name string + ip string + port string +} + +func NewLRPFrontend(frontend ciliumv2.RedirectFrontend) *LRPFrontend { + var lf LRPFrontend + if f := frontend.AddressMatcher; f != nil { + lf.ip = f.IP + lf.port = f.ToPorts[0].Port + lf.name = fmt.Sprintf("%s:%s", lf.ip, lf.port) + + return &lf + } + + return nil +} + +func (l LRPFrontend) Name() string { + return l.name +} + +func (l LRPFrontend) Scheme() string { + return "http" +} + +func (l LRPFrontend) Path() string { + return "" +} + +func (l LRPFrontend) Address(features.IPFamily) string { + return l.ip +} + +func (l LRPFrontend) Port() uint32 { + p, err := strconv.Atoi(l.port) + if err != nil { + return 0 + } + return uint32(p) +} + +func (l LRPFrontend) HasLabel(string, string) bool { + return false +} + +func (l LRPFrontend) Labels() map[string]string { + return nil +} + +func (l LRPFrontend) FlowFilters() []*flow.FlowFilter { + return nil +} + // EchoIPPod is a Kubernetes Pod that prints back the client IP, acting as a peer in a connectivity test. type EchoIPPod struct { Pod diff --git a/connectivity/check/policy.go b/connectivity/check/policy.go index f35588d992..988bed8b04 100644 --- a/connectivity/check/policy.go +++ b/connectivity/check/policy.go @@ -211,6 +211,23 @@ func createOrUpdateCEGP(ctx context.Context, client *k8s.Client, cegp *ciliumv2. return err } +// createOrUpdateCLRP creates the CLRP and updates it if it already exists. +func createOrUpdateCLRP(ctx context.Context, client *k8s.Client, clrp *ciliumv2.CiliumLocalRedirectPolicy) error { + _, err := CreateOrUpdatePolicy(ctx, client.CiliumClientset.CiliumV2().CiliumLocalRedirectPolicies(clrp.Namespace), + clrp, func(current *ciliumv2.CiliumLocalRedirectPolicy) bool { + if maps.Equal(current.GetLabels(), clrp.GetLabels()) && + current.Spec.DeepEqual(&clrp.Spec) { + return false + } + + current.ObjectMeta.Labels = clrp.ObjectMeta.Labels + current.Spec = clrp.Spec + return true + }, + ) + return err +} + // deleteCNP deletes a CiliumNetworkPolicy from the cluster. func deleteCNP(ctx context.Context, client *k8s.Client, cnp *ciliumv2.CiliumNetworkPolicy) error { if err := client.DeleteCiliumNetworkPolicy(ctx, cnp.Namespace, cnp.Name, metav1.DeleteOptions{}); err != nil { @@ -247,6 +264,15 @@ func deleteCEGP(ctx context.Context, client *k8s.Client, cegp *ciliumv2.CiliumEg return nil } +// deleteCLRP deletes a CiliumLocalRedirectPolicy from the cluster. +func deleteCLRP(ctx context.Context, client *k8s.Client, clrp *ciliumv2.CiliumLocalRedirectPolicy) error { + if err := client.DeleteCiliumLocalRedirectPolicy(ctx, clrp.Namespace, clrp.Name, metav1.DeleteOptions{}); err != nil { + return fmt.Errorf("%s/%s/%s policy delete failed: %w", client.ClusterName(), clrp.Namespace, clrp.Name, err) + } + + return nil +} + func defaultDropReason(flow *flowpb.Flow) bool { return flow.GetDropReasonDesc() != flowpb.DropReason_DROP_REASON_UNKNOWN } @@ -334,6 +360,11 @@ func (t *Test) addCEGPs(cegps ...*ciliumv2.CiliumEgressGatewayPolicy) (err error return err } +func (t *Test) addCLRPs(clrps ...*ciliumv2.CiliumLocalRedirectPolicy) (err error) { + t.clrps, err = RegisterPolicy(t.clrps, clrps...) + return err +} + func sumMap(m map[string]int) int { sum := 0 for _, v := range m { @@ -348,7 +379,7 @@ var policyApplyDeleteLock = sync.Mutex{} // applyPolicies applies all the Test's registered network policies. func (t *Test) applyPolicies(ctx context.Context) error { - if len(t.cnps) == 0 && len(t.ccnps) == 0 && len(t.knps) == 0 && len(t.cegps) == 0 { + if len(t.cnps) == 0 && len(t.ccnps) == 0 && len(t.knps) == 0 && len(t.cegps) == 0 && len(t.clrps) == 0 { return nil } @@ -419,6 +450,16 @@ func (t *Test) applyPolicies(ctx context.Context) error { } } + // Apply all given Cilium Local Redirect Policies. + for _, clrp := range t.clrps { + for _, client := range t.Context().clients.clients() { + t.Infof("📜 Applying CiliumLocalRedirectPolicy '%s' to namespace '%s'..", clrp.Name, clrp.Namespace) + if err := createOrUpdateCLRP(ctx, client, clrp); err != nil { + return fmt.Errorf("policy application failed: %w", err) + } + } + } + // Register a finalizer with the Test immediately to enable cleanup. // If we return a cleanup closure from this function, cleanup cannot be // performed if the user cancels during the policy revision wait time. @@ -457,12 +498,16 @@ func (t *Test) applyPolicies(ctx context.Context) error { t.Debugf("📜 Successfully applied %d CiliumEgressGatewayPolicies", len(t.cegps)) } + if len(t.clrps) > 0 { + t.Debugf("📜 Successfully applied %d CiliumLocalRedirectPolicies", len(t.clrps)) + } + return nil } // deletePolicies deletes a given set of network policies from the cluster. func (t *Test) deletePolicies(ctx context.Context) error { - if len(t.cnps) == 0 && len(t.ccnps) == 0 && len(t.knps) == 0 && len(t.cegps) == 0 { + if len(t.cnps) == 0 && len(t.ccnps) == 0 && len(t.knps) == 0 && len(t.cegps) == 0 && len(t.clrps) == 0 { return nil } @@ -522,7 +567,17 @@ func (t *Test) deletePolicies(ctx context.Context) error { } } - if len(t.cnps) != 0 || len(t.ccnps) != 0 || len(t.knps) != 0 { + // Delete all the Test's CLRPs from all clients. + for _, clrp := range t.clrps { + t.Infof("📜 Deleting CiliumLocalRedirectPolicy '%s' from namespace '%s'..", clrp.Name, clrp.Namespace) + for _, client := range t.Context().clients.clients() { + if err := deleteCLRP(ctx, client, clrp); err != nil { + return fmt.Errorf("deleting CiliumLocalRedirectPolicy: %w", err) + } + } + } + + if len(t.cnps) != 0 || len(t.ccnps) != 0 || len(t.knps) != 0 || len(t.clrps) != 0 { // Wait for policies to be deleted on all Cilium nodes. if err := t.waitCiliumPolicyRevisions(ctx, revs, revDeltas); err != nil { return fmt.Errorf("timed out removing policies on Cilium agents: %w", err) @@ -545,6 +600,10 @@ func (t *Test) deletePolicies(ctx context.Context) error { t.Debugf("📜 Successfully deleted %d CiliumEgressGatewayPolicies", len(t.cegps)) } + if len(t.clrps) > 0 { + t.Debugf("📜 Successfully deleted %d CiliumLocalRedirectPolicies", len(t.clrps)) + } + return nil } @@ -609,3 +668,9 @@ func parseK8SPolicyYAML(policy string) (policies []*networkingv1.NetworkPolicy, func parseCiliumEgressGatewayPolicyYAML(policy string) (cegps []*ciliumv2.CiliumEgressGatewayPolicy, err error) { return ParsePolicyYAML[*ciliumv2.CiliumEgressGatewayPolicy](policy, scheme.Scheme) } + +// parseCiliumLocalRedirectPolicyYAML decodes policy yaml into a slice of +// CiliumLocalRedirectPolicies. +func parseCiliumLocalRedirectPolicyYAML(policy string) (clrp []*ciliumv2.CiliumLocalRedirectPolicy, err error) { + return ParsePolicyYAML[*ciliumv2.CiliumLocalRedirectPolicy](policy, scheme.Scheme) +} diff --git a/connectivity/check/test.go b/connectivity/check/test.go index 5bd9a16949..e2ba516e0e 100644 --- a/connectivity/check/test.go +++ b/connectivity/check/test.go @@ -14,10 +14,6 @@ import ( "time" "github.com/blang/semver/v4" - k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io" - ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" - "github.com/cilium/cilium/pkg/policy/api" - "github.com/cilium/cilium/pkg/versioncheck" "github.com/cloudflare/cfssl/cli/genkey" "github.com/cloudflare/cfssl/config" "github.com/cloudflare/cfssl/csr" @@ -29,6 +25,11 @@ import ( networkingv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + k8sConst "github.com/cilium/cilium/pkg/k8s/apis/cilium.io" + ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + "github.com/cilium/cilium/pkg/policy/api" + "github.com/cilium/cilium/pkg/versioncheck" + "github.com/cilium/cilium-cli/defaults" "github.com/cilium/cilium-cli/sysdump" "github.com/cilium/cilium-cli/utils/features" @@ -66,6 +67,7 @@ func NewTest(name string, verbose bool, debug bool) *Test { ccnps: make(map[string]*ciliumv2.CiliumClusterwideNetworkPolicy), knps: make(map[string]*networkingv1.NetworkPolicy), cegps: make(map[string]*ciliumv2.CiliumEgressGatewayPolicy), + clrps: make(map[string]*ciliumv2.CiliumLocalRedirectPolicy), logBuf: &bytes.Buffer{}, // maintain internal buffer by default conditionFn: func() bool { return true }, } @@ -121,6 +123,9 @@ type Test struct { // Cilium Egress Gateway Policies active during this test. cegps map[string]*ciliumv2.CiliumEgressGatewayPolicy + // Cilium Local Redirect Policies active during this test. + clrps map[string]*ciliumv2.CiliumLocalRedirectPolicy + // Secrets that have to be present during the test. secrets map[string]*corev1.Secret @@ -537,6 +542,43 @@ func (t *Test) WithK8SPolicy(policy string) *Test { return t } +// CiliumLocalRedirectPolicyParams is used to configure a CiliumLocalRedirectPolicy template. +type CiliumLocalRedirectPolicyParams struct { + // Policy is the local redirect policy yaml. + Policy string + + // Name is the name of the local redirect policy. + Name string + + // FrontendIP is the IP address of the address matcher frontend set in the policy spec. + FrontendIP string + + // SkipRedirectFromBackend is the flag set in the policy spec. + SkipRedirectFromBackend bool +} + +func (t *Test) WithCiliumLocalRedirectPolicy(params CiliumLocalRedirectPolicyParams) *Test { + pl, err := parseCiliumLocalRedirectPolicyYAML(params.Policy) + if err != nil { + t.Fatalf("Parsing local redirect policy YAML: %s", err) + } + + for i := range pl { + pl[i].Namespace = t.ctx.params.TestNamespace + pl[i].Name = params.Name + pl[i].Spec.RedirectFrontend.AddressMatcher.IP = params.FrontendIP + pl[i].Spec.SkipRedirectFromBackend = params.SkipRedirectFromBackend + } + + if err := t.addCLRPs(pl...); err != nil { + t.Fatalf("Adding CLRPs to cilium local redirect policy context: %s", err) + } + + t.WithFeatureRequirements(features.RequireEnabled(features.LocalRedirectPolicy)) + + return t +} + type ExcludedCIDRsKind int const ( @@ -935,3 +977,7 @@ func (t *Test) CiliumClusterwideNetworkPolicies() map[string]*ciliumv2.CiliumClu func (t *Test) KubernetesNetworkPolicies() map[string]*networkingv1.NetworkPolicy { return t.knps } + +func (t *Test) CiliumLocalRedirectPolicies() map[string]*ciliumv2.CiliumLocalRedirectPolicy { + return t.clrps +} diff --git a/connectivity/tests/lrp.go b/connectivity/tests/lrp.go new file mode 100644 index 0000000000..5b37f988e5 --- /dev/null +++ b/connectivity/tests/lrp.go @@ -0,0 +1,190 @@ +// SPDX-License-Identifier: Apache-2.0 +// Copyright Authors of Cilium + +package tests + +import ( + "context" + "encoding/json" + "fmt" + "net" + "strings" + "time" + + v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + + "github.com/cilium/cilium-cli/connectivity/check" + "github.com/cilium/cilium-cli/defaults" + "github.com/cilium/cilium-cli/utils/features" + "github.com/cilium/cilium-cli/utils/wait" +) + +// LRP runs test scenarios for local redirect policy. It tests local redirection +// connectivity from test source pods to LRP frontend. +// +// It tests connectivity with the configured skipRedirectFromBackend flag for: +// - client pods to LRP frontend +// - LRP backend pods to LRP frontend +func LRP(skipRedirectFromBackend bool) check.Scenario { + return lrp{skipRedirectFromBackend: skipRedirectFromBackend} +} + +type lrp struct { + skipRedirectFromBackend bool +} + +func (s lrp) Name() string { + if s.skipRedirectFromBackend { + return "lrp-skip-redirect-from-backend" + } + return "lrp" +} + +func (s lrp) Run(ctx context.Context, t *check.Test) { + ct := t.Context() + policies := make([]*v2.CiliumLocalRedirectPolicy, 0, len(t.CiliumLocalRedirectPolicies())) + + for _, policy := range t.CiliumLocalRedirectPolicies() { + spec := policy.Spec + if spec.RedirectFrontend.AddressMatcher == nil { + continue + } + policies = append(policies, policy) + frontend := check.NewLRPFrontend(spec.RedirectFrontend) + frontendStr := net.JoinHostPort(frontend.Address(features.IPFamilyV4), fmt.Sprint(frontend.Port())) + lrpBackendsMap := make(map[string][]string) + // Check for LRP backend pods deployed on nodes in the cluster. + for _, pod := range t.Context().LrpBackendPods() { + node := pod.NodeName() + podIP := pod.Pod.Status.PodIP + if _, ok := lrpBackendsMap[node]; !ok { + lrpBackendsMap[node] = []string{podIP} + continue + } + lrpBackendsMap[node] = append(lrpBackendsMap[node], podIP) + } + // Wait until the local redirect entries are plumbed in the BPF LB map + // on the cilium agent nodes hosting LRP backend pods. + WaitForLocalRedirectBPFEntries(ctx, t, frontendStr, lrpBackendsMap) + } + + // Tests client pods to LRP frontend connectivity: traffic gets redirected + // to the LRP backends. + for _, pod := range t.Context().LrpClientPods() { + pod := pod + + for _, policy := range policies { + policy := policy + + if policy.Spec.SkipRedirectFromBackend != s.skipRedirectFromBackend { + continue + } + + i := 0 + lf := check.NewLRPFrontend(policy.Spec.RedirectFrontend) + t.NewAction(s, fmt.Sprintf("curl-%d", i), &pod, lf, features.IPFamilyV4).Run(func(a *check.Action) { + a.ExecInPod(ctx, ct.CurlCommand(lf, features.IPFamilyV4)) + i++ + }) + } + } + + // Tests LRP backend pods to LRP frontend connectivity: traffic gets redirected + // based on the configured skipRedirectFromBackend flag. + for _, pod := range t.Context().LrpBackendPods() { + pod := pod + + for _, policy := range policies { + policy := policy + + if policy.Spec.SkipRedirectFromBackend != s.skipRedirectFromBackend { + continue + } + + i := 0 + lf := check.NewLRPFrontend(policy.Spec.RedirectFrontend) + t.NewAction(s, fmt.Sprintf("curl-%d", i), &pod, lf, features.IPFamilyV4).Run(func(a *check.Action) { + a.ExecInPod(ctx, ct.CurlCommand(lf, features.IPFamilyV4)) + + if policy.Spec.SkipRedirectFromBackend { + a.ValidateFlows(ctx, pod, a.GetEgressRequirements(check.FlowParameters{ + AltDstIP: lf.Address(features.IPFamilyV4), + AltDstPort: lf.Port(), + })) + } + i++ + }) + } + + } +} + +func WaitForLocalRedirectBPFEntries(ctx context.Context, t *check.Test, frontend string, backendsMap map[string][]string) { + ct := t.Context() + w := wait.NewObserver(ctx, wait.Parameters{Timeout: 20 * time.Second}) + defer w.Cancel() + + ensureBPFLBEntries := func() error { + cmd := strings.Split("cilium bpf lb list -o json", " ") + for _, ciliumPod := range ct.CiliumPods() { + node := ciliumPod.Pod.Spec.NodeName + backends, ok := backendsMap[node] + if !ok { + // No LRP backend pods deployed on this node. + continue + } + stdout, err := ciliumPod.K8sClient.ExecInPod(ctx, ciliumPod.Pod.Namespace, ciliumPod.Pod.Name, defaults.AgentContainerName, cmd) + if err != nil { + t.Fatal("Failed to run cilium bpf lb list -o json command:", err) + } + var resMap map[string][]string + err = json.Unmarshal(stdout.Bytes(), &resMap) + if err != nil { + return fmt.Errorf("error unmarshalling data: %w", err) + } + // An LB mapping (frontend, backend) for example: + // 169.254.169.255:80 (1) 10.244.1.210:8080 (132) (1) + parsedLB := make(map[string][]string) + for frontendEntry, backendEntry := range resMap { + // strip the space and parentheses + index := strings.Index(frontendEntry, " ") + if index > 0 { + frontendEntry = frontendEntry[:index] + } + if len(backendEntry) > 0 { + parsedLB[frontendEntry] = append(parsedLB[frontendEntry], backendEntry...) + } + } + parsedBes, ok := parsedLB[frontend] + if !ok { + return fmt.Errorf("frontend [%s] not found in BPF LB map [%+v]", frontend, parsedLB) + } + // Check for frontend and backend mapping in the parsed BPF LB map. + for _, backend := range backends { + found := false + for _, be := range parsedBes { + if strings.Contains(be, backend) { + found = true + break + } + } + if !found { + return fmt.Errorf("frontend [%s] backend [%s] mapping not found in BPF LB map [%s] %+v", frontend, backend, ciliumPod.Pod.Name, parsedLB) + } + } + } + + return nil + } + + for { + if err := ensureBPFLBEntries(); err != nil { + if err := w.Retry(err); err != nil { + t.Fatal("Failed to ensure local redirect BPF entries: %w", err) + } + + continue + } + return + } +} diff --git a/k8s/client.go b/k8s/client.go index d15af3e0fe..91eabfcd7b 100644 --- a/k8s/client.go +++ b/k8s/client.go @@ -18,11 +18,6 @@ import ( "time" "github.com/blang/semver/v4" - "github.com/cilium/cilium/api/v1/models" - ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" - ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1" - ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" - "github.com/cilium/cilium/pkg/versioncheck" "helm.sh/helm/v3/pkg/action" "helm.sh/helm/v3/pkg/cli/output" appsv1 "k8s.io/api/apps/v1" @@ -49,6 +44,12 @@ import ( clientcmdapi "k8s.io/client-go/tools/clientcmd/api" "k8s.io/client-go/transport/spdy" + "github.com/cilium/cilium/api/v1/models" + ciliumv2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2" + ciliumv2alpha1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1" + ciliumClientset "github.com/cilium/cilium/pkg/k8s/client/clientset/versioned" + "github.com/cilium/cilium/pkg/versioncheck" + "github.com/cilium/cilium-cli/defaults" ) @@ -685,6 +686,10 @@ func (c *Client) DeleteCiliumEgressGatewayPolicy(ctx context.Context, name strin return c.CiliumClientset.CiliumV2().CiliumEgressGatewayPolicies().Delete(ctx, name, opts) } +func (c *Client) DeleteCiliumLocalRedirectPolicy(ctx context.Context, namespace, name string, opts metav1.DeleteOptions) error { + return c.CiliumClientset.CiliumV2().CiliumLocalRedirectPolicies(namespace).Delete(ctx, name, opts) +} + func (c *Client) ListCiliumBGPPeeringPolicies(ctx context.Context, opts metav1.ListOptions) (*ciliumv2alpha1.CiliumBGPPeeringPolicyList, error) { return c.CiliumClientset.CiliumV2alpha1().CiliumBGPPeeringPolicies().List(ctx, opts) }