From b8304892042d352da1ae0f88071190c58e441864 Mon Sep 17 00:00:00 2001 From: Salman <136653428+sesmaili@users.noreply.github.com> Date: Wed, 28 Jun 2023 14:08:17 -0700 Subject: [PATCH] AWS#267 Added support for Port Annotation in ServiceExport Object (#280) * Added Port Annotations to ServiceExport * optimized the code * examples and test * Moved the port annotations key to a const * Removed extra logs * Added integration tests * Addressed the comments on the PR --------- Co-authored-by: Salman Esmaili <> --- examples/elasticsearch-export.yaml | 7 + examples/elasticsearch-import.yaml | 9 ++ examples/elasticsearch.yaml | 44 +++++ pkg/gateway/model_build_targets.go | 32 +++- pkg/gateway/model_build_targets_test.go | 55 +++++++ test/pkg/test/elasticsearch.go | 151 ++++++++++++++++++ test/pkg/test/service_export_import.go | 45 ++++++ .../srvexport_port_annotation_targets_test.go | 96 +++++++++++ 8 files changed, 435 insertions(+), 4 deletions(-) create mode 100644 examples/elasticsearch-export.yaml create mode 100644 examples/elasticsearch-import.yaml create mode 100644 examples/elasticsearch.yaml create mode 100644 test/pkg/test/elasticsearch.go create mode 100644 test/suites/integration/srvexport_port_annotation_targets_test.go diff --git a/examples/elasticsearch-export.yaml b/examples/elasticsearch-export.yaml new file mode 100644 index 00000000..6aae4cd4 --- /dev/null +++ b/examples/elasticsearch-export.yaml @@ -0,0 +1,7 @@ +apiVersion: multicluster.x-k8s.io/v1alpha1 +kind: ServiceExport +metadata: + name: elasticsearch + annotations: + multicluster.x-k8s.io/federation: "amazon-vpc-lattice" + multicluster.x-k8s.io/port: "9200" diff --git a/examples/elasticsearch-import.yaml b/examples/elasticsearch-import.yaml new file mode 100644 index 00000000..2e88d6c4 --- /dev/null +++ b/examples/elasticsearch-import.yaml @@ -0,0 +1,9 @@ +apiVersion: multicluster.x-k8s.io/v1alpha1 +kind: ServiceImport +metadata: + name: elasticsearch +spec: + type: ClusterSetIP + ports: + - port: 9200 + protocol: TCP diff --git a/examples/elasticsearch.yaml b/examples/elasticsearch.yaml new file mode 100644 index 00000000..b275e88d --- /dev/null +++ b/examples/elasticsearch.yaml @@ -0,0 +1,44 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: elasticsearch +spec: + selector: + matchLabels: + app.kubernetes.io/name: elasticsearch + template: + metadata: + labels: + app.kubernetes.io/name: elasticsearch + spec: + containers: + - name: elasticsearch + image: elasticsearch:7.9.3 + env: + - name: discovery.type + value: single-node + ports: + - name: http + containerPort: 9200 + - name: prometheus-exporter + image: justwatch/elasticsearch_exporter:1.1.0 + args: + - '--es.uri=http://localhost:9200' + ports: + - name: http-prometheus + containerPort: 9114 +--- +apiVersion: v1 +kind: Service +metadata: + name: elasticsearch +spec: + selector: + app.kubernetes.io/name: elasticsearch + ports: + - name: http + port: 9200 + targetPort: http + - name: http-prometheus + port: 9114 + targetPort: http-prometheus \ No newline at end of file diff --git a/pkg/gateway/model_build_targets.go b/pkg/gateway/model_build_targets.go index 81a942c0..ff7a2328 100644 --- a/pkg/gateway/model_build_targets.go +++ b/pkg/gateway/model_build_targets.go @@ -4,11 +4,14 @@ import ( "context" "errors" "fmt" + "strconv" + "github.com/golang/glog" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" + mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" lattice_aws "github.com/aws/aws-application-networking-k8s/pkg/aws" "github.com/aws/aws-application-networking-k8s/pkg/k8s" @@ -19,6 +22,8 @@ import ( const ( resourceIDLatticeTargets = "LatticeTargets" + portAnnotationsKey = "multicluster.x-k8s.io/port" + undefinedPort = int64(0) ) type LatticeTargetsBuilder interface { @@ -95,8 +100,6 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context) return errors.New(errmsg) } - endPoints := &corev1.Endpoints{} - svc := &corev1.Service{} namespacedName := types.NamespacedName{ Namespace: t.tgNamespace, @@ -107,7 +110,24 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context) errmsg := fmt.Sprintf("Build Targets failed because K8S service %v does not exist", namespacedName) return errors.New(errmsg) } + + portAnnotations := undefinedPort + serviceExport := &mcs_api.ServiceExport{} + err = t.Client.Get(ctx, namespacedName, serviceExport) + if err != nil { + glog.V(6).Infof("Failed to find Service export in the DS. Name:%v, Namespace:%v - err:%s\n ", t.tgName, t.tgNamespace, err) + } else { + // TODO: Change the code to support multiple comma separated ports instead of a single port + //portsAnnotations := strings.Split(serviceExport.ObjectMeta.Annotations["multicluster.x-k8s.io/Ports"], ",") + portAnnotations, err = strconv.ParseInt(serviceExport.ObjectMeta.Annotations[portAnnotationsKey], 10, 64) + if err != nil { + glog.V(6).Infof("Failed to read Annotaions/Port:%v, err:%s\n ", serviceExport.ObjectMeta.Annotations[portAnnotationsKey], err) + } + glog.V(6).Infof("Build Targets - portAnnotations: %v \n", portAnnotations) + } + var targetList []latticemodel.Target + endPoints := &corev1.Endpoints{} if svc.DeletionTimestamp.IsZero() { if err := t.Client.Get(ctx, namespacedName, endPoints); err != nil { @@ -119,7 +139,6 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context) glog.V(6).Infof("Build Targets: endPoints %v \n", endPoints) for _, endPoint := range endPoints.Subsets { - for _, address := range endPoint.Addresses { for _, port := range endPoint.Ports { glog.V(6).Infof("serviceReconcile-endpoints: address %v, port %v\n", address, port) @@ -127,7 +146,12 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context) TargetIP: address.IP, Port: int64(port.Port), } - targetList = append(targetList, target) + if portAnnotations == undefinedPort || int64(target.Port) == portAnnotations { + targetList = append(targetList, target) + glog.V(6).Infof("portAnnotations:%v, target.Port:%v\n", portAnnotations, target.Port) + } else { + glog.V(6).Infof("Found a port match, registering the target - port:%v, containerPort:%v, taerget:%v ***\n", int64(target.Port), portAnnotations, target) + } } } } diff --git a/pkg/gateway/model_build_targets_test.go b/pkg/gateway/model_build_targets_test.go index 84d07ead..8e4c0f7e 100644 --- a/pkg/gateway/model_build_targets_test.go +++ b/pkg/gateway/model_build_targets_test.go @@ -3,6 +3,7 @@ package gateway import ( "context" "fmt" + "reflect" "testing" "time" @@ -15,6 +16,7 @@ import ( "k8s.io/apimachinery/pkg/types" clientgoscheme "k8s.io/client-go/kubernetes/scheme" testclient "sigs.k8s.io/controller-runtime/pkg/client/fake" + mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" "github.com/aws/aws-application-networking-k8s/pkg/latticestore" "github.com/aws/aws-application-networking-k8s/pkg/model/core" @@ -28,6 +30,7 @@ func Test_Targets(t *testing.T) { srvExportNamespace string endPoints []corev1.Endpoints svc corev1.Service + serviceExport mcs_api.ServiceExport inDataStore bool refByServiceExport bool refByService bool @@ -81,6 +84,53 @@ func Test_Targets(t *testing.T) { }, }, }, + { + name: "Add all endpoints to build spec with port annotation", + srvExportName: "export1", + srvExportNamespace: "ns1", + endPoints: []corev1.Endpoints{ + { + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "export1", + }, + Subsets: []corev1.EndpointSubset{ + { + Addresses: []corev1.EndpointAddress{{IP: "10.10.1.1"}, {IP: "10.10.2.2"}}, + Ports: []corev1.EndpointPort{{Name: "a", Port: 8675}, {Name: "b", Port: 3090}}, + }, + }, + }, + }, + svc: corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "export1", + DeletionTimestamp: nil, + }, + }, + serviceExport: mcs_api.ServiceExport{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "ns1", + Name: "export1", + DeletionTimestamp: nil, + Annotations: map[string]string{"multicluster.x-k8s.io/port": "3090"}, + }, + }, + inDataStore: true, + refByServiceExport: true, + wantErrIsNil: true, + expectedTargetList: []latticemodel.Target{ + { + TargetIP: "10.10.1.1", + Port: 3090, + }, + { + TargetIP: "10.10.2.2", + Port: 3090, + }, + }, + }, { name: "Delete svc and all endpoints to build spec", srvExportName: "export1", @@ -261,9 +311,14 @@ func Test_Targets(t *testing.T) { ctx := context.TODO() k8sSchema := runtime.NewScheme() + k8sSchema.AddKnownTypes(mcs_api.SchemeGroupVersion, &mcs_api.ServiceExport{}) clientgoscheme.AddToScheme(k8sSchema) k8sClient := testclient.NewFakeClientWithScheme(k8sSchema) + if !reflect.DeepEqual(tt.serviceExport, mcs_api.ServiceExport{}) { + assert.NoError(t, k8sClient.Create(ctx, tt.serviceExport.DeepCopy())) + } + if len(tt.endPoints) > 0 { assert.NoError(t, k8sClient.Create(ctx, tt.endPoints[0].DeepCopy())) } diff --git a/test/pkg/test/elasticsearch.go b/test/pkg/test/elasticsearch.go new file mode 100644 index 00000000..43cb2174 --- /dev/null +++ b/test/pkg/test/elasticsearch.go @@ -0,0 +1,151 @@ +package test + +import ( + "github.com/aws/aws-application-networking-k8s/pkg/latticestore" + "github.com/samber/lo" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" +) + +type ElasticSearchOptions struct { + Name string + Namespace string // the object will be created in this namespace + Port int + Port2 int + TargetPort int + MergeFromDeployment []*appsv1.Deployment + MergeFromService []*v1.Service +} + +func (env *Framework) NewElasticeApp(options ElasticSearchOptions) (*appsv1.Deployment, *v1.Service) { + if options.Port == 0 { + options.Port = 80 + } + if options.Port2 == 0 { + options.Port2 = 9114 + } + if options.TargetPort == 0 { + options.TargetPort = 80 + } + deployment := New(&appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: options.Namespace, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: lo.ToPtr(int32(2)), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": options.Name, + }, + }, + Template: v1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: options.Namespace, + Labels: map[string]string{ + "app": options.Name, + DiscoveryLabel: "true", + }, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Name: options.Name, + Image: "nginx", + Ports: []v1.ContainerPort{ + { + Name: options.Name, + ContainerPort: int32(80), + Protocol: "TCP", + }, + }, + Env: []v1.EnvVar{{ + Name: "PodName", + Value: options.Name + " handler pod", + }}, + }, + { + Name: "prometheus-exporter", + Image: "justwatch/elasticsearch_exporter:1.1.0", + Ports: []v1.ContainerPort{ + { + Name: "http-prometheus", + ContainerPort: int32(9114), + Protocol: "TCP", + }, + }, + Env: []v1.EnvVar{{ + Name: "PodName", + Value: options.Name + " handler pod", + }}, + }, + }, + }, + }, + }, + }, options.MergeFromDeployment...) + + service := New(&v1.Service{ + TypeMeta: metav1.TypeMeta{ + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Namespace: options.Namespace, + }, + Spec: v1.ServiceSpec{ + Selector: map[string]string{ + "app": options.Name, + }, + Ports: []v1.ServicePort{ + { + Name: options.Name, + Protocol: v1.ProtocolTCP, + Port: int32(options.Port), + }, + { + Name: "http-prometheus", + Protocol: v1.ProtocolTCP, + Port: int32(options.Port2), + }}, + }, + }, options.MergeFromService...) + env.TestCasesCreatedTargetGroupNames[latticestore.TargetGroupName(service.Name, service.Namespace)] = true + env.TestCasesCreatedK8sResource = append(env.TestCasesCreatedK8sResource, service, deployment) + return deployment, service + +} + +func (env *Framework) NewHttpRoute(parentRefsGateway *v1beta1.Gateway, service *v1.Service) *v1beta1.HTTPRoute { + var rules []v1beta1.HTTPRouteRule + rule := v1beta1.HTTPRouteRule{ + BackendRefs: []v1beta1.HTTPBackendRef{{ + BackendRef: v1beta1.BackendRef{ + BackendObjectReference: v1beta1.BackendObjectReference{ + Name: v1beta1.ObjectName(service.Name), + Namespace: (*v1beta1.Namespace)(&service.Namespace), + Kind: lo.ToPtr(v1beta1.Kind("ServiceImport")), + Port: (*v1beta1.PortNumber)(&service.Spec.Ports[0].Port), + }, + }, + }}, + } + rules = append(rules, rule) + httpRoute := New(&v1beta1.HTTPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: service.Namespace, + }, + Spec: v1beta1.HTTPRouteSpec{ + CommonRouteSpec: v1beta1.CommonRouteSpec{ + ParentRefs: []v1beta1.ParentReference{{ + Name: v1beta1.ObjectName(parentRefsGateway.Name), + //SectionName: lo.ToPtr(v1beta1.SectionName("http")), + }}, + }, + Rules: rules, + }, + }) + env.TestCasesCreatedServiceNames[latticestore.LatticeServiceName(httpRoute.Name, httpRoute.Namespace)] = true + env.TestCasesCreatedK8sResource = append(env.TestCasesCreatedK8sResource, httpRoute) + return httpRoute +} diff --git a/test/pkg/test/service_export_import.go b/test/pkg/test/service_export_import.go index d7bbb8b8..dab46e91 100644 --- a/test/pkg/test/service_export_import.go +++ b/test/pkg/test/service_export_import.go @@ -1,6 +1,8 @@ package test import ( + "strconv" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" @@ -40,3 +42,46 @@ func (env *Framework) CreateServiceExportAndServiceImportByService(service *v1.S env.TestCasesCreatedK8sResource = append(env.TestCasesCreatedK8sResource, serviceExport, serviceImport) return serviceExport, serviceImport } + +func (env *Framework) CreateServiceExport(service *v1.Service) *v1alpha1.ServiceExport { + serviceExport := New(&v1alpha1.ServiceExport{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "multicluster.x-k8s.io/v1alpha1", + Kind: "ServiceExport", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: service.Name, + Namespace: service.Namespace, + Annotations: map[string]string{ + "multicluster.x-k8s.io/federation": "amazon-vpc-lattice", + "multicluster.x-k8s.io/port": strconv.FormatInt(int64(service.Spec.Ports[0].Port), 10), + }, + }, + }) + env.TestCasesCreatedK8sResource = append(env.TestCasesCreatedK8sResource, serviceExport) + return serviceExport +} + +func (env *Framework) CreateServiceImport(service *v1.Service) *v1alpha1.ServiceImport { + serviceImport := New(&v1alpha1.ServiceImport{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "multicluster.x-k8s.io/v1alpha1", + Kind: "ServiceImport", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: service.Name, + Namespace: service.Namespace, + }, + Spec: v1alpha1.ServiceImportSpec{ + Type: v1alpha1.ClusterSetIP, + Ports: []v1alpha1.ServicePort{ + { + Port: service.Spec.Ports[0].Port, + Protocol: service.Spec.Ports[0].Protocol, + }, + }, + }, + }) + env.TestCasesCreatedK8sResource = append(env.TestCasesCreatedK8sResource, serviceImport) + return serviceImport +} diff --git a/test/suites/integration/srvexport_port_annotation_targets_test.go b/test/suites/integration/srvexport_port_annotation_targets_test.go new file mode 100644 index 00000000..2750f21f --- /dev/null +++ b/test/suites/integration/srvexport_port_annotation_targets_test.go @@ -0,0 +1,96 @@ +package integration + +import ( + "log" + "os" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + appsv1 "k8s.io/api/apps/v1" + v1 "k8s.io/api/core/v1" + "sigs.k8s.io/gateway-api/apis/v1beta1" + "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + + "github.com/aws/aws-application-networking-k8s/pkg/latticestore" + "github.com/aws/aws-application-networking-k8s/test/pkg/test" + "github.com/aws/aws-sdk-go/service/vpclattice" +) + +var _ = Describe("Port Annotations Targets", func() { + var ( + gateway *v1beta1.Gateway + deployment *appsv1.Deployment + service *v1.Service + serviceExport *v1alpha1.ServiceExport + serviceImport *v1alpha1.ServiceImport + httpRoute *v1beta1.HTTPRoute + vpcLatticeService *vpclattice.ServiceSummary + targetGroup *vpclattice.TargetGroupSummary + ) + + BeforeEach(func() { + gateway = testFramework.NewGateway("test-gateway", k8snamespace) + deployment, service = testFramework.NewElasticeApp(test.ElasticSearchOptions{ + Name: "port-test", + Namespace: k8snamespace, + }) + serviceExport = testFramework.CreateServiceExport(service) + serviceImport = testFramework.CreateServiceImport(service) + httpRoute = testFramework.NewHttpRoute(gateway, service) + testFramework.ExpectCreated( + ctx, + gateway, + serviceExport, + serviceImport, + service, + deployment, + httpRoute, + ) + time.Sleep(3 * time.Minute) // Wait for creation of VPCLattice resources + + // Verify VPC Lattice Service exists + vpcLatticeService = testFramework.GetVpcLatticeService(ctx, httpRoute) + Expect(*vpcLatticeService.DnsEntry).To(ContainSubstring(latticestore.LatticeServiceName(httpRoute.Name, httpRoute.Namespace))) + + // Verify VPC Lattice Target Group exists + targetGroup = testFramework.GetTargetGroup(ctx, service) + Expect(*targetGroup.VpcIdentifier).To(Equal(os.Getenv("CLUSTER_VPC_ID"))) + Expect(*targetGroup.Protocol).To(Equal("HTTP")) + }) + + AfterEach(func() { + testFramework.CleanTestEnvironment(ctx) + testFramework.EventuallyExpectNotFound( + ctx, + gateway, + serviceExport, + serviceImport, + service, + deployment, + httpRoute, + ) + }) + + It("Port Annotaion on Service Export", func() { + + targets := testFramework.GetTargets(ctx, targetGroup, deployment) + Expect(*targetGroup.Port).To(BeEquivalentTo(80)) + log.Println("Verifying Targets are only craeted for the port defined in Port Annotaion in ServiceExport") + for _, target := range targets { + Expect(*target.Port).To(BeEquivalentTo(service.Spec.Ports[0].Port)) + Expect(*target.Status).To(Or( + Equal(vpclattice.TargetStatusInitial), + Equal(vpclattice.TargetStatusHealthy), + )) + log.Println("Target:", target) + } + + testFramework.ExpectDeleted(ctx, service) + Eventually(func(g Gomega) { + log.Println("Verifying Targets are only craeted for the port defined in Port Annotaion in ServiceExport") + targets := testFramework.GetTargets(ctx, targetGroup, deployment) + Expect(len(targets) == 0) + }).WithTimeout(5*time.Minute + 30*time.Second) + }) +})