Skip to content

Commit

Permalink
#267 Added support for Port Annotation in ServiceExport Object (#280)
Browse files Browse the repository at this point in the history
* Added Port Annotations to ServiceExport

* optimized the code

* examples and test

* Moved the port annotations key to a const

* Removed extra logs

* Added integration tests

* Addressed the comments on the PR

---------

Co-authored-by: Salman Esmaili <>
  • Loading branch information
sesmaili authored Jun 28, 2023
1 parent bbc6bb5 commit b830489
Show file tree
Hide file tree
Showing 8 changed files with 435 additions and 4 deletions.
7 changes: 7 additions & 0 deletions examples/elasticsearch-export.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: multicluster.x-k8s.io/v1alpha1
kind: ServiceExport
metadata:
name: elasticsearch
annotations:
multicluster.x-k8s.io/federation: "amazon-vpc-lattice"
multicluster.x-k8s.io/port: "9200"
9 changes: 9 additions & 0 deletions examples/elasticsearch-import.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
apiVersion: multicluster.x-k8s.io/v1alpha1
kind: ServiceImport
metadata:
name: elasticsearch
spec:
type: ClusterSetIP
ports:
- port: 9200
protocol: TCP
44 changes: 44 additions & 0 deletions examples/elasticsearch.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: elasticsearch
spec:
selector:
matchLabels:
app.kubernetes.io/name: elasticsearch
template:
metadata:
labels:
app.kubernetes.io/name: elasticsearch
spec:
containers:
- name: elasticsearch
image: elasticsearch:7.9.3
env:
- name: discovery.type
value: single-node
ports:
- name: http
containerPort: 9200
- name: prometheus-exporter
image: justwatch/elasticsearch_exporter:1.1.0
args:
- '--es.uri=http://localhost:9200'
ports:
- name: http-prometheus
containerPort: 9114
---
apiVersion: v1
kind: Service
metadata:
name: elasticsearch
spec:
selector:
app.kubernetes.io/name: elasticsearch
ports:
- name: http
port: 9200
targetPort: http
- name: http-prometheus
port: 9114
targetPort: http-prometheus
32 changes: 28 additions & 4 deletions pkg/gateway/model_build_targets.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"context"
"errors"
"fmt"
"strconv"

"github.com/golang/glog"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

lattice_aws "github.com/aws/aws-application-networking-k8s/pkg/aws"
"github.com/aws/aws-application-networking-k8s/pkg/k8s"
Expand All @@ -19,6 +22,8 @@ import (

const (
resourceIDLatticeTargets = "LatticeTargets"
portAnnotationsKey = "multicluster.x-k8s.io/port"
undefinedPort = int64(0)
)

type LatticeTargetsBuilder interface {
Expand Down Expand Up @@ -95,8 +100,6 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context)
return errors.New(errmsg)
}

endPoints := &corev1.Endpoints{}

svc := &corev1.Service{}
namespacedName := types.NamespacedName{
Namespace: t.tgNamespace,
Expand All @@ -107,7 +110,24 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context)
errmsg := fmt.Sprintf("Build Targets failed because K8S service %v does not exist", namespacedName)
return errors.New(errmsg)
}

portAnnotations := undefinedPort
serviceExport := &mcs_api.ServiceExport{}
err = t.Client.Get(ctx, namespacedName, serviceExport)
if err != nil {
glog.V(6).Infof("Failed to find Service export in the DS. Name:%v, Namespace:%v - err:%s\n ", t.tgName, t.tgNamespace, err)
} else {
// TODO: Change the code to support multiple comma separated ports instead of a single port
//portsAnnotations := strings.Split(serviceExport.ObjectMeta.Annotations["multicluster.x-k8s.io/Ports"], ",")
portAnnotations, err = strconv.ParseInt(serviceExport.ObjectMeta.Annotations[portAnnotationsKey], 10, 64)
if err != nil {
glog.V(6).Infof("Failed to read Annotaions/Port:%v, err:%s\n ", serviceExport.ObjectMeta.Annotations[portAnnotationsKey], err)
}
glog.V(6).Infof("Build Targets - portAnnotations: %v \n", portAnnotations)
}

var targetList []latticemodel.Target
endPoints := &corev1.Endpoints{}

if svc.DeletionTimestamp.IsZero() {
if err := t.Client.Get(ctx, namespacedName, endPoints); err != nil {
Expand All @@ -119,15 +139,19 @@ func (t *latticeTargetsModelBuildTask) buildLatticeTargets(ctx context.Context)
glog.V(6).Infof("Build Targets: endPoints %v \n", endPoints)

for _, endPoint := range endPoints.Subsets {

for _, address := range endPoint.Addresses {
for _, port := range endPoint.Ports {
glog.V(6).Infof("serviceReconcile-endpoints: address %v, port %v\n", address, port)
target := latticemodel.Target{
TargetIP: address.IP,
Port: int64(port.Port),
}
targetList = append(targetList, target)
if portAnnotations == undefinedPort || int64(target.Port) == portAnnotations {
targetList = append(targetList, target)
glog.V(6).Infof("portAnnotations:%v, target.Port:%v\n", portAnnotations, target.Port)
} else {
glog.V(6).Infof("Found a port match, registering the target - port:%v, containerPort:%v, taerget:%v ***\n", int64(target.Port), portAnnotations, target)
}
}
}
}
Expand Down
55 changes: 55 additions & 0 deletions pkg/gateway/model_build_targets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gateway
import (
"context"
"fmt"
"reflect"
"testing"
"time"

Expand All @@ -15,6 +16,7 @@ import (
"k8s.io/apimachinery/pkg/types"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
testclient "sigs.k8s.io/controller-runtime/pkg/client/fake"
mcs_api "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"

"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
"github.com/aws/aws-application-networking-k8s/pkg/model/core"
Expand All @@ -28,6 +30,7 @@ func Test_Targets(t *testing.T) {
srvExportNamespace string
endPoints []corev1.Endpoints
svc corev1.Service
serviceExport mcs_api.ServiceExport
inDataStore bool
refByServiceExport bool
refByService bool
Expand Down Expand Up @@ -81,6 +84,53 @@ func Test_Targets(t *testing.T) {
},
},
},
{
name: "Add all endpoints to build spec with port annotation",
srvExportName: "export1",
srvExportNamespace: "ns1",
endPoints: []corev1.Endpoints{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
},
Subsets: []corev1.EndpointSubset{
{
Addresses: []corev1.EndpointAddress{{IP: "10.10.1.1"}, {IP: "10.10.2.2"}},
Ports: []corev1.EndpointPort{{Name: "a", Port: 8675}, {Name: "b", Port: 3090}},
},
},
},
},
svc: corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: nil,
},
},
serviceExport: mcs_api.ServiceExport{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
Name: "export1",
DeletionTimestamp: nil,
Annotations: map[string]string{"multicluster.x-k8s.io/port": "3090"},
},
},
inDataStore: true,
refByServiceExport: true,
wantErrIsNil: true,
expectedTargetList: []latticemodel.Target{
{
TargetIP: "10.10.1.1",
Port: 3090,
},
{
TargetIP: "10.10.2.2",
Port: 3090,
},
},
},
{
name: "Delete svc and all endpoints to build spec",
srvExportName: "export1",
Expand Down Expand Up @@ -261,9 +311,14 @@ func Test_Targets(t *testing.T) {
ctx := context.TODO()

k8sSchema := runtime.NewScheme()
k8sSchema.AddKnownTypes(mcs_api.SchemeGroupVersion, &mcs_api.ServiceExport{})
clientgoscheme.AddToScheme(k8sSchema)
k8sClient := testclient.NewFakeClientWithScheme(k8sSchema)

if !reflect.DeepEqual(tt.serviceExport, mcs_api.ServiceExport{}) {
assert.NoError(t, k8sClient.Create(ctx, tt.serviceExport.DeepCopy()))
}

if len(tt.endPoints) > 0 {
assert.NoError(t, k8sClient.Create(ctx, tt.endPoints[0].DeepCopy()))
}
Expand Down
151 changes: 151 additions & 0 deletions test/pkg/test/elasticsearch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package test

import (
"github.com/aws/aws-application-networking-k8s/pkg/latticestore"
"github.com/samber/lo"
appsv1 "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/gateway-api/apis/v1beta1"
)

type ElasticSearchOptions struct {
Name string
Namespace string // the object will be created in this namespace
Port int
Port2 int
TargetPort int
MergeFromDeployment []*appsv1.Deployment
MergeFromService []*v1.Service
}

func (env *Framework) NewElasticeApp(options ElasticSearchOptions) (*appsv1.Deployment, *v1.Service) {
if options.Port == 0 {
options.Port = 80
}
if options.Port2 == 0 {
options.Port2 = 9114
}
if options.TargetPort == 0 {
options.TargetPort = 80
}
deployment := New(&appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Namespace: options.Namespace,
},
Spec: appsv1.DeploymentSpec{
Replicas: lo.ToPtr(int32(2)),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": options.Name,
},
},
Template: v1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Namespace: options.Namespace,
Labels: map[string]string{
"app": options.Name,
DiscoveryLabel: "true",
},
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: options.Name,
Image: "nginx",
Ports: []v1.ContainerPort{
{
Name: options.Name,
ContainerPort: int32(80),
Protocol: "TCP",
},
},
Env: []v1.EnvVar{{
Name: "PodName",
Value: options.Name + " handler pod",
}},
},
{
Name: "prometheus-exporter",
Image: "justwatch/elasticsearch_exporter:1.1.0",
Ports: []v1.ContainerPort{
{
Name: "http-prometheus",
ContainerPort: int32(9114),
Protocol: "TCP",
},
},
Env: []v1.EnvVar{{
Name: "PodName",
Value: options.Name + " handler pod",
}},
},
},
},
},
},
}, options.MergeFromDeployment...)

service := New(&v1.Service{
TypeMeta: metav1.TypeMeta{
Kind: "Service",
},
ObjectMeta: metav1.ObjectMeta{
Namespace: options.Namespace,
},
Spec: v1.ServiceSpec{
Selector: map[string]string{
"app": options.Name,
},
Ports: []v1.ServicePort{
{
Name: options.Name,
Protocol: v1.ProtocolTCP,
Port: int32(options.Port),
},
{
Name: "http-prometheus",
Protocol: v1.ProtocolTCP,
Port: int32(options.Port2),
}},
},
}, options.MergeFromService...)
env.TestCasesCreatedTargetGroupNames[latticestore.TargetGroupName(service.Name, service.Namespace)] = true
env.TestCasesCreatedK8sResource = append(env.TestCasesCreatedK8sResource, service, deployment)
return deployment, service

}

func (env *Framework) NewHttpRoute(parentRefsGateway *v1beta1.Gateway, service *v1.Service) *v1beta1.HTTPRoute {
var rules []v1beta1.HTTPRouteRule
rule := v1beta1.HTTPRouteRule{
BackendRefs: []v1beta1.HTTPBackendRef{{
BackendRef: v1beta1.BackendRef{
BackendObjectReference: v1beta1.BackendObjectReference{
Name: v1beta1.ObjectName(service.Name),
Namespace: (*v1beta1.Namespace)(&service.Namespace),
Kind: lo.ToPtr(v1beta1.Kind("ServiceImport")),
Port: (*v1beta1.PortNumber)(&service.Spec.Ports[0].Port),
},
},
}},
}
rules = append(rules, rule)
httpRoute := New(&v1beta1.HTTPRoute{
ObjectMeta: metav1.ObjectMeta{
Namespace: service.Namespace,
},
Spec: v1beta1.HTTPRouteSpec{
CommonRouteSpec: v1beta1.CommonRouteSpec{
ParentRefs: []v1beta1.ParentReference{{
Name: v1beta1.ObjectName(parentRefsGateway.Name),
//SectionName: lo.ToPtr(v1beta1.SectionName("http")),
}},
},
Rules: rules,
},
})
env.TestCasesCreatedServiceNames[latticestore.LatticeServiceName(httpRoute.Name, httpRoute.Namespace)] = true
env.TestCasesCreatedK8sResource = append(env.TestCasesCreatedK8sResource, httpRoute)
return httpRoute
}
Loading

0 comments on commit b830489

Please sign in to comment.