Feature to exclude an asset via command line parameter #260

Closed
wants to merge 8 commits into from
Changes from 1 commit
Merge branch 'master' into exclude-asset
nirvanagit authored Sep 19, 2022
commit 82554a6437414e3aded4270107093eb37954a8be
108 changes: 67 additions & 41 deletions admiral/pkg/clusters/handler.go
@@ -91,8 +91,23 @@ func getDestinationRule(se *v1alpha32.ServiceEntry, locality string, gtpTrafficP
dr = &v1alpha32.DestinationRule{}
)
dr.Host = se.Hosts[0]
dr.TrafficPolicy = &v1alpha32.TrafficPolicy{Tls: &v1alpha32.ClientTLSSettings{Mode: v1alpha32.ClientTLSSettings_ISTIO_MUTUAL}}
processGtp := true
dr.TrafficPolicy = &v1alpha32.TrafficPolicy{
Tls: &v1alpha32.ClientTLSSettings{
Mode: v1alpha32.ClientTLSSettings_ISTIO_MUTUAL,
},
ConnectionPool: &v1alpha32.ConnectionPoolSettings{
Http: &v1alpha32.ConnectionPoolSettings_HTTPSettings{
Http2MaxRequests: DefaultHTTP2MaxRequests,
MaxRequestsPerConnection: DefaultMaxRequestsPerConnection,
},
},
LoadBalancer: &v1alpha32.LoadBalancerSettings{
LbPolicy: &v1alpha32.LoadBalancerSettings_Simple{
Simple: v1alpha32.LoadBalancerSettings_LEAST_REQUEST,
},
},
}
if len(locality) == 0 {
log.Warnf(LogErrFormat, "Process", "GlobalTrafficPolicy", dr.Host, "", "Skipping gtp processing, locality of the cluster nodes cannot be determined. Is this minikube?")
processGtp = false
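
For readers skimming the diff: the hunk above extends the TrafficPolicy that getDestinationRule generates, adding HTTP/2 connection-pool limits and LEAST_REQUEST load balancing alongside the existing ISTIO_MUTUAL TLS mode. Below is a minimal, self-contained sketch of the same construction; the values given for DefaultHTTP2MaxRequests and DefaultMaxRequestsPerConnection are assumptions, since those constants are defined elsewhere in the package and not shown in this commit.

```go
package main

import (
	"fmt"

	v1alpha3 "istio.io/api/networking/v1alpha3"
)

// Assumed values for illustration; in Admiral these constants live elsewhere
// in the clusters package.
const (
	DefaultHTTP2MaxRequests         = 1000
	DefaultMaxRequestsPerConnection = 100
)

// defaultTrafficPolicy mirrors the policy built in getDestinationRule:
// ISTIO_MUTUAL TLS, HTTP/2 connection-pool limits, and LEAST_REQUEST load balancing.
func defaultTrafficPolicy() *v1alpha3.TrafficPolicy {
	return &v1alpha3.TrafficPolicy{
		Tls: &v1alpha3.ClientTLSSettings{
			Mode: v1alpha3.ClientTLSSettings_ISTIO_MUTUAL,
		},
		ConnectionPool: &v1alpha3.ConnectionPoolSettings{
			Http: &v1alpha3.ConnectionPoolSettings_HTTPSettings{
				Http2MaxRequests:         DefaultHTTP2MaxRequests,
				MaxRequestsPerConnection: DefaultMaxRequestsPerConnection,
			},
		},
		LoadBalancer: &v1alpha3.LoadBalancerSettings{
			LbPolicy: &v1alpha3.LoadBalancerSettings_Simple{
				Simple: v1alpha3.LoadBalancerSettings_LEAST_REQUEST,
			},
		},
	}
}

func main() {
	fmt.Println(defaultTrafficPolicy())
}
```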
@@ -310,25 +325,22 @@ func IgnoreIstioResource(exportTo []string, annotations map[string]string, names
}

func handleDestinationRuleEvent(ctx context.Context, obj *v1alpha3.DestinationRule, dh *DestinationRuleHandler, event common.Event, resourceType common.ResourceType) {
//nolint
destinationRule := obj.Spec

clusterId := dh.ClusterID

syncNamespace := common.GetSyncNamespace()

r := dh.RemoteRegistry

dependentClusters := r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host).Copy()

var (
//nolint
destinationRule = obj.Spec
clusterId = dh.ClusterID
syncNamespace = common.GetSyncNamespace()
r = dh.RemoteRegistry
dependentClusters = r.AdmiralCache.CnameDependentClusterCache.Get(destinationRule.Host).Copy()
allDependentClusters = make(map[string]string)
)
if len(dependentClusters) > 0 {
log.Infof(LogFormat, "Event", "DestinationRule", obj.Name, clusterId, "Processing")
util.MapCopy(allDependentClusters, dependentClusters)
allDependentClusters[clusterId] = clusterId
for _, dependentCluster := range allDependentClusters {
rc := r.GetRemoteController(dependentCluster)
if event == common.Delete {

err := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Delete(ctx, obj.Name, v12.DeleteOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
@@ -340,9 +352,7 @@ func handleDestinationRuleEvent(ctx context.Context, obj *v1alpha3.DestinationRu
log.Infof(LogFormat, "Delete", "DestinationRule", obj.Name, clusterId, "Success")
}
} else {

exist, _ := rc.DestinationRuleController.IstioClient.NetworkingV1alpha3().DestinationRules(syncNamespace).Get(ctx, obj.Name, v12.GetOptions{})

//copy destination rule only to other clusters
if dependentCluster != clusterId {
addUpdateDestinationRule(ctx, obj, exist, syncNamespace, rc)
@@ -378,19 +388,17 @@ func handleDestinationRuleEvent(ctx context.Context, obj *v1alpha3.DestinationRu
}
}

func handleVirtualServiceEvent(ctx context.Context, obj *v1alpha3.VirtualService, vh *VirtualServiceHandler, event common.Event, resourceType common.ResourceType) error {

func handleVirtualServiceEvent(
ctx context.Context, obj *v1alpha3.VirtualService, vh *VirtualServiceHandler,
event common.Event, resourceType common.ResourceType) error {
var (
//nolint
virtualService = obj.Spec
clusterId = vh.ClusterID
r = vh.RemoteRegistry
syncNamespace = common.GetSyncNamespace()
)
log.Infof(LogFormat, "Event", resourceType, obj.Name, vh.ClusterID, "Received event")

//nolint
virtualService := obj.Spec

clusterId := vh.ClusterID

r := vh.RemoteRegistry

syncNamespace := common.GetSyncNamespace()

if len(virtualService.Hosts) > 1 {
log.Errorf(LogFormat, "Event", resourceType, obj.Name, clusterId, "Skipping as multiple hosts not supported for virtual service namespace="+obj.Namespace)
return nil
@@ -420,7 +428,6 @@ func handleVirtualServiceEvent(ctx context.Context, obj *v1alpha3.VirtualService
if clusterId != dependentCluster {
log.Infof(LogFormat, "Event", "VirtualService", obj.Name, clusterId, "Processing")
if event == common.Delete {
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, clusterId, "Success")
err := rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(syncNamespace).Delete(ctx, obj.Name, v12.DeleteOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
@@ -432,9 +439,7 @@ func handleVirtualServiceEvent(ctx context.Context, obj *v1alpha3.VirtualService
log.Infof(LogFormat, "Delete", "VirtualService", obj.Name, clusterId, "Success")
}
} else {

exist, _ := rc.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(syncNamespace).Get(ctx, obj.Name, v12.GetOptions{})

//change destination host for all http routes <service_name>.<ns>. to same as host on the virtual service
for _, httpRoute := range virtualService.Http {
for _, destination := range httpRoute.Route {
@@ -452,7 +457,6 @@ func handleVirtualServiceEvent(ctx context.Context, obj *v1alpha3.VirtualService
}
}
}

addUpdateVirtualService(ctx, obj, exist, syncNamespace, rc)
}
}
@@ -489,8 +493,11 @@ func handleVirtualServiceEvent(ctx context.Context, obj *v1alpha3.VirtualService
}

func addUpdateVirtualService(ctx context.Context, obj *v1alpha3.VirtualService, exist *v1alpha3.VirtualService, namespace string, rc *RemoteController) {
var err error
var op string
var (
err error
op string
)

if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
@@ -516,10 +523,30 @@ func addUpdateVirtualService(ctx context.Context, obj *v1alpha3.VirtualService,
}
}

func validateAndProcessServiceEntryEndpoints(obj *v1alpha3.ServiceEntry) bool {
var areEndpointsValid = true

temp := make([]*v1alpha32.WorkloadEntry, 0)
for _, endpoint := range obj.Spec.Endpoints {
if endpoint.Address == "dummy.admiral.global" {
areEndpointsValid = false
} else {
temp = append(temp, endpoint)
}
}
obj.Spec.Endpoints = temp
log.Infof("type=ServiceEntry, name=%s, endpointsValid=%v, numberOfValidEndpoints=%d", obj.Name, areEndpointsValid, len(obj.Spec.Endpoints))

return areEndpointsValid
}

func addUpdateServiceEntry(ctx context.Context, obj *v1alpha3.ServiceEntry, exist *v1alpha3.ServiceEntry, namespace string, rc *RemoteController) {
var err error
var op, diff string
var skipUpdate bool
var (
err error
op, diff string
skipUpdate bool
)

if obj.Annotations == nil {
obj.Annotations = map[string]string{}
}
@@ -528,9 +555,6 @@ func addUpdateServiceEntry(ctx context.Context, obj *v1alpha3.ServiceEntry, exis
areEndpointsValid := validateAndProcessServiceEntryEndpoints(obj)

if exist == nil || exist.Spec.Hosts == nil {
obj.Namespace = namespace
obj.ResourceVersion = ""
_, err = rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries(namespace).Create(ctx, obj, v12.CreateOptions{})
op = "Add"
//se will be created if endpoints are valid, in case they are not valid se will be created with just valid endpoints
if len(obj.Spec.Endpoints) > 0 {
@@ -737,7 +761,10 @@ func getDependentClusters(dependents map[string]string, identityClusterCache *co
}

func copyEndpoint(e *v1alpha32.WorkloadEntry) *v1alpha32.WorkloadEntry {
labels := make(map[string]string)
var (
labels = make(map[string]string)
ports = make(map[string]uint32)
)
util.MapCopy(labels, e.Labels)
util.MapCopy(ports, e.Ports)
return &v1alpha32.WorkloadEntry{Address: e.Address, Ports: ports, Locality: e.Locality, Labels: labels}
@@ -747,7 +774,6 @@ func copyEndpoint(e *v1alpha32.WorkloadEntry) *v1alpha32.WorkloadEntry {
// 1. Canary strategy - which can use a virtual service to manage the weights associated with a stable and canary service. Admiral-created endpoints in service entries will use the weights assigned in the Virtual Service
// 2. Blue green strategy - this contains 2 service instances in a namespace, an active service and a preview service. Admiral will use the respective service to create active and preview endpoints
func getServiceForRollout(ctx context.Context, rc *RemoteController, rollout *argo.Rollout) map[string]*WeightedService {

if rollout == nil {
return nil
}
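
The handler.go diff above also introduces validateAndProcessServiceEntryEndpoints, which strips Admiral's placeholder endpoint address (dummy.admiral.global) from a service entry and reports whether all endpoints were valid. A standalone sketch of that filtering behavior is shown below, using the Istio client-go and API types directly so it can run outside Admiral; the service entry contents are made up for illustration.

```go
package main

import (
	"fmt"

	apiNetworkingV1Alpha3 "istio.io/api/networking/v1alpha3"
	clientNetworkingV1Alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
)

// filterPlaceholderEndpoints mirrors validateAndProcessServiceEntryEndpoints:
// it drops Admiral's placeholder endpoint and reports whether every endpoint was valid.
func filterPlaceholderEndpoints(se *clientNetworkingV1Alpha3.ServiceEntry) bool {
	valid := true
	kept := make([]*apiNetworkingV1Alpha3.WorkloadEntry, 0, len(se.Spec.Endpoints))
	for _, ep := range se.Spec.Endpoints {
		if ep.Address == "dummy.admiral.global" {
			valid = false
			continue
		}
		kept = append(kept, ep)
	}
	se.Spec.Endpoints = kept
	return valid
}

func main() {
	se := &clientNetworkingV1Alpha3.ServiceEntry{}
	se.Name = "example-se"
	se.Spec.Hosts = []string{"dev.example.global"}
	se.Spec.Endpoints = []*apiNetworkingV1Alpha3.WorkloadEntry{
		{Address: "dummy.admiral.global"},
		{Address: "east.example.com", Locality: "us-east-2"},
	}
	valid := filterPlaceholderEndpoints(se)
	// Prints: valid=false remaining=1 — the placeholder is removed, the real endpoint kept.
	fmt.Printf("valid=%v remaining=%d\n", valid, len(se.Spec.Endpoints))
}
```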
75 changes: 42 additions & 33 deletions admiral/pkg/clusters/handler_test.go
@@ -184,8 +184,23 @@ func TestGetDestinationRule(t *testing.T) {
Interval: &duration.Duration{Seconds: 60},
MaxEjectionPercent: 100,
}
mTLS := &v1alpha3.TrafficPolicy{Tls: &v1alpha3.ClientTLSSettings{Mode: v1alpha3.ClientTLSSettings_ISTIO_MUTUAL}, OutlierDetection: outlierDetection}

mTLS := &v1alpha3.TrafficPolicy{
Tls: &v1alpha3.ClientTLSSettings{
Mode: v1alpha3.ClientTLSSettings_ISTIO_MUTUAL,
},
OutlierDetection: outlierDetection,
ConnectionPool: &v1alpha3.ConnectionPoolSettings{
Http: &v1alpha3.ConnectionPoolSettings_HTTPSettings{
Http2MaxRequests: DefaultHTTP2MaxRequests,
MaxRequestsPerConnection: DefaultMaxRequestsPerConnection,
},
},
LoadBalancer: &v1alpha3.LoadBalancerSettings{
LbPolicy: &v1alpha3.LoadBalancerSettings_Simple{
Simple: v1alpha3.LoadBalancerSettings_LEAST_REQUEST,
},
},
}
se := &v1alpha3.ServiceEntry{Hosts: []string{"qa.myservice.global"}, Endpoints: []*v1alpha3.WorkloadEntry{
{Address: "east.com", Locality: "us-east-2"}, {Address: "west.com", Locality: "us-west-2"},
}}
@@ -583,17 +598,14 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
handlerEmptyClient := VirtualServiceHandler{
RemoteRegistry: rr1,
}
ctx := context.Background()
fullFakeIstioClient := istiofake.NewSimpleClientset()
fullFakeIstioClient.NetworkingV1alpha3().VirtualServices("ns").Create(ctx, &v1alpha32.VirtualService{
ObjectMeta: v12.ObjectMeta{
ObjectMeta: metaV1.ObjectMeta{
Name: "vs-name",
},
Spec: v1alpha3.VirtualService{
Hosts: []string{"e2e.blah.global"},
},
}, v12.CreateOptions{})
rr2 := NewRemoteRegistry(nil, common.AdmiralParams{})
}, metaV1.CreateOptions{})
rr2.AdmiralCache = &AdmiralCache{
CnameDependentClusterCache: goodCnameCache,
SeClusterCache: common.NewMapOfMaps(),
@@ -684,7 +696,6 @@ func TestHandleVirtualServiceEvent(t *testing.T) {
//Run the test for every provided case
for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {

err := handleVirtualServiceEvent(ctx, c.vs, c.handler, c.event, common.VirtualService)
if err != c.expectedError {
t.Fatalf("Error mismatch, expected %v but got %v", c.expectedError, err)
@@ -861,9 +872,9 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
},
}
ctx := context.Background()
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(ctx, virtualService, v12.CreateOptions{})
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(ctx, vsMutipleRoutesWithMatch, v12.CreateOptions{})
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(ctx, vsMutipleRoutesWithZeroWeight, v12.CreateOptions{})
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(ctx, virtualService, metaV1.CreateOptions{})
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(ctx, vsMutipleRoutesWithMatch, metaV1.CreateOptions{})
rcTemp.VirtualServiceController.IstioClient.NetworkingV1alpha3().VirtualServices(Namespace).Create(ctx, vsMutipleRoutesWithZeroWeight, metaV1.CreateOptions{})

canaryRollout := argo.Rollout{
Spec: argo.RolloutSpec{Template: coreV1.PodTemplateSpec{
@@ -927,7 +938,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: &argo.IstioVirtualService{Name: VS_NAME_1},
VirtualService: &argo.IstioVirtualService{Name: vsName1},
},
},
},
@@ -946,7 +957,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: &argo.IstioVirtualService{Name: VS_NAME_2, Routes: []string{VS_ROUTE_PRIMARY}},
VirtualService: &argo.IstioVirtualService{Name: vsName2, Routes: []string{vsRoutePrimary}},
},
},
},
@@ -965,7 +976,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: &argo.IstioVirtualService{Name: VS_NAME_2, Routes: []string{"random"}},
VirtualService: &argo.IstioVirtualService{Name: vsName2, Routes: []string{"random"}},
},
},
},
@@ -984,7 +995,7 @@ func TestGetServiceForRolloutCanary(t *testing.T) {
CanaryService: CanaryServiceName,
TrafficRouting: &argo.RolloutTrafficRouting{
Istio: &argo.IstioTrafficRouting{
VirtualService: &argo.IstioVirtualService{Name: VS_NAME_4},
VirtualService: &argo.IstioVirtualService{Name: vsName4},
},
},
},
@@ -1459,31 +1470,31 @@ func TestSkipDestructiveUpdate(t *testing.T) {
}

newSeTwoEndpoints := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
ObjectMeta: metaV1.ObjectMeta{Name: "se1", Namespace: "random"},
//nolint
Spec: twoEndpointSe,
}

newSeTwoEndpointsUpdated := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
ObjectMeta: metaV1.ObjectMeta{Name: "se1", Namespace: "random"},
//nolint
Spec: twoEndpointSeUpdated,
}

newSeOneEndpoint := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
ObjectMeta: metaV1.ObjectMeta{Name: "se1", Namespace: "random"},
//nolint
Spec: oneEndpointSe,
}

oldSeTwoEndpoints := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
ObjectMeta: metaV1.ObjectMeta{Name: "se1", Namespace: "random"},
//nolint
Spec: twoEndpointSe,
}

oldSeOneEndpoint := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "random"},
ObjectMeta: metaV1.ObjectMeta{Name: "se1", Namespace: "random"},
//nolint
Spec: oneEndpointSe,
}
@@ -1574,15 +1585,13 @@ func TestSkipDestructiveUpdate(t *testing.T) {
}

func TestAddUpdateServiceEntry(t *testing.T) {

ctx := context.Background()

fakeIstioClient := istiofake.NewSimpleClientset()

seCtrl := &istio.ServiceEntryController{
IstioClient: fakeIstioClient,
}

var (
ctx = context.Background()
fakeIstioClient = istioFake.NewSimpleClientset()
seCtrl = &istio.ServiceEntryController{
IstioClient: fakeIstioClient,
}
)
twoEndpointSe := v1alpha3.ServiceEntry{
Hosts: []string{"e2e.my-first-service.mesh"},
Addresses: []string{"240.10.1.1"},
@@ -1631,18 +1640,18 @@ func TestAddUpdateServiceEntry(t *testing.T) {
}

newSeOneEndpoint := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se1", Namespace: "namespace"},
ObjectMeta: metaV1.ObjectMeta{Name: "se1", Namespace: "namespace"},
//nolint
Spec: oneEndpointSe,
}

oldSeTwoEndpoints := &v1alpha32.ServiceEntry{
ObjectMeta: v12.ObjectMeta{Name: "se2", Namespace: "namespace"},
ObjectMeta: metaV1.ObjectMeta{Name: "se2", Namespace: "namespace"},
//nolint
Spec: twoEndpointSe,
}

_, err := seCtrl.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Create(ctx, oldSeTwoEndpoints, v12.CreateOptions{})
_, err := seCtrl.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Create(ctx, oldSeTwoEndpoints, metaV1.CreateOptions{})
if err != nil {
t.Error(err)
}
@@ -1701,7 +1710,7 @@ func TestAddUpdateServiceEntry(t *testing.T) {
addUpdateServiceEntry(ctx, c.newSe, c.oldSe, "namespace", c.rc)
if c.skipDestructive {
//verify the update did not go through
se, err := c.rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Get(ctx, c.oldSe.Name, v12.GetOptions{})
se, err := c.rc.ServiceEntryController.IstioClient.NetworkingV1alpha3().ServiceEntries("namespace").Get(ctx, c.oldSe.Name, metaV1.GetOptions{})
if err != nil {
t.Error(err)
}
34 changes: 19 additions & 15 deletions admiral/pkg/clusters/serviceentry.go
@@ -35,7 +35,7 @@ type SeDrTuple struct {
DestinationRule *networking.DestinationRule
}

func createServiceEntry(ctx context.Context, event admiral.EventType, rc *RemoteController, admiralCache *AdmiralCache,
func createServiceEntryForDeployment(ctx context.Context, event admiral.EventType, rc *RemoteController, admiralCache *AdmiralCache,
meshPorts map[string]uint32, destDeployment *k8sAppsV1.Deployment, serviceEntries map[string]*networking.ServiceEntry) *networking.ServiceEntry {

workloadIdentityKey := common.GetWorkloadIdentifier()
@@ -54,8 +54,8 @@ func createServiceEntry(ctx context.Context, event admiral.EventType, rc *Remote
}

func modifyServiceEntryForNewServiceOrPod(
ctx context.Context, event admiral.EventType, env string, sourceIdentity string,
remoteRegistry *RemoteRegistry) map[string]*networking.ServiceEntry {
ctx context.Context, event admiral.EventType, env string,
sourceIdentity string, remoteRegistry *RemoteRegistry) map[string]*networking.ServiceEntry {
defer util.LogElapsedTime("modifyServiceEntryForNewServiceOrPod", sourceIdentity, env, "")()
if CurrentAdmiralState.ReadOnly {
log.Infof(LogFormat, event, env, sourceIdentity, "", "Processing skipped as Admiral is in Read-only mode")
@@ -74,15 +74,15 @@ func modifyServiceEntryForNewServiceOrPod(
rollout *argo.Rollout
deployment *k8sAppsV1.Deployment
gtps = make(map[string][]*v1.GlobalTrafficPolicy)
start = time.Now()
gtpKey = common.ConstructGtpKey(env, sourceIdentity)
clusters = remoteRegistry.GetClusterIds()
cnames = make(map[string]string)
sourceServices = make(map[string]*k8sV1.Service)
sourceWeightedServices = make(map[string]map[string]*WeightedService)
sourceDeployments = make(map[string]*k8sAppsV1.Deployment)
sourceRollouts = make(map[string]*argo.Rollout)
serviceEntries = make(map[string]*networking.ServiceEntry)
cnames = make(map[string]string)
start = time.Now()
gtpKey = common.ConstructGtpKey(env, sourceIdentity)
clusters = remoteRegistry.GetClusterIds()
)

for _, clusterId := range clusters {
@@ -97,12 +97,10 @@ func modifyServiceEntryForNewServiceOrPod(
if rc.RolloutController != nil {
rollout = rc.RolloutController.Cache.Get(sourceIdentity, env)
}

if deployment == nil && rollout == nil {
log.Infof("Neither deployment nor rollouts found for identity=%s in env=%s namespace=%s", sourceIdentity, env, namespace)
continue
}

if deployment != nil {
if isAnExcludedAsset(common.GetDeploymentGlobalIdentifier(deployment), remoteRegistry.ExcludeAssetList) {
log.Infof(LogFormat, event, env, sourceIdentity, clusterId, "Processing skipped as asset is in the exclude list")
@@ -117,7 +115,7 @@ func modifyServiceEntryForNewServiceOrPod(
localMeshPorts := GetMeshPorts(rc.ClusterID, serviceInstance, deployment)
cname = common.GetCname(deployment, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
sourceDeployments[rc.ClusterID] = deployment
createServiceEntry(ctx, event, rc, remoteRegistry.AdmiralCache, localMeshPorts, deployment, serviceEntries)
createServiceEntryForDeployment(ctx, event, rc, remoteRegistry.AdmiralCache, localMeshPorts, deployment, serviceEntries)
} else if rollout != nil {
if isAnExcludedAsset(common.GetRolloutGlobalIdentifier(rollout), remoteRegistry.ExcludeAssetList) {
log.Infof(LogFormat, event, env, sourceIdentity, clusterId, "Processing skipped as asset is in the exclude list")
@@ -140,6 +138,8 @@ func modifyServiceEntryForNewServiceOrPod(
cnames[cname] = "1"
sourceRollouts[rc.ClusterID] = rollout
createServiceEntryForRollout(ctx, event, rc, remoteRegistry.AdmiralCache, localMeshPorts, rollout, serviceEntries)
} else {
continue
}

gtpsInNamespace := rc.GlobalTraffic.Cache.Get(gtpKey, namespace)
@@ -380,7 +380,6 @@ func modifySidecarForLocalClusterCommunication(ctx context.Context, sidecarNames
if err != nil {
return
}

if sidecar == nil || (sidecar.Spec.Egress == nil) {
return
}
@@ -417,7 +416,6 @@ func modifySidecarForLocalClusterCommunication(ctx context.Context, sidecarNames
func addUpdateSidecar(ctx context.Context, obj *v1alpha3.Sidecar, exist *v1alpha3.Sidecar, namespace string, rc *RemoteController) {
var err error
_, err = rc.SidecarController.IstioClient.NetworkingV1alpha3().Sidecars(namespace).Update(ctx, obj, v12.UpdateOptions{})

if err != nil {
log.Infof(LogErrFormat, "Update", "Sidecar", obj.Name, rc.ClusterID, err)
} else {
@@ -435,7 +433,6 @@ func copySidecar(sidecar *v1alpha3.Sidecar) *v1alpha3.Sidecar {

//AddServiceEntriesWithDr will create the default service entries and also additional ones specified in GTP
func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClusters map[string]string, serviceEntries map[string]*networking.ServiceEntry) {

cache := rr.AdmiralCache
syncNamespace := common.GetSyncNamespace()
for _, se := range serviceEntries {
@@ -475,8 +472,16 @@ func AddServiceEntriesWithDr(ctx context.Context, rr *RemoteRegistry, sourceClus
log.Infof(LogFormat, "Get (error)", "old DestinationRule", seDr.DrName, sourceCluster, err)
oldDestinationRule = nil
}
var deleteOldServiceEntry = false
if oldServiceEntry != nil {
areEndpointsValid := validateAndProcessServiceEntryEndpoints(oldServiceEntry)
if !areEndpointsValid && len(oldServiceEntry.Spec.Endpoints) == 0 {
deleteOldServiceEntry = true
}
}

if len(seDr.ServiceEntry.Endpoints) == 0 {
//clean service entry in case no endpoints are configured or if all the endpoints are invalid
if (len(seDr.ServiceEntry.Endpoints) == 0) || deleteOldServiceEntry {
deleteServiceEntry(ctx, oldServiceEntry, syncNamespace, rc)
cache.SeClusterCache.Delete(seDr.ServiceEntry.Hosts[0])
// after deleting the service entry, destination rule also need to be deleted if the service entry host no longer exists
@@ -737,7 +742,6 @@ func getUniqueAddress(ctx context.Context, admiralCache *AdmiralCache, globalFqd
address, needsCacheUpdate, err = GetLocalAddressForSe(
ctx, getIstioResourceName(globalFqdn, "-se"),
admiralCache.ServiceEntryAddressStore, admiralCache.ConfigMapController)

if err != nil {
log.Errorf("Error getting local address for Service Entry. Err: %v", err)
break
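
The exclusion check itself, isAnExcludedAsset, is called in the hunks above with the workload's global identifier and remoteRegistry.ExcludeAssetList, but its body is not part of this commit. Purely to illustrate the feature, here is a minimal sketch of what such a lookup could look like, assuming a case-insensitive match against the list supplied via the command-line parameter; the actual implementation in this PR may differ, and the asset aliases are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// isAnExcludedAsset (sketch only): reports whether the given asset alias
// appears in the exclude list supplied on the command line.
func isAnExcludedAsset(assetAlias string, excludeAssetList []string) bool {
	for _, excluded := range excludeAssetList {
		if strings.EqualFold(strings.TrimSpace(excluded), assetAlias) {
			return true
		}
	}
	return false
}

func main() {
	excludeList := []string{"platform.example.excluded-asset"} // hypothetical alias
	fmt.Println(isAnExcludedAsset("platform.example.excluded-asset", excludeList)) // true
	fmt.Println(isAnExcludedAsset("platform.example.other-asset", excludeList))    // false
}
```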
35 changes: 27 additions & 8 deletions admiral/pkg/clusters/serviceentry_test.go
@@ -405,17 +405,30 @@ func TestAddServiceEntriesWithDr(t *testing.T) {
Endpoints: []*istioNetworkingV1Alpha3.WorkloadEntry{},
}

dummyEndpointSe := istionetworkingv1alpha3.ServiceEntry{
Hosts: []string{"dev.dummy.global"},
Endpoints: []*istionetworkingv1alpha3.WorkloadEntry{
{Address: "dummy.admiral.global", Ports: map[string]uint32{"https": 80}, Labels: map[string]string{}, Network: "mesh1", Locality: "us-west", Weight: 100},
},
}

seConfig := v1alpha3.ServiceEntry{
//nolint
Spec: se,
}
seConfig.Name = "se1"
seConfig.Namespace = "admiral-sync"

dummySeConfig := v1alpha3.ServiceEntry{
//nolint
Spec: dummyEndpointSe,
}
dummySeConfig.Name = "dummySe"
dummySeConfig.Namespace = "admiral-sync"
ctx := context.Background()

fakeIstioClient := istiofake.NewSimpleClientset()
fakeIstioClient.NetworkingV1alpha3().ServiceEntries("admiral-sync").Create(ctx, &seConfig, v12.CreateOptions{})
fakeIstioClient.NetworkingV1alpha3().ServiceEntries("admiral-sync").Create(ctx, &dummySeConfig, v12.CreateOptions{})

rc := &RemoteController{
ServiceEntryController: &istio.ServiceEntryController{
IstioClient: fakeIstioClient,
@@ -434,6 +447,7 @@ func TestAddServiceEntriesWithDr(t *testing.T) {
rr.AdmiralCache = &admiralCache
AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, map[string]*istioNetworkingV1Alpha3.ServiceEntry{"se1": &se})
AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, map[string]*istioNetworkingV1Alpha3.ServiceEntry{"se1": &emptyEndpointSe})
AddServiceEntriesWithDr(ctx, rr, map[string]string{"cl1": "cl1"}, map[string]*istioNetworkingV1Alpha3.ServiceEntry{"dummySe": &dummyEndpointSe})
}

func TestCreateSeAndDrSetFromGtp(t *testing.T) {
@@ -825,6 +839,10 @@ func TestModifyNonExistingSidecarForLocalClusterCommunication(t *testing.T) {
ctx := context.Background()

modifySidecarForLocalClusterCommunication(ctx, "test-sidecar-namespace", sidecarEgressMap, remoteController)
sidecarObj, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{})
if err == nil {
t.Errorf("expected 404 not found error but got nil")
}

sidecarObj, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Get(ctx, common.GetWorkloadSidecarName(), v12.GetOptions{})
if err == nil {
@@ -861,8 +879,13 @@ func TestModifyExistingSidecarForLocalClusterCommunication(t *testing.T) {
if err != nil {
t.Error(err)
}
if createdSidecar != nil {

ctx := context.Background()
createdSidecar, err := sidecarController.IstioClient.NetworkingV1alpha3().Sidecars("test-sidecar-namespace").Create(ctx, existingSidecarObj, v12.CreateOptions{})
if err != nil {
t.Error(err)
}
if createdSidecar != nil {
sidecarEgressMap := make(map[string]common.SidecarEgress)
sidecarEgressMap["test-dependency-namespace"] = common.SidecarEgress{Namespace: "test-dependency-namespace", FQDN: "test-local-fqdn", CNAMEs: map[string]string{"test.myservice.global": "1"}}
time.Sleep(5 * time.Second)
@@ -1139,8 +1162,7 @@ func TestCreateServiceEntry(t *testing.T) {
//Run the test for every provided case
for _, c := range deploymentSeCreationTestCases {
t.Run(c.name, func(t *testing.T) {
var createdSE *istioNetworkingV1Alpha3.ServiceEntry
createdSE = createServiceEntry(ctx, c.action, c.rc, &c.admiralCache, c.meshPorts, &c.deployment, c.serviceEntries)
createdSE := createServiceEntryForDeployment(ctx, c.action, c.rc, &c.admiralCache, c.meshPorts, &c.deployment, c.serviceEntries)
if !reflect.DeepEqual(createdSE, c.expectedResult) {
t.Errorf("Test %s failed, expected: %v got %v", c.name, c.expectedResult, createdSE)
}
@@ -1536,7 +1558,6 @@ func TestUpdateEndpointsForBlueGreen(t *testing.T) {
rollout = &argo.Rollout{}
meshPorts = map[string]uint32{"http": 8080}
)

rollout.Spec.Strategy = argo.RolloutStrategy{
BlueGreen: &argo.BlueGreenStrategy{
ActiveService: activeService,
@@ -1545,7 +1566,6 @@ func TestUpdateEndpointsForBlueGreen(t *testing.T) {
}
rollout.Spec.Template.Annotations = map[string]string{}
rollout.Spec.Template.Annotations[common.SidecarEnabledPorts] = "8080"

endpoint := &istioNetworkingV1Alpha3.WorkloadEntry{
Labels: map[string]string{}, Address: clusterIngress1, Ports: map[string]uint32{"http": 15443},
}
@@ -1639,7 +1659,6 @@ func TestUpdateEndpointsForWeightedServices(t *testing.T) {
{Address: stableService + common.Sep + namespace + common.DotLocalDomainSuffix, Weight: 100, Ports: meshPorts},
}
)

testCases := []struct {
name string
inputServiceEntry *istioNetworkingV1Alpha3.ServiceEntry
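
The new test fixture above seeds the fake Istio client with dummySe, a service entry whose only endpoint is the placeholder address, and then runs AddServiceEntriesWithDr against it. Below is a standalone sketch of the end state that change is meant to produce — the stale entry being removed once no valid endpoints remain. It uses the istio client-go fake clientset, does not call Admiral code, and only illustrates the expected cleanup.

```go
package main

import (
	"context"
	"fmt"

	apiNetworkingV1Alpha3 "istio.io/api/networking/v1alpha3"
	clientNetworkingV1Alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
	istioFake "istio.io/client-go/pkg/clientset/versioned/fake"
	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
	metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	ctx := context.Background()
	client := istioFake.NewSimpleClientset()

	// Seed a service entry whose only endpoint is the placeholder address,
	// mirroring the dummySe fixture added in the test above.
	dummySe := &clientNetworkingV1Alpha3.ServiceEntry{}
	dummySe.Name = "dummySe"
	dummySe.Namespace = "admiral-sync"
	dummySe.Spec.Hosts = []string{"dev.dummy.global"}
	dummySe.Spec.Endpoints = []*apiNetworkingV1Alpha3.WorkloadEntry{
		{Address: "dummy.admiral.global", Ports: map[string]uint32{"https": 80}},
	}
	if _, err := client.NetworkingV1alpha3().ServiceEntries("admiral-sync").Create(ctx, dummySe, metaV1.CreateOptions{}); err != nil {
		panic(err)
	}

	// After the placeholder endpoint is filtered out nothing valid remains,
	// so the expected cleanup is a delete of the stale service entry.
	if err := client.NetworkingV1alpha3().ServiceEntries("admiral-sync").Delete(ctx, "dummySe", metaV1.DeleteOptions{}); err != nil {
		panic(err)
	}

	_, err := client.NetworkingV1alpha3().ServiceEntries("admiral-sync").Get(ctx, "dummySe", metaV1.GetOptions{})
	fmt.Println("stale entry removed:", k8sErrors.IsNotFound(err)) // stale entry removed: true
}
```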
2 changes: 0 additions & 2 deletions admiral/pkg/clusters/types.go
@@ -306,7 +306,6 @@ func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy)
return
}
r.processRoutingPolicy(ctx, dependents, obj, admiral.Add)

log.Infof(LogFormat, admiral.Add, "routingpolicy", obj.Name, "", "finished processing routing policy")
} else {
log.Infof(LogFormat, admiral.Add, "routingpolicy", obj.Name, "", "routingpolicy disabled")
@@ -348,7 +347,6 @@ func (r RoutingPolicyHandler) Updated(ctx context.Context, obj *v1.RoutingPolicy
return
}
r.processRoutingPolicy(ctx, dependents, obj, admiral.Update)

log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "updated routing policy")
} else {
log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "routingpolicy disabled")
You are viewing a condensed version of this merge commit.