Skip to content

Commit

Permalink
MESH-5878: Admiral does not create envoy filter on client onboardings
Browse files Browse the repository at this point in the history
- Add DeltaUpdate and Delete api
- Increase coverage

TODO: Implement DeletaUpdate() correctly
Signed-off-by: Adil Fulara <[email protected]>
  • Loading branch information
Adil Fulara committed Dec 12, 2024
1 parent 51396fc commit 5a4ddd5
Show file tree
Hide file tree
Showing 3 changed files with 520 additions and 135 deletions.
3 changes: 3 additions & 0 deletions admiral/pkg/clusters/dependency_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ func (dh *DependencyHandler) HandleDependencyRecord(ctx context.Context, obj *v1
return handleDepRecordErrors
}

// process routing policies
_ = dh.RoutingPolicyProcessor.DeltaUpdate(ctx, eventType, obj)

remoteRegistry.AdmiralCache.SourceToDestinations.put(obj)
return handleDepRecordErrors
}
Expand Down
132 changes: 80 additions & 52 deletions admiral/pkg/clusters/routingpolicy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func NewRoutingPolicyHandler(rr *RemoteRegistry, cId string, rpProcessor Routing

type RoutingPolicyProcessor interface {
ProcessAddOrUpdate(ctx context.Context, eventType admiral.EventType, routingPolicy *v1.RoutingPolicy, dependents map[string]string) error
DeltaUpdate(ctx context.Context, eventType admiral.EventType, dependency *v1.Dependency) error
Delete(ctx context.Context, eventType admiral.EventType, routingPolicy *v1.RoutingPolicy) error
}

type RoutingPolicyService struct {
Expand All @@ -51,6 +53,81 @@ func (r *RoutingPolicyService) ProcessAddOrUpdate(ctx context.Context, eventType
return err
}

func (r *RoutingPolicyService) DeltaUpdate(ctx context.Context, eventType admiral.EventType, dependency *v1.Dependency) error {
//ns := getEnvoyFilterNamespace()

//sourceClustersMap := r.RemoteRegistry.AdmiralCache.IdentityDependencyCache.Get(dependency.Spec.Source)
//if sourceClustersMap == nil {
// log.Infof(LogFormat, string(eventType), common.DependencyResourceType, dependency.Name, "", fmt.Sprintf("identity: %s, does not have any clusters. Skipping RoutingPolicy delta update", dependency.Spec.Source))
// return nil
//}
getDestinationsToBeProcessed(eventType, dependency, r.RemoteRegistry)

// for each destination, identify the rp.
// for each rp, call ProcessAddOrUpdate to add the envoyfilter

//sourceClusters := make([]*RemoteController, 0)
//for _, sClusterID := range sourceClustersMap.GetKeys() {
// it := r.RemoteRegistry.GetRemoteController(sClusterID)
// if it != nil {
// sourceClusters = append(sourceClusters, it)
// }
//}
//
//for _, rc := range sourceClusters {
// options := metaV1.ListOptions{}
// options.LabelSelector = fmt.Sprintf("%s=%s", common.GetRoutingPolicyLabel(), dependency.Spec.Source)
// rp, err := rc.RoutingPolicyController.CrdClient.AdmiralV1alpha1().RoutingPolicies(ns).List(ctx, options)
// if err != nil {
// log.Errorf(LogFormat, eventType, "dependency", dependency.Spec.Source, rc.ClusterID, err)
// return err
// }
// d := make(map[string]string)
// for _, it := range destinations {
// d[it] = it
// }
// for _, it := range rp.Items {
// err = r.ProcessAddOrUpdate(ctx, eventType, &it, d)
// if err != nil {
// log.Errorf(LogErrFormat, admiral.Update, "routingpolicy", it.Name, "", "failed to process routing policy")
// return err
// }
// }
//}

return nil
}

func (r *RoutingPolicyService) Delete(ctx context.Context, eventType admiral.EventType, routingPolicy *v1.RoutingPolicy) error {
key := routingPolicy.Name + common.GetRoutingPolicyIdentity(routingPolicy) + common.GetRoutingPolicyEnv(routingPolicy)
if r.RemoteRegistry == nil || r.RemoteRegistry.AdmiralCache == nil || r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache == nil {
log.Infof(LogFormat, eventType, "routingpolicy", routingPolicy.Name, "", "skipping delete event as cache is nil")
return nil
}
clusterIdFilterMap := r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Get(key) // RoutingPolicyFilterCache key=rpname+rpidentity+environment of the routingPolicy, value is a map [clusterId -> map [filterName -> filterNameSpace]]
var err error
for _, rc := range r.RemoteRegistry.remoteControllers {
if rc != nil {
if filterMap, ok := clusterIdFilterMap[rc.ClusterID]; ok {
for filter, filterNs := range filterMap {
log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting")
err1 := rc.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters(filterNs).Delete(ctx, filter, metaV1.DeleteOptions{})
if err1 != nil {
log.Errorf(LogErrFormat, eventType, "envoyfilter", filter, rc.ClusterID, err1)
err = common.AppendError(err, err1)
} else {
log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting from cache")
}
}
}
}
}
if err == nil {
r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Delete(key)
}
return err
}

func NewRoutingPolicyProcessor(remoteRegistry *RemoteRegistry) RoutingPolicyProcessor {
return &RoutingPolicyService{RemoteRegistry: remoteRegistry}
}
Expand Down Expand Up @@ -109,7 +186,7 @@ func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy)
if common.ShouldIgnoreResource(obj.ObjectMeta) {
log.Infof("op=%s type=%v name=%v namespace=%s cluster=%s message=%s", "admiralIoIgnoreAnnotationCheck", common.RoutingPolicyResourceType,
obj.Name, obj.Namespace, "", "Value=true")
log.Infof(LogFormat, "success", "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation")
log.Infof(LogFormat, "success", "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation or label")
return nil
}
dependents := getDependents(obj, r)
Expand All @@ -129,25 +206,6 @@ func (r RoutingPolicyHandler) Added(ctx context.Context, obj *v1.RoutingPolicy)
return nil
}

func (r RoutingPolicyHandler) processroutingPolicy(ctx context.Context, dependents map[string]string, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType) error {
var err error
for _, remoteController := range r.RemoteRegistry.remoteControllers {
for _, dependent := range dependents {
// Check if the dependent exists in this remoteCluster. If so, we create an envoyFilter with dependent identity as workload selector
if _, ok := r.RemoteRegistry.AdmiralCache.IdentityClusterCache.Get(dependent).Copy()[remoteController.ClusterID]; ok {
_, err1 := createOrUpdateEnvoyFilter(ctx, remoteController, routingPolicy, eventType, dependent, r.RemoteRegistry.AdmiralCache)
if err1 != nil {
log.Errorf(LogErrFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, err)
err = common.AppendError(err, err1)
} else {
log.Infof(LogFormat, eventType, "routingpolicy ", routingPolicy.Name, remoteController.ClusterID, "created envoyfilters")
}
}
}
}
return err
}

func (r RoutingPolicyHandler) Updated(ctx context.Context, obj *v1.RoutingPolicy) error {
if commonUtil.IsAdmiralReadOnly() {
log.Infof(LogFormat, admiral.Update, "routingpolicy", "", "", "skipping read-only mode")
Expand All @@ -157,7 +215,7 @@ func (r RoutingPolicyHandler) Updated(ctx context.Context, obj *v1.RoutingPolicy
if common.ShouldIgnoreResource(obj.ObjectMeta) {
log.Infof("op=%s type=%v name=%v namespace=%s cluster=%s message=%s", "admiralIoIgnoreAnnotationCheck", common.RoutingPolicyResourceType,
obj.Name, obj.Namespace, "", "Value=true")
log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation")
log.Infof(LogFormat, admiral.Update, "routingpolicy", obj.Name, "", "Ignored the RoutingPolicy because of the annotation or label")
// We need to process this as a delete event.
r.Deleted(ctx, obj)
return nil
Expand Down Expand Up @@ -196,39 +254,9 @@ func getDependents(obj *v1.RoutingPolicy, r RoutingPolicyHandler) map[string]str
Deleted - deletes the envoyFilters for the routingPolicy when delete event received for routing policy
*/
func (r RoutingPolicyHandler) Deleted(ctx context.Context, obj *v1.RoutingPolicy) error {
err := r.deleteEnvoyFilters(ctx, obj, admiral.Delete)
err := r.RoutingPolicyService.Delete(ctx, admiral.Delete, obj)
if err != nil {
log.Infof(LogFormat, admiral.Delete, "routingpolicy", obj.Name, "", "deleted envoy filter for routing policy")
}
return err
}

func (r RoutingPolicyHandler) deleteEnvoyFilters(ctx context.Context, obj *v1.RoutingPolicy, eventType admiral.EventType) error {
key := obj.Name + common.GetRoutingPolicyIdentity(obj) + common.GetRoutingPolicyEnv(obj)
if r.RemoteRegistry == nil || r.RemoteRegistry.AdmiralCache == nil || r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache == nil {
log.Infof(LogFormat, eventType, "routingpolicy", obj.Name, "", "skipping delete event as cache is nil")
return nil
}
clusterIdFilterMap := r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Get(key) // RoutingPolicyFilterCache key=rpname+rpidentity+environment of the routingPolicy, value is a map [clusterId -> map [filterName -> filterNameSpace]]
var err error
for _, rc := range r.RemoteRegistry.remoteControllers {
if rc != nil {
if filterMap, ok := clusterIdFilterMap[rc.ClusterID]; ok {
for filter, filterNs := range filterMap {
log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting")
err1 := rc.RoutingPolicyController.IstioClient.NetworkingV1alpha3().EnvoyFilters(filterNs).Delete(ctx, filter, metaV1.DeleteOptions{})
if err1 != nil {
log.Errorf(LogErrFormat, eventType, "envoyfilter", filter, rc.ClusterID, err1)
err = common.AppendError(err, err1)
} else {
log.Infof(LogFormat, eventType, "envoyfilter", filter, rc.ClusterID, "deleting from cache")
}
}
}
}
}
if err == nil {
r.RemoteRegistry.AdmiralCache.RoutingPolicyFilterCache.Delete(key)
}
return err
}
Loading

0 comments on commit 5a4ddd5

Please sign in to comment.