Skip to content

Commit

Permalink
Merge pull request #435 from kube-tarian/k8s-watch-update
Browse files Browse the repository at this point in the history
register crossplane k8s watcher when resource available
  • Loading branch information
vramk23 authored Mar 23, 2024
2 parents 31c9397 + 59bc66c commit 95845b8
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 25 deletions.
11 changes: 0 additions & 11 deletions capten/agent/internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,17 +129,6 @@ func initializeJobScheduler(cfg *config.SericeConfig, as *captenstore.Store) (*j
}
}

if cfg.TektonSyncJobEnabled {
cs, err := job.NewTektonResourcesSync(log, cfg.TektonSyncJobInterval, as)
if err != nil {
log.Fatal("failed to init tekton resources sync job", err)
}
err = s.AddJob("tekton-resources-synch", cs)
if err != nil {
log.Fatal("failed to add tekton resources sync job", err)
}
}

log.Info("successfully initialized job scheduler")
return s, nil
}
Expand Down
18 changes: 15 additions & 3 deletions capten/agent/internal/crossplane/cluster_claims.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
vaultcred "github.com/kube-tarian/kad/capten/common-pkg/vault-cred"
"github.com/kube-tarian/kad/capten/model"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

const (
Expand Down Expand Up @@ -54,12 +53,25 @@ func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store)
return &ClusterClaimSyncHandler{log: log, dbStore: dbStore, tc: tc}, nil
}

func registerK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
func registerK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, k8sClient *k8s.K8SClient) error {
obj, err := NewClusterClaimSyncHandler(log, dbStore)
if err != nil {
return err
}
return k8s.RegisterDynamicInformers(obj, dynamicClient, cgvk)

log.Debugf("Registering resource %s wather", cgvk.String())
_, err = k8sClient.Clientset.Discovery().ServerResourcesForGroupVersion(cgvk.GroupVersion().String())
if err != nil {
log.Debugf("Resource %s not found: %v", cgvk.String(), err)
return fmt.Errorf("resource not found")
}

err = k8s.RegisterDynamicInformers(obj, k8sClient.DynamicClientInterface, cgvk)
if err != nil {
return err
}
log.Infof("Resource %s wather registered", cgvk.String())
return nil
}

func getClusterClaimObj(obj any) (*model.ClusterClaim, error) {
Expand Down
18 changes: 15 additions & 3 deletions capten/agent/internal/crossplane/package_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/kube-tarian/kad/capten/common-pkg/k8s"
"github.com/kube-tarian/kad/capten/model"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
)

var (
Expand All @@ -40,12 +39,25 @@ func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*P
return &ProvidersSyncHandler{log: log, dbStore: dbStore, tc: tc, activeProviders: map[string]bool{}}, nil
}

func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error {
func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, k8sClient *k8s.K8SClient) error {
provider, err := NewProvidersSyncHandler(log, dbStore)
if err != nil {
return err
}
return k8s.RegisterDynamicInformers(provider, dynamicClient, pgvk)

log.Debugf("Registering resource %s wather", pgvk.String())
_, err = k8sClient.Clientset.Discovery().ServerResourcesForGroupVersion(pgvk.GroupVersion().String())
if err != nil {
log.Debugf("Resource %s not found: %v\n", pgvk.String(), err)
return fmt.Errorf("resource not found")
}

err = k8s.RegisterDynamicInformers(provider, k8sClient.DynamicClientInterface, pgvk)
if err != nil {
return err
}
log.Infof("Resource %s wather registered", pgvk.String())
return nil
}

func getProviderObj(obj any) (*model.Provider, error) {
Expand Down
34 changes: 26 additions & 8 deletions capten/agent/internal/crossplane/watchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package crossplane

import (
"fmt"
"time"

"github.com/intelops/go-common/logging"
captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store"
Expand All @@ -14,14 +15,31 @@ func RegisterK8SWatcher(log logging.Logger, dbStore *captenstore.Store) error {
return fmt.Errorf("failed to initalize k8s client: %v", err)
}

err = registerK8SClusterClaimWatcher(log, dbStore, k8sclient.DynamicClientInterface)
if err != nil {
return fmt.Errorf("failed to RegisterK8SClusterClaimWatcher: %v", err)
}
go retryForEver(60*time.Second, func() (err error) {
err = registerK8SClusterClaimWatcher(log, dbStore, k8sclient)
if err != nil {
return fmt.Errorf("failed to RegisterK8SClusterClaimWatcher: %v", err)
}
return nil
})

err = registerK8SProviderWatcher(log, dbStore, k8sclient.DynamicClientInterface)
if err != nil {
return fmt.Errorf("failed to RegisterK8SProviderWatcher: %v", err)
}
go retryForEver(60*time.Second, func() (err error) {
err = registerK8SProviderWatcher(log, dbStore, k8sclient)
if err != nil {
return fmt.Errorf("failed to RegisterK8SProviderWatcher: %v", err)
}
return nil
})
return nil
}

func retryForEver(sleep time.Duration, f func() error) (err error) {
for i := 0; ; i++ {
err = f()
if err == nil {
return
}
time.Sleep(sleep)
}
return
}

0 comments on commit 95845b8

Please sign in to comment.