From 59bc66c8749aaa0d48fb4d3cdabb1f6b2ed8e6a9 Mon Sep 17 00:00:00 2001 From: Venkatreddy KP Date: Sat, 23 Mar 2024 13:58:37 +0530 Subject: [PATCH] register crossplane k8s watcher when resource available --- capten/agent/internal/app/app.go | 11 ------ .../internal/crossplane/cluster_claims.go | 18 ++++++++-- .../internal/crossplane/package_providers.go | 18 ++++++++-- capten/agent/internal/crossplane/watchers.go | 34 ++++++++++++++----- 4 files changed, 56 insertions(+), 25 deletions(-) diff --git a/capten/agent/internal/app/app.go b/capten/agent/internal/app/app.go index 280e649b..6f9fa5f9 100644 --- a/capten/agent/internal/app/app.go +++ b/capten/agent/internal/app/app.go @@ -129,17 +129,6 @@ func initializeJobScheduler(cfg *config.SericeConfig, as *captenstore.Store) (*j } } - if cfg.TektonSyncJobEnabled { - cs, err := job.NewTektonResourcesSync(log, cfg.TektonSyncJobInterval, as) - if err != nil { - log.Fatal("failed to init tekton resources sync job", err) - } - err = s.AddJob("tekton-resources-synch", cs) - if err != nil { - log.Fatal("failed to add tekton resources sync job", err) - } - } - log.Info("successfully initialized job scheduler") return s, nil } diff --git a/capten/agent/internal/crossplane/cluster_claims.go b/capten/agent/internal/crossplane/cluster_claims.go index ac733f61..9fb0cbcc 100644 --- a/capten/agent/internal/crossplane/cluster_claims.go +++ b/capten/agent/internal/crossplane/cluster_claims.go @@ -20,7 +20,6 @@ import ( vaultcred "github.com/kube-tarian/kad/capten/common-pkg/vault-cred" "github.com/kube-tarian/kad/capten/model" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/dynamic" ) const ( @@ -54,12 +53,25 @@ func NewClusterClaimSyncHandler(log logging.Logger, dbStore *captenstore.Store) return &ClusterClaimSyncHandler{log: log, dbStore: dbStore, tc: tc}, nil } -func registerK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error { +func registerK8SClusterClaimWatcher(log logging.Logger, dbStore *captenstore.Store, k8sClient *k8s.K8SClient) error { obj, err := NewClusterClaimSyncHandler(log, dbStore) if err != nil { return err } - return k8s.RegisterDynamicInformers(obj, dynamicClient, cgvk) + + log.Debugf("Registering resource %s wather", cgvk.String()) + _, err = k8sClient.Clientset.Discovery().ServerResourcesForGroupVersion(cgvk.GroupVersion().String()) + if err != nil { + log.Debugf("Resource %s not found: %v", cgvk.String(), err) + return fmt.Errorf("resource not found") + } + + err = k8s.RegisterDynamicInformers(obj, k8sClient.DynamicClientInterface, cgvk) + if err != nil { + return err + } + log.Infof("Resource %s wather registered", cgvk.String()) + return nil } func getClusterClaimObj(obj any) (*model.ClusterClaim, error) { diff --git a/capten/agent/internal/crossplane/package_providers.go b/capten/agent/internal/crossplane/package_providers.go index b4dc0b2f..53077e82 100644 --- a/capten/agent/internal/crossplane/package_providers.go +++ b/capten/agent/internal/crossplane/package_providers.go @@ -16,7 +16,6 @@ import ( "github.com/kube-tarian/kad/capten/common-pkg/k8s" "github.com/kube-tarian/kad/capten/model" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/dynamic" ) var ( @@ -40,12 +39,25 @@ func NewProvidersSyncHandler(log logging.Logger, dbStore *captenstore.Store) (*P return &ProvidersSyncHandler{log: log, dbStore: dbStore, tc: tc, activeProviders: map[string]bool{}}, nil } -func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, dynamicClient dynamic.Interface) error { +func registerK8SProviderWatcher(log logging.Logger, dbStore *captenstore.Store, k8sClient *k8s.K8SClient) error { provider, err := NewProvidersSyncHandler(log, dbStore) if err != nil { return err } - return k8s.RegisterDynamicInformers(provider, dynamicClient, pgvk) + + log.Debugf("Registering resource %s wather", pgvk.String()) + _, err = k8sClient.Clientset.Discovery().ServerResourcesForGroupVersion(pgvk.GroupVersion().String()) + if err != nil { + log.Debugf("Resource %s not found: %v\n", pgvk.String(), err) + return fmt.Errorf("resource not found") + } + + err = k8s.RegisterDynamicInformers(provider, k8sClient.DynamicClientInterface, pgvk) + if err != nil { + return err + } + log.Infof("Resource %s wather registered", pgvk.String()) + return nil } func getProviderObj(obj any) (*model.Provider, error) { diff --git a/capten/agent/internal/crossplane/watchers.go b/capten/agent/internal/crossplane/watchers.go index 6a1b91e5..444ec9a6 100644 --- a/capten/agent/internal/crossplane/watchers.go +++ b/capten/agent/internal/crossplane/watchers.go @@ -2,6 +2,7 @@ package crossplane import ( "fmt" + "time" "github.com/intelops/go-common/logging" captenstore "github.com/kube-tarian/kad/capten/agent/internal/capten-store" @@ -14,14 +15,31 @@ func RegisterK8SWatcher(log logging.Logger, dbStore *captenstore.Store) error { return fmt.Errorf("failed to initalize k8s client: %v", err) } - err = registerK8SClusterClaimWatcher(log, dbStore, k8sclient.DynamicClientInterface) - if err != nil { - return fmt.Errorf("failed to RegisterK8SClusterClaimWatcher: %v", err) - } + go retryForEver(60*time.Second, func() (err error) { + err = registerK8SClusterClaimWatcher(log, dbStore, k8sclient) + if err != nil { + return fmt.Errorf("failed to RegisterK8SClusterClaimWatcher: %v", err) + } + return nil + }) - err = registerK8SProviderWatcher(log, dbStore, k8sclient.DynamicClientInterface) - if err != nil { - return fmt.Errorf("failed to RegisterK8SProviderWatcher: %v", err) - } + go retryForEver(60*time.Second, func() (err error) { + err = registerK8SProviderWatcher(log, dbStore, k8sclient) + if err != nil { + return fmt.Errorf("failed to RegisterK8SProviderWatcher: %v", err) + } + return nil + }) return nil } + +func retryForEver(sleep time.Duration, f func() error) (err error) { + for i := 0; ; i++ { + err = f() + if err == nil { + return + } + time.Sleep(sleep) + } + return +}