From 063d070514f5ff9c28199dbe69e668df526afcd1 Mon Sep 17 00:00:00 2001 From: talsabagport Date: Tue, 13 Aug 2024 19:57:58 +0300 Subject: [PATCH 01/13] Refactor for independent resync --- pkg/handlers/controllers.go | 81 +++++---- pkg/jq/parser.go | 6 +- pkg/k8s/controller.go | 343 ++++++++++++++++-------------------- pkg/k8s/controller_test.go | 2 +- pkg/port/cli/entity.go | 2 +- pkg/port/entity/entity.go | 128 ++++++++++++++ pkg/port/mapping/entity.go | 48 ----- pkg/port/models.go | 12 +- 8 files changed, 349 insertions(+), 273 deletions(-) create mode 100644 pkg/port/entity/entity.go delete mode 100644 pkg/port/mapping/entity.go diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index f7ef18a..c2f2790 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -2,11 +2,11 @@ package handlers import ( "context" + "github.com/port-labs/port-k8s-exporter/pkg/port/integration" + "sync" "time" "github.com/port-labs/port-k8s-exporter/pkg/config" - "github.com/port-labs/port-k8s-exporter/pkg/port/integration" - "github.com/port-labs/port-k8s-exporter/pkg/crd" "github.com/port-labs/port-k8s-exporter/pkg/goutils" "github.com/port-labs/port-k8s-exporter/pkg/k8s" @@ -71,56 +71,73 @@ func NewControllersHandler(exporterConfig *port.Config, portConfig *port.Integra func (c *ControllersHandler) Handle() { klog.Info("Starting informers") c.informersFactory.Start(c.stopCh) - klog.Info("Waiting for informers cache sync") + + currentEntitiesSets := make([]map[string]interface{}, 0) + shouldDeleteStaleEntities := true + var syncWg sync.WaitGroup + for _, controller := range c.controllers { + controller := controller + + klog.Infof("Waiting for informer cache to sync for resource '%s'", controller.Resource.Kind) if err := controller.WaitForCacheSync(c.stopCh); err != nil { klog.Fatalf("Error while waiting for informer cache sync: %s", err.Error()) } - } - currentEntitiesSet := make([]map[string]interface{}, 0) - for _, controller := range c.controllers { - controllerEntitiesSet, rawDataExamples, err := controller.GetEntitiesSet() - if err != nil { - klog.Errorf("error getting controller entities set: %s", err.Error()) - } - currentEntitiesSet = append(currentEntitiesSet, controllerEntitiesSet) - if len(rawDataExamples) > 0 { - err = integration.PostIntegrationKindExample(c.portClient, c.stateKey, controller.Resource.Kind, rawDataExamples) - if err != nil { - klog.Warningf("failed to post integration kind example: %s", err.Error()) + go func() { + <-c.stopCh + klog.Info("Shutting down controllers") + controller.Shutdown() + klog.Info("Exporter exiting") + }() + + syncWg.Add(1) + go func() { + defer syncWg.Done() + klog.Infof("Starting full initial resync for resource '%s'", controller.Resource.Kind) + initialSyncResult := controller.RunInitialSync() + klog.Infof("Done full initial resync, starting live events sync for resource '%s'", controller.Resource.Kind) + controller.RunEventsSync(1, c.stopCh) + if initialSyncResult.EntitiesSet != nil { + currentEntitiesSets = append(currentEntitiesSets, initialSyncResult.EntitiesSet) } - } - } - - klog.Info("Deleting stale entities") - c.RunDeleteStaleEntities(currentEntitiesSet) - klog.Info("Starting controllers") - for _, controller := range c.controllers { - controller.Run(1, c.stopCh) + if len(initialSyncResult.RawDataExamples) > 0 { + err := integration.PostIntegrationKindExample(c.portClient, c.stateKey, controller.Resource.Kind, initialSyncResult.RawDataExamples) + if err != nil { + klog.Warningf("failed to post integration kind example: %s", err.Error()) + } + } + shouldDeleteStaleEntities = shouldDeleteStaleEntities && initialSyncResult.ShouldDeleteStaleEntities + }() } + syncWg.Wait() + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() go func() { <-c.stopCh - klog.Info("Shutting down controllers") - for _, controller := range c.controllers { - controller.Shutdown() - } - klog.Info("Exporter exiting") + ctx.Done() }() + + if shouldDeleteStaleEntities { + klog.Info("Deleting stale entities") + c.RunDeleteStaleEntities(ctx, currentEntitiesSets) + klog.Info("Done deleting stale entities") + } else { + klog.Warning("Skipping delete of stale entities due to a failure in getting all current entities from k8s") + } } -func (c *ControllersHandler) RunDeleteStaleEntities(currentEntitiesSet []map[string]interface{}) { - _, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) +func (c *ControllersHandler) RunDeleteStaleEntities(ctx context.Context, currentEntitiesSet []map[string]interface{}) { + _, err := c.portClient.Authenticate(ctx, c.portClient.ClientID, c.portClient.ClientSecret) if err != nil { klog.Errorf("error authenticating with Port: %v", err) } - err = c.portClient.DeleteStaleEntities(context.Background(), c.stateKey, goutils.MergeMaps(currentEntitiesSet...)) + err = c.portClient.DeleteStaleEntities(ctx, c.stateKey, goutils.MergeMaps(currentEntitiesSet...)) if err != nil { klog.Errorf("error deleting stale entities: %s", err.Error()) } - klog.Info("Done deleting stale entities") } func (c *ControllersHandler) Stop() { diff --git a/pkg/jq/parser.go b/pkg/jq/parser.go index 3edfa51..df8e872 100644 --- a/pkg/jq/parser.go +++ b/pkg/jq/parser.go @@ -117,7 +117,7 @@ func ParseMapInterface(jqQueries map[string]string, obj interface{}) (map[string return mapInterface, nil } -func ParseRelations(jqQueries map[string]interface{}, obj interface{}) (map[string]interface{}, error) { +func ParseMapRecursively(jqQueries map[string]interface{}, obj interface{}) (map[string]interface{}, error) { mapInterface := make(map[string]interface{}, len(jqQueries)) for key, jqQuery := range jqQueries { @@ -127,7 +127,7 @@ func ParseRelations(jqQueries map[string]interface{}, obj interface{}) (map[stri mapInterface = goutils.MergeMaps(mapInterface, queryRes) } else if reflect.TypeOf(jqQuery).Kind() == reflect.Map { for mapKey, mapValue := range jqQuery.(map[string]interface{}) { - queryRes, _ := ParseRelations(map[string]interface{}{mapKey: mapValue}, obj) + queryRes, _ := ParseMapRecursively(map[string]interface{}{mapKey: mapValue}, obj) for queryKey, queryVal := range queryRes { if mapInterface[key] == nil { mapInterface[key] = make(map[string]interface{}) @@ -139,7 +139,7 @@ func ParseRelations(jqQueries map[string]interface{}, obj interface{}) (map[stri jqArrayValue := reflect.ValueOf(jqQuery) relations := make([]interface{}, jqArrayValue.Len()) for i := 0; i < jqArrayValue.Len(); i++ { - relation, err := ParseRelations(map[string]interface{}{key: jqArrayValue.Index(i).Interface()}, obj) + relation, err := ParseMapRecursively(map[string]interface{}{key: jqArrayValue.Index(i).Interface()}, obj) if err != nil { return nil, err } diff --git a/pkg/k8s/controller.go b/pkg/k8s/controller.go index 92d8b63..ae5346b 100644 --- a/pkg/k8s/controller.go +++ b/pkg/k8s/controller.go @@ -3,23 +3,18 @@ package k8s import ( "context" "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/goutils" + "github.com/port-labs/port-k8s-exporter/pkg/port/entity" "time" - "hash/fnv" - "strconv" - "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/jq" "github.com/port-labs/port-k8s-exporter/pkg/port" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" - "github.com/port-labs/port-k8s-exporter/pkg/port/mapping" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/informers" - "encoding/json" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -42,13 +37,22 @@ type EventItem struct { ActionType EventActionType } +type SyncResult struct { + EntitiesSet map[string]interface{} + RawDataExamples []interface{} + ShouldDeleteStaleEntities bool +} + type Controller struct { - Resource port.AggregatedResource - portClient *cli.PortClient - integrationConfig *port.IntegrationAppConfig - informer cache.SharedIndexInformer - lister cache.GenericLister - workqueue workqueue.RateLimitingInterface + Resource port.AggregatedResource + portClient *cli.PortClient + integrationConfig *port.IntegrationAppConfig + informer cache.SharedIndexInformer + lister cache.GenericLister + eventHandler cache.ResourceEventHandlerRegistration + eventsWorkqueue workqueue.RateLimitingInterface + initialSyncWorkqueue workqueue.RateLimitingInterface + isInitialSyncDone bool } func NewController(resource port.AggregatedResource, informer informers.GenericInformer, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { @@ -58,22 +62,32 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI cli.WithDeleteDependents(integrationConfig.DeleteDependents)(portClient) cli.WithCreateMissingRelatedEntities(integrationConfig.CreateMissingRelatedEntities)(portClient) controller := &Controller{ - Resource: resource, - portClient: portClient, - integrationConfig: integrationConfig, - informer: informer.Informer(), - lister: informer.Lister(), - workqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + Resource: resource, + portClient: portClient, + integrationConfig: integrationConfig, + informer: informer.Informer(), + lister: informer.Lister(), + initialSyncWorkqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + eventsWorkqueue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), } - controller.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + controller.eventHandler, _ = controller.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { var err error var item EventItem item.ActionType = CreateAction item.Key, err = cache.MetaNamespaceKeyFunc(obj) - if err == nil { - controller.workqueue.Add(item) + if err != nil { + return + } + + if controller.isInitialSyncDone || controller.eventHandler.HasSynced() { + if !controller.isInitialSyncDone { + controller.isInitialSyncDone = true + } + controller.eventsWorkqueue.Add(item) + } else { + controller.initialSyncWorkqueue.Add(item) } }, UpdateFunc: func(old interface{}, new interface{}) { @@ -81,11 +95,12 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI var item EventItem item.ActionType = UpdateAction item.Key, err = cache.MetaNamespaceKeyFunc(new) - if err == nil { + if err != nil { + return + } - if controller.shouldSendUpdateEvent(old, new, integrationConfig.UpdateEntityOnlyOnDiff == nil || *(integrationConfig.UpdateEntityOnlyOnDiff)) { - controller.workqueue.Add(item) - } + if controller.shouldSendUpdateEvent(old, new, integrationConfig.UpdateEntityOnlyOnDiff == nil || *(integrationConfig.UpdateEntityOnlyOnDiff)) { + controller.eventsWorkqueue.Add(item) } }, DeleteFunc: func(obj interface{}) { @@ -93,11 +108,13 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI var item EventItem item.ActionType = DeleteAction item.Key, err = cache.DeletionHandlingMetaNamespaceKeyFunc(obj) - if err == nil { - err = controller.objectHandler(obj, item) - if err != nil { - klog.Errorf("Error deleting item '%s' of resource '%s': %s", item.Key, resource.Kind, err.Error()) - } + if err != nil { + return + } + + _, err = controller.objectHandler(obj, item) + if err != nil { + klog.Errorf("Error deleting item '%s' of resource '%s': %s", item.Key, resource.Kind, err.Error()) } }, }) @@ -106,13 +123,11 @@ func NewController(resource port.AggregatedResource, informer informers.GenericI } func (c *Controller) Shutdown() { - klog.Infof("Shutting down controller for resource '%s'", c.Resource.Kind) - c.workqueue.ShutDown() - klog.Infof("Closed controller for resource '%s'", c.Resource.Kind) + c.initialSyncWorkqueue.ShutDown() + c.eventsWorkqueue.ShutDown() } func (c *Controller) WaitForCacheSync(stopCh <-chan struct{}) error { - klog.Infof("Waiting for informer cache to sync for resource '%s'", c.Resource.Kind) if ok := cache.WaitForCacheSync(stopCh, c.informer.HasSynced); !ok { return fmt.Errorf("failed to wait for caches to sync") } @@ -120,106 +135,152 @@ func (c *Controller) WaitForCacheSync(stopCh <-chan struct{}) error { return nil } -func (c *Controller) Run(workers int, stopCh <-chan struct{}) { +func (c *Controller) RunInitialSync() *SyncResult { + entitiesSet := make(map[string]interface{}) + rawDataExamples := make([]interface{}, 0) + shouldDeleteStaleEntities := true + shouldContinue := true + requeueCounter := 0 + var requeueCounterDiff int + var syncResult *SyncResult + for shouldContinue && (requeueCounter > 0 || c.initialSyncWorkqueue.Len() > 0 || !c.eventHandler.HasSynced()) { + syncResult, requeueCounterDiff, shouldContinue = c.processNextWorkItem(c.initialSyncWorkqueue) + requeueCounter += requeueCounterDiff + if syncResult != nil { + entitiesSet = goutils.MergeMaps(entitiesSet, syncResult.EntitiesSet) + amountOfExamplesToAdd := min(len(syncResult.RawDataExamples), MaxRawDataExamplesToSend-len(rawDataExamples)) + rawDataExamples = append(rawDataExamples, syncResult.RawDataExamples[:amountOfExamplesToAdd]...) + shouldDeleteStaleEntities = shouldDeleteStaleEntities && syncResult.ShouldDeleteStaleEntities + } + } + + return &SyncResult{ + EntitiesSet: entitiesSet, + RawDataExamples: rawDataExamples, + ShouldDeleteStaleEntities: shouldDeleteStaleEntities, + } +} + +func (c *Controller) RunEventsSync(workers int, stopCh <-chan struct{}) { defer utilruntime.HandleCrash() - klog.Infof("Starting workers for resource '%s'", c.Resource.Kind) for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.Until(func() { + for _, _, shouldContinue := c.processNextWorkItem(c.eventsWorkqueue); shouldContinue; { + } + }, time.Second, stopCh) } - klog.Infof("Started workers for resource '%s'", c.Resource.Kind) } -func (c *Controller) runWorker() { - for c.processNextWorkItem() { +func (c *Controller) processNextWorkItem(workqueue workqueue.RateLimitingInterface) (*SyncResult, int, bool) { + permanentErrorSyncResult := &SyncResult{ + EntitiesSet: make(map[string]interface{}), + RawDataExamples: make([]interface{}, 0), + ShouldDeleteStaleEntities: false, } -} -func (c *Controller) processNextWorkItem() bool { - obj, shutdown := c.workqueue.Get() + obj, shutdown := workqueue.Get() if shutdown { - return false + return permanentErrorSyncResult, 0, false } - err := func(obj interface{}) error { - defer c.workqueue.Done(obj) + syncResult, requeueCounterDiff, err := func(obj interface{}) (*SyncResult, int, error) { + defer workqueue.Done(obj) + + numRequeues := workqueue.NumRequeues(obj) + requeueCounterDiff := 0 + if numRequeues > 0 { + requeueCounterDiff = -1 + } item, ok := obj.(EventItem) if !ok { - c.workqueue.Forget(obj) - utilruntime.HandleError(fmt.Errorf("expected event item of resource '%s' in workqueue but got %#v", c.Resource.Kind, obj)) - return nil + workqueue.Forget(obj) + return permanentErrorSyncResult, requeueCounterDiff, fmt.Errorf("expected event item of resource '%s' in workqueue but got %#v", c.Resource.Kind, obj) } - if err := c.syncHandler(item); err != nil { - if c.workqueue.NumRequeues(obj) >= MaxNumRequeues { - utilruntime.HandleError(fmt.Errorf("error syncing '%s' of resource '%s': %s, give up after %d requeues", item.Key, c.Resource.Kind, err.Error(), MaxNumRequeues)) - return nil + syncResult, err := c.syncHandler(item) + if err != nil { + if numRequeues >= MaxNumRequeues { + workqueue.Forget(obj) + return syncResult, requeueCounterDiff, fmt.Errorf("error syncing '%s' of resource '%s': %s, give up after %d requeues", item.Key, c.Resource.Kind, err.Error(), MaxNumRequeues) } - c.workqueue.AddRateLimited(obj) - return fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.Resource.Kind, err.Error()) + if numRequeues == 0 { + requeueCounterDiff = 1 + } else { + requeueCounterDiff = 0 + } + workqueue.AddRateLimited(obj) + return syncResult, requeueCounterDiff, fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.Resource.Kind, err.Error()) } - c.workqueue.Forget(obj) - return nil + workqueue.Forget(obj) + return syncResult, requeueCounterDiff, nil }(obj) if err != nil { utilruntime.HandleError(err) - return true } - return true + return syncResult, requeueCounterDiff, true } -func (c *Controller) syncHandler(item EventItem) error { +func (c *Controller) syncHandler(item EventItem) (*SyncResult, error) { obj, exists, err := c.informer.GetIndexer().GetByKey(item.Key) if err != nil { - return fmt.Errorf("error fetching object with key '%s' from informer cache: %v", item.Key, err) + return nil, fmt.Errorf("error fetching object with key '%s' from informer cache: %v", item.Key, err) } if !exists { utilruntime.HandleError(fmt.Errorf("'%s' in work queue no longer exists", item.Key)) - return nil + return nil, nil } - err = c.objectHandler(obj, item) - if err != nil { - return fmt.Errorf("error handling object with key '%s': %v", item.Key, err) - } - - return nil + return c.objectHandler(obj, item) } -func (c *Controller) objectHandler(obj interface{}, item EventItem) error { - _, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) - if err != nil { - return fmt.Errorf("error authenticating with Port: %v", err) - } - +func (c *Controller) objectHandler(obj interface{}, item EventItem) (*SyncResult, error) { errors := make([]error, 0) + entitiesSet := make(map[string]interface{}) + rawDataExamplesToReturn := make([]interface{}, 0) for _, kindConfig := range c.Resource.KindConfigs { - portEntities, _, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse) + portEntities, rawDataExamples, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings, kindConfig.Port.ItemsToParse) if err != nil { + entitiesSet = nil utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err)) continue } + if rawDataExamplesToReturn != nil { + amountOfExamplesToAdd := min(len(rawDataExamples), MaxRawDataExamplesToSend-len(rawDataExamplesToReturn)) + rawDataExamplesToReturn = append(rawDataExamplesToReturn, rawDataExamples[:amountOfExamplesToAdd]...) + } + for _, portEntity := range portEntities { - err = c.entityHandler(portEntity, item.ActionType) + handledEntity, err := c.entityHandler(portEntity, item.ActionType) if err != nil { errors = append(errors, err) + entitiesSet = nil + } + + if entitiesSet != nil { + entitiesSet[c.portClient.GetEntityIdentifierKey(handledEntity)] = nil } } } + var err error if len(errors) > 0 { - return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, errors) + err = fmt.Errorf("error handling entity for object key '%s': %v", item.Key, errors) } - return nil + return &SyncResult{ + EntitiesSet: entitiesSet, + RawDataExamples: rawDataExamplesToReturn, + ShouldDeleteStaleEntities: entitiesSet != nil, + }, err } func isPassSelector(obj interface{}, selector port.Selector) (bool, error) { @@ -235,20 +296,7 @@ func isPassSelector(obj interface{}, selector port.Selector) (bool, error) { return selectorResult, err } -func mapEntities(obj interface{}, mappings []port.EntityMapping) ([]port.Entity, error) { - entities := make([]port.Entity, 0, len(mappings)) - for _, entityMapping := range mappings { - portEntity, err := mapping.NewEntity(obj, entityMapping) - if err != nil { - return nil, fmt.Errorf("invalid entity mapping '%#v': %v", entityMapping, err) - } - entities = append(entities, *portEntity) - } - - return entities, nil -} - -func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, mappings []port.EntityMapping, itemsToParse string) ([]port.Entity, []interface{}, error) { +func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, mappings []port.EntityMapping, itemsToParse string) ([]port.EntityRequest, []interface{}, error) { unstructuredObj, ok := obj.(*unstructured.Unstructured) if !ok { return nil, nil, fmt.Errorf("error casting to unstructured") @@ -259,7 +307,7 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, return nil, nil, fmt.Errorf("error converting from unstructured: %v", err) } - entities := make([]port.Entity, 0, len(mappings)) + entities := make([]port.EntityRequest, 0, len(mappings)) objectsToMap := make([]interface{}, 0) if itemsToParse == "" { @@ -297,7 +345,7 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, if *c.integrationConfig.SendRawDataExamples && len(rawDataExamples) < MaxRawDataExamplesToSend { rawDataExamples = append(rawDataExamples, objectToMap) } - currentEntities, err := mapEntities(objectToMap, mappings) + currentEntities, err := entity.MapEntities(objectToMap, mappings) if err != nil { return nil, nil, err } @@ -309,117 +357,38 @@ func (c *Controller) getObjectEntities(obj interface{}, selector port.Selector, return entities, rawDataExamples, nil } -func checkIfOwnEntity(entity port.Entity, portClient *cli.PortClient) (*bool, error) { - portEntities, err := portClient.SearchEntities(context.Background(), port.SearchBody{ - Rules: []port.Rule{ - { - Property: "$datasource", - Operator: "contains", - Value: "port-k8s-exporter", - }, - { - Property: "$identifier", - Operator: "=", - Value: entity.Identifier, - }, - { - Property: "$datasource", - Operator: "contains", - Value: fmt.Sprintf("statekey/%s", config.ApplicationConfig.StateKey), - }, - { - Property: "$blueprint", - Operator: "=", - Value: entity.Blueprint, - }, - }, - Combinator: "and", - }) +func (c *Controller) entityHandler(portEntity port.EntityRequest, action EventActionType) (*port.Entity, error) { + _, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) if err != nil { - return nil, err + return nil, fmt.Errorf("error authenticating with Port: %v", err) } - if len(portEntities) > 0 { - result := true - return &result, nil - } - result := false - return &result, nil -} - -func (c *Controller) entityHandler(portEntity port.Entity, action EventActionType) error { switch action { case CreateAction, UpdateAction: - _, err := c.portClient.CreateEntity(context.Background(), &portEntity, "", c.portClient.CreateMissingRelatedEntities) + upsertedEntity, err := c.portClient.CreateEntity(context.Background(), &portEntity, "", c.portClient.CreateMissingRelatedEntities) if err != nil { - return fmt.Errorf("error upserting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) + return nil, fmt.Errorf("error upserting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) } klog.V(0).Infof("Successfully upserted entity '%s' of blueprint '%s'", portEntity.Identifier, portEntity.Blueprint) + return upsertedEntity, nil case DeleteAction: - result, err := checkIfOwnEntity(portEntity, c.portClient) + result, err := entity.CheckIfOwnEntity(portEntity, c.portClient) if err != nil { - return fmt.Errorf("error checking if entity '%s' of blueprint '%s' is owned by this exporter: %v", portEntity.Identifier, portEntity.Blueprint, err) + return nil, fmt.Errorf("error checking if entity '%s' of blueprint '%s' is owned by this exporter: %v", portEntity.Identifier, portEntity.Blueprint, err) } if *result { - err := c.portClient.DeleteEntity(context.Background(), portEntity.Identifier, portEntity.Blueprint, c.portClient.DeleteDependents) + err := c.portClient.DeleteEntity(context.Background(), portEntity.Identifier.(string), portEntity.Blueprint, c.portClient.DeleteDependents) if err != nil { - return fmt.Errorf("error deleting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) + return nil, fmt.Errorf("error deleting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) } klog.V(0).Infof("Successfully deleted entity '%s' of blueprint '%s'", portEntity.Identifier, portEntity.Blueprint) } else { klog.Warningf("trying to delete entity but didn't find it in port with this exporter ownership, entity id: '%s', blueprint:'%s'", portEntity.Identifier, portEntity.Blueprint) } - - } - - return nil -} - -func (c *Controller) GetEntitiesSet() (map[string]interface{}, []interface{}, error) { - k8sEntitiesSet := map[string]interface{}{} - objects, err := c.lister.List(labels.Everything()) - if err != nil { - return nil, nil, fmt.Errorf("error listing K8s objects of resource '%s': %v", c.Resource.Kind, err) - } - - rawDataExamples := make([]interface{}, 0) - for _, obj := range objects { - for _, kindConfig := range c.Resource.KindConfigs { - mappings := make([]port.EntityMapping, 0, len(kindConfig.Port.Entity.Mappings)) - for _, m := range kindConfig.Port.Entity.Mappings { - mappings = append(mappings, port.EntityMapping{ - Identifier: m.Identifier, - Blueprint: m.Blueprint, - }) - } - entities, examples, err := c.getObjectEntities(obj, kindConfig.Selector, mappings, kindConfig.Port.ItemsToParse) - if err != nil { - return nil, nil, fmt.Errorf("error getting entities of object: %v", err) - } - for _, entity := range entities { - k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil - } - rawDataExamples = append(rawDataExamples, examples[:min(len(examples), MaxRawDataExamplesToSend-len(rawDataExamples))]...) - } } - return k8sEntitiesSet, rawDataExamples, nil -} - -func hashAllEntities(entities []port.Entity) (string, error) { - h := fnv.New64a() - for _, entity := range entities { - entityBytes, err := json.Marshal(entity) - if err != nil { - return "", err - } - _, err = h.Write(entityBytes) - if err != nil { - return "", err - } - } - return strconv.FormatUint(h.Sum64(), 10), nil + return nil, nil } func (c *Controller) shouldSendUpdateEvent(old interface{}, new interface{}, updateEntityOnlyOnDiff bool) bool { @@ -438,12 +407,12 @@ func (c *Controller) shouldSendUpdateEvent(old interface{}, new interface{}, upd klog.Errorf("Error getting new entities: %v", err) return true } - oldEntitiesHash, err := hashAllEntities(oldEntities) + oldEntitiesHash, err := entity.HashAllEntities(oldEntities) if err != nil { klog.Errorf("Error hashing old entities: %v", err) return true } - newEntitiesHash, err := hashAllEntities(newEntities) + newEntitiesHash, err := entity.HashAllEntities(newEntities) if err != nil { klog.Errorf("Error hashing new entities: %v", err) return true diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index a4cdf74..686abd1 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -280,7 +280,7 @@ func TestJqSearchRelation(t *testing.T) { }, }, } - res, _ := jq.ParseRelations(mapping[0].Relations, nil) + res, _ := jq.ParseMapRecursively(mapping[0].Relations, nil) assert.Equal(t, res, map[string]interface{}{ "k8s-relation": map[string]interface{}{ "combinator": "or", diff --git a/pkg/port/cli/entity.go b/pkg/port/cli/entity.go index 7d3ce53..6103358 100644 --- a/pkg/port/cli/entity.go +++ b/pkg/port/cli/entity.go @@ -51,7 +51,7 @@ func (c *PortClient) ReadEntity(ctx context.Context, id string, blueprint string return &pb.Entity, nil } -func (c *PortClient) CreateEntity(ctx context.Context, e *port.Entity, runID string, createMissingRelatedEntities bool) (*port.Entity, error) { +func (c *PortClient) CreateEntity(ctx context.Context, e *port.EntityRequest, runID string, createMissingRelatedEntities bool) (*port.Entity, error) { pb := &port.ResponseBody{} resp, err := c.Client.R(). SetBody(e). diff --git a/pkg/port/entity/entity.go b/pkg/port/entity/entity.go new file mode 100644 index 0000000..9b150d8 --- /dev/null +++ b/pkg/port/entity/entity.go @@ -0,0 +1,128 @@ +package entity + +import ( + "context" + "encoding/json" + "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/jq" + "github.com/port-labs/port-k8s-exporter/pkg/port" + "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + "hash/fnv" + "reflect" + "strconv" +) + +func CheckIfOwnEntity(entity port.EntityRequest, portClient *cli.PortClient) (*bool, error) { + portEntities, err := portClient.SearchEntities(context.Background(), port.SearchBody{ + Rules: []port.Rule{ + { + Property: "$datasource", + Operator: "contains", + Value: "port-k8s-exporter", + }, + { + Property: "$identifier", + Operator: "=", + Value: entity.Identifier, + }, + { + Property: "$datasource", + Operator: "contains", + Value: fmt.Sprintf("statekey/%s", config.ApplicationConfig.StateKey), + }, + { + Property: "$blueprint", + Operator: "=", + Value: entity.Blueprint, + }, + }, + Combinator: "and", + }) + if err != nil { + return nil, err + } + + if len(portEntities) > 0 { + result := true + return &result, nil + } + result := false + return &result, nil +} + +func HashAllEntities(entities []port.EntityRequest) (string, error) { + h := fnv.New64a() + for _, entity := range entities { + entityBytes, err := json.Marshal(entity) + if err != nil { + return "", err + } + _, err = h.Write(entityBytes) + if err != nil { + return "", err + } + } + return strconv.FormatUint(h.Sum64(), 10), nil +} + +func MapEntities(obj interface{}, mappings []port.EntityMapping) ([]port.EntityRequest, error) { + entities := make([]port.EntityRequest, 0, len(mappings)) + for _, entityMapping := range mappings { + portEntity, err := newEntityRequest(obj, entityMapping) + if err != nil { + return nil, fmt.Errorf("invalid entity mapping '%#v': %v", entityMapping, err) + } + entities = append(entities, *portEntity) + } + + return entities, nil +} + +func newEntityRequest(obj interface{}, mapping port.EntityMapping) (*port.EntityRequest, error) { + var err error + entity := &port.EntityRequest{} + + if reflect.TypeOf(mapping.Identifier).Kind() == reflect.String { + entity.Identifier, err = jq.ParseString(mapping.Identifier.(string), obj) + } else if reflect.TypeOf(mapping.Identifier).Kind() == reflect.Map { + entity.Identifier, err = jq.ParseMapRecursively(mapping.Identifier.(map[string]interface{}), obj) + } + + if err != nil { + return nil, err + } + if mapping.Title != "" { + entity.Title, err = jq.ParseString(mapping.Title, obj) + if err != nil { + return nil, err + } + } + entity.Blueprint, err = jq.ParseString(mapping.Blueprint, obj) + if err != nil { + return nil, err + } + if mapping.Team != "" { + entity.Team, err = jq.ParseInterface(mapping.Team, obj) + if err != nil { + return nil, err + } + } + if mapping.Icon != "" { + entity.Icon, err = jq.ParseString(mapping.Icon, obj) + if err != nil { + return nil, err + } + } + entity.Properties, err = jq.ParseMapInterface(mapping.Properties, obj) + if err != nil { + return nil, err + } + entity.Relations, err = jq.ParseMapRecursively(mapping.Relations, obj) + if err != nil { + return nil, err + } + + return entity, err + +} diff --git a/pkg/port/mapping/entity.go b/pkg/port/mapping/entity.go deleted file mode 100644 index 1cb4d70..0000000 --- a/pkg/port/mapping/entity.go +++ /dev/null @@ -1,48 +0,0 @@ -package mapping - -import ( - "github.com/port-labs/port-k8s-exporter/pkg/jq" - "github.com/port-labs/port-k8s-exporter/pkg/port" -) - -func NewEntity(obj interface{}, mapping port.EntityMapping) (*port.Entity, error) { - var err error - entity := &port.Entity{} - entity.Identifier, err = jq.ParseString(mapping.Identifier, obj) - if err != nil { - return &port.Entity{}, err - } - if mapping.Title != "" { - entity.Title, err = jq.ParseString(mapping.Title, obj) - if err != nil { - return &port.Entity{}, err - } - } - entity.Blueprint, err = jq.ParseString(mapping.Blueprint, obj) - if err != nil { - return &port.Entity{}, err - } - if mapping.Team != "" { - entity.Team, err = jq.ParseInterface(mapping.Team, obj) - if err != nil { - return &port.Entity{}, err - } - } - if mapping.Icon != "" { - entity.Icon, err = jq.ParseString(mapping.Icon, obj) - if err != nil { - return &port.Entity{}, err - } - } - entity.Properties, err = jq.ParseMapInterface(mapping.Properties, obj) - if err != nil { - return &port.Entity{}, err - } - entity.Relations, err = jq.ParseRelations(mapping.Relations, obj) - if err != nil { - return &port.Entity{}, err - } - - return entity, err - -} diff --git a/pkg/port/models.go b/pkg/port/models.go index 7ddfb0d..82fdb09 100644 --- a/pkg/port/models.go +++ b/pkg/port/models.go @@ -221,7 +221,7 @@ type ResponseBody struct { } type EntityMapping struct { - Identifier string `json:"identifier" yaml:"identifier"` + Identifier interface{} `json:"identifier" yaml:"identifier"` Title string `json:"title" yaml:"title"` Blueprint string `json:"blueprint" yaml:"blueprint"` Icon string `json:"icon,omitempty" yaml:"icon,omitempty"` @@ -230,6 +230,16 @@ type EntityMapping struct { Relations map[string]interface{} `json:"relations,omitempty" yaml:"relations,omitempty"` } +type EntityRequest struct { + Identifier interface{} `json:"identifier" yaml:"identifier"` + Title string `json:"title" yaml:"title"` + Blueprint string `json:"blueprint" yaml:"blueprint"` + Icon string `json:"icon,omitempty" yaml:"icon,omitempty"` + Team interface{} `json:"team,omitempty" yaml:"team,omitempty"` + Properties map[string]interface{} `json:"properties,omitempty" yaml:"properties,omitempty"` + Relations map[string]interface{} `json:"relations,omitempty" yaml:"relations,omitempty"` +} + type EntityMappings struct { Mappings []EntityMapping `json:"mappings" yaml:"mappings"` } From 7ce1cdcf7ee32c527b7f774b452b648b9e25fcbc Mon Sep 17 00:00:00 2001 From: talsabagport Date: Wed, 14 Aug 2024 18:44:24 +0300 Subject: [PATCH 02/13] fix tests --- pkg/handlers/controllers.go | 12 ++-- pkg/k8s/controller.go | 7 +- pkg/k8s/controller_test.go | 138 +++++++++++++++++------------------- 3 files changed, 76 insertions(+), 81 deletions(-) diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index c2f2790..d0f2894 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -79,11 +79,6 @@ func (c *ControllersHandler) Handle() { for _, controller := range c.controllers { controller := controller - klog.Infof("Waiting for informer cache to sync for resource '%s'", controller.Resource.Kind) - if err := controller.WaitForCacheSync(c.stopCh); err != nil { - klog.Fatalf("Error while waiting for informer cache sync: %s", err.Error()) - } - go func() { <-c.stopCh klog.Info("Shutting down controllers") @@ -91,6 +86,11 @@ func (c *ControllersHandler) Handle() { klog.Info("Exporter exiting") }() + klog.Infof("Waiting for informer cache to sync for resource '%s'", controller.Resource.Kind) + if err := controller.WaitForCacheSync(c.stopCh); err != nil { + klog.Fatalf("Error while waiting for informer cache sync: %s", err.Error()) + } + syncWg.Add(1) go func() { defer syncWg.Done() @@ -116,7 +116,7 @@ func (c *ControllersHandler) Handle() { defer cancelCtx() go func() { <-c.stopCh - ctx.Done() + cancelCtx() }() if shouldDeleteStaleEntities { diff --git a/pkg/k8s/controller.go b/pkg/k8s/controller.go index ae5346b..6394040 100644 --- a/pkg/k8s/controller.go +++ b/pkg/k8s/controller.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/port-labs/port-k8s-exporter/pkg/goutils" "github.com/port-labs/port-k8s-exporter/pkg/port/entity" + "reflect" "time" "github.com/port-labs/port-k8s-exporter/pkg/config" @@ -265,7 +266,7 @@ func (c *Controller) objectHandler(obj interface{}, item EventItem) (*SyncResult entitiesSet = nil } - if entitiesSet != nil { + if entitiesSet != nil && item.ActionType != DeleteAction { entitiesSet[c.portClient.GetEntityIdentifierKey(handledEntity)] = nil } } @@ -372,6 +373,10 @@ func (c *Controller) entityHandler(portEntity port.EntityRequest, action EventAc klog.V(0).Infof("Successfully upserted entity '%s' of blueprint '%s'", portEntity.Identifier, portEntity.Blueprint) return upsertedEntity, nil case DeleteAction: + if reflect.TypeOf(portEntity.Identifier).Kind() != reflect.String { + return nil, nil + } + result, err := entity.CheckIfOwnEntity(portEntity, c.portClient) if err != nil { return nil, fmt.Errorf("error checking if entity '%s' of blueprint '%s' is owned by this exporter: %v", portEntity.Identifier, portEntity.Blueprint, err) diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index 686abd1..797cba3 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -28,6 +28,7 @@ import ( var ( noResyncPeriodFunc = func() time.Duration { return 0 } + blueprint = "k8s-export-test-bp" ) type fixture struct { @@ -110,6 +111,10 @@ func newDeployment() *appsv1.Deployment { "app": "port-k8s-exporter", } return &appsv1.Deployment{ + TypeMeta: v1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, ObjectMeta: v1.ObjectMeta{ Name: "port-k8s-exporter", Namespace: "port-k8s-exporter", @@ -141,6 +146,10 @@ func newDeploymentWithCustomLabels(generation int64, labels map[string]string, ) *appsv1.Deployment { return &appsv1.Deployment{ + TypeMeta: v1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, ObjectMeta: v1.ObjectMeta{ Name: "port-k8s-exporter", Namespace: "port-k8s-exporter", @@ -169,6 +178,12 @@ func newDeploymentWithCustomLabels(generation int64, } } +func newGvrToListKind() map[schema.GroupVersionResource]string { + return map[schema.GroupVersionResource]string{ + {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", + } +} + func newUnstructured(obj interface{}) *unstructured.Unstructured { res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { @@ -192,32 +207,26 @@ func newController(resource port.Resource, objects []runtime.Object, kubeclient return c } -func (f *fixture) runControllerSyncHandler(item EventItem, expectError bool) { - err := f.controller.syncHandler(item) +func (f *fixture) runControllerSyncHandler(item EventItem, expectedResult *SyncResult, expectError bool) { + syncResult, err := f.controller.syncHandler(item) if !expectError && err != nil { f.t.Errorf("error syncing item: %v", err) } else if expectError && err == nil { f.t.Error("expected error syncing item, got nil") } -} - -func (f *fixture) runControllerGetEntitiesSet(expectedEntitiesSet map[string]interface{}, expectedExamples []interface{}, expectError bool) { - entitiesSet, examples, err := f.controller.GetEntitiesSet() - if !expectError && err != nil { - f.t.Errorf("error syncing item: %v", err) - } else if expectError && err == nil { - f.t.Error("expected error syncing item, got nil") + eq := reflect.DeepEqual(syncResult.EntitiesSet, expectedResult.EntitiesSet) + if !eq { + f.t.Errorf("expected entities set: %v, got: %v", expectedResult.EntitiesSet, syncResult.EntitiesSet) } - eq := reflect.DeepEqual(entitiesSet, expectedEntitiesSet) + eq = reflect.DeepEqual(syncResult.RawDataExamples, expectedResult.RawDataExamples) if !eq { - f.t.Errorf("expected entities set: %v, got: %v", expectedEntitiesSet, entitiesSet) + f.t.Errorf("expected raw data examples: %v, got: %v", expectedResult.RawDataExamples, syncResult.RawDataExamples) } - eq = reflect.DeepEqual(examples, expectedExamples) - if !eq { - f.t.Errorf("expected raw data examples: %v, got: %v", expectedExamples, examples) + if syncResult.ShouldDeleteStaleEntities != expectedResult.ShouldDeleteStaleEntities { + f.t.Errorf("expected should delete stale entities: %v, got: %v", expectedResult.ShouldDeleteStaleEntities, syncResult.ShouldDeleteStaleEntities) } } @@ -232,11 +241,12 @@ func getKey(deployment *appsv1.Deployment, t *testing.T) string { func TestCreateDeployment(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{ @@ -254,7 +264,7 @@ func TestCreateDeployment(t *testing.T) { item := EventItem{Key: getKey(d, t), ActionType: CreateAction} f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestJqSearchRelation(t *testing.T) { @@ -262,7 +272,7 @@ func TestJqSearchRelation(t *testing.T) { mapping := []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{}, @@ -298,12 +308,13 @@ func TestJqSearchRelation(t *testing.T) { func TestCreateDeploymentWithSearchRelation(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} item := EventItem{Key: getKey(d, t), ActionType: CreateAction} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{ @@ -333,16 +344,17 @@ func TestCreateDeploymentWithSearchRelation(t *testing.T) { }, }) f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestUpdateDeployment(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "[\"Test\", \"Test2\"]", Properties: map[string]string{ @@ -360,26 +372,27 @@ func TestUpdateDeployment(t *testing.T) { item := EventItem{Key: getKey(d, t), ActionType: UpdateAction} f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestDeleteDeploymentSameOwner(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: "\"entityWithSameOwner\"", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, objects: objects}) - f.runControllerSyncHandler(createItem, false) + f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithSameOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) - f.runControllerSyncHandler(item, false) - _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithSameOwner", "k8s-export-test-bp") + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithSameOwner", blueprint) if err != nil && !strings.Contains(err.Error(), "was not found") { t.Errorf("expected entity to be deleted") } @@ -387,21 +400,22 @@ func TestDeleteDeploymentSameOwner(t *testing.T) { func TestDeleteDeploymentDifferentOwner(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: "\"entityWithDifferentOwner\"", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} f := newFixture(t, &fixtureConfig{stateKey: "non_exist_statekey", resource: resource, objects: objects}) - f.runControllerSyncHandler(createItem, false) + f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithDifferentOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) - f.runControllerSyncHandler(item, false) - _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithDifferentOwner", "k8s-export-test-bp") + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithDifferentOwner", blueprint) if err != nil && strings.Contains(err.Error(), "was not found") { t.Errorf("expected entity to exist") } @@ -409,74 +423,50 @@ func TestDeleteDeploymentDifferentOwner(t *testing.T) { func TestSelectorQueryFilterDeployment(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} resource := newResource(".metadata.name != \"port-k8s-exporter\"", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"wrong-k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"wrong-%s\"", blueprint), }, }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: true}, false) } func TestFailPortAuth(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) item := EventItem{Key: getKey(d, t), ActionType: CreateAction} f := newFixture(t, &fixtureConfig{portClientId: "wrongclientid", portClientSecret: "wrongclientsecret", resource: resource, objects: objects}) - f.runControllerSyncHandler(item, true) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: nil, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: false}, true) } func TestFailDeletePortEntity(t *testing.T) { d := newDeployment() - objects := []runtime.Object{newUnstructured(d)} + ud := newUnstructured(d) + objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"wrong-k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"wrong-%s\"", blueprint), }, }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerSyncHandler(item, false) -} - -func TestGetEntitiesSet(t *testing.T) { - d := newUnstructured(newDeployment()) - var structuredObj interface{} - err := runtime.DefaultUnstructuredConverter.FromUnstructured(d.Object, &structuredObj) - if err != nil { - t.Errorf("Error from unstructured: %s", err.Error()) - } - - objects := []runtime.Object{d} - resource := newResource("", []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", - }, - }) - expectedEntitiesSet := map[string]interface{}{ - "k8s-export-test-bp;port-k8s-exporter": nil, - } - - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) - f.runControllerGetEntitiesSet(expectedEntitiesSet, []interface{}{structuredObj}, false) - - sendRawDataExamples := false - f = newFixture(t, &fixtureConfig{sendRawDataExamples: &sendRawDataExamples, resource: resource, objects: objects}) - f.runControllerGetEntitiesSet(expectedEntitiesSet, []interface{}{}, false) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { @@ -490,7 +480,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{ @@ -508,7 +498,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{}, @@ -516,7 +506,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { }, { Identifier: ".metadata.name", - Blueprint: "\"k8s-export-test-bp\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), Icon: "\"Microservice\"", Team: "\"Test\"", Properties: map[string]string{ From 8e03ee0988959531a894d91540cd479f99f5b30c Mon Sep 17 00:00:00 2001 From: talsabagport Date: Wed, 14 Aug 2024 18:48:18 +0300 Subject: [PATCH 03/13] fix tests --- pkg/k8s/controller_test.go | 40 -------------------------------------- 1 file changed, 40 deletions(-) diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index 797cba3..5cc0cea 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -8,7 +8,6 @@ import ( "testing" "time" - "github.com/port-labs/port-k8s-exporter/pkg/jq" "github.com/stretchr/testify/assert" "github.com/port-labs/port-k8s-exporter/pkg/config" @@ -267,45 +266,6 @@ func TestCreateDeployment(t *testing.T) { f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } -func TestJqSearchRelation(t *testing.T) { - - mapping := []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: fmt.Sprintf("\"%s\"", blueprint), - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{}, - Relations: map[string]interface{}{ - "k8s-relation": map[string]interface{}{ - "combinator": "\"or\"", - "rules": []interface{}{ - map[string]interface{}{ - "property": "\"$identifier\"", - "operator": "\"=\"", - "value": "\"e_AgPMYvq1tAs8TuqM\"", - }, - }, - }, - }, - }, - } - res, _ := jq.ParseMapRecursively(mapping[0].Relations, nil) - assert.Equal(t, res, map[string]interface{}{ - "k8s-relation": map[string]interface{}{ - "combinator": "or", - "rules": []interface{}{ - map[string]interface{}{ - "property": "$identifier", - "operator": "=", - "value": "e_AgPMYvq1tAs8TuqM", - }, - }, - }, - }) - -} - func TestCreateDeploymentWithSearchRelation(t *testing.T) { d := newDeployment() ud := newUnstructured(d) From ef5e13f256efafcb9536dae25dc9540382a30384 Mon Sep 17 00:00:00 2001 From: talsabagport Date: Wed, 14 Aug 2024 18:48:24 +0300 Subject: [PATCH 04/13] fix tests --- pkg/jq/parser_test.go | 53 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 pkg/jq/parser_test.go diff --git a/pkg/jq/parser_test.go b/pkg/jq/parser_test.go new file mode 100644 index 0000000..eab5d40 --- /dev/null +++ b/pkg/jq/parser_test.go @@ -0,0 +1,53 @@ +package jq + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "testing" + + "github.com/port-labs/port-k8s-exporter/pkg/port" + _ "github.com/port-labs/port-k8s-exporter/test_utils" +) + +var ( + blueprint = "k8s-export-test-bp" +) + +func TestJqSearchRelation(t *testing.T) { + + mapping := []port.EntityMapping{ + { + Identifier: ".metadata.name", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{}, + Relations: map[string]interface{}{ + "k8s-relation": map[string]interface{}{ + "combinator": "\"or\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"$identifier\"", + "operator": "\"=\"", + "value": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }, + }, + }, + } + res, _ := ParseMapRecursively(mapping[0].Relations, nil) + assert.Equal(t, res, map[string]interface{}{ + "k8s-relation": map[string]interface{}{ + "combinator": "or", + "rules": []interface{}{ + map[string]interface{}{ + "property": "$identifier", + "operator": "=", + "value": "e_AgPMYvq1tAs8TuqM", + }, + }, + }, + }) + +} From 2f99c860762e13d388180fd6574381f719b2b81b Mon Sep 17 00:00:00 2001 From: talsabagport Date: Wed, 14 Aug 2024 18:49:42 +0300 Subject: [PATCH 05/13] fix tests --- pkg/k8s/controller_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index 5cc0cea..7667a3e 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -177,12 +177,6 @@ func newDeploymentWithCustomLabels(generation int64, } } -func newGvrToListKind() map[schema.GroupVersionResource]string { - return map[schema.GroupVersionResource]string{ - {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", - } -} - func newUnstructured(obj interface{}) *unstructured.Unstructured { res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { From b26453142eb247f8fdbcb4298727100c20bb8da8 Mon Sep 17 00:00:00 2001 From: talsabagport Date: Thu, 15 Aug 2024 10:58:21 +0300 Subject: [PATCH 06/13] add test for search identifier and fix jq parser --- pkg/handlers/controllers.go | 11 +++++------ pkg/jq/parser.go | 10 ++++++++-- pkg/k8s/controller_test.go | 38 +++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 8 deletions(-) diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index d0f2894..f88a548 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -2,20 +2,19 @@ package handlers import ( "context" - "github.com/port-labs/port-k8s-exporter/pkg/port/integration" - "sync" - "time" - "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/crd" "github.com/port-labs/port-k8s-exporter/pkg/goutils" "github.com/port-labs/port-k8s-exporter/pkg/k8s" "github.com/port-labs/port-k8s-exporter/pkg/port" "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + "github.com/port-labs/port-k8s-exporter/pkg/port/integration" "github.com/port-labs/port-k8s-exporter/pkg/signal" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/klog/v2" + "sync" + "time" ) type ControllersHandler struct { @@ -121,14 +120,14 @@ func (c *ControllersHandler) Handle() { if shouldDeleteStaleEntities { klog.Info("Deleting stale entities") - c.RunDeleteStaleEntities(ctx, currentEntitiesSets) + c.runDeleteStaleEntities(ctx, currentEntitiesSets) klog.Info("Done deleting stale entities") } else { klog.Warning("Skipping delete of stale entities due to a failure in getting all current entities from k8s") } } -func (c *ControllersHandler) RunDeleteStaleEntities(ctx context.Context, currentEntitiesSet []map[string]interface{}) { +func (c *ControllersHandler) runDeleteStaleEntities(ctx context.Context, currentEntitiesSet []map[string]interface{}) { _, err := c.portClient.Authenticate(ctx, c.portClient.ClientID, c.portClient.ClientSecret) if err != nil { klog.Errorf("error authenticating with Port: %v", err) diff --git a/pkg/jq/parser.go b/pkg/jq/parser.go index df8e872..d7e5b76 100644 --- a/pkg/jq/parser.go +++ b/pkg/jq/parser.go @@ -123,11 +123,17 @@ func ParseMapRecursively(jqQueries map[string]interface{}, obj interface{}) (map for key, jqQuery := range jqQueries { if reflect.TypeOf(jqQuery).Kind() == reflect.String { - queryRes, _ := ParseMapInterface(map[string]string{key: jqQuery.(string)}, obj) + queryRes, err := ParseMapInterface(map[string]string{key: jqQuery.(string)}, obj) + if err != nil { + return nil, err + } mapInterface = goutils.MergeMaps(mapInterface, queryRes) } else if reflect.TypeOf(jqQuery).Kind() == reflect.Map { for mapKey, mapValue := range jqQuery.(map[string]interface{}) { - queryRes, _ := ParseMapRecursively(map[string]interface{}{mapKey: mapValue}, obj) + queryRes, err := ParseMapRecursively(map[string]interface{}{mapKey: mapValue}, obj) + if err != nil { + return nil, err + } for queryKey, queryVal := range queryRes { if mapInterface[key] == nil { mapInterface[key] = make(map[string]interface{}) diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index 7667a3e..9702393 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -516,3 +516,41 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { assert.True(t, result, fmt.Sprintf("Expected true when labels changes and feature flag is off")) } } + +func TestCreateDeploymentWithSearchIdentifier(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + objects := []runtime.Object{ud} + item := EventItem{Key: getKey(d, t), ActionType: CreateAction} + resource := newResource("", []port.EntityMapping{ + { + Identifier: map[string]interface{}{ + "combinator": "\"and\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"text\"", + "operator": "\"=\"", + "value": "\"pod\"", + }, + }}, + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{ + "text": "\"pod\"", + "num": "1", + "bool": "true", + "obj": ".spec.selector", + "arr": ".spec.template.spec.containers", + }, + Relations: map[string]interface{}{ + "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }) + f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + + deleteItem := EventItem{Key: getKey(d, t), ActionType: DeleteAction} + f.runControllerSyncHandler(deleteItem, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) +} From fbf740aa0bdb48ca1f29983febd8650602217adc Mon Sep 17 00:00:00 2001 From: talsabagport Date: Sun, 18 Aug 2024 19:51:00 +0300 Subject: [PATCH 07/13] Add and improve tests --- pkg/k8s/controller_test.go | 256 ++++++++++++++++++++++++++++++------- 1 file changed, 208 insertions(+), 48 deletions(-) diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index 9702393..ff0f35f 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -3,25 +3,26 @@ package k8s import ( "context" "fmt" + guuid "github.com/google/uuid" + "github.com/port-labs/port-k8s-exporter/pkg/signal" "reflect" "strings" "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/port-labs/port-k8s-exporter/pkg/config" "github.com/port-labs/port-k8s-exporter/pkg/port" _ "github.com/port-labs/port-k8s-exporter/test_utils" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - k8sfake "k8s.io/client-go/dynamic/fake" - + "github.com/stretchr/testify/assert" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic/dynamicinformer" + k8sfake "k8s.io/client-go/dynamic/fake" "k8s.io/client-go/tools/cache" ) @@ -33,6 +34,7 @@ var ( type fixture struct { t *testing.T controller *Controller + kubeClient *k8sfake.FakeDynamicClient } type fixtureConfig struct { @@ -41,7 +43,7 @@ type fixtureConfig struct { stateKey string sendRawDataExamples *bool resource port.Resource - objects []runtime.Object + existingObjects []runtime.Object } func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { @@ -57,7 +59,6 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { SendRawDataExamples: sendRawDataExamples, Resources: []port.Resource{fixtureConfig.resource}, } - kubeclient := k8sfake.NewSimpleDynamicClient(runtime.NewScheme()) newConfig := &config.ApplicationConfiguration{ ConfigFilePath: config.ApplicationConfig.ConfigFilePath, @@ -85,9 +86,17 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { newConfig.StateKey = fixtureConfig.stateKey } + kubeClient := k8sfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), newGvrToListKind(), fixtureConfig.existingObjects...) + controller := newController(t, fixtureConfig.resource, kubeClient, interationConfig, newConfig) + _, err := controller.portClient.Authenticate(context.Background(), newConfig.PortClientId, newConfig.PortClientSecret) + if err != nil { + t.Errorf("Failed to authenticate with port %v", err) + } + return &fixture{ t: t, - controller: newController(fixtureConfig.resource, fixtureConfig.objects, kubeclient, interationConfig, newConfig), + controller: controller, + kubeClient: kubeClient, } } @@ -185,19 +194,81 @@ func newUnstructured(obj interface{}) *unstructured.Unstructured { return &unstructured.Unstructured{Object: res} } -func newController(resource port.Resource, objects []runtime.Object, kubeclient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { - k8sI := dynamicinformer.NewDynamicSharedInformerFactory(kubeclient, noResyncPeriodFunc()) - s := strings.SplitN(resource.Kind, "/", 3) - gvr := schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} - informer := k8sI.ForResource(gvr) +func newGvrToListKind() map[schema.GroupVersionResource]string { + return map[schema.GroupVersionResource]string{ + {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", + } +} + +func getGvr(kind string) schema.GroupVersionResource { + s := strings.SplitN(kind, "/", 3) + return schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} +} + +func newController(t *testing.T, resource port.Resource, kubeClient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { + informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(kubeClient, noResyncPeriodFunc()) + gvr := getGvr(resource.Kind) + informer := informerFactory.ForResource(gvr) kindConfig := port.KindConfig{Selector: resource.Selector, Port: resource.Port} - c := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, informer, integrationConfig, applicationConfig) + controller := NewController(port.AggregatedResource{Kind: resource.Kind, KindConfigs: []port.KindConfig{kindConfig}}, informer, integrationConfig, applicationConfig) + ctx := context.Background() + + informerFactory.Start(ctx.Done()) + if synced := informerFactory.WaitForCacheSync(ctx.Done()); !synced[gvr] { + t.Errorf("informer for %s hasn't synced", gvr) + } + + return controller +} + +func (f *fixture) createObjects(t *testing.T, objects []*unstructured.Unstructured) { + gvr := getGvr(f.controller.Resource.Kind) + currentNumEventsInQueue := f.controller.eventsWorkqueue.Len() + if objects != nil { + for _, d := range objects { + _, err := f.kubeClient.Resource(gvr).Namespace(d.GetNamespace()).Create(context.TODO(), d, metav1.CreateOptions{}) + if err != nil { + t.Errorf("error creating object %s: %v", d.GetName(), err) + } + } + + for f.controller.eventsWorkqueue.Len() != currentNumEventsInQueue+len(objects) { + } + } +} + +func (f *fixture) updateObjects(t *testing.T, objects []*unstructured.Unstructured) { + gvr := getGvr(f.controller.Resource.Kind) + currentNumEventsInQueue := f.controller.eventsWorkqueue.Len() + if objects != nil { + for _, d := range objects { + _, err := f.kubeClient.Resource(gvr).Namespace(d.GetNamespace()).Update(context.TODO(), d, metav1.UpdateOptions{}) + if err != nil { + t.Errorf("error updating object %s: %v", d.GetName(), err) + } + } - for _, d := range objects { - informer.Informer().GetIndexer().Add(d) + for f.controller.eventsWorkqueue.Len() != currentNumEventsInQueue+len(objects) { + } } +} - return c +func (f *fixture) deleteObjects(t *testing.T, objects []struct{ namespace, name string }) { + gvr := getGvr(f.controller.Resource.Kind) + if objects != nil { + for _, d := range objects { + err := f.kubeClient.Resource(gvr).Namespace(d.namespace).Delete(context.TODO(), d.name, metav1.DeleteOptions{}) + if err != nil { + t.Errorf("error deleting object %s: %v", d.name, err) + } + } + } +} + +func (f *fixture) assertSyncResult(result *SyncResult, expectedResult *SyncResult) { + assert.True(f.t, reflect.DeepEqual(result.EntitiesSet, expectedResult.EntitiesSet), fmt.Sprintf("expected entities set: %v, got: %v", expectedResult.EntitiesSet, result.EntitiesSet)) + assert.True(f.t, reflect.DeepEqual(result.RawDataExamples, expectedResult.RawDataExamples), fmt.Sprintf("expected raw data examples: %v, got: %v", expectedResult.RawDataExamples, result.RawDataExamples)) + assert.True(f.t, result.ShouldDeleteStaleEntities == expectedResult.ShouldDeleteStaleEntities, fmt.Sprintf("expected should delete stale entities: %v, got: %v", expectedResult.ShouldDeleteStaleEntities, result.ShouldDeleteStaleEntities)) } func (f *fixture) runControllerSyncHandler(item EventItem, expectedResult *SyncResult, expectError bool) { @@ -208,18 +279,20 @@ func (f *fixture) runControllerSyncHandler(item EventItem, expectedResult *SyncR f.t.Error("expected error syncing item, got nil") } - eq := reflect.DeepEqual(syncResult.EntitiesSet, expectedResult.EntitiesSet) - if !eq { - f.t.Errorf("expected entities set: %v, got: %v", expectedResult.EntitiesSet, syncResult.EntitiesSet) - } + f.assertSyncResult(syncResult, expectedResult) +} - eq = reflect.DeepEqual(syncResult.RawDataExamples, expectedResult.RawDataExamples) - if !eq { - f.t.Errorf("expected raw data examples: %v, got: %v", expectedResult.RawDataExamples, syncResult.RawDataExamples) - } +func (f *fixture) runControllerInitialSync(expectedResult *SyncResult) { + syncResult := f.controller.RunInitialSync() - if syncResult.ShouldDeleteStaleEntities != expectedResult.ShouldDeleteStaleEntities { - f.t.Errorf("expected should delete stale entities: %v, got: %v", expectedResult.ShouldDeleteStaleEntities, syncResult.ShouldDeleteStaleEntities) + f.assertSyncResult(syncResult, expectedResult) +} + +func (f *fixture) runControllerEventsSync() func() { + f.controller.RunEventsSync(1, signal.SetupSignalHandler()) + return func() { + for f.controller.eventsWorkqueue.Len() > 0 { + } } } @@ -232,10 +305,105 @@ func getKey(deployment *appsv1.Deployment, t *testing.T) string { return key } +func TestSuccessfulRunInitialSync(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + resource := newResource("", []port.EntityMapping{ + { + Identifier: ".metadata.name", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{ + "text": "\"pod\"", + "num": "1", + "bool": "true", + "obj": ".spec.selector", + "arr": ".spec.template.spec.containers", + }, + Relations: map[string]interface{}{ + "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }) + + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) +} + +func TestRunInitialSyncWithBadMapping(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + resource := newResource("", []port.EntityMapping{ + { + Identifier: ".metadata.name", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{ + "text": "bad-jq", + "num": "1", + "bool": "true", + "obj": ".spec.selector", + "arr": ".spec.template.spec.containers", + }, + Relations: map[string]interface{}{ + "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }) + + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: false}) +} + +func TestRunEventsSyncWithCreateEvent(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + id := guuid.NewString() + resource := newResource("", []port.EntityMapping{ + { + Identifier: fmt.Sprintf("\"%s\"", id), + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + }, + }) + + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{}}) + f.createObjects(t, []*unstructured.Unstructured{ud}) + defer f.controller.portClient.DeleteEntity(context.Background(), id, blueprint, true) + waitForSync := f.runControllerEventsSync() + waitForSync() + + assert.Eventually(t, func() bool { + _, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil + }, time.Second*5, time.Millisecond*500) +} + +func TestRunEventsSyncWithDeleteEvent(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + resource := newResource("", []port.EntityMapping{ + { + Identifier: "\"entityToBeDeleted\"", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + }, + }) + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) + + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, "entityToBeDeleted"): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) + f.deleteObjects(t, []struct{ namespace, name string }{{namespace: d.Namespace, name: d.Name}}) + + assert.Eventually(t, func() bool { + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityToBeDeleted", blueprint) + return err != nil && strings.Contains(err.Error(), "was not found") + }, time.Second*5, time.Millisecond*500) + +} + func TestCreateDeployment(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", @@ -256,14 +424,13 @@ func TestCreateDeployment(t *testing.T) { }) item := EventItem{Key: getKey(d, t), ActionType: CreateAction} - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestCreateDeploymentWithSearchRelation(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} item := EventItem{Key: getKey(d, t), ActionType: CreateAction} resource := newResource("", []port.EntityMapping{ { @@ -297,14 +464,13 @@ func TestCreateDeploymentWithSearchRelation(t *testing.T) { }, }, }) - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestUpdateDeployment(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", @@ -325,14 +491,13 @@ func TestUpdateDeployment(t *testing.T) { }) item := EventItem{Key: getKey(d, t), ActionType: UpdateAction} - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } func TestDeleteDeploymentSameOwner(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: "\"entityWithSameOwner\"", @@ -342,7 +507,7 @@ func TestDeleteDeploymentSameOwner(t *testing.T) { createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithSameOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) @@ -355,7 +520,6 @@ func TestDeleteDeploymentSameOwner(t *testing.T) { func TestDeleteDeploymentDifferentOwner(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: "\"entityWithDifferentOwner\"", @@ -365,7 +529,7 @@ func TestDeleteDeploymentDifferentOwner(t *testing.T) { createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{stateKey: "non_exist_statekey", resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{stateKey: "non_exist_statekey", resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithDifferentOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) @@ -378,7 +542,6 @@ func TestDeleteDeploymentDifferentOwner(t *testing.T) { func TestSelectorQueryFilterDeployment(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} resource := newResource(".metadata.name != \"port-k8s-exporter\"", []port.EntityMapping{ { Identifier: ".metadata.name", @@ -387,14 +550,13 @@ func TestSelectorQueryFilterDeployment(t *testing.T) { }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: true}, false) } func TestFailPortAuth(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", @@ -403,14 +565,13 @@ func TestFailPortAuth(t *testing.T) { }) item := EventItem{Key: getKey(d, t), ActionType: CreateAction} - f := newFixture(t, &fixtureConfig{portClientId: "wrongclientid", portClientSecret: "wrongclientsecret", resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{portClientId: "wrongclientid", portClientSecret: "wrongclientsecret", resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: nil, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: false}, true) } func TestFailDeletePortEntity(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} resource := newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", @@ -419,7 +580,7 @@ func TestFailDeletePortEntity(t *testing.T) { }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } @@ -478,7 +639,7 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { for _, mapping := range fullMapping { - controllerWithFullMapping := newFixture(t, &fixtureConfig{resource: mapping, objects: []runtime.Object{}}).controller + controllerWithFullMapping := newFixture(t, &fixtureConfig{resource: mapping, existingObjects: []runtime.Object{}}).controller // Test changes in each individual property properties := map[string]Property{ @@ -520,7 +681,6 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { func TestCreateDeploymentWithSearchIdentifier(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - objects := []runtime.Object{ud} item := EventItem{Key: getKey(d, t), ActionType: CreateAction} resource := newResource("", []port.EntityMapping{ { @@ -548,7 +708,7 @@ func TestCreateDeploymentWithSearchIdentifier(t *testing.T) { }, }, }) - f := newFixture(t, &fixtureConfig{resource: resource, objects: objects}) + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) deleteItem := EventItem{Key: getKey(d, t), ActionType: DeleteAction} From f5adae62b6b16e9417e94d8c569f5b6654287c1b Mon Sep 17 00:00:00 2001 From: talsabagport Date: Mon, 19 Aug 2024 21:08:43 +0300 Subject: [PATCH 08/13] Add tests --- pkg/handlers/controllers_test.go | 421 +++++++++++++++++++++++++++++++ pkg/k8s/controller.go | 8 +- pkg/k8s/controller_test.go | 327 ++++++++++++------------ pkg/port/cli/integration.go | 14 + pkg/port/models.go | 18 +- 5 files changed, 613 insertions(+), 175 deletions(-) create mode 100644 pkg/handlers/controllers_test.go diff --git a/pkg/handlers/controllers_test.go b/pkg/handlers/controllers_test.go new file mode 100644 index 0000000..8b6b574 --- /dev/null +++ b/pkg/handlers/controllers_test.go @@ -0,0 +1,421 @@ +package handlers + +import ( + "context" + "errors" + "fmt" + guuid "github.com/google/uuid" + "github.com/port-labs/port-k8s-exporter/pkg/config" + "github.com/port-labs/port-k8s-exporter/pkg/defaults" + "github.com/port-labs/port-k8s-exporter/pkg/k8s" + "github.com/port-labs/port-k8s-exporter/pkg/port" + "github.com/port-labs/port-k8s-exporter/pkg/port/cli" + _ "github.com/port-labs/port-k8s-exporter/test_utils" + "github.com/stretchr/testify/assert" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1" + apiextensionsv1fake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/typed/apiextensions/v1/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/discovery" + "k8s.io/client-go/discovery/cached/memory" + discoveryfake "k8s.io/client-go/discovery/fake" + k8sfake "k8s.io/client-go/dynamic/fake" + fakeclientset "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/restmapper" + "strings" + "sync" + "testing" + "time" +) + +var ( + blueprint = "k8s-export-test-bp" + deploymentKind = "apps/v1/deployments" +) + +type fixture struct { + t *testing.T + controllersHandler *ControllersHandler + k8sClient *k8s.Client + portClient *cli.PortClient +} + +type fixtureConfig struct { + portClientId string + portClientSecret string + stateKey string + sendRawDataExamples *bool + resource port.Resource + existingObjects []runtime.Object +} + +type resourceMapEntry struct { + list *metav1.APIResourceList + err error +} + +type fakeDiscovery struct { + *discoveryfake.FakeDiscovery + + lock sync.Mutex + groupList *metav1.APIGroupList + groupListErr error + resourceMap map[string]*resourceMapEntry +} + +func (c *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + c.lock.Lock() + defer c.lock.Unlock() + if rl, ok := c.resourceMap[groupVersion]; ok { + return rl.list, rl.err + } + return nil, errors.New("doesn't exist") +} + +func (c *fakeDiscovery) ServerGroups() (*metav1.APIGroupList, error) { + c.lock.Lock() + defer c.lock.Unlock() + if c.groupList == nil { + return nil, errors.New("doesn't exist") + } + return c.groupList, c.groupListErr +} + +func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { + defaultTrue := true + sendRawDataExamples := &defaultTrue + if fixtureConfig.sendRawDataExamples != nil { + sendRawDataExamples = fixtureConfig.sendRawDataExamples + } + + integrationConfig := &port.IntegrationAppConfig{ + DeleteDependents: true, + CreateMissingRelatedEntities: true, + SendRawDataExamples: sendRawDataExamples, + Resources: []port.Resource{fixtureConfig.resource}, + } + + applicationConfig := &config.ApplicationConfiguration{ + ConfigFilePath: config.ApplicationConfig.ConfigFilePath, + ResyncInterval: config.ApplicationConfig.ResyncInterval, + PortBaseURL: config.ApplicationConfig.PortBaseURL, + EventListenerType: config.ApplicationConfig.EventListenerType, + CreateDefaultResources: config.ApplicationConfig.CreateDefaultResources, + OverwriteConfigurationOnRestart: config.ApplicationConfig.OverwriteConfigurationOnRestart, + Resources: integrationConfig.Resources, + DeleteDependents: integrationConfig.DeleteDependents, + CreateMissingRelatedEntities: integrationConfig.CreateMissingRelatedEntities, + UpdateEntityOnlyOnDiff: config.ApplicationConfig.UpdateEntityOnlyOnDiff, + PortClientId: config.ApplicationConfig.PortClientId, + PortClientSecret: config.ApplicationConfig.PortClientSecret, + StateKey: config.ApplicationConfig.StateKey, + } + + if fixtureConfig.portClientId != "" { + applicationConfig.PortClientId = fixtureConfig.portClientId + } + if fixtureConfig.portClientSecret != "" { + applicationConfig.PortClientSecret = fixtureConfig.portClientSecret + } + if fixtureConfig.stateKey != "" { + applicationConfig.StateKey = fixtureConfig.stateKey + } + + exporterConfig := &port.Config{ + StateKey: applicationConfig.StateKey, + EventListenerType: applicationConfig.EventListenerType, + CreateDefaultResources: applicationConfig.CreateDefaultResources, + ResyncInterval: applicationConfig.ResyncInterval, + OverwriteConfigurationOnRestart: applicationConfig.OverwriteConfigurationOnRestart, + Resources: applicationConfig.Resources, + DeleteDependents: applicationConfig.DeleteDependents, + CreateMissingRelatedEntities: applicationConfig.CreateMissingRelatedEntities, + } + + gvr := getGvr(fixtureConfig.resource.Kind) + + fakeD := &fakeDiscovery{ + groupList: &metav1.APIGroupList{ + Groups: []metav1.APIGroup{{ + Name: gvr.Group, + Versions: []metav1.GroupVersionForDiscovery{{ + GroupVersion: fmt.Sprintf("%s/%s", gvr.Group, gvr.Version), + Version: gvr.Version, + }}, + }}, + }, + resourceMap: map[string]*resourceMapEntry{ + fmt.Sprintf("%s/%s", gvr.Group, gvr.Version): { + list: &metav1.APIResourceList{ + GroupVersion: fmt.Sprintf("%s/%s", gvr.Group, gvr.Version), + APIResources: []metav1.APIResource{{ + Name: gvr.Resource, + Namespaced: true, + Group: gvr.Group, + Version: gvr.Version, + }}, + }, + }, + }, + } + + kClient := fakeclientset.NewSimpleClientset() + discoveryClient := discovery.NewDiscoveryClient(fakeD.RESTClient()) + dynamicClient := k8sfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), newGvrToListKind(), fixtureConfig.existingObjects...) + fae := apiextensionsv1fake.FakeApiextensionsV1{Fake: &kClient.Fake} + apiExtensionsClient := apiextensionsv1.New(fae.RESTClient()) + cacheClient := memory.NewMemCacheClient(fakeD) + discoveryMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheClient) + k8sClient := &k8s.Client{DiscoveryClient: discoveryClient, DynamicClient: dynamicClient, DiscoveryMapper: discoveryMapper, ApiExtensionClient: apiExtensionsClient} + portClient := cli.New(applicationConfig) + err := defaults.InitIntegration(portClient, exporterConfig) + if err != nil { + t.Errorf("error initializing integration: %v", err) + } + + controllersHandler := NewControllersHandler(exporterConfig, integrationConfig, k8sClient, portClient) + + return &fixture{ + t: t, + controllersHandler: controllersHandler, + k8sClient: k8sClient, + portClient: portClient, + } +} + +func newResource(selectorQuery string, mappings []port.EntityMapping) port.Resource { + return port.Resource{ + Kind: deploymentKind, + Selector: port.Selector{ + Query: selectorQuery, + }, + Port: port.Port{ + Entity: port.EntityMappings{ + Mappings: mappings, + }, + }, + } +} + +func newDeployment() *appsv1.Deployment { + labels := map[string]string{ + "app": "port-k8s-exporter", + } + return &appsv1.Deployment{ + TypeMeta: v1.TypeMeta{ + Kind: "Deployment", + APIVersion: "apps/v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "port-k8s-exporter", + Namespace: "port-k8s-exporter", + }, + Spec: appsv1.DeploymentSpec{ + Selector: &v1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "port-k8s-exporter", + Image: "port-k8s-exporter:latest", + }, + }, + }, + }, + }, + } +} + +func newUnstructured(obj interface{}) *unstructured.Unstructured { + res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) + if err != nil { + panic(err) + } + return &unstructured.Unstructured{Object: res} +} + +func newGvrToListKind() map[schema.GroupVersionResource]string { + return map[schema.GroupVersionResource]string{ + {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", + } +} + +func getGvr(kind string) schema.GroupVersionResource { + s := strings.SplitN(kind, "/", 3) + return schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} +} + +func getBaseDeploymentResource() port.Resource { + return newResource("", []port.EntityMapping{ + { + Identifier: ".metadata.name", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{ + "text": "\"pod\"", + "num": "1", + "bool": "true", + "obj": ".spec.selector", + "arr": ".spec.template.spec.containers", + }, + Relations: map[string]interface{}{ + "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }) +} + +func (f *fixture) createObjects(objects []*unstructured.Unstructured, kind string) { + if objects != nil { + for _, d := range objects { + gvr := getGvr(kind) + _, err := f.k8sClient.DynamicClient.Resource(gvr).Namespace(d.GetNamespace()).Create(context.TODO(), d, metav1.CreateOptions{}) + if err != nil { + f.t.Errorf("error creating object %s: %v", d.GetName(), err) + } + } + } +} + +func (f *fixture) updateObjects(objects []*unstructured.Unstructured, kind string) { + if objects != nil { + for _, d := range objects { + gvr := getGvr(kind) + _, err := f.k8sClient.DynamicClient.Resource(gvr).Namespace(d.GetNamespace()).Update(context.TODO(), d, metav1.UpdateOptions{}) + if err != nil { + f.t.Errorf("error updating object %s: %v", d.GetName(), err) + } + } + } +} + +func (f *fixture) deleteObjects(objects []struct{ kind, namespace, name string }) { + if objects != nil { + for _, d := range objects { + gvr := getGvr(d.kind) + err := f.k8sClient.DynamicClient.Resource(gvr).Namespace(d.namespace).Delete(context.TODO(), d.name, metav1.DeleteOptions{}) + if err != nil { + f.t.Errorf("error deleting object %s: %v", d.name, err) + } + } + } +} + +func (f *fixture) assertDeploymentHandled(d *appsv1.Deployment) { + assert.Eventually(f.t, func() bool { + integrationKinds, err := f.portClient.GetIntegrationKinds(f.controllersHandler.stateKey) + if err != nil { + return false + } + + examples := integrationKinds[deploymentKind].Examples + for _, example := range examples { + if example.Data["metadata"].(map[string]interface{})["name"] == d.GetName() { + return true + } + } + + return false + }, time.Second*5, time.Millisecond*500) + + assert.Eventually(f.t, func() bool { + entities, err := f.portClient.SearchEntities(context.Background(), port.SearchBody{ + Rules: []port.Rule{ + { + Property: "$datasource", + Operator: "contains", + Value: "port-k8s-exporter", + }, + { + Property: "$datasource", + Operator: "contains", + Value: fmt.Sprintf("statekey/%s", f.controllersHandler.stateKey), + }, + }, + Combinator: "and", + }) + + return err == nil && len(entities) == 1 && entities[0].Identifier == d.GetName() + }, time.Second*5, time.Millisecond*500) +} + +func (f *fixture) runControllersHandle() { + f.controllersHandler.Handle() +} + +func TestSuccessfulControllersHandle(t *testing.T) { + id := guuid.NewString() + d := newDeployment() + d.Name = id + ud := newUnstructured(d) + resource := getBaseDeploymentResource() + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + + f.runControllersHandle() + + f.assertDeploymentHandled(d) + + nid := guuid.NewString() + nd := newDeployment() + nd.Name = nid + f.createObjects([]*unstructured.Unstructured{newUnstructured(nd)}, deploymentKind) + + assert.Eventually(t, func() bool { + _, err := f.portClient.ReadEntity(context.Background(), nid, blueprint) + return err == nil + }, time.Second*5, time.Millisecond*500) + + nd.Spec.Selector.MatchLabels["app"] = "new-label" + f.updateObjects([]*unstructured.Unstructured{newUnstructured(nd)}, deploymentKind) + + assert.Eventually(t, func() bool { + entity, err := f.portClient.ReadEntity(context.Background(), nid, blueprint) + return err == nil && entity.Properties["obj"].(map[string]interface{})["matchLabels"].(map[string]interface{})["app"] == nd.Spec.Selector.MatchLabels["app"] + }, time.Second*5, time.Millisecond*500) + + f.deleteObjects([]struct{ kind, namespace, name string }{{kind: deploymentKind, namespace: nd.Namespace, name: nd.Name}}) + + assert.Eventually(t, func() bool { + _, err := f.portClient.ReadEntity(context.Background(), nid, blueprint) + return err != nil && strings.Contains(err.Error(), "was not found") + }, time.Second*5, time.Millisecond*500) +} + +func TestControllersHandleTolerateFailure(t *testing.T) { + resource := getBaseDeploymentResource() + f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{}}) + + f.runControllersHandle() + + invalidId := fmt.Sprintf("%s!@#", guuid.NewString()) + d := newDeployment() + d.Name = invalidId + f.createObjects([]*unstructured.Unstructured{newUnstructured(d)}, deploymentKind) + + id := guuid.NewString() + d.Name = id + f.createObjects([]*unstructured.Unstructured{newUnstructured(d)}, deploymentKind) + + assert.Eventually(t, func() bool { + _, err := f.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil + }, time.Second*5, time.Millisecond*500) + + f.deleteObjects([]struct{ kind, namespace, name string }{{kind: deploymentKind, namespace: d.Namespace, name: d.Name}}) + + assert.Eventually(t, func() bool { + _, err := f.portClient.ReadEntity(context.Background(), id, blueprint) + return err != nil && strings.Contains(err.Error(), "was not found") + }, time.Second*5, time.Millisecond*500) +} diff --git a/pkg/k8s/controller.go b/pkg/k8s/controller.go index 6394040..443e0b3 100644 --- a/pkg/k8s/controller.go +++ b/pkg/k8s/controller.go @@ -167,7 +167,9 @@ func (c *Controller) RunEventsSync(workers int, stopCh <-chan struct{}) { for i := 0; i < workers; i++ { go wait.Until(func() { - for _, _, shouldContinue := c.processNextWorkItem(c.eventsWorkqueue); shouldContinue; { + shouldContinue := true + for shouldContinue { + _, _, shouldContinue = c.processNextWorkItem(c.eventsWorkqueue) } }, time.Second, stopCh) } @@ -215,7 +217,7 @@ func (c *Controller) processNextWorkItem(workqueue workqueue.RateLimitingInterfa requeueCounterDiff = 0 } workqueue.AddRateLimited(obj) - return syncResult, requeueCounterDiff, fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.Resource.Kind, err.Error()) + return nil, requeueCounterDiff, fmt.Errorf("error syncing '%s' of resource '%s': %s, requeuing", item.Key, c.Resource.Kind, err.Error()) } workqueue.Forget(obj) @@ -370,7 +372,7 @@ func (c *Controller) entityHandler(portEntity port.EntityRequest, action EventAc if err != nil { return nil, fmt.Errorf("error upserting Port entity '%s' of blueprint '%s': %v", portEntity.Identifier, portEntity.Blueprint, err) } - klog.V(0).Infof("Successfully upserted entity '%s' of blueprint '%s'", portEntity.Identifier, portEntity.Blueprint) + klog.V(0).Infof("Successfully upserted entity '%s' of blueprint '%s'", upsertedEntity.Identifier, upsertedEntity.Blueprint) return upsertedEntity, nil case DeleteAction: if reflect.TypeOf(portEntity.Identifier).Kind() != reflect.String { diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index ff0f35f..ba5ea19 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -88,10 +88,7 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { kubeClient := k8sfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), newGvrToListKind(), fixtureConfig.existingObjects...) controller := newController(t, fixtureConfig.resource, kubeClient, interationConfig, newConfig) - _, err := controller.portClient.Authenticate(context.Background(), newConfig.PortClientId, newConfig.PortClientSecret) - if err != nil { - t.Errorf("Failed to authenticate with port %v", err) - } + controller.portClient.Authenticate(context.Background(), newConfig.PortClientId, newConfig.PortClientSecret) return &fixture{ t: t, @@ -200,11 +197,6 @@ func newGvrToListKind() map[schema.GroupVersionResource]string { } } -func getGvr(kind string) schema.GroupVersionResource { - s := strings.SplitN(kind, "/", 3) - return schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} -} - func newController(t *testing.T, resource port.Resource, kubeClient *k8sfake.FakeDynamicClient, integrationConfig *port.IntegrationAppConfig, applicationConfig *config.ApplicationConfiguration) *Controller { informerFactory := dynamicinformer.NewDynamicSharedInformerFactory(kubeClient, noResyncPeriodFunc()) gvr := getGvr(resource.Kind) @@ -221,6 +213,41 @@ func newController(t *testing.T, resource port.Resource, kubeClient *k8sfake.Fak return controller } +func getGvr(kind string) schema.GroupVersionResource { + s := strings.SplitN(kind, "/", 3) + return schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} +} + +func getKey(deployment *appsv1.Deployment, t *testing.T) string { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(deployment) + if err != nil { + t.Errorf("Unexpected error getting key for deployment %v: %v", deployment.Name, err) + return "" + } + return key +} + +func getBaseDeploymentResource() port.Resource { + return newResource("", []port.EntityMapping{ + { + Identifier: ".metadata.name", + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + Icon: "\"Microservice\"", + Team: "\"Test\"", + Properties: map[string]string{ + "text": "\"pod\"", + "num": "1", + "bool": "true", + "obj": ".spec.selector", + "arr": ".spec.template.spec.containers", + }, + Relations: map[string]interface{}{ + "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", + }, + }, + }) +} + func (f *fixture) createObjects(t *testing.T, objects []*unstructured.Unstructured) { gvr := getGvr(f.controller.Resource.Kind) currentNumEventsInQueue := f.controller.eventsWorkqueue.Len() @@ -232,8 +259,9 @@ func (f *fixture) createObjects(t *testing.T, objects []*unstructured.Unstructur } } - for f.controller.eventsWorkqueue.Len() != currentNumEventsInQueue+len(objects) { - } + assert.Eventually(t, func() bool { + return f.controller.eventsWorkqueue.Len() == currentNumEventsInQueue+len(objects) + }, time.Second*2, time.Millisecond*100) } } @@ -248,8 +276,9 @@ func (f *fixture) updateObjects(t *testing.T, objects []*unstructured.Unstructur } } - for f.controller.eventsWorkqueue.Len() != currentNumEventsInQueue+len(objects) { - } + assert.Eventually(t, func() bool { + return f.controller.eventsWorkqueue.Len() == currentNumEventsInQueue+len(objects) + }, time.Second*2, time.Millisecond*100) } } @@ -288,72 +317,50 @@ func (f *fixture) runControllerInitialSync(expectedResult *SyncResult) { f.assertSyncResult(syncResult, expectedResult) } -func (f *fixture) runControllerEventsSync() func() { +func (f *fixture) runControllerEventsSync() { f.controller.RunEventsSync(1, signal.SetupSignalHandler()) - return func() { - for f.controller.eventsWorkqueue.Len() > 0 { - } - } } -func getKey(deployment *appsv1.Deployment, t *testing.T) string { - key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(deployment) - if err != nil { - t.Errorf("Unexpected error getting key for deployment %v: %v", deployment.Name, err) - return "" - } - return key +func TestSuccessfulRunInitialSync(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + f := newFixture(t, &fixtureConfig{resource: getBaseDeploymentResource(), existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) } -func TestSuccessfulRunInitialSync(t *testing.T) { +func TestRunInitialSyncWithSelectorQuery(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - resource := newResource("", []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: fmt.Sprintf("\"%s\"", blueprint), - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{ - "text": "\"pod\"", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", - }, - }, - }) + notSelectedResource := getBaseDeploymentResource() + notSelectedResource.Selector.Query = ".metadata.name != \"port-k8s-exporter\"" + f := newFixture(t, &fixtureConfig{resource: notSelectedResource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: true}) +} - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) - f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) +func TestRunInitialSyncWithBadPropMapping(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + badPropMappingResource := getBaseDeploymentResource() + badPropMappingResource.Port.Entity.Mappings[0].Properties["text"] = "bad-jq" + f := newFixture(t, &fixtureConfig{resource: badPropMappingResource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: false}) } -func TestRunInitialSyncWithBadMapping(t *testing.T) { +func TestRunInitialSyncWithBadEntity(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - resource := newResource("", []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: fmt.Sprintf("\"%s\"", blueprint), - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{ - "text": "bad-jq", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", - }, - }, - }) + badEntityResource := getBaseDeploymentResource() + badEntityResource.Port.Entity.Mappings[0].Identifier = "\"!@#\"" + f := newFixture(t, &fixtureConfig{resource: badEntityResource, existingObjects: []runtime.Object{ud}}) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: false}) +} - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) +func TestRunInitialSyncWithBadSelector(t *testing.T) { + d := newDeployment() + ud := newUnstructured(d) + badSelectorResource := getBaseDeploymentResource() + badSelectorResource.Selector.Query = "bad-jq" + f := newFixture(t, &fixtureConfig{resource: badSelectorResource, existingObjects: []runtime.Object{ud}}) f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: false}) } @@ -367,12 +374,11 @@ func TestRunEventsSyncWithCreateEvent(t *testing.T) { Blueprint: fmt.Sprintf("\"%s\"", blueprint), }, }) - f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{}}) + f.createObjects(t, []*unstructured.Unstructured{ud}) defer f.controller.portClient.DeleteEntity(context.Background(), id, blueprint, true) - waitForSync := f.runControllerEventsSync() - waitForSync() + f.runControllerEventsSync() assert.Eventually(t, func() bool { _, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) @@ -380,6 +386,32 @@ func TestRunEventsSyncWithCreateEvent(t *testing.T) { }, time.Second*5, time.Millisecond*500) } +func TestRunEventsSyncWithUpdateEvent(t *testing.T) { + id := guuid.NewString() + resource := getBaseDeploymentResource() + resource.Port.Entity.Mappings[0].Identifier = fmt.Sprintf("\"%s\"", id) + resource.Port.Entity.Mappings[0].Properties["bool"] = ".spec.selector.matchLabels.app == \"new-label\"" + d := newDeployment() + ud := newUnstructured(d) + f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) + + defer f.controller.portClient.DeleteEntity(context.Background(), id, blueprint, true) + f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, id): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) + assert.Eventually(t, func() bool { + entity, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil && entity.Properties["bool"] == false + }, time.Second*5, time.Millisecond*500) + + d.Spec.Selector.MatchLabels["app"] = "new-label" + f.updateObjects(t, []*unstructured.Unstructured{newUnstructured(d)}) + f.runControllerEventsSync() + + assert.Eventually(t, func() bool { + entity, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + return err == nil && entity.Properties["bool"] == true + }, time.Second*5, time.Millisecond*500) +} + func TestRunEventsSyncWithDeleteEvent(t *testing.T) { d := newDeployment() ud := newUnstructured(d) @@ -404,26 +436,8 @@ func TestRunEventsSyncWithDeleteEvent(t *testing.T) { func TestCreateDeployment(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - resource := newResource("", []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: fmt.Sprintf("\"%s\"", blueprint), - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{ - "text": "\"pod\"", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", - }, - }, - }) + resource := getBaseDeploymentResource() item := EventItem{Key: getKey(d, t), ActionType: CreateAction} - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } @@ -432,38 +446,24 @@ func TestCreateDeploymentWithSearchRelation(t *testing.T) { d := newDeployment() ud := newUnstructured(d) item := EventItem{Key: getKey(d, t), ActionType: CreateAction} - resource := newResource("", []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: fmt.Sprintf("\"%s\"", blueprint), - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{ - "text": "\"pod\"", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": map[string]interface{}{ - "combinator": "\"or\"", - "rules": []interface{}{ - map[string]interface{}{ - "property": "\"$identifier\"", - "operator": "\"=\"", - "value": "\"e_AgPMYvq1tAs8TuqM\"", - }, - map[string]interface{}{ - "property": "\"$identifier\"", - "operator": "\"=\"", - "value": ".metadata.name", - }, - }, + resource := getBaseDeploymentResource() + resource.Port.Entity.Mappings[0].Relations = map[string]interface{}{ + "k8s-relation": map[string]interface{}{ + "combinator": "\"or\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"$identifier\"", + "operator": "\"=\"", + "value": "\"e_AgPMYvq1tAs8TuqM\"", + }, + map[string]interface{}{ + "property": "\"$identifier\"", + "operator": "\"=\"", + "value": ".metadata.name", }, }, }, - }) + } f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } @@ -471,26 +471,8 @@ func TestCreateDeploymentWithSearchRelation(t *testing.T) { func TestUpdateDeployment(t *testing.T) { d := newDeployment() ud := newUnstructured(d) - resource := newResource("", []port.EntityMapping{ - { - Identifier: ".metadata.name", - Blueprint: fmt.Sprintf("\"%s\"", blueprint), - Icon: "\"Microservice\"", - Team: "[\"Test\", \"Test2\"]", - Properties: map[string]string{ - "text": "\"pod\"", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", - }, - }, - }) + resource := getBaseDeploymentResource() item := EventItem{Key: getKey(d, t), ActionType: UpdateAction} - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } @@ -506,11 +488,11 @@ func TestDeleteDeploymentSameOwner(t *testing.T) { }) createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) - f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithSameOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithSameOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithSameOwner", blueprint) if err != nil && !strings.Contains(err.Error(), "was not found") { t.Errorf("expected entity to be deleted") @@ -528,11 +510,11 @@ func TestDeleteDeploymentDifferentOwner(t *testing.T) { }) createItem := EventItem{Key: getKey(d, t), ActionType: CreateAction} item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{stateKey: "non_exist_statekey", resource: resource, existingObjects: []runtime.Object{ud}}) - f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithDifferentOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + f.runControllerSyncHandler(createItem, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;entityWithDifferentOwner", blueprint): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + _, err := f.controller.portClient.ReadEntity(context.Background(), "entityWithDifferentOwner", blueprint) if err != nil && strings.Contains(err.Error(), "was not found") { t.Errorf("expected entity to exist") @@ -549,7 +531,6 @@ func TestSelectorQueryFilterDeployment(t *testing.T) { }, }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{}, ShouldDeleteStaleEntities: true}, false) } @@ -564,7 +545,6 @@ func TestFailPortAuth(t *testing.T) { }, }) item := EventItem{Key: getKey(d, t), ActionType: CreateAction} - f := newFixture(t, &fixtureConfig{portClientId: "wrongclientid", portClientSecret: "wrongclientsecret", resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: nil, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: false}, true) } @@ -579,7 +559,6 @@ func TestFailDeletePortEntity(t *testing.T) { }, }) item := EventItem{Key: getKey(d, t), ActionType: DeleteAction} - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) } @@ -679,37 +658,45 @@ func TestUpdateHandlerWithIndividualPropertyChanges(t *testing.T) { } func TestCreateDeploymentWithSearchIdentifier(t *testing.T) { + id := guuid.NewString() + randTxt := guuid.NewString() d := newDeployment() ud := newUnstructured(d) + resource := getBaseDeploymentResource() + resource.Port.Entity.Mappings[0].Identifier = fmt.Sprintf("\"%s\"", id) + resource.Port.Entity.Mappings[0].Properties["text"] = fmt.Sprintf("\"%s\"", randTxt) + resource.Port.Entity.Mappings[0].Properties["bool"] = "true" item := EventItem{Key: getKey(d, t), ActionType: CreateAction} - resource := newResource("", []port.EntityMapping{ - { - Identifier: map[string]interface{}{ - "combinator": "\"and\"", - "rules": []interface{}{ - map[string]interface{}{ - "property": "\"text\"", - "operator": "\"=\"", - "value": "\"pod\"", - }, - }}, - Blueprint: fmt.Sprintf("\"%s\"", blueprint), - Icon: "\"Microservice\"", - Team: "\"Test\"", - Properties: map[string]string{ - "text": "\"pod\"", - "num": "1", - "bool": "true", - "obj": ".spec.selector", - "arr": ".spec.template.spec.containers", - }, - Relations: map[string]interface{}{ - "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", - }, - }, - }) f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) - f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, id): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + + entity, err := f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + if err != nil { + t.Errorf("error reading entity: %v", err) + } + assert.True(t, entity.Properties["bool"] == true, fmt.Sprintf("expected bool to be true, got: %v", entity.Properties["bool"])) + + item = EventItem{Key: getKey(d, t), ActionType: UpdateAction} + resource.Port.Entity.Mappings[0].Identifier = map[string]interface{}{ + "combinator": "\"and\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"text\"", + "operator": "\"=\"", + "value": fmt.Sprintf("\"%s\"", randTxt), + }, + }} + resource.Port.Entity.Mappings[0].Properties["bool"] = "false" + f = newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + + f.runControllerSyncHandler(item, &SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, id): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) + + entity, err = f.controller.portClient.ReadEntity(context.Background(), id, blueprint) + if err != nil { + t.Errorf("error reading entity: %v", err) + } + assert.True(t, entity.Properties["bool"] == false, fmt.Sprintf("expected bool to be false, got: %v", entity.Properties["bool"])) deleteItem := EventItem{Key: getKey(d, t), ActionType: DeleteAction} f.runControllerSyncHandler(deleteItem, &SyncResult{EntitiesSet: map[string]interface{}{}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}, false) diff --git a/pkg/port/cli/integration.go b/pkg/port/cli/integration.go index 3130a46..dad75f3 100644 --- a/pkg/port/cli/integration.go +++ b/pkg/port/cli/integration.go @@ -96,3 +96,17 @@ func (c *PortClient) PostIntegrationKindExample(stateKey string, kind string, ex } return nil } + +func (c *PortClient) GetIntegrationKinds(stateKey string) (map[string]port.IntegrationKind, error) { + pb := &port.IntegrationKindsResponse{} + resp, err := c.Client.R(). + SetResult(&pb). + Get(fmt.Sprintf("v1/integration/%s/kinds", stateKey)) + if err != nil { + return nil, err + } + if !pb.OK { + return nil, fmt.Errorf("failed to get integration kinds, got: %s", resp.Body()) + } + return pb.Data, nil +} diff --git a/pkg/port/models.go b/pkg/port/models.go index 82fdb09..8e2edff 100644 --- a/pkg/port/models.go +++ b/pkg/port/models.go @@ -38,6 +38,15 @@ type ( UpdatedAt *time.Time `json:"updatedAt,omitempty"` } + Example struct { + Id string `json:"_id,omitempty"` + Data map[string]any `json:"data,omitempty"` + } + + IntegrationKind struct { + Examples []Example `json:"examples"` + } + Property struct { Type string `json:"type,omitempty"` Title string `json:"title,omitempty"` @@ -220,9 +229,14 @@ type ResponseBody struct { Pages Page `json:"pages"` } +type IntegrationKindsResponse struct { + OK bool `json:"ok"` + Data map[string]IntegrationKind `json:"data"` +} + type EntityMapping struct { Identifier interface{} `json:"identifier" yaml:"identifier"` - Title string `json:"title" yaml:"title"` + Title string `json:"title,omitempty" yaml:"title,omitempty"` Blueprint string `json:"blueprint" yaml:"blueprint"` Icon string `json:"icon,omitempty" yaml:"icon,omitempty"` Team string `json:"team,omitempty" yaml:"team,omitempty"` @@ -232,7 +246,7 @@ type EntityMapping struct { type EntityRequest struct { Identifier interface{} `json:"identifier" yaml:"identifier"` - Title string `json:"title" yaml:"title"` + Title string `json:"title,omitempty" yaml:"title,omitempty"` Blueprint string `json:"blueprint" yaml:"blueprint"` Icon string `json:"icon,omitempty" yaml:"icon,omitempty"` Team interface{} `json:"team,omitempty" yaml:"team,omitempty"` From d9d931d31968cf0e17a757df5a8c69b0c978c0a3 Mon Sep 17 00:00:00 2001 From: talsabagport Date: Tue, 20 Aug 2024 17:45:32 +0300 Subject: [PATCH 09/13] Add tests --- pkg/handlers/controllers_test.go | 223 ++++++++++++++++++++++--------- pkg/k8s/controller_test.go | 35 ++--- 2 files changed, 182 insertions(+), 76 deletions(-) diff --git a/pkg/handlers/controllers_test.go b/pkg/handlers/controllers_test.go index 8b6b574..5719604 100644 --- a/pkg/handlers/controllers_test.go +++ b/pkg/handlers/controllers_test.go @@ -36,6 +36,7 @@ import ( var ( blueprint = "k8s-export-test-bp" deploymentKind = "apps/v1/deployments" + daemonSetKind = "apps/v1/daemonsets" ) type fixture struct { @@ -50,7 +51,7 @@ type fixtureConfig struct { portClientSecret string stateKey string sendRawDataExamples *bool - resource port.Resource + resources []port.Resource existingObjects []runtime.Object } @@ -97,7 +98,7 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { DeleteDependents: true, CreateMissingRelatedEntities: true, SendRawDataExamples: sendRawDataExamples, - Resources: []port.Resource{fixtureConfig.resource}, + Resources: fixtureConfig.resources, } applicationConfig := &config.ApplicationConfiguration{ @@ -137,31 +138,55 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { CreateMissingRelatedEntities: applicationConfig.CreateMissingRelatedEntities, } - gvr := getGvr(fixtureConfig.resource.Kind) + groups := make([]metav1.APIGroup, 0) + resourceMap := make(map[string]*resourceMapEntry) + + for _, resource := range integrationConfig.Resources { + gvr := getGvr(resource.Kind) + version := metav1.GroupVersionForDiscovery{Version: gvr.Version, GroupVersion: fmt.Sprintf("%s/%s", gvr.Group, gvr.Version)} + groupFound := false + for i, group := range groups { + if group.Name == gvr.Group { + groupFound = true + versionFound := false + for _, v := range group.Versions { + if v.Version == gvr.Version { + versionFound = true + break + } + } + if !versionFound { + groups[i].Versions = append(groups[i].Versions, version) + } + } + } + if !groupFound { + groups = append(groups, metav1.APIGroup{Name: gvr.Group, Versions: []metav1.GroupVersionForDiscovery{version}}) + } + resourceMapKey := fmt.Sprintf("%s/%s", gvr.Group, gvr.Version) + apiResource := metav1.APIResource{ + Name: gvr.Resource, + Namespaced: true, + Group: gvr.Group, + Version: gvr.Version, + } + if _, ok := resourceMap[resourceMapKey]; ok { + resourceMap[resourceMapKey].list.APIResources = append(resourceMap[resourceMapKey].list.APIResources, apiResource) + } else { + resourceMap[resourceMapKey] = &resourceMapEntry{ + list: &metav1.APIResourceList{ + GroupVersion: resourceMapKey, + APIResources: []metav1.APIResource{apiResource}, + }, + } + } + } fakeD := &fakeDiscovery{ groupList: &metav1.APIGroupList{ - Groups: []metav1.APIGroup{{ - Name: gvr.Group, - Versions: []metav1.GroupVersionForDiscovery{{ - GroupVersion: fmt.Sprintf("%s/%s", gvr.Group, gvr.Version), - Version: gvr.Version, - }}, - }}, - }, - resourceMap: map[string]*resourceMapEntry{ - fmt.Sprintf("%s/%s", gvr.Group, gvr.Version): { - list: &metav1.APIResourceList{ - GroupVersion: fmt.Sprintf("%s/%s", gvr.Group, gvr.Version), - APIResources: []metav1.APIResource{{ - Name: gvr.Resource, - Namespaced: true, - Group: gvr.Group, - Version: gvr.Version, - }}, - }, - }, + Groups: groups, }, + resourceMap: resourceMap, } kClient := fakeclientset.NewSimpleClientset() @@ -188,9 +213,9 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { } } -func newResource(selectorQuery string, mappings []port.EntityMapping) port.Resource { +func newResource(selectorQuery string, mappings []port.EntityMapping, kind string) port.Resource { return port.Resource{ - Kind: deploymentKind, + Kind: kind, Selector: port.Selector{ Query: selectorQuery, }, @@ -236,6 +261,40 @@ func newDeployment() *appsv1.Deployment { } } +func newDaemonSet() *appsv1.DaemonSet { + labels := map[string]string{ + "app": "port-k8s-exporter", + } + return &appsv1.DaemonSet{ + TypeMeta: v1.TypeMeta{ + Kind: "DaemonSet", + APIVersion: "apps/v1", + }, + ObjectMeta: v1.ObjectMeta{ + Name: "port-k8s-exporter-ds", + Namespace: "port-k8s-exporter", + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &v1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: v1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "port-k8s-exporter", + Image: "port-k8s-exporter:latest", + }, + }, + }, + }, + }, + } +} + func newUnstructured(obj interface{}) *unstructured.Unstructured { res, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj) if err != nil { @@ -247,6 +306,7 @@ func newUnstructured(obj interface{}) *unstructured.Unstructured { func newGvrToListKind() map[schema.GroupVersionResource]string { return map[schema.GroupVersionResource]string{ {Group: "apps", Version: "v1", Resource: "deployments"}: "DeploymentList", + {Group: "apps", Version: "v1", Resource: "daemonsets"}: "DaemonSetList", } } @@ -255,7 +315,7 @@ func getGvr(kind string) schema.GroupVersionResource { return schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} } -func getBaseDeploymentResource() port.Resource { +func getBaseResource(kind string) port.Resource { return newResource("", []port.EntityMapping{ { Identifier: ".metadata.name", @@ -273,7 +333,7 @@ func getBaseDeploymentResource() port.Resource { "k8s-relation": "\"e_AgPMYvq1tAs8TuqM\"", }, }, - }) + }, kind) } func (f *fixture) createObjects(objects []*unstructured.Unstructured, kind string) { @@ -312,22 +372,29 @@ func (f *fixture) deleteObjects(objects []struct{ kind, namespace, name string } } } -func (f *fixture) assertDeploymentHandled(d *appsv1.Deployment) { +func (f *fixture) assertObjectsHandled(objects []struct{ kind, name string }) { assert.Eventually(f.t, func() bool { integrationKinds, err := f.portClient.GetIntegrationKinds(f.controllersHandler.stateKey) if err != nil { return false } - examples := integrationKinds[deploymentKind].Examples - for _, example := range examples { - if example.Data["metadata"].(map[string]interface{})["name"] == d.GetName() { - return true + for _, obj := range objects { + examples := integrationKinds[obj.kind].Examples + found := false + for _, example := range examples { + if example.Data["metadata"].(map[string]interface{})["name"] == obj.name { + found = true + continue + } + } + if !found { + return false } } - return false - }, time.Second*5, time.Millisecond*500) + return true + }, time.Second*10, time.Millisecond*500) assert.Eventually(f.t, func() bool { entities, err := f.portClient.SearchEntities(context.Background(), port.SearchBody{ @@ -346,8 +413,21 @@ func (f *fixture) assertDeploymentHandled(d *appsv1.Deployment) { Combinator: "and", }) - return err == nil && len(entities) == 1 && entities[0].Identifier == d.GetName() - }, time.Second*5, time.Millisecond*500) + for _, obj := range objects { + found := false + for _, entity := range entities { + if entity.Identifier == obj.name { + found = true + continue + } + } + if !found { + return false + } + } + + return err == nil && len(entities) == len(objects) + }, time.Second*10, time.Millisecond*500) } func (f *fixture) runControllersHandle() { @@ -355,46 +435,67 @@ func (f *fixture) runControllersHandle() { } func TestSuccessfulControllersHandle(t *testing.T) { - id := guuid.NewString() - d := newDeployment() - d.Name = id - ud := newUnstructured(d) - resource := getBaseDeploymentResource() - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{ud}}) + de := newDeployment() + de.Name = guuid.NewString() + da := newDaemonSet() + da.Name = guuid.NewString() + resources := []port.Resource{getBaseResource(deploymentKind), getBaseResource(daemonSetKind)} + f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{newUnstructured(de), newUnstructured(da)}}) f.runControllersHandle() - f.assertDeploymentHandled(d) + f.assertObjectsHandled([]struct{ kind, name string }{{kind: deploymentKind, name: de.Name}, {kind: daemonSetKind, name: da.Name}}) + + nde := newDeployment() + nde.Name = guuid.NewString() + f.createObjects([]*unstructured.Unstructured{newUnstructured(nde)}, deploymentKind) - nid := guuid.NewString() - nd := newDeployment() - nd.Name = nid - f.createObjects([]*unstructured.Unstructured{newUnstructured(nd)}, deploymentKind) + nda := newDaemonSet() + nda.Name = guuid.NewString() + f.createObjects([]*unstructured.Unstructured{newUnstructured(nda)}, daemonSetKind) assert.Eventually(t, func() bool { - _, err := f.portClient.ReadEntity(context.Background(), nid, blueprint) - return err == nil - }, time.Second*5, time.Millisecond*500) + for _, eid := range []string{nde.Name, nda.Name} { + _, err := f.portClient.ReadEntity(context.Background(), eid, blueprint) + if err != nil { + return false + } + } + return true + }, time.Second*10, time.Millisecond*500) - nd.Spec.Selector.MatchLabels["app"] = "new-label" - f.updateObjects([]*unstructured.Unstructured{newUnstructured(nd)}, deploymentKind) + nde.Spec.Selector.MatchLabels["app"] = "new-label" + f.updateObjects([]*unstructured.Unstructured{newUnstructured(nde)}, deploymentKind) + da.Spec.Selector.MatchLabels["app"] = "new-label" + f.updateObjects([]*unstructured.Unstructured{newUnstructured(da)}, daemonSetKind) assert.Eventually(t, func() bool { - entity, err := f.portClient.ReadEntity(context.Background(), nid, blueprint) - return err == nil && entity.Properties["obj"].(map[string]interface{})["matchLabels"].(map[string]interface{})["app"] == nd.Spec.Selector.MatchLabels["app"] - }, time.Second*5, time.Millisecond*500) + entity, err := f.portClient.ReadEntity(context.Background(), nde.Name, blueprint) + if err != nil || entity.Properties["obj"].(map[string]interface{})["matchLabels"].(map[string]interface{})["app"] != nde.Spec.Selector.MatchLabels["app"] { + return false + } + entity, err = f.portClient.ReadEntity(context.Background(), da.Name, blueprint) + return err == nil && entity.Properties["obj"].(map[string]interface{})["matchLabels"].(map[string]interface{})["app"] == nde.Spec.Selector.MatchLabels["app"] + }, time.Second*10, time.Millisecond*500) - f.deleteObjects([]struct{ kind, namespace, name string }{{kind: deploymentKind, namespace: nd.Namespace, name: nd.Name}}) + f.deleteObjects([]struct{ kind, namespace, name string }{ + {kind: deploymentKind, namespace: de.Namespace, name: de.Name}, {kind: daemonSetKind, namespace: da.Namespace, name: da.Name}, + {kind: deploymentKind, namespace: nde.Namespace, name: nde.Name}, {kind: daemonSetKind, namespace: nda.Namespace, name: nda.Name}}) assert.Eventually(t, func() bool { - _, err := f.portClient.ReadEntity(context.Background(), nid, blueprint) - return err != nil && strings.Contains(err.Error(), "was not found") - }, time.Second*5, time.Millisecond*500) + for _, eid := range []string{de.Name, da.Name, nde.Name, nda.Name} { + _, err := f.portClient.ReadEntity(context.Background(), eid, blueprint) + if err == nil || !strings.Contains(err.Error(), "was not found") { + return false + } + } + return true + }, time.Second*10, time.Millisecond*500) } func TestControllersHandleTolerateFailure(t *testing.T) { - resource := getBaseDeploymentResource() - f := newFixture(t, &fixtureConfig{resource: resource, existingObjects: []runtime.Object{}}) + resources := []port.Resource{getBaseResource(deploymentKind)} + f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{}}) f.runControllersHandle() diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index ba5ea19..7f3475f 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -248,47 +248,47 @@ func getBaseDeploymentResource() port.Resource { }) } -func (f *fixture) createObjects(t *testing.T, objects []*unstructured.Unstructured) { +func (f *fixture) createObjects(objects []*unstructured.Unstructured) { gvr := getGvr(f.controller.Resource.Kind) currentNumEventsInQueue := f.controller.eventsWorkqueue.Len() if objects != nil { for _, d := range objects { _, err := f.kubeClient.Resource(gvr).Namespace(d.GetNamespace()).Create(context.TODO(), d, metav1.CreateOptions{}) if err != nil { - t.Errorf("error creating object %s: %v", d.GetName(), err) + f.t.Errorf("error creating object %s: %v", d.GetName(), err) } } - assert.Eventually(t, func() bool { + assert.Eventually(f.t, func() bool { return f.controller.eventsWorkqueue.Len() == currentNumEventsInQueue+len(objects) }, time.Second*2, time.Millisecond*100) } } -func (f *fixture) updateObjects(t *testing.T, objects []*unstructured.Unstructured) { +func (f *fixture) updateObjects(objects []*unstructured.Unstructured) { gvr := getGvr(f.controller.Resource.Kind) currentNumEventsInQueue := f.controller.eventsWorkqueue.Len() if objects != nil { for _, d := range objects { _, err := f.kubeClient.Resource(gvr).Namespace(d.GetNamespace()).Update(context.TODO(), d, metav1.UpdateOptions{}) if err != nil { - t.Errorf("error updating object %s: %v", d.GetName(), err) + f.t.Errorf("error updating object %s: %v", d.GetName(), err) } } - assert.Eventually(t, func() bool { + assert.Eventually(f.t, func() bool { return f.controller.eventsWorkqueue.Len() == currentNumEventsInQueue+len(objects) }, time.Second*2, time.Millisecond*100) } } -func (f *fixture) deleteObjects(t *testing.T, objects []struct{ namespace, name string }) { +func (f *fixture) deleteObjects(objects []struct{ namespace, name string }) { gvr := getGvr(f.controller.Resource.Kind) if objects != nil { for _, d := range objects { err := f.kubeClient.Resource(gvr).Namespace(d.namespace).Delete(context.TODO(), d.name, metav1.DeleteOptions{}) if err != nil { - t.Errorf("error deleting object %s: %v", d.name, err) + f.t.Errorf("error deleting object %s: %v", d.name, err) } } } @@ -322,10 +322,15 @@ func (f *fixture) runControllerEventsSync() { } func TestSuccessfulRunInitialSync(t *testing.T) { - d := newDeployment() - ud := newUnstructured(d) - f := newFixture(t, &fixtureConfig{resource: getBaseDeploymentResource(), existingObjects: []runtime.Object{ud}}) - f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, d.Name): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) + ud1 := newUnstructured(newDeployment()) + ud1.SetName("deployment1") + ud2 := newUnstructured(newDeployment()) + ud2.SetName("deployment2") + f := newFixture(t, &fixtureConfig{resource: getBaseDeploymentResource(), existingObjects: []runtime.Object{ud1, ud2}}) + f.runControllerInitialSync(&SyncResult{ + EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, ud1.GetName()): nil, fmt.Sprintf("%s;%s", blueprint, ud2.GetName()): nil}, + RawDataExamples: []interface{}{ud1.Object, ud2.Object}, ShouldDeleteStaleEntities: true, + }) } func TestRunInitialSyncWithSelectorQuery(t *testing.T) { @@ -376,7 +381,7 @@ func TestRunEventsSyncWithCreateEvent(t *testing.T) { }) f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{}}) - f.createObjects(t, []*unstructured.Unstructured{ud}) + f.createObjects([]*unstructured.Unstructured{ud}) defer f.controller.portClient.DeleteEntity(context.Background(), id, blueprint, true) f.runControllerEventsSync() @@ -403,7 +408,7 @@ func TestRunEventsSyncWithUpdateEvent(t *testing.T) { }, time.Second*5, time.Millisecond*500) d.Spec.Selector.MatchLabels["app"] = "new-label" - f.updateObjects(t, []*unstructured.Unstructured{newUnstructured(d)}) + f.updateObjects([]*unstructured.Unstructured{newUnstructured(d)}) f.runControllerEventsSync() assert.Eventually(t, func() bool { @@ -424,7 +429,7 @@ func TestRunEventsSyncWithDeleteEvent(t *testing.T) { f := newFixture(t, &fixtureConfig{stateKey: config.ApplicationConfig.StateKey, resource: resource, existingObjects: []runtime.Object{ud}}) f.runControllerInitialSync(&SyncResult{EntitiesSet: map[string]interface{}{fmt.Sprintf("%s;%s", blueprint, "entityToBeDeleted"): nil}, RawDataExamples: []interface{}{ud.Object}, ShouldDeleteStaleEntities: true}) - f.deleteObjects(t, []struct{ namespace, name string }{{namespace: d.Namespace, name: d.Name}}) + f.deleteObjects([]struct{ namespace, name string }{{namespace: d.Namespace, name: d.Name}}) assert.Eventually(t, func() bool { _, err := f.controller.portClient.ReadEntity(context.Background(), "entityToBeDeleted", blueprint) From 6b2c8a36df3139a6d3bad4ecb0b043fc45ecd0c7 Mon Sep 17 00:00:00 2001 From: talsabagport Date: Tue, 20 Aug 2024 19:27:49 +0300 Subject: [PATCH 10/13] Fix test to use different stateKey to avoid conflicts --- pkg/handlers/controllers_test.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/pkg/handlers/controllers_test.go b/pkg/handlers/controllers_test.go index 5719604..56fdf01 100644 --- a/pkg/handlers/controllers_test.go +++ b/pkg/handlers/controllers_test.go @@ -101,6 +101,12 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { Resources: fixtureConfig.resources, } + if fixtureConfig.stateKey != "" { + config.ApplicationConfig.StateKey = fixtureConfig.stateKey + } else { + config.ApplicationConfig.StateKey = "my-k8s-exporter" + } + applicationConfig := &config.ApplicationConfiguration{ ConfigFilePath: config.ApplicationConfig.ConfigFilePath, ResyncInterval: config.ApplicationConfig.ResyncInterval, @@ -123,9 +129,6 @@ func newFixture(t *testing.T, fixtureConfig *fixtureConfig) *fixture { if fixtureConfig.portClientSecret != "" { applicationConfig.PortClientSecret = fixtureConfig.portClientSecret } - if fixtureConfig.stateKey != "" { - applicationConfig.StateKey = fixtureConfig.stateKey - } exporterConfig := &port.Config{ StateKey: applicationConfig.StateKey, @@ -440,7 +443,8 @@ func TestSuccessfulControllersHandle(t *testing.T) { da := newDaemonSet() da.Name = guuid.NewString() resources := []port.Resource{getBaseResource(deploymentKind), getBaseResource(daemonSetKind)} - f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{newUnstructured(de), newUnstructured(da)}}) + f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{newUnstructured(de), newUnstructured(da)}, stateKey: guuid.NewString()}) + defer f.portClient.DeleteIntegration(f.controllersHandler.stateKey) f.runControllersHandle() From 924368a1e73b57d52862846a0f8fc7341c75c3ad Mon Sep 17 00:00:00 2001 From: talsabagport Date: Tue, 20 Aug 2024 19:42:34 +0300 Subject: [PATCH 11/13] add test for delete stale entity --- pkg/handlers/controllers_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/handlers/controllers_test.go b/pkg/handlers/controllers_test.go index 56fdf01..9629eaf 100644 --- a/pkg/handlers/controllers_test.go +++ b/pkg/handlers/controllers_test.go @@ -446,6 +446,9 @@ func TestSuccessfulControllersHandle(t *testing.T) { f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{newUnstructured(de), newUnstructured(da)}, stateKey: guuid.NewString()}) defer f.portClient.DeleteIntegration(f.controllersHandler.stateKey) + // To test later that the delete stale entities is working + f.portClient.CreateEntity(context.Background(), &port.EntityRequest{Blueprint: blueprint, Identifier: guuid.NewString()}, "", false) + f.runControllersHandle() f.assertObjectsHandled([]struct{ kind, name string }{{kind: deploymentKind, name: de.Name}, {kind: daemonSetKind, name: da.Name}}) From 5ac76d103c351c743c11588c6d8b3abc15aa561e Mon Sep 17 00:00:00 2001 From: talsabagport Date: Mon, 26 Aug 2024 14:06:18 +0300 Subject: [PATCH 12/13] fix cr comments --- pkg/jq/parser.go | 2 ++ pkg/jq/parser_test.go | 31 +++++++++++++++++++++++++++++++ pkg/port/entity/entity.go | 2 ++ 3 files changed, 35 insertions(+) diff --git a/pkg/jq/parser.go b/pkg/jq/parser.go index d7e5b76..2e1dad6 100644 --- a/pkg/jq/parser.go +++ b/pkg/jq/parser.go @@ -152,6 +152,8 @@ func ParseMapRecursively(jqQueries map[string]interface{}, obj interface{}) (map relations[i] = relation[key] } mapInterface[key] = relations + } else { + return nil, fmt.Errorf("invalid jq query type '%T'", jqQuery) } } diff --git a/pkg/jq/parser_test.go b/pkg/jq/parser_test.go index eab5d40..f091b14 100644 --- a/pkg/jq/parser_test.go +++ b/pkg/jq/parser_test.go @@ -51,3 +51,34 @@ func TestJqSearchRelation(t *testing.T) { }) } + +func TestJqSearchIdentifier(t *testing.T) { + + mapping := []port.EntityMapping{ + { + Identifier: map[string]interface{}{ + "combinator": "\"and\"", + "rules": []interface{}{ + map[string]interface{}{ + "property": "\"prop1\"", + "operator": "\"in\"", + "value": ".values", + }, + }, + }, + Blueprint: fmt.Sprintf("\"%s\"", blueprint), + }, + } + res, _ := ParseMapRecursively(mapping[0].Identifier.(map[string]interface{}), map[string]interface{}{"values": []string{"val1", "val2"}}) + assert.Equal(t, res, map[string]interface{}{ + "combinator": "and", + "rules": []interface{}{ + map[string]interface{}{ + "property": "prop1", + "operator": "in", + "value": []string{"val1", "val2"}, + }, + }, + }) + +} diff --git a/pkg/port/entity/entity.go b/pkg/port/entity/entity.go index 9b150d8..f3ed326 100644 --- a/pkg/port/entity/entity.go +++ b/pkg/port/entity/entity.go @@ -87,6 +87,8 @@ func newEntityRequest(obj interface{}, mapping port.EntityMapping) (*port.Entity entity.Identifier, err = jq.ParseString(mapping.Identifier.(string), obj) } else if reflect.TypeOf(mapping.Identifier).Kind() == reflect.Map { entity.Identifier, err = jq.ParseMapRecursively(mapping.Identifier.(map[string]interface{}), obj) + } else { + return nil, fmt.Errorf("invalid identifier type '%T'", mapping.Identifier) } if err != nil { From a11424c3b60d3460ff68edaa4b4768f9381dc0f7 Mon Sep 17 00:00:00 2001 From: talsabagport Date: Wed, 28 Aug 2024 13:56:35 +0300 Subject: [PATCH 13/13] Fix small bugs --- pkg/event_handler/event_handler.go | 3 ++- pkg/event_handler/polling/polling.go | 6 ++---- pkg/handlers/controllers.go | 6 ++++++ pkg/handlers/controllers_test.go | 11 +++++++++++ 4 files changed, 21 insertions(+), 5 deletions(-) diff --git a/pkg/event_handler/event_handler.go b/pkg/event_handler/event_handler.go index ae4b41d..0f72413 100644 --- a/pkg/event_handler/event_handler.go +++ b/pkg/event_handler/event_handler.go @@ -2,6 +2,7 @@ package event_handler import ( "fmt" + "github.com/port-labs/port-k8s-exporter/pkg/handlers" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/klog/v2" ) @@ -22,7 +23,7 @@ func Start(eventListener IListener, initControllerHandler func() (IStoppableRsyn return eventListener.Run(func() { klog.Infof("Resync request received. Recreating controllers for the new port configuration") - if controllerHandler != nil { + if controllerHandler != (*handlers.ControllersHandler)(nil) { controllerHandler.Stop() } diff --git a/pkg/event_handler/polling/polling.go b/pkg/event_handler/polling/polling.go index 8a7bdc6..770d1a5 100644 --- a/pkg/event_handler/polling/polling.go +++ b/pkg/event_handler/polling/polling.go @@ -75,10 +75,8 @@ func (h *Handler) Run(resync func()) { klog.Infof("Polling event listener iteration after %d seconds. Checking for changes...", h.pollingRate) configuration, err := integration.GetIntegration(h.portClient, h.stateKey) if err != nil { - klog.Errorf("error resyncing: %s", err.Error()) - } - - if reflect.DeepEqual(currentState, configuration) != true { + klog.Errorf("error getting integration: %s", err.Error()) + } else if reflect.DeepEqual(currentState, configuration) != true { klog.Infof("Changes detected. Resyncing...") currentState = configuration resync() diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index f88a548..db2b722 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -23,6 +23,7 @@ type ControllersHandler struct { stateKey string portClient *cli.PortClient stopCh chan struct{} + isStopped bool } func NewControllersHandler(exporterConfig *port.Config, portConfig *port.IntegrationAppConfig, k8sClient *k8s.Client, portClient *cli.PortClient) *ControllersHandler { @@ -140,6 +141,11 @@ func (c *ControllersHandler) runDeleteStaleEntities(ctx context.Context, current } func (c *ControllersHandler) Stop() { + if c.isStopped { + return + } + klog.Info("Stopping controllers") close(c.stopCh) + c.isStopped = true } diff --git a/pkg/handlers/controllers_test.go b/pkg/handlers/controllers_test.go index 9629eaf..c8db339 100644 --- a/pkg/handlers/controllers_test.go +++ b/pkg/handlers/controllers_test.go @@ -527,3 +527,14 @@ func TestControllersHandleTolerateFailure(t *testing.T) { return err != nil && strings.Contains(err.Error(), "was not found") }, time.Second*5, time.Millisecond*500) } + +func TestControllersHandler_Stop(t *testing.T) { + resources := []port.Resource{getBaseResource(deploymentKind)} + f := newFixture(t, &fixtureConfig{resources: resources, existingObjects: []runtime.Object{}}) + + f.controllersHandler.Stop() + assert.True(t, f.controllersHandler.isStopped) + f.controllersHandler.Stop() + assert.True(t, f.controllersHandler.isStopped) + assert.Panics(t, func() { close(f.controllersHandler.stopCh) }) +}