Skip to content

Commit

Permalink
Handle k8s service events and trigger SE gen
Browse files Browse the repository at this point in the history
  • Loading branch information
aattuluri committed Jul 1, 2022
1 parent 3cd7ac8 commit 7d2098a
Show file tree
Hide file tree
Showing 12 changed files with 236 additions and 189 deletions.
27 changes: 7 additions & 20 deletions admiral/pkg/clusters/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"net"
"reflect"
"sort"
"strings"
"time"

Expand All @@ -25,7 +24,6 @@ import (
)

const (
ROLLOUT_POD_HASH_LABEL string = "rollouts-pod-template-hash"
DefaultBaseEjectionTime int64 = 300
DefaultConsecutiveGatewayErrors uint32 = 50
DefaultInterval int64 = 60
Expand Down Expand Up @@ -730,13 +728,13 @@ func getServiceForDeployment(rc *RemoteController, deployment *k8sAppsV1.Deploym
return nil
}

cachedService := rc.ServiceController.Cache.Get(deployment.Namespace)
cachedServices := rc.ServiceController.Cache.Get(deployment.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
var matchedService *k8sV1.Service
for _, service := range cachedService.Service[deployment.Namespace] {
for _, service := range cachedServices {
var match = true
for lkey, lvalue := range service.Spec.Selector {
value, ok := deployment.Spec.Selector.MatchLabels[lkey]
Expand Down Expand Up @@ -795,9 +793,9 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
if rollout == nil {
return nil
}
cachedService := rc.ServiceController.Cache.Get(rollout.Namespace)
cachedServices := rc.ServiceController.Cache.Get(rollout.Namespace)

if cachedService == nil {
if cachedServices == nil {
return nil
}
rolloutStrategy := rollout.Spec.Strategy
Expand Down Expand Up @@ -875,18 +873,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin

var matchedServices = make(map[string]*WeightedService)

//if we have more than one matching service we will pick the first one, for this to be deterministic we sort services
var servicesInNamespace = cachedService.Service[rollout.Namespace]

servicesOrdered := make([]string, 0, len(servicesInNamespace))
for k := range servicesInNamespace {
servicesOrdered = append(servicesOrdered, k)
}

sort.Strings(servicesOrdered)

for _, s := range servicesOrdered {
var service = servicesInNamespace[s]
for _, service := range cachedServices {
var match = true
//skip services that are not referenced in the rollout
if len(blueGreenActiveService) > 0 && service.ObjectMeta.Name != blueGreenActiveService && service.ObjectMeta.Name != blueGreenPreviewService {
Expand All @@ -897,7 +884,7 @@ func getServiceForRollout(rc *RemoteController, rollout *argo.Rollout) map[strin
for lkey, lvalue := range service.Spec.Selector {
// Rollouts controller adds a dynamic label with name rollouts-pod-template-hash to both active and passive replicasets.
// This dynamic label is not available on the rollout template. Hence ignoring the label with name rollouts-pod-template-hash
if lkey == ROLLOUT_POD_HASH_LABEL {
if lkey == common.ROLLOUT_POD_HASH_LABEL {
continue
}
value, ok := rollout.Spec.Selector.MatchLabels[lkey]
Expand Down
57 changes: 29 additions & 28 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,31 +120,21 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste

var err error

log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
log.Infof("starting global traffic policy controller custerID: %v", clusterID)

rc.GlobalTraffic, err = admiral.NewGlobalTrafficController(clusterID, stop, &GlobalTrafficHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
return fmt.Errorf(" Error with GlobalTrafficController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

log.Infof("starting node controller clusterID: %v", clusterID)
rc.NodeController, err = admiral.NewNodeController(clusterID, stop, &NodeHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig)
Expand All @@ -153,40 +143,51 @@ func (r *RemoteRegistry) createCacheController(clientConfig *rest.Config, cluste
return fmt.Errorf(" Error with NodeController controller init: %v", err)
}

log.Infof("starting service controller clusterID: %v", clusterID)
rc.ServiceController, err = admiral.NewServiceController(clusterID, stop, &ServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with ServiceController controller init: %v", err)
}

log.Infof("starting service entry controller for custerID: %v", clusterID)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.ServiceEntryController, err = istio.NewServiceEntryController(clusterID, stop, &ServiceEntryHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with ServiceEntryController init: %v", err)
}

log.Infof("starting destination rule controller for custerID: %v", clusterID)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.DestinationRuleController, err = istio.NewDestinationRuleController(clusterID, stop, &DestinationRuleHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
}

log.Infof("starting virtual service controller for custerID: %v", clusterID)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.VirtualServiceController, err = istio.NewVirtualServiceController(clusterID, stop, &VirtualServiceHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with VirtualServiceController init: %v", err)
}

rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)
rc.SidecarController, err = istio.NewSidecarController(clusterID, stop, &SidecarHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, 0)

if err != nil {
return fmt.Errorf(" Error with DestinationRuleController init: %v", err)
}

log.Infof("starting deployment controller clusterID: %v", clusterID)
rc.DeploymentController, err = admiral.NewDeploymentController(clusterID, stop, &DeploymentHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with DeploymentController controller init: %v", err)
}

if r.AdmiralCache == nil {
log.Warn("admiral cache was nil!")
} else if r.AdmiralCache.argoRolloutsEnabled {
log.Infof("starting rollout controller clusterID: %v", clusterID)
rc.RolloutController, err = admiral.NewRolloutsController(clusterID, stop, &RolloutHandler{RemoteRegistry: r, ClusterID: clusterID}, clientConfig, resyncPeriod)

if err != nil {
return fmt.Errorf(" Error with Rollout controller init: %v", err)
}
}

r.Lock()
defer r.Unlock()
r.RemoteControllers[clusterID] = &rc
Expand Down
43 changes: 42 additions & 1 deletion admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/secret"
log "github.com/sirupsen/logrus"
k8sAppsV1 "k8s.io/api/apps/v1"
k8sV1 "k8s.io/api/core/v1"
k8s "k8s.io/client-go/kubernetes"
)

Expand Down Expand Up @@ -145,6 +146,47 @@ type ServiceHandler struct {
ClusterID string
}

func (sh *ServiceHandler) Added(obj *k8sV1.Service) {
log.Infof(LogFormat, "Added", "service", obj.Name, sh.ClusterID, "received")
err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID)
if err != nil {
log.Infof(err.Error())
}
}

func (sh *ServiceHandler) Updated(obj *k8sV1.Service) {
log.Infof(LogFormat, "Updated", "service", obj.Name, sh.ClusterID, "received")
err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID)
if err != nil {
log.Infof(err.Error())
}
}

func (sh *ServiceHandler) Deleted(obj *k8sV1.Service) {
log.Infof(LogFormat, "Deleted", "service", obj.Name, sh.ClusterID, "received")
err := HandleEventForService(obj, sh.RemoteRegistry, sh.ClusterID)
if err != nil {
log.Infof(err.Error())
}
}

func HandleEventForService(svc *k8sV1.Service, remoteRegistry *RemoteRegistry, clusterName string) error {
matchingDeployements := remoteRegistry.RemoteControllers[clusterName].DeploymentController.GetDeploymentBySelectorInNamespace(svc.Spec.Selector, svc.Namespace)
for _, deployment := range matchingDeployements {
HandleEventForDeployment(admiral.Update, &deployment, remoteRegistry, clusterName)
}
if common.GetAdmiralParams().ArgoRolloutsEnabled {
rollouts := remoteRegistry.RemoteControllers[clusterName].RolloutController.GetRolloutBySelectorInNamespace(svc.Spec.Selector, svc.Namespace)

if len(rollouts) > 0 {
for _, rollout := range rollouts {
HandleEventForRollout(admiral.Update, &rollout, remoteRegistry, clusterName)
}
}
}
return nil
}

func (dh *DependencyHandler) Added(obj *v1.Dependency) {

log.Infof(LogFormat, "Add", "dependency-record", obj.Name, "", "Received=true namespace="+obj.Namespace)
Expand Down Expand Up @@ -243,7 +285,6 @@ func HandleEventForRollout(event admiral.EventType, obj *argo.Rollout, remoteReg

// helper function to handle add and delete for DeploymentHandler
func HandleEventForDeployment(event admiral.EventType, obj *k8sAppsV1.Deployment, remoteRegistry *RemoteRegistry, clusterName string) {
log.Infof(LogFormat, event, "deployment", obj.Name, clusterName, "Received")

globalIdentifier := common.GetDeploymentGlobalIdentifier(obj)

Expand Down
3 changes: 2 additions & 1 deletion admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,8 @@ func (c *Controller) Run(stopCh <-chan struct{}) {
}

log.Infof("Informer caches synced for controller=%v, current keys=%v", c.name, c.informer.GetStore().ListKeys())
wait.Until(c.runWorker, 5*time.Second, stopCh)
//wait for 30 seconds for the first time (for all caches to sync)
wait.Until(c.runWorker, 30 * time.Second, stopCh)
}

func (c *Controller) runWorker() {
Expand Down
56 changes: 13 additions & 43 deletions admiral/pkg/controller/admiral/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
k8sAppsV1 "k8s.io/api/apps/v1"
k8sAppsinformers "k8s.io/client-go/informers/apps/v1"
"k8s.io/client-go/rest"
"reflect"
"time"

log "github.com/sirupsen/logrus"
Expand Down Expand Up @@ -78,45 +79,6 @@ func (p *deploymentCache) DeleteFromDeploymentClusterCache(key string, deploymen
}
}

func (d *DeploymentController) GetDeployments() ([]*k8sAppsV1.Deployment, error) {

ns := d.K8sClient.CoreV1().Namespaces()

namespaceSidecarInjectionLabelFilter := d.labelSet.NamespaceSidecarInjectionLabel + "=" + d.labelSet.NamespaceSidecarInjectionLabelValue
istioEnabledNs, err := ns.List(meta_v1.ListOptions{LabelSelector: namespaceSidecarInjectionLabelFilter})

if err != nil {
return nil, fmt.Errorf("error getting istio labled namespaces: %v", err)
}

var res []*k8sAppsV1.Deployment

for _, v := range istioEnabledNs.Items {

deployments := d.K8sClient.AppsV1().Deployments(v.Name)
deploymentsList, err := deployments.List(meta_v1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing deployments: %v", err)
}
var admiralDeployments []k8sAppsV1.Deployment
for _, deployment := range deploymentsList.Items {
if !d.shouldIgnoreBasedOnLabels(&deployment) {
admiralDeployments = append(admiralDeployments, deployment)
}
}

if err != nil {
return nil, fmt.Errorf("error getting istio labled namespaces: %v", err)
}

for _, pi := range admiralDeployments {
res = append(res, &pi)
}
}

return res, nil
}

func NewDeploymentController(clusterID string, stopCh <-chan struct{}, handler DeploymentHandler, config *rest.Config, resyncPeriod time.Duration) (*DeploymentController, error) {

deploymentController := DeploymentController{}
Expand Down Expand Up @@ -211,10 +173,10 @@ func (d *DeploymentController) shouldIgnoreBasedOnLabels(deployment *k8sAppsV1.D
return false //labels are fine, we should not ignore
}

func (d *DeploymentController) GetDeploymentByLabel(labelValue string, namespace string) []k8sAppsV1.Deployment {
matchLabel := common.GetGlobalTrafficDeploymentLabel()
func (d *DeploymentController) GetDeploymentBySelectorInNamespace(serviceSelector map[string]string, namespace string) []k8sAppsV1.Deployment {

labelOptions := meta_v1.ListOptions{}
labelOptions.LabelSelector = fmt.Sprintf("%s=%s", matchLabel, labelValue)

matchedDeployments, err := d.K8sClient.AppsV1().Deployments(namespace).List(labelOptions)

if err != nil {
Expand All @@ -226,5 +188,13 @@ func (d *DeploymentController) GetDeploymentByLabel(labelValue string, namespace
return []k8sAppsV1.Deployment{}
}

return matchedDeployments.Items
var filteredDeployments = make([]k8sAppsV1.Deployment, 0)

for _, deployment := range matchedDeployments.Items {
if reflect.DeepEqual(deployment.Spec.Selector.MatchLabels, serviceSelector) {
filteredDeployments = append(filteredDeployments, deployment)
}
}

return filteredDeployments
}
Loading

0 comments on commit 7d2098a

Please sign in to comment.