Skip to content

Commit

Permalink
refactor: cache virtual project calculation to improve performance (a…
Browse files Browse the repository at this point in the history
…rgoproj#7274)

Signed-off-by: Alexander Matyushentsev <[email protected]>
Signed-off-by: viktorplakida <[email protected]>
  • Loading branch information
Alexander Matyushentsev authored and plakyda1 committed Sep 28, 2021
1 parent 341e25a commit 7b90030
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,14 @@ func NewCommand() *cobra.Command {
errors.CheckError(err)
cache.Cache.SetClient(cacheutil.NewTwoLevelClient(cache.Cache.GetClient(), 10*time.Minute))

settingsMgr := settings.NewSettingsManager(ctx, kubeClient, namespace)
var appController *controller.ApplicationController

settingsMgr := settings.NewSettingsManager(ctx, kubeClient, namespace, settings.WithRepoOrClusterChangedHandler(func() {
appController.InvalidateProjectsCache()
}))
kubectl := kubeutil.NewKubectl()
clusterFilter := getClusterFilter()
appController, err := controller.NewApplicationController(
appController, err = controller.NewApplicationController(
namespace,
settingsMgr,
kubeClient,
Expand Down
41 changes: 40 additions & 1 deletion controller/appcontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ type ApplicationController struct {
metricsServer *metrics.MetricsServer
kubectlSemaphore *semaphore.Weighted
clusterFilter func(cluster *appv1.Cluster) bool
projByNameCache sync.Map
}

// NewApplicationController creates new instance of ApplicationController.
Expand Down Expand Up @@ -151,6 +152,7 @@ func NewApplicationController(
settingsMgr: settingsMgr,
selfHealTimeout: selfHealTimeout,
clusterFilter: clusterFilter,
projByNameCache: sync.Map{},
}
if kubectlParallelismLimit > 0 {
ctrl.kubectlSemaphore = semaphore.NewWeighted(kubectlParallelismLimit)
Expand All @@ -163,16 +165,19 @@ func NewApplicationController(
AddFunc: func(obj interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(obj); err == nil {
ctrl.projectRefreshQueue.Add(key)
ctrl.InvalidateProjectsCache()
}
},
UpdateFunc: func(old, new interface{}) {
if key, err := cache.MetaNamespaceKeyFunc(new); err == nil {
ctrl.projectRefreshQueue.Add(key)
ctrl.InvalidateProjectsCache()
}
},
DeleteFunc: func(obj interface{}) {
if key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj); err == nil {
ctrl.projectRefreshQueue.Add(key)
ctrl.InvalidateProjectsCache()
}
},
})
Expand Down Expand Up @@ -201,6 +206,13 @@ func NewApplicationController(
return &ctrl, nil
}

func (ctrl *ApplicationController) InvalidateProjectsCache() {
ctrl.projByNameCache.Range(func(key, _ interface{}) bool {
ctrl.projByNameCache.Delete(key)
return true
})
}

func (ctrl *ApplicationController) GetMetricsServer() *metrics.MetricsServer {
return ctrl.metricsServer
}
Expand Down Expand Up @@ -230,8 +242,35 @@ func isSelfReferencedApp(app *appv1.Application, ref v1.ObjectReference) bool {
gvk.Kind == application.ApplicationKind
}

func (ctrl *ApplicationController) newAppProjCache(name string) *appProjCache {
return &appProjCache{name: name, ctrl: ctrl}
}

type appProjCache struct {
name string
ctrl *ApplicationController

lock sync.Mutex
appProj *appv1.AppProject
}

func (projCache *appProjCache) GetAppProject(ctx context.Context) (*appv1.AppProject, error) {
projCache.lock.Lock()
defer projCache.lock.Unlock()
if projCache.appProj != nil {
return projCache.appProj, nil
}
proj, err := argo.GetAppProjectByName(projCache.name, applisters.NewAppProjectLister(projCache.ctrl.projInformer.GetIndexer()), projCache.ctrl.namespace, projCache.ctrl.settingsMgr, projCache.ctrl.db, ctx)
if err != nil {
return nil, err
}
projCache.appProj = proj
return projCache.appProj, nil
}

func (ctrl *ApplicationController) getAppProj(app *appv1.Application) (*appv1.AppProject, error) {
return argo.GetAppProject(&app.Spec, applisters.NewAppProjectLister(ctrl.projInformer.GetIndexer()), ctrl.namespace, ctrl.settingsMgr, ctrl.db, context.TODO())
projCache, _ := ctrl.projByNameCache.LoadOrStore(app.Spec.GetProject(), ctrl.newAppProjCache(app.Spec.GetProject()))
return projCache.(*appProjCache).GetAppProject(context.TODO())
}

func (ctrl *ApplicationController) handleObjectUpdated(managedByApp map[string]bool, ref v1.ObjectReference) {
Expand Down
35 changes: 30 additions & 5 deletions util/settings/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,10 +325,11 @@ type SettingsManager struct {
// subscribers is a list of subscribers to settings updates
subscribers []chan<- *ArgoCDSettings
// mutex protects concurrency sensitive parts of settings manager: access to subscribers list and initialization flag
mutex *sync.Mutex
initContextCancel func()
reposCache []Repository
repoCredsCache []RepositoryCredentials
mutex *sync.Mutex
initContextCancel func()
reposCache []Repository
repoCredsCache []RepositoryCredentials
reposOrClusterChanged func()
}

type incompleteSettingsError struct {
Expand Down Expand Up @@ -357,6 +358,12 @@ func (e *incompleteSettingsError) Error() string {
return e.message
}

func (mgr *SettingsManager) onRepoOrClusterChanged() {
if mgr.reposOrClusterChanged != nil {
go mgr.reposOrClusterChanged()
}
}

func (mgr *SettingsManager) GetSecretsLister() (v1listers.SecretLister, error) {
err := mgr.ensureSynced(false)
if err != nil {
Expand Down Expand Up @@ -958,6 +965,13 @@ func (mgr *SettingsManager) initialize(ctx context.Context) error {
eventHandler := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
mgr.invalidateCache()
mgr.onRepoOrClusterChanged()
},
AddFunc: func(obj interface{}) {
mgr.onRepoOrClusterChanged()
},
DeleteFunc: func(obj interface{}) {
mgr.onRepoOrClusterChanged()
},
}
indexers := cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}
Expand Down Expand Up @@ -1300,15 +1314,26 @@ func (mgr *SettingsManager) SaveGPGPublicKeyData(ctx context.Context, gpgPublicK

}

type SettingsManagerOpts func(mgs *SettingsManager)

func WithRepoOrClusterChangedHandler(handler func()) SettingsManagerOpts {
return func(mgr *SettingsManager) {
mgr.reposOrClusterChanged = handler
}
}

// NewSettingsManager generates a new SettingsManager pointer and returns it
func NewSettingsManager(ctx context.Context, clientset kubernetes.Interface, namespace string) *SettingsManager {
func NewSettingsManager(ctx context.Context, clientset kubernetes.Interface, namespace string, opts ...SettingsManagerOpts) *SettingsManager {

mgr := &SettingsManager{
ctx: ctx,
clientset: clientset,
namespace: namespace,
mutex: &sync.Mutex{},
}
for i := range opts {
opts[i](mgr)
}

return mgr
}
Expand Down

0 comments on commit 7b90030

Please sign in to comment.