Skip to content

Commit

Permalink
Merge pull request #6955 from damikag/refactor-mig-fetch
Browse files Browse the repository at this point in the history
refactor gce.RegenerateMigInstancesCache() to use Instance.List
  • Loading branch information
k8s-ci-robot authored Jul 9, 2024
2 parents 60a8ec2 + 8971a29 commit 01e8918
Show file tree
Hide file tree
Showing 7 changed files with 653 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/gce/gce_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func BuildGCE(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover
defer config.Close()
}

manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
manager, err := CreateGceManager(config, do, opts.GCEOptions.LocalSSDDiskSizeProvider, opts.Regional, opts.GCEOptions.BulkMigInstancesListingEnabled, opts.GCEOptions.ConcurrentRefreshes, opts.UserAgent, opts.GCEOptions.DomainUrl, opts.GCEOptions.MigInstancesMinRefreshWaitTime)
if err != nil {
klog.Fatalf("Failed to create GCE Manager: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions cluster-autoscaler/cloudprovider/gce/gce_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ type gceManagerImpl struct {
// CreateGceManager constructs GceManager object.
func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGroupDiscoveryOptions,
localSSDDiskSizeProvider localssdsize.LocalSSDSizeProvider,
regional bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
regional, bulkGceMigInstancesListingEnabled bool, concurrentGceRefreshes int, userAgent, domainUrl string, migInstancesMinRefreshWaitTime time.Duration) (GceManager, error) {
// Create Google Compute Engine token.
var err error
tokenSource := google.ComputeTokenSource("")
Expand Down Expand Up @@ -188,7 +188,7 @@ func CreateGceManager(configReader io.Reader, discoveryOpts cloudprovider.NodeGr
cache: cache,
GceService: gceService,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime),
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, concurrentGceRefreshes, migInstancesMinRefreshWaitTime, bulkGceMigInstancesListingEnabled),
location: location,
regional: regional,
projectId: projectId,
Expand Down
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/gce/gce_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ func newTestGceManager(t *testing.T, testServerURL string, regional bool) *gceMa
manager := &gceManagerImpl{
cache: cache,
migLister: migLister,
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second),
migInfoProvider: NewCachingMigInfoProvider(cache, migLister, gceService, projectId, 1, 0*time.Second, false),
GceService: gceService,
projectId: projectId,
regional: regional,
Expand Down
151 changes: 134 additions & 17 deletions cluster-autoscaler/cloudprovider/gce/mig_info_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,16 @@ type timeProvider interface {
}

type cachingMigInfoProvider struct {
migInfoMutex sync.Mutex
cache *GceCache
migLister MigLister
gceClient AutoscalingGceClient
projectId string
concurrentGceRefreshes int
migInstanceMutex sync.Mutex
migInstancesMinRefreshWaitTime time.Duration
timeProvider timeProvider
migInfoMutex sync.Mutex
cache *GceCache
migLister MigLister
gceClient AutoscalingGceClient
projectId string
concurrentGceRefreshes int
migInstanceMutex sync.Mutex
migInstancesMinRefreshWaitTime time.Duration
timeProvider timeProvider
bulkGceMigInstancesListingEnabled bool
}

type realTime struct{}
Expand All @@ -80,15 +81,16 @@ func (r *realTime) Now() time.Time {
}

// NewCachingMigInfoProvider creates an instance of caching MigInfoProvider
func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration) MigInfoProvider {
func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration, bulkGceMigInstancesListingEnabled bool) MigInfoProvider {
return &cachingMigInfoProvider{
cache: cache,
migLister: migLister,
gceClient: gceClient,
projectId: projectId,
concurrentGceRefreshes: concurrentGceRefreshes,
migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime,
timeProvider: &realTime{},
cache: cache,
migLister: migLister,
gceClient: gceClient,
projectId: projectId,
concurrentGceRefreshes: concurrentGceRefreshes,
migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime,
timeProvider: &realTime{},
bulkGceMigInstancesListingEnabled: bulkGceMigInstancesListingEnabled,
}
}

Expand Down Expand Up @@ -151,6 +153,11 @@ func (c *cachingMigInfoProvider) getCachedMigForInstance(instanceRef GceRef) (Mi
func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error {
c.cache.InvalidateAllMigInstances()
c.cache.InvalidateAllInstancesToMig()

if c.bulkGceMigInstancesListingEnabled {
return c.bulkListMigInstances()
}

migs := c.migLister.GetMigs()
errors := make([]error, len(migs))
workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(migs), func(piece int) {
Expand All @@ -165,6 +172,116 @@ func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error {
return nil
}

func (c *cachingMigInfoProvider) bulkListMigInstances() error {
c.cache.InvalidateMigInstancesState()
err := c.fillMigInfoCache()
if err != nil {
return err
}
instances, listErr := c.listInstancesInAllZonesWithMigs()
migToInstances := groupInstancesToMigs(instances)
updateErr := c.updateMigInstancesCache(migToInstances)

if listErr != nil {
return listErr
}
return updateErr
}

func (c *cachingMigInfoProvider) listInstancesInAllZonesWithMigs() ([]GceInstance, error) {
var zones []string
for zone := range c.listAllZonesWithMigs() {
zones = append(zones, zone)
}
var allInstances []GceInstance
errors := make([]error, len(zones))
zoneInstances := make([][]GceInstance, len(zones))
workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(zones), func(piece int) {
zoneInstances[piece], errors[piece] = c.gceClient.FetchAllInstances(c.projectId, zones[piece], "")
}, workqueue.WithChunkSize(c.concurrentGceRefreshes))

for _, instances := range zoneInstances {
allInstances = append(allInstances, instances...)
}
for _, err := range errors {
if err != nil {
return allInstances, err
}
}
return allInstances, nil
}

func groupInstancesToMigs(instances []GceInstance) map[GceRef][]GceInstance {
migToInstances := map[GceRef][]GceInstance{}
for _, instance := range instances {
migToInstances[instance.Igm] = append(migToInstances[instance.Igm], instance)
}
return migToInstances
}

func (c *cachingMigInfoProvider) isMigInstancesConsistent(mig Mig, migToInstances map[GceRef][]GceInstance) bool {
migRef := mig.GceRef()
state, found := c.cache.GetMigInstancesState(migRef)
if !found {
return false
}
instanceCount := state[cloudprovider.InstanceRunning] + state[cloudprovider.InstanceCreating] + state[cloudprovider.InstanceDeleting]

instances, found := migToInstances[migRef]
if !found && instanceCount > 0 {
return false
}
return instanceCount == int64(len(instances))
}

func (c *cachingMigInfoProvider) isMigCreatingOrDeletingInstances(mig Mig) bool {
migRef := mig.GceRef()
state, found := c.cache.GetMigInstancesState(migRef)
if !found {
return false
}
return state[cloudprovider.InstanceCreating] > 0 || state[cloudprovider.InstanceDeleting] > 0
}

// updateMigInstancesCache updates the mig instances for each mig
func (c *cachingMigInfoProvider) updateMigInstancesCache(migToInstances map[GceRef][]GceInstance) error {
var errors []error
for _, mig := range c.migLister.GetMigs() {
migRef := mig.GceRef()
// If there is an inconsistency between number of instances according to instances.List
// and number of instances according to migInstancesStateCount for the given mig, which can be due to
// - abandoned instance
// - missing/malformed "created-by" reference
// we use an igm.ListInstances call as the authoritative source of instance information
if !c.isMigInstancesConsistent(mig, migToInstances) {
if err := c.fillMigInstances(migRef); err != nil {
errors = append(errors, err)
}
continue
}

// mig instances are re-fetched along with instance.Status.ErrorInfo for migs with
// instances in creating or deleting state
if c.isMigCreatingOrDeletingInstances(mig) {
if err := c.fillMigInstances(migRef); err != nil {
errors = append(errors, err)
}
continue
}

// for all other cases, mig instances cache is updated with the instances obtained from instance.List api
instances := migToInstances[migRef]
err := c.cache.SetMigInstances(migRef, instances, c.timeProvider.Now())
if err != nil {
errors = append(errors, err)
}
}
if len(errors) > 0 {
return errors[0]
}
return nil
}

func (c *cachingMigInfoProvider) findMigWithMatchingBasename(instanceRef GceRef) Mig {
for _, mig := range c.migLister.GetMigs() {
migRef := mig.GceRef()
Expand Down
Loading

0 comments on commit 01e8918

Please sign in to comment.