Skip to content

Commit

Permalink
fix secret resolve
Browse files Browse the repository at this point in the history
  • Loading branch information
nirvanagit committed Jul 24, 2024
1 parent dfd790b commit c1b1284
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 117 deletions.
160 changes: 47 additions & 113 deletions admiral/pkg/controller/secret/secretcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,59 +20,52 @@ import (
"fmt"
"time"

"github.com/istio-ecosystem/admiral/admiral/pkg/client"
"github.com/istio-ecosystem/admiral/admiral/pkg/registry"
"github.com/istio-ecosystem/admiral/admiral/pkg/util"
idps_sdk "github.intuit.com/idps/idps-go-sdk/v3/idps-sdk"

"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"

"github.com/istio-ecosystem/admiral/admiral/pkg/controller/secret/resolver"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/workqueue"
)

const (
maxRetries = 5
filterLabel = "admiral/sync"
maxRetries = 5
)

// LoadKubeConfig is a unit test override variable for loading the k8s config.
// DO NOT USE - TEST ONLY.
var LoadKubeConfig = clientcmd.Load

var remoteClustersMetric common.Gauge

// addSecretCallback prototype for the add secret callback function.
type addSecretCallback func(config *rest.Config, dataKey string, resyncPeriod util.ResyncIntervals) error
type addSecretCallback func(config *rest.Config, dataKey string, resyncPeriod time.Duration) error

// updateSecretCallback prototype for the update secret callback function.
type updateSecretCallback func(config *rest.Config, dataKey string, resyncPeriod util.ResyncIntervals) error
type updateSecretCallback func(config *rest.Config, dataKey string, resyncPeriod time.Duration) error

// removeSecretCallback prototype for the remove secret callback function.
type removeSecretCallback func(dataKey string) error

// Controller is the controller implementation for Secret resources
type Controller struct {
kubeclientset kubernetes.Interface
namespace string
Cs *ClusterStore
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
addCallback addSecretCallback
updateCallback updateSecretCallback
removeCallback removeSecretCallback
secretResolver resolver.SecretResolver
clusterShardStoreHandler registry.ClusterShardStore
kubeclientset kubernetes.Interface
namespace string
Cs *ClusterStore
queue workqueue.RateLimitingInterface
informer cache.SharedIndexInformer
addCallback addSecretCallback
updateCallback updateSecretCallback
removeCallback removeSecretCallback
secretResolver resolver.SecretResolver
}

// RemoteCluster defines cluster structZZ
Expand All @@ -93,12 +86,6 @@ func newClustersStore() *ClusterStore {
}
}

type IdpsSdkWrapper struct{}

func (c *IdpsSdkWrapper) IdpsClientInstanceFromMap(props map[string]string) (client.IdpsClientInterface, error) {
return idps_sdk.IdpsClientInstanceFromMap(props)
}

// NewController returns a new secret controller
func NewController(
kubeclientset kubernetes.Interface,
Expand All @@ -107,18 +94,17 @@ func NewController(
addCallback addSecretCallback,
updateCallback updateSecretCallback,
removeCallback removeSecretCallback,
admiralProfile string,
secretResolverConfig string) *Controller {
secretResolverType string) *Controller {

ctx := context.Background()
secretsInformer := cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(opts meta_v1.ListOptions) (runtime.Object, error) {
opts.LabelSelector = common.GetSecretFilterTags() + "=true"
opts.LabelSelector = filterLabel + "=true"
return kubeclientset.CoreV1().Secrets(namespace).List(ctx, opts)
},
WatchFunc: func(opts meta_v1.ListOptions) (watch.Interface, error) {
opts.LabelSelector = common.GetSecretFilterTags() + "=true"
opts.LabelSelector = filterLabel + "=true"
return kubeclientset.CoreV1().Secrets(namespace).Watch(ctx, opts)
},
},
Expand All @@ -129,16 +115,11 @@ func NewController(

var secretResolver resolver.SecretResolver
var err error

if admiralProfile == common.AdmiralProfileIntuit {
log.Info("Initializing Intuit secret resolver")
idpsClientProviderWrapper := &IdpsSdkWrapper{}
secretResolver, err = resolver.NewIDPSResolver(secretResolverConfig, idpsClientProviderWrapper)
} else if admiralProfile == common.AdmiralProfileDefault || admiralProfile == common.AdmiralProfilePerf {
if len(secretResolverType) == 0 {
log.Info("Initializing default secret resolver")
secretResolver, err = resolver.NewDefaultResolver()
} else {
err = fmt.Errorf("unrecognized secret resolver type %v specified", admiralProfile)
err = fmt.Errorf("unrecognized secret resolver type %v specified", secretResolverType)
}

if err != nil {
Expand All @@ -147,16 +128,15 @@ func NewController(
}

controller := &Controller{
kubeclientset: kubeclientset,
namespace: namespace,
Cs: cs,
informer: secretsInformer,
queue: queue,
addCallback: addCallback,
updateCallback: updateCallback,
removeCallback: removeCallback,
secretResolver: secretResolver,
clusterShardStoreHandler: registry.NewClusterShardStoreHandler(),
kubeclientset: kubeclientset,
namespace: namespace,
Cs: cs,
informer: secretsInformer,
queue: queue,
addCallback: addCallback,
updateCallback: updateCallback,
removeCallback: removeCallback,
secretResolver: secretResolver,
}

log.Info("Setting up event handlers")
Expand All @@ -183,17 +163,12 @@ func NewController(
}
},
})

remoteClustersMetric = common.NewGaugeFrom(common.ClustersMonitoredMetricName, "Gauge for the clusters monitored by Admiral")
return controller
}

// Run starts the controller until it receives a message over stopCh
func (c *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
if c == nil {
return
}
defer c.queue.ShutDown()

log.Info("Starting Secrets controller")
Expand All @@ -213,12 +188,16 @@ func (c *Controller) Run(stopCh <-chan struct{}) {

// StartSecretController creates the secret controller.
func StartSecretController(
ctx context.Context, k8s kubernetes.Interface, addCallback addSecretCallback,
updateCallback updateSecretCallback, removeCallback removeSecretCallback,
namespace, admiralProfile, secretResolverConfig string) (*Controller, error) {
ctx context.Context,
k8s kubernetes.Interface,
addCallback addSecretCallback,
updateCallback updateSecretCallback,
removeCallback removeSecretCallback,
namespace string,
secretResolverType string) (*Controller, error) {

clusterStore := newClustersStore()
controller := NewController(k8s, namespace, clusterStore, addCallback, updateCallback, removeCallback, admiralProfile, secretResolverConfig)
controller := NewController(k8s, namespace, clusterStore, addCallback, updateCallback, removeCallback, secretResolverType)

go controller.Run(ctx.Done())

Expand Down Expand Up @@ -310,10 +289,6 @@ func (c *Controller) createRemoteCluster(kubeConfig []byte, secretName string, c
}

func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {
shard, err := getShardNameFromClusterSecret(s)
if err != nil {
log.Errorf("unable to find shard information from secret")
}
for clusterID, kubeConfig := range s.Data {
// clusterID must be unique even across multiple secrets
if prev, ok := c.Cs.RemoteClusters[clusterID]; !ok {
Expand All @@ -329,15 +304,11 @@ func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {

c.Cs.RemoteClusters[clusterID] = remoteCluster

if err := c.addCallback(restConfig, clusterID, common.GetResyncIntervals()); err != nil {
if err := c.addCallback(restConfig, clusterID, common.GetAdmiralParams().CacheRefreshDuration); err != nil {
log.Errorf("error during secret loading for clusterID: %s %v", clusterID, err)
continue
}
err = c.addClusterToShard(clusterID, shard)
if err != nil {
log.Errorf("error adding cluster=%s to shard=%s", clusterID, shard)
continue
}

log.Infof("Secret loaded for cluster %s in the secret %s in namespace %s.", clusterID, c.Cs.RemoteClusters[clusterID].secretName, s.ObjectMeta.Namespace)

} else {
Expand All @@ -357,19 +328,14 @@ func (c *Controller) addMemberCluster(secretName string, s *corev1.Secret) {
}

c.Cs.RemoteClusters[clusterID] = remoteCluster
if err := c.updateCallback(restConfig, clusterID, common.GetResyncIntervals()); err != nil {
if err := c.updateCallback(restConfig, clusterID, common.GetAdmiralParams().CacheRefreshDuration); err != nil {
log.Errorf("Error updating cluster_id from secret=%v: %s %v",
clusterID, secretName, err)
}
err = c.addClusterToShard(clusterID, shard)
if err != nil {
log.Errorf("error adding cluster=%s to shard=%s", clusterID, shard)
continue
}
}
}

remoteClustersMetric.Set(float64(len(c.Cs.RemoteClusters)))
}
common.RemoteClustersMetric.Set(float64(len(c.Cs.RemoteClusters)))
log.Infof("Number of remote clusters: %d", len(c.Cs.RemoteClusters))
}

Expand All @@ -384,38 +350,6 @@ func (c *Controller) deleteMemberCluster(secretName string) {
delete(c.Cs.RemoteClusters, clusterID)
}
}
remoteClustersMetric.Set(float64(len(c.Cs.RemoteClusters)))
common.RemoteClustersMetric.Set(float64(len(c.Cs.RemoteClusters)))
log.Infof("Number of remote clusters: %d", len(c.Cs.RemoteClusters))
}

func getShardNameFromClusterSecret(secret *corev1.Secret) (string, error) {
if !common.IsAdmiralStateSyncerMode() {
return "", nil
}
if secret == nil {
return "", fmt.Errorf("nil secret passed")
}
annotation := secret.GetAnnotations()
if len(annotation) == 0 {
return "", fmt.Errorf("no annotations found on secret=%s", secret.GetName())
}
shard, ok := annotation[util.SecretShardKey]
if ok {
return shard, nil
}
return "", fmt.Errorf("shard not found")
}
func (c *Controller) addClusterToShard(cluster, shard string) error {
if !common.IsAdmiralStateSyncerMode() {
return nil
}
return c.clusterShardStoreHandler.AddClusterToShard(cluster, shard)
}

// TODO: invoke function in delete workflow
func (c *Controller) removeClusterFromShard(cluster, shard string) error {
if !common.IsAdmiralStateSyncerMode() {
return nil
}
return c.clusterShardStoreHandler.RemoveClusterFromShard(cluster, shard)
}
4 changes: 0 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ require (
github.com/jamiealquiza/tachymeter v2.0.0+incompatible
github.com/jedib0t/go-pretty/v6 v6.5.3
github.com/prometheus/common v0.53.0
github.intuit.com/idps/idps-go-sdk/v3 v3.9909.0
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/exporters/prometheus v0.49.0
go.opentelemetry.io/otel/metric v1.27.0
Expand Down Expand Up @@ -103,9 +102,6 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/tevino/abool v1.2.0 // indirect
github.com/ugorji/go/codec v1.2.7 // indirect
github.intuit.com/idps/device-grant-flow/go/dgfsdk v0.0.0-20220428022612-cf054cda65f7 // indirect
github.intuit.com/idps/idps-go-commons/v3 v3.4.4 // indirect
github.intuit.com/idps/idps-go-swagger-clients v1.8.1 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/oauth2 v0.16.0 // indirect
Expand Down

0 comments on commit c1b1284

Please sign in to comment.