Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Map only identifier and blueprint in get entities for delete stale entities, and aggregate resources by kind #14

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,16 @@ type Config struct {
StateKey string
}

type KindConfig struct {
Selector Selector
Port Port
}

type AggregatedResource struct {
Kind string
KindConfigs []KindConfig
}

func New(filepath string, resyncInterval uint, stateKey string) (*Config, error) {
c := &Config{
ResyncInterval: resyncInterval,
Expand Down
19 changes: 15 additions & 4 deletions pkg/handlers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,29 @@ type ControllersHandler struct {
func NewControllersHandler(exporterConfig *config.Config, k8sClient *k8s.Client, portClient *cli.PortClient) *ControllersHandler {
resync := time.Minute * time.Duration(exporterConfig.ResyncInterval)
informersFactory := dynamicinformer.NewDynamicSharedInformerFactory(k8sClient.DynamicClient, resync)
controllers := make([]*k8s.Controller, 0, len(exporterConfig.Resources))

aggResources := make(map[string][]config.KindConfig)
for _, resource := range exporterConfig.Resources {
kindConfig := config.KindConfig{Selector: resource.Selector, Port: resource.Port}
if _, ok := aggResources[resource.Kind]; ok {
aggResources[resource.Kind] = append(aggResources[resource.Kind], kindConfig)
} else {
aggResources[resource.Kind] = []config.KindConfig{kindConfig}
}
}

controllers := make([]*k8s.Controller, 0, len(exporterConfig.Resources))

for kind, kindConfigs := range aggResources {
var gvr schema.GroupVersionResource
gvr, err := k8s.GetGVRFromResource(k8sClient.DiscoveryMapper, resource.Kind)
gvr, err := k8s.GetGVRFromResource(k8sClient.DiscoveryMapper, kind)
if err != nil {
klog.Errorf("Error getting GVR, skip handling for resource '%s': %s.", resource.Kind, err.Error())
klog.Errorf("Error getting GVR, skip handling for resource '%s': %s.", kind, err.Error())
continue
}

informer := informersFactory.ForResource(gvr)
controller := k8s.NewController(resource, portClient, informer)
controller := k8s.NewController(config.AggregatedResource{Kind: kind, KindConfigs: kindConfigs}, portClient, informer)
controllers = append(controllers, controller)
}

Expand Down
65 changes: 41 additions & 24 deletions pkg/k8s/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type EventItem struct {
}

type Controller struct {
resource config.Resource
resource config.AggregatedResource
portClient *cli.PortClient
informer cache.SharedIndexInformer
lister cache.GenericLister
workqueue workqueue.RateLimitingInterface
}

func NewController(resource config.Resource, portClient *cli.PortClient, informer informers.GenericInformer) *Controller {
func NewController(resource config.AggregatedResource, portClient *cli.PortClient, informer informers.GenericInformer) *Controller {
controller := &Controller{
resource: resource,
portClient: portClient,
Expand Down Expand Up @@ -177,28 +177,35 @@ func (c *Controller) syncHandler(item EventItem) error {
}

func (c *Controller) objectHandler(obj interface{}, item EventItem) error {
portEntities, err := c.getObjectEntities(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err))
return nil
}

_, err = c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret)
_, err := c.portClient.Authenticate(context.Background(), c.portClient.ClientID, c.portClient.ClientSecret)
if err != nil {
return fmt.Errorf("error authenticating with Port: %v", err)
}

for _, portEntity := range portEntities {
err = c.entityHandler(portEntity, item.ActionType)
errors := make([]error, 0)
for _, kindConfig := range c.resource.KindConfigs {
portEntities, err := c.getObjectEntities(obj, kindConfig.Selector, kindConfig.Port.Entity.Mappings)
if err != nil {
return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, err)
utilruntime.HandleError(fmt.Errorf("error getting entities for object key '%s': %v", item.Key, err))
continue
}

for _, portEntity := range portEntities {
err = c.entityHandler(portEntity, item.ActionType)
if err != nil {
errors = append(errors, err)
}
}
}

if len(errors) > 0 {
return fmt.Errorf("error handling entity for object key '%s': %v", item.Key, errors)
}

return nil
}

func (c *Controller) getObjectEntities(obj interface{}) ([]port.Entity, error) {
func (c *Controller) getObjectEntities(obj interface{}, selector config.Selector, mappings []port.EntityMapping) ([]port.Entity, error) {
unstructuredObj, ok := obj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("error casting to unstructured")
Expand All @@ -210,18 +217,18 @@ func (c *Controller) getObjectEntities(obj interface{}) ([]port.Entity, error) {
}

var selectorResult = true
if c.resource.Selector.Query != "" {
selectorResult, err = jq.ParseBool(c.resource.Selector.Query, structuredObj)
if selector.Query != "" {
selectorResult, err = jq.ParseBool(selector.Query, structuredObj)
if err != nil {
return nil, fmt.Errorf("invalid selector query '%s': %v", c.resource.Selector.Query, err)
return nil, fmt.Errorf("invalid selector query '%s': %v", selector.Query, err)
}
}
if !selectorResult {
return nil, nil
}

entities := make([]port.Entity, 0, len(c.resource.Port.Entity.Mappings))
for _, entityMapping := range c.resource.Port.Entity.Mappings {
entities := make([]port.Entity, 0, len(mappings))
for _, entityMapping := range mappings {
var portEntity *port.Entity
portEntity, err = mapping.NewEntity(structuredObj, entityMapping)
if err != nil {
Expand Down Expand Up @@ -258,13 +265,23 @@ func (c *Controller) GetEntitiesSet() (map[string]interface{}, error) {
if err != nil {
return nil, fmt.Errorf("error listing K8s objects of resource '%s': %v", c.resource.Kind, err)
}

for _, obj := range objects {
entities, err := c.getObjectEntities(obj)
if err != nil {
return nil, fmt.Errorf("error getting entities of object: %v", err)
}
for _, entity := range entities {
k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil
for _, kindConfig := range c.resource.KindConfigs {
mappings := make([]port.EntityMapping, 0, len(kindConfig.Port.Entity.Mappings))
for _, m := range kindConfig.Port.Entity.Mappings {
mappings = append(mappings, port.EntityMapping{
Identifier: m.Identifier,
Blueprint: m.Blueprint,
})
}
entities, err := c.getObjectEntities(obj, kindConfig.Selector, mappings)
if err != nil {
return nil, fmt.Errorf("error getting entities of object: %v", err)
}
for _, entity := range entities {
k8sEntitiesSet[c.portClient.GetEntityIdentifierKey(&entity)] = nil
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/k8s/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ func newController(resource config.Resource, objects []runtime.Object, portClien
s := strings.SplitN(resource.Kind, "/", 3)
gvr := schema.GroupVersionResource{Group: s[0], Version: s[1], Resource: s[2]}
informer := k8sI.ForResource(gvr)
c := NewController(resource, portClient, informer)
kindConfig := config.KindConfig{Selector: resource.Selector, Port: resource.Port}
c := NewController(config.AggregatedResource{Kind: resource.Kind, KindConfigs: []config.KindConfig{kindConfig}}, portClient, informer)

for _, d := range objects {
informer.Informer().GetIndexer().Add(d)
Expand Down