Skip to content

Commit

Permalink
mig_info_provider.go:fillMigInstances will now use locking when calli…
Browse files Browse the repository at this point in the history
…ng the gce api.

This is to avoid multiple gce calls for the same mig during scale down (which is done in parallel).
  • Loading branch information
jayantjain93 committed Feb 2, 2022
1 parent 0d52c16 commit 2145582
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 13 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,7 @@ func TestGetMigForInstance(t *testing.T) {
buildListInstanceGroupManagersResponsePart(defaultPoolMigName, zoneB, 7),
buildListInstanceGroupManagersResponsePart(extraPoolMigName, zoneB, 8),
)).Once()
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB)).Twice()
server.On("handle", "/projects/project1/zones/us-central1-b/instanceGroupManagers/gke-cluster-1-default-pool/listManagedInstances").Return(buildFourRunningInstancesOnDefaultMigManagedInstancesResponse(zoneB))
gceRef1 := GceRef{
Project: projectId,
Zone: zoneB,
Expand Down
38 changes: 26 additions & 12 deletions cluster-autoscaler/cloudprovider/gce/mig_info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ type MigInfoProvider interface {
}

type cachingMigInfoProvider struct {
mutex sync.Mutex
migInfoMutex sync.Mutex
cache *GceCache
migLister MigLister
gceClient AutoscalingGceClient
projectId string
concurrentGceRefreshes int
migInstanceMutex sync.Mutex
}

// NewCachingMigInfoProvider creates an instance of caching MigInfoProvider
Expand Down Expand Up @@ -127,7 +128,7 @@ func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error {
migs := c.migLister.GetMigs()
errors := make([]error, len(migs))
workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(migs), func(piece int) {
errors[piece] = c.fillMigInstances(migs[piece].GceRef())
errors[piece] = c.fillMigInstancesNoLock(migs[piece].GceRef())
}, workqueue.WithChunkSize(c.concurrentGceRefreshes))

for _, err := range errors {
Expand All @@ -149,7 +150,7 @@ func (c *cachingMigInfoProvider) findMigWithMatchingBasename(instanceRef GceRef)
return nil
}

func (c *cachingMigInfoProvider) fillMigInstances(migRef GceRef) error {
func (c *cachingMigInfoProvider) fillMigInstancesNoLock(migRef GceRef) error {
klog.V(4).Infof("Regenerating MIG instances cache for %s", migRef.String())
instances, err := c.gceClient.FetchMigInstances(migRef)
if err != nil {
Expand All @@ -159,9 +160,22 @@ func (c *cachingMigInfoProvider) fillMigInstances(migRef GceRef) error {
return c.cache.SetMigInstances(migRef, instances)
}

// fillMigInstances makes sure the instance list for migRef is present in the
// cache, fetching it from GCE only when the cache has no entry for that MIG.
//
// The whole check-then-fetch sequence runs under migInstanceMutex. During a
// large scale down many callers ask about the same MIG in parallel; with the
// lock held, the first caller for a MIG performs the GCE API call and, once
// that fetch has been stored, later callers find the entry in the cache and
// return without another network round trip.
func (c *cachingMigInfoProvider) fillMigInstances(migRef GceRef) error {
	c.migInstanceMutex.Lock()
	defer c.migInstanceMutex.Unlock()

	// Another caller may have populated the entry while we waited for the lock.
	if _, cached := c.cache.GetMigInstances(migRef); cached {
		return nil
	}
	return c.fillMigInstancesNoLock(migRef)
}

func (c *cachingMigInfoProvider) GetMigTargetSize(migRef GceRef) (int64, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.migInfoMutex.Lock()
defer c.migInfoMutex.Unlock()

targetSize, found := c.cache.GetMigTargetSize(migRef)
if found {
Expand All @@ -185,8 +199,8 @@ func (c *cachingMigInfoProvider) GetMigTargetSize(migRef GceRef) (int64, error)
}

func (c *cachingMigInfoProvider) GetMigBasename(migRef GceRef) (string, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.migInfoMutex.Lock()
defer c.migInfoMutex.Unlock()

basename, found := c.cache.GetMigBasename(migRef)
if found {
Expand All @@ -210,8 +224,8 @@ func (c *cachingMigInfoProvider) GetMigBasename(migRef GceRef) (string, error) {
}

func (c *cachingMigInfoProvider) GetMigInstanceTemplateName(migRef GceRef) (string, error) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.migInfoMutex.Lock()
defer c.migInfoMutex.Unlock()

templateName, found := c.cache.GetMigInstanceTemplateName(migRef)
if found {
Expand Down Expand Up @@ -240,8 +254,8 @@ func (c *cachingMigInfoProvider) GetMigInstanceTemplate(migRef GceRef) (*gce.Ins
return nil, err
}

c.mutex.Lock()
defer c.mutex.Unlock()
c.migInfoMutex.Lock()
defer c.migInfoMutex.Unlock()

template, found := c.cache.GetMigInstanceTemplate(migRef)
if found && template.Name == templateName {
Expand All @@ -257,7 +271,7 @@ func (c *cachingMigInfoProvider) GetMigInstanceTemplate(migRef GceRef) (*gce.Ins
return template, nil
}

// filMigInfoCache needs to be called with mutex locked
// fillMigInfoCache needs to be called with migInfoMutex locked
func (c *cachingMigInfoProvider) fillMigInfoCache() error {
var zones []string
for zone := range c.listAllZonesWithMigs() {
Expand Down

0 comments on commit 2145582

Please sign in to comment.