-
Notifications
You must be signed in to change notification settings - Fork 4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor gce.RegenerateMigInstancesCache() to use Instance.List #6955
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -62,15 +62,16 @@ type timeProvider interface { | |
} | ||
|
||
type cachingMigInfoProvider struct { | ||
migInfoMutex sync.Mutex | ||
cache *GceCache | ||
migLister MigLister | ||
gceClient AutoscalingGceClient | ||
projectId string | ||
concurrentGceRefreshes int | ||
migInstanceMutex sync.Mutex | ||
migInstancesMinRefreshWaitTime time.Duration | ||
timeProvider timeProvider | ||
migInfoMutex sync.Mutex | ||
cache *GceCache | ||
migLister MigLister | ||
gceClient AutoscalingGceClient | ||
projectId string | ||
concurrentGceRefreshes int | ||
migInstanceMutex sync.Mutex | ||
migInstancesMinRefreshWaitTime time.Duration | ||
timeProvider timeProvider | ||
bulkGceMigInstancesListingEnabled bool | ||
} | ||
|
||
type realTime struct{} | ||
|
@@ -80,15 +81,16 @@ func (r *realTime) Now() time.Time { | |
} | ||
|
||
// NewCachingMigInfoProvider creates an instance of caching MigInfoProvider | ||
func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration) MigInfoProvider { | ||
func NewCachingMigInfoProvider(cache *GceCache, migLister MigLister, gceClient AutoscalingGceClient, projectId string, concurrentGceRefreshes int, migInstancesMinRefreshWaitTime time.Duration, bulkGceMigInstancesListingEnabled bool) MigInfoProvider { | ||
return &cachingMigInfoProvider{ | ||
cache: cache, | ||
migLister: migLister, | ||
gceClient: gceClient, | ||
projectId: projectId, | ||
concurrentGceRefreshes: concurrentGceRefreshes, | ||
migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime, | ||
timeProvider: &realTime{}, | ||
cache: cache, | ||
migLister: migLister, | ||
gceClient: gceClient, | ||
projectId: projectId, | ||
concurrentGceRefreshes: concurrentGceRefreshes, | ||
migInstancesMinRefreshWaitTime: migInstancesMinRefreshWaitTime, | ||
timeProvider: &realTime{}, | ||
bulkGceMigInstancesListingEnabled: bulkGceMigInstancesListingEnabled, | ||
} | ||
} | ||
|
||
|
@@ -151,6 +153,11 @@ func (c *cachingMigInfoProvider) getCachedMigForInstance(instanceRef GceRef) (Mi | |
func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error { | ||
c.cache.InvalidateAllMigInstances() | ||
c.cache.InvalidateAllInstancesToMig() | ||
|
||
if c.bulkGceMigInstancesListingEnabled { | ||
return c.bulkListMigInstances() | ||
} | ||
|
||
migs := c.migLister.GetMigs() | ||
errors := make([]error, len(migs)) | ||
workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(migs), func(piece int) { | ||
|
@@ -165,6 +172,116 @@ func (c *cachingMigInfoProvider) RegenerateMigInstancesCache() error { | |
return nil | ||
} | ||
|
||
func (c *cachingMigInfoProvider) bulkListMigInstances() error { | ||
c.cache.InvalidateMigInstancesState() | ||
err := c.fillMigInfoCache() | ||
if err != nil { | ||
damikag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
return err | ||
} | ||
instances, listErr := c.listInstancesInAllZonesWithMigs() | ||
migToInstances := groupInstancesToMigs(instances) | ||
updateErr := c.updateMigInstancesCache(migToInstances) | ||
|
||
if listErr != nil { | ||
return listErr | ||
} | ||
return updateErr | ||
} | ||
|
||
func (c *cachingMigInfoProvider) listInstancesInAllZonesWithMigs() ([]GceInstance, error) { | ||
var zones []string | ||
for zone := range c.listAllZonesWithMigs() { | ||
zones = append(zones, zone) | ||
} | ||
var allInstances []GceInstance | ||
errors := make([]error, len(zones)) | ||
zoneInstances := make([][]GceInstance, len(zones)) | ||
workqueue.ParallelizeUntil(context.Background(), c.concurrentGceRefreshes, len(zones), func(piece int) { | ||
zoneInstances[piece], errors[piece] = c.gceClient.FetchAllInstances(c.projectId, zones[piece], "") | ||
}, workqueue.WithChunkSize(c.concurrentGceRefreshes)) | ||
|
||
for _, instances := range zoneInstances { | ||
allInstances = append(allInstances, instances...) | ||
} | ||
for _, err := range errors { | ||
if err != nil { | ||
return allInstances, err | ||
} | ||
} | ||
return allInstances, nil | ||
} | ||
|
||
func groupInstancesToMigs(instances []GceInstance) map[GceRef][]GceInstance { | ||
migToInstances := map[GceRef][]GceInstance{} | ||
for _, instance := range instances { | ||
migToInstances[instance.Igm] = append(migToInstances[instance.Igm], instance) | ||
} | ||
return migToInstances | ||
} | ||
|
||
func (c *cachingMigInfoProvider) isMigInstancesConsistent(mig Mig, migToInstances map[GceRef][]GceInstance) bool { | ||
migRef := mig.GceRef() | ||
state, found := c.cache.GetMigInstancesState(migRef) | ||
if !found { | ||
return false | ||
} | ||
instanceCount := state[cloudprovider.InstanceRunning] + state[cloudprovider.InstanceCreating] + state[cloudprovider.InstanceDeleting] | ||
|
||
instances, found := migToInstances[migRef] | ||
if !found && instanceCount > 0 { | ||
return false | ||
} | ||
return instanceCount == int64(len(instances)) | ||
} | ||
|
||
func (c *cachingMigInfoProvider) isMigCreatingOrDeletingInstances(mig Mig) bool { | ||
migRef := mig.GceRef() | ||
state, found := c.cache.GetMigInstancesState(migRef) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I haven't thought about this before, but we need a better name for this. Maybe MigInstancesCount or MigInstancesStateCount? Because now, when I read below state[cloudprovider.InstanceCreating] it reads as state[state] and doesn't parse well in my head. It can be a separate PR though. |
||
if !found { | ||
return false | ||
} | ||
return state[cloudprovider.InstanceCreating] > 0 || state[cloudprovider.InstanceDeleting] > 0 | ||
} | ||
|
||
// updateMigInstancesCache updates the mig instances for each mig | ||
func (c *cachingMigInfoProvider) updateMigInstancesCache(migToInstances map[GceRef][]GceInstance) error { | ||
damikag marked this conversation as resolved.
Show resolved
Hide resolved
|
||
var errors []error | ||
for _, mig := range c.migLister.GetMigs() { | ||
migRef := mig.GceRef() | ||
// If there is an inconsistency between number of instances according to instances.List | ||
// and number of instances according to migInstancesStateCount for the given mig, which can be due to | ||
// - abandoned instance | ||
// - missing/malformed "created-by" reference | ||
// we use an igm.ListInstances call as the authoritative source of instance information | ||
if !c.isMigInstancesConsistent(mig, migToInstances) { | ||
if err := c.fillMigInstances(migRef); err != nil { | ||
errors = append(errors, err) | ||
} | ||
continue | ||
} | ||
|
||
// mig instances are re-fetched along with instance.Status.ErrorInfo for migs with | ||
// instances in creating or deleting state | ||
if c.isMigCreatingOrDeletingInstances(mig) { | ||
if err := c.fillMigInstances(migRef); err != nil { | ||
errors = append(errors, err) | ||
} | ||
continue | ||
} | ||
|
||
// for all other cases, mig instances cache is updated with the instances obtained from instance.List api | ||
instances := migToInstances[migRef] | ||
err := c.cache.SetMigInstances(migRef, instances, c.timeProvider.Now()) | ||
if err != nil { | ||
errors = append(errors, err) | ||
} | ||
} | ||
if len(errors) > 0 { | ||
return errors[0] | ||
} | ||
return nil | ||
} | ||
|
||
func (c *cachingMigInfoProvider) findMigWithMatchingBasename(instanceRef GceRef) Mig { | ||
for _, mig := range c.migLister.GetMigs() { | ||
migRef := mig.GceRef() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be skipped if bulkGceMigInstancesListingEnabled=true?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. We are doing the same thing in different ways.