Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GCE] Support paginated instance listing #6376

Merged
merged 1 commit into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 77 additions & 47 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,69 +308,99 @@ func (client *autoscalingGceClientV1) DeleteInstances(migRef GceRef, instances [

func (client *autoscalingGceClientV1) FetchMigInstances(migRef GceRef) ([]cloudprovider.Instance, error) {
registerRequest("instance_group_managers", "list_managed_instances")
gceInstances, err := client.gceService.InstanceGroupManagers.ListManagedInstances(migRef.Project, migRef.Zone, migRef.Name).Do()
b := newInstanceListBuilder(migRef)
err := client.gceService.InstanceGroupManagers.ListManagedInstances(migRef.Project, migRef.Zone, migRef.Name).Pages(context.Background(), b.loadPage)
if err != nil {
klog.V(4).Infof("Failed MIG info request for %s %s %s: %v", migRef.Project, migRef.Zone, migRef.Name, err)
return nil, err
}
infos := []cloudprovider.Instance{}
errorCodeCounts := make(map[string]int)
errorLoggingQuota := klogx.NewLoggingQuota(100)
for _, gceInstance := range gceInstances.ManagedInstances {
return b.build(), nil
}

type instanceListBuilder struct {
migRef GceRef
errorCodeCounts map[string]int
errorLoggingQuota *klogx.Quota
infos []cloudprovider.Instance
}

func newInstanceListBuilder(migRef GceRef) *instanceListBuilder {
return &instanceListBuilder{
migRef: migRef,
errorCodeCounts: make(map[string]int),
errorLoggingQuota: klogx.NewLoggingQuota(100),
}
}

func (i *instanceListBuilder) loadPage(page *gce.InstanceGroupManagersListManagedInstancesResponse) error {
if i.infos == nil {
i.infos = make([]cloudprovider.Instance, 0, len(page.ManagedInstances))
}
for _, gceInstance := range page.ManagedInstances {
ref, err := ParseInstanceUrlRef(gceInstance.Instance)
if err != nil {
klog.Errorf("Received error while parsing of the instance url: %v", err)
continue
}
instance := i.gceInstanceToInstance(ref, gceInstance)
i.infos = append(i.infos, instance)
}
return nil
}

instance := cloudprovider.Instance{
Id: ref.ToProviderId(),
Status: &cloudprovider.InstanceStatus{
State: getInstanceState(gceInstance.CurrentAction),
},
}
func (i *instanceListBuilder) gceInstanceToInstance(ref GceRef, gceInstance *gce.ManagedInstance) cloudprovider.Instance {
instance := cloudprovider.Instance{
Id: ref.ToProviderId(),
Status: &cloudprovider.InstanceStatus{
State: getInstanceState(gceInstance.CurrentAction),
},
}

if instance.Status.State == cloudprovider.InstanceCreating {
var errorInfo *cloudprovider.InstanceErrorInfo
errorMessages := []string{}
lastAttemptErrors := getLastAttemptErrors(gceInstance)
for _, instanceError := range lastAttemptErrors {
errorCodeCounts[instanceError.Code]++
if newErrorInfo := GetErrorInfo(instanceError.Code, instanceError.Message, gceInstance.InstanceStatus, errorInfo); newErrorInfo != nil {
// override older error
errorInfo = newErrorInfo
} else {
// no error
continue
}
if instance.Status.State != cloudprovider.InstanceCreating {
return instance
}

if instanceError.Message != "" {
errorMessages = append(errorMessages, instanceError.Message)
}
}
if errorInfo != nil {
errorInfo.ErrorMessage = strings.Join(errorMessages, "; ")
instance.Status.ErrorInfo = errorInfo
}
var errorInfo *cloudprovider.InstanceErrorInfo
errorMessages := []string{}
lastAttemptErrors := getLastAttemptErrors(gceInstance)
for _, instanceError := range lastAttemptErrors {
i.errorCodeCounts[instanceError.Code]++
if newErrorInfo := GetErrorInfo(instanceError.Code, instanceError.Message, gceInstance.InstanceStatus, errorInfo); newErrorInfo != nil {
// override older error
errorInfo = newErrorInfo
} else {
// no error
continue
}
if instanceError.Message != "" {
errorMessages = append(errorMessages, instanceError.Message)
}
}
if errorInfo != nil {
errorInfo.ErrorMessage = strings.Join(errorMessages, "; ")
instance.Status.ErrorInfo = errorInfo
}

if len(lastAttemptErrors) > 0 {
gceInstanceJSONBytes, err := gceInstance.MarshalJSON()
var gceInstanceJSON string
if err != nil {
gceInstanceJSON = fmt.Sprintf("Got error from MarshalJSON; %v", err)
} else {
gceInstanceJSON = string(gceInstanceJSONBytes)
}
klogx.V(4).UpTo(errorLoggingQuota).Infof("Got GCE instance which is being created and has lastAttemptErrors; gceInstance=%v; errorInfo=%#v", gceInstanceJSON, errorInfo)
}
if len(lastAttemptErrors) > 0 {
gceInstanceJSONBytes, err := gceInstance.MarshalJSON()
var gceInstanceJSON string
if err != nil {
gceInstanceJSON = fmt.Sprintf("Got error from MarshalJSON; %v", err)
} else {
gceInstanceJSON = string(gceInstanceJSONBytes)
}
infos = append(infos, instance)
klogx.V(4).UpTo(i.errorLoggingQuota).Infof("Got GCE instance which is being created and has lastAttemptErrors; gceInstance=%v; errorInfo=%#v", gceInstanceJSON, errorInfo)
}
klogx.V(4).Over(errorLoggingQuota).Infof("Got %v other GCE instances being created with lastAttemptErrors", -errorLoggingQuota.Left())
if len(errorCodeCounts) > 0 {
klog.Warningf("Spotted following instance creation error codes: %#v", errorCodeCounts)

return instance
}

func (i *instanceListBuilder) build() []cloudprovider.Instance {
klogx.V(4).Over(i.errorLoggingQuota).Infof("Got %v other GCE instances being created with lastAttemptErrors", -i.errorLoggingQuota.Left())
if len(i.errorCodeCounts) > 0 {
klog.Warningf("Spotted following instance creation error codes: %#v", i.errorCodeCounts)
}
return infos, nil
return i.infos
}

// GetErrorInfo maps the error code, error message and instance status to CA instance error info
Expand Down
159 changes: 156 additions & 3 deletions cluster-autoscaler/cloudprovider/gce/autoscaling_gce_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,10 @@ func TestFetchMigInstancesInstanceUrlHandling(t *testing.T) {
g := newTestAutoscalingGceClient(t, "project1", server.URL, "")

testCases := []struct {
name string
lmiResponse gce_api.InstanceGroupManagersListManagedInstancesResponse
wantInstances []cloudprovider.Instance
name string
lmiResponse gce_api.InstanceGroupManagersListManagedInstancesResponse
lmiPageResponses map[string]gce_api.InstanceGroupManagersListManagedInstancesResponse
wantInstances []cloudprovider.Instance
}{
{
name: "all instances good",
Expand Down Expand Up @@ -269,6 +270,153 @@ func TestFetchMigInstancesInstanceUrlHandling(t *testing.T) {
},
},
},
{
name: "paginated response",
lmiResponse: gce_api.InstanceGroupManagersListManagedInstancesResponse{
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 2),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 42),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
NextPageToken: "foo",
},
lmiPageResponses: map[string]gce_api.InstanceGroupManagersListManagedInstancesResponse{
"foo": {
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 123),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 456),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
},
},
wantInstances: []cloudprovider.Instance{
{
Id: "gce://myprojid/myzone/myinst_2",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_42",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_123",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_456",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
},
},
{
name: "paginated response, more pages",
lmiResponse: gce_api.InstanceGroupManagersListManagedInstancesResponse{
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 2),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 42),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
NextPageToken: "foo",
},
lmiPageResponses: map[string]gce_api.InstanceGroupManagersListManagedInstancesResponse{
"foo": {
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 123),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 456),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
NextPageToken: "bar",
},
"bar": {
ManagedInstances: []*gce_api.ManagedInstance{
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 789),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
{
Instance: fmt.Sprintf(goodInstanceUrlTempl, 666),
CurrentAction: "CREATING",
LastAttempt: &gce_api.ManagedInstanceLastAttempt{
Errors: &gce_api.ManagedInstanceLastAttemptErrors{},
},
},
},
},
},
wantInstances: []cloudprovider.Instance{
{
Id: "gce://myprojid/myzone/myinst_2",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_42",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_123",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_456",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_789",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
{
Id: "gce://myprojid/myzone/myinst_666",
Status: &cloudprovider.InstanceStatus{State: cloudprovider.InstanceCreating},
},
},
},
{
name: "instances with bad url",
lmiResponse: gce_api.InstanceGroupManagersListManagedInstancesResponse{
Expand Down Expand Up @@ -329,6 +477,11 @@ func TestFetchMigInstancesInstanceUrlHandling(t *testing.T) {
b, err := json.Marshal(tc.lmiResponse)
assert.NoError(t, err)
server.On("handle", "/projects/zones/instanceGroupManagers/listManagedInstances").Return(string(b)).Times(1)
for token, response := range tc.lmiPageResponses {
b, err := json.Marshal(response)
assert.NoError(t, err)
server.On("handle", "/projects/zones/instanceGroupManagers/listManagedInstances", token).Return(string(b)).Times(1)
}
gotInstances, err := g.FetchMigInstances(GceRef{})
assert.NoError(t, err)
if diff := cmp.Diff(tc.wantInstances, gotInstances, cmpopts.EquateErrors()); diff != "" {
Expand Down
8 changes: 7 additions & 1 deletion cluster-autoscaler/utils/test/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,8 +426,14 @@ func NewHttpServerMock(fields ...HttpServerMockField) *HttpServerMock {

func (l *HttpServerMock) handle(req *http.Request, w http.ResponseWriter, serverMock *HttpServerMock) string {
url := req.URL.Path
query := req.URL.Query()
var response string
args := l.Called(url)
var args mock.Arguments
if query.Has("pageToken") {
args = l.Called(url, query.Get("pageToken"))
} else {
args = l.Called(url)
}
for i, field := range l.fields {
switch field {
case MockFieldResponse:
Expand Down
Loading