diff --git a/pkg/config/config.go b/pkg/config/config.go index 86eeae1..9909dd7 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -31,6 +31,16 @@ type Config struct { StateKey string } +type KindConfig struct { + Selector Selector + Port Port +} + +type AggregatedResource struct { + Kind string + KindConfigs []KindConfig +} + func New(filepath string, resyncInterval uint, stateKey string) (*Config, error) { c := &Config{ ResyncInterval: resyncInterval, diff --git a/pkg/handlers/controllers.go b/pkg/handlers/controllers.go index cb97b45..57c8604 100644 --- a/pkg/handlers/controllers.go +++ b/pkg/handlers/controllers.go @@ -22,18 +22,29 @@ type ControllersHandler struct { func NewControllersHandler(exporterConfig *config.Config, k8sClient *k8s.Client, portClient *cli.PortClient) *ControllersHandler { resync := time.Minute * time.Duration(exporterConfig.ResyncInterval) informersFactory := dynamicinformer.NewDynamicSharedInformerFactory(k8sClient.DynamicClient, resync) - controllers := make([]*k8s.Controller, 0, len(exporterConfig.Resources)) + aggResources := make(map[string][]config.KindConfig) for _, resource := range exporterConfig.Resources { + kindConfig := config.KindConfig{Selector: resource.Selector, Port: resource.Port} + if _, ok := aggResources[resource.Kind]; ok { + aggResources[resource.Kind] = append(aggResources[resource.Kind], kindConfig) + } else { + aggResources[resource.Kind] = []config.KindConfig{kindConfig} + } + } + + controllers := make([]*k8s.Controller, 0, len(exporterConfig.Resources)) + + for kind, kindConfigs := range aggResources { var gvr schema.GroupVersionResource - gvr, err := k8s.GetGVRFromResource(k8sClient.DiscoveryMapper, resource.Kind) + gvr, err := k8s.GetGVRFromResource(k8sClient.DiscoveryMapper, kind) if err != nil { - klog.Errorf("Error getting GVR, skip handling for resource '%s': %s.", resource.Kind, err.Error()) + klog.Errorf("Error getting GVR, skip handling for resource '%s': %s.", kind, err.Error()) continue } informer := informersFactory.ForResource(gvr) - controller := k8s.NewController(resource, portClient, informer) + controller := k8s.NewController(config.AggregatedResource{Kind: kind, KindConfigs: kindConfigs}, portClient, informer) controllers = append(controllers, controller) } diff --git a/pkg/k8s/controller.go b/pkg/k8s/controller.go index 317a88f..fc6f20a 100644 --- a/pkg/k8s/controller.go +++ b/pkg/k8s/controller.go @@ -36,14 +36,14 @@ type EventItem struct { } type Controller struct { - resource config.Resource + resource config.AggregatedResource portClient *cli.PortClient informer cache.SharedIndexInformer lister cache.GenericLister workqueue workqueue.RateLimitingInterface } -func NewController(resource config.Resource, portClient *cli.PortClient, informer informers.GenericInformer) *Controller { +func NewController(resource config.AggregatedResource, portClient *cli.PortClient, informer informers.GenericInformer) *Controller { controller := &Controller{ resource: resource, portClient: portClient, @@ -177,28 +177,35 @@ func (c *Controller) syncHandler(item EventItem) error { } func (c *Controller) objectHandler(obj interface{}, item EventItem) error { - portEntities, err := c.getObjectEntities(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err)) - return nil - } - - _, err = c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) + _, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret) if err != nil { return fmt.Errorf("error authenticating with Port: %v", err) } - for _, portEntity := range portEntities { - err = c.entityHandler(portEntity, item.ActionType) + errors := make([]error, 0) + for _, kindConfig := range c.resource.KindConfigs { + portEntities, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings) if err != nil { - return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, err) + utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err)) + continue + } + + for _, portEntity := range portEntities { + err = c.entityHandler(portEntity, item.ActionType) + if err != nil { + errors = append(errors, err) + } } } + if len(errors) > 0 { + return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, errors) + } + return nil } -func (c *Controller) getObjectEntities(obj interface{}) ([]port.Entity, error) { +func (c *Controller) getObjectEntities(obj interface{}, selector config.Selector, mappings []port.EntityMapping) ([]port.Entity, error) { unstructuredObj, ok := obj.(*unstructured.Unstructured) if !ok { return nil, fmt.Errorf("error casting to unstructured") @@ -210,18 +217,18 @@ func (c *Controller) getObjectEntities(obj interface{}) ([]port.Entity, error) { } var selectorResult = true - if c.resource.Selector.Query != "" { - selectorResult, err = jq.ParseBool(c.resource.Selector.Query, structuredObj) + if selector.Query != "" { + selectorResult, err = jq.ParseBool(selector.Query, structuredObj) if err != nil { - return nil, fmt.Errorf("invalid selector query '%s': %v", c.resource.Selector.Query, err) + return nil, fmt.Errorf("invalid selector query '%s': %v", selector.Query, err) } } if !selectorResult { return nil, nil } - entities := make([]port.Entity, 0, len(c.resource.Port.Entity.Mappings)) - for _, entityMapping := range c.resource.Port.Entity.Mappings { + entities := make([]port.Entity, 0, len(mappings)) + for _, entityMapping := range mappings { var portEntity *port.Entity portEntity, err = mapping.NewEntity(structuredObj, entityMapping) if err != nil { @@ -258,13 +265,23 @@ func (c *Controller) GetEntitiesSet() (map[string]interface{}, error) { if err != nil { return nil, fmt.Errorf("error listing K8s objects of resource '%s': %v", c.resource.Kind, err) } + for _, obj := range objects { - entities, err := c.getObjectEntities(obj) - if err != nil { - return nil, fmt.Errorf("error getting entities of object: %v", err) - } - for _, entity := range entities { - k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil + for _, kindConfig := range c.resource.KindConfigs { + mappings := make([]port.EntityMapping, 0, len(kindConfig.Port.Entity.Mappings)) + for _, m := range kindConfig.Port.Entity.Mappings { + mappings = append(mappings, port.EntityMapping{ + Identifier: m.Identifier, + Blueprint: m.Blueprint, + }) + } + entities, err := c.getObjectEntities(obj, kindConfig.Selector, mappings) + if err != nil { + return nil, fmt.Errorf("error getting entities of object: %v", err) + } + for _, entity := range entities { + k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil + } } } diff --git a/pkg/k8s/controller_test.go b/pkg/k8s/controller_test.go index 0fa6771..b3f40e8 100644 --- a/pkg/k8s/controller_test.go +++ b/pkg/k8s/controller_test.go @@ -108,7 +108,8 @@ func newController(resource config.Resource, objects []runtime.Object, portClien s := strings.SplitN(resource.Kind, "/", 3) gvr := schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]} informer := k8sI.ForResource(gvr) - c := NewController(resource, portClient, informer) + kindConfig := config.KindConfig{Selector: resource.Selector, Port: resource.Port} + c := NewController(config.AggregatedResource{Kind: resource.Kind, KindConfigs: []config.KindConfig{kindConfig}}, portClient, informer) for _, d := range objects { informer.Informer().GetIndexer().Add(d)